feat: init rag service

This commit is contained in:
djeon
2025-10-29 05:54:08 +09:00
parent 44ae9c546f
commit 5d897cb845
54 changed files with 6425 additions and 0 deletions
View File
Binary file not shown.
View File
Binary file not shown.
Binary file not shown.
+506
View File
@@ -0,0 +1,506 @@
"""
Vector DB 통합 시스템 FastAPI 애플리케이션
"""
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from typing import List, Dict, Any
import logging
from pathlib import Path
from ..models.term import (
Term,
TermSearchRequest,
TermSearchResult,
TermExplainRequest,
TermExplanation,
TermStats
)
from ..models.document import (
DocumentSearchRequest,
DocumentSearchResult,
DocumentStats
)
from ..models.minutes import (
MinutesSearchRequest,
MinutesSearchResult
)
from ..db.postgres_vector import PostgresVectorDB
from ..db.azure_search import AzureAISearchDB
from ..db.rag_minutes_db import RagMinutesDB
from ..services.claude_service import ClaudeService
from ..utils.config import load_config, get_database_url
from ..utils.embedding import EmbeddingGenerator
from ..utils.text_processor import extract_nouns_as_query
# Logging: INFO level, each record shows timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# FastAPI application object; every route in this module is registered on it.
app = FastAPI(
    title="Vector DB 통합 시스템",
    description="회의록 작성 시스템을 위한 Vector DB 기반 용어집 및 관련자료 검색 API",
    version="1.0.0",
)

# CORS: all origins, methods and headers are allowed, with credentials.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive policy -- confirm this is intended outside development and
# consider listing the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level slots for the lazily created singletons returned by the
# get_*() dependency providers below (None until first use).
_config = None
_term_db = None
_doc_db = None
_rag_minutes_db = None
_embedding_gen = None
_claude_service = None
def get_config():
    """Return the application configuration, loading it on first use.

    The value returned by ``load_config`` is cached in the module-level
    ``_config`` and reused by later calls.
    """
    global _config
    if _config is not None:
        return _config

    # config.yaml lives two directories above the one containing this module.
    base_dir = Path(__file__).parent.parent.parent
    _config = load_config(str(base_dir / "config.yaml"))
    return _config
def get_term_db():
    """Return the shared ``PostgresVectorDB`` (term glossary), creating it on first use."""
    global _term_db
    if _term_db is not None:
        return _term_db

    # The connection URL is derived from the loaded configuration.
    _term_db = PostgresVectorDB(get_database_url(get_config()))
    return _term_db
def get_doc_db():
    """Return the shared ``AzureAISearchDB`` (related documents), creating it on first use.

    Connection settings are read from the ``azure_search`` section of the
    configuration.
    """
    global _doc_db
    if _doc_db is not None:
        return _doc_db

    settings = get_config()["azure_search"]
    _doc_db = AzureAISearchDB(
        endpoint=settings["endpoint"],
        api_key=settings["api_key"],
        index_name=settings["index_name"],
        api_version=settings["api_version"],
    )
    return _doc_db
def get_rag_minutes_db():
    """Return the shared ``RagMinutesDB`` (RAG meeting minutes), creating it on first use."""
    global _rag_minutes_db
    if _rag_minutes_db is not None:
        return _rag_minutes_db

    # Uses the same database URL helper as the term glossary DB.
    _rag_minutes_db = RagMinutesDB(get_database_url(get_config()))
    return _rag_minutes_db
def get_embedding_gen():
    """Return the shared ``EmbeddingGenerator``, creating it on first use.

    Settings are read from the ``azure_openai`` section of the configuration.
    """
    global _embedding_gen
    if _embedding_gen is not None:
        return _embedding_gen

    settings = get_config()["azure_openai"]
    _embedding_gen = EmbeddingGenerator(
        api_key=settings["api_key"],
        endpoint=settings["endpoint"],
        model=settings["embedding_model"],
        dimension=settings["embedding_dimension"],
        api_version=settings["api_version"],
    )
    return _embedding_gen
def get_claude_service():
    """Return the shared ``ClaudeService``, creating it on first use.

    Settings are read from the ``claude`` section of the configuration.
    """
    global _claude_service
    if _claude_service is not None:
        return _claude_service

    settings = get_config()["claude"]
    _claude_service = ClaudeService(
        api_key=settings["api_key"],
        model=settings["model"],
        max_tokens=settings["max_tokens"],
        temperature=settings["temperature"],
    )
    return _claude_service
# ============================================================================
# 용어집 API
# ============================================================================
@app.get("/")
async def root():
    """Return service metadata and the URL prefix of each API group.

    The ``endpoints`` map lists all three route groups registered in this
    module: terms, documents and meeting minutes.
    """
    return {
        "service": "Vector DB 통합 시스템",
        "version": "1.0.0",
        "endpoints": {
            "용어집": "/api/terms/*",
            "관련자료": "/api/documents/*",
            # The minutes routes are registered below but were missing here.
            "회의록": "/api/minutes/*",
        },
    }
def _merge_hybrid_results(
    keyword_results: List[Dict[str, Any]],
    vector_results: List[Dict[str, Any]],
    keyword_weight: float,
    vector_weight: float,
    top_k: int,
) -> List[Dict[str, Any]]:
    """Fuse keyword and vector hits into one list ranked by weighted score sum.

    Each hit contributes ``relevance_score * weight`` for the search it came
    from. A term returned by both searches receives the sum of both
    contributions (previously the vector contribution was silently dropped).

    Args:
        keyword_results: Hits from the keyword search (dicts with ``term``
            and ``relevance_score``).
        vector_results: Hits from the vector search, same shape.
        keyword_weight: Multiplier applied to keyword scores.
        vector_weight: Multiplier applied to vector scores.
        top_k: Maximum number of merged hits to return.

    Returns:
        New result dicts (``term``, ``relevance_score``, ``match_type`` set to
        ``"hybrid"``), best score first, at most ``top_k`` entries.
    """
    merged: Dict[Any, Dict[str, Any]] = {}
    weighted_sources = (
        (keyword_weight, keyword_results),
        (vector_weight, vector_results),
    )
    for weight, hits in weighted_sources:
        for hit in hits:
            term_id = hit["term"].term_id
            contribution = hit["relevance_score"] * weight
            entry = merged.get(term_id)
            if entry is None:
                merged[term_id] = {
                    "term": hit["term"],
                    "relevance_score": contribution,
                    "match_type": "hybrid",
                }
            else:
                entry["relevance_score"] += contribution

    # NOTE(review): the summed score stays within [0, 1] only if the two
    # configured weights add up to at most 1 -- confirm against config.yaml.
    ranked = sorted(
        merged.values(), key=lambda item: item["relevance_score"], reverse=True
    )
    return ranked[:top_k]


@app.post("/api/terms/search", response_model=List[TermSearchResult])
async def search_terms(
    request: TermSearchRequest,
    term_db: PostgresVectorDB = Depends(get_term_db),
    embedding_gen: EmbeddingGenerator = Depends(get_embedding_gen)
):
    """
    Search the term glossary (keyword, vector, or hybrid).

    Args:
        request: Search request; ``search_type`` selects ``"keyword"``,
            ``"vector"``, or (any other value) hybrid search.

    Returns:
        List of search results.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        # Reduce the query to its nouns before searching.
        search_query = extract_nouns_as_query(request.query)
        logger.info(f"검색 쿼리 변환: '{request.query}' -> '{search_query}'")

        if request.search_type == "keyword":
            results = term_db.search_by_keyword(
                query=search_query,
                top_k=request.top_k,
                confidence_threshold=request.confidence_threshold
            )
        elif request.search_type == "vector":
            # NOTE(review): the original comment said the embedding should use
            # the original query, but the code embeds the noun-extracted
            # query; behaviour is kept as-is -- confirm which is intended.
            query_embedding = embedding_gen.generate_embedding(search_query)
            results = term_db.search_by_vector(
                query_embedding=query_embedding,
                top_k=request.top_k,
                confidence_threshold=request.confidence_threshold
            )
        else:  # hybrid
            keyword_results = term_db.search_by_keyword(
                query=search_query,
                top_k=request.top_k,
                confidence_threshold=request.confidence_threshold
            )
            query_embedding = embedding_gen.generate_embedding(search_query)
            vector_results = term_db.search_by_vector(
                query_embedding=query_embedding,
                top_k=request.top_k,
                confidence_threshold=request.confidence_threshold
            )

            # Weighted-sum fusion using the configured per-source weights.
            search_config = get_config()["term_glossary"]["search"]
            results = _merge_hybrid_results(
                keyword_results,
                vector_results,
                keyword_weight=search_config["keyword_weight"],
                vector_weight=search_config["vector_weight"],
                top_k=request.top_k,
            )

        # Convert to the response model.
        return [
            TermSearchResult(
                term=result["term"],
                relevance_score=result["relevance_score"],
                match_type=result["match_type"]
            )
            for result in results
        ]
    except Exception as e:
        logger.error(f"용어 검색 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# NOTE: the static "/api/terms/stats" route must be registered BEFORE the
# parameterised "/api/terms/{term_id}" route. FastAPI matches routes in
# declaration order, so with the previous order a GET to /api/terms/stats was
# handled by get_term() with term_id="stats".
@app.get("/api/terms/stats", response_model=TermStats)
async def get_term_stats(term_db: PostgresVectorDB = Depends(get_term_db)):
    """
    Return aggregate statistics about the term glossary.

    Returns:
        TermStats built from ``term_db.get_stats()``; ``by_source_type`` is
        always an empty dict.

    Raises:
        HTTPException: 500 with the error text when the lookup fails.
    """
    try:
        stats = term_db.get_stats()
        return TermStats(
            total_terms=stats["total_terms"],
            by_category=stats["by_category"],
            by_source_type={},
            avg_confidence=stats["avg_confidence"]
        )
    except Exception as e:
        logger.error(f"통계 조회 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/terms/{term_id}", response_model=Term)
async def get_term(
    term_id: str,
    term_db: PostgresVectorDB = Depends(get_term_db)
):
    """
    Fetch a single term by its ID.

    Args:
        term_id: Term ID.

    Returns:
        The term object.

    Raises:
        HTTPException: 404 when no such term exists, 500 on any other error.
    """
    try:
        term = term_db.get_term_by_id(term_id)
        if not term:
            raise HTTPException(status_code=404, detail="용어를 찾을 수 없습니다")
        return term
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"용어 조회 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/api/terms/{term_id}/explain", response_model=TermExplanation)
async def explain_term(
    term_id: str,
    request: TermExplainRequest,
    term_db: PostgresVectorDB = Depends(get_term_db),
    claude_service: ClaudeService = Depends(get_claude_service)
):
    """
    Generate a context-aware explanation of a term via the Claude service.

    Args:
        term_id: Term ID.
        request: Explanation request carrying the meeting context.

    Returns:
        The term together with its generated explanation.

    Raises:
        HTTPException: 404 when no such term exists, 500 on any other error.
    """
    try:
        term = term_db.get_term_by_id(term_id)
        if not term:
            raise HTTPException(status_code=404, detail="용어를 찾을 수 없습니다")

        result = claude_service.explain_term(
            term_name=term.term_name,
            definition=term.definition,
            context=term.context,
            meeting_context=request.meeting_context
        )
        return TermExplanation(
            term=term,
            explanation=result["explanation"],
            context_documents=[],
            generated_by=result["generated_by"],
            cached=result["cached"]
        )
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"용어 설명 생성 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============================================================================
# 관련자료 API
# ============================================================================
@app.post("/api/documents/search", response_model=List[DocumentSearchResult])
async def search_documents(
    request: DocumentSearchRequest,
    doc_db: AzureAISearchDB = Depends(get_doc_db),
    embedding_gen: EmbeddingGenerator = Depends(get_embedding_gen)
):
    """
    Search related documents with the document DB's hybrid search.

    Args:
        request: Search request (query, filters, top_k, thresholds).

    Returns:
        Hits whose ``relevance_score`` is at least
        ``request.relevance_threshold``.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        # The same query text is used both as keyword text and for the embedding.
        hits = doc_db.hybrid_search(
            query=request.query,
            query_embedding=embedding_gen.generate_embedding(request.query),
            top_k=request.top_k,
            folder=request.folder,
            document_type=request.document_type,
            semantic_ranking=request.semantic_ranking
        )

        # Drop hits below the requested relevance threshold.
        relevant_hits = []
        for hit in hits:
            if hit["relevance_score"] >= request.relevance_threshold:
                relevant_hits.append(hit)

        # Convert to the response model.
        return [DocumentSearchResult(**hit) for hit in relevant_hits]
    except Exception as e:
        logger.error(f"문서 검색 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/documents/stats", response_model=DocumentStats)
