hgzero/rag/docs/check_consumers_guide.md

8.3 KiB
Raw Blame History

Event Hub Active Consumers 조회 가이드

개요

Event Hub를 현재 읽고 있는 Consumer(host)를 확인하는 방법을 설명합니다.

방법 1: 제공된 스크립트 사용 (가장 쉬움)

실행

cd /Users/daewoong/home/workspace/HGZero/rag
python check_active_consumers.py

출력 정보

📊 Event Hub Active Consumers
================================================================================
Consumer Group: $Default
Container: hgzero-checkpoints

🔍 Ownership 정보 (현재 파티션을 읽고 있는 Consumer)
--------------------------------------------------------------------------------
파티션 0:
  상태: 🟢 ACTIVE
  Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95...
  마지막 갱신: 2025-10-29 12:35:42 UTC
  경과 시간: 15초 전

📋 Consumer 요약
--------------------------------------------------------------------------------
Consumer #1 🟢 ACTIVE
  Owner ID: 73fda457-b555-4af5-873a-54a2baa5fd95
  소유 파티션: 0
  파티션 개수: 1
  마지막 활동: 2025-10-29 12:35:42 UTC

📍 Checkpoint 정보 (마지막 읽은 위치)
--------------------------------------------------------------------------------
파티션 0:
  Offset: 120259090624
  Sequence Number: 232

방법 2: Python 코드로 직접 조회

스크립트 예시

import asyncio
import json
from datetime import datetime, timezone
from pathlib import Path
from azure.storage.blob import BlobServiceClient
from src.utils.config import load_config


async def check_consumers():
    """활성 Consumer 조회"""

    # 설정 로드
    config_path = Path('config.yaml')
    config = load_config(str(config_path))

    eventhub_config = config['eventhub']
    storage_conn_str = eventhub_config['storage']['connection_string']
    container_name = eventhub_config['storage']['container_name']
    consumer_group = eventhub_config['consumer_group']

    # Blob Service Client
    blob_service = BlobServiceClient.from_connection_string(storage_conn_str)
    container = blob_service.get_container_client(container_name)

    # Ownership 정보 조회
    ownership_prefix = f"{consumer_group}/ownership/"

    print(f"Consumer Group: {consumer_group}\n")

    for blob in container.list_blobs(name_starts_with=ownership_prefix):
        blob_client = container.get_blob_client(blob.name)
        content = blob_client.download_blob().readall()

        ownership_data = json.loads(content)
        partition_id = blob.name.split('/')[-1]
        owner_id = ownership_data.get('ownerIdentifier', 'unknown')

        # 활성 상태 확인 (60초 이내 갱신)
        now = datetime.now(timezone.utc)
        time_diff = (now - blob.last_modified).total_seconds()
        is_active = time_diff < 60

        status = "🟢 ACTIVE" if is_active else "🔴 INACTIVE"

        print(f"파티션 {partition_id}: {status}")
        print(f"  Owner: {owner_id}")
        print(f"  갱신: {int(time_diff)}초 전\n")


asyncio.run(check_consumers())

방법 3: Azure Portal에서 확인

