This commit is contained in:
hiondal
2025-06-15 13:52:26 +00:00
commit 6a5c411800
53 changed files with 15785 additions and 0 deletions
+336
View File
@@ -0,0 +1,336 @@
# app/services/claude_service.py
import json
import logging
from typing import Optional, Dict, Any, Tuple
import anthropic
from ..config.settings import settings
logger = logging.getLogger(__name__)
class ClaudeService:
"""Claude API 연동 서비스"""
def __init__(self):
try:
# API 키 유효성 검사
if not settings.CLAUDE_API_KEY or settings.CLAUDE_API_KEY.strip() == "":
raise ValueError("CLAUDE_API_KEY가 설정되지 않았습니다")
# Claude 클라이언트 초기화
self.client = anthropic.Anthropic(api_key=settings.CLAUDE_API_KEY)
self.model = settings.CLAUDE_MODEL
self.initialization_error = None
logger.info(f"✅ ClaudeService 초기화 완료 (모델: {self.model})")
except Exception as e:
self.initialization_error = str(e)
self.client = None
logger.error(f"❌ ClaudeService 초기화 실패: {e}")
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return self.client is not None and self.initialization_error is None
def get_initialization_error(self) -> Optional[str]:
"""초기화 에러 메시지 반환"""
return self.initialization_error
async def test_api_connection(self) -> Tuple[bool, Optional[str]]:
"""Claude API 연결을 테스트합니다."""
if not self.is_ready():
return False, self.initialization_error
try:
logger.info("🔍 Claude API 연결 테스트 시작...")
response = self.client.messages.create(
model=self.model,
max_tokens=50,
messages=[
{
"role": "user",
"content": "안녕하세요. 연결 테스트입니다. '연결 성공'이라고 답변해주세요."
}
]
)
if response.content and len(response.content) > 0:
logger.info("✅ Claude API 연결 테스트 성공")
return True, None
else:
error_msg = "Claude API 응답이 비어있음"
logger.warning(f"⚠️ {error_msg}")
return False, error_msg
except Exception as e:
error_msg = f"Claude API 연결 테스트 실패: {str(e)}"
logger.error(f"{error_msg}")
return False, error_msg
async def generate_action_recommendations(self, context: str, additional_context: Optional[str] = None) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""점주를 위한 액션 추천을 생성합니다."""
if not self.is_ready():
logger.error("ClaudeService가 준비되지 않음")
return None, None
try:
logger.info("🤖 Claude API를 통한 액션 추천 생성 시작")
prompt = self._build_action_prompt(context, additional_context)
response = self.client.messages.create(
model=self.model,
max_tokens=4000,
temperature=0.7,
messages=[
{
"role": "user",
"content": prompt
}
]
)
if response.content and len(response.content) > 0:
raw_response = response.content[0].text
logger.info(f"✅ 액션 추천 생성 완료: {len(raw_response)} 문자")
parsed_response = self._parse_json_response(raw_response)
return raw_response, parsed_response
else:
logger.warning("⚠️ Claude API 응답이 비어있음")
return None, None
except Exception as e:
logger.error(f"❌ 액션 추천 생성 중 오류: {e}")
return None, None
def _parse_json_response(self, raw_response: str) -> Optional[Dict[str, Any]]:
"""Claude의 원본 응답에서 JSON을 추출하고 파싱합니다."""
try:
import re
# JSON 블록 찾기
json_match = re.search(r'```json\s*\n(.*?)\n```', raw_response, re.DOTALL)
if json_match:
json_str = json_match.group(1).strip()
else:
brace_start = raw_response.find('{')
brace_end = raw_response.rfind('}')
if brace_start != -1 and brace_end != -1 and brace_end > brace_start:
json_str = raw_response[brace_start:brace_end + 1]
else:
logger.warning("⚠️ JSON 패턴을 찾을 수 없음")
return None
parsed_json = json.loads(json_str)
logger.info("✅ JSON 파싱 성공")
return parsed_json
except json.JSONDecodeError as e:
logger.warning(f"⚠️ JSON 파싱 실패: {e}")
return None
except Exception as e:
logger.warning(f"⚠️ JSON 추출 실패: {e}")
return None
def _build_action_prompt(self, context: str, additional_context: Optional[str] = None) -> str:
"""액션 추천을 위한 프롬프트를 구성합니다."""
base_prompt = f"""당신은 소상공인을 위한 경영 컨설턴트입니다. 아래 정보를 바탕으로 실질적이고 구체적인 액션 추천을 해주세요.
**분석 데이터:**
{context}
**추천 요구사항:**
1. 단기 계획 (1-3개월): 즉시 실행 가능한 개선사항
2. 중기 계획 (3-6개월): 점진적 개선 방안
3. 장기 계획 (6개월-1년): 전략적 발전 방향
**응답 형식:**
반드시 아래 JSON 형식으로만 응답해주세요:
```json
{{
"summary": {{
"current_situation": "현재 상황 요약",
"key_insights": ["핵심 인사이트 1", "핵심 인사이트 2", "핵심 인사이트 3"],
"priority_areas": ["우선 개선 영역 1", "우선 개선 영역 2"]
}},
"action_plans": {{
"short_term": [
{{
"title": "액션 제목",
"description": "구체적인 실행 방법",
"expected_impact": "예상 효과",
"timeline": "실행 기간",
"cost": "예상 비용"
}}
],
"mid_term": [
{{
"title": "액션 제목",
"description": "구체적인 실행 방법",
"expected_impact": "예상 효과",
"timeline": "실행 기간",
"cost": "예상 비용"
}}
],
"long_term": [
{{
"title": "액션 제목",
"description": "구체적인 실행 방법",
"expected_impact": "예상 효과",
"timeline": "실행 기간",
"cost": "예상 비용"
}}
]
}},
"implementation_tips": [
"실행 팁 1",
"실행 팁 2",
"실행 팁 3"
]
}}
```
**응답은 반드시 유효한 JSON 형식으로만 작성하고, JSON 앞뒤에 다른 텍스트는 포함하지 마세요.**"""
if additional_context:
base_prompt += f"\n\n**점주 추가 요청사항:**\n{additional_context}\n"
return base_prompt
# =============================================================================
# 호환성을 위한 메서드들 (기존 코드가 사용할 수 있도록)
# =============================================================================
async def get_recommendation(self, prompt: str) -> Optional[str]:
"""Claude API를 호출하여 추천을 받습니다. (호환성용)"""
if not self.is_ready():
logger.error("ClaudeService가 준비되지 않음")
return None
try:
logger.info("🤖 Claude API 호출 시작")
response = self.client.messages.create(
model=self.model,
max_tokens=4000,
temperature=0.7,
messages=[
{
"role": "user",
"content": prompt
}
]
)
if response.content and len(response.content) > 0:
raw_response = response.content[0].text
logger.info(f"✅ Claude API 응답 성공: {len(raw_response)} 문자")
return raw_response
else:
logger.warning("⚠️ Claude API 응답이 비어있음")
return None
except Exception as e:
logger.error(f"❌ Claude API 호출 실패: {e}")
return None
def parse_recommendation_response(self, raw_response: str) -> Optional[Dict[str, Any]]:
"""Claude 응답에서 JSON을 추출하고 파싱합니다. (호환성용)"""
return self._parse_json_response(raw_response)
def build_recommendation_prompt(self, store_id: str, context: str, vector_context: Optional[str] = None) -> str:
"""액션 추천용 프롬프트를 구성합니다. (호환성용)"""
prompt_parts = [
"당신은 소상공인을 위한 경영 컨설턴트입니다.",
f"가게 ID: {store_id}",
f"점주 요청: {context}"
]
if vector_context:
prompt_parts.extend([
"\n--- 동종 업체 분석 데이터 ---",
vector_context,
"--- 분석 데이터 끝 ---\n"
])
prompt_parts.extend([
"\n위 정보를 바탕으로 실질적이고 구체적인 액션 추천을 해주세요.",
"응답은 반드시 아래 JSON 형식으로만 작성해주세요:",
"",
"```json",
"{",
' "summary": {',
' "current_situation": "현재 상황 요약",',
' "key_insights": ["핵심 인사이트 1", "핵심 인사이트 2"],',
' "priority_areas": ["우선 개선 영역 1", "우선 개선 영역 2"]',
' },',
' "action_plans": {',
' "short_term": [',
' {',
' "title": "즉시 실행 가능한 액션",',
' "description": "구체적인 실행 방법",',
' "expected_impact": "예상 효과",',
' "timeline": "1-2주",',
' "cost": "예상 비용"',
' }',
' ],',
' "mid_term": [',
' {',
' "title": "중기 개선 방안",',
' "description": "구체적인 실행 방법",',
' "expected_impact": "예상 효과",',
' "timeline": "1-3개월",',
' "cost": "예상 비용"',
' }',
' ]',
' },',
' "implementation_tips": ["실행 팁 1", "실행 팁 2"]',
"}",
"```"
])
return "\n".join(prompt_parts)
def create_prompt_for_api_response(self, context: str, additional_context: Optional[str] = None) -> str:
"""API 응답용 프롬프트를 생성합니다. (호환성용)"""
return self._build_action_prompt(context, additional_context)
# =============================================================================
# 헬스체크 및 상태 확인 메서드들
# =============================================================================
def get_health_status(self) -> Dict[str, Any]:
"""ClaudeService 상태 확인"""
try:
status = {
"service": "claude_api",
"status": "healthy" if self.is_ready() else "unhealthy",
"model": self.model,
"api_key_configured": bool(settings.CLAUDE_API_KEY and settings.CLAUDE_API_KEY.strip()),
"timestamp": self._get_timestamp()
}
if self.initialization_error:
status["initialization_error"] = self.initialization_error
status["status"] = "error"
return status
except Exception as e:
return {
"service": "claude_api",
"status": "error",
"error": str(e),
"timestamp": self._get_timestamp()
}
def _get_timestamp(self) -> str:
"""현재 시간 문자열 반환"""
from datetime import datetime
return datetime.now().isoformat()
+235
View File
@@ -0,0 +1,235 @@
# app/services/restaurant_service.py (수정된 버전)
import aiohttp
import asyncio
import logging
from typing import List, Optional, Dict, Any
from ..config.settings import settings
from ..models.restaurant_models import RestaurantInfo
from ..utils.category_utils import extract_food_category
logger = logging.getLogger(__name__)
class RestaurantService:
"""음식점 API 연동 서비스"""
def __init__(self):
self.base_url = settings.get_restaurant_api_url()
self.timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT)
async def find_store_by_name_and_region(self, region: str, store_name: str) -> Optional[RestaurantInfo]:
"""
지역과 가게명으로 가게를 찾습니다.
Args:
region: 지역 (시군구 + 읍면동)
store_name: 가게명
Returns:
찾은 가게 정보 (첫 번째 결과)
"""
try:
logger.info(f"가게 검색 시작: region={region}, store_name={store_name}")
async with aiohttp.ClientSession(timeout=self.timeout) as session:
# Restaurant API 호출
url = f"{self.base_url}/collect"
payload = {
"query": store_name,
"region": region,
"size": 15,
"pages": 3,
"save_to_file": False
}
logger.info(f"Restaurant API 호출: {url}")
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
restaurants = data.get('restaurants', [])
if restaurants:
# 첫 번째 결과 반환
restaurant_data = restaurants[0]
restaurant = RestaurantInfo(**restaurant_data)
logger.info(f"가게 찾기 성공: {restaurant.place_name}")
return restaurant
else:
logger.warning(f"가게를 찾을 수 없음: {store_name}")
return None
else:
logger.error(f"Restaurant API 호출 실패: HTTP {response.status}")
error_text = await response.text()
logger.error(f"Error response: {error_text}")
return None
except asyncio.TimeoutError:
logger.error("Restaurant API 호출 타임아웃")
return None
except Exception as e:
logger.error(f"가게 검색 중 오류: {str(e)}")
return None
def _clean_food_category(self, food_category: str) -> str:
"""
음식 카테고리를 정리하여 검색 키워드로 변환합니다.
Args:
food_category: 원본 음식 카테고리 (예: "육류,고기")
Returns:
정리된 검색 키워드 (예: "육류 고기")
"""
if not food_category:
return "음식점"
# 콤마와 슬래시를 공백으로 변경
cleaned = food_category.replace(',', ' ').replace('/', ' ')
# 불필요한 단어 제거
stop_words = ['음식점', '요리', '전문점', '맛집']
keywords = []
for keyword in cleaned.split():
keyword = keyword.strip()
if keyword and keyword not in stop_words:
keywords.append(keyword)
# 키워드가 없으면 기본 검색어 사용
if not keywords:
return "음식점"
# 🔧 지역 정보는 포함하지 않고 음식 카테고리만 반환
return ' '.join(keywords)
async def find_similar_stores(self, region: str, food_category: str, max_count: int = 50) -> List[RestaurantInfo]:
"""
동종 업체를 찾습니다.
Args:
region: 지역
food_category: 음식 카테고리
max_count: 최대 검색 개수
Returns:
동종 업체 목록
"""
try:
logger.info(f"동종 업체 검색 시작: region={region}, food_category={food_category}")
# 🔧 검색 쿼리 생성 (음식 카테고리만 포함)
search_query = self._clean_food_category(food_category)
logger.info(f"음식점 수집 요청: query='{search_query}' region='{region}' size=15 pages=1 save_to_file=False")
similar_stores = []
async with aiohttp.ClientSession(timeout=self.timeout) as session:
# 페이지별로 검색 (최대 5페이지)
max_pages = min(5, (max_count // 15) + 1)
for page in range(1, max_pages + 1):
if len(similar_stores) >= max_count:
break
url = f"{self.base_url}/collect"
# 🔧 수정된 payload - query에는 음식 카테고리만, region은 분리
payload = {
"query": search_query, # 🔧 음식 카테고리만 포함
"region": region, # 🔧 지역 정보는 별도 파라미터
"size": 15,
"pages": 1, # 페이지별로 하나씩 호출
"save_to_file": False
}
logger.info(f"동종 업체 검색 페이지 {page}: query='{search_query}' region='{region}'")
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
restaurants = data.get('restaurants', [])
for restaurant_data in restaurants:
if len(similar_stores) >= max_count:
break
try:
restaurant = RestaurantInfo(**restaurant_data)
# 카테고리 필터링
restaurant_category = extract_food_category(restaurant.category_name)
if self._is_similar_food_category(food_category, restaurant_category):
similar_stores.append(restaurant)
logger.debug(f"유사 카테고리 매치: {restaurant.place_name} ({restaurant_category})")
except Exception as e:
logger.warning(f"음식점 데이터 파싱 실패: {e}")
continue
logger.info(f"페이지 {page} 완료: {len(restaurants)}개 음식점 수집")
else:
logger.warning(f"동종 업체 검색 실패 (페이지 {page}): HTTP {response.status}")
continue
except Exception as e:
logger.warning(f"페이지 {page} 검색 중 오류: {e}")
continue
# API 요청 제한을 위한 지연
await asyncio.sleep(settings.REQUEST_DELAY)
logger.info(f"동종 업체 검색 완료: 총 {len(similar_stores)}")
return similar_stores
except Exception as e:
logger.error(f"동종 업체 검색 중 오류: {str(e)}")
return []
def _is_similar_food_category(self, target_category: str, restaurant_category: str) -> bool:
"""
음식 카테고리가 유사한지 확인합니다.
Args:
target_category: 대상 카테고리
restaurant_category: 음식점 카테고리
Returns:
유사성 여부
"""
if not target_category or not restaurant_category:
return False
# 정규화
target_lower = target_category.lower().strip()
restaurant_lower = restaurant_category.lower().strip()
# 완전 일치
if target_lower == restaurant_lower:
return True
# 키워드 기반 매칭
target_keywords = set(target_lower.replace(',', ' ').replace('/', ' ').split())
restaurant_keywords = set(restaurant_lower.replace(',', ' ').replace('/', ' ').split())
# 교집합이 있으면 유사한 것으로 판단
common_keywords = target_keywords.intersection(restaurant_keywords)
return len(common_keywords) > 0
async def health_check(self) -> bool:
"""
Restaurant API 상태를 확인합니다.
Returns:
API 상태 (True: 정상, False: 비정상)
"""
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/health"
async with session.get(url) as response:
return response.status == 200
except Exception as e:
logger.error(f"Restaurant API 헬스체크 실패: {e}")
return False
+467
View File
@@ -0,0 +1,467 @@
# vector/app/services/review_service.py
import aiohttp
import asyncio
import logging
from typing import List, Dict, Any, Tuple, Optional
from ..config.settings import settings
from ..models.review_models import ReviewAnalysisResponse, StoreInfo, ReviewData
from ..models.restaurant_models import RestaurantInfo
logger = logging.getLogger(__name__)
class ReviewService:
"""리뷰 API 연동 서비스 (본인 가게 우선 처리 강화)"""
def __init__(self):
self.base_url = settings.get_review_api_url()
self.timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT)
async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]:
"""
단일 가게의 리뷰를 수집합니다. (본인 가게용 강화 처리)
Args:
store_id: 카카오맵 가게 ID
max_reviews: 최대 수집할 리뷰 수
Returns:
(가게 정보, 리뷰 목록) 튜플
"""
try:
logger.info(f"🏪 가게 리뷰 수집 시작: store_id={store_id} (최대 {max_reviews}개)")
# 본인 가게는 더 관대한 타임아웃 설정
timeout = aiohttp.ClientTimeout(total=900) # 15분
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"{self.base_url}/analyze"
payload = {
"store_id": store_id,
"days_limit": None, # 모든 날짜의 리뷰 수집
"max_time": min(600, max_reviews * 3) # 리뷰 수에 따라 시간 조정, 최대 10분
}
logger.info(f"Review API 호출: {url} (타임아웃: {payload['max_time']}초)")
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
if data.get('success', False):
store_info = data.get('store_info')
reviews = data.get('reviews', [])
logger.info(f"📊 원본 리뷰 수집: {len(reviews)}")
# 리뷰 품질 필터링
filtered_reviews = self._filter_quality_reviews(reviews)
logger.info(f"📊 품질 필터링 후: {len(filtered_reviews)}")
# 리뷰 개수 제한
if len(filtered_reviews) > max_reviews:
filtered_reviews = filtered_reviews[:max_reviews]
logger.info(f"📊 최종 리뷰 수: {len(filtered_reviews)}개 (제한 적용)")
# 가게 정보와 리뷰 변환
converted_store_info = self._convert_store_info(store_info)
converted_reviews = self._convert_reviews(filtered_reviews)
if converted_store_info and converted_reviews:
logger.info(f"✅ 리뷰 수집 성공: {len(converted_reviews)}")
return converted_store_info, converted_reviews
else:
logger.warning("⚠️ 변환된 데이터가 비어있음")
return None, []
else:
error_msg = data.get('message', 'Unknown error')
logger.error(f"❌ 리뷰 분석 실패: {error_msg}")
return None, []
else:
logger.error(f"❌ Review API 호출 실패: HTTP {response.status}")
error_text = await response.text()
logger.error(f"Error response: {error_text}")
return None, []
except asyncio.TimeoutError:
logger.error("❌ Review API 호출 타임아웃")
return None, []
except Exception as e:
logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}")
return None, []
def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""리뷰 품질 필터링"""
try:
filtered = []
for review in reviews:
content = review.get('content', '').strip()
rating = review.get('rating', 0)
# 품질 기준
if (len(content) >= 10 and # 최소 10자 이상
rating > 0 and # 별점이 있어야 함
not self._is_spam_review(content)): # 스팸 제외
filtered.append(review)
logger.debug(f"품질 필터링: {len(reviews)}{len(filtered)}")
return filtered
except Exception as e:
logger.warning(f"⚠️ 리뷰 필터링 실패: {e}")
return reviews # 실패 시 원본 반환
def _is_spam_review(self, content: str) -> bool:
"""스팸 리뷰 판별"""
try:
spam_keywords = [
"추천추천", "최고최고", "맛있어요맛있어요",
"좋아요좋아요", "ㅎㅎㅎㅎ", "ㅋㅋㅋㅋ",
"굿굿굿", "Nice", "Good"
]
content_lower = content.lower()
# 스팸 키워드 확인
for keyword in spam_keywords:
if keyword.lower() in content_lower:
return True
# 너무 짧거나 반복 문자 확인
if len(set(content.replace(' ', ''))) < 3: # 고유 문자 3개 미만
return True
return False
except Exception as e:
logger.warning(f"⚠️ 스팸 판별 실패: {e}")
return False
def _convert_store_info(self, store_info):
"""가게 정보 변환 (강화된 버전)"""
if not store_info:
logger.warning("⚠️ 가게 정보가 비어있음")
return None
try:
converted = {
'id': str(store_info.get('id', '')),
'name': str(store_info.get('name', '')),
'category': str(store_info.get('category', '')),
'rating': str(store_info.get('rating', '')),
'review_count': str(store_info.get('review_count', '')),
'status': str(store_info.get('status', '')),
'address': str(store_info.get('address', ''))
}
# 필수 필드 확인 (ID와 이름은 반드시 있어야 함)
if not converted['id'] or not converted['name']:
logger.warning(f"⚠️ 필수 가게 정보 누락: ID={converted['id']}, Name={converted['name']}")
return None
logger.debug(f"가게 정보 변환 성공: {converted['name']}")
return converted
except Exception as e:
logger.error(f"❌ 가게 정보 변환 실패: {e}")
return None
def _convert_reviews(self, reviews):
"""리뷰 목록 변환 (강화된 버전)"""
if not reviews:
logger.warning("⚠️ 리뷰 목록이 비어있음")
return []
converted_reviews = []
for i, review in enumerate(reviews):
try:
# 안전한 형변환
rating = 0
try:
rating = int(review.get('rating', 0))
except (ValueError, TypeError):
rating = 0
likes = 0
try:
likes = int(review.get('likes', 0))
except (ValueError, TypeError):
likes = 0
photo_count = 0
try:
photo_count = int(review.get('photo_count', 0))
except (ValueError, TypeError):
photo_count = 0
converted_review = {
'reviewer_name': str(review.get('reviewer_name', 'Anonymous')),
'rating': rating,
'date': str(review.get('date', '')),
'content': str(review.get('content', '')).strip(),
'badges': list(review.get('badges', [])),
'likes': likes,
'photo_count': photo_count,
'has_photos': bool(photo_count > 0)
}
# 기본 검증
if converted_review['content'] and converted_review['rating'] > 0:
converted_reviews.append(converted_review)
else:
logger.debug(f"⚠️ 리뷰 {i+1} 품질 미달로 제외")
except Exception as e:
logger.warning(f"⚠️ 리뷰 {i+1} 변환 실패: {e}")
continue
logger.info(f"리뷰 변환 완료: {len(reviews)}{len(converted_reviews)}")
return converted_reviews
async def collect_multiple_stores_reviews(self, restaurants: List[RestaurantInfo]) -> List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]]:
"""
여러 가게의 리뷰를 수집합니다.
Args:
restaurants: 음식점 목록
Returns:
(store_id, 가게 정보, 리뷰 목록) 튜플의 리스트
"""
try:
logger.info(f"다중 가게 리뷰 수집 시작: {len(restaurants)}개 가게")
results = []
# 동시성 제한을 위해 세마포어 사용
semaphore = asyncio.Semaphore(3) # 최대 3개 동시 요청
async def collect_single_store(restaurant: RestaurantInfo):
async with semaphore:
try:
# 카카오맵 place_url에서 store_id 추출 시도
store_id = self._extract_store_id_from_url(restaurant.place_url)
if not store_id:
# URL에서 추출 실패 시 restaurant.id 사용
store_id = restaurant.id
if not store_id:
logger.warning(f"Store ID를 찾을 수 없음: {restaurant.place_name}")
return None
logger.info(f"가게 {restaurant.place_name} 리뷰 수집 중...")
# 리뷰 수집 (최대 50개로 제한)
store_info, reviews = await self.collect_store_reviews(
store_id,
max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT
)
if store_info and reviews:
return (store_id, store_info, reviews)
else:
logger.warning(f"가게 {restaurant.place_name}의 리뷰 수집 실패")
return None
except Exception as e:
logger.error(f"가게 {restaurant.place_name} 리뷰 수집 중 오류: {e}")
return None
finally:
# API 요청 제한을 위한 지연
await asyncio.sleep(settings.REQUEST_DELAY)
# 병렬 처리
tasks = [collect_single_store(restaurant) for restaurant in restaurants]
results_raw = await asyncio.gather(*tasks, return_exceptions=True)
# 성공한 결과만 필터링
for result in results_raw:
if result and not isinstance(result, Exception):
results.append(result)
logger.info(f"다중 가게 리뷰 수집 완료: {len(results)}개 성공")
return results
except Exception as e:
logger.error(f"다중 가게 리뷰 수집 중 오류: {str(e)}")
return []
def _extract_store_id_from_url(self, place_url: str) -> Optional[str]:
"""
카카오맵 URL에서 store_id를 추출합니다.
Args:
place_url: 카카오맵 장소 URL
Returns:
추출된 store_id 또는 None
"""
try:
if not place_url:
return None
# URL 패턴: https://place.map.kakao.com/123456789
import re
pattern = r'/(\d+)(?:\?|$|#)'
match = re.search(pattern, place_url)
if match:
store_id = match.group(1)
logger.debug(f"URL에서 store_id 추출: {store_id}")
return store_id
else:
logger.debug(f"URL에서 store_id 추출 실패: {place_url}")
return None
except Exception as e:
logger.warning(f"store_id 추출 중 오류: {e}")
return None
def _build_store_info_from_restaurant(self, restaurant: RestaurantInfo) -> Dict[str, Any]:
"""
RestaurantInfo를 store_info 형식으로 변환합니다.
Args:
restaurant: RestaurantInfo 객체
Returns:
store_info 딕셔너리
"""
try:
return {
'id': restaurant.id,
'name': restaurant.place_name,
'category': restaurant.category_name,
'rating': '', # API에서 제공되지 않음
'review_count': '', # API에서 제공되지 않음
'status': '', # API에서 제공되지 않음
'address': restaurant.address_name
}
except Exception as e:
logger.error(f"RestaurantInfo 변환 실패: {e}")
return {
'id': '',
'name': '',
'category': '',
'rating': '',
'review_count': '',
'status': '',
'address': ''
}
def _convert_single_review_data(self, review_data: dict) -> dict:
"""
단일 리뷰 데이터 변환
Args:
review_data: API 응답의 단일 리뷰 데이터
Returns:
변환된 리뷰 데이터
"""
try:
return {
'reviewer_name': review_data.get('reviewer_name', ''),
'reviewer_level': review_data.get('reviewer_level', ''),
'reviewer_stats': review_data.get('reviewer_stats', {}),
'rating': int(review_data.get('rating', 0)),
'date': review_data.get('date', ''),
'content': review_data.get('content', ''),
'badges': review_data.get('badges', []),
'likes': int(review_data.get('likes', 0)),
'photo_count': int(review_data.get('photo_count', 0)),
'has_photos': bool(review_data.get('has_photos', False))
}
except Exception as e:
logger.warning(f"단일 리뷰 데이터 변환 실패: {e}")
return {
'reviewer_name': 'Unknown',
'reviewer_level': '',
'reviewer_stats': {},
'rating': 0,
'date': '',
'content': '',
'badges': [],
'likes': 0,
'photo_count': 0,
'has_photos': False
}
async def health_check(self) -> bool:
"""
Review API 상태를 확인합니다.
Returns:
API 상태 (True: 정상, False: 비정상)
"""
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/health"
async with session.get(url) as response:
is_healthy = response.status == 200
if is_healthy:
logger.debug("Review API 헬스체크 성공")
else:
logger.warning(f"Review API 헬스체크 실패: HTTP {response.status}")
return is_healthy
except Exception as e:
logger.error(f"Review API 헬스체크 실패: {e}")
return False
def get_api_info(self) -> Dict[str, Any]:
"""
Review API 정보를 반환합니다.
Returns:
API 정보 딕셔너리
"""
return {
"service_name": "Review API Service",
"base_url": self.base_url,
"timeout": self.timeout.total,
"max_reviews_per_restaurant": settings.MAX_REVIEWS_PER_RESTAURANT,
"request_delay": settings.REQUEST_DELAY
}
async def test_connection(self) -> Dict[str, Any]:
"""
Review API 연결을 테스트합니다.
Returns:
테스트 결과
"""
test_result = {
"service": "Review API",
"base_url": self.base_url,
"status": "unknown",
"response_time": None,
"error": None
}
try:
import time
start_time = time.time()
is_healthy = await self.health_check()
response_time = time.time() - start_time
test_result["response_time"] = round(response_time, 3)
if is_healthy:
test_result["status"] = "healthy"
logger.info(f"Review API 연결 테스트 성공: {response_time:.3f}")
else:
test_result["status"] = "unhealthy"
test_result["error"] = "Health check failed"
logger.warning("Review API 연결 테스트 실패: 헬스체크 실패")
except Exception as e:
test_result["status"] = "error"
test_result["error"] = str(e)
logger.error(f"Review API 연결 테스트 오류: {e}")
return test_result
+728
View File
@@ -0,0 +1,728 @@
# app/services/vector_service.py (개선된 버전)
import os
import json
import logging
import time
import shutil
import signal
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import chromadb
from chromadb.config import Settings as ChromaSettings
from sentence_transformers import SentenceTransformer
from ..config.settings import settings
from ..utils.data_utils import (
create_store_hash, combine_store_and_reviews, generate_review_summary,
extract_text_for_embedding, create_metadata, is_duplicate_store
)
logger = logging.getLogger(__name__)
class VectorService:
"""Vector DB 서비스 (개선된 초기화 로직)"""
def __init__(self):
self.db_path = settings.VECTOR_DB_PATH
self.collection_name = settings.VECTOR_DB_COLLECTION
self.embedding_model_name = settings.EMBEDDING_MODEL
# 상태 변수
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 안전한 초기화 시도
self._safe_initialize()
def _safe_initialize(self):
"""안전한 초기화 - 개선된 로직"""
try:
logger.info("🔧 VectorService 초기화 시작...")
# 1단계: 디렉토리 권한 확인
self._ensure_directory_permissions()
# 2단계: ChromaDB 초기화 (호환성 확인 포함)
self._initialize_chromadb_with_compatibility_check()
# 3단계: 임베딩 모델 로드
self._initialize_embedding_model()
logger.info("✅ VectorService 초기화 완료")
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ VectorService 초기화 실패: {e}")
logger.info("🔄 서비스는 런타임에 재시도 가능합니다")
def _ensure_directory_permissions(self):
"""Vector DB 디렉토리 권한을 확인하고 생성합니다"""
try:
logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}")
# 절대 경로로 변환
abs_path = os.path.abspath(self.db_path)
# 디렉토리 생성
os.makedirs(abs_path, mode=0o755, exist_ok=True)
# 권한 확인
if not os.access(abs_path, os.W_OK):
logger.warning(f"⚠️ 쓰기 권한 없음: {abs_path}")
# 권한 변경 시도
try:
os.chmod(abs_path, 0o755)
logger.info("✅ 디렉토리 권한 변경 성공")
except Exception as chmod_error:
logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}")
# 임시 디렉토리로 대체
import tempfile
temp_dir = tempfile.mkdtemp(prefix="vectordb_")
logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}")
self.db_path = temp_dir
abs_path = temp_dir
# 테스트 파일 생성/삭제로 권한 확인
test_file = os.path.join(abs_path, "test_permissions.tmp")
try:
with open(test_file, 'w') as f:
f.write("test")
os.remove(test_file)
logger.info("✅ 디렉토리 권한 확인 완료")
except Exception as test_error:
raise Exception(f"디렉토리 권한 테스트 실패: {test_error}")
except Exception as e:
logger.error(f"❌ 디렉토리 설정 실패: {e}")
raise
def _initialize_chromadb_with_compatibility_check(self):
"""ChromaDB 초기화 (호환성 확인 포함)"""
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
logger.info(f"🔄 ChromaDB 초기화 시도 {attempt + 1}/{max_retries}")
# 1단계: 기존 DB 호환성 확인
existing_db_valid = self._check_existing_db_compatibility()
# 2단계: ChromaDB 클라이언트 생성
self._create_chromadb_client()
# 3단계: 컬렉션 초기화
self._initialize_collection(existing_db_valid)
logger.info("✅ ChromaDB 초기화 완료")
return # 성공 시 루프 종료
except Exception as e:
logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}): {e}")
if attempt < max_retries - 1:
logger.info(f"🔄 {retry_delay}초 후 재시도...")
time.sleep(retry_delay)
retry_delay *= 2 # 지수 백오프
else:
raise Exception(f"ChromaDB 초기화 최종 실패: {e}")
def _check_existing_db_compatibility(self):
"""기존 DB 호환성 확인"""
try:
if not os.path.exists(self.db_path):
logger.info("📁 새 DB 디렉토리 - 스키마 확인 불필요")
return False
db_files = [f for f in os.listdir(self.db_path) if not f.startswith('.')]
if not db_files:
logger.info("📁 빈 DB 디렉토리 - 스키마 확인 불필요")
return False
logger.info(f"📁 기존 DB 파일 발견: {db_files}")
# 실제 호환성 테스트
logger.info("🔍 기존 DB 호환성 테스트 중...")
try:
# 테스트용 클라이언트 생성
test_client = chromadb.PersistentClient(path=self.db_path)
test_client.heartbeat()
# 컬렉션 접근 시도
try:
test_collection = test_client.get_collection(name=self.collection_name)
count = test_collection.count()
logger.info(f"✅ 기존 DB 호환성 확인 완료: {count}개 벡터 존재")
return True
except Exception as collection_error:
logger.info(f"📝 기존 컬렉션 없음 (정상): {collection_error}")
return True # DB는 정상, 컬렉션만 새로 생성하면 됨
except Exception as compatibility_error:
# 실제 호환성 문제 발견
logger.warning(f"⚠️ 실제 호환성 문제 발견: {compatibility_error}")
self._backup_incompatible_db()
return False
except Exception as e:
logger.warning(f"⚠️ 호환성 확인 중 오류: {e}")
return False
def _backup_incompatible_db(self):
"""호환성 문제가 있는 DB 백업"""
try:
backup_path = f"{self.db_path}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.warning(f"🔄 호환성 문제로 기존 DB 백업: {backup_path}")
shutil.move(self.db_path, backup_path)
logger.info(f"✅ 기존 DB 백업 완료: {backup_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
# 오래된 백업 정리 (7일 이상)
self._cleanup_old_backups()
except Exception as backup_error:
logger.warning(f"⚠️ 백업 실패, 강제 삭제 진행: {backup_error}")
shutil.rmtree(self.db_path, ignore_errors=True)
os.makedirs(self.db_path, exist_ok=True)
def _cleanup_old_backups(self):
"""오래된 백업 파일 정리 (7일 이상)"""
try:
base_path = os.path.dirname(self.db_path)
backup_pattern = f"{os.path.basename(self.db_path)}_backup_"
cutoff_time = time.time() - (7 * 24 * 3600) # 7일 전
for item in os.listdir(base_path):
if item.startswith(backup_pattern):
backup_path = os.path.join(base_path, item)
if os.path.isdir(backup_path) and os.path.getctime(backup_path) < cutoff_time:
shutil.rmtree(backup_path, ignore_errors=True)
logger.info(f"🗑️ 오래된 백업 삭제: {backup_path}")
except Exception as e:
logger.warning(f"⚠️ 백업 정리 중 오류: {e}")
def _create_chromadb_client(self):
"""ChromaDB 클라이언트 생성"""
try:
# 최신 버전 호환 설정
chroma_settings = ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
is_persistent=True
)
self.client = chromadb.PersistentClient(
path=self.db_path,
settings=chroma_settings
)
logger.info("✅ ChromaDB 클라이언트 생성 성공")
except Exception as modern_error:
logger.warning(f"⚠️ 최신 설정 실패, 간단한 설정으로 재시도: {modern_error}")
# 간단한 설정으로 재시도
self.client = chromadb.PersistentClient(path=self.db_path)
logger.info("✅ ChromaDB 간단 설정 클라이언트 생성 성공")
# 연결 테스트
try:
self.client.heartbeat()
logger.info("✅ ChromaDB 연결 테스트 성공")
except Exception as heartbeat_error:
logger.warning(f"⚠️ Heartbeat 실패 (무시): {heartbeat_error}")
def _initialize_collection(self, existing_db_valid: bool):
"""컬렉션 초기화"""
try:
if existing_db_valid:
# 기존 컬렉션 로드 시도
try:
self.collection = self.client.get_collection(name=self.collection_name)
count = self.collection.count()
logger.info(f"✅ 기존 컬렉션 로드 성공: {self.collection_name} ({count}개 벡터)")
return
except Exception as get_error:
logger.info(f"📝 기존 컬렉션 없음, 새로 생성: {get_error}")
# 새 컬렉션 생성
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={
"description": "Restaurant reviews vector store",
"created_at": datetime.now().isoformat(),
"version": "1.0"
}
)
logger.info(f"✅ 새 컬렉션 생성 성공: {self.collection_name}")
except Exception as create_error:
logger.error(f"❌ 컬렉션 초기화 실패: {create_error}")
# 대체 컬렉션명으로 재시도
fallback_name = f"{self.collection_name}_{int(time.time())}"
logger.info(f"🔄 대체 컬렉션명으로 재시도: {fallback_name}")
self.collection = self.client.create_collection(
name=fallback_name,
metadata={"description": "Restaurant reviews (fallback)"}
)
self.collection_name = fallback_name
logger.info(f"✅ 대체 컬렉션 생성 성공: {fallback_name}")
def _initialize_embedding_model(self):
"""임베딩 모델 초기화"""
try:
logger.info(f"🤖 임베딩 모델 로드 시작: {self.embedding_model_name}")
# 캐시 디렉토리 설정
cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "sentence_transformers")
os.makedirs(cache_dir, exist_ok=True)
# 권한 확인
if not os.access(cache_dir, os.W_OK):
import tempfile
cache_dir = tempfile.mkdtemp(prefix="st_cache_")
logger.info(f"🔄 임시 캐시 디렉토리 사용: {cache_dir}")
# 모델 로드 (타임아웃 설정)
def timeout_handler(signum, frame):
raise TimeoutError("임베딩 모델 로드 타임아웃")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300) # 5분 타임아웃
try:
self.embedding_model = SentenceTransformer(
self.embedding_model_name,
cache_folder=cache_dir,
device='cpu' # CPU 사용 명시
)
signal.alarm(0) # 타임아웃 해제
# 모델 테스트
test_embedding = self.embedding_model.encode(["테스트 문장"])
logger.info(f"✅ 임베딩 모델 로드 성공: {test_embedding.shape}")
except TimeoutError:
signal.alarm(0)
raise Exception("임베딩 모델 로드 타임아웃 (5분)")
except Exception as e:
logger.error(f"❌ 임베딩 모델 로드 실패: {e}")
raise
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return (
self.client is not None and
self.collection is not None and
self.embedding_model is not None and
self.initialization_error is None
)
def get_initialization_error(self) -> Optional[str]:
"""초기화 에러 메시지 반환"""
return self.initialization_error
def retry_initialization(self) -> bool:
"""초기화 재시도"""
try:
logger.info("🔄 VectorService 초기화 재시도...")
# 상태 초기화
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
# 재초기화
self._safe_initialize()
return self.is_ready()
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ 초기화 재시도 실패: {e}")
return False
def reset_vector_db(self) -> Dict[str, Any]:
"""Vector DB 완전 리셋"""
try:
logger.info("🔄 Vector DB 완전 리셋 시작...")
# 기존 클라이언트 정리
self.client = None
self.collection = None
# DB 디렉토리 완전 삭제
if os.path.exists(self.db_path):
shutil.rmtree(self.db_path, ignore_errors=True)
logger.info(f"✅ 기존 DB 디렉토리 삭제: {self.db_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
logger.info(f"✅ 새 DB 디렉토리 생성: {self.db_path}")
# 재초기화
success = self.retry_initialization()
if success:
return {
"success": True,
"message": "Vector DB가 성공적으로 리셋되었습니다",
"collection_name": self.collection_name,
"db_path": self.db_path
}
else:
return {
"success": False,
"error": self.initialization_error or "재초기화 실패"
}
except Exception as e:
logger.error(f"❌ Vector DB 리셋 실패: {e}")
return {
"success": False,
"error": str(e)
}
def get_health_status(self) -> Dict[str, Any]:
"""서비스 상태 확인"""
try:
status = {
"service": "vector_db",
"status": "healthy" if self.is_ready() else "unhealthy",
"db_path": self.db_path,
"collection_name": self.collection_name,
"embedding_model": self.embedding_model_name,
"timestamp": datetime.now().isoformat()
}
if self.initialization_error:
status["initialization_error"] = self.initialization_error
status["status"] = "error"
# 상세 상태
status["components"] = {
"client": "connected" if self.client else "disconnected",
"collection": "ready" if self.collection else "not_ready",
"embedding": "loaded" if self.embedding_model else "not_loaded"
}
# 컬렉션 정보
if self.collection:
try:
status["collection_count"] = self.collection.count()
except Exception as e:
status["collection_error"] = str(e)
return status
except Exception as e:
return {
"service": "vector_db",
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def get_store_context(self, store_id: str) -> Optional[str]:
"""스토어 ID로 컨텍스트 조회"""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 스토어 ID로 검색
results = self.collection.get(
where={"store_id": store_id}
)
if not results or not results.get('documents'):
logger.warning(f"스토어 ID '{store_id}'에 대한 데이터 없음")
return None
# 컨텍스트 생성
documents = results['documents']
metadatas = results.get('metadatas', [])
context_parts = []
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 메타데이터 정보 추가
if metadata:
context_parts.append(f"[{metadata.get('store_name', 'Unknown')}]")
context_parts.append(doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as e:
logger.error(f"스토어 컨텍스트 조회 실패: {e}")
return None
async def build_vector_store(self, store_info: Dict[str, Any], similar_stores_data: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str) -> Dict[str, Any]:
"""Vector Store 구축 (완전 수정된 버전)"""
try:
if not self.is_ready():
# 재시도 한 번 더
if not self.retry_initialization():
raise Exception("Vector DB가 초기화되지 않았습니다")
logger.info(f"Vector Store 구축 시작: {len(similar_stores_data)}개 스토어")
# 통계 초기화
stats = {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 0
}
# 배치 처리용 리스트
all_documents = []
all_embeddings = []
all_metadatas = []
all_ids = []
# 각 스토어 처리
for store_id, store_data, reviews in similar_stores_data:
try:
# 데이터 검증
if not store_data or not reviews:
logger.warning(f"스토어 '{store_id}' 데이터 부족: store_data={bool(store_data)}, reviews={len(reviews) if reviews else 0}")
stats["errors"] += 1
continue
# 올바른 create_store_hash 호출
store_hash = create_store_hash(
store_id=store_id,
store_name=store_data.get('place_name', ''),
region=region
)
# ChromaDB에서 직접 중복 확인 (is_duplicate_store 함수 사용하지 않음)
try:
# 같은 store_id로 이미 저장된 데이터가 있는지 확인
existing_data = self.collection.get(
where={"store_id": store_id},
limit=1
)
if existing_data and len(existing_data.get('ids', [])) > 0:
logger.debug(f"중복 스토어 건너뛰기: {store_id}")
stats["duplicates"] += 1
continue
except Exception as dup_check_error:
# 중복 확인 실패는 로그만 남기고 계속 진행
logger.warning(f"중복 확인 실패 (계속 진행): {dup_check_error}")
# 올바른 extract_text_for_embedding 호출
embedding_text = extract_text_for_embedding(
store_info=store_data,
reviews=reviews
)
# 임베딩 생성
try:
embedding = self.embedding_model.encode(embedding_text)
embedding = embedding.tolist() # numpy array를 list로 변환
except Exception as embed_error:
logger.error(f"임베딩 생성 실패 (store_id: {store_id}): {embed_error}")
stats["errors"] += 1
continue
# 올바른 create_metadata 호출
metadata = create_metadata(
store_info=store_data,
food_category=food_category,
region=region
)
# 배치에 추가
all_documents.append(embedding_text)
all_embeddings.append(embedding)
all_metadatas.append(metadata)
all_ids.append(f"{store_id}_{store_hash}")
stats["total_processed"] += 1
stats["newly_added"] += 1
if stats["total_processed"] % 10 == 0:
logger.info(f"처리 진행률: {stats['total_processed']}/{len(similar_stores_data)}")
except Exception as store_error:
logger.error(f"음식점 처리 중 오류 (store_id: {store_id}): {store_error}")
stats["errors"] += 1
continue
# 배치로 벡터 저장
if all_documents:
logger.info(f"벡터 배치 저장 시작: {len(all_documents)}")
try:
self.collection.add(
documents=all_documents,
embeddings=all_embeddings,
metadatas=all_metadatas,
ids=all_ids
)
logger.info("✅ 벡터 배치 저장 성공")
except Exception as save_error:
logger.error(f"❌ 벡터 저장 실패: {save_error}")
return {
"success": False,
"error": f"벡터 저장 실패: {str(save_error)}",
"statistics": stats
}
else:
logger.warning("⚠️ 저장할 벡터 데이터가 없음")
# 최종 통계
try:
total_vectors = self.collection.count()
logger.info(f"✅ Vector Store 구축 완료: 총 {total_vectors}개 벡터")
except Exception as count_error:
logger.warning(f"벡터 개수 확인 실패: {count_error}")
total_vectors = len(all_documents)
return {
"success": True,
"message": "Vector Store 구축 완료",
"statistics": stats,
"total_vectors": total_vectors,
"store_info": store_info
}
except Exception as e:
logger.error(f"Vector Store 구축 전체 실패: {e}")
return {
"success": False,
"error": str(e),
"statistics": stats if 'stats' in locals() else {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 1
}
}
def get_db_status(self) -> Dict[str, Any]:
"""Vector DB 상태 정보를 반환합니다."""
try:
if not self.is_ready():
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'not_ready',
'initialization_error': self.initialization_error
}
# 문서 개수 확인
try:
total_documents = self.collection.count()
except Exception as e:
logger.warning(f"문서 개수 확인 실패: {e}")
total_documents = 0
# 고유 가게 수 확인 (store_id 기준)
try:
# 모든 메타데이터에서 고유 store_id 추출
all_metadata = self.collection.get()
store_ids = set()
if all_metadata.get('metadatas'):
for metadata in all_metadata['metadatas']:
store_id = metadata.get('store_id')
if store_id:
store_ids.add(store_id)
total_stores = len(store_ids)
except Exception as e:
logger.warning(f"가게 수 확인 실패: {e}")
total_stores = 0
return {
'collection_name': self.collection_name,
'total_documents': total_documents,
'total_stores': total_stores,
'db_path': self.db_path,
'status': 'ready'
}
except Exception as e:
logger.error(f"DB 상태 확인 실패: {e}")
return {
'collection_name': self.collection_name,
'total_documents': 0,
'total_stores': 0,
'db_path': self.db_path,
'status': 'error',
'error': str(e)
}
def search_similar_cases(self, store_id: str, context: str) -> Optional[str]:
"""유사한 케이스를 검색합니다."""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 컨텍스트 기반 유사 검색
try:
# 검색 쿼리를 임베딩으로 변환
query_embedding = self.embedding_model.encode(context)
query_embedding = query_embedding.tolist()
# 유사한 문서 검색 (상위 5개)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=['documents', 'metadatas']
)
if not results or not results.get('documents') or not results['documents'][0]:
logger.info("유사 케이스를 찾을 수 없음")
return None
# 컨텍스트 조합
context_parts = []
documents = results['documents'][0]
metadatas = results.get('metadatas', [[]])[0]
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 가게 정보 추가
store_name = metadata.get('store_name', 'Unknown')
food_category = metadata.get('food_category', 'Unknown')
context_parts.append(f"[{food_category} - {store_name}]")
context_parts.append(doc[:500] + "..." if len(doc) > 500 else doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as search_error:
logger.warning(f"벡터 검색 실패: {search_error}")
return None
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None