analytics 서비스 동시성 충돌 해결

[문제]
- ParticipantRegistered 이벤트 처리 시 StaleObjectStateException 발생
- 100개의 이벤트가 동시에 발행되어 EventStats 동시 업데이트 충돌
- TransactionRequiredException 발생 (트랜잭션 컨텍스트 부재)

[해결]
1. 비관적 락(Pessimistic Lock) 적용
   - EventStatsRepository에 findByEventIdWithLock 메서드 추가
   - PESSIMISTIC_WRITE 락으로 읽는 순간부터 다른 트랜잭션 차단

2. 트랜잭션 추가
   - 모든 Consumer 메서드에 @Transactional 어노테이션 추가
   - EventCreatedConsumer, ParticipantRegisteredConsumer, DistributionCompletedConsumer

3. 이벤트 발행 속도 조절
   - SampleDataLoader에서 10개마다 100ms 대기
   - 동시성 충돌 빈도 감소

[수정 파일]
- EventStatsRepository.java: 비관적 락 메서드 추가
- ParticipantRegisteredConsumer.java: @Transactional 추가, 락 메서드 사용
- DistributionCompletedConsumer.java: @Transactional 추가, 락 메서드 사용
- EventCreatedConsumer.java: @Transactional 추가
- SampleDataLoader.java: 이벤트 발행 속도 조절

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Hyowon Yang 2025-10-28 09:16:24 +09:00
parent 397a23063d
commit 50043add5d
5 changed files with 39 additions and 4 deletions

View File

@ -286,6 +286,11 @@ public class SampleDataLoader implements ApplicationRunner {
publishEvent(PARTICIPANT_REGISTERED_TOPIC, event);
totalPublished++;
// 동시성 충돌 방지: 10개마다 100ms 대기
if ((j + 1) % 10 == 0) {
Thread.sleep(100);
}
}
}

View File

@ -11,6 +11,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -37,7 +38,10 @@ public class DistributionCompletedConsumer {
/**
* DistributionCompleted 이벤트 처리 (설계서 기준 - 여러 채널 배열)
*
* @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요
*/
@Transactional
@KafkaListener(topics = "sample.distribution.completed", groupId = "${spring.kafka.consumer.group-id}")
public void handleDistributionCompleted(String message) {
try {
@ -128,8 +132,8 @@ public class DistributionCompletedConsumer {
.mapToInt(ChannelStats::getImpressions)
.sum();
// EventStats 업데이트
eventStatsRepository.findByEventId(eventId)
// EventStats 업데이트 - 비관적 적용
eventStatsRepository.findByEventIdWithLock(eventId)
.ifPresentOrElse(
eventStats -> {
eventStats.setTotalViews(totalViews);

View File

@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
@ -34,7 +35,10 @@ public class EventCreatedConsumer {
/**
* EventCreated 이벤트 처리 (MVP용 샘플 토픽)
*
* @Transactional 필수: DB 저장 작업을 위해 트랜잭션 컨텍스트 필요
*/
@Transactional
@KafkaListener(topics = "sample.event.created", groupId = "${spring.kafka.consumer.group-id}")
public void handleEventCreated(String message) {
try {

View File

@ -10,6 +10,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.concurrent.TimeUnit;
@ -34,7 +35,10 @@ public class ParticipantRegisteredConsumer {
/**
* ParticipantRegistered 이벤트 처리 (MVP용 샘플 토픽)
*
* @Transactional 필수: 비관적 사용을 위해 트랜잭션 컨텍스트 필요
*/
@Transactional
@KafkaListener(topics = "sample.participant.registered", groupId = "${spring.kafka.consumer.group-id}")
public void handleParticipantRegistered(String message) {
try {
@ -51,8 +55,8 @@ public class ParticipantRegisteredConsumer {
return;
}
// 2. 이벤트 통계 업데이트 (참여자 +1)
eventStatsRepository.findByEventId(eventId)
// 2. 이벤트 통계 업데이트 (참여자 +1) - 비관적 적용
eventStatsRepository.findByEventIdWithLock(eventId)
.ifPresentOrElse(
eventStats -> {
eventStats.incrementParticipants();

View File

@ -1,7 +1,11 @@
package com.kt.event.analytics.repository;
import com.kt.event.analytics.entity.EventStats;
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Lock;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@ -20,6 +24,20 @@ public interface EventStatsRepository extends JpaRepository<EventStats, Long> {
*/
Optional<EventStats> findByEventId(String eventId);
/**
* 이벤트 ID로 통계 조회 (비관적 적용)
*
* 동시성 충돌 방지를 위해 PESSIMISTIC_WRITE 사용
* - 읽는 순간부터 락을 걸어 다른 트랜잭션 차단
* - ParticipantRegistered 이벤트 처리 사용
*
* @param eventId 이벤트 ID
* @return 이벤트 통계
*/
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT e FROM EventStats e WHERE e.eventId = :eventId")
Optional<EventStats> findByEventIdWithLock(@Param("eventId") String eventId);
/**
* 매장 ID와 이벤트 ID로 통계 조회
*