hgzero/rag/src/services/eventhub_consumer.py

457 lines
16 KiB
Python

"""
Azure Event Hub Consumer 서비스
회의록 확정 이벤트 및 세그먼트 생성 이벤트를 consume
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Union, List
from datetime import datetime
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from ..models.minutes import RagMinutes, MinutesSection
from ..db.rag_minutes_db import RagMinutesDB
from ..db.postgres_vector import PostgresVectorDB
from ..utils.embedding import EmbeddingGenerator
from ..utils.text_processor import extract_nouns_as_query
logger = logging.getLogger(__name__)
class EventHubConsumer:
"""Event Hub Consumer 서비스"""
def __init__(
self,
connection_string: str,
eventhub_name: str,
consumer_group: str,
storage_connection_string: str,
storage_container_name: str,
rag_minutes_db: RagMinutesDB,
embedding_gen: EmbeddingGenerator,
term_db: Optional[PostgresVectorDB] = None,
config: Optional[Dict[str, Any]] = None
):
"""
초기화
Args:
connection_string: Event Hub 연결 문자열
eventhub_name: Event Hub 이름
consumer_group: Consumer Group 이름
storage_connection_string: Azure Storage 연결 문자열
storage_container_name: Checkpoint 저장 컨테이너 이름
rag_minutes_db: RAG Minutes 데이터베이스
embedding_gen: Embedding 생성기
term_db: 용어집 데이터베이스 (선택)
config: 설정 딕셔너리 (선택)
"""
self.connection_string = connection_string
self.eventhub_name = eventhub_name
self.consumer_group = consumer_group
self.storage_connection_string = storage_connection_string
self.storage_container_name = storage_container_name
self.rag_minutes_db = rag_minutes_db
self.embedding_gen = embedding_gen
self.term_db = term_db
self.config = config or {}
self.client: Optional[EventHubConsumerClient] = None
self.is_running = False
async def start(self):
"""Consumer 시작"""
try:
# Checkpoint Store 생성
checkpoint_store = BlobCheckpointStore.from_connection_string(
self.storage_connection_string,
self.storage_container_name
)
# Event Hub Consumer Client 생성
self.client = EventHubConsumerClient.from_connection_string(
self.connection_string,
consumer_group=self.consumer_group,
eventhub_name=self.eventhub_name,
checkpoint_store=checkpoint_store
)
self.is_running = True
logger.info("Event Hub Consumer 시작")
# 이벤트 수신 시작
async with self.client:
await self.client.receive(
on_event=self._on_event,
on_error=self._on_error,
starting_position="-1" # 처음부터 읽기
)
except Exception as e:
logger.error(f"Event Hub Consumer 시작 실패: {str(e)}")
self.is_running = False
raise
async def stop(self):
"""Consumer 중지"""
self.is_running = False
if self.client:
await self.client.close()
logger.info("Event Hub Consumer 중지")
async def _on_event(self, partition_context, event):
"""
이벤트 수신 핸들러
Args:
partition_context: 파티션 컨텍스트
event: Event Hub 이벤트
"""
try:
# 이벤트 데이터 파싱
event_body = event.body_as_str()
event_data = json.loads(event_body)
event_type = event_data.get('eventType', 'unknown')
logger.info(f"이벤트 수신: {event_type}")
# 이벤트 타입별 처리
if event_type == "MINUTES_FINALIZED":
# 회의록 확정 이벤트
await self._process_minutes_event(event_data)
elif event_type == "SegmentCreated":
# 세그먼트 생성 이벤트 - 용어검색 실행
await self._process_segment_event(event_data)
# Checkpoint 업데이트
await partition_context.update_checkpoint(event)
except json.JSONDecodeError as e:
logger.error(f"이벤트 파싱 실패: {str(e)}")
except Exception as e:
logger.error(f"이벤트 처리 실패: {str(e)}")
async def _on_error(self, partition_context, error):
"""
에러 핸들러
Args:
partition_context: 파티션 컨텍스트
error: 에러 객체
"""
logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}")
async def _process_segment_event(self, event_data: Dict[str, Any]):
"""
세그먼트 생성 이벤트 처리 - 용어검색 실행
Args:
event_data: 이벤트 데이터
"""
try:
# 용어집 DB가 없으면 스킵
if not self.term_db:
logger.debug("용어집 DB가 설정되지 않아 용어검색을 스킵합니다")
return
# 세그먼트 데이터 추출
segment_id = event_data.get("segmentId")
text = event_data.get("text", "")
meeting_id = event_data.get("meetingId")
if not text:
logger.warning(f"세그먼트 {segment_id}에 텍스트가 없습니다")
return
logger.info(f"세그먼트 용어검색 시작: {segment_id} (회의: {meeting_id})")
logger.info(f"텍스트: {text[:100]}...")
# 1. 명사 추출하여 검색 쿼리 생성
search_query = extract_nouns_as_query(text)
logger.info(f"검색 쿼리 변환: '{text[:30]}...''{search_query}'")
# 2. 용어검색 설정
config = self.config.get("term_glossary", {})
search_config = config.get("search", {})
top_k = search_config.get("top_k", 5)
confidence_threshold = search_config.get("confidence_threshold", 0.7)
keyword_weight = search_config.get("keyword_weight", 0.4)
vector_weight = search_config.get("vector_weight", 0.6)
# 3. 키워드 검색
keyword_results = self.term_db.search_by_keyword(
query=search_query,
top_k=top_k,
confidence_threshold=confidence_threshold
)
# 4. 벡터 검색
query_embedding = self.embedding_gen.generate_embedding(search_query)
vector_results = self.term_db.search_by_vector(
query_embedding=query_embedding,
top_k=top_k,
confidence_threshold=confidence_threshold
)
# 5. 하이브리드 검색 결과 통합 (RRF)
results = []
seen_ids = set()
# 키워드 결과 가중치 적용
for result in keyword_results:
term_id = result["term"].term_id
if term_id not in seen_ids:
result["relevance_score"] *= keyword_weight
result["match_type"] = "hybrid"
results.append(result)
seen_ids.add(term_id)
# 벡터 결과 가중치 적용
for result in vector_results:
term_id = result["term"].term_id
if term_id not in seen_ids:
result["relevance_score"] *= vector_weight
result["match_type"] = "hybrid"
results.append(result)
seen_ids.add(term_id)
else:
# 이미 있는 경우 점수 합산
for r in results:
if r["term"].term_id == term_id:
r["relevance_score"] += result["relevance_score"] * vector_weight
break
# 점수 기준 재정렬
results.sort(key=lambda x: x["relevance_score"], reverse=True)
results = results[:top_k]
# 6. 검색 결과 로깅
if results:
logger.info(f"세그먼트 {segment_id} 용어검색 완료: {len(results)}개 발견")
for idx, result in enumerate(results, 1):
term = result["term"]
score = result["relevance_score"]
logger.info(
f" [{idx}] {term.term_name} "
f"(카테고리: {term.category}, 점수: {score:.3f})"
)
else:
logger.info(f"세그먼트 {segment_id}에서 매칭되는 용어를 찾지 못했습니다")
# 7. 선택적: 검색 결과를 별도 테이블에 저장하거나 Event Hub로 발행
# TODO: 필요시 검색 결과를 저장하거나 downstream 서비스로 전달
except Exception as e:
logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True)
def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]:
"""
Java LocalDateTime 배열을 ISO 8601 문자열로 변환
Java의 Jackson이 LocalDateTime을 배열 형식으로 직렬화할 때 사용
배열 형식: [년, 월, 일, 시, 분, 초, 나노초]
Args:
value: datetime 값 (str, list, None)
Returns:
ISO 8601 형식 문자열 또는 None
Examples:
>>> _convert_datetime_array_to_string([2025, 11, 1, 13, 55, 54, 388000000])
"2025-11-01T13:55:54.388000"
>>> _convert_datetime_array_to_string("2025-11-01T13:55:54.388")
"2025-11-01T13:55:54.388"
>>> _convert_datetime_array_to_string(None)
None
"""
if value is None:
return None
# 이미 문자열이면 그대로 반환
if isinstance(value, str):
return value
# 배열 형식 [년, 월, 일, 시, 분, 초, 나노초]
if isinstance(value, list) and len(value) >= 6:
try:
year, month, day, hour, minute, second = value[:6]
# 나노초를 마이크로초로 변환 (Python datetime은 마이크로초 사용)
microsecond = value[6] // 1000 if len(value) > 6 else 0
dt = datetime(year, month, day, hour, minute, second, microsecond)
return dt.isoformat()
except (ValueError, TypeError) as e:
logger.warning(f"날짜 배열 변환 실패: {value}, 에러: {str(e)}")
return None
logger.warning(f"지원하지 않는 날짜 형식: {type(value)}, 값: {value}")
return None
async def _process_minutes_event(self, event_data: Dict[str, Any]):
"""
회의록 확정 이벤트 처리
Args:
event_data: 이벤트 데이터
"""
try:
# 회의록 데이터 추출
minutes_data = event_data.get("data", {})
# Meeting 정보
meeting_id = minutes_data.get("meetingId")
title = minutes_data.get("title")
purpose = minutes_data.get("purpose")
description = minutes_data.get("description")
# Java LocalDateTime 배열을 문자열로 변환
scheduled_at = self._convert_datetime_array_to_string(
minutes_data.get("scheduledAt")
)
location = minutes_data.get("location")
organizer_id = minutes_data.get("organizerId")
# Minutes 정보
minutes_id = minutes_data.get("minutesId")
minutes_status = minutes_data.get("status", "FINALIZED")
minutes_version = minutes_data.get("version", 1)
created_by = minutes_data.get("createdBy")
finalized_by = minutes_data.get("finalizedBy")
# Java LocalDateTime 배열을 문자열로 변환
finalized_at = self._convert_datetime_array_to_string(
minutes_data.get("finalizedAt")
)
# Sections 정보
sections_data = minutes_data.get("sections", [])
sections = [
MinutesSection(
section_id=section.get("sectionId"),
type=section.get("type"),
title=section.get("title"),
content=section.get("content", ""),
order=section.get("order", 0),
verified=section.get("verified", False)
)
for section in sections_data
]
# 전체 회의록 내용 생성 (검색용)
full_content = self._generate_full_content(title, purpose, sections)
logger.info(f"회의록 내용 생성 완료: {len(full_content)} 글자")
# Embedding 생성
logger.info(f"Embedding 생성 시작: {minutes_id}")
embedding = self.embedding_gen.generate_embedding(full_content)
logger.info(f"Embedding 생성 완료: {len(embedding)} 차원")
# RagMinutes 객체 생성
rag_minutes = RagMinutes(
meeting_id=meeting_id,
title=title,
purpose=purpose,
description=description,
scheduled_at=scheduled_at,
location=location,
organizer_id=organizer_id,
minutes_id=minutes_id,
minutes_status=minutes_status,
minutes_version=minutes_version,
created_by=created_by,
finalized_by=finalized_by,
finalized_at=finalized_at,
sections=sections,
full_content=full_content,
embedding=embedding,
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat()
)
# 데이터베이스에 저장
success = self.rag_minutes_db.insert_minutes(rag_minutes)
if success:
logger.info(f"회의록 RAG 저장 성공: {minutes_id}")
else:
logger.error(f"회의록 RAG 저장 실패: {minutes_id}")
except Exception as e:
logger.error(f"회의록 이벤트 처리 실패: {str(e)}")
raise
def _generate_full_content(self, title: str, purpose: Optional[str], sections: list) -> str:
"""
전체 회의록 내용 생성 (검색용 텍스트)
Args:
title: 회의 제목
purpose: 회의 목적
sections: 회의록 섹션 목록
Returns:
전체 회의록 내용
"""
content_parts = []
# 제목
if title:
content_parts.append(f"제목: {title}")
# 목적
if purpose:
content_parts.append(f"목적: {purpose}")
# 섹션별 내용
for section in sections:
if section.content:
content_parts.append(f"\n[{section.title}]\n{section.content}")
return "\n\n".join(content_parts)
async def start_consumer(
config: Dict[str, Any],
rag_minutes_db: RagMinutesDB,
embedding_gen: EmbeddingGenerator,
term_db: Optional[PostgresVectorDB] = None
):
"""
Event Hub Consumer 시작 (비동기)
Args:
config: 설정 딕셔너리
rag_minutes_db: RAG Minutes 데이터베이스
embedding_gen: Embedding 생성기
term_db: 용어집 데이터베이스 (선택)
"""
eventhub_config = config["eventhub"]
consumer = EventHubConsumer(
connection_string=eventhub_config["connection_string"],
eventhub_name=eventhub_config["name"],
consumer_group=eventhub_config["consumer_group"],
storage_connection_string=eventhub_config["storage"]["connection_string"],
storage_container_name=eventhub_config["storage"]["container_name"],
rag_minutes_db=rag_minutes_db,
embedding_gen=embedding_gen,
term_db=term_db,
config=config
)
try:
await consumer.start()
except KeyboardInterrupt:
logger.info("Consumer 종료 신호 수신")
await consumer.stop()
except Exception as e:
logger.error(f"Consumer 실행 중 에러: {str(e)}")
await consumer.stop()
raise