""" Azure Event Hub Consumer 서비스 회의록 확정 이벤트 및 세그먼트 생성 이벤트를 consume """ import asyncio import json import logging import re from typing import Dict, Any, Optional, Union, List from datetime import datetime from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore from ..models.minutes import RagMinutes, MinutesSection from ..db.rag_minutes_db import RagMinutesDB from ..db.postgres_vector import PostgresVectorDB from ..utils.embedding import EmbeddingGenerator from ..utils.text_processor import extract_nouns_as_query logger = logging.getLogger(__name__) class EventHubConsumer: """Event Hub Consumer 서비스""" def __init__( self, connection_string: str, eventhub_name: str, consumer_group: str, storage_connection_string: str, storage_container_name: str, rag_minutes_db: RagMinutesDB, embedding_gen: EmbeddingGenerator, term_db: Optional[PostgresVectorDB] = None, config: Optional[Dict[str, Any]] = None ): """ 초기화 Args: connection_string: Event Hub 연결 문자열 eventhub_name: Event Hub 이름 consumer_group: Consumer Group 이름 storage_connection_string: Azure Storage 연결 문자열 storage_container_name: Checkpoint 저장 컨테이너 이름 rag_minutes_db: RAG Minutes 데이터베이스 embedding_gen: Embedding 생성기 term_db: 용어집 데이터베이스 (선택) config: 설정 딕셔너리 (선택) """ self.connection_string = connection_string self.eventhub_name = eventhub_name self.consumer_group = consumer_group self.storage_connection_string = storage_connection_string self.storage_container_name = storage_container_name self.rag_minutes_db = rag_minutes_db self.embedding_gen = embedding_gen self.term_db = term_db self.config = config or {} self.client: Optional[EventHubConsumerClient] = None self.is_running = False async def start(self): """Consumer 시작""" try: # Checkpoint Store 생성 checkpoint_store = BlobCheckpointStore.from_connection_string( self.storage_connection_string, self.storage_container_name ) # Event Hub Consumer Client 생성 self.client = EventHubConsumerClient.from_connection_string( self.connection_string, consumer_group=self.consumer_group, eventhub_name=self.eventhub_name, checkpoint_store=checkpoint_store ) self.is_running = True logger.info("Event Hub Consumer 시작") # 이벤트 수신 시작 async with self.client: await self.client.receive( on_event=self._on_event, on_error=self._on_error, starting_position="-1" # 처음부터 읽기 ) except Exception as e: logger.error(f"Event Hub Consumer 시작 실패: {str(e)}") self.is_running = False raise async def stop(self): """Consumer 중지""" self.is_running = False if self.client: await self.client.close() logger.info("Event Hub Consumer 중지") async def _on_event(self, partition_context, event): """ 이벤트 수신 핸들러 Args: partition_context: 파티션 컨텍스트 event: Event Hub 이벤트 """ try: # 이벤트 데이터 파싱 event_body = event.body_as_str() logger.debug(f"원본 이벤트 데이터 (처음 200자): {event_body[:200]}") # Java LocalDateTime 배열을 문자열로 변환하여 JSON 파싱 가능하게 변환 converted_body = self._convert_java_datetime_arrays(event_body) logger.debug(f"변환된 이벤트 데이터 (처음 200자): {converted_body[:200]}") event_data = json.loads(converted_body) event_type = event_data.get('eventType', 'unknown') logger.info(f"이벤트 수신: {event_type}") # 이벤트 타입별 처리 if event_type == "MINUTES_FINALIZED": # 회의록 확정 이벤트 await self._process_minutes_event(event_data) elif event_type == "SegmentCreated": # 세그먼트 생성 이벤트 - 용어검색 및 연관 회의록 검색 실행 await self._process_segment_event(event_data) # Checkpoint 업데이트 await partition_context.update_checkpoint(event) except json.JSONDecodeError as e: logger.error(f"이벤트 파싱 실패: {str(e)}") except Exception as e: logger.error(f"이벤트 처리 실패: {str(e)}", exc_info=True) async def _on_error(self, partition_context, error): """ 에러 핸들러 Args: partition_context: 파티션 컨텍스트 error: 에러 객체 """ logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}") def _convert_java_datetime_arrays(self, json_str: str) -> str: """ JSON 문자열 내의 Java LocalDateTime 배열을 ISO 8601 문자열로 변환 Java의 Jackson이 LocalDateTime을 배열 형식으로 직렬화하는 것을 Python이 파싱 가능한 문자열 형식으로 변환 Args: json_str: 원본 JSON 문자열 Returns: 변환된 JSON 문자열 Examples: >>> _convert_java_datetime_arrays('{"timestamp":[2025,10,29,10,25,37,579030000]}') '{"timestamp":"2025-10-29T10:25:37.579030"}' """ # Java LocalDateTime 배열 패턴: [년,월,일,시,분,초,나노초] # 나노초는 항상 7개 요소로 전송됨 pattern = r'\[(\d{4}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d{1,2}),(\d+)\]' def replace_datetime(match): year = int(match.group(1)) month = int(match.group(2)) day = int(match.group(3)) hour = int(match.group(4)) minute = int(match.group(5)) second = int(match.group(6)) nanosecond = int(match.group(7)) # 나노초를 마이크로초로 변환 microsecond = nanosecond // 1000 # ISO 8601 형식 문자열 생성 dt = datetime(year, month, day, hour, minute, second, microsecond) return f'"{dt.isoformat()}"' # 모든 datetime 배열을 문자열로 변환 return re.sub(pattern, replace_datetime, json_str) async def _process_segment_event(self, event_data: Dict[str, Any]): """ 세그먼트 생성 이벤트 처리 - 용어검색 실행 Args: event_data: 이벤트 데이터 """ try: # 용어집 DB가 없으면 스킵 if not self.term_db: logger.debug("용어집 DB가 설정되지 않아 용어검색을 스킵합니다") return # 세그먼트 데이터 추출 segment_id = event_data.get("segmentId") text = event_data.get("text", "") meeting_id = event_data.get("meetingId") # 이벤트 데이터 구조 로깅 (디버깅용) logger.debug(f"이벤트 데이터 키: {list(event_data.keys())}") if not text: logger.warning(f"세그먼트 {segment_id}에 텍스트가 없습니다") return logger.info(f"세그먼트 용어검색 시작: {segment_id} (회의: {meeting_id})") logger.info(f"텍스트: {text[:100]}...") # 1. 명사 추출하여 검색 쿼리 생성 search_query = extract_nouns_as_query(text) logger.info(f"검색 쿼리 변환: '{text[:30]}...' → '{search_query}'") # 2. 용어검색 설정 config = self.config.get("term_glossary", {}) search_config = config.get("search", {}) top_k = search_config.get("top_k", 5) confidence_threshold = search_config.get("confidence_threshold", 0.7) keyword_weight = search_config.get("keyword_weight", 0.4) vector_weight = search_config.get("vector_weight", 0.6) # 3. 키워드 검색 keyword_results = self.term_db.search_by_keyword( query=search_query, top_k=top_k, confidence_threshold=confidence_threshold ) # 4. 벡터 검색 query_embedding = self.embedding_gen.generate_embedding(search_query) vector_results = self.term_db.search_by_vector( query_embedding=query_embedding, top_k=top_k, confidence_threshold=confidence_threshold ) # 5. 하이브리드 검색 결과 통합 (RRF) results = [] seen_ids = set() # 키워드 결과 가중치 적용 for result in keyword_results: term_id = result["term"].term_id if term_id not in seen_ids: result["relevance_score"] *= keyword_weight result["match_type"] = "hybrid" results.append(result) seen_ids.add(term_id) # 벡터 결과 가중치 적용 for result in vector_results: term_id = result["term"].term_id if term_id not in seen_ids: result["relevance_score"] *= vector_weight result["match_type"] = "hybrid" results.append(result) seen_ids.add(term_id) else: # 이미 있는 경우 점수 합산 for r in results: if r["term"].term_id == term_id: r["relevance_score"] += result["relevance_score"] * vector_weight break # 점수 기준 재정렬 results.sort(key=lambda x: x["relevance_score"], reverse=True) results = results[:top_k] # 6. 검색 결과 로깅 if results: logger.info(f"세그먼트 {segment_id} 용어검색 완료: {len(results)}개 발견") for idx, result in enumerate(results, 1): term = result["term"] score = result["relevance_score"] logger.info( f" [{idx}] {term.term_name} " f"(카테고리: {term.category}, 점수: {score:.3f})" ) else: logger.info(f"세그먼트 {segment_id}에서 매칭되는 용어를 찾지 못했습니다") # 7. SSE를 통해 결과 전송 # Event Hub 메시지에서 sessionId 추출 (여러 필드 확인) session_id = event_data.get("sessionId") # session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id logger.info(f"SSE 전송 시도: sessionId={session_id}, meetingId={meeting_id}") if session_id: from ..services.sse_manager import sse_manager # 용어 정보를 직렬화 가능한 형태로 변환 terms_data = [] for result in results: term = result["term"] terms_data.append({ "term_id": term.term_id, "term_name": term.term_name, "definition": term.definition, "category": term.category, "synonyms": term.synonyms, "related_terms": term.related_terms, "context": term.context, "relevance_score": result["relevance_score"], "match_type": result.get("match_type", "unknown") }) # SSE로 전송 success = await sse_manager.send_to_session( session_id=session_id, data={ "segment_id": segment_id, "meeting_id": meeting_id, "text": text[:100], # 텍스트 일부만 전송 "terms": terms_data, "total_count": len(terms_data) }, event_type="term_result" ) if success: logger.info(f"용어 검색 결과를 SSE로 전송 완료: {session_id}") else: logger.warning(f"SSE 전송 실패 (세션 미연결): {session_id}") else: logger.warning("이벤트 데이터에 sessionId가 없어 SSE 전송을 건너뜁니다") # 8. 연관 회의록 검색 및 SSE 전송 await self._search_and_send_related_minutes(text, session_id, segment_id, meeting_id) except Exception as e: logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True) async def _search_and_send_related_minutes( self, text: str, session_id: Optional[str], segment_id: str, meeting_id: str ): """ 연관 회의록 검색 및 SSE 전송 Args: text: 세그먼트 텍스트 session_id: 세션 ID segment_id: 세그먼트 ID meeting_id: 회의 ID """ try: # RAG Minutes DB가 없으면 스킵 if not self.rag_minutes_db: logger.debug("RAG Minutes DB가 설정되지 않아 연관 회의록 검색을 스킵합니다") return if not text: logger.warning(f"세그먼트 {segment_id}에 텍스트가 없어 연관 회의록 검색을 스킵합니다") return logger.info(f"세그먼트 연관 회의록 검색 시작: {segment_id} (회의: {meeting_id})") logger.info(f"검색 텍스트: {text[:100]}...") # 1. 텍스트를 임베딩으로 변환 query_embedding = self.embedding_gen.generate_embedding(text) logger.info(f"임베딩 생성 완료: {len(query_embedding)}차원") # 2. 연관 회의록 검색 설정 config = self.config.get("rag_minutes", {}) search_config = config.get("search", {}) top_k = search_config.get("top_k", 5) similarity_threshold = search_config.get("similarity_threshold", 0.7) # 3. 벡터 유사도 검색 results = self.rag_minutes_db.search_by_vector( query_embedding=query_embedding, top_k=top_k, similarity_threshold=similarity_threshold ) # 4. 검색 결과 로깅 if results: logger.info(f"세그먼트 {segment_id} 연관 회의록 검색 완료: {len(results)}개 발견") for idx, result in enumerate(results, 1): minutes = result["minutes"] score = result["similarity_score"] logger.info( f" [{idx}] {minutes.title} " f"(회의 ID: {minutes.meeting_id}, 유사도: {score:.3f})" ) else: logger.info(f"세그먼트 {segment_id}에서 연관 회의록을 찾지 못했습니다") # 5. SSE를 통해 결과 전송 if session_id: from ..services.sse_manager import sse_manager # 회의록 정보를 직렬화 가능한 형태로 변환 minutes_data = [] for result in results: minutes = result["minutes"] minutes_data.append({ "minutes_id": minutes.minutes_id, "meeting_id": minutes.meeting_id, "title": minutes.title, "purpose": minutes.purpose, "scheduled_at": minutes.scheduled_at, "location": minutes.location, "finalized_at": minutes.finalized_at, "similarity_score": result["similarity_score"] }) # SSE로 전송 success = await sse_manager.send_to_session( session_id=session_id, data={ "segment_id": segment_id, "meeting_id": meeting_id, "text": text[:100], # 텍스트 일부만 전송 "related_minutes": minutes_data, "total_count": len(minutes_data) }, event_type="related_minutes_result" ) if success: logger.info(f"연관 회의록 검색 결과를 SSE로 전송 완료: {session_id}") else: logger.warning(f"SSE 전송 실패 (세션 미연결): {session_id}") else: logger.warning("세션 ID가 없어 연관 회의록 SSE 전송을 건너뜁니다") except Exception as e: logger.error(f"연관 회의록 검색 및 전송 실패: {str(e)}", exc_info=True) def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]: """ Java LocalDateTime 배열을 ISO 8601 문자열로 변환 Java의 Jackson이 LocalDateTime을 배열 형식으로 직렬화할 때 사용 배열 형식: [년, 월, 일, 시, 분, 초, 나노초] Args: value: datetime 값 (str, list, None) Returns: ISO 8601 형식 문자열 또는 None Examples: >>> _convert_datetime_array_to_string([2025, 11, 1, 13, 55, 54, 388000000]) "2025-11-01T13:55:54.388000" >>> _convert_datetime_array_to_string("2025-11-01T13:55:54.388") "2025-11-01T13:55:54.388" >>> _convert_datetime_array_to_string(None) None """ if value is None: return None # 이미 문자열이면 그대로 반환 if isinstance(value, str): return value # 배열 형식 [년, 월, 일, 시, 분, 초, 나노초] if isinstance(value, list) and len(value) >= 6: try: year, month, day, hour, minute, second = value[:6] # 나노초를 마이크로초로 변환 (Python datetime은 마이크로초 사용) microsecond = value[6] // 1000 if len(value) > 6 else 0 dt = datetime(year, month, day, hour, minute, second, microsecond) return dt.isoformat() except (ValueError, TypeError) as e: logger.warning(f"날짜 배열 변환 실패: {value}, 에러: {str(e)}") return None logger.warning(f"지원하지 않는 날짜 형식: {type(value)}, 값: {value}") return None async def _process_minutes_event(self, event_data: Dict[str, Any]): """ 회의록 확정 이벤트 처리 Args: event_data: 이벤트 데이터 """ try: # 회의록 데이터 추출 minutes_data = event_data.get("data", {}) # Meeting 정보 meeting_id = minutes_data.get("meetingId") title = minutes_data.get("title") purpose = minutes_data.get("purpose") description = minutes_data.get("description") # Java LocalDateTime 배열을 문자열로 변환 scheduled_at = self._convert_datetime_array_to_string( minutes_data.get("scheduledAt") ) location = minutes_data.get("location") organizer_id = minutes_data.get("organizerId") # Minutes 정보 minutes_id = minutes_data.get("minutesId") minutes_status = minutes_data.get("status", "FINALIZED") minutes_version = minutes_data.get("version", 1) created_by = minutes_data.get("createdBy") finalized_by = minutes_data.get("finalizedBy") # Java LocalDateTime 배열을 문자열로 변환 finalized_at = self._convert_datetime_array_to_string( minutes_data.get("finalizedAt") ) # Sections 정보 sections_data = minutes_data.get("sections", []) sections = [ MinutesSection( section_id=section.get("sectionId"), type=section.get("type"), title=section.get("title"), content=section.get("content", ""), order=section.get("order", 0), verified=section.get("verified", False) ) for section in sections_data ] # 전체 회의록 내용 생성 (검색용) full_content = self._generate_full_content(title, purpose, sections) logger.info(f"회의록 내용 생성 완료: {len(full_content)} 글자") # Embedding 생성 logger.info(f"Embedding 생성 시작: {minutes_id}") embedding = self.embedding_gen.generate_embedding(full_content) logger.info(f"Embedding 생성 완료: {len(embedding)} 차원") # RagMinutes 객체 생성 rag_minutes = RagMinutes( meeting_id=meeting_id, title=title, purpose=purpose, description=description, scheduled_at=scheduled_at, location=location, organizer_id=organizer_id, minutes_id=minutes_id, minutes_status=minutes_status, minutes_version=minutes_version, created_by=created_by, finalized_by=finalized_by, finalized_at=finalized_at, sections=sections, full_content=full_content, embedding=embedding, created_at=datetime.now().isoformat(), updated_at=datetime.now().isoformat() ) # 데이터베이스에 저장 success = self.rag_minutes_db.insert_minutes(rag_minutes) if success: logger.info(f"회의록 RAG 저장 성공: {minutes_id}") else: logger.error(f"회의록 RAG 저장 실패: {minutes_id}") except Exception as e: logger.error(f"회의록 이벤트 처리 실패: {str(e)}") raise def _generate_full_content(self, title: str, purpose: Optional[str], sections: list) -> str: """ 전체 회의록 내용 생성 (검색용 텍스트) Args: title: 회의 제목 purpose: 회의 목적 sections: 회의록 섹션 목록 Returns: 전체 회의록 내용 """ content_parts = [] # 제목 if title: content_parts.append(f"제목: {title}") # 목적 if purpose: content_parts.append(f"목적: {purpose}") # 섹션별 내용 for section in sections: if section.content: content_parts.append(f"\n[{section.title}]\n{section.content}") return "\n\n".join(content_parts) async def start_consumer( config: Dict[str, Any], rag_minutes_db: RagMinutesDB, embedding_gen: EmbeddingGenerator, term_db: Optional[PostgresVectorDB] = None ): """ Event Hub Consumer 시작 (비동기) Args: config: 설정 딕셔너리 rag_minutes_db: RAG Minutes 데이터베이스 embedding_gen: Embedding 생성기 term_db: 용어집 데이터베이스 (선택) """ eventhub_config = config["eventhub"] consumer = EventHubConsumer( connection_string=eventhub_config["connection_string"], eventhub_name=eventhub_config["name"], consumer_group=eventhub_config["consumer_group"], storage_connection_string=eventhub_config["storage"]["connection_string"], storage_container_name=eventhub_config["storage"]["container_name"], rag_minutes_db=rag_minutes_db, embedding_gen=embedding_gen, term_db=term_db, config=config ) try: await consumer.start() except KeyboardInterrupt: logger.info("Consumer 종료 신호 수신") await consumer.stop() except Exception as e: logger.error(f"Consumer 실행 중 에러: {str(e)}") await consumer.stop() raise