This commit is contained in:
hiondal
2025-06-16 07:08:09 +09:00
parent d0b59725df
commit b3aeb7f52a
11 changed files with 480 additions and 1820 deletions
+2 -236
View File
@@ -1,6 +1,7 @@
# app/services/claude_service.py
import json
import logging
import re
from typing import Optional, Dict, Any, Tuple
import anthropic
from ..config.settings import settings
@@ -108,8 +109,6 @@ class ClaudeService:
def _parse_json_response(self, raw_response: str) -> Optional[Dict[str, Any]]:
"""Claude의 원본 응답에서 JSON을 추출하고 파싱합니다."""
try:
import re
# JSON 블록 찾기
json_match = re.search(r'```json\s*\n(.*?)\n```', raw_response, re.DOTALL)
if json_match:
@@ -206,239 +205,6 @@ class ClaudeService:
# 호환성을 위한 메서드들 (기존 코드가 사용할 수 있도록)
# =============================================================================
async def get_recommendation(self, prompt: str) -> Optional[str]:
"""Claude API를 호출하여 추천을 받습니다. (호환성용)"""
if not self.is_ready():
logger.error("ClaudeService가 준비되지 않음")
return None
try:
logger.info("🤖 Claude API 호출 시작")
response = self.client.messages.create(
model=self.model,
max_tokens=4000,
temperature=0.7,
messages=[
{
"role": "user",
"content": prompt
}
]
)
if response.content and len(response.content) > 0:
raw_response = response.content[0].text
logger.info(f"✅ Claude API 응답 성공: {len(raw_response)} 문자")
return raw_response
else:
logger.warning("⚠️ Claude API 응답이 비어있음")
return None
except Exception as e:
logger.error(f"❌ Claude API 호출 실패: {e}")
return None
def parse_recommendation_response(self, raw_response: str) -> Optional[Dict[str, Any]]:
"""Claude 응답에서 JSON을 추출하고 파싱합니다. (호환성용)"""
return self._parse_json_response(raw_response)
def build_recommendation_prompt(self, store_id: str, context: str, vector_context: Optional[str] = None) -> str:
"""액션 추천용 프롬프트를 구성합니다. (호환성용)"""
prompt_parts = [
"당신은 소상공인을 위한 경영 컨설턴트입니다.",
f"가게 ID: {store_id}",
f"점주 요청: {context}"
]
if vector_context:
prompt_parts.extend([
"\n--- 동종 업체 분석 데이터 ---",
vector_context,
"--- 분석 데이터 끝 ---\n"
])
prompt_parts.extend([
"\n위 정보를 바탕으로 실질적이고 구체적인 액션 추천을 해주세요.",
"응답은 반드시 아래 JSON 형식으로만 작성해주세요:",
"",
"```json",
"{",
' "summary": {',
' "current_situation": "현재 상황 요약",',
' "key_insights": ["핵심 인사이트 1", "핵심 인사이트 2"],',
' "priority_areas": ["우선 개선 영역 1", "우선 개선 영역 2"]',
' },',
' "action_plans": {',
' "short_term": [',
' {',
' "title": "즉시 실행 가능한 액션",',
' "description": "구체적인 실행 방법",',
' "expected_impact": "예상 효과",',
' "timeline": "1-2주",',
' "cost": "예상 비용"',
' }',
' ],',
' "mid_term": [',
' {',
' "title": "중기 개선 방안",',
' "description": "구체적인 실행 방법",',
' "expected_impact": "예상 효과",',
' "timeline": "1-3개월",',
' "cost": "예상 비용"',
' }',
' ]',
' },',
' "implementation_tips": ["실행 팁 1", "실행 팁 2"]',
"}",
"```"
])
return "\n".join(prompt_parts)
def create_prompt_for_api_response(self, context: str, additional_context: Optional[str] = None) -> str:
"""API 응답용 프롬프트를 생성합니다. (호환성용)"""
return self._build_action_prompt(context, additional_context)
async def generate_action_recommendations_optimized(
self,
context: str,
additional_context: Optional[str] = None
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""
최적화된 액션 추천 생성
- 더 명확한 JSON 지시사항
- 토큰 효율성 개선
- 파싱 안정성 향상
"""
if not self.is_ready():
return None, None
try:
# 최적화된 프롬프트 구성
prompt = self._build_optimized_prompt(context, additional_context)
response = self.client.messages.create(
model=self.model,
max_tokens=3000, # 토큰 수 최적화
temperature=0.3, # 일관성 향상
messages=[{"role": "user", "content": prompt}]
)
if response.content and len(response.content) > 0:
raw_response = response.content[0].text
# 즉시 JSON 파싱 시도
parsed_json = self._parse_json_response_enhanced(raw_response)
return raw_response, parsed_json
return None, None
except Exception as e:
logger.error(f"Claude AI 호출 실패: {e}")
return None, None
def _build_optimized_prompt(self, context: str, additional_context: Optional[str] = None) -> str:
"""최적화된 프롬프트 구성"""
prompt_parts = [
"당신은 소상공인 경영 컨설턴트입니다.",
f"분석 요청: {context}",
]
if additional_context:
prompt_parts.extend([
"\n=== 참고 데이터 ===",
additional_context,
"==================\n"
])
prompt_parts.extend([
"위 정보를 바탕으로 실행 가능한 액션을 추천해주세요.",
"",
"⚠️ 응답은 반드시 아래 JSON 형식으로만 작성하세요:",
"다른 텍스트는 포함하지 마세요.",
"",
"```json",
"{",
' "summary": {',
' "current_situation": "현재 상황 요약 (50자 이내)",',
' "key_insights": ["핵심 포인트1", "핵심 포인트2"],',
' "priority": "high|medium|low"',
' },',
' "actions": [',
' {',
' "title": "액션명",',
' "description": "구체적 실행방법",',
' "timeline": "실행기간",',
' "cost": "예상비용",',
' "impact": "예상효과"',
' }',
' ],',
' "quick_tips": ["즉시 실행 팁1", "즉시 실행 팁2"]',
"}",
"```"
])
return "\n".join(prompt_parts)
def _parse_json_response_enhanced(self, raw_response: str) -> Optional[Dict[str, Any]]:
"""향상된 JSON 파싱"""
try:
# 1. JSON 블록 추출
json_match = re.search(r'```json\s*(\{.*?\})\s*```', raw_response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# 2. 직접 JSON 찾기
json_match = re.search(r'(\{.*\})', raw_response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
return None
# 3. JSON 파싱
return json.loads(json_str)
except json.JSONDecodeError as e:
logger.error(f"JSON 파싱 오류: {e}")
return None
except Exception as e:
logger.error(f"JSON 추출 실패: {e}")
return None
# =============================================================================
# 헬스체크 및 상태 확인 메서드들
# =============================================================================
def get_health_status(self) -> Dict[str, Any]:
"""ClaudeService 상태 확인"""
try:
status = {
"service": "claude_api",
"status": "healthy" if self.is_ready() else "unhealthy",
"model": self.model,
"api_key_configured": bool(settings.CLAUDE_API_KEY and settings.CLAUDE_API_KEY.strip()),
"timestamp": self._get_timestamp()
}
if self.initialization_error:
status["initialization_error"] = self.initialization_error
status["status"] = "error"
return status
except Exception as e:
return {
"service": "claude_api",
"status": "error",
"error": str(e),
"timestamp": self._get_timestamp()
}
def _get_timestamp(self) -> str:
"""현재 시간 문자열 반환"""
from datetime import datetime
return datetime.now().isoformat()
return self._parse_json_response(raw_response)
+86 -90
View File
@@ -1,4 +1,4 @@
# app/services/restaurant_service.py (수정된 버전)
# app/services/restaurant_service.py
import aiohttp
import asyncio
import logging
@@ -70,6 +70,87 @@ class RestaurantService:
logger.error(f"가게 검색 중 오류: {str(e)}")
return None
async def find_similar_stores(self, region: str, food_category: str, max_count: int = 50) -> List[RestaurantInfo]:
"""
동종 업체를 찾습니다.
Args:
region: 지역
food_category: 음식 카테고리
max_count: 최대 검색 개수
Returns:
동종 업체 목록
"""
try:
logger.info(f"동종 업체 검색 시작: region={region}, food_category={food_category}")
# 검색 쿼리 생성 (음식 카테고리만 포함)
search_query = self._clean_food_category(food_category)
logger.info(f"음식점 수집 요청: query='{search_query}' region='{region}'")
similar_stores = []
async with aiohttp.ClientSession(timeout=self.timeout) as session:
# 페이지별로 검색 (최대 5페이지)
max_pages = min(5, (max_count // 15) + 1)
for page in range(1, max_pages + 1):
if len(similar_stores) >= max_count:
break
url = f"{self.base_url}/collect"
payload = {
"query": search_query, # 음식 카테고리만 포함
"region": region, # 지역 정보는 별도 파라미터
"size": 15,
"pages": 1, # 페이지별로 하나씩 호출
"save_to_file": False
}
logger.info(f"동종 업체 검색 ({page}/{max_pages}): {url}")
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
restaurants = data.get('restaurants', [])
for restaurant_data in restaurants:
if len(similar_stores) >= max_count:
break
try:
restaurant = RestaurantInfo(**restaurant_data)
# 카테고리 유사성 검사
if self._is_similar_category(food_category, restaurant.category_name):
similar_stores.append(restaurant)
except Exception as e:
logger.warning(f"음식점 데이터 파싱 실패: {e}")
continue
logger.info(f"페이지 {page} 완료: {len(restaurants)}개 발견, 총 {len(similar_stores)}개 수집")
else:
logger.warning(f"페이지 {page} API 호출 실패: HTTP {response.status}")
break
except Exception as e:
logger.warning(f"페이지 {page} 처리 실패: {e}")
continue
# API 호출 간격 조절
await asyncio.sleep(settings.REQUEST_DELAY)
logger.info(f"동종 업체 검색 완료: {len(similar_stores)}개 발견")
return similar_stores
except Exception as e:
logger.error(f"동종 업체 검색 중 오류: {str(e)}")
return []
def _clean_food_category(self, food_category: str) -> str:
"""
음식 카테고리를 정리하여 검색 키워드로 변환합니다.
@@ -99,96 +180,12 @@ class RestaurantService:
if not keywords:
return "음식점"
# 🔧 지역 정보는 포함하지 않고 음식 카테고리만 반환
# 지역 정보는 포함하지 않고 음식 카테고리만 반환
return ' '.join(keywords)
async def find_similar_stores(self, region: str, food_category: str, max_count: int = 50) -> List[RestaurantInfo]:
def _is_similar_category(self, target_category: str, restaurant_category: str) -> bool:
"""
동종 업체를 찾습니다.
Args:
region: 지역
food_category: 음식 카테고리
max_count: 최대 검색 개수
Returns:
동종 업체 목록
"""
try:
logger.info(f"동종 업체 검색 시작: region={region}, food_category={food_category}")
# 🔧 검색 쿼리 생성 (음식 카테고리만 포함)
search_query = self._clean_food_category(food_category)
logger.info(f"음식점 수집 요청: query='{search_query}' region='{region}' size=15 pages=1 save_to_file=False")
similar_stores = []
async with aiohttp.ClientSession(timeout=self.timeout) as session:
# 페이지별로 검색 (최대 5페이지)
max_pages = min(5, (max_count // 15) + 1)
for page in range(1, max_pages + 1):
if len(similar_stores) >= max_count:
break
url = f"{self.base_url}/collect"
# 🔧 수정된 payload - query에는 음식 카테고리만, region은 분리
payload = {
"query": search_query, # 🔧 음식 카테고리만 포함
"region": region, # 🔧 지역 정보는 별도 파라미터
"size": 15,
"pages": 1, # 페이지별로 하나씩 호출
"save_to_file": False
}
logger.info(f"동종 업체 검색 페이지 {page}: query='{search_query}' region='{region}'")
try:
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
restaurants = data.get('restaurants', [])
for restaurant_data in restaurants:
if len(similar_stores) >= max_count:
break
try:
restaurant = RestaurantInfo(**restaurant_data)
# 카테고리 필터링
restaurant_category = extract_food_category(restaurant.category_name)
if self._is_similar_food_category(food_category, restaurant_category):
similar_stores.append(restaurant)
logger.debug(f"유사 카테고리 매치: {restaurant.place_name} ({restaurant_category})")
except Exception as e:
logger.warning(f"음식점 데이터 파싱 실패: {e}")
continue
logger.info(f"페이지 {page} 완료: {len(restaurants)}개 음식점 수집")
else:
logger.warning(f"동종 업체 검색 실패 (페이지 {page}): HTTP {response.status}")
continue
except Exception as e:
logger.warning(f"페이지 {page} 검색 중 오류: {e}")
continue
# API 요청 제한을 위한 지연
await asyncio.sleep(settings.REQUEST_DELAY)
logger.info(f"동종 업체 검색 완료: 총 {len(similar_stores)}")
return similar_stores
except Exception as e:
logger.error(f"동종 업체 검색 중 오류: {str(e)}")
return []
def _is_similar_food_category(self, target_category: str, restaurant_category: str) -> bool:
"""
음식 카테고리가 유사한지 확인합니다.
두 카테고리가 유사한지 판단합니다.
Args:
target_category: 대상 카테고리
@@ -231,5 +228,4 @@ class RestaurantService:
return response.status == 200
except Exception as e:
logger.error(f"Restaurant API 헬스체크 실패: {e}")
return False
return False
+102 -358
View File
@@ -1,4 +1,4 @@
# vector/app/services/review_service.py
# app/services/review_service.py
import aiohttp
import asyncio
import logging
@@ -10,7 +10,7 @@ from ..models.restaurant_models import RestaurantInfo
logger = logging.getLogger(__name__)
class ReviewService:
"""리뷰 API 연동 서비스 (본인 가게 우선 처리 강화)"""
"""리뷰 API 연동 서비스"""
def __init__(self):
self.base_url = settings.get_review_api_url()
@@ -18,7 +18,7 @@ class ReviewService:
async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]:
"""
단일 가게의 리뷰를 수집합니다. (본인 가게용 강화 처리)
단일 가게의 리뷰를 수집합니다.
Args:
store_id: 카카오맵 가게 ID
@@ -66,15 +66,9 @@ class ReviewService:
converted_store_info = self._convert_store_info(store_info)
converted_reviews = self._convert_reviews(filtered_reviews)
if converted_store_info and converted_reviews:
logger.info(f"✅ 리뷰 수집 성공: {len(converted_reviews)}")
return converted_store_info, converted_reviews
else:
logger.warning("⚠️ 변환된 데이터가 비어있음")
return None, []
return converted_store_info, converted_reviews
else:
error_msg = data.get('message', 'Unknown error')
logger.error(f"❌ 리뷰 분석 실패: {error_msg}")
logger.warning(f"⚠️ 리뷰 수집 실패: {data.get('message', 'Unknown error')}")
return None, []
else:
logger.error(f"❌ Review API 호출 실패: HTTP {response.status}")
@@ -83,313 +77,123 @@ class ReviewService:
return None, []
except asyncio.TimeoutError:
logger.error(" Review API 호출 타임아웃")
logger.error(" Review API 호출 타임아웃")
return None, []
except Exception as e:
logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}")
return None, []
def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""리뷰 품질 필터링"""
try:
filtered = []
"""
리뷰 품질을 기준으로 필터링합니다.
Args:
reviews: 원본 리뷰 목록
for review in reviews:
content = review.get('content', '').strip()
rating = review.get('rating', 0)
# 품질 기준
if (len(content) >= 10 and # 최소 10자 이상
rating > 0 and # 별점이 있어야 함
not self._is_spam_review(content)): # 스팸 제외
filtered.append(review)
Returns:
필터링된 리뷰 목록
"""
filtered_reviews = []
for review in reviews:
content = review.get('content', '').strip()
logger.debug(f"품질 필터링: {len(reviews)}{len(filtered)}")
return filtered
except Exception as e:
logger.warning(f"⚠️ 리뷰 필터링 실패: {e}")
return reviews # 실패 시 원본 반환
# 품질 기준
if (
len(content) >= 10 and # 최소 10자 이상
not self._is_spam_content(content) and # 스팸 내용이 아님
review.get('rating', 0) > 0 # 별점이 있음
):
filtered_reviews.append(review)
return filtered_reviews
def _is_spam_review(self, content: str) -> bool:
"""스팸 리뷰 판별"""
try:
spam_keywords = [
"추천추천", "최고최고", "맛있어요맛있어요",
"좋아요좋아요", "ㅎㅎㅎㅎ", "ㅋㅋㅋㅋ",
"굿굿굿", "Nice", "Good"
]
def _is_spam_content(self, content: str) -> bool:
"""
스팸 또는 의미없는 리뷰인지 판단합니다.
Args:
content: 리뷰 내용
content_lower = content.lower()
# 스팸 키워드 확인
for keyword in spam_keywords:
if keyword.lower() in content_lower:
return True
# 너무 짧거나 반복 문자 확인
if len(set(content.replace(' ', ''))) < 3: # 고유 문자 3개 미만
Returns:
스팸 여부
"""
spam_patterns = [
'ㅋㅋㅋㅋㅋ',
'ㅎㅎㅎㅎㅎ',
'!!!!!',
'.....',
'???',
'^^',
'굿굿굿',
'최고최고'
]
content_lower = content.lower()
# 스팸 패턴 체크
for pattern in spam_patterns:
if pattern in content_lower:
return True
return False
except Exception as e:
logger.warning(f"⚠️ 스팸 판별 실패: {e}")
return False
# 너무 반복적인 문자 체크
if len(set(content)) < 3: # 고유 문자가 3개 미만
return True
return False
def _convert_store_info(self, store_info):
"""가게 정보 변환 (강화된 버전)"""
def _convert_store_info(self, store_info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""
API 응답의 가게 정보를 내부 형식으로 변환합니다.
Args:
store_info: API 응답 가게 정보
Returns:
변환된 가게 정보
"""
if not store_info:
logger.warning("⚠️ 가게 정보가 비어있음")
return None
try:
converted = {
'id': str(store_info.get('id', '')),
'name': str(store_info.get('name', '')),
'category': str(store_info.get('category', '')),
'rating': str(store_info.get('rating', '')),
'review_count': str(store_info.get('review_count', '')),
'status': str(store_info.get('status', '')),
'address': str(store_info.get('address', ''))
}
# 필수 필드 확인 (ID와 이름은 반드시 있어야 함)
if not converted['id'] or not converted['name']:
logger.warning(f"⚠️ 필수 가게 정보 누락: ID={converted['id']}, Name={converted['name']}")
return None
logger.debug(f"가게 정보 변환 성공: {converted['name']}")
return converted
except Exception as e:
logger.error(f"❌ 가게 정보 변환 실패: {e}")
return None
return {
'id': store_info.get('id', ''),
'name': store_info.get('name', ''),
'category': store_info.get('category', ''),
'rating': store_info.get('rating', ''),
'review_count': store_info.get('review_count', ''),
'status': store_info.get('status', ''),
'address': store_info.get('address', '')
}
def _convert_reviews(self, reviews):
"""리뷰 목록 변환 (강화된 버전)"""
if not reviews:
logger.warning("⚠️ 리뷰 목록이 비어있음")
return []
def _convert_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
API 응답의 리뷰를 내부 형식으로 변환합니다.
Args:
reviews: API 응답 리뷰 목록
Returns:
변환된 리뷰 목록
"""
converted_reviews = []
for i, review in enumerate(reviews):
try:
# 안전한 형변환
rating = 0
try:
rating = int(review.get('rating', 0))
except (ValueError, TypeError):
rating = 0
likes = 0
try:
likes = int(review.get('likes', 0))
except (ValueError, TypeError):
likes = 0
photo_count = 0
try:
photo_count = int(review.get('photo_count', 0))
except (ValueError, TypeError):
photo_count = 0
converted_review = {
'reviewer_name': str(review.get('reviewer_name', 'Anonymous')),
'rating': rating,
'date': str(review.get('date', '')),
'content': str(review.get('content', '')).strip(),
'badges': list(review.get('badges', [])),
'likes': likes,
'photo_count': photo_count,
'has_photos': bool(photo_count > 0)
}
# 기본 검증
if converted_review['content'] and converted_review['rating'] > 0:
converted_reviews.append(converted_review)
else:
logger.debug(f"⚠️ 리뷰 {i+1} 품질 미달로 제외")
except Exception as e:
logger.warning(f"⚠️ 리뷰 {i+1} 변환 실패: {e}")
continue
for review in reviews:
converted_review = {
'reviewer_name': review.get('reviewer_name', ''),
'reviewer_level': review.get('reviewer_level', ''),
'reviewer_stats': review.get('reviewer_stats', {}),
'rating': review.get('rating', 0),
'date': review.get('date', ''),
'content': review.get('content', ''),
'badges': review.get('badges', []),
'likes': review.get('likes', 0),
'photo_count': review.get('photo_count', 0),
'has_photos': review.get('has_photos', False)
}
converted_reviews.append(converted_review)
logger.info(f"리뷰 변환 완료: {len(reviews)}{len(converted_reviews)}")
return converted_reviews
async def collect_multiple_stores_reviews(self, restaurants: List[RestaurantInfo]) -> List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]]:
"""
여러 가게의 리뷰를 수집합니다.
Args:
restaurants: 음식점 목록
Returns:
(store_id, 가게 정보, 리뷰 목록) 튜플의 리스트
"""
try:
logger.info(f"다중 가게 리뷰 수집 시작: {len(restaurants)}개 가게")
results = []
# 동시성 제한을 위해 세마포어 사용
semaphore = asyncio.Semaphore(3) # 최대 3개 동시 요청
async def collect_single_store(restaurant: RestaurantInfo):
async with semaphore:
try:
# 카카오맵 place_url에서 store_id 추출 시도
store_id = self._extract_store_id_from_url(restaurant.place_url)
if not store_id:
# URL에서 추출 실패 시 restaurant.id 사용
store_id = restaurant.id
if not store_id:
logger.warning(f"Store ID를 찾을 수 없음: {restaurant.place_name}")
return None
logger.info(f"가게 {restaurant.place_name} 리뷰 수집 중...")
# 리뷰 수집 (최대 50개로 제한)
store_info, reviews = await self.collect_store_reviews(
store_id,
max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT
)
if store_info and reviews:
return (store_id, store_info, reviews)
else:
logger.warning(f"가게 {restaurant.place_name}의 리뷰 수집 실패")
return None
except Exception as e:
logger.error(f"가게 {restaurant.place_name} 리뷰 수집 중 오류: {e}")
return None
finally:
# API 요청 제한을 위한 지연
await asyncio.sleep(settings.REQUEST_DELAY)
# 병렬 처리
tasks = [collect_single_store(restaurant) for restaurant in restaurants]
results_raw = await asyncio.gather(*tasks, return_exceptions=True)
# 성공한 결과만 필터링
for result in results_raw:
if result and not isinstance(result, Exception):
results.append(result)
logger.info(f"다중 가게 리뷰 수집 완료: {len(results)}개 성공")
return results
except Exception as e:
logger.error(f"다중 가게 리뷰 수집 중 오류: {str(e)}")
return []
def _extract_store_id_from_url(self, place_url: str) -> Optional[str]:
"""
카카오맵 URL에서 store_id를 추출합니다.
Args:
place_url: 카카오맵 장소 URL
Returns:
추출된 store_id 또는 None
"""
try:
if not place_url:
return None
# URL 패턴: https://place.map.kakao.com/123456789
import re
pattern = r'/(\d+)(?:\?|$|#)'
match = re.search(pattern, place_url)
if match:
store_id = match.group(1)
logger.debug(f"URL에서 store_id 추출: {store_id}")
return store_id
else:
logger.debug(f"URL에서 store_id 추출 실패: {place_url}")
return None
except Exception as e:
logger.warning(f"store_id 추출 중 오류: {e}")
return None
def _build_store_info_from_restaurant(self, restaurant: RestaurantInfo) -> Dict[str, Any]:
"""
RestaurantInfo를 store_info 형식으로 변환합니다.
Args:
restaurant: RestaurantInfo 객체
Returns:
store_info 딕셔너리
"""
try:
return {
'id': restaurant.id,
'name': restaurant.place_name,
'category': restaurant.category_name,
'rating': '', # API에서 제공되지 않음
'review_count': '', # API에서 제공되지 않음
'status': '', # API에서 제공되지 않음
'address': restaurant.address_name
}
except Exception as e:
logger.error(f"RestaurantInfo 변환 실패: {e}")
return {
'id': '',
'name': '',
'category': '',
'rating': '',
'review_count': '',
'status': '',
'address': ''
}
def _convert_single_review_data(self, review_data: dict) -> dict:
"""
단일 리뷰 데이터 변환
Args:
review_data: API 응답의 단일 리뷰 데이터
Returns:
변환된 리뷰 데이터
"""
try:
return {
'reviewer_name': review_data.get('reviewer_name', ''),
'reviewer_level': review_data.get('reviewer_level', ''),
'reviewer_stats': review_data.get('reviewer_stats', {}),
'rating': int(review_data.get('rating', 0)),
'date': review_data.get('date', ''),
'content': review_data.get('content', ''),
'badges': review_data.get('badges', []),
'likes': int(review_data.get('likes', 0)),
'photo_count': int(review_data.get('photo_count', 0)),
'has_photos': bool(review_data.get('has_photos', False))
}
except Exception as e:
logger.warning(f"단일 리뷰 데이터 변환 실패: {e}")
return {
'reviewer_name': 'Unknown',
'reviewer_level': '',
'reviewer_stats': {},
'rating': 0,
'date': '',
'content': '',
'badges': [],
'likes': 0,
'photo_count': 0,
'has_photos': False
}
async def health_check(self) -> bool:
"""
Review API 상태를 확인합니다.
@@ -401,67 +205,7 @@ class ReviewService:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/health"
async with session.get(url) as response:
is_healthy = response.status == 200
if is_healthy:
logger.debug("Review API 헬스체크 성공")
else:
logger.warning(f"Review API 헬스체크 실패: HTTP {response.status}")
return is_healthy
return response.status == 200
except Exception as e:
logger.error(f"Review API 헬스체크 실패: {e}")
return False
def get_api_info(self) -> Dict[str, Any]:
"""
Review API 정보를 반환합니다.
Returns:
API 정보 딕셔너리
"""
return {
"service_name": "Review API Service",
"base_url": self.base_url,
"timeout": self.timeout.total,
"max_reviews_per_restaurant": settings.MAX_REVIEWS_PER_RESTAURANT,
"request_delay": settings.REQUEST_DELAY
}
async def test_connection(self) -> Dict[str, Any]:
"""
Review API 연결을 테스트합니다.
Returns:
테스트 결과
"""
test_result = {
"service": "Review API",
"base_url": self.base_url,
"status": "unknown",
"response_time": None,
"error": None
}
try:
import time
start_time = time.time()
is_healthy = await self.health_check()
response_time = time.time() - start_time
test_result["response_time"] = round(response_time, 3)
if is_healthy:
test_result["status"] = "healthy"
logger.info(f"Review API 연결 테스트 성공: {response_time:.3f}")
else:
test_result["status"] = "unhealthy"
test_result["error"] = "Health check failed"
logger.warning("Review API 연결 테스트 실패: 헬스체크 실패")
except Exception as e:
test_result["status"] = "error"
test_result["error"] = str(e)
logger.error(f"Review API 연결 테스트 오류: {e}")
return test_result
return False
+152 -680
View File
@@ -1,10 +1,8 @@
# app/services/vector_service.py (개선된 버전)
# app/services/vector_service.py
import os
import json
import logging
import time
import shutil
import signal
import tempfile
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple
import chromadb
@@ -19,7 +17,7 @@ from ..utils.data_utils import (
logger = logging.getLogger(__name__)
class VectorService:
"""Vector DB 서비스 (개선된 초기화 로직)"""
"""Vector DB 서비스"""
def __init__(self):
self.db_path = settings.VECTOR_DB_PATH
@@ -36,15 +34,15 @@ class VectorService:
self._safe_initialize()
def _safe_initialize(self):
"""안전한 초기화 - 개선된 로직"""
"""안전한 초기화"""
try:
logger.info("🔧 VectorService 초기화 시작...")
# 1단계: 디렉토리 권한 확인
self._ensure_directory_permissions()
# 2단계: ChromaDB 초기화 (호환성 확인 포함)
self._initialize_chromadb_with_compatibility_check()
# 2단계: ChromaDB 초기화
self._initialize_chromadb()
# 3단계: 임베딩 모델 로드
self._initialize_embedding_model()
@@ -57,7 +55,7 @@ class VectorService:
logger.info("🔄 서비스는 런타임에 재시도 가능합니다")
def _ensure_directory_permissions(self):
"""Vector DB 디렉토리 권한을 확인하고 생성합니다"""
"""Vector DB 디렉토리 권한을 확인하고 생성"""
try:
logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}")
@@ -78,7 +76,6 @@ class VectorService:
except Exception as chmod_error:
logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}")
# 임시 디렉토리로 대체
import tempfile
temp_dir = tempfile.mkdtemp(prefix="vectordb_")
logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}")
self.db_path = temp_dir
@@ -93,535 +90,175 @@ class VectorService:
logger.info("✅ 디렉토리 권한 확인 완료")
except Exception as test_error:
raise Exception(f"디렉토리 권한 테스트 실패: {test_error}")
except Exception as e:
logger.error(f"디렉토리 설정 실패: {e}")
raise
raise Exception(f"디렉토리 설정 실패: {e}")
def _initialize_chromadb_with_compatibility_check(self):
"""ChromaDB 초기화 (호환성 확인 포함)"""
max_retries = 3
retry_delay = 2
for attempt in range(max_retries):
try:
logger.info(f"🔄 ChromaDB 초기화 시도 {attempt + 1}/{max_retries}")
# 1단계: 기존 DB 호환성 확인
existing_db_valid = self._check_existing_db_compatibility()
# 2단계: ChromaDB 클라이언트 생성
self._create_chromadb_client()
# 3단계: 컬렉션 초기화
self._initialize_collection(existing_db_valid)
logger.info("✅ ChromaDB 초기화 완료")
return # 성공 시 루프 종료
except Exception as e:
logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}): {e}")
if attempt < max_retries - 1:
logger.info(f"🔄 {retry_delay}초 후 재시도...")
time.sleep(retry_delay)
retry_delay *= 2 # 지수 백오프
else:
raise Exception(f"ChromaDB 초기화 최종 실패: {e}")
def _check_existing_db_compatibility(self):
"""기존 DB 호환성 확인"""
def _initialize_chromadb(self):
"""ChromaDB 초기화"""
try:
if not os.path.exists(self.db_path):
logger.info("📁 새 DB 디렉토리 - 스키마 확인 불필요")
return False
db_files = [f for f in os.listdir(self.db_path) if not f.startswith('.')]
if not db_files:
logger.info("📁 빈 DB 디렉토리 - 스키마 확인 불필요")
return False
logger.info(f"📁 기존 DB 파일 발견: {db_files}")
# 실제 호환성 테스트
logger.info("🔍 기존 DB 호환성 테스트 중...")
try:
# 테스트용 클라이언트 생성
test_client = chromadb.PersistentClient(path=self.db_path)
test_client.heartbeat()
# 컬렉션 접근 시도
try:
test_collection = test_client.get_collection(name=self.collection_name)
count = test_collection.count()
logger.info(f"✅ 기존 DB 호환성 확인 완료: {count}개 벡터 존재")
return True
except Exception as collection_error:
logger.info(f"📝 기존 컬렉션 없음 (정상): {collection_error}")
return True # DB는 정상, 컬렉션만 새로 생성하면 됨
except Exception as compatibility_error:
# 실제 호환성 문제 발견
logger.warning(f"⚠️ 실제 호환성 문제 발견: {compatibility_error}")
self._backup_incompatible_db()
return False
except Exception as e:
logger.warning(f"⚠️ 호환성 확인 중 오류: {e}")
return False
def _backup_incompatible_db(self):
"""호환성 문제가 있는 DB 백업"""
try:
backup_path = f"{self.db_path}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
logger.warning(f"🔄 호환성 문제로 기존 DB 백업: {backup_path}")
shutil.move(self.db_path, backup_path)
logger.info(f"✅ 기존 DB 백업 완료: {backup_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
# 오래된 백업 정리 (7일 이상)
self._cleanup_old_backups()
except Exception as backup_error:
logger.warning(f"⚠️ 백업 실패, 강제 삭제 진행: {backup_error}")
shutil.rmtree(self.db_path, ignore_errors=True)
os.makedirs(self.db_path, exist_ok=True)
def _cleanup_old_backups(self):
"""오래된 백업 파일 정리 (7일 이상)"""
try:
base_path = os.path.dirname(self.db_path)
backup_pattern = f"{os.path.basename(self.db_path)}_backup_"
cutoff_time = time.time() - (7 * 24 * 3600) # 7일 전
for item in os.listdir(base_path):
if item.startswith(backup_pattern):
backup_path = os.path.join(base_path, item)
if os.path.isdir(backup_path) and os.path.getctime(backup_path) < cutoff_time:
shutil.rmtree(backup_path, ignore_errors=True)
logger.info(f"🗑️ 오래된 백업 삭제: {backup_path}")
except Exception as e:
logger.warning(f"⚠️ 백업 정리 중 오류: {e}")
def _create_chromadb_client(self):
"""ChromaDB 클라이언트 생성"""
try:
# 최신 버전 호환 설정
chroma_settings = ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
is_persistent=True
)
logger.info("🔧 ChromaDB 클라이언트 초기화...")
# ChromaDB 클라이언트 생성
self.client = chromadb.PersistentClient(
path=self.db_path,
settings=chroma_settings
settings=ChromaSettings(
anonymized_telemetry=False,
allow_reset=True,
)
)
logger.info("✅ ChromaDB 클라이언트 생성 성공")
except Exception as modern_error:
logger.warning(f"⚠️ 최신 설정 실패, 간단한 설정으로 재시도: {modern_error}")
# Collection 가져오기 또는 생성
try:
self.collection = self.client.get_collection(name=self.collection_name)
logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}")
except Exception:
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={"hnsw:space": "cosine"}
)
logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}")
# 간단한 설정으로 재시도
self.client = chromadb.PersistentClient(path=self.db_path)
logger.info("✅ ChromaDB 간단 설정 클라이언트 생성 성공")
# 연결 테스트
try:
self.client.heartbeat()
logger.info("✅ ChromaDB 연결 테스트 성공")
except Exception as heartbeat_error:
logger.warning(f"⚠️ Heartbeat 실패 (무시): {heartbeat_error}")
def _initialize_collection(self, existing_db_valid: bool):
"""컬렉션 초기화"""
try:
if existing_db_valid:
# 기존 컬렉션 로드 시도
try:
self.collection = self.client.get_collection(name=self.collection_name)
count = self.collection.count()
logger.info(f"✅ 기존 컬렉션 로드 성공: {self.collection_name} ({count}개 벡터)")
return
except Exception as get_error:
logger.info(f"📝 기존 컬렉션 없음, 새로 생성: {get_error}")
# 새 컬렉션 생성
self.collection = self.client.create_collection(
name=self.collection_name,
metadata={
"description": "Restaurant reviews vector store",
"created_at": datetime.now().isoformat(),
"version": "1.0"
}
)
logger.info(f"✅ 새 컬렉션 생성 성공: {self.collection_name}")
except Exception as create_error:
logger.error(f"❌ 컬렉션 초기화 실패: {create_error}")
# 대체 컬렉션명으로 재시도
fallback_name = f"{self.collection_name}_{int(time.time())}"
logger.info(f"🔄 대체 컬렉션명으로 재시도: {fallback_name}")
self.collection = self.client.create_collection(
name=fallback_name,
metadata={"description": "Restaurant reviews (fallback)"}
)
self.collection_name = fallback_name
logger.info(f"✅ 대체 컬렉션 생성 성공: {fallback_name}")
except Exception as e:
raise Exception(f"ChromaDB 초기화 실패: {e}")
def _initialize_embedding_model(self):
"""임베딩 모델 초기화"""
try:
logger.info(f"🤖 임베딩 모델 로드 시작: {self.embedding_model_name}")
# 캐시 디렉토리 설정
cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "sentence_transformers")
os.makedirs(cache_dir, exist_ok=True)
# 권한 확인
if not os.access(cache_dir, os.W_OK):
import tempfile
cache_dir = tempfile.mkdtemp(prefix="st_cache_")
logger.info(f"🔄 임시 캐시 디렉토리 사용: {cache_dir}")
# 모델 로드 (타임아웃 설정)
def timeout_handler(signum, frame):
raise TimeoutError("임베딩 모델 로드 타임아웃")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(300) # 5분 타임아웃
try:
self.embedding_model = SentenceTransformer(
self.embedding_model_name,
cache_folder=cache_dir,
device='cpu' # CPU 사용 명시
)
signal.alarm(0) # 타임아웃 해제
# 모델 테스트
test_embedding = self.embedding_model.encode(["테스트 문장"])
logger.info(f"✅ 임베딩 모델 로드 성공: {test_embedding.shape}")
except TimeoutError:
signal.alarm(0)
raise Exception("임베딩 모델 로드 타임아웃 (5분)")
logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}")
self.embedding_model = SentenceTransformer(self.embedding_model_name)
logger.info("✅ 임베딩 모델 로드 완료")
except Exception as e:
logger.error(f"임베딩 모델 로드 실패: {e}")
raise
raise Exception(f"임베딩 모델 로드 실패: {e}")
def is_ready(self) -> bool:
"""서비스 준비 상태 확인"""
return (
self.client is not None and
self.collection is not None and
self.embedding_model is not None and
return all([
self.client is not None,
self.collection is not None,
self.embedding_model is not None,
self.initialization_error is None
)
])
def get_initialization_error(self) -> Optional[str]:
"""초기화 에러 메시지 반환"""
return self.initialization_error
def retry_initialization(self) -> bool:
"""초기화 재시도"""
async def build_vector_store(
self,
target_store_info: Dict[str, Any],
review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]],
food_category: str,
region: str
) -> Dict[str, Any]:
"""Vector Store를 구축합니다"""
if not self.is_ready():
raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}")
try:
logger.info("🔄 VectorService 초기화 재시도...")
logger.info("🚀 Vector Store 구축 시작")
# 상태 초기화
self.client = None
self.collection = None
self.embedding_model = None
self.initialization_error = None
processed_count = 0
documents = []
embeddings = []
metadatas = []
ids = []
# 재초기화
self._safe_initialize()
return self.is_ready()
except Exception as e:
self.initialization_error = str(e)
logger.error(f"❌ 초기화 재시도 실패: {e}")
return False
def reset_vector_db(self) -> Dict[str, Any]:
"""Vector DB 완전 리셋"""
try:
logger.info("🔄 Vector DB 완전 리셋 시작...")
# 기존 클라이언트 정리
self.client = None
self.collection = None
# DB 디렉토리 완전 삭제
if os.path.exists(self.db_path):
shutil.rmtree(self.db_path, ignore_errors=True)
logger.info(f"✅ 기존 DB 디렉토리 삭제: {self.db_path}")
# 새 디렉토리 생성
os.makedirs(self.db_path, exist_ok=True)
logger.info(f"✅ 새 DB 디렉토리 생성: {self.db_path}")
# 재초기화
success = self.retry_initialization()
if success:
return {
"success": True,
"message": "Vector DB가 성공적으로 리셋되었습니다",
"collection_name": self.collection_name,
"db_path": self.db_path
}
else:
return {
"success": False,
"error": self.initialization_error or "재초기화 실패"
}
except Exception as e:
logger.error(f"❌ Vector DB 리셋 실패: {e}")
return {
"success": False,
"error": str(e)
}
def get_health_status(self) -> Dict[str, Any]:
"""서비스 상태 확인"""
try:
status = {
"service": "vector_db",
"status": "healthy" if self.is_ready() else "unhealthy",
"db_path": self.db_path,
"collection_name": self.collection_name,
"embedding_model": self.embedding_model_name,
"timestamp": datetime.now().isoformat()
}
if self.initialization_error:
status["initialization_error"] = self.initialization_error
status["status"] = "error"
# 상세 상태
status["components"] = {
"client": "connected" if self.client else "disconnected",
"collection": "ready" if self.collection else "not_ready",
"embedding": "loaded" if self.embedding_model else "not_loaded"
}
# 컬렉션 정보
if self.collection:
for store_id, store_info, reviews in review_results:
try:
status["collection_count"] = self.collection.count()
# 텍스트 추출 및 임베딩 생성
text_for_embedding = extract_text_for_embedding(store_info, reviews)
embedding = self.embedding_model.encode(text_for_embedding)
# 메타데이터 생성
metadata = create_metadata(store_info, food_category, region)
# 문서 ID 생성
document_id = create_store_hash(store_id, store_info.get('name', ''), region)
# 문서 데이터 생성
document_text = combine_store_and_reviews(store_info, reviews)
documents.append(document_text)
embeddings.append(embedding.tolist())
metadatas.append(metadata)
ids.append(document_id)
processed_count += 1
except Exception as e:
status["collection_error"] = str(e)
return status
except Exception as e:
return {
"service": "vector_db",
"status": "error",
"error": str(e),
"timestamp": datetime.now().isoformat()
}
def get_store_context(self, store_id: str) -> Optional[str]:
"""스토어 ID로 컨텍스트 조회"""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 스토어 ID로 검색
results = self.collection.get(
where={"store_id": store_id}
)
if not results or not results.get('documents'):
logger.warning(f"스토어 ID '{store_id}'에 대한 데이터 없음")
return None
# 컨텍스트 생성
documents = results['documents']
metadatas = results.get('metadatas', [])
context_parts = []
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 메타데이터 정보 추가
if metadata:
context_parts.append(f"[{metadata.get('store_name', 'Unknown')}]")
context_parts.append(doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as e:
logger.error(f"스토어 컨텍스트 조회 실패: {e}")
return None
async def build_vector_store(self, store_info: Dict[str, Any], similar_stores_data: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str) -> Dict[str, Any]:
"""Vector Store 구축 (완전 수정된 버전)"""
try:
if not self.is_ready():
# 재시도 한 번 더
if not self.retry_initialization():
raise Exception("Vector DB가 초기화되지 않았습니다")
logger.info(f"Vector Store 구축 시작: {len(similar_stores_data)}개 스토어")
# 통계 초기화
stats = {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 0
}
# 배치 처리용 리스트
all_documents = []
all_embeddings = []
all_metadatas = []
all_ids = []
# 각 스토어 처리
for store_id, store_data, reviews in similar_stores_data:
try:
# 데이터 검증
if not store_data or not reviews:
logger.warning(f"스토어 '{store_id}' 데이터 부족: store_data={bool(store_data)}, reviews={len(reviews) if reviews else 0}")
stats["errors"] += 1
continue
# 올바른 create_store_hash 호출
store_hash = create_store_hash(
store_id=store_id,
store_name=store_data.get('place_name', ''),
region=region
)
# ChromaDB에서 직접 중복 확인 (is_duplicate_store 함수 사용하지 않음)
try:
# 같은 store_id로 이미 저장된 데이터가 있는지 확인
existing_data = self.collection.get(
where={"store_id": store_id},
limit=1
)
if existing_data and len(existing_data.get('ids', [])) > 0:
logger.debug(f"중복 스토어 건너뛰기: {store_id}")
stats["duplicates"] += 1
continue
except Exception as dup_check_error:
# 중복 확인 실패는 로그만 남기고 계속 진행
logger.warning(f"중복 확인 실패 (계속 진행): {dup_check_error}")
# 올바른 extract_text_for_embedding 호출
embedding_text = extract_text_for_embedding(
store_info=store_data,
reviews=reviews
)
# 임베딩 생성
try:
embedding = self.embedding_model.encode(embedding_text)
embedding = embedding.tolist() # numpy array를 list로 변환
except Exception as embed_error:
logger.error(f"임베딩 생성 실패 (store_id: {store_id}): {embed_error}")
stats["errors"] += 1
continue
# 올바른 create_metadata 호출
metadata = create_metadata(
store_info=store_data,
food_category=food_category,
region=region
)
# 배치에 추가
all_documents.append(embedding_text)
all_embeddings.append(embedding)
all_metadatas.append(metadata)
all_ids.append(f"{store_id}_{store_hash}")
stats["total_processed"] += 1
stats["newly_added"] += 1
if stats["total_processed"] % 10 == 0:
logger.info(f"처리 진행률: {stats['total_processed']}/{len(similar_stores_data)}")
except Exception as store_error:
logger.error(f"음식점 처리 중 오류 (store_id: {store_id}): {store_error}")
stats["errors"] += 1
logger.warning(f"⚠️ 가게 {store_id} 처리 실패: {e}")
continue
# 배치로 벡터 저장
if all_documents:
logger.info(f"벡터 배치 저장 시작: {len(all_documents)}")
# Vector DB에 저장
if documents:
self.collection.add(
documents=documents,
embeddings=embeddings,
metadatas=metadatas,
ids=ids
)
try:
self.collection.add(
documents=all_documents,
embeddings=all_embeddings,
metadatas=all_metadatas,
ids=all_ids
)
logger.info("✅ 벡터 배치 저장 성공")
except Exception as save_error:
logger.error(f"❌ 벡터 저장 실패: {save_error}")
return {
"success": False,
"error": f"벡터 저장 실패: {str(save_error)}",
"statistics": stats
}
logger.info(f"✅ Vector Store 구축 완료: {processed_count}개 문서 저장")
return {
'success': True,
'processed_count': processed_count,
'message': f"{processed_count}개 문서가 Vector DB에 저장되었습니다"
}
else:
logger.warning("⚠️ 저장할 벡터 데이터가 없음")
# 최종 통계
try:
total_vectors = self.collection.count()
logger.info(f"✅ Vector Store 구축 완료: 총 {total_vectors}개 벡터")
except Exception as count_error:
logger.warning(f"벡터 개수 확인 실패: {count_error}")
total_vectors = len(all_documents)
return {
'success': False,
'error': '저장할 문서가 없습니다',
'processed_count': 0
}
except Exception as e:
logger.error(f"❌ Vector Store 구축 실패: {e}")
return {
"success": True,
"message": "Vector Store 구축 완료",
"statistics": stats,
"total_vectors": total_vectors,
"store_info": store_info
'success': False,
'error': str(e),
'processed_count': 0
}
def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[str]:
"""유사 케이스 검색 (개선된 버전)"""
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
try:
# 검색 쿼리 생성
query_text = f"가게 ID: {store_id} 요청사항: {context}"
query_embedding = self.embedding_model.encode(query_text)
# 유사도 검색
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=limit,
include=['documents', 'metadatas', 'distances']
)
if results['documents'] and results['documents'][0]:
# 검색 결과 요약
context_parts = []
for i, (doc, metadata, distance) in enumerate(zip(
results['documents'][0],
results['metadatas'][0],
results['distances'][0]
)):
store_name = metadata.get('store_name', 'Unknown')
category = metadata.get('food_category', 'Unknown')
context_parts.append(
f"유사 가게 {i+1}: {store_name} ({category}) - 유사도: {1-distance:.3f}"
)
return "\n".join(context_parts)
return None
except Exception as e:
logger.error(f"Vector Store 구축 전체 실패: {e}")
return {
"success": False,
"error": str(e),
"statistics": stats if 'stats' in locals() else {
"total_processed": 0,
"newly_added": 0,
"updated": 0,
"duplicates": 0,
"errors": 1
}
}
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def get_db_status(self) -> Dict[str, Any]:
"""Vector DB 상태 정보 반환합니다."""
"""DB 상태 정보 반환"""
try:
if not self.is_ready():
return {
@@ -630,44 +267,23 @@ class VectorService:
'total_stores': 0,
'db_path': self.db_path,
'status': 'not_ready',
'initialization_error': self.initialization_error
'error': self.initialization_error
}
# 문서 개수 확인
try:
total_documents = self.collection.count()
except Exception as e:
logger.warning(f"문서 개수 확인 실패: {e}")
total_documents = 0
# 고유 가게 수 확인 (store_id 기준)
try:
# 모든 메타데이터에서 고유 store_id 추출
all_metadata = self.collection.get()
store_ids = set()
if all_metadata.get('metadatas'):
for metadata in all_metadata['metadatas']:
store_id = metadata.get('store_id')
if store_id:
store_ids.add(store_id)
total_stores = len(store_ids)
except Exception as e:
logger.warning(f"가게 수 확인 실패: {e}")
total_stores = 0
# 컬렉션 정보 조회
count = self.collection.count()
return {
'collection_name': self.collection_name,
'total_documents': total_documents,
'total_stores': total_stores,
'total_documents': count,
'total_stores': count, # 각 문서가 하나의 가게를 나타냄
'db_path': self.db_path,
'status': 'ready'
'status': 'ready',
'last_updated': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"DB 상태 확인 실패: {e}")
logger.error(f"DB 상태 조회 실패: {e}")
return {
'collection_name': self.collection_name,
'total_documents': 0,
@@ -675,148 +291,4 @@ class VectorService:
'db_path': self.db_path,
'status': 'error',
'error': str(e)
}
def search_similar_cases(self, store_id: str, context: str) -> Optional[str]:
"""유사한 케이스를 검색합니다."""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 컨텍스트 기반 유사 검색
try:
# 검색 쿼리를 임베딩으로 변환
query_embedding = self.embedding_model.encode(context)
query_embedding = query_embedding.tolist()
# 유사한 문서 검색 (상위 5개)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=['documents', 'metadatas']
)
if not results or not results.get('documents') or not results['documents'][0]:
logger.info("유사 케이스를 찾을 수 없음")
return None
# 컨텍스트 조합
context_parts = []
documents = results['documents'][0]
metadatas = results.get('metadatas', [[]])[0]
for i, doc in enumerate(documents):
metadata = metadatas[i] if i < len(metadatas) else {}
# 가게 정보 추가
store_name = metadata.get('store_name', 'Unknown')
food_category = metadata.get('food_category', 'Unknown')
context_parts.append(f"[{food_category} - {store_name}]")
context_parts.append(doc[:500] + "..." if len(doc) > 500 else doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as search_error:
logger.warning(f"벡터 검색 실패: {search_error}")
return None
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def search_similar_cases_improved(self, store_id: str, context: str) -> Optional[str]:
"""
개선된 유사 케이스 검색
1. store_id 기반 필터링 우선 적용
2. 동종 업체 우선 검색
3. 캐싱 및 성능 최적화
"""
try:
if not self.is_ready():
logger.warning("VectorService가 준비되지 않음")
return None
# 1단계: 해당 가게의 정보 먼저 확인
store_context = self.get_store_context(store_id)
food_category = store_context.get('food_category', '') if store_context else ''
# 2단계: 검색 쿼리 구성 (가게 정보 + 컨텍스트)
enhanced_query = f"{food_category} {context}"
query_embedding = self.embedding_model.encode(enhanced_query).tolist()
# 3단계: 동종 업체 우선 검색 (메타데이터 필터링)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=10, # 더 많은 결과에서 필터링
include=['documents', 'metadatas', 'distances'],
where={"food_category": {"$eq": food_category}} if food_category else None
)
if not results or not results.get('documents') or not results['documents'][0]:
# 4단계: 동종 업체가 없으면 전체 검색
logger.info("동종 업체 없음 - 전체 검색으로 전환")
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=5,
include=['documents', 'metadatas', 'distances']
)
if not results or not results.get('documents') or not results['documents'][0]:
logger.info("유사 케이스를 찾을 수 없음")
return None
# 5단계: 결과 조합 (관련성 높은 순서로)
context_parts = []
documents = results['documents'][0]
metadatas = results.get('metadatas', [[]])[0]
distances = results.get('distances', [[]])[0]
# 거리(유사도) 기준으로 필터링 (너무 관련성 낮은 것 제외)
filtered_results = []
for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)):
if distance < 0.8: # 유사도 임계값
filtered_results.append((doc, metadata, distance))
if not filtered_results:
return None
# 최대 3개의 가장 관련성 높은 케이스만 사용
for doc, metadata, distance in filtered_results[:3]:
store_name = metadata.get('store_name', 'Unknown')
food_cat = metadata.get('food_category', 'Unknown')
context_parts.append(f"[{food_cat} - {store_name}] (유사도: {1-distance:.2f})")
# 문서 길이 제한으로 토큰 수 최적화
context_parts.append(doc[:300] + "..." if len(doc) > 300 else doc)
context_parts.append("---")
return "\n".join(context_parts)
except Exception as e:
logger.error(f"유사 케이스 검색 실패: {e}")
return None
def get_store_context(self, store_id: str) -> Optional[Dict[str, Any]]:
"""해당 가게의 컨텍스트 정보 조회 (캐싱 적용)"""
try:
if not self.is_ready():
return None
# 메타데이터에서 해당 store_id 검색
results = self.collection.get(
where={"store_id": {"$eq": store_id}},
limit=1,
include=['metadatas']
)
if results and results.get('metadatas') and len(results['metadatas']) > 0:
return results['metadatas'][0]
return None
except Exception as e:
logger.error(f"가게 컨텍스트 조회 실패: {e}")
return None
}