hgzero/rag/scripts/check_active_consumers.py

192 lines
6.3 KiB
Python

"""
Event Hub의 활성 Consumer(host) 조회 스크립트
Blob Storage의 ownership 정보를 통해 현재 어떤 Consumer가
어떤 파티션을 읽고 있는지 확인합니다.
"""
import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config
def _parse_json_object(content):
    """Return *content* decoded as a JSON object, or ``None`` if it is not one.

    ``None`` is returned when the payload is empty, is not valid JSON, cannot
    be decoded as text, or decodes to something other than a ``dict``.
    """
    try:
        data = json.loads(content)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    return data if isinstance(data, dict) else None


async def check_active_consumers():
    """Print a report of Event Hub consumer activity read from Blob Storage.

    Loads ``config.yaml`` from this script's directory and uses its
    ``eventhub`` section (``consumer_group`` plus ``storage.connection_string``
    and ``storage.container_name``) to inspect the checkpoint container.
    Four sections are written to stdout:

    1. Per-partition ownership: owner id, blob last-modified time, and whether
       the blob was modified within the last 60 seconds.
    2. Per-consumer summary: the partitions held by each owner id.
    3. Per-partition checkpoints: ``offset`` and ``sequenceNumber``.
    4. Every first path segment found in the container, with the configured
       consumer group marked.

    Blobs whose content is not a JSON object are reported and skipped.

    Returns:
        None. All results are printed.

    Raises:
        Nothing is caught here: errors from configuration loading (including
        missing keys) and from storage calls propagate to the caller.

    Note:
        The function is declared ``async`` but uses the synchronous
        ``BlobServiceClient`` and never awaits, so it runs to completion
        without yielding to the event loop.
    """
    # An ownership blob modified within this many seconds counts as active.
    active_window_seconds = 60

    # Load configuration.
    config_path = Path(__file__).parent / "config.yaml"
    config = load_config(str(config_path))
    eventhub_config = config['eventhub']
    storage_conn_str = eventhub_config['storage']['connection_string']
    container_name = eventhub_config['storage']['container_name']
    consumer_group = eventhub_config['consumer_group']

    print("=" * 80)
    print("📊 Event Hub Active Consumers")
    print("=" * 80)
    print(f"Consumer Group: {consumer_group}")
    print(f"Container: {container_name}")
    print()

    # NOTE(review): this assumes blobs are named
    # "<consumer_group>/ownership/<partition>" and carry a JSON body. The
    # Azure SDK blob checkpoint store may use a longer prefix and keep these
    # fields in blob metadata instead -- confirm against the real container.
    ownership_prefix = f"{consumer_group}/ownership/"
    checkpoint_prefix = f"{consumer_group}/checkpoint/"

    # The context manager releases the client's connections on every exit path.
    with BlobServiceClient.from_connection_string(storage_conn_str) as blob_service:
        container_client = blob_service.get_container_client(container_name)

        print("-" * 80)
        print("🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)")
        print("-" * 80)

        active_consumers = {}
        partition_owners = {}

        for blob in container_client.list_blobs(name_starts_with=ownership_prefix):
            # Resolve the partition id before parsing so the failure message
            # below always names the blob that actually failed.
            partition_id = blob.name.split('/')[-1]
            blob_client = container_client.get_blob_client(blob.name)
            content = blob_client.download_blob().readall()

            ownership_data = _parse_json_object(content)
            if ownership_data is None:
                print(f"⚠️ 파티션 {partition_id}: 파싱 실패")
                continue

            owner_id = ownership_data.get('ownerIdentifier', 'unknown')
            last_modified = blob.last_modified

            # Activity is judged from how recently the blob was modified.
            # Assumes last_modified is timezone-aware (subtracting it from an
            # aware "now" would fail otherwise) -- TODO confirm.
            now = datetime.now(timezone.utc)
            time_diff = (now - last_modified).total_seconds()
            is_active = time_diff < active_window_seconds

            partition_owners[partition_id] = {
                'owner_id': owner_id,
                'last_modified': last_modified,
                'is_active': is_active,
                'time_diff': time_diff
            }

            # Aggregate per consumer (owner id).
            summary = active_consumers.setdefault(owner_id, {
                'partitions': [],
                'last_seen': last_modified,
                'is_active': is_active
            })
            summary['partitions'].append(partition_id)
            # Keep the most recent modification and the status that goes with it.
            if last_modified > summary['last_seen']:
                summary['last_seen'] = last_modified
                summary['is_active'] = is_active

        # Per-partition output.
        if partition_owners:
            print()
            for partition_id, info in sorted(partition_owners.items()):
                status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
                print(f"파티션 {partition_id}:")
                print(f" 상태: {status}")
                print(f" Owner ID: {info['owner_id'][:40]}...")
                print(f" 마지막 갱신: {info['last_modified'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
                print(f" 경과 시간: {int(info['time_diff'])}초 전")
                print()
        else:
            print("⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.")
            print()

        # Per-consumer summary.
        print("-" * 80)
        print("📋 Consumer 요약")
        print("-" * 80)
        if active_consumers:
            for idx, (owner_id, info) in enumerate(active_consumers.items(), 1):
                status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
                print(f"\nConsumer #{idx} {status}")
                print(f" Owner ID: {owner_id}")
                print(f" 소유 파티션: {', '.join(info['partitions'])}")
                print(f" 파티션 개수: {len(info['partitions'])}")
                print(f" 마지막 활동: {info['last_seen'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
        else:
            print("\n활성 Consumer가 없습니다.")

        # Checkpoint information (last position read per partition).
        print()
        print("-" * 80)
        print("📍 Checkpoint 정보 (마지막 읽은 위치)")
        print("-" * 80)
        for blob in container_client.list_blobs(name_starts_with=checkpoint_prefix):
            partition_id = blob.name.split('/')[-1]
            blob_client = container_client.get_blob_client(blob.name)
            content = blob_client.download_blob().readall()

            checkpoint_data = _parse_json_object(content)
            if checkpoint_data is None:
                # Report the skip, matching the ownership section, instead of
                # dropping the partition silently.
                print(f"⚠️ 파티션 {partition_id}: 파싱 실패")
                continue

            offset = checkpoint_data.get('offset', 'N/A')
            sequence_number = checkpoint_data.get('sequenceNumber', 'N/A')
            print(f"\n파티션 {partition_id}:")
            print(f" Offset: {offset}")
            print(f" Sequence Number: {sequence_number}")

        print()
        print("=" * 80)

        # List every consumer group present in the container.
        print()
        print("-" * 80)
        print("📂 모든 Consumer Groups")
        print("-" * 80)
        # The first path segment of each blob name is taken as the group name.
        consumer_groups = {
            blob.name.split('/')[0] for blob in container_client.list_blobs()
        }
        if consumer_groups:
            print()
            for cg in sorted(consumer_groups):
                current = " ⬅️ 현재" if cg == consumer_group else ""
                print(f" - {cg}{current}")
        else:
            print("\nConsumer Group 정보가 없습니다.")

        print()
        print("=" * 80)
def main():
    """Run the consumer report and return a process exit status.

    Returns:
        int: ``0`` when the report completes, ``1`` when any exception is
        raised while producing it. The error message and traceback are
        printed before returning.
    """
    try:
        asyncio.run(check_active_consumers())
    except Exception as e:
        # Top-level boundary: report the failure, then signal it through the
        # return value rather than letting the script end with status 0.
        print(f"❌ 에러 발생: {str(e)}")
        import traceback
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and automation can detect failure.
    raise SystemExit(main())