hgzero/rag/eventhub_guide.md

10 KiB

Event Hub Consumer Guide

핵심 개념: Partition Ownership (파티션 소유권)

Event Hub에서는 같은 Consumer Group 내에서 하나의 파티션은 동시에 오직 하나의 Consumer만 읽을 수 있습니다. 이를 "Exclusive Consumer" 패턴이라고 합니다.

왜 이런 제약이 있나요?

1. 순서 보장 (Ordering)

파티션 0: [이벤트1] → [이벤트2] → [이벤트3]

❌ 잘못된 경우 (여러 Consumer가 동시 읽기):
Consumer A: 이벤트1 처리 중... (느림)
Consumer B: 이벤트2 처리 완료 ✓
Consumer C: 이벤트3 처리 완료 ✓
→ 처리 순서: 2 → 3 → 1 (순서 뒤바뀜!)

✅ 올바른 경우 (하나의 Consumer만):
Consumer A: 이벤트1 ✓ → 이벤트2 ✓ → 이벤트3 ✓
→ 처리 순서: 1 → 2 → 3 (순서 보장!)

2. Checkpoint 일관성

❌ 여러 Consumer가 각자 checkpoint:
Consumer A: offset 100까지 읽음 → checkpoint 저장
Consumer B: offset 150까지 읽음 → checkpoint 덮어씀
Consumer A 재시작 → offset 150부터 읽음 → offset 100~149 누락!

✅ 하나의 Consumer만:
Consumer A: offset 100 → 110 → 120 → ... (순차적)
재시작 시 → 마지막 checkpoint부터 정확히 재개

3. 중복 처리 방지

❌ 여러 Consumer가 동일 이벤트 읽기:
Consumer A: 주문 이벤트 처리 → 결제 완료
Consumer B: 동일 주문 이벤트 처리 → 중복 결제!

✅ 하나의 Consumer만:
Consumer A: 주문 이벤트 처리 → 1번만 결제 ✓

Ownership Claim 메커니즘

동작 과정

Consumer A (PID 51257) - 먼저 시작
    ↓
Blob Storage에 파티션 0 소유권 요청
    ↓
✅ 승인 (Owner: A, Lease: 30초)
    ↓
30초마다 Lease 갱신
    ↓
계속 소유권 유지


Consumer B (테스트) - 나중에 시작
    ↓
Blob Storage에 파티션 0 소유권 요청
    ↓
❌ 거부 (이미 A가 소유 중)
    ↓
"hasn't claimed an ownership" 로그
    ↓
계속 재시도 (대기 상태)

Blob Storage에 저장되는 정보

{
  "partitionId": "0",
  "ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
  "lastModifiedTime": "2025-10-29T02:17:49Z",
  "eTag": "\"0x8DCF7E8F9B3C1A0\"",
  "offset": "120259090624",
  "sequenceNumber": 232
}

현재 상황 분석

Event Hub: hgzero-eventhub-name
├─ 파티션 수: 1개 (파티션 0)
└─ Consumer Group: $Default

실행 중:
├─ Consumer A (PID 51257): 파티션 0 소유 ✅
│   ├─ 이벤트 정상 수신 중
│   ├─ Lease 주기적 갱신 중
│   └─ Checkpoint 저장 중
│
└─ 테스트 Consumer들: 소유권 없음 ❌
    ├─ 파티션 0 claim 시도
    ├─ 계속 거부당함
    ├─ "hasn't claimed an ownership" 로그
    └─ 이벤트 수신 불가

해결 방법

Option 1: 기존 Consumer 종료 후 재시작

# 기존 Consumer 종료
kill 51257

# 새로 시작
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py

장점: 간단 단점: 다운타임 발생 (Lease 만료까지 최대 30초)

Option 2: 다른 Consumer Group 사용 (권장)

# config.yaml
eventhub:
  consumer_group: "test-group"  # $Default 대신 사용

장점:

  • 기존 Consumer에 영향 없음
  • 독립적으로 모든 이벤트 읽기 가능
  • 개발/테스트에 이상적

단점: 리소스 추가 사용

Option 3: 파티션 수평 확장

Event Hub 파티션 증가: 1개 → 3개
Consumer 실행: 3개

분산:
├─ Consumer A: 파티션 0
├─ Consumer B: 파티션 1
└─ Consumer C: 파티션 2

→ 병렬 처리로 3배 성능 향상!

장점: 높은 처리량 단점: 비용 증가, 전체 순서는 보장 안 됨 (파티션 내 순서만 보장)

Consumer Group 비교

┌─────────────────────────────────────────┐
│ Event Hub: hgzero-eventhub              │
│ 파티션 0: [이벤트들...]                  │
└─────────────────────────────────────────┘
           │
           ├─────────────────┬─────────────────┐
           │                 │                 │
    Consumer Group      Consumer Group    Consumer Group
      "$Default"          "analytics"        "backup"
           │                 │                 │
      Consumer A         Consumer B         Consumer C
    (RAG 처리)          (분석 처리)        (백업 처리)
           │                 │                 │
    각자 독립적으로 동일한 파티션 0의 모든 이벤트 읽음
    각자 독립적인 Checkpoint 유지

