# app/services/review_service.py import aiohttp import asyncio import logging from typing import List, Dict, Any, Tuple, Optional from ..config.settings import settings from ..models.review_models import ReviewAnalysisResponse, StoreInfo, ReviewData from ..models.restaurant_models import RestaurantInfo logger = logging.getLogger(__name__) class ReviewService: """리뷰 API 연동 서비스""" def __init__(self): self.base_url = settings.get_review_api_url() self.timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT) async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]: """ 단일 가게의 리뷰를 수집합니다. Args: store_id: 카카오맵 가게 ID max_reviews: 최대 수집할 리뷰 수 Returns: (가게 정보, 리뷰 목록) 튜플 """ try: logger.info(f"🏪 가게 리뷰 수집 시작: store_id={store_id} (최대 {max_reviews}개)") # 본인 가게는 더 관대한 타임아웃 설정 timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT) async with aiohttp.ClientSession(timeout=timeout) as session: url = f"{self.base_url}/analyze" payload = { "store_id": store_id, "days_limit": None, # 모든 날짜의 리뷰 수집 "max_time": settings.REQUEST_TIMEOUT } logger.info(f"Review API 호출: {url} (타임아웃: {payload['max_time']}초)") async with session.post(url, json=payload) as response: if response.status == 200: data = await response.json() if data.get('success', False): store_info = data.get('store_info') reviews = data.get('reviews', []) logger.info(f"📊 원본 리뷰 수집: {len(reviews)}개") # 리뷰 품질 필터링 filtered_reviews = self._filter_quality_reviews(reviews) logger.info(f"📊 품질 필터링 후: {len(filtered_reviews)}개") # 리뷰 개수 제한 if len(filtered_reviews) > max_reviews: filtered_reviews = filtered_reviews[:max_reviews] logger.info(f"📊 최종 리뷰 수: {len(filtered_reviews)}개 (제한 적용)") # 가게 정보와 리뷰 변환 converted_store_info = self._convert_store_info(store_info) converted_reviews = self._convert_reviews(filtered_reviews) return converted_store_info, converted_reviews else: logger.warning(f"⚠️ 리뷰 수집 실패: {data.get('message', 'Unknown error')}") return None, [] else: logger.error(f"❌ Review API 호출 실패: HTTP {response.status}") error_text = await response.text() logger.error(f"Error response: {error_text}") return None, [] except asyncio.TimeoutError: logger.error("⏰ Review API 호출 타임아웃") return None, [] except Exception as e: logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}") return None, [] def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 리뷰 품질을 기준으로 필터링합니다. Args: reviews: 원본 리뷰 목록 Returns: 필터링된 리뷰 목록 """ filtered_reviews = [] for review in reviews: content = review.get('content', '').strip() # 품질 기준 if ( len(content) >= 10 and # 최소 10자 이상 not self._is_spam_content(content) and # 스팸 내용이 아님 review.get('rating', 0) > 0 # 별점이 있음 ): filtered_reviews.append(review) return filtered_reviews def _is_spam_content(self, content: str) -> bool: """ 스팸 또는 의미없는 리뷰인지 판단합니다. Args: content: 리뷰 내용 Returns: 스팸 여부 """ spam_patterns = [ 'ㅋㅋㅋㅋㅋ', 'ㅎㅎㅎㅎㅎ', '!!!!!', '.....', '???', '^^', '굿굿굿', '최고최고' ] content_lower = content.lower() # 스팸 패턴 체크 for pattern in spam_patterns: if pattern in content_lower: return True # 너무 반복적인 문자 체크 if len(set(content)) < 3: # 고유 문자가 3개 미만 return True return False def _convert_store_info(self, store_info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """ API 응답의 가게 정보를 내부 형식으로 변환합니다. Args: store_info: API 응답 가게 정보 Returns: 변환된 가게 정보 """ if not store_info: return None return { 'id': store_info.get('id', ''), 'name': store_info.get('name', ''), 'category': store_info.get('category', ''), 'rating': store_info.get('rating', ''), 'review_count': store_info.get('review_count', ''), 'status': store_info.get('status', ''), 'address': store_info.get('address', '') } def _convert_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ API 응답의 리뷰를 내부 형식으로 변환합니다. Args: reviews: API 응답 리뷰 목록 Returns: 변환된 리뷰 목록 """ converted_reviews = [] for review in reviews: converted_review = { 'reviewer_name': review.get('reviewer_name', ''), 'reviewer_level': review.get('reviewer_level', ''), 'reviewer_stats': review.get('reviewer_stats', {}), 'rating': review.get('rating', 0), 'date': review.get('date', ''), 'content': review.get('content', ''), 'badges': review.get('badges', []), 'likes': review.get('likes', 0), 'photo_count': review.get('photo_count', 0), 'has_photos': review.get('has_photos', False) } converted_reviews.append(converted_review) return converted_reviews async def health_check(self) -> bool: """ Review API 상태를 확인합니다. Returns: API 상태 (True: 정상, False: 비정상) """ try: async with aiohttp.ClientSession(timeout=self.timeout) as session: url = f"{self.base_url}/health" async with session.get(url) as response: return response.status == 200 except Exception as e: logger.error(f"Review API 헬스체크 실패: {e}") return False