hgzero/rag/start_all_services.py

181 lines
5.1 KiB
Python

"""
RAG 서비스 통합 실행 스크립트
API 서버와 Event Hub Consumer를 동시에 실행
"""
import asyncio
import logging
import multiprocessing
import signal
import sys
import time
from pathlib import Path
import uvicorn
from src.utils.config import load_config, get_database_url
from src.db.rag_minutes_db import RagMinutesDB
from src.db.postgres_vector import PostgresVectorDB
from src.utils.embedding import EmbeddingGenerator
from src.services.eventhub_consumer import start_consumer
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def run_api_server():
"""
REST API 서버 실행 (별도 프로세스)
"""
try:
logger.info("=" * 50)
logger.info("REST API 서버 시작")
logger.info("=" * 50)
uvicorn.run(
"src.api.main:app",
host="0.0.0.0",
port=8000,
log_level="info",
access_log=True
)
except Exception as e:
logger.error(f"API 서버 실행 실패: {str(e)}")
sys.exit(1)
def run_event_consumer():
"""
Event Hub Consumer 실행 (별도 프로세스)
"""
try:
logger.info("=" * 50)
logger.info("Event Hub Consumer 시작")
logger.info("=" * 50)
# 설정 로드
config_path = Path(__file__).parent / "config.yaml"
config = load_config(str(config_path))
# 데이터베이스 연결
db_url = get_database_url(config)
rag_minutes_db = RagMinutesDB(db_url)
logger.info("RAG Minutes DB 연결 완료")
# 용어집 데이터베이스 연결
term_db = PostgresVectorDB(db_url)
logger.info("용어집 DB 연결 완료")
# Embedding 생성기 초기화
azure_openai = config["azure_openai"]
embedding_gen = EmbeddingGenerator(
api_key=azure_openai["api_key"],
endpoint=azure_openai["endpoint"],
model=azure_openai["embedding_model"],
dimension=azure_openai["embedding_dimension"],
api_version=azure_openai["api_version"]
)
logger.info("Embedding 생성기 초기화 완료")
# Event Hub Consumer 시작
asyncio.run(start_consumer(config, rag_minutes_db, embedding_gen, term_db))
except KeyboardInterrupt:
logger.info("Consumer 종료 신호 수신")
except Exception as e:
logger.error(f"Consumer 실행 실패: {str(e)}")
sys.exit(1)
def main():
"""
메인 함수: 두 프로세스를 생성하고 관리
"""
logger.info("=" * 60)
logger.info("RAG 서비스 통합 시작")
logger.info(" - REST API 서버: http://0.0.0.0:8000")
logger.info(" - Event Hub Consumer: Background")
logger.info("=" * 60)
# 프로세스 생성
api_process = multiprocessing.Process(
target=run_api_server,
name="API-Server"
)
consumer_process = multiprocessing.Process(
target=run_event_consumer,
name="Event-Consumer"
)
# 종료 시그널 핸들러
def signal_handler(signum, frame):
logger.info("\n종료 신호 수신. 프로세스 종료 중...")
if api_process.is_alive():
logger.info("API 서버 종료 중...")
api_process.terminate()
api_process.join(timeout=5)
if api_process.is_alive():
api_process.kill()
if consumer_process.is_alive():
logger.info("Consumer 종료 중...")
consumer_process.terminate()
consumer_process.join(timeout=5)
if consumer_process.is_alive():
consumer_process.kill()
logger.info("모든 프로세스 종료 완료")
sys.exit(0)
# 시그널 핸들러 등록
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
try:
# 프로세스 시작
api_process.start()
time.sleep(2) # API 서버 시작 대기
consumer_process.start()
time.sleep(2) # Consumer 시작 대기
logger.info("=" * 60)
logger.info("모든 서비스 시작 완료")
logger.info(f" - API Server PID: {api_process.pid}")
logger.info(f" - Consumer PID: {consumer_process.pid}")
logger.info("=" * 60)
# 프로세스 모니터링
while True:
if not api_process.is_alive():
logger.error("API 서버 프로세스 종료됨")
consumer_process.terminate()
break
if not consumer_process.is_alive():
logger.error("Consumer 프로세스 종료됨")
api_process.terminate()
break
time.sleep(5)
# 대기
api_process.join()
consumer_process.join()
except Exception as e:
logger.error(f"서비스 실행 중 에러: {str(e)}")
api_process.terminate()
consumer_process.terminate()
sys.exit(1)
if __name__ == "__main__":
# multiprocessing을 위한 설정
multiprocessing.set_start_method('spawn', force=True)
main()