mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 19:36:23 +00:00
495 lines
12 KiB
Markdown
495 lines
12 KiB
Markdown
# Consumer Group 생성 및 할당 가이드
|
|
|
|
## 목차
|
|
1. [Consumer Group이란?](#consumer-group이란)
|
|
2. [생성 방법](#생성-방법)
|
|
3. [코드에서 사용하기](#코드에서-사용하기)
|
|
4. [검증 방법](#검증-방법)
|
|
5. [트러블슈팅](#트러블슈팅)
|
|
|
|
---
|
|
|
|
## Consumer Group이란?
|
|
|
|
Consumer Group은 Event Hub의 이벤트를 **독립적으로 읽는 논리적 그룹**입니다.
|
|
|
|
### 주요 특징
|
|
- 각 Consumer Group은 독립적인 checkpoint를 유지
|
|
- 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음
|
|
- 기본 Consumer Group: `$Default` (자동 생성됨)
|
|
|
|
### 사용 사례
|
|
| Consumer Group | 용도 | 설명 |
|
|
|---------------|------|------|
|
|
| `$Default` | 프로덕션 | 실제 운영 환경에서 사용 |
|
|
| `development` | 개발 | 개발자가 로컬에서 테스트 |
|
|
| `test` | 테스트 | QA 테스트 환경 |
|
|
| `analytics` | 분석 | 데이터 분석 및 모니터링 |
|
|
| `backup` | 백업 | 이벤트 백업 및 아카이빙 |
|
|
|
|
---
|
|
|
|
## 생성 방법
|
|
|
|
### 방법 1: Azure Portal (가장 쉬움) ⭐️
|
|
|
|
#### 단계별 가이드
|
|
|
|
**Step 1: Azure Portal 접속**
|
|
```
|
|
https://portal.azure.com
|
|
```
|
|
|
|
**Step 2: Event Hub Namespace 찾기**
|
|
1. 상단 검색창에 "hgzero-eventhub-ns" 입력
|
|
2. "Event Hubs Namespaces" 아래의 결과 클릭
|
|
|
|
**Step 3: Event Hub 선택**
|
|
1. 왼쪽 메뉴에서 "Event Hubs" 클릭
|
|
2. "hgzero-eventhub-name" 선택
|
|
|
|
**Step 4: Consumer Groups 관리**
|
|
1. 왼쪽 메뉴에서 "Consumer groups" 클릭
|
|
2. 현재 목록에 `$Default`만 있을 것임
|
|
|
|
**Step 5: 새 Consumer Group 생성**
|
|
1. 상단의 "+ Consumer group" 버튼 클릭
|
|
2. Name 입력:
|
|
- 개발용: `development`
|
|
- 테스트용: `test`
|
|
- 분석용: `analytics`
|
|
3. "Create" 버튼 클릭
|
|
|
|
**Step 6: 생성 확인**
|
|
- 목록에 새로운 Consumer Group이 추가되었는지 확인
|
|
- 예: `$Default`, `development`, `test`
|
|
|
|
---
|
|
|
|
### 방법 2: Azure CLI
|
|
|
|
#### 사전 요구사항
|
|
```bash
|
|
# Azure CLI 설치 확인
|
|
az --version
|
|
|
|
# 로그인
|
|
az login
|
|
```
|
|
|
|
#### Consumer Group 생성
|
|
```bash
|
|
# 리소스 그룹 확인 (필요 시)
|
|
az eventhubs namespace show \
|
|
--name hgzero-eventhub-ns \
|
|
--query resourceGroup -o tsv
|
|
|
|
# Consumer Group 생성
|
|
az eventhubs eventhub consumer-group create \
|
|
--resource-group <YOUR_RESOURCE_GROUP> \
|
|
--namespace-name hgzero-eventhub-ns \
|
|
--eventhub-name hgzero-eventhub-name \
|
|
--name development
|
|
|
|
# 생성 확인
|
|
az eventhubs eventhub consumer-group list \
|
|
--resource-group <YOUR_RESOURCE_GROUP> \
|
|
--namespace-name hgzero-eventhub-ns \
|
|
--eventhub-name hgzero-eventhub-name \
|
|
--output table
|
|
```
|
|
|
|
#### 출력 예시
|
|
```
|
|
Name ResourceGroup
|
|
----------- ---------------
|
|
$Default hgzero-rg
|
|
development hgzero-rg
|
|
test hgzero-rg
|
|
```
|
|
|
|
---
|
|
|
|
### 방법 3: Python Management SDK
|
|
|
|
#### 설치
|
|
```bash
|
|
pip install azure-mgmt-eventhub azure-identity
|
|
```
|
|
|
|
#### 코드
|
|
```python
|
|
from azure.mgmt.eventhub import EventHubManagementClient
|
|
from azure.identity import DefaultAzureCredential
|
|
|
|
# 인증
|
|
credential = DefaultAzureCredential()
|
|
subscription_id = "<YOUR_SUBSCRIPTION_ID>"
|
|
|
|
# 관리 클라이언트 생성
|
|
mgmt_client = EventHubManagementClient(credential, subscription_id)
|
|
|
|
# Consumer Group 생성
|
|
mgmt_client.consumer_groups.create_or_update(
|
|
resource_group_name='<YOUR_RESOURCE_GROUP>',
|
|
namespace_name='hgzero-eventhub-ns',
|
|
event_hub_name='hgzero-eventhub-name',
|
|
consumer_group_name='development',
|
|
parameters={} # 추가 설정 가능
|
|
)
|
|
|
|
print("✅ Consumer Group 'development' 생성 완료")
|
|
|
|
# Consumer Group 목록 조회
|
|
consumer_groups = mgmt_client.consumer_groups.list_by_event_hub(
|
|
resource_group_name='<YOUR_RESOURCE_GROUP>',
|
|
namespace_name='hgzero-eventhub-ns',
|
|
event_hub_name='hgzero-eventhub-name'
|
|
)
|
|
|
|
print("\n현재 Consumer Groups:")
|
|
for cg in consumer_groups:
|
|
print(f" - {cg.name}")
|
|
```
|
|
|
|
---
|
|
|
|
## 코드에서 사용하기
|
|
|
|
### 1. config.yaml 수정
|
|
|
|
#### 기존 설정
|
|
```yaml
|
|
eventhub:
|
|
connection_string: ${EVENTHUB_CONNECTION_STRING}
|
|
name: ${EVENTHUB_NAME}
|
|
consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default"
|
|
storage:
|
|
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
|
|
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
|
|
```
|
|
|
|
#### 개발 환경용 설정
|
|
```yaml
|
|
eventhub:
|
|
connection_string: ${EVENTHUB_CONNECTION_STRING}
|
|
name: ${EVENTHUB_NAME}
|
|
consumer_group: "development" # 직접 지정
|
|
storage:
|
|
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
|
|
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
|
|
```
|
|
|
|
### 2. .env 파일 수정 (선택사항)
|
|
|
|
#### 환경 변수로 관리하는 경우
|
|
```bash
|
|
# .env
|
|
EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
|
|
EVENTHUB_NAME=hgzero-eventhub-name
|
|
AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경
|
|
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..."
|
|
AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints
|
|
```
|
|
|
|
### 3. Consumer 실행
|
|
|
|
#### 개발 환경
|
|
```bash
|
|
# config.yaml의 consumer_group을 "development"로 설정
|
|
cd /Users/daewoong/home/workspace/HGZero/rag
|
|
python start_consumer.py
|
|
```
|
|
|
|
#### 프로덕션 환경
|
|
```bash
|
|
# config.yaml의 consumer_group을 "$Default"로 설정
|
|
python start_consumer.py
|
|
```
|
|
|
|
#### 여러 환경 동시 실행
|
|
```bash
|
|
# Terminal 1: 프로덕션 Consumer
|
|
AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py
|
|
|
|
# Terminal 2: 개발 Consumer
|
|
AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py
|
|
|
|
# Terminal 3: 테스트 Consumer
|
|
AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py
|
|
```
|
|
|
|
---
|
|
|
|
## 검증 방법
|
|
|
|
### 1. Consumer Group 목록 확인
|
|
|
|
#### Python으로 확인
|
|
```python
|
|
import asyncio
|
|
from pathlib import Path
|
|
from azure.eventhub.aio import EventHubConsumerClient
|
|
from src.utils.config import load_config
|
|
|
|
async def list_consumer_groups():
|
|
"""사용 가능한 Consumer Group 확인"""
|
|
config_path = Path('config.yaml')
|
|
config = load_config(str(config_path))
|
|
|
|
eventhub_config = config['eventhub']
|
|
|
|
# 여기서는 실제로 Management API를 사용해야 하지만
|
|
# Consumer Client로는 자신이 속한 그룹만 확인 가능
|
|
|
|
print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}")
|
|
|
|
asyncio.run(list_consumer_groups())
|
|
```
|
|
|
|
### 2. 다른 Consumer Group으로 테스트
|
|
|
|
```python
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from azure.eventhub.aio import EventHubConsumerClient
|
|
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
|
|
from src.utils.config import load_config
|
|
|
|
async def test_consumer_group(consumer_group_name):
|
|
"""특정 Consumer Group으로 이벤트 수신 테스트"""
|
|
config_path = Path('config.yaml')
|
|
config = load_config(str(config_path))
|
|
|
|
eventhub_config = config['eventhub']
|
|
|
|
# Checkpoint Store
|
|
checkpoint_store = BlobCheckpointStore.from_connection_string(
|
|
eventhub_config['storage']['connection_string'],
|
|
eventhub_config['storage']['container_name']
|
|
)
|
|
|
|
# Consumer Client (Consumer Group 지정)
|
|
client = EventHubConsumerClient.from_connection_string(
|
|
eventhub_config['connection_string'],
|
|
consumer_group=consumer_group_name, # 여기서 지정!
|
|
eventhub_name=eventhub_config['name'],
|
|
checkpoint_store=checkpoint_store
|
|
)
|
|
|
|
print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...")
|
|
|
|
event_count = 0
|
|
|
|
async def on_event(partition_context, event):
|
|
nonlocal event_count
|
|
event_count += 1
|
|
print(f"이벤트 수신: #{event_count}")
|
|
await partition_context.update_checkpoint(event)
|
|
|
|
async def on_error(partition_context, error):
|
|
print(f"에러: {error}")
|
|
|
|
try:
|
|
async with client:
|
|
receive_task = asyncio.create_task(
|
|
client.receive(
|
|
on_event=on_event,
|
|
on_error=on_error,
|
|
starting_position="@earliest"
|
|
)
|
|
)
|
|
|
|
await asyncio.sleep(10) # 10초 대기
|
|
receive_task.cancel()
|
|
|
|
try:
|
|
await receive_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신")
|
|
|
|
except Exception as e:
|
|
print(f"❌ 에러 발생: {str(e)}")
|
|
|
|
# 테스트 실행
|
|
asyncio.run(test_consumer_group("development"))
|
|
```
|
|
|
|
### 3. Checkpoint 저장 위치 확인
|
|
|
|
각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다:
|
|
|
|
```
|
|
hgzero-checkpoints (Container)
|
|
├─ $Default/
|
|
│ ├─ ownership/
|
|
│ │ └─ 0 (파티션 0의 소유권 정보)
|
|
│ └─ checkpoint/
|
|
│ └─ 0 (파티션 0의 checkpoint)
|
|
├─ development/
|
|
│ ├─ ownership/
|
|
│ │ └─ 0
|
|
│ └─ checkpoint/
|
|
│ └─ 0
|
|
└─ test/
|
|
├─ ownership/
|
|
│ └─ 0
|
|
└─ checkpoint/
|
|
└─ 0
|
|
```
|
|
|
|
---
|
|
|
|
## 트러블슈팅
|
|
|
|
### 문제 1: Consumer Group을 찾을 수 없음
|
|
|
|
**증상**
|
|
```
|
|
azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found.
|
|
```
|
|
|
|
**원인**: Consumer Group이 실제로 생성되지 않음
|
|
|
|
**해결**:
|
|
1. Azure Portal에서 Consumer Group 목록 확인
|
|
2. 없으면 생성
|
|
3. 이름 오타 확인 (대소문자 구분)
|
|
|
|
### 문제 2: Ownership claim 실패
|
|
|
|
**증상**
|
|
```
|
|
EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming.
|
|
```
|
|
|
|
**원인**:
|
|
- 동일 Consumer Group에 이미 다른 Consumer가 실행 중
|
|
- 파티션보다 Consumer가 많음
|
|
|
|
**해결**:
|
|
1. 기존 Consumer 종료
|
|
2. 다른 Consumer Group 사용
|
|
3. 파티션 수 증가
|
|
|
|
### 문제 3: 이벤트를 읽지 못함
|
|
|
|
**증상**: Consumer는 실행되지만 이벤트가 수신되지 않음
|
|
|
|
**원인**:
|
|
- Starting position이 최신으로 설정됨
|
|
- 새 이벤트가 없음
|
|
|
|
**해결**:
|
|
```python
|
|
# config.yaml 또는 코드에서
|
|
await client.receive(
|
|
on_event=on_event,
|
|
on_error=on_error,
|
|
starting_position="@earliest" # 처음부터 읽기
|
|
)
|
|
```
|
|
|
|
### 문제 4: Checkpoint가 초기화되지 않음
|
|
|
|
**증상**: 새 Consumer Group인데 이미 읽은 것처럼 동작
|
|
|
|
**원인**: Blob Storage에 이전 checkpoint가 남아있음
|
|
|
|
**해결**:
|
|
```bash
|
|
# Azure Portal에서
|
|
# Storage Account > Containers > hgzero-checkpoints
|
|
# 해당 Consumer Group 폴더 삭제
|
|
```
|
|
|
|
또는 코드로:
|
|
```python
|
|
from azure.storage.blob import BlobServiceClient
|
|
|
|
blob_service = BlobServiceClient.from_connection_string(
|
|
AZURE_STORAGE_CONNECTION_STRING
|
|
)
|
|
container = blob_service.get_container_client("hgzero-checkpoints")
|
|
|
|
# 특정 Consumer Group의 checkpoint 삭제
|
|
prefix = "development/"
|
|
blobs = container.list_blobs(name_starts_with=prefix)
|
|
for blob in blobs:
|
|
container.delete_blob(blob.name)
|
|
print(f"삭제: {blob.name}")
|
|
```
|
|
|
|
---
|
|
|
|
## 권장 설정
|
|
|
|
### 환경별 Consumer Group 전략
|
|
|
|
```yaml
|
|
# 프로덕션
|
|
production:
|
|
consumer_group: "$Default"
|
|
starting_position: "-1" # 최신 이벤트부터
|
|
checkpoint_interval: 30 # 30초마다 checkpoint
|
|
|
|
# 개발
|
|
development:
|
|
consumer_group: "development"
|
|
starting_position: "@earliest" # 처음부터
|
|
checkpoint_interval: 10 # 자주 checkpoint (테스트용)
|
|
|
|
# 테스트
|
|
test:
|
|
consumer_group: "test"
|
|
starting_position: "@earliest"
|
|
checkpoint_interval: 5 # 매우 자주 (빠른 테스트)
|
|
|
|
# 분석
|
|
analytics:
|
|
consumer_group: "analytics"
|
|
starting_position: "@earliest" # 모든 데이터 분석
|
|
checkpoint_interval: 60 # 덜 자주 (성능)
|
|
```
|
|
|
|
### 실행 스크립트 예시
|
|
|
|
```bash
|
|
#!/bin/bash
|
|
# start_dev_consumer.sh
|
|
|
|
export AZURE_EVENTHUB_CONSUMER_GROUP=development
|
|
cd /Users/daewoong/home/workspace/HGZero/rag
|
|
python start_consumer.py
|
|
```
|
|
|
|
---
|
|
|
|
## 요약
|
|
|
|
### 빠른 시작 체크리스트
|
|
|
|
- [ ] Azure Portal에서 Consumer Group 생성
|
|
- [ ] 이름: `development`
|
|
- [ ] Event Hub: `hgzero-eventhub-name`
|
|
- [ ] `config.yaml` 수정
|
|
- [ ] `consumer_group: "development"` 설정
|
|
- [ ] Consumer 실행
|
|
- [ ] `python start_consumer.py`
|
|
- [ ] 검증
|
|
- [ ] 이벤트 수신 확인
|
|
- [ ] Blob Storage에 checkpoint 생성 확인
|
|
|
|
### Consumer Group 활용 팁
|
|
|
|
1. **프로덕션과 개발 분리**: 항상 다른 Consumer Group 사용
|
|
2. **테스트는 독립적으로**: `test` Consumer Group으로 자유롭게 실험
|
|
3. **Checkpoint 관리**: 필요시 삭제하여 처음부터 재처리
|
|
4. **모니터링**: 각 Consumer Group별 lag 모니터링
|
|
5. **비용 최적화**: 불필요한 Consumer Group은 삭제
|
|
|
|
이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯
|