# app/services/vector_service.py import os import json import logging import tempfile from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import chromadb from chromadb.config import Settings as ChromaSettings from sentence_transformers import SentenceTransformer from ..config.settings import settings from ..utils.data_utils import ( create_store_hash, combine_store_and_reviews, generate_review_summary, extract_text_for_embedding, create_metadata, is_duplicate_store ) logger = logging.getLogger(__name__) class VectorService: """Vector DB 서비스""" def __init__(self): self.db_path = settings.VECTOR_DB_PATH self.collection_name = settings.VECTOR_DB_COLLECTION self.embedding_model_name = settings.EMBEDDING_MODEL # 상태 변수 self.client = None self.collection = None self.embedding_model = None self.initialization_error = None # 안전한 초기화 시도 self._safe_initialize() def _safe_initialize(self): """안전한 초기화""" try: logger.info("🔧 VectorService 초기화 시작...") # 1단계: 디렉토리 권한 확인 self._ensure_directory_permissions() # 2단계: ChromaDB 초기화 self._initialize_chromadb() # 3단계: 임베딩 모델 로드 self._initialize_embedding_model() logger.info("✅ VectorService 초기화 완료") except Exception as e: self.initialization_error = str(e) logger.error(f"❌ VectorService 초기화 실패: {e}") logger.info("🔄 서비스는 런타임에 재시도 가능합니다") def _ensure_directory_permissions(self): """Vector DB 디렉토리 권한을 확인하고 생성""" try: logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}") # 절대 경로로 변환 abs_path = os.path.abspath(self.db_path) # 디렉토리 생성 os.makedirs(abs_path, mode=0o755, exist_ok=True) # 권한 확인 if not os.access(abs_path, os.W_OK): logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}") # 권한 변경 시도 try: os.chmod(abs_path, 0o755) logger.info("✅ 디렉토리 권한 변경 성공") except Exception as chmod_error: logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}") # 임시 디렉토리로 대체 temp_dir = tempfile.mkdtemp(prefix="vectordb_") logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}") self.db_path = temp_dir abs_path = temp_dir # 테스트 파일 생성/삭제로 권한 확인 test_file = os.path.join(abs_path, "test_permissions.tmp") try: with open(test_file, 'w') as f: f.write("test") os.remove(test_file) logger.info("✅ 디렉토리 권한 확인 완료") except Exception as test_error: raise Exception(f"디렉토리 권한 테스트 실패: {test_error}") except Exception as e: raise Exception(f"디렉토리 설정 실패: {e}") def _initialize_chromadb(self): """ChromaDB 초기화""" try: logger.info("🔧 ChromaDB 클라이언트 초기화...") # ChromaDB 클라이언트 생성 self.client = chromadb.PersistentClient( path=self.db_path, settings=ChromaSettings( anonymized_telemetry=False, allow_reset=True, ) ) # Collection 가져오기 또는 생성 try: self.collection = self.client.get_collection(name=self.collection_name) logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}") except Exception: self.collection = self.client.create_collection( name=self.collection_name, metadata={"hnsw:space": "cosine"} ) logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}") except Exception as e: raise Exception(f"ChromaDB 초기화 실패: {e}") def _initialize_embedding_model(self): """임베딩 모델 초기화""" try: logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}") self.embedding_model = SentenceTransformer(self.embedding_model_name) logger.info("✅ 임베딩 모델 로드 완료") except Exception as e: raise Exception(f"임베딩 모델 로드 실패: {e}") def is_ready(self) -> bool: """서비스 준비 상태 확인""" return all([ self.client is not None, self.collection is not None, self.embedding_model is not None, self.initialization_error is None ]) async def build_vector_store( self, target_store_info: Dict[str, Any], review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str ) -> Dict[str, Any]: """Vector Store를 구축합니다""" if not self.is_ready(): raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}") try: logger.info("🚀 Vector Store 구축 시작") processed_count = 0 documents = [] embeddings = [] metadatas = [] ids = [] for store_id, store_info, reviews in review_results: try: # 텍스트 추출 및 임베딩 생성 text_for_embedding = extract_text_for_embedding(store_info, reviews) embedding = self.embedding_model.encode(text_for_embedding) # 메타데이터 생성 metadata = create_metadata(store_info, food_category, region) # 문서 ID 생성 document_id = create_store_hash(store_id, store_info.get('name', ''), region) # 문서 데이터 생성 document_text = combine_store_and_reviews(store_info, reviews) documents.append(document_text) embeddings.append(embedding.tolist()) metadatas.append(metadata) ids.append(document_id) processed_count += 1 except Exception as e: logger.warning(f"⚠️ 가게 {store_id} 처리 실패: {e}") continue # Vector DB에 저장 if documents: self.collection.add( documents=documents, embeddings=embeddings, metadatas=metadatas, ids=ids ) logger.info(f"✅ Vector Store 구축 완료: {processed_count}개 문서 저장") return { 'success': True, 'processed_count': processed_count, 'message': f"{processed_count}개 문서가 Vector DB에 저장되었습니다" } else: return { 'success': False, 'error': '저장할 문서가 없습니다', 'processed_count': 0 } except Exception as e: logger.error(f"❌ Vector Store 구축 실패: {e}") return { 'success': False, 'error': str(e), 'processed_count': 0 } def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[Dict[str, Any]]: """유사 케이스 검색 (대폭 개선된 버전) - 풍부한 데이터 반환 + 디버깅""" # 1단계: 서비스 준비 상태 확인 logger.info(f"🔧 VectorService 상태 확인...") logger.info(f" - Client: {self.client is not None}") logger.info(f" - Collection: {self.collection is not None}") logger.info(f" - Embedding Model: {self.embedding_model is not None}") logger.info(f" - Is Ready: {self.is_ready()}") if not self.is_ready(): logger.error(f"❌ VectorService가 준비되지 않음: {self.initialization_error}") # None 대신 기본 구조 반환 return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"VectorService 준비되지 않음: {self.initialization_error}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["VectorService가 준비되지 않았습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["VectorService 초기화 문제 해결 필요"], "strategic_improvements": [], "benchmarking_targets": [] } } try: # 2단계: Vector DB 데이터 개수 확인 try: count = self.collection.count() logger.info(f"📊 Vector DB 문서 수: {count}") if count == 0: logger.warning("❌ Vector DB에 데이터가 없습니다. find-reviews API로 데이터를 먼저 추가하세요!") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": "Vector DB에 데이터가 없음" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["Vector DB에 데이터가 없어 분석할 수 없습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["먼저 find-reviews API로 경쟁업체 데이터를 수집하세요"], "strategic_improvements": [], "benchmarking_targets": [] } } except Exception as count_error: logger.error(f"❌ 문서 수 확인 실패: {count_error}") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"문서 수 확인 실패: {count_error}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["Vector DB 상태 확인 실패"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["Vector DB 상태 점검 필요"], "strategic_improvements": [], "benchmarking_targets": [] } } # 3단계: 검색 쿼리 생성 및 실행 logger.info(f"🔍 Vector 검색 시작: store_id={store_id}, context='{context[:50]}...'") query_text = f"가게 ID: {store_id} 요청사항: {context}" query_embedding = self.embedding_model.encode(query_text) logger.info(f"✅ 임베딩 생성 완료: 차원={len(query_embedding)}") results = self.collection.query( query_embeddings=[query_embedding.tolist()], n_results=limit, include=['documents', 'metadatas', 'distances'] ) # 4단계: 검색 결과 확인 logger.info(f"📊 Vector DB 검색 결과:") logger.info(f" - 문서 수: {len(results.get('documents', [[]]))} 그룹") if not results or not results.get('documents') or not results['documents'][0]: logger.warning("❌ 검색 결과가 없습니다") return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat() }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["검색 결과가 없습니다"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["검색 결과가 없어 분석할 수 없습니다"], "strategic_improvements": [], "benchmarking_targets": [] } } documents = results['documents'][0] metadatas = results['metadatas'][0] distances = results['distances'][0] logger.info(f"✅ 검색 결과: {len(documents)}개 가게 발견") # 검색된 가게들 로깅 for i, metadata in enumerate(metadatas): store_name = metadata.get('store_name', 'Unknown') distance = distances[i] if distances else 0 logger.info(f" {i+1}. {store_name} (유사도: {1-distance:.3f})") # 5단계: 데이터 분석 시작 logger.info("🔬 데이터 분석 시작...") # 분석 결과 디버깅을 위한 try-catch try: enhanced_analysis = self._analyze_similar_stores(documents, metadatas, distances) logger.info(f"✅ 분석 완료: {len(enhanced_analysis.get('competitive_insights', []))}개 가게 분석") except Exception as analysis_error: logger.error(f"❌ 데이터 분석 실패: {analysis_error}") # 기본 분석 결과로 대체 enhanced_analysis = { "market_analysis": { "total_competitors": len(documents), "industry_insights": ["데이터 분석 중 오류 발생"], "performance_benchmarks": {} }, "competitive_insights": self._create_basic_insights(documents, metadatas, distances), "recommendations": { "immediate_actions": ["데이터 분석 오류로 기본 추천 제공"], "strategic_improvements": [], "benchmarking_targets": [] } } # 6단계: 구조화된 컨텍스트 생성 structured_context = { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat() }, "market_intelligence": enhanced_analysis['market_analysis'], "competitive_insights": enhanced_analysis['competitive_insights'], "actionable_recommendations": enhanced_analysis['recommendations'] } logger.info(f"✅ 구조화된 컨텍스트 생성 완료: {len(json.dumps(structured_context, ensure_ascii=False))} 문자") return structured_context except Exception as e: logger.error(f"❌ 유사 케이스 검색 실패: {e}") logger.error(f" 스택 트레이스: ", exc_info=True) # None 대신 오류 정보가 포함된 기본 구조 반환 return { "request_context": { "store_id": store_id, "owner_request": context, "analysis_timestamp": datetime.now().isoformat(), "error": f"검색 실패: {str(e)}" }, "market_intelligence": { "total_competitors": 0, "industry_insights": ["검색 중 오류 발생"], "performance_benchmarks": {} }, "competitive_insights": [], "actionable_recommendations": { "immediate_actions": ["시스템 오류로 분석 불가"], "strategic_improvements": [], "benchmarking_targets": [] } } def _create_basic_insights(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> List[Dict[str, Any]]: """분석 실패시 기본 인사이트 생성""" basic_insights = [] try: for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)): similarity_score = 1 - distance store_name = metadata.get('store_name', 'Unknown') category = metadata.get('food_category', 'Unknown') # 간단한 JSON 파싱 시도 try: store_data = json.loads(doc) store_info = store_data.get('store_info', {}) rating = store_info.get('rating', 'N/A') review_count = store_info.get('review_count', 'N/A') # 리뷰 요약에서 키워드 추출 review_summary = store_data.get('review_summary', {}) keywords = review_summary.get('common_keywords', []) except Exception as json_error: logger.warning(f"JSON 파싱 실패 for {store_name}: {json_error}") rating = 'N/A' review_count = 'N/A' keywords = [] basic_insight = { "rank": i + 1, "store_name": store_name, "category": category, "similarity_score": round(similarity_score, 3), "performance_analysis": { "performance": {"rating": rating, "review_count": review_count}, "feedback": {"positive_aspects": keywords[:3], "negative_aspects": []}, "business_insights": {"key_finding": "기본 분석만 가능"} } } basic_insights.append(basic_insight) except Exception as e: logger.error(f"기본 인사이트 생성 실패: {e}") return basic_insights def _analyze_similar_stores(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> Dict[str, Any]: """Vector DB 저장 데이터를 활용한 심층 분석 (디버깅 강화)""" try: logger.info(f"🔬 가게 분석 시작: {len(documents)}개 가게") market_analysis = { "total_competitors": len(documents), "industry_insights": [], "performance_benchmarks": {} } competitive_insights = [] recommendations = { "critical_issues": [], "success_patterns": [], "improvement_opportunities": [] } ratings = [] review_counts = [] # 각 유사 가게 분석 for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)): try: similarity_score = 1 - distance store_name = metadata.get('store_name', 'Unknown') category = metadata.get('food_category', 'Unknown') logger.info(f" 📋 가게 {i+1} 분석 중: {store_name}") # 가게 인사이트 추출 시도 try: store_analysis = self._extract_store_insights(doc, metadata) logger.info(f" ✅ 인사이트 추출 성공") except Exception as insight_error: logger.warning(f" ⚠️ 인사이트 추출 실패: {insight_error}") # 기본값으로 대체 store_analysis = { "performance": {"rating": "N/A", "review_count": "N/A"}, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": "분석 실패"} } competitive_insight = { "rank": i + 1, "store_name": store_name, "category": category, "similarity_score": round(similarity_score, 3), "performance_analysis": store_analysis } competitive_insights.append(competitive_insight) # 성능 지표 수집 if 'rating' in store_analysis['performance']: try: rating_value = float(store_analysis['performance']['rating']) ratings.append(rating_value) except: pass if 'review_count' in store_analysis['performance']: try: count_value = int(store_analysis['performance']['review_count']) review_counts.append(count_value) except: pass except Exception as e: logger.warning(f" ⚠️ 가게 {i} 분석 실패: {e}") continue logger.info(f"✅ 가게 분석 완료: {len(competitive_insights)}개 성공") # 시장 벤치마크 계산 if ratings: market_analysis["performance_benchmarks"] = { "average_rating": round(sum(ratings) / len(ratings), 2), "rating_range": {"min": min(ratings), "max": max(ratings)}, "industry_standard": "Above Average" if sum(ratings) / len(ratings) > 3.5 else "Below Average" } logger.info(f"📊 평점 벤치마크: 평균 {market_analysis['performance_benchmarks']['average_rating']}") if review_counts: market_analysis["performance_benchmarks"]["review_activity"] = { "average_reviews": round(sum(review_counts) / len(review_counts)), "range": {"min": min(review_counts), "max": max(review_counts)} } # 업계 인사이트 생성 try: market_analysis["industry_insights"] = self._generate_industry_insights(competitive_insights) logger.info(f"💡 업계 인사이트: {len(market_analysis['industry_insights'])}개 생성") except Exception as e: logger.warning(f"업계 인사이트 생성 실패: {e}") market_analysis["industry_insights"] = ["인사이트 생성 실패"] # 개선 추천사항 생성 try: recommendations = self._generate_recommendations(competitive_insights) logger.info(f"🎯 추천사항 생성: 즉시실행 {len(recommendations.get('immediate_actions', []))}개") except Exception as e: logger.warning(f"추천사항 생성 실패: {e}") recommendations = { "immediate_actions": ["추천사항 생성 실패"], "strategic_improvements": [], "benchmarking_targets": [] } result = { "market_analysis": market_analysis, "competitive_insights": competitive_insights, "recommendations": recommendations } logger.info("✅ 심층 분석 완료") return result except Exception as e: logger.error(f"❌ 가게 분석 실패: {e}") logger.error(f" 스택 트레이스: ", exc_info=True) return {"market_analysis": {}, "competitive_insights": [], "recommendations": {}} def _extract_store_insights(self, document: str, metadata: Dict) -> Dict[str, Any]: """개별 가게의 상세 인사이트 추출 (디버깅 강화)""" try: # document 내용 로깅 (처음 200자만) logger.info(f" 📄 문서 내용 샘플: {document[:200]}...") # JSON 파싱 시도 try: store_data = json.loads(document) logger.info(f" ✅ JSON 파싱 성공") except json.JSONDecodeError as json_error: logger.warning(f" ⚠️ JSON 파싱 실패: {json_error}") # JSON이 아닌 경우 메타데이터만 사용 return { "performance": { "rating": metadata.get('store_name', 'N/A'), "review_count": "JSON 파싱 실패", "status": "N/A" }, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": f"JSON 파싱 실패: {str(json_error)[:100]}"} } # 기본 성능 지표 store_info = store_data.get('store_info', {}) reviews = store_data.get('reviews', []) review_summary = store_data.get('review_summary', {}) logger.info(f" 📊 데이터 확인: 가게정보={bool(store_info)}, 리뷰={len(reviews)}개, 요약={bool(review_summary)}") performance = { "rating": store_info.get('rating', 'N/A'), "review_count": store_info.get('review_count', 'N/A'), "status": store_info.get('status', 'N/A') } # 리뷰 분석 try: feedback = self._analyze_reviews(reviews, review_summary) logger.info(f" 📝 리뷰 분석: 긍정={len(feedback.get('positive_aspects', []))}, 부정={len(feedback.get('negative_aspects', []))}") except Exception as e: logger.warning(f" ⚠️ 리뷰 분석 실패: {e}") feedback = {"positive_aspects": [], "negative_aspects": []} # 비즈니스 인사이트 try: insights = self._generate_business_insights(store_info, reviews, review_summary) logger.info(f" 💡 비즈니스 인사이트 생성 완료") except Exception as e: logger.warning(f" ⚠️ 비즈니스 인사이트 생성 실패: {e}") insights = {"key_finding": "인사이트 생성 실패"} return { "performance": performance, "feedback": feedback, "insights": insights } except Exception as e: logger.warning(f" ❌ 가게 인사이트 추출 실패: {e}") return { "performance": {"rating": "N/A", "review_count": "N/A"}, "feedback": {"positive_aspects": [], "negative_aspects": []}, "insights": {"key_finding": f"추출 실패: {str(e)[:100]}"} } def _analyze_reviews(self, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]: """리뷰 데이터 심층 분석""" try: positive_aspects = [] negative_aspects = [] recent_trends = [] # 평점 분포 분석 rating_dist = review_summary.get('rating_distribution', {}) total_reviews = review_summary.get('total_reviews', 0) # 양극화 지수 계산 if total_reviews > 0: polarization = (rating_dist.get('1', 0) + rating_dist.get('5', 0)) / total_reviews if polarization > 0.6: negative_aspects.append("고객 만족도 양극화 (일관성 부족)") # 공통 키워드 분석 common_keywords = review_summary.get('common_keywords', []) positive_aspects.extend(common_keywords) # 개별 리뷰 분석 (최근 5개) recent_reviews = sorted(reviews, key=lambda x: x.get('date', ''), reverse=True)[:5] for review in recent_reviews: rating = review.get('rating', 0) content = review.get('content', '').lower() if rating >= 4: badges = review.get('badges', []) positive_aspects.extend(badges) elif rating <= 2: # 부정적 키워드 추출 negative_keywords = self._extract_negative_keywords(content) negative_aspects.extend(negative_keywords) # 최근 트렌드 (시간 분석) if len(recent_reviews) >= 3: recent_ratings = [r.get('rating', 0) for r in recent_reviews[:3]] avg_recent = sum(recent_ratings) / len(recent_ratings) if avg_recent < 3.0: recent_trends.append("최근 평점 하락세") elif avg_recent > 4.0: recent_trends.append("최근 만족도 상승") return { "positive_aspects": list(set(positive_aspects))[:5], # 중복 제거, 상위 5개 "negative_aspects": list(set(negative_aspects))[:5], "recent_trends": recent_trends, "rating_pattern": self._analyze_rating_pattern(rating_dist) } except Exception as e: logger.warning(f"리뷰 분석 실패: {e}") return {"positive_aspects": [], "negative_aspects": [], "recent_trends": []} def _extract_negative_keywords(self, content: str) -> List[str]: """리뷰 내용에서 부정적 키워드 추출""" negative_patterns = [ ("불친절", "서비스 품질"), ("비싸", "가격 정책"), ("맛없", "음식 품질"), ("더럽", "위생 관리"), ("느리", "서비스 속도"), ("양이 적", "양 부족"), ("시끄럽", "매장 환경"), ("주차", "접근성") ] found_issues = [] for pattern, category in negative_patterns: if pattern in content: found_issues.append(category) return found_issues def _analyze_rating_pattern(self, rating_dist: Dict) -> str: """평점 분포 패턴 분석""" if not rating_dist: return "데이터 부족" total = sum(rating_dist.values()) if total == 0: return "리뷰 없음" high_ratings = rating_dist.get('5', 0) + rating_dist.get('4', 0) low_ratings = rating_dist.get('1', 0) + rating_dist.get('2', 0) high_ratio = high_ratings / total low_ratio = low_ratings / total if high_ratio > 0.7: return "안정적 고만족" elif low_ratio > 0.5: return "심각한 문제 존재" elif high_ratio + low_ratio > 0.7: return "양극화 패턴" else: return "평균적 평가" def _generate_business_insights(self, store_info: Dict, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]: """비즈니스 인사이트 생성""" try: insights = { "competitive_advantage": [], "critical_issues": [], "improvement_opportunities": [] } # 평점 기반 분석 try: rating = float(store_info.get('rating', 0)) review_count = int(store_info.get('review_count', 0)) if rating >= 4.0: insights["competitive_advantage"].append("높은 고객 만족도") elif rating < 3.0: insights["critical_issues"].append("평점 개선 시급") if review_count < 10: insights["improvement_opportunities"].append("온라인 리뷰 마케팅 필요") elif review_count > 100: insights["competitive_advantage"].append("활발한 고객 참여") except: pass # 리뷰 내용 기반 분석 common_keywords = review_summary.get('common_keywords', []) positive_keywords = ['맛', '친절', '깨끗', '빠름', '저렴'] negative_indicators = ['서비스', '가격', '위생'] for keyword in common_keywords: if keyword in positive_keywords: insights["competitive_advantage"].append(f"{keyword} 우수") elif keyword in negative_indicators: insights["critical_issues"].append(f"{keyword} 문제 지적") # 영업 상태 확인 status = store_info.get('status', '') if '영업 전' in status: insights["improvement_opportunities"].append("영업시간 확장 검토") return insights except Exception as e: logger.warning(f"비즈니스 인사이트 생성 실패: {e}") return {"competitive_advantage": [], "critical_issues": [], "improvement_opportunities": []} def _generate_industry_insights(self, competitive_insights: List[Dict]) -> List[str]: """업계 전체 인사이트 생성""" insights = [] try: # 평점 분석 ratings = [] for store in competitive_insights: rating_str = store['performance_analysis']['performance'].get('rating', '0') try: rating = float(rating_str) ratings.append(rating) except: continue if ratings: avg_rating = sum(ratings) / len(ratings) if avg_rating < 3.5: insights.append("업계 전반적으로 고객 만족도 개선 필요") elif avg_rating > 4.0: insights.append("경쟁이 치열한 고품질 시장") # 공통 문제점 분석 common_issues = {} for store in competitive_insights: issues = store['performance_analysis']['feedback'].get('negative_aspects', []) for issue in issues: common_issues[issue] = common_issues.get(issue, 0) + 1 if common_issues: top_issue = max(common_issues.items(), key=lambda x: x[1]) insights.append(f"업계 공통 이슈: {top_issue[0]}") # 성공 패턴 분석 high_performers = [s for s in competitive_insights if s.get('similarity_score', 0) > 0.8 and float(s['performance_analysis']['performance'].get('rating', 0)) > 4.0] if high_performers: insights.append(f"고성과 업체 {len(high_performers)}개 벤치마킹 가능") except Exception as e: logger.warning(f"업계 인사이트 생성 실패: {e}") return insights[:3] # 상위 3개만 def _generate_recommendations(self, competitive_insights: List[Dict]) -> Dict[str, List[str]]: """데이터 기반 추천사항 생성""" recommendations = { "immediate_actions": [], "strategic_improvements": [], "benchmarking_targets": [] } try: # 즉시 조치 사항 critical_issues = [] for store in competitive_insights: issues = store['performance_analysis']['feedback'].get('negative_aspects', []) critical_issues.extend(issues) # 가장 빈번한 문제 해결 if critical_issues: issue_count = {} for issue in critical_issues: issue_count[issue] = issue_count.get(issue, 0) + 1 top_issues = sorted(issue_count.items(), key=lambda x: x[1], reverse=True)[:2] for issue, count in top_issues: recommendations["immediate_actions"].append(f"{issue} 개선 (업계 {count}개 업체 공통 문제)") # 전략적 개선사항 best_practices = [] for store in competitive_insights: if store.get('similarity_score', 0) > 0.7: practices = store['performance_analysis']['feedback'].get('positive_aspects', []) best_practices.extend(practices) if best_practices: practice_count = {} for practice in best_practices: practice_count[practice] = practice_count.get(practice, 0) + 1 top_practices = sorted(practice_count.items(), key=lambda x: x[1], reverse=True)[:2] for practice, count in top_practices: recommendations["strategic_improvements"].append(f"{practice} 강화 (성공 업체들의 공통점)") # 벤치마킹 대상 top_performers = sorted(competitive_insights, key=lambda x: float(x['performance_analysis']['performance'].get('rating', 0)), reverse=True)[:2] for performer in top_performers: name = performer.get('store_name', 'Unknown') rating = performer['performance_analysis']['performance'].get('rating', 'N/A') advantages = performer['performance_analysis']['business_insights'].get('competitive_advantage', []) if advantages: recommendations["benchmarking_targets"].append(f"{name} (평점 {rating}): {', '.join(advantages[:2])}") except Exception as e: logger.warning(f"추천사항 생성 실패: {e}") return recommendations def get_db_status(self) -> Dict[str, Any]: """DB 상태 정보 반환""" try: if not self.is_ready(): return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'not_ready', 'error': self.initialization_error } # 컬렉션 정보 조회 count = self.collection.count() return { 'collection_name': self.collection_name, 'total_documents': count, 'total_stores': count, # 각 문서가 하나의 가게를 나타냄 'db_path': self.db_path, 'status': 'ready', 'last_updated': datetime.now().isoformat() } except Exception as e: logger.error(f"DB 상태 조회 실패: {e}") return { 'collection_name': self.collection_name, 'total_documents': 0, 'total_stores': 0, 'db_path': self.db_path, 'status': 'error', 'error': str(e) }