From 9580de9c822ad33eeab1ed0bdb8822718972c7ca Mon Sep 17 00:00:00 2001 From: djeon Date: Thu, 30 Oct 2025 13:42:11 +0900 Subject: [PATCH] feat: add getting ralated document in realtime --- rag/requirements.txt | 1 - rag/src/api/__pycache__/main.cpython-311.pyc | Bin 29792 -> 29920 bytes .../minutes_routes.cpython-311.pyc | Bin 0 -> 4469 bytes rag/src/api/main.py | 4 +- rag/src/api/minutes_routes.py | 93 +++++++++++++++ .../eventhub_consumer.cpython-311.pyc | Bin 25793 -> 30066 bytes rag/src/services/eventhub_consumer.py | 110 +++++++++++++++++- 7 files changed, 204 insertions(+), 4 deletions(-) create mode 100644 rag/src/api/__pycache__/minutes_routes.cpython-311.pyc create mode 100644 rag/src/api/minutes_routes.py diff --git a/rag/requirements.txt b/rag/requirements.txt index b5ebe84..b70c488 100644 --- a/rag/requirements.txt +++ b/rag/requirements.txt @@ -51,7 +51,6 @@ structlog==23.2.0 pytest==7.4.3 pytest-asyncio==0.21.1 pytest-cov==4.1.0 -httpx==0.25.2 # Development black==23.12.0 diff --git a/rag/src/api/__pycache__/main.cpython-311.pyc b/rag/src/api/__pycache__/main.cpython-311.pyc index 1cc8c53882924a0d42710f9a81e68e16103437e3..fa862d55080d1997fd1e1e5c90b32cbd04a6efa9 100644 GIT binary patch delta 4539 zcmaJ@4Qx}_6@KseFU0vLev~mbI&>V{JneQBl*VP}4YH(2 z)VgaKO5p=^qfSW@DLPYpN|B`J;HN)v>E(%c+H^{Wb@=H|{|eBi5F438O3sv?GQb-A z;%jc|KHe;7{`7=tnV)F6xN_9G^1x7ch z=n>`?&QMX8qM|n`TR%hAl_J{*vRhDZ?jo8-^K40{&n)KWOqG1XSX8})=8~nOcwF(~5}Fxi$OzCB^GXf2p5ooZ6AYVI|a#=}}d+UeZ5;dLpRb|ehTU*X3L zRaG)s58;6w5Y!DR(hn!4H=?v%7Lp2iAVqR#(!)*QVflSM%pZrgGScqpH8f~|43#Au zCmnP#WNCAX5f5!5ox4Q~GaHeKDd_F@f5!MdOhhn+8KQKTBPUg4g zOe#Setc`z>Uj|Es1#Q`=fUQVCAU0Icq{v}LR`JUXf@#MU zOu}h}b;BRcOa?qr7F02K4u^1!b)XQ!A^z{;W;k|}tyu2$A{XPy1`zxJPJ?Fgkg(rF z281~_2bsEN<^g)DS}lEuMiSB#7N6oL=6Hp3Px2p?xLlJCmcT08F=Ib_0J#XKab+n# zY%k=Qr4^vTRr;A(c;@8`$`)h_*G#&vUhXR^TpM>b%R@&7C?SE+kY|X7be|odE#Y$j zHBSx2CSWjx7=O7e2&S5Bzu`B_%XM=>ZnfLj3Clqg+=IXXATZe;f6x~t)T2`NNWf2Z zW2Lwl56~#kNn<>0FE&MhbDlq8_d(1*wRgx5?ga+hhp-=E4B=}4PNSCLVUz?^kKgAX zjw`Q(70m!qj$6<&;v(VDU|?v{)b}e2OyeMXnV+sW0B5SH+-j~t)lNQ9+3i}0YG%<8 z84U+Q5mhhT8P#iK;c$dafaHhVP}Kj=E^1_Rn0gQvv@jVxv?kn?~4?_3x+WK#~pT%urFMe8_ z1~!8svn=4BRy3u45jzBA(p&yX{bHGLyQ?3fHs)GcAWF6Z^*x24w=93dH86r?O-G8;h#Dp1}<#0JJof(vT-W zRN)8@37x;?C()Ge! zagwCZPNQx_xGSP1%;8DUnr#wCRBVdcyQ_z3JQPa-=k~NMDebXI&W9=#^bVs{=e9GZ6*e_599nms}_YbfD zffi7h=&w~7r@Q5!EVxJ=OU4$tourLid`3U!Q&%lIde?p<@ zX#tLw7yp)t$`y%DQ`s$8krcjW!;5pT12tCU@{I;u%s1qUUnpGhIQ774e5TcIJg=C= z$&d1XZaP|X2bS3908W_|BAuEX;EpZe*`0;_LPx&5gn!V{2U%*|+%=gc=$y&xgD4r1 zvtOb@k)+#59YzoVNOrnjl=0M{n=@0B6(3QF*(5>}?)N7$S`e5_OSvAZDNra(<& zTg2PA)M{$Emd72vC*;>-=X+b^@tIX&)~n+G6ol~LJ_RxDKzcJvFhq8u8)K7hn9blu zPWjcGn%IFY1J?4Jums&$41eyCX@1wUP(I3Qz1W7ky?Y>(*S#(XWu|Y7RWBOcc$)`( zW2K)VnGQDZr|4Dg8tBu9!1lH|{P@7~dCjl~KAuJr@Z+Tc2^8<+S3zZ2Xz z;IH8IK8wP>0Me-^RG%gV`W5Hvs3ci^+AKZGnBipQ!3{f1Xi%+>R{*o3+yB9hm{#^H zglyz3KniPD%NUFXgC6nGfZMVtT`1?8nFZ|3)5pqW?aHX9n|G2r*}@+s8?3^Zh~7E= z7AYtb<&+0DuzaUKFlA(FGnIgfT`@Z~81oLn4H_bqyqGx$ctgM|FXulDJS^w(+kyWo zteXF0*Ft$2FWucd-iilkM_7Td6=6F9K^Q^^A*cwCBDfL6#i9c#yeaEfUvU?W;45C< zn27TPQbzzZ<7iloh)=tI&3Fx0-rk0YpAd$VQcNg@ttonhVTCe0oMwr_9W~5Cnt??_ z`i&327UCQ7Wm*fQD;lJ&tQlnCzohm(TOU;Nu1TdAr~l{mp`R9AlMGj-yvtJFq1G$2 VYc5I7E0Xh~D{lY* delta 4611 zcmaJ@3vg7`8Qy#Mk%YVIsbCN9C?1po95mGQ= zc#M?{wmp4`b&91$sj-4P7IX#&2JO>g>ZrB1pg7~SSin{W?bKH7|DU_Nd2Hx~{c;}v zlLV@!}e^0$+-@tzXG4{4ZSY|Jw8B^Qi?-@%RXk#IrA-Gl1*y%5BHUKg zQ0vsTVz?nMnbgMEdN9H(eJMmQYkc&0fw_p^uvkU3GRZPp6pmJ1jFqoP7@6y7_DDHs zmgIvxczhs6wms^x0X*hS@c898kGZ6g7Kc4H#aP)I@fg>vzbhX$=M_T|mO}tR5t6VX zMrm7AY2}!b9h6W(CaYo;wnw?H2CfBDaLwQs)o5M{yUqN^DMzK6(7e=Z($*?i&@673&rQ7y8;zFIC?;M?ni=ym zhP{emlVbU}ZBhgNepZe18iZP0Oi_D+0pfqir_kNh7hsLBxH;rb|Bg{==AUIWrHvUu zAM63X+L{krJ=TU)G{BZbpgZ)Eb&2d~fw@D{65W1Zm#16lcLjP>Y>b-7?z&Ly-GEznSZtJ4P@UcG=cR z`fL@hB0SBT@@hfxK;Ea84qR0DzWljef{|ILh-5W$pPx{#mOwmx)TesQcGf#cvM9%k1^XH6k5#>gX}2(PvOJx5(7nBETw2`4u7X; zy>lL#NuWN`@Avous$RvDO4-Ex{s21&k{|NTvnqk$@GMD|hWW9Qf^2bGOvLY_nuSp{ z=nb&7XugHNR#I8q2E!;9bd}UYUEV;C;_jjDUdCbRlaOupA}MhrDs2F88~M=kj8K1R zw#9NB7oXsdm3?>SNgN_EI0Mq2NagdY1!b8#;25Y$ur*{jv1`;({Z?ts5FACmf59Ww zKS4F2`!m!vvPlaPOM&&%vL-JmXOF=!%9htT7fPLcm$MzB_PTSaRfN3|?LCg5pI=^3 zUL2vhvV6^K@$TzzAI~c@0XS0G_i!A2L)OY)D$kXk3rx|#Z^fDt(8ra-#>C4oWUnHe;fJfY%@WF2V6LS@ z{RQYAap;3^1fitfK)F$Fs+m3bJPcbLN$fRTLO)s(-P7-Nd3?Hy-{Jc25nf05140_Y zRP5t)_G7XA4N3|sm`FW#aWKM{frr&%s@aDJ7;tca@-6a)?7D-zbQwZFjP z%UH)hhrtc_H)QyyCCOa-Zm6}c)R-tL?n&5U$N7PVLFpuy8uN3XLrW49<)!N8hxD(( zFL+g>L;9Gz8|zK@P2!&@^@XRU@Bv~o(9#G%wAyh&SGvm=HPsEGQg9}~&5`rB4*kgf z_ZNn49sH*K*3qBd9)70U&M=eVs3fr$ae@@Jj)np+%sfx z`Z2kNLM_+{n0Vm}Fw)YxJYFb5N))}vsAJjffSAIgS8H1MM8F3sTzwyb`e4%~X!4L8F5t1inH5sBk``PwJue2V@)18_(T z6EsC0VRc|GceTvqwsjd&Ij>mP20iSNb**E?42Mu}U}!;9mq_9-aVoNQ2S?vR5CMob z4ZXa>xk8$vX^%@)N;m~EiGn0q^i1(>EqUgDgKkD>e@lr`y39{*D1&bG#|=e0F$*mY zi-0_Que;7@6DLZh2H6AS(dTX06R;Lj9E|Zc+=aY+`pRTZar+hZc|2XVv!G*iD{4Tu|p zelNjqIeYU#MEvqF?2@88F+7&yd@w9ohHs|D1>qL!XFkoyf1hN`OVB8GkbZBFZl>e@WyyCcmo9mY=m*ZTuBUJDJlDBJKY&J>+a#cnZ-ct z)Qvf82PsDzK~lSQTSQ&Ol?)}SQ&E+xQhv@#^8<}!DN;q1;I*_BwPZ!A)bGuDFWb;m zYR7NB{oZ@u_1=8Pd;YD{X+!Wd{hf|9*%A61>Es^M67s}GA#??aD1bytq@z@f4$v_s zz)+;mL`^Ysz--{As3l;5F;mnUuo}N?fECT6CCbHY0T4qYiR}SB*++6*2g#qpn|%UK zn5RXn$cCvY?m7)~@Gi}{B-cb)u2=jk+DfH}cFBE;79HnJ*MaQQJZYf(sS=#4N^q@^ zutqfbk+&?l8HC!>SEe7|8sz6cxtt!qI`^w_9(r>VlYHmVLwx$m&*pwQHvisKdVFSn zbjJ2IvBFF1984ZdBoztk&V$E~AMH95l6sX$BJQR1imuaAT=6H8I3yjDBKik#6+iV&91Rr+KFOG3=| zmMqCKC<;cz0o_*Ugz~!&imdPs!TaPYj9x;Qm{$-5U!Frs5miqfA3>%2Hp(bDp8+8g z41g7nNR6PMqj9>FlS0GHFoLf}pJxXsg)6YpsGlnRj-{M_)QA+rhqF8$gc}EF#aWOb zGD?x(^e@vqWh(WvcPP<3K)+4*QHr}D1ElT&sl}@+vavuXTArnAwRM+R3ZDOTd-~8B z<|OK)yj*e=G}?H0R{G%0!Sv7w|M=$g<3HTy{r)aKJ$em1aPInTo59hE!*ZK}5&nW7 zv@JO@pFTg6zHpU4v@c74Oj43KUc^!=lL$#Rf;lTJ$?N6Q?~bK^@j)9OD(q9%)hfWb zpP0Z+KB9AuCQ?svHHM^~v1K1mWX9+>&j!Wo{*hWqzwwx>jGMXf;yH z|6NaRMNn4A?ik@UerxU~iCR8AG@d?x^3xD-+((Mw}eL3N9)2uf0hly8ZVBiR6^(S#S}Kg zpl6`=VN%m{TQZKNa72c}>oq~OgjxudRHr1}{G+6ll#CsfNksDe;NUlVeczQOEc--3 z>PsZzVc(mHnB?nA;2+ApLP+u*JP?p@!l=M94jI*WsQ?EtsqS)X?^(Uf_$tA$6qm4| zB=EVVQXIrmRDkaoTw-b66-|VMsJy#%83PaI$Tg577r#PZp+S@}@y?@E#)8&*XB+sj z!_z%qyrORS9<)@$Nv+{z#>~{L$)GGKCs+w^l$F`yiOwPwG%etElU1%*wUC*ROs*gorNzIJ%BZ~FAqk5irw&C>xI*VTa@1Jn|1 z1z23Lp@v4S-j_k}`chkhVlzXe@-m1E{=B?4M7xtDEiSL{Q0TACUxqvp`6sX%+3YDKURyRS*<4WK29SqIdcmX=>b4U?G$|QT zj!9Hvo`AIktOMXR83Bt)>cS-DU=IKze!tK2d+-a;)2p*5-{s+kTVRr;L-}6-7IU#x z?K2;t9=Yo^w`U@da=)m#UmP~ivaa{+qjt4&|I9`J&{FIHjXj{U2QsGh90VhJ7F;+; zA$Q|M$3#b}ahuk-EmgK%E8DKJ+ZP|%tHA5by9!{|RX@pmws(fQvH#Y7)zzMIwQH_+ zm2D@!HEXZTAQ}uThQv6>s6(yV`B}yE>o=-zRRanw#kOf|o65EsBGqlLsjlvnt6Ot* zt86!smN64ifXHOmE_OOQE79Fb+g^^jyLN5oF6QnoC*b$E_Pr+NzR3*ux%$gmy^pO4C1JyC{io*j3~n7Jb|kK=vGoGBk{0a33I0- zA*fJA7s~ANI57OxY(Zs%!MG5UfW&oW(KyebfON{tP0&#qb{tqCI(v%o+F}#uBSHMQzlmjTyR(vP?1=1V05p*jjS4g<)f} F{tYcvk~9DS literal 0 HcmV?d00001 diff --git a/rag/src/api/main.py b/rag/src/api/main.py index b5dd99b..ef30ce5 100644 --- a/rag/src/api/main.py +++ b/rag/src/api/main.py @@ -37,6 +37,7 @@ from ..utils.embedding import EmbeddingGenerator from ..utils.text_processor import extract_nouns_as_query from ..utils.redis_cache import RedisCache from . import term_routes +from . import minutes_routes # 로깅 설정 logging.basicConfig( @@ -63,6 +64,7 @@ app.add_middleware( # SSE 라우터 등록 app.include_router(term_routes.router) +app.include_router(minutes_routes.router) # 앱 시작/종료 이벤트 핸들러 @@ -732,4 +734,4 @@ async def get_related_minutes( if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) + uvicorn.run(app, host="0.0.0.0", port=8080) diff --git a/rag/src/api/minutes_routes.py b/rag/src/api/minutes_routes.py new file mode 100644 index 0000000..04c9e65 --- /dev/null +++ b/rag/src/api/minutes_routes.py @@ -0,0 +1,93 @@ +""" +연관 회의록 관련 API 엔드포인트 +""" +from fastapi import APIRouter, HTTPException +from sse_starlette.sse import EventSourceResponse +import asyncio +import json +import logging + +from ..services.sse_manager import sse_manager + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/rag/minutes", tags=["minutes"]) + + +@router.get("/stream/{session_id}") +async def stream_related_minutes(session_id: str): + """ + 연관 회의록 검색 결과 SSE 스트림 + + Args: + session_id: 회의 세션 ID + + Returns: + SSE 스트림 + """ + try: + # SSE 연결 등록 + queue = sse_manager.register(session_id) + logger.info(f"연관 회의록 스트림 시작: {session_id}") + + async def event_generator(): + """SSE 이벤트 생성기""" + try: + # 연결 확인 메시지 + yield { + "event": "connected", + "data": json.dumps({"session_id": session_id, "status": "connected"}) + } + + # 메시지 수신 및 전송 + while True: + try: + # Timeout을 두어 주기적으로 heartbeat 전송 + message = await asyncio.wait_for(queue.get(), timeout=30.0) + + yield { + "event": message["event"], + "data": json.dumps(message["data"]) + } + + except asyncio.TimeoutError: + # Heartbeat 전송 + yield { + "event": "heartbeat", + "data": json.dumps({"type": "heartbeat"}) + } + + except asyncio.CancelledError: + logger.info(f"연관 회의록 스트림 취소됨: {session_id}") + except Exception as e: + logger.error(f"이벤트 생성 중 에러: {str(e)}") + finally: + # 연결 정리 + sse_manager.unregister(session_id) + logger.info(f"연관 회의록 스트림 종료: {session_id}") + + return EventSourceResponse(event_generator()) + + except ValueError as e: + raise HTTPException(status_code=429, detail=str(e)) + except Exception as e: + logger.error(f"스트림 시작 실패: {str(e)}") + raise HTTPException(status_code=500, detail="스트림 시작 실패") + + +@router.get("/stream/{session_id}/status") +async def get_stream_status(session_id: str): + """ + 스트림 연결 상태 확인 + + Args: + session_id: 회의 세션 ID + + Returns: + 연결 상태 + """ + is_connected = sse_manager.is_connected(session_id) + + return { + "session_id": session_id, + "connected": is_connected + } diff --git a/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc b/rag/src/services/__pycache__/eventhub_consumer.cpython-311.pyc index b36bd70f0067730e21b00c91f332f92541a7a678..a60373daeb61b17dd95380c6ef5bbbfdd92c9e32 100644 GIT binary patch delta 4790 zcmb7H3s6+o8NPQPJa=JXVHc3c#pSg;uxnQB;s8E8{^EO&^f!^AbSP@rmG@G_O4;==n&o zeKPpO@Ujhz(J< z%+-P+_|(r7+Sn8HyTbIYljz?E;EA#bWVg9Nu;f}SLVb&=)zl~uLec&_!YPCo5MBiE zgj`7+=`>(c$8Aiol)+ZwD}H?dY+x4J`e7Ng@p@*B1K% z7_I(#WXaXYk}<_t)0IreBab;FO|D3jLv_zWlgl5iO@YC>lWX&sa~1J`?<;FE;K!90 zvT>2zwK#iYmi$_l5@czS;?%@7_d~k|CBXEf0s&X?k1YZ8%)*BQEF5k&nwOdsD7VSs zfGDUw#Xz&GbZVto53ngf8%VdMB%wAZ$j0(AKKNPL>Y$bkI&2cbzxZyFNmp`{vn zJtZ7l%R{Z<^k`a~5}K$6=gXtG5bvj-% z&lDt@pqVy;kDO-*y|{J~gycWmEC`+4Xm-p&sK`50eJSi?zYdqsXR_ zbJ#RSWYY|5Aw8BJ*L0d0knzUh%(+{KW$J*MJ0tf-B0)>i9(E7&q`Z#V!y2a za8r-BZ)E(^K5p{GBi?~i6K4jv@r(Ps2Yv)>dX(F+VFl;4cY6<>p}UmvTC4D+wT$z2 zkBz@MHZk<}A>-D z!%l^I^4aHbPKU6DQ7d-U9>P?$m>z^5!Jcq3?%X$K*5|>BgH;R;AevD2>u~*~@E@S3v!qicd&3XIn-u=T9N4p^` zz32!zZvo@&>-Bc`e{T}--DqdNzKeu{Cpiva4Dy6b4G-IeV5aaWt!8Vp;0bK+Ank1y z!K1P?>=5{lW=OFnt0${8 zYjE<&P#ITCmO=w#9Kl^fG7)}&a25gkB5?>q2(KZ;BP>TakMI+O4-qhuWJJ7Jhu*}Wn(vA~{zJx9|1l)V%O@yBzyoKHHPDforNd9(^ytcB? zRkC8{7R1%E!H=<@YxOQ|nvapG^*1!SKGkuRBcaWyX?JPb9h!ExM%NnvE|R?q9Kw!= zIg&buo(7PLQ?u8l+3V2k6@9-_MINm_RPBh_c_jn@%k~{pLd;I5s@bJ#cBqL>QXIrsFuQ2EXm+XTy#CL@@it`yUQz`iK|_St9#bXG++Nl z7K0OdJk$}tYv@4$DA(#9 zb!_9Eb%Lu-kTif!4RL9RLql#DvWGT}n4E@Em!Z_h1VrdWI9AWec)vR%&y`W`PRVkm6uC1R z#Y%R8qi}^Qd!;WT*a#B@xXA=Z>u(`IbsA!r*qbZsj|8 z;Ilw)AXtz@x|6Go%sCF=a`0+Hb&`?!Rc->*K32ye8bj7(%0Dj5UXv>SBvrX4b9AKq z6=lF6p8VJ7=88-$vYkg^eX3nBhDs##YkIPxpS?U=Y0uh{Xi3ESpAf!8_%n5_waB!x(bW$nvGF2RA|ZNIZKToE3K@l9GD5bb zC)O=diX2IAta~}K2UNxgiEw|5Ep)VNvrZ>5waCvRD@!Czf4Y9vPVx!3No+hbTXGF` zvlRSStcu(`j?x5z7hw_s3EHE!2o0F@EcGIFeujp>A^aWTU4(aNXU!(|@1xghZp%Yw z$NFsa+Q#Lw^&ap>x}mMriub&B6M_FgzXqKp006Im94Arq zx3slbPxoz(mQ~AW-{$w&Sh}Uonu5$t(3b=vV2H>H1o7*Bi&77rtc#Z=$wouB1hHM4 zWXi4Vh;^*;UF}%arM*5zRNVcCD0o&0gdP^*Q;HIt*XV>(8&0hC1xocGCOXC!EXg4# zhe|SYx1TlgY~^U>R<$xf35mlj8-3)lJuLh0(N9eYY|%e)s$%xiqJ;m*6LcfE1Bd|u qQ`McI@|HHdnk*v`>}XO$6r2B0Fq`RP?v6J_{VjiMQzYg2ZT|%#S;Uh7 delta 1522 zcmZvaZBSHI7{~9q`vS5oFS4+@@!B%bA_&Wik{}3Jnc0|u$bhk4_Fmv3>~7~?RB%;t zXeQG#ef&TJjixhFlat7GCdf*yoUt*R#X$2q+VsJxp)X`Q&NMZ4o?R_Jcy{maKIeJ) zKj++Y?#z<&-;uact2KsW<3aRZAP;J|#Fd50tDpc$wJL-}?nTF$g;t?*U8 zoe$dK=jLRnu1$dS>=d#bh|^|ta%v^G3erdhe3+eVs8VadQ;te6%Rt{|GdNgW z#rjVbe?+pNza&WZ!Tpll7vI6e{S3NIk?0oLD=CW=Me_6rKA}URbT7u=MZAYNfH=sY z$19Rg^r(K%!}hSJG}Qta4j>M}SZPj9WOq2vc27{l8p$-!`gZJmZTqr%0+%YXU}#Nj zf;^SbHJ#8Edi00V%8eyOY$=0T4b|LeZ7Sn8rdO7;*R(MW>2x5gV;VWb%)I>d6^5A# zQzNIhPg7Cb}X%QvG zOXYyd4pg@&-cCvE?UuwERgdoWdj)iB8#ay;^90tMf%w*?$q^Ihmk{40E<;PJ!hcM( zP-_OsKg3*m?0;$=LDggZv_tU8{Sx)aq8`&D`+8MLVK!JcRmADB?XpkkW?_*f!%LeE zTL+njE0L~5^+~wDDaUdQO$Xuwct&_(mmd2_MHEzlVpDns@de^b#1+I@#E*z+#0=sp z;u_*7#B~PUsz_eku;K~pUBrNnxPeg%6*@c*8`5#+ftx}Dc~dKHyJxWc-}a(b+`f{p zID&QI&x0Y|*yWdfE_3889!tiU)$8}El27#n1j=q_2u^j{$Sv4kW`~=d$^0(_?sk4n z7Qkn+n!)ZPN23svi02UDbH9bwAgt|553+c>v>ZP8B-_f&LvH(I=yp_3p<|t z>N09~HpiRHe=?Z|M2mWTD diff --git a/rag/src/services/eventhub_consumer.py b/rag/src/services/eventhub_consumer.py index a4c8d7f..2b784ab6 100644 --- a/rag/src/services/eventhub_consumer.py +++ b/rag/src/services/eventhub_consumer.py @@ -130,7 +130,7 @@ class EventHubConsumer: await self._process_minutes_event(event_data) elif event_type == "SegmentCreated": - # 세그먼트 생성 이벤트 - 용어검색 실행 + # 세그먼트 생성 이벤트 - 용어검색 및 연관 회의록 검색 실행 await self._process_segment_event(event_data) # Checkpoint 업데이트 @@ -294,7 +294,8 @@ class EventHubConsumer: # 7. SSE를 통해 결과 전송 # Event Hub 메시지에서 sessionId 추출 (여러 필드 확인) - session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id + session_id = event_data.get("sessionId") + # session_id = event_data.get("sessionId") or event_data.get("session_id") or event_data.get("meetingId") or meeting_id logger.info(f"SSE 전송 시도: sessionId={session_id}, meetingId={meeting_id}") @@ -337,9 +338,114 @@ class EventHubConsumer: else: logger.warning("이벤트 데이터에 sessionId가 없어 SSE 전송을 건너뜁니다") + # 8. 연관 회의록 검색 및 SSE 전송 + await self._search_and_send_related_minutes(text, session_id, segment_id, meeting_id) + except Exception as e: logger.error(f"세그먼트 이벤트 처리 실패: {str(e)}", exc_info=True) + async def _search_and_send_related_minutes( + self, + text: str, + session_id: Optional[str], + segment_id: str, + meeting_id: str + ): + """ + 연관 회의록 검색 및 SSE 전송 + + Args: + text: 세그먼트 텍스트 + session_id: 세션 ID + segment_id: 세그먼트 ID + meeting_id: 회의 ID + """ + try: + # RAG Minutes DB가 없으면 스킵 + if not self.rag_minutes_db: + logger.debug("RAG Minutes DB가 설정되지 않아 연관 회의록 검색을 스킵합니다") + return + + if not text: + logger.warning(f"세그먼트 {segment_id}에 텍스트가 없어 연관 회의록 검색을 스킵합니다") + return + + logger.info(f"세그먼트 연관 회의록 검색 시작: {segment_id} (회의: {meeting_id})") + logger.info(f"검색 텍스트: {text[:100]}...") + + # 1. 텍스트를 임베딩으로 변환 + query_embedding = self.embedding_gen.generate_embedding(text) + logger.info(f"임베딩 생성 완료: {len(query_embedding)}차원") + + # 2. 연관 회의록 검색 설정 + config = self.config.get("rag_minutes", {}) + search_config = config.get("search", {}) + + top_k = search_config.get("top_k", 5) + similarity_threshold = search_config.get("similarity_threshold", 0.7) + + # 3. 벡터 유사도 검색 + results = self.rag_minutes_db.search_by_vector( + query_embedding=query_embedding, + top_k=top_k, + similarity_threshold=similarity_threshold + ) + + # 4. 검색 결과 로깅 + if results: + logger.info(f"세그먼트 {segment_id} 연관 회의록 검색 완료: {len(results)}개 발견") + for idx, result in enumerate(results, 1): + minutes = result["minutes"] + score = result["similarity_score"] + logger.info( + f" [{idx}] {minutes.title} " + f"(회의 ID: {minutes.meeting_id}, 유사도: {score:.3f})" + ) + else: + logger.info(f"세그먼트 {segment_id}에서 연관 회의록을 찾지 못했습니다") + + # 5. SSE를 통해 결과 전송 + if session_id: + from ..services.sse_manager import sse_manager + + # 회의록 정보를 직렬화 가능한 형태로 변환 + minutes_data = [] + for result in results: + minutes = result["minutes"] + minutes_data.append({ + "minutes_id": minutes.minutes_id, + "meeting_id": minutes.meeting_id, + "title": minutes.title, + "purpose": minutes.purpose, + "scheduled_at": minutes.scheduled_at, + "location": minutes.location, + "finalized_at": minutes.finalized_at, + "similarity_score": result["similarity_score"] + }) + + # SSE로 전송 + success = await sse_manager.send_to_session( + session_id=session_id, + data={ + "segment_id": segment_id, + "meeting_id": meeting_id, + "text": text[:100], # 텍스트 일부만 전송 + "related_minutes": minutes_data, + "total_count": len(minutes_data) + }, + event_type="related_minutes_result" + ) + + if success: + logger.info(f"연관 회의록 검색 결과를 SSE로 전송 완료: {session_id}") + else: + logger.warning(f"SSE 전송 실패 (세션 미연결): {session_id}") + else: + logger.warning("세션 ID가 없어 연관 회의록 SSE 전송을 건너뜁니다") + + except Exception as e: + logger.error(f"연관 회의록 검색 및 전송 실패: {str(e)}", exc_info=True) + def _convert_datetime_array_to_string(self, value: Union[str, List, None]) -> Optional[str]: """ Java LocalDateTime 배열을 ISO 8601 문자열로 변환