"""AI 제안사항 SSE 엔드포인트""" from fastapi import APIRouter from sse_starlette.sse import EventSourceResponse import logging import asyncio from typing import AsyncGenerator from app.models import RealtimeSuggestionsResponse from app.services.claude_service import ClaudeService from app.services.redis_service import RedisService from app.config import get_settings logger = logging.getLogger(__name__) router = APIRouter() settings = get_settings() # 서비스 인스턴스 claude_service = ClaudeService() @router.get( "/meetings/{meeting_id}/stream", summary="실시간 AI 제안사항 스트리밍", description=""" 회의 중 실시간으로 AI 제안사항을 Server-Sent Events(SSE)로 스트리밍합니다. ### 동작 방식 1. Redis에서 누적된 회의 텍스트 조회 (5초마다) 2. 임계값(10개 세그먼트) 이상이면 Claude API로 분석 3. 분석 결과를 SSE 이벤트로 전송 ### SSE 이벤트 형식 ``` event: ai-suggestion id: {segment_count} data: {"suggestions": [...]} ``` ### 클라이언트 연결 예시 (JavaScript) ```javascript const eventSource = new EventSource( 'http://localhost:8087/api/v1/ai/suggestions/meetings/{meeting_id}/stream' ); eventSource.addEventListener('ai-suggestion', (event) => { const data = JSON.parse(event.data); console.log('새로운 제안사항:', data.suggestions); }); eventSource.onerror = (error) => { console.error('SSE 오류:', error); eventSource.close(); }; ``` ### 주의사항 - 연결은 클라이언트가 종료할 때까지 유지됩니다 - 네트워크 타임아웃 설정이 충분히 길어야 합니다 - 브라우저는 자동으로 재연결을 시도합니다 """, responses={ 200: { "description": "SSE 스트림 연결 성공", "content": { "text/event-stream": { "example": """event: ai-suggestion id: 15 data: {"suggestions":[{"id":"550e8400-e29b-41d4-a716-446655440000","content":"신제품의 타겟 고객층을 20-30대로 설정하고, 모바일 우선 전략을 취하기로 논의 중입니다.","timestamp":"14:23:45","confidence":0.92}]} """ } } } } ) async def stream_ai_suggestions(meeting_id: str): """ 실시간 AI 제안사항 SSE 스트리밍 Args: meeting_id: 회의 ID Returns: Server-Sent Events 스트림 """ logger.info(f"SSE 스트림 시작 - meetingId: {meeting_id}") async def event_generator() -> AsyncGenerator: """SSE 이벤트 생성기""" redis_service = RedisService() try: # Redis 연결 await redis_service.connect() previous_count = 0 while True: # 현재 세그먼트 개수 확인 current_count = await redis_service.get_segment_count(meeting_id) # 임계값 이상이고, 이전보다 증가했으면 분석 if (current_count >= settings.min_segments_for_analysis and current_count > previous_count): # 누적된 텍스트 조회 accumulated_text = await redis_service.get_accumulated_text(meeting_id) if accumulated_text: # Claude API로 분석 suggestions = await claude_service.analyze_suggestions(accumulated_text) if suggestions.suggestions: # SSE 이벤트 전송 yield { "event": "ai-suggestion", "id": str(current_count), "data": suggestions.json() } logger.info( f"AI 제안사항 발행 - meetingId: {meeting_id}, " f"개수: {len(suggestions.suggestions)}" ) previous_count = current_count # 5초마다 체크 await asyncio.sleep(5) except asyncio.CancelledError: logger.info(f"SSE 스트림 종료 - meetingId: {meeting_id}") # 회의 종료 시 데이터 정리는 선택사항 (나중에 조회 필요할 수도) # await redis_service.cleanup_meeting_data(meeting_id) except Exception as e: logger.error(f"SSE 스트림 오류 - meetingId: {meeting_id}", exc_info=e) finally: await redis_service.disconnect() return EventSourceResponse(event_generator()) @router.get("/test") async def test_endpoint(): """테스트 엔드포인트""" return {"message": "AI Suggestions API is working", "port": settings.port}