diff --git a/vector/app/config/settings.py b/vector/app/config/settings.py index 1a07d34..0147111 100644 --- a/vector/app/config/settings.py +++ b/vector/app/config/settings.py @@ -1,6 +1,5 @@ # app/config/settings.py import os -from typing import Optional class Settings: """환경 변수 기반 설정 클래스""" @@ -19,18 +18,10 @@ class Settings: RESTAURANT_API_HOST = os.getenv("RESTAURANT_API_HOST", "0.0.0.0") RESTAURANT_API_PORT = os.getenv("RESTAURANT_API_PORT", "18000") - @property - def RESTAURANT_API_URL(self) -> str: - return f"http://{self.RESTAURANT_API_HOST}:{self.RESTAURANT_API_PORT}" - # Review API 설정 REVIEW_API_HOST = os.getenv("REVIEW_API_HOST", "0.0.0.0") REVIEW_API_PORT = os.getenv("REVIEW_API_PORT", "19000") - @property - def REVIEW_API_URL(self) -> str: - return f"http://{self.REVIEW_API_HOST}:{self.REVIEW_API_PORT}" - # Claude API 설정 CLAUDE_API_KEY = os.getenv("CLAUDE_API_KEY", "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") CLAUDE_MODEL = os.getenv("CLAUDE_MODEL", "claude-sonnet-4-20250514") @@ -77,4 +68,4 @@ class Settings: return f"http://{host}:{port}" # 설정 인스턴스 -settings = Settings() +settings = Settings() \ No newline at end of file diff --git a/vector/app/main.py b/vector/app/main.py index 2562feb..8476d7e 100644 --- a/vector/app/main.py +++ b/vector/app/main.py @@ -17,7 +17,6 @@ def is_kubernetes_env(): if not is_kubernetes_env(): try: sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - from dotenv import load_dotenv load_dotenv() print("✅ 로컬 개발환경: .env 파일 로딩") @@ -31,8 +30,7 @@ from contextlib import asynccontextmanager from datetime import datetime from typing import Optional import asyncio -import fastapi -from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends +from fastapi import FastAPI, HTTPException, Depends from fastapi.responses import HTMLResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field @@ -43,6 +41,7 @@ from app.models.restaurant_models import RestaurantSearchRequest, ErrorResponse from app.models.vector_models import ( VectorBuildRequest, VectorBuildResponse, ActionRecommendationRequest, ActionRecommendationResponse, + ActionRecommendationSimpleResponse, VectorDBStatusResponse, VectorDBStatus ) from app.services.restaurant_service import RestaurantService @@ -306,80 +305,66 @@ async def root(): {settings.APP_TITLE} -

🍽️ {settings.APP_TITLE}

-

{settings.APP_DESCRIPTION}

- -
-

📊 Vector DB 상태

- - {f''' -
-

⚠️ 초기화 실패 서비스

+
+

🍽️ {settings.APP_TITLE}

+

{settings.APP_DESCRIPTION}

+ +
+

📊 Vector DB 상태

    - {"".join([f"
  • {k}: {v}
  • " for k, v in app_state["initialization_errors"].items()])} +
  • 컬렉션: {db_status.get('collection_name', 'N/A')}
  • +
  • 총 문서: {db_status.get('total_documents', 0)}
  • +
  • 총 가게: {db_status.get('total_stores', 0)}
  • +
  • 상태: {db_status.get('status', 'unknown')}
- ''' if app_state["initialization_errors"] else ''} -
- -
-

🔧 시스템 구성

-
    -
  • Claude Model: {settings.CLAUDE_MODEL}
  • -
  • Embedding Model: {settings.EMBEDDING_MODEL}
  • -
  • Vector DB Path: {settings.VECTOR_DB_PATH}
  • -
  • 환경: {'Kubernetes' if hasattr(settings, 'IS_K8S_ENV') and settings.IS_K8S_ENV else 'Local'}
  • -
-
- -

📚 API 문서

- Swagger UI 문서 - ReDoc 문서 - 헬스 체크 - Vector DB 상태 - -

🛠️ 사용 방법

-

POST /find-reviews - 리뷰 검색 및 Vector DB 저장 (본인 가게 우선)

-
+                
+                
+

🔧 시스템 구성

+
    +
  • Claude Model: {settings.CLAUDE_MODEL}
  • +
  • Embedding Model: {settings.EMBEDDING_MODEL}
  • +
  • Vector DB Path: {settings.VECTOR_DB_PATH}
  • +
  • 환경: {'Kubernetes' if hasattr(settings, 'IS_K8S_ENV') and settings.IS_K8S_ENV else 'Local'}
  • +
+
+ +

📚 API 문서

+ Swagger UI 문서 + ReDoc 문서 + 헬스 체크 + Vector DB 상태 + +

🛠️ 사용 방법

+

POST /find-reviews - 리뷰 검색 및 Vector DB 저장 (본인 가게 우선)

+
 {{
   "region": "서울특별시 강남구 역삼동",
   "store_name": "맛있는 한식당"
 }}
-            
- -

POST /build-vector - Vector DB 구축

-
-{{
-  "region": "서울특별시 강남구 역삼동",
-  "store_name": "맛있는 한식당",
-  "force_rebuild": false
-}}
-            
- -

POST /action-recommendation - 액션 추천 요청

-
+                
+ +

POST /action-recommendation-simple - 간소화된 액션 추천 요청

+
 {{
   "store_id": "12345",
   "context": "매출이 감소하고 있어서 개선이 필요합니다"
 }}
