mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 07:56:24 +00:00
10 KiB
10 KiB
Event Hub Consumer Guide
핵심 개념: Partition Ownership (파티션 소유권)
Event Hub에서는 같은 Consumer Group 내에서 하나의 파티션은 동시에 오직 하나의 Consumer만 읽을 수 있습니다. 이를 "Exclusive Consumer" 패턴이라고 합니다.
왜 이런 제약이 있나요?
1. 순서 보장 (Ordering)
파티션 0: [이벤트1] → [이벤트2] → [이벤트3]
❌ 잘못된 경우 (여러 Consumer가 동시 읽기):
Consumer A: 이벤트1 처리 중... (느림)
Consumer B: 이벤트2 처리 완료 ✓
Consumer C: 이벤트3 처리 완료 ✓
→ 처리 순서: 2 → 3 → 1 (순서 뒤바뀜!)
✅ 올바른 경우 (하나의 Consumer만):
Consumer A: 이벤트1 ✓ → 이벤트2 ✓ → 이벤트3 ✓
→ 처리 순서: 1 → 2 → 3 (순서 보장!)
2. Checkpoint 일관성
❌ 여러 Consumer가 각자 checkpoint:
Consumer A: offset 100까지 읽음 → checkpoint 저장
Consumer B: offset 150까지 읽음 → checkpoint 덮어씀
Consumer A 재시작 → offset 150부터 읽음 → offset 100~149 누락!
✅ 하나의 Consumer만:
Consumer A: offset 100 → 110 → 120 → ... (순차적)
재시작 시 → 마지막 checkpoint부터 정확히 재개
3. 중복 처리 방지
❌ 여러 Consumer가 동일 이벤트 읽기:
Consumer A: 주문 이벤트 처리 → 결제 완료
Consumer B: 동일 주문 이벤트 처리 → 중복 결제!
✅ 하나의 Consumer만:
Consumer A: 주문 이벤트 처리 → 1번만 결제 ✓
Ownership Claim 메커니즘
동작 과정
Consumer A (PID 51257) - 먼저 시작
↓
Blob Storage에 파티션 0 소유권 요청
↓
✅ 승인 (Owner: A, Lease: 30초)
↓
30초마다 Lease 갱신
↓
계속 소유권 유지
Consumer B (테스트) - 나중에 시작
↓
Blob Storage에 파티션 0 소유권 요청
↓
❌ 거부 (이미 A가 소유 중)
↓
"hasn't claimed an ownership" 로그
↓
계속 재시도 (대기 상태)
Blob Storage에 저장되는 정보
{
"partitionId": "0",
"ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
"lastModifiedTime": "2025-10-29T02:17:49Z",
"eTag": "\"0x8DCF7E8F9B3C1A0\"",
"offset": "120259090624",
"sequenceNumber": 232
}
현재 상황 분석
Event Hub: hgzero-eventhub-name
├─ 파티션 수: 1개 (파티션 0)
└─ Consumer Group: $Default
실행 중:
├─ Consumer A (PID 51257): 파티션 0 소유 ✅
│ ├─ 이벤트 정상 수신 중
│ ├─ Lease 주기적 갱신 중
│ └─ Checkpoint 저장 중
│
└─ 테스트 Consumer들: 소유권 없음 ❌
├─ 파티션 0 claim 시도
├─ 계속 거부당함
├─ "hasn't claimed an ownership" 로그
└─ 이벤트 수신 불가
해결 방법
Option 1: 기존 Consumer 종료 후 재시작
# 기존 Consumer 종료
kill 51257
# 새로 시작
cd /Users/daewoong/home/workspace/HGZero/rag
python start_consumer.py
장점: 간단 단점: 다운타임 발생 (Lease 만료까지 최대 30초)
Option 2: 다른 Consumer Group 사용 (권장)
# config.yaml
eventhub:
consumer_group: "test-group" # $Default 대신 사용
장점:
- 기존 Consumer에 영향 없음
- 독립적으로 모든 이벤트 읽기 가능
- 개발/테스트에 이상적
단점: 리소스 추가 사용
Option 3: 파티션 수평 확장
Event Hub 파티션 증가: 1개 → 3개
Consumer 실행: 3개
분산:
├─ Consumer A: 파티션 0
├─ Consumer B: 파티션 1
└─ Consumer C: 파티션 2
→ 병렬 처리로 3배 성능 향상!
장점: 높은 처리량 단점: 비용 증가, 전체 순서는 보장 안 됨 (파티션 내 순서만 보장)
Consumer Group 비교
┌─────────────────────────────────────────┐
│ Event Hub: hgzero-eventhub │
│ 파티션 0: [이벤트들...] │
└─────────────────────────────────────────┘
│
├─────────────────┬─────────────────┐
│ │ │
Consumer Group Consumer Group Consumer Group
"$Default" "analytics" "backup"
│ │ │
Consumer A Consumer B Consumer C
(RAG 처리) (분석 처리) (백업 처리)
│ │ │
각자 독립적으로 동일한 파티션 0의 모든 이벤트 읽음
각자 독립적인 Checkpoint 유지
파티션과 Consumer 수 관계
Case 1: Consumer 1개
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0, P1, P2 모두 소유
└─ 모든 파티션 처리 (순차적)
Case 2: Consumer 3개 (이상적)
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
└─ Consumer C: P2 소유
→ 병렬 처리로 최대 성능!
Case 3: Consumer 5개 (과잉)
Event Hub: 파티션 3개 (P0, P1, P2)
Consumer Group: $Default
├─ Consumer A: P0 소유
├─ Consumer B: P1 소유
├─ Consumer C: P2 소유
├─ Consumer D: 소유한 파티션 없음 (대기)
└─ Consumer E: 소유한 파티션 없음 (대기)
→ D, E는 이벤트를 읽지 못하고 대기만 함
베스트 프랙티스
| 환경 | Consumer 수 | Consumer Group | 파티션 수 |
|---|---|---|---|
| 프로덕션 | = 파티션 수 | production | 처리량에 맞게 |
| 개발 | 1개 | development | 1~2개 |
| 테스트 | 1개 | test | 1개 |
| 분석 | 1개 | analytics | (공유) |
권장 사항
-
프로덕션 환경
- Consumer 수 = 파티션 수 (1:1 매핑)
- 고가용성을 위해 각 Consumer를 다른 서버에 배치
- Consumer 수 > 파티션 수로 설정하면 일부는 대기 상태 (Standby)
-
개발/테스트 환경
- 별도 Consumer Group 사용
- 파티션 1개로 충분
- 필요시 checkpoint를 초기화하여 처음부터 재처리
-
모니터링
- Ownership claim 실패 로그 모니터링
- Lease 갱신 실패 알림 설정
- Checkpoint lag 모니터링
-
장애 복구
- Lease timeout 고려 (기본 30초)
- Consumer 장애 시 자동 재분배 (30초 이내)
- Checkpoint로부터 정확한 위치에서 재개
Consumer 프로세스 관리 명령어
프로세스 확인
# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# 상세 정보 (실행시간, CPU, 메모리)
ps -p <PID> -o pid,etime,%cpu,%mem,cmd
# 네트워크 연결 확인
lsof -i -n | grep <PID>
# 모든 Python 프로세스 확인
ps aux | grep python | grep -v grep
프로세스 종료
# 정상 종료 (SIGTERM)
kill <PID>
# 강제 종료 (SIGKILL)
kill -9 <PID>
# 이름으로 종료
pkill -f start_consumer.py
테스트 이벤트 전송
from azure.eventhub import EventHubProducerClient, EventData
import json
import os
from dotenv import load_dotenv
load_dotenv('rag/.env')
conn_str = os.getenv('EVENTHUB_CONNECTION_STRING')
eventhub_name = os.getenv('EVENTHUB_NAME')
test_event = {
'eventType': 'MINUTES_FINALIZED',
'data': {
'meetingId': 'test-meeting-001',
'title': '테스트 회의',
'minutesId': 'test-minutes-001',
'sections': [
{
'sectionId': 'section-001',
'type': 'DISCUSSION',
'title': '논의 사항',
'content': '테스트 논의 내용입니다.',
'order': 1,
'verified': True
}
]
}
}
producer = EventHubProducerClient.from_connection_string(
conn_str=conn_str,
eventhub_name=eventhub_name
)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(json.dumps(test_event)))
producer.send_batch(event_data_batch)
print('✅ 테스트 이벤트 전송 완료')
producer.close()
Event Hub 파티션 정보 조회
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
async def check_partitions():
client = EventHubConsumerClient.from_connection_string(
conn_str=EVENTHUB_CONNECTION_STRING,
consumer_group="$Default",
eventhub_name=EVENTHUB_NAME
)
async with client:
partition_ids = await client.get_partition_ids()
print(f"파티션 개수: {len(partition_ids)}")
print(f"파티션 IDs: {partition_ids}")
for partition_id in partition_ids:
props = await client.get_partition_properties(partition_id)
print(f"\n파티션 {partition_id}:")
print(f" 시퀀스 번호: {props['last_enqueued_sequence_number']}")
print(f" 오프셋: {props['last_enqueued_offset']}")
print(f" 마지막 이벤트 시간: {props['last_enqueued_time_utc']}")
asyncio.run(check_partitions())
정리
"에러"가 아니라 "설계된 동작"입니다
- ✅ 정상: Consumer A가 파티션 소유 → 이벤트 처리
- ✅ 정상: Consumer B가 claim 실패 → 대기
- ✅ 정상: Consumer A 종료 시 → Consumer B가 자동 인수
이 메커니즘의 장점
- 📌 순서 보장: 파티션 내 이벤트 순서 유지
- 📌 정확히 한 번 처리: 중복 처리 방지
- 📌 자동 장애 복구: Consumer 장애 시 자동 재분배
- 📌 수평 확장: 파티션 추가로 처리량 증가
현재 상황 해결
권장: 다른 Consumer Group을 사용하여 테스트하시는 것이 가장 안전하고 효율적입니다!
# 개발/테스트용 Consumer Group 설정
eventhub:
consumer_group: "development" # 또는 "test"
이렇게 하면:
- 기존 프로덕션 Consumer에 영향 없음
- 독립적으로 모든 이벤트를 처음부터 읽을 수 있음
- 여러 번 테스트 가능