feat: 실시간 용어설명 조회 기능 추가

This commit is contained in:
djeon
2025-10-29 21:50:16 +09:00
parent a84449e88d
commit 8bb91f646f
21 changed files with 1038 additions and 2759 deletions
+96 -4
View File
@@ -5,6 +5,7 @@ Azure Event Hub Consumer 서비스
import asyncio
import json
import logging
import re
from typing import Dict, Any, Optional, Union, List
from datetime import datetime
@@ -112,7 +113,13 @@ class EventHubConsumer:
try:
# 이벤트 데이터 파싱
event_body = event.body_as_str()
event_data = json.loads(event_body)
logger.debug(f"원본 이벤트 데이터 (처음 200자): {event_body[:200]}")
# Java LocalDateTime 배열을 문자열로 변환하여 JSON 파싱 가능하게 변환
converted_body = self._convert_java_datetime_arrays(event_body)
logger.debug(f"변환된 이벤트 데이터 (처음 200자): {converted_body[:200]}")
event_data = json.loads(converted_body)
event_type = event_data.get('eventType', 'unknown')
logger.info(f"이벤트 수신: {event_type}")
@@ -132,7 +139,7 @@ class EventHubConsumer:
except json.JSONDecodeError as e:
logger.error(f"이벤트 파싱 실패: {str(e)}")
except Exception as e:
logger.error(f"이벤트 처리 실패: {str(e)}")
logger.error(f"이벤트 처리 실패: {str(e)}", exc_info=True)
async def _on_error(self, partition_context, error):
"""
@@ -144,6 +151,46 @@ class EventHubConsumer:
"""
logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}")
def _convert_java_datetime_arrays(self, json_str: str) -> str:
"""
JSON 문자열 내의 Java LocalDateTime 배열을 ISO 8601 문자열로 변환
Java의 Jackson이 LocalDateTime을 배열 형식으로 직렬화하는 것을
Python이 파싱 가능한 문자열 형식으로 변환
Args:
json_str: 원본 JSON 문자열
Returns:
변환된 JSON 문자열
Examples:
>>> _convert_java_datetime_arrays('{"timestamp":[2025,10,29,10,25,37,579030000]}')
'{"timestamp":"2025-10-29T10:25:37.579030"}'
"""
# Java LocalDateTime 배열 패턴: [년,월,일,시,분,초,나노초]
# 나노초는 항상 7개 요소로 전송됨
pattern = r'\[(\d{4}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d+)\]'
def replace_datetime(match):
year = int(match.group(1))
month = int(match.group(2))
day = int(match.group(3))
hour = int(match.group(4))
minute = int(match.group(5))
second = int(match.group(6))
nanosecond = int(match.group(7))
# 나노초를 마이크로초로 변환
microsecond = nanosecond // 1000
# ISO 8601 형식 문자열 생성
dt = datetime(year, month, day, hour, minute, second, microsecond)
return f'"{dt.isoformat()}"'
# 모든 datetime 배열을 문자열로 변환
return re.sub(pattern, replace_datetime, json_str)
async def _process_segment_event(self, event_data: Dict[str, Any]):
"""
세그먼트 생성 이벤트 처리 - 용어검색 실행
@@ -162,6 +209,9 @@ class EventHubConsumer:
text = event_data.get("text", "")
meeting_id = event_data.get("meetingId")
# 이벤트 데이터 구조 로깅 (디버깅용)
logger.debug(f"이벤트 데이터 키: {list(event_data.keys())}")
if not text:
logger.warning(f"세그먼트 {segment_id}에 텍스트가 없습니다")
return
@@ -242,8 +292,50 @@ class EventHubConsumer:
else:
logger.info(f"세그먼트 {segment_id}에서 매칭되는 용어를 찾지 못했습니다")
# 7. 선택적: 검색 결과를 별도 테이블에 저장하거나 Event Hub로 발행
# TODO: 필요시 검색 결과를 저장하거나 downstream 서비스로 전달
# 7. SSE를 통해 결과 전송
# Event Hub 메시지에서 sessionId 추출 (여러 필드 확인)
session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id
logger.info(f"SSE 전송 시도: sessionId={session_id}, meetingId={meeting_id}")
if session_id:
from ..services.sse_manager import sse_manager
# 용어 정보를 직렬화 가능한 형태로 변환
terms_data = []
for result in results:
term = result["term"]
terms_data.append({
"term_id": term.term_id,
"term_name": term.term_name,
"definition": term.definition,
"category": term.category,
"synonyms": term.synonyms,
"related_terms": term.related_terms,
"context": term.context,
"relevance_score": result["relevance_score"],
"match_type": result.get("match_type", "unknown")
})
# SSE로 전송
success = await sse_manager.send_to_session(
session_id=session_id,
data={
"segment_id": segment_id,
"meeting_id": meeting_id,
"text": text[:100], # 텍스트 일부만 전송
"terms": terms_data,
"total_count": len(terms_data)
},
event_type="term_result"
)
if success:
logger.info(f"용어 검색 결과를 SSE로 전송 완료: {session_id}")
else:
logger.warning(f"SSE 전송 실패 (세션 미연결): {session_id}")
else:
logger.warning("이벤트 데이터에 sessionId가 없어 SSE 전송을 건너뜁니다")
except Exception as e:
logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True)
+183
View File
@@ -0,0 +1,183 @@
"""
SSE(Server-Sent Events) 연결 관리자
"""
import asyncio
import logging
from typing import Dict, Any, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class SSEManager:
"""SSE 연결 관리자"""
def __init__(self, max_connections: int = 1000, heartbeat_interval: int = 30):
"""
초기화
Args:
max_connections: 최대 동시 연결 수
heartbeat_interval: Heartbeat 전송 간격 (초)
"""
self._connections: Dict[str, asyncio.Queue] = {}
self._last_activity: Dict[str, datetime] = {}
self.max_connections = max_connections
self.heartbeat_interval = heartbeat_interval
self._cleanup_task: Optional[asyncio.Task] = None
async def start(self):
"""SSE Manager 시작 - 정리 태스크 실행"""
self._cleanup_task = asyncio.create_task(self._cleanup_inactive_connections())
logger.info("SSE Manager 시작됨")
async def stop(self):
"""SSE Manager 중지"""
if self._cleanup_task:
self._cleanup_task.cancel()
self._connections.clear()
self._last_activity.clear()
logger.info("SSE Manager 중지됨")
def register(self, session_id: str) -> asyncio.Queue:
"""
새 SSE 연결 등록
Args:
session_id: 세션 ID
Returns:
메시지 큐
Raises:
ValueError: 최대 연결 수 초과 시
"""
if len(self._connections) >= self.max_connections:
raise ValueError(f"최대 연결 수({self.max_connections})를 초과했습니다")
if session_id in self._connections:
logger.warning(f"세션 {session_id}가 이미 연결되어 있습니다")
return self._connections[session_id]
queue = asyncio.Queue(maxsize=100)
self._connections[session_id] = queue
self._last_activity[session_id] = datetime.now()
logger.info(f"SSE 연결 등록: {session_id} (전체 연결 수: {len(self._connections)})")
return queue
def unregister(self, session_id: str):
"""
SSE 연결 제거
Args:
session_id: 세션 ID
"""
if session_id in self._connections:
del self._connections[session_id]
del self._last_activity[session_id]
logger.info(f"SSE 연결 제거: {session_id} (전체 연결 수: {len(self._connections)})")
async def send_to_session(self, session_id: str, data: Dict[str, Any], event_type: str = "message") -> bool:
"""
특정 세션에 데이터 전송
Args:
session_id: 세션 ID
data: 전송할 데이터
event_type: 이벤트 타입
Returns:
전송 성공 여부
"""
if session_id not in self._connections:
logger.warning(f"세션 {session_id}가 연결되어 있지 않습니다")
return False
try:
message = {
"event": event_type,
"data": data,
"timestamp": datetime.now().isoformat()
}
queue = self._connections[session_id]
# 큐가 가득 차면 오래된 메시지 제거
if queue.full():
try:
queue.get_nowait()
logger.warning(f"세션 {session_id} 큐가 가득 차서 오래된 메시지를 제거했습니다")
except asyncio.QueueEmpty:
pass
await queue.put(message)
self._last_activity[session_id] = datetime.now()
logger.debug(f"메시지 전송 성공: {session_id} (이벤트: {event_type})")
return True
except Exception as e:
logger.error(f"메시지 전송 실패: {session_id}, 에러: {str(e)}")
return False
async def send_heartbeat(self, session_id: str) -> bool:
"""
Heartbeat 전송
Args:
session_id: 세션 ID
Returns:
전송 성공 여부
"""
return await self.send_to_session(
session_id,
{"type": "heartbeat"},
event_type="heartbeat"
)
def is_connected(self, session_id: str) -> bool:
"""
연결 상태 확인
Args:
session_id: 세션 ID
Returns:
연결 여부
"""
return session_id in self._connections
def get_active_sessions(self) -> list:
"""활성 세션 목록 반환"""
return list(self._connections.keys())
async def _cleanup_inactive_connections(self):
"""비활성 연결 정리 (백그라운드 태스크)"""
timeout_minutes = 30
while True:
try:
await asyncio.sleep(60) # 1분마다 확인
now = datetime.now()
inactive_sessions = []
for session_id, last_time in self._last_activity.items():
elapsed = (now - last_time).total_seconds() / 60
if elapsed > timeout_minutes:
inactive_sessions.append(session_id)
for session_id in inactive_sessions:
logger.info(f"비활성 세션 제거: {session_id} ({timeout_minutes}분 초과)")
self.unregister(session_id)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"연결 정리 중 에러: {str(e)}")
# 전역 SSE Manager 인스턴스
sse_manager = SSEManager()