""" 관련자료 데이터 로딩 스크립트 """ import sys import json import logging from pathlib import Path from typing import List from datetime import datetime # 프로젝트 루트 디렉토리를 Python 경로에 추가 project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from src.models.document import Document, DocumentChunk, DocumentMetadata from src.db.azure_search import AzureAISearchDB from src.utils.config import load_config from src.utils.embedding import EmbeddingGenerator logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def load_documents_from_json(file_path: Path) -> List[Document]: """ JSON 파일에서 문서 데이터 로드 Args: file_path: JSON 파일 경로 Returns: 문서 리스트 """ logger.info(f"JSON 파일 로딩: {file_path}") with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) documents = [] # 업무 도메인별 데이터 처리 for domain, doc_types in data.get("sample_data", {}).items(): for doc_type, docs in doc_types.items(): for doc_data in docs: # Metadata 파싱 metadata = None if "metadata" in doc_data: metadata = DocumentMetadata(**doc_data["metadata"]) # Document 객체 생성 doc = Document( document_id=doc_data["document_id"], document_type=doc_data["document_type"], business_domain=doc_data.get("business_domain"), title=doc_data["title"], content=doc_data["content"], summary=doc_data["summary"], keywords=doc_data.get("keywords", []), created_date=doc_data.get("created_date"), participants=doc_data.get("participants", []), metadata=metadata, embedding=None # 나중에 생성 ) documents.append(doc) logger.info(f" → {len(documents)}개 문서 로드 완료") return documents def create_chunks( document: Document, embedding_gen: EmbeddingGenerator, max_tokens: int = 2000 ) -> List[DocumentChunk]: """ 문서를 청크로 분할 및 임베딩 생성 Args: document: 문서 embedding_gen: 임베딩 생성기 max_tokens: 최대 토큰 수 Returns: 문서 청크 리스트 """ chunks = [] # 전체 문서를 하나의 청크로 처리 (간단한 구현) # 실제로는 안건 단위로 분할해야 함 content = document.content token_count = embedding_gen.get_token_count(content) if token_count > max_tokens: # 간단한 분할: 문단 단위 paragraphs = content.split("\n\n") current_chunk = "" chunk_index = 0 for para in paragraphs: test_chunk = current_chunk + "\n\n" + para if current_chunk else para if embedding_gen.get_token_count(test_chunk) > max_tokens: # 현재 청크 저장 if current_chunk: chunks.append({ "content": current_chunk, "chunk_index": chunk_index }) chunk_index += 1 current_chunk = para else: current_chunk = test_chunk # 마지막 청크 저장 if current_chunk: chunks.append({ "content": current_chunk, "chunk_index": chunk_index }) else: # 토큰 수가 적으면 하나의 청크로 chunks.append({ "content": content, "chunk_index": 0 }) # 임베딩 생성 chunk_texts = [chunk["content"] for chunk in chunks] embeddings = embedding_gen.generate_embeddings_batch(chunk_texts) # DocumentChunk 객체 생성 document_chunks = [] for chunk_data, embedding in zip(chunks, embeddings): chunk = DocumentChunk( id=f"{document.document_id}_chunk_{chunk_data['chunk_index']}", document_id=document.document_id, document_type=document.document_type, title=document.title, folder=document.metadata.folder if document.metadata else None, created_date=document.created_date, participants=document.participants, keywords=document.keywords, agenda_id=None, # 간단한 구현에서는 None agenda_title=None, chunk_index=chunk_data["chunk_index"], content=chunk_data["content"], content_vector=embedding, token_count=embedding_gen.get_token_count(chunk_data["content"]) ) document_chunks.append(chunk) return document_chunks def main(): """메인 함수""" logger.info("=" * 60) logger.info("관련자료 데이터 로딩 시작") logger.info("=" * 60) # 1. 설정 로드 config = load_config(str(project_root / "config.yaml")) logger.info("✓ 설정 로드 완료") # 2. Azure AI Search 연결 azure_search = config["azure_search"] search_db = AzureAISearchDB( endpoint=azure_search["endpoint"], api_key=azure_search["api_key"], index_name=azure_search["index_name"], api_version=azure_search["api_version"] ) logger.info("✓ Azure AI Search 연결 완료") # 3. 인덱스 생성 search_db.create_index() logger.info("✓ 인덱스 생성 완료") # 4. 임베딩 생성기 초기화 azure_openai = config["azure_openai"] embedding_gen = EmbeddingGenerator( api_key=azure_openai["api_key"], endpoint=azure_openai["endpoint"], model=azure_openai["embedding_model"], dimension=azure_openai["embedding_dimension"], api_version=azure_openai["api_version"] ) logger.info("✓ 임베딩 생성기 초기화 완료") # 5. 문서 데이터 로딩 data_file = project_root.parent / config["data"]["documents_file"] if not data_file.exists(): logger.error(f"❌ 파일 없음: {data_file}") sys.exit(1) documents = load_documents_from_json(data_file) logger.info(f"✓ 총 {len(documents)}개 문서 로드 완료") # 6. 청킹 및 임베딩 생성 logger.info("청킹 및 임베딩 생성 시작") all_chunks = [] for i, doc in enumerate(documents, 1): logger.info(f" 처리 중: {i}/{len(documents)} - {doc.title}") chunks = create_chunks(doc, embedding_gen) all_chunks.extend(chunks) logger.info(f"✓ 총 {len(all_chunks)}개 청크 생성 완료") # 7. Azure AI Search에 업로드 logger.info("Azure AI Search 업로드 시작") success = search_db.upload_documents(all_chunks) if success: logger.info(f"✓ {len(all_chunks)}개 청크 업로드 완료") else: logger.error("❌ 업로드 실패") sys.exit(1) # 8. 통계 조회 stats = search_db.get_stats() logger.info("=" * 60) logger.info("관련자료 통계") logger.info("=" * 60) logger.info(f"전체 문서: {stats['total_documents']}개") logger.info(f"전체 청크: {stats['total_chunks']}개") logger.info("\n문서 타입별 통계:") for doc_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True): logger.info(f" - {doc_type}: {count}개") logger.info("=" * 60) logger.info("관련자료 데이터 로딩 완료") logger.info("=" * 60) if __name__ == "__main__": try: main() except Exception as e: logger.error(f"오류 발생: {str(e)}", exc_info=True) sys.exit(1)