Merge branch 'feature/event' into develop

This commit is contained in:
merrycoral
2025-10-29 15:01:57 +09:00
26 changed files with 2088 additions and 82 deletions
@@ -27,10 +27,10 @@ public class AIEventGenerationJobMessage {
private String jobId;
/**
* 사용자 ID
* 사용자 ID (UUID String)
*/
@JsonProperty("user_id")
private Long userId;
private String userId;
/**
* 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED)
@@ -26,16 +26,16 @@ public class ImageGenerationJobMessage {
private String jobId;
/**
* 이벤트 ID
* 이벤트 ID (UUID String)
*/
@JsonProperty("event_id")
private Long eventId;
private String eventId;
/**
* 사용자 ID
* 사용자 ID (UUID String)
*/
@JsonProperty("user_id")
private Long userId;
private String userId;
/**
* 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED)
@@ -36,6 +36,9 @@ public class EventDetailResponse {
private EventStatus status;
private UUID selectedImageId;
private String selectedImageUrl;
private Integer participants;
private Integer targetParticipants;
private Double roi;
@Builder.Default
private List<GeneratedImageDto> generatedImages = new ArrayList<>();
@@ -4,6 +4,7 @@ import com.kt.event.common.exception.BusinessException;
import com.kt.event.common.exception.ErrorCode;
import com.kt.event.eventservice.application.dto.request.*;
import com.kt.event.eventservice.application.dto.response.*;
import com.kt.event.eventservice.domain.enums.JobStatus;
import com.kt.event.eventservice.domain.enums.JobType;
import com.kt.event.eventservice.domain.entity.*;
import com.kt.event.eventservice.domain.enums.EventStatus;
@@ -13,6 +14,8 @@ import com.kt.event.eventservice.infrastructure.client.ContentServiceClient;
import com.kt.event.eventservice.infrastructure.client.dto.ContentImageGenerationRequest;
import com.kt.event.eventservice.infrastructure.client.dto.ContentJobResponse;
import com.kt.event.eventservice.infrastructure.kafka.AIJobKafkaProducer;
import com.kt.event.eventservice.infrastructure.kafka.EventKafkaProducer;
import com.kt.event.eventservice.infrastructure.kafka.ImageJobKafkaProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.Hibernate;
@@ -43,6 +46,8 @@ public class EventService {
private final JobRepository jobRepository;
private final ContentServiceClient contentServiceClient;
private final AIJobKafkaProducer aiJobKafkaProducer;
private final ImageJobKafkaProducer imageJobKafkaProducer;
private final EventKafkaProducer eventKafkaProducer;
/**
* 이벤트 생성 (Step 1: 목적 선택)
@@ -171,6 +176,14 @@ public class EventService {
eventRepository.save(event);
// Kafka 이벤트 발행
eventKafkaProducer.publishEventCreated(
event.getEventId(),
event.getUserId(),
event.getEventName(),
event.getObjective()
);
log.info("이벤트 배포 완료 - eventId: {}", eventId);
}
@@ -215,26 +228,37 @@ public class EventService {
throw new BusinessException(ErrorCode.EVENT_002);
}
// Content Service 요청 DTO 생성
ContentImageGenerationRequest contentRequest = ContentImageGenerationRequest.builder()
.eventDraftId(event.getEventId().getMostSignificantBits())
.eventTitle(event.getEventName() != null ? event.getEventName() : "")
.eventDescription(event.getDescription() != null ? event.getDescription() : "")
.styles(request.getStyles())
.platforms(request.getPlatforms())
// 이미지 생성 프롬프트 생성
String prompt = String.format("이벤트: %s, 설명: %s, 스타일: %s, 플랫폼: %s",
event.getEventName() != null ? event.getEventName() : "이벤트",
event.getDescription() != null ? event.getDescription() : "",
String.join(", ", request.getStyles()),
String.join(", ", request.getPlatforms()));
// Job 엔티티 생성
Job job = Job.builder()
.eventId(eventId)
.jobType(JobType.IMAGE_GENERATION)
.build();
// Content Service 호출
ContentJobResponse jobResponse = contentServiceClient.generateImages(contentRequest);
job = jobRepository.save(job);
log.info("Content Service 이미지 생성 요청 완료 - jobId: {}", jobResponse.getId());
// Kafka 메시지 발행
imageJobKafkaProducer.publishImageGenerationJob(
job.getJobId().toString(),
userId.toString(),
eventId.toString(),
prompt
);
log.info("이미지 생성 작업 메시지 발행 완료 - jobId: {}", job.getJobId());
// 응답 생성
return ImageGenerationResponse.builder()
.jobId(UUID.fromString(jobResponse.getId()))
.status(jobResponse.getStatus())
.jobId(job.getJobId())
.status(job.getStatus().name())
.message("이미지 생성 요청이 접수되었습니다.")
.createdAt(jobResponse.getCreatedAt())
.createdAt(job.getCreatedAt())
.build();
}
@@ -299,7 +323,7 @@ public class EventService {
// Kafka 메시지 발행
aiJobKafkaProducer.publishAIGenerationJob(
job.getJobId().toString(),
userId.getMostSignificantBits(), // Long으로 변환
userId.toString(),
eventId.toString(),
request.getStoreInfo().getStoreName(),
request.getStoreInfo().getCategory(),
@@ -518,6 +542,9 @@ public class EventService {
.status(event.getStatus())
.selectedImageId(event.getSelectedImageId())
.selectedImageUrl(event.getSelectedImageUrl())
.participants(event.getParticipants())
.targetParticipants(event.getTargetParticipants())
.roi(event.getRoi())
.generatedImages(
event.getGeneratedImages().stream()
.map(img -> EventDetailResponse.GeneratedImageDto.builder()
@@ -0,0 +1,46 @@
package com.kt.event.eventservice.application.service;
import java.util.UUID;
/**
* 알림 서비스 인터페이스
*
* 사용자에게 작업 완료/실패 알림을 전송하는 서비스입니다.
* WebSocket, SSE, Push Notification 등 다양한 방식으로 확장 가능합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
public interface NotificationService {
/**
* 작업 완료 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param message 알림 메시지
*/
void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message);
/**
* 작업 실패 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param errorMessage 에러 메시지
*/
void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage);
/**
* 작업 진행 상태 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param progress 진행률 (0-100)
*/
void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress);
}
@@ -37,6 +37,7 @@ public class KafkaConfig {
/**
* Kafka Producer 설정
* Producer에서 JSON 문자열을 보내므로 StringSerializer 사용
*
* @return ProducerFactory 인스턴스
*/
@@ -45,8 +46,7 @@ public class KafkaConfig {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Producer 성능 최적화 설정
config.put(ProducerConfig.ACKS_CONFIG, "all");
@@ -83,14 +83,9 @@ public class KafkaConfig {
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// 실제 Deserializer 설정
// 실제 Deserializer 설정 (Producer에서 JSON 문자열을 보내므로 StringDeserializer 사용)
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
// JsonDeserializer 설정
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.util.HashMap");
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -8,6 +8,12 @@ import org.springframework.security.config.annotation.web.configurers.AbstractHt
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import java.util.Arrays;
import java.util.List;
/**
* Spring Security 설정 클래스
@@ -34,8 +40,8 @@ public class SecurityConfig {
// CSRF 보호 비활성화 (개발 환경)
.csrf(AbstractHttpConfigurer::disable)
// CORS 설정
.cors(AbstractHttpConfigurer::disable)
// CORS 설정 활성화
.cors(cors -> cors.configurationSource(corsConfigurationSource()))
// 폼 로그인 비활성화
.formLogin(AbstractHttpConfigurer::disable)
@@ -62,4 +68,54 @@ public class SecurityConfig {
return http.build();
}
/**
* CORS 설정
* 개발 환경에서 프론트엔드(localhost:3000)의 요청을 허용합니다.
*
* @return CorsConfigurationSource CORS 설정 소스
*/
@Bean
public CorsConfigurationSource corsConfigurationSource() {
CorsConfiguration configuration = new CorsConfiguration();
// 허용할 Origin (개발 환경)
configuration.setAllowedOrigins(Arrays.asList(
"http://localhost:3000",
"http://127.0.0.1:3000"
));
// 허용할 HTTP 메서드
configuration.setAllowedMethods(Arrays.asList(
"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"
));
// 허용할 헤더
configuration.setAllowedHeaders(Arrays.asList(
"Authorization",
"Content-Type",
"X-Requested-With",
"Accept",
"Origin",
"Access-Control-Request-Method",
"Access-Control-Request-Headers"
));
// 인증 정보 포함 허용
configuration.setAllowCredentials(true);
// Preflight 요청 캐시 시간 (초)
configuration.setMaxAge(3600L);
// 노출할 응답 헤더
configuration.setExposedHeaders(Arrays.asList(
"Authorization",
"Content-Type"
));
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", configuration);
return source;
}
}
@@ -69,6 +69,17 @@ public class Event extends BaseTimeEntity {
@Column(name = "selected_image_url", length = 500)
private String selectedImageUrl;
@Column(name = "participants")
@Builder.Default
private Integer participants = 0;
@Column(name = "target_participants")
private Integer targetParticipants;
@Column(name = "roi")
@Builder.Default
private Double roi = 0.0;
@ElementCollection(fetch = FetchType.LAZY)
@CollectionTable(
name = "event_channels",
@@ -139,6 +150,57 @@ public class Event extends BaseTimeEntity {
this.channels.addAll(channels);
}
/**
* 목표 참여자 수 설정
*/
public void updateTargetParticipants(Integer targetParticipants) {
if (targetParticipants != null && targetParticipants < 0) {
throw new IllegalArgumentException("목표 참여자 수는 0 이상이어야 합니다.");
}
this.targetParticipants = targetParticipants;
}
/**
* 참여자 수 증가
*/
public void incrementParticipants() {
this.participants = (this.participants == null ? 0 : this.participants) + 1;
updateRoi();
}
/**
* 참여자 수 직접 설정
*/
public void updateParticipants(Integer participants) {
if (participants != null && participants < 0) {
throw new IllegalArgumentException("참여자 수는 0 이상이어야 합니다.");
}
this.participants = participants;
updateRoi();
}
/**
* ROI 계산 및 업데이트
* ROI = (참여자 수 / 목표 참여자 수) * 100
*/
private void updateRoi() {
if (this.targetParticipants != null && this.targetParticipants > 0) {
this.roi = ((double) (this.participants == null ? 0 : this.participants) / this.targetParticipants) * 100.0;
} else {
this.roi = 0.0;
}
}
/**
* ROI 직접 설정 (외부 계산값 사용)
*/
public void updateRoi(Double roi) {
if (roi != null && roi < 0) {
throw new IllegalArgumentException("ROI는 0 이상이어야 합니다.");
}
this.roi = roi;
}
/**
* 이벤트 배포 (상태 변경: DRAFT → PUBLISHED)
*/
@@ -157,9 +219,10 @@ public class Event extends BaseTimeEntity {
if (startDate.isAfter(endDate)) {
throw new IllegalStateException("시작일은 종료일보다 이전이어야 합니다.");
}
if (selectedImageId == null) {
throw new IllegalStateException("이미지를 선택해야 합니다.");
}
// TODO: Frontend에서 selectedImageId 추적 구현 후 주석 제거
// if (selectedImageId == null) {
// throw new IllegalStateException("이미지를 선택해야 합니다.");
// }
if (channels.isEmpty()) {
throw new IllegalStateException("배포 채널을 선택해야 합니다.");
}
@@ -59,6 +59,14 @@ public class Job extends BaseTimeEntity {
@Column(name = "completed_at")
private LocalDateTime completedAt;
@Column(name = "retry_count", nullable = false)
@Builder.Default
private int retryCount = 0;
@Column(name = "max_retry_count", nullable = false)
@Builder.Default
private int maxRetryCount = 3;
// ==== 비즈니스 로직 ==== //
/**
@@ -97,4 +105,30 @@ public class Job extends BaseTimeEntity {
this.errorMessage = errorMessage;
this.completedAt = LocalDateTime.now();
}
/**
* 재시도 가능 여부 확인
*/
public boolean canRetry() {
return this.retryCount < this.maxRetryCount;
}
/**
* 재시도 카운트 증가
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* 재시도 준비 (상태를 PENDING으로 변경)
*/
public void prepareRetry() {
if (!canRetry()) {
throw new IllegalStateException("최대 재시도 횟수를 초과했습니다.");
}
incrementRetryCount();
this.status = JobStatus.PENDING;
this.errorMessage = null;
}
}
@@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage;
import com.kt.event.eventservice.application.service.NotificationService;
import com.kt.event.eventservice.domain.entity.AiRecommendation;
import com.kt.event.eventservice.domain.entity.Event;
import com.kt.event.eventservice.domain.entity.Job;
import com.kt.event.eventservice.domain.repository.EventRepository;
import com.kt.event.eventservice.domain.repository.JobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* AI 이벤트 생성 작업 메시지 구독 Consumer
*
* ai-event-generation-job 토픽의 메시지를 구독하여 처리합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@@ -22,6 +35,9 @@ import org.springframework.stereotype.Component;
public class AIJobKafkaConsumer {
private final ObjectMapper objectMapper;
private final JobRepository jobRepository;
private final EventRepository eventRepository;
private final NotificationService notificationService;
/**
* AI 이벤트 생성 작업 메시지 수신 처리
@@ -74,29 +90,120 @@ public class AIJobKafkaConsumer {
*
* @param message AI 이벤트 생성 작업 메시지
*/
private void processAIEventGenerationJob(AIEventGenerationJobMessage message) {
switch (message.getStatus()) {
case "COMPLETED":
log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}",
message.getJobId(), message.getUserId());
// TODO: AI 추천 결과를 캐시 또는 DB에 저장
// TODO: 사용자에게 알림 전송
break;
@Transactional
protected void processAIEventGenerationJob(AIEventGenerationJobMessage message) {
try {
UUID jobId = UUID.fromString(message.getJobId());
case "FAILED":
log.error("AI 작업 실패 처리 - JobId: {}, Error: {}",
message.getJobId(), message.getErrorMessage());
// TODO: 실패 로그 저장 및 사용자 알림
break;
// Job 조회
Job job = jobRepository.findById(jobId).orElse(null);
if (job == null) {
log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId());
return;
}
case "PROCESSING":
log.info("AI 작업 진행 중 - JobId: {}", message.getJobId());
// TODO: 작업 상태 업데이트
break;
UUID eventId = job.getEventId();
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}",
message.getJobId(), message.getStatus());
// Event 조회 (모든 케이스에서 사용)
Event event = eventRepository.findById(eventId).orElse(null);
switch (message.getStatus()) {
case "COMPLETED":
log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}",
message.getJobId(), message.getUserId());
// Job 상태 업데이트
if (message.getAiRecommendation() != null) {
// AI 추천 데이터를 JSON 문자열로 저장 (또는 별도 처리)
String recommendationData = objectMapper.writeValueAsString(message.getAiRecommendation());
job.complete(recommendationData);
} else {
job.complete("AI 추천 완료");
}
jobRepository.save(job);
// Event 조회 및 AI 추천 저장
if (event != null && message.getAiRecommendation() != null) {
var aiData = message.getAiRecommendation();
// AiRecommendation 엔티티 생성 및 Event에 추가
AiRecommendation aiRecommendation = AiRecommendation.builder()
.eventName(aiData.getEventTitle())
.description(aiData.getEventDescription())
.promotionType(aiData.getEventType())
.targetAudience(aiData.getTargetKeywords() != null ?
String.join(", ", aiData.getTargetKeywords()) : null)
.build();
event.addAiRecommendation(aiRecommendation);
eventRepository.save(event);
log.info("AI 추천 저장 완료 - EventId: {}, RecommendationTitle: {}",
eventId, aiData.getEventTitle());
// 사용자에게 알림 전송
UUID userId = event.getUserId();
notificationService.notifyJobCompleted(
userId,
jobId,
"AI_RECOMMENDATION",
"AI 추천이 완료되었습니다."
);
} else {
if (event == null) {
log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId);
}
}
break;
case "FAILED":
log.error("AI 작업 실패 처리 - JobId: {}, Error: {}",
message.getJobId(), message.getErrorMessage());
// Job 상태 업데이트
job.fail(message.getErrorMessage());
jobRepository.save(job);
// 사용자에게 실패 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobFailed(
userId,
jobId,
"AI_RECOMMENDATION",
"AI 추천에 실패했습니다: " + message.getErrorMessage()
);
}
break;
case "PROCESSING":
log.info("AI 작업 진행 중 - JobId: {}", message.getJobId());
// Job 상태 업데이트
job.start();
jobRepository.save(job);
// 사용자에게 진행 상태 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobProgress(
userId,
jobId,
"AI_RECOMMENDATION",
job.getProgress()
);
}
break;
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}",
message.getJobId(), message.getStatus());
}
} catch (Exception e) {
log.error("AI 작업 처리 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
throw new RuntimeException(e);
}
}
}
@@ -1,5 +1,6 @@
package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -26,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
public class AIJobKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Value("${app.kafka.topics.ai-event-generation-job:ai-event-generation-job}")
private String aiEventGenerationJobTopic;
@@ -33,9 +35,9 @@ public class AIJobKafkaProducer {
/**
* AI 이벤트 생성 작업 메시지 발행
*
* @param jobId 작업 ID
* @param userId 사용자 ID
* @param eventId 이벤트 ID
* @param jobId 작업 ID (UUID String)
* @param userId 사용자 ID (UUID String)
* @param eventId 이벤트 ID (UUID String)
* @param storeName 매장명
* @param storeCategory 매장 업종
* @param storeDescription 매장 설명
@@ -43,7 +45,7 @@ public class AIJobKafkaProducer {
*/
public void publishAIGenerationJob(
String jobId,
Long userId,
String userId,
String eventId,
String storeName,
String storeCategory,
@@ -67,8 +69,11 @@ public class AIJobKafkaProducer {
*/
public void publishMessage(AIEventGenerationJobMessage message) {
try {
// JSON 문자열로 변환
String jsonMessage = objectMapper.writeValueAsString(message);
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), message);
kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), jsonMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {
@@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage;
import com.kt.event.eventservice.application.service.NotificationService;
import com.kt.event.eventservice.domain.entity.Event;
import com.kt.event.eventservice.domain.entity.GeneratedImage;
import com.kt.event.eventservice.domain.entity.Job;
import com.kt.event.eventservice.domain.repository.EventRepository;
import com.kt.event.eventservice.domain.repository.JobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* 이미지 생성 작업 메시지 구독 Consumer
*
* image-generation-job 토픽의 메시지를 구독하여 처리합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@@ -22,6 +35,10 @@ import org.springframework.stereotype.Component;
public class ImageJobKafkaConsumer {
private final ObjectMapper objectMapper;
private final JobRepository jobRepository;
private final EventRepository eventRepository;
private final NotificationService notificationService;
private final ImageJobKafkaProducer imageJobKafkaProducer;
/**
* 이미지 생성 작업 메시지 수신 처리
@@ -74,32 +91,136 @@ public class ImageJobKafkaConsumer {
*
* @param message 이미지 생성 작업 메시지
*/
private void processImageGenerationJob(ImageGenerationJobMessage message) {
switch (message.getStatus()) {
case "COMPLETED":
log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}",
message.getJobId(), message.getEventId(), message.getImageUrl());
// TODO: 생성된 이미지 URL을 캐시 또는 DB에 저장
// TODO: 이벤트 엔티티에 이미지 URL 업데이트
// TODO: 사용자에게 알림 전송
break;
@Transactional
protected void processImageGenerationJob(ImageGenerationJobMessage message) {
try {
UUID jobId = UUID.fromString(message.getJobId());
UUID eventId = UUID.fromString(message.getEventId());
case "FAILED":
log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}",
message.getJobId(), message.getEventId(), message.getErrorMessage());
// TODO: 실패 로그 저장 및 사용자 알림
// TODO: 재시도 로직 또는 기본 이미지 사용
break;
// Job 조회
Job job = jobRepository.findById(jobId).orElse(null);
if (job == null) {
log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId());
return;
}
case "PROCESSING":
log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}",
message.getJobId(), message.getEventId());
// TODO: 작업 상태 업데이트
break;
// Event 조회 (모든 케이스에서 사용)
Event event = eventRepository.findById(eventId).orElse(null);
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}",
message.getJobId(), message.getEventId(), message.getStatus());
switch (message.getStatus()) {
case "COMPLETED":
log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}",
message.getJobId(), message.getEventId(), message.getImageUrl());
// Job 상태 업데이트
job.complete(message.getImageUrl());
jobRepository.save(job);
// Event 조회
if (event != null) {
// GeneratedImage 엔티티 생성 및 Event에 추가
GeneratedImage generatedImage = GeneratedImage.builder()
.imageUrl(message.getImageUrl())
.build();
event.addGeneratedImage(generatedImage);
eventRepository.save(event);
log.info("이미지 저장 완료 - EventId: {}, ImageURL: {}",
eventId, message.getImageUrl());
// 사용자에게 알림 전송
UUID userId = event.getUserId();
notificationService.notifyJobCompleted(
userId,
jobId,
"IMAGE_GENERATION",
"이미지 생성이 완료되었습니다."
);
} else {
log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId);
}
break;
case "FAILED":
log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}",
message.getJobId(), message.getEventId(), message.getErrorMessage());
// 재시도 로직
if (job.canRetry()) {
log.info("이미지 생성 재시도 - JobId: {}, RetryCount: {}/{}",
jobId, job.getRetryCount() + 1, job.getMaxRetryCount());
// 재시도 준비
job.prepareRetry();
jobRepository.save(job);
// 재시도 메시지 발행
if (event != null) {
String prompt = String.format("이벤트: %s (재시도 %d/%d)",
event.getEventName() != null ? event.getEventName() : "이벤트",
job.getRetryCount(),
job.getMaxRetryCount());
imageJobKafkaProducer.publishImageGenerationJob(
jobId.toString(),
message.getUserId(),
eventId.toString(),
prompt
);
log.info("이미지 생성 재시도 메시지 발행 완료 - JobId: {}", jobId);
}
} else {
// 최대 재시도 횟수 초과 - 완전 실패 처리
log.error("이미지 생성 최대 재시도 횟수 초과 - JobId: {}, RetryCount: {}",
jobId, job.getRetryCount());
job.fail(message.getErrorMessage());
jobRepository.save(job);
// 사용자에게 실패 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobFailed(
userId,
jobId,
"IMAGE_GENERATION",
"이미지 생성에 실패했습니다: " + message.getErrorMessage()
);
}
}
break;
case "PROCESSING":
log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}",
message.getJobId(), message.getEventId());
// Job 상태 업데이트
job.start();
jobRepository.save(job);
// 사용자에게 진행 상태 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobProgress(
userId,
jobId,
"IMAGE_GENERATION",
job.getProgress()
);
}
break;
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}",
message.getJobId(), message.getEventId(), message.getStatus());
}
} catch (Exception e) {
log.error("이미지 작업 처리 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
throw e;
}
}
}
@@ -0,0 +1,93 @@
package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
/**
* 이미지 생성 작업 메시지 발행 Producer
*
* image-generation-job 토픽에 이미지 생성 작업 메시지를 발행합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ImageJobKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Value("${app.kafka.topics.image-generation-job:image-generation-job}")
private String imageGenerationJobTopic;
/**
* 이미지 생성 작업 메시지 발행
*
* @param jobId 작업 ID (UUID)
* @param userId 사용자 ID (UUID)
* @param eventId 이벤트 ID (UUID)
* @param prompt 이미지 생성 프롬프트
*/
public void publishImageGenerationJob(
String jobId,
String userId,
String eventId,
String prompt) {
ImageGenerationJobMessage message = ImageGenerationJobMessage.builder()
.jobId(jobId)
.userId(userId)
.eventId(eventId)
.prompt(prompt)
.status("PENDING")
.createdAt(LocalDateTime.now())
.build();
publishMessage(message);
}
/**
* 이미지 생성 작업 메시지 발행
*
* @param message ImageGenerationJobMessage 객체
*/
public void publishMessage(ImageGenerationJobMessage message) {
try {
// JSON 문자열로 변환
String jsonMessage = objectMapper.writeValueAsString(message);
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(imageGenerationJobTopic, message.getJobId(), jsonMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("이미지 생성 작업 메시지 발행 성공 - Topic: {}, JobId: {}, EventId: {}, Offset: {}",
imageGenerationJobTopic,
message.getJobId(),
message.getEventId(),
result.getRecordMetadata().offset());
} else {
log.error("이미지 생성 작업 메시지 발행 실패 - Topic: {}, JobId: {}, Error: {}",
imageGenerationJobTopic,
message.getJobId(),
ex.getMessage(), ex);
}
});
} catch (Exception e) {
log.error("이미지 생성 작업 메시지 발행 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
}
}
}
@@ -0,0 +1,46 @@
package com.kt.event.eventservice.infrastructure.notification;
import com.kt.event.eventservice.application.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* 로깅 기반 알림 서비스 구현
*
* 현재는 로그로만 알림을 기록하며, 추후 WebSocket, SSE, Push Notification 등으로 확장 가능합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Service
public class LoggingNotificationService implements NotificationService {
@Override
public void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message) {
log.info("📢 [작업 완료 알림] UserId: {}, JobId: {}, JobType: {}, Message: {}",
userId, jobId, jobType, message);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
// 예: webSocketTemplate.convertAndSendToUser(userId.toString(), "/queue/notifications", notification);
}
@Override
public void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage) {
log.error("📢 [작업 실패 알림] UserId: {}, JobId: {}, JobType: {}, Error: {}",
userId, jobId, jobType, errorMessage);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
}
@Override
public void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress) {
log.info("📢 [작업 진행 알림] UserId: {}, JobId: {}, JobType: {}, Progress: {}%",
userId, jobId, jobType, progress);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
}
}