ai-review/vector/app/services/vector_service.py
2025-06-16 06:51:42 +09:00

822 lines
34 KiB
Python

# app/services/vector_service.py (개선된 버전)
import os
import json
import logging
import time
import shutil
import signal
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import chromadb
from chromadb.config import Settings as ChromaSettings
from sentence_transformers import SentenceTransformer
from ..config.settings import settings
from ..utils.data_utils import (
create_store_hash, combine_store_and_reviews, generate_review_summary,
extract_text_for_embedding, create_metadata, is_duplicate_store
)
logger = logging.getLogger(__name__)
class VectorService:
"""Vector DB 서비스 (개선된 초기화 로직)"""
def __init__(self):
self.db_path = settings.VECTOR_DB_PATH
self.collection_name = settings.VECTOR_DB_COLLECTION
self.embedding_model_name = settings.EMBEDDING_MODEL
# 상태 변수
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 안전한 초기화 시도
self._safe_initialize()
def _safe_initialize(self):
"""안전한 초기화 - 개선된 로직"""
try:
logger.info("🔧 VectorService 초기화 시작...")
# 1단계: 디렉토리 권한 확인
self._ensure_directory_permissions()
# 2단계: ChromaDB 초기화 (호환성 확인 포함)
self._initialize_chromadb_with_compatibility_check()
# 3단계: 임베딩 모델 로드
self._initialize_embedding_model()
logger.info("✅ VectorService 초기화 완료")
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ VectorService 초기화 실패: {e}")
logger.info("🔄 서비스는 런타임에 재시도 가능합니다")
def _ensure_directory_permissions(self):
"""Vector DB 디렉토리 권한을 확인하고 생성합니다"""
try:
logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}")
# 절대 경로로 변환
abs_path = os.path.abspath(self.db_path)
# 디렉토리 생성
os.makedirs(abs_path, mode=0o755, exist_ok=True)
# 권한 확인
if not os.access(abs_path, os.W_OK):
logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}")
# 권한 변경 시도
try:
os.chmod(abs_path, 0o755)
logger.info("✅ 디렉토리 권한 변경 성공")
except Exception as chmod_error:
logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}")
# 임시 디렉토리로 대체
import tempfile
temp_dir = tempfile.mkdtemp(prefix="vectordb_")
logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}")
self.db_path = temp_dir
abs_path = temp_dir
# 테스트 파일 생성/삭제로 권한 확인
test_file = os.path.join(abs_path, "test_permissions.tmp")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logger.info("✅ 디렉토리 권한 확인 완료")
except Exception as test_error:
raise Exception(f"디렉토리 권한 테스트 실패: {test_error}")
except Exception as e:
logger.error(f"❌ 디렉토리 설정 실패: {e}")
raise
def _initialize_chromadb_with_compatibility_check(self):
"""ChromaDB 초기화 (호환성 확인 포함)"""
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
logger.info(f"🔄 ChromaDB 초기화 시도 {attempt + 1}/{max_retries}")
# 1단계: 기존 DB 호환성 확인
existing_db_valid = self._check_existing_db_compatibility()
# 2단계: ChromaDB 클라이언트 생성
self._create_chromadb_client()
# 3단계: 컬렉션 초기화
self._initialize_collection(existing_db_valid)
logger.info("✅ ChromaDB 초기화 완료")
return # 성공 시 루프 종료
except Exception as e:
logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}): {e}")
if attempt < max_retries - 1:
logger.info(f"🔄 {retry_delay}초 후 재시도...")
time.sleep(retry_delay)
retry_delay *= 2 # 지수 백오프
else:
raise Exception(f"ChromaDB 초기화 최종 실패: {e}")
def _check_existing_db_compatibility(self):
"""기존 DB 호환성 확인"""
try:
if not os.path.exists(self.db_path):
logger.info("📁 새 DB 디렉토리 - 스키마 확인 불필요")
return False
db_files = [f for f in os.listdir(self.db_path) if not f.startswith('.')]
if not db_files:
logger.info("📁 빈 DB 디렉토리 - 스키마 확인 불필요")
return False
logger.info(f"📁 기존 DB 파일 발견: {db_files}")
# 실제 호환성 테스트
logger.info("🔍 기존 DB 호환성 테스트 중...")
try:
# 테스트용 클라이언트 생성
test_client = chromadb.PersistentClient(path=self.db_path)
test_client.heartbeat()
# 컬렉션 접근 시도
try:
test_collection = test_client.get_collection(name=self.collection_name)
count = test_collection.count()
logger.info(f"✅ 기존 DB 호환성 확인 완료: {count}개 벡터 존재")
return True
except Exception as collection_error:
logger.info(f"📝 기존 컬렉션 없음 (정상): {collection_error}")
return True # DB는 정상, 컬렉션만 새로 생성하면 됨
except Exception as compatibility_error:
# 실제 호환성 문제 발견
logger.warning(f"⚠️ 실제 호환성 문제 발견: {compatibility_error}")
self._backup_incompatible_db()
return False
except Exception as e:
logger.warning(f"⚠️ 호환성 확인 중 오류: {e}")
return False
def _backup_incompatible_db(self):
"""호환성 문제가 있는 DB 백업"""
try:
backup_path = f"{self.db_path}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.warning(f"🔄 호환성 문제로 기존 DB 백업: {backup_path}")
shutil.move(self.db_path, backup_path)
logger.info(f"✅ 기존 DB 백업 완료: {backup_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
# 오래된 백업 정리 (7일 이상)
self._cleanup_old_backups()
except Exception as backup_error:
logger.warning(f"⚠️ 백업 실패, 강제 삭제 진행: {backup_error}")
shutil.rmtree(self.db_path, ignore_errors=True)
os.makedirs(self.db_path, exist_ok=True)
def _cleanup_old_backups(self):
"""오래된 백업 파일 정리 (7일 이상)"""
try:
base_path = os.path.dirname(self.db_path)
backup_pattern = f"{os.path.basename(self.db_path)}_backup_"
cutoff_time = time.time() - (7 * 24 * 3600) # 7일 전
for item in os.listdir(base_path):
if item.startswith(backup_pattern):
backup_path = os.path.join(base_path, item)
if os.path.isdir(backup_path) and os.path.getctime(backup_path) < cutoff_time:
shutil.rmtree(backup_path, ignore_errors=True)
logger.info(f"🗑️ 오래된 백업 삭제: {backup_path}")
except Exception as e:
logger.warning(f"⚠️ 백업 정리 중 오류: {e}")
def _create_chromadb_client(self):
"""ChromaDB 클라이언트 생성"""
try:
# 최신 버전 호환 설정
chroma_settings = ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
is_persistent=True
)
self.client = chromadb.PersistentClient(
path=self.db_path,
settings=chroma_settings
)
logger.info("✅ ChromaDB 클라이언트 생성 성공")
except Exception as modern_error:
logger.warning(f"⚠️ 최신 설정 실패, 간단한 설정으로 재시도: {modern_error}")
# 간단한 설정으로 재시도
self.client = chromadb.PersistentClient(path=self.db_path)
logger.info("✅ ChromaDB 간단 설정 클라이언트 생성 성공")
# 연결 테스트
try:
self.client.heartbeat()
logger.info("✅ ChromaDB 연결 테스트 성공")
except Exception as heartbeat_error:
logger.warning(f"⚠️ Heartbeat 실패 (무시): {heartbeat_error}")
def _initialize_collection(self, existing_db_valid: bool):
"""컬렉션 초기화"""
try:
if existing_db_valid:
# 기존 컬렉션 로드 시도
try:
self.collection = self.client.get_collection(name=self.collection_name)
count = self.collection.count()
logger.info(f"✅ 기존 컬렉션 로드 성공: {self.collection_name} ({count}개 벡터)")
return
except Exception as get_error:
logger.info(f"📝 기존 컬렉션 없음, 새로 생성: {get_error}")
# 새 컬렉션 생성
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={
"description": "Restaurant reviews vector store",
"created_at": datetime.now().isoformat(),
"version": "1.0"
}
)
logger.info(f"✅ 새 컬렉션 생성 성공: {self.collection_name}")
except Exception as create_error:
logger.error(f"❌ 컬렉션 초기화 실패: {create_error}")
# 대체 컬렉션명으로 재시도
fallback_name = f"{self.collection_name}_{int(time.time())}"
logger.info(f"🔄 대체 컬렉션명으로 재시도: {fallback_name}")
self.collection = self.client.create_collection(
name=fallback_name,
metadata={"description": "Restaurant reviews (fallback)"}
)
self.collection_name = fallback_name
logger.info(f"✅ 대체 컬렉션 생성 성공: {fallback_name}")
def _initialize_embedding_model(self):
"""임베딩 모델 초기화"""
try:
logger.info(f"🤖 임베딩 모델 로드 시작: {self.embedding_model_name}")
# 캐시 디렉토리 설정
cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "sentence_transformers")
os.makedirs(cache_dir, exist_ok=True)
# 권한 확인
if not os.access(cache_dir, os.W_OK):
import tempfile
cache_dir = tempfile.mkdtemp(prefix="st_cache_")
logger.info(f"🔄 임시 캐시 디렉토리 사용: {cache_dir}")
# 모델 로드 (타임아웃 설정)
def timeout_handler(signum, frame):
raise TimeoutError("임베딩 모델 로드 타임아웃")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300) # 5분 타임아웃
try:
self.embedding_model = SentenceTransformer(
self.embedding_model_name,
cache_folder=cache_dir,
device='cpu' # CPU 사용 명시
)
signal.alarm(0) # 타임아웃 해제
# 모델 테스트
test_embedding = self.embedding_model.encode(["테스트 문장"])
logger.info(f"✅ 임베딩 모델 로드 성공: {test_embedding.shape}")
except TimeoutError:
signal.alarm(0)
raise Exception("임베딩 모델 로드 타임아웃 (5분)")
except Exception as e:
logger.error(f"❌ 임베딩 모델 로드 실패: {e}")
raise
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return (
self.client is not None and
self.collection is not None and
self.embedding_model is not None and
self.initialization_error is None
)
def get_initialization_error(self) -> Optional[str]:
"""초기화 에러 메시지 반환"""
return self.initialization_error
def retry_initialization(self) -> bool:
"""초기화 재시도"""
try:
logger.info("🔄 VectorService 초기화 재시도...")
# 상태 초기화
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 재초기화
self._safe_initialize()
return self.is_ready()
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ 초기화 재시도 실패: {e}")
return False
def reset_vector_db(self) -> Dict[str, Any]:
"""Vector DB 완전 리셋"""
try:
logger.info("🔄 Vector DB 완전 리셋 시작...")
# 기존 클라이언트 정리
self.client = None
self.collection = None
# DB 디렉토리 완전 삭제
if os.path.exists(self.db_path):
shutil.rmtree(self.db_path, ignore_errors=True)
logger.info(f"✅ 기존 DB 디렉토리 삭제: {self.db_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
logger.info(f"✅ 새 DB 디렉토리 생성: {self.db_path}")
# 재초기화
success = self.retry_initialization()
if success:
return {
"success": True,
"message": "Vector DB가 성공적으로 리셋되었습니다",
"collection_name": self.collection_name,
"db_path": self.db_path
}
else:
return {
"success": False,
"error": self.initialization_error or "재초기화 실패"
}
except Exception as e:
logger.error(f"❌ Vector DB 리셋 실패: {e}")
return {
"success": False,
"error": str(e)
}
def get_health_status(self) -> Dict[str, Any]:
"""서비스 상태 확인"""
try:
status = {
"service": "vector_db",
"status": "healthy" if self.is_ready() else "unhealthy",
"db_path": self.db_path,
"collection_name": self.collection_name,
"embedding_model": self.embedding_model_name,
"timestamp": datetime.now().isoformat()
}
if self.initialization_error:
status["initialization_error"] = self.initialization_error
status["status"] = "error"
# 상세 상태
status["components"] = {
"client": "connected" if self.client else "disconnected",
"collection": "ready" if self.collection else "not_ready",
"embedding": "loaded" if self.embedding_model else "not_loaded"
}
# 컬렉션 정보
if self.collection:
try:
status["collection_count"] = self.collection.count()
except Exception as e:
status["collection_error"] = str(e)
return status
except Exception as e:
return {
"service": "vector_db",
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def get_store_context(self, store_id: str) -> Optional[str]:
"""스토어 ID로 컨텍스트 조회"""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 스토어 ID로 검색
results = self.collection.get(
where={"store_id": store_id}
)
if not results or not results.get('documents'):
logger.warning(f"스토어 ID '{store_id}'에 대한 데이터 없음")
return None
# 컨텍스트 생성
documents = results['documents']
metadatas = results.get('metadatas', [])
context_parts = []
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 메타데이터 정보 추가
if metadata:
context_parts.append(f"[{metadata.get('store_name', 'Unknown')}]")
context_parts.append(doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as e:
logger.error(f"스토어 컨텍스트 조회 실패: {e}")
return None
async def build_vector_store(self, store_info: Dict[str, Any], similar_stores_data: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str) -> Dict[str, Any]:
"""Vector Store 구축 (완전 수정된 버전)"""
try:
if not self.is_ready():
# 재시도 한 번 더
if not self.retry_initialization():
raise Exception("Vector DB가 초기화되지 않았습니다")
logger.info(f"Vector Store 구축 시작: {len(similar_stores_data)}개 스토어")
# 통계 초기화
stats = {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 0
}
# 배치 처리용 리스트
all_documents = []
all_embeddings = []
all_metadatas = []
all_ids = []
# 각 스토어 처리
for store_id, store_data, reviews in similar_stores_data:
try:
# 데이터 검증
if not store_data or not reviews:
logger.warning(f"스토어 '{store_id}' 데이터 부족: store_data={bool(store_data)}, reviews={len(reviews) if reviews else 0}")
stats["errors"] += 1
continue
# 올바른 create_store_hash 호출
store_hash = create_store_hash(
store_id=store_id,
store_name=store_data.get('place_name', ''),
region=region
)
# ChromaDB에서 직접 중복 확인 (is_duplicate_store 함수 사용하지 않음)
try:
# 같은 store_id로 이미 저장된 데이터가 있는지 확인
existing_data = self.collection.get(
where={"store_id": store_id},
limit=1
)
if existing_data and len(existing_data.get('ids', [])) > 0:
logger.debug(f"중복 스토어 건너뛰기: {store_id}")
stats["duplicates"] += 1
continue
except Exception as dup_check_error:
# 중복 확인 실패는 로그만 남기고 계속 진행
logger.warning(f"중복 확인 실패 (계속 진행): {dup_check_error}")
# 올바른 extract_text_for_embedding 호출
embedding_text = extract_text_for_embedding(
store_info=store_data,
reviews=reviews
)
# 임베딩 생성
try:
embedding = self.embedding_model.encode(embedding_text)
embedding = embedding.tolist() # numpy array를 list로 변환
except Exception as embed_error:
logger.error(f"임베딩 생성 실패 (store_id: {store_id}): {embed_error}")
stats["errors"] += 1
continue
# 올바른 create_metadata 호출
metadata = create_metadata(
store_info=store_data,
food_category=food_category,
region=region
)
# 배치에 추가
all_documents.append(embedding_text)
all_embeddings.append(embedding)
all_metadatas.append(metadata)
all_ids.append(f"{store_id}_{store_hash}")
stats["total_processed"] += 1
stats["newly_added"] += 1
if stats["total_processed"] % 10 == 0:
logger.info(f"처리 진행률: {stats['total_processed']}/{len(similar_stores_data)}")
except Exception as store_error:
logger.error(f"음식점 처리 중 오류 (store_id: {store_id}): {store_error}")
stats["errors"] += 1
continue
# 배치로 벡터 저장
if all_documents:
logger.info(f"벡터 배치 저장 시작: {len(all_documents)}")
try:
self.collection.add(
documents=all_documents,
embeddings=all_embeddings,
metadatas=all_metadatas,
ids=all_ids
)
logger.info("✅ 벡터 배치 저장 성공")
except Exception as save_error:
logger.error(f"❌ 벡터 저장 실패: {save_error}")
return {
"success": False,
"error": f"벡터 저장 실패: {str(save_error)}",
"statistics": stats
}
else:
logger.warning("⚠️ 저장할 벡터 데이터가 없음")
# 최종 통계
try:
total_vectors = self.collection.count()
logger.info(f"✅ Vector Store 구축 완료: 총 {total_vectors}개 벡터")
except Exception as count_error:
logger.warning(f"벡터 개수 확인 실패: {count_error}")
total_vectors = len(all_documents)
return {
"success": True,
"message": "Vector Store 구축 완료",
"statistics": stats,
"total_vectors": total_vectors,
"store_info": store_info
}
except Exception as e:
logger.error(f"Vector Store 구축 전체 실패: {e}")
return {
"success": False,
"error": str(e),
"statistics": stats if 'stats' in locals() else {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 1
}
}
def get_db_status(self) -> Dict[str, Any]:
"""Vector DB 상태 정보를 반환합니다."""
try:
if not self.is_ready():
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'not_ready',
'initialization_error': self.initialization_error
}
# 문서 개수 확인
try:
total_documents = self.collection.count()
except Exception as e:
logger.warning(f"문서 개수 확인 실패: {e}")
total_documents = 0
# 고유 가게 수 확인 (store_id 기준)
try:
# 모든 메타데이터에서 고유 store_id 추출
all_metadata = self.collection.get()
store_ids = set()
if all_metadata.get('metadatas'):
for metadata in all_metadata['metadatas']:
store_id = metadata.get('store_id')
if store_id:
store_ids.add(store_id)
total_stores = len(store_ids)
except Exception as e:
logger.warning(f"가게 수 확인 실패: {e}")
total_stores = 0
return {
'collection_name': self.collection_name,
'total_documents': total_documents,
'total_stores': total_stores,
'db_path': self.db_path,
'status': 'ready'
}
except Exception as e:
logger.error(f"DB 상태 확인 실패: {e}")
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'error',
'error': str(e)
}
def search_similar_cases(self, store_id: str, context: str) -> Optional[str]:
"""유사한 케이스를 검색합니다."""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 컨텍스트 기반 유사 검색
try:
# 검색 쿼리를 임베딩으로 변환
query_embedding = self.embedding_model.encode(context)
query_embedding = query_embedding.tolist()
# 유사한 문서 검색 (상위 5개)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=['documents', 'metadatas']
)
if not results or not results.get('documents') or not results['documents'][0]:
logger.info("유사 케이스를 찾을 수 없음")
return None
# 컨텍스트 조합
context_parts = []
documents = results['documents'][0]
metadatas = results.get('metadatas', [[]])[0]
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 가게 정보 추가
store_name = metadata.get('store_name', 'Unknown')
food_category = metadata.get('food_category', 'Unknown')
context_parts.append(f"[{food_category} - {store_name}]")
context_parts.append(doc[:500] + "..." if len(doc) > 500 else doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as search_error:
logger.warning(f"벡터 검색 실패: {search_error}")
return None
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def search_similar_cases_improved(self, store_id: str, context: str) -> Optional[str]:
"""
개선된 유사 케이스 검색
1. store_id 기반 필터링 우선 적용
2. 동종 업체 우선 검색
3. 캐싱 및 성능 최적화
"""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 1단계: 해당 가게의 정보 먼저 확인
store_context = self.get_store_context(store_id)
food_category = store_context.get('food_category', '') if store_context else ''
# 2단계: 검색 쿼리 구성 (가게 정보 + 컨텍스트)
enhanced_query = f"{food_category} {context}"
query_embedding = self.embedding_model.encode(enhanced_query).tolist()
# 3단계: 동종 업체 우선 검색 (메타데이터 필터링)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=10, # 더 많은 결과에서 필터링
include=['documents', 'metadatas', 'distances'],
where={"food_category": {"$eq": food_category}} if food_category else None
)
if not results or not results.get('documents') or not results['documents'][0]:
# 4단계: 동종 업체가 없으면 전체 검색
logger.info("동종 업체 없음 - 전체 검색으로 전환")
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=['documents', 'metadatas', 'distances']
)
if not results or not results.get('documents') or not results['documents'][0]:
logger.info("유사 케이스를 찾을 수 없음")
return None
# 5단계: 결과 조합 (관련성 높은 순서로)
context_parts = []
documents = results['documents'][0]
metadatas = results.get('metadatas', [[]])[0]
distances = results.get('distances', [[]])[0]
# 거리(유사도) 기준으로 필터링 (너무 관련성 낮은 것 제외)
filtered_results = []
for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
if distance < 0.8: # 유사도 임계값
filtered_results.append((doc, metadata, distance))
if not filtered_results:
return None
# 최대 3개의 가장 관련성 높은 케이스만 사용
for doc, metadata, distance in filtered_results[:3]:
store_name = metadata.get('store_name', 'Unknown')
food_cat = metadata.get('food_category', 'Unknown')
context_parts.append(f"[{food_cat} - {store_name}] (유사도: {1-distance:.2f})")
# 문서 길이 제한으로 토큰 수 최적화
context_parts.append(doc[:300] + "..." if len(doc) > 300 else doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def get_store_context(self, store_id: str) -> Optional[Dict[str, Any]]:
"""해당 가게의 컨텍스트 정보 조회 (캐싱 적용)"""
try:
if not self.is_ready():
return None
# 메타데이터에서 해당 store_id 검색
results = self.collection.get(
where={"store_id": {"$eq": store_id}},
limit=1,
include=['metadatas']
)
if results and results.get('metadatas') and len(results['metadatas']) > 0:
return results['metadatas'][0]
return None
except Exception as e:
logger.error(f"가게 컨텍스트 조회 실패: {e}")
return None