# Consumer Group 생성 및 할당 가이드 ## 목차 1. [Consumer Group이란?](#consumer-group이란) 2. [생성 방법](#생성-방법) 3. [코드에서 사용하기](#코드에서-사용하기) 4. [검증 방법](#검증-방법) 5. [트러블슈팅](#트러블슈팅) --- ## Consumer Group이란? Consumer Group은 Event Hub의 이벤트를 **독립적으로 읽는 논리적 그룹**입니다. ### 주요 특징 - 각 Consumer Group은 독립적인 checkpoint를 유지 - 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음 - 기본 Consumer Group: `$Default` (자동 생성됨) ### 사용 사례 | Consumer Group | 용도 | 설명 | |---------------|------|------| | `$Default` | 프로덕션 | 실제 운영 환경에서 사용 | | `development` | 개발 | 개발자가 로컬에서 테스트 | | `test` | 테스트 | QA 테스트 환경 | | `analytics` | 분석 | 데이터 분석 및 모니터링 | | `backup` | 백업 | 이벤트 백업 및 아카이빙 | --- ## 생성 방법 ### 방법 1: Azure Portal (가장 쉬움) ⭐️ #### 단계별 가이드 **Step 1: Azure Portal 접속** ``` https://portal.azure.com ``` **Step 2: Event Hub Namespace 찾기** 1. 상단 검색창에 "hgzero-eventhub-ns" 입력 2. "Event Hubs Namespaces" 아래의 결과 클릭 **Step 3: Event Hub 선택** 1. 왼쪽 메뉴에서 "Event Hubs" 클릭 2. "hgzero-eventhub-name" 선택 **Step 4: Consumer Groups 관리** 1. 왼쪽 메뉴에서 "Consumer groups" 클릭 2. 현재 목록에 `$Default`만 있을 것임 **Step 5: 새 Consumer Group 생성** 1. 상단의 "+ Consumer group" 버튼 클릭 2. Name 입력: - 개발용: `development` - 테스트용: `test` - 분석용: `analytics` 3. "Create" 버튼 클릭 **Step 6: 생성 확인** - 목록에 새로운 Consumer Group이 추가되었는지 확인 - 예: `$Default`, `development`, `test` --- ### 방법 2: Azure CLI #### 사전 요구사항 ```bash # Azure CLI 설치 확인 az --version # 로그인 az login ``` #### Consumer Group 생성 ```bash # 리소스 그룹 확인 (필요 시) az eventhubs namespace show \ --name hgzero-eventhub-ns \ --query resourceGroup -o tsv # Consumer Group 생성 az eventhubs eventhub consumer-group create \ --resource-group \ --namespace-name hgzero-eventhub-ns \ --eventhub-name hgzero-eventhub-name \ --name development # 생성 확인 az eventhubs eventhub consumer-group list \ --resource-group \ --namespace-name hgzero-eventhub-ns \ --eventhub-name hgzero-eventhub-name \ --output table ``` #### 출력 예시 ``` Name ResourceGroup ----------- --------------- $Default hgzero-rg development hgzero-rg test hgzero-rg ``` --- ### 방법 3: Python Management SDK #### 설치 ```bash pip install azure-mgmt-eventhub azure-identity ``` #### 코드 ```python from azure.mgmt.eventhub import EventHubManagementClient from azure.identity import DefaultAzureCredential # 인증 credential = DefaultAzureCredential() subscription_id = "" # 관리 클라이언트 생성 mgmt_client = EventHubManagementClient(credential, subscription_id) # Consumer Group 생성 mgmt_client.consumer_groups.create_or_update( resource_group_name='', namespace_name='hgzero-eventhub-ns', event_hub_name='hgzero-eventhub-name', consumer_group_name='development', parameters={} # 추가 설정 가능 ) print("✅ Consumer Group 'development' 생성 완료") # Consumer Group 목록 조회 consumer_groups = mgmt_client.consumer_groups.list_by_event_hub( resource_group_name='', namespace_name='hgzero-eventhub-ns', event_hub_name='hgzero-eventhub-name' ) print("\n현재 Consumer Groups:") for cg in consumer_groups: print(f" - {cg.name}") ``` --- ## 코드에서 사용하기 ### 1. config.yaml 수정 #### 기존 설정 ```yaml eventhub: connection_string: ${EVENTHUB_CONNECTION_STRING} name: ${EVENTHUB_NAME} consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP} # "$Default" storage: connection_string: ${AZURE_STORAGE_CONNECTION_STRING} container_name: ${AZURE_STORAGE_CONTAINER_NAME} ``` #### 개발 환경용 설정 ```yaml eventhub: connection_string: ${EVENTHUB_CONNECTION_STRING} name: ${EVENTHUB_NAME} consumer_group: "development" # 직접 지정 storage: connection_string: ${AZURE_STORAGE_CONNECTION_STRING} container_name: ${AZURE_STORAGE_CONTAINER_NAME} ``` ### 2. .env 파일 수정 (선택사항) #### 환경 변수로 관리하는 경우 ```bash # .env EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..." EVENTHUB_NAME=hgzero-eventhub-name AZURE_EVENTHUB_CONSUMER_GROUP=development # 변경 AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..." AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints ``` ### 3. Consumer 실행 #### 개발 환경 ```bash # config.yaml의 consumer_group을 "development"로 설정 cd /Users/daewoong/home/workspace/HGZero/rag python start_consumer.py ``` #### 프로덕션 환경 ```bash # config.yaml의 consumer_group을 "$Default"로 설정 python start_consumer.py ``` #### 여러 환경 동시 실행 ```bash # Terminal 1: 프로덕션 Consumer AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py # Terminal 2: 개발 Consumer AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py # Terminal 3: 테스트 Consumer AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py ``` --- ## 검증 방법 ### 1. Consumer Group 목록 확인 #### Python으로 확인 ```python import asyncio from pathlib import Path from azure.eventhub.aio import EventHubConsumerClient from src.utils.config import load_config async def list_consumer_groups(): """사용 가능한 Consumer Group 확인""" config_path = Path('config.yaml') config = load_config(str(config_path)) eventhub_config = config['eventhub'] # 여기서는 실제로 Management API를 사용해야 하지만 # Consumer Client로는 자신이 속한 그룹만 확인 가능 print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}") asyncio.run(list_consumer_groups()) ``` ### 2. 다른 Consumer Group으로 테스트 ```python import asyncio import json from pathlib import Path from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore from src.utils.config import load_config async def test_consumer_group(consumer_group_name): """특정 Consumer Group으로 이벤트 수신 테스트""" config_path = Path('config.yaml') config = load_config(str(config_path)) eventhub_config = config['eventhub'] # Checkpoint Store checkpoint_store = BlobCheckpointStore.from_connection_string( eventhub_config['storage']['connection_string'], eventhub_config['storage']['container_name'] ) # Consumer Client (Consumer Group 지정) client = EventHubConsumerClient.from_connection_string( eventhub_config['connection_string'], consumer_group=consumer_group_name, # 여기서 지정! eventhub_name=eventhub_config['name'], checkpoint_store=checkpoint_store ) print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...") event_count = 0 async def on_event(partition_context, event): nonlocal event_count event_count += 1 print(f"이벤트 수신: #{event_count}") await partition_context.update_checkpoint(event) async def on_error(partition_context, error): print(f"에러: {error}") try: async with client: receive_task = asyncio.create_task( client.receive( on_event=on_event, on_error=on_error, starting_position="@earliest" ) ) await asyncio.sleep(10) # 10초 대기 receive_task.cancel() try: await receive_task except asyncio.CancelledError: pass print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신") except Exception as e: print(f"❌ 에러 발생: {str(e)}") # 테스트 실행 asyncio.run(test_consumer_group("development")) ``` ### 3. Checkpoint 저장 위치 확인 각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다: ``` hgzero-checkpoints (Container) ├─ $Default/ │ ├─ ownership/ │ │ └─ 0 (파티션 0의 소유권 정보) │ └─ checkpoint/ │ └─ 0 (파티션 0의 checkpoint) ├─ development/ │ ├─ ownership/ │ │ └─ 0 │ └─ checkpoint/ │ └─ 0 └─ test/ ├─ ownership/ │ └─ 0 └─ checkpoint/ └─ 0 ``` --- ## 트러블슈팅 ### 문제 1: Consumer Group을 찾을 수 없음 **증상** ``` azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found. ``` **원인**: Consumer Group이 실제로 생성되지 않음 **해결**: 1. Azure Portal에서 Consumer Group 목록 확인 2. 없으면 생성 3. 이름 오타 확인 (대소문자 구분) ### 문제 2: Ownership claim 실패 **증상** ``` EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming. ``` **원인**: - 동일 Consumer Group에 이미 다른 Consumer가 실행 중 - 파티션보다 Consumer가 많음 **해결**: 1. 기존 Consumer 종료 2. 다른 Consumer Group 사용 3. 파티션 수 증가 ### 문제 3: 이벤트를 읽지 못함 **증상**: Consumer는 실행되지만 이벤트가 수신되지 않음 **원인**: - Starting position이 최신으로 설정됨 - 새 이벤트가 없음 **해결**: ```python # config.yaml 또는 코드에서 await client.receive( on_event=on_event, on_error=on_error, starting_position="@earliest" # 처음부터 읽기 ) ``` ### 문제 4: Checkpoint가 초기화되지 않음 **증상**: 새 Consumer Group인데 이미 읽은 것처럼 동작 **원인**: Blob Storage에 이전 checkpoint가 남아있음 **해결**: ```bash # Azure Portal에서 # Storage Account > Containers > hgzero-checkpoints # 해당 Consumer Group 폴더 삭제 ``` 또는 코드로: ```python from azure.storage.blob import BlobServiceClient blob_service = BlobServiceClient.from_connection_string( AZURE_STORAGE_CONNECTION_STRING ) container = blob_service.get_container_client("hgzero-checkpoints") # 특정 Consumer Group의 checkpoint 삭제 prefix = "development/" blobs = container.list_blobs(name_starts_with=prefix) for blob in blobs: container.delete_blob(blob.name) print(f"삭제: {blob.name}") ``` --- ## 권장 설정 ### 환경별 Consumer Group 전략 ```yaml # 프로덕션 production: consumer_group: "$Default" starting_position: "-1" # 최신 이벤트부터 checkpoint_interval: 30 # 30초마다 checkpoint # 개발 development: consumer_group: "development" starting_position: "@earliest" # 처음부터 checkpoint_interval: 10 # 자주 checkpoint (테스트용) # 테스트 test: consumer_group: "test" starting_position: "@earliest" checkpoint_interval: 5 # 매우 자주 (빠른 테스트) # 분석 analytics: consumer_group: "analytics" starting_position: "@earliest" # 모든 데이터 분석 checkpoint_interval: 60 # 덜 자주 (성능) ``` ### 실행 스크립트 예시 ```bash #!/bin/bash # start_dev_consumer.sh export AZURE_EVENTHUB_CONSUMER_GROUP=development cd /Users/daewoong/home/workspace/HGZero/rag python start_consumer.py ``` --- ## 요약 ### 빠른 시작 체크리스트 - [ ] Azure Portal에서 Consumer Group 생성 - [ ] 이름: `development` - [ ] Event Hub: `hgzero-eventhub-name` - [ ] `config.yaml` 수정 - [ ] `consumer_group: "development"` 설정 - [ ] Consumer 실행 - [ ] `python start_consumer.py` - [ ] 검증 - [ ] 이벤트 수신 확인 - [ ] Blob Storage에 checkpoint 생성 확인 ### Consumer Group 활용 팁 1. **프로덕션과 개발 분리**: 항상 다른 Consumer Group 사용 2. **테스트는 독립적으로**: `test` Consumer Group으로 자유롭게 실험 3. **Checkpoint 관리**: 필요시 삭제하여 처음부터 재처리 4. **모니터링**: 각 Consumer Group별 lag 모니터링 5. **비용 최적화**: 불필요한 Consumer Group은 삭제 이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