mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 18:26:23 +00:00
8.3 KiB
8.3 KiB
Event Hub Active Consumers 조회 가이드
개요
Event Hub를 현재 읽고 있는 Consumer(host)를 확인하는 방법을 설명합니다.
방법 1: 제공된 스크립트 사용 (가장 쉬움) ⭐️
실행
cd /Users/daewoong/home/workspace/HGZero/rag
python check_active_consumers.py
출력 정보
📊 Event Hub Active Consumers
================================================================================
Consumer Group: $Default
Container: hgzero-checkpoints
🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)
--------------------------------------------------------------------------------
파티션 0:
상태: 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95...
마지막 갱신: 2025-10-29 12:35:42 UTC
경과 시간: 15초 전
📋 Consumer 요약
--------------------------------------------------------------------------------
Consumer #1 🟢 ACTIVE
Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95
소유 파티션: 0
파티션 개수: 1
마지막 활동: 2025-10-29 12:35:42 UTC
📍 Checkpoint 정보 (마지막 읽은 위치)
--------------------------------------------------------------------------------
파티션 0:
Offset: 120259090624
Sequence Number: 232
방법 2: Python 코드로 직접 조회
스크립트 예시
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config
async def check_consumers():
"""활성 Consumer 조회"""
# 설정 로드
config_path = Path('config.yaml')
config = load_config(str(config_path))
eventhub_config = config['eventhub']
storage_conn_str = eventhub_config['storage']['connection_string']
container_name = eventhub_config['storage']['container_name']
consumer_group = eventhub_config['consumer_group']
# Blob Service Client
blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
container = blob_service.get_container_client(container_name)
# Ownership 정보 조회
ownership_prefix = f"{consumer_group}/ownership/"
print(f"Consumer Group: {consumer_group}\n")
for blob in container.list_blobs(name_starts_with=ownership_prefix):
blob_client = container.get_blob_client(blob.name)
content = blob_client.download_blob().readall()
ownership_data = json.loads(content)
partition_id = blob.name.split('/')[-1]
owner_id = ownership_data.get('ownerIdentifier', 'unknown')
# 활성 상태 확인 (60초 이내 갱신)
now = datetime.now(timezone.utc)
time_diff = (now - blob.last_modified).total_seconds()
is_active = time_diff < 60
status = "🟢 ACTIVE" if is_active else "🔴 INACTIVE"
print(f"파티션 {partition_id}: {status}")
print(f" Owner: {owner_id}")
print(f" 갱신: {int(time_diff)}초 전\n")
asyncio.run(check_consumers())
방법 3: Azure Portal에서 확인
단계
- Azure Portal 접속 (https://portal.azure.com)
- Storage Account 이동
- "hgzerostorage" 검색
- Containers 클릭
- "hgzero-checkpoints" 선택
- 폴더 구조 확인
$Default/ ├── ownership/ │ └── 0 ← 파티션 0의 소유권 정보 └── checkpoint/ └── 0 ← 파티션 0의 checkpoint - ownership/0 파일 다운로드 또는 View 클릭
- JSON 내용 확인
{ "ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95", "lastModifiedTime": "2025-10-29T12:35:42Z", ... }
정보 해석
- ownerIdentifier: 현재 Consumer의 고유 ID
- lastModifiedTime: 마지막 lease 갱신 시간
- 60초 이내: 🟢 활성 상태
- 60초 이상: 🔴 비활성 (Consumer 종료됨)
방법 4: Azure CLI로 조회
Blob 목록 확인
# Storage Account 키 조회
az storage account keys list \
--resource-group rg-digitalgarage-02 \
--account-name hgzerostorage \
--query "[0].value" -o tsv
# Blob 목록 조회
az storage blob list \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--prefix "\$Default/ownership/" \
--output table
Blob 내용 다운로드
# ownership 정보 다운로드
az storage blob download \
--account-name hgzerostorage \
--container-name hgzero-checkpoints \
--name "\$Default/ownership/0" \
--file /tmp/ownership.json
# 내용 확인
cat /tmp/ownership.json | python3 -m json.tool
방법 5: 프로세스 레벨에서 확인
로컬 머신에서 실행 중인 Consumer
# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep
# 출력:
# daewoong 81447 0.0 0.2 python start_consumer.py
네트워크 연결 확인
# Event Hub와의 연결 확인
lsof -i -n | grep 81447 | grep ESTABLISHED
# 출력:
# python3.1 81447 daewoong 9u IPv6 TCP ... -> ...servicebus.windows.net:https (ESTABLISHED)
의미:
- Consumer 프로세스가 실행 중
- Event Hub와 HTTPS 연결 유지 중
- 이벤트 수신 대기 중
주요 개념
Ownership (소유권)
파티션 0
↓
Ownership Blob (Blob Storage)
↓
{
"ownerIdentifier": "consumer-uuid",
"lastModifiedTime": "2025-10-29T12:35:42Z",
"eTag": "...",
...
}
↓
Lease 메커니즘 (30초마다 갱신)
↓
갱신 실패 시 → 다른 Consumer가 인수 가능
Checkpoint (읽은 위치)
파티션 0
↓
Checkpoint Blob
↓
{
"offset": "120259090624",
"sequenceNumber": 232,
...
}
↓
Consumer 재시작 시 → 이 위치부터 재개
Consumer 상태 판단
| 조건 | 상태 | 의미 |
|---|---|---|
| lastModifiedTime < 60초 | 🟢 ACTIVE | 정상 동작 중 |
| 60초 < lastModifiedTime < 180초 | 🟡 WARNING | Lease 갱신 지연 |
| lastModifiedTime > 180초 | 🔴 INACTIVE | Consumer 종료 |
트러블슈팅
문제 1: Ownership 정보가 없음
증상:
⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.
원인:
- Consumer가 한 번도 실행되지 않음
- Consumer가 ownership claim에 실패함
해결:
- Consumer 프로세스 확인:
ps aux | grep start_consumer - Consumer 로그 확인
- 권한 확인 (Storage Account 접근 권한)
문제 2: 여러 Owner ID가 나타남
증상:
Consumer #1: owner-id-1 (파티션 0, 2, 4)
Consumer #2: owner-id-2 (파티션 1, 3, 5)
원인:
- 정상: 여러 Consumer가 파티션을 나눠서 처리
- 파티션이 여러 개이고 Consumer도 여러 개
확인:
- 각 파티션이 하나의 Consumer만 소유하면 정상
- 동일 파티션을 여러 Consumer가 소유하면 문제
문제 3: lastModified가 오래됨
증상:
파티션 0:
상태: 🔴 INACTIVE
마지막 갱신: 2025-10-29 10:00:00 UTC
경과 시간: 7200초 전 (2시간)
원인:
- Consumer가 종료됨
- Consumer가 응답 없음 (hang)
- 네트워크 문제
해결:
- Consumer 프로세스 확인
- Consumer 재시작
- 로그 확인하여 에러 파악
실시간 모니터링
스크립트: 주기적 확인
#!/bin/bash
# monitor_consumers.sh
while true; do
clear
echo "=== $(date) ==="
python check_active_consumers.py
sleep 30 # 30초마다 갱신
done
실행
chmod +x monitor_consumers.sh
./monitor_consumers.sh
요약
빠른 확인 방법
# 1. 스크립트 실행 (가장 쉬움)
python check_active_consumers.py
# 2. 프로세스 확인
ps aux | grep start_consumer
# 3. Azure Portal에서 Blob 확인
# Storage Account > Containers > hgzero-checkpoints > $Default/ownership
확인 항목 체크리스트
- Consumer 프로세스 실행 중인가?
- Ownership 정보가 있는가?
- lastModified가 60초 이내인가?
- 각 파티션이 하나의 Consumer만 소유하는가?
- Checkpoint가 갱신되고 있는가?
모든 항목이 체크되면 Consumer가 정상 동작 중입니다! ✅
참고 파일
- 스크립트:
/Users/daewoong/home/workspace/HGZero/rag/check_active_consumers.py - 설정:
/Users/daewoong/home/workspace/HGZero/rag/config.yaml - Blob Storage:
hgzerostorage/hgzero-checkpoints