mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 23:06:23 +00:00
192 lines
6.3 KiB
Python
192 lines
6.3 KiB
Python
"""
|
|
Event Hub의 활성 Consumer(host) 조회 스크립트
|
|
|
|
Blob Storage의 ownership 정보를 통해 현재 어떤 Consumer가
|
|
어떤 파티션을 읽고 있는지 확인합니다.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from azure.storage.blob import BlobServiceClient
|
|
from src.utils.config import load_config
|
|
|
|
|
|
async def check_active_consumers():
|
|
"""활성 Consumer 조회"""
|
|
|
|
# 설정 로드
|
|
config_path = Path(__file__).parent / "config.yaml"
|
|
config = load_config(str(config_path))
|
|
|
|
eventhub_config = config['eventhub']
|
|
storage_conn_str = eventhub_config['storage']['connection_string']
|
|
container_name = eventhub_config['storage']['container_name']
|
|
consumer_group = eventhub_config['consumer_group']
|
|
|
|
print("=" * 80)
|
|
print("📊 Event Hub Active Consumers")
|
|
print("=" * 80)
|
|
print(f"Consumer Group: {consumer_group}")
|
|
print(f"Container: {container_name}")
|
|
print()
|
|
|
|
# Blob Service Client 생성
|
|
blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
|
|
container_client = blob_service.get_container_client(container_name)
|
|
|
|
# Ownership blob 조회 (파티션 소유권 정보)
|
|
ownership_prefix = f"{consumer_group}/ownership/"
|
|
checkpoint_prefix = f"{consumer_group}/checkpoint/"
|
|
|
|
print("-" * 80)
|
|
print("🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)")
|
|
print("-" * 80)
|
|
|
|
ownership_blobs = container_client.list_blobs(name_starts_with=ownership_prefix)
|
|
active_consumers = {}
|
|
partition_owners = {}
|
|
|
|
for blob in ownership_blobs:
|
|
# Blob 내용 읽기
|
|
blob_client = container_client.get_blob_client(blob.name)
|
|
content = blob_client.download_blob().readall()
|
|
|
|
try:
|
|
ownership_data = json.loads(content)
|
|
|
|
partition_id = blob.name.split('/')[-1]
|
|
owner_id = ownership_data.get('ownerIdentifier', 'unknown')
|
|
last_modified = blob.last_modified
|
|
|
|
# 현재 시간과 비교하여 활성 상태 확인 (30초 이내 갱신되었으면 활성)
|
|
now = datetime.now(timezone.utc)
|
|
time_diff = (now - last_modified).total_seconds()
|
|
is_active = time_diff < 60 # 60초 이내면 활성으로 간주
|
|
|
|
partition_owners[partition_id] = {
|
|
'owner_id': owner_id,
|
|
'last_modified': last_modified,
|
|
'is_active': is_active,
|
|
'time_diff': time_diff
|
|
}
|
|
|
|
# Consumer별 집계
|
|
if owner_id not in active_consumers:
|
|
active_consumers[owner_id] = {
|
|
'partitions': [],
|
|
'last_seen': last_modified,
|
|
'is_active': is_active
|
|
}
|
|
|
|
active_consumers[owner_id]['partitions'].append(partition_id)
|
|
|
|
# 가장 최근 시간으로 업데이트
|
|
if last_modified > active_consumers[owner_id]['last_seen']:
|
|
active_consumers[owner_id]['last_seen'] = last_modified
|
|
active_consumers[owner_id]['is_active'] = is_active
|
|
|
|
except json.JSONDecodeError:
|
|
print(f"⚠️ 파티션 {partition_id}: 파싱 실패")
|
|
continue
|
|
|
|
# 파티션별 출력
|
|
if partition_owners:
|
|
print()
|
|
for partition_id, info in sorted(partition_owners.items()):
|
|
status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
|
|
print(f"파티션 {partition_id}:")
|
|
print(f" 상태: {status}")
|
|
print(f" Owner ID: {info['owner_id'][:40]}...")
|
|
print(f" 마지막 갱신: {info['last_modified'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
|
print(f" 경과 시간: {int(info['time_diff'])}초 전")
|
|
print()
|
|
else:
|
|
print("⚠️ 소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.")
|
|
print()
|
|
|
|
# Consumer별 요약
|
|
print("-" * 80)
|
|
print("📋 Consumer 요약")
|
|
print("-" * 80)
|
|
|
|
if active_consumers:
|
|
for idx, (owner_id, info) in enumerate(active_consumers.items(), 1):
|
|
status = "🟢 ACTIVE" if info['is_active'] else "🔴 INACTIVE"
|
|
print(f"\nConsumer #{idx} {status}")
|
|
print(f" Owner ID: {owner_id}")
|
|
print(f" 소유 파티션: {', '.join(info['partitions'])}")
|
|
print(f" 파티션 개수: {len(info['partitions'])}")
|
|
print(f" 마지막 활동: {info['last_seen'].strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
|
else:
|
|
print("\n활성 Consumer가 없습니다.")
|
|
|
|
# Checkpoint 정보 조회
|
|
print()
|
|
print("-" * 80)
|
|
print("📍 Checkpoint 정보 (마지막 읽은 위치)")
|
|
print("-" * 80)
|
|
|
|
checkpoint_blobs = container_client.list_blobs(name_starts_with=checkpoint_prefix)
|
|
|
|
for blob in checkpoint_blobs:
|
|
blob_client = container_client.get_blob_client(blob.name)
|
|
content = blob_client.download_blob().readall()
|
|
|
|
try:
|
|
checkpoint_data = json.loads(content)
|
|
|
|
partition_id = blob.name.split('/')[-1]
|
|
offset = checkpoint_data.get('offset', 'N/A')
|
|
sequence_number = checkpoint_data.get('sequenceNumber', 'N/A')
|
|
|
|
print(f"\n파티션 {partition_id}:")
|
|
print(f" Offset: {offset}")
|
|
print(f" Sequence Number: {sequence_number}")
|
|
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
print()
|
|
print("=" * 80)
|
|
|
|
# 모든 Consumer Group 조회
|
|
print()
|
|
print("-" * 80)
|
|
print("📂 모든 Consumer Groups")
|
|
print("-" * 80)
|
|
|
|
all_blobs = container_client.list_blobs()
|
|
consumer_groups = set()
|
|
|
|
for blob in all_blobs:
|
|
# Consumer Group 이름은 첫 번째 경로 세그먼트
|
|
parts = blob.name.split('/')
|
|
if len(parts) > 0:
|
|
consumer_groups.add(parts[0])
|
|
|
|
if consumer_groups:
|
|
print()
|
|
for cg in sorted(consumer_groups):
|
|
current = " ⬅️ 현재" if cg == consumer_group else ""
|
|
print(f" - {cg}{current}")
|
|
else:
|
|
print("\nConsumer Group 정보가 없습니다.")
|
|
|
|
print()
|
|
print("=" * 80)
|
|
|
|
|
|
def main():
|
|
"""메인 함수"""
|
|
try:
|
|
asyncio.run(check_active_consumers())
|
|
except Exception as e:
|
|
print(f"❌ 에러 발생: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|