diff --git a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java index 72d27f4..527e840 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java @@ -286,6 +286,11 @@ public class SampleDataLoader implements ApplicationRunner { publishEvent(PARTICIPANT_REGISTERED_TOPIC, event); totalPublished++; + + // 동시성 충돌 방지: 10개마다 100ms 대기 + if ((j + 1) % 10 == 0) { + Thread.sleep(100); + } } } diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java index 1b3d1d1..0d77956 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java @@ -11,6 +11,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.concurrent.TimeUnit; @@ -37,7 +38,10 @@ public class DistributionCompletedConsumer { /** * DistributionCompleted 이벤트 처리 (설계서 기준 - 여러 채널 배열) + * + * @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.distribution.completed", groupId = "${spring.kafka.consumer.group-id}") public void handleDistributionCompleted(String message) { try { @@ -128,8 +132,8 @@ public class DistributionCompletedConsumer { .mapToInt(ChannelStats::getImpressions) .sum(); - // EventStats 업데이트 - eventStatsRepository.findByEventId(eventId) + // EventStats 업데이트 - 비관적 락 적용 + eventStatsRepository.findByEventIdWithLock(eventId) .ifPresentOrElse( eventStats -> { eventStats.setTotalViews(totalViews); diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java index c7c7689..5f8cb84 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.concurrent.TimeUnit; @@ -34,7 +35,10 @@ public class EventCreatedConsumer { /** * EventCreated 이벤트 처리 (MVP용 샘플 토픽) + * + * @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.event.created", groupId = "${spring.kafka.consumer.group-id}") public void handleEventCreated(String message) { try { diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java index ae33697..54d2fb5 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.concurrent.TimeUnit; @@ -34,7 +35,10 @@ public class ParticipantRegisteredConsumer { /** * ParticipantRegistered 이벤트 처리 (MVP용 샘플 토픽) + * + * @Transactional 필수: 비관적 락 사용을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.participant.registered", groupId = "${spring.kafka.consumer.group-id}") public void handleParticipantRegistered(String message) { try { @@ -51,8 +55,8 @@ public class ParticipantRegisteredConsumer { return; } - // 2. 이벤트 통계 업데이트 (참여자 수 +1) - eventStatsRepository.findByEventId(eventId) + // 2. 이벤트 통계 업데이트 (참여자 수 +1) - 비관적 락 적용 + eventStatsRepository.findByEventIdWithLock(eventId) .ifPresentOrElse( eventStats -> { eventStats.incrementParticipants(); diff --git a/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java b/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java index 1b13bfa..02688a9 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java @@ -1,7 +1,11 @@ package com.kt.event.analytics.repository; import com.kt.event.analytics.entity.EventStats; +import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; import java.util.Optional; @@ -20,6 +24,20 @@ public interface EventStatsRepository extends JpaRepository { */ Optional findByEventId(String eventId); + /** + * 이벤트 ID로 통계 조회 (비관적 락 적용) + * + * 동시성 충돌 방지를 위해 PESSIMISTIC_WRITE 락 사용 + * - 읽는 순간부터 락을 걸어 다른 트랜잭션 차단 + * - ParticipantRegistered 이벤트 처리 시 사용 + * + * @param eventId 이벤트 ID + * @return 이벤트 통계 + */ + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT e FROM EventStats e WHERE e.eventId = :eventId") + Optional findByEventIdWithLock(@Param("eventId") String eventId); + /** * 매장 ID와 이벤트 ID로 통계 조회 *