단계

  1. Azure Portal 접속 (https://portal.azure.com)
  2. Storage Account 이동
    • "hgzerostorage" 검색
  3. Containers 클릭
    • "hgzero-checkpoints" 선택
  4. 폴더 구조 확인
    $Default/
    ├── ownership/
    │   └── 0  ← 파티션 0의 소유권 정보
    └── checkpoint/
        └── 0  ← 파티션 0의 checkpoint
    
  5. ownership/0 파일 다운로드 또는 View 클릭
  6. JSON 내용 확인
    {
      "ownerIdentifier": "73fda457-b555-4af5-873a-54a2baa5fd95",
      "lastModifiedTime": "2025-10-29T12:35:42Z",
      ...
    }
    

정보 해석

  • ownerIdentifier: 현재 Consumer의 고유 ID
  • lastModifiedTime: 마지막 lease 갱신 시간
    • 60초 이내: 🟢 활성 상태
    • 60초 이상: 🔴 비활성 (Consumer 종료됨)

방법 4: Azure CLI로 조회

Blob 목록 확인

# Storage Account 키 조회
az storage account keys list \
  --resource-group rg-digitalgarage-02 \
  --account-name hgzerostorage \
  --query "[0].value" -o tsv

# Blob 목록 조회
az storage blob list \
  --account-name hgzerostorage \
  --container-name hgzero-checkpoints \
  --prefix "\$Default/ownership/" \
  --output table

Blob 내용 다운로드

# ownership 정보 다운로드
az storage blob download \
  --account-name hgzerostorage \
  --container-name hgzero-checkpoints \
  --name "\$Default/ownership/0" \
  --file /tmp/ownership.json

# 내용 확인
cat /tmp/ownership.json | python3 -m json.tool

방법 5: 프로세스 레벨에서 확인

로컬 머신에서 실행 중인 Consumer

# Consumer 프로세스 확인
ps aux | grep "start_consumer.py" | grep -v grep

# 출력:
# daewoong  81447  0.0  0.2  python start_consumer.py

네트워크 연결 확인

# Event Hub와의 연결 확인
lsof -i -n | grep 81447 | grep ESTABLISHED

# 출력:
# python3.1  81447  daewoong  9u  IPv6  TCP ... -> ...servicebus.windows.net:https (ESTABLISHED)

의미:

  • Consumer 프로세스가 실행 중
  • Event Hub와 HTTPS 연결 유지 중
  • 이벤트 수신 대기 중

주요 개념

Ownership (소유권)

파티션 0
    ↓
Ownership Blob (Blob Storage)
    ↓
{
  "ownerIdentifier": "consumer-uuid",
  "lastModifiedTime": "2025-10-29T12:35:42Z",
  "eTag": "...",
  ...
}
    ↓
Lease 메커니즘 (30초마다 갱신)
    ↓
갱신 실패 시 → 다른 Consumer가 인수 가능

Checkpoint (읽은 위치)

파티션 0
    ↓
Checkpoint Blob
    ↓
{
  "offset": "120259090624",
  "sequenceNumber": 232,
  ...
}
    ↓
Consumer 재시작 시 → 이 위치부터 재개

Consumer 상태 판단

조건 상태 의미
lastModifiedTime < 60초 🟢 ACTIVE 정상 동작 중
60초 < lastModifiedTime < 180초 🟡 WARNING Lease 갱신 지연
lastModifiedTime > 180초 🔴 INACTIVE Consumer 종료

트러블슈팅

문제 1: Ownership 정보가 없음

증상:

⚠️  소유권 정보가 없습니다. Consumer가 실행되지 않았을 수 있습니다.

원인:

  • Consumer가 한 번도 실행되지 않음
  • Consumer가 ownership claim에 실패함

해결:

  1. Consumer 프로세스 확인: ps aux | grep start_consumer
  2. Consumer 로그 확인
  3. 권한 확인 (Storage Account 접근 권한)

문제 2: 여러 Owner ID가 나타남

증상:

Consumer #1: owner-id-1 (파티션 0, 2, 4)
Consumer #2: owner-id-2 (파티션 1, 3, 5)

원인:

  • 정상: 여러 Consumer가 파티션을 나눠서 처리
  • 파티션이 여러 개이고 Consumer도 여러 개

확인:

  • 각 파티션이 하나의 Consumer만 소유하면 정상
  • 동일 파티션을 여러 Consumer가 소유하면 문제

문제 3: lastModified가 오래됨

증상:

파티션 0:
  상태: 🔴 INACTIVE
  마지막 갱신: 2025-10-29 10:00:00 UTC
  경과 시간: 7200초 전 (2시간)

원인:

  • Consumer가 종료됨
  • Consumer가 응답 없음 (hang)
  • 네트워크 문제

해결:

  1. Consumer 프로세스 확인
  2. Consumer 재시작
  3. 로그 확인하여 에러 파악

실시간 모니터링

스크립트: 주기적 확인

#!/bin/bash
# monitor_consumers.sh

while true; do
    clear
    echo "=== $(date) ==="
    python check_active_consumers.py
    sleep 30  # 30초마다 갱신
done

실행

chmod +x monitor_consumers.sh
./monitor_consumers.sh

요약

빠른 확인 방법

# 1. 스크립트 실행 (가장 쉬움)
python check_active_consumers.py

# 2. 프로세스 확인
ps aux | grep start_consumer

# 3. Azure Portal에서 Blob 확인
# Storage Account > Containers > hgzero-checkpoints > $Default/ownership

확인 항목 체크리스트

  • Consumer 프로세스 실행 중인가?
  • Ownership 정보가 있는가?
  • lastModified가 60초 이내인가?
  • 각 파티션이 하나의 Consumer만 소유하는가?
  • Checkpoint가 갱신되고 있는가?

모든 항목이 체크되면 Consumer가 정상 동작 중입니다!


참고 파일

  • 스크립트: /Users/daewoong/home/workspace/HGZero/rag/check_active_consumers.py
  • 설정: /Users/daewoong/home/workspace/HGZero/rag/config.yaml
  • Blob Storage: hgzerostorage/hgzero-checkpoints