mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 11:26:25 +00:00
- AI 서비스: Redis 캐싱 및 EventHub 통합 개선 - STT 서비스: 오디오 버퍼링 및 변환 기능 추가 - 설정 파일 업데이트 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
195 lines
7.0 KiB
Python
195 lines
7.0 KiB
Python
"""AI 제안사항 SSE 엔드포인트"""
|
|
from fastapi import APIRouter, Response
|
|
from fastapi.responses import StreamingResponse
|
|
from sse_starlette.sse import EventSourceResponse
|
|
import logging
|
|
import asyncio
|
|
from typing import AsyncGenerator
|
|
from app.models import RealtimeSuggestionsResponse
|
|
from app.services.claude_service import ClaudeService
|
|
from app.services.redis_service import RedisService
|
|
from app.config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter()
|
|
settings = get_settings()
|
|
|
|
# 서비스 인스턴스
|
|
claude_service = ClaudeService()
|
|
|
|
|
|
@router.get(
|
|
"/meetings/{meeting_id}/stream",
|
|
summary="실시간 AI 제안사항 스트리밍",
|
|
description="""
|
|
회의 중 실시간으로 AI 제안사항을 Server-Sent Events(SSE)로 스트리밍합니다.
|
|
|
|
### 동작 방식
|
|
1. Redis에서 누적된 회의 텍스트 조회 (5초마다)
|
|
2. 임계값(4개 세그먼트, 약 60초) 이상이면 Claude API로 분석
|
|
3. 분석 결과를 SSE 이벤트로 전송
|
|
|
|
### SSE 이벤트 형식
|
|
```
|
|
event: ai-suggestion
|
|
id: {segment_count}
|
|
data: {"suggestions": [...]}
|
|
```
|
|
|
|
### 클라이언트 연결 예시 (JavaScript)
|
|
```javascript
|
|
const eventSource = new EventSource(
|
|
'http://localhost:8087/api/v1/ai/suggestions/meetings/{meeting_id}/stream'
|
|
);
|
|
|
|
eventSource.addEventListener('ai-suggestion', (event) => {
|
|
const data = JSON.parse(event.data);
|
|
console.log('새로운 제안사항:', data.suggestions);
|
|
});
|
|
|
|
eventSource.onerror = (error) => {
|
|
console.error('SSE 오류:', error);
|
|
eventSource.close();
|
|
};
|
|
```
|
|
|
|
### 주의사항
|
|
- 연결은 클라이언트가 종료할 때까지 유지됩니다
|
|
- 네트워크 타임아웃 설정이 충분히 길어야 합니다
|
|
- 브라우저는 자동으로 재연결을 시도합니다
|
|
""",
|
|
responses={
|
|
200: {
|
|
"description": "SSE 스트림 연결 성공",
|
|
"content": {
|
|
"text/event-stream": {
|
|
"example": """event: ai-suggestion
|
|
id: 15
|
|
data: {"suggestions":[{"id":"550e8400-e29b-41d4-a716-446655440000","content":"신제품의 타겟 고객층을 20-30대로 설정하고, 모바일 우선 전략을 취하기로 논의 중입니다.","timestamp":"14:23:45","confidence":0.92}]}
|
|
|
|
"""
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
async def stream_ai_suggestions(meeting_id: str):
|
|
"""
|
|
실시간 AI 제안사항 SSE 스트리밍
|
|
|
|
Args:
|
|
meeting_id: 회의 ID
|
|
|
|
Returns:
|
|
Server-Sent Events 스트림
|
|
"""
|
|
logger.info(f"SSE 스트림 시작 - meetingId: {meeting_id}")
|
|
|
|
async def event_generator() -> AsyncGenerator:
|
|
"""SSE 이벤트 생성기"""
|
|
redis_service = RedisService()
|
|
|
|
try:
|
|
# Redis 연결
|
|
await redis_service.connect()
|
|
|
|
previous_count = 0
|
|
|
|
# Keep-alive를 위한 주석 전송
|
|
yield {
|
|
"event": "ping",
|
|
"data": "connected"
|
|
}
|
|
|
|
while True:
|
|
# 현재 세그먼트 개수 확인
|
|
current_count = await redis_service.get_segment_count(meeting_id)
|
|
logger.debug(f"세그먼트 카운트 - meetingId: {meeting_id}, count: {current_count}, prev: {previous_count}")
|
|
|
|
# 임계값 이상이고, 이전보다 증가했으면 분석
|
|
if (current_count >= settings.min_segments_for_analysis
|
|
and current_count > previous_count):
|
|
|
|
# 누적된 텍스트 조회
|
|
accumulated_text = await redis_service.get_accumulated_text(meeting_id)
|
|
|
|
if accumulated_text:
|
|
logger.info(f"텍스트 누적 완료 - meetingId: {meeting_id}, 길이: {len(accumulated_text)}")
|
|
|
|
# 이미 생성된 제안사항 조회
|
|
existing_suggestions = await redis_service.get_generated_suggestions(meeting_id)
|
|
|
|
# Claude API로 분석
|
|
suggestions = await claude_service.analyze_suggestions(accumulated_text)
|
|
|
|
if suggestions.suggestions:
|
|
# 중복 제거: 새로운 제안사항만 필터링
|
|
new_suggestions = [
|
|
s for s in suggestions.suggestions
|
|
if s.content not in existing_suggestions
|
|
]
|
|
|
|
if new_suggestions:
|
|
# 새로운 제안사항만 SSE 이벤트 전송
|
|
from app.models import RealtimeSuggestionsResponse
|
|
filtered_response = RealtimeSuggestionsResponse(suggestions=new_suggestions)
|
|
|
|
yield {
|
|
"event": "ai-suggestion",
|
|
"id": str(current_count),
|
|
"data": filtered_response.json()
|
|
}
|
|
|
|
# Redis에 새로운 제안사항 저장
|
|
for suggestion in new_suggestions:
|
|
await redis_service.add_generated_suggestion(
|
|
meeting_id,
|
|
suggestion.content
|
|
)
|
|
|
|
logger.info(
|
|
f"AI 제안사항 발행 - meetingId: {meeting_id}, "
|
|
f"전체: {len(suggestions.suggestions)}, 신규: {len(new_suggestions)}"
|
|
)
|
|
else:
|
|
logger.info(
|
|
f"중복 제거 후 신규 제안사항 없음 - meetingId: {meeting_id}"
|
|
)
|
|
|
|
previous_count = current_count
|
|
|
|
# Keep-alive 주석 전송 (SSE 연결 유지)
|
|
yield {
|
|
"event": "ping",
|
|
"data": f"alive-{current_count}"
|
|
}
|
|
|
|
# 5초마다 체크
|
|
await asyncio.sleep(5)
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info(f"SSE 스트림 종료 - meetingId: {meeting_id}")
|
|
# 회의 종료 시 데이터 정리는 선택사항 (나중에 조회 필요할 수도)
|
|
# await redis_service.cleanup_meeting_data(meeting_id)
|
|
|
|
except Exception as e:
|
|
logger.error(f"SSE 스트림 오류 - meetingId: {meeting_id}", exc_info=e)
|
|
|
|
finally:
|
|
await redis_service.disconnect()
|
|
|
|
# CORS 헤더를 포함한 EventSourceResponse 반환
|
|
return EventSourceResponse(
|
|
event_generator(),
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"X-Accel-Buffering": "no",
|
|
}
|
|
)
|
|
|
|
|
|
@router.get("/test")
|
|
async def test_endpoint():
|
|
"""테스트 엔드포인트"""
|
|
return {"message": "AI Suggestions API is working", "port": settings.port}
|