hgzero/rag/docs/consumer_group_setup.md

12 KiB
Raw Permalink Blame History

Consumer Group 생성 및 할당 가이드

목차

  1. Consumer Group이란?
  2. 생성 방법
  3. 코드에서 사용하기
  4. 검증 방법
  5. 트러블슈팅

Consumer Group이란?

Consumer Group은 Event Hub의 이벤트를 독립적으로 읽는 논리적 그룹입니다.

주요 특징

  • 각 Consumer Group은 독립적인 checkpoint를 유지
  • 동일한 이벤트를 여러 Consumer Group이 각각 읽을 수 있음
  • 기본 Consumer Group: $Default (자동 생성됨)

사용 사례

Consumer Group 용도 설명
$Default 프로덕션 실제 운영 환경에서 사용
development 개발 개발자가 로컬에서 테스트
test 테스트 QA 테스트 환경
analytics 분석 데이터 분석 및 모니터링
backup 백업 이벤트 백업 및 아카이빙

생성 방법

방법 1: Azure Portal (가장 쉬움)

단계별 가이드

Step 1: Azure Portal 접속

https://portal.azure.com

Step 2: Event Hub Namespace 찾기

  1. 상단 검색창에 "hgzero-eventhub-ns" 입력
  2. "Event Hubs Namespaces" 아래의 결과 클릭

Step 3: Event Hub 선택

  1. 왼쪽 메뉴에서 "Event Hubs" 클릭
  2. "hgzero-eventhub-name" 선택

Step 4: Consumer Groups 관리

  1. 왼쪽 메뉴에서 "Consumer groups" 클릭
  2. 현재 목록에 $Default만 있을 것임

Step 5: 새 Consumer Group 생성

  1. 상단의 "+ Consumer group" 버튼 클릭
  2. Name 입력:
    • 개발용: development
    • 테스트용: test
    • 분석용: analytics
  3. "Create" 버튼 클릭

Step 6: 생성 확인

  • 목록에 새로운 Consumer Group이 추가되었는지 확인
  • 예: $Default, development, test

방법 2: Azure CLI

사전 요구사항

# Azure CLI 설치 확인
az --version

# 로그인
az login

Consumer Group 생성

# 리소스 그룹 확인 (필요 시)
az eventhubs namespace show \
  --name hgzero-eventhub-ns \
  --query resourceGroup -o tsv

# Consumer Group 생성
az eventhubs eventhub consumer-group create \
  --resource-group <YOUR_RESOURCE_GROUP> \
  --namespace-name hgzero-eventhub-ns \
  --eventhub-name hgzero-eventhub-name \
  --name development

# 생성 확인
az eventhubs eventhub consumer-group list \
  --resource-group <YOUR_RESOURCE_GROUP> \
  --namespace-name hgzero-eventhub-ns \
  --eventhub-name hgzero-eventhub-name \
  --output table

출력 예시

Name         ResourceGroup
-----------  ---------------
$Default     hgzero-rg
development  hgzero-rg
test         hgzero-rg

방법 3: Python Management SDK

설치

pip install azure-mgmt-eventhub azure-identity

코드

from azure.mgmt.eventhub import EventHubManagementClient
from azure.identity import DefaultAzureCredential

# 인증
credential = DefaultAzureCredential()
subscription_id = "<YOUR_SUBSCRIPTION_ID>"

# 관리 클라이언트 생성
mgmt_client = EventHubManagementClient(credential, subscription_id)

# Consumer Group 생성
mgmt_client.consumer_groups.create_or_update(
    resource_group_name='<YOUR_RESOURCE_GROUP>',
    namespace_name='hgzero-eventhub-ns',
    event_hub_name='hgzero-eventhub-name',
    consumer_group_name='development',
    parameters={}  # 추가 설정 가능
)

print("✅ Consumer Group 'development' 생성 완료")

# Consumer Group 목록 조회
consumer_groups = mgmt_client.consumer_groups.list_by_event_hub(
    resource_group_name='<YOUR_RESOURCE_GROUP>',
    namespace_name='hgzero-eventhub-ns',
    event_hub_name='hgzero-eventhub-name'
)

print("\n현재 Consumer Groups:")
for cg in consumer_groups:
    print(f"  - {cg.name}")

코드에서 사용하기

