kafka 설정 변경

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Hyowon Yang 2025-10-24 16:37:05 +09:00
parent 7735c8472b
commit f3901c8ef8
4 changed files with 124 additions and 73 deletions

View File

@ -19,6 +19,8 @@ import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@ -94,7 +96,7 @@ public class SampleDataLoader implements ApplicationRunner {
log.info("========================================");
log.info("발행된 이벤트:");
log.info(" - EventCreated: 3건");
log.info(" - DistributionCompleted: 12건 (3 이벤트 × 4 채널)");
log.info(" - DistributionCompleted: 3건 (각 이벤트당 4개 채널 배열)");
log.info(" - ParticipantRegistered: 180건 (MVP 테스트용)");
log.info("========================================");
@ -179,15 +181,10 @@ public class SampleDataLoader implements ApplicationRunner {
}
/**
* DistributionCompleted 이벤트 발행
* DistributionCompleted 이벤트 발행 (설계서 기준 - 이벤트당 1번 발행, 여러 채널 배열)
*/
private void publishDistributionCompletedEvents() throws Exception {
String[] eventIds = {"evt_2025012301", "evt_2025020101", "evt_2025011501"};
BigDecimal[] investments = {
new BigDecimal("5000000"),
new BigDecimal("3500000"),
new BigDecimal("2000000")
};
int[][] expectedViews = {
{5000, 10000, 3000, 2000}, // 이벤트1: 우리동네TV, 지니TV, 링고비즈, SNS
{3500, 7000, 2000, 1500}, // 이벤트2
@ -196,41 +193,53 @@ public class SampleDataLoader implements ApplicationRunner {
for (int i = 0; i < eventIds.length; i++) {
String eventId = eventIds[i];
BigDecimal distributionBudget = investments[i].multiply(new BigDecimal("0.5"));
// 4개 채널을 배열로 구성
List<DistributionCompletedEvent.ChannelDistribution> channels = new ArrayList<>();
// 1. 우리동네TV (TV)
publishDistributionEvent(eventId, "우리동네TV", "TV",
distributionBudget.multiply(new BigDecimal("0.3")), expectedViews[i][0]);
channels.add(DistributionCompletedEvent.ChannelDistribution.builder()
.channel("우리동네TV")
.channelType("TV")
.status("SUCCESS")
.expectedViews(expectedViews[i][0])
.build());
// 2. 지니TV (TV)
publishDistributionEvent(eventId, "지니TV", "TV",
distributionBudget.multiply(new BigDecimal("0.3")), expectedViews[i][1]);
channels.add(DistributionCompletedEvent.ChannelDistribution.builder()
.channel("지니TV")
.channelType("TV")
.status("SUCCESS")
.expectedViews(expectedViews[i][1])
.build());
// 3. 링고비즈 (CALL)
publishDistributionEvent(eventId, "링고비즈", "CALL",
distributionBudget.multiply(new BigDecimal("0.2")), expectedViews[i][2]);
channels.add(DistributionCompletedEvent.ChannelDistribution.builder()
.channel("링고비즈")
.channelType("CALL")
.status("SUCCESS")
.expectedViews(expectedViews[i][2])
.build());
// 4. SNS (SNS)
publishDistributionEvent(eventId, "SNS", "SNS",
distributionBudget.multiply(new BigDecimal("0.2")), expectedViews[i][3]);
channels.add(DistributionCompletedEvent.ChannelDistribution.builder()
.channel("SNS")
.channelType("SNS")
.status("SUCCESS")
.expectedViews(expectedViews[i][3])
.build());
// 이벤트 발행 (채널 배열 포함)
DistributionCompletedEvent event = DistributionCompletedEvent.builder()
.eventId(eventId)
.distributedChannels(channels)
.completedAt(java.time.LocalDateTime.now())
.build();
publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event);
}
log.info("✅ DistributionCompleted 이벤트 12건 발행 완료 (3 이벤트 × 4 채널)");
}
/**
* 개별 DistributionCompleted 이벤트 발행
*/
private void publishDistributionEvent(String eventId, String channelName, String channelType,
BigDecimal distributionCost, Integer expectedViews) throws Exception {
DistributionCompletedEvent event = DistributionCompletedEvent.builder()
.eventId(eventId)
.channelName(channelName)
.channelType(channelType)
.distributionCost(distributionCost)
.expectedViews(expectedViews)
.build();
publishEvent(DISTRIBUTION_COMPLETED_TOPIC, event);
log.info("✅ DistributionCompleted 이벤트 3건 발행 완료 (3 이벤트 × 4 채널 배열)");
}
/**

View File

@ -36,7 +36,7 @@ public class DistributionCompletedConsumer {
private static final long IDEMPOTENCY_TTL_DAYS = 7;
/**
* DistributionCompleted 이벤트 처리 (MVP용 샘플 토픽)
* DistributionCompleted 이벤트 처리 (설계서 기준 - 여러 채널 배열)
*/
@KafkaListener(topics = "sample.distribution.completed", groupId = "analytics-service")
public void handleDistributionCompleted(String message) {
@ -45,38 +45,26 @@ public class DistributionCompletedConsumer {
DistributionCompletedEvent event = objectMapper.readValue(message, DistributionCompletedEvent.class);
String eventId = event.getEventId();
String channelName = event.getChannelName();
// 멱등성 : eventId + channelName 조합
String distributionKey = eventId + ":" + channelName;
// 1. 멱등성 체크 (중복 처리 방지)
Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, distributionKey);
// 1. 멱등성 체크 (중복 처리 방지) - eventId 기반
Boolean isProcessed = redisTemplate.opsForSet().isMember(PROCESSED_DISTRIBUTIONS_KEY, eventId);
if (Boolean.TRUE.equals(isProcessed)) {
log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}, channel={}", eventId, channelName);
log.warn("⚠️ 중복 이벤트 스킵 (이미 처리됨): eventId={}", eventId);
return;
}
// 2. 채널 통계 생성 또는 업데이트
ChannelStats channelStats = channelStatsRepository
.findByEventIdAndChannelName(eventId, channelName)
.orElse(ChannelStats.builder()
.eventId(eventId)
.channelName(channelName)
.channelType(event.getChannelType())
.build());
// 2. 채널 배열 루프 처리 (설계서: distributedChannels 배열)
if (event.getDistributedChannels() != null && !event.getDistributedChannels().isEmpty()) {
for (DistributionCompletedEvent.ChannelDistribution channel : event.getDistributedChannels()) {
processChannelStats(eventId, channel);
}
channelStats.setDistributionCost(event.getDistributionCost());
// 예상 노출 저장
if (event.getExpectedViews() != null) {
channelStats.setImpressions(event.getExpectedViews());
log.info("✅ 채널 통계 일괄 업데이트 완료: eventId={}, channelCount={}",
eventId, event.getDistributedChannels().size());
} else {
log.warn("⚠️ 배포된 채널 없음: eventId={}", eventId);
}
channelStatsRepository.save(channelStats);
log.info("✅ 채널 통계 업데이트: eventId={}, channel={}, expectedViews={}",
eventId, channelName, event.getExpectedViews());
// 3. EventStats의 totalViews 업데이트 (모든 채널 노출 합계)
updateTotalViews(eventId);
@ -85,10 +73,10 @@ public class DistributionCompletedConsumer {
redisTemplate.delete(cacheKey);
log.debug("🗑️ 캐시 무효화: {}", cacheKey);
// 5. 멱등성 처리 완료 기록 (7일 TTL)
redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, distributionKey);
// 5. 멱등성 처리 완료 기록 (7일 TTL) - eventId 기반
redisTemplate.opsForSet().add(PROCESSED_DISTRIBUTIONS_KEY, eventId);
redisTemplate.expire(PROCESSED_DISTRIBUTIONS_KEY, IDEMPOTENCY_TTL_DAYS, TimeUnit.DAYS);
log.debug("✅ 멱등성 기록: distributionKey={}", distributionKey);
log.debug("✅ 멱등성 기록: eventId={}", eventId);
} catch (Exception e) {
log.error("❌ DistributionCompleted 이벤트 처리 실패: {}", e.getMessage(), e);
@ -96,6 +84,37 @@ public class DistributionCompletedConsumer {
}
}
/**
* 개별 채널 통계 처리
*/
private void processChannelStats(String eventId, DistributionCompletedEvent.ChannelDistribution channel) {
try {
String channelName = channel.getChannel();
// 채널 통계 생성 또는 업데이트
ChannelStats channelStats = channelStatsRepository
.findByEventIdAndChannelName(eventId, channelName)
.orElse(ChannelStats.builder()
.eventId(eventId)
.channelName(channelName)
.channelType(channel.getChannelType())
.build());
// 예상 노출 저장
if (channel.getExpectedViews() != null) {
channelStats.setImpressions(channel.getExpectedViews());
}
channelStatsRepository.save(channelStats);
log.debug("✅ 채널 통계 저장: eventId={}, channel={}, expectedViews={}",
eventId, channelName, channel.getExpectedViews());
} catch (Exception e) {
log.error("❌ 채널 통계 처리 실패: eventId={}, channel={}", eventId, channel.getChannel(), e);
}
}
/**
* 모든 채널의 예상 노출 수를 합산하여 EventStats.totalViews 업데이트
*/

View File

@ -5,10 +5,13 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
/**
* 배포 완료 이벤트
* 배포 완료 이벤트 (설계서 기준)
*
* Distribution Service가 이벤트의 모든 채널 배포 완료 발행
*/
@Data
@Builder
@ -22,22 +25,42 @@ public class DistributionCompletedEvent {
private String eventId;
/**
* 채널명
* 배포된 채널 목록 (여러 채널을 배열로 포함)
*/
private String channelName;
private List<ChannelDistribution> distributedChannels;
/**
* 채널 유형
* 배포 완료 시각
*/
private String channelType;
private LocalDateTime completedAt;
/**
* 배포 비용
* 개별 채널 배포 정보
*/
private BigDecimal distributionCost;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class ChannelDistribution {
/**
* 예상 노출
*/
private Integer expectedViews;
/**
* 채널명 (우리동네TV, 지니TV, 링고비즈, SNS)
*/
private String channel;
/**
* 채널 유형 (TV, CALL, SNS)
*/
private String channelType;
/**
* 배포 상태 (SUCCESS, FAILURE)
*/
private String status;
/**
* 예상 노출
*/
private Integer expectedViews;
}
}

View File

@ -44,7 +44,7 @@ spring:
# Kafka
kafka:
enabled: ${KAFKA_ENABLED:false}
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:4.230.50.63:9092}
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:4.217.131.59:9095}
consumer:
group-id: ${KAFKA_CONSUMER_GROUP_ID:analytics-service}
auto-offset-reset: earliest