# app/services/vector_service.py import os import json import logging import tempfile from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import chromadb from chromadb.config import Settings as ChromaSettings from sentence_transformers import SentenceTransformer from ..config.settings import settings from ..utils.data_utils import ( create_store_hash, combine_store_and_reviews, generate_review_summary, extract_text_for_embedding, create_metadata, is_duplicate_store ) logger = logging.getLogger(__name__) class VectorService: """Vector DB 서비스""" def __init__(self): self.db_path = settings.VECTOR_DB_PATH self.collection_name = settings.VECTOR_DB_COLLECTION self.embedding_model_name = settings.EMBEDDING_MODEL # 상태 변수 self.client = None self.collection = None self.embedding_model = None self.initialization_error = None # 안전한 초기화 시도 self._safe_initialize() def _safe_initialize(self): """안전한 초기화""" try: logger.info("🔧 VectorService 초기화 시작...") # 1단계: 디렉토리 권한 확인 self._ensure_directory_permissions() # 2단계: ChromaDB 초기화 self._initialize_chromadb() # 3단계: 임베딩 모델 로드 self._initialize_embedding_model() logger.info("✅ VectorService 초기화 완료") except Exception as e: self.initialization_error = str(e) logger.error(f"❌ VectorService 초기화 실패: {e}") logger.info("🔄 서비스는 런타임에 재시도 가능합니다") def _ensure_directory_permissions(self): """Vector DB 디렉토리 권한을 확인하고 생성""" try: logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}") # 절대 경로로 변환 abs_path = os.path.abspath(self.db_path) # 디렉토리 생성 os.makedirs(abs_path, mode=0o755, exist_ok=True) # 권한 확인 if not os.access(abs_path, os.W_OK): logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}") # 권한 변경 시도 try: os.chmod(abs_path, 0o755) logger.info("✅ 디렉토리 권한 변경 성공") except Exception as chmod_error: logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}") # 임시 디렉토리로 대체 temp_dir = tempfile.mkdtemp(prefix="vectordb_") logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}") self.db_path = temp_dir abs_path = temp_dir # 테스트 파일 생성/삭제로 권한 확인 test_file = os.path.join(abs_path, "test_permissions.tmp") try: with open(test_file, 'w') as f: f.write("test") os.remove(test_file) logger.info("✅ 디렉토리 권한 확인 완료") except Exception as test_error: raise Exception(f"디렉토리 권한 테스트 실패: {test_error}") except Exception as e: raise Exception(f"디렉토리 설정 실패: {e}") def _initialize_chromadb(self): """ChromaDB 초기화""" try: logger.info("🔧 ChromaDB 클라이언트 초기화...") # ChromaDB 클라이언트 생성 self.client = chromadb.PersistentClient( path=self.db_path, settings=ChromaSettings( anonymized_telemetry=False, allow_reset=True, ) ) # Collection 가져오기 또는 생성 try: self.collection = self.client.get_collection(name=self.collection_name) logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}") except Exception: self.collection = self.client.create_collection( name=self.collection_name, metadata={"hnsw:space": "cosine"} ) logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}") except Exception as e: raise Exception(f"ChromaDB 초기화 실패: {e}") def _initialize_embedding_model(self): """임베딩 모델 초기화""" try: logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}") self.embedding_model = SentenceTransformer(self.embedding_model_name) logger.info("✅ 임베딩 모델 로드 완료") except Exception as e: raise Exception(f"임베딩 모델 로드 실패: {e}") def is_ready(self) -> bool: """서비스 준비 상태 확인""" return all([ self.client is not None, self.collection is not None, self.embedding_model is not None, self.initialization_error is None ]) async def build_vector_store( self, target_store_info: Dict[str, Any], review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str ) -> Dict[str, Any]: """Vector Store를 구축합니다 (기존 데이터 업데이트 포함)""" if not self.is_ready(): raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}") try: logger.info("🔧 Vector Store 구축 시작 (기존 데이터 업데이트 포함)...") start_time = datetime.now() # 1단계: 가게 분류 (새 가게 vs 업데이트 vs 중복) logger.info("🔍 가게 분류 중...") categorization = self.categorize_stores(review_results, region, food_category) # 2단계: 기존 가게 업데이트 update_result = {'updated_count': 0, 'skipped_count': 0} if categorization['update_stores']: logger.info(f"🔄 {len(categorization['update_stores'])}개 기존 가게 업데이트 중...") update_result = self.update_existing_stores( categorization['update_stores'], food_category, region ) # 3단계: 새 가게 추가 add_result = {'added_count': 0} if categorization['new_stores']: logger.info(f"✨ {len(categorization['new_stores'])}개 새 가게 추가 중...") add_result = self.add_new_stores( categorization['new_stores'], food_category, region ) # 4단계: 결과 정리 total_processed = update_result['updated_count'] + add_result['added_count'] execution_time = (datetime.now() - start_time).total_seconds() result = { 'success': True, 'processed_count': total_processed, # ← 기존 API 호환성 유지 'execution_time': execution_time, 'summary': categorization['summary'], 'operations': { 'new_stores_added': add_result['added_count'], 'existing_stores_updated': update_result['updated_count'], 'update_skipped': update_result.get('skipped_count', 0), 'duplicates_removed': categorization['summary']['duplicate_count'] }, 'message': f"✅ Vector DB 처리 완료: 새 가게 {add_result['added_count']}개 추가, " f"기존 가게 {update_result['updated_count']}개 업데이트" } logger.info(f"📊 Vector Store 구축 완료:") logger.info(f" - 총 처리: {total_processed}개") logger.info(f" - 새 가게: {add_result['added_count']}개") logger.info(f" - 업데이트: {update_result['updated_count']}개") logger.info(f" - 실행 시간: {execution_time:.2f}초") return result except Exception as e: logger.error(f"❌ Vector Store 구축 실패: {e}") return { 'success': False, 'error': str(e), 'processed_count': 0 # ← 기존 API 호환성 유지 } def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[Dict[str, Any]]: """유사 케이스 검색 (대폭 개선된 버전) - 풍부한 데이터 반환 + 디버깅""" # 1단계: 서비스 준비 상태 확인 logger.info(f"🔧 VectorService 상태 확인...") logger.info(f" - Client: {self.client is not None}") logger.info(f" - Collection: {self.collection is not None}") logger.info(f" - Embedding Model: {self.embedding_model is not None}") logger.info(f" - Is Ready: {self.is_ready()}") if not self.is_ready(): logger.error(f"❌ VectorService가 준비되지 않음: {self.initialization_error}") # None 대신 기본 구조 반환 return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"VectorService 준비되지 않음: {self.initialization_error}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["VectorService가 준비되지 않았습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["VectorService 초기화 문제 해결 필요"], "strategic_improvements": [], "benchmarking_targets": [] } } try: # 2단계: Vector DB 데이터 개수 확인 try: count = self.collection.count() logger.info(f"📊 Vector DB 문서 수: {count}") if count == 0: logger.warning("❌ Vector DB에 데이터가 없습니다. find-reviews API로 데이터를 먼저 추가하세요!") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": "Vector DB에 데이터가 없음" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["Vector DB에 데이터가 없어 분석할 수 없습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["먼저 find-reviews API로 경쟁업체 데이터를 수집하세요"], "strategic_improvements": [], "benchmarking_targets": [] } } except Exception as count_error: logger.error(f"❌ 문서 수 확인 실패: {count_error}") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"문서 수 확인 실패: {count_error}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["Vector DB 상태 확인 실패"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["Vector DB 상태 점검 필요"], "strategic_improvements": [], "benchmarking_targets": [] } } # 3단계: 검색 쿼리 생성 및 실행 logger.info(f"🔍 Vector 검색 시작: store_id={store_id}, context='{context[:50]}...'") query_text = f"가게 ID: {store_id} 요청사항: {context}" query_embedding = self.embedding_model.encode(query_text) logger.info(f"✅ 임베딩 생성 완료: 차원={len(query_embedding)}") results = self.collection.query( query_embeddings=[query_embedding.tolist()], n_results=limit, include=['documents', 'metadatas', 'distances'] ) # 4단계: 검색 결과 확인 logger.info(f"📊 Vector DB 검색 결과:") logger.info(f" - 문서 수: {len(results.get('documents', [[]]))} 그룹") if not results or not results.get('documents') or not results['documents'][0]: logger.warning("❌ 검색 결과가 없습니다") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat() }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["검색 결과가 없습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["검색 결과가 없어 분석할 수 없습니다"], "strategic_improvements": [], "benchmarking_targets": [] } } documents = results['documents'][0] metadatas = results['metadatas'][0] distances = results['distances'][0] logger.info(f"✅ 검색 결과: {len(documents)}개 가게 발견") # 검색된 가게들 로깅 for i, metadata in enumerate(metadatas): store_name = metadata.get('store_name', 'Unknown') distance = distances[i] if distances else 0 logger.info(f" {i+1}. {store_name} (유사도: {1-distance:.3f})") # 5단계: 데이터 분석 시작 logger.info("🔬 데이터 분석 시작...") # 분석 결과 디버깅을 위한 try-catch try: enhanced_analysis = self._analyze_similar_stores(documents, metadatas, distances) logger.info(f"✅ 분석 완료: {len(enhanced_analysis.get('competitive_insights', []))}개 가게 분석") except Exception as analysis_error: logger.error(f"❌ 데이터 분석 실패: {analysis_error}") # 기본 분석 결과로 대체 enhanced_analysis = { "market_analysis": { "total_competitors": len(documents), "industry_insights": ["데이터 분석 중 오류 발생"], "performance_benchmarks": {} }, "competitive_insights": self._create_basic_insights(documents, metadatas, distances), "recommendations": { "immediate_actions": ["데이터 분석 오류로 기본 추천 제공"], "strategic_improvements": [], "benchmarking_targets": [] } } # 6단계: 구조화된 컨텍스트 생성 structured_context = { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat() }, "market_intelligence": enhanced_analysis['market_analysis'], "competitive_insights": enhanced_analysis['competitive_insights'], "actionable_recommendations": enhanced_analysis['recommendations'] } logger.info(f"✅ 구조화된 컨텍스트 생성 완료: {len(json.dumps(structured_context, ensure_ascii=False))} 문자") return structured_context except Exception as e: logger.error(f"❌ 유사 케이스 검색 실패: {e}") logger.error(f" 스택 트레이스: ", exc_info=True) # None 대신 오류 정보가 포함된 기본 구조 반환 return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"검색 실패: {str(e)}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["검색 중 오류 발생"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["시스템 오류로 분석 불가"], "strategic_improvements": [], "benchmarking_targets": [] } } def _create_basic_insights(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> List[Dict[str, Any]]: """분석 실패시 기본 인사이트 생성""" basic_insights = [] try: for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)): similarity_score = 1 - distance store_name = metadata.get('store_name', 'Unknown') category = metadata.get('food_category', 'Unknown') # 간단한 JSON 파싱 시도 try: store_data = json.loads(doc) store_info = store_data.get('store_info', {}) rating = store_info.get('rating', 'N/A') review_count = store_info.get('review_count', 'N/A') # 리뷰 요약에서 키워드 추출 review_summary = store_data.get('review_summary', {}) keywords = review_summary.get('common_keywords', []) except Exception as json_error: logger.warning(f"JSON 파싱 실패 for {store_name}: {json_error}") rating = 'N/A' review_count = 'N/A' keywords = [] basic_insight = { "rank": i + 1, "store_name": store_name, "category": category, "similarity_score": round(similarity_score, 3), "performance_analysis": { "performance": {"rating": rating, "review_count": review_count}, "feedback": {"positive_aspects": keywords[:3], "negative_aspects": []}, "business_insights": {"key_finding": "기본 분석만 가능"} } } basic_insights.append(basic_insight) except Exception as e: logger.error(f"기본 인사이트 생성 실패: {e}") return basic_insights def _analyze_similar_stores(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> Dict[str, Any]: """Vector DB 저장 데이터를 활용한 심층 분석 (디버깅 강화)""" try: logger.info(f"🔬 가게 분석 시작: {len(documents)}개 가게") market_analysis = { "total_competitors": len(documents), "industry_insights": [], "performance_benchmarks": {} } competitive_insights = [] recommendations = { "critical_issues": [], "success_patterns": [], "improvement_opportunities": [] } ratings = [] review_counts = [] # 각 유사 가게 분석 for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)): try: similarity_score = 1 - distance store_name = metadata.get('store_name', 'Unknown') category = metadata.get('food_category', 'Unknown') logger.info(f" 📋 가게 {i+1} 분석 중: {store_name}") # 가게 인사이트 추출 시도 try: store_analysis = self._extract_store_insights(doc, metadata) logger.info(f" ✅ 인사이트 추출 성공") except Exception as insight_error: logger.warning(f" ⚠️ 인사이트 추출 실패: {insight_error}") # 기본값으로 대체 store_analysis = { "performance": {"rating": "N/A", "review_count": "N/A"}, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": "분석 실패"} } competitive_insight = { "rank": i + 1, "store_name": store_name, "category": category, "similarity_score": round(similarity_score, 3), "performance_analysis": store_analysis } competitive_insights.append(competitive_insight) # 성능 지표 수집 if 'rating' in store_analysis['performance']: try: rating_value = float(store_analysis['performance']['rating']) ratings.append(rating_value) except: pass if 'review_count' in store_analysis['performance']: try: count_value = int(store_analysis['performance']['review_count']) review_counts.append(count_value) except: pass except Exception as e: logger.warning(f" ⚠️ 가게 {i} 분석 실패: {e}") continue logger.info(f"✅ 가게 분석 완료: {len(competitive_insights)}개 성공") # 시장 벤치마크 계산 if ratings: market_analysis["performance_benchmarks"] = { "average_rating": round(sum(ratings) / len(ratings), 2), "rating_range": {"min": min(ratings), "max": max(ratings)}, "industry_standard": "Above Average" if sum(ratings) / len(ratings) > 3.5 else "Below Average" } logger.info(f"📊 평점 벤치마크: 평균 {market_analysis['performance_benchmarks']['average_rating']}") if review_counts: market_analysis["performance_benchmarks"]["review_activity"] = { "average_reviews": round(sum(review_counts) / len(review_counts)), "range": {"min": min(review_counts), "max": max(review_counts)} } # 업계 인사이트 생성 try: market_analysis["industry_insights"] = self._generate_industry_insights(competitive_insights) logger.info(f"💡 업계 인사이트: {len(market_analysis['industry_insights'])}개 생성") except Exception as e: logger.warning(f"업계 인사이트 생성 실패: {e}") market_analysis["industry_insights"] = ["인사이트 생성 실패"] # 개선 추천사항 생성 try: recommendations = self._generate_recommendations(competitive_insights) logger.info(f"🎯 추천사항 생성: 즉시실행 {len(recommendations.get('immediate_actions', []))}개") except Exception as e: logger.warning(f"추천사항 생성 실패: {e}") recommendations = { "immediate_actions": ["추천사항 생성 실패"], "strategic_improvements": [], "benchmarking_targets": [] } result = { "market_analysis": market_analysis, "competitive_insights": competitive_insights, "recommendations": recommendations } logger.info("✅ 심층 분석 완료") return result except Exception as e: logger.error(f"❌ 가게 분석 실패: {e}") logger.error(f" 스택 트레이스: ", exc_info=True) return {"market_analysis": {}, "competitive_insights": [], "recommendations": {}} def _extract_store_insights(self, document: str, metadata: Dict) -> Dict[str, Any]: """개별 가게의 상세 인사이트 추출 (디버깅 강화)""" try: # document 내용 로깅 (처음 200자만) logger.info(f" 📄 문서 내용 샘플: {document[:200]}...") # JSON 파싱 시도 try: store_data = json.loads(document) logger.info(f" ✅ JSON 파싱 성공") except json.JSONDecodeError as json_error: logger.warning(f" ⚠️ JSON 파싱 실패: {json_error}") # JSON이 아닌 경우 메타데이터만 사용 return { "performance": { "rating": metadata.get('store_name', 'N/A'), "review_count": "JSON 파싱 실패", "status": "N/A" }, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": f"JSON 파싱 실패: {str(json_error)[:100]}"} } # 기본 성능 지표 store_info = store_data.get('store_info', {}) reviews = store_data.get('reviews', []) review_summary = store_data.get('review_summary', {}) logger.info(f" 📊 데이터 확인: 가게정보={bool(store_info)}, 리뷰={len(reviews)}개, 요약={bool(review_summary)}") performance = { "rating": store_info.get('rating', 'N/A'), "review_count": store_info.get('review_count', 'N/A'), "status": store_info.get('status', 'N/A') } # 리뷰 분석 try: feedback = self._analyze_reviews(reviews, review_summary) logger.info(f" 📝 리뷰 분석: 긍정={len(feedback.get('positive_aspects', []))}, 부정={len(feedback.get('negative_aspects', []))}") except Exception as e: logger.warning(f" ⚠️ 리뷰 분석 실패: {e}") feedback = {"positive_aspects": [], "negative_aspects": []} # 비즈니스 인사이트 try: insights = self._generate_business_insights(store_info, reviews, review_summary) logger.info(f" 💡 비즈니스 인사이트 생성 완료") except Exception as e: logger.warning(f" ⚠️ 비즈니스 인사이트 생성 실패: {e}") insights = {"key_finding": "인사이트 생성 실패"} return { "performance": performance, "feedback": feedback, "insights": insights } except Exception as e: logger.warning(f" ❌ 가게 인사이트 추출 실패: {e}") return { "performance": {"rating": "N/A", "review_count": "N/A"}, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": f"추출 실패: {str(e)[:100]}"} } def _analyze_reviews(self, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]: """리뷰 데이터 심층 분석""" try: positive_aspects = [] negative_aspects = [] recent_trends = [] # 평점 분포 분석 rating_dist = review_summary.get('rating_distribution', {}) total_reviews = review_summary.get('total_reviews', 0) # 양극화 지수 계산 if total_reviews > 0: polarization = (rating_dist.get('1', 0) + rating_dist.get('5', 0)) / total_reviews if polarization > 0.6: negative_aspects.append("고객 만족도 양극화 (일관성 부족)") # 공통 키워드 분석 common_keywords = review_summary.get('common_keywords', []) positive_aspects.extend(common_keywords) # 개별 리뷰 분석 (최근 5개) recent_reviews = sorted(reviews, key=lambda x: x.get('date', ''), reverse=True)[:5] for review in recent_reviews: rating = review.get('rating', 0) content = review.get('content', '').lower() if rating >= 4: badges = review.get('badges', []) positive_aspects.extend(badges) elif rating <= 2: # 부정적 키워드 추출 negative_keywords = self._extract_negative_keywords(content) negative_aspects.extend(negative_keywords) # 최근 트렌드 (시간 분석) if len(recent_reviews) >= 3: recent_ratings = [r.get('rating', 0) for r in recent_reviews[:3]] avg_recent = sum(recent_ratings) / len(recent_ratings) if avg_recent < 3.0: recent_trends.append("최근 평점 하락세") elif avg_recent > 4.0: recent_trends.append("최근 만족도 상승") return { "positive_aspects": list(set(positive_aspects))[:5], # 중복 제거, 상위 5개 "negative_aspects": list(set(negative_aspects))[:5], "recent_trends": recent_trends, "rating_pattern": self._analyze_rating_pattern(rating_dist) } except Exception as e: logger.warning(f"리뷰 분석 실패: {e}") return {"positive_aspects": [], "negative_aspects": [], "recent_trends": []} def _extract_negative_keywords(self, content: str) -> List[str]: """리뷰 내용에서 부정적 키워드 추출""" negative_patterns = [ ("불친절", "서비스 품질"), ("비싸", "가격 정책"), ("맛없", "음식 품질"), ("더럽", "위생 관리"), ("느리", "서비스 속도"), ("양이 적", "양 부족"), ("시끄럽", "매장 환경"), ("주차", "접근성") ] found_issues = [] for pattern, category in negative_patterns: if pattern in content: found_issues.append(category) return found_issues def _analyze_rating_pattern(self, rating_dist: Dict) -> str: """평점 분포 패턴 분석""" if not rating_dist: return "데이터 부족" total = sum(rating_dist.values()) if total == 0: return "리뷰 없음" high_ratings = rating_dist.get('5', 0) + rating_dist.get('4', 0) low_ratings = rating_dist.get('1', 0) + rating_dist.get('2', 0) high_ratio = high_ratings / total low_ratio = low_ratings / total if high_ratio > 0.7: return "안정적 고만족" elif low_ratio > 0.5: return "심각한 문제 존재" elif high_ratio + low_ratio > 0.7: return "양극화 패턴" else: return "평균적 평가" def _generate_business_insights(self, store_info: Dict, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]: """비즈니스 인사이트 생성""" try: insights = { "competitive_advantage": [], "critical_issues": [], "improvement_opportunities": [] } # 평점 기반 분석 try: rating = float(store_info.get('rating', 0)) review_count = int(store_info.get('review_count', 0)) if rating >= 4.0: insights["competitive_advantage"].append("높은 고객 만족도") elif rating < 3.0: insights["critical_issues"].append("평점 개선 시급") if review_count < 10: insights["improvement_opportunities"].append("온라인 리뷰 마케팅 필요") elif review_count > 100: insights["competitive_advantage"].append("활발한 고객 참여") except: pass # 리뷰 내용 기반 분석 common_keywords = review_summary.get('common_keywords', []) positive_keywords = ['맛', '친절', '깨끗', '빠름', '저렴'] negative_indicators = ['서비스', '가격', '위생'] for keyword in common_keywords: if keyword in positive_keywords: insights["competitive_advantage"].append(f"{keyword} 우수") elif keyword in negative_indicators: insights["critical_issues"].append(f"{keyword} 문제 지적") # 영업 상태 확인 status = store_info.get('status', '') if '영업 전' in status: insights["improvement_opportunities"].append("영업시간 확장 검토") return insights except Exception as e: logger.warning(f"비즈니스 인사이트 생성 실패: {e}") return {"competitive_advantage": [], "critical_issues": [], "improvement_opportunities": []} def _generate_industry_insights(self, competitive_insights: List[Dict]) -> List[str]: """업계 전체 인사이트 생성""" insights = [] try: # 평점 분석 ratings = [] for store in competitive_insights: rating_str = store['performance_analysis']['performance'].get('rating', '0') try: rating = float(rating_str) ratings.append(rating) except: continue if ratings: avg_rating = sum(ratings) / len(ratings) if avg_rating < 3.5: insights.append("업계 전반적으로 고객 만족도 개선 필요") elif avg_rating > 4.0: insights.append("경쟁이 치열한 고품질 시장") # 공통 문제점 분석 common_issues = {} for store in competitive_insights: issues = store['performance_analysis']['feedback'].get('negative_aspects', []) for issue in issues: common_issues[issue] = common_issues.get(issue, 0) + 1 if common_issues: top_issue = max(common_issues.items(), key=lambda x: x[1]) insights.append(f"업계 공통 이슈: {top_issue[0]}") # 성공 패턴 분석 high_performers = [s for s in competitive_insights if s.get('similarity_score', 0) > 0.8 and float(s['performance_analysis']['performance'].get('rating', 0)) > 4.0] if high_performers: insights.append(f"고성과 업체 {len(high_performers)}개 벤치마킹 가능") except Exception as e: logger.warning(f"업계 인사이트 생성 실패: {e}") return insights[:3] # 상위 3개만 def _generate_recommendations(self, competitive_insights: List[Dict]) -> Dict[str, List[str]]: """데이터 기반 추천사항 생성""" recommendations = { "immediate_actions": [], "strategic_improvements": [], "benchmarking_targets": [] } try: # 즉시 조치 사항 critical_issues = [] for store in competitive_insights: issues = store['performance_analysis']['feedback'].get('negative_aspects', []) critical_issues.extend(issues) # 가장 빈번한 문제 해결 if critical_issues: issue_count = {} for issue in critical_issues: issue_count[issue] = issue_count.get(issue, 0) + 1 top_issues = sorted(issue_count.items(), key=lambda x: x[1], reverse=True)[:2] for issue, count in top_issues: recommendations["immediate_actions"].append(f"{issue} 개선 (업계 {count}개 업체 공통 문제)") # 전략적 개선사항 best_practices = [] for store in competitive_insights: if store.get('similarity_score', 0) > 0.7: practices = store['performance_analysis']['feedback'].get('positive_aspects', []) best_practices.extend(practices) if best_practices: practice_count = {} for practice in best_practices: practice_count[practice] = practice_count.get(practice, 0) + 1 top_practices = sorted(practice_count.items(), key=lambda x: x[1], reverse=True)[:2] for practice, count in top_practices: recommendations["strategic_improvements"].append(f"{practice} 강화 (성공 업체들의 공통점)") # 벤치마킹 대상 top_performers = sorted(competitive_insights, key=lambda x: float(x['performance_analysis']['performance'].get('rating', 0)), reverse=True)[:2] for performer in top_performers: name = performer.get('store_name', 'Unknown') rating = performer['performance_analysis']['performance'].get('rating', 'N/A') advantages = performer['performance_analysis']['business_insights'].get('competitive_advantage', []) if advantages: recommendations["benchmarking_targets"].append(f"{name} (평점 {rating}): {', '.join(advantages[:2])}") except Exception as e: logger.warning(f"추천사항 생성 실패: {e}") return recommendations def get_db_status(self, include_store_ids: bool = True, store_limit: int = 200) -> Dict[str, Any]: """ DB 상태 정보 반환 - store_id 목록 포함 Args: include_store_ids: store_id 목록 포함 여부 store_limit: store_id 목록 최대 개수 """ try: if not self.is_ready(): return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'not_ready', 'error': self.initialization_error, 'store_ids': [] } # 기본 컬렉션 정보 조회 count = self.collection.count() # store_id 목록 조회 (옵션) store_ids = [] if include_store_ids: store_ids = self.get_all_store_ids(limit=store_limit) return { 'collection_name': self.collection_name, 'total_documents': count, 'total_stores': len(store_ids) if include_store_ids else count, 'db_path': self.db_path, 'status': 'ready', 'last_updated': datetime.now().isoformat(), # 새로 추가되는 정보 'store_ids': store_ids } except Exception as e: logger.error(f"DB 상태 조회 실패: {e}") return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'error', 'error': str(e), 'store_ids': [] } def get_existing_store_data(self, region: str = None, food_category: str = None) -> Dict[str, Dict[str, Any]]: """ Vector DB에서 기존 store 데이터를 가져옵니다. Args: region: 지역 필터 (선택사항) food_category: 음식 카테고리 필터 (선택사항) Returns: {store_id: {metadata, document_id}} 형태의 딕셔너리 """ try: if not self.is_ready(): logger.warning("⚠️ VectorService가 준비되지 않음") return {} # 필터 조건 설정 where_condition = {} if region and food_category: where_condition = { "$and": [ {"region": region}, {"food_category": food_category} ] } elif region: where_condition = {"region": region} elif food_category: where_condition = {"food_category": food_category} # 기존 데이터 조회 if where_condition: results = self.collection.get( where=where_condition, include=['metadatas', 'documents'] ) else: results = self.collection.get( include=['metadatas', 'documents'] ) if not results or not results.get('metadatas'): logger.info("📊 Vector DB에 기존 데이터 없음") return {} # store_id별로 기존 데이터 매핑 existing_data = {} metadatas = results.get('metadatas', []) ids = results.get('ids', []) documents = results.get('documents', []) for i, metadata in enumerate(metadatas): if metadata and 'store_id' in metadata: store_id = metadata['store_id'] existing_data[store_id] = { 'metadata': metadata, 'document_id': ids[i] if i < len(ids) else None, 'document': documents[i] if i < len(documents) else None, 'last_updated': metadata.get('last_updated', 'unknown') } logger.info(f"📊 기존 Vector DB에서 {len(existing_data)}개 store 데이터 발견") return existing_data except Exception as e: logger.error(f"❌ 기존 store 데이터 조회 실패: {e}") return {} def categorize_stores(self, review_results: List[tuple], region: str, food_category: str) -> Dict[str, Any]: """ 가게들을 새 가게/업데이트 대상/현재 배치 중복으로 분류 """ # 기존 Vector DB 데이터 조회 existing_data = self.get_existing_store_data(region, food_category) existing_store_ids = set(existing_data.keys()) # 분류 작업 new_stores = [] # 완전히 새로운 가게 update_stores = [] # 기존 가게 업데이트 current_duplicates = [] # 현재 배치 내 중복 seen_in_batch = set() for store_id, store_info, reviews in review_results: # 현재 배치 내 중복 체크 if store_id in seen_in_batch: current_duplicates.append((store_id, store_info, reviews)) logger.info(f"🔄 현재 배치 중복 발견: {store_id}") continue seen_in_batch.add(store_id) # 기존 DB에 있는지 확인 if store_id in existing_store_ids: # 업데이트 대상 update_stores.append({ 'store_id': store_id, 'new_data': (store_id, store_info, reviews), 'existing_data': existing_data[store_id] }) logger.info(f"🔄 업데이트 대상: {store_id}") else: # 새 가게 new_stores.append((store_id, store_info, reviews)) logger.info(f"✨ 새 가게: {store_id}") result = { 'new_stores': new_stores, 'update_stores': update_stores, 'current_duplicates': current_duplicates, 'existing_data': existing_data, 'summary': { 'total_input': len(review_results), 'new_count': len(new_stores), 'update_count': len(update_stores), 'duplicate_count': len(current_duplicates), 'will_process': len(new_stores) + len(update_stores) } } logger.info(f"📊 가게 분류 완료:") logger.info(f" - 새 가게: {len(new_stores)}개") logger.info(f" - 업데이트: {len(update_stores)}개") logger.info(f" - 현재 중복: {len(current_duplicates)}개") return result def update_existing_stores(self, update_stores: List[Dict], food_category: str, region: str) -> Dict[str, Any]: """ 기존 가게들을 업데이트 """ updated_count = 0 skipped_count = 0 update_details = [] for update_item in update_stores: store_id = update_item['store_id'] new_data = update_item['new_data'] existing_data = update_item['existing_data'] try: _, store_info, reviews = new_data # 새 임베딩 및 메타데이터 생성 text_for_embedding = extract_text_for_embedding(store_info, reviews) if not text_for_embedding or len(text_for_embedding.strip()) < 10: logger.warning(f"⚠️ {store_id}: 임베딩 텍스트 부족, 업데이트 스킵") skipped_count += 1 continue embedding = self.embedding_model.encode(text_for_embedding) new_metadata = create_metadata(store_info, food_category, region) document_text = combine_store_and_reviews(store_info, reviews) # 기존 document_id 사용 (ID 일관성 유지) document_id = existing_data['document_id'] # Vector DB 업데이트 (ChromaDB에서는 upsert 사용) self.collection.upsert( documents=[document_text], embeddings=[embedding.tolist()], metadatas=[new_metadata], ids=[document_id] ) updated_count += 1 update_details.append({ 'store_id': store_id, 'document_id': document_id, 'previous_updated': existing_data['metadata'].get('last_updated', 'unknown'), 'new_updated': new_metadata['last_updated'] }) logger.info(f"✅ {store_id} 업데이트 완료") except Exception as e: logger.error(f"❌ {store_id} 업데이트 실패: {e}") skipped_count += 1 continue return { 'updated_count': updated_count, 'skipped_count': skipped_count, 'update_details': update_details } def add_new_stores(self, new_stores: List[tuple], food_category: str, region: str) -> Dict[str, Any]: """ 새 가게들을 Vector DB에 추가 """ if not new_stores: return {'added_count': 0, 'add_details': []} documents = [] embeddings = [] metadatas = [] ids = [] added_count = 0 add_details = [] for store_id, store_info, reviews in new_stores: try: # 임베딩용 텍스트 생성 text_for_embedding = extract_text_for_embedding(store_info, reviews) if not text_for_embedding or len(text_for_embedding.strip()) < 10: logger.warning(f"⚠️ {store_id}: 임베딩 텍스트 부족, 추가 스킵") continue # 임베딩 생성 embedding = self.embedding_model.encode(text_for_embedding) # 메타데이터 생성 metadata = create_metadata(store_info, food_category, region) # 문서 ID 생성 document_id = create_store_hash(store_id, store_info.get('name', ''), region) # 문서 데이터 생성 document_text = combine_store_and_reviews(store_info, reviews) documents.append(document_text) embeddings.append(embedding.tolist()) metadatas.append(metadata) ids.append(document_id) add_details.append({ 'store_id': store_id, 'document_id': document_id, 'added_at': metadata['last_updated'] }) added_count += 1 except Exception as e: logger.warning(f"⚠️ {store_id} 추가 실패: {e}") continue # Vector DB에 새 가게들 추가 if documents: self.collection.add( documents=documents, embeddings=embeddings, metadatas=metadatas, ids=ids ) logger.info(f"✅ {added_count}개 새 가게 추가 완료") return { 'added_count': added_count, 'add_details': add_details } def get_store_by_id(self, store_id: str) -> Optional[Dict[str, Any]]: """ store_id로 매장 정보를 조회합니다. Args: store_id: 조회할 매장 ID Returns: 매장 정보 딕셔너리 또는 None """ try: if not self.is_ready(): logger.warning("⚠️ VectorService가 준비되지 않음") return None logger.info(f"🔍 매장 정보 조회 시작: store_id={store_id}") # Vector DB에서 해당 store_id로 검색 # ✅ 수정: include에서 'ids' 제거 (ChromaDB는 자동으로 ids를 반환함) results = self.collection.get( where={"store_id": store_id}, include=['documents', 'metadatas'] # 'ids' 제거 ) if not results or not results.get('metadatas') or not results['metadatas']: logger.warning(f"❌ 매장 정보를 찾을 수 없음: store_id={store_id}") return None # 첫 번째 결과 사용 (store_id는 유니크해야 함) metadata = results['metadatas'][0] document = results['documents'][0] if results.get('documents') else None # ✅ 수정: ChromaDB는 자동으로 ids를 반환하므로 그대로 사용 document_id = results['ids'][0] if results.get('ids') else None logger.info(f"✅ 매장 정보 조회 성공: {metadata.get('store_name', 'Unknown')}") # 문서에서 JSON 파싱 (combine_store_and_reviews 형태로 저장되어 있음) store_data = None if document: try: import json store_data = json.loads(document) except json.JSONDecodeError as e: logger.error(f"❌ JSON 파싱 실패: {e}") return None # 응답 데이터 구성 result = { "metadata": metadata, "document_id": document_id, "store_info": store_data.get('store_info') if store_data else None, "reviews": store_data.get('reviews', []) if store_data else [], "review_summary": store_data.get('review_summary', {}) if store_data else {}, "combined_at": store_data.get('combined_at') if store_data else None } return result except Exception as e: logger.error(f"❌ 매장 정보 조회 실패: store_id={store_id}, error={e}") return None def get_all_store_ids(self, limit: int = 200) -> List[str]: """ Vector DB에 저장된 모든 store_id 목록을 조회합니다. Args: limit: 반환할 최대 store_id 수 (기본값: 200) Returns: store_id 문자열 리스트 """ try: if not self.is_ready(): logger.warning("⚠️ VectorService가 준비되지 않음") return [] logger.info(f"🏪 전체 store_id 목록 조회 시작 (최대 {limit}개)") # Vector DB에서 모든 메타데이터 조회 results = self.collection.get( include=['metadatas'], limit=limit # 성능을 위한 제한 ) if not results or not results.get('metadatas'): logger.info("📊 Vector DB에 저장된 매장이 없습니다") return [] # store_id만 추출 및 중복 제거 store_ids = [] seen_ids = set() for metadata in results['metadatas']: if not metadata: continue store_id = metadata.get('store_id', '') if store_id and store_id not in seen_ids: store_ids.append(store_id) seen_ids.add(store_id) # store_id로 정렬 (숫자 순서) store_ids.sort() logger.info(f"✅ store_id 목록 조회 완료: {len(store_ids)}개") return store_ids except Exception as e: logger.error(f"❌ store_id 목록 조회 실패: {e}") return []