async def get_document_stats(doc_db: AzureAISearchDB = Depends(get_doc_db)):
    """
    Return aggregate statistics about the document index.

    Returns:
        DocumentStats built from ``doc_db.get_stats()``; ``by_domain`` is
        always an empty dict.

    Raises:
        HTTPException: 500 with the error text when the lookup fails.
    """
    try:
        summary = doc_db.get_stats()
        payload = {
            "total_documents": summary["total_documents"],
            "by_type": summary["by_type"],
            "by_domain": {},
            "total_chunks": summary["total_chunks"],
        }
        return DocumentStats(**payload)
    except Exception as e:
        logger.error(f"통계 조회 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# RAG 회의록 API
# ============================================================================
@app.post("/api/minutes/search", response_model=List[MinutesSearchResult])
async def search_related_minutes(
    request: MinutesSearchRequest,
    rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db),
    embedding_gen: EmbeddingGenerator = Depends(get_embedding_gen)
):
    """
    Find meeting minutes similar to the query by vector similarity.

    Args:
        request: Search request (query, top_k, similarity_threshold).

    Returns:
        List of similar minutes with their similarity scores.

    Raises:
        HTTPException: 500 with the error text when anything fails.
    """
    try:
        # Only the first 50 characters of the query are logged.
        logger.info(f"회의록 검색 시작: {request.query[:50]}...")
        query_vector = embedding_gen.generate_embedding(request.query)

        hits = rag_minutes_db.search_by_vector(
            query_embedding=query_vector,
            top_k=request.top_k,
            similarity_threshold=request.similarity_threshold
        )

        # Convert to the response model.
        search_results = []
        for hit in hits:
            search_results.append(
                MinutesSearchResult(
                    minutes=hit["minutes"],
                    similarity_score=hit["similarity_score"]
                )
            )

        logger.info(f"회의록 검색 완료: {len(search_results)}개 결과")
        return search_results
    except Exception as e:
        logger.error(f"회의록 검색 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# NOTE: the static "/api/minutes/stats" route must be registered BEFORE the
# parameterised "/api/minutes/{minutes_id}" route. FastAPI matches routes in
# declaration order, so with the previous order a GET to /api/minutes/stats
# was handled by get_minutes() with minutes_id="stats".
@app.get("/api/minutes/stats")
async def get_minutes_stats(rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db)):
    """
    Return aggregate statistics about the stored meeting minutes.

    Returns:
        The dict produced by ``rag_minutes_db.get_stats()``.

    Raises:
        HTTPException: 500 with the error text when the lookup fails.
    """
    try:
        return rag_minutes_db.get_stats()
    except Exception as e:
        logger.error(f"통계 조회 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/api/minutes/{minutes_id}")
