hgzero/ai-python/app/api/v1/suggestions.py
Minseo-Jo 938cfdb98e feat: AI 서비스 Swagger UI 및 프론트엔드 연동 문서 추가
- Swagger UI를 Spring Boot 스타일(/swagger-ui.html)로 변경
- OpenAPI 스펙 개선 및 상세한 API 문서화
- 프론트엔드 연동을 위한 API-DOCUMENTATION.md 추가
  - SSE 연결 예시 (JavaScript/React)
  - 응답 스키마 및 TypeScript 인터페이스
  - 동작 흐름 및 주의사항
- FastAPI 설정 파일(config.py) 추가
- API 엔드포인트:
  - GET /api/v1/ai/suggestions/meetings/{meeting_id}/stream (SSE)
  - GET /health (헬스 체크)
  - GET /v3/api-docs (OpenAPI JSON)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-28 16:58:01 +09:00

148 lines
4.8 KiB
Python

"""AI 제안사항 SSE 엔드포인트"""
from fastapi import APIRouter
from sse_starlette.sse import EventSourceResponse
import logging
import asyncio
from typing import AsyncGenerator
from app.models import RealtimeSuggestionsResponse
from app.services.claude_service import ClaudeService
from app.services.redis_service import RedisService
from app.config import get_settings
logger = logging.getLogger(__name__)
router = APIRouter()
settings = get_settings()
# 서비스 인스턴스
claude_service = ClaudeService()
@router.get(
"/meetings/{meeting_id}/stream",
summary="실시간 AI 제안사항 스트리밍",
description="""
회의 중 실시간으로 AI 제안사항을 Server-Sent Events(SSE)로 스트리밍합니다.
### 동작 방식
1. Redis에서 누적된 회의 텍스트 조회 (5초마다)
2. 임계값(10개 세그먼트) 이상이면 Claude API로 분석
3. 분석 결과를 SSE 이벤트로 전송
### SSE 이벤트 형식
```
event: ai-suggestion
id: {segment_count}
data: {"suggestions": [...]}
```
### 클라이언트 연결 예시 (JavaScript)
```javascript
const eventSource = new EventSource(
'http://localhost:8087/api/v1/ai/suggestions/meetings/{meeting_id}/stream'
);
eventSource.addEventListener('ai-suggestion', (event) => {
const data = JSON.parse(event.data);
console.log('새로운 제안사항:', data.suggestions);
});
eventSource.onerror = (error) => {
console.error('SSE 오류:', error);
eventSource.close();
};
```
### 주의사항
- 연결은 클라이언트가 종료할 때까지 유지됩니다
- 네트워크 타임아웃 설정이 충분히 길어야 합니다
- 브라우저는 자동으로 재연결을 시도합니다
""",
responses={
200: {
"description": "SSE 스트림 연결 성공",
"content": {
"text/event-stream": {
"example": """event: ai-suggestion
id: 15
data: {"suggestions":[{"id":"550e8400-e29b-41d4-a716-446655440000","content":"신제품의 타겟 고객층을 20-30대로 설정하고, 모바일 우선 전략을 취하기로 논의 중입니다.","timestamp":"14:23:45","confidence":0.92}]}
"""
}
}
}
}
)
async def stream_ai_suggestions(meeting_id: str):
"""
실시간 AI 제안사항 SSE 스트리밍
Args:
meeting_id: 회의 ID
Returns:
Server-Sent Events 스트림
"""
logger.info(f"SSE 스트림 시작 - meetingId: {meeting_id}")
async def event_generator() -> AsyncGenerator:
"""SSE 이벤트 생성기"""
redis_service = RedisService()
try:
# Redis 연결
await redis_service.connect()
previous_count = 0
while True:
# 현재 세그먼트 개수 확인
current_count = await redis_service.get_segment_count(meeting_id)
# 임계값 이상이고, 이전보다 증가했으면 분석
if (current_count >= settings.min_segments_for_analysis
and current_count > previous_count):
# 누적된 텍스트 조회
accumulated_text = await redis_service.get_accumulated_text(meeting_id)
if accumulated_text:
# Claude API로 분석
suggestions = await claude_service.analyze_suggestions(accumulated_text)
if suggestions.suggestions:
# SSE 이벤트 전송
yield {
"event": "ai-suggestion",
"id": str(current_count),
"data": suggestions.json()
}
logger.info(
f"AI 제안사항 발행 - meetingId: {meeting_id}, "
f"개수: {len(suggestions.suggestions)}"
)
previous_count = current_count
# 5초마다 체크
await asyncio.sleep(5)
except asyncio.CancelledError:
logger.info(f"SSE 스트림 종료 - meetingId: {meeting_id}")
# 회의 종료 시 데이터 정리는 선택사항 (나중에 조회 필요할 수도)
# await redis_service.cleanup_meeting_data(meeting_id)
except Exception as e:
logger.error(f"SSE 스트림 오류 - meetingId: {meeting_id}", exc_info=e)
finally:
await redis_service.disconnect()
return EventSourceResponse(event_generator())
@router.get("/test")
async def test_endpoint():
"""테스트 엔드포인트"""
return {"message": "AI Suggestions API is working", "port": settings.port}