diff --git a/analytics/src/main/java/com/ktds/hi/analytics/infra/config/EventHubConfig.java b/analytics/src/main/java/com/ktds/hi/analytics/infra/config/EventHubConfig.java index 0037a39..67e2ac5 100644 --- a/analytics/src/main/java/com/ktds/hi/analytics/infra/config/EventHubConfig.java +++ b/analytics/src/main/java/com/ktds/hi/analytics/infra/config/EventHubConfig.java @@ -1,9 +1,11 @@ package com.ktds.hi.analytics.infra.config; import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient; import com.azure.messaging.eventhubs.EventHubConsumerClient; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore; +import com.azure.storage.blob.BlobContainerAsyncClient; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClientBuilder; import lombok.extern.slf4j.Slf4j; @@ -36,22 +38,44 @@ public class EventHubConfig { @Value("${azure.eventhub.event-hubs.ai-analysis-events}") private String aiAnalysisEventsHub; + + + @Bean + public BlobContainerAsyncClient blobContainerAsyncClient() { + return new BlobServiceClientBuilder() + .connectionString(storageConnectionString) + .buildAsyncClient() // 비동기 클라이언트 생성 + .getBlobContainerAsyncClient(containerName); + } + + /** + * Blob 체크포인트 스토어 생성 (BlobContainerAsyncClient 사용) + */ + @Bean + public BlobCheckpointStore checkpointStore(BlobContainerAsyncClient blobContainerAsyncClient) { + return new BlobCheckpointStore(blobContainerAsyncClient); + } /** * 리뷰 이벤트 수신용 Consumer 클라이언트 */ @Bean("reviewEventConsumer") public EventHubConsumerClient reviewEventConsumer() { - BlobContainerClient blobContainerClient = createBlobContainerClient(); - BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerClient); - return new EventHubClientBuilder() - .connectionString(connectionString, reviewEventsHub) - .consumerGroup(consumerGroup) - .checkpointStore(checkpointStore) - .buildConsumerClient(); + .connectionString(connectionString, reviewEventsHub) + .consumerGroup(consumerGroup) + .buildConsumerClient(); } - + @Bean("reviewEventConsumerAsync") + public EventHubConsumerAsyncClient reviewEventConsumerAsync() { + return new EventHubClientBuilder() + .connectionString(connectionString, reviewEventsHub) + .consumerGroup(consumerGroup) + .buildAsyncConsumerClient(); + } + + + /** * AI 분석 결과 발행용 Producer 클라이언트 */ @@ -61,14 +85,6 @@ public class EventHubConfig { .connectionString(connectionString, aiAnalysisEventsHub) .buildProducerClient(); } - - /** - * Blob 컨테이너 클라이언트 생성 - */ - private BlobContainerClient createBlobContainerClient() { - return new BlobServiceClientBuilder() - .connectionString(storageConnectionString) - .buildClient() - .getBlobContainerClient(containerName); - } + + } diff --git a/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventAdapter.java b/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventAdapter.java index 80eca66..dac4a43 100644 --- a/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventAdapter.java +++ b/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventAdapter.java @@ -19,6 +19,23 @@ public class EventAdapter implements EventPort { private final ApplicationEventPublisher eventPublisher; + @Override + public void publishAnalysisCompletedEvent(Long storeId, AnalysisType analysisType) { + log.info("분석 완료 이벤트 발행: storeId={}, analysisType={}", storeId, analysisType); + + try { + // 분석 완료 이벤트 객체 생성 및 발행 + AnalysisCompletedEvent event = new AnalysisCompletedEvent(storeId, analysisType); + eventPublisher.publishEvent(event); + + log.info("분석 완료 이벤트 발행 완료: storeId={}, analysisType={}", storeId, analysisType); + + } catch (Exception e) { + log.error("분석 완료 이벤트 발행 실패: storeId={}, analysisType={}, error={}", + storeId, analysisType, e.getMessage(), e); + } + } + @Override public void publishActionPlanCreatedEvent(ActionPlan actionPlan) { log.info("실행 계획 생성 이벤트 발행: planId={}, storeId={}", actionPlan.getId(), actionPlan.getStoreId()); @@ -49,4 +66,25 @@ public class EventAdapter implements EventPort { return actionPlan; } } + + /** + * 분석 완료 이벤트 클래스 + */ + public static class AnalysisCompletedEvent { + private final Long storeId; + private final AnalysisType analysisType; + + public AnalysisCompletedEvent(Long storeId, AnalysisType analysisType) { + this.storeId = storeId; + this.analysisType = analysisType; + } + + public Long getStoreId() { + return storeId; + } + + public AnalysisType getAnalysisType() { + return analysisType; + } + } } diff --git a/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventHubAdapter.java b/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventHubAdapter.java index 664a2ea..4af64f2 100644 --- a/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventHubAdapter.java +++ b/analytics/src/main/java/com/ktds/hi/analytics/infra/gateway/EventHubAdapter.java @@ -1,6 +1,7 @@ package com.ktds.hi.analytics.infra.gateway; import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventDataBatch; import com.azure.messaging.eventhubs.EventHubConsumerClient; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.models.EventPosition; @@ -9,13 +10,14 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.ktds.hi.analytics.biz.domain.ActionPlan; import com.ktds.hi.analytics.biz.domain.AnalysisType; import com.ktds.hi.analytics.biz.usecase.out.EventPort; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -70,8 +72,11 @@ public class EventHubAdapter implements EventPort { String jsonData = objectMapper.writeValueAsString(eventData); EventData event = new EventData(jsonData); - - aiAnalysisEventProducer.send(event); + + EventDataBatch batch = aiAnalysisEventProducer.createBatch(); + if (batch.tryAdd(event)) { + aiAnalysisEventProducer.send(batch); // EventDataBatch로 전송 + } log.info("분석 완료 이벤트 발행: storeId={}, type={}", storeId, analysisType); } catch (Exception e) { @@ -91,8 +96,13 @@ public class EventHubAdapter implements EventPort { String jsonData = objectMapper.writeValueAsString(eventData); EventData event = new EventData(jsonData); - - aiAnalysisEventProducer.send(event); + + // ✅ 올바른 사용법 + EventDataBatch batch = aiAnalysisEventProducer.createBatch(); + if (batch.tryAdd(event)) { + aiAnalysisEventProducer.send(batch); // EventDataBatch로 전송 + } + log.info("실행계획 생성 이벤트 발행: planId={}, storeId={}", actionPlan.getId(), actionPlan.getStoreId()); @@ -108,9 +118,16 @@ public class EventHubAdapter implements EventPort { log.info("리뷰 이벤트 수신 시작"); try { - reviewEventConsumer.receiveFromPartition("0", EventPosition.earliest()) - .timeout(Duration.ofSeconds(30)) - .subscribe(this::handleReviewEvent); + Iterable events = reviewEventConsumer.receiveFromPartition( + "0", // 파티션 ID + 100, // 최대 이벤트 수 + EventPosition.earliest(), // 시작 위치 + Duration.ofSeconds(30) // 타임아웃 + ); + + for (PartitionEvent partitionEvent : events) { + handleReviewEvent(partitionEvent); + } } catch (Exception e) { log.error("리뷰 이벤트 수신 중 오류 발생", e);