async def get_minutes(
    minutes_id: str,
    rag_minutes_db: RagMinutesDB = Depends(get_rag_minutes_db)
):
    """
    Fetch a single set of meeting minutes by ID.

    Args:
        minutes_id: Minutes ID.

    Returns:
        The minutes object.

    Raises:
        HTTPException: 404 when no such minutes exist, 500 on any other error.
    """
    try:
        minutes = rag_minutes_db.get_minutes_by_id(minutes_id)
        if not minutes:
            raise HTTPException(status_code=404, detail="회의록을 찾을 수 없습니다")
        return minutes
    except HTTPException:
        # Let the 404 above pass through instead of turning it into a 500.
        raise
    except Exception as e:
        logger.error(f"회의록 조회 실패: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Direct-execution entry point: serve `app` with uvicorn, listening on
    # all interfaces (0.0.0.0) at port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)
View File
Binary file not shown.
Binary file not shown.
+359
View File
@@ -0,0 +1,359 @@
"""
Azure AI Search 관련자료 DB
"""
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
SearchIndex,
SimpleField,
SearchableField,
SearchField,
VectorSearch,
HnswAlgorithmConfiguration,
VectorSearchProfile,
SemanticConfiguration,
SemanticField,
SemanticPrioritizedFields,
SemanticSearch,
SearchFieldDataType
)
from azure.search.documents.models import (
VectorizedQuery,
QueryType,
QueryCaptionType,
QueryAnswerType
)
from typing import List, Dict, Any, Optional
import logging
from ..models.document import DocumentChunk
logger = logging.getLogger(__name__)
class AzureAISearchDB:
    """Azure AI Search store for chunked, meeting-related documents.

    Holds a ``SearchClient`` (queries and document writes against one index)
    and a ``SearchIndexClient`` (index schema management), both created with
    the same API-key credential.
    """

    def __init__(
        self,
        endpoint: str,
        api_key: str,
        index_name: str = "meeting-minutes-index",
        api_version: str = "2023-11-01"
    ):
        """
        Create the search and index clients.

        Args:
            endpoint: Azure AI Search endpoint.
            api_key: API key.
            index_name: Index name.
            api_version: API version. Stored on the instance only; it is NOT
                forwarded to the SDK clients, which keep their default.
        """
        self.endpoint = endpoint
        self.api_key = api_key
        self.index_name = index_name
        # Previously accepted and discarded; kept so callers can inspect it.
        self.api_version = api_version

        credential = AzureKeyCredential(api_key)
        self.search_client = SearchClient(
            endpoint=endpoint,
            index_name=index_name,
            credential=credential
        )
        self.index_client = SearchIndexClient(
            endpoint=endpoint,
            credential=credential
        )

    @staticmethod
    def _odata_quote(value: str) -> str:
        """Return *value* as an OData string literal (embedded ``'`` doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _build_filter(
        folder: Optional[str] = None,
        document_type: Optional[str] = None
    ) -> Optional[str]:
        """Build the ``$filter`` expression for the optional folder/type filters.

        Returns:
            The clauses joined with ``and``, or None when no filter is set.
        """
        clauses = []
        if folder:
            clauses.append(f"folder eq {AzureAISearchDB._odata_quote(folder)}")
        if document_type:
            clauses.append(
                f"documentType eq {AzureAISearchDB._odata_quote(document_type)}"
            )
        return " and ".join(clauses) if clauses else None

    @staticmethod
    def _resolve_score(result: Dict[str, Any]) -> float:
        """Pick the reranker score, else the plain search score, else 0.0.

        A key that is present with value None is treated as missing, so a
        None reranker score falls back to ``@search.score``.
        """
        score = result.get("@search.reranker_score")
        if score is None:
            score = result.get("@search.score")
        return float(score) if score is not None else 0.0

    @staticmethod
    def _relevance_level(score: float) -> str:
        """Map a raw score to HIGH (>= 3.0), MEDIUM (>= 2.0) or LOW."""
        if score >= 3.0:
            return "HIGH"
        if score >= 2.0:
            return "MEDIUM"
        return "LOW"

    def create_index(self, vector_dimensions: int = 1536):
        """
        Create or update the index (schema, vector search, semantic config).

        Args:
            vector_dimensions: Dimension of the ``contentVector`` field.
                Defaults to 1536, the previously hard-coded value.

        Raises:
            Exception: Whatever the index client raises; logged and re-raised.
        """
        # Field definitions.
        fields = [
            SimpleField(name="id", type=SearchFieldDataType.String, key=True),
            SimpleField(name="documentId", type=SearchFieldDataType.String, filterable=True),
            SimpleField(name="documentType", type=SearchFieldDataType.String, filterable=True, facetable=True),
            SearchableField(name="title", type=SearchFieldDataType.String, analyzer_name="ko.lucene"),
            SimpleField(name="folder", type=SearchFieldDataType.String, filterable=True, facetable=True),
            SimpleField(name="createdDate", type=SearchFieldDataType.DateTimeOffset, filterable=True, sortable=True),
            SearchField(
                name="participants",
                type=SearchFieldDataType.Collection(SearchFieldDataType.String),
                searchable=True,
                filterable=True,
                facetable=True
            ),
            SearchField(
                name="keywords",
                type=SearchFieldDataType.Collection(SearchFieldDataType.String),
                searchable=True,
                facetable=True
            ),
            SimpleField(name="agendaId", type=SearchFieldDataType.String, filterable=True),
            SearchableField(name="agendaTitle", type=SearchFieldDataType.String, analyzer_name="ko.lucene"),
            SimpleField(name="chunkIndex", type=SearchFieldDataType.Int32, filterable=True, sortable=True),
            SearchableField(name="content", type=SearchFieldDataType.String, analyzer_name="ko.lucene"),
            SearchField(
                name="contentVector",
                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True,
                vector_search_dimensions=vector_dimensions,
                vector_search_profile_name="meeting-vector-profile"
            ),
            SimpleField(name="tokenCount", type=SearchFieldDataType.Int32, filterable=True)
        ]

        # Vector search: one HNSW algorithm (cosine metric) behind one profile.
        vector_search = VectorSearch(
            profiles=[
                VectorSearchProfile(
                    name="meeting-vector-profile",
                    algorithm_configuration_name="meeting-hnsw"
                )
            ],
            algorithms=[
                HnswAlgorithmConfiguration(
                    name="meeting-hnsw",
                    parameters={
                        "m": 4,
                        "efConstruction": 400,
                        "efSearch": 500,
                        "metric": "cosine"
                    }
                )
            ]
        )

        # Semantic search configuration (referenced by name in hybrid_search).
        semantic_config = SemanticConfiguration(
            name="meeting-semantic-config",
            prioritized_fields=SemanticPrioritizedFields(
                title_field=SemanticField(field_name="title"),
                content_fields=[SemanticField(field_name="content")],
                keywords_fields=[SemanticField(field_name="keywords")]
            )
        )
        semantic_search = SemanticSearch(
            configurations=[semantic_config]
        )

        index = SearchIndex(
            name=self.index_name,
            fields=fields,
            vector_search=vector_search,
            semantic_search=semantic_search
        )

        try:
            self.index_client.create_or_update_index(index)
            logger.info(f"Azure AI Search 인덱스 생성 완료: {self.index_name}")
        except Exception as e:
            logger.error(f"인덱스 생성 실패: {str(e)}")
            raise

    def upload_documents(self, chunks: List["DocumentChunk"]) -> bool:
        """
        Upload document chunks in batches of up to 1000.

        Args:
            chunks: Document chunks to upload.

        Returns:
            True when every document in every batch was indexed (or the list
            is empty); False when the service reports any per-document
            failure or an exception occurs.
        """
        if not chunks:
            return True

        try:
            # Convert the Pydantic models to plain dicts.
            documents = [chunk.dict() for chunk in chunks]

            batch_size = 1000
            all_succeeded = True
            starts = range(0, len(documents), batch_size)
            for batch_no, start in enumerate(starts, start=1):
                batch = documents[start:start + batch_size]
                outcomes = self.search_client.upload_documents(documents=batch)

                # The call can succeed overall while individual documents are
                # rejected, so inspect each per-document result.
                failed = [item for item in (outcomes or []) if not item.succeeded]
                if failed:
                    all_succeeded = False
                    logger.error(
                        f"배치 {batch_no}: {len(failed)}개 문서 업로드 실패 "
                        f"(첫 실패 key={failed[0].key}, error={failed[0].error_message})"
                    )
                else:
                    logger.info(f"배치 {batch_no}: {len(batch)}개 문서 업로드 완료")
            return all_succeeded
        except Exception as e:
            logger.error(f"문서 업로드 실패: {str(e)}")
            return False

    def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 3,
        folder: Optional[str] = None,
        document_type: Optional[str] = None,
        semantic_ranking: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search (keyword + vector, optionally with semantic ranking).

        Args:
            query: Search text.
            query_embedding: Embedding vector of the query.
            top_k: Maximum number of results to return.
            folder: Optional folder filter.
            document_type: Optional document type filter.
            semantic_ranking: Whether to enable semantic ranking.

        Returns:
            Up to ``top_k`` result dicts. Returns an empty list when the
            search fails (the error is logged, not raised).
        """
        try:
            vector_query = VectorizedQuery(
                vector=query_embedding,
                k_nearest_neighbors=50,
                fields="contentVector"
            )

            search_params = {
                "search_text": query,
                "vector_queries": [vector_query],
                # documentType is read below, so it must be selected;
                # otherwise every hit is reported as "unknown".
                "select": [
                    "documentId", "documentType", "title", "createdDate",
                    "content", "agendaTitle", "folder"
                ],
                # With semantic ranking, fetch 50 candidates and cut to top_k
                # after reranking.
                "top": 50 if semantic_ranking else top_k,
                # Filter values are quoted to prevent OData filter injection.
                "filter": self._build_filter(folder, document_type)
            }

            if semantic_ranking:
                search_params.update({
                    "query_type": QueryType.SEMANTIC,
                    "semantic_configuration_name": "meeting-semantic-config",
                    "query_caption": QueryCaptionType.EXTRACTIVE,
                    "query_answer": QueryAnswerType.EXTRACTIVE
                })

            results = self.search_client.search(**search_params)

            search_results = []
            for i, result in enumerate(results):
                if i >= top_k:
                    break

                score = self._resolve_score(result)

                # Prefer the first semantic caption; otherwise use the first
                # 300 characters of the content (which may be missing/None).
                captions = result.get("@search.captions") or []
                if captions:
                    excerpt = captions[0].text
                else:
                    excerpt = (result.get("content") or "")[:300]

                search_results.append({
                    "document_id": result["documentId"],
                    "title": result["title"],
                    "document_type": result.get("documentType", "unknown"),
                    "created_date": result.get("createdDate"),
                    # Scale by 4.0 and cap at 1.0.
                    "relevance_score": min(score / 4.0, 1.0),
                    "relevance_level": self._relevance_level(score),
                    "content_excerpt": excerpt,
                    "folder": result.get("folder")
                })
            return search_results
        except Exception as e:
            logger.error(f"Hybrid Search 실패: {str(e)}")
            return []

    def delete_documents_by_id(self, document_id: str) -> bool:
        """
        Delete every chunk that belongs to a document.

        Args:
            document_id: Document ID.

        Returns:
            True on success (including when no chunk matched), False when an
            exception occurred.
        """
        try:
            # Find all chunks of the document; the ID is quoted for OData.
            results = self.search_client.search(
                search_text="*",
                filter=f"documentId eq {self._odata_quote(document_id)}",
                select=["id"]
            )
            chunk_ids = [{"id": result["id"]} for result in results]

            if chunk_ids:
                self.search_client.delete_documents(documents=chunk_ids)
                logger.info(f"문서 {document_id}: {len(chunk_ids)}개 청크 삭제 완료")
            return True
        except Exception as e:
            logger.error(f"문서 삭제 실패 ({document_id}): {str(e)}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """
        Collect index statistics.

        Returns:
            Dict with ``total_documents`` (distinct document IDs),
            ``total_chunks`` and ``by_type`` (chunk count per document type).
            All-zero/empty values are returned when the query fails.
        """
        try:
            # NOTE(review): the query asks for at most 10000 chunks, so the
            # numbers are presumably truncated for larger indexes -- confirm.
            results = self.search_client.search(
                search_text="*",
                select=["documentId", "documentType"],
                top=10000
            )

            document_ids = set()
            type_counts = {}
            for result in results:
                doc_id = result.get("documentId")
                doc_type = result.get("documentType", "unknown")
                # Chunks without a document ID are not counted at all.
                if doc_id:
                    document_ids.add(doc_id)
                    type_counts[doc_type] = type_counts.get(doc_type, 0) + 1

            return {
                "total_documents": len(document_ids),
                "total_chunks": sum(type_counts.values()),
                "by_type": type_counts
            }
        except Exception as e:
            logger.error(f"통계 조회 실패: {str(e)}")
            return {
                "total_documents": 0,
                "total_chunks": 0,
                "by_type": {}
            }
+381
View File
@@ -0,0 +1,381 @@
"""
PostgreSQL + pgvector 용어집 DB
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import List, Optional, Dict, Any
from contextlib import contextmanager
import logging
import json
from ..models.term import Term
from ..utils.embedding import cosine_similarity
logger = logging.getLogger(__name__)
class PostgresVectorDB:
    """Term glossary store on PostgreSQL with the pgvector extension.

    Every operation opens its own connection through ``get_connection`` and
    closes it afterwards.
    """

    def __init__(self, connection_string: str):
        """
        Store the connection string; no connection is opened here.

        Args:
            connection_string: PostgreSQL connection string.
        """
        self.connection_string = connection_string

    @contextmanager
    def get_connection(self):
        """Yield a new database connection and always close it on exit.

        Committing is left to the caller.
        """
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _parse_embedding(embedding_str: Optional[str]) -> Optional[List[float]]:
        """
        Convert a PostgreSQL vector value to a Python list.

        Args:
            embedding_str: Vector as returned by PostgreSQL, e.g.
                "[-0.003,0.01,...]"; a list is passed through unchanged.

        Returns:
            The parsed list, or None for empty/unsupported/unparsable input.
        """
        if not embedding_str:
            return None
        if isinstance(embedding_str, list):
            return embedding_str
        if not isinstance(embedding_str, str):
            return None
        try:
            # The vector text form "[1,2,3]" is valid JSON.
            return json.loads(embedding_str)
        except ValueError as e:  # json.JSONDecodeError subclasses ValueError
            logger.error(f"임베딩 파싱 실패: {str(e)}")
            return None

    @staticmethod
    def _escape_like(text: str) -> str:
        """Escape LIKE/ILIKE metacharacters so *text* matches literally.

        Backslash is PostgreSQL's default LIKE escape character, so it is
        escaped first, followed by ``%`` and ``_``.
        """
        return (
            text.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
        )

    @staticmethod
    def _row_to_term(row: Dict[str, Any]) -> "Term":
        """
        Convert a database row to a Term object.

        The ``embedding`` column is dropped and not passed to ``Term``.

        Args:
            row: Database row (dict-like).

        Returns:
            Term object.
        """
        term_dict = dict(row)
        # The original parsed the embedding and then discarded it, and raised
        # KeyError when the column was absent; just drop it when present.
        term_dict.pop("embedding", None)
        return Term(**term_dict)

    def init_database(self):
        """
        Initialise the database: pgvector extension, tables and indexes.

        Every statement uses IF NOT EXISTS; changes are committed at the end.
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                # Install the pgvector extension.
                cur.execute("CREATE EXTENSION IF NOT EXISTS vector")

                # terms table.
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS terms (
                        term_id VARCHAR(100) PRIMARY KEY,
                        term_name VARCHAR(200) NOT NULL,
                        normalized_name VARCHAR(200) NOT NULL,
                        category VARCHAR(100),
                        definition TEXT NOT NULL,
                        context TEXT,
                        synonyms JSONB DEFAULT '[]',
                        related_terms JSONB DEFAULT '[]',
                        document_source JSONB,
                        confidence_score DECIMAL(3,2) DEFAULT 0.0,
                        usage_count INTEGER DEFAULT 0,
                        last_updated VARCHAR(50),
                        embedding vector(1536),
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)

                # Lookup indexes.
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_terms_normalized_name
                    ON terms(normalized_name)
                """)
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_terms_category
                    ON terms(category)
                """)
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_terms_confidence
                    ON terms(confidence_score DESC)
                """)

                # Vector similarity index (IVFFlat, cosine distance).
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_terms_embedding
                    ON terms USING ivfflat (embedding vector_cosine_ops)
                    WITH (lists = 100)
                """)

                # term_usage_logs table (usage history).
                cur.execute("""
                    CREATE TABLE IF NOT EXISTS term_usage_logs (
                        log_id SERIAL PRIMARY KEY,
                        term_id VARCHAR(100) REFERENCES terms(term_id) ON DELETE CASCADE,
                        user_id VARCHAR(100),
                        meeting_id VARCHAR(100),
                        action VARCHAR(20),
                        feedback_rating INTEGER,
                        feedback_comment TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_usage_term_id
                    ON term_usage_logs(term_id, created_at DESC)
                """)
            conn.commit()
        logger.info("PostgreSQL 데이터베이스 초기화 완료")

    def insert_term(self, term: "Term") -> bool:
        """
        Insert a term, or update it when the term_id already exists.

        Args:
            term: Term object.

        Returns:
            True on success, False when any error occurred (logged).
        """
        try:
            with self.get_connection() as conn:
                with conn.cursor() as cur:
                    cur.execute("""
                        INSERT INTO terms (
                            term_id, term_name, normalized_name, category,
                            definition, context, synonyms, related_terms,
                            document_source, confidence_score, usage_count,
                            last_updated, embedding
                        ) VALUES (
                            %s, %s, %s, %s, %s, %s, %s::jsonb, %s::jsonb,
                            %s::jsonb, %s, %s, %s, %s::vector
                        )
                        ON CONFLICT (term_id) DO UPDATE SET
                            term_name = EXCLUDED.term_name,
                            normalized_name = EXCLUDED.normalized_name,
                            category = EXCLUDED.category,
                            definition = EXCLUDED.definition,
                            context = EXCLUDED.context,
                            synonyms = EXCLUDED.synonyms,
                            related_terms = EXCLUDED.related_terms,
                            document_source = EXCLUDED.document_source,
                            confidence_score = EXCLUDED.confidence_score,
                            usage_count = EXCLUDED.usage_count,
                            last_updated = EXCLUDED.last_updated,
                            embedding = EXCLUDED.embedding,
                            updated_at = CURRENT_TIMESTAMP
                    """, (
                        term.term_id,
                        term.term_name,
                        term.normalized_name,
                        term.category,
                        term.definition,
                        term.context,
                        psycopg2.extras.Json(term.synonyms),
                        psycopg2.extras.Json(term.related_terms),
                        psycopg2.extras.Json(term.document_source.dict() if term.document_source else None),
                        term.confidence_score,
                        term.usage_count,
                        term.last_updated,
                        term.embedding
                    ))
                conn.commit()
            return True
        except Exception as e:
            logger.error(f"용어 삽입 실패 ({term.term_id}): {str(e)}")
            return False

    def get_term_by_id(self, term_id: str) -> Optional["Term"]:
        """
        Look up a term by ID.

        Args:
            term_id: Term ID.

        Returns:
            The term, or None when no row matches.
        """
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT * FROM terms WHERE term_id = %s
                """, (term_id,))
                row = cur.fetchone()
        if row:
            return self._row_to_term(row)
        return None

    def search_by_keyword(
        self,
        query: str,
        top_k: int = 5,
        confidence_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Keyword search over name, synonyms and definition.

        Match score: 1.0 exact normalized name, 0.9 normalized-name
        substring, 0.8 term-name substring, 0.7 synonym substring, 0.5
        otherwise (definition match).

        Args:
            query: Search text.
            top_k: Maximum number of results.
            confidence_threshold: Minimum ``confidence_score`` of a term.

        Returns:
            Result dicts with ``term``, ``relevance_score`` and
            ``match_type`` ("keyword"). A blank query returns an empty list.
        """
        normalized_query = query.lower().strip()
        if not normalized_query:
            # A blank pattern would become '%%' and match every row.
            return []

        # Escape LIKE wildcards so user text is matched literally.
        raw_pattern = f"%{self._escape_like(query)}%"
        normalized_pattern = f"%{self._escape_like(normalized_query)}%"

        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT *,
                        CASE
                            WHEN normalized_name = %s THEN 1.0
                            WHEN normalized_name LIKE %s THEN 0.9
                            WHEN term_name ILIKE %s THEN 0.8
                            WHEN synonyms::text ILIKE %s THEN 0.7
                            ELSE 0.5
                        END as match_score
                    FROM terms
                    WHERE (
                        normalized_name LIKE %s
                        OR term_name ILIKE %s
                        OR synonyms::text ILIKE %s
                        OR definition ILIKE %s
                    )
                    AND confidence_score >= %s
                    ORDER BY match_score DESC, confidence_score DESC, usage_count DESC
                    LIMIT %s
                """, (
                    normalized_query,
                    normalized_pattern,
                    raw_pattern,
                    raw_pattern,
                    normalized_pattern,
                    raw_pattern,
                    raw_pattern,
                    raw_pattern,
                    confidence_threshold,
                    top_k
                ))
                rows = cur.fetchall()

        results = []
        for row in rows:
            term_dict = dict(row)
            match_score = term_dict.pop("match_score")
            results.append({
                "term": self._row_to_term(term_dict),
                "relevance_score": float(match_score),
                "match_type": "keyword"
            })
        return results

    def search_by_vector(
        self,
        query_embedding: List[float],
        top_k: int = 5,
        confidence_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Vector similarity search (pgvector ``<=>`` operator).

        Args:
            query_embedding: Embedding vector of the query.
            top_k: Maximum number of results.
            confidence_threshold: Minimum ``confidence_score`` of a term.

        Returns:
            Result dicts with ``term``, ``relevance_score``
            (``1 - distance``) and ``match_type`` ("vector"), nearest first.
        """
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("""
                    SELECT *,
                        1 - (embedding <=> %s::vector) as similarity_score
                    FROM terms
                    WHERE confidence_score >= %s
                    AND embedding IS NOT NULL
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s
                """, (
                    query_embedding,
                    confidence_threshold,
                    query_embedding,
                    top_k
                ))
                rows = cur.fetchall()

        results = []
        for row in rows:
            term_dict = dict(row)
            similarity_score = term_dict.pop("similarity_score")
            results.append({
                "term": self._row_to_term(term_dict),
                "relevance_score": float(similarity_score),
                "match_type": "vector"
            })
        return results

    def get_stats(self) -> Dict[str, Any]:
        """
        Collect glossary statistics.

        Returns:
            Dict with ``total_terms``, ``avg_confidence`` (0.0 when there is
            no value) and ``by_category`` (term count per category; terms
            with a NULL category are counted under "unknown").
        """
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                # Overall totals.
                cur.execute("""
                    SELECT
                        COUNT(*) as total_terms,
                        AVG(confidence_score) as avg_confidence
                    FROM terms
                """)
                overall = cur.fetchone()

                # Per-category counts.
                cur.execute("""
                    SELECT category, COUNT(*) as count
                    FROM terms
                    GROUP BY category
                    ORDER BY count DESC
                """)
                category_rows = cur.fetchall()

        # The category column is nullable; a None key cannot be validated or
        # serialised as a string-keyed mapping, so fold it into "unknown".
        by_category: Dict[str, int] = {}
        for row in category_rows:
            label = row["category"] if row["category"] is not None else "unknown"
            by_category[label] = by_category.get(label, 0) + row["count"]

        return {
            "total_terms": overall["total_terms"],
            "avg_confidence": float(overall["avg_confidence"]) if overall["avg_confidence"] else 0.0,
            "by_category": by_category
        }
+338
View File
@@ -0,0 +1,338 @@
"""
RAG 회의록 데이터베이스
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import List, Optional, Dict, Any
from contextlib import contextmanager
import logging
import json
from datetime import datetime
from ..models.minutes import RagMinutes, MinutesSection
from ..utils.embedding import cosine_similarity
logger = logging.getLogger(__name__)
class RagMinutesDB:
"""RAG 회의록 PostgreSQL + pgvector 데이터베이스"""
def __init__(self, connection_string: str):
"""
초기화
Args:
connection_string: PostgreSQL 연결 문자열
"""
self.connection_string = connection_string
@contextmanager
def get_connection(self):
    """Yield a newly opened database connection and close it on exit.

    The connection is closed whether or not the ``with`` body raises;
    committing is left to the caller.
    """
    connection = psycopg2.connect(self.connection_string)
    try:
        yield connection
    finally:
        connection.close()
@staticmethod
def _parse_embedding(embedding_str: Optional[str]) -> Optional[List[float]]:
"""
PostgreSQL vector 타입을 Python 리스트로 변환
Args:
embedding_str: PostgreSQL에서 반환된 vector 문자열 (예: "[-0.003,0.01,...]")
Returns:
float 리스트 또는 None
"""
if not embedding_str:
return None
try:
# vector 타입은 "[1,2,3]" 형태의 문자열로 반환됨
if isinstance(embedding_str, str):
return json.loads(embedding_str)
elif isinstance(embedding_str, list):
return embedding_str
return None
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"임베딩 파싱 실패: {str(e)}")
return None
@staticmethod
def _row_to_minutes(row: Dict[str, Any]) -> RagMinutes:
    """
    Convert a database row to a RagMinutes object.

    The embedding is parsed to a list, ``sections`` becomes a list of
    ``MinutesSection`` (empty when missing or empty), and datetime values in
    the timestamp columns are converted to ISO-8601 strings.

    Args:
        row: Database row (dict-like).

    Returns:
        RagMinutes object.
    """
    record = dict(row)

    if "embedding" in record:
        record["embedding"] = RagMinutesDB._parse_embedding(record["embedding"])

    # sections may arrive as a JSON string or as already-decoded data.
    raw_sections = record.get("sections")
    if raw_sections:
        if isinstance(raw_sections, str):
            raw_sections = json.loads(raw_sections)
        record["sections"] = [MinutesSection(**item) for item in raw_sections]
    else:
        record["sections"] = []

    for column in ('scheduled_at', 'finalized_at', 'created_at', 'updated_at'):
        value = record.get(column)
        if value and isinstance(value, datetime):
            record[column] = value.isoformat()

    return RagMinutes(**record)
