# vector/app/services/review_service.py import aiohttp import asyncio import logging from typing import List, Dict, Any, Tuple, Optional from ..config.settings import settings from ..models.review_models import ReviewAnalysisResponse, StoreInfo, ReviewData from ..models.restaurant_models import RestaurantInfo logger = logging.getLogger(__name__) class ReviewService: """리뷰 API 연동 서비스 (본인 가게 우선 처리 강화)""" def __init__(self): self.base_url = settings.get_review_api_url() self.timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT) async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]: """ 단일 가게의 리뷰를 수집합니다. (본인 가게용 강화 처리) Args: store_id: 카카오맵 가게 ID max_reviews: 최대 수집할 리뷰 수 Returns: (가게 정보, 리뷰 목록) 튜플 """ try: logger.info(f"🏪 가게 리뷰 수집 시작: store_id={store_id} (최대 {max_reviews}개)") # 본인 가게는 더 관대한 타임아웃 설정 timeout = aiohttp.ClientTimeout(total=900) # 15분 async with aiohttp.ClientSession(timeout=timeout) as session: url = f"{self.base_url}/analyze" payload = { "store_id": store_id, "days_limit": None, # 모든 날짜의 리뷰 수집 "max_time": min(600, max_reviews * 3) # 리뷰 수에 따라 시간 조정, 최대 10분 } logger.info(f"Review API 호출: {url} (타임아웃: {payload['max_time']}초)") async with session.post(url, json=payload) as response: if response.status == 200: data = await response.json() if data.get('success', False): store_info = data.get('store_info') reviews = data.get('reviews', []) logger.info(f"📊 원본 리뷰 수집: {len(reviews)}개") # 리뷰 품질 필터링 filtered_reviews = self._filter_quality_reviews(reviews) logger.info(f"📊 품질 필터링 후: {len(filtered_reviews)}개") # 리뷰 개수 제한 if len(filtered_reviews) > max_reviews: filtered_reviews = filtered_reviews[:max_reviews] logger.info(f"📊 최종 리뷰 수: {len(filtered_reviews)}개 (제한 적용)") # 가게 정보와 리뷰 변환 converted_store_info = self._convert_store_info(store_info) converted_reviews = self._convert_reviews(filtered_reviews) if converted_store_info and converted_reviews: logger.info(f"✅ 리뷰 수집 성공: {len(converted_reviews)}개") return converted_store_info, converted_reviews else: logger.warning("⚠️ 변환된 데이터가 비어있음") return None, [] else: error_msg = data.get('message', 'Unknown error') logger.error(f"❌ 리뷰 분석 실패: {error_msg}") return None, [] else: logger.error(f"❌ Review API 호출 실패: HTTP {response.status}") error_text = await response.text() logger.error(f"Error response: {error_text}") return None, [] except asyncio.TimeoutError: logger.error("❌ Review API 호출 타임아웃") return None, [] except Exception as e: logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}") return None, [] def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """리뷰 품질 필터링""" try: filtered = [] for review in reviews: content = review.get('content', '').strip() rating = review.get('rating', 0) # 품질 기준 if (len(content) >= 10 and # 최소 10자 이상 rating > 0 and # 별점이 있어야 함 not self._is_spam_review(content)): # 스팸 제외 filtered.append(review) logger.debug(f"품질 필터링: {len(reviews)} → {len(filtered)}") return filtered except Exception as e: logger.warning(f"⚠️ 리뷰 필터링 실패: {e}") return reviews # 실패 시 원본 반환 def _is_spam_review(self, content: str) -> bool: """스팸 리뷰 판별""" try: spam_keywords = [ "추천추천", "최고최고", "맛있어요맛있어요", "좋아요좋아요", "ㅎㅎㅎㅎ", "ㅋㅋㅋㅋ", "굿굿굿", "Nice", "Good" ] content_lower = content.lower() # 스팸 키워드 확인 for keyword in spam_keywords: if keyword.lower() in content_lower: return True # 너무 짧거나 반복 문자 확인 if len(set(content.replace(' ', ''))) < 3: # 고유 문자 3개 미만 return True return False except Exception as e: logger.warning(f"⚠️ 스팸 판별 실패: {e}") return False def _convert_store_info(self, store_info): """가게 정보 변환 (강화된 버전)""" if not store_info: logger.warning("⚠️ 가게 정보가 비어있음") return None try: converted = { 'id': str(store_info.get('id', '')), 'name': str(store_info.get('name', '')), 'category': str(store_info.get('category', '')), 'rating': str(store_info.get('rating', '')), 'review_count': str(store_info.get('review_count', '')), 'status': str(store_info.get('status', '')), 'address': str(store_info.get('address', '')) } # 필수 필드 확인 (ID와 이름은 반드시 있어야 함) if not converted['id'] or not converted['name']: logger.warning(f"⚠️ 필수 가게 정보 누락: ID={converted['id']}, Name={converted['name']}") return None logger.debug(f"가게 정보 변환 성공: {converted['name']}") return converted except Exception as e: logger.error(f"❌ 가게 정보 변환 실패: {e}") return None def _convert_reviews(self, reviews): """리뷰 목록 변환 (강화된 버전)""" if not reviews: logger.warning("⚠️ 리뷰 목록이 비어있음") return [] converted_reviews = [] for i, review in enumerate(reviews): try: # 안전한 형변환 rating = 0 try: rating = int(review.get('rating', 0)) except (ValueError, TypeError): rating = 0 likes = 0 try: likes = int(review.get('likes', 0)) except (ValueError, TypeError): likes = 0 photo_count = 0 try: photo_count = int(review.get('photo_count', 0)) except (ValueError, TypeError): photo_count = 0 converted_review = { 'reviewer_name': str(review.get('reviewer_name', 'Anonymous')), 'rating': rating, 'date': str(review.get('date', '')), 'content': str(review.get('content', '')).strip(), 'badges': list(review.get('badges', [])), 'likes': likes, 'photo_count': photo_count, 'has_photos': bool(photo_count > 0) } # 기본 검증 if converted_review['content'] and converted_review['rating'] > 0: converted_reviews.append(converted_review) else: logger.debug(f"⚠️ 리뷰 {i+1} 품질 미달로 제외") except Exception as e: logger.warning(f"⚠️ 리뷰 {i+1} 변환 실패: {e}") continue logger.info(f"리뷰 변환 완료: {len(reviews)} → {len(converted_reviews)}") return converted_reviews async def collect_multiple_stores_reviews(self, restaurants: List[RestaurantInfo]) -> List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]]: """ 여러 가게의 리뷰를 수집합니다. Args: restaurants: 음식점 목록 Returns: (store_id, 가게 정보, 리뷰 목록) 튜플의 리스트 """ try: logger.info(f"다중 가게 리뷰 수집 시작: {len(restaurants)}개 가게") results = [] # 동시성 제한을 위해 세마포어 사용 semaphore = asyncio.Semaphore(3) # 최대 3개 동시 요청 async def collect_single_store(restaurant: RestaurantInfo): async with semaphore: try: # 카카오맵 place_url에서 store_id 추출 시도 store_id = self._extract_store_id_from_url(restaurant.place_url) if not store_id: # URL에서 추출 실패 시 restaurant.id 사용 store_id = restaurant.id if not store_id: logger.warning(f"Store ID를 찾을 수 없음: {restaurant.place_name}") return None logger.info(f"가게 {restaurant.place_name} 리뷰 수집 중...") # 리뷰 수집 (최대 50개로 제한) store_info, reviews = await self.collect_store_reviews( store_id, max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT ) if store_info and reviews: return (store_id, store_info, reviews) else: logger.warning(f"가게 {restaurant.place_name}의 리뷰 수집 실패") return None except Exception as e: logger.error(f"가게 {restaurant.place_name} 리뷰 수집 중 오류: {e}") return None finally: # API 요청 제한을 위한 지연 await asyncio.sleep(settings.REQUEST_DELAY) # 병렬 처리 tasks = [collect_single_store(restaurant) for restaurant in restaurants] results_raw = await asyncio.gather(*tasks, return_exceptions=True) # 성공한 결과만 필터링 for result in results_raw: if result and not isinstance(result, Exception): results.append(result) logger.info(f"다중 가게 리뷰 수집 완료: {len(results)}개 성공") return results except Exception as e: logger.error(f"다중 가게 리뷰 수집 중 오류: {str(e)}") return [] def _extract_store_id_from_url(self, place_url: str) -> Optional[str]: """ 카카오맵 URL에서 store_id를 추출합니다. Args: place_url: 카카오맵 장소 URL Returns: 추출된 store_id 또는 None """ try: if not place_url: return None # URL 패턴: https://place.map.kakao.com/123456789 import re pattern = r'/(\d+)(?:\?|$|#)' match = re.search(pattern, place_url) if match: store_id = match.group(1) logger.debug(f"URL에서 store_id 추출: {store_id}") return store_id else: logger.debug(f"URL에서 store_id 추출 실패: {place_url}") return None except Exception as e: logger.warning(f"store_id 추출 중 오류: {e}") return None def _build_store_info_from_restaurant(self, restaurant: RestaurantInfo) -> Dict[str, Any]: """ RestaurantInfo를 store_info 형식으로 변환합니다. Args: restaurant: RestaurantInfo 객체 Returns: store_info 딕셔너리 """ try: return { 'id': restaurant.id, 'name': restaurant.place_name, 'category': restaurant.category_name, 'rating': '', # API에서 제공되지 않음 'review_count': '', # API에서 제공되지 않음 'status': '', # API에서 제공되지 않음 'address': restaurant.address_name } except Exception as e: logger.error(f"RestaurantInfo 변환 실패: {e}") return { 'id': '', 'name': '', 'category': '', 'rating': '', 'review_count': '', 'status': '', 'address': '' } def _convert_single_review_data(self, review_data: dict) -> dict: """ 단일 리뷰 데이터 변환 Args: review_data: API 응답의 단일 리뷰 데이터 Returns: 변환된 리뷰 데이터 """ try: return { 'reviewer_name': review_data.get('reviewer_name', ''), 'reviewer_level': review_data.get('reviewer_level', ''), 'reviewer_stats': review_data.get('reviewer_stats', {}), 'rating': int(review_data.get('rating', 0)), 'date': review_data.get('date', ''), 'content': review_data.get('content', ''), 'badges': review_data.get('badges', []), 'likes': int(review_data.get('likes', 0)), 'photo_count': int(review_data.get('photo_count', 0)), 'has_photos': bool(review_data.get('has_photos', False)) } except Exception as e: logger.warning(f"단일 리뷰 데이터 변환 실패: {e}") return { 'reviewer_name': 'Unknown', 'reviewer_level': '', 'reviewer_stats': {}, 'rating': 0, 'date': '', 'content': '', 'badges': [], 'likes': 0, 'photo_count': 0, 'has_photos': False } async def health_check(self) -> bool: """ Review API 상태를 확인합니다. Returns: API 상태 (True: 정상, False: 비정상) """ try: async with aiohttp.ClientSession(timeout=self.timeout) as session: url = f"{self.base_url}/health" async with session.get(url) as response: is_healthy = response.status == 200 if is_healthy: logger.debug("Review API 헬스체크 성공") else: logger.warning(f"Review API 헬스체크 실패: HTTP {response.status}") return is_healthy except Exception as e: logger.error(f"Review API 헬스체크 실패: {e}") return False def get_api_info(self) -> Dict[str, Any]: """ Review API 정보를 반환합니다. Returns: API 정보 딕셔너리 """ return { "service_name": "Review API Service", "base_url": self.base_url, "timeout": self.timeout.total, "max_reviews_per_restaurant": settings.MAX_REVIEWS_PER_RESTAURANT, "request_delay": settings.REQUEST_DELAY } async def test_connection(self) -> Dict[str, Any]: """ Review API 연결을 테스트합니다. Returns: 테스트 결과 """ test_result = { "service": "Review API", "base_url": self.base_url, "status": "unknown", "response_time": None, "error": None } try: import time start_time = time.time() is_healthy = await self.health_check() response_time = time.time() - start_time test_result["response_time"] = round(response_time, 3) if is_healthy: test_result["status"] = "healthy" logger.info(f"Review API 연결 테스트 성공: {response_time:.3f}초") else: test_result["status"] = "unhealthy" test_result["error"] = "Health check failed" logger.warning("Review API 연결 테스트 실패: 헬스체크 실패") except Exception as e: test_result["status"] = "error" test_result["error"] = str(e) logger.error(f"Review API 연결 테스트 오류: {e}") return test_result