From 50043add5d7a81e526576a3b926bd5423b7ad6df Mon Sep 17 00:00:00 2001 From: Hyowon Yang Date: Tue, 28 Oct 2025 09:16:24 +0900 Subject: [PATCH] =?UTF-8?q?analytics=20=EC=84=9C=EB=B9=84=EC=8A=A4=20?= =?UTF-8?q?=EB=8F=99=EC=8B=9C=EC=84=B1=20=EC=B6=A9=EB=8F=8C=20=ED=95=B4?= =?UTF-8?q?=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [문제] - ParticipantRegistered 이벤트 처리 시 StaleObjectStateException 발생 - 100개의 이벤트가 동시에 발행되어 EventStats 동시 업데이트 충돌 - TransactionRequiredException 발생 (트랜잭션 컨텍스트 부재) [해결] 1. 비관적 락(Pessimistic Lock) 적용 - EventStatsRepository에 findByEventIdWithLock 메서드 추가 - PESSIMISTIC_WRITE 락으로 읽는 순간부터 다른 트랜잭션 차단 2. 트랜잭션 추가 - 모든 Consumer 메서드에 @Transactional 어노테이션 추가 - EventCreatedConsumer, ParticipantRegisteredConsumer, DistributionCompletedConsumer 3. 이벤트 발행 속도 조절 - SampleDataLoader에서 10개마다 100ms 대기 - 동시성 충돌 빈도 감소 [수정 파일] - EventStatsRepository.java: 비관적 락 메서드 추가 - ParticipantRegisteredConsumer.java: @Transactional 추가, 락 메서드 사용 - DistributionCompletedConsumer.java: @Transactional 추가, 락 메서드 사용 - EventCreatedConsumer.java: @Transactional 추가 - SampleDataLoader.java: 이벤트 발행 속도 조절 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../analytics/config/SampleDataLoader.java | 5 +++++ .../DistributionCompletedConsumer.java | 8 ++++++-- .../consumer/EventCreatedConsumer.java | 4 ++++ .../ParticipantRegisteredConsumer.java | 8 ++++++-- .../repository/EventStatsRepository.java | 18 ++++++++++++++++++ 5 files changed, 39 insertions(+), 4 deletions(-) diff --git a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java index 72d27f4..527e840 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/config/SampleDataLoader.java @@ -286,6 +286,11 @@ public class SampleDataLoader implements ApplicationRunner { publishEvent(PARTICIPANT_REGISTERED_TOPIC, event); totalPublished++; + + // 동시성 충돌 방지: 10개마다 100ms 대기 + if ((j + 1) % 10 == 0) { + Thread.sleep(100); + } } } diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java index 1b3d1d1..0d77956 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/DistributionCompletedConsumer.java @@ -11,6 +11,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.concurrent.TimeUnit; @@ -37,7 +38,10 @@ public class DistributionCompletedConsumer { /** * DistributionCompleted 이벤트 처리 (설계서 기준 - 여러 채널 배열) + * + * @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.distribution.completed", groupId = "${spring.kafka.consumer.group-id}") public void handleDistributionCompleted(String message) { try { @@ -128,8 +132,8 @@ public class DistributionCompletedConsumer { .mapToInt(ChannelStats::getImpressions) .sum(); - // EventStats 업데이트 - eventStatsRepository.findByEventId(eventId) + // EventStats 업데이트 - 비관적 락 적용 + eventStatsRepository.findByEventIdWithLock(eventId) .ifPresentOrElse( eventStats -> { eventStats.setTotalViews(totalViews); diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java index c7c7689..5f8cb84 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/EventCreatedConsumer.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.concurrent.TimeUnit; @@ -34,7 +35,10 @@ public class EventCreatedConsumer { /** * EventCreated 이벤트 처리 (MVP용 샘플 토픽) + * + * @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.event.created", groupId = "${spring.kafka.consumer.group-id}") public void handleEventCreated(String message) { try { diff --git a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java index ae33697..54d2fb5 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/messaging/consumer/ParticipantRegisteredConsumer.java @@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.concurrent.TimeUnit; @@ -34,7 +35,10 @@ public class ParticipantRegisteredConsumer { /** * ParticipantRegistered 이벤트 처리 (MVP용 샘플 토픽) + * + * @Transactional 필수: 비관적 락 사용을 위해 트랜잭션 컨텍스트 필요 */ + @Transactional @KafkaListener(topics = "sample.participant.registered", groupId = "${spring.kafka.consumer.group-id}") public void handleParticipantRegistered(String message) { try { @@ -51,8 +55,8 @@ public class ParticipantRegisteredConsumer { return; } - // 2. 이벤트 통계 업데이트 (참여자 수 +1) - eventStatsRepository.findByEventId(eventId) + // 2. 이벤트 통계 업데이트 (참여자 수 +1) - 비관적 락 적용 + eventStatsRepository.findByEventIdWithLock(eventId) .ifPresentOrElse( eventStats -> { eventStats.incrementParticipants(); diff --git a/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java b/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java index 1b13bfa..02688a9 100644 --- a/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java +++ b/analytics-service/src/main/java/com/kt/event/analytics/repository/EventStatsRepository.java @@ -1,7 +1,11 @@ package com.kt.event.analytics.repository; import com.kt.event.analytics.entity.EventStats; +import jakarta.persistence.LockModeType; import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; import java.util.Optional; @@ -20,6 +24,20 @@ public interface EventStatsRepository extends JpaRepository { */ Optional findByEventId(String eventId); + /** + * 이벤트 ID로 통계 조회 (비관적 락 적용) + * + * 동시성 충돌 방지를 위해 PESSIMISTIC_WRITE 락 사용 + * - 읽는 순간부터 락을 걸어 다른 트랜잭션 차단 + * - ParticipantRegistered 이벤트 처리 시 사용 + * + * @param eventId 이벤트 ID + * @return 이벤트 통계 + */ + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query("SELECT e FROM EventStats e WHERE e.eventId = :eventId") + Optional findByEventIdWithLock(@Param("eventId") String eventId); + /** * 매장 ID와 이벤트 ID로 통계 조회 *