# app/services/vector_service.py import os import json import logging import tempfile from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import chromadb from chromadb.config import Settings as ChromaSettings from sentence_transformers import SentenceTransformer from ..config.settings import settings from ..utils.data_utils import ( create_store_hash, combine_store_and_reviews, generate_review_summary, extract_text_for_embedding, create_metadata, is_duplicate_store ) logger = logging.getLogger(__name__) class VectorService: """Vector DB 서비스""" def __init__(self): self.db_path = settings.VECTOR_DB_PATH self.collection_name = settings.VECTOR_DB_COLLECTION self.embedding_model_name = settings.EMBEDDING_MODEL # 상태 변수 self.client = None self.collection = None self.embedding_model = None self.initialization_error = None # 안전한 초기화 시도 self._safe_initialize() def _safe_initialize(self): """안전한 초기화""" try: logger.info("🔧 VectorService 초기화 시작...") # 1단계: 디렉토리 권한 확인 self._ensure_directory_permissions() # 2단계: ChromaDB 초기화 self._initialize_chromadb() # 3단계: 임베딩 모델 로드 self._initialize_embedding_model() logger.info("✅ VectorService 초기화 완료") except Exception as e: self.initialization_error = str(e) logger.error(f"❌ VectorService 초기화 실패: {e}") logger.info("🔄 서비스는 런타임에 재시도 가능합니다") def _ensure_directory_permissions(self): """Vector DB 디렉토리 권한을 확인하고 생성""" try: logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}") # 절대 경로로 변환 abs_path = os.path.abspath(self.db_path) # 디렉토리 생성 os.makedirs(abs_path, mode=0o755, exist_ok=True) # 권한 확인 if not os.access(abs_path, os.W_OK): logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}") # 권한 변경 시도 try: os.chmod(abs_path, 0o755) logger.info("✅ 디렉토리 권한 변경 성공") except Exception as chmod_error: logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}") # 임시 디렉토리로 대체 temp_dir = tempfile.mkdtemp(prefix="vectordb_") logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}") self.db_path = temp_dir abs_path = temp_dir # 테스트 파일 생성/삭제로 권한 확인 test_file = os.path.join(abs_path, "test_permissions.tmp") try: with open(test_file, 'w') as f: f.write("test") os.remove(test_file) logger.info("✅ 디렉토리 권한 확인 완료") except Exception as test_error: raise Exception(f"디렉토리 권한 테스트 실패: {test_error}") except Exception as e: raise Exception(f"디렉토리 설정 실패: {e}") def _initialize_chromadb(self): """ChromaDB 초기화""" try: logger.info("🔧 ChromaDB 클라이언트 초기화...") # ChromaDB 클라이언트 생성 self.client = chromadb.PersistentClient( path=self.db_path, settings=ChromaSettings( anonymized_telemetry=False, allow_reset=True, ) ) # Collection 가져오기 또는 생성 try: self.collection = self.client.get_collection(name=self.collection_name) logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}") except Exception: self.collection = self.client.create_collection( name=self.collection_name, metadata={"hnsw:space": "cosine"} ) logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}") except Exception as e: raise Exception(f"ChromaDB 초기화 실패: {e}") def _initialize_embedding_model(self): """임베딩 모델 초기화""" try: logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}") self.embedding_model = SentenceTransformer(self.embedding_model_name) logger.info("✅ 임베딩 모델 로드 완료") except Exception as e: raise Exception(f"임베딩 모델 로드 실패: {e}") def is_ready(self) -> bool: """서비스 준비 상태 확인""" return all([ self.client is not None, self.collection is not None, self.embedding_model is not None, self.initialization_error is None ]) async def build_vector_store( self, target_store_info: Dict[str, Any], review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str ) -> Dict[str, Any]: """Vector Store를 구축합니다""" if not self.is_ready(): raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}") try: logger.info("🚀 Vector Store 구축 시작") processed_count = 0 documents = [] embeddings = [] metadatas = [] ids = [] for store_id, store_info, reviews in review_results: try: # 텍스트 추출 및 임베딩 생성 text_for_embedding = extract_text_for_embedding(store_info, reviews) embedding = self.embedding_model.encode(text_for_embedding) # 메타데이터 생성 metadata = create_metadata(store_info, food_category, region) # 문서 ID 생성 document_id = create_store_hash(store_id, store_info.get('name', ''), region) # 문서 데이터 생성 document_text = combine_store_and_reviews(store_info, reviews) documents.append(document_text) embeddings.append(embedding.tolist()) metadatas.append(metadata) ids.append(document_id) processed_count += 1 except Exception as e: logger.warning(f"⚠️ 가게 {store_id} 처리 실패: {e}") continue # Vector DB에 저장 if documents: self.collection.add( documents=documents, embeddings=embeddings, metadatas=metadatas, ids=ids ) logger.info(f"✅ Vector Store 구축 완료: {processed_count}개 문서 저장") return { 'success': True, 'processed_count': processed_count, 'message': f"{processed_count}개 문서가 Vector DB에 저장되었습니다" } else: return { 'success': False, 'error': '저장할 문서가 없습니다', 'processed_count': 0 } except Exception as e: logger.error(f"❌ Vector Store 구축 실패: {e}") return { 'success': False, 'error': str(e), 'processed_count': 0 } def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[str]: """유사 케이스 검색 (개선된 버전)""" if not self.is_ready(): logger.warning("VectorService가 준비되지 않음") return None try: # 검색 쿼리 생성 query_text = f"가게 ID: {store_id} 요청사항: {context}" query_embedding = self.embedding_model.encode(query_text) # 유사도 검색 results = self.collection.query( query_embeddings=[query_embedding.tolist()], n_results=limit, include=['documents', 'metadatas', 'distances'] ) if results['documents'] and results['documents'][0]: # 검색 결과 요약 context_parts = [] for i, (doc, metadata, distance) in enumerate(zip( results['documents'][0], results['metadatas'][0], results['distances'][0] )): store_name = metadata.get('store_name', 'Unknown') category = metadata.get('food_category', 'Unknown') context_parts.append( f"유사 가게 {i+1}: {store_name} ({category}) - 유사도: {1-distance:.3f}" ) return "\n".join(context_parts) return None except Exception as e: logger.error(f"유사 케이스 검색 실패: {e}") return None def get_db_status(self) -> Dict[str, Any]: """DB 상태 정보 반환""" try: if not self.is_ready(): return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'not_ready', 'error': self.initialization_error } # 컬렉션 정보 조회 count = self.collection.count() return { 'collection_name': self.collection_name, 'total_documents': count, 'total_stores': count, # 각 문서가 하나의 가게를 나타냄 'db_path': self.db_path, 'status': 'ready', 'last_updated': datetime.now().isoformat() } except Exception as e: logger.error(f"DB 상태 조회 실패: {e}") return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'error', 'error': str(e) }