hgzero/rag/scripts/load_documents.py
2025-10-29 05:54:08 +09:00

247 lines
7.8 KiB
Python

"""
관련자료 데이터 로딩 스크립트
"""
import sys
import json
import logging
from pathlib import Path
from typing import List
from datetime import datetime
# 프로젝트 루트 디렉토리를 Python 경로에 추가
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from src.models.document import Document, DocumentChunk, DocumentMetadata
from src.db.azure_search import AzureAISearchDB
from src.utils.config import load_config
from src.utils.embedding import EmbeddingGenerator
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_documents_from_json(file_path: Path) -> List[Document]:
"""
JSON 파일에서 문서 데이터 로드
Args:
file_path: JSON 파일 경로
Returns:
문서 리스트
"""
logger.info(f"JSON 파일 로딩: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
documents = []
# 업무 도메인별 데이터 처리
for domain, doc_types in data.get("sample_data", {}).items():
for doc_type, docs in doc_types.items():
for doc_data in docs:
# Metadata 파싱
metadata = None
if "metadata" in doc_data:
metadata = DocumentMetadata(**doc_data["metadata"])
# Document 객체 생성
doc = Document(
document_id=doc_data["document_id"],
document_type=doc_data["document_type"],
business_domain=doc_data.get("business_domain"),
title=doc_data["title"],
content=doc_data["content"],
summary=doc_data["summary"],
keywords=doc_data.get("keywords", []),
created_date=doc_data.get("created_date"),
participants=doc_data.get("participants", []),
metadata=metadata,
embedding=None # 나중에 생성
)
documents.append(doc)
logger.info(f"{len(documents)}개 문서 로드 완료")
return documents
def create_chunks(
document: Document,
embedding_gen: EmbeddingGenerator,
max_tokens: int = 2000
) -> List[DocumentChunk]:
"""
문서를 청크로 분할 및 임베딩 생성
Args:
document: 문서
embedding_gen: 임베딩 생성기
max_tokens: 최대 토큰 수
Returns:
문서 청크 리스트
"""
chunks = []
# 전체 문서를 하나의 청크로 처리 (간단한 구현)
# 실제로는 안건 단위로 분할해야 함
content = document.content
token_count = embedding_gen.get_token_count(content)
if token_count > max_tokens:
# 간단한 분할: 문단 단위
paragraphs = content.split("\n\n")
current_chunk = ""
chunk_index = 0
for para in paragraphs:
test_chunk = current_chunk + "\n\n" + para if current_chunk else para
if embedding_gen.get_token_count(test_chunk) > max_tokens:
# 현재 청크 저장
if current_chunk:
chunks.append({
"content": current_chunk,
"chunk_index": chunk_index
})
chunk_index += 1
current_chunk = para
else:
current_chunk = test_chunk
# 마지막 청크 저장
if current_chunk:
chunks.append({
"content": current_chunk,
"chunk_index": chunk_index
})
else:
# 토큰 수가 적으면 하나의 청크로
chunks.append({
"content": content,
"chunk_index": 0
})
# 임베딩 생성
chunk_texts = [chunk["content"] for chunk in chunks]
embeddings = embedding_gen.generate_embeddings_batch(chunk_texts)
# DocumentChunk 객체 생성
document_chunks = []
for chunk_data, embedding in zip(chunks, embeddings):
chunk = DocumentChunk(
id=f"{document.document_id}_chunk_{chunk_data['chunk_index']}",
document_id=document.document_id,
document_type=document.document_type,
title=document.title,
folder=document.metadata.folder if document.metadata else None,
created_date=document.created_date,
participants=document.participants,
keywords=document.keywords,
agenda_id=None, # 간단한 구현에서는 None
agenda_title=None,
chunk_index=chunk_data["chunk_index"],
content=chunk_data["content"],
content_vector=embedding,
token_count=embedding_gen.get_token_count(chunk_data["content"])
)
document_chunks.append(chunk)
return document_chunks
def main():
"""메인 함수"""
logger.info("=" * 60)
logger.info("관련자료 데이터 로딩 시작")
logger.info("=" * 60)
# 1. 설정 로드
config = load_config(str(project_root / "config.yaml"))
logger.info("✓ 설정 로드 완료")
# 2. Azure AI Search 연결
azure_search = config["azure_search"]
search_db = AzureAISearchDB(
endpoint=azure_search["endpoint"],
api_key=azure_search["api_key"],
index_name=azure_search["index_name"],
api_version=azure_search["api_version"]
)
logger.info("✓ Azure AI Search 연결 완료")
# 3. 인덱스 생성
search_db.create_index()
logger.info("✓ 인덱스 생성 완료")
# 4. 임베딩 생성기 초기화
azure_openai = config["azure_openai"]
embedding_gen = EmbeddingGenerator(
api_key=azure_openai["api_key"],
endpoint=azure_openai["endpoint"],
model=azure_openai["embedding_model"],
dimension=azure_openai["embedding_dimension"],
api_version=azure_openai["api_version"]
)
logger.info("✓ 임베딩 생성기 초기화 완료")
# 5. 문서 데이터 로딩
data_file = project_root.parent / config["data"]["documents_file"]
if not data_file.exists():
logger.error(f"❌ 파일 없음: {data_file}")
sys.exit(1)
documents = load_documents_from_json(data_file)
logger.info(f"✓ 총 {len(documents)}개 문서 로드 완료")
# 6. 청킹 및 임베딩 생성
logger.info("청킹 및 임베딩 생성 시작")
all_chunks = []
for i, doc in enumerate(documents, 1):
logger.info(f" 처리 중: {i}/{len(documents)} - {doc.title}")
chunks = create_chunks(doc, embedding_gen)
all_chunks.extend(chunks)
logger.info(f"✓ 총 {len(all_chunks)}개 청크 생성 완료")
# 7. Azure AI Search에 업로드
logger.info("Azure AI Search 업로드 시작")
success = search_db.upload_documents(all_chunks)
if success:
logger.info(f"{len(all_chunks)}개 청크 업로드 완료")
else:
logger.error("❌ 업로드 실패")
sys.exit(1)
# 8. 통계 조회
stats = search_db.get_stats()
logger.info("=" * 60)
logger.info("관련자료 통계")
logger.info("=" * 60)
logger.info(f"전체 문서: {stats['total_documents']}")
logger.info(f"전체 청크: {stats['total_chunks']}")
logger.info("\n문서 타입별 통계:")
for doc_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True):
logger.info(f" - {doc_type}: {count}")
logger.info("=" * 60)
logger.info("관련자료 데이터 로딩 완료")
logger.info("=" * 60)
if __name__ == "__main__":
try:
main()
except Exception as e:
logger.error(f"오류 발생: {str(e)}", exc_info=True)
sys.exit(1)