Fix : analytis 수정

This commit is contained in:
lsh9672 2025-06-12 15:06:02 +09:00
parent 2345a4a41a
commit 83dca7262b
3 changed files with 98 additions and 27 deletions

View File

@ -1,9 +1,11 @@
package com.ktds.hi.analytics.infra.config;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import lombok.extern.slf4j.Slf4j;
@ -37,20 +39,42 @@ public class EventHubConfig {
@Value("${azure.eventhub.event-hubs.ai-analysis-events}")
private String aiAnalysisEventsHub;
@Bean
public BlobContainerAsyncClient blobContainerAsyncClient() {
return new BlobServiceClientBuilder()
.connectionString(storageConnectionString)
.buildAsyncClient() // 비동기 클라이언트 생성
.getBlobContainerAsyncClient(containerName);
}
/**
* Blob 체크포인트 스토어 생성 (BlobContainerAsyncClient 사용)
*/
@Bean
public BlobCheckpointStore checkpointStore(BlobContainerAsyncClient blobContainerAsyncClient) {
return new BlobCheckpointStore(blobContainerAsyncClient);
}
/**
* 리뷰 이벤트 수신용 Consumer 클라이언트
*/
@Bean("reviewEventConsumer")
public EventHubConsumerClient reviewEventConsumer() {
BlobContainerClient blobContainerClient = createBlobContainerClient();
BlobCheckpointStore checkpointStore = new BlobCheckpointStore(blobContainerClient);
return new EventHubClientBuilder()
.connectionString(connectionString, reviewEventsHub)
.consumerGroup(consumerGroup)
.checkpointStore(checkpointStore)
.buildConsumerClient();
}
@Bean("reviewEventConsumerAsync")
public EventHubConsumerAsyncClient reviewEventConsumerAsync() {
return new EventHubClientBuilder()
.connectionString(connectionString, reviewEventsHub)
.consumerGroup(consumerGroup)
.buildAsyncConsumerClient();
}
/**
* AI 분석 결과 발행용 Producer 클라이언트
@ -62,13 +86,5 @@ public class EventHubConfig {
.buildProducerClient();
}
/**
* Blob 컨테이너 클라이언트 생성
*/
private BlobContainerClient createBlobContainerClient() {
return new BlobServiceClientBuilder()
.connectionString(storageConnectionString)
.buildClient()
.getBlobContainerClient(containerName);
}
}

View File

@ -19,6 +19,23 @@ public class EventAdapter implements EventPort {
private final ApplicationEventPublisher eventPublisher;
@Override
public void publishAnalysisCompletedEvent(Long storeId, AnalysisType analysisType) {
log.info("분석 완료 이벤트 발행: storeId={}, analysisType={}", storeId, analysisType);
try {
// 분석 완료 이벤트 객체 생성 발행
AnalysisCompletedEvent event = new AnalysisCompletedEvent(storeId, analysisType);
eventPublisher.publishEvent(event);
log.info("분석 완료 이벤트 발행 완료: storeId={}, analysisType={}", storeId, analysisType);
} catch (Exception e) {
log.error("분석 완료 이벤트 발행 실패: storeId={}, analysisType={}, error={}",
storeId, analysisType, e.getMessage(), e);
}
}
@Override
public void publishActionPlanCreatedEvent(ActionPlan actionPlan) {
log.info("실행 계획 생성 이벤트 발행: planId={}, storeId={}", actionPlan.getId(), actionPlan.getStoreId());
@ -49,4 +66,25 @@ public class EventAdapter implements EventPort {
return actionPlan;
}
}
/**
* 분석 완료 이벤트 클래스
*/
public static class AnalysisCompletedEvent {
private final Long storeId;
private final AnalysisType analysisType;
public AnalysisCompletedEvent(Long storeId, AnalysisType analysisType) {
this.storeId = storeId;
this.analysisType = analysisType;
}
public Long getStoreId() {
return storeId;
}
public AnalysisType getAnalysisType() {
return analysisType;
}
}
}

View File

@ -1,6 +1,7 @@
package com.ktds.hi.analytics.infra.gateway;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
@ -9,13 +10,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.ktds.hi.analytics.biz.domain.ActionPlan;
import com.ktds.hi.analytics.biz.domain.AnalysisType;
import com.ktds.hi.analytics.biz.usecase.out.EventPort;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@ -71,7 +73,10 @@ public class EventHubAdapter implements EventPort {
String jsonData = objectMapper.writeValueAsString(eventData);
EventData event = new EventData(jsonData);
aiAnalysisEventProducer.send(event);
EventDataBatch batch = aiAnalysisEventProducer.createBatch();
if (batch.tryAdd(event)) {
aiAnalysisEventProducer.send(batch); // EventDataBatch로 전송
}
log.info("분석 완료 이벤트 발행: storeId={}, type={}", storeId, analysisType);
} catch (Exception e) {
@ -92,7 +97,12 @@ public class EventHubAdapter implements EventPort {
String jsonData = objectMapper.writeValueAsString(eventData);
EventData event = new EventData(jsonData);
aiAnalysisEventProducer.send(event);
// 올바른 사용법
EventDataBatch batch = aiAnalysisEventProducer.createBatch();
if (batch.tryAdd(event)) {
aiAnalysisEventProducer.send(batch); // EventDataBatch로 전송
}
log.info("실행계획 생성 이벤트 발행: planId={}, storeId={}",
actionPlan.getId(), actionPlan.getStoreId());
@ -108,9 +118,16 @@ public class EventHubAdapter implements EventPort {
log.info("리뷰 이벤트 수신 시작");
try {
reviewEventConsumer.receiveFromPartition("0", EventPosition.earliest())
.timeout(Duration.ofSeconds(30))
.subscribe(this::handleReviewEvent);
Iterable<PartitionEvent> events = reviewEventConsumer.receiveFromPartition(
"0", // 파티션 ID
100, // 최대 이벤트
EventPosition.earliest(), // 시작 위치
Duration.ofSeconds(30) // 타임아웃
);
for (PartitionEvent partitionEvent : events) {
handleReviewEvent(partitionEvent);
}
} catch (Exception e) {
log.error("리뷰 이벤트 수신 중 오류 발생", e);