From 1396b9a2b91f78afa2a92f8cb13b267bd4319c44 Mon Sep 17 00:00:00 2001 From: youbeen Date: Wed, 18 Jun 2025 10:41:24 +0900 Subject: [PATCH] external update --- .../ExternalReviewEventHubAdapter.java | 100 +++++++++++++++++- 1 file changed, 99 insertions(+), 1 deletion(-) diff --git a/review/src/main/java/com/ktds/hi/review/infra/gateway/ExternalReviewEventHubAdapter.java b/review/src/main/java/com/ktds/hi/review/infra/gateway/ExternalReviewEventHubAdapter.java index 952769d..dc3156c 100644 --- a/review/src/main/java/com/ktds/hi/review/infra/gateway/ExternalReviewEventHubAdapter.java +++ b/review/src/main/java/com/ktds/hi/review/infra/gateway/ExternalReviewEventHubAdapter.java @@ -21,7 +21,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - +import java.util.HashSet; +import java.util.Set; /** * Azure Event Hub 어댑터 클래스 (단순화) * 외부 리뷰 이벤트 수신 및 Review 테이블 저장 @@ -34,6 +35,8 @@ public class ExternalReviewEventHubAdapter { @Qualifier("externalReviewEventConsumer") private final EventHubConsumerClient externalReviewEventConsumer; + private final Set processedEventIds = new HashSet<>(); + private final ObjectMapper objectMapper; private final ReviewRepository reviewRepository; @@ -60,6 +63,7 @@ public class ExternalReviewEventHubAdapter { /** * 외부 리뷰 이벤트 수신 처리 */ + /** private void listenToExternalReviewEvents() { log.info("외부 리뷰 이벤트 수신 시작"); @@ -85,7 +89,37 @@ public class ExternalReviewEventHubAdapter { log.error("외부 리뷰 이벤트 수신 중 오류 발생", e); } } + */ + //수정 코드 + /** + * 외부 리뷰 이벤트 수신 처리 + */ + private void listenToExternalReviewEvents() { + log.info("외부 리뷰 이벤트 수신 시작"); + + try { + while (isRunning) { + Iterable events = externalReviewEventConsumer.receiveFromPartition( + "4", // 파티션 ID + 100, // 최대 이벤트 수 + EventPosition.latest(), // 🔄 latest()로 변경하여 새 데이터만 읽기 + Duration.ofSeconds(30) // 타임아웃 + ); + + for (PartitionEvent partitionEvent : events) { + handleExternalReviewEventSafely(partitionEvent); + } + + Thread.sleep(1000); + } + } catch (InterruptedException e) { + log.info("외부 리뷰 이벤트 수신 중단됨"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("외부 리뷰 이벤트 수신 중 오류 발생", e); + } + } /** * 외부 리뷰 이벤트 처리 */ @@ -237,4 +271,68 @@ public class ExternalReviewEventHubAdapter { return content; } + + //추가 코드 + /** + * 외부 리뷰 이벤트 처리 (중복 방지) + */ + private void handleExternalReviewEventSafely(PartitionEvent partitionEvent) { + try { + EventData eventData = partitionEvent.getData(); + String eventBody = eventData.getBodyAsString(); + + // 🔥 이벤트 고유 ID 생성 (오프셋 + 시퀀스 넘버 기반) + String eventId = String.format("%s_%s", + eventData.getOffset(), + eventData.getSequenceNumber()); + + // 이미 처리된 이벤트인지 확인 + if (processedEventIds.contains(eventId)) { + log.debug("이미 처리된 이벤트 스킵: eventId={}", eventId); + return; + } + + Map event = objectMapper.readValue(eventBody, Map.class); + String eventType = (String) event.get("eventType"); + Long storeId = Long.valueOf(event.get("storeId").toString()); + + log.info("외부 리뷰 이벤트 수신: type={}, storeId={}, eventId={}", eventType, storeId, eventId); + + if ("EXTERNAL_REVIEW_SYNC".equals(eventType)) { + // 기존 메서드 호출하여 리뷰 저장 + handleExternalReviewSyncEvent(storeId, event); + + // 🔥 처리 완료된 이벤트 ID 저장 + markEventAsProcessed(eventId); + log.info("이벤트 처리 완료: eventId={}, storeId={}", eventId, storeId); + + } else { + log.warn("알 수 없는 외부 리뷰 이벤트 타입: {}", eventType); + } + + } catch (Exception e) { + log.error("외부 리뷰 이벤트 처리 중 오류 발생", e); + } + } + + /** + * 이벤트 처리 완료 표시 (메모리 관리 포함) + */ + private void markEventAsProcessed(String eventId) { + processedEventIds.add(eventId); + + // 🔥 메모리 관리: 1000개 이상 쌓이면 오래된 것들 삭제 + if (processedEventIds.size() > 1000) { + // 앞의 500개만 삭제하고 최근 500개는 유지 + Set recentIds = new HashSet<>(); + processedEventIds.stream() + .skip(500) + .forEach(recentIds::add); + + processedEventIds.clear(); + processedEventIds.addAll(recentIds); + + log.info("처리된 이벤트 ID 캐시 정리 완료: 현재 크기={}", processedEventIds.size()); + } + } } \ No newline at end of file