def insert_minutes(self, minutes: RagMinutes) -> bool:
    """
    Insert meeting minutes, or update them when minutes_id already exists.

    On conflict every column except ``minutes_id`` and ``created_by`` is
    overwritten and ``updated_at`` is set to the current timestamp.

    Args:
        minutes: Minutes object.

    Returns:
        True on success, False when any error occurred (logged).
    """
    upsert_sql = """
        INSERT INTO rag_minutes (
            meeting_id, title, purpose, description, scheduled_at,
            location, organizer_id, minutes_id, minutes_status,
            minutes_version, created_by, finalized_by, finalized_at,
            sections, full_content, embedding
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
            %s::jsonb, %s, %s::vector
        )
        ON CONFLICT (minutes_id) DO UPDATE SET
            meeting_id = EXCLUDED.meeting_id,
            title = EXCLUDED.title,
            purpose = EXCLUDED.purpose,
            description = EXCLUDED.description,
            scheduled_at = EXCLUDED.scheduled_at,
            location = EXCLUDED.location,
            organizer_id = EXCLUDED.organizer_id,
            minutes_status = EXCLUDED.minutes_status,
            minutes_version = EXCLUDED.minutes_version,
            finalized_by = EXCLUDED.finalized_by,
            finalized_at = EXCLUDED.finalized_at,
            sections = EXCLUDED.sections,
            full_content = EXCLUDED.full_content,
            embedding = EXCLUDED.embedding,
            updated_at = CURRENT_TIMESTAMP
    """
    try:
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                # Sections are stored as one JSONB array of plain dicts.
                section_dicts = []
                for section in minutes.sections:
                    section_dicts.append(section.dict())

                params = (
                    minutes.meeting_id,
                    minutes.title,
                    minutes.purpose,
                    minutes.description,
                    minutes.scheduled_at,
                    minutes.location,
                    minutes.organizer_id,
                    minutes.minutes_id,
                    minutes.minutes_status,
                    minutes.minutes_version,
                    minutes.created_by,
                    minutes.finalized_by,
                    minutes.finalized_at,
                    psycopg2.extras.Json(section_dicts),
                    minutes.full_content,
                    minutes.embedding,
                )
                cur.execute(upsert_sql, params)
            conn.commit()
        logger.info(f"회의록 저장 성공: {minutes.minutes_id}")
        return True
    except Exception as e:
        logger.error(f"회의록 저장 실패 ({minutes.minutes_id}): {str(e)}")
        return False
def get_minutes_by_id(self, minutes_id: str) -> Optional[RagMinutes]:
    """
    Look up meeting minutes by ID.

    Args:
        minutes_id: Minutes ID.

    Returns:
        The minutes, or None when no row matches.
    """
    lookup_sql = """
        SELECT * FROM rag_minutes WHERE minutes_id = %s
    """
    with self.get_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(lookup_sql, (minutes_id,))
            found = cur.fetchone()
    return self._row_to_minutes(found) if found else None
def search_by_vector(
    self,
    query_embedding: List[float],
    top_k: int = 5,
    similarity_threshold: float = 0.7
) -> List[Dict[str, Any]]:
    """
    Vector similarity search (pgvector ``<=>`` operator).

    Args:
        query_embedding: Embedding vector of the query.
        top_k: Maximum number of results.
        similarity_threshold: Minimum similarity (``1 - distance``).

    Returns:
        Result dicts with ``minutes`` and ``similarity_score``, nearest first.
    """
    similarity_sql = """
        SELECT *,
            1 - (embedding <=> %s::vector) as similarity_score
        FROM rag_minutes
        WHERE embedding IS NOT NULL
        AND 1 - (embedding <=> %s::vector) >= %s
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """
    # The embedding is bound three times: SELECT, WHERE and ORDER BY.
    params = (
        query_embedding,
        query_embedding,
        similarity_threshold,
        query_embedding,
        top_k,
    )

    with self.get_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(similarity_sql, params)
            rows = cur.fetchall()

    results = []
    for row in rows:
        record = dict(row)
        score = record.pop("similarity_score")
        results.append({
            "minutes": self._row_to_minutes(record),
            "similarity_score": float(score)
        })

    logger.info(f"벡터 검색 완료: {len(results)}개 결과")
    return results