1. config.yaml 수정

기존 설정

eventhub:
  connection_string: ${EVENTHUB_CONNECTION_STRING}
  name: ${EVENTHUB_NAME}
  consumer_group: ${AZURE_EVENTHUB_CONSUMER_GROUP}  # "$Default"
  storage:
    connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
    container_name: ${AZURE_STORAGE_CONTAINER_NAME}

개발 환경용 설정

eventhub:
  connection_string: ${EVENTHUB_CONNECTION_STRING}
  name: ${EVENTHUB_NAME}
  consumer_group: "development"  # 직접 지정
  storage:
    connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
    container_name: ${AZURE_STORAGE_CONTAINER_NAME}

2. .env 파일 수정 (선택사항)

환경 변수로 관리하는 경우

# .env
EVENTHUB_CONNECTION_STRING="Endpoint=sb://hgzero-eventhub-ns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
EVENTHUB_NAME=hgzero-eventhub-name
AZURE_EVENTHUB_CONSUMER_GROUP=development  # 변경
AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;AccountName=hgzerostorage;..."
AZURE_STORAGE_CONTAINER_NAME=hgzero-checkpoints

3. Consumer 실행

개발 환경

# config.yaml의 consumer_group을 "development"로 설정
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py

프로덕션 환경

# config.yaml의 consumer_group을 "$Default"로 설정
python start_consumer.py

여러 환경 동시 실행

# Terminal 1: 프로덕션 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=$Default python start_consumer.py

# Terminal 2: 개발 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=development python start_consumer.py

# Terminal 3: 테스트 Consumer
AZURE_EVENTHUB_CONSUMER_GROUP=test python start_consumer.py

검증 방법

1. Consumer Group 목록 확인

Python으로 확인

import asyncio
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from src.utils.config import load_config

async def list_consumer_groups():
    """사용 가능한 Consumer Group 확인"""
    config_path = Path('config.yaml')
    config = load_config(str(config_path))

    eventhub_config = config['eventhub']

    # 여기서는 실제로 Management API를 사용해야 하지만
    # Consumer Client로는 자신이 속한 그룹만 확인 가능

    print(f"현재 설정된 Consumer Group: {eventhub_config['consumer_group']}")

asyncio.run(list_consumer_groups())

2. 다른 Consumer Group으로 테스트

import asyncio
import json
from pathlib import Path
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from src.utils.config import load_config

async def test_consumer_group(consumer_group_name):
    """특정 Consumer Group으로 이벤트 수신 테스트"""
    config_path = Path('config.yaml')
    config = load_config(str(config_path))

    eventhub_config = config['eventhub']

    # Checkpoint Store
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        eventhub_config['storage']['connection_string'],
        eventhub_config['storage']['container_name']
    )

    # Consumer Client (Consumer Group 지정)
    client = EventHubConsumerClient.from_connection_string(
        eventhub_config['connection_string'],
        consumer_group=consumer_group_name,  # 여기서 지정!
        eventhub_name=eventhub_config['name'],
        checkpoint_store=checkpoint_store
    )

    print(f"✅ Consumer Group '{consumer_group_name}'로 연결 시도...")

    event_count = 0

    async def on_event(partition_context, event):
        nonlocal event_count
        event_count += 1
        print(f"이벤트 수신: #{event_count}")
        await partition_context.update_checkpoint(event)

    async def on_error(partition_context, error):
        print(f"에러: {error}")

    try:
        async with client:
            receive_task = asyncio.create_task(
                client.receive(
                    on_event=on_event,
                    on_error=on_error,
                    starting_position="@earliest"
                )
            )

            await asyncio.sleep(10)  # 10초 대기
            receive_task.cancel()

            try:
                await receive_task
            except asyncio.CancelledError:
                pass

        print(f"\n✅ 테스트 완료: {event_count}개 이벤트 수신")

    except Exception as e:
        print(f"❌ 에러 발생: {str(e)}")

# 테스트 실행
asyncio.run(test_consumer_group("development"))

3. Checkpoint 저장 위치 확인

각 Consumer Group은 독립적인 checkpoint를 Blob Storage에 저장합니다:

