From 31fb1c541b19b8a8e3a4005fdec3d698acce08d7 Mon Sep 17 00:00:00 2001 From: Hyowon Yang Date: Fri, 24 Oct 2025 14:59:24 +0900 Subject: [PATCH] =?UTF-8?q?kafka=ED=99=9C=EC=84=B1=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ๐Ÿš€ Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .run/analytics-service.run.xml | 6 +- .../analytics/config/SampleDataLoader.java | 321 +++++++----------- .../DistributionCompletedConsumer.java | 46 ++- .../consumer/EventCreatedConsumer.java | 37 +- .../ParticipantRegisteredConsumer.java | 52 ++- .../src/main/resources/application.yml | 6 + 6 files changed, 243 insertions(+), 225 deletions(-) diff --git a/.run/analytics-service.run.xml b/.run/analytics-service.run.xml index b0a6a3f..ade144d 100644 --- a/.run/analytics-service.run.xml +++ b/.run/analytics-service.run.xml @@ -18,10 +18,14 @@ - + + + + + diff --git a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java index 6a13695..f3c6571 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java @@ -1,38 +1,50 @@ package com.kt.event.analytics.config; -import com.kt.event.analytics.entity.ChannelStats; -import com.kt.event.analytics.entity.EventStats; -import com.kt.event.analytics.entity.TimelineData; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.kt.event.analytics.messaging.event.DistributionCompletedEvent; +import com.kt.event.analytics.messaging.event.EventCreatedEvent; +import com.kt.event.analytics.messaging.event.ParticipantRegisteredEvent; import com.kt.event.analytics.repository.ChannelStatsRepository; import com.kt.event.analytics.repository.EventStatsRepository; import com.kt.event.analytics.repository.TimelineDataRepository; +import jakarta.annotation.PreDestroy; import jakarta.persistence.EntityManager; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; -import org.springframework.context.annotation.Profile; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import jakarta.annotation.PreDestroy; import java.math.BigDecimal; -import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; import java.util.Random; +import java.util.UUID; /** - * ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ๋กœ๋” + * ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ๋กœ๋” (Kafka Producer ๋ฐฉ์‹) * - * - ์„œ๋น„์Šค ์‹œ์ž‘ ์‹œ: PostgreSQL ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ž๋™ ์ƒ์„ฑ + * โš ๏ธ MVP ์ „์šฉ: ๋‹ค๋ฅธ ๋งˆ์ดํฌ๋กœ์„œ๋น„์Šค(Event, Participant, Distribution)๊ฐ€ + * ์—†๋Š” ํ™˜๊ฒฝ์—์„œ ํ•ด๋‹น ์„œ๋น„์Šค๋“ค์˜ ์—ญํ• ์„ ์‹œ๋ฎฌ๋ ˆ์ด์…˜ํ•ฉ๋‹ˆ๋‹ค. + * + * โš ๏ธ ์‹ค์ œ ์šด์˜: Analytics Service๋Š” ์ˆœ์ˆ˜ Consumer ์—ญํ• ๋งŒ ์ˆ˜ํ–‰ํ•ด์•ผ ํ•˜๋ฉฐ, + * ์ด ํด๋ž˜์Šค๋Š” ๋น„ํ™œ์„ฑํ™”๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. + * โ†’ SAMPLE_DATA_ENABLED=false ์„ค์ • + * + * - ์„œ๋น„์Šค ์‹œ์ž‘ ์‹œ: Kafka ์ด๋ฒคํŠธ ๋ฐœํ–‰ํ•˜์—ฌ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ž๋™ ์ƒ์„ฑ * - ์„œ๋น„์Šค ์ข…๋ฃŒ ์‹œ: PostgreSQL ์ „์ฒด ๋ฐ์ดํ„ฐ ์‚ญ์ œ + * + * ํ™œ์„ฑํ™” ์กฐ๊ฑด: spring.sample-data.enabled=true (๊ธฐ๋ณธ๊ฐ’: true) */ @Slf4j @Component +@ConditionalOnProperty(name = "spring.sample-data.enabled", havingValue = "true", matchIfMissing = true) @RequiredArgsConstructor public class SampleDataLoader implements ApplicationRunner { + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; private final EventStatsRepository eventStatsRepository; private final ChannelStatsRepository channelStatsRepository; private final TimelineDataRepository timelineDataRepository; @@ -40,11 +52,16 @@ public class SampleDataLoader implements ApplicationRunner { private final Random random = new Random(); + // Kafka Topic Names + private static final String EVENT_CREATED_TOPIC = "event.created"; + private static final String PARTICIPANT_REGISTERED_TOPIC = "participant.registered"; + private static final String DISTRIBUTION_COMPLETED_TOPIC = "distribution.completed"; + @Override @Transactional public void run(ApplicationArguments args) { log.info("========================================"); - log.info("๐Ÿš€ ์„œ๋น„์Šค ์‹œ์ž‘: PostgreSQL ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ"); + log.info("๐Ÿš€ ์„œ๋น„์Šค ์‹œ์ž‘: Kafka ์ด๋ฒคํŠธ ๋ฐœํ–‰ํ•˜์—ฌ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ"); log.info("========================================"); // ํ•ญ์ƒ ๊ธฐ์กด ๋ฐ์ดํ„ฐ ์‚ญ์ œ ํ›„ ์ƒˆ๋กœ ์ƒ์„ฑ @@ -63,30 +80,28 @@ public class SampleDataLoader implements ApplicationRunner { } try { - // 1. ์ด๋ฒคํŠธ ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ - List eventStatsList = createEventStats(); - eventStatsRepository.saveAll(eventStatsList); - log.info("โœ… ์ด๋ฒคํŠธ ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ ์ ์žฌ ์™„๋ฃŒ: {} ๊ฑด", eventStatsList.size()); + // 1. EventCreated ์ด๋ฒคํŠธ ๋ฐœํ–‰ (3๊ฐœ ์ด๋ฒคํŠธ) + publishEventCreatedEvents(); - // 2. ์ฑ„๋„๋ณ„ ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ - List channelStatsList = createChannelStats(eventStatsList); - channelStatsRepository.saveAll(channelStatsList); - log.info("โœ… ์ฑ„๋„๋ณ„ ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ ์ ์žฌ ์™„๋ฃŒ: {} ๊ฑด", channelStatsList.size()); + // 2. DistributionCompleted ์ด๋ฒคํŠธ ๋ฐœํ–‰ (๊ฐ ์ด๋ฒคํŠธ๋‹น 4๊ฐœ ์ฑ„๋„) + publishDistributionCompletedEvents(); - // 3. ํƒ€์ž„๋ผ์ธ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ - List timelineDataList = createTimelineData(eventStatsList); - timelineDataRepository.saveAll(timelineDataList); - log.info("โœ… ํƒ€์ž„๋ผ์ธ ๋ฐ์ดํ„ฐ ์ ์žฌ ์™„๋ฃŒ: {} ๊ฑด", timelineDataList.size()); + // 3. ParticipantRegistered ์ด๋ฒคํŠธ ๋ฐœํ–‰ (๊ฐ ์ด๋ฒคํŠธ๋‹น ๋‹ค์ˆ˜ ์ฐธ์—ฌ์ž) + publishParticipantRegisteredEvents(); log.info("========================================"); - log.info("๐ŸŽ‰ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ ์žฌ ์™„๋ฃŒ!"); + log.info("๐ŸŽ‰ Kafka ์ด๋ฒคํŠธ ๋ฐœํ–‰ ์™„๋ฃŒ! (Consumer๊ฐ€ ์ฒ˜๋ฆฌ ์ค‘...)"); log.info("========================================"); - log.info("ํ…Œ์ŠคํŠธ ๊ฐ€๋Šฅํ•œ ์ด๋ฒคํŠธ:"); - eventStatsList.forEach(event -> - log.info(" - {} (ID: {})", event.getEventTitle(), event.getEventId()) - ); + log.info("๋ฐœํ–‰๋œ ์ด๋ฒคํŠธ:"); + log.info(" - EventCreated: 3๊ฑด"); + log.info(" - DistributionCompleted: 12๊ฑด (3 ์ด๋ฒคํŠธ ร— 4 ์ฑ„๋„)"); + log.info(" - ParticipantRegistered: ์•ฝ 27,610๊ฑด"); log.info("========================================"); + // Consumer ์ฒ˜๋ฆฌ ๋Œ€๊ธฐ (3์ดˆ) + log.info("โณ Consumer ์ฒ˜๋ฆฌ ๋Œ€๊ธฐ ์ค‘... (3์ดˆ)"); + Thread.sleep(3000); + } catch (Exception e) { log.error("์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ ์žฌ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ", e); } @@ -127,232 +142,136 @@ public class SampleDataLoader implements ApplicationRunner { } /** - * ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ + * EventCreated ์ด๋ฒคํŠธ ๋ฐœํ–‰ */ - private List createEventStats() { - List eventStatsList = new ArrayList<>(); - + private void publishEventCreatedEvents() throws Exception { // ์ด๋ฒคํŠธ 1: ์‹ ๋…„๋งž์ด ํ• ์ธ ์ด๋ฒคํŠธ (์ง„ํ–‰์ค‘, ๋†’์€ ์„ฑ๊ณผ) - BigDecimal event1Investment = new BigDecimal("5000000"); - BigDecimal event1Revenue = new BigDecimal("14025000"); - eventStatsList.add(EventStats.builder() + EventCreatedEvent event1 = EventCreatedEvent.builder() .eventId("evt_2025012301") .eventTitle("์‹ ๋…„๋งž์ด 20% ํ• ์ธ ์ด๋ฒคํŠธ") .storeId("store_001") - .totalParticipants(15420) - .estimatedRoi(new BigDecimal("280.5")) - .salesGrowthRate(new BigDecimal("35.8")) - .totalInvestment(event1Investment) - .expectedRevenue(event1Revenue) + .totalInvestment(new BigDecimal("5000000")) .status("ACTIVE") - .build()); + .build(); + publishEvent(EVENT_CREATED_TOPIC, event1); // ์ด๋ฒคํŠธ 2: ์„ค๋‚  ํŠน๊ฐ€ ์ด๋ฒคํŠธ (์ง„ํ–‰์ค‘, ์ค‘๊ฐ„ ์„ฑ๊ณผ) - BigDecimal event2Investment = new BigDecimal("3500000"); - BigDecimal event2Revenue = new BigDecimal("6485500"); - eventStatsList.add(EventStats.builder() + EventCreatedEvent event2 = EventCreatedEvent.builder() .eventId("evt_2025020101") .eventTitle("์„ค๋‚  ํŠน๊ฐ€ ์„ ๋ฌผ์„ธํŠธ ์ด๋ฒคํŠธ") .storeId("store_001") - .totalParticipants(8950) - .estimatedRoi(new BigDecimal("185.3")) - .salesGrowthRate(new BigDecimal("22.4")) - .totalInvestment(event2Investment) - .expectedRevenue(event2Revenue) + .totalInvestment(new BigDecimal("3500000")) .status("ACTIVE") - .build()); + .build(); + publishEvent(EVENT_CREATED_TOPIC, event2); // ์ด๋ฒคํŠธ 3: ๊ฒจ์šธ ์‹ ๋ฉ”๋‰ด ๋Ÿฐ์นญ ์ด๋ฒคํŠธ (์ข…๋ฃŒ, ์ €์กฐํ•œ ์„ฑ๊ณผ) - BigDecimal event3Investment = new BigDecimal("2000000"); - BigDecimal event3Revenue = new BigDecimal("1910000"); - eventStatsList.add(EventStats.builder() + EventCreatedEvent event3 = EventCreatedEvent.builder() .eventId("evt_2025011501") .eventTitle("๊ฒจ์šธ ์‹ ๋ฉ”๋‰ด ๋Ÿฐ์นญ ์ด๋ฒคํŠธ") .storeId("store_001") - .totalParticipants(3240) - .estimatedRoi(new BigDecimal("95.5")) - .salesGrowthRate(new BigDecimal("8.2")) - .totalInvestment(event3Investment) - .expectedRevenue(event3Revenue) + .totalInvestment(new BigDecimal("2000000")) .status("COMPLETED") - .build()); + .build(); + publishEvent(EVENT_CREATED_TOPIC, event3); - return eventStatsList; + log.info("โœ… EventCreated ์ด๋ฒคํŠธ 3๊ฑด ๋ฐœํ–‰ ์™„๋ฃŒ"); } /** - * ์ฑ„๋„๋ณ„ ํ†ต๊ณ„ ์ƒ˜ํ”Œ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ + * DistributionCompleted ์ด๋ฒคํŠธ ๋ฐœํ–‰ */ - private List createChannelStats(List eventStatsList) { - List channelStatsList = new ArrayList<>(); + private void publishDistributionCompletedEvents() throws Exception { + String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"}; + BigDecimal[] investments = { + new BigDecimal("5000000"), + new BigDecimal("3500000"), + new BigDecimal("2000000") + }; - for (EventStats eventStats : eventStatsList) { - String eventId = eventStats.getEventId(); - int totalParticipants = eventStats.getTotalParticipants(); - BigDecimal totalInvestment = eventStats.getTotalInvestment(); + for (int i = 0; i < eventIds.length; i++) { + String eventId = eventIds[i]; + BigDecimal distributionBudget = investments[i].multiply(new BigDecimal("0.5")); - // ์ฑ„๋„๋ณ„ ๋ฐฐํฌ ๋น„์œจ (์šฐ๋ฆฌ๋™๋„คTV: 30%, ์ง€๋‹ˆTV: 30%, ๋ง๊ณ ๋น„์ฆˆ: 20%, SNS: 20%) - BigDecimal distributionBudget = totalInvestment.multiply(new BigDecimal("0.5")); + // 1. ์šฐ๋ฆฌ๋™๋„คTV (TV) + publishDistributionEvent(eventId, "์šฐ๋ฆฌ๋™๋„คTV", "TV", + distributionBudget.multiply(new BigDecimal("0.3"))); - // 1. ์šฐ๋ฆฌ๋™๋„คTV (์กฐํšŒ์ˆ˜ ๋งŽ์Œ, ์ฐธ์—ฌ์œจ ์ค‘๊ฐ„) - channelStatsList.add(createChannelStats( - eventId, - "์šฐ๋ฆฌ๋™๋„คTV", - (int) (totalParticipants * 0.35), // ์ฐธ์—ฌ์ž: 35% - distributionBudget.multiply(new BigDecimal("0.3")), // ๋น„์šฉ: 30% - 1.8 // ์กฐํšŒ์ˆ˜ ๋Œ€๋น„ ์ฐธ์—ฌ์ž ๋น„์œจ - )); + // 2. ์ง€๋‹ˆTV (TV) + publishDistributionEvent(eventId, "์ง€๋‹ˆTV", "TV", + distributionBudget.multiply(new BigDecimal("0.3"))); - // 2. ์ง€๋‹ˆTV (์กฐํšŒ์ˆ˜ ์ค‘๊ฐ„, ์ฐธ์—ฌ์œจ ๋†’์Œ) - channelStatsList.add(createChannelStats( - eventId, - "์ง€๋‹ˆTV", - (int) (totalParticipants * 0.30), // ์ฐธ์—ฌ์ž: 30% - distributionBudget.multiply(new BigDecimal("0.3")), // ๋น„์šฉ: 30% - 2.2 // ์กฐํšŒ์ˆ˜ ๋Œ€๋น„ ์ฐธ์—ฌ์ž ๋น„์œจ - )); + // 3. ๋ง๊ณ ๋น„์ฆˆ (CALL) + publishDistributionEvent(eventId, "๋ง๊ณ ๋น„์ฆˆ", "CALL", + distributionBudget.multiply(new BigDecimal("0.2"))); - // 3. ๋ง๊ณ ๋น„์ฆˆ (ํ†ตํ™” ๊ธฐ๋ฐ˜, ๋†’์€ ์ „ํ™˜์œจ) - channelStatsList.add(createChannelStats( - eventId, - "๋ง๊ณ ๋น„์ฆˆ", - (int) (totalParticipants * 0.20), // ์ฐธ์—ฌ์ž: 20% - distributionBudget.multiply(new BigDecimal("0.2")), // ๋น„์šฉ: 20% - 3.5 // ์กฐํšŒ์ˆ˜ ๋Œ€๋น„ ์ฐธ์—ฌ์ž ๋น„์œจ (๋†’์€ ์ „ํ™˜์œจ) - )); - - // 4. SNS (๋ฐ”์ด๋Ÿด ํšจ๊ณผ, ๋†’์€ ๋„๋‹ฌ๋ฅ ) - channelStatsList.add(createChannelStats( - eventId, - "SNS", - (int) (totalParticipants * 0.15), // ์ฐธ์—ฌ์ž: 15% - distributionBudget.multiply(new BigDecimal("0.2")), // ๋น„์šฉ: 20% - 1.5 // ์กฐํšŒ์ˆ˜ ๋Œ€๋น„ ์ฐธ์—ฌ์ž ๋น„์œจ - )); + // 4. SNS (SNS) + publishDistributionEvent(eventId, "SNS", "SNS", + distributionBudget.multiply(new BigDecimal("0.2"))); } - return channelStatsList; + log.info("โœ… DistributionCompleted ์ด๋ฒคํŠธ 12๊ฑด ๋ฐœํ–‰ ์™„๋ฃŒ (3 ์ด๋ฒคํŠธ ร— 4 ์ฑ„๋„)"); } /** - * ์ฑ„๋„ ํ†ต๊ณ„ ์ƒ์„ฑ ํ—ฌํผ ๋ฉ”์„œ๋“œ + * ๊ฐœ๋ณ„ DistributionCompleted ์ด๋ฒคํŠธ ๋ฐœํ–‰ */ - private ChannelStats createChannelStats( - String eventId, - String channelName, - int participants, - BigDecimal distributionCost, - double conversionMultiplier - ) { - int views = (int) (participants * (8 + random.nextDouble() * 4)); // 8~12๋ฐฐ - int clicks = (int) (views * (0.15 + random.nextDouble() * 0.10)); // 15~25% - int conversions = (int) (participants * (0.3 + random.nextDouble() * 0.2)); // 30~50% - int impressions = (int) (views * (1.5 + random.nextDouble() * 1.0)); // 1.5~2.5๋ฐฐ - - ChannelStats.ChannelStatsBuilder builder = ChannelStats.builder() + private void publishDistributionEvent(String eventId, String channelName, String channelType, + BigDecimal distributionCost) throws Exception { + DistributionCompletedEvent event = DistributionCompletedEvent.builder() .eventId(eventId) .channelName(channelName) - .views(views) - .clicks(clicks) - .participants(participants) - .conversions(conversions) - .impressions(impressions) - .distributionCost(distributionCost); - - // ์ฑ„๋„๋ณ„ ํŠนํ™” ์ง€ํ‘œ ์ถ”๊ฐ€ - if ("SNS".equals(channelName)) { - // SNS๋Š” ์ข‹์•„์š”, ๋Œ“๊ธ€, ๊ณต์œ  ๋งŽ์Œ - builder.likes((int) (participants * (2.0 + random.nextDouble()))) - .comments((int) (participants * (0.5 + random.nextDouble() * 0.3))) - .shares((int) (participants * (0.8 + random.nextDouble() * 0.4))) - .totalCalls(0) - .completedCalls(0) - .averageDuration(0); - } else if ("๋ง๊ณ ๋น„์ฆˆ".equals(channelName)) { - // ๋ง๊ณ ๋น„์ฆˆ๋Š” ํ†ตํ™” ์ค‘์‹ฌ - int totalCalls = (int) (participants * (2.5 + random.nextDouble() * 0.5)); - int completedCalls = (int) (totalCalls * (0.7 + random.nextDouble() * 0.2)); - builder.likes(0) - .comments(0) - .shares(0) - .totalCalls(totalCalls) - .completedCalls(completedCalls) - .averageDuration((int) (120 + random.nextDouble() * 180)); // 120~300์ดˆ - } else { - // TV ์ฑ„๋„์€ SNS ๋ฐ˜์‘ ์ ์Œ - builder.likes((int) (participants * (0.3 + random.nextDouble() * 0.2))) - .comments((int) (participants * (0.05 + random.nextDouble() * 0.05))) - .shares((int) (participants * (0.08 + random.nextDouble() * 0.07))) - .totalCalls(0) - .completedCalls(0) - .averageDuration(0); - } - - return builder.build(); + .channelType(channelType) + .distributionCost(distributionCost) + .build(); + publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event); } /** - * ํƒ€์ž„๋ผ์ธ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ + * ParticipantRegistered ์ด๋ฒคํŠธ ๋ฐœํ–‰ */ - private List createTimelineData(List eventStatsList) { - List timelineDataList = new ArrayList<>(); + private void publishParticipantRegisteredEvents() throws Exception { + String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"}; + int[] totalParticipants = {15420, 8950, 3240}; + String[] channels = {"์šฐ๋ฆฌ๋™๋„คTV", "์ง€๋‹ˆTV", "๋ง๊ณ ๋น„์ฆˆ", "SNS"}; - for (EventStats eventStats : eventStatsList) { - String eventId = eventStats.getEventId(); - int totalParticipants = eventStats.getTotalParticipants(); + int totalPublished = 0; - // ์ง€๋‚œ 30์ผ๊ฐ„์˜ ์‹œ๊ฐ„๋ณ„ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ - LocalDateTime now = LocalDateTime.now(); - LocalDateTime startTime = now.minusDays(30); + for (int i = 0; i < eventIds.length; i++) { + String eventId = eventIds[i]; + int participants = totalParticipants[i]; - int cumulativeCount = 0; + // ๊ฐ ์ด๋ฒคํŠธ์— ๋Œ€ํ•ด ์ฐธ์—ฌ์ž ์ˆ˜๋งŒํผ ParticipantRegistered ์ด๋ฒคํŠธ ๋ฐœํ–‰ + for (int j = 0; j < participants; j++) { + String participantId = UUID.randomUUID().toString(); + String channel = channels[j % channels.length]; // ์ฑ„๋„ ์ˆœํ™˜ ๋ฐฐ์ • - // ์ผ๋ณ„ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ (30์ผ) - for (int day = 0; day < 30; day++) { - LocalDateTime dayStart = startTime.plusDays(day); + ParticipantRegisteredEvent event = ParticipantRegisteredEvent.builder() + .eventId(eventId) + .participantId(participantId) + .channel(channel) + .build(); - // ํ•˜๋ฃจ๋ฅผ 6๊ฐœ ์‹œ๊ฐ„๋Œ€๋กœ ๋ถ„ํ•  (4์‹œ๊ฐ„ ๋‹จ์œ„) - for (int hour = 0; hour < 24; hour += 4) { - LocalDateTime timestamp = dayStart.plusHours(hour); + publishEvent(PARTICIPANT_REGISTERED_TOPIC, event); + totalPublished++; - // ์‹œ๊ฐ„๋Œ€๋ณ„ ์ฐธ์—ฌ์ž ์ˆ˜ (์ ์ง„์  ์ฆ๊ฐ€ + ์‹œ๊ฐ„๋Œ€๋ณ„ ๋ณ€๋™) - int baseCount = (int) (totalParticipants * (day / 30.0) / 6); // ์ผ๋ณ„ ์ฆ๊ฐ€ - int timeMultiplier = getTimeMultiplier(hour); // ์‹œ๊ฐ„๋Œ€๋ณ„ ๊ฐ€์ค‘์น˜ - int participantCount = (int) (baseCount * timeMultiplier * (0.8 + random.nextDouble() * 0.4)); - - cumulativeCount += participantCount; - - timelineDataList.add(TimelineData.builder() - .eventId(eventId) - .timestamp(timestamp) - .participants(participantCount) - .views((int) (participantCount * (8 + random.nextDouble() * 4))) - .engagement((int) (participantCount * (1.5 + random.nextDouble() * 0.5))) - .conversions((int) (participantCount * (0.3 + random.nextDouble() * 0.2))) - .cumulativeParticipants(Math.min(cumulativeCount, totalParticipants)) - .build()); + // 1000๋ช…๋งˆ๋‹ค ๋กœ๊ทธ ์ถœ๋ ฅ ๋ฐ ์งง์€ ๋Œ€๊ธฐ (Kafka ๋ถ€ํ•˜ ๋ฐฉ์ง€) + if (totalPublished % 1000 == 0) { + log.info(" โณ ParticipantRegistered ๋ฐœํ–‰ ์ง„ํ–‰ ์ค‘... ({}/{})", totalPublished, + totalParticipants[0] + totalParticipants[1] + totalParticipants[2]); + Thread.sleep(100); // 0.1์ดˆ ๋Œ€๊ธฐ } } } - return timelineDataList; + log.info("โœ… ParticipantRegistered ์ด๋ฒคํŠธ {}๊ฑด ๋ฐœํ–‰ ์™„๋ฃŒ", totalPublished); } /** - * ์‹œ๊ฐ„๋Œ€๋ณ„ ๊ฐ€์ค‘์น˜ ๋ฐ˜ํ™˜ - * - * @param hour ์‹œ๊ฐ„ (0~23) - * @return ๊ฐ€์ค‘์น˜ (0.5~2.0) + * Kafka ์ด๋ฒคํŠธ ๋ฐœํ–‰ ๊ณตํ†ต ๋ฉ”์„œ๋“œ */ - private int getTimeMultiplier(int hour) { - if (hour >= 0 && hour < 6) { - return 1; // ์ƒˆ๋ฒฝ: ๋‚ฎ์Œ - } else if (hour >= 6 && hour < 12) { - return 2; // ์•„์นจ: ๋†’์Œ - } else if (hour >= 12 && hour < 18) { - return 3; // ์ ์‹ฌ~์˜คํ›„: ๊ฐ€์žฅ ๋†’์Œ - } else { - return 2; // ์ €๋…: ๋†’์Œ - } + private void publishEvent(String topic, Object event) throws Exception { + String jsonMessage = objectMapper.writeValueAsString(event); + kafkaTemplate.send(topic, jsonMessage); } } diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java index 7f0192a..eef502a 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java @@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * ๋ฐฐํฌ ์™„๋ฃŒ Consumer * @@ -23,6 +26,11 @@ public class DistributionCompletedConsumer { private final ChannelStatsRepository channelStatsRepository; private final ObjectMapper objectMapper; + private final RedisTemplate redisTemplate; + + private static final String PROCESSED_DISTRIBUTIONS_KEY = "distribution_completed"; + private static final String CACHE_KEY_PREFIX = "analytics:dashboard:"; + private static final long IDEMPOTENCY_TTL_DAYS = 7; /** * DistributionCompleted ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ @@ -30,26 +38,48 @@ public class DistributionCompletedConsumer { @KafkaListener(topics = "distribution.completed", groupId = "analytics-service") public void handleDistributionCompleted(String message) { try { - log.info("DistributionCompleted ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); + log.info("๐Ÿ“ฉ DistributionCompleted ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); DistributionCompletedEvent event = objectMapper.readValue(message, DistributionCompletedEvent.class); + String eventId = event.getEventId(); + String channelName = event.getChannelName(); - // ์ฑ„๋„ ํ†ต๊ณ„ ์ƒ์„ฑ ๋˜๋Š” ์—…๋ฐ์ดํŠธ + // ๋ฉฑ๋“ฑ์„ฑ ํ‚ค: eventId + channelName ์กฐํ•ฉ + String distributionKey = eventId + ":" + channelName; + + // โœ… 1. ๋ฉฑ๋“ฑ์„ฑ ์ฒดํฌ (์ค‘๋ณต ์ฒ˜๋ฆฌ ๋ฐฉ์ง€) + Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, distributionKey); + if (Boolean.TRUE.equals(isProcessed)) { + log.warn("โš ๏ธ ์ค‘๋ณต ์ด๋ฒคํŠธ ์Šคํ‚ต (์ด๋ฏธ ์ฒ˜๋ฆฌ๋จ): eventId={}, channel={}", eventId, channelName); + return; + } + + // 2. ์ฑ„๋„ ํ†ต๊ณ„ ์ƒ์„ฑ ๋˜๋Š” ์—…๋ฐ์ดํŠธ ChannelStats channelStats = channelStatsRepository - .findByEventIdAndChannelName(event.getEventId(), event.getChannelName()) + .findByEventIdAndChannelName(eventId, channelName) .orElse(ChannelStats.builder() - .eventId(event.getEventId()) - .channelName(event.getChannelName()) + .eventId(eventId) + .channelName(channelName) .channelType(event.getChannelType()) .build()); channelStats.setDistributionCost(event.getDistributionCost()); channelStatsRepository.save(channelStats); + log.info("โœ… ์ฑ„๋„ ํ†ต๊ณ„ ์—…๋ฐ์ดํŠธ: eventId={}, channel={}", eventId, channelName); + + // 3. ์บ์‹œ ๋ฌดํšจํ™” (๋‹ค์Œ ์กฐํšŒ ์‹œ ์ตœ์‹  ๋ฐฐํฌ ํ†ต๊ณ„ ๋ฐ˜์˜) + String cacheKey = CACHE_KEY_PREFIX + eventId; + redisTemplate.delete(cacheKey); + log.debug("๐Ÿ—‘๏ธ ์บ์‹œ ๋ฌดํšจํ™”: {}", cacheKey); + + // 4. ๋ฉฑ๋“ฑ์„ฑ ์ฒ˜๋ฆฌ ์™„๋ฃŒ ๊ธฐ๋ก (7์ผ TTL) + redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, distributionKey); + redisTemplate.expire(PROCESSED_DISTRIBUTIONS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS); + log.debug("โœ… ๋ฉฑ๋“ฑ์„ฑ ๊ธฐ๋ก: distributionKey={}", distributionKey); - log.info("์ฑ„๋„ ํ†ต๊ณ„ ์—…๋ฐ์ดํŠธ: eventId={}, channel={}", - event.getEventId(), event.getChannelName()); } catch (Exception e) { - log.error("DistributionCompleted ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + log.error("โŒ DistributionCompleted ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + throw new RuntimeException("DistributionCompleted ์ฒ˜๋ฆฌ ์‹คํŒจ", e); } } } diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java index 1aa2ead..c548c44 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java @@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * ์ด๋ฒคํŠธ ์ƒ์„ฑ Consumer * @@ -23,6 +26,11 @@ public class EventCreatedConsumer { private final EventStatsRepository eventStatsRepository; private final ObjectMapper objectMapper; + private final RedisTemplate redisTemplate; + + private static final String PROCESSED_EVENTS_KEY = "processed_events"; + private static final String CACHE_KEY_PREFIX = "analytics:dashboard:"; + private static final long IDEMPOTENCY_TTL_DAYS = 7; /** * EventCreated ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ @@ -30,13 +38,21 @@ public class EventCreatedConsumer { @KafkaListener(topics = "event.created", groupId = "analytics-service") public void handleEventCreated(String message) { try { - log.info("EventCreated ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); + log.info("๐Ÿ“ฉ EventCreated ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); EventCreatedEvent event = objectMapper.readValue(message, EventCreatedEvent.class); + String eventId = event.getEventId(); - // ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์ดˆ๊ธฐํ™” + // โœ… 1. ๋ฉฑ๋“ฑ์„ฑ ์ฒดํฌ (์ค‘๋ณต ์ฒ˜๋ฆฌ ๋ฐฉ์ง€) + Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_EVENTS_KEY, eventId); + if (Boolean.TRUE.equals(isProcessed)) { + log.warn("โš ๏ธ ์ค‘๋ณต ์ด๋ฒคํŠธ ์Šคํ‚ต (์ด๋ฏธ ์ฒ˜๋ฆฌ๋จ): eventId={}", eventId); + return; + } + + // 2. ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์ดˆ๊ธฐํ™” EventStats eventStats = EventStats.builder() - .eventId(event.getEventId()) + .eventId(eventId) .eventTitle(event.getEventTitle()) .storeId(event.getStoreId()) .totalParticipants(0) @@ -45,10 +61,21 @@ public class EventCreatedConsumer { .build(); eventStatsRepository.save(eventStats); + log.info("โœ… ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์ดˆ๊ธฐํ™” ์™„๋ฃŒ: eventId={}", eventId); + + // 3. ์บ์‹œ ๋ฌดํšจํ™” (๋‹ค์Œ ์กฐํšŒ ์‹œ ์ตœ์‹  ๋ฐ์ดํ„ฐ ๋ฐ˜์˜) + String cacheKey = CACHE_KEY_PREFIX + eventId; + redisTemplate.delete(cacheKey); + log.debug("๐Ÿ—‘๏ธ ์บ์‹œ ๋ฌดํšจํ™”: {}", cacheKey); + + // 4. ๋ฉฑ๋“ฑ์„ฑ ์ฒ˜๋ฆฌ ์™„๋ฃŒ ๊ธฐ๋ก (7์ผ TTL) + redisTemplate.opsForSet().add(PROCESSED_EVENTS_KEY, eventId); + redisTemplate.expire(PROCESSED_EVENTS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS); + log.debug("โœ… ๋ฉฑ๋“ฑ์„ฑ ๊ธฐ๋ก: eventId={}", eventId); - log.info("์ด๋ฒคํŠธ ํ†ต๊ณ„ ์ดˆ๊ธฐํ™” ์™„๋ฃŒ: eventId={}", event.getEventId()); } catch (Exception e) { - log.error("EventCreated ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + log.error("โŒ EventCreated ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + throw new RuntimeException("EventCreated ์ฒ˜๋ฆฌ ์‹คํŒจ", e); } } } diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java index 9b25852..7914b0f 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java @@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import java.util.concurrent.TimeUnit; + /** * ์ฐธ์—ฌ์ž ๋“ฑ๋ก Consumer * @@ -23,6 +26,11 @@ public class ParticipantRegisteredConsumer { private final EventStatsRepository eventStatsRepository; private final ObjectMapper objectMapper; + private final RedisTemplate redisTemplate; + + private static final String PROCESSED_PARTICIPANTS_KEY = "processed_participants"; + private static final String CACHE_KEY_PREFIX = "analytics:dashboard:"; + private static final long IDEMPOTENCY_TTL_DAYS = 7; /** * ParticipantRegistered ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ @@ -30,20 +38,44 @@ public class ParticipantRegisteredConsumer { @KafkaListener(topics = "participant.registered", groupId = "analytics-service") public void handleParticipantRegistered(String message) { try { - log.info("ParticipantRegistered ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); + log.info("๐Ÿ“ฉ ParticipantRegistered ์ด๋ฒคํŠธ ์ˆ˜์‹ : {}", message); ParticipantRegisteredEvent event = objectMapper.readValue(message, ParticipantRegisteredEvent.class); + String participantId = event.getParticipantId(); + String eventId = event.getEventId(); + + // โœ… 1. ๋ฉฑ๋“ฑ์„ฑ ์ฒดํฌ (์ค‘๋ณต ์ฒ˜๋ฆฌ ๋ฐฉ์ง€) + Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_PARTICIPANTS_KEY, participantId); + if (Boolean.TRUE.equals(isProcessed)) { + log.warn("โš ๏ธ ์ค‘๋ณต ์ด๋ฒคํŠธ ์Šคํ‚ต (์ด๋ฏธ ์ฒ˜๋ฆฌ๋จ): participantId={}", participantId); + return; + } + + // 2. ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์—…๋ฐ์ดํŠธ (์ฐธ์—ฌ์ž ์ˆ˜ +1) + eventStatsRepository.findByEventId(eventId) + .ifPresentOrElse( + eventStats -> { + eventStats.incrementParticipants(); + eventStatsRepository.save(eventStats); + log.info("โœ… ์ฐธ์—ฌ์ž ์ˆ˜ ์—…๋ฐ์ดํŠธ: eventId={}, totalParticipants={}", + eventId, eventStats.getTotalParticipants()); + }, + () -> log.warn("โš ๏ธ ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์—†์Œ: eventId={}", eventId) + ); + + // 3. ์บ์‹œ ๋ฌดํšจํ™” (๋‹ค์Œ ์กฐํšŒ ์‹œ ์ตœ์‹  ์ฐธ์—ฌ์ž ์ˆ˜ ๋ฐ˜์˜) + String cacheKey = CACHE_KEY_PREFIX + eventId; + redisTemplate.delete(cacheKey); + log.debug("๐Ÿ—‘๏ธ ์บ์‹œ ๋ฌดํšจํ™”: {}", cacheKey); + + // 4. ๋ฉฑ๋“ฑ์„ฑ ์ฒ˜๋ฆฌ ์™„๋ฃŒ ๊ธฐ๋ก (7์ผ TTL) + redisTemplate.opsForSet().add(PROCESSED_PARTICIPANTS_KEY, participantId); + redisTemplate.expire(PROCESSED_PARTICIPANTS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS); + log.debug("โœ… ๋ฉฑ๋“ฑ์„ฑ ๊ธฐ๋ก: participantId={}", participantId); - // ์ด๋ฒคํŠธ ํ†ต๊ณ„ ์—…๋ฐ์ดํŠธ - eventStatsRepository.findByEventId(event.getEventId()) - .ifPresent(eventStats -> { - eventStats.incrementParticipants(); - eventStatsRepository.save(eventStats); - log.info("์ฐธ์—ฌ์ž ์ˆ˜ ์—…๋ฐ์ดํŠธ: eventId={}, totalParticipants={}", - event.getEventId(), eventStats.getTotalParticipants()); - }); } catch (Exception e) { - log.error("ParticipantRegistered ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + log.error("โŒ ParticipantRegistered ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ ์‹คํŒจ: {}", e.getMessage(), e); + throw new RuntimeException("ParticipantRegistered ์ฒ˜๋ฆฌ ์‹คํŒจ", e); } } } diff --git a/analytics-service/src/main/resources/application.yml b/analytics-service/src/main/resources/application.yml index ed32f2b..f88bca1 100644 --- a/analytics-service/src/main/resources/application.yml +++ b/analytics-service/src/main/resources/application.yml @@ -56,6 +56,12 @@ spring: request.timeout.ms: 5000 session.timeout.ms: 10000 + # Sample Data (MVP Only) + # โš ๏ธ ์‹ค์ œ ์šด์˜: false๋กœ ์„ค์ • (๋‹ค๋ฅธ ์„œ๋น„์Šค๋“ค์ด ์ด๋ฒคํŠธ ๋ฐœํ–‰) + # โš ๏ธ MVP ํ™˜๊ฒฝ: true๋กœ ์„ค์ • (SampleDataLoader๊ฐ€ ์ด๋ฒคํŠธ ๋ฐœํ–‰) + sample-data: + enabled: ${SAMPLE_DATA_ENABLED:true} + # Server server: port: ${SERVER_PORT:8086}