-            
+
+
""" @@ -393,14 +378,14 @@ async def find_reviews( ): """ 지역과 가게명으로 리뷰를 찾아 Vector DB에 저장합니다. - 🔥 본인 가게 리뷰는 반드시 포함됩니다. (수정된 버전) + 🔥 본인 가게 리뷰는 반드시 포함됩니다. """ start_time = datetime.now() logger.info(f"🔍 리뷰 검색 요청: {request.region} - {request.store_name}") try: # 1단계: 본인 가게 검색 - logger.info("1단계: 본인 가게 검색 중... (최우선)") + logger.info("1단계: 본인 가게 검색 중...") target_restaurant = await restaurant_service.find_store_by_name_and_region( request.region, request.store_name ) @@ -411,98 +396,48 @@ async def find_reviews( status_code=404, detail=f"'{request.store_name}' 가게를 찾을 수 없습니다. 가게명과 지역을 정확히 입력해주세요." ) - - logger.info(f"✅ 본인 가게 발견: {target_restaurant.place_name} (ID: {target_restaurant.id})") - # 2단계: 동종 업체 검색 - logger.info("2단계: 동종 업체 검색 중...") - similar_restaurants = [] - food_category = "기타" # 기본값 + logger.info(f"✅ 본인 가게 찾기 성공: {target_restaurant.place_name}") - try: - food_category = extract_food_category(target_restaurant.category_name) - logger.info(f"추출된 음식 카테고리: {food_category}") - - similar_restaurants = await restaurant_service.find_similar_stores( - request.region, food_category, settings.MAX_RESTAURANTS_PER_CATEGORY - ) - - logger.info(f"✅ 동종 업체 {len(similar_restaurants)}개 발견") - - except Exception as e: - logger.warning(f"⚠️ 동종 업체 검색 실패 (본인 가게는 계속 진행): {e}") + # 2단계: 음식 카테고리 추출 + food_category = extract_food_category(target_restaurant.category_name) + logger.info(f"🍽️ 추출된 음식 카테고리: {food_category}") - # 3단계: 전체 가게 목록 구성 (본인 가게 우선 + 중복 제거) - logger.info("3단계: 전체 가게 목록 구성 중...") + # 3단계: 본인 가게 리뷰 수집 (우선순위 1) + logger.info("3단계: 본인 가게 리뷰 수집 중...") + target_store_info, target_reviews = await review_service.collect_store_reviews( + target_restaurant.id, max_reviews=100 + ) - # 본인 가게를 첫 번째로 배치 - all_restaurants = [target_restaurant] + if not target_reviews: + logger.warning("⚠️ 본인 가게 리뷰가 없습니다") - # 동종 업체 추가 (개선된 중복 제거) - for restaurant in similar_restaurants: - if not _is_duplicate_restaurant(target_restaurant, restaurant): - all_restaurants.append(restaurant) + # 4단계: 동종 업체 검색 + logger.info("4단계: 동종 업체 검색 중...") + similar_stores = await restaurant_service.find_similar_stores( + request.region, food_category, max_count=50 + ) - logger.info(f"✅ 전체 가게 목록 구성 완료: {len(all_restaurants)}개 (본인 가게 포함)") - - # 4단계: 전체 리뷰 수집 (본인 가게 우선 처리) - logger.info("4단계: 리뷰 수집 중... (본인 가게 우선)") - - # 본인 가게 우선 처리를 위한 특별 로직 + # 5단계: 동종 업체 리뷰 수집 + logger.info("5단계: 동종 업체 리뷰 수집 중...") review_results = [] - # 4-1: 본인 가게 리뷰 수집 (실패 시 전체 중단) - try: - logger.info("본인 가게 리뷰 우선 수집 중... (필수)") - target_store_info, target_reviews = await review_service.collect_store_reviews( - target_restaurant.id, - max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT * 2 # 본인 가게는 더 많이 - ) - - if not target_store_info or not target_reviews: - logger.error(f"❌ 본인 가게 리뷰 수집 실패: {target_restaurant.place_name}") - raise HTTPException( - status_code=422, - detail=f"본인 가게 '{target_restaurant.place_name}'의 리뷰를 수집할 수 없습니다." - ) - - # 본인 가게 결과를 첫 번째로 설정 + # 본인 가게를 첫 번째로 추가 + if target_store_info and target_reviews: review_results.append((target_restaurant.id, target_store_info, target_reviews)) - logger.info(f"✅ 본인 가게 리뷰 수집 성공: {len(target_reviews)}개") - - except HTTPException: - raise - except Exception as e: - logger.error(f"❌ 본인 가게 리뷰 수집 중 오류: {e}") - raise HTTPException( - status_code=500, - detail=f"본인 가게 리뷰 수집 중 오류가 발생했습니다: {str(e)}" - ) - # 4-2: 동종 업체 리뷰 수집 (실패해도 본인 가게는 유지) - if len(all_restaurants) > 1: # 본인 가게 외에 다른 가게가 있는 경우 - try: - logger.info(f"동종 업체 리뷰 수집 중... ({len(all_restaurants) - 1}개)") - - # 본인 가게 제외한 동종 업체만 수집 - similar_restaurants_only = all_restaurants[1:] - similar_results = await review_service.collect_multiple_stores_reviews(similar_restaurants_only) - - # 동종 업체 결과 추가 - review_results.extend(similar_results) - - similar_reviews_count = sum(len(reviews) for _, _, reviews in similar_results) - logger.info(f"✅ 동종 업체 리뷰 수집 완료: {len(similar_results)}개 가게, {similar_reviews_count}개 리뷰") - - except Exception as e: - logger.warning(f"⚠️ 동종 업체 리뷰 수집 실패 (본인 가게는 유지): {e}") - else: - logger.info("동종 업체가 없어 본인 가게 리뷰만 사용") + # 동종 업체 리뷰 수집 (본인 가게 제외) + for store in similar_stores[:10]: # 최대 10개 동종 업체 + if store.id != target_restaurant.id: # 본인 가게 제외 + store_info, reviews = await review_service.collect_store_reviews( + store.id, max_reviews=50 + ) + if store_info and reviews: + review_results.append((store.id, store_info, reviews)) - # 5단계: Vector DB 구축 - logger.info("5단계: Vector DB 구축 중...") + # 6단계: Vector DB 저장 + logger.info("6단계: Vector DB 저장 중...") try: - # 대상 가게 정보를 딕셔너리로 변환 target_store_info_dict = { 'id': target_restaurant.id, 'place_name': target_restaurant.place_name, @@ -531,17 +466,8 @@ async def find_reviews( detail=f"Vector DB 구축 중 오류가 발생했습니다: {str(e)}" ) - # 최종 검증: 본인 가게가 첫 번째에 있는지 확인 - if not review_results or review_results[0][0] != target_restaurant.id: - logger.error("❌ 본인 가게가 첫 번째에 없음") - raise HTTPException( - status_code=500, - detail="본인 가게 리뷰 처리 순서 오류가 발생했습니다." - ) - # 성공 응답 total_reviews = sum(len(reviews) for _, _, reviews in review_results) - execution_time = (datetime.now() - start_time).total_seconds() return FindReviewsResponse( success=True, @@ -565,73 +491,12 @@ async def find_reviews( except HTTPException: raise except Exception as e: - execution_time = (datetime.now() - start_time).total_seconds() logger.error(f"❌ 전체 프로세스 실패: {e}") raise HTTPException( status_code=500, detail=f"서비스 처리 중 예상치 못한 오류가 발생했습니다: {str(e)}" ) -def _is_duplicate_restaurant(restaurant1: RestaurantInfo, restaurant2: RestaurantInfo) -> bool: - """ - 두 음식점이 중복인지 확인 (개선된 로직) - - Args: - restaurant1: 첫 번째 음식점 - restaurant2: 두 번째 음식점 - - Returns: - 중복 여부 - """ - # 1. ID 기준 확인 - if restaurant1.id == restaurant2.id: - return True - - # 2. place_url에서 추출한 store_id 기준 확인 - store_id1 = _extract_store_id_from_place_url(restaurant1.place_url) - store_id2 = _extract_store_id_from_place_url(restaurant2.place_url) - - if store_id1 and store_id2 and store_id1 == store_id2: - return True - - # 3. restaurant.id와 place_url store_id 교차 확인 - if restaurant1.id == store_id2 or restaurant2.id == store_id1: - return True - - # 4. 이름 + 주소 기준 확인 (최후 방법) - if (restaurant1.place_name == restaurant2.place_name and - restaurant1.address_name == restaurant2.address_name): - return True - - return False - -def _extract_store_id_from_place_url(place_url: str) -> Optional[str]: - """ - 카카오맵 URL에서 store_id를 추출합니다. - - Args: - place_url: 카카오맵 장소 URL - - Returns: - 추출된 store_id 또는 None - """ - try: - if not place_url: - return None - - import re - # URL 패턴: https://place.map.kakao.com/123456789 - pattern = r'/(\d+)(?:\?|$|#)' - match = re.search(pattern, place_url) - - if match: - return match.group(1) - else: - return None - - except Exception: - return None - @app.post( "/action-recommendation-simple", response_model=ActionRecommendationSimpleResponse, @@ -702,180 +567,53 @@ async def action_recommendation_simple( summary="Vector DB 상태 조회", description="Vector DB의 현재 상태를 조회합니다." ) -async def get_vector_db_status(vector_service: VectorService = Depends(get_vector_service)): - """Vector DB 상태 조회 API""" +async def get_vector_status(vector_service: VectorService = Depends(get_vector_service)): + """Vector DB 상태를 조회합니다.""" try: - status_info = vector_service.get_db_status() + db_status = vector_service.get_db_status() status = VectorDBStatus( - collection_name=status_info['collection_name'], - total_documents=status_info['total_documents'], - total_stores=status_info['total_stores'], - db_path=status_info['db_path'], - last_updated=datetime.now().isoformat() + collection_name=db_status['collection_name'], + total_documents=db_status['total_documents'], + total_stores=db_status['total_stores'], + db_path=db_status['db_path'], + last_updated=db_status.get('last_updated') ) return VectorDBStatusResponse( success=True, status=status, - message="Vector DB 상태 조회가 완료되었습니다." + message="Vector DB 상태 조회 성공" ) except Exception as e: logger.error(f"Vector DB 상태 조회 실패: {e}") - - raise HTTPException( - status_code=500, - detail={ - "success": False, - "error": "STATUS_CHECK_FAILED", - "message": f"Vector DB 상태 조회 중 오류가 발생했습니다: {str(e)}", - "timestamp": datetime.now().isoformat() - } + return VectorDBStatusResponse( + success=False, + status=VectorDBStatus( + collection_name="unknown", + total_documents=0, + total_stores=0, + db_path="unknown" + ), + message=f"상태 조회 실패: {str(e)}" ) -@app.get("/health", summary="헬스 체크", description="API 서버 및 외부 서비스 상태를 확인합니다.") +@app.get("/health") async def health_check(): - """🏥 헬스체크 API""" - health_result = { + """헬스체크 엔드포인트""" + return { "status": "healthy", "timestamp": datetime.now().isoformat(), - "services": {}, - "app_info": { - "name": settings.APP_TITLE, - "version": settings.APP_VERSION, - "startup_completed": app_state["startup_completed"] - } + "services": { + "vector_service": app_state["vector_service"] is not None, + "restaurant_service": app_state["restaurant_service"] is not None, + "review_service": app_state["review_service"] is not None, + "claude_service": app_state["claude_service"] is not None, + }, + "initialization_errors": app_state["initialization_errors"] } - - # 서비스별 헬스체크 - services_to_check = [ - ("restaurant_service", "restaurant_api"), - ("review_service", "review_api"), - ("claude_service", "claude_ai"), - ("vector_service", "vector_db") - ] - - healthy_count = 0 - total_checks = len(services_to_check) - - for service_key, health_key in services_to_check: - try: - service = app_state.get(service_key) - if service is None: - health_result["services"][health_key] = "not_initialized" - continue - - # 서비스별 헬스체크 메서드 호출 - if hasattr(service, 'health_check'): - status = await service.health_check() - else: - status = True # 헬스체크 메서드가 없으면 초기화됐다고 가정 - - # Vector DB의 경우 상세 정보 추가 - if health_key == "vector_db" and status: - try: - db_status = service.get_db_status() - health_result["vector_db_info"] = { - "total_documents": db_status.get('total_documents', 0), - "total_stores": db_status.get('total_stores', 0), - "db_path": db_status.get('db_path', '') - } - except: - pass - - health_result["services"][health_key] = "healthy" if status else "unhealthy" - if status: - healthy_count += 1 - - except Exception as e: - logger.warning(f"헬스체크 실패 - {service_key}: {e}") - health_result["services"][health_key] = f"error: {str(e)}" - - # 전체 상태 결정 - if healthy_count == total_checks: - health_result["status"] = "healthy" - elif healthy_count > 0: - health_result["status"] = "degraded" - else: - health_result["status"] = "unhealthy" - - # 요약 정보 - health_result["summary"] = { - "healthy_services": healthy_count, - "total_services": total_checks, - "health_percentage": round((healthy_count / total_checks) * 100, 1) - } - - # 초기화 에러가 있으면 포함 - if app_state["initialization_errors"]: - health_result["initialization_errors"] = app_state["initialization_errors"] - - # 환경 정보 - health_result["environment"] = { - "python_version": sys.version.split()[0], - "fastapi_version": fastapi.__version__, - "is_k8s": hasattr(settings, 'IS_K8S_ENV') and settings.IS_K8S_ENV, - "claude_model": settings.CLAUDE_MODEL - } - - # HTTP 상태 코드 결정 - if health_result["status"] == "healthy": - return health_result - elif health_result["status"] == "degraded": - return JSONResponse(status_code=200, content=health_result) # 부분 장애는 200 - else: - return JSONResponse(status_code=503, content=health_result) # 전체 장애는 503 - -# 🔧 전역 예외 처리 -@app.exception_handler(Exception) -async def global_exception_handler(request, exc): - """전역 예외 처리""" - logger.error(f"Unhandled exception: {exc}") - return JSONResponse( - status_code=500, - content={ - "error": "Internal server error", - "detail": str(exc) if settings.LOG_LEVEL.lower() == "debug" else "An unexpected error occurred", - "timestamp": datetime.now().isoformat(), - "path": str(request.url) - } - ) if __name__ == "__main__": import uvicorn - - print("🍽️ " + "="*60) - print(f" {settings.APP_TITLE} 서버 시작") - print("="*64) - print(f"📊 구성 정보:") - print(f" - Python 버전: {sys.version.split()[0]}") - print(f" - FastAPI 버전: {fastapi.__version__}") - print(f" - Vector DB Path: {settings.VECTOR_DB_PATH}") - print(f" - Claude Model: {settings.CLAUDE_MODEL}") - print(f" - 환경: {'Kubernetes' if hasattr(settings, 'IS_K8S_ENV') and settings.IS_K8S_ENV else 'Local'}") - print() - print(f"📚 문서:") - print(f" - Swagger UI: http://{settings.HOST}:{settings.PORT}/docs") - print(f" - ReDoc: http://{settings.HOST}:{settings.PORT}/redoc") - print(f" - 메인 페이지: http://{settings.HOST}:{settings.PORT}/") - print() - - try: - uvicorn.run( - "app.main:app", # 🔧 문자열로 지정 (리로드 지원) - host=settings.HOST, - port=settings.PORT, - log_level=settings.LOG_LEVEL.lower(), - reload=False, # 프로덕션에서는 False - access_log=True, - loop="uvloop" if sys.platform != "win32" else "asyncio" - ) - except KeyboardInterrupt: - print("\n🛑 서버가 사용자에 의해 중단되었습니다.") - except Exception as e: - print(f"\n❌ 서버 시작 실패: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - + uvicorn.run(app, host=settings.HOST, port=settings.PORT) \ No newline at end of file diff --git a/vector/app/models/restaurant_models.py b/vector/app/models/restaurant_models.py index cf532b5..fae4bcc 100644 --- a/vector/app/models/restaurant_models.py +++ b/vector/app/models/restaurant_models.py @@ -1,6 +1,6 @@ # app/models/restaurant_models.py from pydantic import BaseModel, Field -from typing import List, Optional, Dict, Any +from typing import List, Optional class RestaurantSearchRequest(BaseModel): """음식점 검색 요청 모델""" @@ -66,5 +66,4 @@ class ErrorResponse(BaseModel): success: bool = False error: str = Field(description="에러 코드") message: str = Field(description="에러 메시지") - timestamp: str = Field(description="에러 발생 시간") - + timestamp: str = Field(description="에러 발생 시간") \ No newline at end of file diff --git a/vector/app/models/review_models.py b/vector/app/models/review_models.py index 823b652..075b7a0 100644 --- a/vector/app/models/review_models.py +++ b/vector/app/models/review_models.py @@ -64,5 +64,4 @@ class ReviewAnalysisResponse(BaseModel): total_reviews: int = Field(description="수집된 총 리뷰 수") analysis_method: str = Field(description="분석 방법") date_filter: DateFilter = Field(description="날짜 필터 정보") - execution_time: float = Field(description="실행 시간(초)") - + execution_time: float = Field(description="실행 시간(초)") \ No newline at end of file diff --git a/vector/app/models/vector_models.py b/vector/app/models/vector_models.py index e74a6da..3322254 100644 --- a/vector/app/models/vector_models.py +++ b/vector/app/models/vector_models.py @@ -50,12 +50,12 @@ class ActionRecommendationResponse(BaseModel): message: str = Field(description="응답 메시지") claude_input: str = Field(description="Claude API에 전달한 프롬프트") claude_response: Optional[str] = Field(None, description="Claude AI 원본 응답") - parsed_response: Optional[Dict[str, Any]] = Field(None, description="파싱된 JSON 응답") # 새로 추가 + parsed_response: Optional[Dict[str, Any]] = Field(None, description="파싱된 JSON 응답") store_name: str = Field(description="가게명") food_category: str = Field(description="음식 카테고리") similar_stores_count: int = Field(description="분석된 동종 업체 수") execution_time: Optional[float] = Field(None, description="Claude API 응답 시간(초)") - json_parse_success: Optional[bool] = Field(None, description="JSON 파싱 성공 여부") # 새로 추가 + json_parse_success: Optional[bool] = Field(None, description="JSON 파싱 성공 여부") class ActionRecommendationSimpleResponse(BaseModel): """단순화된 액션 추천 응답 - JSON만 반환""" @@ -86,5 +86,4 @@ class VectorDBStatusResponse(BaseModel): """Vector DB 상태 조회 응답""" success: bool = Field(description="조회 성공 여부") status: VectorDBStatus = Field(description="DB 상태 정보") - message: str = Field(description="응답 메시지") - + message: str = Field(description="응답 메시지") \ No newline at end of file diff --git a/vector/app/services/claude_service.py b/vector/app/services/claude_service.py index 4d86004..5cf2f5a 100644 --- a/vector/app/services/claude_service.py +++ b/vector/app/services/claude_service.py @@ -1,6 +1,7 @@ # app/services/claude_service.py import json import logging +import re from typing import Optional, Dict, Any, Tuple import anthropic from ..config.settings import settings @@ -108,8 +109,6 @@ class ClaudeService: def _parse_json_response(self, raw_response: str) -> Optional[Dict[str, Any]]: """Claude의 원본 응답에서 JSON을 추출하고 파싱합니다.""" try: - import re - # JSON 블록 찾기 json_match = re.search(r'```json\s*\n(.*?)\n```', raw_response, re.DOTALL) if json_match: @@ -206,239 +205,6 @@ class ClaudeService: # 호환성을 위한 메서드들 (기존 코드가 사용할 수 있도록) # ============================================================================= - async def get_recommendation(self, prompt: str) -> Optional[str]: - """Claude API를 호출하여 추천을 받습니다. (호환성용)""" - if not self.is_ready(): - logger.error("ClaudeService가 준비되지 않음") - return None - - try: - logger.info("🤖 Claude API 호출 시작") - - response = self.client.messages.create( - model=self.model, - max_tokens=4000, - temperature=0.7, - messages=[ - { - "role": "user", - "content": prompt - } - ] - ) - - if response.content and len(response.content) > 0: - raw_response = response.content[0].text - logger.info(f"✅ Claude API 응답 성공: {len(raw_response)} 문자") - return raw_response - else: - logger.warning("⚠️ Claude API 응답이 비어있음") - return None - - except Exception as e: - logger.error(f"❌ Claude API 호출 실패: {e}") - return None - def parse_recommendation_response(self, raw_response: str) -> Optional[Dict[str, Any]]: """Claude 응답에서 JSON을 추출하고 파싱합니다. (호환성용)""" - return self._parse_json_response(raw_response) - - def build_recommendation_prompt(self, store_id: str, context: str, vector_context: Optional[str] = None) -> str: - """액션 추천용 프롬프트를 구성합니다. (호환성용)""" - - prompt_parts = [ - "당신은 소상공인을 위한 경영 컨설턴트입니다.", - f"가게 ID: {store_id}", - f"점주 요청: {context}" - ] - - if vector_context: - prompt_parts.extend([ - "\n--- 동종 업체 분석 데이터 ---", - vector_context, - "--- 분석 데이터 끝 ---\n" - ]) - - prompt_parts.extend([ - "\n위 정보를 바탕으로 실질적이고 구체적인 액션 추천을 해주세요.", - "응답은 반드시 아래 JSON 형식으로만 작성해주세요:", - "", - "```json", - "{", - ' "summary": {', - ' "current_situation": "현재 상황 요약",', - ' "key_insights": ["핵심 인사이트 1", "핵심 인사이트 2"],', - ' "priority_areas": ["우선 개선 영역 1", "우선 개선 영역 2"]', - ' },', - ' "action_plans": {', - ' "short_term": [', - ' {', - ' "title": "즉시 실행 가능한 액션",', - ' "description": "구체적인 실행 방법",', - ' "expected_impact": "예상 효과",', - ' "timeline": "1-2주",', - ' "cost": "예상 비용"', - ' }', - ' ],', - ' "mid_term": [', - ' {', - ' "title": "중기 개선 방안",', - ' "description": "구체적인 실행 방법",', - ' "expected_impact": "예상 효과",', - ' "timeline": "1-3개월",', - ' "cost": "예상 비용"', - ' }', - ' ]', - ' },', - ' "implementation_tips": ["실행 팁 1", "실행 팁 2"]', - "}", - "```" - ]) - - return "\n".join(prompt_parts) - - def create_prompt_for_api_response(self, context: str, additional_context: Optional[str] = None) -> str: - """API 응답용 프롬프트를 생성합니다. (호환성용)""" - return self._build_action_prompt(context, additional_context) - - async def generate_action_recommendations_optimized( - self, - context: str, - additional_context: Optional[str] = None - ) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: - """ - 최적화된 액션 추천 생성 - - 더 명확한 JSON 지시사항 - - 토큰 효율성 개선 - - 파싱 안정성 향상 - """ - if not self.is_ready(): - return None, None - - try: - # 최적화된 프롬프트 구성 - prompt = self._build_optimized_prompt(context, additional_context) - - response = self.client.messages.create( - model=self.model, - max_tokens=3000, # 토큰 수 최적화 - temperature=0.3, # 일관성 향상 - messages=[{"role": "user", "content": prompt}] - ) - - if response.content and len(response.content) > 0: - raw_response = response.content[0].text - - # 즉시 JSON 파싱 시도 - parsed_json = self._parse_json_response_enhanced(raw_response) - - return raw_response, parsed_json - - return None, None - - except Exception as e: - logger.error(f"Claude AI 호출 실패: {e}") - return None, None - - def _build_optimized_prompt(self, context: str, additional_context: Optional[str] = None) -> str: - """최적화된 프롬프트 구성""" - - prompt_parts = [ - "당신은 소상공인 경영 컨설턴트입니다.", - f"분석 요청: {context}", - ] - - if additional_context: - prompt_parts.extend([ - "\n=== 참고 데이터 ===", - additional_context, - "==================\n" - ]) - - prompt_parts.extend([ - "위 정보를 바탕으로 실행 가능한 액션을 추천해주세요.", - "", - "⚠️ 응답은 반드시 아래 JSON 형식으로만 작성하세요:", - "다른 텍스트는 포함하지 마세요.", - "", - "```json", - "{", - ' "summary": {', - ' "current_situation": "현재 상황 요약 (50자 이내)",', - ' "key_insights": ["핵심 포인트1", "핵심 포인트2"],', - ' "priority": "high|medium|low"', - ' },', - ' "actions": [', - ' {', - ' "title": "액션명",', - ' "description": "구체적 실행방법",', - ' "timeline": "실행기간",', - ' "cost": "예상비용",', - ' "impact": "예상효과"', - ' }', - ' ],', - ' "quick_tips": ["즉시 실행 팁1", "즉시 실행 팁2"]', - "}", - "```" - ]) - - return "\n".join(prompt_parts) - - def _parse_json_response_enhanced(self, raw_response: str) -> Optional[Dict[str, Any]]: - """향상된 JSON 파싱""" - try: - # 1. JSON 블록 추출 - json_match = re.search(r'```json\s*(\{.*?\})\s*```', raw_response, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - # 2. 직접 JSON 찾기 - json_match = re.search(r'(\{.*\})', raw_response, re.DOTALL) - if json_match: - json_str = json_match.group(1) - else: - return None - - # 3. JSON 파싱 - return json.loads(json_str) - - except json.JSONDecodeError as e: - logger.error(f"JSON 파싱 오류: {e}") - return None - except Exception as e: - logger.error(f"JSON 추출 실패: {e}") - return None - - # ============================================================================= - # 헬스체크 및 상태 확인 메서드들 - # ============================================================================= - - def get_health_status(self) -> Dict[str, Any]: - """ClaudeService 상태 확인""" - try: - status = { - "service": "claude_api", - "status": "healthy" if self.is_ready() else "unhealthy", - "model": self.model, - "api_key_configured": bool(settings.CLAUDE_API_KEY and settings.CLAUDE_API_KEY.strip()), - "timestamp": self._get_timestamp() - } - - if self.initialization_error: - status["initialization_error"] = self.initialization_error - status["status"] = "error" - - return status - - except Exception as e: - return { - "service": "claude_api", - "status": "error", - "error": str(e), - "timestamp": self._get_timestamp() - } - - def _get_timestamp(self) -> str: - """현재 시간 문자열 반환""" - from datetime import datetime - return datetime.now().isoformat() \ No newline at end of file + return self._parse_json_response(raw_response) \ No newline at end of file diff --git a/vector/app/services/restaurant_service.py b/vector/app/services/restaurant_service.py index 5a7100e..a2054cf 100644 --- a/vector/app/services/restaurant_service.py +++ b/vector/app/services/restaurant_service.py @@ -1,4 +1,4 @@ -# app/services/restaurant_service.py (수정된 버전) +# app/services/restaurant_service.py import aiohttp import asyncio import logging @@ -70,6 +70,87 @@ class RestaurantService: logger.error(f"가게 검색 중 오류: {str(e)}") return None + async def find_similar_stores(self, region: str, food_category: str, max_count: int = 50) -> List[RestaurantInfo]: + """ + 동종 업체를 찾습니다. + + Args: + region: 지역 + food_category: 음식 카테고리 + max_count: 최대 검색 개수 + + Returns: + 동종 업체 목록 + """ + try: + logger.info(f"동종 업체 검색 시작: region={region}, food_category={food_category}") + + # 검색 쿼리 생성 (음식 카테고리만 포함) + search_query = self._clean_food_category(food_category) + logger.info(f"음식점 수집 요청: query='{search_query}' region='{region}'") + + similar_stores = [] + + async with aiohttp.ClientSession(timeout=self.timeout) as session: + # 페이지별로 검색 (최대 5페이지) + max_pages = min(5, (max_count // 15) + 1) + + for page in range(1, max_pages + 1): + if len(similar_stores) >= max_count: + break + + url = f"{self.base_url}/collect" + payload = { + "query": search_query, # 음식 카테고리만 포함 + "region": region, # 지역 정보는 별도 파라미터 + "size": 15, + "pages": 1, # 페이지별로 하나씩 호출 + "save_to_file": False + } + + logger.info(f"동종 업체 검색 ({page}/{max_pages}): {url}") + + try: + async with session.post(url, json=payload) as response: + if response.status == 200: + data = await response.json() + restaurants = data.get('restaurants', []) + + for restaurant_data in restaurants: + if len(similar_stores) >= max_count: + break + + try: + restaurant = RestaurantInfo(**restaurant_data) + + # 카테고리 유사성 검사 + if self._is_similar_category(food_category, restaurant.category_name): + similar_stores.append(restaurant) + + except Exception as e: + logger.warning(f"음식점 데이터 파싱 실패: {e}") + continue + + logger.info(f"페이지 {page} 완료: {len(restaurants)}개 발견, 총 {len(similar_stores)}개 수집") + + else: + logger.warning(f"페이지 {page} API 호출 실패: HTTP {response.status}") + break + + except Exception as e: + logger.warning(f"페이지 {page} 처리 실패: {e}") + continue + + # API 호출 간격 조절 + await asyncio.sleep(settings.REQUEST_DELAY) + + logger.info(f"동종 업체 검색 완료: {len(similar_stores)}개 발견") + return similar_stores + + except Exception as e: + logger.error(f"동종 업체 검색 중 오류: {str(e)}") + return [] + def _clean_food_category(self, food_category: str) -> str: """ 음식 카테고리를 정리하여 검색 키워드로 변환합니다. @@ -99,96 +180,12 @@ class RestaurantService: if not keywords: return "음식점" - # 🔧 지역 정보는 포함하지 않고 음식 카테고리만 반환 + # 지역 정보는 포함하지 않고 음식 카테고리만 반환 return ' '.join(keywords) - async def find_similar_stores(self, region: str, food_category: str, max_count: int = 50) -> List[RestaurantInfo]: + def _is_similar_category(self, target_category: str, restaurant_category: str) -> bool: """ - 동종 업체를 찾습니다. - - Args: - region: 지역 - food_category: 음식 카테고리 - max_count: 최대 검색 개수 - - Returns: - 동종 업체 목록 - """ - try: - logger.info(f"동종 업체 검색 시작: region={region}, food_category={food_category}") - - # 🔧 검색 쿼리 생성 (음식 카테고리만 포함) - search_query = self._clean_food_category(food_category) - logger.info(f"음식점 수집 요청: query='{search_query}' region='{region}' size=15 pages=1 save_to_file=False") - - similar_stores = [] - - async with aiohttp.ClientSession(timeout=self.timeout) as session: - # 페이지별로 검색 (최대 5페이지) - max_pages = min(5, (max_count // 15) + 1) - - for page in range(1, max_pages + 1): - if len(similar_stores) >= max_count: - break - - url = f"{self.base_url}/collect" - # 🔧 수정된 payload - query에는 음식 카테고리만, region은 분리 - payload = { - "query": search_query, # 🔧 음식 카테고리만 포함 - "region": region, # 🔧 지역 정보는 별도 파라미터 - "size": 15, - "pages": 1, # 페이지별로 하나씩 호출 - "save_to_file": False - } - - logger.info(f"동종 업체 검색 페이지 {page}: query='{search_query}' region='{region}'") - - try: - async with session.post(url, json=payload) as response: - if response.status == 200: - data = await response.json() - restaurants = data.get('restaurants', []) - - for restaurant_data in restaurants: - if len(similar_stores) >= max_count: - break - - try: - restaurant = RestaurantInfo(**restaurant_data) - - # 카테고리 필터링 - restaurant_category = extract_food_category(restaurant.category_name) - if self._is_similar_food_category(food_category, restaurant_category): - similar_stores.append(restaurant) - logger.debug(f"유사 카테고리 매치: {restaurant.place_name} ({restaurant_category})") - - except Exception as e: - logger.warning(f"음식점 데이터 파싱 실패: {e}") - continue - - logger.info(f"페이지 {page} 완료: {len(restaurants)}개 음식점 수집") - - else: - logger.warning(f"동종 업체 검색 실패 (페이지 {page}): HTTP {response.status}") - continue - - except Exception as e: - logger.warning(f"페이지 {page} 검색 중 오류: {e}") - continue - - # API 요청 제한을 위한 지연 - await asyncio.sleep(settings.REQUEST_DELAY) - - logger.info(f"동종 업체 검색 완료: 총 {len(similar_stores)}개") - return similar_stores - - except Exception as e: - logger.error(f"동종 업체 검색 중 오류: {str(e)}") - return [] - - def _is_similar_food_category(self, target_category: str, restaurant_category: str) -> bool: - """ - 음식 카테고리가 유사한지 확인합니다. + 두 카테고리가 유사한지 판단합니다. Args: target_category: 대상 카테고리 @@ -231,5 +228,4 @@ class RestaurantService: return response.status == 200 except Exception as e: logger.error(f"Restaurant API 헬스체크 실패: {e}") - return False - + return False \ No newline at end of file diff --git a/vector/app/services/review_service.py b/vector/app/services/review_service.py index c3b7f48..eb31b8a 100644 --- a/vector/app/services/review_service.py +++ b/vector/app/services/review_service.py @@ -1,4 +1,4 @@ -# vector/app/services/review_service.py +# app/services/review_service.py import aiohttp import asyncio import logging @@ -10,7 +10,7 @@ from ..models.restaurant_models import RestaurantInfo logger = logging.getLogger(__name__) class ReviewService: - """리뷰 API 연동 서비스 (본인 가게 우선 처리 강화)""" + """리뷰 API 연동 서비스""" def __init__(self): self.base_url = settings.get_review_api_url() @@ -18,7 +18,7 @@ class ReviewService: async def collect_store_reviews(self, store_id: str, max_reviews: int = 100) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]: """ - 단일 가게의 리뷰를 수집합니다. (본인 가게용 강화 처리) + 단일 가게의 리뷰를 수집합니다. Args: store_id: 카카오맵 가게 ID @@ -66,15 +66,9 @@ class ReviewService: converted_store_info = self._convert_store_info(store_info) converted_reviews = self._convert_reviews(filtered_reviews) - if converted_store_info and converted_reviews: - logger.info(f"✅ 리뷰 수집 성공: {len(converted_reviews)}개") - return converted_store_info, converted_reviews - else: - logger.warning("⚠️ 변환된 데이터가 비어있음") - return None, [] + return converted_store_info, converted_reviews else: - error_msg = data.get('message', 'Unknown error') - logger.error(f"❌ 리뷰 분석 실패: {error_msg}") + logger.warning(f"⚠️ 리뷰 수집 실패: {data.get('message', 'Unknown error')}") return None, [] else: logger.error(f"❌ Review API 호출 실패: HTTP {response.status}") @@ -83,313 +77,123 @@ class ReviewService: return None, [] except asyncio.TimeoutError: - logger.error("❌ Review API 호출 타임아웃") + logger.error("⏰ Review API 호출 타임아웃") return None, [] except Exception as e: logger.error(f"❌ 리뷰 수집 중 오류: {str(e)}") return None, [] def _filter_quality_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """리뷰 품질 필터링""" - try: - filtered = [] + """ + 리뷰 품질을 기준으로 필터링합니다. + + Args: + reviews: 원본 리뷰 목록 - for review in reviews: - content = review.get('content', '').strip() - rating = review.get('rating', 0) - - # 품질 기준 - if (len(content) >= 10 and # 최소 10자 이상 - rating > 0 and # 별점이 있어야 함 - not self._is_spam_review(content)): # 스팸 제외 - filtered.append(review) + Returns: + 필터링된 리뷰 목록 + """ + filtered_reviews = [] + + for review in reviews: + content = review.get('content', '').strip() - logger.debug(f"품질 필터링: {len(reviews)} → {len(filtered)}") - return filtered - - except Exception as e: - logger.warning(f"⚠️ 리뷰 필터링 실패: {e}") - return reviews # 실패 시 원본 반환 + # 품질 기준 + if ( + len(content) >= 10 and # 최소 10자 이상 + not self._is_spam_content(content) and # 스팸 내용이 아님 + review.get('rating', 0) > 0 # 별점이 있음 + ): + filtered_reviews.append(review) + + return filtered_reviews - def _is_spam_review(self, content: str) -> bool: - """스팸 리뷰 판별""" - try: - spam_keywords = [ - "추천추천", "최고최고", "맛있어요맛있어요", - "좋아요좋아요", "ㅎㅎㅎㅎ", "ㅋㅋㅋㅋ", - "굿굿굿", "Nice", "Good" - ] + def _is_spam_content(self, content: str) -> bool: + """ + 스팸 또는 의미없는 리뷰인지 판단합니다. + + Args: + content: 리뷰 내용 - content_lower = content.lower() - - # 스팸 키워드 확인 - for keyword in spam_keywords: - if keyword.lower() in content_lower: - return True - - # 너무 짧거나 반복 문자 확인 - if len(set(content.replace(' ', ''))) < 3: # 고유 문자 3개 미만 + Returns: + 스팸 여부 + """ + spam_patterns = [ + 'ㅋㅋㅋㅋㅋ', + 'ㅎㅎㅎㅎㅎ', + '!!!!!', + '.....', + '???', + '^^', + '굿굿굿', + '최고최고' + ] + + content_lower = content.lower() + + # 스팸 패턴 체크 + for pattern in spam_patterns: + if pattern in content_lower: return True - - return False - - except Exception as e: - logger.warning(f"⚠️ 스팸 판별 실패: {e}") - return False + + # 너무 반복적인 문자 체크 + if len(set(content)) < 3: # 고유 문자가 3개 미만 + return True + + return False - def _convert_store_info(self, store_info): - """가게 정보 변환 (강화된 버전)""" + def _convert_store_info(self, store_info: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: + """ + API 응답의 가게 정보를 내부 형식으로 변환합니다. + + Args: + store_info: API 응답 가게 정보 + + Returns: + 변환된 가게 정보 + """ if not store_info: - logger.warning("⚠️ 가게 정보가 비어있음") return None - try: - converted = { - 'id': str(store_info.get('id', '')), - 'name': str(store_info.get('name', '')), - 'category': str(store_info.get('category', '')), - 'rating': str(store_info.get('rating', '')), - 'review_count': str(store_info.get('review_count', '')), - 'status': str(store_info.get('status', '')), - 'address': str(store_info.get('address', '')) - } - - # 필수 필드 확인 (ID와 이름은 반드시 있어야 함) - if not converted['id'] or not converted['name']: - logger.warning(f"⚠️ 필수 가게 정보 누락: ID={converted['id']}, Name={converted['name']}") - return None - - logger.debug(f"가게 정보 변환 성공: {converted['name']}") - return converted - - except Exception as e: - logger.error(f"❌ 가게 정보 변환 실패: {e}") - return None + return { + 'id': store_info.get('id', ''), + 'name': store_info.get('name', ''), + 'category': store_info.get('category', ''), + 'rating': store_info.get('rating', ''), + 'review_count': store_info.get('review_count', ''), + 'status': store_info.get('status', ''), + 'address': store_info.get('address', '') + } - def _convert_reviews(self, reviews): - """리뷰 목록 변환 (강화된 버전)""" - if not reviews: - logger.warning("⚠️ 리뷰 목록이 비어있음") - return [] + def _convert_reviews(self, reviews: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + API 응답의 리뷰를 내부 형식으로 변환합니다. + Args: + reviews: API 응답 리뷰 목록 + + Returns: + 변환된 리뷰 목록 + """ converted_reviews = [] - for i, review in enumerate(reviews): - try: - # 안전한 형변환 - rating = 0 - try: - rating = int(review.get('rating', 0)) - except (ValueError, TypeError): - rating = 0 - - likes = 0 - try: - likes = int(review.get('likes', 0)) - except (ValueError, TypeError): - likes = 0 - - photo_count = 0 - try: - photo_count = int(review.get('photo_count', 0)) - except (ValueError, TypeError): - photo_count = 0 - - converted_review = { - 'reviewer_name': str(review.get('reviewer_name', 'Anonymous')), - 'rating': rating, - 'date': str(review.get('date', '')), - 'content': str(review.get('content', '')).strip(), - 'badges': list(review.get('badges', [])), - 'likes': likes, - 'photo_count': photo_count, - 'has_photos': bool(photo_count > 0) - } - - # 기본 검증 - if converted_review['content'] and converted_review['rating'] > 0: - converted_reviews.append(converted_review) - else: - logger.debug(f"⚠️ 리뷰 {i+1} 품질 미달로 제외") - - except Exception as e: - logger.warning(f"⚠️ 리뷰 {i+1} 변환 실패: {e}") - continue + for review in reviews: + converted_review = { + 'reviewer_name': review.get('reviewer_name', ''), + 'reviewer_level': review.get('reviewer_level', ''), + 'reviewer_stats': review.get('reviewer_stats', {}), + 'rating': review.get('rating', 0), + 'date': review.get('date', ''), + 'content': review.get('content', ''), + 'badges': review.get('badges', []), + 'likes': review.get('likes', 0), + 'photo_count': review.get('photo_count', 0), + 'has_photos': review.get('has_photos', False) + } + converted_reviews.append(converted_review) - logger.info(f"리뷰 변환 완료: {len(reviews)} → {len(converted_reviews)}") return converted_reviews - async def collect_multiple_stores_reviews(self, restaurants: List[RestaurantInfo]) -> List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]]: - """ - 여러 가게의 리뷰를 수집합니다. - - Args: - restaurants: 음식점 목록 - - Returns: - (store_id, 가게 정보, 리뷰 목록) 튜플의 리스트 - """ - try: - logger.info(f"다중 가게 리뷰 수집 시작: {len(restaurants)}개 가게") - - results = [] - - # 동시성 제한을 위해 세마포어 사용 - semaphore = asyncio.Semaphore(3) # 최대 3개 동시 요청 - - async def collect_single_store(restaurant: RestaurantInfo): - async with semaphore: - try: - # 카카오맵 place_url에서 store_id 추출 시도 - store_id = self._extract_store_id_from_url(restaurant.place_url) - - if not store_id: - # URL에서 추출 실패 시 restaurant.id 사용 - store_id = restaurant.id - - if not store_id: - logger.warning(f"Store ID를 찾을 수 없음: {restaurant.place_name}") - return None - - logger.info(f"가게 {restaurant.place_name} 리뷰 수집 중...") - - # 리뷰 수집 (최대 50개로 제한) - store_info, reviews = await self.collect_store_reviews( - store_id, - max_reviews=settings.MAX_REVIEWS_PER_RESTAURANT - ) - - if store_info and reviews: - return (store_id, store_info, reviews) - else: - logger.warning(f"가게 {restaurant.place_name}의 리뷰 수집 실패") - return None - - except Exception as e: - logger.error(f"가게 {restaurant.place_name} 리뷰 수집 중 오류: {e}") - return None - - finally: - # API 요청 제한을 위한 지연 - await asyncio.sleep(settings.REQUEST_DELAY) - - # 병렬 처리 - tasks = [collect_single_store(restaurant) for restaurant in restaurants] - results_raw = await asyncio.gather(*tasks, return_exceptions=True) - - # 성공한 결과만 필터링 - for result in results_raw: - if result and not isinstance(result, Exception): - results.append(result) - - logger.info(f"다중 가게 리뷰 수집 완료: {len(results)}개 성공") - return results - - except Exception as e: - logger.error(f"다중 가게 리뷰 수집 중 오류: {str(e)}") - return [] - - def _extract_store_id_from_url(self, place_url: str) -> Optional[str]: - """ - 카카오맵 URL에서 store_id를 추출합니다. - - Args: - place_url: 카카오맵 장소 URL - - Returns: - 추출된 store_id 또는 None - """ - try: - if not place_url: - return None - - # URL 패턴: https://place.map.kakao.com/123456789 - import re - pattern = r'/(\d+)(?:\?|$|#)' - match = re.search(pattern, place_url) - - if match: - store_id = match.group(1) - logger.debug(f"URL에서 store_id 추출: {store_id}") - return store_id - else: - logger.debug(f"URL에서 store_id 추출 실패: {place_url}") - return None - - except Exception as e: - logger.warning(f"store_id 추출 중 오류: {e}") - return None - - def _build_store_info_from_restaurant(self, restaurant: RestaurantInfo) -> Dict[str, Any]: - """ - RestaurantInfo를 store_info 형식으로 변환합니다. - - Args: - restaurant: RestaurantInfo 객체 - - Returns: - store_info 딕셔너리 - """ - try: - return { - 'id': restaurant.id, - 'name': restaurant.place_name, - 'category': restaurant.category_name, - 'rating': '', # API에서 제공되지 않음 - 'review_count': '', # API에서 제공되지 않음 - 'status': '', # API에서 제공되지 않음 - 'address': restaurant.address_name - } - except Exception as e: - logger.error(f"RestaurantInfo 변환 실패: {e}") - return { - 'id': '', - 'name': '', - 'category': '', - 'rating': '', - 'review_count': '', - 'status': '', - 'address': '' - } - - def _convert_single_review_data(self, review_data: dict) -> dict: - """ - 단일 리뷰 데이터 변환 - - Args: - review_data: API 응답의 단일 리뷰 데이터 - - Returns: - 변환된 리뷰 데이터 - """ - try: - return { - 'reviewer_name': review_data.get('reviewer_name', ''), - 'reviewer_level': review_data.get('reviewer_level', ''), - 'reviewer_stats': review_data.get('reviewer_stats', {}), - 'rating': int(review_data.get('rating', 0)), - 'date': review_data.get('date', ''), - 'content': review_data.get('content', ''), - 'badges': review_data.get('badges', []), - 'likes': int(review_data.get('likes', 0)), - 'photo_count': int(review_data.get('photo_count', 0)), - 'has_photos': bool(review_data.get('has_photos', False)) - } - except Exception as e: - logger.warning(f"단일 리뷰 데이터 변환 실패: {e}") - return { - 'reviewer_name': 'Unknown', - 'reviewer_level': '', - 'reviewer_stats': {}, - 'rating': 0, - 'date': '', - 'content': '', - 'badges': [], - 'likes': 0, - 'photo_count': 0, - 'has_photos': False - } - async def health_check(self) -> bool: """ Review API 상태를 확인합니다. @@ -401,67 +205,7 @@ class ReviewService: async with aiohttp.ClientSession(timeout=self.timeout) as session: url = f"{self.base_url}/health" async with session.get(url) as response: - is_healthy = response.status == 200 - if is_healthy: - logger.debug("Review API 헬스체크 성공") - else: - logger.warning(f"Review API 헬스체크 실패: HTTP {response.status}") - return is_healthy + return response.status == 200 except Exception as e: logger.error(f"Review API 헬스체크 실패: {e}") - return False - - def get_api_info(self) -> Dict[str, Any]: - """ - Review API 정보를 반환합니다. - - Returns: - API 정보 딕셔너리 - """ - return { - "service_name": "Review API Service", - "base_url": self.base_url, - "timeout": self.timeout.total, - "max_reviews_per_restaurant": settings.MAX_REVIEWS_PER_RESTAURANT, - "request_delay": settings.REQUEST_DELAY - } - - async def test_connection(self) -> Dict[str, Any]: - """ - Review API 연결을 테스트합니다. - - Returns: - 테스트 결과 - """ - test_result = { - "service": "Review API", - "base_url": self.base_url, - "status": "unknown", - "response_time": None, - "error": None - } - - try: - import time - start_time = time.time() - - is_healthy = await self.health_check() - - response_time = time.time() - start_time - test_result["response_time"] = round(response_time, 3) - - if is_healthy: - test_result["status"] = "healthy" - logger.info(f"Review API 연결 테스트 성공: {response_time:.3f}초") - else: - test_result["status"] = "unhealthy" - test_result["error"] = "Health check failed" - logger.warning("Review API 연결 테스트 실패: 헬스체크 실패") - - except Exception as e: - test_result["status"] = "error" - test_result["error"] = str(e) - logger.error(f"Review API 연결 테스트 오류: {e}") - - return test_result - + return False \ No newline at end of file diff --git a/vector/app/services/vector_service.py b/vector/app/services/vector_service.py index 9920c32..2beff20 100644 --- a/vector/app/services/vector_service.py +++ b/vector/app/services/vector_service.py @@ -1,10 +1,8 @@ -# app/services/vector_service.py (개선된 버전) +# app/services/vector_service.py import os import json import logging -import time -import shutil -import signal +import tempfile from datetime import datetime from typing import List, Dict, Any, Optional, Tuple import chromadb @@ -19,7 +17,7 @@ from ..utils.data_utils import ( logger = logging.getLogger(__name__) class VectorService: - """Vector DB 서비스 (개선된 초기화 로직)""" + """Vector DB 서비스""" def __init__(self): self.db_path = settings.VECTOR_DB_PATH @@ -36,15 +34,15 @@ class VectorService: self._safe_initialize() def _safe_initialize(self): - """안전한 초기화 - 개선된 로직""" + """안전한 초기화""" try: logger.info("🔧 VectorService 초기화 시작...") # 1단계: 디렉토리 권한 확인 self._ensure_directory_permissions() - # 2단계: ChromaDB 초기화 (호환성 확인 포함) - self._initialize_chromadb_with_compatibility_check() + # 2단계: ChromaDB 초기화 + self._initialize_chromadb() # 3단계: 임베딩 모델 로드 self._initialize_embedding_model() @@ -57,7 +55,7 @@ class VectorService: logger.info("🔄 서비스는 런타임에 재시도 가능합니다") def _ensure_directory_permissions(self): - """Vector DB 디렉토리 권한을 확인하고 생성합니다""" + """Vector DB 디렉토리 권한을 확인하고 생성""" try: logger.info(f"📁 Vector DB 디렉토리 설정: {self.db_path}") @@ -78,7 +76,6 @@ class VectorService: except Exception as chmod_error: logger.warning(f"⚠️ 권한 변경 실패: {chmod_error}") # 임시 디렉토리로 대체 - import tempfile temp_dir = tempfile.mkdtemp(prefix="vectordb_") logger.info(f"🔄 임시 디렉토리 사용: {temp_dir}") self.db_path = temp_dir @@ -93,535 +90,175 @@ class VectorService: logger.info("✅ 디렉토리 권한 확인 완료") except Exception as test_error: raise Exception(f"디렉토리 권한 테스트 실패: {test_error}") - + except Exception as e: - logger.error(f"❌ 디렉토리 설정 실패: {e}") - raise + raise Exception(f"디렉토리 설정 실패: {e}") - def _initialize_chromadb_with_compatibility_check(self): - """ChromaDB 초기화 (호환성 확인 포함)""" - max_retries = 3 - retry_delay = 2 - - for attempt in range(max_retries): - try: - logger.info(f"🔄 ChromaDB 초기화 시도 {attempt + 1}/{max_retries}") - - # 1단계: 기존 DB 호환성 확인 - existing_db_valid = self._check_existing_db_compatibility() - - # 2단계: ChromaDB 클라이언트 생성 - self._create_chromadb_client() - - # 3단계: 컬렉션 초기화 - self._initialize_collection(existing_db_valid) - - logger.info("✅ ChromaDB 초기화 완료") - return # 성공 시 루프 종료 - - except Exception as e: - logger.error(f"❌ ChromaDB 초기화 실패 (시도 {attempt + 1}): {e}") - - if attempt < max_retries - 1: - logger.info(f"🔄 {retry_delay}초 후 재시도...") - time.sleep(retry_delay) - retry_delay *= 2 # 지수 백오프 - else: - raise Exception(f"ChromaDB 초기화 최종 실패: {e}") - - def _check_existing_db_compatibility(self): - """기존 DB 호환성 확인""" + def _initialize_chromadb(self): + """ChromaDB 초기화""" try: - if not os.path.exists(self.db_path): - logger.info("📁 새 DB 디렉토리 - 스키마 확인 불필요") - return False - - db_files = [f for f in os.listdir(self.db_path) if not f.startswith('.')] - if not db_files: - logger.info("📁 빈 DB 디렉토리 - 스키마 확인 불필요") - return False - - logger.info(f"📁 기존 DB 파일 발견: {db_files}") - - # 실제 호환성 테스트 - logger.info("🔍 기존 DB 호환성 테스트 중...") - - try: - # 테스트용 클라이언트 생성 - test_client = chromadb.PersistentClient(path=self.db_path) - test_client.heartbeat() - - # 컬렉션 접근 시도 - try: - test_collection = test_client.get_collection(name=self.collection_name) - count = test_collection.count() - logger.info(f"✅ 기존 DB 호환성 확인 완료: {count}개 벡터 존재") - return True - except Exception as collection_error: - logger.info(f"📝 기존 컬렉션 없음 (정상): {collection_error}") - return True # DB는 정상, 컬렉션만 새로 생성하면 됨 - - except Exception as compatibility_error: - # 실제 호환성 문제 발견 - logger.warning(f"⚠️ 실제 호환성 문제 발견: {compatibility_error}") - self._backup_incompatible_db() - return False - - except Exception as e: - logger.warning(f"⚠️ 호환성 확인 중 오류: {e}") - return False - - def _backup_incompatible_db(self): - """호환성 문제가 있는 DB 백업""" - try: - backup_path = f"{self.db_path}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - logger.warning(f"🔄 호환성 문제로 기존 DB 백업: {backup_path}") - - shutil.move(self.db_path, backup_path) - logger.info(f"✅ 기존 DB 백업 완료: {backup_path}") - - # 새 디렉토리 생성 - os.makedirs(self.db_path, exist_ok=True) - - # 오래된 백업 정리 (7일 이상) - self._cleanup_old_backups() - - except Exception as backup_error: - logger.warning(f"⚠️ 백업 실패, 강제 삭제 진행: {backup_error}") - shutil.rmtree(self.db_path, ignore_errors=True) - os.makedirs(self.db_path, exist_ok=True) - - def _cleanup_old_backups(self): - """오래된 백업 파일 정리 (7일 이상)""" - try: - base_path = os.path.dirname(self.db_path) - backup_pattern = f"{os.path.basename(self.db_path)}_backup_" - cutoff_time = time.time() - (7 * 24 * 3600) # 7일 전 - - for item in os.listdir(base_path): - if item.startswith(backup_pattern): - backup_path = os.path.join(base_path, item) - if os.path.isdir(backup_path) and os.path.getctime(backup_path) < cutoff_time: - shutil.rmtree(backup_path, ignore_errors=True) - logger.info(f"🗑️ 오래된 백업 삭제: {backup_path}") - - except Exception as e: - logger.warning(f"⚠️ 백업 정리 중 오류: {e}") - - def _create_chromadb_client(self): - """ChromaDB 클라이언트 생성""" - try: - # 최신 버전 호환 설정 - chroma_settings = ChromaSettings( - anonymized_telemetry=False, - allow_reset=True, - is_persistent=True - ) + logger.info("🔧 ChromaDB 클라이언트 초기화...") + # ChromaDB 클라이언트 생성 self.client = chromadb.PersistentClient( path=self.db_path, - settings=chroma_settings + settings=ChromaSettings( + anonymized_telemetry=False, + allow_reset=True, + ) ) - logger.info("✅ ChromaDB 클라이언트 생성 성공") - except Exception as modern_error: - logger.warning(f"⚠️ 최신 설정 실패, 간단한 설정으로 재시도: {modern_error}") + # Collection 가져오기 또는 생성 + try: + self.collection = self.client.get_collection(name=self.collection_name) + logger.info(f"✅ 기존 컬렉션 연결: {self.collection_name}") + except Exception: + self.collection = self.client.create_collection( + name=self.collection_name, + metadata={"hnsw:space": "cosine"} + ) + logger.info(f"✅ 새 컬렉션 생성: {self.collection_name}") - # 간단한 설정으로 재시도 - self.client = chromadb.PersistentClient(path=self.db_path) - logger.info("✅ ChromaDB 간단 설정 클라이언트 생성 성공") - - # 연결 테스트 - try: - self.client.heartbeat() - logger.info("✅ ChromaDB 연결 테스트 성공") - except Exception as heartbeat_error: - logger.warning(f"⚠️ Heartbeat 실패 (무시): {heartbeat_error}") - - def _initialize_collection(self, existing_db_valid: bool): - """컬렉션 초기화""" - try: - if existing_db_valid: - # 기존 컬렉션 로드 시도 - try: - self.collection = self.client.get_collection(name=self.collection_name) - count = self.collection.count() - logger.info(f"✅ 기존 컬렉션 로드 성공: {self.collection_name} ({count}개 벡터)") - return - except Exception as get_error: - logger.info(f"📝 기존 컬렉션 없음, 새로 생성: {get_error}") - - # 새 컬렉션 생성 - self.collection = self.client.create_collection( - name=self.collection_name, - metadata={ - "description": "Restaurant reviews vector store", - "created_at": datetime.now().isoformat(), - "version": "1.0" - } - ) - logger.info(f"✅ 새 컬렉션 생성 성공: {self.collection_name}") - - except Exception as create_error: - logger.error(f"❌ 컬렉션 초기화 실패: {create_error}") - - # 대체 컬렉션명으로 재시도 - fallback_name = f"{self.collection_name}_{int(time.time())}" - logger.info(f"🔄 대체 컬렉션명으로 재시도: {fallback_name}") - - self.collection = self.client.create_collection( - name=fallback_name, - metadata={"description": "Restaurant reviews (fallback)"} - ) - self.collection_name = fallback_name - logger.info(f"✅ 대체 컬렉션 생성 성공: {fallback_name}") + except Exception as e: + raise Exception(f"ChromaDB 초기화 실패: {e}") def _initialize_embedding_model(self): """임베딩 모델 초기화""" try: - logger.info(f"🤖 임베딩 모델 로드 시작: {self.embedding_model_name}") - - # 캐시 디렉토리 설정 - cache_dir = os.path.join(os.path.expanduser("~"), ".cache", "sentence_transformers") - os.makedirs(cache_dir, exist_ok=True) - - # 권한 확인 - if not os.access(cache_dir, os.W_OK): - import tempfile - cache_dir = tempfile.mkdtemp(prefix="st_cache_") - logger.info(f"🔄 임시 캐시 디렉토리 사용: {cache_dir}") - - # 모델 로드 (타임아웃 설정) - def timeout_handler(signum, frame): - raise TimeoutError("임베딩 모델 로드 타임아웃") - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(300) # 5분 타임아웃 - - try: - self.embedding_model = SentenceTransformer( - self.embedding_model_name, - cache_folder=cache_dir, - device='cpu' # CPU 사용 명시 - ) - signal.alarm(0) # 타임아웃 해제 - - # 모델 테스트 - test_embedding = self.embedding_model.encode(["테스트 문장"]) - logger.info(f"✅ 임베딩 모델 로드 성공: {test_embedding.shape}") - - except TimeoutError: - signal.alarm(0) - raise Exception("임베딩 모델 로드 타임아웃 (5분)") - + logger.info(f"🤖 임베딩 모델 로드: {self.embedding_model_name}") + self.embedding_model = SentenceTransformer(self.embedding_model_name) + logger.info("✅ 임베딩 모델 로드 완료") except Exception as e: - logger.error(f"❌ 임베딩 모델 로드 실패: {e}") - raise + raise Exception(f"임베딩 모델 로드 실패: {e}") def is_ready(self) -> bool: """서비스 준비 상태 확인""" - return ( - self.client is not None and - self.collection is not None and - self.embedding_model is not None and + return all([ + self.client is not None, + self.collection is not None, + self.embedding_model is not None, self.initialization_error is None - ) + ]) - def get_initialization_error(self) -> Optional[str]: - """초기화 에러 메시지 반환""" - return self.initialization_error - - def retry_initialization(self) -> bool: - """초기화 재시도""" + async def build_vector_store( + self, + target_store_info: Dict[str, Any], + review_results: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], + food_category: str, + region: str + ) -> Dict[str, Any]: + """Vector Store를 구축합니다""" + if not self.is_ready(): + raise Exception(f"VectorService가 준비되지 않음: {self.initialization_error}") + try: - logger.info("🔄 VectorService 초기화 재시도...") + logger.info("🚀 Vector Store 구축 시작") - # 상태 초기화 - self.client = None - self.collection = None - self.embedding_model = None - self.initialization_error = None + processed_count = 0 + documents = [] + embeddings = [] + metadatas = [] + ids = [] - # 재초기화 - self._safe_initialize() - - return self.is_ready() - - except Exception as e: - self.initialization_error = str(e) - logger.error(f"❌ 초기화 재시도 실패: {e}") - return False - - def reset_vector_db(self) -> Dict[str, Any]: - """Vector DB 완전 리셋""" - try: - logger.info("🔄 Vector DB 완전 리셋 시작...") - - # 기존 클라이언트 정리 - self.client = None - self.collection = None - - # DB 디렉토리 완전 삭제 - if os.path.exists(self.db_path): - shutil.rmtree(self.db_path, ignore_errors=True) - logger.info(f"✅ 기존 DB 디렉토리 삭제: {self.db_path}") - - # 새 디렉토리 생성 - os.makedirs(self.db_path, exist_ok=True) - logger.info(f"✅ 새 DB 디렉토리 생성: {self.db_path}") - - # 재초기화 - success = self.retry_initialization() - - if success: - return { - "success": True, - "message": "Vector DB가 성공적으로 리셋되었습니다", - "collection_name": self.collection_name, - "db_path": self.db_path - } - else: - return { - "success": False, - "error": self.initialization_error or "재초기화 실패" - } - - except Exception as e: - logger.error(f"❌ Vector DB 리셋 실패: {e}") - return { - "success": False, - "error": str(e) - } - - def get_health_status(self) -> Dict[str, Any]: - """서비스 상태 확인""" - try: - status = { - "service": "vector_db", - "status": "healthy" if self.is_ready() else "unhealthy", - "db_path": self.db_path, - "collection_name": self.collection_name, - "embedding_model": self.embedding_model_name, - "timestamp": datetime.now().isoformat() - } - - if self.initialization_error: - status["initialization_error"] = self.initialization_error - status["status"] = "error" - - # 상세 상태 - status["components"] = { - "client": "connected" if self.client else "disconnected", - "collection": "ready" if self.collection else "not_ready", - "embedding": "loaded" if self.embedding_model else "not_loaded" - } - - # 컬렉션 정보 - if self.collection: + for store_id, store_info, reviews in review_results: try: - status["collection_count"] = self.collection.count() + # 텍스트 추출 및 임베딩 생성 + text_for_embedding = extract_text_for_embedding(store_info, reviews) + embedding = self.embedding_model.encode(text_for_embedding) + + # 메타데이터 생성 + metadata = create_metadata(store_info, food_category, region) + + # 문서 ID 생성 + document_id = create_store_hash(store_id, store_info.get('name', ''), region) + + # 문서 데이터 생성 + document_text = combine_store_and_reviews(store_info, reviews) + + documents.append(document_text) + embeddings.append(embedding.tolist()) + metadatas.append(metadata) + ids.append(document_id) + + processed_count += 1 + except Exception as e: - status["collection_error"] = str(e) - - return status - - except Exception as e: - return { - "service": "vector_db", - "status": "error", - "error": str(e), - "timestamp": datetime.now().isoformat() - } - - def get_store_context(self, store_id: str) -> Optional[str]: - """스토어 ID로 컨텍스트 조회""" - try: - if not self.is_ready(): - logger.warning("VectorService가 준비되지 않음") - return None - - # 스토어 ID로 검색 - results = self.collection.get( - where={"store_id": store_id} - ) - - if not results or not results.get('documents'): - logger.warning(f"스토어 ID '{store_id}'에 대한 데이터 없음") - return None - - # 컨텍스트 생성 - documents = results['documents'] - metadatas = results.get('metadatas', []) - - context_parts = [] - for i, doc in enumerate(documents): - metadata = metadatas[i] if i < len(metadatas) else {} - - # 메타데이터 정보 추가 - if metadata: - context_parts.append(f"[{metadata.get('store_name', 'Unknown')}]") - - context_parts.append(doc) - context_parts.append("---") - - return "\n".join(context_parts) - - except Exception as e: - logger.error(f"스토어 컨텍스트 조회 실패: {e}") - return None - - async def build_vector_store(self, store_info: Dict[str, Any], similar_stores_data: List[Tuple[str, Dict[str, Any], List[Dict[str, Any]]]], food_category: str, region: str) -> Dict[str, Any]: - """Vector Store 구축 (완전 수정된 버전)""" - try: - if not self.is_ready(): - # 재시도 한 번 더 - if not self.retry_initialization(): - raise Exception("Vector DB가 초기화되지 않았습니다") - - logger.info(f"Vector Store 구축 시작: {len(similar_stores_data)}개 스토어") - - # 통계 초기화 - stats = { - "total_processed": 0, - "newly_added": 0, - "updated": 0, - "duplicates": 0, - "errors": 0 - } - - # 배치 처리용 리스트 - all_documents = [] - all_embeddings = [] - all_metadatas = [] - all_ids = [] - - # 각 스토어 처리 - for store_id, store_data, reviews in similar_stores_data: - try: - # 데이터 검증 - if not store_data or not reviews: - logger.warning(f"스토어 '{store_id}' 데이터 부족: store_data={bool(store_data)}, reviews={len(reviews) if reviews else 0}") - stats["errors"] += 1 - continue - - # 올바른 create_store_hash 호출 - store_hash = create_store_hash( - store_id=store_id, - store_name=store_data.get('place_name', ''), - region=region - ) - - # ChromaDB에서 직접 중복 확인 (is_duplicate_store 함수 사용하지 않음) - try: - # 같은 store_id로 이미 저장된 데이터가 있는지 확인 - existing_data = self.collection.get( - where={"store_id": store_id}, - limit=1 - ) - - if existing_data and len(existing_data.get('ids', [])) > 0: - logger.debug(f"중복 스토어 건너뛰기: {store_id}") - stats["duplicates"] += 1 - continue - - except Exception as dup_check_error: - # 중복 확인 실패는 로그만 남기고 계속 진행 - logger.warning(f"중복 확인 실패 (계속 진행): {dup_check_error}") - - # 올바른 extract_text_for_embedding 호출 - embedding_text = extract_text_for_embedding( - store_info=store_data, - reviews=reviews - ) - - # 임베딩 생성 - try: - embedding = self.embedding_model.encode(embedding_text) - embedding = embedding.tolist() # numpy array를 list로 변환 - except Exception as embed_error: - logger.error(f"임베딩 생성 실패 (store_id: {store_id}): {embed_error}") - stats["errors"] += 1 - continue - - # 올바른 create_metadata 호출 - metadata = create_metadata( - store_info=store_data, - food_category=food_category, - region=region - ) - - # 배치에 추가 - all_documents.append(embedding_text) - all_embeddings.append(embedding) - all_metadatas.append(metadata) - all_ids.append(f"{store_id}_{store_hash}") - - stats["total_processed"] += 1 - stats["newly_added"] += 1 - - if stats["total_processed"] % 10 == 0: - logger.info(f"처리 진행률: {stats['total_processed']}/{len(similar_stores_data)}") - - except Exception as store_error: - logger.error(f"음식점 처리 중 오류 (store_id: {store_id}): {store_error}") - stats["errors"] += 1 + logger.warning(f"⚠️ 가게 {store_id} 처리 실패: {e}") continue - # 배치로 벡터 저장 - if all_documents: - logger.info(f"벡터 배치 저장 시작: {len(all_documents)}개") + # Vector DB에 저장 + if documents: + self.collection.add( + documents=documents, + embeddings=embeddings, + metadatas=metadatas, + ids=ids + ) - try: - self.collection.add( - documents=all_documents, - embeddings=all_embeddings, - metadatas=all_metadatas, - ids=all_ids - ) - logger.info("✅ 벡터 배치 저장 성공") - - except Exception as save_error: - logger.error(f"❌ 벡터 저장 실패: {save_error}") - return { - "success": False, - "error": f"벡터 저장 실패: {str(save_error)}", - "statistics": stats - } + logger.info(f"✅ Vector Store 구축 완료: {processed_count}개 문서 저장") + + return { + 'success': True, + 'processed_count': processed_count, + 'message': f"{processed_count}개 문서가 Vector DB에 저장되었습니다" + } else: - logger.warning("⚠️ 저장할 벡터 데이터가 없음") - - # 최종 통계 - try: - total_vectors = self.collection.count() - logger.info(f"✅ Vector Store 구축 완료: 총 {total_vectors}개 벡터") - except Exception as count_error: - logger.warning(f"벡터 개수 확인 실패: {count_error}") - total_vectors = len(all_documents) - + return { + 'success': False, + 'error': '저장할 문서가 없습니다', + 'processed_count': 0 + } + + except Exception as e: + logger.error(f"❌ Vector Store 구축 실패: {e}") return { - "success": True, - "message": "Vector Store 구축 완료", - "statistics": stats, - "total_vectors": total_vectors, - "store_info": store_info + 'success': False, + 'error': str(e), + 'processed_count': 0 } + + def search_similar_cases_improved(self, store_id: str, context: str, limit: int = 5) -> Optional[str]: + """유사 케이스 검색 (개선된 버전)""" + if not self.is_ready(): + logger.warning("VectorService가 준비되지 않음") + return None + + try: + # 검색 쿼리 생성 + query_text = f"가게 ID: {store_id} 요청사항: {context}" + query_embedding = self.embedding_model.encode(query_text) + + # 유사도 검색 + results = self.collection.query( + query_embeddings=[query_embedding.tolist()], + n_results=limit, + include=['documents', 'metadatas', 'distances'] + ) + + if results['documents'] and results['documents'][0]: + # 검색 결과 요약 + context_parts = [] + for i, (doc, metadata, distance) in enumerate(zip( + results['documents'][0], + results['metadatas'][0], + results['distances'][0] + )): + store_name = metadata.get('store_name', 'Unknown') + category = metadata.get('food_category', 'Unknown') + context_parts.append( + f"유사 가게 {i+1}: {store_name} ({category}) - 유사도: {1-distance:.3f}" + ) + + return "\n".join(context_parts) + + return None except Exception as e: - logger.error(f"Vector Store 구축 전체 실패: {e}") - return { - "success": False, - "error": str(e), - "statistics": stats if 'stats' in locals() else { - "total_processed": 0, - "newly_added": 0, - "updated": 0, - "duplicates": 0, - "errors": 1 - } - } - + logger.error(f"유사 케이스 검색 실패: {e}") + return None + def get_db_status(self) -> Dict[str, Any]: - """Vector DB 상태 정보를 반환합니다.""" + """DB 상태 정보 반환""" try: if not self.is_ready(): return { @@ -630,44 +267,23 @@ class VectorService: 'total_stores': 0, 'db_path': self.db_path, 'status': 'not_ready', - 'initialization_error': self.initialization_error + 'error': self.initialization_error } - # 문서 개수 확인 - try: - total_documents = self.collection.count() - except Exception as e: - logger.warning(f"문서 개수 확인 실패: {e}") - total_documents = 0 - - # 고유 가게 수 확인 (store_id 기준) - try: - # 모든 메타데이터에서 고유 store_id 추출 - all_metadata = self.collection.get() - store_ids = set() - - if all_metadata.get('metadatas'): - for metadata in all_metadata['metadatas']: - store_id = metadata.get('store_id') - if store_id: - store_ids.add(store_id) - - total_stores = len(store_ids) - - except Exception as e: - logger.warning(f"가게 수 확인 실패: {e}") - total_stores = 0 + # 컬렉션 정보 조회 + count = self.collection.count() return { 'collection_name': self.collection_name, - 'total_documents': total_documents, - 'total_stores': total_stores, + 'total_documents': count, + 'total_stores': count, # 각 문서가 하나의 가게를 나타냄 'db_path': self.db_path, - 'status': 'ready' + 'status': 'ready', + 'last_updated': datetime.now().isoformat() } except Exception as e: - logger.error(f"DB 상태 확인 실패: {e}") + logger.error(f"DB 상태 조회 실패: {e}") return { 'collection_name': self.collection_name, 'total_documents': 0, @@ -675,148 +291,4 @@ class VectorService: 'db_path': self.db_path, 'status': 'error', 'error': str(e) - } - - def search_similar_cases(self, store_id: str, context: str) -> Optional[str]: - """유사한 케이스를 검색합니다.""" - try: - if not self.is_ready(): - logger.warning("VectorService가 준비되지 않음") - return None - - # 컨텍스트 기반 유사 검색 - try: - # 검색 쿼리를 임베딩으로 변환 - query_embedding = self.embedding_model.encode(context) - query_embedding = query_embedding.tolist() - - # 유사한 문서 검색 (상위 5개) - results = self.collection.query( - query_embeddings=[query_embedding], - n_results=5, - include=['documents', 'metadatas'] - ) - - if not results or not results.get('documents') or not results['documents'][0]: - logger.info("유사 케이스를 찾을 수 없음") - return None - - # 컨텍스트 조합 - context_parts = [] - documents = results['documents'][0] - metadatas = results.get('metadatas', [[]])[0] - - for i, doc in enumerate(documents): - metadata = metadatas[i] if i < len(metadatas) else {} - - # 가게 정보 추가 - store_name = metadata.get('store_name', 'Unknown') - food_category = metadata.get('food_category', 'Unknown') - - context_parts.append(f"[{food_category} - {store_name}]") - context_parts.append(doc[:500] + "..." if len(doc) > 500 else doc) - context_parts.append("---") - - return "\n".join(context_parts) - - except Exception as search_error: - logger.warning(f"벡터 검색 실패: {search_error}") - return None - - except Exception as e: - logger.error(f"유사 케이스 검색 실패: {e}") - return None - - def search_similar_cases_improved(self, store_id: str, context: str) -> Optional[str]: - """ - 개선된 유사 케이스 검색 - 1. store_id 기반 필터링 우선 적용 - 2. 동종 업체 우선 검색 - 3. 캐싱 및 성능 최적화 - """ - try: - if not self.is_ready(): - logger.warning("VectorService가 준비되지 않음") - return None - - # 1단계: 해당 가게의 정보 먼저 확인 - store_context = self.get_store_context(store_id) - food_category = store_context.get('food_category', '') if store_context else '' - - # 2단계: 검색 쿼리 구성 (가게 정보 + 컨텍스트) - enhanced_query = f"{food_category} {context}" - query_embedding = self.embedding_model.encode(enhanced_query).tolist() - - # 3단계: 동종 업체 우선 검색 (메타데이터 필터링) - results = self.collection.query( - query_embeddings=[query_embedding], - n_results=10, # 더 많은 결과에서 필터링 - include=['documents', 'metadatas', 'distances'], - where={"food_category": {"$eq": food_category}} if food_category else None - ) - - if not results or not results.get('documents') or not results['documents'][0]: - # 4단계: 동종 업체가 없으면 전체 검색 - logger.info("동종 업체 없음 - 전체 검색으로 전환") - results = self.collection.query( - query_embeddings=[query_embedding], - n_results=5, - include=['documents', 'metadatas', 'distances'] - ) - - if not results or not results.get('documents') or not results['documents'][0]: - logger.info("유사 케이스를 찾을 수 없음") - return None - - # 5단계: 결과 조합 (관련성 높은 순서로) - context_parts = [] - documents = results['documents'][0] - metadatas = results.get('metadatas', [[]])[0] - distances = results.get('distances', [[]])[0] - - # 거리(유사도) 기준으로 필터링 (너무 관련성 낮은 것 제외) - filtered_results = [] - for i, (doc, metadata, distance) in enumerate(zip(documents, metadatas, distances)): - if distance < 0.8: # 유사도 임계값 - filtered_results.append((doc, metadata, distance)) - - if not filtered_results: - return None - - # 최대 3개의 가장 관련성 높은 케이스만 사용 - for doc, metadata, distance in filtered_results[:3]: - store_name = metadata.get('store_name', 'Unknown') - food_cat = metadata.get('food_category', 'Unknown') - - context_parts.append(f"[{food_cat} - {store_name}] (유사도: {1-distance:.2f})") - # 문서 길이 제한으로 토큰 수 최적화 - context_parts.append(doc[:300] + "..." if len(doc) > 300 else doc) - context_parts.append("---") - - return "\n".join(context_parts) - - except Exception as e: - logger.error(f"유사 케이스 검색 실패: {e}") - return None - - def get_store_context(self, store_id: str) -> Optional[Dict[str, Any]]: - """해당 가게의 컨텍스트 정보 조회 (캐싱 적용)""" - try: - if not self.is_ready(): - return None - - # 메타데이터에서 해당 store_id 검색 - results = self.collection.get( - where={"store_id": {"$eq": store_id}}, - limit=1, - include=['metadatas'] - ) - - if results and results.get('metadatas') and len(results['metadatas']) > 0: - return results['metadatas'][0] - - return None - - except Exception as e: - logger.error(f"가게 컨텍스트 조회 실패: {e}") - return None \ No newline at end of file + } \ No newline at end of file diff --git a/vector/app/utils/category_utils.py b/vector/app/utils/category_utils.py index 9abcaa0..22c8236 100644 --- a/vector/app/utils/category_utils.py +++ b/vector/app/utils/category_utils.py @@ -1,4 +1,4 @@ -# app/utils/category_utils.py (수정된 버전) +# app/utils/category_utils.py import re from typing import Optional @@ -98,37 +98,6 @@ def extract_main_category(category_name: str) -> str: return "" -def build_search_query(region: str, food_category: str) -> str: - """ - 검색 쿼리를 구성합니다. (수정된 버전 - 지역 정보 제외) - - Args: - region: 지역 (사용하지 않음) - food_category: 음식 카테고리 - - Returns: - 검색 쿼리 문자열 (음식 카테고리만 포함) - """ - # 콤마와 슬래시를 공백으로 변경하여 검색 키워드 생성 - search_keywords = food_category.replace(',', ' ').replace('/', ' ') - - # 불필요한 단어 제거 - stop_words = ['음식점', '요리', '전문점', '맛집'] - keywords = [] - - for keyword in search_keywords.split(): - keyword = keyword.strip() - if keyword and keyword not in stop_words: - keywords.append(keyword) - - # 키워드가 없으면 기본 검색어 사용 - if not keywords: - keywords = ['음식점'] - - # 🔧 지역 정보는 포함하지 않고 음식 키워드만 반환 - query = ' '.join(keywords) - return query.strip() - def clean_food_category_for_search(food_category: str) -> str: """ 음식 카테고리를 검색용 키워드로 정리합니다. @@ -158,4 +127,4 @@ def clean_food_category_for_search(food_category: str) -> str: if not keywords: return "음식점" - return ' '.join(keywords) + return ' '.join(keywords) \ No newline at end of file diff --git a/vector/app/utils/data_utils.py b/vector/app/utils/data_utils.py index 687f446..bdab6ef 100644 --- a/vector/app/utils/data_utils.py +++ b/vector/app/utils/data_utils.py @@ -52,59 +52,46 @@ def generate_review_summary(reviews: List[Dict[str, Any]]) -> Dict[str, Any]: if not reviews: return { "total_reviews": 0, - "average_rating": 0.0, + "average_rating": 0, "rating_distribution": {}, - "common_keywords": [], - "sentiment_summary": { - "positive": 0, - "neutral": 0, - "negative": 0 - } + "total_likes": 0, + "common_keywords": [] } - # 기본 통계 + # 기본 통계 계산 total_reviews = len(reviews) - ratings = [review.get('rating', 0) for review in reviews if review.get('rating', 0) > 0] - average_rating = sum(ratings) / len(ratings) if ratings else 0.0 + total_rating = sum(review.get('rating', 0) for review in reviews) + average_rating = total_rating / total_reviews if total_reviews > 0 else 0 + total_likes = sum(review.get('likes', 0) for review in reviews) - # 별점 분포 + # 별점 분포 계산 rating_distribution = {} - for rating in ratings: - rating_distribution[str(rating)] = rating_distribution.get(str(rating), 0) + 1 + for review in reviews: + rating = review.get('rating', 0) + rating_distribution[rating] = rating_distribution.get(rating, 0) + 1 - # 키워드 추출 (badges 기반) - keyword_counts = {} + # 공통 키워드 추출 (배지 기준) + keyword_count = {} for review in reviews: badges = review.get('badges', []) for badge in badges: - keyword_counts[badge] = keyword_counts.get(badge, 0) + 1 + keyword_count[badge] = keyword_count.get(badge, 0) + 1 - # 상위 키워드 추출 - common_keywords = sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True)[:10] + # 상위 10개 키워드 + common_keywords = sorted(keyword_count.items(), key=lambda x: x[1], reverse=True)[:10] common_keywords = [keyword for keyword, count in common_keywords] - # 감정 분석 (간단한 별점 기반) - sentiment_summary = { - "positive": len([r for r in ratings if r >= 4]), - "neutral": len([r for r in ratings if r == 3]), - "negative": len([r for r in ratings if r <= 2]) - } - return { "total_reviews": total_reviews, "average_rating": round(average_rating, 2), "rating_distribution": rating_distribution, - "common_keywords": common_keywords, - "sentiment_summary": sentiment_summary, - "has_recent_reviews": any( - review.get('date', '') >= datetime.now().strftime('%Y.%m.%d') - for review in reviews[-10:] # 최근 10개 리뷰 확인 - ) + "total_likes": total_likes, + "common_keywords": common_keywords } def extract_text_for_embedding(store_info: Dict[str, Any], reviews: List[Dict[str, Any]]) -> str: """ - 임베딩을 위한 텍스트를 추출합니다. + Vector DB 임베딩을 위한 텍스트를 추출합니다. Args: store_info: 가게 정보 @@ -191,4 +178,4 @@ def is_duplicate_store(metadata1: Dict[str, Any], metadata2: Dict[str, Any]) -> if name1 and name2 and addr1 and addr2: return name1 == name2 and addr1 == addr2 - return False + return False \ No newline at end of file