""" RAG 서비스 통합 실행 스크립트 API 서버와 Event Hub Consumer를 동시에 실행 """ import asyncio import logging import multiprocessing import signal import sys import time from pathlib import Path import uvicorn from src.utils.config import load_config, get_database_url from src.db.rag_minutes_db import RagMinutesDB from src.db.postgres_vector import PostgresVectorDB from src.utils.embedding import EmbeddingGenerator from src.services.eventhub_consumer import start_consumer # 로깅 설정 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def run_api_server(): """ REST API 서버 실행 (별도 프로세스) """ try: logger.info("=" * 50) logger.info("REST API 서버 시작") logger.info("=" * 50) uvicorn.run( "src.api.main:app", host="0.0.0.0", port=8000, log_level="info", access_log=True ) except Exception as e: logger.error(f"API 서버 실행 실패: {str(e)}") sys.exit(1) def run_event_consumer(): """ Event Hub Consumer 실행 (별도 프로세스) """ try: logger.info("=" * 50) logger.info("Event Hub Consumer 시작") logger.info("=" * 50) # 설정 로드 config_path = Path(__file__).parent / "config.yaml" config = load_config(str(config_path)) # 데이터베이스 연결 db_url = get_database_url(config) rag_minutes_db = RagMinutesDB(db_url) logger.info("RAG Minutes DB 연결 완료") # 용어집 데이터베이스 연결 term_db = PostgresVectorDB(db_url) logger.info("용어집 DB 연결 완료") # Embedding 생성기 초기화 azure_openai = config["azure_openai"] embedding_gen = EmbeddingGenerator( api_key=azure_openai["api_key"], endpoint=azure_openai["endpoint"], model=azure_openai["embedding_model"], dimension=azure_openai["embedding_dimension"], api_version=azure_openai["api_version"] ) logger.info("Embedding 생성기 초기화 완료") # Event Hub Consumer 시작 asyncio.run(start_consumer(config, rag_minutes_db, embedding_gen, term_db)) except KeyboardInterrupt: logger.info("Consumer 종료 신호 수신") except Exception as e: logger.error(f"Consumer 실행 실패: {str(e)}") sys.exit(1) def main(): """ 메인 함수: 두 프로세스를 생성하고 관리 """ logger.info("=" * 60) logger.info("RAG 서비스 통합 시작") logger.info(" - REST API 서버: http://0.0.0.0:8000") logger.info(" - Event Hub Consumer: Background") logger.info("=" * 60) # 프로세스 생성 api_process = multiprocessing.Process( target=run_api_server, name="API-Server" ) consumer_process = multiprocessing.Process( target=run_event_consumer, name="Event-Consumer" ) # 종료 시그널 핸들러 def signal_handler(signum, frame): logger.info("\n종료 신호 수신. 프로세스 종료 중...") if api_process.is_alive(): logger.info("API 서버 종료 중...") api_process.terminate() api_process.join(timeout=5) if api_process.is_alive(): api_process.kill() if consumer_process.is_alive(): logger.info("Consumer 종료 중...") consumer_process.terminate() consumer_process.join(timeout=5) if consumer_process.is_alive(): consumer_process.kill() logger.info("모든 프로세스 종료 완료") sys.exit(0) # 시그널 핸들러 등록 signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) try: # 프로세스 시작 api_process.start() time.sleep(2) # API 서버 시작 대기 consumer_process.start() time.sleep(2) # Consumer 시작 대기 logger.info("=" * 60) logger.info("모든 서비스 시작 완료") logger.info(f" - API Server PID: {api_process.pid}") logger.info(f" - Consumer PID: {consumer_process.pid}") logger.info("=" * 60) # 프로세스 모니터링 while True: if not api_process.is_alive(): logger.error("API 서버 프로세스 종료됨") consumer_process.terminate() break if not consumer_process.is_alive(): logger.error("Consumer 프로세스 종료됨") api_process.terminate() break time.sleep(5) # 대기 api_process.join() consumer_process.join() except Exception as e: logger.error(f"서비스 실행 중 에러: {str(e)}") api_process.terminate() consumer_process.terminate() sys.exit(1) if __name__ == "__main__": # multiprocessing을 위한 설정 multiprocessing.set_start_method('spawn', force=True) main()