def search_by_keyword(
    self,
    query: str,
    top_k: int = 5
) -> List[Dict[str, Any]]:
    """
    Search ``rag_minutes`` by keyword.

    A row matches when its ``full_content`` satisfies the full-text query
    (``simple`` configuration) or when its ``title`` contains ``query`` as a
    case-insensitive substring. Results are ordered by ``ts_rank`` descending,
    then by ``finalized_at`` descending.

    Args:
        query: Search text.
        top_k: Maximum number of results to return.

    Returns:
        A list of dicts with keys ``minutes`` (the row converted by
        ``_row_to_minutes``) and ``similarity_score`` (the ``ts_rank`` value
        as a float, 0.0 when the rank is missing or zero).
    """
    # Escape LIKE metacharacters so the title filter is a literal substring
    # match; otherwise a query such as "100%" or "a_b" acts as a wildcard
    # pattern. Backslash is the default LIKE escape character and must be
    # escaped first.
    escaped_query = (
        query.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    title_pattern = f"%{escaped_query}%"

    with self.get_connection() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT *,
                       ts_rank(to_tsvector('simple', full_content), plainto_tsquery('simple', %s)) as rank_score
                FROM rag_minutes
                WHERE to_tsvector('simple', full_content) @@ plainto_tsquery('simple', %s)
                   OR title ILIKE %s
                ORDER BY rank_score DESC, finalized_at DESC
                LIMIT %s
            """, (
                query,
                query,
                title_pattern,
                top_k
            ))
            results = []
            for row in cur.fetchall():
                minutes_dict = dict(row)
                # Remove the computed column before converting the row.
                rank_score = minutes_dict.pop("rank_score", 0.0)
                results.append({
                    "minutes": self._row_to_minutes(minutes_dict),
                    "similarity_score": float(rank_score) if rank_score else 0.0
                })
            logger.info(f"키워드 검색 완료: {len(results)}개 결과")
            return results
def get_stats(self) -> Dict[str, Any]:
    """
    Collect summary statistics for the ``rag_minutes`` table.

    Returns:
        A dict with ``total_minutes`` (row count), ``total_meetings``
        (distinct ``meeting_id``), ``total_authors`` (distinct ``created_by``)
        and ``latest_finalized_at`` (ISO string of the newest non-null
        ``finalized_at``, or None when there is none).
    """
    totals_sql = """
        SELECT
            COUNT(*) as total_minutes,
            COUNT(DISTINCT meeting_id) as total_meetings,
            COUNT(DISTINCT created_by) as total_authors
        FROM rag_minutes
    """
    newest_sql = """
        SELECT finalized_at
        FROM rag_minutes
        WHERE finalized_at IS NOT NULL
        ORDER BY finalized_at DESC
        LIMIT 1
    """
    with self.get_connection() as connection:
        with connection.cursor(cursor_factory=RealDictCursor) as cursor:
            # Overall counts
            cursor.execute(totals_sql)
            totals = cursor.fetchone()
            # Most recently finalized minutes
            cursor.execute(newest_sql)
            newest = cursor.fetchone()

            stats = {
                key: totals[key]
                for key in ("total_minutes", "total_meetings", "total_authors")
            }
            if newest and newest["finalized_at"]:
                stats["latest_finalized_at"] = newest["finalized_at"].isoformat()
            else:
                stats["latest_finalized_at"] = None
            return stats
def delete_minutes(self, minutes_id: str) -> bool:
    """
    Delete the minutes record with the given ID.

    Args:
        minutes_id: ID of the minutes record to delete.

    Returns:
        True when the DELETE statement and commit ran without raising,
        False when any exception occurred (the error is logged).
    """
    delete_sql = """
        DELETE FROM rag_minutes WHERE minutes_id = %s
    """
    try:
        with self.get_connection() as connection:
            with connection.cursor() as cursor:
                cursor.execute(delete_sql, (minutes_id,))
                connection.commit()
                logger.info(f"회의록 삭제 성공: {minutes_id}")
                return True
    except Exception as exc:
        logger.error(f"회의록 삭제 실패 ({minutes_id}): {str(exc)}")
        return False
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+137
View File
@@ -0,0 +1,137 @@
"""
관련자료 데이터 모델
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
from uuid import UUID
class DocumentMetadata(BaseModel):
    """Document metadata; every field is optional and defaults to None."""
    folder: Optional[str] = Field(None, description="폴더명")
    business_domain: Optional[str] = Field(None, description="업무 도메인")
    # Free-form dict for any extra metadata.
    additional_fields: Optional[Dict[str, Any]] = Field(None, description="추가 필드")
class Document(BaseModel):
    """
    Document model.

    ``document_id``, ``document_type``, ``title``, ``content`` and ``summary``
    are required; the remaining fields default to None or an empty list.
    """
    document_id: str = Field(..., description="문서 ID")
    document_type: str = Field(..., description="문서 타입 (meeting_minutes, org_document 등)")
    business_domain: Optional[str] = Field(None, description="업무 도메인")
    title: str = Field(..., description="문서 제목")
    content: str = Field(..., description="문서 전체 내용")
    summary: str = Field(..., description="문서 요약 (3-5 문장)")
    keywords: List[str] = Field(default_factory=list, description="키워드 목록")
    # Stored as a string, not a datetime.
    created_date: Optional[str] = Field(None, description="생성일시")
    participants: List[str] = Field(default_factory=list, description="참석자 목록 (회의록의 경우)")
    metadata: Optional[DocumentMetadata] = Field(None, description="메타데이터")
    # The description states 1536 dimensions; the model itself does not enforce a length.
    embedding: Optional[List[float]] = Field(None, description="임베딩 벡터 (1536차원)")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "document_id": "고객-MM-001",
                "document_type": "meeting_minutes",
                "business_domain": "고객서비스",
                "title": "상담 품질 향상 워크샵 1차",
                "content": "회의 일시: 2025-10-02...",
                "summary": "고객 만족도 지표 검토와 VOC 트렌드 분석을 논의...",
                "keywords": ["CSAT", "고객응대", "챗봇"],
                "participants": ["김민준", "이미준"]
            }
        }
class DocumentChunk(BaseModel):
    """
    Document chunk (intended for Azure AI Search indexing, per the original note).

    Carries the chunk text, its embedding vector and token count together
    with identifying fields of the source document.
    """
    # The description gives the ID format as "document_id_chunk_N".
    id: str = Field(..., description="청크 ID (document_id_chunk_N)")
    document_id: str = Field(..., description="원본 문서 ID")
    document_type: str = Field(..., description="문서 타입")
    title: str = Field(..., description="문서 제목")
    folder: Optional[str] = Field(None, description="폴더명")
    created_date: Optional[str] = Field(None, description="생성일시")
    participants: List[str] = Field(default_factory=list, description="참석자 목록")
    keywords: List[str] = Field(default_factory=list, description="키워드 목록")
    # Agenda fields are optional; the descriptions tie them to meeting minutes.
    agenda_id: Optional[str] = Field(None, description="안건 ID (회의록의 경우)")
    agenda_title: Optional[str] = Field(None, description="안건 제목")
    chunk_index: int = Field(..., description="청크 인덱스")
    content: str = Field(..., description="청크 내용")
    content_vector: List[float] = Field(..., description="내용 임베딩 벡터")
    token_count: int = Field(..., description="토큰 수")
class DocumentSearchRequest(BaseModel):
    """
    Document search request.

    ``query`` must be non-empty; ``top_k`` is limited to 1-10 and
    ``relevance_threshold`` to 0.0-1.0. The filter fields are optional.
    """
    query: str = Field(..., min_length=1, description="검색 쿼리")
    top_k: int = Field(3, ge=1, le=10, description="반환할 최대 결과 수")
    relevance_threshold: float = Field(0.70, ge=0.0, le=1.0, description="최소 관련도 임계값")
    folder: Optional[str] = Field(None, description="폴더 필터 (같은 폴더 우선)")
    document_type: Optional[str] = Field(None, description="문서 타입 필터")
    business_domain: Optional[str] = Field(None, description="업무 도메인 필터")
    # Enabled by default.
    semantic_ranking: bool = Field(True, description="Semantic Ranking 사용 여부")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "query": "고객 만족도 개선 방안",
                "top_k": 3,
                "relevance_threshold": 0.70,
                "folder": "고객서비스팀",
                "semantic_ranking": True
            }
        }
class DocumentSearchResult(BaseModel):
    """
    Document search result.

    ``relevance_score`` is constrained to 0.0-1.0 and ``relevance_level``
    carries the matching bucket label (HIGH / MEDIUM / LOW).
    """
    document_id: str
    title: str
    document_type: str
    # Explicit None default: under pydantic v2 an Optional annotation without
    # a default is still a required field, which would reject results that
    # have no creation date. This matches ``folder`` below.
    created_date: Optional[str] = None
    relevance_score: float = Field(..., ge=0.0, le=1.0)
    relevance_level: str = Field(..., description="HIGH (>90%), MEDIUM (70-90%), LOW (<70%)")
    content_excerpt: str = Field(..., description="관련 내용 발췌")
    folder: Optional[str] = None
class RelatedMeetingRequest(BaseModel):
    """
    Request for minutes related to a given meeting.

    ``top_k`` is limited to 1-5 and ``relevance_threshold`` to 0.0-1.0.
    """
    meeting_id: str = Field(..., description="현재 회의 ID")
    top_k: int = Field(3, ge=1, le=5, description="반환할 최대 결과 수")
    relevance_threshold: float = Field(0.70, ge=0.0, le=1.0, description="최소 관련도 임계값")
class RelatedMeeting(BaseModel):
    """
    A related meeting-minutes entry.

    ``relevance_score`` is constrained to 0.0-1.0; ``url`` is required.
    """
    meeting_id: str
    title: str
    # Explicit None default: under pydantic v2 an Optional annotation without
    # a default is still a required field, so a meeting without a date would
    # fail validation.
    meeting_date: Optional[str] = None
    relevance_score: float = Field(..., ge=0.0, le=1.0)
    relevance_level: str = Field(..., description="HIGH, MEDIUM, LOW")
    similar_content_summary: Optional[str] = Field(None, description="유사 내용 요약 (3문장)")
    url: str = Field(..., description="회의록 URL")
class DocumentSummarizeRequest(BaseModel):
    """Document summarization request."""
    document_id: str = Field(..., description="문서 ID")
    # Optional; the description says it is used for comparison.
    current_meeting_id: Optional[str] = Field(None, description="현재 회의 ID (비교용)")
    # Defaults to "similar_content"; the description also lists "full".
    # The value is a plain str and is not validated against that list here.
    summary_type: str = Field("similar_content", description="요약 타입 (similar_content, full)")
class DocumentSummary(BaseModel):
    """Document summary together with generation metadata."""
    document_id: str
    summary: str = Field(..., description="요약 내용")
    # Defaults to the literal "claude-3-5-sonnet".
    generated_by: str = Field("claude-3-5-sonnet", description="생성 모델")
    tokens_used: int = Field(..., description="사용된 토큰 수")
    cached: bool = Field(False, description="캐시 여부")
class DocumentStats(BaseModel):
    """Document statistics; all fields are required."""
    total_documents: int = Field(..., description="전체 문서 수")
    # Mappings from document type / business domain to a document count.
    by_type: Dict[str, int] = Field(..., description="타입별 문서 수")
    by_domain: Dict[str, int] = Field(..., description="도메인별 문서 수")
    total_chunks: int = Field(..., description="전체 청크 수")
+108
View File
@@ -0,0 +1,108 @@
"""
회의록 데이터 모델
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
class MinutesSection(BaseModel):
    """One section of a meeting-minutes document."""
    section_id: str = Field(..., description="섹션 ID")
    type: str = Field(..., description="섹션 타입")
    title: str = Field(..., description="섹션 제목")
    content: Optional[str] = Field(None, description="섹션 내용")
    # Ordering value; defaults to 0.
    order: int = Field(0, description="순서")
    # Verification flag; defaults to False.
    verified: bool = Field(False, description="검증 여부")
class RagMinutes(BaseModel):
    """
    RAG meeting-minutes model.

    Combines meeting fields, minutes fields, the list of sections, the
    flattened ``full_content`` text and an optional embedding vector.
    Date/time fields are typed as strings.
    """
    # Meeting information
    meeting_id: str = Field(..., description="회의 ID")
    title: str = Field(..., description="회의 제목")
    purpose: Optional[str] = Field(None, description="회의 목적")
    description: Optional[str] = Field(None, description="회의 설명")
    scheduled_at: Optional[str] = Field(None, description="예약 일시")
    location: Optional[str] = Field(None, description="장소")
    organizer_id: str = Field(..., description="주최자 ID")
    # Minutes information
    minutes_id: str = Field(..., description="회의록 ID")
    minutes_status: str = Field(..., description="회의록 상태")
    minutes_version: int = Field(..., description="회의록 버전")
    created_by: str = Field(..., description="작성자")
    finalized_by: Optional[str] = Field(None, description="확정자")
    finalized_at: Optional[str] = Field(None, description="확정 일시")
    # Minutes sections (JSON)
    sections: List[MinutesSection] = Field(default_factory=list, description="회의록 섹션 목록")
    # Full minutes content (text used for search)
    full_content: str = Field(..., description="전체 회의록 내용")
    # Embedding; the description states 1536 dimensions, not enforced here.
    embedding: Optional[List[float]] = Field(None, description="임베딩 벡터 (1536차원)")
    # Metadata
    created_at: Optional[str] = Field(None, description="생성 일시")
    updated_at: Optional[str] = Field(None, description="수정 일시")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "meeting_id": "MTG-2025-001",
                "title": "2025 Q1 마케팅 전략 회의",
                "purpose": "2025년 1분기 마케팅 전략 수립",
                "minutes_id": "MIN-2025-001",
                "minutes_status": "FINALIZED",
                "minutes_version": 1,
                "created_by": "user@example.com",
                "organizer_id": "organizer@example.com",
                "sections": [
                    {
                        "section_id": "SEC-001",
                        "type": "DISCUSSION",
                        "title": "시장 분석",
                        "content": "2025년 시장 동향 분석...",
                        "order": 1,
                        "verified": True
                    }
                ],
                "full_content": "2025 Q1 마케팅 전략 회의..."
            }
        }
class MinutesSearchRequest(BaseModel):
    """
    Minutes search request.

    ``query`` must be non-empty; ``top_k`` is limited to 1-20 and
    ``similarity_threshold`` to 0.0-1.0.
    """
    query: str = Field(..., min_length=1, description="검색 쿼리 (회의록 내용)")
    top_k: int = Field(5, ge=1, le=20, description="반환할 최대 결과 수")
    similarity_threshold: float = Field(0.7, ge=0.0, le=1.0, description="최소 유사도 임계값")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "query": "마케팅 전략 수립",
                "top_k": 5,
                "similarity_threshold": 0.7
            }
        }
class MinutesSearchResult(BaseModel):
    """Minutes search result: a minutes record plus its similarity score."""
    minutes: RagMinutes
    # Validation rejects scores outside 0.0-1.0.
    similarity_score: float = Field(..., ge=0.0, le=1.0, description="유사도 점수")

    class Config:
        # Extra JSON-schema metadata: one example payload. The nested
        # "minutes" example lists only a few of the RagMinutes fields.
        json_schema_extra = {
            "example": {
                "minutes": {
                    "meeting_id": "MTG-2025-001",
                    "title": "2025 Q1 마케팅 전략 회의",
                    "minutes_id": "MIN-2025-001"
                },
                "similarity_score": 0.92
            }
        }
+97
View File
@@ -0,0 +1,97 @@
"""
용어집 데이터 모델
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
class DocumentSource(BaseModel):
    """Source-document information; ``type`` and ``title`` are required."""
    type: str = Field(..., description="문서 타입 (업무매뉴얼, 정책 및 규정 등)")
    title: str = Field(..., description="문서 제목")
    url: Optional[str] = Field(None, description="문서 URL")
    excerpt: Optional[str] = Field(None, description="문서 발췌")
