mirror of
https://github.com/hwanny1128/HGZero.git
synced 2025-12-06 13:46:24 +00:00
247 lines
7.8 KiB
Python
247 lines
7.8 KiB
Python
"""
|
|
관련자료 데이터 로딩 스크립트
|
|
"""
|
|
import sys
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List
|
|
from datetime import datetime
|
|
|
|
# 프로젝트 루트 디렉토리를 Python 경로에 추가
|
|
project_root = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(project_root))
|
|
|
|
from src.models.document import Document, DocumentChunk, DocumentMetadata
|
|
from src.db.azure_search import AzureAISearchDB
|
|
from src.utils.config import load_config
|
|
from src.utils.embedding import EmbeddingGenerator
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def load_documents_from_json(file_path: Path) -> List[Document]:
|
|
"""
|
|
JSON 파일에서 문서 데이터 로드
|
|
|
|
Args:
|
|
file_path: JSON 파일 경로
|
|
|
|
Returns:
|
|
문서 리스트
|
|
"""
|
|
logger.info(f"JSON 파일 로딩: {file_path}")
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
documents = []
|
|
|
|
# 업무 도메인별 데이터 처리
|
|
for domain, doc_types in data.get("sample_data", {}).items():
|
|
for doc_type, docs in doc_types.items():
|
|
for doc_data in docs:
|
|
# Metadata 파싱
|
|
metadata = None
|
|
if "metadata" in doc_data:
|
|
metadata = DocumentMetadata(**doc_data["metadata"])
|
|
|
|
# Document 객체 생성
|
|
doc = Document(
|
|
document_id=doc_data["document_id"],
|
|
document_type=doc_data["document_type"],
|
|
business_domain=doc_data.get("business_domain"),
|
|
title=doc_data["title"],
|
|
content=doc_data["content"],
|
|
summary=doc_data["summary"],
|
|
keywords=doc_data.get("keywords", []),
|
|
created_date=doc_data.get("created_date"),
|
|
participants=doc_data.get("participants", []),
|
|
metadata=metadata,
|
|
embedding=None # 나중에 생성
|
|
)
|
|
|
|
documents.append(doc)
|
|
|
|
logger.info(f" → {len(documents)}개 문서 로드 완료")
|
|
return documents
|
|
|
|
|
|
def create_chunks(
|
|
document: Document,
|
|
embedding_gen: EmbeddingGenerator,
|
|
max_tokens: int = 2000
|
|
) -> List[DocumentChunk]:
|
|
"""
|
|
문서를 청크로 분할 및 임베딩 생성
|
|
|
|
Args:
|
|
document: 문서
|
|
embedding_gen: 임베딩 생성기
|
|
max_tokens: 최대 토큰 수
|
|
|
|
Returns:
|
|
문서 청크 리스트
|
|
"""
|
|
chunks = []
|
|
|
|
# 전체 문서를 하나의 청크로 처리 (간단한 구현)
|
|
# 실제로는 안건 단위로 분할해야 함
|
|
content = document.content
|
|
token_count = embedding_gen.get_token_count(content)
|
|
|
|
if token_count > max_tokens:
|
|
# 간단한 분할: 문단 단위
|
|
paragraphs = content.split("\n\n")
|
|
current_chunk = ""
|
|
chunk_index = 0
|
|
|
|
for para in paragraphs:
|
|
test_chunk = current_chunk + "\n\n" + para if current_chunk else para
|
|
if embedding_gen.get_token_count(test_chunk) > max_tokens:
|
|
# 현재 청크 저장
|
|
if current_chunk:
|
|
chunks.append({
|
|
"content": current_chunk,
|
|
"chunk_index": chunk_index
|
|
})
|
|
chunk_index += 1
|
|
|
|
current_chunk = para
|
|
else:
|
|
current_chunk = test_chunk
|
|
|
|
# 마지막 청크 저장
|
|
if current_chunk:
|
|
chunks.append({
|
|
"content": current_chunk,
|
|
"chunk_index": chunk_index
|
|
})
|
|
|
|
else:
|
|
# 토큰 수가 적으면 하나의 청크로
|
|
chunks.append({
|
|
"content": content,
|
|
"chunk_index": 0
|
|
})
|
|
|
|
# 임베딩 생성
|
|
chunk_texts = [chunk["content"] for chunk in chunks]
|
|
embeddings = embedding_gen.generate_embeddings_batch(chunk_texts)
|
|
|
|
# DocumentChunk 객체 생성
|
|
document_chunks = []
|
|
for chunk_data, embedding in zip(chunks, embeddings):
|
|
chunk = DocumentChunk(
|
|
id=f"{document.document_id}_chunk_{chunk_data['chunk_index']}",
|
|
document_id=document.document_id,
|
|
document_type=document.document_type,
|
|
title=document.title,
|
|
folder=document.metadata.folder if document.metadata else None,
|
|
created_date=document.created_date,
|
|
participants=document.participants,
|
|
keywords=document.keywords,
|
|
agenda_id=None, # 간단한 구현에서는 None
|
|
agenda_title=None,
|
|
chunk_index=chunk_data["chunk_index"],
|
|
content=chunk_data["content"],
|
|
content_vector=embedding,
|
|
token_count=embedding_gen.get_token_count(chunk_data["content"])
|
|
)
|
|
document_chunks.append(chunk)
|
|
|
|
return document_chunks
|
|
|
|
|
|
def main():
|
|
"""메인 함수"""
|
|
logger.info("=" * 60)
|
|
logger.info("관련자료 데이터 로딩 시작")
|
|
logger.info("=" * 60)
|
|
|
|
# 1. 설정 로드
|
|
config = load_config(str(project_root / "config.yaml"))
|
|
logger.info("✓ 설정 로드 완료")
|
|
|
|
# 2. Azure AI Search 연결
|
|
azure_search = config["azure_search"]
|
|
search_db = AzureAISearchDB(
|
|
endpoint=azure_search["endpoint"],
|
|
api_key=azure_search["api_key"],
|
|
index_name=azure_search["index_name"],
|
|
api_version=azure_search["api_version"]
|
|
)
|
|
logger.info("✓ Azure AI Search 연결 완료")
|
|
|
|
# 3. 인덱스 생성
|
|
search_db.create_index()
|
|
logger.info("✓ 인덱스 생성 완료")
|
|
|
|
# 4. 임베딩 생성기 초기화
|
|
azure_openai = config["azure_openai"]
|
|
embedding_gen = EmbeddingGenerator(
|
|
api_key=azure_openai["api_key"],
|
|
endpoint=azure_openai["endpoint"],
|
|
model=azure_openai["embedding_model"],
|
|
dimension=azure_openai["embedding_dimension"],
|
|
api_version=azure_openai["api_version"]
|
|
)
|
|
logger.info("✓ 임베딩 생성기 초기화 완료")
|
|
|
|
# 5. 문서 데이터 로딩
|
|
data_file = project_root.parent / config["data"]["documents_file"]
|
|
if not data_file.exists():
|
|
logger.error(f"❌ 파일 없음: {data_file}")
|
|
sys.exit(1)
|
|
|
|
documents = load_documents_from_json(data_file)
|
|
logger.info(f"✓ 총 {len(documents)}개 문서 로드 완료")
|
|
|
|
# 6. 청킹 및 임베딩 생성
|
|
logger.info("청킹 및 임베딩 생성 시작")
|
|
all_chunks = []
|
|
|
|
for i, doc in enumerate(documents, 1):
|
|
logger.info(f" 처리 중: {i}/{len(documents)} - {doc.title}")
|
|
chunks = create_chunks(doc, embedding_gen)
|
|
all_chunks.extend(chunks)
|
|
|
|
logger.info(f"✓ 총 {len(all_chunks)}개 청크 생성 완료")
|
|
|
|
# 7. Azure AI Search에 업로드
|
|
logger.info("Azure AI Search 업로드 시작")
|
|
success = search_db.upload_documents(all_chunks)
|
|
|
|
if success:
|
|
logger.info(f"✓ {len(all_chunks)}개 청크 업로드 완료")
|
|
else:
|
|
logger.error("❌ 업로드 실패")
|
|
sys.exit(1)
|
|
|
|
# 8. 통계 조회
|
|
stats = search_db.get_stats()
|
|
logger.info("=" * 60)
|
|
logger.info("관련자료 통계")
|
|
logger.info("=" * 60)
|
|
logger.info(f"전체 문서: {stats['total_documents']}개")
|
|
logger.info(f"전체 청크: {stats['total_chunks']}개")
|
|
logger.info("\n문서 타입별 통계:")
|
|
for doc_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True):
|
|
logger.info(f" - {doc_type}: {count}개")
|
|
|
|
logger.info("=" * 60)
|
|
logger.info("관련자료 데이터 로딩 완료")
|
|
logger.info("=" * 60)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as e:
|
|
logger.error(f"오류 발생: {str(e)}", exc_info=True)
|
|
sys.exit(1)
|