This commit is contained in:
Minseo-Jo 2025-10-29 16:02:01 +09:00
commit 5c32362278
23 changed files with 13407 additions and 32 deletions

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ build/*/*/*
**/.gradle/ **/.gradle/
.vscode/ .vscode/
**/.vscode/ **/.vscode/
rag/venv/*
# Serena # Serena
serena/ serena/

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,191 @@
"""
Event Hub의 활성 Consumer(host) 조회 스크립트
Blob Storage의 ownership 정보를 통해 현재 어떤 Consumer가
어떤 파티션을 읽고 있는지 확인합니다.
"""
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config
async def check_active_consumers():
"""활성 Consumer 조회"""
# 설정 로드
config_path = Path(__file__).parent / "config.yaml"
config = load_config(str(config_path))
eventhub_config = config['eventhub']
storage_conn_str = eventhub_config['storage']['connection_string']
container_name = eventhub_config['storage']['container_name']
consumer_group = eventhub_config['consumer_group']
print("=" * 80)
print("📊 Event Hub Active Consumers")
print("=" * 80)
print(f"Consumer Group: {consumer_group}")
print(f"Container: {container_name}")
print()
# Blob Service Client 생성
blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
container_client = blob_service.get_container_client(container_name)
# Ownership blob 조회 (파티션 소유권 정보)
ownership_prefix = f"{consumer_group}/ownership/"
checkpoint_prefix = f"{consumer_group}/checkpoint/"
print("-" * 80)
print("🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)")
print("-" * 80)
ownership_blobs = container_client.list_blobs(name_starts_with=ownership_prefix)
active_consumers = {}
partition_owners = {}
for blob in ownership_blobs:
# Blob 내용 읽기
blob_client = container_client.get_blob_client(blob.name)
content = blob_client.download_blob().readall()
try:
ownership_data = json.loads(content)
partition_id = blob.name.split('/')[-1]
owner_id = ownership_data.get('ownerIdentifier', 'unknown')
last_modified = blob.last_modified
# 현재 시간과 비교하여 활성 상태 확인 (30초 이내 갱신되었으면 활성)
now = datetime.now(timezone.utc)
time_diff = (now - last_modified).total_seconds()
is_active = time_diff < 60 # 60초 이내면 활성으로 간주
partition_owners[partition_id] = {
'owner_id': owner_id,
'last_modified': last_modified,
'is_active': is_active,
'time_diff': time_diff
}
# Consumer별 집계
if owner_id not in active_consumers:
active_consumers[owner_id] = {
'partitions': [],
'last_seen': last_modified,
'is_active': is_active
}
active_consumers[owner_id]['partitions'].append(partition_id)
# 가장 최근 시간으로 업데이트
if last_modified > active_consumers[owner_id]['last_seen']:
active_consumers[owner_id]['last_seen'] = last_modified
active_consumers[owner_id]['is_active'] = is_active
except json.JSONDecodeError:
print(f"⚠️ 파티션 {partition_id}: 파싱 실패")
continue
# 파티션별 출력
if partition_owners:
print()
for partition_id, info in sorted(partition_owners.items()):
status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
print(f"파티션 {partition_id}:")
print(f" 상태: {status}")
print(f" Owner ID: {info['owner_id'][:40]}...")
print(f" 마지막 갱신: {info['last_modified'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
print(f" 경과 시간: {int(info['time_diff'])}초 전")
print()
else:
print("⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.")
print()
# Consumer별 요약
print("-" * 80)
print("📋 Consumer 요약")
print("-" * 80)
if active_consumers:
for idx, (owner_id, info) in enumerate(active_consumers.items(), 1):
status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
print(f"\nConsumer #{idx} {status}")
print(f" Owner ID: {owner_id}")
print(f" 소유 파티션: {', '.join(info['partitions'])}")
print(f" 파티션 개수: {len(info['partitions'])}")
print(f" 마지막 활동: {info['last_seen'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
else:
print("\n활성 Consumer가 없습니다.")
# Checkpoint 정보 조회
print()
print("-" * 80)
print("📍 Checkpoint 정보 (마지막 읽은 위치)")
print("-" * 80)
checkpoint_blobs = container_client.list_blobs(name_starts_with=checkpoint_prefix)
for blob in checkpoint_blobs:
blob_client = container_client.get_blob_client(blob.name)
content = blob_client.download_blob().readall()
try:
checkpoint_data = json.loads(content)
partition_id = blob.name.split('/')[-1]
offset = checkpoint_data.get('offset', 'N/A')
sequence_number = checkpoint_data.get('sequenceNumber', 'N/A')
print(f"\n파티션 {partition_id}:")
print(f" Offset: {offset}")
print(f" Sequence Number: {sequence_number}")
except json.JSONDecodeError:
continue
print()
print("=" * 80)
# 모든 Consumer Group 조회
print()
print("-" * 80)
print("📂 모든 Consumer Groups")
print("-" * 80)
all_blobs = container_client.list_blobs()
consumer_groups = set()
for blob in all_blobs:
# Consumer Group 이름은 첫 번째 경로 세그먼트
parts = blob.name.split('/')
if len(parts) > 0:
consumer_groups.add(parts[0])
if consumer_groups:
print()
for cg in sorted(consumer_groups):
current = " ⬅️ 현재" if cg == consumer_group else ""
print(f" - {cg}{current}")
else:
print("\nConsumer Group 정보가 없습니다.")
print()
print("=" * 80)
def main():
"""메인 함수"""
try:
asyncio.run(check_active_consumers())
except Exception as e:
print(f"❌ 에러 발생: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@ -51,7 +51,7 @@ eventhub:
# Application Settings # Application Settings
app: app:
name: "Vector DB Service" name: "RAG Service"
version: "1.0.0" version: "1.0.0"
debug: true debug: true
log_level: INFO log_level: INFO
@ -84,6 +84,19 @@ related_documents:
ttl: 3600 # 1시간 ttl: 3600 # 1시간
prefix: "doc:" prefix: "doc:"
# RAG 회의록 설정
rag_minutes:
# 검색 설정
search:
top_k: 5
similarity_threshold: 0.7
# 캐싱 설정 (full_content 조회 결과 캐싱)
cache:
ttl: 1800 # 30분 (회의록은 변경이 적으므로 짧게 설정)
prefix: "minutes:"
related_ttl: 3600 # 연관 회의록 검색 결과는 1시간
# 데이터 로딩 # 데이터 로딩
data: data:
terms_dir: design/aidata terms_dir: design/aidata

View File

@ -0,0 +1,291 @@
# 연관 회의록 조회 API 구현 문서
## 📋 개요
**준호** (Backend Developer)
회의록 ID를 기준으로 유사한 연관 회의록을 조회하는 API를 구현했습니다.
## 🎯 구현 방식: Option A (DB 조회 후 벡터 검색)
### 선택 이유
- ✅ **정확도 우선**: full_content 기반 검색으로 높은 정확도 보장
- ✅ **확장성**: 향후 full_content 기반 추가 기능 확장 용이
- ✅ **성능 최적화**: Redis 캐싱으로 DB 조회 오버헤드 최소화
### 처리 흐름
```
1. 요청 수신
2. Redis 캐시 조회 (연관 회의록 결과)
├─ HIT → 즉시 반환
└─ MISS → 3단계로 진행
3. Redis 캐시 조회 (회의록 full_content)
├─ HIT → 5단계로 진행
└─ MISS → 4단계로 진행
4. DB 조회 (rag_minutes 테이블)
├─ full_content 획득
└─ Redis에 캐싱 (TTL: 30분)
5. 벡터 임베딩 생성 (Azure OpenAI)
6. 벡터 유사도 검색 (자기 자신 제외)
7. 결과 Redis 캐싱 (TTL: 1시간)
8. 응답 반환
```
## 🔧 구현 상세
### 1. 데이터 모델 추가
**파일**: `rag/src/models/minutes.py`
```python
class RelatedMinutesRequest(BaseModel):
"""연관 회의록 조회 요청"""
minute_id: str # 기준 회의록 ID
meeting_title: str # 회의 제목 (현재 미사용)
summary: str # 회의록 요약 (현재 미사용)
top_k: int = 5 # 반환할 최대 결과 수
similarity_threshold: float = 0.7 # 최소 유사도 임계값
class RelatedMinutesResponse(BaseModel):
"""연관 회의록 조회 응답"""
minutes: RagMinutes # 회의록 정보
similarity_score: float # 유사도 점수 (0.0 ~ 1.0)
```
### 2. DB 쿼리 함수 개선
**파일**: `rag/src/db/rag_minutes_db.py`
**수정 내용**: `search_by_vector()` 함수에 `exclude_minutes_id` 파라미터 추가
```python
def search_by_vector(
self,
query_embedding: List[float],
top_k: int = 5,
similarity_threshold: float = 0.7,
exclude_minutes_id: Optional[str] = None # 자기 자신 제외
) -> List[Dict[str, Any]]:
"""벡터 유사도 검색"""
# WHERE 절에 minutes_id != %s 조건 추가
# 자기 자신을 결과에서 제외
```
### 3. Redis 캐싱 유틸리티 구현
**파일**: `rag/src/utils/redis_cache.py`
**주요 기능**:
- `get(key)`: 캐시 조회
- `set(key, value, ttl)`: 캐시 저장
- `delete(key)`: 캐시 삭제
- `delete_pattern(pattern)`: 패턴 매칭 일괄 삭제
**특징**:
- Redis 연결 실패 시 자동으로 캐싱 비활성화 (서비스 장애 방지)
- JSON 직렬화/역직렬화 자동 처리
### 4. 설정 추가
**파일**: `rag/config.yaml`
```yaml
rag_minutes:
search:
top_k: 5
similarity_threshold: 0.7
cache:
ttl: 1800 # 회의록 조회 결과: 30분
prefix: "minutes:"
related_ttl: 3600 # 연관 회의록 검색 결과: 1시간
```
### 5. API 엔드포인트 구현
**파일**: `rag/src/api/main.py`
**엔드포인트**: `POST /api/minutes/related`
**Request Body**:
```json
{
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립을 위한 회의",
"top_k": 5,
"similarity_threshold": 0.7
}
```
**Response**:
```json
[
{
"minutes": {
"meeting_id": "MTG-2024-050",
"title": "2024 Q4 마케팅 성과 분석",
"minutes_id": "MIN-2024-050",
"full_content": "...",
...
},
"similarity_score": 0.85
},
...
]
```
## ⚡ 성능 최적화
### 2단계 캐싱 전략
1. **회의록 조회 결과 캐싱** (TTL: 30분)
- 키: `minutes:{minute_id}`
- DB 조회 횟수 감소
2. **연관 회의록 검색 결과 캐싱** (TTL: 1시간)
- 키: `minutes:related:{minute_id}:{top_k}:{threshold}`
- 벡터 검색 및 임베딩 생성 비용 절감
### 예상 성능 개선
| 케이스 | 처리 시간 | 비고 |
|--------|----------|------|
| 캐시 MISS (최초 요청) | ~2-3초 | DB 조회 + 임베딩 생성 + 벡터 검색 |
| 회의록 캐시 HIT | ~1-2초 | 임베딩 생성 + 벡터 검색 |
| 연관 회의록 캐시 HIT | ~50ms | 캐시에서 즉시 반환 |
## 🔍 사용 예시
### Python (requests)
```python
import requests
url = "http://localhost:8000/api/minutes/related"
data = {
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립",
"top_k": 5,
"similarity_threshold": 0.7
}
response = requests.post(url, json=data)
related_minutes = response.json()
for item in related_minutes:
print(f"제목: {item['minutes']['title']}")
print(f"유사도: {item['similarity_score']:.2f}")
print("---")
```
### curl
```bash
curl -X POST "http://localhost:8000/api/minutes/related" \
-H "Content-Type: application/json" \
-d '{
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립",
"top_k": 5,
"similarity_threshold": 0.7
}'
```
## 🧪 테스트 시나리오
### 1. 정상 케이스
- ✅ 존재하는 minute_id로 요청
- ✅ 유사한 회의록 5개 반환
- ✅ 자기 자신은 결과에서 제외
### 2. 캐싱 동작 확인
- ✅ 최초 요청: 캐시 MISS → DB 조회
- ✅ 2번째 요청: 캐시 HIT → 즉시 반환
- ✅ TTL 경과 후: 캐시 MISS → 재조회
### 3. 에러 케이스
- ❌ 존재하지 않는 minute_id → 404 오류
- ❌ 잘못된 요청 형식 → 422 오류
## 📊 데이터베이스 영향
### 쿼리 패턴
```sql
-- 1. 회의록 조회 (캐시 MISS 시)
SELECT * FROM rag_minutes WHERE minutes_id = 'MIN-2025-001';
-- 2. 벡터 유사도 검색 (캐시 MISS 시)
SELECT *,
1 - (embedding <=> '{vector}'::vector) as similarity_score
FROM rag_minutes
WHERE embedding IS NOT NULL
AND 1 - (embedding <=> '{vector}'::vector) >= 0.7
AND minutes_id != 'MIN-2025-001' -- 자기 자신 제외
ORDER BY embedding <=> '{vector}'::vector
LIMIT 5;
```
### 인덱스 활용
- `minutes_id`: Primary Key 인덱스 활용
- `embedding`: pgvector 인덱스 활용 (IVFFlat 또는 HNSW)
## 🔐 보안 고려사항
1. **SQL Injection 방지**: psycopg2 파라미터 바인딩 사용
2. **Rate Limiting**: 향후 추가 권장 (분당 요청 수 제한)
3. **인증/인가**: 향후 JWT 토큰 기반 인증 추가 권장
## 🚀 향후 개선 사항
### 1. Hybrid 검색 지원 (Option B 통합)
- summary를 활용한 빠른 1차 검색
- full_content로 정밀한 2차 검색
- 적응형 임계값 조정
### 2. 성능 모니터링
- 캐시 히트율 추적
- API 응답 시간 측정
- 벡터 검색 성능 분석
### 3. 검색 품질 개선
- 시간 가중치 적용 (최근 회의록 우선)
- 부서/팀별 필터링 옵션
- 주제별 카테고리 분류
## 📝 파일 변경 목록
| 파일 | 변경 유형 | 설명 |
|------|----------|------|
| `rag/src/models/minutes.py` | 수정 | Request/Response 모델 추가 |
| `rag/src/db/rag_minutes_db.py` | 수정 | exclude_minutes_id 파라미터 추가 |
| `rag/src/utils/redis_cache.py` | 신규 | Redis 캐싱 유틸리티 |
| `rag/src/api/main.py` | 수정 | API 엔드포인트 추가 |
| `rag/config.yaml` | 수정 | rag_minutes 설정 추가 |
## ✅ 구현 완료 체크리스트
- [x] Request/Response 모델 정의
- [x] DB 쿼리 함수 개선 (자기 자신 제외)
- [x] Redis 캐싱 유틸리티 구현
- [x] API 엔드포인트 구현
- [x] 2단계 캐싱 전략 적용
- [x] 설정 파일 업데이트
- [x] 문서 작성
---
**작성자**: 준호 (Backend Developer)
**작성일**: 2025-10-29
**버전**: 1.0.0

View File

@ -0,0 +1,356 @@
# Event Hub Basic Tier 제약사항 및 해결 방법
## 현재 상황
```
Event Hub Namespace: hgzero-eventhub-ns
Resource Group: rg-digitalgarage-02
Location: koreacentral
Tier: Basic ⚠️
Status: Active
```
## Basic Tier 제약사항
| 기능 | Basic | Standard | Premium |
|------|-------|----------|---------|
| **Consumer Groups** | **1개만** ($Default) | **최대 20개** | 최대 100개 |
| 파티션 | 최대 32개 | 최대 32개 | 최대 100개 |
| 메시지 보존 | 1일 | 7일 | 90일 |
| Capture | ❌ | ✅ | ✅ |
| 가격 | 낮음 | 중간 | 높음 |
**Basic Tier에서는 `$Default` Consumer Group만 사용할 수 있습니다.**
---
## 해결 방법
### 방법 1: Tier 업그레이드 (권장) ⭐️
Event Hub를 Basic → Standard로 업그레이드하면 최대 20개의 Consumer Group 사용 가능
#### 업그레이드 방법
**Azure Portal:**
1. Event Hub Namespace (hgzero-eventhub-ns) 페이지로 이동
2. 왼쪽 메뉴에서 "Pricing tier" 클릭
3. "Standard" 선택
4. "Apply" 버튼 클릭
5. 약 1-2분 후 적용 완료
**Azure CLI:**
```bash
az eventhubs namespace update \
--resource-group rg-digitalgarage-02 \
--name hgzero-eventhub-ns \
--sku Standard \
--capacity 1
```
**비용:**
- Basic: ~$0.015/시간 (~$11/월)
- Standard: ~$0.03/시간 (~$22/월)
- 차이: 약 $11/월 추가
**장점:**
- ✅ 여러 Consumer Group 사용 가능
- ✅ 메시지 보존 기간 7일로 증가
- ✅ Capture 기능 사용 가능
- ✅ 영구적 해결책
**단점:**
- ❌ 비용 증가 (약 2배)
- ❌ 관리자 승인 필요할 수 있음
---
### 방법 2: $Default Consumer Group 공유 사용 (임시)
개발/테스트 시 기존 프로덕션 Consumer를 잠시 중지하고 테스트
#### 실행 순서
**Step 1: 프로덕션 Consumer 중지**
```bash
# 기존 Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# PID 확인 후 종료
kill <PID>
# 예시
kill 51257
```
**Step 2: 테스트 Consumer 실행**
```bash
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
**Step 3: 테스트 완료 후 프로덕션 재시작**
```bash
# Ctrl+C로 테스트 Consumer 종료
# 프로덕션 Consumer 재시작
python start_consumer.py &
```
**장점:**
- ✅ 추가 비용 없음
- ✅ 즉시 테스트 가능
- ✅ 권한 불필요
**단점:**
- ❌ 프로덕션 서비스 중단
- ❌ 동시 실행 불가
- ❌ 임시 방법
**주의사항:**
- ⚠️ 프로덕션 환경에서만 사용 중이라면 비추천
- ⚠️ 업무 시간 외에만 테스트
- ⚠️ 테스트 후 반드시 프로덕션 재시작 확인
---
### 방법 3: Checkpoint 위치 변경으로 독립 실행
동일한 Consumer Group을 사용하되, checkpoint 저장 위치를 다르게 하여 독립적으로 실행
#### 구현 방법
**Step 1: 별도 Storage Container 생성**
Azure Portal:
1. Storage Account (hgzerostorage) 접속
2. "Containers" 클릭
3. "+ Container" 클릭
4. 이름: `hgzero-checkpoints-dev`
5. Create
**Step 2: config_dev.yaml 생성**
```yaml
# config_dev.yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: "$Default" # 동일
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: "hgzero-checkpoints-dev" # 다른 컨테이너!
# 나머지는 config.yaml과 동일
```
**Step 3: 개발용 Consumer 실행 스크립트**
```python
# start_dev_consumer.py
import asyncio
from pathlib import Path
from src.utils.config import load_config
from src.db.rag_minutes_db import RagMinutesDB
from src.db.postgres_vector import PostgresVectorDB
from src.utils.embedding import EmbeddingGenerator
from src.services.eventhub_consumer import start_consumer
async def main():
# 개발용 설정 파일 로드
config_path = Path(__file__).parent / "config_dev.yaml"
config = load_config(str(config_path))
# ... 나머지는 start_consumer.py와 동일
if __name__ == "__main__":
asyncio.run(main())
```
**Step 4: 실행**
```bash
# Terminal 1: 프로덕션 Consumer
python start_consumer.py
# Terminal 2: 개발 Consumer (다른 checkpoint)
python start_dev_consumer.py
```
**장점:**
- ✅ 동시 실행 가능
- ✅ 독립적인 checkpoint
- ✅ 추가 비용 거의 없음 (Storage만)
- ✅ Tier 업그레이드 불필요
**단점:**
- ❌ 동일 파티션을 두 Consumer가 읽으려 하면 ownership 경쟁
- ❌ 한쪽만 이벤트 수신 (ownership을 가진 쪽)
- ❌ 완전한 독립 실행은 아님
**결론:**
- 이 방법은 **checkpoint만 독립**이고, **여전히 ownership 경쟁** 발생
- **동시 실행은 불가능**
- 권장하지 않음 ❌
---
### 방법 4: 로컬 개발 환경에서만 테스트
Event Hub 대신 로컬 메시지 큐 사용
#### 옵션 A: Azure Event Hub Emulator (권장)
현재는 공식 Emulator가 없으므로, Kafka나 RabbitMQ로 대체
#### 옵션 B: 메모리 큐로 테스트
```python
# test_consumer_local.py
import asyncio
import json
from queue import Queue
# 로컬 메모리 큐
local_queue = Queue()
async def test_event_processing():
"""로컬에서 이벤트 처리 로직만 테스트"""
# 테스트 이벤트 생성
test_event = {
'eventType': 'MINUTES_FINALIZED',
'data': {
'meetingId': 'test-001',
'title': '테스트 회의',
'minutesId': 'minutes-001',
'sections': []
}
}
# Consumer의 _process_minutes_event 로직만 테스트
from src.services.eventhub_consumer import EventHubConsumer
# 실제 처리 로직 테스트
# ...
asyncio.run(test_event_processing())
```
**장점:**
- ✅ 완전 독립 실행
- ✅ 무료
- ✅ 빠른 테스트
**단점:**
- ❌ 실제 Event Hub와 다른 환경
- ❌ 통합 테스트 불가
---
## 권장 솔루션
### 개발 단계별 권장 방법
| 단계 | 권장 방법 | 이유 |
|------|----------|------|
| **로컬 개발** | 방법 4 (메모리 큐) | 빠르고 비용 없음 |
| **통합 테스트** | 방법 2 ($Default 공유) | 실제 환경, 단기간 |
| **지속적 개발** | 방법 1 (Tier 업그레이드) | 영구 해결, 동시 실행 |
| **프로덕션** | 방법 1 (Standard Tier) | 고가용성, 확장성 |
### 최종 권장: Tier 업그레이드
**이유:**
1. ✅ 여러 Consumer Group으로 개발/테스트 분리
2. ✅ 메시지 보존 기간 증가 (1일 → 7일)
3. ✅ 향후 확장 가능 (Capture 등)
4. ✅ 프로덕션 안정성 향상
**비용 대비 효과:**
- 월 $11 추가로 개발 생산성 향상
- 프로덕션 중단 없이 테스트 가능
- 장기적으로 더 경제적
---
## 실행 가이드
### 즉시 테스트가 필요한 경우 (방법 2)
```bash
# 1. 프로덕션 Consumer 중지
ps aux | grep "start_consumer.py" | grep -v grep
kill <PID>
# 2. 테스트 실행
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
# 3. 테스트 완료 후
# Ctrl+C로 종료
# 4. 프로덕션 재시작
python start_consumer.py &
```
### Tier 업그레이드 후 (방법 1)
```bash
# 1. Tier 업그레이드 (Azure Portal 또는 CLI)
az eventhubs namespace update \
--resource-group rg-digitalgarage-02 \
--name hgzero-eventhub-ns \
--sku Standard
# 2. Consumer Group 생성
az eventhubs eventhub consumer-group create \
--resource-group rg-digitalgarage-02 \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--name development
# 3. config.yaml 수정
# consumer_group: "development"
# 4. 프로덕션은 계속 실행, 개발 Consumer 별도 실행
python start_consumer.py
```
---
## 요약
### 현재 상황
- ❌ Event Hub Tier: **Basic**
- ❌ Consumer Group: **$Default만 사용 가능**
- ❌ 추가 Consumer Group 생성 불가
### 해결책
1. **단기**: 프로덕션 Consumer 잠시 중지하고 테스트 (방법 2)
2. **장기**: Standard Tier로 업그레이드 (방법 1) ⭐️
### 다음 액션
관리자에게 **Tier 업그레이드** 요청하거나,
긴급하다면 **프로덕션 중지 후 테스트** 진행
---
## 참고: Tier 비교
```
Basic Tier (현재)
├─ Consumer Groups: 1개 ($Default만)
├─ 보존 기간: 1일
├─ 비용: ~$11/월
└─ ❌ 개발/테스트 분리 불가
Standard Tier (권장)
├─ Consumer Groups: 최대 20개 ✅
├─ 보존 기간: 7일
├─ 비용: ~$22/월 (+$11)
└─ ✅ 개발/테스트 완전 분리
```
Standard Tier로 업그레이드하는 것을 강력히 권장합니다! 🎯

View File

@ -0,0 +1,367 @@
# Event Hub Active Consumers 조회 가이드
## 개요
Event Hub를 현재 읽고 있는 Consumer(host)를 확인하는 방법을 설명합니다.
## 방법 1: 제공된 스크립트 사용 (가장 쉬움) ⭐️
### 실행
```bash
cd /Users/daewoong/home/workspace/HGZero/rag
python check_active_consumers.py
```
### 출력 정보
```
📊 Event Hub Active Consumers
================================================================================
Consumer Group: $Default
Container: hgzero-checkpoints
🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)
--------------------------------------------------------------------------------
파티션 0:
상태: 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95...
마지막 갱신: 2025-10-29 12:35:42 UTC
경과 시간: 15초 전
📋 Consumer 요약
--------------------------------------------------------------------------------
Consumer #1 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95
소유 파티션: 0
파티션 개수: 1
마지막 활동: 2025-10-29 12:35:42 UTC
📍 Checkpoint 정보 (마지막 읽은 위치)
--------------------------------------------------------------------------------
파티션 0:
Offset: 120259090624
Sequence Number: 232
```
---
## 방법 2: Python 코드로 직접 조회
### 스크립트 예시
```python
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config
async def check_consumers():
"""활성 Consumer 조회"""
# 설정 로드
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
storage_conn_str = eventhub_config['storage']['connection_string']
container_name = eventhub_config['storage']['container_name']
consumer_group = eventhub_config['consumer_group']
# Blob Service Client
blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
container = blob_service.get_container_client(container_name)
# Ownership 정보 조회
ownership_prefix = f"{consumer_group}/ownership/"
print(f"Consumer Group: {consumer_group}\n")
for blob in container.list_blobs(name_starts_with=ownership_prefix):
blob_client = container.get_blob_client(blob.name)
content = blob_client.download_blob().readall()
ownership_data = json.loads(content)
partition_id = blob.name.split('/')[-1]
owner_id = ownership_data.get('ownerIdentifier', 'unknown')
# 활성 상태 확인 (60초 이내 갱신)
now = datetime.now(timezone.utc)
time_diff = (now - blob.last_modified).total_seconds()
is_active = time_diff < 60
status = "🟢 ACTIVE" if is_active else "🔴 INACTIVE"
print(f"파티션 {partition_id}: {status}")
print(f" Owner: {owner_id}")
print(f" 갱신: {int(time_diff)}초 전\n")
asyncio.run(check_consumers())
```
---
## 방법 3: Azure Portal에서 확인
### 단계
1. **Azure Portal** 접속 (https://portal.azure.com)
2. **Storage Account** 이동
- "hgzerostorage" 검색
3. **Containers** 클릭
- "hgzero-checkpoints" 선택
4. **폴더 구조 확인**
```
$Default/
├── ownership/
│ └── 0 ← 파티션 0의 소유권 정보
└── checkpoint/
└── 0 ← 파티션 0의 checkpoint
```
5. **ownership/0 파일 다운로드** 또는 **View** 클릭
6. **JSON 내용 확인**
```json
{
"ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
"lastModifiedTime": "2025-10-29T12:35:42Z",
...
}
```
### 정보 해석
- **ownerIdentifier**: 현재 Consumer의 고유 ID
- **lastModifiedTime**: 마지막 lease 갱신 시간
- 60초 이내: 🟢 활성 상태
- 60초 이상: 🔴 비활성 (Consumer 종료됨)
---
## 방법 4: Azure CLI로 조회
### Blob 목록 확인
```bash
# Storage Account 키 조회
az storage account keys list \
--resource-group rg-digitalgarage-02 \
--account-name hgzerostorage \
--query "[0].value" -o tsv
# Blob 목록 조회
az storage blob list \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--prefix "\$Default/ownership/" \
--output table
```
### Blob 내용 다운로드
```bash
# ownership 정보 다운로드
az storage blob download \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--name "\$Default/ownership/0" \
--file /tmp/ownership.json
# 내용 확인
cat /tmp/ownership.json | python3 -m json.tool
```
---
## 방법 5: 프로세스 레벨에서 확인
### 로컬 머신에서 실행 중인 Consumer
```bash
# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# 출력:
# daewoong 81447 0.0 0.2 python start_consumer.py
```
### 네트워크 연결 확인
```bash
# Event Hub와의 연결 확인
lsof -i -n | grep 81447 | grep ESTABLISHED
# 출력:
# python3.1 81447 daewoong 9u IPv6 TCP ... -> ...servicebus.windows.net:https (ESTABLISHED)
```
**의미:**
- Consumer 프로세스가 실행 중
- Event Hub와 HTTPS 연결 유지 중
- 이벤트 수신 대기 중
---
## 주요 개념
### Ownership (소유권)
```
파티션 0
Ownership Blob (Blob Storage)
{
"ownerIdentifier": "consumer-uuid",
"lastModifiedTime": "2025-10-29T12:35:42Z",
"eTag": "...",
...
}
Lease 메커니즘 (30초마다 갱신)
갱신 실패 시 → 다른 Consumer가 인수 가능
```
### Checkpoint (읽은 위치)
```
파티션 0
Checkpoint Blob
{
"offset": "120259090624",
"sequenceNumber": 232,
...
}
Consumer 재시작 시 → 이 위치부터 재개
```
### Consumer 상태 판단
| 조건 | 상태 | 의미 |
|------|------|------|
| lastModifiedTime < 60초 | 🟢 ACTIVE | 정상 동작 |
| 60초 < lastModifiedTime < 180초 | 🟡 WARNING | Lease 갱신 지연 |
| lastModifiedTime > 180초 | 🔴 INACTIVE | Consumer 종료 |
---
## 트러블슈팅
### 문제 1: Ownership 정보가 없음
**증상:**
```
⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.
```
**원인:**
- Consumer가 한 번도 실행되지 않음
- Consumer가 ownership claim에 실패함
**해결:**
1. Consumer 프로세스 확인: `ps aux | grep start_consumer`
2. Consumer 로그 확인
3. 권한 확인 (Storage Account 접근 권한)
### 문제 2: 여러 Owner ID가 나타남
**증상:**
```
Consumer #1: owner-id-1 (파티션 0, 2, 4)
Consumer #2: owner-id-2 (파티션 1, 3, 5)
```
**원인:**
- 정상: 여러 Consumer가 파티션을 나눠서 처리
- 파티션이 여러 개이고 Consumer도 여러 개
**확인:**
- 각 파티션이 하나의 Consumer만 소유하면 정상
- 동일 파티션을 여러 Consumer가 소유하면 문제
### 문제 3: lastModified가 오래됨
**증상:**
```
파티션 0:
상태: 🔴 INACTIVE
마지막 갱신: 2025-10-29 10:00:00 UTC
경과 시간: 7200초 전 (2시간)
```
**원인:**
- Consumer가 종료됨
- Consumer가 응답 없음 (hang)
- 네트워크 문제
**해결:**
1. Consumer 프로세스 확인
2. Consumer 재시작
3. 로그 확인하여 에러 파악
---
## 실시간 모니터링
### 스크립트: 주기적 확인
```bash
#!/bin/bash
# monitor_consumers.sh
while true; do
clear
echo "=== $(date) ==="
python check_active_consumers.py
sleep 30 # 30초마다 갱신
done
```
### 실행
```bash
chmod +x monitor_consumers.sh
./monitor_consumers.sh
```
---
## 요약
### 빠른 확인 방법
```bash
# 1. 스크립트 실행 (가장 쉬움)
python check_active_consumers.py
# 2. 프로세스 확인
ps aux | grep start_consumer
# 3. Azure Portal에서 Blob 확인
# Storage Account > Containers > hgzero-checkpoints > $Default/ownership
```
### 확인 항목 체크리스트
- [ ] Consumer 프로세스 실행 중인가?
- [ ] Ownership 정보가 있는가?
- [ ] lastModified가 60초 이내인가?
- [ ] 각 파티션이 하나의 Consumer만 소유하는가?
- [ ] Checkpoint가 갱신되고 있는가?
모든 항목이 체크되면 Consumer가 정상 동작 중입니다! ✅
---
## 참고 파일
- 스크립트: `/Users/daewoong/home/workspace/HGZero/rag/check_active_consumers.py`
- 설정: `/Users/daewoong/home/workspace/HGZero/rag/config.yaml`
- Blob Storage: `hgzerostorage/hgzero-checkpoints`

View File

@ -0,0 +1,494 @@
# Consumer Group 생성 및 할당 가이드
## 목차
1. [Consumer Group이란?](#consumer-group이란)
2. [생성 방법](#생성-방법)
3. [코드에서 사용하기](#코드에서-사용하기)
4. [검증 방법](#검증-방법)
5. [트러블슈팅](#트러블슈팅)
---
## Consumer Group이란?
Consumer Group은 Event Hub의 이벤트를 **독립적으로 읽는 논리적 그룹**입니다.
### 주요 특징
- 각 Consumer Group은 독립적인 checkpoint를 유지
- 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음
- 기본 Consumer Group: `$Default` (자동 생성됨)
### 사용 사례
| Consumer Group | 용도 | 설명 |
|---------------|------|------|
| `$Default` | 프로덕션 | 실제 운영 환경에서 사용 |
| `development` | 개발 | 개발자가 로컬에서 테스트 |
| `test` | 테스트 | QA 테스트 환경 |
| `analytics` | 분석 | 데이터 분석 및 모니터링 |
| `backup` | 백업 | 이벤트 백업 및 아카이빙 |
---
## 생성 방법
### 방법 1: Azure Portal (가장 쉬움) ⭐️
#### 단계별 가이드
**Step 1: Azure Portal 접속**
```
https://portal.azure.com
```
**Step 2: Event Hub Namespace 찾기**
1. 상단 검색창에 "hgzero-eventhub-ns" 입력
2. "Event Hubs Namespaces" 아래의 결과 클릭
**Step 3: Event Hub 선택**
1. 왼쪽 메뉴에서 "Event Hubs" 클릭
2. "hgzero-eventhub-name" 선택
**Step 4: Consumer Groups 관리**
1. 왼쪽 메뉴에서 "Consumer groups" 클릭
2. 현재 목록에 `$Default`만 있을 것임
**Step 5: 새 Consumer Group 생성**
1. 상단의 "+ Consumer group" 버튼 클릭
2. Name 입력:
- 개발용: `development`
- 테스트용: `test`
- 분석용: `analytics`
3. "Create" 버튼 클릭
**Step 6: 생성 확인**
- 목록에 새로운 Consumer Group이 추가되었는지 확인
- 예: `$Default`, `development`, `test`
---
### 방법 2: Azure CLI
#### 사전 요구사항
```bash
# Azure CLI 설치 확인
az --version
# 로그인
az login
```
#### Consumer Group 생성
```bash
# 리소스 그룹 확인 (필요 시)
az eventhubs namespace show \
--name hgzero-eventhub-ns \
--query resourceGroup -o tsv
# Consumer Group 생성
az eventhubs eventhub consumer-group create \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--name development
# 생성 확인
az eventhubs eventhub consumer-group list \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--output table
```
#### 출력 예시
```
Name ResourceGroup
----------- ---------------
$Default hgzero-rg
development hgzero-rg
test hgzero-rg
```
---
### 방법 3: Python Management SDK
#### 설치
```bash
pip install azure-mgmt-eventhub azure-identity
```
#### 코드
```python
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential
# 인증
credential = DefaultAzureCredential()
subscription_id = "<YOUR_SUBSCRIPTION_ID>"
# 관리 클라이언트 생성
mgmt_client = EventHubManagementClient(credential, subscription_id)
# Consumer Group 생성
mgmt_client.consumer_groups.create_or_update(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name',
consumer_group_name='development',
parameters={} # 추가 설정 가능
)
print("✅ Consumer Group 'development' 생성 완료")
# Consumer Group 목록 조회
consumer_groups = mgmt_client.consumer_groups.list_by_event_hub(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name'
)
print("\n현재 Consumer Groups:")
for cg in consumer_groups:
print(f" - {cg.name}")
```
---
## 코드에서 사용하기
### 1. config.yaml 수정
#### 기존 설정
```yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default"
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
```
#### 개발 환경용 설정
```yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: "development" # 직접 지정
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
```
### 2. .env 파일 수정 (선택사항)
#### 환경 변수로 관리하는 경우
```bash
# .env
EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
EVENTHUB_NAME=hgzero-eventhub-name
AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..."
AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints
```
### 3. Consumer 실행
#### 개발 환경
```bash
# config.yaml의 consumer_group을 "development"로 설정
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
#### 프로덕션 환경
```bash
# config.yaml의 consumer_group을 "$Default"로 설정
python start_consumer.py
```
#### 여러 환경 동시 실행
```bash
# Terminal 1: 프로덕션 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py
# Terminal 2: 개발 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py
# Terminal 3: 테스트 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py
```
---
## 검증 방법
### 1. Consumer Group 목록 확인
#### Python으로 확인
```python
import asyncio
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from src.utils.config import load_config
async def list_consumer_groups():
"""사용 가능한 Consumer Group 확인"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# 여기서는 실제로 Management API를 사용해야 하지만
# Consumer Client로는 자신이 속한 그룹만 확인 가능
print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}")
asyncio.run(list_consumer_groups())
```
### 2. 다른 Consumer Group으로 테스트
```python
import asyncio
import json
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from src.utils.config import load_config
async def test_consumer_group(consumer_group_name):
"""특정 Consumer Group으로 이벤트 수신 테스트"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# Checkpoint Store
checkpoint_store = BlobCheckpointStore.from_connection_string(
eventhub_config['storage']['connection_string'],
eventhub_config['storage']['container_name']
)
# Consumer Client (Consumer Group 지정)
client = EventHubConsumerClient.from_connection_string(
eventhub_config['connection_string'],
consumer_group=consumer_group_name, # 여기서 지정!
eventhub_name=eventhub_config['name'],
checkpoint_store=checkpoint_store
)
print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...")
event_count = 0
async def on_event(partition_context, event):
nonlocal event_count
event_count += 1
print(f"이벤트 수신: #{event_count}")
await partition_context.update_checkpoint(event)
async def on_error(partition_context, error):
print(f"에러: {error}")
try:
async with client:
receive_task = asyncio.create_task(
client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest"
)
)
await asyncio.sleep(10) # 10초 대기
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신")
except Exception as e:
print(f"❌ 에러 발생: {str(e)}")
# 테스트 실행
asyncio.run(test_consumer_group("development"))
```
### 3. Checkpoint 저장 위치 확인
각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다:
```
hgzero-checkpoints (Container)
├─ $Default/
│ ├─ ownership/
│ │ └─ 0 (파티션 0의 소유권 정보)
│ └─ checkpoint/
│ └─ 0 (파티션 0의 checkpoint)
├─ development/
│ ├─ ownership/
│ │ └─ 0
│ └─ checkpoint/
│ └─ 0
└─ test/
├─ ownership/
│ └─ 0
└─ checkpoint/
└─ 0
```
---
## 트러블슈팅
### 문제 1: Consumer Group을 찾을 수 없음
**증상**
```
azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found.
```
**원인**: Consumer Group이 실제로 생성되지 않음
**해결**:
1. Azure Portal에서 Consumer Group 목록 확인
2. 없으면 생성
3. 이름 오타 확인 (대소문자 구분)
### 문제 2: Ownership claim 실패
**증상**
```
EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming.
```
**원인**:
- 동일 Consumer Group에 이미 다른 Consumer가 실행 중
- 파티션보다 Consumer가 많음
**해결**:
1. 기존 Consumer 종료
2. 다른 Consumer Group 사용
3. 파티션 수 증가
### 문제 3: 이벤트를 읽지 못함
**증상**: Consumer는 실행되지만 이벤트가 수신되지 않음
**원인**:
- Starting position이 최신으로 설정됨
- 새 이벤트가 없음
**해결**:
```python
# config.yaml 또는 코드에서
await client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest" # 처음부터 읽기
)
```
### 문제 4: Checkpoint가 초기화되지 않음
**증상**: 새 Consumer Group인데 이미 읽은 것처럼 동작
**원인**: Blob Storage에 이전 checkpoint가 남아있음
**해결**:
```bash
# Azure Portal에서
# Storage Account > Containers > hgzero-checkpoints
# 해당 Consumer Group 폴더 삭제
```
또는 코드로:
```python
from azure.storage.blob import BlobServiceClient
blob_service = BlobServiceClient.from_connection_string(
AZURE_STORAGE_CONNECTION_STRING
)
container = blob_service.get_container_client("hgzero-checkpoints")
# 특정 Consumer Group의 checkpoint 삭제
prefix = "development/"
blobs = container.list_blobs(name_starts_with=prefix)
for blob in blobs:
container.delete_blob(blob.name)
print(f"삭제: {blob.name}")
```
---
## 권장 설정
### 환경별 Consumer Group 전략
```yaml
# 프로덕션
production:
consumer_group: "$Default"
starting_position: "-1" # 최신 이벤트부터
checkpoint_interval: 30 # 30초마다 checkpoint
# 개발
development:
consumer_group: "development"
starting_position: "@earliest" # 처음부터
checkpoint_interval: 10 # 자주 checkpoint (테스트용)
# 테스트
test:
consumer_group: "test"
starting_position: "@earliest"
checkpoint_interval: 5 # 매우 자주 (빠른 테스트)
# 분석
analytics:
consumer_group: "analytics"
starting_position: "@earliest" # 모든 데이터 분석
checkpoint_interval: 60 # 덜 자주 (성능)
```
### 실행 스크립트 예시
```bash
#!/bin/bash
# start_dev_consumer.sh
export AZURE_EVENTHUB_CONSUMER_GROUP=development
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
---
## 요약
### 빠른 시작 체크리스트
- [ ] Azure Portal에서 Consumer Group 생성
- [ ] 이름: `development`
- [ ] Event Hub: `hgzero-eventhub-name`
- [ ] `config.yaml` 수정
- [ ] `consumer_group: "development"` 설정
- [ ] Consumer 실행
- [ ] `python start_consumer.py`
- [ ] 검증
- [ ] 이벤트 수신 확인
- [ ] Blob Storage에 checkpoint 생성 확인
### Consumer Group 활용 팁
1. **프로덕션과 개발 분리**: 항상 다른 Consumer Group 사용
2. **테스트는 독립적으로**: `test` Consumer Group으로 자유롭게 실험
3. **Checkpoint 관리**: 필요시 삭제하여 처음부터 재처리
4. **모니터링**: 각 Consumer Group별 lag 모니터링
5. **비용 최적화**: 불필요한 Consumer Group은 삭제
이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯

378
rag/eventhub_guide.md Normal file
View File

@ -0,0 +1,378 @@
# Event Hub Consumer Guide
## 핵심 개념: Partition Ownership (파티션 소유권)
Event Hub에서는 **같은 Consumer Group 내에서 하나의 파티션은 동시에 오직 하나의 Consumer만 읽을 수 있습니다**. 이를 "Exclusive Consumer" 패턴이라고 합니다.
## 왜 이런 제약이 있나요?
### 1. 순서 보장 (Ordering)
```
파티션 0: [이벤트1] → [이벤트2] → [이벤트3]
❌ 잘못된 경우 (여러 Consumer가 동시 읽기):
Consumer A: 이벤트1 처리 중... (느림)
Consumer B: 이벤트2 처리 완료 ✓
Consumer C: 이벤트3 처리 완료 ✓
→ 처리 순서: 2 → 3 → 1 (순서 뒤바뀜!)
✅ 올바른 경우 (하나의 Consumer만):
Consumer A: 이벤트1 ✓ → 이벤트2 ✓ → 이벤트3 ✓
→ 처리 순서: 1 → 2 → 3 (순서 보장!)
```
### 2. Checkpoint 일관성
```
❌ 여러 Consumer가 각자 checkpoint:
Consumer A: offset 100까지 읽음 → checkpoint 저장
Consumer B: offset 150까지 읽음 → checkpoint 덮어씀
Consumer A 재시작 → offset 150부터 읽음 → offset 100~149 누락!
✅ 하나의 Consumer만:
Consumer A: offset 100 → 110 → 120 → ... (순차적)
재시작 시 → 마지막 checkpoint부터 정확히 재개
```
### 3. 중복 처리 방지
```
❌ 여러 Consumer가 동일 이벤트 읽기:
Consumer A: 주문 이벤트 처리 → 결제 완료
Consumer B: 동일 주문 이벤트 처리 → 중복 결제!
✅ 하나의 Consumer만:
Consumer A: 주문 이벤트 처리 → 1번만 결제 ✓
```
## Ownership Claim 메커니즘
### 동작 과정
```
Consumer A (PID 51257) - 먼저 시작
Blob Storage에 파티션 0 소유권 요청
✅ 승인 (Owner: A, Lease: 30초)
30초마다 Lease 갱신
계속 소유권 유지
Consumer B (테스트) - 나중에 시작
Blob Storage에 파티션 0 소유권 요청
❌ 거부 (이미 A가 소유 중)
"hasn't claimed an ownership" 로그
계속 재시도 (대기 상태)
```
### Blob Storage에 저장되는 정보
```json
{
"partitionId": "0",
"ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
"lastModifiedTime": "2025-10-29T02:17:49Z",
"eTag": "\"0x8DCF7E8F9B3C1A0\"",
"offset": "120259090624",
"sequenceNumber": 232
}
```
## 현재 상황 분석
```
Event Hub: hgzero-eventhub-name
├─ 파티션 수: 1개 (파티션 0)
└─ Consumer Group: $Default
실행 중:
├─ Consumer A (PID 51257): 파티션 0 소유 ✅
│ ├─ 이벤트 정상 수신 중
│ ├─ Lease 주기적 갱신 중
│ └─ Checkpoint 저장 중
└─ 테스트 Consumer들: 소유권 없음 ❌
├─ 파티션 0 claim 시도
├─ 계속 거부당함
├─ "hasn't claimed an ownership" 로그
└─ 이벤트 수신 불가
```
## 해결 방법
### Option 1: 기존 Consumer 종료 후 재시작
```bash
# 기존 Consumer 종료
kill 51257
# 새로 시작
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
**장점**: 간단
**단점**: 다운타임 발생 (Lease 만료까지 최대 30초)
### Option 2: 다른 Consumer Group 사용 (권장)
```yaml
# config.yaml
eventhub:
consumer_group: "test-group" # $Default 대신 사용
```
**장점**:
- 기존 Consumer에 영향 없음
- 독립적으로 모든 이벤트 읽기 가능
- 개발/테스트에 이상적
**단점**: 리소스 추가 사용
### Option 3: 파티션 수평 확장
```
Event Hub 파티션 증가: 1개 → 3개
Consumer 실행: 3개
분산:
├─ Consumer A: 파티션 0
├─ Consumer B: 파티션 1
└─ Consumer C: 파티션 2
→ 병렬 처리로 3배 성능 향상!
```
**장점**: 높은 처리량
**단점**: 비용 증가, 전체 순서는 보장 안 됨 (파티션 내 순서만 보장)
## Consumer Group 비교
```
┌─────────────────────────────────────────┐
│ Event Hub: hgzero-eventhub │
│ 파티션 0: [이벤트들...] │
└─────────────────────────────────────────┘
├─────────────────┬─────────────────┐
│ │ │
Consumer Group Consumer Group Consumer Group
"$Default" "analytics" "backup"
│ │ │
Consumer A Consumer B Consumer C
(RAG 처리) (분석 처리) (백업 처리)
│ │ │
각자 독립적으로 동일한 파티션 0의 모든 이벤트 읽음
각자 독립적인 Checkpoint 유지
```
## 파티션과 Consumer 수 관계
### Case 1: Consumer 1개
```
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0, P1, P2 모두 소유
└─ 모든 파티션 처리 (순차적)
```
### Case 2: Consumer 3개 (이상적)
```
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
└─ Consumer C: P2 소유
→ 병렬 처리로 최대 성능!
```
### Case 3: Consumer 5개 (과잉)
```
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
├─ Consumer C: P2 소유
├─ Consumer D: 소유한 파티션 없음 (대기)
└─ Consumer E: 소유한 파티션 없음 (대기)
→ D, E는 이벤트를 읽지 못하고 대기만 함
```
## 베스트 프랙티스
| 환경 | Consumer 수 | Consumer Group | 파티션 수 |
|------|-------------|----------------|-----------|
| **프로덕션** | = 파티션 수 | production | 처리량에 맞게 |
| **개발** | 1개 | development | 1~2개 |
| **테스트** | 1개 | test | 1개 |
| **분석** | 1개 | analytics | (공유) |
### 권장 사항
1. **프로덕션 환경**
- Consumer 수 = 파티션 수 (1:1 매핑)
- 고가용성을 위해 각 Consumer를 다른 서버에 배치
- Consumer 수 > 파티션 수로 설정하면 일부는 대기 상태 (Standby)
2. **개발/테스트 환경**
- 별도 Consumer Group 사용
- 파티션 1개로 충분
- 필요시 checkpoint를 초기화하여 처음부터 재처리
3. **모니터링**
- Ownership claim 실패 로그 모니터링
- Lease 갱신 실패 알림 설정
- Checkpoint lag 모니터링
4. **장애 복구**
- Lease timeout 고려 (기본 30초)
- Consumer 장애 시 자동 재분배 (30초 이내)
- Checkpoint로부터 정확한 위치에서 재개
## Consumer 프로세스 관리 명령어
### 프로세스 확인
```bash
# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# 상세 정보 (실행시간, CPU, 메모리)
ps -p <PID> -o pid,etime,%cpu,%mem,cmd
# 네트워크 연결 확인
lsof -i -n | grep <PID>
# 모든 Python 프로세스 확인
ps aux | grep python | grep -v grep
```
### 프로세스 종료
```bash
# 정상 종료 (SIGTERM)
kill <PID>
# 강제 종료 (SIGKILL)
kill -9 <PID>
# 이름으로 종료
pkill -f start_consumer.py
```
## 테스트 이벤트 전송
```python
from azure.eventhub import EventHubProducerClient, EventData
import json
import os
from dotenv import load_dotenv
load_dotenv('rag/.env')
conn_str = os.getenv('EVENTHUB_CONNECTION_STRING')
eventhub_name = os.getenv('EVENTHUB_NAME')
test_event = {
'eventType': 'MINUTES_FINALIZED',
'data': {
'meetingId': 'test-meeting-001',
'title': '테스트 회의',
'minutesId': 'test-minutes-001',
'sections': [
{
'sectionId': 'section-001',
'type': 'DISCUSSION',
'title': '논의 사항',
'content': '테스트 논의 내용입니다.',
'order': 1,
'verified': True
}
]
}
}
producer = EventHubProducerClient.from_connection_string(
conn_str=conn_str,
eventhub_name=eventhub_name
)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(json.dumps(test_event)))
producer.send_batch(event_data_batch)
print('✅ 테스트 이벤트 전송 완료')
producer.close()
```
## Event Hub 파티션 정보 조회
```python
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
async def check_partitions():
client = EventHubConsumerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STRING,
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME
)
async with client:
partition_ids = await client.get_partition_ids()
print(f"파티션 개수: {len(partition_ids)}")
print(f"파티션 IDs: {partition_ids}")
for partition_id in partition_ids:
props = await client.get_partition_properties(partition_id)
print(f"\n파티션 {partition_id}:")
print(f" 시퀀스 번호: {props['last_enqueued_sequence_number']}")
print(f" 오프셋: {props['last_enqueued_offset']}")
print(f" 마지막 이벤트 시간: {props['last_enqueued_time_utc']}")
asyncio.run(check_partitions())
```
## 정리
### "에러"가 아니라 "설계된 동작"입니다
1. ✅ **정상**: Consumer A가 파티션 소유 → 이벤트 처리
2. ✅ **정상**: Consumer B가 claim 실패 → 대기
3. ✅ **정상**: Consumer A 종료 시 → Consumer B가 자동 인수
### 이 메커니즘의 장점
- 📌 **순서 보장**: 파티션 내 이벤트 순서 유지
- 📌 **정확히 한 번 처리**: 중복 처리 방지
- 📌 **자동 장애 복구**: Consumer 장애 시 자동 재분배
- 📌 **수평 확장**: 파티션 추가로 처리량 증가
### 현재 상황 해결
**권장**: 다른 Consumer Group을 사용하여 테스트하시는 것이 가장 안전하고 효율적입니다!
```yaml
# 개발/테스트용 Consumer Group 설정
eventhub:
consumer_group: "development" # 또는 "test"
```
이렇게 하면:
- 기존 프로덕션 Consumer에 영향 없음
- 독립적으로 모든 이벤트를 처음부터 읽을 수 있음
- 여러 번 테스트 가능

View File

@ -22,7 +22,9 @@ from ..models.document import (
) )
from ..models.minutes import ( from ..models.minutes import (
MinutesSearchRequest, MinutesSearchRequest,
MinutesSearchResult MinutesSearchResult,
RelatedMinutesRequest,
RelatedMinutesResponse
) )
from ..db.postgres_vector import PostgresVectorDB from ..db.postgres_vector import PostgresVectorDB
from ..db.azure_search import AzureAISearchDB from ..db.azure_search import AzureAISearchDB
@ -31,6 +33,7 @@ from ..services.claude_service import ClaudeService
from ..utils.config import load_config, get_database_url from ..utils.config import load_config, get_database_url
from ..utils.embedding import EmbeddingGenerator from ..utils.embedding import EmbeddingGenerator
from ..utils.text_processor import extract_nouns_as_query from ..utils.text_processor import extract_nouns_as_query
from ..utils.redis_cache import RedisCache
# 로깅 설정 # 로깅 설정
logging.basicConfig( logging.basicConfig(
@ -62,6 +65,7 @@ _doc_db = None
_rag_minutes_db = None _rag_minutes_db = None
_embedding_gen = None _embedding_gen = None
_claude_service = None _claude_service = None
_redis_cache = None
def get_config(): def get_config():
@ -139,6 +143,22 @@ def get_claude_service():
return _claude_service return _claude_service
def get_redis_cache():
"""Redis 캐시"""
global _redis_cache
if _redis_cache is None:
config = get_config()
redis_config = config["redis"]
_redis_cache = RedisCache(
host=redis_config["host"],
port=redis_config["port"],
db=redis_config["db"],
password=redis_config.get("password"),
decode_responses=redis_config.get("decode_responses", True)
)
return _redis_cache
# ============================================================================ # ============================================================================
# 용어집 API # 용어집 API
# ============================================================================ # ============================================================================
@ -156,7 +176,7 @@ async def root():
} }
@app.post("/api/terms/search", response_model=List[TermSearchResult]) @app.post("/api/rag/terms/search", response_model=List[TermSearchResult])
async def search_terms( async def search_terms(
request: TermSearchRequest, request: TermSearchRequest,
term_db: PostgresVectorDB = Depends(get_term_db), term_db: PostgresVectorDB = Depends(get_term_db),
@ -253,7 +273,7 @@ async def search_terms(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/terms/{term_id}", response_model=Term) @app.get("/api/rag/terms/{term_id}", response_model=Term)
async def get_term( async def get_term(
term_id: str, term_id: str,
term_db: PostgresVectorDB = Depends(get_term_db) term_db: PostgresVectorDB = Depends(get_term_db)
@ -281,7 +301,7 @@ async def get_term(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/terms/{term_id}/explain", response_model=TermExplanation) @app.post("/api/rag/terms/{term_id}/explain", response_model=TermExplanation)
async def explain_term( async def explain_term(
term_id: str, term_id: str,
request: TermExplainRequest, request: TermExplainRequest,
@ -327,7 +347,7 @@ async def explain_term(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/terms/stats", response_model=TermStats) @app.get("/api/rag/terms/stats", response_model=TermStats)
async def get_term_stats(term_db: PostgresVectorDB = Depends(get_term_db)): async def get_term_stats(term_db: PostgresVectorDB = Depends(get_term_db)):
"""용어 통계 조회""" """용어 통계 조회"""
try: try:
@ -349,7 +369,7 @@ async def get_term_stats(term_db: PostgresVectorDB = Depends(get_term_db)):
# 관련자료 API # 관련자료 API
# ============================================================================ # ============================================================================
@app.post("/api/documents/search", response_model=List[DocumentSearchResult]) @app.post("/api/rag/documents/search", response_model=List[DocumentSearchResult])
async def search_documents( async def search_documents(
request: DocumentSearchRequest, request: DocumentSearchRequest,
doc_db: AzureAISearchDB = Depends(get_doc_db), doc_db: AzureAISearchDB = Depends(get_doc_db),
@ -395,7 +415,7 @@ async def search_documents(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/documents/stats", response_model=DocumentStats) @app.get("/api/rag/documents/stats", response_model=DocumentStats)
async def get_document_stats(doc_db: AzureAISearchDB = Depends(get_doc_db)): async def get_document_stats(doc_db: AzureAISearchDB = Depends(get_doc_db)):
"""문서 통계 조회""" """문서 통계 조회"""
try: try:
@ -417,7 +437,7 @@ async def get_document_stats(doc_db: AzureAISearchDB = Depends(get_doc_db)):
# RAG 회의록 API # RAG 회의록 API
# ============================================================================ # ============================================================================
@app.post("/api/minutes/search", response_model=List[MinutesSearchResult]) @app.post("/api/rag/minutes/search", response_model=List[MinutesSearchResult])
async def search_related_minutes( async def search_related_minutes(
request: MinutesSearchRequest, request: MinutesSearchRequest,
rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db), rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db),
@ -461,7 +481,7 @@ async def search_related_minutes(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/minutes/{minutes_id}") @app.get("/api/rag/minutes/{minutes_id}")
async def get_minutes( async def get_minutes(
minutes_id: str, minutes_id: str,
rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db) rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db)
@ -489,7 +509,7 @@ async def get_minutes(
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/minutes/stats") @app.get("/api/rag/minutes/stats")
async def get_minutes_stats(rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db)): async def get_minutes_stats(rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db)):
"""회의록 통계 조회""" """회의록 통계 조회"""
try: try:
@ -501,6 +521,123 @@ async def get_minutes_stats(rag_minutes_db: RagMinutesDB = Depends(get_rag_minut
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/rag/minutes/related", response_model=List[RelatedMinutesResponse])
async def get_related_minutes(
request: RelatedMinutesRequest,
rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db),
embedding_gen: EmbeddingGenerator = Depends(get_embedding_gen),
redis_cache: RedisCache = Depends(get_redis_cache)
):
"""
연관 회의록 조회 (Option A: DB 조회 벡터 검색 + Redis 캐싱)
Args:
request: 연관 회의록 조회 요청
- minute_id: 기준 회의록 ID
- meeting_title: 회의 제목 (미사용)
- summary: 회의록 요약 (미사용)
- top_k: 반환할 최대 결과
- similarity_threshold: 최소 유사도 임계값
Returns:
연관 회의록 리스트
Process:
1. Redis 캐시에서 연관 회의록 결과 조회
2. 캐시 MISS :
a. minute_id로 rag_minutes 테이블에서 회의록 조회 (캐싱)
b. full_content를 벡터 임베딩으로 변환
c. 벡터 DB에서 유사도 검색 (자기 자신 제외)
d. 결과를 Redis에 캐싱
3. 연관 회의록 목록 반환
"""
try:
config = get_config()
cache_config = config.get("rag_minutes", {}).get("cache", {})
cache_prefix = cache_config.get("prefix", "minutes:")
minutes_ttl = cache_config.get("ttl", 1800)
related_ttl = cache_config.get("related_ttl", 3600)
logger.info(f"연관 회의록 조회 시작: minute_id={request.minute_id}")
# 1. 캐시 키 생성
related_cache_key = (
f"{cache_prefix}related:{request.minute_id}:"
f"{request.top_k}:{request.similarity_threshold}"
)
# 2. 캐시 조회
cached_results = redis_cache.get(related_cache_key)
if cached_results:
logger.info(f"연관 회의록 캐시 HIT: {related_cache_key}")
return [
RelatedMinutesResponse(**result)
for result in cached_results
]
# 3. 캐시 MISS - DB 조회
logger.info(f"연관 회의록 캐시 MISS: {related_cache_key}")
# 3-1. 회의록 조회 (캐싱)
minutes_cache_key = f"{cache_prefix}{request.minute_id}"
base_minutes = redis_cache.get(minutes_cache_key)
if base_minutes:
logger.info(f"회의록 캐시 HIT: {minutes_cache_key}")
# RagMinutes 객체로 변환
from ..models.minutes import RagMinutes
base_minutes = RagMinutes(**base_minutes)
else:
logger.info(f"회의록 캐시 MISS: {minutes_cache_key}")
base_minutes = rag_minutes_db.get_minutes_by_id(request.minute_id)
if not base_minutes:
raise HTTPException(
status_code=404,
detail=f"회의록을 찾을 수 없습니다: {request.minute_id}"
)
# 캐시 저장
redis_cache.set(minutes_cache_key, base_minutes.dict(), minutes_ttl)
logger.info(f"기준 회의록 조회 완료: {base_minutes.title}")
# 3-2. full_content를 벡터 임베딩으로 변환
query_embedding = embedding_gen.generate_embedding(base_minutes.full_content)
logger.info(f"임베딩 생성 완료: {len(query_embedding)}차원")
# 3-3. 벡터 유사도 검색 (자기 자신 제외)
results = rag_minutes_db.search_by_vector(
query_embedding=query_embedding,
top_k=request.top_k,
similarity_threshold=request.similarity_threshold,
exclude_minutes_id=request.minute_id
)
# 4. 응답 형식으로 변환
related_minutes = [
RelatedMinutesResponse(
minutes=result["minutes"],
similarity_score=result["similarity_score"]
)
for result in results
]
# 5. 결과 캐싱
redis_cache.set(
related_cache_key,
[r.dict() for r in related_minutes],
related_ttl
)
logger.info(f"연관 회의록 조회 완료: {len(related_minutes)}개 결과")
return related_minutes
except HTTPException:
raise
except Exception as e:
logger.error(f"연관 회의록 조회 실패: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000) uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -93,6 +93,8 @@ class RagMinutesDB:
if isinstance(minutes_dict[field], datetime): if isinstance(minutes_dict[field], datetime):
minutes_dict[field] = minutes_dict[field].isoformat() minutes_dict[field] = minutes_dict[field].isoformat()
minutes_dict.pop("embedding")
return RagMinutes(**minutes_dict) return RagMinutes(**minutes_dict)
def insert_minutes(self, minutes: RagMinutes) -> bool: def insert_minutes(self, minutes: RagMinutes) -> bool:
@ -189,7 +191,8 @@ class RagMinutesDB:
self, self,
query_embedding: List[float], query_embedding: List[float],
top_k: int = 5, top_k: int = 5,
similarity_threshold: float = 0.7 similarity_threshold: float = 0.7,
exclude_minutes_id: Optional[str] = None
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
벡터 유사도 검색 벡터 유사도 검색
@ -198,27 +201,34 @@ class RagMinutesDB:
query_embedding: 쿼리 임베딩 벡터 query_embedding: 쿼리 임베딩 벡터
top_k: 반환할 최대 결과 top_k: 반환할 최대 결과
similarity_threshold: 최소 유사도 임계값 similarity_threshold: 최소 유사도 임계값
exclude_minutes_id: 제외할 회의록 ID (연관 회의록 검색 자기 자신 제외)
Returns: Returns:
검색 결과 리스트 검색 결과 리스트
""" """
with self.get_connection() as conn: with self.get_connection() as conn:
with conn.cursor(cursor_factory=RealDictCursor) as cur: with conn.cursor(cursor_factory=RealDictCursor) as cur:
cur.execute(""" # 제외 조건 추가
exclude_condition = ""
params = [query_embedding, query_embedding, similarity_threshold, query_embedding, top_k]
if exclude_minutes_id:
exclude_condition = "AND minutes_id != %s"
# 파라미터 순서: 처음 4개는 embedding 검색용, 5번째는 exclude용, 6번째는 limit용
params = [query_embedding, query_embedding, similarity_threshold, exclude_minutes_id, query_embedding, top_k]
query = f"""
SELECT *, SELECT *,
1 - (embedding <=> %s::vector) as similarity_score 1 - (embedding <=> %s::vector) as similarity_score
FROM rag_minutes FROM rag_minutes
WHERE embedding IS NOT NULL WHERE embedding IS NOT NULL
AND 1 - (embedding <=> %s::vector) >= %s AND 1 - (embedding <=> %s::vector) >= %s
{exclude_condition}
ORDER BY embedding <=> %s::vector ORDER BY embedding <=> %s::vector
LIMIT %s LIMIT %s
""", ( """
query_embedding,
query_embedding, cur.execute(query, params)
similarity_threshold,
query_embedding,
top_k
))
results = [] results = []
for row in cur.fetchall(): for row in cur.fetchall():
@ -229,7 +239,7 @@ class RagMinutesDB:
"similarity_score": float(similarity_score) "similarity_score": float(similarity_score)
}) })
logger.info(f"벡터 검색 완료: {len(results)}개 결과") logger.info(f"벡터 검색 완료: {len(results)}개 결과 (exclude: {exclude_minutes_id})")
return results return results
def search_by_keyword( def search_by_keyword(

View File

@ -106,3 +106,41 @@ class MinutesSearchResult(BaseModel):
"similarity_score": 0.92 "similarity_score": 0.92
} }
} }
class RelatedMinutesRequest(BaseModel):
"""연관 회의록 조회 요청"""
minute_id: str = Field(..., description="기준 회의록 ID")
meeting_title: str = Field(..., description="회의 제목")
summary: str = Field(..., description="회의록 요약")
top_k: int = Field(5, ge=1, le=20, description="반환할 최대 결과 수")
similarity_threshold: float = Field(0.7, ge=0.0, le=1.0, description="최소 유사도 임계값")
class Config:
json_schema_extra = {
"example": {
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립을 위한 회의",
"top_k": 5,
"similarity_threshold": 0.7
}
}
class RelatedMinutesResponse(BaseModel):
"""연관 회의록 조회 응답"""
minutes: RagMinutes
similarity_score: float = Field(..., ge=0.0, le=1.0, description="유사도 점수")
class Config:
json_schema_extra = {
"example": {
"minutes": {
"meeting_id": "MTG-2025-002",
"title": "2024 Q4 마케팅 성과 분석",
"minutes_id": "MIN-2024-050"
},
"similarity_score": 0.85
}
}

View File

@ -1,6 +1,6 @@
""" """
Azure Event Hub Consumer 서비스 Azure Event Hub Consumer 서비스
회의록 확정 이벤트consume하여 RAG 저장소에 저장 회의록 확정 이벤트 세그먼트 생성 이벤트를 consume
""" """
import asyncio import asyncio
import json import json
@ -13,7 +13,9 @@ from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from ..models.minutes import RagMinutes, MinutesSection from ..models.minutes import RagMinutes, MinutesSection
from ..db.rag_minutes_db import RagMinutesDB from ..db.rag_minutes_db import RagMinutesDB
from ..db.postgres_vector import PostgresVectorDB
from ..utils.embedding import EmbeddingGenerator from ..utils.embedding import EmbeddingGenerator
from ..utils.text_processor import extract_nouns_as_query
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,7 +31,9 @@ class EventHubConsumer:
storage_connection_string: str, storage_connection_string: str,
storage_container_name: str, storage_container_name: str,
rag_minutes_db: RagMinutesDB, rag_minutes_db: RagMinutesDB,
embedding_gen: EmbeddingGenerator embedding_gen: EmbeddingGenerator,
term_db: Optional[PostgresVectorDB] = None,
config: Optional[Dict[str, Any]] = None
): ):
""" """
초기화 초기화
@ -42,6 +46,8 @@ class EventHubConsumer:
storage_container_name: Checkpoint 저장 컨테이너 이름 storage_container_name: Checkpoint 저장 컨테이너 이름
rag_minutes_db: RAG Minutes 데이터베이스 rag_minutes_db: RAG Minutes 데이터베이스
embedding_gen: Embedding 생성기 embedding_gen: Embedding 생성기
term_db: 용어집 데이터베이스 (선택)
config: 설정 딕셔너리 (선택)
""" """
self.connection_string = connection_string self.connection_string = connection_string
self.eventhub_name = eventhub_name self.eventhub_name = eventhub_name
@ -50,6 +56,8 @@ class EventHubConsumer:
self.storage_container_name = storage_container_name self.storage_container_name = storage_container_name
self.rag_minutes_db = rag_minutes_db self.rag_minutes_db = rag_minutes_db
self.embedding_gen = embedding_gen self.embedding_gen = embedding_gen
self.term_db = term_db
self.config = config or {}
self.client: Optional[EventHubConsumerClient] = None self.client: Optional[EventHubConsumerClient] = None
self.is_running = False self.is_running = False
@ -106,13 +114,18 @@ class EventHubConsumer:
event_body = event.body_as_str() event_body = event.body_as_str()
event_data = json.loads(event_body) event_data = json.loads(event_body)
logger.info(f"이벤트 수신: {event_data.get('eventType', 'unknown')}") event_type = event_data.get('eventType', 'unknown')
logger.info(f"이벤트 수신: {event_data.get('data', 'unknown')}") logger.info(f"이벤트 수신: {event_type}")
# 회의록 확정 이벤트 처리 # 이벤트 타입별 처리
if event_data.get("eventType") == "MINUTES_FINALIZED": if event_type == "MINUTES_FINALIZED":
# 회의록 확정 이벤트
await self._process_minutes_event(event_data) await self._process_minutes_event(event_data)
elif event_type == "SegmentCreated":
# 세그먼트 생성 이벤트 - 용어검색 실행
await self._process_segment_event(event_data)
# Checkpoint 업데이트 # Checkpoint 업데이트
await partition_context.update_checkpoint(event) await partition_context.update_checkpoint(event)
@ -131,6 +144,110 @@ class EventHubConsumer:
""" """
logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}") logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}")
async def _process_segment_event(self, event_data: Dict[str, Any]):
"""
세그먼트 생성 이벤트 처리 - 용어검색 실행
Args:
event_data: 이벤트 데이터
"""
try:
# 용어집 DB가 없으면 스킵
if not self.term_db:
logger.debug("용어집 DB가 설정되지 않아 용어검색을 스킵합니다")
return
# 세그먼트 데이터 추출
segment_id = event_data.get("segmentId")
text = event_data.get("text", "")
meeting_id = event_data.get("meetingId")
if not text:
logger.warning(f"세그먼트 {segment_id}에 텍스트가 없습니다")
return
logger.info(f"세그먼트 용어검색 시작: {segment_id} (회의: {meeting_id})")
logger.info(f"텍스트: {text[:100]}...")
# 1. 명사 추출하여 검색 쿼리 생성
search_query = extract_nouns_as_query(text)
logger.info(f"검색 쿼리 변환: '{text[:30]}...''{search_query}'")
# 2. 용어검색 설정
config = self.config.get("term_glossary", {})
search_config = config.get("search", {})
top_k = search_config.get("top_k", 5)
confidence_threshold = search_config.get("confidence_threshold", 0.7)
keyword_weight = search_config.get("keyword_weight", 0.4)
vector_weight = search_config.get("vector_weight", 0.6)
# 3. 키워드 검색
keyword_results = self.term_db.search_by_keyword(
query=search_query,
top_k=top_k,
confidence_threshold=confidence_threshold
)
# 4. 벡터 검색
query_embedding = self.embedding_gen.generate_embedding(search_query)
vector_results = self.term_db.search_by_vector(
query_embedding=query_embedding,
top_k=top_k,
confidence_threshold=confidence_threshold
)
# 5. 하이브리드 검색 결과 통합 (RRF)
results = []
seen_ids = set()
# 키워드 결과 가중치 적용
for result in keyword_results:
term_id = result["term"].term_id
if term_id not in seen_ids:
result["relevance_score"] *= keyword_weight
result["match_type"] = "hybrid"
results.append(result)
seen_ids.add(term_id)
# 벡터 결과 가중치 적용
for result in vector_results:
term_id = result["term"].term_id
if term_id not in seen_ids:
result["relevance_score"] *= vector_weight
result["match_type"] = "hybrid"
results.append(result)
seen_ids.add(term_id)
else:
# 이미 있는 경우 점수 합산
for r in results:
if r["term"].term_id == term_id:
r["relevance_score"] += result["relevance_score"] * vector_weight
break
# 점수 기준 재정렬
results.sort(key=lambda x: x["relevance_score"], reverse=True)
results = results[:top_k]
# 6. 검색 결과 로깅
if results:
logger.info(f"세그먼트 {segment_id} 용어검색 완료: {len(results)}개 발견")
for idx, result in enumerate(results, 1):
term = result["term"]
score = result["relevance_score"]
logger.info(
f" [{idx}] {term.term_name} "
f"(카테고리: {term.category}, 점수: {score:.3f})"
)
else:
logger.info(f"세그먼트 {segment_id}에서 매칭되는 용어를 찾지 못했습니다")
# 7. 선택적: 검색 결과를 별도 테이블에 저장하거나 Event Hub로 발행
# TODO: 필요시 검색 결과를 저장하거나 downstream 서비스로 전달
except Exception as e:
logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True)
def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]: def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]:
""" """
Java LocalDateTime 배열을 ISO 8601 문자열로 변환 Java LocalDateTime 배열을 ISO 8601 문자열로 변환
@ -302,7 +419,8 @@ class EventHubConsumer:
async def start_consumer( async def start_consumer(
config: Dict[str, Any], config: Dict[str, Any],
rag_minutes_db: RagMinutesDB, rag_minutes_db: RagMinutesDB,
embedding_gen: EmbeddingGenerator embedding_gen: EmbeddingGenerator,
term_db: Optional[PostgresVectorDB] = None
): ):
""" """
Event Hub Consumer 시작 (비동기) Event Hub Consumer 시작 (비동기)
@ -311,6 +429,7 @@ async def start_consumer(
config: 설정 딕셔너리 config: 설정 딕셔너리
rag_minutes_db: RAG Minutes 데이터베이스 rag_minutes_db: RAG Minutes 데이터베이스
embedding_gen: Embedding 생성기 embedding_gen: Embedding 생성기
term_db: 용어집 데이터베이스 (선택)
""" """
eventhub_config = config["eventhub"] eventhub_config = config["eventhub"]
@ -321,7 +440,9 @@ async def start_consumer(
storage_connection_string=eventhub_config["storage"]["connection_string"], storage_connection_string=eventhub_config["storage"]["connection_string"],
storage_container_name=eventhub_config["storage"]["container_name"], storage_container_name=eventhub_config["storage"]["container_name"],
rag_minutes_db=rag_minutes_db, rag_minutes_db=rag_minutes_db,
embedding_gen=embedding_gen embedding_gen=embedding_gen,
term_db=term_db,
config=config
) )
try: try:

Binary file not shown.

View File

@ -0,0 +1,206 @@
"""
Redis 캐싱 유틸리티
"""
import redis
import json
import logging
from typing import Optional, Any
from functools import wraps
logger = logging.getLogger(__name__)
class RedisCache:
"""Redis 캐싱 클래스"""
def __init__(
self,
host: str = "localhost",
port: int = 6379,
db: int = 0,
password: Optional[str] = None,
decode_responses: bool = True
):
"""
초기화
Args:
host: Redis 호스트
port: Redis 포트
db: Redis DB 번호
password: Redis 비밀번호
decode_responses: 응답 디코딩 여부
"""
try:
self.client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=decode_responses
)
# 연결 테스트
self.client.ping()
logger.info(f"Redis 연결 성공: {host}:{port}")
except Exception as e:
logger.warning(f"Redis 연결 실패: {str(e)} - 캐싱 비활성화")
self.client = None
def get(self, key: str) -> Optional[Any]:
"""
캐시에서 조회
Args:
key: 캐시
Returns:
캐시된 또는 None
"""
if not self.client:
return None
try:
value = self.client.get(key)
if value:
logger.debug(f"캐시 HIT: {key}")
return json.loads(value)
logger.debug(f"캐시 MISS: {key}")
return None
except Exception as e:
logger.error(f"캐시 조회 실패 ({key}): {str(e)}")
return None
def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
"""
캐시에 저장
Args:
key: 캐시
value: 저장할
ttl: 만료 시간 ()
Returns:
성공 여부
"""
if not self.client:
return False
try:
serialized = json.dumps(value, ensure_ascii=False)
self.client.setex(key, ttl, serialized)
logger.debug(f"캐시 저장: {key} (TTL: {ttl}s)")
return True
except Exception as e:
logger.error(f"캐시 저장 실패 ({key}): {str(e)}")
return False
def delete(self, key: str) -> bool:
"""
캐시 삭제
Args:
key: 캐시
Returns:
성공 여부
"""
if not self.client:
return False
try:
self.client.delete(key)
logger.debug(f"캐시 삭제: {key}")
return True
except Exception as e:
logger.error(f"캐시 삭제 실패 ({key}): {str(e)}")
return False
def delete_pattern(self, pattern: str) -> int:
"""
패턴 매칭으로 여러 캐시 삭제
Args:
pattern: 캐시 패턴 (: "minutes:*")
Returns:
삭제된 개수
"""
if not self.client:
return 0
try:
keys = self.client.keys(pattern)
if keys:
count = self.client.delete(*keys)
logger.info(f"캐시 일괄 삭제: {count}개 키 ({pattern})")
return count
return 0
except Exception as e:
logger.error(f"캐시 패턴 삭제 실패 ({pattern}): {str(e)}")
return 0
def exists(self, key: str) -> bool:
"""
캐시 존재 여부 확인
Args:
key: 캐시
Returns:
존재 여부
"""
if not self.client:
return False
try:
return self.client.exists(key) > 0
except Exception as e:
logger.error(f"캐시 존재 확인 실패 ({key}): {str(e)}")
return False
def cached(prefix: str, ttl: int = 3600, key_builder=None):
"""
함수 결과 캐싱 데코레이터
Args:
prefix: 캐시 prefix
ttl: 만료 시간 ()
key_builder: 캐시 생성 함수 (선택사항)
Example:
@cached(prefix="minutes:", ttl=1800)
def get_minutes_by_id(minutes_id: str):
...
"""
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Redis 캐시 인스턴스 확인
cache = getattr(self, '_cache', None)
if not cache or not cache.client:
return func(self, *args, **kwargs)
# 캐시 키 생성
if key_builder:
cache_key = key_builder(*args, **kwargs)
else:
# 기본: 첫 번째 인자를 키로 사용
cache_key = f"{prefix}{args[0] if args else ''}"
# 캐시 조회
cached_value = cache.get(cache_key)
if cached_value is not None:
return cached_value
# 함수 실행
result = func(self, *args, **kwargs)
# 결과 캐싱
if result is not None:
cache.set(cache_key, result, ttl)
return result
return wrapper
return decorator

47
rag/start_all.sh Normal file
View File

@ -0,0 +1,47 @@
#!/bin/bash
# RAG 서비스 - API 서버와 Event Hub Consumer 동시 실행 스크립트
set -e # 에러 발생 시 스크립트 종료
echo "=========================================="
echo "RAG 서비스 시작"
echo "=========================================="
# 로그 디렉토리 생성
mkdir -p logs
# Event Hub Consumer를 백그라운드로 실행
echo "[1/2] Event Hub Consumer 시작..."
python start_consumer.py > logs/consumer.log 2>&1 &
CONSUMER_PID=$!
echo "Consumer PID: $CONSUMER_PID"
# API 서버 시작 (포그라운드)
echo "[2/2] REST API 서버 시작..."
python -m uvicorn src.api.main:app --host 0.0.0.0 --port 8000 &
API_PID=$!
echo "API Server PID: $API_PID"
# PID 파일 저장
echo $CONSUMER_PID > logs/consumer.pid
echo $API_PID > logs/api.pid
echo "=========================================="
echo "RAG 서비스 시작 완료"
echo " - API Server: http://0.0.0.0:8000"
echo " - Consumer PID: $CONSUMER_PID"
echo " - API PID: $API_PID"
echo "=========================================="
# 종료 시그널 처리 (graceful shutdown)
trap "echo 'Shutting down...'; kill $CONSUMER_PID $API_PID; exit 0" SIGTERM SIGINT
# 두 프로세스 모두 실행 중인지 모니터링
while kill -0 $CONSUMER_PID 2>/dev/null && kill -0 $API_PID 2>/dev/null; do
sleep 5
done
# 하나라도 종료되면 모두 종료
echo "One of the processes stopped. Shutting down all..."
kill $CONSUMER_PID $API_PID 2>/dev/null || true
wait

180
rag/start_all_services.py Normal file
View File

@ -0,0 +1,180 @@
"""
RAG 서비스 통합 실행 스크립트
API 서버와 Event Hub Consumer를 동시에 실행
"""
import asyncio
import logging
import multiprocessing
import signal
import sys
import time
from pathlib import Path
import uvicorn
from src.utils.config import load_config, get_database_url
from src.db.rag_minutes_db import RagMinutesDB
from src.db.postgres_vector import PostgresVectorDB
from src.utils.embedding import EmbeddingGenerator
from src.services.eventhub_consumer import start_consumer
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def run_api_server():
"""
REST API 서버 실행 (별도 프로세스)
"""
try:
logger.info("=" * 50)
logger.info("REST API 서버 시작")
logger.info("=" * 50)
uvicorn.run(
"src.api.main:app",
host="0.0.0.0",
port=8000,
log_level="info",
access_log=True
)
except Exception as e:
logger.error(f"API 서버 실행 실패: {str(e)}")
sys.exit(1)
def run_event_consumer():
"""
Event Hub Consumer 실행 (별도 프로세스)
"""
try:
logger.info("=" * 50)
logger.info("Event Hub Consumer 시작")
logger.info("=" * 50)
# 설정 로드
config_path = Path(__file__).parent / "config.yaml"
config = load_config(str(config_path))
# 데이터베이스 연결
db_url = get_database_url(config)
rag_minutes_db = RagMinutesDB(db_url)
logger.info("RAG Minutes DB 연결 완료")
# 용어집 데이터베이스 연결
term_db = PostgresVectorDB(db_url)
logger.info("용어집 DB 연결 완료")
# Embedding 생성기 초기화
azure_openai = config["azure_openai"]
embedding_gen = EmbeddingGenerator(
api_key=azure_openai["api_key"],
endpoint=azure_openai["endpoint"],
model=azure_openai["embedding_model"],
dimension=azure_openai["embedding_dimension"],
api_version=azure_openai["api_version"]
)
logger.info("Embedding 생성기 초기화 완료")
# Event Hub Consumer 시작
asyncio.run(start_consumer(config, rag_minutes_db, embedding_gen, term_db))
except KeyboardInterrupt:
logger.info("Consumer 종료 신호 수신")
except Exception as e:
logger.error(f"Consumer 실행 실패: {str(e)}")
sys.exit(1)
def main():
"""
메인 함수: 프로세스를 생성하고 관리
"""
logger.info("=" * 60)
logger.info("RAG 서비스 통합 시작")
logger.info(" - REST API 서버: http://0.0.0.0:8000")
logger.info(" - Event Hub Consumer: Background")
logger.info("=" * 60)
# 프로세스 생성
api_process = multiprocessing.Process(
target=run_api_server,
name="API-Server"
)
consumer_process = multiprocessing.Process(
target=run_event_consumer,
name="Event-Consumer"
)
# 종료 시그널 핸들러
def signal_handler(signum, frame):
logger.info("\n종료 신호 수신. 프로세스 종료 중...")
if api_process.is_alive():
logger.info("API 서버 종료 중...")
api_process.terminate()
api_process.join(timeout=5)
if api_process.is_alive():
api_process.kill()
if consumer_process.is_alive():
logger.info("Consumer 종료 중...")
consumer_process.terminate()
consumer_process.join(timeout=5)
if consumer_process.is_alive():
consumer_process.kill()
logger.info("모든 프로세스 종료 완료")
sys.exit(0)
# 시그널 핸들러 등록
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
# 프로세스 시작
api_process.start()
time.sleep(2) # API 서버 시작 대기
consumer_process.start()
time.sleep(2) # Consumer 시작 대기
logger.info("=" * 60)
logger.info("모든 서비스 시작 완료")
logger.info(f" - API Server PID: {api_process.pid}")
logger.info(f" - Consumer PID: {consumer_process.pid}")
logger.info("=" * 60)
# 프로세스 모니터링
while True:
if not api_process.is_alive():
logger.error("API 서버 프로세스 종료됨")
consumer_process.terminate()
break
if not consumer_process.is_alive():
logger.error("Consumer 프로세스 종료됨")
api_process.terminate()
break
time.sleep(5)
# 대기
api_process.join()
consumer_process.join()
except Exception as e:
logger.error(f"서비스 실행 중 에러: {str(e)}")
api_process.terminate()
consumer_process.terminate()
sys.exit(1)
if __name__ == "__main__":
# multiprocessing을 위한 설정
multiprocessing.set_start_method('spawn', force=True)
main()

View File

@ -4,6 +4,7 @@ from pathlib import Path
from src.utils.config import load_config, get_database_url from src.utils.config import load_config, get_database_url
from src.db.rag_minutes_db import RagMinutesDB from src.db.rag_minutes_db import RagMinutesDB
from src.db.postgres_vector import PostgresVectorDB
from src.utils.embedding import EmbeddingGenerator from src.utils.embedding import EmbeddingGenerator
from src.services.eventhub_consumer import start_consumer from src.services.eventhub_consumer import start_consumer
@ -28,8 +29,11 @@ async def main():
# 데이터베이스 연결 # 데이터베이스 연결
db_url = get_database_url(config) db_url = get_database_url(config)
rag_minutes_db = RagMinutesDB(db_url) rag_minutes_db = RagMinutesDB(db_url)
logger.info("RAG Minutes DB 연결 완료")
logger.info("데이터베이스 연결 완료") # 용어집 데이터베이스 연결
term_db = PostgresVectorDB(db_url)
logger.info("용어집 DB 연결 완료")
# Embedding 생성기 초기화 # Embedding 생성기 초기화
azure_openai = config["azure_openai"] azure_openai = config["azure_openai"]
@ -45,7 +49,7 @@ async def main():
# Event Hub Consumer 시작 # Event Hub Consumer 시작
logger.info("Event Hub Consumer 시작...") logger.info("Event Hub Consumer 시작...")
await start_consumer(config, rag_minutes_db, embedding_gen) await start_consumer(config, rag_minutes_db, embedding_gen, term_db)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("프로그램 종료") logger.info("프로그램 종료")