class Term(BaseModel):
    """
    Glossary term model.

    ``term_id``, ``term_name``, ``normalized_name``, ``category`` and
    ``definition`` are required; the remaining fields have defaults.
    """
    term_id: str = Field(..., description="용어 ID")
    term_name: str = Field(..., description="용어명")
    # The description calls for a lowercased, whitespace-stripped form; this
    # model stores the value as given and does not normalize it.
    normalized_name: str = Field(..., description="정규화된 용어명 (소문자, 공백 제거)")
    category: str = Field(..., description="카테고리")
    definition: str = Field(..., description="용어 정의")
    context: Optional[str] = Field(None, description="회사 내 사용 맥락")
    synonyms: List[str] = Field(default_factory=list, description="동의어 목록")
    related_terms: List[str] = Field(default_factory=list, description="관련 용어 목록")
    document_source: Optional[DocumentSource] = Field(None, description="출처 문서")
    # Constrained to 0.0-1.0; defaults to 0.0.
    confidence_score: float = Field(0.0, ge=0.0, le=1.0, description="신뢰도 점수")
    # Must be non-negative; defaults to 0.
    usage_count: int = Field(0, ge=0, description="사용 횟수")
    last_updated: Optional[str] = Field(None, description="마지막 업데이트 일시")
    # The description states 1536 dimensions; the model itself does not enforce a length.
    embedding: Optional[List[float]] = Field(None, description="임베딩 벡터 (1536차원)")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "term_id": "cs_int_001",
                "term_name": "VoC (Voice of Customer)",
                "normalized_name": "voc voice of customer",
                "category": "고객서비스-분석",
                "definition": "고객이 상품이나 서비스를 이용하면서 느낀 경험을 수집하고 분석하는 활동",
                "context": "당사의 VoC 관리 시스템은 모든 채널에서 수집된 의견을 통합 분석합니다.",
                "synonyms": ["고객의소리", "Customer Voice"],
                "related_terms": ["CS", "CRM"],
                "confidence_score": 0.95,
                "usage_count": 247
            }
        }
class TermSearchRequest(BaseModel):
    """
    Term search request.

    ``query`` must be non-empty; ``top_k`` is limited to 1-20 and
    ``confidence_threshold`` to 0.0-1.0.
    """
    query: str = Field(..., min_length=1, description="검색 쿼리")
    top_k: int = Field(5, ge=1, le=20, description="반환할 최대 결과 수")
    confidence_threshold: float = Field(0.7, ge=0.0, le=1.0, description="최소 신뢰도 임계값")
    # Defaults to "hybrid". The description lists keyword / vector / hybrid,
    # but the field is a plain str and is not validated against that list here.
    search_type: str = Field("hybrid", description="검색 타입 (keyword, vector, hybrid)")

    class Config:
        # Extra JSON-schema metadata: one example payload.
        json_schema_extra = {
            "example": {
                "query": "고객 만족도 조사",
                "top_k": 5,
                "confidence_threshold": 0.7,
                "search_type": "hybrid"
            }
        }
class TermSearchResult(BaseModel):
    """Term search result: a term, its relevance score and how it matched."""
    term: Term
    # Validation rejects scores outside 0.0-1.0.
    relevance_score: float = Field(..., ge=0.0, le=1.0, description="관련도 점수")
    match_type: str = Field(..., description="매칭 타입 (keyword, vector, hybrid)")
class TermExplainRequest(BaseModel):
    """Term explanation request."""
    term_id: str = Field(..., description="용어 ID")
    meeting_context: Optional[str] = Field(None, description="회의 맥락")
    # Limited to 1-10; defaults to 3.
    max_context_docs: int = Field(3, ge=1, le=10, description="최대 참고 문서 수")
class TermExplanation(BaseModel):
    """Term explanation together with generation metadata."""
    term: Term
    explanation: str = Field(..., description="맥락 기반 설명")
    # Reference documents as free-form dicts; defaults to an empty list.
    context_documents: List[Dict[str, Any]] = Field(default_factory=list, description="참고 문서")
    # Defaults to the literal "claude-3-5-sonnet".
    generated_by: str = Field("claude-3-5-sonnet", description="생성 모델")
    cached: bool = Field(False, description="캐시 여부")
class TermStats(BaseModel):
    """Term statistics; all fields are required."""
    total_terms: int = Field(..., description="전체 용어 수")
    # Mappings from category / source type to a term count.
    by_category: Dict[str, int] = Field(..., description="카테고리별 용어 수")
    by_source_type: Dict[str, int] = Field(..., description="출처 타입별 용어 수")
    avg_confidence: float = Field(..., description="평균 신뢰도")
View File
+210
View File
@@ -0,0 +1,210 @@
"""
Claude AI 연동 서비스
"""
from anthropic import Anthropic
from typing import Dict, Any, Optional
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
logger = logging.getLogger(__name__)
class ClaudeService:
    """
    Claude AI integration service.

    Wraps an ``Anthropic`` client and exposes two prompt-building operations,
    ``explain_term`` and ``summarize_similar_content``. Both return a plain
    dict and do not raise on API failure: after the retries are exhausted
    they return a fallback payload that carries an ``error`` key.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",
        max_tokens: int = 1024,
        temperature: float = 0.3
    ):
        """
        Initialize the service.

        Args:
            api_key: Claude API key.
            model: Model name sent with every request.
            max_tokens: Maximum number of tokens per response.
            temperature: Sampling temperature.
        """
        self.client = Anthropic(api_key=api_key)
        self.model = model
        self.max_tokens = max_tokens
        self.temperature = temperature

    @retry(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def _create_message(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
        """
        Send one system/user prompt pair and return the reply text and usage.

        The retry policy lives here rather than on the public methods: those
        catch every exception to build a fallback result, so a decorator on
        them would never see a failure. ``reraise=True`` makes the last
        underlying exception propagate instead of tenacity's ``RetryError``,
        so the caller logs the real cause.

        Args:
            system_prompt: System prompt text.
            user_prompt: User message text.

        Returns:
            Dict with ``text`` (first content block of the reply) and
            ``tokens_used`` (input plus output tokens).
        """
        response = self.client.messages.create(
            model=self.model,
            max_tokens=self.max_tokens,
            temperature=self.temperature,
            system=system_prompt,
            messages=[
                {"role": "user", "content": user_prompt}
            ]
        )
        return {
            "text": response.content[0].text,
            "tokens_used": response.usage.input_tokens + response.usage.output_tokens
        }

    def explain_term(
        self,
        term_name: str,
        definition: str,
        context: Optional[str],
        meeting_context: Optional[str] = None,
        related_docs: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Generate an explanation of a term.

        Args:
            term_name: Term name.
            definition: Term definition.
            context: How the term is used inside the company (may be None).
            meeting_context: Context of the current meeting (optional).
            related_docs: Related document text (optional).

        Returns:
            Dict with ``explanation``, ``generated_by``, ``tokens_used`` and
            ``cached``. When the API call fails after all retries, the
            explanation is the definition (plus the context, if any),
            ``generated_by`` is "fallback" and an ``error`` key is added.
        """
        # System prompt
        system_prompt = (
            "당신은 전문 용어를 회의 맥락에 맞춰 설명하는 AI 어시스턴트입니다. "
            "2-3문장으로 간결하게 설명하세요."
        )
        # User prompt: optional parts are appended only when provided.
        user_prompt = f"용어: {term_name}\n\n"
        user_prompt += f"정의: {definition}\n\n"
        if context:
            user_prompt += f"회사 내 사용 맥락: {context}\n\n"
        if meeting_context:
            user_prompt += f"회의 맥락: {meeting_context}\n\n"
        if related_docs:
            user_prompt += f"관련 문서:\n{related_docs}\n\n"
        user_prompt += (
            "위 정보를 바탕으로 이 용어를 2-3문장으로 간결하게 설명해주세요. "
            "회의 맥락이 있다면 회의와 연관지어 설명하세요."
        )
        try:
            reply = self._create_message(system_prompt, user_prompt)
            return {
                "explanation": reply["text"],
                "generated_by": self.model,
                "tokens_used": reply["tokens_used"],
                "cached": False
            }
        except Exception as e:
            logger.error(f"Claude API 호출 실패: {str(e)}")
            # Fallback: return the stored definition (and context) as-is.
            fallback_explanation = f"{definition}"
            if context:
                fallback_explanation += f"\n\n{context}"
            return {
                "explanation": fallback_explanation,
                "generated_by": "fallback",
                "tokens_used": 0,
                "cached": False,
                "error": str(e)
            }

    def summarize_similar_content(
        self,
        current_meeting_title: str,
        current_meeting_date: str,
        current_meeting_agendas: str,
        past_meeting_title: str,
        past_meeting_date: str,
        past_meeting_content: str
    ) -> Dict[str, Any]:
        """
        Summarize the content a past meeting shares with the current one.

        Args:
            current_meeting_title: Title of the current meeting.
            current_meeting_date: Date of the current meeting.
            current_meeting_agendas: Agendas of the current meeting.
            past_meeting_title: Title of the past meeting.
            past_meeting_date: Date of the past meeting.
            past_meeting_content: Content of the past meeting.

        Returns:
            Dict with ``summary``, ``generated_by``, ``tokens_used`` and
            ``cached``. When the API call fails after all retries,
            ``summary`` is None, ``generated_by`` is "fallback" and an
            ``error`` key is added.
        """
        # System prompt
        system_prompt = (
            "당신은 회의록 분석 전문가입니다. "
            "두 회의록을 비교하여 유사한 내용을 정확하게 추출하고 간결하게 요약합니다.\n\n"
            "중요한 원칙:\n"
            "1. 과거 회의록에서 실제로 다뤄진 내용만 포함하세요\n"
            "2. 환각(Hallucination)을 절대 생성하지 마세요\n"
            "3. 구체적인 날짜, 수치, 결정사항을 포함하세요\n"
            "4. 정확히 3문장으로 요약하세요"
        )
        # User prompt (kept flush-left so the text sent to the model carries
        # no leading indentation).
        user_prompt = f"""아래 두 회의록을 비교하여 유사한 내용을 정확히 3문장으로 요약해주세요.
## 현재 회의
제목: {current_meeting_title}
날짜: {current_meeting_date}
안건:
{current_meeting_agendas}
## 과거 회의
제목: {past_meeting_title}
날짜: {past_meeting_date}
내용:
{past_meeting_content}
## 요구사항
1. 두 회의에서 공통적으로 논의된 주제나 결정사항을 찾아주세요
2. 정확히 3문장으로 요약하세요 (각 문장은 한 문단)
3. 구체적인 내용을 포함해주세요 (예: 날짜, 수치, 결정사항)
4. 과거 회의에서 실제로 다뤄진 내용만 포함해주세요 (환각 금지)
"""
        try:
            reply = self._create_message(system_prompt, user_prompt)
            return {
                "summary": reply["text"],
                "generated_by": self.model,
                "tokens_used": reply["tokens_used"],
                "cached": False
            }
        except Exception as e:
            logger.error(f"Claude API 호출 실패: {str(e)}")
            return {
                "summary": None,
                "generated_by": "fallback",
                "tokens_used": 0,
                "cached": False,
                "error": str(e)
            }