hgzero-checkpoints (Container)
├─ $Default/
│   ├─ ownership/
│   │   └─ 0  (파티션 0의 소유권 정보)
│   └─ checkpoint/
│       └─ 0  (파티션 0의 checkpoint)
├─ development/
│   ├─ ownership/
│   │   └─ 0
│   └─ checkpoint/
│       └─ 0
└─ test/
    ├─ ownership/
    │   └─ 0
    └─ checkpoint/
        └─ 0

트러블슈팅

문제 1: Consumer Group을 찾을 수 없음

증상

azure.eventhub.exceptions.EventHubError: The messaging entity 'sb://hgzero-eventhub-ns.servicebus.windows.net/hgzero-eventhub-name/ConsumerGroups/development' could not be found.

원인: Consumer Group이 실제로 생성되지 않음

해결:

  1. Azure Portal에서 Consumer Group 목록 확인
  2. 없으면 생성
  3. 이름 오타 확인 (대소문자 구분)

문제 2: Ownership claim 실패

증상

EventProcessor 'xxx' hasn't claimed an ownership. It keeps claiming.

원인:

  • 동일 Consumer Group에 이미 다른 Consumer가 실행 중
  • 파티션보다 Consumer가 많음

해결:

  1. 기존 Consumer 종료
  2. 다른 Consumer Group 사용
  3. 파티션 수 증가

문제 3: 이벤트를 읽지 못함

증상: Consumer는 실행되지만 이벤트가 수신되지 않음

원인:

  • Starting position이 최신으로 설정됨
  • 새 이벤트가 없음

해결:

# config.yaml 또는 코드에서
await client.receive(
    on_event=on_event,
    on_error=on_error,
    starting_position="@earliest"  # 처음부터 읽기
)

문제 4: Checkpoint가 초기화되지 않음

증상: 새 Consumer Group인데 이미 읽은 것처럼 동작

원인: Blob Storage에 이전 checkpoint가 남아있음

해결:

# Azure Portal에서
# Storage Account > Containers > hgzero-checkpoints
# 해당 Consumer Group 폴더 삭제

또는 코드로:

from azure.storage.blob import BlobServiceClient

blob_service = BlobServiceClient.from_connection_string(
    AZURE_STORAGE_CONNECTION_STRING
)
container = blob_service.get_container_client("hgzero-checkpoints")

# 특정 Consumer Group의 checkpoint 삭제
prefix = "development/"
blobs = container.list_blobs(name_starts_with=prefix)
for blob in blobs:
    container.delete_blob(blob.name)
    print(f"삭제: {blob.name}")

권장 설정

환경별 Consumer Group 전략

# 프로덕션
production:
  consumer_group: "$Default"
  starting_position: "-1"  # 최신 이벤트부터
  checkpoint_interval: 30   # 30초마다 checkpoint

# 개발
development:
  consumer_group: "development"
  starting_position: "@earliest"  # 처음부터
  checkpoint_interval: 10   # 자주 checkpoint (테스트용)

# 테스트
test:
  consumer_group: "test"
  starting_position: "@earliest"
  checkpoint_interval: 5    # 매우 자주 (빠른 테스트)

# 분석
analytics:
  consumer_group: "analytics"
  starting_position: "@earliest"  # 모든 데이터 분석
  checkpoint_interval: 60   # 덜 자주 (성능)

실행 스크립트 예시

#!/bin/bash
# start_dev_consumer.sh

export AZURE_EVENTHUB_CONSUMER_GROUP=development
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py

요약

빠른 시작 체크리스트

  • Azure Portal에서 Consumer Group 생성
    • 이름: development
    • Event Hub: hgzero-eventhub-name
  • config.yaml 수정
    • consumer_group: "development" 설정
  • Consumer 실행
    • python start_consumer.py
  • 검증
    • 이벤트 수신 확인
    • Blob Storage에 checkpoint 생성 확인

Consumer Group 활용 팁

  1. 프로덕션과 개발 분리: 항상 다른 Consumer Group 사용
  2. 테스트는 독립적으로: test Consumer Group으로 자유롭게 실험
  3. Checkpoint 관리: 필요시 삭제하여 처음부터 재처리
  4. 모니터링: 각 Consumer Group별 lag 모니터링
  5. 비용 최적화: 불필요한 Consumer Group은 삭제

이제 원하는 Consumer Group을 만들고 독립적으로 Event Hub를 사용할 수 있습니다! 🎯