ai-review/vector/app/services/review_service.py
2025-06-15 13:52:26 +00:00

468 lines
19 KiB
Python

# vector/app/services/review_service.py
import aiohttp
import asyncio
import logging
from typing import List, Dict, Any, Tuple, Optional
from ..config.settings import settings
from ..models.review_models import ReviewAnalysisResponse, StoreInfo, ReviewData
from ..models.restaurant_models import RestaurantInfo
logger = logging.getLogger(__name__)
class ReviewService:
"""리뷰 API 연동 서비스 (본인 가게 우선 처리 강화)"""
def __init__(self):
self.base_url = settings.get_review_api_url()
self.timeout = aiohttp.ClientTimeout(total=settings.REQUEST_TIMEOUT)
async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]:
"""
단일 가게의 리뷰를 수집합니다. (본인 가게용 강화 처리)
Args:
store_id: 카카오맵 가게 ID
max_reviews: 최대 수집할 리뷰 수
Returns:
(가게 정보, 리뷰 목록) 튜플
"""
try:
logger.info(f"🏪 가게 리뷰 수집 시작: store_id={store_id} (최대 {max_reviews}개)")
# 본인 가게는 더 관대한 타임아웃 설정
timeout = aiohttp.ClientTimeout(total=900) # 15분
async with aiohttp.ClientSession(timeout=timeout) as session:
url = f"{self.base_url}/analyze"
payload = {
"store_id": store_id,
"days_limit": None, # 모든 날짜의 리뷰 수집
"max_time": min(600, max_reviews * 3) # 리뷰 수에 따라 시간 조정, 최대 10분
}
logger.info(f"Review API 호출: {url} (타임아웃: {payload['max_time']}초)")
async with session.post(url, json=payload) as response:
if response.status == 200:
data = await response.json()
if data.get('success', False):
store_info = data.get('store_info')
reviews = data.get('reviews', [])
logger.info(f"📊 원본 리뷰 수집: {len(reviews)}")
# 리뷰 품질 필터링
filtered_reviews = self._filter_quality_reviews(reviews)
logger.info(f"📊 품질 필터링 후: {len(filtered_reviews)}")
# 리뷰 개수 제한
if len(filtered_reviews) > max_reviews:
filtered_reviews = filtered_reviews[:max_reviews]
logger.info(f"📊 최종 리뷰 수: {len(filtered_reviews)}개 (제한 적용)")
# 가게 정보와 리뷰 변환
converted_store_info = self._convert_store_info(store_info)
converted_reviews = self._convert_reviews(filtered_reviews)
if converted_store_info and converted_reviews:
logger.info(f"✅ 리뷰 수집 성공: {len(converted_reviews)}")
return converted_store_info, converted_reviews
else:
logger.warning("⚠️ 변환된 데이터가 비어있음")
return None, []
else:
error_msg = data.get('message', 'Unknown error')
logger.error(f"❌ 리뷰 분석 실패: {error_msg}")
return None, []
else:
logger.error(f"❌ Review API 호출 실패: HTTP {response.status}")
error_text = await response.text()
logger.error(f"Error response: {error_text}")
return None, []
except asyncio.TimeoutError:
logger.error("❌ Review API 호출 타임아웃")
return None, []
except Exception as e:
logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}")
return None, []
def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""리뷰 품질 필터링"""
try:
filtered = []
for review in reviews:
content = review.get('content', '').strip()
rating = review.get('rating', 0)
# 품질 기준
if (len(content) >= 10 and # 최소 10자 이상
rating > 0 and # 별점이 있어야 함
not self._is_spam_review(content)): # 스팸 제외
filtered.append(review)
logger.debug(f"품질 필터링: {len(reviews)}{len(filtered)}")
return filtered
except Exception as e:
logger.warning(f"⚠️ 리뷰 필터링 실패: {e}")
return reviews # 실패 시 원본 반환
def _is_spam_review(self, content: str) -> bool:
"""스팸 리뷰 판별"""
try:
spam_keywords = [
"추천추천", "최고최고", "맛있어요맛있어요",
"좋아요좋아요", "ㅎㅎㅎㅎ", "ㅋㅋㅋㅋ",
"굿굿굿", "Nice", "Good"
]
content_lower = content.lower()
# 스팸 키워드 확인
for keyword in spam_keywords:
if keyword.lower() in content_lower:
return True
# 너무 짧거나 반복 문자 확인
if len(set(content.replace(' ', ''))) < 3: # 고유 문자 3개 미만
return True
return False
except Exception as e:
logger.warning(f"⚠️ 스팸 판별 실패: {e}")
return False
def _convert_store_info(self, store_info):
"""가게 정보 변환 (강화된 버전)"""
if not store_info:
logger.warning("⚠️ 가게 정보가 비어있음")
return None
try:
converted = {
'id': str(store_info.get('id', '')),
'name': str(store_info.get('name', '')),
'category': str(store_info.get('category', '')),
'rating': str(store_info.get('rating', '')),
'review_count': str(store_info.get('review_count', '')),
'status': str(store_info.get('status', '')),
'address': str(store_info.get('address', ''))
}
# 필수 필드 확인 (ID와 이름은 반드시 있어야 함)
if not converted['id'] or not converted['name']:
logger.warning(f"⚠️ 필수 가게 정보 누락: ID={converted['id']}, Name={converted['name']}")
return None
logger.debug(f"가게 정보 변환 성공: {converted['name']}")
return converted
except Exception as e:
logger.error(f"❌ 가게 정보 변환 실패: {e}")
return None
def _convert_reviews(self, reviews):
"""리뷰 목록 변환 (강화된 버전)"""
if not reviews:
logger.warning("⚠️ 리뷰 목록이 비어있음")
return []
converted_reviews = []
for i, review in enumerate(reviews):
try:
# 안전한 형변환
rating = 0
try:
rating = int(review.get('rating', 0))
except (ValueError, TypeError):
rating = 0
likes = 0
try:
likes = int(review.get('likes', 0))
except (ValueError, TypeError):
likes = 0
photo_count = 0
try:
photo_count = int(review.get('photo_count', 0))
except (ValueError, TypeError):
photo_count = 0
converted_review = {
'reviewer_name': str(review.get('reviewer_name', 'Anonymous')),
'rating': rating,
'date': str(review.get('date', '')),
'content': str(review.get('content', '')).strip(),
'badges': list(review.get('badges', [])),
'likes': likes,
'photo_count': photo_count,
'has_photos': bool(photo_count > 0)
}
# 기본 검증
if converted_review['content'] and converted_review['rating'] > 0:
converted_reviews.append(converted_review)
else:
logger.debug(f"⚠️ 리뷰 {i+1} 품질 미달로 제외")
except Exception as e:
logger.warning(f"⚠️ 리뷰 {i+1} 변환 실패: {e}")
continue
logger.info(f"리뷰 변환 완료: {len(reviews)}{len(converted_reviews)}")
return converted_reviews
async def collect_multiple_stores_reviews(self, restaurants: List[RestaurantInfo]) -> List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]]:
"""
여러 가게의 리뷰를 수집합니다.
Args:
restaurants: 음식점 목록
Returns:
(store_id, 가게 정보, 리뷰 목록) 튜플의 리스트
"""
try:
logger.info(f"다중 가게 리뷰 수집 시작: {len(restaurants)}개 가게")
results = []
# 동시성 제한을 위해 세마포어 사용
semaphore = asyncio.Semaphore(3) # 최대 3개 동시 요청
async def collect_single_store(restaurant: RestaurantInfo):
async with semaphore:
try:
# 카카오맵 place_url에서 store_id 추출 시도
store_id = self._extract_store_id_from_url(restaurant.place_url)
if not store_id:
# URL에서 추출 실패 시 restaurant.id 사용
store_id = restaurant.id
if not store_id:
logger.warning(f"Store ID를 찾을 수 없음: {restaurant.place_name}")
return None
logger.info(f"가게 {restaurant.place_name} 리뷰 수집 중...")
# 리뷰 수집 (최대 50개로 제한)
store_info, reviews = await self.collect_store_reviews(
store_id,
max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT
)
if store_info and reviews:
return (store_id, store_info, reviews)
else:
logger.warning(f"가게 {restaurant.place_name}의 리뷰 수집 실패")
return None
except Exception as e:
logger.error(f"가게 {restaurant.place_name} 리뷰 수집 중 오류: {e}")
return None
finally:
# API 요청 제한을 위한 지연
await asyncio.sleep(settings.REQUEST_DELAY)
# 병렬 처리
tasks = [collect_single_store(restaurant) for restaurant in restaurants]
results_raw = await asyncio.gather(*tasks, return_exceptions=True)
# 성공한 결과만 필터링
for result in results_raw:
if result and not isinstance(result, Exception):
results.append(result)
logger.info(f"다중 가게 리뷰 수집 완료: {len(results)}개 성공")
return results
except Exception as e:
logger.error(f"다중 가게 리뷰 수집 중 오류: {str(e)}")
return []
def _extract_store_id_from_url(self, place_url: str) -> Optional[str]:
"""
카카오맵 URL에서 store_id를 추출합니다.
Args:
place_url: 카카오맵 장소 URL
Returns:
추출된 store_id 또는 None
"""
try:
if not place_url:
return None
# URL 패턴: https://place.map.kakao.com/123456789
import re
pattern = r'/(\d+)(?:\?|$|#)'
match = re.search(pattern, place_url)
if match:
store_id = match.group(1)
logger.debug(f"URL에서 store_id 추출: {store_id}")
return store_id
else:
logger.debug(f"URL에서 store_id 추출 실패: {place_url}")
return None
except Exception as e:
logger.warning(f"store_id 추출 중 오류: {e}")
return None
def _build_store_info_from_restaurant(self, restaurant: RestaurantInfo) -> Dict[str, Any]:
"""
RestaurantInfo를 store_info 형식으로 변환합니다.
Args:
restaurant: RestaurantInfo 객체
Returns:
store_info 딕셔너리
"""
try:
return {
'id': restaurant.id,
'name': restaurant.place_name,
'category': restaurant.category_name,
'rating': '', # API에서 제공되지 않음
'review_count': '', # API에서 제공되지 않음
'status': '', # API에서 제공되지 않음
'address': restaurant.address_name
}
except Exception as e:
logger.error(f"RestaurantInfo 변환 실패: {e}")
return {
'id': '',
'name': '',
'category': '',
'rating': '',
'review_count': '',
'status': '',
'address': ''
}
def _convert_single_review_data(self, review_data: dict) -> dict:
"""
단일 리뷰 데이터 변환
Args:
review_data: API 응답의 단일 리뷰 데이터
Returns:
변환된 리뷰 데이터
"""
try:
return {
'reviewer_name': review_data.get('reviewer_name', ''),
'reviewer_level': review_data.get('reviewer_level', ''),
'reviewer_stats': review_data.get('reviewer_stats', {}),
'rating': int(review_data.get('rating', 0)),
'date': review_data.get('date', ''),
'content': review_data.get('content', ''),
'badges': review_data.get('badges', []),
'likes': int(review_data.get('likes', 0)),
'photo_count': int(review_data.get('photo_count', 0)),
'has_photos': bool(review_data.get('has_photos', False))
}
except Exception as e:
logger.warning(f"단일 리뷰 데이터 변환 실패: {e}")
return {
'reviewer_name': 'Unknown',
'reviewer_level': '',
'reviewer_stats': {},
'rating': 0,
'date': '',
'content': '',
'badges': [],
'likes': 0,
'photo_count': 0,
'has_photos': False
}
async def health_check(self) -> bool:
"""
Review API 상태를 확인합니다.
Returns:
API 상태 (True: 정상, False: 비정상)
"""
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
url = f"{self.base_url}/health"
async with session.get(url) as response:
is_healthy = response.status == 200
if is_healthy:
logger.debug("Review API 헬스체크 성공")
else:
logger.warning(f"Review API 헬스체크 실패: HTTP {response.status}")
return is_healthy
except Exception as e:
logger.error(f"Review API 헬스체크 실패: {e}")
return False
def get_api_info(self) -> Dict[str, Any]:
"""
Review API 정보를 반환합니다.
Returns:
API 정보 딕셔너리
"""
return {
"service_name": "Review API Service",
"base_url": self.base_url,
"timeout": self.timeout.total,
"max_reviews_per_restaurant": settings.MAX_REVIEWS_PER_RESTAURANT,
"request_delay": settings.REQUEST_DELAY
}
async def test_connection(self) -> Dict[str, Any]:
"""
Review API 연결을 테스트합니다.
Returns:
테스트 결과
"""
test_result = {
"service": "Review API",
"base_url": self.base_url,
"status": "unknown",
"response_time": None,
"error": None
}
try:
import time
start_time = time.time()
is_healthy = await self.health_check()
response_time = time.time() - start_time
test_result["response_time"] = round(response_time, 3)
if is_healthy:
test_result["status"] = "healthy"
logger.info(f"Review API 연결 테스트 성공: {response_time:.3f}")
else:
test_result["status"] = "unhealthy"
test_result["error"] = "Health check failed"
logger.warning("Review API 연결 테스트 실패: 헬스체크 실패")
except Exception as e:
test_result["status"] = "error"
test_result["error"] = str(e)
logger.error(f"Review API 연결 테스트 오류: {e}")
return test_result