diff --git a/rag/requirements.txt b/rag/requirements.txt index b5ebe84..b70c488 100644 --- a/rag/requirements.txt +++ b/rag/requirements.txt @@ -51,7 +51,6 @@ structlog==23.2.0 pytest==7.4.3 pytest-asyncio==0.21.1 pytest-cov==4.1.0 -httpx==0.25.2 # Development black==23.12.0 diff --git a/rag/src/api/__pycache__/main.cpython-311.pyc b/rag/src/api/__pycache__/main.cpython-311.pyc index 1cc8c53..fa862d5 100644 Binary files a/rag/src/api/__pycache__/main.cpython-311.pyc and b/rag/src/api/__pycache__/main.cpython-311.pyc differ diff --git a/rag/src/api/__pycache__/minutes_routes.cpython-311.pyc b/rag/src/api/__pycache__/minutes_routes.cpython-311.pyc new file mode 100644 index 0000000..1bd4a97 Binary files /dev/null and b/rag/src/api/__pycache__/minutes_routes.cpython-311.pyc differ diff --git a/rag/src/api/main.py b/rag/src/api/main.py index b5dd99b..ef30ce5 100644 --- a/rag/src/api/main.py +++ b/rag/src/api/main.py @@ -37,6 +37,7 @@ from ..utils.embedding import EmbeddingGenerator from ..utils.text_processor import extract_nouns_as_query from ..utils.redis_cache import RedisCache from . import term_routes +from . import minutes_routes # 로깅 설정 logging.basicConfig( @@ -63,6 +64,7 @@ app.add_middleware( # SSE 라우터 등록 app.include_router(term_routes.router) +app.include_router(minutes_routes.router) # 앱 시작/종료 이벤트 핸들러 @@ -732,4 +734,4 @@ async def get_related_minutes( if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) + uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/rag/src/api/minutes_routes.py b/rag/src/api/minutes_routes.py new file mode 100644 index 0000000..04c9e65 --- /dev/null +++ b/rag/src/api/minutes_routes.py @@ -0,0 +1,93 @@ +""" +연관 회의록 관련 API 엔드포인트 +""" +from fastapi import APIRouter, HTTPException +from sse_starlette.sse import EventSourceResponse +import asyncio +import json +import logging + +from ..services.sse_manager import sse_manager + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/rag/minutes", tags=["minutes"]) + + +@router.get("/stream/{session_id}") +async def stream_related_minutes(session_id: str): + """ + 연관 회의록 검색 결과 SSE 스트림 + + Args: + session_id: 회의 세션 ID + + Returns: + SSE 스트림 + """ + try: + # SSE 연결 등록 + queue = sse_manager.register(session_id) + logger.info(f"연관 회의록 스트림 시작: {session_id}") + + async def event_generator(): + """SSE 이벤트 생성기""" + try: + # 연결 확인 메시지 + yield { + "event": "connected", + "data": json.dumps({"session_id": session_id, "status": "connected"}) + } + + # 메시지 수신 및 전송 + while True: + try: + # Timeout을 두어 주기적으로 heartbeat 전송 + message = await asyncio.wait_for(queue.get(), timeout=30.0) + + yield { + "event": message["event"], + "data": json.dumps(message["data"]) + } + + except asyncio.TimeoutError: + # Heartbeat 전송 + yield { + "event": "heartbeat", + "data": json.dumps({"type": "heartbeat"}) + } + + except asyncio.CancelledError: + logger.info(f"연관 회의록 스트림 취소됨: {session_id}") + except Exception as e: + logger.error(f"이벤트 생성 중 에러: {str(e)}") + finally: + # 연결 정리 + sse_manager.unregister(session_id) + logger.info(f"연관 회의록 스트림 종료: {session_id}") + + return EventSourceResponse(event_generator()) + + except ValueError as e: + raise HTTPException(status_code=429, detail=str(e)) + except Exception as e: + logger.error(f"스트림 시작 실패: {str(e)}") + raise HTTPException(status_code=500, detail="스트림 시작 실패") + + +@router.get("/stream/{session_id}/status") +async def get_stream_status(session_id: str): + """ + 스트림 연결 상태 확인 + + Args: + session_id: 회의 세션 ID + + Returns: + 연결 상태 + """ + is_connected = sse_manager.is_connected(session_id) + + return { + "session_id": session_id, + "connected": is_connected + } diff --git a/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc b/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc index b36bd70..a60373d 100644 Binary files a/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc and b/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc differ diff --git a/rag/src/services/eventhub_consumer.py b/rag/src/services/eventhub_consumer.py index a4c8d7f..2b784ab6 100644 --- a/rag/src/services/eventhub_consumer.py +++ b/rag/src/services/eventhub_consumer.py @@ -130,7 +130,7 @@ class EventHubConsumer: await self._process_minutes_event(event_data) elif event_type == "SegmentCreated": - # 세그먼트 생성 이벤트 - 용어검색 실행 + # 세그먼트 생성 이벤트 - 용어검색 및 연관 회의록 검색 실행 await self._process_segment_event(event_data) # Checkpoint 업데이트 @@ -294,7 +294,8 @@ class EventHubConsumer: # 7. SSE를 통해 결과 전송 # Event Hub 메시지에서 sessionId 추출 (여러 필드 확인) - session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id + session_id = event_data.get("sessionId") + # session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id logger.info(f"SSE 전송 시도: sessionId={session_id}, meetingId={meeting_id}") @@ -337,9 +338,114 @@ class EventHubConsumer: else: logger.warning("이벤트 데이터에 sessionId가 없어 SSE 전송을 건너뜁니다") + # 8. 연관 회의록 검색 및 SSE 전송 + await self._search_and_send_related_minutes(text, session_id, segment_id, meeting_id) + except Exception as e: logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True) + async def _search_and_send_related_minutes( + self, + text: str, + session_id: Optional[str], + segment_id: str, + meeting_id: str + ): + """ + 연관 회의록 검색 및 SSE 전송 + + Args: + text: 세그먼트 텍스트 + session_id: 세션 ID + segment_id: 세그먼트 ID + meeting_id: 회의 ID + """ + try: + # RAG Minutes DB가 없으면 스킵 + if not self.rag_minutes_db: + logger.debug("RAG Minutes DB가 설정되지 않아 연관 회의록 검색을 스킵합니다") + return + + if not text: + logger.warning(f"세그먼트 {segment_id}에 텍스트가 없어 연관 회의록 검색을 스킵합니다") + return + + logger.info(f"세그먼트 연관 회의록 검색 시작: {segment_id} (회의: {meeting_id})") + logger.info(f"검색 텍스트: {text[:100]}...") + + # 1. 텍스트를 임베딩으로 변환 + query_embedding = self.embedding_gen.generate_embedding(text) + logger.info(f"임베딩 생성 완료: {len(query_embedding)}차원") + + # 2. 연관 회의록 검색 설정 + config = self.config.get("rag_minutes", {}) + search_config = config.get("search", {}) + + top_k = search_config.get("top_k", 5) + similarity_threshold = search_config.get("similarity_threshold", 0.7) + + # 3. 벡터 유사도 검색 + results = self.rag_minutes_db.search_by_vector( + query_embedding=query_embedding, + top_k=top_k, + similarity_threshold=similarity_threshold + ) + + # 4. 검색 결과 로깅 + if results: + logger.info(f"세그먼트 {segment_id} 연관 회의록 검색 완료: {len(results)}개 발견") + for idx, result in enumerate(results, 1): + minutes = result["minutes"] + score = result["similarity_score"] + logger.info( + f" [{idx}] {minutes.title} " + f"(회의 ID: {minutes.meeting_id}, 유사도: {score:.3f})" + ) + else: + logger.info(f"세그먼트 {segment_id}에서 연관 회의록을 찾지 못했습니다") + + # 5. SSE를 통해 결과 전송 + if session_id: + from ..services.sse_manager import sse_manager + + # 회의록 정보를 직렬화 가능한 형태로 변환 + minutes_data = [] + for result in results: + minutes = result["minutes"] + minutes_data.append({ + "minutes_id": minutes.minutes_id, + "meeting_id": minutes.meeting_id, + "title": minutes.title, + "purpose": minutes.purpose, + "scheduled_at": minutes.scheduled_at, + "location": minutes.location, + "finalized_at": minutes.finalized_at, + "similarity_score": result["similarity_score"] + }) + + # SSE로 전송 + success = await sse_manager.send_to_session( + session_id=session_id, + data={ + "segment_id": segment_id, + "meeting_id": meeting_id, + "text": text[:100], # 텍스트 일부만 전송 + "related_minutes": minutes_data, + "total_count": len(minutes_data) + }, + event_type="related_minutes_result" + ) + + if success: + logger.info(f"연관 회의록 검색 결과를 SSE로 전송 완료: {session_id}") + else: + logger.warning(f"SSE 전송 실패 (세션 미연결): {session_id}") + else: + logger.warning("세션 ID가 없어 연관 회의록 SSE 전송을 건너뜁니다") + + except Exception as e: + logger.error(f"연관 회의록 검색 및 전송 실패: {str(e)}", exc_info=True) + def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]: """ Java LocalDateTime 배열을 ISO 8601 문자열로 변환