kafka활성화

🚀 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Hyowon Yang 2025-10-24 14:59:24 +09:00
parent 21b8fe5efb
commit 31fb1c541b
6 changed files with 243 additions and 225 deletions

View File

@ -18,10 +18,14 @@
<entry key="REDIS_DATABASE" value="5" />
<!-- Kafka Settings -->
<entry key="KAFKA_ENABLED" value="false" />
<entry key="KAFKA_ENABLED" value="true" />
<entry key="KAFKA_BOOTSTRAP_SERVERS" value="4.230.50.63:9092" />
<entry key="KAFKA_CONSUMER_GROUP_ID" value="analytics-service" />
<!-- Sample Data Settings (MVP Only) -->
<!-- ⚠️ 실제 운영 환경에서는 false로 설정 (다른 서비스들이 이벤트 발행) -->
<entry key="SAMPLE_DATA_ENABLED" value="true" />
<!-- JPA Settings -->
<entry key="SHOW_SQL" value="true" />
<entry key="DDL_AUTO" value="update" />

View File

@ -1,38 +1,50 @@
package com.kt.event.analytics.config;
import com.kt.event.analytics.entity.ChannelStats;
import com.kt.event.analytics.entity.EventStats;
import com.kt.event.analytics.entity.TimelineData;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.analytics.messaging.event.DistributionCompletedEvent;
import com.kt.event.analytics.messaging.event.EventCreatedEvent;
import com.kt.event.analytics.messaging.event.ParticipantRegisteredEvent;
import com.kt.event.analytics.repository.ChannelStatsRepository;
import com.kt.event.analytics.repository.EventStatsRepository;
import com.kt.event.analytics.repository.TimelineDataRepository;
import jakarta.annotation.PreDestroy;
import jakarta.persistence.EntityManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import jakarta.annotation.PreDestroy;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
/**
* 샘플 데이터 로더
* 샘플 데이터 로더 (Kafka Producer 방식)
*
* - 서비스 시작 : PostgreSQL 샘플 데이터 자동 생성
* MVP 전용: 다른 마이크로서비스(Event, Participant, Distribution)
* 없는 환경에서 해당 서비스들의 역할을 시뮬레이션합니다.
*
* 실제 운영: Analytics Service는 순수 Consumer 역할만 수행해야 하며,
* 클래스는 비활성화되어야 합니다.
* SAMPLE_DATA_ENABLED=false 설정
*
* - 서비스 시작 : Kafka 이벤트 발행하여 샘플 데이터 자동 생성
* - 서비스 종료 : PostgreSQL 전체 데이터 삭제
*
* 활성화 조건: spring.sample-data.enabled=true (기본값: true)
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "spring.sample-data.enabled", havingValue = "true", matchIfMissing = true)
@RequiredArgsConstructor
public class SampleDataLoader implements ApplicationRunner {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
private final EventStatsRepository eventStatsRepository;
private final ChannelStatsRepository channelStatsRepository;
private final TimelineDataRepository timelineDataRepository;
@ -40,11 +52,16 @@ public class SampleDataLoader implements ApplicationRunner {
private final Random random = new Random();
// Kafka Topic Names
private static final String EVENT_CREATED_TOPIC = "event.created";
private static final String PARTICIPANT_REGISTERED_TOPIC = "participant.registered";
private static final String DISTRIBUTION_COMPLETED_TOPIC = "distribution.completed";
@Override
@Transactional
public void run(ApplicationArguments args) {
log.info("========================================");
log.info("🚀 서비스 시작: PostgreSQL 샘플 데이터 생성");
log.info("🚀 서비스 시작: Kafka 이벤트 발행하여 샘플 데이터 생성");
log.info("========================================");
// 항상 기존 데이터 삭제 새로 생성
@ -63,30 +80,28 @@ public class SampleDataLoader implements ApplicationRunner {
}
try {
// 1. 이벤트 통계 데이터 생성
List<EventStats> eventStatsList = createEventStats();
eventStatsRepository.saveAll(eventStatsList);
log.info("✅ 이벤트 통계 데이터 적재 완료: {} 건", eventStatsList.size());
// 1. EventCreated 이벤트 발행 (3개 이벤트)
publishEventCreatedEvents();
// 2. 채널별 통계 데이터 생성
List<ChannelStats> channelStatsList = createChannelStats(eventStatsList);
channelStatsRepository.saveAll(channelStatsList);
log.info("✅ 채널별 통계 데이터 적재 완료: {} 건", channelStatsList.size());
// 2. DistributionCompleted 이벤트 발행 ( 이벤트당 4개 채널)
publishDistributionCompletedEvents();
// 3. 타임라인 데이터 생성
List<TimelineData> timelineDataList = createTimelineData(eventStatsList);
timelineDataRepository.saveAll(timelineDataList);
log.info("✅ 타임라인 데이터 적재 완료: {} 건", timelineDataList.size());
// 3. ParticipantRegistered 이벤트 발행 ( 이벤트당 다수 참여자)
publishParticipantRegisteredEvents();
log.info("========================================");
log.info("🎉 샘플 데이터 적재 완료!");
log.info("🎉 Kafka 이벤트 발행 완료! (Consumer가 처리 중...)");
log.info("========================================");
log.info("테스트 가능한 이벤트:");
eventStatsList.forEach(event ->
log.info(" - {} (ID: {})", event.getEventTitle(), event.getEventId())
);
log.info("발행된 이벤트:");
log.info(" - EventCreated: 3건");
log.info(" - DistributionCompleted: 12건 (3 이벤트 × 4 채널)");
log.info(" - ParticipantRegistered: 약 27,610건");
log.info("========================================");
// Consumer 처리 대기 (3초)
log.info("⏳ Consumer 처리 대기 중... (3초)");
Thread.sleep(3000);
} catch (Exception e) {
log.error("샘플 데이터 적재 중 오류 발생", e);
}
@ -127,232 +142,136 @@ public class SampleDataLoader implements ApplicationRunner {
}
/**
* 이벤트 통계 샘플 데이터 생성
* EventCreated 이벤트 발행
*/
private List<EventStats> createEventStats() {
List<EventStats> eventStatsList = new ArrayList<>();
private void publishEventCreatedEvents() throws Exception {
// 이벤트 1: 신년맞이 할인 이벤트 (진행중, 높은 성과)
BigDecimal event1Investment = new BigDecimal("5000000");
BigDecimal event1Revenue = new BigDecimal("14025000");
eventStatsList.add(EventStats.builder()
EventCreatedEvent event1 = EventCreatedEvent.builder()
.eventId("evt_2025012301")
.eventTitle("신년맞이 20% 할인 이벤트")
.storeId("store_001")
.totalParticipants(15420)
.estimatedRoi(new BigDecimal("280.5"))
.salesGrowthRate(new BigDecimal("35.8"))
.totalInvestment(event1Investment)
.expectedRevenue(event1Revenue)
.totalInvestment(new BigDecimal("5000000"))
.status("ACTIVE")
.build());
.build();
publishEvent(EVENT_CREATED_TOPIC, event1);
// 이벤트 2: 설날 특가 이벤트 (진행중, 중간 성과)
BigDecimal event2Investment = new BigDecimal("3500000");
BigDecimal event2Revenue = new BigDecimal("6485500");
eventStatsList.add(EventStats.builder()
EventCreatedEvent event2 = EventCreatedEvent.builder()
.eventId("evt_2025020101")
.eventTitle("설날 특가 선물세트 이벤트")
.storeId("store_001")
.totalParticipants(8950)
.estimatedRoi(new BigDecimal("185.3"))
.salesGrowthRate(new BigDecimal("22.4"))
.totalInvestment(event2Investment)
.expectedRevenue(event2Revenue)
.totalInvestment(new BigDecimal("3500000"))
.status("ACTIVE")
.build());
.build();
publishEvent(EVENT_CREATED_TOPIC, event2);
// 이벤트 3: 겨울 신메뉴 런칭 이벤트 (종료, 저조한 성과)
BigDecimal event3Investment = new BigDecimal("2000000");
BigDecimal event3Revenue = new BigDecimal("1910000");
eventStatsList.add(EventStats.builder()
EventCreatedEvent event3 = EventCreatedEvent.builder()
.eventId("evt_2025011501")
.eventTitle("겨울 신메뉴 런칭 이벤트")
.storeId("store_001")
.totalParticipants(3240)
.estimatedRoi(new BigDecimal("95.5"))
.salesGrowthRate(new BigDecimal("8.2"))
.totalInvestment(event3Investment)
.expectedRevenue(event3Revenue)
.totalInvestment(new BigDecimal("2000000"))
.status("COMPLETED")
.build());
.build();
publishEvent(EVENT_CREATED_TOPIC, event3);
return eventStatsList;
log.info("✅ EventCreated 이벤트 3건 발행 완료");
}
/**
* 채널별 통계 샘플 데이터 생성
* DistributionCompleted 이벤트 발행
*/
private List<ChannelStats> createChannelStats(List<EventStats> eventStatsList) {
List<ChannelStats> channelStatsList = new ArrayList<>();
private void publishDistributionCompletedEvents() throws Exception {
String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"};
BigDecimal[] investments = {
new BigDecimal("5000000"),
new BigDecimal("3500000"),
new BigDecimal("2000000")
};
for (EventStats eventStats : eventStatsList) {
String eventId = eventStats.getEventId();
int totalParticipants = eventStats.getTotalParticipants();
BigDecimal totalInvestment = eventStats.getTotalInvestment();
for (int i = 0; i < eventIds.length; i++) {
String eventId = eventIds[i];
BigDecimal distributionBudget = investments[i].multiply(new BigDecimal("0.5"));
// 채널별 배포 비율 (우리동네TV: 30%, 지니TV: 30%, 링고비즈: 20%, SNS: 20%)
BigDecimal distributionBudget = totalInvestment.multiply(new BigDecimal("0.5"));
// 1. 우리동네TV (TV)
publishDistributionEvent(eventId, "우리동네TV", "TV",
distributionBudget.multiply(new BigDecimal("0.3")));
// 1. 우리동네TV (조회수 많음, 참여율 중간)
channelStatsList.add(createChannelStats(
eventId,
"우리동네TV",
(int) (totalParticipants * 0.35), // 참여자: 35%
distributionBudget.multiply(new BigDecimal("0.3")), // 비용: 30%
1.8 // 조회수 대비 참여자 비율
));
// 2. 지니TV (TV)
publishDistributionEvent(eventId, "지니TV", "TV",
distributionBudget.multiply(new BigDecimal("0.3")));
// 2. 지니TV (조회수 중간, 참여율 높음)
channelStatsList.add(createChannelStats(
eventId,
"지니TV",
(int) (totalParticipants * 0.30), // 참여자: 30%
distributionBudget.multiply(new BigDecimal("0.3")), // 비용: 30%
2.2 // 조회수 대비 참여자 비율
));
// 3. 링고비즈 (CALL)
publishDistributionEvent(eventId, "링고비즈", "CALL",
distributionBudget.multiply(new BigDecimal("0.2")));
// 3. 링고비즈 (통화 기반, 높은 전환율)
channelStatsList.add(createChannelStats(
eventId,
"링고비즈",
(int) (totalParticipants * 0.20), // 참여자: 20%
distributionBudget.multiply(new BigDecimal("0.2")), // 비용: 20%
3.5 // 조회수 대비 참여자 비율 (높은 전환율)
));
// 4. SNS (바이럴 효과, 높은 도달률)
channelStatsList.add(createChannelStats(
eventId,
"SNS",
(int) (totalParticipants * 0.15), // 참여자: 15%
distributionBudget.multiply(new BigDecimal("0.2")), // 비용: 20%
1.5 // 조회수 대비 참여자 비율
));
// 4. SNS (SNS)
publishDistributionEvent(eventId, "SNS", "SNS",
distributionBudget.multiply(new BigDecimal("0.2")));
}
return channelStatsList;
log.info("✅ DistributionCompleted 이벤트 12건 발행 완료 (3 이벤트 × 4 채널)");
}
/**
* 채널 통계 생성 헬퍼 메서드
* 개별 DistributionCompleted 이벤트 발행
*/
private ChannelStats createChannelStats(
String eventId,
String channelName,
int participants,
BigDecimal distributionCost,
double conversionMultiplier
) {
int views = (int) (participants * (8 + random.nextDouble() * 4)); // 8~12배
int clicks = (int) (views * (0.15 + random.nextDouble() * 0.10)); // 15~25%
int conversions = (int) (participants * (0.3 + random.nextDouble() * 0.2)); // 30~50%
int impressions = (int) (views * (1.5 + random.nextDouble() * 1.0)); // 1.5~2.5배
ChannelStats.ChannelStatsBuilder builder = ChannelStats.builder()
private void publishDistributionEvent(String eventId, String channelName, String channelType,
BigDecimal distributionCost) throws Exception {
DistributionCompletedEvent event = DistributionCompletedEvent.builder()
.eventId(eventId)
.channelName(channelName)
.views(views)
.clicks(clicks)
.participants(participants)
.conversions(conversions)
.impressions(impressions)
.distributionCost(distributionCost);
// 채널별 특화 지표 추가
if ("SNS".equals(channelName)) {
// SNS는 좋아요, 댓글, 공유 많음
builder.likes((int) (participants * (2.0 + random.nextDouble())))
.comments((int) (participants * (0.5 + random.nextDouble() * 0.3)))
.shares((int) (participants * (0.8 + random.nextDouble() * 0.4)))
.totalCalls(0)
.completedCalls(0)
.averageDuration(0);
} else if ("링고비즈".equals(channelName)) {
// 링고비즈는 통화 중심
int totalCalls = (int) (participants * (2.5 + random.nextDouble() * 0.5));
int completedCalls = (int) (totalCalls * (0.7 + random.nextDouble() * 0.2));
builder.likes(0)
.comments(0)
.shares(0)
.totalCalls(totalCalls)
.completedCalls(completedCalls)
.averageDuration((int) (120 + random.nextDouble() * 180)); // 120~300초
} else {
// TV 채널은 SNS 반응 적음
builder.likes((int) (participants * (0.3 + random.nextDouble() * 0.2)))
.comments((int) (participants * (0.05 + random.nextDouble() * 0.05)))
.shares((int) (participants * (0.08 + random.nextDouble() * 0.07)))
.totalCalls(0)
.completedCalls(0)
.averageDuration(0);
}
return builder.build();
.channelType(channelType)
.distributionCost(distributionCost)
.build();
publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event);
}
/**
* 타임라인 데이터 생성
* ParticipantRegistered 이벤트 발행
*/
private List<TimelineData> createTimelineData(List<EventStats> eventStatsList) {
List<TimelineData> timelineDataList = new ArrayList<>();
private void publishParticipantRegisteredEvents() throws Exception {
String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"};
int[] totalParticipants = {15420, 8950, 3240};
String[] channels = {"우리동네TV", "지니TV", "링고비즈", "SNS"};
for (EventStats eventStats : eventStatsList) {
String eventId = eventStats.getEventId();
int totalParticipants = eventStats.getTotalParticipants();
int totalPublished = 0;
// 지난 30일간의 시간별 데이터 생성
LocalDateTime now = LocalDateTime.now();
LocalDateTime startTime = now.minusDays(30);
for (int i = 0; i < eventIds.length; i++) {
String eventId = eventIds[i];
int participants = totalParticipants[i];
int cumulativeCount = 0;
// 이벤트에 대해 참여자 수만큼 ParticipantRegistered 이벤트 발행
for (int j = 0; j < participants; j++) {
String participantId = UUID.randomUUID().toString();
String channel = channels[j % channels.length]; // 채널 순환 배정
// 일별 데이터 생성 (30일)
for (int day = 0; day < 30; day++) {
LocalDateTime dayStart = startTime.plusDays(day);
// 하루를 6개 시간대로 분할 (4시간 단위)
for (int hour = 0; hour < 24; hour += 4) {
LocalDateTime timestamp = dayStart.plusHours(hour);
// 시간대별 참여자 (점진적 증가 + 시간대별 변동)
int baseCount = (int) (totalParticipants * (day / 30.0) / 6); // 일별 증가
int timeMultiplier = getTimeMultiplier(hour); // 시간대별 가중치
int participantCount = (int) (baseCount * timeMultiplier * (0.8 + random.nextDouble() * 0.4));
cumulativeCount += participantCount;
timelineDataList.add(TimelineData.builder()
ParticipantRegisteredEvent event = ParticipantRegisteredEvent.builder()
.eventId(eventId)
.timestamp(timestamp)
.participants(participantCount)
.views((int) (participantCount * (8 + random.nextDouble() * 4)))
.engagement((int) (participantCount * (1.5 + random.nextDouble() * 0.5)))
.conversions((int) (participantCount * (0.3 + random.nextDouble() * 0.2)))
.cumulativeParticipants(Math.min(cumulativeCount, totalParticipants))
.build());
.participantId(participantId)
.channel(channel)
.build();
publishEvent(PARTICIPANT_REGISTERED_TOPIC, event);
totalPublished++;
// 1000명마다 로그 출력 짧은 대기 (Kafka 부하 방지)
if (totalPublished % 1000 == 0) {
log.info(" ⏳ ParticipantRegistered 발행 진행 중... ({}/{})", totalPublished,
totalParticipants[0] + totalParticipants[1] + totalParticipants[2]);
Thread.sleep(100); // 0.1초 대기
}
}
}
return timelineDataList;
log.info("✅ ParticipantRegistered 이벤트 {}건 발행 완료", totalPublished);
}
/**
* 시간대별 가중치 반환
*
* @param hour 시간 (0~23)
* @return 가중치 (0.5~2.0)
* Kafka 이벤트 발행 공통 메서드
*/
private int getTimeMultiplier(int hour) {
if (hour >= 0 && hour < 6) {
return 1; // 새벽: 낮음
} else if (hour >= 6 && hour < 12) {
return 2; // 아침: 높음
} else if (hour >= 12 && hour < 18) {
return 3; // 점심~오후: 가장 높음
} else {
return 2; // 저녁: 높음
}
private void publishEvent(String topic, Object event) throws Exception {
String jsonMessage = objectMapper.writeValueAsString(event);
kafkaTemplate.send(topic, jsonMessage);
}
}

View File

@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 배포 완료 Consumer
*
@ -23,6 +26,11 @@ public class DistributionCompletedConsumer {
private final ChannelStatsRepository channelStatsRepository;
private final ObjectMapper objectMapper;
private final RedisTemplate<String, String> redisTemplate;
private static final String PROCESSED_DISTRIBUTIONS_KEY = "distribution_completed";
private static final String CACHE_KEY_PREFIX = "analytics:dashboard:";
private static final long IDEMPOTENCY_TTL_DAYS = 7;
/**
* DistributionCompleted 이벤트 처리
@ -30,26 +38,48 @@ public class DistributionCompletedConsumer {
@KafkaListener(topics = "distribution.completed", groupId = "analytics-service")
public void handleDistributionCompleted(String message) {
try {
log.info("DistributionCompleted 이벤트 수신: {}", message);
log.info("📩 DistributionCompleted 이벤트 수신: {}", message);
DistributionCompletedEvent event = objectMapper.readValue(message, DistributionCompletedEvent.class);
String eventId = event.getEventId();
String channelName = event.getChannelName();
// 채널 통계 생성 또는 업데이트
// 멱등성 : eventId + channelName 조합
String distributionKey = eventId + ":" + channelName;
// 1. 멱등성 체크 (중복 처리 방지)
Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, distributionKey);
if (Boolean.TRUE.equals(isProcessed)) {
log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}, channel={}", eventId, channelName);
return;
}
// 2. 채널 통계 생성 또는 업데이트
ChannelStats channelStats = channelStatsRepository
.findByEventIdAndChannelName(event.getEventId(), event.getChannelName())
.findByEventIdAndChannelName(eventId, channelName)
.orElse(ChannelStats.builder()
.eventId(event.getEventId())
.channelName(event.getChannelName())
.eventId(eventId)
.channelName(channelName)
.channelType(event.getChannelType())
.build());
channelStats.setDistributionCost(event.getDistributionCost());
channelStatsRepository.save(channelStats);
log.info("✅ 채널 통계 업데이트: eventId={}, channel={}", eventId, channelName);
// 3. 캐시 무효화 (다음 조회 최신 배포 통계 반영)
String cacheKey = CACHE_KEY_PREFIX + eventId;
redisTemplate.delete(cacheKey);
log.debug("🗑️ 캐시 무효화: {}", cacheKey);
// 4. 멱등성 처리 완료 기록 (7일 TTL)
redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, distributionKey);
redisTemplate.expire(PROCESSED_DISTRIBUTIONS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS);
log.debug("✅ 멱등성 기록: distributionKey={}", distributionKey);
log.info("채널 통계 업데이트: eventId={}, channel={}",
event.getEventId(), event.getChannelName());
} catch (Exception e) {
log.error("DistributionCompleted 이벤트 처리 실패: {}", e.getMessage(), e);
log.error("❌ DistributionCompleted 이벤트 처리 실패: {}", e.getMessage(), e);
throw new RuntimeException("DistributionCompleted 처리 실패", e);
}
}
}

View File

@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 이벤트 생성 Consumer
*
@ -23,6 +26,11 @@ public class EventCreatedConsumer {
private final EventStatsRepository eventStatsRepository;
private final ObjectMapper objectMapper;
private final RedisTemplate<String, String> redisTemplate;
private static final String PROCESSED_EVENTS_KEY = "processed_events";
private static final String CACHE_KEY_PREFIX = "analytics:dashboard:";
private static final long IDEMPOTENCY_TTL_DAYS = 7;
/**
* EventCreated 이벤트 처리
@ -30,13 +38,21 @@ public class EventCreatedConsumer {
@KafkaListener(topics = "event.created", groupId = "analytics-service")
public void handleEventCreated(String message) {
try {
log.info("EventCreated 이벤트 수신: {}", message);
log.info("📩 EventCreated 이벤트 수신: {}", message);
EventCreatedEvent event = objectMapper.readValue(message, EventCreatedEvent.class);
String eventId = event.getEventId();
// 이벤트 통계 초기화
// 1. 멱등성 체크 (중복 처리 방지)
Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_EVENTS_KEY, eventId);
if (Boolean.TRUE.equals(isProcessed)) {
log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}", eventId);
return;
}
// 2. 이벤트 통계 초기화
EventStats eventStats = EventStats.builder()
.eventId(event.getEventId())
.eventId(eventId)
.eventTitle(event.getEventTitle())
.storeId(event.getStoreId())
.totalParticipants(0)
@ -45,10 +61,21 @@ public class EventCreatedConsumer {
.build();
eventStatsRepository.save(eventStats);
log.info("✅ 이벤트 통계 초기화 완료: eventId={}", eventId);
// 3. 캐시 무효화 (다음 조회 최신 데이터 반영)
String cacheKey = CACHE_KEY_PREFIX + eventId;
redisTemplate.delete(cacheKey);
log.debug("🗑️ 캐시 무효화: {}", cacheKey);
// 4. 멱등성 처리 완료 기록 (7일 TTL)
redisTemplate.opsForSet().add(PROCESSED_EVENTS_KEY, eventId);
redisTemplate.expire(PROCESSED_EVENTS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS);
log.debug("✅ 멱등성 기록: eventId={}", eventId);
log.info("이벤트 통계 초기화 완료: eventId={}", event.getEventId());
} catch (Exception e) {
log.error("EventCreated 이벤트 처리 실패: {}", e.getMessage(), e);
log.error("❌ EventCreated 이벤트 처리 실패: {}", e.getMessage(), e);
throw new RuntimeException("EventCreated 처리 실패", e);
}
}
}

View File

@ -7,9 +7,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 참여자 등록 Consumer
*
@ -23,6 +26,11 @@ public class ParticipantRegisteredConsumer {
private final EventStatsRepository eventStatsRepository;
private final ObjectMapper objectMapper;
private final RedisTemplate<String, String> redisTemplate;
private static final String PROCESSED_PARTICIPANTS_KEY = "processed_participants";
private static final String CACHE_KEY_PREFIX = "analytics:dashboard:";
private static final long IDEMPOTENCY_TTL_DAYS = 7;
/**
* ParticipantRegistered 이벤트 처리
@ -30,20 +38,44 @@ public class ParticipantRegisteredConsumer {
@KafkaListener(topics = "participant.registered", groupId = "analytics-service")
public void handleParticipantRegistered(String message) {
try {
log.info("ParticipantRegistered 이벤트 수신: {}", message);
log.info("📩 ParticipantRegistered 이벤트 수신: {}", message);
ParticipantRegisteredEvent event = objectMapper.readValue(message, ParticipantRegisteredEvent.class);
String participantId = event.getParticipantId();
String eventId = event.getEventId();
// 이벤트 통계 업데이트
eventStatsRepository.findByEventId(event.getEventId())
.ifPresent(eventStats -> {
// 1. 멱등성 체크 (중복 처리 방지)
Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_PARTICIPANTS_KEY, participantId);
if (Boolean.TRUE.equals(isProcessed)) {
log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): participantId={}", participantId);
return;
}
// 2. 이벤트 통계 업데이트 (참여자 +1)
eventStatsRepository.findByEventId(eventId)
.ifPresentOrElse(
eventStats -> {
eventStats.incrementParticipants();
eventStatsRepository.save(eventStats);
log.info("참여자 수 업데이트: eventId={}, totalParticipants={}",
event.getEventId(), eventStats.getTotalParticipants());
});
log.info("✅ 참여자 수 업데이트: eventId={}, totalParticipants={}",
eventId, eventStats.getTotalParticipants());
},
() -> log.warn("⚠️ 이벤트 통계 없음: eventId={}", eventId)
);
// 3. 캐시 무효화 (다음 조회 최신 참여자 반영)
String cacheKey = CACHE_KEY_PREFIX + eventId;
redisTemplate.delete(cacheKey);
log.debug("🗑️ 캐시 무효화: {}", cacheKey);
// 4. 멱등성 처리 완료 기록 (7일 TTL)
redisTemplate.opsForSet().add(PROCESSED_PARTICIPANTS_KEY, participantId);
redisTemplate.expire(PROCESSED_PARTICIPANTS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS);
log.debug("✅ 멱등성 기록: participantId={}", participantId);
} catch (Exception e) {
log.error("ParticipantRegistered 이벤트 처리 실패: {}", e.getMessage(), e);
log.error("❌ ParticipantRegistered 이벤트 처리 실패: {}", e.getMessage(), e);
throw new RuntimeException("ParticipantRegistered 처리 실패", e);
}
}
}

View File

@ -56,6 +56,12 @@ spring:
request.timeout.ms: 5000
session.timeout.ms: 10000
# Sample Data (MVP Only)
# ⚠️ 실제 운영: false로 설정 (다른 서비스들이 이벤트 발행)
# ⚠️ MVP 환경: true로 설정 (SampleDataLoader가 이벤트 발행)
sample-data:
enabled: ${SAMPLE_DATA_ENABLED:true}
# Server
server:
port: ${SERVER_PORT:8086}