feat: rag 서비스 Event Hub 연동 및 연관 회의록 API 추가

This commit is contained in:
djeon
2025-10-29 15:29:40 +09:00
parent 5859b1c498
commit ad7975efbd
20 changed files with 2855 additions and 22 deletions
+291
View File
@@ -0,0 +1,291 @@
# 연관 회의록 조회 API 구현 문서
## 📋 개요
**준호** (Backend Developer)
회의록 ID를 기준으로 유사한 연관 회의록을 조회하는 API를 구현했습니다.
## 🎯 구현 방식: Option A (DB 조회 후 벡터 검색)
### 선택 이유
-**정확도 우선**: full_content 기반 검색으로 높은 정확도 보장
-**확장성**: 향후 full_content 기반 추가 기능 확장 용이
-**성능 최적화**: Redis 캐싱으로 DB 조회 오버헤드 최소화
### 처리 흐름
```
1. 요청 수신
2. Redis 캐시 조회 (연관 회의록 결과)
├─ HIT → 즉시 반환
└─ MISS → 3단계로 진행
3. Redis 캐시 조회 (회의록 full_content)
├─ HIT → 5단계로 진행
└─ MISS → 4단계로 진행
4. DB 조회 (rag_minutes 테이블)
├─ full_content 획득
└─ Redis에 캐싱 (TTL: 30분)
5. 벡터 임베딩 생성 (Azure OpenAI)
6. 벡터 유사도 검색 (자기 자신 제외)
7. 결과 Redis 캐싱 (TTL: 1시간)
8. 응답 반환
```
## 🔧 구현 상세
### 1. 데이터 모델 추가
**파일**: `rag/src/models/minutes.py`
```python
class RelatedMinutesRequest(BaseModel):
"""연관 회의록 조회 요청"""
minute_id: str # 기준 회의록 ID
meeting_title: str # 회의 제목 (현재 미사용)
summary: str # 회의록 요약 (현재 미사용)
top_k: int = 5 # 반환할 최대 결과 수
similarity_threshold: float = 0.7 # 최소 유사도 임계값
class RelatedMinutesResponse(BaseModel):
"""연관 회의록 조회 응답"""
minutes: RagMinutes # 회의록 정보
similarity_score: float # 유사도 점수 (0.0 ~ 1.0)
```
### 2. DB 쿼리 함수 개선
**파일**: `rag/src/db/rag_minutes_db.py`
**수정 내용**: `search_by_vector()` 함수에 `exclude_minutes_id` 파라미터 추가
```python
def search_by_vector(
self,
query_embedding: List[float],
top_k: int = 5,
similarity_threshold: float = 0.7,
exclude_minutes_id: Optional[str] = None # 자기 자신 제외
) -> List[Dict[str, Any]]:
"""벡터 유사도 검색"""
# WHERE 절에 minutes_id != %s 조건 추가
# 자기 자신을 결과에서 제외
```
### 3. Redis 캐싱 유틸리티 구현
**파일**: `rag/src/utils/redis_cache.py`
**주요 기능**:
- `get(key)`: 캐시 조회
- `set(key, value, ttl)`: 캐시 저장
- `delete(key)`: 캐시 삭제
- `delete_pattern(pattern)`: 패턴 매칭 일괄 삭제
**특징**:
- Redis 연결 실패 시 자동으로 캐싱 비활성화 (서비스 장애 방지)
- JSON 직렬화/역직렬화 자동 처리
### 4. 설정 추가
**파일**: `rag/config.yaml`
```yaml
rag_minutes:
search:
top_k: 5
similarity_threshold: 0.7
cache:
ttl: 1800 # 회의록 조회 결과: 30분
prefix: "minutes:"
related_ttl: 3600 # 연관 회의록 검색 결과: 1시간
```
### 5. API 엔드포인트 구현
**파일**: `rag/src/api/main.py`
**엔드포인트**: `POST /api/minutes/related`
**Request Body**:
```json
{
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립을 위한 회의",
"top_k": 5,
"similarity_threshold": 0.7
}
```
**Response**:
```json
[
{
"minutes": {
"meeting_id": "MTG-2024-050",
"title": "2024 Q4 마케팅 성과 분석",
"minutes_id": "MIN-2024-050",
"full_content": "...",
...
},
"similarity_score": 0.85
},
...
]
```
## ⚡ 성능 최적화
### 2단계 캐싱 전략
1. **회의록 조회 결과 캐싱** (TTL: 30분)
- 키: `minutes:{minute_id}`
- DB 조회 횟수 감소
2. **연관 회의록 검색 결과 캐싱** (TTL: 1시간)
- 키: `minutes:related:{minute_id}:{top_k}:{threshold}`
- 벡터 검색 및 임베딩 생성 비용 절감
### 예상 성능 개선
| 케이스 | 처리 시간 | 비고 |
|--------|----------|------|
| 캐시 MISS (최초 요청) | ~2-3초 | DB 조회 + 임베딩 생성 + 벡터 검색 |
| 회의록 캐시 HIT | ~1-2초 | 임베딩 생성 + 벡터 검색 |
| 연관 회의록 캐시 HIT | ~50ms | 캐시에서 즉시 반환 |
## 🔍 사용 예시
### Python (requests)
```python
import requests
url = "http://localhost:8000/api/minutes/related"
data = {
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립",
"top_k": 5,
"similarity_threshold": 0.7
}
response = requests.post(url, json=data)
related_minutes = response.json()
for item in related_minutes:
print(f"제목: {item['minutes']['title']}")
print(f"유사도: {item['similarity_score']:.2f}")
print("---")
```
### curl
```bash
curl -X POST "http://localhost:8000/api/minutes/related" \
-H "Content-Type: application/json" \
-d '{
"minute_id": "MIN-2025-001",
"meeting_title": "2025 Q1 마케팅 전략 회의",
"summary": "2025년 1분기 마케팅 전략 수립",
"top_k": 5,
"similarity_threshold": 0.7
}'
```
## 🧪 테스트 시나리오
### 1. 정상 케이스
- ✅ 존재하는 minute_id로 요청
- ✅ 유사한 회의록 5개 반환
- ✅ 자기 자신은 결과에서 제외
### 2. 캐싱 동작 확인
- ✅ 최초 요청: 캐시 MISS → DB 조회
- ✅ 2번째 요청: 캐시 HIT → 즉시 반환
- ✅ TTL 경과 후: 캐시 MISS → 재조회
### 3. 에러 케이스
- ❌ 존재하지 않는 minute_id → 404 오류
- ❌ 잘못된 요청 형식 → 422 오류
## 📊 데이터베이스 영향
### 쿼리 패턴
```sql
-- 1. 회의록 조회 (캐시 MISS 시)
SELECT * FROM rag_minutes WHERE minutes_id = 'MIN-2025-001';
-- 2. 벡터 유사도 검색 (캐시 MISS 시)
SELECT *,
1 - (embedding <=> '{vector}'::vector) as similarity_score
FROM rag_minutes
WHERE embedding IS NOT NULL
AND 1 - (embedding <=> '{vector}'::vector) >= 0.7
AND minutes_id != 'MIN-2025-001' -- 자기 자신 제외
ORDER BY embedding <=> '{vector}'::vector
LIMIT 5;
```
### 인덱스 활용
- `minutes_id`: Primary Key 인덱스 활용
- `embedding`: pgvector 인덱스 활용 (IVFFlat 또는 HNSW)
## 🔐 보안 고려사항
1. **SQL Injection 방지**: psycopg2 파라미터 바인딩 사용
2. **Rate Limiting**: 향후 추가 권장 (분당 요청 수 제한)
3. **인증/인가**: 향후 JWT 토큰 기반 인증 추가 권장
## 🚀 향후 개선 사항
### 1. Hybrid 검색 지원 (Option B 통합)
- summary를 활용한 빠른 1차 검색
- full_content로 정밀한 2차 검색
- 적응형 임계값 조정
### 2. 성능 모니터링
- 캐시 히트율 추적
- API 응답 시간 측정
- 벡터 검색 성능 분석
### 3. 검색 품질 개선
- 시간 가중치 적용 (최근 회의록 우선)
- 부서/팀별 필터링 옵션
- 주제별 카테고리 분류
## 📝 파일 변경 목록
| 파일 | 변경 유형 | 설명 |
|------|----------|------|
| `rag/src/models/minutes.py` | 수정 | Request/Response 모델 추가 |
| `rag/src/db/rag_minutes_db.py` | 수정 | exclude_minutes_id 파라미터 추가 |
| `rag/src/utils/redis_cache.py` | 신규 | Redis 캐싱 유틸리티 |
| `rag/src/api/main.py` | 수정 | API 엔드포인트 추가 |
| `rag/config.yaml` | 수정 | rag_minutes 설정 추가 |
## ✅ 구현 완료 체크리스트
- [x] Request/Response 모델 정의
- [x] DB 쿼리 함수 개선 (자기 자신 제외)
- [x] Redis 캐싱 유틸리티 구현
- [x] API 엔드포인트 구현
- [x] 2단계 캐싱 전략 적용
- [x] 설정 파일 업데이트
- [x] 문서 작성
---
**작성자**: 준호 (Backend Developer)
**작성일**: 2025-10-29
**버전**: 1.0.0
+356
View File
@@ -0,0 +1,356 @@
# Event Hub Basic Tier 제약사항 및 해결 방법
## 현재 상황
```
Event Hub Namespace: hgzero-eventhub-ns
Resource Group: rg-digitalgarage-02
Location: koreacentral
Tier: Basic ⚠️
Status: Active
```
## Basic Tier 제약사항
| 기능 | Basic | Standard | Premium |
|------|-------|----------|---------|
| **Consumer Groups** | **1개만** ($Default) | **최대 20개** | 최대 100개 |
| 파티션 | 최대 32개 | 최대 32개 | 최대 100개 |
| 메시지 보존 | 1일 | 7일 | 90일 |
| Capture | ❌ | ✅ | ✅ |
| 가격 | 낮음 | 중간 | 높음 |
**Basic Tier에서는 `$Default` Consumer Group만 사용할 수 있습니다.**
---
## 해결 방법
### 방법 1: Tier 업그레이드 (권장) ⭐️
Event Hub를 Basic → Standard로 업그레이드하면 최대 20개의 Consumer Group 사용 가능
#### 업그레이드 방법
**Azure Portal:**
1. Event Hub Namespace (hgzero-eventhub-ns) 페이지로 이동
2. 왼쪽 메뉴에서 "Pricing tier" 클릭
3. "Standard" 선택
4. "Apply" 버튼 클릭
5. 약 1-2분 후 적용 완료
**Azure CLI:**
```bash
az eventhubs namespace update \
--resource-group rg-digitalgarage-02 \
--name hgzero-eventhub-ns \
--sku Standard \
--capacity 1
```
**비용:**
- Basic: ~$0.015/시간 (~$11/월)
- Standard: ~$0.03/시간 (~$22/월)
- 차이: 약 $11/월 추가
**장점:**
- ✅ 여러 Consumer Group 사용 가능
- ✅ 메시지 보존 기간 7일로 증가
- ✅ Capture 기능 사용 가능
- ✅ 영구적 해결책
**단점:**
- ❌ 비용 증가 (약 2배)
- ❌ 관리자 승인 필요할 수 있음
---
### 방법 2: $Default Consumer Group 공유 사용 (임시)
개발/테스트 시 기존 프로덕션 Consumer를 잠시 중지하고 테스트
#### 실행 순서
**Step 1: 프로덕션 Consumer 중지**
```bash
# 기존 Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# PID 확인 후 종료
kill <PID>
# 예시
kill 51257
```
**Step 2: 테스트 Consumer 실행**
```bash
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
**Step 3: 테스트 완료 후 프로덕션 재시작**
```bash
# Ctrl+C로 테스트 Consumer 종료
# 프로덕션 Consumer 재시작
python start_consumer.py &
```
**장점:**
- ✅ 추가 비용 없음
- ✅ 즉시 테스트 가능
- ✅ 권한 불필요
**단점:**
- ❌ 프로덕션 서비스 중단
- ❌ 동시 실행 불가
- ❌ 임시 방법
**주의사항:**
- ⚠️ 프로덕션 환경에서만 사용 중이라면 비추천
- ⚠️ 업무 시간 외에만 테스트
- ⚠️ 테스트 후 반드시 프로덕션 재시작 확인
---
### 방법 3: Checkpoint 위치 변경으로 독립 실행
동일한 Consumer Group을 사용하되, checkpoint 저장 위치를 다르게 하여 독립적으로 실행
#### 구현 방법
**Step 1: 별도 Storage Container 생성**
Azure Portal:
1. Storage Account (hgzerostorage) 접속
2. "Containers" 클릭
3. "+ Container" 클릭
4. 이름: `hgzero-checkpoints-dev`
5. Create
**Step 2: config_dev.yaml 생성**
```yaml
# config_dev.yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: "$Default" # 동일
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: "hgzero-checkpoints-dev" # 다른 컨테이너!
# 나머지는 config.yaml과 동일
```
**Step 3: 개발용 Consumer 실행 스크립트**
```python
# start_dev_consumer.py
import asyncio
from pathlib import Path
from src.utils.config import load_config
from src.db.rag_minutes_db import RagMinutesDB
from src.db.postgres_vector import PostgresVectorDB
from src.utils.embedding import EmbeddingGenerator
from src.services.eventhub_consumer import start_consumer
async def main():
# 개발용 설정 파일 로드
config_path = Path(__file__).parent / "config_dev.yaml"
config = load_config(str(config_path))
# ... 나머지는 start_consumer.py와 동일
if __name__ == "__main__":
asyncio.run(main())
```
**Step 4: 실행**
```bash
# Terminal 1: 프로덕션 Consumer
python start_consumer.py
# Terminal 2: 개발 Consumer (다른 checkpoint)
python start_dev_consumer.py
```
**장점:**
- ✅ 동시 실행 가능
- ✅ 독립적인 checkpoint
- ✅ 추가 비용 거의 없음 (Storage만)
- ✅ Tier 업그레이드 불필요
**단점:**
- ❌ 동일 파티션을 두 Consumer가 읽으려 하면 ownership 경쟁
- ❌ 한쪽만 이벤트 수신 (ownership을 가진 쪽)
- ❌ 완전한 독립 실행은 아님
**결론:**
- 이 방법은 **checkpoint만 독립**이고, **여전히 ownership 경쟁** 발생
- **동시 실행은 불가능**
- 권장하지 않음 ❌
---
### 방법 4: 로컬 개발 환경에서만 테스트
Event Hub 대신 로컬 메시지 큐 사용
#### 옵션 A: Azure Event Hub Emulator (권장)
현재는 공식 Emulator가 없으므로, Kafka나 RabbitMQ로 대체
#### 옵션 B: 메모리 큐로 테스트
```python
# test_consumer_local.py
import asyncio
import json
from queue import Queue
# 로컬 메모리 큐
local_queue = Queue()
async def test_event_processing():
"""로컬에서 이벤트 처리 로직만 테스트"""
# 테스트 이벤트 생성
test_event = {
'eventType': 'MINUTES_FINALIZED',
'data': {
'meetingId': 'test-001',
'title': '테스트 회의',
'minutesId': 'minutes-001',
'sections': []
}
}
# Consumer의 _process_minutes_event 로직만 테스트
from src.services.eventhub_consumer import EventHubConsumer
# 실제 처리 로직 테스트
# ...
asyncio.run(test_event_processing())
```
**장점:**
- ✅ 완전 독립 실행
- ✅ 무료
- ✅ 빠른 테스트
**단점:**
- ❌ 실제 Event Hub와 다른 환경
- ❌ 통합 테스트 불가
---
## 권장 솔루션
### 개발 단계별 권장 방법
| 단계 | 권장 방법 | 이유 |
|------|----------|------|
| **로컬 개발** | 방법 4 (메모리 큐) | 빠르고 비용 없음 |
| **통합 테스트** | 방법 2 ($Default 공유) | 실제 환경, 단기간 |
| **지속적 개발** | 방법 1 (Tier 업그레이드) | 영구 해결, 동시 실행 |
| **프로덕션** | 방법 1 (Standard Tier) | 고가용성, 확장성 |
### 최종 권장: Tier 업그레이드
**이유:**
1. ✅ 여러 Consumer Group으로 개발/테스트 분리
2. ✅ 메시지 보존 기간 증가 (1일 → 7일)
3. ✅ 향후 확장 가능 (Capture 등)
4. ✅ 프로덕션 안정성 향상
**비용 대비 효과:**
- 월 $11 추가로 개발 생산성 향상
- 프로덕션 중단 없이 테스트 가능
- 장기적으로 더 경제적
---
## 실행 가이드
### 즉시 테스트가 필요한 경우 (방법 2)
```bash
# 1. 프로덕션 Consumer 중지
ps aux | grep "start_consumer.py" | grep -v grep
kill <PID>
# 2. 테스트 실행
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
# 3. 테스트 완료 후
# Ctrl+C로 종료
# 4. 프로덕션 재시작
python start_consumer.py &
```
### Tier 업그레이드 후 (방법 1)
```bash
# 1. Tier 업그레이드 (Azure Portal 또는 CLI)
az eventhubs namespace update \
--resource-group rg-digitalgarage-02 \
--name hgzero-eventhub-ns \
--sku Standard
# 2. Consumer Group 생성
az eventhubs eventhub consumer-group create \
--resource-group rg-digitalgarage-02 \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--name development
# 3. config.yaml 수정
# consumer_group: "development"
# 4. 프로덕션은 계속 실행, 개발 Consumer 별도 실행
python start_consumer.py
```
---
## 요약
### 현재 상황
- ❌ Event Hub Tier: **Basic**
- ❌ Consumer Group: **$Default만 사용 가능**
- ❌ 추가 Consumer Group 생성 불가
### 해결책
1. **단기**: 프로덕션 Consumer 잠시 중지하고 테스트 (방법 2)
2. **장기**: Standard Tier로 업그레이드 (방법 1) ⭐️
### 다음 액션
관리자에게 **Tier 업그레이드** 요청하거나,
긴급하다면 **프로덕션 중지 후 테스트** 진행
---
## 참고: Tier 비교
```
Basic Tier (현재)
├─ Consumer Groups: 1개 ($Default만)
├─ 보존 기간: 1일
├─ 비용: ~$11/월
└─ ❌ 개발/테스트 분리 불가
Standard Tier (권장)
├─ Consumer Groups: 최대 20개 ✅
├─ 보존 기간: 7일
├─ 비용: ~$22/월 (+$11)
└─ ✅ 개발/테스트 완전 분리
```
Standard Tier로 업그레이드하는 것을 강력히 권장합니다! 🎯
+367
View File
@@ -0,0 +1,367 @@
# Event Hub Active Consumers 조회 가이드
## 개요
Event Hub를 현재 읽고 있는 Consumer(host)를 확인하는 방법을 설명합니다.
## 방법 1: 제공된 스크립트 사용 (가장 쉬움) ⭐️
### 실행
```bash
cd /Users/daewoong/home/workspace/HGZero/rag
python check_active_consumers.py
```
### 출력 정보
```
📊 Event Hub Active Consumers
================================================================================
Consumer Group: $Default
Container: hgzero-checkpoints
🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)
--------------------------------------------------------------------------------
파티션 0:
상태: 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95...
마지막 갱신: 2025-10-29 12:35:42 UTC
경과 시간: 15초 전
📋 Consumer 요약
--------------------------------------------------------------------------------
Consumer #1 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95
소유 파티션: 0
파티션 개수: 1
마지막 활동: 2025-10-29 12:35:42 UTC
📍 Checkpoint 정보 (마지막 읽은 위치)
--------------------------------------------------------------------------------
파티션 0:
Offset: 120259090624
Sequence Number: 232
```
---
## 방법 2: Python 코드로 직접 조회
### 스크립트 예시
```python
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config
async def check_consumers():
"""활성 Consumer 조회"""
# 설정 로드
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
storage_conn_str = eventhub_config['storage']['connection_string']
container_name = eventhub_config['storage']['container_name']
consumer_group = eventhub_config['consumer_group']
# Blob Service Client
blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
container = blob_service.get_container_client(container_name)
# Ownership 정보 조회
ownership_prefix = f"{consumer_group}/ownership/"
print(f"Consumer Group: {consumer_group}\n")
for blob in container.list_blobs(name_starts_with=ownership_prefix):
blob_client = container.get_blob_client(blob.name)
content = blob_client.download_blob().readall()
ownership_data = json.loads(content)
partition_id = blob.name.split('/')[-1]
owner_id = ownership_data.get('ownerIdentifier', 'unknown')
# 활성 상태 확인 (60초 이내 갱신)
now = datetime.now(timezone.utc)
time_diff = (now - blob.last_modified).total_seconds()
is_active = time_diff < 60
status = "🟢 ACTIVE" if is_active else "🔴 INACTIVE"
print(f"파티션 {partition_id}: {status}")
print(f" Owner: {owner_id}")
print(f" 갱신: {int(time_diff)}초 전\n")
asyncio.run(check_consumers())
```
---
## 방법 3: Azure Portal에서 확인
### 단계
1. **Azure Portal** 접속 (https://portal.azure.com)
2. **Storage Account** 이동
- "hgzerostorage" 검색
3. **Containers** 클릭
- "hgzero-checkpoints" 선택
4. **폴더 구조 확인**
```
$Default/
├── ownership/
│ └── 0 ← 파티션 0의 소유권 정보
└── checkpoint/
└── 0 ← 파티션 0의 checkpoint
```
5. **ownership/0 파일 다운로드** 또는 **View** 클릭
6. **JSON 내용 확인**
```json
{
"ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
"lastModifiedTime": "2025-10-29T12:35:42Z",
...
}
```
### 정보 해석
- **ownerIdentifier**: 현재 Consumer의 고유 ID
- **lastModifiedTime**: 마지막 lease 갱신 시간
- 60초 이내: 🟢 활성 상태
- 60초 이상: 🔴 비활성 (Consumer 종료됨)
---
## 방법 4: Azure CLI로 조회
### Blob 목록 확인
```bash
# Storage Account 키 조회
az storage account keys list \
--resource-group rg-digitalgarage-02 \
--account-name hgzerostorage \
--query "[0].value" -o tsv
# Blob 목록 조회
az storage blob list \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--prefix "\$Default/ownership/" \
--output table
```
### Blob 내용 다운로드
```bash
# ownership 정보 다운로드
az storage blob download \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--name "\$Default/ownership/0" \
--file /tmp/ownership.json
# 내용 확인
cat /tmp/ownership.json | python3 -m json.tool
```
---
## 방법 5: 프로세스 레벨에서 확인
### 로컬 머신에서 실행 중인 Consumer
```bash
# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# 출력:
# daewoong 81447 0.0 0.2 python start_consumer.py
```
### 네트워크 연결 확인
```bash
# Event Hub와의 연결 확인
lsof -i -n | grep 81447 | grep ESTABLISHED
# 출력:
# python3.1 81447 daewoong 9u IPv6 TCP ... -> ...servicebus.windows.net:https (ESTABLISHED)
```
**의미:**
- Consumer 프로세스가 실행 중
- Event Hub와 HTTPS 연결 유지 중
- 이벤트 수신 대기 중
---
## 주요 개념
### Ownership (소유권)
```
파티션 0
Ownership Blob (Blob Storage)
{
"ownerIdentifier": "consumer-uuid",
"lastModifiedTime": "2025-10-29T12:35:42Z",
"eTag": "...",
...
}
Lease 메커니즘 (30초마다 갱신)
갱신 실패 시 → 다른 Consumer가 인수 가능
```
### Checkpoint (읽은 위치)
```
파티션 0
Checkpoint Blob
{
"offset": "120259090624",
"sequenceNumber": 232,
...
}
Consumer 재시작 시 → 이 위치부터 재개
```
### Consumer 상태 판단
| 조건 | 상태 | 의미 |
|------|------|------|
| lastModifiedTime < 60초 | 🟢 ACTIVE | 정상 동작 중 |
| 60초 < lastModifiedTime < 180초 | 🟡 WARNING | Lease 갱신 지연 |
| lastModifiedTime > 180초 | 🔴 INACTIVE | Consumer 종료 |
---
## 트러블슈팅
### 문제 1: Ownership 정보가 없음
**증상:**
```
⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.
```
**원인:**
- Consumer가 한 번도 실행되지 않음
- Consumer가 ownership claim에 실패함
**해결:**
1. Consumer 프로세스 확인: `ps aux | grep start_consumer`
2. Consumer 로그 확인
3. 권한 확인 (Storage Account 접근 권한)
### 문제 2: 여러 Owner ID가 나타남
**증상:**
```
Consumer #1: owner-id-1 (파티션 0, 2, 4)
Consumer #2: owner-id-2 (파티션 1, 3, 5)
```
**원인:**
- 정상: 여러 Consumer가 파티션을 나눠서 처리
- 파티션이 여러 개이고 Consumer도 여러 개
**확인:**
- 각 파티션이 하나의 Consumer만 소유하면 정상
- 동일 파티션을 여러 Consumer가 소유하면 문제
### 문제 3: lastModified가 오래됨
**증상:**
```
파티션 0:
상태: 🔴 INACTIVE
마지막 갱신: 2025-10-29 10:00:00 UTC
경과 시간: 7200초 전 (2시간)
```
**원인:**
- Consumer가 종료됨
- Consumer가 응답 없음 (hang)
- 네트워크 문제
**해결:**
1. Consumer 프로세스 확인
2. Consumer 재시작
3. 로그 확인하여 에러 파악
---
## 실시간 모니터링
### 스크립트: 주기적 확인
```bash
#!/bin/bash
# monitor_consumers.sh
while true; do
clear
echo "=== $(date) ==="
python check_active_consumers.py
sleep 30 # 30초마다 갱신
done
```
### 실행
```bash
chmod +x monitor_consumers.sh
./monitor_consumers.sh
```
---
## 요약
### 빠른 확인 방법
```bash
# 1. 스크립트 실행 (가장 쉬움)
python check_active_consumers.py
# 2. 프로세스 확인
ps aux | grep start_consumer
# 3. Azure Portal에서 Blob 확인
# Storage Account > Containers > hgzero-checkpoints > $Default/ownership
```
### 확인 항목 체크리스트
- [ ] Consumer 프로세스 실행 중인가?
- [ ] Ownership 정보가 있는가?
- [ ] lastModified가 60초 이내인가?
- [ ] 각 파티션이 하나의 Consumer만 소유하는가?
- [ ] Checkpoint가 갱신되고 있는가?
모든 항목이 체크되면 Consumer가 정상 동작 중입니다! ✅
---
## 참고 파일
- 스크립트: `/Users/daewoong/home/workspace/HGZero/rag/check_active_consumers.py`
- 설정: `/Users/daewoong/home/workspace/HGZero/rag/config.yaml`
- Blob Storage: `hgzerostorage/hgzero-checkpoints`
+494
View File
@@ -0,0 +1,494 @@
# Consumer Group 생성 및 할당 가이드
## 목차
1. [Consumer Group이란?](#consumer-group이란)
2. [생성 방법](#생성-방법)
3. [코드에서 사용하기](#코드에서-사용하기)
4. [검증 방법](#검증-방법)
5. [트러블슈팅](#트러블슈팅)
---
## Consumer Group이란?
Consumer Group은 Event Hub의 이벤트를 **독립적으로 읽는 논리적 그룹**입니다.
### 주요 특징
- 각 Consumer Group은 독립적인 checkpoint를 유지
- 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음
- 기본 Consumer Group: `$Default` (자동 생성됨)
### 사용 사례
| Consumer Group | 용도 | 설명 |
|---------------|------|------|
| `$Default` | 프로덕션 | 실제 운영 환경에서 사용 |
| `development` | 개발 | 개발자가 로컬에서 테스트 |
| `test` | 테스트 | QA 테스트 환경 |
| `analytics` | 분석 | 데이터 분석 및 모니터링 |
| `backup` | 백업 | 이벤트 백업 및 아카이빙 |
---
## 생성 방법
### 방법 1: Azure Portal (가장 쉬움) ⭐️
#### 단계별 가이드
**Step 1: Azure Portal 접속**
```
https://portal.azure.com
```
**Step 2: Event Hub Namespace 찾기**
1. 상단 검색창에 "hgzero-eventhub-ns" 입력
2. "Event Hubs Namespaces" 아래의 결과 클릭
**Step 3: Event Hub 선택**
1. 왼쪽 메뉴에서 "Event Hubs" 클릭
2. "hgzero-eventhub-name" 선택
**Step 4: Consumer Groups 관리**
1. 왼쪽 메뉴에서 "Consumer groups" 클릭
2. 현재 목록에 `$Default`만 있을 것임
**Step 5: 새 Consumer Group 생성**
1. 상단의 "+ Consumer group" 버튼 클릭
2. Name 입력:
- 개발용: `development`
- 테스트용: `test`
- 분석용: `analytics`
3. "Create" 버튼 클릭
**Step 6: 생성 확인**
- 목록에 새로운 Consumer Group이 추가되었는지 확인
- 예: `$Default`, `development`, `test`
---
### 방법 2: Azure CLI
#### 사전 요구사항
```bash
# Azure CLI 설치 확인
az --version
# 로그인
az login
```
#### Consumer Group 생성
```bash
# 리소스 그룹 확인 (필요 시)
az eventhubs namespace show \
--name hgzero-eventhub-ns \
--query resourceGroup -o tsv
# Consumer Group 생성
az eventhubs eventhub consumer-group create \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--name development
# 생성 확인
az eventhubs eventhub consumer-group list \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--output table
```
#### 출력 예시
```
Name ResourceGroup
----------- ---------------
$Default hgzero-rg
development hgzero-rg
test hgzero-rg
```
---
### 방법 3: Python Management SDK
#### 설치
```bash
pip install azure-mgmt-eventhub azure-identity
```
#### 코드
```python
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential
# 인증
credential = DefaultAzureCredential()
subscription_id = "<YOUR_SUBSCRIPTION_ID>"
# 관리 클라이언트 생성
mgmt_client = EventHubManagementClient(credential, subscription_id)
# Consumer Group 생성
mgmt_client.consumer_groups.create_or_update(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name',
consumer_group_name='development',
parameters={} # 추가 설정 가능
)
print("✅ Consumer Group 'development' 생성 완료")
# Consumer Group 목록 조회
consumer_groups = mgmt_client.consumer_groups.list_by_event_hub(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name'
)
print("\n현재 Consumer Groups:")
for cg in consumer_groups:
print(f" - {cg.name}")
```
---
## 코드에서 사용하기
### 1. config.yaml 수정
#### 기존 설정
```yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default"
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
```
#### 개발 환경용 설정
```yaml
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: "development" # 직접 지정
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
```
### 2. .env 파일 수정 (선택사항)
#### 환경 변수로 관리하는 경우
```bash
# .env
EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
EVENTHUB_NAME=hgzero-eventhub-name
AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..."
AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints
```
### 3. Consumer 실행
#### 개발 환경
```bash
# config.yaml의 consumer_group을 "development"로 설정
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
#### 프로덕션 환경
```bash
# config.yaml의 consumer_group을 "$Default"로 설정
python start_consumer.py
```
#### 여러 환경 동시 실행
```bash
# Terminal 1: 프로덕션 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py
# Terminal 2: 개발 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py
# Terminal 3: 테스트 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py
```
---
## 검증 방법
### 1. Consumer Group 목록 확인
#### Python으로 확인
```python
import asyncio
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from src.utils.config import load_config
async def list_consumer_groups():
"""사용 가능한 Consumer Group 확인"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# 여기서는 실제로 Management API를 사용해야 하지만
# Consumer Client로는 자신이 속한 그룹만 확인 가능
print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}")
asyncio.run(list_consumer_groups())
```
### 2. 다른 Consumer Group으로 테스트
```python
import asyncio
import json
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from src.utils.config import load_config
async def test_consumer_group(consumer_group_name):
"""특정 Consumer Group으로 이벤트 수신 테스트"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# Checkpoint Store
checkpoint_store = BlobCheckpointStore.from_connection_string(
eventhub_config['storage']['connection_string'],
eventhub_config['storage']['container_name']
)
# Consumer Client (Consumer Group 지정)
client = EventHubConsumerClient.from_connection_string(
eventhub_config['connection_string'],
consumer_group=consumer_group_name, # 여기서 지정!
eventhub_name=eventhub_config['name'],
checkpoint_store=checkpoint_store
)
print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...")
event_count = 0
async def on_event(partition_context, event):
nonlocal event_count
event_count += 1
print(f"이벤트 수신: #{event_count}")
await partition_context.update_checkpoint(event)
async def on_error(partition_context, error):
print(f"에러: {error}")
try:
async with client:
receive_task = asyncio.create_task(
client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest"
)
)
await asyncio.sleep(10) # 10초 대기
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신")
except Exception as e:
print(f"❌ 에러 발생: {str(e)}")
# 테스트 실행
asyncio.run(test_consumer_group("development"))
```
### 3. Checkpoint 저장 위치 확인
각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다:
```
hgzero-checkpoints (Container)
├─ $Default/
│ ├─ ownership/
│ │ └─ 0 (파티션 0의 소유권 정보)
│ └─ checkpoint/
│ └─ 0 (파티션 0의 checkpoint)
├─ development/
│ ├─ ownership/
│ │ └─ 0
│ └─ checkpoint/
│ └─ 0
└─ test/
├─ ownership/
│ └─ 0
└─ checkpoint/
└─ 0
```
---
## 트러블슈팅
### 문제 1: Consumer Group을 찾을 수 없음
**증상**
```
azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found.
```
**원인**: Consumer Group이 실제로 생성되지 않음
**해결**:
1. Azure Portal에서 Consumer Group 목록 확인
2. 없으면 생성
3. 이름 오타 확인 (대소문자 구분)
### 문제 2: Ownership claim 실패
**증상**
```
EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming.
```
**원인**:
- 동일 Consumer Group에 이미 다른 Consumer가 실행 중
- 파티션보다 Consumer가 많음
**해결**:
1. 기존 Consumer 종료
2. 다른 Consumer Group 사용
3. 파티션 수 증가
### 문제 3: 이벤트를 읽지 못함
**증상**: Consumer는 실행되지만 이벤트가 수신되지 않음
**원인**:
- Starting position이 최신으로 설정됨
- 새 이벤트가 없음
**해결**:
```python
# config.yaml 또는 코드에서
await client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest" # 처음부터 읽기
)
```
### 문제 4: Checkpoint가 초기화되지 않음
**증상**: 새 Consumer Group인데 이미 읽은 것처럼 동작
**원인**: Blob Storage에 이전 checkpoint가 남아있음
**해결**:
```bash
# Azure Portal에서
# Storage Account > Containers > hgzero-checkpoints
# 해당 Consumer Group 폴더 삭제
```
또는 코드로:
```python
from azure.storage.blob import BlobServiceClient
blob_service = BlobServiceClient.from_connection_string(
AZURE_STORAGE_CONNECTION_STRING
)
container = blob_service.get_container_client("hgzero-checkpoints")
# 특정 Consumer Group의 checkpoint 삭제
prefix = "development/"
blobs = container.list_blobs(name_starts_with=prefix)
for blob in blobs:
container.delete_blob(blob.name)
print(f"삭제: {blob.name}")
```
---
## 권장 설정
### 환경별 Consumer Group 전략
```yaml
# 프로덕션
production:
consumer_group: "$Default"
starting_position: "-1" # 최신 이벤트부터
checkpoint_interval: 30 # 30초마다 checkpoint
# 개발
development:
consumer_group: "development"
starting_position: "@earliest" # 처음부터
checkpoint_interval: 10 # 자주 checkpoint (테스트용)
# 테스트
test:
consumer_group: "test"
starting_position: "@earliest"
checkpoint_interval: 5 # 매우 자주 (빠른 테스트)
# 분석
analytics:
consumer_group: "analytics"
starting_position: "@earliest" # 모든 데이터 분석
checkpoint_interval: 60 # 덜 자주 (성능)
```
### 실행 스크립트 예시
```bash
#!/bin/bash
# start_dev_consumer.sh
export AZURE_EVENTHUB_CONSUMER_GROUP=development
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
```
---
## 요약
### 빠른 시작 체크리스트
- [ ] Azure Portal에서 Consumer Group 생성
- [ ] 이름: `development`
- [ ] Event Hub: `hgzero-eventhub-name`
- [ ] `config.yaml` 수정
- [ ] `consumer_group: "development"` 설정
- [ ] Consumer 실행
- [ ] `python start_consumer.py`
- [ ] 검증
- [ ] 이벤트 수신 확인
- [ ] Blob Storage에 checkpoint 생성 확인
### Consumer Group 활용 팁
1. **프로덕션과 개발 분리**: 항상 다른 Consumer Group 사용
2. **테스트는 독립적으로**: `test` Consumer Group으로 자유롭게 실험
3. **Checkpoint 관리**: 필요시 삭제하여 처음부터 재처리
4. **모니터링**: 각 Consumer Group별 lag 모니터링
5. **비용 최적화**: 불필요한 Consumer Group은 삭제
이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