파티션과 Consumer 수 관계

Case 1: Consumer 1개

Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default

├─ Consumer A: P0, P1, P2 모두 소유
└─ 모든 파티션 처리 (순차적)

Case 2: Consumer 3개 (이상적)

Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default

├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
└─ Consumer C: P2 소유

→ 병렬 처리로 최대 성능!

Case 3: Consumer 5개 (과잉)

Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default

├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
├─ Consumer C: P2 소유
├─ Consumer D: 소유한 파티션 없음 (대기)
└─ Consumer E: 소유한 파티션 없음 (대기)

→ D, E는 이벤트를 읽지 못하고 대기만 함

베스트 프랙티스

환경 Consumer 수 Consumer Group 파티션 수
프로덕션 = 파티션 수 production 처리량에 맞게
개발 1개 development 1~2개
테스트 1개 test 1개
분석 1개 analytics (공유)

권장 사항

  1. 프로덕션 환경

    • Consumer 수 = 파티션 수 (1:1 매핑)
    • 고가용성을 위해 각 Consumer를 다른 서버에 배치
    • Consumer 수 > 파티션 수로 설정하면 일부는 대기 상태 (Standby)
  2. 개발/테스트 환경

    • 별도 Consumer Group 사용
    • 파티션 1개로 충분
    • 필요시 checkpoint를 초기화하여 처음부터 재처리
  3. 모니터링

    • Ownership claim 실패 로그 모니터링
    • Lease 갱신 실패 알림 설정
    • Checkpoint lag 모니터링
  4. 장애 복구

    • Lease timeout 고려 (기본 30초)
    • Consumer 장애 시 자동 재분배 (30초 이내)
    • Checkpoint로부터 정확한 위치에서 재개

Consumer 프로세스 관리 명령어

프로세스 확인

# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep

# 상세 정보 (실행시간, CPU, 메모리)
ps -p <PID> -o pid,etime,%cpu,%mem,cmd

# 네트워크 연결 확인
lsof -i -n | grep <PID>

# 모든 Python 프로세스 확인
ps aux | grep python | grep -v grep

프로세스 종료

# 정상 종료 (SIGTERM)
kill <PID>

# 강제 종료 (SIGKILL)
kill -9 <PID>

# 이름으로 종료
pkill -f start_consumer.py

테스트 이벤트 전송

from azure.eventhub import EventHubProducerClient, EventData
import json
import os
from dotenv import load_dotenv

load_dotenv('rag/.env')

conn_str = os.getenv('EVENTHUB_CONNECTION_STRING')
eventhub_name = os.getenv('EVENTHUB_NAME')

test_event = {
    'eventType': 'MINUTES_FINALIZED',
    'data': {
        'meetingId': 'test-meeting-001',
        'title': '테스트 회의',
        'minutesId': 'test-minutes-001',
        'sections': [
            {
                'sectionId': 'section-001',
                'type': 'DISCUSSION',
                'title': '논의 사항',
                'content': '테스트 논의 내용입니다.',
                'order': 1,
                'verified': True
            }
        ]
    }
}

producer = EventHubProducerClient.from_connection_string(
    conn_str=conn_str,
    eventhub_name=eventhub_name
)

event_data_batch = producer.create_batch()
event_data_batch.add(EventData(json.dumps(test_event)))

producer.send_batch(event_data_batch)
print('✅ 테스트 이벤트 전송 완료')

producer.close()

Event Hub 파티션 정보 조회

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

async def check_partitions():
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENTHUB_CONNECTION_STRING,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME
    )

    async with client:
        partition_ids = await client.get_partition_ids()
        print(f"파티션 개수: {len(partition_ids)}")
        print(f"파티션 IDs: {partition_ids}")

        for partition_id in partition_ids:
            props = await client.get_partition_properties(partition_id)
            print(f"\n파티션 {partition_id}:")
            print(f"  시퀀스 번호: {props['last_enqueued_sequence_number']}")
            print(f"  오프셋: {props['last_enqueued_offset']}")
            print(f"  마지막 이벤트 시간: {props['last_enqueued_time_utc']}")

asyncio.run(check_partitions())

정리

"에러"가 아니라 "설계된 동작"입니다

  1. 정상: Consumer A가 파티션 소유 → 이벤트 처리
  2. 정상: Consumer B가 claim 실패 → 대기
  3. 정상: Consumer A 종료 시 → Consumer B가 자동 인수

이 메커니즘의 장점

  • 📌 순서 보장: 파티션 내 이벤트 순서 유지
  • 📌 정확히 한 번 처리: 중복 처리 방지
  • 📌 자동 장애 복구: Consumer 장애 시 자동 재분배
  • 📌 수평 확장: 파티션 추가로 처리량 증가

현재 상황 해결

권장: 다른 Consumer Group을 사용하여 테스트하시는 것이 가장 안전하고 효율적입니다!

# 개발/테스트용 Consumer Group 설정
eventhub:
  consumer_group: "development"  # 또는 "test"

이렇게 하면:

  • 기존 프로덕션 Consumer에 영향 없음
  • 독립적으로 모든 이벤트를 처음부터 읽을 수 있음
  • 여러 번 테스트 가능