+335
View File
@@ -0,0 +1,335 @@
"""
Azure Event Hub Consumer 서비스
회의록 확정 이벤트를 consume하여 RAG 저장소에 저장
"""
import asyncio
import json
import logging
from typing import Dict, Any, Optional, Union, List
from datetime import datetime
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
from ..models.minutes import RagMinutes, MinutesSection
from ..db.rag_minutes_db import RagMinutesDB
from ..utils.embedding import EmbeddingGenerator
logger = logging.getLogger(__name__)
class EventHubConsumer:
    """
    Event Hub consumer service.

    Receives events, and for events whose ``eventType`` is
    ``MINUTES_FINALIZED`` builds a ``RagMinutes`` record (including an
    embedding of the flattened content) and stores it through
    ``rag_minutes_db``.
    """

    def __init__(
        self,
        connection_string: str,
        eventhub_name: str,
        consumer_group: str,
        storage_connection_string: str,
        storage_container_name: str,
        rag_minutes_db: "RagMinutesDB",
        embedding_gen: "EmbeddingGenerator"
    ):
        """
        Initialize the consumer; no connection is opened until ``start``.

        Args:
            connection_string: Event Hub connection string.
            eventhub_name: Event Hub name.
            consumer_group: Consumer group name.
            storage_connection_string: Azure Storage connection string.
            storage_container_name: Container that stores checkpoints.
            rag_minutes_db: RAG minutes database.
            embedding_gen: Embedding generator.
        """
        self.connection_string = connection_string
        self.eventhub_name = eventhub_name
        self.consumer_group = consumer_group
        self.storage_connection_string = storage_connection_string
        self.storage_container_name = storage_container_name
        self.rag_minutes_db = rag_minutes_db
        self.embedding_gen = embedding_gen
        self.client: Optional[EventHubConsumerClient] = None
        self.is_running = False

    async def start(self):
        """
        Start consuming events.

        Creates the blob checkpoint store and the consumer client, then
        awaits ``receive`` with ``_on_event`` / ``_on_error`` as callbacks.

        Raises:
            Exception: Any startup or receive error is logged,
                ``is_running`` is reset to False and the error is re-raised.
        """
        try:
            # Create the checkpoint store
            checkpoint_store = BlobCheckpointStore.from_connection_string(
                self.storage_connection_string,
                self.storage_container_name
            )
            # Create the Event Hub consumer client
            self.client = EventHubConsumerClient.from_connection_string(
                self.connection_string,
                consumer_group=self.consumer_group,
                eventhub_name=self.eventhub_name,
                checkpoint_store=checkpoint_store
            )
            self.is_running = True
            logger.info("Event Hub Consumer 시작")
            # Start receiving events
            async with self.client:
                await self.client.receive(
                    on_event=self._on_event,
                    on_error=self._on_error,
                    starting_position="-1"  # read from the beginning
                )
        except Exception as e:
            logger.error(f"Event Hub Consumer 시작 실패: {str(e)}")
            self.is_running = False
            raise

    async def stop(self):
        """Stop the consumer and close the client if one was created."""
        self.is_running = False
        if self.client:
            await self.client.close()
        logger.info("Event Hub Consumer 중지")

    async def _on_event(self, partition_context, event):
        """
        Handle one received event.

        Parses the body as JSON, processes ``MINUTES_FINALIZED`` events and
        then updates the checkpoint. Parsing and processing errors are logged
        and swallowed; in that case the checkpoint is not updated for this
        event.

        Args:
            partition_context: Partition context.
            event: Event Hub event.
        """
        try:
            # Parse the event payload
            event_body = event.body_as_str()
            event_data = json.loads(event_body)
            logger.info(f"이벤트 수신: {event_data.get('eventType', 'unknown')}")
            # The full payload contains the whole minutes content, so it is
            # logged at DEBUG rather than INFO.
            logger.debug(f"이벤트 수신: {event_data.get('data', 'unknown')}")
            # Process minutes-finalized events
            if event_data.get("eventType") == "MINUTES_FINALIZED":
                await self._process_minutes_event(event_data)
            # Update the checkpoint
            await partition_context.update_checkpoint(event)
        except json.JSONDecodeError as e:
            logger.error(f"이벤트 파싱 실패: {str(e)}")
        except Exception as e:
            logger.error(f"이벤트 처리 실패: {str(e)}")

    async def _on_error(self, partition_context, error):
        """
        Log an error reported by the receive loop.

        Args:
            partition_context: Partition context.
            error: Error object.
        """
        logger.error(f"Event Hub 에러 (Partition {partition_context.partition_id}): {str(error)}")

    def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]:
        """
        Convert a Java LocalDateTime array into an ISO 8601 string.

        Jackson can serialize LocalDateTime as an array of the form
        [year, month, day, hour, minute, second, nanosecond]. Trailing zero
        components are omitted, so an on-the-minute value arrives with only
        five elements and one with whole seconds with six.

        Args:
            value: Datetime value (str, list or None).

        Returns:
            ISO 8601 string, or None when the value is None, cannot be
            converted, or has an unsupported type/length.

        Examples:
            >>> _convert_datetime_array_to_string([2025, 11, 1, 13, 55, 54, 388000000])
            "2025-11-01T13:55:54.388000"
            >>> _convert_datetime_array_to_string([2025, 11, 1, 13, 55])
            "2025-11-01T13:55:00"
            >>> _convert_datetime_array_to_string("2025-11-01T13:55:54.388")
            "2025-11-01T13:55:54.388"
            >>> _convert_datetime_array_to_string(None)
            None
        """
        if value is None:
            return None
        # Strings are returned unchanged
        if isinstance(value, str):
            return value
        # Array form: at least [year, month, day, hour, minute]
        if isinstance(value, list) and len(value) >= 5:
            try:
                year, month, day, hour, minute = value[:5]
                # Seconds and nanoseconds are absent when they are zero.
                second = value[5] if len(value) > 5 else 0
                # Python datetime uses microseconds, so nanoseconds are scaled down.
                microsecond = value[6] // 1000 if len(value) > 6 else 0
                dt = datetime(year, month, day, hour, minute, second, microsecond)
                return dt.isoformat()
            except (ValueError, TypeError) as e:
                logger.warning(f"날짜 배열 변환 실패: {value}, 에러: {str(e)}")
                return None
        logger.warning(f"지원하지 않는 날짜 형식: {type(value)}, 값: {value}")
        return None

    async def _process_minutes_event(self, event_data: Dict[str, Any]):
        """
        Process a minutes-finalized event.

        Reads the ``data`` object of the event, builds the section list and
        the flattened content, generates an embedding and stores a
        ``RagMinutes`` record.

        Args:
            event_data: Parsed event payload.

        Raises:
            Exception: Any error is logged and re-raised to the caller.
        """
        try:
            # Extract the minutes data
            minutes_data = event_data.get("data", {})
            # Meeting information
            meeting_id = minutes_data.get("meetingId")
            title = minutes_data.get("title")
            purpose = minutes_data.get("purpose")
            description = minutes_data.get("description")
            # Convert a Java LocalDateTime array to a string
            scheduled_at = self._convert_datetime_array_to_string(
                minutes_data.get("scheduledAt")
            )
            location = minutes_data.get("location")
            organizer_id = minutes_data.get("organizerId")
            # Minutes information
            minutes_id = minutes_data.get("minutesId")
            minutes_status = minutes_data.get("status", "FINALIZED")
            minutes_version = minutes_data.get("version", 1)
            created_by = minutes_data.get("createdBy")
            finalized_by = minutes_data.get("finalizedBy")
            # Convert a Java LocalDateTime array to a string
            finalized_at = self._convert_datetime_array_to_string(
                minutes_data.get("finalizedAt")
            )
            # Section information
            sections_data = minutes_data.get("sections", [])
            sections = [
                MinutesSection(
                    section_id=section.get("sectionId"),
                    type=section.get("type"),
                    title=section.get("title"),
                    content=section.get("content", ""),
                    order=section.get("order", 0),
                    verified=section.get("verified", False)
                )
                for section in sections_data
            ]
            # Build the full minutes content (for search)
            full_content = self._generate_full_content(title, purpose, sections)
            logger.info(f"회의록 내용 생성 완료: {len(full_content)} 글자")
            # Generate the embedding.
            # NOTE(review): generate_embedding and insert_minutes below are
            # called without await, i.e. synchronously inside this coroutine.
            logger.info(f"Embedding 생성 시작: {minutes_id}")
            embedding = self.embedding_gen.generate_embedding(full_content)
            logger.info(f"Embedding 생성 완료: {len(embedding)} 차원")
            # Build the RagMinutes object
            rag_minutes = RagMinutes(
                meeting_id=meeting_id,
                title=title,
                purpose=purpose,
                description=description,
                scheduled_at=scheduled_at,
                location=location,
                organizer_id=organizer_id,
                minutes_id=minutes_id,
                minutes_status=minutes_status,
                minutes_version=minutes_version,
                created_by=created_by,
                finalized_by=finalized_by,
                finalized_at=finalized_at,
                sections=sections,
                full_content=full_content,
                embedding=embedding,
                created_at=datetime.now().isoformat(),
                updated_at=datetime.now().isoformat()
            )
            # Store in the database
            success = self.rag_minutes_db.insert_minutes(rag_minutes)
            if success:
                logger.info(f"회의록 RAG 저장 성공: {minutes_id}")
            else:
                logger.error(f"회의록 RAG 저장 실패: {minutes_id}")
        except Exception as e:
            logger.error(f"회의록 이벤트 처리 실패: {str(e)}")
            raise

    def _generate_full_content(self, title: str, purpose: Optional[str], sections: list) -> str:
        """
        Build the full minutes text used for search.

        Args:
            title: Meeting title.
            purpose: Meeting purpose.
            sections: Minutes sections; each must expose ``title`` and ``content``.

        Returns:
            Title, purpose and every non-empty section joined by blank lines.
            Empty title/purpose and sections without content are skipped.
        """
        content_parts = []
        # Title
        if title:
            content_parts.append(f"제목: {title}")
        # Purpose
        if purpose:
            content_parts.append(f"목적: {purpose}")
        # Per-section content
        content_parts.extend(
            f"\n[{section.title}]\n{section.content}"
            for section in sections
            if section.content
        )
        return "\n\n".join(content_parts)
