mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 11:26:25 +00:00
- AI 서비스: Redis 캐싱 및 EventHub 통합 개선 - STT 서비스: 오디오 버퍼링 및 변환 기능 추가 - 설정 파일 업데이트 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
152 lines
5.9 KiB
Python
152 lines
5.9 KiB
Python
"""Azure Event Hub 서비스 - STT 텍스트 수신"""
|
|
import asyncio
|
|
import logging
|
|
import json
|
|
from datetime import datetime
|
|
from azure.eventhub.aio import EventHubConsumerClient
|
|
|
|
from app.config import get_settings
|
|
from app.services.redis_service import RedisService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
settings = get_settings()
|
|
|
|
|
|
class EventHubService:
|
|
"""Event Hub 리스너 - STT 텍스트 실시간 수신"""
|
|
|
|
def __init__(self):
|
|
self.client = None
|
|
self.redis_service = RedisService()
|
|
|
|
async def start(self):
|
|
"""Event Hub 리스닝 시작"""
|
|
if not settings.eventhub_connection_string:
|
|
logger.warning("Event Hub 연결 문자열이 설정되지 않음 - Event Hub 리스너 비활성화")
|
|
return
|
|
|
|
logger.info("Event Hub 리스너 시작")
|
|
|
|
try:
|
|
# Redis 연결
|
|
await self.redis_service.connect()
|
|
|
|
# Event Hub 클라이언트 생성
|
|
self.client = EventHubConsumerClient.from_connection_string(
|
|
conn_str=settings.eventhub_connection_string,
|
|
consumer_group=settings.eventhub_consumer_group,
|
|
eventhub_name=settings.eventhub_name,
|
|
)
|
|
|
|
# 이벤트 수신 시작
|
|
async with self.client:
|
|
await self.client.receive(
|
|
on_event=self.on_event,
|
|
on_error=self.on_error,
|
|
starting_position="-1", # 최신 이벤트부터
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Event Hub 리스너 오류: {e}")
|
|
finally:
|
|
await self.redis_service.disconnect()
|
|
|
|
async def on_event(self, partition_context, event):
|
|
"""
|
|
이벤트 수신 핸들러
|
|
|
|
이벤트 형식 (STT Service에서 발행):
|
|
{
|
|
"eventType": "TranscriptSegmentReady",
|
|
"meetingId": "meeting-123",
|
|
"text": "변환된 텍스트",
|
|
"timestamp": 1234567890000
|
|
}
|
|
"""
|
|
try:
|
|
# 이벤트 원본 데이터 추출
|
|
try:
|
|
# Event Hub 데이터는 bytes 또는 str일 수 있음
|
|
if hasattr(event, 'body_as_str'):
|
|
raw_body = event.body_as_str()
|
|
elif hasattr(event, 'body'):
|
|
raw_body = event.body.decode('utf-8') if isinstance(event.body, bytes) else str(event.body)
|
|
else:
|
|
logger.error(f"이벤트 타입 미지원: {type(event)}")
|
|
return
|
|
|
|
logger.info(f"수신한 이벤트 원본 (처음 300자): {raw_body[:300]}")
|
|
logger.debug(f"이벤트 전체 길이: {len(raw_body)}자")
|
|
except Exception as extract_error:
|
|
logger.error(f"이벤트 데이터 추출 실패: {extract_error}", exc_info=True)
|
|
return
|
|
|
|
# 이벤트 데이터 파싱
|
|
try:
|
|
event_data = json.loads(raw_body)
|
|
except json.JSONDecodeError as json_error:
|
|
logger.error(f"JSON 파싱 실패 - 전체 데이터: {raw_body}")
|
|
logger.error(f"파싱 에러: {json_error}")
|
|
return
|
|
|
|
event_type = event_data.get("eventType")
|
|
meeting_id = event_data.get("meetingId")
|
|
text = event_data.get("text")
|
|
timestamp_raw = event_data.get("timestamp")
|
|
|
|
# timestamp 변환: LocalDateTime 배열 → Unix timestamp (ms)
|
|
# Java LocalDateTime은 [year, month, day, hour, minute, second, nano] 형식
|
|
if isinstance(timestamp_raw, list) and len(timestamp_raw) >= 3:
|
|
year, month, day = timestamp_raw[0:3]
|
|
hour = timestamp_raw[3] if len(timestamp_raw) > 3 else 0
|
|
minute = timestamp_raw[4] if len(timestamp_raw) > 4 else 0
|
|
second = timestamp_raw[5] if len(timestamp_raw) > 5 else 0
|
|
dt = datetime(year, month, day, hour, minute, second)
|
|
timestamp = int(dt.timestamp() * 1000) # milliseconds
|
|
else:
|
|
timestamp = int(timestamp_raw) if timestamp_raw else int(datetime.now().timestamp() * 1000)
|
|
|
|
# SegmentCreated 이벤트 처리
|
|
if event_type == "SegmentCreated" and meeting_id and text:
|
|
logger.info(
|
|
f"STT 텍스트 수신 - meetingId: {meeting_id}, "
|
|
f"텍스트 길이: {len(text)}, timestamp: {timestamp}"
|
|
)
|
|
|
|
try:
|
|
# Redis에 텍스트 축적 (슬라이딩 윈도우)
|
|
await self.redis_service.add_transcript_segment(
|
|
meeting_id=meeting_id,
|
|
text=text,
|
|
timestamp=timestamp
|
|
)
|
|
logger.info(f"✅ Redis 저장 완료 - meetingId: {meeting_id}, timestamp: {timestamp}")
|
|
except Exception as redis_error:
|
|
logger.error(f"❌ Redis 저장 실패 - meetingId: {meeting_id}, 오류: {redis_error}", exc_info=True)
|
|
|
|
# MVP 개발: checkpoint 업데이트 제거 (InMemory 모드)
|
|
# await partition_context.update_checkpoint(event)
|
|
|
|
except Exception as e:
|
|
logger.error(f"이벤트 처리 오류: {e}", exc_info=True)
|
|
|
|
async def on_error(self, partition_context, error):
|
|
"""에러 핸들러"""
|
|
logger.error(
|
|
f"Event Hub 에러 - Partition: {partition_context.partition_id}, "
|
|
f"Error: {error}"
|
|
)
|
|
|
|
async def stop(self):
|
|
"""Event Hub 리스너 종료"""
|
|
if self.client:
|
|
await self.client.close()
|
|
logger.info("Event Hub 리스너 종료")
|
|
|
|
|
|
# 백그라운드 태스크로 실행할 함수
|
|
async def start_eventhub_listener():
|
|
"""Event Hub 리스너 백그라운드 실행"""
|
|
service = EventHubService()
|
|
await service.start()
|