AI 서비스 Python 마이그레이션 및 프론트엔드 연동 문서 추가

주요 변경사항:
- AI 서비스 Java → Python (FastAPI) 완전 마이그레이션
- 포트 변경: 8083 → 8086
- SSE 스트리밍 기능 구현 및 테스트 완료
- Claude API 연동 (claude-3-5-sonnet-20241022)
- Redis 슬라이딩 윈도우 방식 텍스트 축적
- Azure Event Hub 연동 준비 (STT 텍스트 수신)

프론트엔드 연동 지원:
- API 연동 가이드 업데이트 (Python 버전 반영)
- Mock 데이터 개발 가이드 신규 작성
- STT 개발 완료 전까지 Mock 데이터로 UI 개발 가능

기술 스택:
- Python 3.13
- FastAPI 0.104.1
- Anthropic Claude API 0.42.0
- Redis (asyncio) 5.0.1
- Azure Event Hub 5.11.4
- Pydantic 2.10.5

테스트 결과:
-  서비스 시작 정상
-  헬스 체크 성공
-  SSE 스트리밍 동작 확인
-  Redis 연결 정상

다음 단계:
- STT (Azure Speech) 서비스 연동 개발
- Event Hub를 통한 실시간 텍스트 수신
- E2E 통합 테스트 (STT → AI → Frontend)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Minseo-Jo
2025-10-27 11:52:30 +09:00
parent 9d71646b2e
commit 9bf3597cec
20 changed files with 2144 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
"""서비스 레이어"""
+147
View File
@@ -0,0 +1,147 @@
"""Claude API 서비스"""
import anthropic
import json
import logging
from typing import List
from datetime import datetime
import uuid
from app.config import get_settings
from app.models import SimpleSuggestion, RealtimeSuggestionsResponse
logger = logging.getLogger(__name__)
settings = get_settings()
class ClaudeService:
"""Claude API 클라이언트"""
def __init__(self):
self.client = None
if settings.claude_api_key:
self.client = anthropic.Anthropic(api_key=settings.claude_api_key)
async def analyze_suggestions(self, transcript_text: str) -> RealtimeSuggestionsResponse:
"""
회의 텍스트를 분석하여 AI 제안사항 생성
Args:
transcript_text: 누적된 회의 텍스트
Returns:
RealtimeSuggestionsResponse
"""
if not self.client:
logger.warning("Claude API 키가 설정되지 않음 - Mock 데이터 반환")
return self._generate_mock_suggestions()
logger.info(f"Claude API 호출 - 텍스트 길이: {len(transcript_text)}")
system_prompt = """당신은 회의록 작성 전문 AI 어시스턴트입니다.
실시간 회의 텍스트를 분석하여 **중요한 제안사항만** 추출하세요.
**추출 기준**:
- 회의 안건과 직접 관련된 내용
- 논의가 필요한 주제
- 결정된 사항
- 액션 아이템
**제외할 내용**:
- 잡담, 농담, 인사말
- 회의와 무관한 대화
- 단순 확인이나 질의응답
**응답 형식**: JSON만 반환 (다른 설명 없이)
{
"suggestions": [
{
"content": "구체적인 제안 내용 (1-2문장으로 명확하게)",
"confidence": 0.9
}
]
}
**주의**:
- 각 제안은 독립적이고 명확해야 함
- 회의 맥락에서 실제 중요한 내용만 포함
- confidence는 0-1 사이 값 (확신 정도)"""
try:
response = self.client.messages.create(
model=settings.claude_model,
max_tokens=settings.claude_max_tokens,
temperature=settings.claude_temperature,
system=system_prompt,
messages=[
{
"role": "user",
"content": f"다음 회의 내용을 분석해주세요:\n\n{transcript_text}"
}
]
)
# 응답 파싱
content_text = response.content[0].text
suggestions_data = self._parse_claude_response(content_text)
logger.info(f"Claude API 응답 성공 - 제안사항: {len(suggestions_data.get('suggestions', []))}")
return RealtimeSuggestionsResponse(
suggestions=[
SimpleSuggestion(
id=str(uuid.uuid4()),
content=s["content"],
timestamp=self._get_current_timestamp(),
confidence=s.get("confidence", 0.8)
)
for s in suggestions_data.get("suggestions", [])
]
)
except Exception as e:
logger.error(f"Claude API 호출 실패: {e}")
return RealtimeSuggestionsResponse(suggestions=[])
def _parse_claude_response(self, text: str) -> dict:
"""Claude 응답에서 JSON 추출 및 파싱"""
# ```json ... ``` 제거
if "```json" in text:
start = text.find("```json") + 7
end = text.rfind("```")
text = text[start:end].strip()
elif "```" in text:
start = text.find("```") + 3
end = text.rfind("```")
text = text[start:end].strip()
try:
return json.loads(text)
except json.JSONDecodeError as e:
logger.error(f"JSON 파싱 실패: {e}, 원문: {text[:200]}")
return {"suggestions": []}
def _get_current_timestamp(self) -> str:
"""현재 타임스탬프 (HH:MM:SS)"""
return datetime.now().strftime("%H:%M:%S")
def _generate_mock_suggestions(self) -> RealtimeSuggestionsResponse:
"""Mock 제안사항 생성 (테스트용)"""
mock_suggestions = [
"신제품의 타겟 고객층을 20-30대로 설정하고, 모바일 우선 전략을 취하기로 논의 중입니다.",
"개발 일정: 1차 프로토타입은 11월 15일까지 완성, 2차 베타는 12월 1일 론칭",
"마케팅 예산 배분에 대해 SNS 광고 60%, 인플루언서 마케팅 40%로 의견이 나왔으나 추가 검토 필요"
]
import random
content = random.choice(mock_suggestions)
return RealtimeSuggestionsResponse(
suggestions=[
SimpleSuggestion(
id=str(uuid.uuid4()),
content=content,
timestamp=self._get_current_timestamp(),
confidence=0.85
)
]
)
+114
View File
@@ -0,0 +1,114 @@
"""Azure Event Hub 서비스 - STT 텍스트 수신"""
import asyncio
import logging
import json
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from app.config import get_settings
from app.services.redis_service import RedisService
logger = logging.getLogger(__name__)
settings = get_settings()
class EventHubService:
"""Event Hub 리스너 - STT 텍스트 실시간 수신"""
def __init__(self):
self.client = None
self.redis_service = RedisService()
async def start(self):
"""Event Hub 리스닝 시작"""
if not settings.eventhub_connection_string:
logger.warning("Event Hub 연결 문자열이 설정되지 않음 - Event Hub 리스너 비활성화")
return
logger.info("Event Hub 리스너 시작")
try:
# Redis 연결
await self.redis_service.connect()
# Event Hub 클라이언트 생성
self.client = EventHubConsumerClient.from_connection_string(
conn_str=settings.eventhub_connection_string,
consumer_group=settings.eventhub_consumer_group,
eventhub_name=settings.eventhub_name,
)
# 이벤트 수신 시작
async with self.client:
await self.client.receive(
on_event=self.on_event,
on_error=self.on_error,
starting_position="-1", # 최신 이벤트부터
)
except Exception as e:
logger.error(f"Event Hub 리스너 오류: {e}")
finally:
await self.redis_service.disconnect()
async def on_event(self, partition_context, event):
"""
이벤트 수신 핸들러
이벤트 형식 (STT Service에서 발행):
{
"eventType": "TranscriptSegmentReady",
"meetingId": "meeting-123",
"text": "변환된 텍스트",
"timestamp": 1234567890000
}
"""
try:
# 이벤트 데이터 파싱
event_data = json.loads(event.body_as_str())
event_type = event_data.get("eventType")
meeting_id = event_data.get("meetingId")
text = event_data.get("text")
timestamp = event_data.get("timestamp")
if event_type == "TranscriptSegmentReady" and meeting_id and text:
logger.info(
f"STT 텍스트 수신 - meetingId: {meeting_id}, "
f"텍스트 길이: {len(text)}"
)
# Redis에 텍스트 축적 (슬라이딩 윈도우)
await self.redis_service.add_transcript_segment(
meeting_id=meeting_id,
text=text,
timestamp=timestamp
)
logger.debug(f"Redis 저장 완료 - meetingId: {meeting_id}")
# 체크포인트 업데이트
await partition_context.update_checkpoint(event)
except Exception as e:
logger.error(f"이벤트 처리 오류: {e}", exc_info=True)
async def on_error(self, partition_context, error):
"""에러 핸들러"""
logger.error(
f"Event Hub 에러 - Partition: {partition_context.partition_id}, "
f"Error: {error}"
)
async def stop(self):
"""Event Hub 리스너 종료"""
if self.client:
await self.client.close()
logger.info("Event Hub 리스너 종료")
# 백그라운드 태스크로 실행할 함수
async def start_eventhub_listener():
"""Event Hub 리스너 백그라운드 실행"""
service = EventHubService()
await service.start()
+117
View File
@@ -0,0 +1,117 @@
"""Redis 서비스 - 실시간 텍스트 축적"""
import redis.asyncio as redis
import logging
from typing import List
from app.config import get_settings
logger = logging.getLogger(__name__)
settings = get_settings()
class RedisService:
"""Redis 서비스 (슬라이딩 윈도우 방식)"""
def __init__(self):
self.redis_client = None
async def connect(self):
"""Redis 연결"""
try:
self.redis_client = await redis.Redis(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_password,
db=settings.redis_db,
decode_responses=True
)
await self.redis_client.ping()
logger.info("Redis 연결 성공")
except Exception as e:
logger.error(f"Redis 연결 실패: {e}")
raise
async def disconnect(self):
"""Redis 연결 종료"""
if self.redis_client:
await self.redis_client.close()
logger.info("Redis 연결 종료")
async def add_transcript_segment(
self,
meeting_id: str,
text: str,
timestamp: int
):
"""
실시간 텍스트 세그먼트 추가 (슬라이딩 윈도우)
Args:
meeting_id: 회의 ID
text: 텍스트 세그먼트
timestamp: 타임스탬프 (밀리초)
"""
key = f"meeting:{meeting_id}:transcript"
value = f"{timestamp}:{text}"
# Sorted Set에 추가 (타임스탬프를 스코어로)
await self.redis_client.zadd(key, {value: timestamp})
# 설정된 시간 이전 데이터 제거 (기본 5분)
retention_ms = settings.text_retention_seconds * 1000
cutoff_time = timestamp - retention_ms
await self.redis_client.zremrangebyscore(key, 0, cutoff_time)
logger.debug(f"텍스트 세그먼트 추가 - meetingId: {meeting_id}")
async def get_accumulated_text(self, meeting_id: str) -> str:
"""
누적된 텍스트 조회 (최근 5분)
Args:
meeting_id: 회의 ID
Returns:
누적된 텍스트 (시간순)
"""
key = f"meeting:{meeting_id}:transcript"
# 최신순으로 모든 세그먼트 조회
segments = await self.redis_client.zrevrange(key, 0, -1)
if not segments:
return ""
# 타임스탬프 제거하고 텍스트만 추출
texts = []
for seg in segments:
parts = seg.split(":", 1)
if len(parts) == 2:
texts.append(parts[1])
# 시간순으로 정렬 (역순으로 조회했으므로 다시 뒤집기)
return "\n".join(reversed(texts))
async def get_segment_count(self, meeting_id: str) -> int:
"""
누적된 세그먼트 개수
Args:
meeting_id: 회의 ID
Returns:
세그먼트 개수
"""
key = f"meeting:{meeting_id}:transcript"
count = await self.redis_client.zcard(key)
return count if count else 0
async def cleanup_meeting_data(self, meeting_id: str):
"""
회의 종료 시 데이터 정리
Args:
meeting_id: 회의 ID
"""
key = f"meeting:{meeting_id}:transcript"
await self.redis_client.delete(key)
logger.info(f"회의 데이터 정리 완료 - meetingId: {meeting_id}")