mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 12:36:23 +00:00
12 KiB
12 KiB
Consumer Group 생성 및 할당 가이드
목차
Consumer Group이란?
Consumer Group은 Event Hub의 이벤트를 독립적으로 읽는 논리적 그룹입니다.
주요 특징
- 각 Consumer Group은 독립적인 checkpoint를 유지
- 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음
- 기본 Consumer Group:
$Default(자동 생성됨)
사용 사례
| Consumer Group | 용도 | 설명 |
|---|---|---|
$Default |
프로덕션 | 실제 운영 환경에서 사용 |
development |
개발 | 개발자가 로컬에서 테스트 |
test |
테스트 | QA 테스트 환경 |
analytics |
분석 | 데이터 분석 및 모니터링 |
backup |
백업 | 이벤트 백업 및 아카이빙 |
생성 방법
방법 1: Azure Portal (가장 쉬움) ⭐️
단계별 가이드
Step 1: Azure Portal 접속
https://portal.azure.com
Step 2: Event Hub Namespace 찾기
- 상단 검색창에 "hgzero-eventhub-ns" 입력
- "Event Hubs Namespaces" 아래의 결과 클릭
Step 3: Event Hub 선택
- 왼쪽 메뉴에서 "Event Hubs" 클릭
- "hgzero-eventhub-name" 선택
Step 4: Consumer Groups 관리
- 왼쪽 메뉴에서 "Consumer groups" 클릭
- 현재 목록에
$Default만 있을 것임
Step 5: 새 Consumer Group 생성
- 상단의 "+ Consumer group" 버튼 클릭
- Name 입력:
- 개발용:
development - 테스트용:
test - 분석용:
analytics
- 개발용:
- "Create" 버튼 클릭
Step 6: 생성 확인
- 목록에 새로운 Consumer Group이 추가되었는지 확인
- 예:
$Default,development,test
방법 2: Azure CLI
사전 요구사항
# Azure CLI 설치 확인
az --version
# 로그인
az login
Consumer Group 생성
# 리소스 그룹 확인 (필요 시)
az eventhubs namespace show \
--name hgzero-eventhub-ns \
--query resourceGroup -o tsv
# Consumer Group 생성
az eventhubs eventhub consumer-group create \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--name development
# 생성 확인
az eventhubs eventhub consumer-group list \
--resource-group <YOUR_RESOURCE_GROUP> \
--namespace-name hgzero-eventhub-ns \
--eventhub-name hgzero-eventhub-name \
--output table
출력 예시
Name ResourceGroup
----------- ---------------
$Default hgzero-rg
development hgzero-rg
test hgzero-rg
방법 3: Python Management SDK
설치
pip install azure-mgmt-eventhub azure-identity
코드
from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential
# 인증
credential = DefaultAzureCredential()
subscription_id = "<YOUR_SUBSCRIPTION_ID>"
# 관리 클라이언트 생성
mgmt_client = EventHubManagementClient(credential, subscription_id)
# Consumer Group 생성
mgmt_client.consumer_groups.create_or_update(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name',
consumer_group_name='development',
parameters={} # 추가 설정 가능
)
print("✅ Consumer Group 'development' 생성 완료")
# Consumer Group 목록 조회
consumer_groups = mgmt_client.consumer_groups.list_by_event_hub(
resource_group_name='<YOUR_RESOURCE_GROUP>',
namespace_name='hgzero-eventhub-ns',
event_hub_name='hgzero-eventhub-name'
)
print("\n현재 Consumer Groups:")
for cg in consumer_groups:
print(f" - {cg.name}")
코드에서 사용하기
1. config.yaml 수정
기존 설정
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default"
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
개발 환경용 설정
eventhub:
connection_string: ${EVENTHUB_CONNECTION_STRING}
name: ${EVENTHUB_NAME}
consumer_group: "development" # 직접 지정
storage:
connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
container_name: ${AZURE_STORAGE_CONTAINER_NAME}
2. .env 파일 수정 (선택사항)
환경 변수로 관리하는 경우
# .env
EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
EVENTHUB_NAME=hgzero-eventhub-name
AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..."
AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints
3. Consumer 실행
개발 환경
# config.yaml의 consumer_group을 "development"로 설정
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
프로덕션 환경
# config.yaml의 consumer_group을 "$Default"로 설정
python start_consumer.py
여러 환경 동시 실행
# Terminal 1: 프로덕션 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py
# Terminal 2: 개발 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py
# Terminal 3: 테스트 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py
검증 방법
1. Consumer Group 목록 확인
Python으로 확인
import asyncio
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from src.utils.config import load_config
async def list_consumer_groups():
"""사용 가능한 Consumer Group 확인"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# 여기서는 실제로 Management API를 사용해야 하지만
# Consumer Client로는 자신이 속한 그룹만 확인 가능
print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}")
asyncio.run(list_consumer_groups())
2. 다른 Consumer Group으로 테스트
import asyncio
import json
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from src.utils.config import load_config
async def test_consumer_group(consumer_group_name):
"""특정 Consumer Group으로 이벤트 수신 테스트"""
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
# Checkpoint Store
checkpoint_store = BlobCheckpointStore.from_connection_string(
eventhub_config['storage']['connection_string'],
eventhub_config['storage']['container_name']
)
# Consumer Client (Consumer Group 지정)
client = EventHubConsumerClient.from_connection_string(
eventhub_config['connection_string'],
consumer_group=consumer_group_name, # 여기서 지정!
eventhub_name=eventhub_config['name'],
checkpoint_store=checkpoint_store
)
print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...")
event_count = 0
async def on_event(partition_context, event):
nonlocal event_count
event_count += 1
print(f"이벤트 수신: #{event_count}")
await partition_context.update_checkpoint(event)
async def on_error(partition_context, error):
print(f"에러: {error}")
try:
async with client:
receive_task = asyncio.create_task(
client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest"
)
)
await asyncio.sleep(10) # 10초 대기
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신")
except Exception as e:
print(f"❌ 에러 발생: {str(e)}")
# 테스트 실행
asyncio.run(test_consumer_group("development"))
3. Checkpoint 저장 위치 확인
각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다:
hgzero-checkpoints (Container)
├─ $Default/
│ ├─ ownership/
│ │ └─ 0 (파티션 0의 소유권 정보)
│ └─ checkpoint/
│ └─ 0 (파티션 0의 checkpoint)
├─ development/
│ ├─ ownership/
│ │ └─ 0
│ └─ checkpoint/
│ └─ 0
└─ test/
├─ ownership/
│ └─ 0
└─ checkpoint/
└─ 0
트러블슈팅
문제 1: Consumer Group을 찾을 수 없음
증상
azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found.
원인: Consumer Group이 실제로 생성되지 않음
해결:
- Azure Portal에서 Consumer Group 목록 확인
- 없으면 생성
- 이름 오타 확인 (대소문자 구분)
문제 2: Ownership claim 실패
증상
EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming.
원인:
- 동일 Consumer Group에 이미 다른 Consumer가 실행 중
- 파티션보다 Consumer가 많음
해결:
- 기존 Consumer 종료
- 다른 Consumer Group 사용
- 파티션 수 증가
문제 3: 이벤트를 읽지 못함
증상: Consumer는 실행되지만 이벤트가 수신되지 않음
원인:
- Starting position이 최신으로 설정됨
- 새 이벤트가 없음
해결:
# config.yaml 또는 코드에서
await client.receive(
on_event=on_event,
on_error=on_error,
starting_position="@earliest" # 처음부터 읽기
)
문제 4: Checkpoint가 초기화되지 않음
증상: 새 Consumer Group인데 이미 읽은 것처럼 동작
원인: Blob Storage에 이전 checkpoint가 남아있음
해결:
# Azure Portal에서
# Storage Account > Containers > hgzero-checkpoints
# 해당 Consumer Group 폴더 삭제
또는 코드로:
from azure.storage.blob import BlobServiceClient
blob_service = BlobServiceClient.from_connection_string(
AZURE_STORAGE_CONNECTION_STRING
)
container = blob_service.get_container_client("hgzero-checkpoints")
# 특정 Consumer Group의 checkpoint 삭제
prefix = "development/"
blobs = container.list_blobs(name_starts_with=prefix)
for blob in blobs:
container.delete_blob(blob.name)
print(f"삭제: {blob.name}")
권장 설정
환경별 Consumer Group 전략
# 프로덕션
production:
consumer_group: "$Default"
starting_position: "-1" # 최신 이벤트부터
checkpoint_interval: 30 # 30초마다 checkpoint
# 개발
development:
consumer_group: "development"
starting_position: "@earliest" # 처음부터
checkpoint_interval: 10 # 자주 checkpoint (테스트용)
# 테스트
test:
consumer_group: "test"
starting_position: "@earliest"
checkpoint_interval: 5 # 매우 자주 (빠른 테스트)
# 분석
analytics:
consumer_group: "analytics"
starting_position: "@earliest" # 모든 데이터 분석
checkpoint_interval: 60 # 덜 자주 (성능)
실행 스크립트 예시
#!/bin/bash
# start_dev_consumer.sh
export AZURE_EVENTHUB_CONSUMER_GROUP=development
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
요약
빠른 시작 체크리스트
- Azure Portal에서 Consumer Group 생성
- 이름:
development - Event Hub:
hgzero-eventhub-name
- 이름:
config.yaml수정consumer_group: "development"설정
- Consumer 실행
python start_consumer.py
- 검증
- 이벤트 수신 확인
- Blob Storage에 checkpoint 생성 확인
Consumer Group 활용 팁
- 프로덕션과 개발 분리: 항상 다른 Consumer Group 사용
- 테스트는 독립적으로:
testConsumer Group으로 자유롭게 실험 - Checkpoint 관리: 필요시 삭제하여 처음부터 재처리
- 모니터링: 각 Consumer Group별 lag 모니터링
- 비용 최적화: 불필요한 Consumer Group은 삭제
이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