diff --git a/rag/check_active_consumers.py b/rag/check_active_consumers.py new file mode 100644 index 0000000..e38ba26 --- /dev/null +++ b/rag/check_active_consumers.py @@ -0,0 +1,191 @@ +""" +Event Hub의 활성 Consumer(host) 조회 스크립트 + +Blob Storage의 ownership 정보를 통해 현재 어떤 Consumer가 +어떤 파티션을 읽고 있는지 확인합니다. +""" +import asyncio +import json +from datetime import datetime, timezone +from pathlib import Path +from azure.storage.blob import BlobServiceClient +from src.utils.config import load_config + + +async def check_active_consumers(): + """활성 Consumer 조회""" + + # 설정 로드 + config_path = Path(__file__).parent / "config.yaml" + config = load_config(str(config_path)) + + eventhub_config = config['eventhub'] + storage_conn_str = eventhub_config['storage']['connection_string'] + container_name = eventhub_config['storage']['container_name'] + consumer_group = eventhub_config['consumer_group'] + + print("=" * 80) + print("📊 Event Hub Active Consumers") + print("=" * 80) + print(f"Consumer Group: {consumer_group}") + print(f"Container: {container_name}") + print() + + # Blob Service Client 생성 + blob_service = BlobServiceClient.from_connection_string(storage_conn_str) + container_client = blob_service.get_container_client(container_name) + + # Ownership blob 조회 (파티션 소유권 정보) + ownership_prefix = f"{consumer_group}/ownership/" + checkpoint_prefix = f"{consumer_group}/checkpoint/" + + print("-" * 80) + print("🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)") + print("-" * 80) + + ownership_blobs = container_client.list_blobs(name_starts_with=ownership_prefix) + active_consumers = {} + partition_owners = {} + + for blob in ownership_blobs: + # Blob 내용 읽기 + blob_client = container_client.get_blob_client(blob.name) + content = blob_client.download_blob().readall() + + try: + ownership_data = json.loads(content) + + partition_id = blob.name.split('/')[-1] + owner_id = ownership_data.get('ownerIdentifier', 'unknown') + last_modified = blob.last_modified + + # 현재 시간과 비교하여 활성 상태 확인 (30초 이내 갱신되었으면 활성) + now = datetime.now(timezone.utc) + time_diff = (now - last_modified).total_seconds() + is_active = time_diff < 60 # 60초 이내면 활성으로 간주 + + partition_owners[partition_id] = { + 'owner_id': owner_id, + 'last_modified': last_modified, + 'is_active': is_active, + 'time_diff': time_diff + } + + # Consumer별 집계 + if owner_id not in active_consumers: + active_consumers[owner_id] = { + 'partitions': [], + 'last_seen': last_modified, + 'is_active': is_active + } + + active_consumers[owner_id]['partitions'].append(partition_id) + + # 가장 최근 시간으로 업데이트 + if last_modified > active_consumers[owner_id]['last_seen']: + active_consumers[owner_id]['last_seen'] = last_modified + active_consumers[owner_id]['is_active'] = is_active + + except json.JSONDecodeError: + print(f"⚠️ 파티션 {partition_id}: 파싱 실패") + continue + + # 파티션별 출력 + if partition_owners: + print() + for partition_id, info in sorted(partition_owners.items()): + status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE" + print(f"파티션 {partition_id}:") + print(f" 상태: {status}") + print(f" Owner ID: {info['owner_id'][:40]}...") + print(f" 마지막 갱신: {info['last_modified'].strftime('%Y-%m-%d %H:%M:%S')} UTC") + print(f" 경과 시간: {int(info['time_diff'])}초 전") + print() + else: + print("⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.") + print() + + # Consumer별 요약 + print("-" * 80) + print("📋 Consumer 요약") + print("-" * 80) + + if active_consumers: + for idx, (owner_id, info) in enumerate(active_consumers.items(), 1): + status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE" + print(f"\nConsumer #{idx} {status}") + print(f" Owner ID: {owner_id}") + print(f" 소유 파티션: {', '.join(info['partitions'])}") + print(f" 파티션 개수: {len(info['partitions'])}") + print(f" 마지막 활동: {info['last_seen'].strftime('%Y-%m-%d %H:%M:%S')} UTC") + else: + print("\n활성 Consumer가 없습니다.") + + # Checkpoint 정보 조회 + print() + print("-" * 80) + print("📍 Checkpoint 정보 (마지막 읽은 위치)") + print("-" * 80) + + checkpoint_blobs = container_client.list_blobs(name_starts_with=checkpoint_prefix) + + for blob in checkpoint_blobs: + blob_client = container_client.get_blob_client(blob.name) + content = blob_client.download_blob().readall() + + try: + checkpoint_data = json.loads(content) + + partition_id = blob.name.split('/')[-1] + offset = checkpoint_data.get('offset', 'N/A') + sequence_number = checkpoint_data.get('sequenceNumber', 'N/A') + + print(f"\n파티션 {partition_id}:") + print(f" Offset: {offset}") + print(f" Sequence Number: {sequence_number}") + + except json.JSONDecodeError: + continue + + print() + print("=" * 80) + + # 모든 Consumer Group 조회 + print() + print("-" * 80) + print("📂 모든 Consumer Groups") + print("-" * 80) + + all_blobs = container_client.list_blobs() + consumer_groups = set() + + for blob in all_blobs: + # Consumer Group 이름은 첫 번째 경로 세그먼트 + parts = blob.name.split('/') + if len(parts) > 0: + consumer_groups.add(parts[0]) + + if consumer_groups: + print() + for cg in sorted(consumer_groups): + current = " ⬅️ 현재" if cg == consumer_group else "" + print(f" - {cg}{current}") + else: + print("\nConsumer Group 정보가 없습니다.") + + print() + print("=" * 80) + + +def main(): + """메인 함수""" + try: + asyncio.run(check_active_consumers()) + except Exception as e: + print(f"❌ 에러 발생: {str(e)}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + main() diff --git a/rag/config.yaml b/rag/config.yaml index bf8ffe7..449119f 100644 --- a/rag/config.yaml +++ b/rag/config.yaml @@ -51,7 +51,7 @@ eventhub: # Application Settings app: - name: "Vector DB Service" + name: "RAG Service" version: "1.0.0" debug: true log_level: INFO @@ -84,6 +84,19 @@ related_documents: ttl: 3600 # 1시간 prefix: "doc:" +# RAG 회의록 설정 +rag_minutes: + # 검색 설정 + search: + top_k: 5 + similarity_threshold: 0.7 + + # 캐싱 설정 (full_content 조회 결과 캐싱) + cache: + ttl: 1800 # 30분 (회의록은 변경이 적으므로 짧게 설정) + prefix: "minutes:" + related_ttl: 3600 # 연관 회의록 검색 결과는 1시간 + # 데이터 로딩 data: terms_dir: design/aidata diff --git a/rag/docs/api-related-minutes.md b/rag/docs/api-related-minutes.md new file mode 100644 index 0000000..3f97195 --- /dev/null +++ b/rag/docs/api-related-minutes.md @@ -0,0 +1,291 @@ +# 연관 회의록 조회 API 구현 문서 + +## 📋 개요 + +**준호** (Backend Developer) + +회의록 ID를 기준으로 유사한 연관 회의록을 조회하는 API를 구현했습니다. + +## 🎯 구현 방식: Option A (DB 조회 후 벡터 검색) + +### 선택 이유 +- ✅ **정확도 우선**: full_content 기반 검색으로 높은 정확도 보장 +- ✅ **확장성**: 향후 full_content 기반 추가 기능 확장 용이 +- ✅ **성능 최적화**: Redis 캐싱으로 DB 조회 오버헤드 최소화 + +### 처리 흐름 + +``` +1. 요청 수신 + ↓ +2. Redis 캐시 조회 (연관 회의록 결과) + ├─ HIT → 즉시 반환 + └─ MISS → 3단계로 진행 + ↓ +3. Redis 캐시 조회 (회의록 full_content) + ├─ HIT → 5단계로 진행 + └─ MISS → 4단계로 진행 + ↓ +4. DB 조회 (rag_minutes 테이블) + ├─ full_content 획득 + └─ Redis에 캐싱 (TTL: 30분) + ↓ +5. 벡터 임베딩 생성 (Azure OpenAI) + ↓ +6. 벡터 유사도 검색 (자기 자신 제외) + ↓ +7. 결과 Redis 캐싱 (TTL: 1시간) + ↓ +8. 응답 반환 +``` + +## 🔧 구현 상세 + +### 1. 데이터 모델 추가 + +**파일**: `rag/src/models/minutes.py` + +```python +class RelatedMinutesRequest(BaseModel): + """연관 회의록 조회 요청""" + minute_id: str # 기준 회의록 ID + meeting_title: str # 회의 제목 (현재 미사용) + summary: str # 회의록 요약 (현재 미사용) + top_k: int = 5 # 반환할 최대 결과 수 + similarity_threshold: float = 0.7 # 최소 유사도 임계값 + +class RelatedMinutesResponse(BaseModel): + """연관 회의록 조회 응답""" + minutes: RagMinutes # 회의록 정보 + similarity_score: float # 유사도 점수 (0.0 ~ 1.0) +``` + +### 2. DB 쿼리 함수 개선 + +**파일**: `rag/src/db/rag_minutes_db.py` + +**수정 내용**: `search_by_vector()` 함수에 `exclude_minutes_id` 파라미터 추가 + +```python +def search_by_vector( + self, + query_embedding: List[float], + top_k: int = 5, + similarity_threshold: float = 0.7, + exclude_minutes_id: Optional[str] = None # 자기 자신 제외 +) -> List[Dict[str, Any]]: + """벡터 유사도 검색""" + # WHERE 절에 minutes_id != %s 조건 추가 + # 자기 자신을 결과에서 제외 +``` + +### 3. Redis 캐싱 유틸리티 구현 + +**파일**: `rag/src/utils/redis_cache.py` + +**주요 기능**: +- `get(key)`: 캐시 조회 +- `set(key, value, ttl)`: 캐시 저장 +- `delete(key)`: 캐시 삭제 +- `delete_pattern(pattern)`: 패턴 매칭 일괄 삭제 + +**특징**: +- Redis 연결 실패 시 자동으로 캐싱 비활성화 (서비스 장애 방지) +- JSON 직렬화/역직렬화 자동 처리 + +### 4. 설정 추가 + +**파일**: `rag/config.yaml` + +```yaml +rag_minutes: + search: + top_k: 5 + similarity_threshold: 0.7 + cache: + ttl: 1800 # 회의록 조회 결과: 30분 + prefix: "minutes:" + related_ttl: 3600 # 연관 회의록 검색 결과: 1시간 +``` + +### 5. API 엔드포인트 구현 + +**파일**: `rag/src/api/main.py` + +**엔드포인트**: `POST /api/minutes/related` + +**Request Body**: +```json +{ + "minute_id": "MIN-2025-001", + "meeting_title": "2025 Q1 마케팅 전략 회의", + "summary": "2025년 1분기 마케팅 전략 수립을 위한 회의", + "top_k": 5, + "similarity_threshold": 0.7 +} +``` + +**Response**: +```json +[ + { + "minutes": { + "meeting_id": "MTG-2024-050", + "title": "2024 Q4 마케팅 성과 분석", + "minutes_id": "MIN-2024-050", + "full_content": "...", + ... + }, + "similarity_score": 0.85 + }, + ... +] +``` + +## ⚡ 성능 최적화 + +### 2단계 캐싱 전략 + +1. **회의록 조회 결과 캐싱** (TTL: 30분) + - 키: `minutes:{minute_id}` + - DB 조회 횟수 감소 + +2. **연관 회의록 검색 결과 캐싱** (TTL: 1시간) + - 키: `minutes:related:{minute_id}:{top_k}:{threshold}` + - 벡터 검색 및 임베딩 생성 비용 절감 + +### 예상 성능 개선 + +| 케이스 | 처리 시간 | 비고 | +|--------|----------|------| +| 캐시 MISS (최초 요청) | ~2-3초 | DB 조회 + 임베딩 생성 + 벡터 검색 | +| 회의록 캐시 HIT | ~1-2초 | 임베딩 생성 + 벡터 검색 | +| 연관 회의록 캐시 HIT | ~50ms | 캐시에서 즉시 반환 | + +## 🔍 사용 예시 + +### Python (requests) + +```python +import requests + +url = "http://localhost:8000/api/minutes/related" +data = { + "minute_id": "MIN-2025-001", + "meeting_title": "2025 Q1 마케팅 전략 회의", + "summary": "2025년 1분기 마케팅 전략 수립", + "top_k": 5, + "similarity_threshold": 0.7 +} + +response = requests.post(url, json=data) +related_minutes = response.json() + +for item in related_minutes: + print(f"제목: {item['minutes']['title']}") + print(f"유사도: {item['similarity_score']:.2f}") + print("---") +``` + +### curl + +```bash +curl -X POST "http://localhost:8000/api/minutes/related" \ + -H "Content-Type: application/json" \ + -d '{ + "minute_id": "MIN-2025-001", + "meeting_title": "2025 Q1 마케팅 전략 회의", + "summary": "2025년 1분기 마케팅 전략 수립", + "top_k": 5, + "similarity_threshold": 0.7 + }' +``` + +## 🧪 테스트 시나리오 + +### 1. 정상 케이스 +- ✅ 존재하는 minute_id로 요청 +- ✅ 유사한 회의록 5개 반환 +- ✅ 자기 자신은 결과에서 제외 + +### 2. 캐싱 동작 확인 +- ✅ 최초 요청: 캐시 MISS → DB 조회 +- ✅ 2번째 요청: 캐시 HIT → 즉시 반환 +- ✅ TTL 경과 후: 캐시 MISS → 재조회 + +### 3. 에러 케이스 +- ❌ 존재하지 않는 minute_id → 404 오류 +- ❌ 잘못된 요청 형식 → 422 오류 + +## 📊 데이터베이스 영향 + +### 쿼리 패턴 + +```sql +-- 1. 회의록 조회 (캐시 MISS 시) +SELECT * FROM rag_minutes WHERE minutes_id = 'MIN-2025-001'; + +-- 2. 벡터 유사도 검색 (캐시 MISS 시) +SELECT *, + 1 - (embedding <=> '{vector}'::vector) as similarity_score +FROM rag_minutes +WHERE embedding IS NOT NULL + AND 1 - (embedding <=> '{vector}'::vector) >= 0.7 + AND minutes_id != 'MIN-2025-001' -- 자기 자신 제외 +ORDER BY embedding <=> '{vector}'::vector +LIMIT 5; +``` + +### 인덱스 활용 + +- `minutes_id`: Primary Key 인덱스 활용 +- `embedding`: pgvector 인덱스 활용 (IVFFlat 또는 HNSW) + +## 🔐 보안 고려사항 + +1. **SQL Injection 방지**: psycopg2 파라미터 바인딩 사용 +2. **Rate Limiting**: 향후 추가 권장 (분당 요청 수 제한) +3. **인증/인가**: 향후 JWT 토큰 기반 인증 추가 권장 + +## 🚀 향후 개선 사항 + +### 1. Hybrid 검색 지원 (Option B 통합) +- summary를 활용한 빠른 1차 검색 +- full_content로 정밀한 2차 검색 +- 적응형 임계값 조정 + +### 2. 성능 모니터링 +- 캐시 히트율 추적 +- API 응답 시간 측정 +- 벡터 검색 성능 분석 + +### 3. 검색 품질 개선 +- 시간 가중치 적용 (최근 회의록 우선) +- 부서/팀별 필터링 옵션 +- 주제별 카테고리 분류 + +## 📝 파일 변경 목록 + +| 파일 | 변경 유형 | 설명 | +|------|----------|------| +| `rag/src/models/minutes.py` | 수정 | Request/Response 모델 추가 | +| `rag/src/db/rag_minutes_db.py` | 수정 | exclude_minutes_id 파라미터 추가 | +| `rag/src/utils/redis_cache.py` | 신규 | Redis 캐싱 유틸리티 | +| `rag/src/api/main.py` | 수정 | API 엔드포인트 추가 | +| `rag/config.yaml` | 수정 | rag_minutes 설정 추가 | + +## ✅ 구현 완료 체크리스트 + +- [x] Request/Response 모델 정의 +- [x] DB 쿼리 함수 개선 (자기 자신 제외) +- [x] Redis 캐싱 유틸리티 구현 +- [x] API 엔드포인트 구현 +- [x] 2단계 캐싱 전략 적용 +- [x] 설정 파일 업데이트 +- [x] 문서 작성 + +--- + +**작성자**: 준호 (Backend Developer) +**작성일**: 2025-10-29 +**버전**: 1.0.0 diff --git a/rag/docs/basic_tier_workaround.md b/rag/docs/basic_tier_workaround.md new file mode 100644 index 0000000..2f2c6a6 --- /dev/null +++ b/rag/docs/basic_tier_workaround.md @@ -0,0 +1,356 @@ +# Event Hub Basic Tier 제약사항 및 해결 방법 + +## 현재 상황 + +``` +Event Hub Namespace: hgzero-eventhub-ns +Resource Group: rg-digitalgarage-02 +Location: koreacentral +Tier: Basic ⚠️ +Status: Active +``` + +## Basic Tier 제약사항 + +| 기능 | Basic | Standard | Premium | +|------|-------|----------|---------| +| **Consumer Groups** | **1개만** ($Default) | **최대 20개** | 최대 100개 | +| 파티션 | 최대 32개 | 최대 32개 | 최대 100개 | +| 메시지 보존 | 1일 | 7일 | 90일 | +| Capture | ❌ | ✅ | ✅ | +| 가격 | 낮음 | 중간 | 높음 | + +**Basic Tier에서는 `$Default` Consumer Group만 사용할 수 있습니다.** + +--- + +## 해결 방법 + +### 방법 1: Tier 업그레이드 (권장) ⭐️ + +Event Hub를 Basic → Standard로 업그레이드하면 최대 20개의 Consumer Group 사용 가능 + +#### 업그레이드 방법 + +**Azure Portal:** +1. Event Hub Namespace (hgzero-eventhub-ns) 페이지로 이동 +2. 왼쪽 메뉴에서 "Pricing tier" 클릭 +3. "Standard" 선택 +4. "Apply" 버튼 클릭 +5. 약 1-2분 후 적용 완료 + +**Azure CLI:** +```bash +az eventhubs namespace update \ + --resource-group rg-digitalgarage-02 \ + --name hgzero-eventhub-ns \ + --sku Standard \ + --capacity 1 +``` + +**비용:** +- Basic: ~$0.015/시간 (~$11/월) +- Standard: ~$0.03/시간 (~$22/월) +- 차이: 약 $11/월 추가 + +**장점:** +- ✅ 여러 Consumer Group 사용 가능 +- ✅ 메시지 보존 기간 7일로 증가 +- ✅ Capture 기능 사용 가능 +- ✅ 영구적 해결책 + +**단점:** +- ❌ 비용 증가 (약 2배) +- ❌ 관리자 승인 필요할 수 있음 + +--- + +### 방법 2: $Default Consumer Group 공유 사용 (임시) + +개발/테스트 시 기존 프로덕션 Consumer를 잠시 중지하고 테스트 + +#### 실행 순서 + +**Step 1: 프로덕션 Consumer 중지** +```bash +# 기존 Consumer 프로세스 확인 +ps aux | grep "start_consumer.py" | grep -v grep + +# PID 확인 후 종료 +kill + +# 예시 +kill 51257 +``` + +**Step 2: 테스트 Consumer 실행** +```bash +cd /Users/daewoong/home/workspace/HGZero/rag +python start_consumer.py +``` + +**Step 3: 테스트 완료 후 프로덕션 재시작** +```bash +# Ctrl+C로 테스트 Consumer 종료 + +# 프로덕션 Consumer 재시작 +python start_consumer.py & +``` + +**장점:** +- ✅ 추가 비용 없음 +- ✅ 즉시 테스트 가능 +- ✅ 권한 불필요 + +**단점:** +- ❌ 프로덕션 서비스 중단 +- ❌ 동시 실행 불가 +- ❌ 임시 방법 + +**주의사항:** +- ⚠️ 프로덕션 환경에서만 사용 중이라면 비추천 +- ⚠️ 업무 시간 외에만 테스트 +- ⚠️ 테스트 후 반드시 프로덕션 재시작 확인 + +--- + +### 방법 3: Checkpoint 위치 변경으로 독립 실행 + +동일한 Consumer Group을 사용하되, checkpoint 저장 위치를 다르게 하여 독립적으로 실행 + +#### 구현 방법 + +**Step 1: 별도 Storage Container 생성** + +Azure Portal: +1. Storage Account (hgzerostorage) 접속 +2. "Containers" 클릭 +3. "+ Container" 클릭 +4. 이름: `hgzero-checkpoints-dev` +5. Create + +**Step 2: config_dev.yaml 생성** + +```yaml +# config_dev.yaml +eventhub: + connection_string: ${EVENTHUB_CONNECTION_STRING} + name: ${EVENTHUB_NAME} + consumer_group: "$Default" # 동일 + storage: + connection_string: ${AZURE_STORAGE_CONNECTION_STRING} + container_name: "hgzero-checkpoints-dev" # 다른 컨테이너! + +# 나머지는 config.yaml과 동일 +``` + +**Step 3: 개발용 Consumer 실행 스크립트** + +```python +# start_dev_consumer.py +import asyncio +from pathlib import Path + +from src.utils.config import load_config +from src.db.rag_minutes_db import RagMinutesDB +from src.db.postgres_vector import PostgresVectorDB +from src.utils.embedding import EmbeddingGenerator +from src.services.eventhub_consumer import start_consumer + +async def main(): + # 개발용 설정 파일 로드 + config_path = Path(__file__).parent / "config_dev.yaml" + config = load_config(str(config_path)) + + # ... 나머지는 start_consumer.py와 동일 + +if __name__ == "__main__": + asyncio.run(main()) +``` + +**Step 4: 실행** + +```bash +# Terminal 1: 프로덕션 Consumer +python start_consumer.py + +# Terminal 2: 개발 Consumer (다른 checkpoint) +python start_dev_consumer.py +``` + +**장점:** +- ✅ 동시 실행 가능 +- ✅ 독립적인 checkpoint +- ✅ 추가 비용 거의 없음 (Storage만) +- ✅ Tier 업그레이드 불필요 + +**단점:** +- ❌ 동일 파티션을 두 Consumer가 읽으려 하면 ownership 경쟁 +- ❌ 한쪽만 이벤트 수신 (ownership을 가진 쪽) +- ❌ 완전한 독립 실행은 아님 + +**결론:** +- 이 방법은 **checkpoint만 독립**이고, **여전히 ownership 경쟁** 발생 +- **동시 실행은 불가능** +- 권장하지 않음 ❌ + +--- + +### 방법 4: 로컬 개발 환경에서만 테스트 + +Event Hub 대신 로컬 메시지 큐 사용 + +#### 옵션 A: Azure Event Hub Emulator (권장) + +현재는 공식 Emulator가 없으므로, Kafka나 RabbitMQ로 대체 + +#### 옵션 B: 메모리 큐로 테스트 + +```python +# test_consumer_local.py +import asyncio +import json +from queue import Queue + +# 로컬 메모리 큐 +local_queue = Queue() + +async def test_event_processing(): + """로컬에서 이벤트 처리 로직만 테스트""" + + # 테스트 이벤트 생성 + test_event = { + 'eventType': 'MINUTES_FINALIZED', + 'data': { + 'meetingId': 'test-001', + 'title': '테스트 회의', + 'minutesId': 'minutes-001', + 'sections': [] + } + } + + # Consumer의 _process_minutes_event 로직만 테스트 + from src.services.eventhub_consumer import EventHubConsumer + + # 실제 처리 로직 테스트 + # ... + +asyncio.run(test_event_processing()) +``` + +**장점:** +- ✅ 완전 독립 실행 +- ✅ 무료 +- ✅ 빠른 테스트 + +**단점:** +- ❌ 실제 Event Hub와 다른 환경 +- ❌ 통합 테스트 불가 + +--- + +## 권장 솔루션 + +### 개발 단계별 권장 방법 + +| 단계 | 권장 방법 | 이유 | +|------|----------|------| +| **로컬 개발** | 방법 4 (메모리 큐) | 빠르고 비용 없음 | +| **통합 테스트** | 방법 2 ($Default 공유) | 실제 환경, 단기간 | +| **지속적 개발** | 방법 1 (Tier 업그레이드) | 영구 해결, 동시 실행 | +| **프로덕션** | 방법 1 (Standard Tier) | 고가용성, 확장성 | + +### 최종 권장: Tier 업그레이드 + +**이유:** +1. ✅ 여러 Consumer Group으로 개발/테스트 분리 +2. ✅ 메시지 보존 기간 증가 (1일 → 7일) +3. ✅ 향후 확장 가능 (Capture 등) +4. ✅ 프로덕션 안정성 향상 + +**비용 대비 효과:** +- 월 $11 추가로 개발 생산성 향상 +- 프로덕션 중단 없이 테스트 가능 +- 장기적으로 더 경제적 + +--- + +## 실행 가이드 + +### 즉시 테스트가 필요한 경우 (방법 2) + +```bash +# 1. 프로덕션 Consumer 중지 +ps aux | grep "start_consumer.py" | grep -v grep +kill + +# 2. 테스트 실행 +cd /Users/daewoong/home/workspace/HGZero/rag +python start_consumer.py + +# 3. 테스트 완료 후 +# Ctrl+C로 종료 + +# 4. 프로덕션 재시작 +python start_consumer.py & +``` + +### Tier 업그레이드 후 (방법 1) + +```bash +# 1. Tier 업그레이드 (Azure Portal 또는 CLI) +az eventhubs namespace update \ + --resource-group rg-digitalgarage-02 \ + --name hgzero-eventhub-ns \ + --sku Standard + +# 2. Consumer Group 생성 +az eventhubs eventhub consumer-group create \ + --resource-group rg-digitalgarage-02 \ + --namespace-name hgzero-eventhub-ns \ + --eventhub-name hgzero-eventhub-name \ + --name development + +# 3. config.yaml 수정 +# consumer_group: "development" + +# 4. 프로덕션은 계속 실행, 개발 Consumer 별도 실행 +python start_consumer.py +``` + +--- + +## 요약 + +### 현재 상황 +- ❌ Event Hub Tier: **Basic** +- ❌ Consumer Group: **$Default만 사용 가능** +- ❌ 추가 Consumer Group 생성 불가 + +### 해결책 +1. **단기**: 프로덕션 Consumer 잠시 중지하고 테스트 (방법 2) +2. **장기**: Standard Tier로 업그레이드 (방법 1) ⭐️ + +### 다음 액션 +관리자에게 **Tier 업그레이드** 요청하거나, +긴급하다면 **프로덕션 중지 후 테스트** 진행 + +--- + +## 참고: Tier 비교 + +``` +Basic Tier (현재) +├─ Consumer Groups: 1개 ($Default만) +├─ 보존 기간: 1일 +├─ 비용: ~$11/월 +└─ ❌ 개발/테스트 분리 불가 + +Standard Tier (권장) +├─ Consumer Groups: 최대 20개 ✅ +├─ 보존 기간: 7일 +├─ 비용: ~$22/월 (+$11) +└─ ✅ 개발/테스트 완전 분리 +``` + +Standard Tier로 업그레이드하는 것을 강력히 권장합니다! 🎯 diff --git a/rag/docs/check_consumers_guide.md b/rag/docs/check_consumers_guide.md new file mode 100644 index 0000000..58def93 --- /dev/null +++ b/rag/docs/check_consumers_guide.md @@ -0,0 +1,367 @@ +# Event Hub Active Consumers 조회 가이드 + +## 개요 + +Event Hub를 현재 읽고 있는 Consumer(host)를 확인하는 방법을 설명합니다. + +## 방법 1: 제공된 스크립트 사용 (가장 쉬움) ⭐️ + +### 실행 + +```bash +cd /Users/daewoong/home/workspace/HGZero/rag +python check_active_consumers.py +``` + +### 출력 정보 + +``` +📊 Event Hub Active Consumers +================================================================================ +Consumer Group: $Default +Container: hgzero-checkpoints + +🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer) +-------------------------------------------------------------------------------- +파티션 0: + 상태: 🟢 ACTIVE + Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95... + 마지막 갱신: 2025-10-29 12:35:42 UTC + 경과 시간: 15초 전 + +📋 Consumer 요약 +-------------------------------------------------------------------------------- +Consumer #1 🟢 ACTIVE + Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95 + 소유 파티션: 0 + 파티션 개수: 1 + 마지막 활동: 2025-10-29 12:35:42 UTC + +📍 Checkpoint 정보 (마지막 읽은 위치) +-------------------------------------------------------------------------------- +파티션 0: + Offset: 120259090624 + Sequence Number: 232 +``` + +--- + +## 방법 2: Python 코드로 직접 조회 + +### 스크립트 예시 + +```python +import asyncio +import json +from datetime import datetime, timezone +from pathlib import Path +from azure.storage.blob import BlobServiceClient +from src.utils.config import load_config + + +async def check_consumers(): + """활성 Consumer 조회""" + + # 설정 로드 + config_path = Path('config.yaml') + config = load_config(str(config_path)) + + eventhub_config = config['eventhub'] + storage_conn_str = eventhub_config['storage']['connection_string'] + container_name = eventhub_config['storage']['container_name'] + consumer_group = eventhub_config['consumer_group'] + + # Blob Service Client + blob_service = BlobServiceClient.from_connection_string(storage_conn_str) + container = blob_service.get_container_client(container_name) + + # Ownership 정보 조회 + ownership_prefix = f"{consumer_group}/ownership/" + + print(f"Consumer Group: {consumer_group}\n") + + for blob in container.list_blobs(name_starts_with=ownership_prefix): + blob_client = container.get_blob_client(blob.name) + content = blob_client.download_blob().readall() + + ownership_data = json.loads(content) + partition_id = blob.name.split('/')[-1] + owner_id = ownership_data.get('ownerIdentifier', 'unknown') + + # 활성 상태 확인 (60초 이내 갱신) + now = datetime.now(timezone.utc) + time_diff = (now - blob.last_modified).total_seconds() + is_active = time_diff < 60 + + status = "🟢 ACTIVE" if is_active else "🔴 INACTIVE" + + print(f"파티션 {partition_id}: {status}") + print(f" Owner: {owner_id}") + print(f" 갱신: {int(time_diff)}초 전\n") + + +asyncio.run(check_consumers()) +``` + +--- + +## 방법 3: Azure Portal에서 확인 + +### 단계 + +1. **Azure Portal** 접속 (https://portal.azure.com) +2. **Storage Account** 이동 + - "hgzerostorage" 검색 +3. **Containers** 클릭 + - "hgzero-checkpoints" 선택 +4. **폴더 구조 확인** + ``` + $Default/ + ├── ownership/ + │ └── 0 ← 파티션 0의 소유권 정보 + └── checkpoint/ + └── 0 ← 파티션 0의 checkpoint + ``` +5. **ownership/0 파일 다운로드** 또는 **View** 클릭 +6. **JSON 내용 확인** + ```json + { + "ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95", + "lastModifiedTime": "2025-10-29T12:35:42Z", + ... + } + ``` + +### 정보 해석 + +- **ownerIdentifier**: 현재 Consumer의 고유 ID +- **lastModifiedTime**: 마지막 lease 갱신 시간 + - 60초 이내: 🟢 활성 상태 + - 60초 이상: 🔴 비활성 (Consumer 종료됨) + +--- + +## 방법 4: Azure CLI로 조회 + +### Blob 목록 확인 + +```bash +# Storage Account 키 조회 +az storage account keys list \ + --resource-group rg-digitalgarage-02 \ + --account-name hgzerostorage \ + --query "[0].value" -o tsv + +# Blob 목록 조회 +az storage blob list \ + --account-name hgzerostorage \ + --container-name hgzero-checkpoints \ + --prefix "\$Default/ownership/" \ + --output table +``` + +### Blob 내용 다운로드 + +```bash +# ownership 정보 다운로드 +az storage blob download \ + --account-name hgzerostorage \ + --container-name hgzero-checkpoints \ + --name "\$Default/ownership/0" \ + --file /tmp/ownership.json + +# 내용 확인 +cat /tmp/ownership.json | python3 -m json.tool +``` + +--- + +## 방법 5: 프로세스 레벨에서 확인 + +### 로컬 머신에서 실행 중인 Consumer + +```bash +# Consumer 프로세스 확인 +ps aux | grep "start_consumer.py" | grep -v grep + +# 출력: +# daewoong 81447 0.0 0.2 python start_consumer.py +``` + +### 네트워크 연결 확인 + +```bash +# Event Hub와의 연결 확인 +lsof -i -n | grep 81447 | grep ESTABLISHED + +# 출력: +# python3.1 81447 daewoong 9u IPv6 TCP ... -> ...servicebus.windows.net:https (ESTABLISHED) +``` + +**의미:** +- Consumer 프로세스가 실행 중 +- Event Hub와 HTTPS 연결 유지 중 +- 이벤트 수신 대기 중 + +--- + +## 주요 개념 + +### Ownership (소유권) + +``` +파티션 0 + ↓ +Ownership Blob (Blob Storage) + ↓ +{ + "ownerIdentifier": "consumer-uuid", + "lastModifiedTime": "2025-10-29T12:35:42Z", + "eTag": "...", + ... +} + ↓ +Lease 메커니즘 (30초마다 갱신) + ↓ +갱신 실패 시 → 다른 Consumer가 인수 가능 +``` + +### Checkpoint (읽은 위치) + +``` +파티션 0 + ↓ +Checkpoint Blob + ↓ +{ + "offset": "120259090624", + "sequenceNumber": 232, + ... +} + ↓ +Consumer 재시작 시 → 이 위치부터 재개 +``` + +### Consumer 상태 판단 + +| 조건 | 상태 | 의미 | +|------|------|------| +| lastModifiedTime < 60초 | 🟢 ACTIVE | 정상 동작 중 | +| 60초 < lastModifiedTime < 180초 | 🟡 WARNING | Lease 갱신 지연 | +| lastModifiedTime > 180초 | 🔴 INACTIVE | Consumer 종료 | + +--- + +## 트러블슈팅 + +### 문제 1: Ownership 정보가 없음 + +**증상:** +``` +⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다. +``` + +**원인:** +- Consumer가 한 번도 실행되지 않음 +- Consumer가 ownership claim에 실패함 + +**해결:** +1. Consumer 프로세스 확인: `ps aux | grep start_consumer` +2. Consumer 로그 확인 +3. 권한 확인 (Storage Account 접근 권한) + +### 문제 2: 여러 Owner ID가 나타남 + +**증상:** +``` +Consumer #1: owner-id-1 (파티션 0, 2, 4) +Consumer #2: owner-id-2 (파티션 1, 3, 5) +``` + +**원인:** +- 정상: 여러 Consumer가 파티션을 나눠서 처리 +- 파티션이 여러 개이고 Consumer도 여러 개 + +**확인:** +- 각 파티션이 하나의 Consumer만 소유하면 정상 +- 동일 파티션을 여러 Consumer가 소유하면 문제 + +### 문제 3: lastModified가 오래됨 + +**증상:** +``` +파티션 0: + 상태: 🔴 INACTIVE + 마지막 갱신: 2025-10-29 10:00:00 UTC + 경과 시간: 7200초 전 (2시간) +``` + +**원인:** +- Consumer가 종료됨 +- Consumer가 응답 없음 (hang) +- 네트워크 문제 + +**해결:** +1. Consumer 프로세스 확인 +2. Consumer 재시작 +3. 로그 확인하여 에러 파악 + +--- + +## 실시간 모니터링 + +### 스크립트: 주기적 확인 + +```bash +#!/bin/bash +# monitor_consumers.sh + +while true; do + clear + echo "=== $(date) ===" + python check_active_consumers.py + sleep 30 # 30초마다 갱신 +done +``` + +### 실행 + +```bash +chmod +x monitor_consumers.sh +./monitor_consumers.sh +``` + +--- + +## 요약 + +### 빠른 확인 방법 + +```bash +# 1. 스크립트 실행 (가장 쉬움) +python check_active_consumers.py + +# 2. 프로세스 확인 +ps aux | grep start_consumer + +# 3. Azure Portal에서 Blob 확인 +# Storage Account > Containers > hgzero-checkpoints > $Default/ownership +``` + +### 확인 항목 체크리스트 + +- [ ] Consumer 프로세스 실행 중인가? +- [ ] Ownership 정보가 있는가? +- [ ] lastModified가 60초 이내인가? +- [ ] 각 파티션이 하나의 Consumer만 소유하는가? +- [ ] Checkpoint가 갱신되고 있는가? + +모든 항목이 체크되면 Consumer가 정상 동작 중입니다! ✅ + +--- + +## 참고 파일 + +- 스크립트: `/Users/daewoong/home/workspace/HGZero/rag/check_active_consumers.py` +- 설정: `/Users/daewoong/home/workspace/HGZero/rag/config.yaml` +- Blob Storage: `hgzerostorage/hgzero-checkpoints` diff --git a/rag/docs/consumer_group_setup.md b/rag/docs/consumer_group_setup.md new file mode 100644 index 0000000..03d2bf0 --- /dev/null +++ b/rag/docs/consumer_group_setup.md @@ -0,0 +1,494 @@ +# Consumer Group 생성 및 할당 가이드 + +## 목차 +1. [Consumer Group이란?](#consumer-group이란) +2. [생성 방법](#생성-방법) +3. [코드에서 사용하기](#코드에서-사용하기) +4. [검증 방법](#검증-방법) +5. [트러블슈팅](#트러블슈팅) + +--- + +## Consumer Group이란? + +Consumer Group은 Event Hub의 이벤트를 **독립적으로 읽는 논리적 그룹**입니다. + +### 주요 특징 +- 각 Consumer Group은 독립적인 checkpoint를 유지 +- 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음 +- 기본 Consumer Group: `$Default` (자동 생성됨) + +### 사용 사례 +| Consumer Group | 용도 | 설명 | +|---------------|------|------| +| `$Default` | 프로덕션 | 실제 운영 환경에서 사용 | +| `development` | 개발 | 개발자가 로컬에서 테스트 | +| `test` | 테스트 | QA 테스트 환경 | +| `analytics` | 분석 | 데이터 분석 및 모니터링 | +| `backup` | 백업 | 이벤트 백업 및 아카이빙 | + +--- + +## 생성 방법 + +### 방법 1: Azure Portal (가장 쉬움) ⭐️ + +#### 단계별 가이드 + +**Step 1: Azure Portal 접속** +``` +https://portal.azure.com +``` + +**Step 2: Event Hub Namespace 찾기** +1. 상단 검색창에 "hgzero-eventhub-ns" 입력 +2. "Event Hubs Namespaces" 아래의 결과 클릭 + +**Step 3: Event Hub 선택** +1. 왼쪽 메뉴에서 "Event Hubs" 클릭 +2. "hgzero-eventhub-name" 선택 + +**Step 4: Consumer Groups 관리** +1. 왼쪽 메뉴에서 "Consumer groups" 클릭 +2. 현재 목록에 `$Default`만 있을 것임 + +**Step 5: 새 Consumer Group 생성** +1. 상단의 "+ Consumer group" 버튼 클릭 +2. Name 입력: + - 개발용: `development` + - 테스트용: `test` + - 분석용: `analytics` +3. "Create" 버튼 클릭 + +**Step 6: 생성 확인** +- 목록에 새로운 Consumer Group이 추가되었는지 확인 +- 예: `$Default`, `development`, `test` + +--- + +### 방법 2: Azure CLI + +#### 사전 요구사항 +```bash +# Azure CLI 설치 확인 +az --version + +# 로그인 +az login +``` + +#### Consumer Group 생성 +```bash +# 리소스 그룹 확인 (필요 시) +az eventhubs namespace show \ + --name hgzero-eventhub-ns \ + --query resourceGroup -o tsv + +# Consumer Group 생성 +az eventhubs eventhub consumer-group create \ + --resource-group \ + --namespace-name hgzero-eventhub-ns \ + --eventhub-name hgzero-eventhub-name \ + --name development + +# 생성 확인 +az eventhubs eventhub consumer-group list \ + --resource-group \ + --namespace-name hgzero-eventhub-ns \ + --eventhub-name hgzero-eventhub-name \ + --output table +``` + +#### 출력 예시 +``` +Name ResourceGroup +----------- --------------- +$Default hgzero-rg +development hgzero-rg +test hgzero-rg +``` + +--- + +### 방법 3: Python Management SDK + +#### 설치 +```bash +pip install azure-mgmt-eventhub azure-identity +``` + +#### 코드 +```python +from azure.mgmt.eventhub import EventHubManagementClient +from azure.identity import DefaultAzureCredential + +# 인증 +credential = DefaultAzureCredential() +subscription_id = "" + +# 관리 클라이언트 생성 +mgmt_client = EventHubManagementClient(credential, subscription_id) + +# Consumer Group 생성 +mgmt_client.consumer_groups.create_or_update( + resource_group_name='', + namespace_name='hgzero-eventhub-ns', + event_hub_name='hgzero-eventhub-name', + consumer_group_name='development', + parameters={} # 추가 설정 가능 +) + +print("✅ Consumer Group 'development' 생성 완료") + +# Consumer Group 목록 조회 +consumer_groups = mgmt_client.consumer_groups.list_by_event_hub( + resource_group_name='', + namespace_name='hgzero-eventhub-ns', + event_hub_name='hgzero-eventhub-name' +) + +print("\n현재 Consumer Groups:") +for cg in consumer_groups: + print(f" - {cg.name}") +``` + +--- + +## 코드에서 사용하기 + +### 1. config.yaml 수정 + +#### 기존 설정 +```yaml +eventhub: + connection_string: ${EVENTHUB_CONNECTION_STRING} + name: ${EVENTHUB_NAME} + consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default" + storage: + connection_string: ${AZURE_STORAGE_CONNECTION_STRING} + container_name: ${AZURE_STORAGE_CONTAINER_NAME} +``` + +#### 개발 환경용 설정 +```yaml +eventhub: + connection_string: ${EVENTHUB_CONNECTION_STRING} + name: ${EVENTHUB_NAME} + consumer_group: "development" # 직접 지정 + storage: + connection_string: ${AZURE_STORAGE_CONNECTION_STRING} + container_name: ${AZURE_STORAGE_CONTAINER_NAME} +``` + +### 2. .env 파일 수정 (선택사항) + +#### 환경 변수로 관리하는 경우 +```bash +# .env +EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..." +EVENTHUB_NAME=hgzero-eventhub-name +AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경 +AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..." +AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints +``` + +### 3. Consumer 실행 + +#### 개발 환경 +```bash +# config.yaml의 consumer_group을 "development"로 설정 +cd /Users/daewoong/home/workspace/HGZero/rag +python start_consumer.py +``` + +#### 프로덕션 환경 +```bash +# config.yaml의 consumer_group을 "$Default"로 설정 +python start_consumer.py +``` + +#### 여러 환경 동시 실행 +```bash +# Terminal 1: 프로덕션 Consumer +AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py + +# Terminal 2: 개발 Consumer +AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py + +# Terminal 3: 테스트 Consumer +AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py +``` + +--- + +## 검증 방법 + +### 1. Consumer Group 목록 확인 + +#### Python으로 확인 +```python +import asyncio +from pathlib import Path +from azure.eventhub.aio import EventHubConsumerClient +from src.utils.config import load_config + +async def list_consumer_groups(): + """사용 가능한 Consumer Group 확인""" + config_path = Path('config.yaml') + config = load_config(str(config_path)) + + eventhub_config = config['eventhub'] + + # 여기서는 실제로 Management API를 사용해야 하지만 + # Consumer Client로는 자신이 속한 그룹만 확인 가능 + + print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}") + +asyncio.run(list_consumer_groups()) +``` + +### 2. 다른 Consumer Group으로 테스트 + +```python +import asyncio +import json +from pathlib import Path +from azure.eventhub.aio import EventHubConsumerClient +from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore +from src.utils.config import load_config + +async def test_consumer_group(consumer_group_name): + """특정 Consumer Group으로 이벤트 수신 테스트""" + config_path = Path('config.yaml') + config = load_config(str(config_path)) + + eventhub_config = config['eventhub'] + + # Checkpoint Store + checkpoint_store = BlobCheckpointStore.from_connection_string( + eventhub_config['storage']['connection_string'], + eventhub_config['storage']['container_name'] + ) + + # Consumer Client (Consumer Group 지정) + client = EventHubConsumerClient.from_connection_string( + eventhub_config['connection_string'], + consumer_group=consumer_group_name, # 여기서 지정! + eventhub_name=eventhub_config['name'], + checkpoint_store=checkpoint_store + ) + + print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...") + + event_count = 0 + + async def on_event(partition_context, event): + nonlocal event_count + event_count += 1 + print(f"이벤트 수신: #{event_count}") + await partition_context.update_checkpoint(event) + + async def on_error(partition_context, error): + print(f"에러: {error}") + + try: + async with client: + receive_task = asyncio.create_task( + client.receive( + on_event=on_event, + on_error=on_error, + starting_position="@earliest" + ) + ) + + await asyncio.sleep(10) # 10초 대기 + receive_task.cancel() + + try: + await receive_task + except asyncio.CancelledError: + pass + + print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신") + + except Exception as e: + print(f"❌ 에러 발생: {str(e)}") + +# 테스트 실행 +asyncio.run(test_consumer_group("development")) +``` + +### 3. Checkpoint 저장 위치 확인 + +각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다: + +``` +hgzero-checkpoints (Container) +├─ $Default/ +│ ├─ ownership/ +│ │ └─ 0 (파티션 0의 소유권 정보) +│ └─ checkpoint/ +│ └─ 0 (파티션 0의 checkpoint) +├─ development/ +│ ├─ ownership/ +│ │ └─ 0 +│ └─ checkpoint/ +│ └─ 0 +└─ test/ + ├─ ownership/ + │ └─ 0 + └─ checkpoint/ + └─ 0 +``` + +--- + +## 트러블슈팅 + +### 문제 1: Consumer Group을 찾을 수 없음 + +**증상** +``` +azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found. +``` + +**원인**: Consumer Group이 실제로 생성되지 않음 + +**해결**: +1. Azure Portal에서 Consumer Group 목록 확인 +2. 없으면 생성 +3. 이름 오타 확인 (대소문자 구분) + +### 문제 2: Ownership claim 실패 + +**증상** +``` +EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming. +``` + +**원인**: +- 동일 Consumer Group에 이미 다른 Consumer가 실행 중 +- 파티션보다 Consumer가 많음 + +**해결**: +1. 기존 Consumer 종료 +2. 다른 Consumer Group 사용 +3. 파티션 수 증가 + +### 문제 3: 이벤트를 읽지 못함 + +**증상**: Consumer는 실행되지만 이벤트가 수신되지 않음 + +**원인**: +- Starting position이 최신으로 설정됨 +- 새 이벤트가 없음 + +**해결**: +```python +# config.yaml 또는 코드에서 +await client.receive( + on_event=on_event, + on_error=on_error, + starting_position="@earliest" # 처음부터 읽기 +) +``` + +### 문제 4: Checkpoint가 초기화되지 않음 + +**증상**: 새 Consumer Group인데 이미 읽은 것처럼 동작 + +**원인**: Blob Storage에 이전 checkpoint가 남아있음 + +**해결**: +```bash +# Azure Portal에서 +# Storage Account > Containers > hgzero-checkpoints +# 해당 Consumer Group 폴더 삭제 +``` + +또는 코드로: +```python +from azure.storage.blob import BlobServiceClient + +blob_service = BlobServiceClient.from_connection_string( + AZURE_STORAGE_CONNECTION_STRING +) +container = blob_service.get_container_client("hgzero-checkpoints") + +# 특정 Consumer Group의 checkpoint 삭제 +prefix = "development/" +blobs = container.list_blobs(name_starts_with=prefix) +for blob in blobs: + container.delete_blob(blob.name) + print(f"삭제: {blob.name}") +``` + +--- + +## 권장 설정 + +### 환경별 Consumer Group 전략 + +```yaml +# 프로덕션 +production: + consumer_group: "$Default" + starting_position: "-1" # 최신 이벤트부터 + checkpoint_interval: 30 # 30초마다 checkpoint + +# 개발 +development: + consumer_group: "development" + starting_position: "@earliest" # 처음부터 + checkpoint_interval: 10 # 자주 checkpoint (테스트용) + +# 테스트 +test: + consumer_group: "test" + starting_position: "@earliest" + checkpoint_interval: 5 # 매우 자주 (빠른 테스트) + +# 분석 +analytics: + consumer_group: "analytics" + starting_position: "@earliest" # 모든 데이터 분석 + checkpoint_interval: 60 # 덜 자주 (성능) +``` + +### 실행 스크립트 예시 + +```bash +#!/bin/bash +# start_dev_consumer.sh + +export AZURE_EVENTHUB_CONSUMER_GROUP=development +cd /Users/daewoong/home/workspace/HGZero/rag +python start_consumer.py +``` + +--- + +## 요약 + +### 빠른 시작 체크리스트 + +- [ ] Azure Portal에서 Consumer Group 생성 + - [ ] 이름: `development` + - [ ] Event Hub: `hgzero-eventhub-name` +- [ ] `config.yaml` 수정 + - [ ] `consumer_group: "development"` 설정 +- [ ] Consumer 실행 + - [ ] `python start_consumer.py` +- [ ] 검증 + - [ ] 이벤트 수신 확인 + - [ ] Blob Storage에 checkpoint 생성 확인 + +### Consumer Group 활용 팁 + +1. **프로덕션과 개발 분리**: 항상 다른 Consumer Group 사용 +2. **테스트는 독립적으로**: `test` Consumer Group으로 자유롭게 실험 +3. **Checkpoint 관리**: 필요시 삭제하여 처음부터 재처리 +4. **모니터링**: 각 Consumer Group별 lag 모니터링 +5. **비용 최적화**: 불필요한 Consumer Group은 삭제 + +이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯 diff --git a/rag/eventhub_guide.md b/rag/eventhub_guide.md new file mode 100644 index 0000000..7e911e9 --- /dev/null +++ b/rag/eventhub_guide.md @@ -0,0 +1,378 @@ +# Event Hub Consumer Guide + +## 핵심 개념: Partition Ownership (파티션 소유권) + +Event Hub에서는 **같은 Consumer Group 내에서 하나의 파티션은 동시에 오직 하나의 Consumer만 읽을 수 있습니다**. 이를 "Exclusive Consumer" 패턴이라고 합니다. + +## 왜 이런 제약이 있나요? + +### 1. 순서 보장 (Ordering) + +``` +파티션 0: [이벤트1] → [이벤트2] → [이벤트3] + +❌ 잘못된 경우 (여러 Consumer가 동시 읽기): +Consumer A: 이벤트1 처리 중... (느림) +Consumer B: 이벤트2 처리 완료 ✓ +Consumer C: 이벤트3 처리 완료 ✓ +→ 처리 순서: 2 → 3 → 1 (순서 뒤바뀜!) + +✅ 올바른 경우 (하나의 Consumer만): +Consumer A: 이벤트1 ✓ → 이벤트2 ✓ → 이벤트3 ✓ +→ 처리 순서: 1 → 2 → 3 (순서 보장!) +``` + +### 2. Checkpoint 일관성 + +``` +❌ 여러 Consumer가 각자 checkpoint: +Consumer A: offset 100까지 읽음 → checkpoint 저장 +Consumer B: offset 150까지 읽음 → checkpoint 덮어씀 +Consumer A 재시작 → offset 150부터 읽음 → offset 100~149 누락! + +✅ 하나의 Consumer만: +Consumer A: offset 100 → 110 → 120 → ... (순차적) +재시작 시 → 마지막 checkpoint부터 정확히 재개 +``` + +### 3. 중복 처리 방지 + +``` +❌ 여러 Consumer가 동일 이벤트 읽기: +Consumer A: 주문 이벤트 처리 → 결제 완료 +Consumer B: 동일 주문 이벤트 처리 → 중복 결제! + +✅ 하나의 Consumer만: +Consumer A: 주문 이벤트 처리 → 1번만 결제 ✓ +``` + +## Ownership Claim 메커니즘 + +### 동작 과정 + +``` +Consumer A (PID 51257) - 먼저 시작 + ↓ +Blob Storage에 파티션 0 소유권 요청 + ↓ +✅ 승인 (Owner: A, Lease: 30초) + ↓ +30초마다 Lease 갱신 + ↓ +계속 소유권 유지 + + +Consumer B (테스트) - 나중에 시작 + ↓ +Blob Storage에 파티션 0 소유권 요청 + ↓ +❌ 거부 (이미 A가 소유 중) + ↓ +"hasn't claimed an ownership" 로그 + ↓ +계속 재시도 (대기 상태) +``` + +### Blob Storage에 저장되는 정보 + +```json +{ + "partitionId": "0", + "ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95", + "lastModifiedTime": "2025-10-29T02:17:49Z", + "eTag": "\"0x8DCF7E8F9B3C1A0\"", + "offset": "120259090624", + "sequenceNumber": 232 +} +``` + +## 현재 상황 분석 + +``` +Event Hub: hgzero-eventhub-name +├─ 파티션 수: 1개 (파티션 0) +└─ Consumer Group: $Default + +실행 중: +├─ Consumer A (PID 51257): 파티션 0 소유 ✅ +│ ├─ 이벤트 정상 수신 중 +│ ├─ Lease 주기적 갱신 중 +│ └─ Checkpoint 저장 중 +│ +└─ 테스트 Consumer들: 소유권 없음 ❌ + ├─ 파티션 0 claim 시도 + ├─ 계속 거부당함 + ├─ "hasn't claimed an ownership" 로그 + └─ 이벤트 수신 불가 +``` + +## 해결 방법 + +### Option 1: 기존 Consumer 종료 후 재시작 + +```bash +# 기존 Consumer 종료 +kill 51257 + +# 새로 시작 +cd /Users/daewoong/home/workspace/HGZero/rag +python start_consumer.py +``` + +**장점**: 간단 +**단점**: 다운타임 발생 (Lease 만료까지 최대 30초) + +### Option 2: 다른 Consumer Group 사용 (권장) + +```yaml +# config.yaml +eventhub: + consumer_group: "test-group" # $Default 대신 사용 +``` + +**장점**: +- 기존 Consumer에 영향 없음 +- 독립적으로 모든 이벤트 읽기 가능 +- 개발/테스트에 이상적 + +**단점**: 리소스 추가 사용 + +### Option 3: 파티션 수평 확장 + +``` +Event Hub 파티션 증가: 1개 → 3개 +Consumer 실행: 3개 + +분산: +├─ Consumer A: 파티션 0 +├─ Consumer B: 파티션 1 +└─ Consumer C: 파티션 2 + +→ 병렬 처리로 3배 성능 향상! +``` + +**장점**: 높은 처리량 +**단점**: 비용 증가, 전체 순서는 보장 안 됨 (파티션 내 순서만 보장) + +## Consumer Group 비교 + +``` +┌─────────────────────────────────────────┐ +│ Event Hub: hgzero-eventhub │ +│ 파티션 0: [이벤트들...] │ +└─────────────────────────────────────────┘ + │ + ├─────────────────┬─────────────────┐ + │ │ │ + Consumer Group Consumer Group Consumer Group + "$Default" "analytics" "backup" + │ │ │ + Consumer A Consumer B Consumer C + (RAG 처리) (분석 처리) (백업 처리) + │ │ │ + 각자 독립적으로 동일한 파티션 0의 모든 이벤트 읽음 + 각자 독립적인 Checkpoint 유지 +``` + +## 파티션과 Consumer 수 관계 + +### Case 1: Consumer 1개 +``` +Event Hub: 파티션 3개 (P0, P1, P2) +Consumer Group: $Default + +├─ Consumer A: P0, P1, P2 모두 소유 +└─ 모든 파티션 처리 (순차적) +``` + +### Case 2: Consumer 3개 (이상적) +``` +Event Hub: 파티션 3개 (P0, P1, P2) +Consumer Group: $Default + +├─ Consumer A: P0 소유 +├─ Consumer B: P1 소유 +└─ Consumer C: P2 소유 + +→ 병렬 처리로 최대 성능! +``` + +### Case 3: Consumer 5개 (과잉) +``` +Event Hub: 파티션 3개 (P0, P1, P2) +Consumer Group: $Default + +├─ Consumer A: P0 소유 +├─ Consumer B: P1 소유 +├─ Consumer C: P2 소유 +├─ Consumer D: 소유한 파티션 없음 (대기) +└─ Consumer E: 소유한 파티션 없음 (대기) + +→ D, E는 이벤트를 읽지 못하고 대기만 함 +``` + +## 베스트 프랙티스 + +| 환경 | Consumer 수 | Consumer Group | 파티션 수 | +|------|-------------|----------------|-----------| +| **프로덕션** | = 파티션 수 | production | 처리량에 맞게 | +| **개발** | 1개 | development | 1~2개 | +| **테스트** | 1개 | test | 1개 | +| **분석** | 1개 | analytics | (공유) | + +### 권장 사항 + +1. **프로덕션 환경** + - Consumer 수 = 파티션 수 (1:1 매핑) + - 고가용성을 위해 각 Consumer를 다른 서버에 배치 + - Consumer 수 > 파티션 수로 설정하면 일부는 대기 상태 (Standby) + +2. **개발/테스트 환경** + - 별도 Consumer Group 사용 + - 파티션 1개로 충분 + - 필요시 checkpoint를 초기화하여 처음부터 재처리 + +3. **모니터링** + - Ownership claim 실패 로그 모니터링 + - Lease 갱신 실패 알림 설정 + - Checkpoint lag 모니터링 + +4. **장애 복구** + - Lease timeout 고려 (기본 30초) + - Consumer 장애 시 자동 재분배 (30초 이내) + - Checkpoint로부터 정확한 위치에서 재개 + +## Consumer 프로세스 관리 명령어 + +### 프로세스 확인 + +```bash +# Consumer 프로세스 확인 +ps aux | grep "start_consumer.py" | grep -v grep + +# 상세 정보 (실행시간, CPU, 메모리) +ps -p -o pid,etime,%cpu,%mem,cmd + +# 네트워크 연결 확인 +lsof -i -n | grep + +# 모든 Python 프로세스 확인 +ps aux | grep python | grep -v grep +``` + +### 프로세스 종료 + +```bash +# 정상 종료 (SIGTERM) +kill + +# 강제 종료 (SIGKILL) +kill -9 + +# 이름으로 종료 +pkill -f start_consumer.py +``` + +## 테스트 이벤트 전송 + +```python +from azure.eventhub import EventHubProducerClient, EventData +import json +import os +from dotenv import load_dotenv + +load_dotenv('rag/.env') + +conn_str = os.getenv('EVENTHUB_CONNECTION_STRING') +eventhub_name = os.getenv('EVENTHUB_NAME') + +test_event = { + 'eventType': 'MINUTES_FINALIZED', + 'data': { + 'meetingId': 'test-meeting-001', + 'title': '테스트 회의', + 'minutesId': 'test-minutes-001', + 'sections': [ + { + 'sectionId': 'section-001', + 'type': 'DISCUSSION', + 'title': '논의 사항', + 'content': '테스트 논의 내용입니다.', + 'order': 1, + 'verified': True + } + ] + } +} + +producer = EventHubProducerClient.from_connection_string( + conn_str=conn_str, + eventhub_name=eventhub_name +) + +event_data_batch = producer.create_batch() +event_data_batch.add(EventData(json.dumps(test_event))) + +producer.send_batch(event_data_batch) +print('✅ 테스트 이벤트 전송 완료') + +producer.close() +``` + +## Event Hub 파티션 정보 조회 + +```python +import asyncio +from azure.eventhub.aio import EventHubConsumerClient + +async def check_partitions(): + client = EventHubConsumerClient.from_connection_string( + conn_str=EVENTHUB_CONNECTION_STRING, + consumer_group="$Default", + eventhub_name=EVENTHUB_NAME + ) + + async with client: + partition_ids = await client.get_partition_ids() + print(f"파티션 개수: {len(partition_ids)}") + print(f"파티션 IDs: {partition_ids}") + + for partition_id in partition_ids: + props = await client.get_partition_properties(partition_id) + print(f"\n파티션 {partition_id}:") + print(f" 시퀀스 번호: {props['last_enqueued_sequence_number']}") + print(f" 오프셋: {props['last_enqueued_offset']}") + print(f" 마지막 이벤트 시간: {props['last_enqueued_time_utc']}") + +asyncio.run(check_partitions()) +``` + +## 정리 + +### "에러"가 아니라 "설계된 동작"입니다 + +1. ✅ **정상**: Consumer A가 파티션 소유 → 이벤트 처리 +2. ✅ **정상**: Consumer B가 claim 실패 → 대기 +3. ✅ **정상**: Consumer A 종료 시 → Consumer B가 자동 인수 + +### 이 메커니즘의 장점 + +- 📌 **순서 보장**: 파티션 내 이벤트 순서 유지 +- 📌 **정확히 한 번 처리**: 중복 처리 방지 +- 📌 **자동 장애 복구**: Consumer 장애 시 자동 재분배 +- 📌 **수평 확장**: 파티션 추가로 처리량 증가 + +### 현재 상황 해결 + +**권장**: 다른 Consumer Group을 사용하여 테스트하시는 것이 가장 안전하고 효율적입니다! + +```yaml +# 개발/테스트용 Consumer Group 설정 +eventhub: + consumer_group: "development" # 또는 "test" +``` + +이렇게 하면: +- 기존 프로덕션 Consumer에 영향 없음 +- 독립적으로 모든 이벤트를 처음부터 읽을 수 있음 +- 여러 번 테스트 가능 diff --git a/rag/src/api/__pycache__/main.cpython-311.pyc b/rag/src/api/__pycache__/main.cpython-311.pyc index cd6d2fb..4a523ac 100644 Binary files a/rag/src/api/__pycache__/main.cpython-311.pyc and b/rag/src/api/__pycache__/main.cpython-311.pyc differ diff --git a/rag/src/api/main.py b/rag/src/api/main.py index d6e59da..819ee3c 100644 --- a/rag/src/api/main.py +++ b/rag/src/api/main.py @@ -22,7 +22,9 @@ from ..models.document import ( ) from ..models.minutes import ( MinutesSearchRequest, - MinutesSearchResult + MinutesSearchResult, + RelatedMinutesRequest, + RelatedMinutesResponse ) from ..db.postgres_vector import PostgresVectorDB from ..db.azure_search import AzureAISearchDB @@ -31,6 +33,7 @@ from ..services.claude_service import ClaudeService from ..utils.config import load_config, get_database_url from ..utils.embedding import EmbeddingGenerator from ..utils.text_processor import extract_nouns_as_query +from ..utils.redis_cache import RedisCache # 로깅 설정 logging.basicConfig( @@ -62,6 +65,7 @@ _doc_db = None _rag_minutes_db = None _embedding_gen = None _claude_service = None +_redis_cache = None def get_config(): @@ -139,6 +143,22 @@ def get_claude_service(): return _claude_service +def get_redis_cache(): + """Redis 캐시""" + global _redis_cache + if _redis_cache is None: + config = get_config() + redis_config = config["redis"] + _redis_cache = RedisCache( + host=redis_config["host"], + port=redis_config["port"], + db=redis_config["db"], + password=redis_config.get("password"), + decode_responses=redis_config.get("decode_responses", True) + ) + return _redis_cache + + # ============================================================================ # 용어집 API # ============================================================================ @@ -501,6 +521,123 @@ async def get_minutes_stats(rag_minutes_db: RagMinutesDB = Depends(get_rag_minut raise HTTPException(status_code=500, detail=str(e)) +@app.post("/api/minutes/related", response_model=List[RelatedMinutesResponse]) +async def get_related_minutes( + request: RelatedMinutesRequest, + rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db), + embedding_gen: EmbeddingGenerator = Depends(get_embedding_gen), + redis_cache: RedisCache = Depends(get_redis_cache) +): + """ + 연관 회의록 조회 (Option A: DB 조회 후 벡터 검색 + Redis 캐싱) + + Args: + request: 연관 회의록 조회 요청 + - minute_id: 기준 회의록 ID + - meeting_title: 회의 제목 (미사용) + - summary: 회의록 요약 (미사용) + - top_k: 반환할 최대 결과 수 + - similarity_threshold: 최소 유사도 임계값 + + Returns: + 연관 회의록 리스트 + + Process: + 1. Redis 캐시에서 연관 회의록 결과 조회 + 2. 캐시 MISS 시: + a. minute_id로 rag_minutes 테이블에서 회의록 조회 (캐싱) + b. full_content를 벡터 임베딩으로 변환 + c. 벡터 DB에서 유사도 검색 (자기 자신 제외) + d. 결과를 Redis에 캐싱 + 3. 연관 회의록 목록 반환 + """ + try: + config = get_config() + cache_config = config.get("rag_minutes", {}).get("cache", {}) + cache_prefix = cache_config.get("prefix", "minutes:") + minutes_ttl = cache_config.get("ttl", 1800) + related_ttl = cache_config.get("related_ttl", 3600) + + logger.info(f"연관 회의록 조회 시작: minute_id={request.minute_id}") + + # 1. 캐시 키 생성 + related_cache_key = ( + f"{cache_prefix}related:{request.minute_id}:" + f"{request.top_k}:{request.similarity_threshold}" + ) + + # 2. 캐시 조회 + cached_results = redis_cache.get(related_cache_key) + if cached_results: + logger.info(f"연관 회의록 캐시 HIT: {related_cache_key}") + return [ + RelatedMinutesResponse(**result) + for result in cached_results + ] + + # 3. 캐시 MISS - DB 조회 + logger.info(f"연관 회의록 캐시 MISS: {related_cache_key}") + + # 3-1. 회의록 조회 (캐싱) + minutes_cache_key = f"{cache_prefix}{request.minute_id}" + base_minutes = redis_cache.get(minutes_cache_key) + + if base_minutes: + logger.info(f"회의록 캐시 HIT: {minutes_cache_key}") + # RagMinutes 객체로 변환 + from ..models.minutes import RagMinutes + base_minutes = RagMinutes(**base_minutes) + else: + logger.info(f"회의록 캐시 MISS: {minutes_cache_key}") + base_minutes = rag_minutes_db.get_minutes_by_id(request.minute_id) + if not base_minutes: + raise HTTPException( + status_code=404, + detail=f"회의록을 찾을 수 없습니다: {request.minute_id}" + ) + # 캐시 저장 + redis_cache.set(minutes_cache_key, base_minutes.dict(), minutes_ttl) + + logger.info(f"기준 회의록 조회 완료: {base_minutes.title}") + + # 3-2. full_content를 벡터 임베딩으로 변환 + query_embedding = embedding_gen.generate_embedding(base_minutes.full_content) + logger.info(f"임베딩 생성 완료: {len(query_embedding)}차원") + + # 3-3. 벡터 유사도 검색 (자기 자신 제외) + results = rag_minutes_db.search_by_vector( + query_embedding=query_embedding, + top_k=request.top_k, + similarity_threshold=request.similarity_threshold, + exclude_minutes_id=request.minute_id + ) + + # 4. 응답 형식으로 변환 + related_minutes = [ + RelatedMinutesResponse( + minutes=result["minutes"], + similarity_score=result["similarity_score"] + ) + for result in results + ] + + # 5. 결과 캐싱 + redis_cache.set( + related_cache_key, + [r.dict() for r in related_minutes], + related_ttl + ) + + logger.info(f"연관 회의록 조회 완료: {len(related_minutes)}개 결과") + return related_minutes + + except HTTPException: + raise + except Exception as e: + logger.error(f"연관 회의록 조회 실패: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/rag/src/db/__pycache__/rag_minutes_db.cpython-311.pyc b/rag/src/db/__pycache__/rag_minutes_db.cpython-311.pyc index 2c7cbfe..7cbf029 100644 Binary files a/rag/src/db/__pycache__/rag_minutes_db.cpython-311.pyc and b/rag/src/db/__pycache__/rag_minutes_db.cpython-311.pyc differ diff --git a/rag/src/db/rag_minutes_db.py b/rag/src/db/rag_minutes_db.py index abc27cb..cd00f18 100644 --- a/rag/src/db/rag_minutes_db.py +++ b/rag/src/db/rag_minutes_db.py @@ -92,6 +92,8 @@ class RagMinutesDB: if field in minutes_dict and minutes_dict[field]: if isinstance(minutes_dict[field], datetime): minutes_dict[field] = minutes_dict[field].isoformat() + + minutes_dict.pop("embedding") return RagMinutes(**minutes_dict) @@ -189,7 +191,8 @@ class RagMinutesDB: self, query_embedding: List[float], top_k: int = 5, - similarity_threshold: float = 0.7 + similarity_threshold: float = 0.7, + exclude_minutes_id: Optional[str] = None ) -> List[Dict[str, Any]]: """ 벡터 유사도 검색 @@ -198,27 +201,34 @@ class RagMinutesDB: query_embedding: 쿼리 임베딩 벡터 top_k: 반환할 최대 결과 수 similarity_threshold: 최소 유사도 임계값 + exclude_minutes_id: 제외할 회의록 ID (연관 회의록 검색 시 자기 자신 제외) Returns: 검색 결과 리스트 """ with self.get_connection() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: - cur.execute(""" + # 제외 조건 추가 + exclude_condition = "" + params = [query_embedding, query_embedding, similarity_threshold, query_embedding, top_k] + + if exclude_minutes_id: + exclude_condition = "AND minutes_id != %s" + # 파라미터 순서: 처음 4개는 embedding 검색용, 5번째는 exclude용, 6번째는 limit용 + params = [query_embedding, query_embedding, similarity_threshold, exclude_minutes_id, query_embedding, top_k] + + query = f""" SELECT *, 1 - (embedding <=> %s::vector) as similarity_score FROM rag_minutes WHERE embedding IS NOT NULL AND 1 - (embedding <=> %s::vector) >= %s + {exclude_condition} ORDER BY embedding <=> %s::vector LIMIT %s - """, ( - query_embedding, - query_embedding, - similarity_threshold, - query_embedding, - top_k - )) + """ + + cur.execute(query, params) results = [] for row in cur.fetchall(): @@ -229,7 +239,7 @@ class RagMinutesDB: "similarity_score": float(similarity_score) }) - logger.info(f"벡터 검색 완료: {len(results)}개 결과") + logger.info(f"벡터 검색 완료: {len(results)}개 결과 (exclude: {exclude_minutes_id})") return results def search_by_keyword( diff --git a/rag/src/models/__pycache__/minutes.cpython-311.pyc b/rag/src/models/__pycache__/minutes.cpython-311.pyc index ed05bce..ead0c5d 100644 Binary files a/rag/src/models/__pycache__/minutes.cpython-311.pyc and b/rag/src/models/__pycache__/minutes.cpython-311.pyc differ diff --git a/rag/src/models/minutes.py b/rag/src/models/minutes.py index d3b2ff3..abcd3f9 100644 --- a/rag/src/models/minutes.py +++ b/rag/src/models/minutes.py @@ -106,3 +106,41 @@ class MinutesSearchResult(BaseModel): "similarity_score": 0.92 } } + + +class RelatedMinutesRequest(BaseModel): + """연관 회의록 조회 요청""" + minute_id: str = Field(..., description="기준 회의록 ID") + meeting_title: str = Field(..., description="회의 제목") + summary: str = Field(..., description="회의록 요약") + top_k: int = Field(5, ge=1, le=20, description="반환할 최대 결과 수") + similarity_threshold: float = Field(0.7, ge=0.0, le=1.0, description="최소 유사도 임계값") + + class Config: + json_schema_extra = { + "example": { + "minute_id": "MIN-2025-001", + "meeting_title": "2025 Q1 마케팅 전략 회의", + "summary": "2025년 1분기 마케팅 전략 수립을 위한 회의", + "top_k": 5, + "similarity_threshold": 0.7 + } + } + + +class RelatedMinutesResponse(BaseModel): + """연관 회의록 조회 응답""" + minutes: RagMinutes + similarity_score: float = Field(..., ge=0.0, le=1.0, description="유사도 점수") + + class Config: + json_schema_extra = { + "example": { + "minutes": { + "meeting_id": "MTG-2025-002", + "title": "2024 Q4 마케팅 성과 분석", + "minutes_id": "MIN-2024-050" + }, + "similarity_score": 0.85 + } + } diff --git a/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc b/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc index 8ffff59..9555a26 100644 Binary files a/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc and b/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc differ diff --git a/rag/src/services/eventhub_consumer.py b/rag/src/services/eventhub_consumer.py index 6a6c6ef..e844e25 100644 --- a/rag/src/services/eventhub_consumer.py +++ b/rag/src/services/eventhub_consumer.py @@ -1,6 +1,6 @@ """ Azure Event Hub Consumer 서비스 -회의록 확정 이벤트를 consume하여 RAG 저장소에 저장 +회의록 확정 이벤트 및 세그먼트 생성 이벤트를 consume """ import asyncio import json @@ -13,7 +13,9 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore from ..models.minutes import RagMinutes, MinutesSection from ..db.rag_minutes_db import RagMinutesDB +from ..db.postgres_vector import PostgresVectorDB from ..utils.embedding import EmbeddingGenerator +from ..utils.text_processor import extract_nouns_as_query logger = logging.getLogger(__name__) @@ -29,7 +31,9 @@ class EventHubConsumer: storage_connection_string: str, storage_container_name: str, rag_minutes_db: RagMinutesDB, - embedding_gen: EmbeddingGenerator + embedding_gen: EmbeddingGenerator, + term_db: Optional[PostgresVectorDB] = None, + config: Optional[Dict[str, Any]] = None ): """ 초기화 @@ -42,6 +46,8 @@ class EventHubConsumer: storage_container_name: Checkpoint 저장 컨테이너 이름 rag_minutes_db: RAG Minutes 데이터베이스 embedding_gen: Embedding 생성기 + term_db: 용어집 데이터베이스 (선택) + config: 설정 딕셔너리 (선택) """ self.connection_string = connection_string self.eventhub_name = eventhub_name @@ -50,6 +56,8 @@ class EventHubConsumer: self.storage_container_name = storage_container_name self.rag_minutes_db = rag_minutes_db self.embedding_gen = embedding_gen + self.term_db = term_db + self.config = config or {} self.client: Optional[EventHubConsumerClient] = None self.is_running = False @@ -106,13 +114,18 @@ class EventHubConsumer: event_body = event.body_as_str() event_data = json.loads(event_body) - logger.info(f"이벤트 수신: {event_data.get('eventType', 'unknown')}") - logger.info(f"이벤트 수신: {event_data.get('data', 'unknown')}") + event_type = event_data.get('eventType', 'unknown') + logger.info(f"이벤트 수신: {event_type}") - # 회의록 확정 이벤트 처리 - if event_data.get("eventType") == "MINUTES_FINALIZED": + # 이벤트 타입별 처리 + if event_type == "MINUTES_FINALIZED": + # 회의록 확정 이벤트 await self._process_minutes_event(event_data) + elif event_type == "SegmentCreated": + # 세그먼트 생성 이벤트 - 용어검색 실행 + await self._process_segment_event(event_data) + # Checkpoint 업데이트 await partition_context.update_checkpoint(event) @@ -131,6 +144,110 @@ class EventHubConsumer: """ logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}") + async def _process_segment_event(self, event_data: Dict[str, Any]): + """ + 세그먼트 생성 이벤트 처리 - 용어검색 실행 + + Args: + event_data: 이벤트 데이터 + """ + try: + # 용어집 DB가 없으면 스킵 + if not self.term_db: + logger.debug("용어집 DB가 설정되지 않아 용어검색을 스킵합니다") + return + + # 세그먼트 데이터 추출 + segment_id = event_data.get("segmentId") + text = event_data.get("text", "") + meeting_id = event_data.get("meetingId") + + if not text: + logger.warning(f"세그먼트 {segment_id}에 텍스트가 없습니다") + return + + logger.info(f"세그먼트 용어검색 시작: {segment_id} (회의: {meeting_id})") + logger.info(f"텍스트: {text[:100]}...") + + # 1. 명사 추출하여 검색 쿼리 생성 + search_query = extract_nouns_as_query(text) + logger.info(f"검색 쿼리 변환: '{text[:30]}...' → '{search_query}'") + + # 2. 용어검색 설정 + config = self.config.get("term_glossary", {}) + search_config = config.get("search", {}) + + top_k = search_config.get("top_k", 5) + confidence_threshold = search_config.get("confidence_threshold", 0.7) + keyword_weight = search_config.get("keyword_weight", 0.4) + vector_weight = search_config.get("vector_weight", 0.6) + + # 3. 키워드 검색 + keyword_results = self.term_db.search_by_keyword( + query=search_query, + top_k=top_k, + confidence_threshold=confidence_threshold + ) + + # 4. 벡터 검색 + query_embedding = self.embedding_gen.generate_embedding(search_query) + vector_results = self.term_db.search_by_vector( + query_embedding=query_embedding, + top_k=top_k, + confidence_threshold=confidence_threshold + ) + + # 5. 하이브리드 검색 결과 통합 (RRF) + results = [] + seen_ids = set() + + # 키워드 결과 가중치 적용 + for result in keyword_results: + term_id = result["term"].term_id + if term_id not in seen_ids: + result["relevance_score"] *= keyword_weight + result["match_type"] = "hybrid" + results.append(result) + seen_ids.add(term_id) + + # 벡터 결과 가중치 적용 + for result in vector_results: + term_id = result["term"].term_id + if term_id not in seen_ids: + result["relevance_score"] *= vector_weight + result["match_type"] = "hybrid" + results.append(result) + seen_ids.add(term_id) + else: + # 이미 있는 경우 점수 합산 + for r in results: + if r["term"].term_id == term_id: + r["relevance_score"] += result["relevance_score"] * vector_weight + break + + # 점수 기준 재정렬 + results.sort(key=lambda x: x["relevance_score"], reverse=True) + results = results[:top_k] + + # 6. 검색 결과 로깅 + if results: + logger.info(f"세그먼트 {segment_id} 용어검색 완료: {len(results)}개 발견") + for idx, result in enumerate(results, 1): + term = result["term"] + score = result["relevance_score"] + logger.info( + f" [{idx}] {term.term_name} " + f"(카테고리: {term.category}, 점수: {score:.3f})" + ) + else: + logger.info(f"세그먼트 {segment_id}에서 매칭되는 용어를 찾지 못했습니다") + + # 7. 선택적: 검색 결과를 별도 테이블에 저장하거나 Event Hub로 발행 + # TODO: 필요시 검색 결과를 저장하거나 downstream 서비스로 전달 + + except Exception as e: + logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True) + def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]: """ Java LocalDateTime 배열을 ISO 8601 문자열로 변환 @@ -302,7 +419,8 @@ class EventHubConsumer: async def start_consumer( config: Dict[str, Any], rag_minutes_db: RagMinutesDB, - embedding_gen: EmbeddingGenerator + embedding_gen: EmbeddingGenerator, + term_db: Optional[PostgresVectorDB] = None ): """ Event Hub Consumer 시작 (비동기) @@ -311,6 +429,7 @@ async def start_consumer( config: 설정 딕셔너리 rag_minutes_db: RAG Minutes 데이터베이스 embedding_gen: Embedding 생성기 + term_db: 용어집 데이터베이스 (선택) """ eventhub_config = config["eventhub"] @@ -321,7 +440,9 @@ async def start_consumer( storage_connection_string=eventhub_config["storage"]["connection_string"], storage_container_name=eventhub_config["storage"]["container_name"], rag_minutes_db=rag_minutes_db, - embedding_gen=embedding_gen + embedding_gen=embedding_gen, + term_db=term_db, + config=config ) try: diff --git a/rag/src/utils/__pycache__/redis_cache.cpython-311.pyc b/rag/src/utils/__pycache__/redis_cache.cpython-311.pyc new file mode 100644 index 0000000..128904e Binary files /dev/null and b/rag/src/utils/__pycache__/redis_cache.cpython-311.pyc differ diff --git a/rag/src/utils/redis_cache.py b/rag/src/utils/redis_cache.py new file mode 100644 index 0000000..ff0ffc8 --- /dev/null +++ b/rag/src/utils/redis_cache.py @@ -0,0 +1,206 @@ +""" +Redis 캐싱 유틸리티 +""" +import redis +import json +import logging +from typing import Optional, Any +from functools import wraps + +logger = logging.getLogger(__name__) + + +class RedisCache: + """Redis 캐싱 클래스""" + + def __init__( + self, + host: str = "localhost", + port: int = 6379, + db: int = 0, + password: Optional[str] = None, + decode_responses: bool = True + ): + """ + 초기화 + + Args: + host: Redis 호스트 + port: Redis 포트 + db: Redis DB 번호 + password: Redis 비밀번호 + decode_responses: 응답 디코딩 여부 + """ + try: + self.client = redis.Redis( + host=host, + port=port, + db=db, + password=password, + decode_responses=decode_responses + ) + # 연결 테스트 + self.client.ping() + logger.info(f"Redis 연결 성공: {host}:{port}") + except Exception as e: + logger.warning(f"Redis 연결 실패: {str(e)} - 캐싱 비활성화") + self.client = None + + def get(self, key: str) -> Optional[Any]: + """ + 캐시에서 값 조회 + + Args: + key: 캐시 키 + + Returns: + 캐시된 값 또는 None + """ + if not self.client: + return None + + try: + value = self.client.get(key) + if value: + logger.debug(f"캐시 HIT: {key}") + return json.loads(value) + logger.debug(f"캐시 MISS: {key}") + return None + except Exception as e: + logger.error(f"캐시 조회 실패 ({key}): {str(e)}") + return None + + def set(self, key: str, value: Any, ttl: int = 3600) -> bool: + """ + 캐시에 값 저장 + + Args: + key: 캐시 키 + value: 저장할 값 + ttl: 만료 시간 (초) + + Returns: + 성공 여부 + """ + if not self.client: + return False + + try: + serialized = json.dumps(value, ensure_ascii=False) + self.client.setex(key, ttl, serialized) + logger.debug(f"캐시 저장: {key} (TTL: {ttl}s)") + return True + except Exception as e: + logger.error(f"캐시 저장 실패 ({key}): {str(e)}") + return False + + def delete(self, key: str) -> bool: + """ + 캐시 삭제 + + Args: + key: 캐시 키 + + Returns: + 성공 여부 + """ + if not self.client: + return False + + try: + self.client.delete(key) + logger.debug(f"캐시 삭제: {key}") + return True + except Exception as e: + logger.error(f"캐시 삭제 실패 ({key}): {str(e)}") + return False + + def delete_pattern(self, pattern: str) -> int: + """ + 패턴 매칭으로 여러 캐시 삭제 + + Args: + pattern: 캐시 키 패턴 (예: "minutes:*") + + Returns: + 삭제된 키 개수 + """ + if not self.client: + return 0 + + try: + keys = self.client.keys(pattern) + if keys: + count = self.client.delete(*keys) + logger.info(f"캐시 일괄 삭제: {count}개 키 ({pattern})") + return count + return 0 + except Exception as e: + logger.error(f"캐시 패턴 삭제 실패 ({pattern}): {str(e)}") + return 0 + + def exists(self, key: str) -> bool: + """ + 캐시 존재 여부 확인 + + Args: + key: 캐시 키 + + Returns: + 존재 여부 + """ + if not self.client: + return False + + try: + return self.client.exists(key) > 0 + except Exception as e: + logger.error(f"캐시 존재 확인 실패 ({key}): {str(e)}") + return False + + +def cached(prefix: str, ttl: int = 3600, key_builder=None): + """ + 함수 결과 캐싱 데코레이터 + + Args: + prefix: 캐시 키 prefix + ttl: 만료 시간 (초) + key_builder: 캐시 키 생성 함수 (선택사항) + + Example: + @cached(prefix="minutes:", ttl=1800) + def get_minutes_by_id(minutes_id: str): + ... + """ + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + # Redis 캐시 인스턴스 확인 + cache = getattr(self, '_cache', None) + if not cache or not cache.client: + return func(self, *args, **kwargs) + + # 캐시 키 생성 + if key_builder: + cache_key = key_builder(*args, **kwargs) + else: + # 기본: 첫 번째 인자를 키로 사용 + cache_key = f"{prefix}{args[0] if args else ''}" + + # 캐시 조회 + cached_value = cache.get(cache_key) + if cached_value is not None: + return cached_value + + # 함수 실행 + result = func(self, *args, **kwargs) + + # 결과 캐싱 + if result is not None: + cache.set(cache_key, result, ttl) + + return result + + return wrapper + return decorator diff --git a/rag/start_all.sh b/rag/start_all.sh new file mode 100644 index 0000000..d4c9d11 --- /dev/null +++ b/rag/start_all.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# RAG 서비스 - API 서버와 Event Hub Consumer 동시 실행 스크립트 + +set -e # 에러 발생 시 스크립트 종료 + +echo "==========================================" +echo "RAG 서비스 시작" +echo "==========================================" + +# 로그 디렉토리 생성 +mkdir -p logs + +# Event Hub Consumer를 백그라운드로 실행 +echo "[1/2] Event Hub Consumer 시작..." +python start_consumer.py > logs/consumer.log 2>&1 & +CONSUMER_PID=$! +echo "Consumer PID: $CONSUMER_PID" + +# API 서버 시작 (포그라운드) +echo "[2/2] REST API 서버 시작..." +python -m uvicorn src.api.main:app --host 0.0.0.0 --port 8000 & +API_PID=$! +echo "API Server PID: $API_PID" + +# PID 파일 저장 +echo $CONSUMER_PID > logs/consumer.pid +echo $API_PID > logs/api.pid + +echo "==========================================" +echo "RAG 서비스 시작 완료" +echo " - API Server: http://0.0.0.0:8000" +echo " - Consumer PID: $CONSUMER_PID" +echo " - API PID: $API_PID" +echo "==========================================" + +# 종료 시그널 처리 (graceful shutdown) +trap "echo 'Shutting down...'; kill $CONSUMER_PID $API_PID; exit 0" SIGTERM SIGINT + +# 두 프로세스 모두 실행 중인지 모니터링 +while kill -0 $CONSUMER_PID 2>/dev/null && kill -0 $API_PID 2>/dev/null; do + sleep 5 +done + +# 하나라도 종료되면 모두 종료 +echo "One of the processes stopped. Shutting down all..." +kill $CONSUMER_PID $API_PID 2>/dev/null || true +wait diff --git a/rag/start_all_services.py b/rag/start_all_services.py new file mode 100644 index 0000000..56e0887 --- /dev/null +++ b/rag/start_all_services.py @@ -0,0 +1,180 @@ +""" +RAG 서비스 통합 실행 스크립트 +API 서버와 Event Hub Consumer를 동시에 실행 +""" +import asyncio +import logging +import multiprocessing +import signal +import sys +import time +from pathlib import Path + +import uvicorn + +from src.utils.config import load_config, get_database_url +from src.db.rag_minutes_db import RagMinutesDB +from src.db.postgres_vector import PostgresVectorDB +from src.utils.embedding import EmbeddingGenerator +from src.services.eventhub_consumer import start_consumer + +# 로깅 설정 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +def run_api_server(): + """ + REST API 서버 실행 (별도 프로세스) + """ + try: + logger.info("=" * 50) + logger.info("REST API 서버 시작") + logger.info("=" * 50) + + uvicorn.run( + "src.api.main:app", + host="0.0.0.0", + port=8000, + log_level="info", + access_log=True + ) + except Exception as e: + logger.error(f"API 서버 실행 실패: {str(e)}") + sys.exit(1) + + +def run_event_consumer(): + """ + Event Hub Consumer 실행 (별도 프로세스) + """ + try: + logger.info("=" * 50) + logger.info("Event Hub Consumer 시작") + logger.info("=" * 50) + + # 설정 로드 + config_path = Path(__file__).parent / "config.yaml" + config = load_config(str(config_path)) + + # 데이터베이스 연결 + db_url = get_database_url(config) + rag_minutes_db = RagMinutesDB(db_url) + logger.info("RAG Minutes DB 연결 완료") + + # 용어집 데이터베이스 연결 + term_db = PostgresVectorDB(db_url) + logger.info("용어집 DB 연결 완료") + + # Embedding 생성기 초기화 + azure_openai = config["azure_openai"] + embedding_gen = EmbeddingGenerator( + api_key=azure_openai["api_key"], + endpoint=azure_openai["endpoint"], + model=azure_openai["embedding_model"], + dimension=azure_openai["embedding_dimension"], + api_version=azure_openai["api_version"] + ) + logger.info("Embedding 생성기 초기화 완료") + + # Event Hub Consumer 시작 + asyncio.run(start_consumer(config, rag_minutes_db, embedding_gen, term_db)) + + except KeyboardInterrupt: + logger.info("Consumer 종료 신호 수신") + except Exception as e: + logger.error(f"Consumer 실행 실패: {str(e)}") + sys.exit(1) + + +def main(): + """ + 메인 함수: 두 프로세스를 생성하고 관리 + """ + logger.info("=" * 60) + logger.info("RAG 서비스 통합 시작") + logger.info(" - REST API 서버: http://0.0.0.0:8000") + logger.info(" - Event Hub Consumer: Background") + logger.info("=" * 60) + + # 프로세스 생성 + api_process = multiprocessing.Process( + target=run_api_server, + name="API-Server" + ) + consumer_process = multiprocessing.Process( + target=run_event_consumer, + name="Event-Consumer" + ) + + # 종료 시그널 핸들러 + def signal_handler(signum, frame): + logger.info("\n종료 신호 수신. 프로세스 종료 중...") + + if api_process.is_alive(): + logger.info("API 서버 종료 중...") + api_process.terminate() + api_process.join(timeout=5) + if api_process.is_alive(): + api_process.kill() + + if consumer_process.is_alive(): + logger.info("Consumer 종료 중...") + consumer_process.terminate() + consumer_process.join(timeout=5) + if consumer_process.is_alive(): + consumer_process.kill() + + logger.info("모든 프로세스 종료 완료") + sys.exit(0) + + # 시그널 핸들러 등록 + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + try: + # 프로세스 시작 + api_process.start() + time.sleep(2) # API 서버 시작 대기 + + consumer_process.start() + time.sleep(2) # Consumer 시작 대기 + + logger.info("=" * 60) + logger.info("모든 서비스 시작 완료") + logger.info(f" - API Server PID: {api_process.pid}") + logger.info(f" - Consumer PID: {consumer_process.pid}") + logger.info("=" * 60) + + # 프로세스 모니터링 + while True: + if not api_process.is_alive(): + logger.error("API 서버 프로세스 종료됨") + consumer_process.terminate() + break + + if not consumer_process.is_alive(): + logger.error("Consumer 프로세스 종료됨") + api_process.terminate() + break + + time.sleep(5) + + # 대기 + api_process.join() + consumer_process.join() + + except Exception as e: + logger.error(f"서비스 실행 중 에러: {str(e)}") + api_process.terminate() + consumer_process.terminate() + sys.exit(1) + + +if __name__ == "__main__": + # multiprocessing을 위한 설정 + multiprocessing.set_start_method('spawn', force=True) + main() diff --git a/rag/start_consumer.py b/rag/start_consumer.py index 7d47e83..7d35fbd 100644 --- a/rag/start_consumer.py +++ b/rag/start_consumer.py @@ -4,6 +4,7 @@ from pathlib import Path from src.utils.config import load_config, get_database_url from src.db.rag_minutes_db import RagMinutesDB +from src.db.postgres_vector import PostgresVectorDB from src.utils.embedding import EmbeddingGenerator from src.services.eventhub_consumer import start_consumer @@ -28,8 +29,11 @@ async def main(): # 데이터베이스 연결 db_url = get_database_url(config) rag_minutes_db = RagMinutesDB(db_url) + logger.info("RAG Minutes DB 연결 완료") - logger.info("데이터베이스 연결 완료") + # 용어집 데이터베이스 연결 + term_db = PostgresVectorDB(db_url) + logger.info("용어집 DB 연결 완료") # Embedding 생성기 초기화 azure_openai = config["azure_openai"] @@ -45,7 +49,7 @@ async def main(): # Event Hub Consumer 시작 logger.info("Event Hub Consumer 시작...") - await start_consumer(config, rag_minutes_db, embedding_gen) + await start_consumer(config, rag_minutes_db, embedding_gen, term_db) except KeyboardInterrupt: logger.info("프로그램 종료")