external update

This commit is contained in:
youbeen 2025-06-18 10:41:24 +09:00
parent 96bbc3d83c
commit 1396b9a2b9

View File

@ -21,7 +21,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.HashSet;
import java.util.Set;
/**
* Azure Event Hub 어댑터 클래스 (단순화)
* 외부 리뷰 이벤트 수신 Review 테이블 저장
@ -34,6 +35,8 @@ public class ExternalReviewEventHubAdapter {
@Qualifier("externalReviewEventConsumer")
private final EventHubConsumerClient externalReviewEventConsumer;
private final Set<String> processedEventIds = new HashSet<>();
private final ObjectMapper objectMapper;
private final ReviewRepository reviewRepository;
@ -60,6 +63,7 @@ public class ExternalReviewEventHubAdapter {
/**
* 외부 리뷰 이벤트 수신 처리
*/
/**
private void listenToExternalReviewEvents() {
log.info("외부 리뷰 이벤트 수신 시작");
@ -85,7 +89,37 @@ public class ExternalReviewEventHubAdapter {
log.error("외부 리뷰 이벤트 수신 중 오류 발생", e);
}
}
*/
//수정 코드
/**
* 외부 리뷰 이벤트 수신 처리
*/
private void listenToExternalReviewEvents() {
log.info("외부 리뷰 이벤트 수신 시작");
try {
while (isRunning) {
Iterable<PartitionEvent> events = externalReviewEventConsumer.receiveFromPartition(
"4", // 파티션 ID
100, // 최대 이벤트
EventPosition.latest(), // 🔄 latest() 변경하여 데이터만 읽기
Duration.ofSeconds(30) // 타임아웃
);
for (PartitionEvent partitionEvent : events) {
handleExternalReviewEventSafely(partitionEvent);
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
log.info("외부 리뷰 이벤트 수신 중단됨");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("외부 리뷰 이벤트 수신 중 오류 발생", e);
}
}
/**
* 외부 리뷰 이벤트 처리
*/
@ -237,4 +271,68 @@ public class ExternalReviewEventHubAdapter {
return content;
}
//추가 코드
/**
* 외부 리뷰 이벤트 처리 (중복 방지)
*/
private void handleExternalReviewEventSafely(PartitionEvent partitionEvent) {
try {
EventData eventData = partitionEvent.getData();
String eventBody = eventData.getBodyAsString();
// 🔥 이벤트 고유 ID 생성 (오프셋 + 시퀀스 넘버 기반)
String eventId = String.format("%s_%s",
eventData.getOffset(),
eventData.getSequenceNumber());
// 이미 처리된 이벤트인지 확인
if (processedEventIds.contains(eventId)) {
log.debug("이미 처리된 이벤트 스킵: eventId={}", eventId);
return;
}
Map<String, Object> event = objectMapper.readValue(eventBody, Map.class);
String eventType = (String) event.get("eventType");
Long storeId = Long.valueOf(event.get("storeId").toString());
log.info("외부 리뷰 이벤트 수신: type={}, storeId={}, eventId={}", eventType, storeId, eventId);
if ("EXTERNAL_REVIEW_SYNC".equals(eventType)) {
// 기존 메서드 호출하여 리뷰 저장
handleExternalReviewSyncEvent(storeId, event);
// 🔥 처리 완료된 이벤트 ID 저장
markEventAsProcessed(eventId);
log.info("이벤트 처리 완료: eventId={}, storeId={}", eventId, storeId);
} else {
log.warn("알 수 없는 외부 리뷰 이벤트 타입: {}", eventType);
}
} catch (Exception e) {
log.error("외부 리뷰 이벤트 처리 중 오류 발생", e);
}
}
/**
* 이벤트 처리 완료 표시 (메모리 관리 포함)
*/
private void markEventAsProcessed(String eventId) {
processedEventIds.add(eventId);
// 🔥 메모리 관리: 1000개 이상 쌓이면 오래된 것들 삭제
if (processedEventIds.size() > 1000) {
// 앞의 500개만 삭제하고 최근 500개는 유지
Set<String> recentIds = new HashSet<>();
processedEventIds.stream()
.skip(500)
.forEach(recentIds::add);
processedEventIds.clear();
processedEventIds.addAll(recentIds);
log.info("처리된 이벤트 ID 캐시 정리 완료: 현재 크기={}", processedEventIds.size());
}
}
}