# app/utils/database_utils.py """ HealthSync Motivator Batch 데이터베이스 유틸리티 """ import databases import logging from typing import Dict, Any, List, Optional from app.config.settings import settings from app.repositories.queries import BaseQueries logger = logging.getLogger(__name__) class SimpleDatabase: """PostgreSQL 데이터베이스 연결 클래스""" def __init__(self): self.database = databases.Database(settings.database_url) self._connected = False async def connect(self): """데이터베이스 연결""" if not self._connected: try: await self.database.connect() self._connected = True logger.info("데이터베이스 연결 성공") except Exception as e: logger.error(f"데이터베이스 연결 실패: {str(e)}") raise async def disconnect(self): """데이터베이스 연결 해제""" if self._connected: try: await self.database.disconnect() self._connected = False logger.info("데이터베이스 연결 해제") except Exception as e: logger.error(f"데이터베이스 연결 해제 실패: {str(e)}") async def test_connection(self) -> bool: """데이터베이스 연결 테스트""" try: if not self._connected: await self.connect() result = await self.database.fetch_val(BaseQueries.CONNECTION_TEST) return result == 1 except Exception as e: logger.error(f"데이터베이스 연결 테스트 실패: {str(e)}") return False async def execute_query(self, query: str, values: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """쿼리 실행 (SELECT)""" try: if not self._connected: await self.connect() rows = await self.database.fetch_all(query, values or {}) return [dict(row) for row in rows] except Exception as e: logger.error(f"쿼리 실행 실패: {str(e)}") logger.error(f"쿼리: {query}") logger.error(f"파라미터: {values}") raise Exception(f"쿼리 실행 실패: {str(e)}") async def execute_insert(self, query: str, values: Optional[Dict[str, Any]] = None) -> int: """INSERT 쿼리 실행""" try: if not self._connected: await self.connect() result = await self.database.execute(query, values or {}) return result except Exception as e: logger.error(f"INSERT 실행 실패: {str(e)}") logger.error(f"쿼리: {query}") logger.error(f"파라미터: {values}") raise Exception(f"INSERT 실행 실패: {str(e)}") # 전역 데이터베이스 인스턴스 db = SimpleDatabase()