"""Azure Event Hub 서비스 - STT 텍스트 수신""" import asyncio import logging import json from datetime import datetime from azure.eventhub.aio import EventHubConsumerClient from app.config import get_settings from app.services.redis_service import RedisService logger = logging.getLogger(__name__) settings = get_settings() class EventHubService: """Event Hub 리스너 - STT 텍스트 실시간 수신""" def __init__(self): self.client = None self.redis_service = RedisService() async def start(self): """Event Hub 리스닝 시작""" if not settings.eventhub_connection_string: logger.warning("Event Hub 연결 문자열이 설정되지 않음 - Event Hub 리스너 비활성화") return logger.info("Event Hub 리스너 시작") try: # Redis 연결 await self.redis_service.connect() # Event Hub 클라이언트 생성 self.client = EventHubConsumerClient.from_connection_string( conn_str=settings.eventhub_connection_string, consumer_group=settings.eventhub_consumer_group, eventhub_name=settings.eventhub_name, ) # 이벤트 수신 시작 async with self.client: await self.client.receive( on_event=self.on_event, on_error=self.on_error, starting_position="-1", # 최신 이벤트부터 ) except Exception as e: logger.error(f"Event Hub 리스너 오류: {e}") finally: await self.redis_service.disconnect() async def on_event(self, partition_context, event): """ 이벤트 수신 핸들러 이벤트 형식 (STT Service에서 발행): { "eventType": "TranscriptSegmentReady", "meetingId": "meeting-123", "text": "변환된 텍스트", "timestamp": 1234567890000 } """ try: # 이벤트 원본 데이터 추출 try: # Event Hub 데이터는 bytes 또는 str일 수 있음 if hasattr(event, 'body_as_str'): raw_body = event.body_as_str() elif hasattr(event, 'body'): raw_body = event.body.decode('utf-8') if isinstance(event.body, bytes) else str(event.body) else: logger.error(f"이벤트 타입 미지원: {type(event)}") return logger.info(f"수신한 이벤트 원본 (처음 300자): {raw_body[:300]}") logger.debug(f"이벤트 전체 길이: {len(raw_body)}자") except Exception as extract_error: logger.error(f"이벤트 데이터 추출 실패: {extract_error}", exc_info=True) return # 이벤트 데이터 파싱 try: event_data = json.loads(raw_body) except json.JSONDecodeError as json_error: logger.error(f"JSON 파싱 실패 - 전체 데이터: {raw_body}") logger.error(f"파싱 에러: {json_error}") return event_type = event_data.get("eventType") meeting_id = event_data.get("meetingId") text = event_data.get("text") timestamp_raw = event_data.get("timestamp") # timestamp 변환: LocalDateTime 배열 → Unix timestamp (ms) # Java LocalDateTime은 [year, month, day, hour, minute, second, nano] 형식 if isinstance(timestamp_raw, list) and len(timestamp_raw) >= 3: year, month, day = timestamp_raw[0:3] hour = timestamp_raw[3] if len(timestamp_raw) > 3 else 0 minute = timestamp_raw[4] if len(timestamp_raw) > 4 else 0 second = timestamp_raw[5] if len(timestamp_raw) > 5 else 0 dt = datetime(year, month, day, hour, minute, second) timestamp = int(dt.timestamp() * 1000) # milliseconds else: timestamp = int(timestamp_raw) if timestamp_raw else int(datetime.now().timestamp() * 1000) # SegmentCreated 이벤트 처리 if event_type == "SegmentCreated" and meeting_id and text: logger.info( f"STT 텍스트 수신 - meetingId: {meeting_id}, " f"텍스트 길이: {len(text)}, timestamp: {timestamp}" ) try: # Redis에 텍스트 축적 (슬라이딩 윈도우) await self.redis_service.add_transcript_segment( meeting_id=meeting_id, text=text, timestamp=timestamp ) logger.info(f"✅ Redis 저장 완료 - meetingId: {meeting_id}, timestamp: {timestamp}") except Exception as redis_error: logger.error(f"❌ Redis 저장 실패 - meetingId: {meeting_id}, 오류: {redis_error}", exc_info=True) # MVP 개발: checkpoint 업데이트 제거 (InMemory 모드) # await partition_context.update_checkpoint(event) except Exception as e: logger.error(f"이벤트 처리 오류: {e}", exc_info=True) async def on_error(self, partition_context, error): """에러 핸들러""" logger.error( f"Event Hub 에러 - Partition: {partition_context.partition_id}, " f"Error: {error}" ) async def stop(self): """Event Hub 리스너 종료""" if self.client: await self.client.close() logger.info("Event Hub 리스너 종료") # 백그라운드 태스크로 실행할 함수 async def start_eventhub_listener(): """Event Hub 리스너 백그라운드 실행""" service = EventHubService() await service.start()