HealthSync_Intelligence/app/services/mission_service.py
hyerimmy 910bd902b1
Some checks failed
HealthSync Intelligence CI / build-and-push (push) Has been cancelled
feat : initial commit
2025-06-20 05:28:30 +00:00

1338 lines
60 KiB
Python

# app/services/mission_service.py
"""
HealthSync AI 미션 관련 서비스 (벡터 초기화 기능 추가)
"""
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
from app.services.base_service import BaseService
from app.services.health_service import HealthService
from app.utils.claude_client import ClaudeClient
from app.utils.vector_client import pinecone_client
from app.utils.redis_client import redis_client
from app.config.prompts import get_mission_recommendation_prompt, get_celebration_prompt
from app.dto.response.mission_response import MissionRecommendationResponse, RecommendedMission
from app.dto.response.celebration_response import CelebrationResponse
from app.dto.response.similar_mission_news_response import SimilarMissionNewsResponse, MissionNewsItem
from app.repositories.chat_repository import ChatRepository
from app.repositories.mission_repository import MissionRepository
from app.repositories.similar_mission_repository import SimilarMissionRepository
from app.exceptions import (
UserNotFoundException,
HealthDataNotFoundException,
DatabaseException,
ClaudeAPIException
)
class MissionService(BaseService):
"""미션 관련 비즈니스 로직 서비스 (벡터 초기화 기능 추가)"""
def __init__(self):
super().__init__()
self.health_service = HealthService()
self.claude_client = ClaudeClient()
self.mission_repository = MissionRepository()
self.chat_repository = ChatRepository()
self.similar_mission_repository = SimilarMissionRepository()
async def recommend_missions(self, user_id: int) -> MissionRecommendationResponse:
"""미션 추천 전체 프로세스 실행"""
try:
self.log_operation("recommend_missions_start", user_id=user_id)
# 1. 사용자 데이터 조회
combined_data = await self.health_service.get_combined_user_health_data(user_id)
user_data = combined_data["user_info"]
health_data = combined_data["health_data"]
# 2. 프롬프트 생성
prompt = await self._build_recommendation_prompt(user_data, health_data)
# 3. Claude API 호출
claude_response = await self.claude_client.call_claude_api(prompt)
# 4. Claude 응답 파싱
claude_json = self.claude_client.parse_json_response(claude_response)
# 5. 응답 형식 변환
recommended_missions = self._convert_claude_to_missions(claude_json)
# 6. 최종 응답 생성
response = MissionRecommendationResponse(missions=recommended_missions)
self.log_operation("recommend_missions_success", user_id=user_id,
mission_count=len(recommended_missions))
return response
except Exception as e:
self.logger.error(f"미션 추천 프로세스 실패 - user_id: {user_id}, error: {str(e)}")
raise Exception(f"미션 추천 실패: {str(e)}")
async def generate_celebration_message(self, user_id: int, mission_id: int) -> CelebrationResponse:
"""미션 달성 축하 메시지 생성 및 Chat DB 저장 (완료 후 저장)"""
try:
self.log_operation("generate_celebration_message_start",
user_id=user_id, mission_id=mission_id)
# 1. 미션 정보 조회
mission_info = await self._get_mission_info(mission_id)
# 2. 프롬프트 생성
prompt = self._build_celebration_prompt(user_id, mission_info)
# 3. Claude API 호출
claude_response = await self.claude_client.call_claude_api(prompt)
# 4. 응답 정제 (앞뒤 공백 제거, 따옴표 제거)
celebration_message = claude_response.strip().strip('"').strip("'")
# 5. 축하 메시지 생성 완료 후 Chat DB에 저장
celebration_message_id = await self.chat_repository.save_chat_message(
user_id=user_id,
message_type="celebration",
message_content=None, # message_content는 null
response_content=celebration_message # 축하 메시지는 response_content에 저장
)
# 6. 최종 응답 생성
response = CelebrationResponse(congratsMessage=celebration_message)
self.log_operation("generate_celebration_message_success",
user_id=user_id, mission_id=mission_id,
celebration_id=celebration_message_id,
mission_name=mission_info.get("mission_name"),
message_length=len(celebration_message))
return response
except (UserNotFoundException, HealthDataNotFoundException, DatabaseException):
# 사용자 정의 예외는 그대로 전파
raise
except Exception as e:
self.logger.error(f"축하 메시지 생성 실패 - user_id: {user_id}, mission_id: {mission_id}, error: {str(e)}")
raise ClaudeAPIException(f"축하 메시지 생성 중 오류가 발생했습니다: {str(e)}")
async def get_similar_mission_news(self, user_id: int) -> SimilarMissionNewsResponse:
"""5가지 유사도 기준별 미션 완료 소식 조회 (각 기준별 1명씩 총 5개)"""
try:
self.log_operation("get_similar_mission_news_start", user_id=user_id)
# 1. 유사 사용자 목록 조회 (더 많은 후보 확보)
similar_users = await self._get_cached_similar_users(user_id, top_k=30)
if not similar_users:
return SimilarMissionNewsResponse(similar_mission_news=[], total_count=0)
# 2. 유사 사용자들의 최근 미션 완료 이력 실시간 조회
recent_completions = await self.similar_mission_repository.get_recent_mission_completions(similar_users)
# 3. 현재 사용자 정보 조회 (비교 기준용)
current_user_data = await self.similar_mission_repository.get_user_health_for_vector(user_id)
# 4. 5가지 유사도 기준별 최적 후보 선별
mission_news_items = await self._select_diverse_mission_news(recent_completions, current_user_data)
# 5. 최종 응답 생성
response = SimilarMissionNewsResponse(
similar_mission_news=mission_news_items,
total_count=len(mission_news_items)
)
self.log_operation("get_similar_mission_news_success", user_id=user_id,
similar_user_count=len(similar_users),
news_count=len(mission_news_items))
return response
except Exception as e:
self.logger.error(f"유사 미션 소식 조회 실패 - user_id: {user_id}, error: {str(e)}")
raise Exception(f"유사 미션 소식 조회 실패: {str(e)}")
async def _select_diverse_mission_news(self, recent_completions: List[Dict[str, Any]],
current_user_data: Dict[str, Any]) -> List[MissionNewsItem]:
"""5가지 유사도 기준별 최적 후보 선별 (정상 특성 제외)"""
try:
if not current_user_data:
return []
# 5가지 유사도 기준별 최고 점수 후보들
similarity_categories = {
"occupation": {"best_completion": None, "best_score": 0.0}, # 직업 유사도
"age": {"best_completion": None, "best_score": 0.0}, # 나이 유사도
"health_bmi": {"best_completion": None, "best_score": 0.0}, # BMI/체형 유사도 (비정상만)
"health_bp": {"best_completion": None, "best_score": 0.0}, # 혈압 유사도 (주의/위험만)
"health_glucose": {"best_completion": None, "best_score": 0.0} # 혈당 유사도 (주의/위험만)
}
# 각 완료 이력에 대해 5가지 기준별 점수 계산
for completion in recent_completions:
# 1. 직업 유사도 계산
occupation_score = self._calculate_occupation_similarity(completion, current_user_data)
if occupation_score > similarity_categories["occupation"]["best_score"]:
similarity_categories["occupation"]["best_completion"] = completion
similarity_categories["occupation"]["best_score"] = occupation_score
# 2. 나이 유사도 계산
age_score = self._calculate_age_similarity(completion, current_user_data)
if age_score > similarity_categories["age"]["best_score"]:
similarity_categories["age"]["best_completion"] = completion
similarity_categories["age"]["best_score"] = age_score
# 3. BMI/체형 유사도 계산 (정상 체형 제외)
bmi_score = self._calculate_bmi_similarity(completion, current_user_data)
if bmi_score > similarity_categories["health_bmi"]["best_score"] and bmi_score > 0.3:
# 정상 체형은 낮은 점수(0.1)이므로 0.3 임계값으로 필터링됨
similarity_categories["health_bmi"]["best_completion"] = completion
similarity_categories["health_bmi"]["best_score"] = bmi_score
# 4. 혈압 유사도 계산 (정상 혈압 제외)
bp_score = self._calculate_bp_similarity(completion, current_user_data)
if bp_score > similarity_categories["health_bp"]["best_score"] and bp_score > 0.3:
# 정상 혈압은 낮은 점수(0.1)이므로 0.3 임계값으로 필터링됨
similarity_categories["health_bp"]["best_completion"] = completion
similarity_categories["health_bp"]["best_score"] = bp_score
# 5. 혈당 유사도 계산 (정상 혈당 제외)
glucose_score = self._calculate_glucose_similarity(completion, current_user_data)
if glucose_score > similarity_categories["health_glucose"]["best_score"] and glucose_score > 0.3:
# 정상 혈당은 낮은 점수(0.1)이므로 0.3 임계값으로 필터링됨
similarity_categories["health_glucose"]["best_completion"] = completion
similarity_categories["health_glucose"]["best_score"] = glucose_score
# 선별된 후보들을 MissionNewsItem으로 변환
mission_news_items = []
used_user_ids = set() # 중복 사용자 방지
for category, data in similarity_categories.items():
completion = data["best_completion"]
score = data["best_score"]
if completion and score > 0.3: # 최소 유사도 임계값 (정상 특성들은 자동 필터링됨)
user_id = completion.get("member_serial_number")
# 중복 사용자 체크
if user_id not in used_user_ids:
news_item = await self._create_mission_news_item(completion, current_user_data, category, score)
if news_item:
mission_news_items.append(news_item)
used_user_ids.add(user_id)
# 유사도 점수 순으로 정렬
mission_news_items.sort(key=lambda x: x.similarity_score, reverse=True)
# 최대 5개 반환
return mission_news_items[:5]
except Exception as e:
self.logger.error(f"다양한 미션 소식 선별 실패: {str(e)}")
return []
def _calculate_occupation_similarity(self, completion: Dict[str, Any], current_user: Dict[str, Any]) -> float:
"""직업 유사도 계산"""
try:
completion_occupation = completion.get("occupation", "")
current_occupation = current_user.get("occupation", "")
if not completion_occupation or not current_occupation:
return 0.0
if completion_occupation == current_occupation:
return 1.0
# 유사 직업군 체크
similar_groups = [
["OFF001", "ENG001"], # 테크 그룹
["MED001", "EDU001"], # 케어 그룹
["SRV001"] # 서비스 그룹
]
for group in similar_groups:
if completion_occupation in group and current_occupation in group:
return 0.6
return 0.0
except:
return 0.0
def _calculate_age_similarity(self, completion: Dict[str, Any], current_user: Dict[str, Any]) -> float:
"""나이 유사도 계산"""
try:
completion_age = completion.get("age", 0)
current_age = current_user.get("age", 0)
if not completion_age or not current_age:
return 0.0
age_diff = abs(completion_age - current_age)
return max(0.0, 1.0 - age_diff / 10) # 10세 차이까지 고려
except:
return 0.0
def _calculate_bmi_similarity(self, completion: Dict[str, Any], current_user: Dict[str, Any]) -> float:
"""BMI/체형 유사도 계산 (정상 체형 제외 로직)"""
try:
completion_bmi = self._calculate_bmi_from_data(completion)
current_bmi = current_user.get("bmi", 0) or self._calculate_bmi_from_data(current_user)
if not completion_bmi or not current_bmi:
return 0.0
# 정상 체형인 경우 유사도 낮춤 (표시되지 않도록)
if 18.5 <= completion_bmi < 25:
return 0.1 # 매우 낮은 유사도
bmi_diff = abs(completion_bmi - current_bmi)
return max(0.0, 1.0 - bmi_diff / 6) # BMI 6 차이까지 고려
except:
return 0.0
def _calculate_bp_similarity(self, completion: Dict[str, Any], current_user: Dict[str, Any]) -> float:
"""혈압 유사도 계산 (정상 혈압 제외 로직)"""
try:
completion_bp = completion.get("systolic_bp", 0)
current_bp = current_user.get("systolic_bp", 0)
if not completion_bp or not current_bp:
return 0.0
# 정상 혈압인 경우 유사도 낮춤 (표시되지 않도록)
if completion_bp < 130:
return 0.1 # 매우 낮은 유사도
bp_diff = abs(completion_bp - current_bp)
return max(0.0, 1.0 - bp_diff / 30) # 혈압 30 차이까지 고려
except:
return 0.0
def _calculate_glucose_similarity(self, completion: Dict[str, Any], current_user: Dict[str, Any]) -> float:
"""혈당 유사도 계산 (정상 혈당 제외 로직)"""
try:
completion_glucose = completion.get("fasting_glucose", 0)
current_glucose = current_user.get("fasting_glucose", 0)
if not completion_glucose or not current_glucose:
return 0.0
# 정상 혈당인 경우 유사도 낮춤 (표시되지 않도록)
if completion_glucose < 100:
return 0.1 # 매우 낮은 유사도
glucose_diff = abs(completion_glucose - current_glucose)
return max(0.0, 1.0 - glucose_diff / 30) # 혈당 30 차이까지 고려
except:
return 0.0
async def _create_mission_news_item(self, completion: Dict[str, Any], current_user_data: Dict[str, Any],
category: str, score: float) -> Optional[MissionNewsItem]:
"""개별 미션 소식 아이템 생성"""
try:
# 이름 마스킹
full_name = completion.get("name", "사용자")
masked_name = self._mask_name(full_name)
# 카테고리별 특성 분석
user_characteristics = self._analyze_category_characteristics(completion, current_user_data, category)
# 미션 정보
mission_name = completion.get("mission_name", "미션")
completed_count = completion.get("daily_completed_count", 1)
# 미션 카테고리 추출
mission_category = self._extract_mission_category(mission_name)
# 간단한 이모지 매핑 (성능 향상)
emoji = self._get_simple_emoji(mission_name)
# 메시지 생성
message = self._generate_category_based_message(
masked_name, mission_name, completed_count, user_characteristics, emoji
)
# MissionNewsItem 생성
news_item = MissionNewsItem(
message=message,
mission_category=mission_category,
similarity_score=score,
completed_at=completion.get("created_at", datetime.now())
)
return news_item
except Exception as e:
self.logger.warning(f"미션 소식 아이템 생성 실패: {str(e)}")
return None
def _analyze_category_characteristics(self, completion: Dict[str, Any], current_user_data: Dict[str, Any],
category: str) -> Dict[str, Any]:
"""카테고리별 특성 분석 (정상 특성 제외)"""
characteristics = {
"primary_identifier": "",
"category": category
}
try:
if category == "occupation":
# 직업 중심
occupation_code = completion.get("occupation", "")
characteristics["primary_identifier"] = self._get_occupation_display_name(occupation_code)
elif category == "age":
# 나이 중심
age = completion.get("age", 0)
if age:
characteristics["primary_identifier"] = f"{age}"
elif category == "health_bmi":
# BMI/체형 중심 (정상 체형 제외)
bmi = self._calculate_bmi_from_data(completion)
if bmi:
if bmi < 18.5:
characteristics["primary_identifier"] = "마른"
elif bmi >= 25:
characteristics["primary_identifier"] = "통통한"
# 정상 체형(18.5-25)은 표시하지 않음
elif category == "health_bp":
# 혈압 중심 (정상 혈압 제외)
systolic_bp = completion.get("systolic_bp", 0)
if systolic_bp:
if systolic_bp >= 140:
characteristics["primary_identifier"] = "혈압높은"
elif systolic_bp >= 130:
characteristics["primary_identifier"] = "혈압주의"
# 정상 혈압(<130)은 표시하지 않음
elif category == "health_glucose":
# 혈당 중심 (정상 혈당 제외)
glucose = completion.get("fasting_glucose", 0)
if glucose:
if glucose >= 126:
characteristics["primary_identifier"] = "혈당높은"
elif glucose >= 100:
characteristics["primary_identifier"] = "혈당주의"
# 정상 혈당(<100)은 표시하지 않음
except Exception as e:
self.logger.warning(f"카테고리별 특성 분석 실패: {str(e)}")
return characteristics
def _generate_category_based_message(self, masked_name: str, mission_name: str, completed_count: int,
characteristics: Dict[str, Any], emoji: str) -> str:
"""카테고리 기반 메시지 생성 (정상 특성 제외)"""
try:
primary_identifier = characteristics.get("primary_identifier", "")
# 식별자가 있으면 사용, 없으면 기본 이름만 사용
# 정상 특성들은 primary_identifier가 빈 문자열이므로 자동으로 기본 이름만 사용됨
if primary_identifier:
identifier = f"{primary_identifier} {masked_name}"
else:
identifier = f"{masked_name}"
# 미션 완료 메시지 생성
if completed_count > 1:
message = f"{identifier}{mission_name} {completed_count}회를 완료했어요! {emoji}"
else:
message = f"{identifier}{mission_name}을 완료했어요! {emoji}"
return message
except Exception as e:
self.logger.warning(f"카테고리 기반 메시지 생성 실패: {str(e)}")
return f"{masked_name}님이 미션을 완료했어요! 🎉"
def _get_simple_emoji(self, mission_name: str) -> str:
"""간단한 이모지 매핑 (성능 향상)"""
try:
mission_name_lower = mission_name.lower()
# 키워드 기반 이모지 매핑
emoji_map = {
"": "💧", "마시기": "💧", "water": "💧",
"걷기": "🚶‍♀️", "산책": "🚶‍♂️", "운동": "💪", "walk": "🚶‍♀️",
"스트레칭": "🧘‍♂️", "stretch": "🧘‍♀️",
"명상": "🧘‍♀️", "meditation": "🧘‍♂️",
"수면": "😴", "": "😴", "sleep": "😴",
"계단": "🏃‍♂️", "오르기": "🏃‍♀️", "stairs": "🏃‍♂️",
"식단": "🥗", "음식": "🍎", "diet": "🥗",
"호흡": "🌬️", "breathing": "🌬️"
}
for keyword, emoji in emoji_map.items():
if keyword in mission_name_lower:
return emoji
return "🎉" # 기본 이모지
except Exception as e:
self.logger.warning(f"간단한 이모지 매핑 실패: {str(e)}")
return "🎉"
async def _format_mission_news(self, recent_completions: List[Dict[str, Any]],
current_user_id: int) -> List[MissionNewsItem]:
"""미션 완료 이력을 소식 메시지로 포맷팅 (다층 특성 기반 개선)"""
try:
mission_news_items = []
# 현재 사용자 정보 조회 (비교 기준용)
current_user_data = await self.similar_mission_repository.get_user_health_for_vector(current_user_id)
for completion in recent_completions:
# 이름 마스킹 (김OO 형식)
full_name = completion.get("name", "사용자")
masked_name = self._mask_name(full_name)
# 다층 특성 분석 (나이, 건강, 직업)
user_characteristics = self._analyze_multi_layer_characteristics(completion, current_user_data)
# 미션 정보
mission_name = completion.get("mission_name", "미션")
completed_count = completion.get("daily_completed_count", 1)
# 미션 카테고리 추출
mission_category = self._extract_mission_category(mission_name)
# 다층 특성 기반 메시지 생성 (AI 이모지 자동 매핑 포함)
message = await self._generate_multi_layer_mission_message(
masked_name, mission_name, completed_count, user_characteristics
)
# 유사도 점수 계산 (직업 가중치 증가)
similarity_score = self._calculate_enhanced_user_similarity(completion, current_user_data)
# MissionNewsItem 생성
news_item = MissionNewsItem(
message=message,
mission_category=mission_category,
similarity_score=similarity_score,
completed_at=completion.get("created_at", datetime.now())
)
mission_news_items.append(news_item)
# 유사도 순으로 정렬 후 최신순으로 재정렬
mission_news_items.sort(key=lambda x: x.similarity_score, reverse=True)
mission_news_items = mission_news_items[:15] # 상위 15개 선택
mission_news_items.sort(key=lambda x: x.completed_at, reverse=True) # 최신순 정렬
return mission_news_items[:10] # 최종 10개 반환
except Exception as e:
self.logger.error(f"미션 소식 포맷팅 실패: {str(e)}")
return []
def _analyze_multi_layer_characteristics(self, user_completion: Dict[str, Any],
current_user_data: Dict[str, Any]) -> Dict[str, Any]:
"""다층 사용자 특성 분석 (나이, 건강, 직업)"""
characteristics = {
"age_char": "",
"health_chars": [],
"occupation_char": "",
"primary_type": "", # age, health, occupation 중 가장 강한 특성
"similarity_strength": 0.0
}
if not current_user_data:
return characteristics
try:
# 1. 나이 특성 분석
completion_age = user_completion.get("age", 0)
current_age = current_user_data.get("age", 0)
age_similarity = 0.0
if completion_age and current_age:
age_diff = abs(completion_age - current_age)
if age_diff <= 3: # 3세 이내
characteristics["age_char"] = f"{completion_age}"
age_similarity = 1.0 - (age_diff / 10)
elif age_diff <= 7: # 7세 이내
if completion_age < 30:
characteristics["age_char"] = f"{completion_age}"
elif completion_age < 40:
characteristics["age_char"] = f"30대"
else:
characteristics["age_char"] = f"40대"
age_similarity = 0.7 - (age_diff / 20)
# 2. 건강 특성 분석
health_chars = []
health_similarity = 0.0
# BMI 특성
completion_bmi = self._calculate_bmi_from_data(user_completion)
current_bmi = current_user_data.get("bmi", 0) or self._calculate_bmi_from_data(current_user_data)
if completion_bmi and current_bmi:
if completion_bmi < 18.5 and current_bmi < 20:
health_chars.append("마른")
health_similarity += 0.2
elif completion_bmi >= 25 and current_bmi >= 23:
health_chars.append("통통한")
health_similarity += 0.2
# 혈압 특성
completion_systolic = user_completion.get("systolic_bp", 0)
current_systolic = current_user_data.get("systolic_bp", 0)
if completion_systolic and current_systolic:
if completion_systolic >= 140 and current_systolic >= 130:
health_chars.append("혈압높은")
health_similarity += 0.25
elif completion_systolic >= 130 and current_systolic >= 120:
health_chars.append("혈압주의")
health_similarity += 0.15
# 혈당 특성
completion_glucose = user_completion.get("fasting_glucose", 0)
current_glucose = current_user_data.get("fasting_glucose", 0)
if completion_glucose and current_glucose:
if completion_glucose >= 126 and current_glucose >= 110:
health_chars.append("혈당높은")
health_similarity += 0.25
elif completion_glucose >= 100 and current_glucose >= 95:
health_chars.append("혈당주의")
health_similarity += 0.15
# 콜레스테롤 특성
completion_chol = user_completion.get("total_cholesterol", 0)
current_chol = current_user_data.get("total_cholesterol", 0)
if completion_chol and current_chol:
if completion_chol >= 240 and current_chol >= 220:
health_chars.append("콜레스테롤높은")
health_similarity += 0.2
# 간기능 특성
completion_alt = user_completion.get("alt", 0)
current_alt = current_user_data.get("alt", 0)
if completion_alt and current_alt:
if completion_alt >= 40 and current_alt >= 35:
health_chars.append("간수치높은")
health_similarity += 0.15
characteristics["health_chars"] = health_chars
# 3. 직업 특성 분석
occupation_similarity = 0.0
completion_occupation = user_completion.get("occupation", "")
current_occupation = current_user_data.get("occupation", "")
if completion_occupation and current_occupation:
if completion_occupation == current_occupation:
# 동일 직업
occupation_name = self._get_occupation_display_name(completion_occupation)
characteristics["occupation_char"] = occupation_name
occupation_similarity = 1.0
else:
# 유사 직업군 체크
similar_groups = {
"tech": ["OFF001", "ENG001"], # 사무직, 엔지니어
"care": ["MED001", "EDU001"], # 의료진, 교육직
"service": ["SRV001"] # 서비스직
}
for group_name, occupations in similar_groups.items():
if completion_occupation in occupations and current_occupation in occupations:
occupation_name = self._get_occupation_display_name(completion_occupation)
characteristics["occupation_char"] = occupation_name
occupation_similarity = 0.7
break
# 4. 주요 특성 타입 결정
similarities = {
"occupation": occupation_similarity,
"health": health_similarity,
"age": age_similarity
}
# 가장 높은 유사도의 특성을 주요 타입으로 설정
primary_type = max(similarities, key=similarities.get)
characteristics["primary_type"] = primary_type
characteristics["similarity_strength"] = similarities[primary_type]
self.logger.debug(f"다층 특성 분석 완료 - "
f"나이: {characteristics['age_char']}, "
f"건강: {health_chars}, "
f"직업: {characteristics['occupation_char']}, "
f"주요타입: {primary_type}")
except Exception as e:
self.logger.warning(f"다층 특성 분석 중 오류: {str(e)}")
return characteristics
def _get_occupation_display_name(self, occupation_code: str) -> str:
"""직업 코드를 표시용 이름으로 변환"""
occupation_display = {
"OFF001": "사무직",
"MED001": "의료진",
"EDU001": "교육직",
"ENG001": "IT직군",
"SRV001": "서비스직"
}
return occupation_display.get(occupation_code, "직장인")
async def _generate_multi_layer_mission_message(self, masked_name: str, mission_name: str,
completed_count: int, characteristics: Dict[str, Any]) -> str:
"""다층 특성을 고려한 미션 완료 소식 메시지 생성 (AI 이모지 자동 매핑)"""
try:
# 특성 조합 및 우선순위 결정
primary_type = characteristics.get("primary_type", "")
age_char = characteristics.get("age_char", "")
health_chars = characteristics.get("health_chars", [])
occupation_char = characteristics.get("occupation_char", "")
# 메시지 식별자 생성 로직
identifier = self._build_identifier(primary_type, age_char, health_chars, occupation_char, masked_name)
# AI를 통한 이모지 자동 매핑
emoji = await self._get_ai_emoji(mission_name)
# 미션 완료 메시지 생성
if completed_count > 1:
message = f"{identifier}{mission_name} {completed_count}회를 완료했어요! {emoji}"
else:
message = f"{identifier}{mission_name}을 완료했어요! {emoji}"
return message
except Exception as e:
self.logger.error(f"다층 미션 메시지 생성 실패: {str(e)}")
return f"{masked_name}님이 미션을 완료했어요! 🎉"
async def _get_ai_emoji(self, mission_name: str) -> str:
"""AI를 통한 미션별 적절한 이모지 자동 선택"""
try:
# 이모지 선택 프롬프트
emoji_prompt = f"""
다음 미션에 가장 적절한 이모지 1개를 선택해서 이모지만 답변해주세요.
미션: {mission_name}
아래 이모지 중에서 가장 적절한 것을 선택하거나, 더 적절한 이모지가 있다면 그것을 사용해주세요:
💧 🚶‍♀️ 🚶‍♂️ 💪 🧘‍♂️ 🧘‍♀️ 😴 🏃‍♂️ 🏃‍♀️ 🥗 🍎 🧠 ❤️ 🌟 ✨ 🎯 🏆 💯 🔥 ⚡ 🌱
이모지만 답변하세요.
"""
# Claude API 호출 (짧은 응답)
emoji_response = await self.claude_client.call_claude_api(emoji_prompt)
# 응답에서 이모지 추출 (첫 번째 이모지 사용)
emoji = emoji_response.strip()
# 이모지 검증 (길이가 1-2자가 아니거나 특수 문자가 아니면 기본값 사용)
if len(emoji) > 4 or not emoji:
emoji = "🎉"
self.logger.debug(f"AI 이모지 선택: {mission_name} -> {emoji}")
return emoji
except Exception as e:
self.logger.warning(f"AI 이모지 선택 실패, 기본값 사용: {str(e)}")
return "🎉"
def _build_identifier(self, primary_type: str, age_char: str, health_chars: List[str],
occupation_char: str, masked_name: str) -> str:
"""특성 기반 식별자 생성"""
try:
# 1순위: 직업이 주요 특성이고 직업 정보가 있는 경우
if primary_type == "occupation" and occupation_char:
return f"{occupation_char} {masked_name}"
# 2순위: 건강이 주요 특성이고 건강 특성이 있는 경우
elif primary_type == "health" and health_chars:
if len(health_chars) >= 2:
# 건강 특성 2개 이상: 첫 번째와 두 번째 조합
return f"{health_chars[0]} {health_chars[1]} {masked_name}"
else:
# 건강 특성 1개: 나이와 조합 시도
if age_char:
return f"{age_char} {health_chars[0]} {masked_name}"
else:
return f"{health_chars[0]} {masked_name}"
# 3순위: 나이가 주요 특성이고 나이 정보가 있는 경우
elif primary_type == "age" and age_char:
# 나이 + 건강 특성 조합 시도
if health_chars:
return f"{age_char} {health_chars[0]} {masked_name}"
else:
return f"{age_char} {masked_name}"
# 보조 로직: 주요 특성이 없거나 약한 경우 다른 특성 활용
else:
# 직업 정보 우선 활용
if occupation_char:
return f"{occupation_char} {masked_name}"
# 건강 특성 활용
elif health_chars:
if age_char:
return f"{age_char} {health_chars[0]} {masked_name}"
else:
return f"{health_chars[0]} {masked_name}"
# 나이만 있는 경우
elif age_char:
return f"{age_char} {masked_name}"
# 모든 특성이 없는 경우
else:
return f"{masked_name}"
except Exception as e:
self.logger.warning(f"식별자 생성 실패: {str(e)}")
return f"{masked_name}"
def _calculate_enhanced_user_similarity(self, completion_user: Dict[str, Any],
current_user: Dict[str, Any]) -> float:
"""강화된 사용자 간 유사도 계산 (직업 가중치 증가)"""
if not current_user:
return 0.5 # 기본 유사도
try:
similarity_score = 0.0
weight_sum = 0.0
# 직업 유사도 (가중치: 25% 증가)
completion_occupation = completion_user.get("occupation", "")
current_occupation = current_user.get("occupation", "")
if completion_occupation and current_occupation:
if completion_occupation == current_occupation:
occupation_similarity = 1.0
else:
# 유사 직업군 체크
similar_groups = {
"tech": ["OFF001", "ENG001"], # 사무직, 엔지니어
"care": ["MED001", "EDU001"], # 의료진, 교육직
"service": ["SRV001"] # 서비스직
}
occupation_similarity = 0.0
for group_name, occupations in similar_groups.items():
if completion_occupation in occupations and current_occupation in occupations:
occupation_similarity = 0.6
break
similarity_score += occupation_similarity * 0.25
weight_sum += 0.25
# 나이 유사도 (가중치: 25%)
completion_age = completion_user.get("age", 0)
current_age = current_user.get("age", 0)
if completion_age and current_age:
age_diff = abs(completion_age - current_age)
age_similarity = max(0.0, 1.0 - age_diff / 15) # 15세 차이까지 고려
similarity_score += age_similarity * 0.25
weight_sum += 0.25
# BMI 유사도 (가중치: 15%)
completion_bmi = self._calculate_bmi_from_data(completion_user)
current_bmi = current_user.get("bmi", 0) or self._calculate_bmi_from_data(current_user)
if completion_bmi and current_bmi:
bmi_diff = abs(completion_bmi - current_bmi)
bmi_similarity = max(0.0, 1.0 - bmi_diff / 8) # BMI 8 차이까지 고려
similarity_score += bmi_similarity * 0.15
weight_sum += 0.15
# 혈압 유사도 (가중치: 12%)
completion_bp = completion_user.get("systolic_bp", 0)
current_bp = current_user.get("systolic_bp", 0)
if completion_bp and current_bp:
bp_diff = abs(completion_bp - current_bp)
bp_similarity = max(0.0, 1.0 - bp_diff / 40) # 혈압 40 차이까지 고려
similarity_score += bp_similarity * 0.12
weight_sum += 0.12
# 혈당 유사도 (가중치: 12%)
completion_glucose = completion_user.get("fasting_glucose", 0)
current_glucose = current_user.get("fasting_glucose", 0)
if completion_glucose and current_glucose:
glucose_diff = abs(completion_glucose - current_glucose)
glucose_similarity = max(0.0, 1.0 - glucose_diff / 40)
similarity_score += glucose_similarity * 0.12
weight_sum += 0.12
# 콜레스테롤 유사도 (가중치: 11%)
completion_chol = completion_user.get("total_cholesterol", 0)
current_chol = current_user.get("total_cholesterol", 0)
if completion_chol and current_chol:
chol_diff = abs(completion_chol - current_chol)
chol_similarity = max(0.0, 1.0 - chol_diff / 80)
similarity_score += chol_similarity * 0.11
weight_sum += 0.11
# 최종 유사도 계산
if weight_sum > 0:
final_similarity = similarity_score / weight_sum
else:
final_similarity = 0.5 # 기본값
return min(1.0, max(0.0, final_similarity))
except Exception as e:
self.logger.warning(f"강화된 유사도 계산 실패: {str(e)}")
return 0.5
def _calculate_bmi_from_data(self, user_data: Dict[str, Any]) -> float:
"""사용자 데이터에서 BMI 계산"""
try:
height = user_data.get("height", 0)
weight = user_data.get("weight", 0)
if height and weight and height > 0:
return round(weight / ((height / 100) ** 2), 1)
return 0.0
except:
return 0.0
def _mask_name(self, full_name: str) -> str:
"""이름 마스킹 (김OO 형식)"""
if len(full_name) <= 1:
return full_name + "OO"
elif len(full_name) == 2:
return full_name[0] + "O"
else:
return full_name[0] + "O" * (len(full_name) - 1)
def _extract_mission_category(self, mission_name: str) -> str:
"""미션명에서 카테고리 추출"""
mission_name_lower = mission_name.lower()
if any(keyword in mission_name_lower for keyword in ["", "water", "마시기"]):
return "hydration"
elif any(keyword in mission_name_lower for keyword in ["걷기", "산책", "운동", "walk", "exercise"]):
return "exercise"
elif any(keyword in mission_name_lower for keyword in ["스트레칭", "stretch"]):
return "stretching"
elif any(keyword in mission_name_lower for keyword in ["명상", "meditation"]):
return "meditation"
elif any(keyword in mission_name_lower for keyword in ["수면", "", "sleep"]):
return "sleep"
else:
return "general"
async def upsert_user_vector_on_registration(self, user_id: int) -> bool:
"""사용자 회원가입/수정 시 벡터DB 저장"""
try:
self.log_operation("upsert_user_vector_start", user_id=user_id)
# 1. 사용자 건강 정보 조회
user_health_data = await self.similar_mission_repository.get_user_health_for_vector(user_id)
if not user_health_data:
raise UserNotFoundException(user_id)
# 2. Pinecone에 사용자 벡터 저장
success = await pinecone_client.upsert_user_vector(user_id, user_health_data)
if success:
# 3. 유사 사용자 캐시 무효화 (기존 캐시 삭제)
await self._invalidate_similar_users_cache(user_id)
self.log_operation("upsert_user_vector_success", user_id=user_id)
return True
else:
raise Exception("벡터 저장 실패")
except Exception as e:
self.logger.error(f"사용자 벡터 저장 실패 - user_id: {user_id}, error: {str(e)}")
return False
async def _get_cached_similar_users(self, user_id: int, top_k: int = 30) -> List[int]:
"""유사 사용자 목록 조회 (더 많은 후보 확보)"""
try:
cache_key = redis_client.generate_similar_users_key(user_id)
# Cache Aside 패턴 적용
async def fetch_similar_users():
return await pinecone_client.search_similar_users(user_id, top_k=top_k)
similar_users = await redis_client.get_or_set(
key=cache_key,
fetch_func=fetch_similar_users,
ttl=1800 # 30분 캐싱
)
self.log_operation("get_cached_similar_users", user_id=user_id,
similar_count=len(similar_users) if similar_users else 0)
return similar_users if similar_users else []
except Exception as e:
self.logger.error(f"유사 사용자 캐시 조회 실패 - user_id: {user_id}, error: {str(e)}")
# 캐시 실패 시 직접 벡터DB 조회
try:
return await pinecone_client.search_similar_users(user_id, top_k=top_k)
except:
return []
async def _invalidate_similar_users_cache(self, user_id: int):
"""유사 사용자 캐시 무효화"""
try:
cache_key = redis_client.generate_similar_users_key(user_id)
await redis_client.delete(cache_key)
self.log_operation("invalidate_similar_users_cache", user_id=user_id)
except Exception as e:
self.logger.error(f"캐시 무효화 실패 - user_id: {user_id}, error: {str(e)}")
# 기존 메소드들 (변경 없음)
async def _get_mission_info(self, mission_id: int) -> Dict[str, Any]:
"""미션 ID로 DB에서 미션 정보 조회"""
try:
mission_info = await self.mission_repository.get_mission_by_id(mission_id)
if not mission_info:
raise HealthDataNotFoundException(mission_id)
self.log_operation("get_mission_info_success",
mission_id=mission_id,
mission_name=mission_info.get("mission_name"))
return mission_info
except HealthDataNotFoundException:
# 미션을 찾을 수 없는 경우
raise
except Exception as e:
self.logger.error(f"미션 정보 조회 실패 - mission_id: {mission_id}, error: {str(e)}")
raise DatabaseException(f"미션 정보 조회 중 오류가 발생했습니다: {str(e)}")
def _build_celebration_prompt(self, user_id: int, mission_info: Dict[str, Any]) -> str:
"""미션 정보를 기반으로 축하 메시지 프롬프트 생성"""
try:
prompt_template = get_celebration_prompt()
# DB에서 조회한 실제 미션 정보 사용
mission_name = mission_info.get("mission_name", "미션")
mission_description = mission_info.get("mission_description", "")
daily_target = mission_info.get("daily_target_count", 1)
formatted_prompt = prompt_template.format(
mission_name=mission_name,
mission_description=mission_description,
daily_target_count=daily_target,
user_id=user_id
)
self.log_operation("build_celebration_prompt",
user_id=user_id,
mission_name=mission_name,
prompt_length=len(formatted_prompt))
return formatted_prompt
except Exception as e:
self.logger.error(f"축하 프롬프트 생성 실패: {str(e)}")
raise Exception(f"축하 프롬프트 생성 실패: {str(e)}")
async def _build_recommendation_prompt(self, user_data: Dict[str, Any], health_data: Dict[str, Any]) -> str:
"""사용자 데이터를 기반으로 프롬프트 생성"""
try:
prompt_template = get_mission_recommendation_prompt()
# 프롬프트에 데이터 매핑
formatted_prompt = prompt_template.format(
# 사용자 기본 정보
occupation=user_data.get("occupation", "정보 없음"),
age=health_data.get("age", "정보 없음"),
# 신체 정보
height=health_data.get("height", "정보 없음"),
weight=health_data.get("weight", "정보 없음"),
waist_circumference=health_data.get("waist_circumference", "정보 없음"),
# 혈압 및 혈당
systolic_bp=health_data.get("systolic_bp", "정보 없음"),
diastolic_bp=health_data.get("diastolic_bp", "정보 없음"),
fasting_glucose=health_data.get("fasting_glucose", "정보 없음"),
# 콜레스테롤
total_cholesterol=health_data.get("total_cholesterol", "정보 없음"),
hdl_cholesterol=health_data.get("hdl_cholesterol", "정보 없음"),
ldl_cholesterol=health_data.get("ldl_cholesterol", "정보 없음"),
triglyceride=health_data.get("triglyceride", "정보 없음"),
# 기타 혈액 검사
hemoglobin=health_data.get("hemoglobin", "정보 없음"),
serum_creatinine=health_data.get("serum_creatinine", "정보 없음"),
ast=health_data.get("ast", "정보 없음"),
alt=health_data.get("alt", "정보 없음"),
gamma_gtp=health_data.get("gamma_gtp", "정보 없음"),
urine_protein=health_data.get("urine_protein", "정보 없음"),
# 감각기관
visual_acuity_left=health_data.get("visual_acuity_left", "정보 없음"),
visual_acuity_right=health_data.get("visual_acuity_right", "정보 없음"),
hearing_left=health_data.get("hearing_left", "정보 없음"),
hearing_right=health_data.get("hearing_right", "정보 없음"),
# 생활습관
smoking_status=self._convert_smoking_status(health_data.get("smoking_status")),
drinking_status=self._convert_drinking_status(health_data.get("drinking_status"))
)
self.log_operation("build_recommendation_prompt", user_id="N/A", prompt_length=len(formatted_prompt))
return formatted_prompt
except Exception as e:
self.logger.error(f"프롬프트 생성 실패: {str(e)}")
raise Exception(f"프롬프트 생성 실패: {str(e)}")
def _convert_claude_to_missions(self, claude_json: Dict[str, Any]) -> List[RecommendedMission]:
"""Claude JSON 응답을 RecommendedMission 리스트로 변환"""
try:
missions_data = claude_json.get("missions", [])
recommended_missions = []
for mission_data in missions_data:
mission = RecommendedMission(
title=mission_data.get("title", ""),
daily_target_count=mission_data.get("daily_target_count", 1),
reason=mission_data.get("reason", "")
)
recommended_missions.append(mission)
self.log_operation("convert_claude_to_missions", mission_count=len(recommended_missions))
return recommended_missions
except Exception as e:
self.logger.error(f"Claude 응답 변환 실패: {str(e)}")
raise Exception(f"Claude 응답 변환 실패: {str(e)}")
def _convert_smoking_status(self, status: int) -> str:
"""흡연 상태 코드를 텍스트로 변환"""
smoking_map = {
0: "비흡연",
1: "과거 흡연",
2: "현재 흡연"
}
return smoking_map.get(status, "정보 없음")
def _convert_drinking_status(self, status: int) -> str:
"""음주 상태 코드를 텍스트로 변환"""
drinking_map = {
0: "비음주",
1: "음주"
}
return drinking_map.get(status, "정보 없음")
async def upsert_all_user_vectors(self, reset_index: bool = False) -> Dict[str, Any]:
"""모든 사용자의 벡터를 일괄 저장/업데이트 (선택적 인덱스 초기화)"""
start_time = time.time()
try:
self.log_operation("upsert_all_user_vectors_start", reset_index=reset_index)
# 1. 선택적 인덱스 초기화
if reset_index:
self.logger.info("🔄 인덱스 초기화 옵션 활성화 - 모든 벡터 삭제 후 재생성")
reset_success = await pinecone_client.reset_index()
if not reset_success:
self.logger.warning("⚠️ 인덱스 초기화 실패 - 기존 벡터와 함께 진행")
else:
self.logger.info("✅ 인덱스 초기화 완료 - 깨끗한 상태에서 시작")
# 2. 전체 사용자 목록 조회
all_users = await self._get_all_users_for_vector_processing()
total_users = len(all_users)
if total_users == 0:
self.logger.warning("⚠️ 처리할 사용자가 없습니다.")
return {
"success": True,
"total_users": 0,
"existing_vectors": 0,
"new_vectors": 0,
"failed": 0,
"processing_time_seconds": 0,
"reset_performed": reset_index,
"message": "처리할 사용자가 없습니다."
}
self.logger.info(f"📊 전체 사용자 수: {total_users}")
# 3. 기존 벡터 확인 (초기화한 경우 0개)
existing_vector_ids = []
existing_count = 0
if not reset_index:
existing_vector_ids = await self._get_existing_vector_ids()
existing_count = len(existing_vector_ids)
self.logger.info(f"📊 기존 벡터 수: {existing_count}")
# 4. 처리 대상 사용자 결정
if reset_index:
# 초기화한 경우 모든 사용자 처리
users_to_process = all_users
new_users_count = total_users
self.logger.info(f"📊 초기화 후 전체 처리 대상: {new_users_count}")
else:
# 초기화하지 않은 경우 신규 사용자만 처리
users_to_process = []
for user in all_users:
user_id = user.get("member_serial_number")
if str(user_id) not in existing_vector_ids:
users_to_process.append(user)
new_users_count = len(users_to_process)
self.logger.info(f"📊 신규 처리 대상: {new_users_count}")
if new_users_count == 0:
self.logger.info("✅ 모든 사용자의 벡터가 이미 존재합니다.")
return {
"success": True,
"total_users": total_users,
"existing_vectors": existing_count,
"new_vectors": 0,
"failed": 0,
"processing_time_seconds": round(time.time() - start_time, 2),
"reset_performed": reset_index,
"message": "모든 사용자의 벡터가 이미 존재합니다."
}
# 5. 사용자 벡터 일괄 저장
success_count = 0
failed_count = 0
for i, user in enumerate(users_to_process, 1):
user_id = user.get("member_serial_number")
try:
# 진행률 로깅 (10개마다 또는 중요 지점)
if i % 10 == 0 or i == 1 or i == new_users_count:
progress = (i / new_users_count) * 100
self.logger.info(f"🔄 진행률: {i}/{new_users_count} ({progress:.1f}%) - "
f"현재 처리: user_id={user_id}")
# 개별 사용자 벡터 저장
success = await pinecone_client.upsert_user_vector(user_id, user)
if success:
success_count += 1
self.logger.debug(f"✅ 벡터 저장 성공 - user_id: {user_id}")
else:
failed_count += 1
self.logger.warning(f"❌ 벡터 저장 실패 - user_id: {user_id}")
# API 요청 간 짧은 대기 (Pinecone API 한도 고려)
if i % 5 == 0:
await self._short_delay(0.1)
except Exception as e:
failed_count += 1
self.logger.error(f"❌ 사용자 벡터 처리 실패 - user_id: {user_id}, error: {str(e)}")
continue
# 6. 최종 인덱스 통계 확인
final_stats = await pinecone_client.get_index_stats()
final_vector_count = final_stats.get('total_vector_count', 0)
# 7. 결과 정리
processing_time = round(time.time() - start_time, 2)
result = {
"success": failed_count == 0,
"total_users": total_users,
"existing_vectors": existing_count if not reset_index else 0,
"new_vectors": success_count,
"failed": failed_count,
"final_vector_count": final_vector_count,
"processing_time_seconds": processing_time,
"processed_users": new_users_count,
"success_rate": round((success_count / new_users_count) * 100, 1) if new_users_count > 0 else 100,
"reset_performed": reset_index
}
self.log_operation("upsert_all_user_vectors_complete",
total_users=total_users,
existing_vectors=existing_count,
new_vectors=success_count,
failed=failed_count,
reset_performed=reset_index,
processing_time=processing_time)
return result
except Exception as e:
self.logger.error(f"❌ 벡터 일괄 처리 전체 실패: {str(e)}")
raise DatabaseException(f"벡터 일괄 처리 실패: {str(e)}")
async def _get_all_users_for_vector_processing(self) -> List[Dict[str, Any]]:
"""벡터 처리를 위한 모든 사용자 데이터 조회"""
try:
all_users = await self.similar_mission_repository.get_all_users_for_vector()
self.logger.info(f"📊 사용자 데이터 조회 완료 - 총 {len(all_users)}")
return all_users
except Exception as e:
self.logger.error(f"❌ 전체 사용자 조회 실패: {str(e)}")
raise DatabaseException(f"전체 사용자 조회 실패: {str(e)}")
async def _get_existing_vector_ids(self) -> List[str]:
"""Pinecone에서 기존 벡터 ID 목록 조회"""
try:
# Pinecone 초기화 확인
if not await pinecone_client.initialize():
self.logger.warning("⚠️ Pinecone 초기화 실패 - 기존 벡터 조회 건너뜀")
return []
# 인덱스 통계 조회
def get_index_stats():
return pinecone_client.index.describe_index_stats()
stats = await self._safe_async_execute(get_index_stats, timeout=10)
if not stats:
self.logger.warning("⚠️ 인덱스 통계 조회 실패")
return []
vector_count = stats.get('total_vector_count', 0)
self.logger.info(f"📊 Pinecone 인덱스 벡터 수: {vector_count}")
# 모든 벡터 ID 조회 (대량 데이터의 경우 최적화 필요)
if vector_count == 0:
return []
# 임시로 빈 목록 반환 (실제로는 Pinecone list 기능 구현 필요)
# Pinecone은 모든 ID를 한번에 조회하는 기능이 제한적이므로
# 여기서는 간단히 빈 목록을 반환하여 모든 사용자를 처리하도록 함
return []
except Exception as e:
self.logger.warning(f"⚠️ 기존 벡터 ID 조회 실패: {str(e)}")
return []
async def _safe_async_execute(self, func, timeout: int = 10):
"""안전한 비동기 함수 실행"""
try:
import asyncio
return await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(None, func),
timeout=timeout
)
except Exception as e:
self.logger.warning(f"⚠️ 비동기 실행 실패: {str(e)}")
return None
async def _short_delay(self, seconds: float):
"""짧은 대기"""
try:
import asyncio
await asyncio.sleep(seconds)
except Exception:
pass