From f3901c8ef83ea1f9db4c94570fbfaf05eb104fad Mon Sep 17 00:00:00 2001 From: Hyowon Yang Date: Fri, 24 Oct 2025 16:37:05 +0900 Subject: [PATCH] =?UTF-8?q?kafka=20=EC=84=A4=EC=A0=95=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../analytics/config/SampleDataLoader.java | 73 ++++++++++-------- .../DistributionCompletedConsumer.java | 75 ++++++++++++------- .../event/DistributionCompletedEvent.java | 47 +++++++++--- .../src/main/resources/application.yml | 2 +- 4 files changed, 124 insertions(+), 73 deletions(-) diff --git a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java index fd16ea7..be27bb3 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java @@ -19,6 +19,8 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.UUID; @@ -94,7 +96,7 @@ public class SampleDataLoader implements ApplicationRunner { log.info("========================================"); log.info("발행된 이벤트:"); log.info(" - EventCreated: 3건"); - log.info(" - DistributionCompleted: 12건 (3 이벤트 × 4 채널)"); + log.info(" - DistributionCompleted: 3건 (각 이벤트당 4개 채널 배열)"); log.info(" - ParticipantRegistered: 180건 (MVP 테스트용)"); log.info("========================================"); @@ -179,15 +181,10 @@ public class SampleDataLoader implements ApplicationRunner { } /** - * DistributionCompleted 이벤트 발행 + * DistributionCompleted 이벤트 발행 (설계서 기준 - 이벤트당 1번 발행, 여러 채널 배열) */ private void publishDistributionCompletedEvents() throws Exception { String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"}; - BigDecimal[] investments = { - new BigDecimal("5000000"), - new BigDecimal("3500000"), - new BigDecimal("2000000") - }; int[][] expectedViews = { {5000, 10000, 3000, 2000}, // 이벤트1: 우리동네TV, 지니TV, 링고비즈, SNS {3500, 7000, 2000, 1500}, // 이벤트2 @@ -196,41 +193,53 @@ public class SampleDataLoader implements ApplicationRunner { for (int i = 0; i < eventIds.length; i++) { String eventId = eventIds[i]; - BigDecimal distributionBudget = investments[i].multiply(new BigDecimal("0.5")); + + // 4개 채널을 배열로 구성 + List channels = new ArrayList<>(); // 1. 우리동네TV (TV) - publishDistributionEvent(eventId, "우리동네TV", "TV", - distributionBudget.multiply(new BigDecimal("0.3")), expectedViews[i][0]); + channels.add(DistributionCompletedEvent.ChannelDistribution.builder() + .channel("우리동네TV") + .channelType("TV") + .status("SUCCESS") + .expectedViews(expectedViews[i][0]) + .build()); // 2. 지니TV (TV) - publishDistributionEvent(eventId, "지니TV", "TV", - distributionBudget.multiply(new BigDecimal("0.3")), expectedViews[i][1]); + channels.add(DistributionCompletedEvent.ChannelDistribution.builder() + .channel("지니TV") + .channelType("TV") + .status("SUCCESS") + .expectedViews(expectedViews[i][1]) + .build()); // 3. 링고비즈 (CALL) - publishDistributionEvent(eventId, "링고비즈", "CALL", - distributionBudget.multiply(new BigDecimal("0.2")), expectedViews[i][2]); + channels.add(DistributionCompletedEvent.ChannelDistribution.builder() + .channel("링고비즈") + .channelType("CALL") + .status("SUCCESS") + .expectedViews(expectedViews[i][2]) + .build()); // 4. SNS (SNS) - publishDistributionEvent(eventId, "SNS", "SNS", - distributionBudget.multiply(new BigDecimal("0.2")), expectedViews[i][3]); + channels.add(DistributionCompletedEvent.ChannelDistribution.builder() + .channel("SNS") + .channelType("SNS") + .status("SUCCESS") + .expectedViews(expectedViews[i][3]) + .build()); + + // 이벤트 발행 (채널 배열 포함) + DistributionCompletedEvent event = DistributionCompletedEvent.builder() + .eventId(eventId) + .distributedChannels(channels) + .completedAt(java.time.LocalDateTime.now()) + .build(); + + publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event); } - log.info("✅ DistributionCompleted 이벤트 12건 발행 완료 (3 이벤트 × 4 채널)"); - } - - /** - * 개별 DistributionCompleted 이벤트 발행 - */ - private void publishDistributionEvent(String eventId, String channelName, String channelType, - BigDecimal distributionCost, Integer expectedViews) throws Exception { - DistributionCompletedEvent event = DistributionCompletedEvent.builder() - .eventId(eventId) - .channelName(channelName) - .channelType(channelType) - .distributionCost(distributionCost) - .expectedViews(expectedViews) - .build(); - publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event); + log.info("✅ DistributionCompleted 이벤트 3건 발행 완료 (3 이벤트 × 4 채널 배열)"); } /** diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java index 894a584..a7c2a41 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java @@ -36,7 +36,7 @@ public class DistributionCompletedConsumer { private static final long IDEMPOTENCY_TTL_DAYS = 7; /** - * DistributionCompleted 이벤트 처리 (MVP용 샘플 토픽) + * DistributionCompleted 이벤트 처리 (설계서 기준 - 여러 채널 배열) */ @KafkaListener(topics = "sample.distribution.completed", groupId = "analytics-service") public void handleDistributionCompleted(String message) { @@ -45,38 +45,26 @@ public class DistributionCompletedConsumer { DistributionCompletedEvent event = objectMapper.readValue(message, DistributionCompletedEvent.class); String eventId = event.getEventId(); - String channelName = event.getChannelName(); - // 멱등성 키: eventId + channelName 조합 - String distributionKey = eventId + ":" + channelName; - - // ✅ 1. 멱등성 체크 (중복 처리 방지) - Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, distributionKey); + // ✅ 1. 멱등성 체크 (중복 처리 방지) - eventId 기반 + Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, eventId); if (Boolean.TRUE.equals(isProcessed)) { - log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}, channel={}", eventId, channelName); + log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}", eventId); return; } - // 2. 채널 통계 생성 또는 업데이트 - ChannelStats channelStats = channelStatsRepository - .findByEventIdAndChannelName(eventId, channelName) - .orElse(ChannelStats.builder() - .eventId(eventId) - .channelName(channelName) - .channelType(event.getChannelType()) - .build()); + // 2. 채널 배열 루프 처리 (설계서: distributedChannels 배열) + if (event.getDistributedChannels() != null && !event.getDistributedChannels().isEmpty()) { + for (DistributionCompletedEvent.ChannelDistribution channel : event.getDistributedChannels()) { + processChannelStats(eventId, channel); + } - channelStats.setDistributionCost(event.getDistributionCost()); - - // 예상 노출 수 저장 - if (event.getExpectedViews() != null) { - channelStats.setImpressions(event.getExpectedViews()); + log.info("✅ 채널 통계 일괄 업데이트 완료: eventId={}, channelCount={}", + eventId, event.getDistributedChannels().size()); + } else { + log.warn("⚠️ 배포된 채널 없음: eventId={}", eventId); } - channelStatsRepository.save(channelStats); - log.info("✅ 채널 통계 업데이트: eventId={}, channel={}, expectedViews={}", - eventId, channelName, event.getExpectedViews()); - // 3. EventStats의 totalViews 업데이트 (모든 채널 노출 수 합계) updateTotalViews(eventId); @@ -85,10 +73,10 @@ public class DistributionCompletedConsumer { redisTemplate.delete(cacheKey); log.debug("🗑️ 캐시 무효화: {}", cacheKey); - // 5. 멱등성 처리 완료 기록 (7일 TTL) - redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, distributionKey); + // 5. 멱등성 처리 완료 기록 (7일 TTL) - eventId 기반 + redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, eventId); redisTemplate.expire(PROCESSED_DISTRIBUTIONS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS); - log.debug("✅ 멱등성 기록: distributionKey={}", distributionKey); + log.debug("✅ 멱등성 기록: eventId={}", eventId); } catch (Exception e) { log.error("❌ DistributionCompleted 이벤트 처리 실패: {}", e.getMessage(), e); @@ -96,6 +84,37 @@ public class DistributionCompletedConsumer { } } + /** + * 개별 채널 통계 처리 + */ + private void processChannelStats(String eventId, DistributionCompletedEvent.ChannelDistribution channel) { + try { + String channelName = channel.getChannel(); + + // 채널 통계 생성 또는 업데이트 + ChannelStats channelStats = channelStatsRepository + .findByEventIdAndChannelName(eventId, channelName) + .orElse(ChannelStats.builder() + .eventId(eventId) + .channelName(channelName) + .channelType(channel.getChannelType()) + .build()); + + // 예상 노출 수 저장 + if (channel.getExpectedViews() != null) { + channelStats.setImpressions(channel.getExpectedViews()); + } + + channelStatsRepository.save(channelStats); + + log.debug("✅ 채널 통계 저장: eventId={}, channel={}, expectedViews={}", + eventId, channelName, channel.getExpectedViews()); + + } catch (Exception e) { + log.error("❌ 채널 통계 처리 실패: eventId={}, channel={}", eventId, channel.getChannel(), e); + } + } + /** * 모든 채널의 예상 노출 수를 합산하여 EventStats.totalViews 업데이트 */ diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/event/DistributionCompletedEvent.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/event/DistributionCompletedEvent.java index e890918..0883697 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/event/DistributionCompletedEvent.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/event/DistributionCompletedEvent.java @@ -5,10 +5,13 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.List; /** - * 배포 완료 이벤트 + * 배포 완료 이벤트 (설계서 기준) + * + * Distribution Service가 한 이벤트의 모든 채널 배포 완료 시 발행 */ @Data @Builder @@ -22,22 +25,42 @@ public class DistributionCompletedEvent { private String eventId; /** - * 채널명 + * 배포된 채널 목록 (여러 채널을 배열로 포함) */ - private String channelName; + private List distributedChannels; /** - * 채널 유형 + * 배포 완료 시각 */ - private String channelType; + private LocalDateTime completedAt; /** - * 배포 비용 + * 개별 채널 배포 정보 */ - private BigDecimal distributionCost; + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class ChannelDistribution { - /** - * 예상 노출 수 - */ - private Integer expectedViews; + /** + * 채널명 (우리동네TV, 지니TV, 링고비즈, SNS) + */ + private String channel; + + /** + * 채널 유형 (TV, CALL, SNS) + */ + private String channelType; + + /** + * 배포 상태 (SUCCESS, FAILURE) + */ + private String status; + + /** + * 예상 노출 수 + */ + private Integer expectedViews; + } } diff --git a/analytics-service/src/main/resources/application.yml b/analytics-service/src/main/resources/application.yml index cb011cf..340313c 100644 --- a/analytics-service/src/main/resources/application.yml +++ b/analytics-service/src/main/resources/application.yml @@ -44,7 +44,7 @@ spring: # Kafka kafka: enabled: ${KAFKA_ENABLED:false} - bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:4.230.50.63:9092} + bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:4.217.131.59:9095} consumer: group-id: ${KAFKA_CONSUMER_GROUP_ID:analytics-service} auto-offset-reset: earliest