async def start_consumer(
    config: Dict[str, Any],
    rag_minutes_db: RagMinutesDB,
    embedding_gen: EmbeddingGenerator
):
    """
    Build an ``EventHubConsumer`` from configuration and run it.

    Args:
        config: Configuration dict; reads the ``eventhub`` section
            (``connection_string``, ``name``, ``consumer_group`` and
            ``storage.connection_string`` / ``storage.container_name``).
        rag_minutes_db: RAG minutes database.
        embedding_gen: Embedding generator.

    Raises:
        Exception: Any non-KeyboardInterrupt error is logged, the consumer
            is stopped and the error is re-raised.
    """
    hub_settings = config["eventhub"]
    consumer_kwargs = {
        "connection_string": hub_settings["connection_string"],
        "eventhub_name": hub_settings["name"],
        "consumer_group": hub_settings["consumer_group"],
        "storage_connection_string": hub_settings["storage"]["connection_string"],
        "storage_container_name": hub_settings["storage"]["container_name"],
        "rag_minutes_db": rag_minutes_db,
        "embedding_gen": embedding_gen,
    }
    worker = EventHubConsumer(**consumer_kwargs)
    try:
        await worker.start()
    except KeyboardInterrupt:
        # Shutdown signal: stop quietly without re-raising.
        logger.info("Consumer 종료 신호 수신")
        await worker.stop()
    except Exception as exc:
        logger.error(f"Consumer 실행 중 에러: {str(exc)}")
        await worker.stop()
        raise
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
+119
View File
@@ -0,0 +1,119 @@
"""
설정 관리 유틸리티
"""
import os
import yaml
from typing import Any, Dict
from pathlib import Path
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
    """
    Application settings.

    Each field declares a setting name with its default; most secrets and
    endpoints default to an empty string.
    """
    # PostgreSQL
    POSTGRES_HOST: str = "localhost"
    POSTGRES_PORT: int = 5432
    POSTGRES_DATABASE: str = "meeting_db"
    POSTGRES_USER: str = "postgres"
    POSTGRES_PASSWORD: str = ""
    # Azure OpenAI
    AZURE_OPENAI_API_KEY: str = ""
    AZURE_OPENAI_ENDPOINT: str = ""
    # Azure AI Search
    AZURE_SEARCH_ENDPOINT: str = ""
    AZURE_SEARCH_API_KEY: str = ""
    # Claude AI
    CLAUDE_API_KEY: str = ""
    # Redis
    REDIS_PASSWORD: str = ""
    # Azure Event Hub
    EVENTHUB_CONNECTION_STRING: str = ""
    EVENTHUB_NAME: str = ""
    AZURE_EVENTHUB_CONSUMER_GROUP: str = "$Default"
    AZURE_STORAGE_CONNECTION_STRING: str = ""
    AZURE_STORAGE_CONTAINER_NAME: str = ""

    class Config:
        # .env path: two directory levels above this module's directory
        # (the original note describes this as relative to the rag directory).
        env_file = str(Path(__file__).parent.parent.parent / ".env")
        case_sensitive = True
def load_config(config_path: str = "config.yaml") -> Dict[str, Any]:
    """
    Load the YAML configuration file and substitute ``${NAME}`` placeholders.

    A string value is treated as a placeholder only when the whole value has
    the form ``${NAME}``. The name is looked up on ``Settings`` first, then in
    the process environment; an unknown name becomes an empty string.

    Args:
        config_path: Path to the configuration file.

    Returns:
        Configuration dict with placeholders replaced. An empty YAML file
        yields an empty dict.

    Raises:
        FileNotFoundError: When ``config_path`` does not exist.
    """
    # Load environment-backed settings
    settings = Settings()
    # Load the YAML file
    config_file = Path(config_path)
    if not config_file.exists():
        raise FileNotFoundError(f"설정 파일을 찾을 수 없습니다: {config_path}")
    with open(config_file, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # safe_load returns None for an empty document; keep the declared
    # Dict return type instead of handing None to callers.
    if config is None:
        config = {}

    def replace_env_vars(obj: Any) -> Any:
        """Recursively substitute placeholders in dicts, lists and strings."""
        if isinstance(obj, dict):
            return {k: replace_env_vars(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [replace_env_vars(item) for item in obj]
        if isinstance(obj, str) and obj.startswith("${") and obj.endswith("}"):
            env_var = obj[2:-1]
            if hasattr(settings, env_var):
                return getattr(settings, env_var)
            # Not declared on Settings: fall back to the process environment
            # rather than silently blanking a variable that is actually set.
            return os.environ.get(env_var, "")
        return obj

    return replace_env_vars(config)
def get_database_url(config: Dict[str, Any]) -> str:
    """
    Build the PostgreSQL database URL.

    The user and password are percent-encoded so that characters with a
    meaning in URLs (``@``, ``:``, ``/``, ``#`` ...) cannot corrupt the
    result. Values made only of unreserved characters are left unchanged.

    Args:
        config: Configuration dict; reads ``config["postgres"]`` keys
            ``user``, ``password``, ``host``, ``port`` and ``database``.

    Returns:
        Database URL of the form
        ``postgresql://user:password@host:port/database``.
    """
    from urllib.parse import quote

    pg = config["postgres"]
    # safe="" also encodes "/", which quote() leaves alone by default.
    user = quote(str(pg['user']), safe="")
    password = quote(str(pg['password']), safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{pg['host']}:{pg['port']}/{pg['database']}"
    )
def get_redis_url(config: Dict[str, Any]) -> str:
    """
    Build the Redis URL.

    The password is percent-encoded so that characters with a meaning in
    URLs (``@``, ``:``, ``/``, ``#`` ...) cannot corrupt the result.

    Args:
        config: Configuration dict; reads ``config["redis"]`` keys ``host``,
            ``port``, ``db`` and the optional ``password``.

    Returns:
        ``redis://:password@host:port/db`` when a non-empty password is
        configured, otherwise ``redis://host:port/db``.
    """
    from urllib.parse import quote

    redis = config["redis"]
    password = redis.get("password", "")
    if password:
        # safe="" also encodes "/", which quote() leaves alone by default.
        encoded_password = quote(str(password), safe="")
        return f"redis://:{encoded_password}@{redis['host']}:{redis['port']}/{redis['db']}"
    return f"redis://{redis['host']}:{redis['port']}/{redis['db']}"
+180
View File
@@ -0,0 +1,180 @@
"""
임베딩 생성 유틸리티
"""
import openai
from typing import List, Union
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
logger = logging.getLogger(__name__)
class EmbeddingGenerator:
    """
    OpenAI embedding generator.

    Uses ``openai.AzureOpenAI`` when an endpoint containing "azure" is given
    and ``openai.OpenAI`` otherwise. Every returned vector is checked against
    the configured ``dimension``.
    """

    def __init__(
        self,
        api_key: str,
        endpoint: str = None,
        model: str = "text-embedding-ada-002",
        dimension: int = 1536,
        api_version: str = None
    ):
        """
        Initialize the generator.

        Args:
            api_key: OpenAI API key.
            endpoint: Endpoint (optional, Azure only).
            model: Embedding model name.
            dimension: Expected embedding dimension.
            api_version: API version (optional, Azure only).
        """
        # Choose Azure OpenAI or plain OpenAI from the endpoint string.
        if endpoint and "azure" in endpoint.lower():
            # Azure OpenAI
            self.client = openai.AzureOpenAI(
                api_key=api_key,
                azure_endpoint=endpoint,
                api_version=api_version
            )
        else:
            # Plain OpenAI; note that ``endpoint`` is not used in this branch.
            self.client = openai.OpenAI(
                api_key=api_key
            )
        self.model = model
        self.dimension = dimension

    def _check_dimension(self, embedding: List[float]) -> None:
        """Raise ValueError when ``embedding`` does not have the expected length."""
        if len(embedding) != self.dimension:
            raise ValueError(
                f"임베딩 차원 불일치: 예상 {self.dimension}, 실제 {len(embedding)}"
            )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def generate_embedding(self, text: str) -> List[float]:
        """
        Generate the embedding of a single text.

        Retried up to 3 attempts with exponential backoff; every failed
        attempt is logged.

        Args:
            text: Input text.

        Returns:
            Embedding vector of length ``self.dimension``.
        """
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=text
            )
            embedding = response.data[0].embedding
            # Dimension check
            self._check_dimension(embedding)
            return embedding
        except Exception as e:
            logger.error(f"임베딩 생성 실패: {str(e)}")
            raise

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    def _embed_batch(self, batch: List[str]) -> List[List[float]]:
        """
        Embed one batch of texts with the shared retry policy.

        Retrying per batch means a failure in a later batch no longer
        re-submits the batches that already succeeded.
        """
        try:
            response = self.client.embeddings.create(
                model=self.model,
                input=batch
            )
            batch_embeddings = [item.embedding for item in response.data]
            # Dimension check
            for embedding in batch_embeddings:
                self._check_dimension(embedding)
            return batch_embeddings
        except Exception as e:
            logger.error(f"배치 임베딩 생성 실패: {str(e)}")
            raise

    def generate_embeddings_batch(
        self,
        texts: List[str],
        batch_size: int = 50
    ) -> List[List[float]]:
        """
        Generate embeddings for a list of texts, batch by batch.

        Args:
            texts: Input texts.
            batch_size: Number of texts per request (the original note gives
                50 as the maximum; this is not enforced).

        Returns:
            One embedding per input text, in input order. An empty input
            yields an empty list.

        Raises:
            ValueError: When ``batch_size`` is not positive.
        """
        if not texts:
            return []
        # A negative step would make the range below empty and silently
        # return no embeddings at all; zero would fail inside range().
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        all_embeddings = []
        # Process batch by batch
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i + batch_size]
            all_embeddings.extend(self._embed_batch(batch))
            logger.info(f"배치 {i//batch_size + 1}: {len(batch)}개 임베딩 생성 완료")
        return all_embeddings

    def get_token_count(self, text: str) -> int:
        """
        Estimate the token count of a text.

        Args:
            text: Input text.

        Returns:
            Estimated token count (truncated to int).
        """
        # Rough estimate: 1.5 tokens per Hangul syllable (U+AC00-U+D7A3),
        # 0.75 tokens per any other character.
        korean_chars = sum(1 for c in text if 0xAC00 <= ord(c) <= 0xD7A3)
        other_chars = len(text) - korean_chars
        return int(korean_chars * 1.5 + other_chars * 0.75)
def cosine_similarity(vec1: List[float], vec2: List[float]) -> float:
    """
    Compute the cosine similarity of two vectors, rescaled to 0.0-1.0.

    Args:
        vec1: First vector.
        vec2: Second vector.

    Returns:
        ``(cos + 1) / 2`` where ``cos`` is the cosine of the angle between
        the vectors; 0.0 when either vector has zero norm.
    """
    import numpy as np

    left = np.array(vec1)
    right = np.array(vec2)
    inner = np.dot(left, right)
    left_norm = np.linalg.norm(left)
    right_norm = np.linalg.norm(right)
    # A zero vector has no direction; report no similarity.
    if left_norm == 0 or right_norm == 0:
        return 0.0
    cosine = inner / (left_norm * right_norm)
    # Map the -1..1 range onto 0..1.
    return (cosine + 1) / 2
+74
View File
@@ -0,0 +1,74 @@
"""
텍스트 처리 유틸리티 모듈
"""
from typing import List
import logging
from kiwipiepy import Kiwi
logger = logging.getLogger(__name__)
# Kiwi instance (module-level singleton)
_kiwi = None


def get_kiwi():
    """Return the shared Kiwi morphological analyzer, creating it on first use."""
    global _kiwi
    if _kiwi is not None:
        return _kiwi
    _kiwi = Kiwi()
    logger.info("Kiwi 형태소 분석기 초기화 완료")
    return _kiwi
def extract_nouns(text: str) -> List[str]:
    """
    Extract noun-like tokens from a text.

    Args:
        text: Input text.

    Returns:
        Tokens of the first analysis result whose tag is NNG (common noun),
        NNP (proper noun), SL (foreign word), SH (Chinese character) or
        SN (number). An empty or whitespace-only input yields an empty list;
        if analysis fails, the text split on whitespace is returned instead.
    """
    if not text or not text.strip():
        return []
    wanted_tags = ['NNG', 'NNP', 'SL', 'SH', 'SN']
    try:
        analyzer = get_kiwi()
        # Morphological analysis; only the top-ranked result is used.
        analysis = analyzer.analyze(text)
        extracted = [
            form
            for form, tag, _, _ in analysis[0][0]
            if tag in wanted_tags
        ]
        logger.debug(f"원본 텍스트: {text}")
        logger.debug(f"추출된 명사: {extracted}")
        return extracted
    except Exception as exc:
        logger.error(f"명사 추출 실패: {str(exc)}")
        # On failure fall back to splitting the original text on whitespace.
        return text.split()
def extract_nouns_as_query(text: str) -> str:
    """
    Turn a text into a search query made of its extracted nouns.

    Args:
        text: Input text.

    Returns:
        The nouns from ``extract_nouns`` joined by single spaces, or the
        original text when no noun was extracted.
    """
    nouns = extract_nouns(text)
    query = ' '.join(nouns)
    # Separate the original and converted text; the previous format string
    # ran the two quoted values together.
    logger.info(f"Query 변환: '{text}' -> '{query}'")
    return query if query else text