mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 13:46:24 +00:00
181 lines
5.1 KiB
Python
181 lines
5.1 KiB
Python
"""
|
|
RAG 서비스 통합 실행 스크립트
|
|
API 서버와 Event Hub Consumer를 동시에 실행
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
import multiprocessing
|
|
import signal
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import uvicorn
|
|
|
|
from src.utils.config import load_config, get_database_url
|
|
from src.db.rag_minutes_db import RagMinutesDB
|
|
from src.db.postgres_vector import PostgresVectorDB
|
|
from src.utils.embedding import EmbeddingGenerator
|
|
from src.services.eventhub_consumer import start_consumer
|
|
|
|
# 로깅 설정
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def run_api_server():
|
|
"""
|
|
REST API 서버 실행 (별도 프로세스)
|
|
"""
|
|
try:
|
|
logger.info("=" * 50)
|
|
logger.info("REST API 서버 시작")
|
|
logger.info("=" * 50)
|
|
|
|
uvicorn.run(
|
|
"src.api.main:app",
|
|
host="0.0.0.0",
|
|
port=8000,
|
|
log_level="info",
|
|
access_log=True
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"API 서버 실행 실패: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
|
|
def run_event_consumer():
|
|
"""
|
|
Event Hub Consumer 실행 (별도 프로세스)
|
|
"""
|
|
try:
|
|
logger.info("=" * 50)
|
|
logger.info("Event Hub Consumer 시작")
|
|
logger.info("=" * 50)
|
|
|
|
# 설정 로드
|
|
config_path = Path(__file__).parent / "config.yaml"
|
|
config = load_config(str(config_path))
|
|
|
|
# 데이터베이스 연결
|
|
db_url = get_database_url(config)
|
|
rag_minutes_db = RagMinutesDB(db_url)
|
|
logger.info("RAG Minutes DB 연결 완료")
|
|
|
|
# 용어집 데이터베이스 연결
|
|
term_db = PostgresVectorDB(db_url)
|
|
logger.info("용어집 DB 연결 완료")
|
|
|
|
# Embedding 생성기 초기화
|
|
azure_openai = config["azure_openai"]
|
|
embedding_gen = EmbeddingGenerator(
|
|
api_key=azure_openai["api_key"],
|
|
endpoint=azure_openai["endpoint"],
|
|
model=azure_openai["embedding_model"],
|
|
dimension=azure_openai["embedding_dimension"],
|
|
api_version=azure_openai["api_version"]
|
|
)
|
|
logger.info("Embedding 생성기 초기화 완료")
|
|
|
|
# Event Hub Consumer 시작
|
|
asyncio.run(start_consumer(config, rag_minutes_db, embedding_gen, term_db))
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Consumer 종료 신호 수신")
|
|
except Exception as e:
|
|
logger.error(f"Consumer 실행 실패: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
"""
|
|
메인 함수: 두 프로세스를 생성하고 관리
|
|
"""
|
|
logger.info("=" * 60)
|
|
logger.info("RAG 서비스 통합 시작")
|
|
logger.info(" - REST API 서버: http://0.0.0.0:8000")
|
|
logger.info(" - Event Hub Consumer: Background")
|
|
logger.info("=" * 60)
|
|
|
|
# 프로세스 생성
|
|
api_process = multiprocessing.Process(
|
|
target=run_api_server,
|
|
name="API-Server"
|
|
)
|
|
consumer_process = multiprocessing.Process(
|
|
target=run_event_consumer,
|
|
name="Event-Consumer"
|
|
)
|
|
|
|
# 종료 시그널 핸들러
|
|
def signal_handler(signum, frame):
|
|
logger.info("\n종료 신호 수신. 프로세스 종료 중...")
|
|
|
|
if api_process.is_alive():
|
|
logger.info("API 서버 종료 중...")
|
|
api_process.terminate()
|
|
api_process.join(timeout=5)
|
|
if api_process.is_alive():
|
|
api_process.kill()
|
|
|
|
if consumer_process.is_alive():
|
|
logger.info("Consumer 종료 중...")
|
|
consumer_process.terminate()
|
|
consumer_process.join(timeout=5)
|
|
if consumer_process.is_alive():
|
|
consumer_process.kill()
|
|
|
|
logger.info("모든 프로세스 종료 완료")
|
|
sys.exit(0)
|
|
|
|
# 시그널 핸들러 등록
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
try:
|
|
# 프로세스 시작
|
|
api_process.start()
|
|
time.sleep(2) # API 서버 시작 대기
|
|
|
|
consumer_process.start()
|
|
time.sleep(2) # Consumer 시작 대기
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("모든 서비스 시작 완료")
|
|
logger.info(f" - API Server PID: {api_process.pid}")
|
|
logger.info(f" - Consumer PID: {consumer_process.pid}")
|
|
logger.info("=" * 60)
|
|
|
|
# 프로세스 모니터링
|
|
while True:
|
|
if not api_process.is_alive():
|
|
logger.error("API 서버 프로세스 종료됨")
|
|
consumer_process.terminate()
|
|
break
|
|
|
|
if not consumer_process.is_alive():
|
|
logger.error("Consumer 프로세스 종료됨")
|
|
api_process.terminate()
|
|
break
|
|
|
|
time.sleep(5)
|
|
|
|
# 대기
|
|
api_process.join()
|
|
consumer_process.join()
|
|
|
|
except Exception as e:
|
|
logger.error(f"서비스 실행 중 에러: {str(e)}")
|
|
api_process.terminate()
|
|
consumer_process.terminate()
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# multiprocessing을 위한 설정
|
|
multiprocessing.set_start_method('spawn', force=True)
|
|
main()
|