ai-review/vector/app/services/vector_service.py
2025-06-16 03:38:45 +00:00

1181 lines
52 KiB
Python

# app/services/vector_service.py
import os
import json
import logging
import tempfile
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import chromadb
from chromadb.config import Settings as ChromaSettings
from sentence_transformers import SentenceTransformer
from ..config.settings import settings
from ..utils.data_utils import (
create_store_hash, combine_store_and_reviews, generate_review_summary,
extract_text_for_embedding, create_metadata, is_duplicate_store
)
logger = logging.getLogger(__name__)
class VectorService:
"""Vector DB 서비스"""
def __init__(self):
self.db_path = settings.VECTOR_DB_PATH
self.collection_name = settings.VECTOR_DB_COLLECTION
self.embedding_model_name = settings.EMBEDDING_MODEL
# 상태 변수
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 안전한 초기화 시도
self._safe_initialize()
def _safe_initialize(self):
"""안전한 초기화"""
try:
logger.info("🔧 VectorService 초기화 시작...")
# 1단계: 디렉토리 권한 확인
self._ensure_directory_permissions()
# 2단계: ChromaDB 초기화
self._initialize_chromadb()
# 3단계: 임베딩 모델 로드
self._initialize_embedding_model()
logger.info("✅ VectorService 초기화 완료")
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ VectorService 초기화 실패: {e}")
logger.info("🔄 서비스는 런타임에 재시도 가능합니다")
def _ensure_directory_permissions(self):
"""Vector DB 디렉토리 권한을 확인하고 생성"""
try:
logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}")
# 절대 경로로 변환
abs_path = os.path.abspath(self.db_path)
# 디렉토리 생성
os.makedirs(abs_path, mode=0o755, exist_ok=True)
# 권한 확인
if not os.access(abs_path, os.W_OK):
logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}")
# 권한 변경 시도
try:
os.chmod(abs_path, 0o755)
logger.info("✅ 디렉토리 권한 변경 성공")
except Exception as chmod_error:
logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}")
# 임시 디렉토리로 대체
temp_dir = tempfile.mkdtemp(prefix="vectordb_")
logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}")
self.db_path = temp_dir
abs_path = temp_dir
# 테스트 파일 생성/삭제로 권한 확인
test_file = os.path.join(abs_path, "test_permissions.tmp")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logger.info("✅ 디렉토리 권한 확인 완료")
except Exception as test_error:
raise Exception(f"디렉토리 권한 테스트 실패: {test_error}")
except Exception as e:
raise Exception(f"디렉토리 설정 실패: {e}")
def _initialize_chromadb(self):
"""ChromaDB 초기화"""
try:
logger.info("🔧 ChromaDB 클라이언트 초기화...")
# ChromaDB 클라이언트 생성
self.client = chromadb.PersistentClient(
path=self.db_path,
settings=ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
)
)
# Collection 가져오기 또는 생성
try:
self.collection = self.client.get_collection(name=self.collection_name)
logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}")
except Exception:
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={"hnsw:space": "cosine"}
)
logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}")
except Exception as e:
raise Exception(f"ChromaDB 초기화 실패: {e}")
def _initialize_embedding_model(self):
"""임베딩 모델 초기화"""
try:
logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}")
self.embedding_model = SentenceTransformer(self.embedding_model_name)
logger.info("✅ 임베딩 모델 로드 완료")
except Exception as e:
raise Exception(f"임베딩 모델 로드 실패: {e}")
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return all([
self.client is not None,
self.collection is not None,
self.embedding_model is not None,
self.initialization_error is None
])
async def build_vector_store(
self,
target_store_info: Dict[str, Any],
review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]],
food_category: str,
region: str
) -> Dict[str, Any]:
"""Vector Store를 구축합니다 (기존 데이터 업데이트 포함)"""
if not self.is_ready():
raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}")
try:
logger.info("🔧 Vector Store 구축 시작 (기존 데이터 업데이트 포함)...")
start_time = datetime.now()
# 1단계: 가게 분류 (새 가게 vs 업데이트 vs 중복)
logger.info("🔍 가게 분류 중...")
categorization = self.categorize_stores(review_results, region, food_category)
# 2단계: 기존 가게 업데이트
update_result = {'updated_count': 0, 'skipped_count': 0}
if categorization['update_stores']:
logger.info(f"🔄 {len(categorization['update_stores'])}개 기존 가게 업데이트 중...")
update_result = self.update_existing_stores(
categorization['update_stores'], food_category, region
)
# 3단계: 새 가게 추가
add_result = {'added_count': 0}
if categorization['new_stores']:
logger.info(f"{len(categorization['new_stores'])}개 새 가게 추가 중...")
add_result = self.add_new_stores(
categorization['new_stores'], food_category, region
)
# 4단계: 결과 정리
total_processed = update_result['updated_count'] + add_result['added_count']
execution_time = (datetime.now() - start_time).total_seconds()
result = {
'success': True,
'processed_count': total_processed, # ← 기존 API 호환성 유지
'execution_time': execution_time,
'summary': categorization['summary'],
'operations': {
'new_stores_added': add_result['added_count'],
'existing_stores_updated': update_result['updated_count'],
'update_skipped': update_result.get('skipped_count', 0),
'duplicates_removed': categorization['summary']['duplicate_count']
},
'message': f"✅ Vector DB 처리 완료: 새 가게 {add_result['added_count']}개 추가, "
f"기존 가게 {update_result['updated_count']}개 업데이트"
}
logger.info(f"📊 Vector Store 구축 완료:")
logger.info(f" - 총 처리: {total_processed}")
logger.info(f" - 새 가게: {add_result['added_count']}")
logger.info(f" - 업데이트: {update_result['updated_count']}")
logger.info(f" - 실행 시간: {execution_time:.2f}")
return result
except Exception as e:
logger.error(f"❌ Vector Store 구축 실패: {e}")
return {
'success': False,
'error': str(e),
'processed_count': 0 # ← 기존 API 호환성 유지
}
def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[Dict[str, Any]]:
"""유사 케이스 검색 (대폭 개선된 버전) - 풍부한 데이터 반환 + 디버깅"""
# 1단계: 서비스 준비 상태 확인
logger.info(f"🔧 VectorService 상태 확인...")
logger.info(f" - Client: {self.client is not None}")
logger.info(f" - Collection: {self.collection is not None}")
logger.info(f" - Embedding Model: {self.embedding_model is not None}")
logger.info(f" - Is Ready: {self.is_ready()}")
if not self.is_ready():
logger.error(f"❌ VectorService가 준비되지 않음: {self.initialization_error}")
# None 대신 기본 구조 반환
return {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat(),
"error": f"VectorService 준비되지 않음: {self.initialization_error}"
},
"market_intelligence": {
"total_competitors": 0,
"industry_insights": ["VectorService가 준비되지 않았습니다"],
"performance_benchmarks": {}
},
"competitive_insights": [],
"actionable_recommendations": {
"immediate_actions": ["VectorService 초기화 문제 해결 필요"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
try:
# 2단계: Vector DB 데이터 개수 확인
try:
count = self.collection.count()
logger.info(f"📊 Vector DB 문서 수: {count}")
if count == 0:
logger.warning("❌ Vector DB에 데이터가 없습니다. find-reviews API로 데이터를 먼저 추가하세요!")
return {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat(),
"error": "Vector DB에 데이터가 없음"
},
"market_intelligence": {
"total_competitors": 0,
"industry_insights": ["Vector DB에 데이터가 없어 분석할 수 없습니다"],
"performance_benchmarks": {}
},
"competitive_insights": [],
"actionable_recommendations": {
"immediate_actions": ["먼저 find-reviews API로 경쟁업체 데이터를 수집하세요"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
except Exception as count_error:
logger.error(f"❌ 문서 수 확인 실패: {count_error}")
return {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat(),
"error": f"문서 수 확인 실패: {count_error}"
},
"market_intelligence": {
"total_competitors": 0,
"industry_insights": ["Vector DB 상태 확인 실패"],
"performance_benchmarks": {}
},
"competitive_insights": [],
"actionable_recommendations": {
"immediate_actions": ["Vector DB 상태 점검 필요"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
# 3단계: 검색 쿼리 생성 및 실행
logger.info(f"🔍 Vector 검색 시작: store_id={store_id}, context='{context[:50]}...'")
query_text = f"가게 ID: {store_id} 요청사항: {context}"
query_embedding = self.embedding_model.encode(query_text)
logger.info(f"✅ 임베딩 생성 완료: 차원={len(query_embedding)}")
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=limit,
include=['documents', 'metadatas', 'distances']
)
# 4단계: 검색 결과 확인
logger.info(f"📊 Vector DB 검색 결과:")
logger.info(f" - 문서 수: {len(results.get('documents', [[]]))} 그룹")
if not results or not results.get('documents') or not results['documents'][0]:
logger.warning("❌ 검색 결과가 없습니다")
return {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat()
},
"market_intelligence": {
"total_competitors": 0,
"industry_insights": ["검색 결과가 없습니다"],
"performance_benchmarks": {}
},
"competitive_insights": [],
"actionable_recommendations": {
"immediate_actions": ["검색 결과가 없어 분석할 수 없습니다"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
documents = results['documents'][0]
metadatas = results['metadatas'][0]
distances = results['distances'][0]
logger.info(f"✅ 검색 결과: {len(documents)}개 가게 발견")
# 검색된 가게들 로깅
for i, metadata in enumerate(metadatas):
store_name = metadata.get('store_name', 'Unknown')
distance = distances[i] if distances else 0
logger.info(f" {i+1}. {store_name} (유사도: {1-distance:.3f})")
# 5단계: 데이터 분석 시작
logger.info("🔬 데이터 분석 시작...")
# 분석 결과 디버깅을 위한 try-catch
try:
enhanced_analysis = self._analyze_similar_stores(documents, metadatas, distances)
logger.info(f"✅ 분석 완료: {len(enhanced_analysis.get('competitive_insights', []))}개 가게 분석")
except Exception as analysis_error:
logger.error(f"❌ 데이터 분석 실패: {analysis_error}")
# 기본 분석 결과로 대체
enhanced_analysis = {
"market_analysis": {
"total_competitors": len(documents),
"industry_insights": ["데이터 분석 중 오류 발생"],
"performance_benchmarks": {}
},
"competitive_insights": self._create_basic_insights(documents, metadatas, distances),
"recommendations": {
"immediate_actions": ["데이터 분석 오류로 기본 추천 제공"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
# 6단계: 구조화된 컨텍스트 생성
structured_context = {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat()
},
"market_intelligence": enhanced_analysis['market_analysis'],
"competitive_insights": enhanced_analysis['competitive_insights'],
"actionable_recommendations": enhanced_analysis['recommendations']
}
logger.info(f"✅ 구조화된 컨텍스트 생성 완료: {len(json.dumps(structured_context, ensure_ascii=False))} 문자")
return structured_context
except Exception as e:
logger.error(f"❌ 유사 케이스 검색 실패: {e}")
logger.error(f" 스택 트레이스: ", exc_info=True)
# None 대신 오류 정보가 포함된 기본 구조 반환
return {
"request_context": {
"store_id": store_id,
"owner_request": context,
"analysis_timestamp": datetime.now().isoformat(),
"error": f"검색 실패: {str(e)}"
},
"market_intelligence": {
"total_competitors": 0,
"industry_insights": ["검색 중 오류 발생"],
"performance_benchmarks": {}
},
"competitive_insights": [],
"actionable_recommendations": {
"immediate_actions": ["시스템 오류로 분석 불가"],
"strategic_improvements": [],
"benchmarking_targets": []
}
}
def _create_basic_insights(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> List[Dict[str, Any]]:
"""분석 실패시 기본 인사이트 생성"""
basic_insights = []
try:
for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
similarity_score = 1 - distance
store_name = metadata.get('store_name', 'Unknown')
category = metadata.get('food_category', 'Unknown')
# 간단한 JSON 파싱 시도
try:
store_data = json.loads(doc)
store_info = store_data.get('store_info', {})
rating = store_info.get('rating', 'N/A')
review_count = store_info.get('review_count', 'N/A')
# 리뷰 요약에서 키워드 추출
review_summary = store_data.get('review_summary', {})
keywords = review_summary.get('common_keywords', [])
except Exception as json_error:
logger.warning(f"JSON 파싱 실패 for {store_name}: {json_error}")
rating = 'N/A'
review_count = 'N/A'
keywords = []
basic_insight = {
"rank": i + 1,
"store_name": store_name,
"category": category,
"similarity_score": round(similarity_score, 3),
"performance_analysis": {
"performance": {"rating": rating, "review_count": review_count},
"feedback": {"positive_aspects": keywords[:3], "negative_aspects": []},
"business_insights": {"key_finding": "기본 분석만 가능"}
}
}
basic_insights.append(basic_insight)
except Exception as e:
logger.error(f"기본 인사이트 생성 실패: {e}")
return basic_insights
def _analyze_similar_stores(self, documents: List[str], metadatas: List[Dict], distances: List[float]) -> Dict[str, Any]:
"""Vector DB 저장 데이터를 활용한 심층 분석 (디버깅 강화)"""
try:
logger.info(f"🔬 가게 분석 시작: {len(documents)}개 가게")
market_analysis = {
"total_competitors": len(documents),
"industry_insights": [],
"performance_benchmarks": {}
}
competitive_insights = []
recommendations = {
"critical_issues": [],
"success_patterns": [],
"improvement_opportunities": []
}
ratings = []
review_counts = []
# 각 유사 가게 분석
for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
try:
similarity_score = 1 - distance
store_name = metadata.get('store_name', 'Unknown')
category = metadata.get('food_category', 'Unknown')
logger.info(f" 📋 가게 {i+1} 분석 중: {store_name}")
# 가게 인사이트 추출 시도
try:
store_analysis = self._extract_store_insights(doc, metadata)
logger.info(f" ✅ 인사이트 추출 성공")
except Exception as insight_error:
logger.warning(f" ⚠️ 인사이트 추출 실패: {insight_error}")
# 기본값으로 대체
store_analysis = {
"performance": {"rating": "N/A", "review_count": "N/A"},
"feedback": {"positive_aspects": [], "negative_aspects": []},
"insights": {"key_finding": "분석 실패"}
}
competitive_insight = {
"rank": i + 1,
"store_name": store_name,
"category": category,
"similarity_score": round(similarity_score, 3),
"performance_analysis": store_analysis
}
competitive_insights.append(competitive_insight)
# 성능 지표 수집
if 'rating' in store_analysis['performance']:
try:
rating_value = float(store_analysis['performance']['rating'])
ratings.append(rating_value)
except:
pass
if 'review_count' in store_analysis['performance']:
try:
count_value = int(store_analysis['performance']['review_count'])
review_counts.append(count_value)
except:
pass
except Exception as e:
logger.warning(f" ⚠️ 가게 {i} 분석 실패: {e}")
continue
logger.info(f"✅ 가게 분석 완료: {len(competitive_insights)}개 성공")
# 시장 벤치마크 계산
if ratings:
market_analysis["performance_benchmarks"] = {
"average_rating": round(sum(ratings) / len(ratings), 2),
"rating_range": {"min": min(ratings), "max": max(ratings)},
"industry_standard": "Above Average" if sum(ratings) / len(ratings) > 3.5 else "Below Average"
}
logger.info(f"📊 평점 벤치마크: 평균 {market_analysis['performance_benchmarks']['average_rating']}")
if review_counts:
market_analysis["performance_benchmarks"]["review_activity"] = {
"average_reviews": round(sum(review_counts) / len(review_counts)),
"range": {"min": min(review_counts), "max": max(review_counts)}
}
# 업계 인사이트 생성
try:
market_analysis["industry_insights"] = self._generate_industry_insights(competitive_insights)
logger.info(f"💡 업계 인사이트: {len(market_analysis['industry_insights'])}개 생성")
except Exception as e:
logger.warning(f"업계 인사이트 생성 실패: {e}")
market_analysis["industry_insights"] = ["인사이트 생성 실패"]
# 개선 추천사항 생성
try:
recommendations = self._generate_recommendations(competitive_insights)
logger.info(f"🎯 추천사항 생성: 즉시실행 {len(recommendations.get('immediate_actions', []))}")
except Exception as e:
logger.warning(f"추천사항 생성 실패: {e}")
recommendations = {
"immediate_actions": ["추천사항 생성 실패"],
"strategic_improvements": [],
"benchmarking_targets": []
}
result = {
"market_analysis": market_analysis,
"competitive_insights": competitive_insights,
"recommendations": recommendations
}
logger.info("✅ 심층 분석 완료")
return result
except Exception as e:
logger.error(f"❌ 가게 분석 실패: {e}")
logger.error(f" 스택 트레이스: ", exc_info=True)
return {"market_analysis": {}, "competitive_insights": [], "recommendations": {}}
def _extract_store_insights(self, document: str, metadata: Dict) -> Dict[str, Any]:
"""개별 가게의 상세 인사이트 추출 (디버깅 강화)"""
try:
# document 내용 로깅 (처음 200자만)
logger.info(f" 📄 문서 내용 샘플: {document[:200]}...")
# JSON 파싱 시도
try:
store_data = json.loads(document)
logger.info(f" ✅ JSON 파싱 성공")
except json.JSONDecodeError as json_error:
logger.warning(f" ⚠️ JSON 파싱 실패: {json_error}")
# JSON이 아닌 경우 메타데이터만 사용
return {
"performance": {
"rating": metadata.get('store_name', 'N/A'),
"review_count": "JSON 파싱 실패",
"status": "N/A"
},
"feedback": {"positive_aspects": [], "negative_aspects": []},
"insights": {"key_finding": f"JSON 파싱 실패: {str(json_error)[:100]}"}
}
# 기본 성능 지표
store_info = store_data.get('store_info', {})
reviews = store_data.get('reviews', [])
review_summary = store_data.get('review_summary', {})
logger.info(f" 📊 데이터 확인: 가게정보={bool(store_info)}, 리뷰={len(reviews)}개, 요약={bool(review_summary)}")
performance = {
"rating": store_info.get('rating', 'N/A'),
"review_count": store_info.get('review_count', 'N/A'),
"status": store_info.get('status', 'N/A')
}
# 리뷰 분석
try:
feedback = self._analyze_reviews(reviews, review_summary)
logger.info(f" 📝 리뷰 분석: 긍정={len(feedback.get('positive_aspects', []))}, 부정={len(feedback.get('negative_aspects', []))}")
except Exception as e:
logger.warning(f" ⚠️ 리뷰 분석 실패: {e}")
feedback = {"positive_aspects": [], "negative_aspects": []}
# 비즈니스 인사이트
try:
insights = self._generate_business_insights(store_info, reviews, review_summary)
logger.info(f" 💡 비즈니스 인사이트 생성 완료")
except Exception as e:
logger.warning(f" ⚠️ 비즈니스 인사이트 생성 실패: {e}")
insights = {"key_finding": "인사이트 생성 실패"}
return {
"performance": performance,
"feedback": feedback,
"insights": insights
}
except Exception as e:
logger.warning(f" ❌ 가게 인사이트 추출 실패: {e}")
return {
"performance": {"rating": "N/A", "review_count": "N/A"},
"feedback": {"positive_aspects": [], "negative_aspects": []},
"insights": {"key_finding": f"추출 실패: {str(e)[:100]}"}
}
def _analyze_reviews(self, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]:
"""리뷰 데이터 심층 분석"""
try:
positive_aspects = []
negative_aspects = []
recent_trends = []
# 평점 분포 분석
rating_dist = review_summary.get('rating_distribution', {})
total_reviews = review_summary.get('total_reviews', 0)
# 양극화 지수 계산
if total_reviews > 0:
polarization = (rating_dist.get('1', 0) + rating_dist.get('5', 0)) / total_reviews
if polarization > 0.6:
negative_aspects.append("고객 만족도 양극화 (일관성 부족)")
# 공통 키워드 분석
common_keywords = review_summary.get('common_keywords', [])
positive_aspects.extend(common_keywords)
# 개별 리뷰 분석 (최근 5개)
recent_reviews = sorted(reviews, key=lambda x: x.get('date', ''), reverse=True)[:5]
for review in recent_reviews:
rating = review.get('rating', 0)
content = review.get('content', '').lower()
if rating >= 4:
badges = review.get('badges', [])
positive_aspects.extend(badges)
elif rating <= 2:
# 부정적 키워드 추출
negative_keywords = self._extract_negative_keywords(content)
negative_aspects.extend(negative_keywords)
# 최근 트렌드 (시간 분석)
if len(recent_reviews) >= 3:
recent_ratings = [r.get('rating', 0) for r in recent_reviews[:3]]
avg_recent = sum(recent_ratings) / len(recent_ratings)
if avg_recent < 3.0:
recent_trends.append("최근 평점 하락세")
elif avg_recent > 4.0:
recent_trends.append("최근 만족도 상승")
return {
"positive_aspects": list(set(positive_aspects))[:5], # 중복 제거, 상위 5개
"negative_aspects": list(set(negative_aspects))[:5],
"recent_trends": recent_trends,
"rating_pattern": self._analyze_rating_pattern(rating_dist)
}
except Exception as e:
logger.warning(f"리뷰 분석 실패: {e}")
return {"positive_aspects": [], "negative_aspects": [], "recent_trends": []}
def _extract_negative_keywords(self, content: str) -> List[str]:
"""리뷰 내용에서 부정적 키워드 추출"""
negative_patterns = [
("불친절", "서비스 품질"),
("비싸", "가격 정책"),
("맛없", "음식 품질"),
("더럽", "위생 관리"),
("느리", "서비스 속도"),
("양이 적", "양 부족"),
("시끄럽", "매장 환경"),
("주차", "접근성")
]
found_issues = []
for pattern, category in negative_patterns:
if pattern in content:
found_issues.append(category)
return found_issues
def _analyze_rating_pattern(self, rating_dist: Dict) -> str:
"""평점 분포 패턴 분석"""
if not rating_dist:
return "데이터 부족"
total = sum(rating_dist.values())
if total == 0:
return "리뷰 없음"
high_ratings = rating_dist.get('5', 0) + rating_dist.get('4', 0)
low_ratings = rating_dist.get('1', 0) + rating_dist.get('2', 0)
high_ratio = high_ratings / total
low_ratio = low_ratings / total
if high_ratio > 0.7:
return "안정적 고만족"
elif low_ratio > 0.5:
return "심각한 문제 존재"
elif high_ratio + low_ratio > 0.7:
return "양극화 패턴"
else:
return "평균적 평가"
def _generate_business_insights(self, store_info: Dict, reviews: List[Dict], review_summary: Dict) -> Dict[str, Any]:
"""비즈니스 인사이트 생성"""
try:
insights = {
"competitive_advantage": [],
"critical_issues": [],
"improvement_opportunities": []
}
# 평점 기반 분석
try:
rating = float(store_info.get('rating', 0))
review_count = int(store_info.get('review_count', 0))
if rating >= 4.0:
insights["competitive_advantage"].append("높은 고객 만족도")
elif rating < 3.0:
insights["critical_issues"].append("평점 개선 시급")
if review_count < 10:
insights["improvement_opportunities"].append("온라인 리뷰 마케팅 필요")
elif review_count > 100:
insights["competitive_advantage"].append("활발한 고객 참여")
except:
pass
# 리뷰 내용 기반 분석
common_keywords = review_summary.get('common_keywords', [])
positive_keywords = ['', '친절', '깨끗', '빠름', '저렴']
negative_indicators = ['서비스', '가격', '위생']
for keyword in common_keywords:
if keyword in positive_keywords:
insights["competitive_advantage"].append(f"{keyword} 우수")
elif keyword in negative_indicators:
insights["critical_issues"].append(f"{keyword} 문제 지적")
# 영업 상태 확인
status = store_info.get('status', '')
if '영업 전' in status:
insights["improvement_opportunities"].append("영업시간 확장 검토")
return insights
except Exception as e:
logger.warning(f"비즈니스 인사이트 생성 실패: {e}")
return {"competitive_advantage": [], "critical_issues": [], "improvement_opportunities": []}
def _generate_industry_insights(self, competitive_insights: List[Dict]) -> List[str]:
"""업계 전체 인사이트 생성"""
insights = []
try:
# 평점 분석
ratings = []
for store in competitive_insights:
rating_str = store['performance_analysis']['performance'].get('rating', '0')
try:
rating = float(rating_str)
ratings.append(rating)
except:
continue
if ratings:
avg_rating = sum(ratings) / len(ratings)
if avg_rating < 3.5:
insights.append("업계 전반적으로 고객 만족도 개선 필요")
elif avg_rating > 4.0:
insights.append("경쟁이 치열한 고품질 시장")
# 공통 문제점 분석
common_issues = {}
for store in competitive_insights:
issues = store['performance_analysis']['feedback'].get('negative_aspects', [])
for issue in issues:
common_issues[issue] = common_issues.get(issue, 0) + 1
if common_issues:
top_issue = max(common_issues.items(), key=lambda x: x[1])
insights.append(f"업계 공통 이슈: {top_issue[0]}")
# 성공 패턴 분석
high_performers = [s for s in competitive_insights
if s.get('similarity_score', 0) > 0.8 and
float(s['performance_analysis']['performance'].get('rating', 0)) > 4.0]
if high_performers:
insights.append(f"고성과 업체 {len(high_performers)}개 벤치마킹 가능")
except Exception as e:
logger.warning(f"업계 인사이트 생성 실패: {e}")
return insights[:3] # 상위 3개만
def _generate_recommendations(self, competitive_insights: List[Dict]) -> Dict[str, List[str]]:
"""데이터 기반 추천사항 생성"""
recommendations = {
"immediate_actions": [],
"strategic_improvements": [],
"benchmarking_targets": []
}
try:
# 즉시 조치 사항
critical_issues = []
for store in competitive_insights:
issues = store['performance_analysis']['feedback'].get('negative_aspects', [])
critical_issues.extend(issues)
# 가장 빈번한 문제 해결
if critical_issues:
issue_count = {}
for issue in critical_issues:
issue_count[issue] = issue_count.get(issue, 0) + 1
top_issues = sorted(issue_count.items(), key=lambda x: x[1], reverse=True)[:2]
for issue, count in top_issues:
recommendations["immediate_actions"].append(f"{issue} 개선 (업계 {count}개 업체 공통 문제)")
# 전략적 개선사항
best_practices = []
for store in competitive_insights:
if store.get('similarity_score', 0) > 0.7:
practices = store['performance_analysis']['feedback'].get('positive_aspects', [])
best_practices.extend(practices)
if best_practices:
practice_count = {}
for practice in best_practices:
practice_count[practice] = practice_count.get(practice, 0) + 1
top_practices = sorted(practice_count.items(), key=lambda x: x[1], reverse=True)[:2]
for practice, count in top_practices:
recommendations["strategic_improvements"].append(f"{practice} 강화 (성공 업체들의 공통점)")
# 벤치마킹 대상
top_performers = sorted(competitive_insights,
key=lambda x: float(x['performance_analysis']['performance'].get('rating', 0)),
reverse=True)[:2]
for performer in top_performers:
name = performer.get('store_name', 'Unknown')
rating = performer['performance_analysis']['performance'].get('rating', 'N/A')
advantages = performer['performance_analysis']['business_insights'].get('competitive_advantage', [])
if advantages:
recommendations["benchmarking_targets"].append(f"{name} (평점 {rating}): {', '.join(advantages[:2])}")
except Exception as e:
logger.warning(f"추천사항 생성 실패: {e}")
return recommendations
def get_db_status(self) -> Dict[str, Any]:
"""DB 상태 정보 반환"""
try:
if not self.is_ready():
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'not_ready',
'error': self.initialization_error
}
# 컬렉션 정보 조회
count = self.collection.count()
return {
'collection_name': self.collection_name,
'total_documents': count,
'total_stores': count, # 각 문서가 하나의 가게를 나타냄
'db_path': self.db_path,
'status': 'ready',
'last_updated': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"DB 상태 조회 실패: {e}")
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'error',
'error': str(e)
}
def get_existing_store_data(self, region: str = None, food_category: str = None) -> Dict[str, Dict[str, Any]]:
"""
Vector DB에서 기존 store 데이터를 가져옵니다.
Args:
region: 지역 필터 (선택사항)
food_category: 음식 카테고리 필터 (선택사항)
Returns:
{store_id: {metadata, document_id}} 형태의 딕셔너리
"""
try:
if not self.is_ready():
logger.warning("⚠️ VectorService가 준비되지 않음")
return {}
# 필터 조건 설정
where_condition = {}
if region and food_category:
where_condition = {
"$and": [
{"region": region},
{"food_category": food_category}
]
}
elif region:
where_condition = {"region": region}
elif food_category:
where_condition = {"food_category": food_category}
# 기존 데이터 조회
if where_condition:
results = self.collection.get(
where=where_condition,
include=['metadatas', 'documents']
)
else:
results = self.collection.get(
include=['metadatas', 'documents']
)
if not results or not results.get('metadatas'):
logger.info("📊 Vector DB에 기존 데이터 없음")
return {}
# store_id별로 기존 데이터 매핑
existing_data = {}
metadatas = results.get('metadatas', [])
ids = results.get('ids', [])
documents = results.get('documents', [])
for i, metadata in enumerate(metadatas):
if metadata and 'store_id' in metadata:
store_id = metadata['store_id']
existing_data[store_id] = {
'metadata': metadata,
'document_id': ids[i] if i < len(ids) else None,
'document': documents[i] if i < len(documents) else None,
'last_updated': metadata.get('last_updated', 'unknown')
}
logger.info(f"📊 기존 Vector DB에서 {len(existing_data)}개 store 데이터 발견")
return existing_data
except Exception as e:
logger.error(f"❌ 기존 store 데이터 조회 실패: {e}")
return {}
def categorize_stores(self, review_results: List[tuple], region: str, food_category: str) -> Dict[str, Any]:
"""
가게들을 새 가게/업데이트 대상/현재 배치 중복으로 분류
"""
# 기존 Vector DB 데이터 조회
existing_data = self.get_existing_store_data(region, food_category)
existing_store_ids = set(existing_data.keys())
# 분류 작업
new_stores = [] # 완전히 새로운 가게
update_stores = [] # 기존 가게 업데이트
current_duplicates = [] # 현재 배치 내 중복
seen_in_batch = set()
for store_id, store_info, reviews in review_results:
# 현재 배치 내 중복 체크
if store_id in seen_in_batch:
current_duplicates.append((store_id, store_info, reviews))
logger.info(f"🔄 현재 배치 중복 발견: {store_id}")
continue
seen_in_batch.add(store_id)
# 기존 DB에 있는지 확인
if store_id in existing_store_ids:
# 업데이트 대상
update_stores.append({
'store_id': store_id,
'new_data': (store_id, store_info, reviews),
'existing_data': existing_data[store_id]
})
logger.info(f"🔄 업데이트 대상: {store_id}")
else:
# 새 가게
new_stores.append((store_id, store_info, reviews))
logger.info(f"✨ 새 가게: {store_id}")
result = {
'new_stores': new_stores,
'update_stores': update_stores,
'current_duplicates': current_duplicates,
'existing_data': existing_data,
'summary': {
'total_input': len(review_results),
'new_count': len(new_stores),
'update_count': len(update_stores),
'duplicate_count': len(current_duplicates),
'will_process': len(new_stores) + len(update_stores)
}
}
logger.info(f"📊 가게 분류 완료:")
logger.info(f" - 새 가게: {len(new_stores)}")
logger.info(f" - 업데이트: {len(update_stores)}")
logger.info(f" - 현재 중복: {len(current_duplicates)}")
return result
def update_existing_stores(self, update_stores: List[Dict], food_category: str, region: str) -> Dict[str, Any]:
"""
기존 가게들을 업데이트
"""
updated_count = 0
skipped_count = 0
update_details = []
for update_item in update_stores:
store_id = update_item['store_id']
new_data = update_item['new_data']
existing_data = update_item['existing_data']
try:
_, store_info, reviews = new_data
# 새 임베딩 및 메타데이터 생성
text_for_embedding = extract_text_for_embedding(store_info, reviews)
if not text_for_embedding or len(text_for_embedding.strip()) < 10:
logger.warning(f"⚠️ {store_id}: 임베딩 텍스트 부족, 업데이트 스킵")
skipped_count += 1
continue
embedding = self.embedding_model.encode(text_for_embedding)
new_metadata = create_metadata(store_info, food_category, region)
document_text = combine_store_and_reviews(store_info, reviews)
# 기존 document_id 사용 (ID 일관성 유지)
document_id = existing_data['document_id']
# Vector DB 업데이트 (ChromaDB에서는 upsert 사용)
self.collection.upsert(
documents=[document_text],
embeddings=[embedding.tolist()],
metadatas=[new_metadata],
ids=[document_id]
)
updated_count += 1
update_details.append({
'store_id': store_id,
'document_id': document_id,
'previous_updated': existing_data['metadata'].get('last_updated', 'unknown'),
'new_updated': new_metadata['last_updated']
})
logger.info(f"{store_id} 업데이트 완료")
except Exception as e:
logger.error(f"{store_id} 업데이트 실패: {e}")
skipped_count += 1
continue
return {
'updated_count': updated_count,
'skipped_count': skipped_count,
'update_details': update_details
}
def add_new_stores(self, new_stores: List[tuple], food_category: str, region: str) -> Dict[str, Any]:
"""
새 가게들을 Vector DB에 추가
"""
if not new_stores:
return {'added_count': 0, 'add_details': []}
documents = []
embeddings = []
metadatas = []
ids = []
added_count = 0
add_details = []
for store_id, store_info, reviews in new_stores:
try:
# 임베딩용 텍스트 생성
text_for_embedding = extract_text_for_embedding(store_info, reviews)
if not text_for_embedding or len(text_for_embedding.strip()) < 10:
logger.warning(f"⚠️ {store_id}: 임베딩 텍스트 부족, 추가 스킵")
continue
# 임베딩 생성
embedding = self.embedding_model.encode(text_for_embedding)
# 메타데이터 생성
metadata = create_metadata(store_info, food_category, region)
# 문서 ID 생성
document_id = create_store_hash(store_id, store_info.get('name', ''), region)
# 문서 데이터 생성
document_text = combine_store_and_reviews(store_info, reviews)
documents.append(document_text)
embeddings.append(embedding.tolist())
metadatas.append(metadata)
ids.append(document_id)
add_details.append({
'store_id': store_id,
'document_id': document_id,
'added_at': metadata['last_updated']
})
added_count += 1
except Exception as e:
logger.warning(f"⚠️ {store_id} 추가 실패: {e}")
continue
# Vector DB에 새 가게들 추가
if documents:
self.collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
logger.info(f"{added_count}개 새 가게 추가 완료")
return {
'added_count': added_count,
'add_details': add_details
}