From bcfbb6c7f92e218e75e95f2193ee64c881515ec0 Mon Sep 17 00:00:00 2001 From: merrycoral Date: Wed, 29 Oct 2025 15:00:20 +0900 Subject: [PATCH] =?UTF-8?q?Kafka=20=EB=A9=94=EC=8B=9C=EC=A7=80=20=EA=B5=AC?= =?UTF-8?q?=EC=A1=B0=20=EA=B0=9C=EC=84=A0=20=EB=B0=8F=20=EC=95=8C=EB=A6=BC?= =?UTF-8?q?=20=EC=84=9C=EB=B9=84=EC=8A=A4=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafka/AIEventGenerationJobMessage.java | 4 +- .../dto/kafka/ImageGenerationJobMessage.java | 8 +- .../application/service/EventService.java | 42 +++-- .../service/NotificationService.java | 46 +++++ .../eventservice/config/KafkaConfig.java | 13 +- .../event/eventservice/domain/entity/Job.java | 34 ++++ .../kafka/AIJobKafkaConsumer.java | 147 ++++++++++++--- .../kafka/AIJobKafkaProducer.java | 15 +- .../kafka/ImageJobKafkaConsumer.java | 167 +++++++++++++++--- .../kafka/ImageJobKafkaProducer.java | 93 ++++++++++ .../LoggingNotificationService.java | 46 +++++ 11 files changed, 538 insertions(+), 77 deletions(-) create mode 100644 event-service/src/main/java/com/kt/event/eventservice/application/service/NotificationService.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaProducer.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/infrastructure/notification/LoggingNotificationService.java diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java index 966778f..7d8b2fe 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java +++ b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java @@ -27,10 +27,10 @@ public class AIEventGenerationJobMessage { private String jobId; /** - * 사용자 ID + * 사용자 ID (UUID String) */ @JsonProperty("user_id") - private Long userId; + private String userId; /** * 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED) diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java index dd52243..9d1c492 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java +++ b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java @@ -26,16 +26,16 @@ public class ImageGenerationJobMessage { private String jobId; /** - * 이벤트 ID + * 이벤트 ID (UUID String) */ @JsonProperty("event_id") - private Long eventId; + private String eventId; /** - * 사용자 ID + * 사용자 ID (UUID String) */ @JsonProperty("user_id") - private Long userId; + private String userId; /** * 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED) diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/service/EventService.java b/event-service/src/main/java/com/kt/event/eventservice/application/service/EventService.java index f0ce544..79ffd4d 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/application/service/EventService.java +++ b/event-service/src/main/java/com/kt/event/eventservice/application/service/EventService.java @@ -4,6 +4,7 @@ import com.kt.event.common.exception.BusinessException; import com.kt.event.common.exception.ErrorCode; import com.kt.event.eventservice.application.dto.request.*; import com.kt.event.eventservice.application.dto.response.*; +import com.kt.event.eventservice.domain.enums.JobStatus; import com.kt.event.eventservice.domain.enums.JobType; import com.kt.event.eventservice.domain.entity.*; import com.kt.event.eventservice.domain.enums.EventStatus; @@ -14,6 +15,7 @@ import com.kt.event.eventservice.infrastructure.client.dto.ContentImageGeneratio import com.kt.event.eventservice.infrastructure.client.dto.ContentJobResponse; import com.kt.event.eventservice.infrastructure.kafka.AIJobKafkaProducer; import com.kt.event.eventservice.infrastructure.kafka.EventKafkaProducer; +import com.kt.event.eventservice.infrastructure.kafka.ImageJobKafkaProducer; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.hibernate.Hibernate; @@ -44,6 +46,7 @@ public class EventService { private final JobRepository jobRepository; private final ContentServiceClient contentServiceClient; private final AIJobKafkaProducer aiJobKafkaProducer; + private final ImageJobKafkaProducer imageJobKafkaProducer; private final EventKafkaProducer eventKafkaProducer; /** @@ -225,26 +228,37 @@ public class EventService { throw new BusinessException(ErrorCode.EVENT_002); } - // Content Service 요청 DTO 생성 - ContentImageGenerationRequest contentRequest = ContentImageGenerationRequest.builder() - .eventDraftId(event.getEventId().getMostSignificantBits()) - .eventTitle(event.getEventName() != null ? event.getEventName() : "") - .eventDescription(event.getDescription() != null ? event.getDescription() : "") - .styles(request.getStyles()) - .platforms(request.getPlatforms()) + // 이미지 생성 프롬프트 생성 + String prompt = String.format("이벤트: %s, 설명: %s, 스타일: %s, 플랫폼: %s", + event.getEventName() != null ? event.getEventName() : "이벤트", + event.getDescription() != null ? event.getDescription() : "", + String.join(", ", request.getStyles()), + String.join(", ", request.getPlatforms())); + + // Job 엔티티 생성 + Job job = Job.builder() + .eventId(eventId) + .jobType(JobType.IMAGE_GENERATION) .build(); - // Content Service 호출 - ContentJobResponse jobResponse = contentServiceClient.generateImages(contentRequest); + job = jobRepository.save(job); - log.info("Content Service 이미지 생성 요청 완료 - jobId: {}", jobResponse.getId()); + // Kafka 메시지 발행 + imageJobKafkaProducer.publishImageGenerationJob( + job.getJobId().toString(), + userId.toString(), + eventId.toString(), + prompt + ); + + log.info("이미지 생성 작업 메시지 발행 완료 - jobId: {}", job.getJobId()); // 응답 생성 return ImageGenerationResponse.builder() - .jobId(UUID.fromString(jobResponse.getId())) - .status(jobResponse.getStatus()) + .jobId(job.getJobId()) + .status(job.getStatus().name()) .message("이미지 생성 요청이 접수되었습니다.") - .createdAt(jobResponse.getCreatedAt()) + .createdAt(job.getCreatedAt()) .build(); } @@ -309,7 +323,7 @@ public class EventService { // Kafka 메시지 발행 aiJobKafkaProducer.publishAIGenerationJob( job.getJobId().toString(), - userId.getMostSignificantBits(), // Long으로 변환 + userId.toString(), eventId.toString(), request.getStoreInfo().getStoreName(), request.getStoreInfo().getCategory(), diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/service/NotificationService.java b/event-service/src/main/java/com/kt/event/eventservice/application/service/NotificationService.java new file mode 100644 index 0000000..6e32315 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/application/service/NotificationService.java @@ -0,0 +1,46 @@ +package com.kt.event.eventservice.application.service; + +import java.util.UUID; + +/** + * 알림 서비스 인터페이스 + * + * 사용자에게 작업 완료/실패 알림을 전송하는 서비스입니다. + * WebSocket, SSE, Push Notification 등 다양한 방식으로 확장 가능합니다. + * + * @author Event Service Team + * @version 1.0.0 + * @since 2025-10-29 + */ +public interface NotificationService { + + /** + * 작업 완료 알림 전송 + * + * @param userId 사용자 ID + * @param jobId 작업 ID + * @param jobType 작업 타입 + * @param message 알림 메시지 + */ + void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message); + + /** + * 작업 실패 알림 전송 + * + * @param userId 사용자 ID + * @param jobId 작업 ID + * @param jobType 작업 타입 + * @param errorMessage 에러 메시지 + */ + void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage); + + /** + * 작업 진행 상태 알림 전송 + * + * @param userId 사용자 ID + * @param jobId 작업 ID + * @param jobType 작업 타입 + * @param progress 진행률 (0-100) + */ + void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress); +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/config/KafkaConfig.java b/event-service/src/main/java/com/kt/event/eventservice/config/KafkaConfig.java index 632327c..b9d661d 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/config/KafkaConfig.java +++ b/event-service/src/main/java/com/kt/event/eventservice/config/KafkaConfig.java @@ -37,6 +37,7 @@ public class KafkaConfig { /** * Kafka Producer 설정 + * Producer에서 JSON 문자열을 보내므로 StringSerializer 사용 * * @return ProducerFactory 인스턴스 */ @@ -45,8 +46,7 @@ public class KafkaConfig { Map config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // Producer 성능 최적화 설정 config.put(ProducerConfig.ACKS_CONFIG, "all"); @@ -83,14 +83,9 @@ public class KafkaConfig { config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); - // 실제 Deserializer 설정 + // 실제 Deserializer 설정 (Producer에서 JSON 문자열을 보내므로 StringDeserializer 사용) config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); - config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class); - - // JsonDeserializer 설정 - config.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); - config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); - config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.util.HashMap"); + config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class); config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); diff --git a/event-service/src/main/java/com/kt/event/eventservice/domain/entity/Job.java b/event-service/src/main/java/com/kt/event/eventservice/domain/entity/Job.java index 818dc30..4ca3f73 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/domain/entity/Job.java +++ b/event-service/src/main/java/com/kt/event/eventservice/domain/entity/Job.java @@ -59,6 +59,14 @@ public class Job extends BaseTimeEntity { @Column(name = "completed_at") private LocalDateTime completedAt; + @Column(name = "retry_count", nullable = false) + @Builder.Default + private int retryCount = 0; + + @Column(name = "max_retry_count", nullable = false) + @Builder.Default + private int maxRetryCount = 3; + // ==== 비즈니스 로직 ==== // /** @@ -97,4 +105,30 @@ public class Job extends BaseTimeEntity { this.errorMessage = errorMessage; this.completedAt = LocalDateTime.now(); } + + /** + * 재시도 가능 여부 확인 + */ + public boolean canRetry() { + return this.retryCount < this.maxRetryCount; + } + + /** + * 재시도 카운트 증가 + */ + public void incrementRetryCount() { + this.retryCount++; + } + + /** + * 재시도 준비 (상태를 PENDING으로 변경) + */ + public void prepareRetry() { + if (!canRetry()) { + throw new IllegalStateException("최대 재시도 횟수를 초과했습니다."); + } + incrementRetryCount(); + this.status = JobStatus.PENDING; + this.errorMessage = null; + } } diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java index f4f1608..6d87699 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java @@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage; +import com.kt.event.eventservice.application.service.NotificationService; +import com.kt.event.eventservice.domain.entity.AiRecommendation; +import com.kt.event.eventservice.domain.entity.Event; +import com.kt.event.eventservice.domain.entity.Job; +import com.kt.event.eventservice.domain.repository.EventRepository; +import com.kt.event.eventservice.domain.repository.JobRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; @@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.UUID; /** * AI 이벤트 생성 작업 메시지 구독 Consumer * * ai-event-generation-job 토픽의 메시지를 구독하여 처리합니다. + * + * @author Event Service Team + * @version 1.0.0 + * @since 2025-10-29 */ @Slf4j @Component @@ -22,6 +35,9 @@ import org.springframework.stereotype.Component; public class AIJobKafkaConsumer { private final ObjectMapper objectMapper; + private final JobRepository jobRepository; + private final EventRepository eventRepository; + private final NotificationService notificationService; /** * AI 이벤트 생성 작업 메시지 수신 처리 @@ -74,29 +90,120 @@ public class AIJobKafkaConsumer { * * @param message AI 이벤트 생성 작업 메시지 */ - private void processAIEventGenerationJob(AIEventGenerationJobMessage message) { - switch (message.getStatus()) { - case "COMPLETED": - log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}", - message.getJobId(), message.getUserId()); - // TODO: AI 추천 결과를 캐시 또는 DB에 저장 - // TODO: 사용자에게 알림 전송 - break; + @Transactional + protected void processAIEventGenerationJob(AIEventGenerationJobMessage message) { + try { + UUID jobId = UUID.fromString(message.getJobId()); - case "FAILED": - log.error("AI 작업 실패 처리 - JobId: {}, Error: {}", - message.getJobId(), message.getErrorMessage()); - // TODO: 실패 로그 저장 및 사용자 알림 - break; + // Job 조회 + Job job = jobRepository.findById(jobId).orElse(null); + if (job == null) { + log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId()); + return; + } - case "PROCESSING": - log.info("AI 작업 진행 중 - JobId: {}", message.getJobId()); - // TODO: 작업 상태 업데이트 - break; + UUID eventId = job.getEventId(); - default: - log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}", - message.getJobId(), message.getStatus()); + // Event 조회 (모든 케이스에서 사용) + Event event = eventRepository.findById(eventId).orElse(null); + + switch (message.getStatus()) { + case "COMPLETED": + log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}", + message.getJobId(), message.getUserId()); + + // Job 상태 업데이트 + if (message.getAiRecommendation() != null) { + // AI 추천 데이터를 JSON 문자열로 저장 (또는 별도 처리) + String recommendationData = objectMapper.writeValueAsString(message.getAiRecommendation()); + job.complete(recommendationData); + } else { + job.complete("AI 추천 완료"); + } + jobRepository.save(job); + + // Event 조회 및 AI 추천 저장 + if (event != null && message.getAiRecommendation() != null) { + var aiData = message.getAiRecommendation(); + + // AiRecommendation 엔티티 생성 및 Event에 추가 + AiRecommendation aiRecommendation = AiRecommendation.builder() + .eventName(aiData.getEventTitle()) + .description(aiData.getEventDescription()) + .promotionType(aiData.getEventType()) + .targetAudience(aiData.getTargetKeywords() != null ? + String.join(", ", aiData.getTargetKeywords()) : null) + .build(); + + event.addAiRecommendation(aiRecommendation); + eventRepository.save(event); + + log.info("AI 추천 저장 완료 - EventId: {}, RecommendationTitle: {}", + eventId, aiData.getEventTitle()); + + // 사용자에게 알림 전송 + UUID userId = event.getUserId(); + notificationService.notifyJobCompleted( + userId, + jobId, + "AI_RECOMMENDATION", + "AI 추천이 완료되었습니다." + ); + } else { + if (event == null) { + log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId); + } + } + break; + + case "FAILED": + log.error("AI 작업 실패 처리 - JobId: {}, Error: {}", + message.getJobId(), message.getErrorMessage()); + + // Job 상태 업데이트 + job.fail(message.getErrorMessage()); + jobRepository.save(job); + + // 사용자에게 실패 알림 전송 + if (event != null) { + UUID userId = event.getUserId(); + notificationService.notifyJobFailed( + userId, + jobId, + "AI_RECOMMENDATION", + "AI 추천에 실패했습니다: " + message.getErrorMessage() + ); + } + break; + + case "PROCESSING": + log.info("AI 작업 진행 중 - JobId: {}", message.getJobId()); + + // Job 상태 업데이트 + job.start(); + jobRepository.save(job); + + // 사용자에게 진행 상태 알림 전송 + if (event != null) { + UUID userId = event.getUserId(); + notificationService.notifyJobProgress( + userId, + jobId, + "AI_RECOMMENDATION", + job.getProgress() + ); + } + break; + + default: + log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}", + message.getJobId(), message.getStatus()); + } + + } catch (Exception e) { + log.error("AI 작업 처리 중 예외 발생 - JobId: {}, Error: {}", + message.getJobId(), e.getMessage(), e); + throw new RuntimeException(e); } } } diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaProducer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaProducer.java index c60a72c..05f179f 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaProducer.java +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaProducer.java @@ -1,5 +1,6 @@ package com.kt.event.eventservice.infrastructure.kafka; +import com.fasterxml.jackson.databind.ObjectMapper; import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -26,6 +27,7 @@ import java.util.concurrent.CompletableFuture; public class AIJobKafkaProducer { private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; @Value("${app.kafka.topics.ai-event-generation-job:ai-event-generation-job}") private String aiEventGenerationJobTopic; @@ -33,9 +35,9 @@ public class AIJobKafkaProducer { /** * AI 이벤트 생성 작업 메시지 발행 * - * @param jobId 작업 ID - * @param userId 사용자 ID - * @param eventId 이벤트 ID + * @param jobId 작업 ID (UUID String) + * @param userId 사용자 ID (UUID String) + * @param eventId 이벤트 ID (UUID String) * @param storeName 매장명 * @param storeCategory 매장 업종 * @param storeDescription 매장 설명 @@ -43,7 +45,7 @@ public class AIJobKafkaProducer { */ public void publishAIGenerationJob( String jobId, - Long userId, + String userId, String eventId, String storeName, String storeCategory, @@ -67,8 +69,11 @@ public class AIJobKafkaProducer { */ public void publishMessage(AIEventGenerationJobMessage message) { try { + // JSON 문자열로 변환 + String jsonMessage = objectMapper.writeValueAsString(message); + CompletableFuture> future = - kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), message); + kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), jsonMessage); future.whenComplete((result, ex) -> { if (ex == null) { diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java index f66f3e7..515bac9 100644 --- a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java @@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage; +import com.kt.event.eventservice.application.service.NotificationService; +import com.kt.event.eventservice.domain.entity.Event; +import com.kt.event.eventservice.domain.entity.GeneratedImage; +import com.kt.event.eventservice.domain.entity.Job; +import com.kt.event.eventservice.domain.repository.EventRepository; +import com.kt.event.eventservice.domain.repository.JobRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.kafka.annotation.KafkaListener; @@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.UUID; /** * 이미지 생성 작업 메시지 구독 Consumer * * image-generation-job 토픽의 메시지를 구독하여 처리합니다. + * + * @author Event Service Team + * @version 1.0.0 + * @since 2025-10-29 */ @Slf4j @Component @@ -22,6 +35,10 @@ import org.springframework.stereotype.Component; public class ImageJobKafkaConsumer { private final ObjectMapper objectMapper; + private final JobRepository jobRepository; + private final EventRepository eventRepository; + private final NotificationService notificationService; + private final ImageJobKafkaProducer imageJobKafkaProducer; /** * 이미지 생성 작업 메시지 수신 처리 @@ -74,32 +91,136 @@ public class ImageJobKafkaConsumer { * * @param message 이미지 생성 작업 메시지 */ - private void processImageGenerationJob(ImageGenerationJobMessage message) { - switch (message.getStatus()) { - case "COMPLETED": - log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}", - message.getJobId(), message.getEventId(), message.getImageUrl()); - // TODO: 생성된 이미지 URL을 캐시 또는 DB에 저장 - // TODO: 이벤트 엔티티에 이미지 URL 업데이트 - // TODO: 사용자에게 알림 전송 - break; + @Transactional + protected void processImageGenerationJob(ImageGenerationJobMessage message) { + try { + UUID jobId = UUID.fromString(message.getJobId()); + UUID eventId = UUID.fromString(message.getEventId()); - case "FAILED": - log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}", - message.getJobId(), message.getEventId(), message.getErrorMessage()); - // TODO: 실패 로그 저장 및 사용자 알림 - // TODO: 재시도 로직 또는 기본 이미지 사용 - break; + // Job 조회 + Job job = jobRepository.findById(jobId).orElse(null); + if (job == null) { + log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId()); + return; + } - case "PROCESSING": - log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}", - message.getJobId(), message.getEventId()); - // TODO: 작업 상태 업데이트 - break; + // Event 조회 (모든 케이스에서 사용) + Event event = eventRepository.findById(eventId).orElse(null); - default: - log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}", - message.getJobId(), message.getEventId(), message.getStatus()); + switch (message.getStatus()) { + case "COMPLETED": + log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}", + message.getJobId(), message.getEventId(), message.getImageUrl()); + + // Job 상태 업데이트 + job.complete(message.getImageUrl()); + jobRepository.save(job); + + // Event 조회 + if (event != null) { + // GeneratedImage 엔티티 생성 및 Event에 추가 + GeneratedImage generatedImage = GeneratedImage.builder() + .imageUrl(message.getImageUrl()) + .build(); + + event.addGeneratedImage(generatedImage); + eventRepository.save(event); + + log.info("이미지 저장 완료 - EventId: {}, ImageURL: {}", + eventId, message.getImageUrl()); + + // 사용자에게 알림 전송 + UUID userId = event.getUserId(); + notificationService.notifyJobCompleted( + userId, + jobId, + "IMAGE_GENERATION", + "이미지 생성이 완료되었습니다." + ); + } else { + log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId); + } + break; + + case "FAILED": + log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}", + message.getJobId(), message.getEventId(), message.getErrorMessage()); + + // 재시도 로직 + if (job.canRetry()) { + log.info("이미지 생성 재시도 - JobId: {}, RetryCount: {}/{}", + jobId, job.getRetryCount() + 1, job.getMaxRetryCount()); + + // 재시도 준비 + job.prepareRetry(); + jobRepository.save(job); + + // 재시도 메시지 발행 + if (event != null) { + String prompt = String.format("이벤트: %s (재시도 %d/%d)", + event.getEventName() != null ? event.getEventName() : "이벤트", + job.getRetryCount(), + job.getMaxRetryCount()); + + imageJobKafkaProducer.publishImageGenerationJob( + jobId.toString(), + message.getUserId(), + eventId.toString(), + prompt + ); + + log.info("이미지 생성 재시도 메시지 발행 완료 - JobId: {}", jobId); + } + } else { + // 최대 재시도 횟수 초과 - 완전 실패 처리 + log.error("이미지 생성 최대 재시도 횟수 초과 - JobId: {}, RetryCount: {}", + jobId, job.getRetryCount()); + + job.fail(message.getErrorMessage()); + jobRepository.save(job); + + // 사용자에게 실패 알림 전송 + if (event != null) { + UUID userId = event.getUserId(); + notificationService.notifyJobFailed( + userId, + jobId, + "IMAGE_GENERATION", + "이미지 생성에 실패했습니다: " + message.getErrorMessage() + ); + } + } + break; + + case "PROCESSING": + log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}", + message.getJobId(), message.getEventId()); + + // Job 상태 업데이트 + job.start(); + jobRepository.save(job); + + // 사용자에게 진행 상태 알림 전송 + if (event != null) { + UUID userId = event.getUserId(); + notificationService.notifyJobProgress( + userId, + jobId, + "IMAGE_GENERATION", + job.getProgress() + ); + } + break; + + default: + log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}", + message.getJobId(), message.getEventId(), message.getStatus()); + } + + } catch (Exception e) { + log.error("이미지 작업 처리 중 예외 발생 - JobId: {}, Error: {}", + message.getJobId(), e.getMessage(), e); + throw e; } } } diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaProducer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaProducer.java new file mode 100644 index 0000000..94dbbc5 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaProducer.java @@ -0,0 +1,93 @@ +package com.kt.event.eventservice.infrastructure.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.concurrent.CompletableFuture; + +/** + * 이미지 생성 작업 메시지 발행 Producer + * + * image-generation-job 토픽에 이미지 생성 작업 메시지를 발행합니다. + * + * @author Event Service Team + * @version 1.0.0 + * @since 2025-10-29 + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ImageJobKafkaProducer { + + private final KafkaTemplate kafkaTemplate; + private final ObjectMapper objectMapper; + + @Value("${app.kafka.topics.image-generation-job:image-generation-job}") + private String imageGenerationJobTopic; + + /** + * 이미지 생성 작업 메시지 발행 + * + * @param jobId 작업 ID (UUID) + * @param userId 사용자 ID (UUID) + * @param eventId 이벤트 ID (UUID) + * @param prompt 이미지 생성 프롬프트 + */ + public void publishImageGenerationJob( + String jobId, + String userId, + String eventId, + String prompt) { + + ImageGenerationJobMessage message = ImageGenerationJobMessage.builder() + .jobId(jobId) + .userId(userId) + .eventId(eventId) + .prompt(prompt) + .status("PENDING") + .createdAt(LocalDateTime.now()) + .build(); + + publishMessage(message); + } + + /** + * 이미지 생성 작업 메시지 발행 + * + * @param message ImageGenerationJobMessage 객체 + */ + public void publishMessage(ImageGenerationJobMessage message) { + try { + // JSON 문자열로 변환 + String jsonMessage = objectMapper.writeValueAsString(message); + + CompletableFuture> future = + kafkaTemplate.send(imageGenerationJobTopic, message.getJobId(), jsonMessage); + + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("이미지 생성 작업 메시지 발행 성공 - Topic: {}, JobId: {}, EventId: {}, Offset: {}", + imageGenerationJobTopic, + message.getJobId(), + message.getEventId(), + result.getRecordMetadata().offset()); + } else { + log.error("이미지 생성 작업 메시지 발행 실패 - Topic: {}, JobId: {}, Error: {}", + imageGenerationJobTopic, + message.getJobId(), + ex.getMessage(), ex); + } + }); + } catch (Exception e) { + log.error("이미지 생성 작업 메시지 발행 중 예외 발생 - JobId: {}, Error: {}", + message.getJobId(), e.getMessage(), e); + } + } +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/notification/LoggingNotificationService.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/notification/LoggingNotificationService.java new file mode 100644 index 0000000..49ca3ca --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/notification/LoggingNotificationService.java @@ -0,0 +1,46 @@ +package com.kt.event.eventservice.infrastructure.notification; + +import com.kt.event.eventservice.application.service.NotificationService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.UUID; + +/** + * 로깅 기반 알림 서비스 구현 + * + * 현재는 로그로만 알림을 기록하며, 추후 WebSocket, SSE, Push Notification 등으로 확장 가능합니다. + * + * @author Event Service Team + * @version 1.0.0 + * @since 2025-10-29 + */ +@Slf4j +@Service +public class LoggingNotificationService implements NotificationService { + + @Override + public void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message) { + log.info("📢 [작업 완료 알림] UserId: {}, JobId: {}, JobType: {}, Message: {}", + userId, jobId, jobType, message); + + // TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송 + // 예: webSocketTemplate.convertAndSendToUser(userId.toString(), "/queue/notifications", notification); + } + + @Override + public void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage) { + log.error("📢 [작업 실패 알림] UserId: {}, JobId: {}, JobType: {}, Error: {}", + userId, jobId, jobType, errorMessage); + + // TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송 + } + + @Override + public void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress) { + log.info("📢 [작업 진행 알림] UserId: {}, JobId: {}, JobType: {}, Progress: {}%", + userId, jobId, jobType, progress); + + // TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송 + } +}