ai-review/vector/app/services/vector_service.py
2025-06-16 07:08:09 +09:00

294 lines
11 KiB
Python

# app/services/vector_service.py
import os
import json
import logging
import tempfile
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import chromadb
from chromadb.config import Settings as ChromaSettings
from sentence_transformers import SentenceTransformer
from ..config.settings import settings
from ..utils.data_utils import (
create_store_hash, combine_store_and_reviews, generate_review_summary,
extract_text_for_embedding, create_metadata, is_duplicate_store
)
logger = logging.getLogger(__name__)
class VectorService:
"""Vector DB 서비스"""
def __init__(self):
self.db_path = settings.VECTOR_DB_PATH
self.collection_name = settings.VECTOR_DB_COLLECTION
self.embedding_model_name = settings.EMBEDDING_MODEL
# 상태 변수
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 안전한 초기화 시도
self._safe_initialize()
def _safe_initialize(self):
"""안전한 초기화"""
try:
logger.info("🔧 VectorService 초기화 시작...")
# 1단계: 디렉토리 권한 확인
self._ensure_directory_permissions()
# 2단계: ChromaDB 초기화
self._initialize_chromadb()
# 3단계: 임베딩 모델 로드
self._initialize_embedding_model()
logger.info("✅ VectorService 초기화 완료")
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ VectorService 초기화 실패: {e}")
logger.info("🔄 서비스는 런타임에 재시도 가능합니다")
def _ensure_directory_permissions(self):
"""Vector DB 디렉토리 권한을 확인하고 생성"""
try:
logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}")
# 절대 경로로 변환
abs_path = os.path.abspath(self.db_path)
# 디렉토리 생성
os.makedirs(abs_path, mode=0o755, exist_ok=True)
# 권한 확인
if not os.access(abs_path, os.W_OK):
logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}")
# 권한 변경 시도
try:
os.chmod(abs_path, 0o755)
logger.info("✅ 디렉토리 권한 변경 성공")
except Exception as chmod_error:
logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}")
# 임시 디렉토리로 대체
temp_dir = tempfile.mkdtemp(prefix="vectordb_")
logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}")
self.db_path = temp_dir
abs_path = temp_dir
# 테스트 파일 생성/삭제로 권한 확인
test_file = os.path.join(abs_path, "test_permissions.tmp")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logger.info("✅ 디렉토리 권한 확인 완료")
except Exception as test_error:
raise Exception(f"디렉토리 권한 테스트 실패: {test_error}")
except Exception as e:
raise Exception(f"디렉토리 설정 실패: {e}")
def _initialize_chromadb(self):
"""ChromaDB 초기화"""
try:
logger.info("🔧 ChromaDB 클라이언트 초기화...")
# ChromaDB 클라이언트 생성
self.client = chromadb.PersistentClient(
path=self.db_path,
settings=ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
)
)
# Collection 가져오기 또는 생성
try:
self.collection = self.client.get_collection(name=self.collection_name)
logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}")
except Exception:
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={"hnsw:space": "cosine"}
)
logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}")
except Exception as e:
raise Exception(f"ChromaDB 초기화 실패: {e}")
def _initialize_embedding_model(self):
"""임베딩 모델 초기화"""
try:
logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}")
self.embedding_model = SentenceTransformer(self.embedding_model_name)
logger.info("✅ 임베딩 모델 로드 완료")
except Exception as e:
raise Exception(f"임베딩 모델 로드 실패: {e}")
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return all([
self.client is not None,
self.collection is not None,
self.embedding_model is not None,
self.initialization_error is None
])
async def build_vector_store(
self,
target_store_info: Dict[str, Any],
review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]],
food_category: str,
region: str
) -> Dict[str, Any]:
"""Vector Store를 구축합니다"""
if not self.is_ready():
raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}")
try:
logger.info("🚀 Vector Store 구축 시작")
processed_count = 0
documents = []
embeddings = []
metadatas = []
ids = []
for store_id, store_info, reviews in review_results:
try:
# 텍스트 추출 및 임베딩 생성
text_for_embedding = extract_text_for_embedding(store_info, reviews)
embedding = self.embedding_model.encode(text_for_embedding)
# 메타데이터 생성
metadata = create_metadata(store_info, food_category, region)
# 문서 ID 생성
document_id = create_store_hash(store_id, store_info.get('name', ''), region)
# 문서 데이터 생성
document_text = combine_store_and_reviews(store_info, reviews)
documents.append(document_text)
embeddings.append(embedding.tolist())
metadatas.append(metadata)
ids.append(document_id)
processed_count += 1
except Exception as e:
logger.warning(f"⚠️ 가게 {store_id} 처리 실패: {e}")
continue
# Vector DB에 저장
if documents:
self.collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
logger.info(f"✅ Vector Store 구축 완료: {processed_count}개 문서 저장")
return {
'success': True,
'processed_count': processed_count,
'message': f"{processed_count}개 문서가 Vector DB에 저장되었습니다"
}
else:
return {
'success': False,
'error': '저장할 문서가 없습니다',
'processed_count': 0
}
except Exception as e:
logger.error(f"❌ Vector Store 구축 실패: {e}")
return {
'success': False,
'error': str(e),
'processed_count': 0
}
def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[str]:
"""유사 케이스 검색 (개선된 버전)"""
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
try:
# 검색 쿼리 생성
query_text = f"가게 ID: {store_id} 요청사항: {context}"
query_embedding = self.embedding_model.encode(query_text)
# 유사도 검색
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=limit,
include=['documents', 'metadatas', 'distances']
)
if results['documents'] and results['documents'][0]:
# 검색 결과 요약
context_parts = []
for i, (doc, metadata, distance) in enumerate(zip(
results['documents'][0],
results['metadatas'][0],
results['distances'][0]
)):
store_name = metadata.get('store_name', 'Unknown')
category = metadata.get('food_category', 'Unknown')
context_parts.append(
f"유사 가게 {i+1}: {store_name} ({category}) - 유사도: {1-distance:.3f}"
)
return "\n".join(context_parts)
return None
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def get_db_status(self) -> Dict[str, Any]:
"""DB 상태 정보 반환"""
try:
if not self.is_ready():
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'not_ready',
'error': self.initialization_error
}
# 컬렉션 정보 조회
count = self.collection.count()
return {
'collection_name': self.collection_name,
'total_documents': count,
'total_stores': count, # 각 문서가 하나의 가게를 나타냄
'db_path': self.db_path,
'status': 'ready',
'last_updated': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"DB 상태 조회 실패: {e}")
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'error',
'error': str(e)
}