Kafka 메시지 구조 개선 및 알림 서비스 추가

This commit is contained in:
merrycoral 2025-10-29 15:00:20 +09:00
parent da173d79e9
commit bcfbb6c7f9
11 changed files with 538 additions and 77 deletions

View File

@ -27,10 +27,10 @@ public class AIEventGenerationJobMessage {
private String jobId;
/**
* 사용자 ID
* 사용자 ID (UUID String)
*/
@JsonProperty("user_id")
private Long userId;
private String userId;
/**
* 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED)

View File

@ -26,16 +26,16 @@ public class ImageGenerationJobMessage {
private String jobId;
/**
* 이벤트 ID
* 이벤트 ID (UUID String)
*/
@JsonProperty("event_id")
private Long eventId;
private String eventId;
/**
* 사용자 ID
* 사용자 ID (UUID String)
*/
@JsonProperty("user_id")
private Long userId;
private String userId;
/**
* 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED)

View File

@ -4,6 +4,7 @@ import com.kt.event.common.exception.BusinessException;
import com.kt.event.common.exception.ErrorCode;
import com.kt.event.eventservice.application.dto.request.*;
import com.kt.event.eventservice.application.dto.response.*;
import com.kt.event.eventservice.domain.enums.JobStatus;
import com.kt.event.eventservice.domain.enums.JobType;
import com.kt.event.eventservice.domain.entity.*;
import com.kt.event.eventservice.domain.enums.EventStatus;
@ -14,6 +15,7 @@ import com.kt.event.eventservice.infrastructure.client.dto.ContentImageGeneratio
import com.kt.event.eventservice.infrastructure.client.dto.ContentJobResponse;
import com.kt.event.eventservice.infrastructure.kafka.AIJobKafkaProducer;
import com.kt.event.eventservice.infrastructure.kafka.EventKafkaProducer;
import com.kt.event.eventservice.infrastructure.kafka.ImageJobKafkaProducer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.Hibernate;
@ -44,6 +46,7 @@ public class EventService {
private final JobRepository jobRepository;
private final ContentServiceClient contentServiceClient;
private final AIJobKafkaProducer aiJobKafkaProducer;
private final ImageJobKafkaProducer imageJobKafkaProducer;
private final EventKafkaProducer eventKafkaProducer;
/**
@ -225,26 +228,37 @@ public class EventService {
throw new BusinessException(ErrorCode.EVENT_002);
}
// Content Service 요청 DTO 생성
ContentImageGenerationRequest contentRequest = ContentImageGenerationRequest.builder()
.eventDraftId(event.getEventId().getMostSignificantBits())
.eventTitle(event.getEventName() != null ? event.getEventName() : "")
.eventDescription(event.getDescription() != null ? event.getDescription() : "")
.styles(request.getStyles())
.platforms(request.getPlatforms())
// 이미지 생성 프롬프트 생성
String prompt = String.format("이벤트: %s, 설명: %s, 스타일: %s, 플랫폼: %s",
event.getEventName() != null ? event.getEventName() : "이벤트",
event.getDescription() != null ? event.getDescription() : "",
String.join(", ", request.getStyles()),
String.join(", ", request.getPlatforms()));
// Job 엔티티 생성
Job job = Job.builder()
.eventId(eventId)
.jobType(JobType.IMAGE_GENERATION)
.build();
// Content Service 호출
ContentJobResponse jobResponse = contentServiceClient.generateImages(contentRequest);
job = jobRepository.save(job);
log.info("Content Service 이미지 생성 요청 완료 - jobId: {}", jobResponse.getId());
// Kafka 메시지 발행
imageJobKafkaProducer.publishImageGenerationJob(
job.getJobId().toString(),
userId.toString(),
eventId.toString(),
prompt
);
log.info("이미지 생성 작업 메시지 발행 완료 - jobId: {}", job.getJobId());
// 응답 생성
return ImageGenerationResponse.builder()
.jobId(UUID.fromString(jobResponse.getId()))
.status(jobResponse.getStatus())
.jobId(job.getJobId())
.status(job.getStatus().name())
.message("이미지 생성 요청이 접수되었습니다.")
.createdAt(jobResponse.getCreatedAt())
.createdAt(job.getCreatedAt())
.build();
}
@ -309,7 +323,7 @@ public class EventService {
// Kafka 메시지 발행
aiJobKafkaProducer.publishAIGenerationJob(
job.getJobId().toString(),
userId.getMostSignificantBits(), // Long으로 변환
userId.toString(),
eventId.toString(),
request.getStoreInfo().getStoreName(),
request.getStoreInfo().getCategory(),

View File

@ -0,0 +1,46 @@
package com.kt.event.eventservice.application.service;
import java.util.UUID;
/**
* 알림 서비스 인터페이스
*
* 사용자에게 작업 완료/실패 알림을 전송하는 서비스입니다.
* WebSocket, SSE, Push Notification 다양한 방식으로 확장 가능합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
public interface NotificationService {
/**
* 작업 완료 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param message 알림 메시지
*/
void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message);
/**
* 작업 실패 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param errorMessage 에러 메시지
*/
void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage);
/**
* 작업 진행 상태 알림 전송
*
* @param userId 사용자 ID
* @param jobId 작업 ID
* @param jobType 작업 타입
* @param progress 진행률 (0-100)
*/
void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress);
}

View File

@ -37,6 +37,7 @@ public class KafkaConfig {
/**
* Kafka Producer 설정
* Producer에서 JSON 문자열을 보내므로 StringSerializer 사용
*
* @return ProducerFactory 인스턴스
*/
@ -45,8 +46,7 @@ public class KafkaConfig {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Producer 성능 최적화 설정
config.put(ProducerConfig.ACKS_CONFIG, "all");
@ -83,14 +83,9 @@ public class KafkaConfig {
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// 실제 Deserializer 설정
// 실제 Deserializer 설정 (Producer에서 JSON 문자열을 보내므로 StringDeserializer 사용)
config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
// JsonDeserializer 설정
config.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.util.HashMap");
config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

View File

@ -59,6 +59,14 @@ public class Job extends BaseTimeEntity {
@Column(name = "completed_at")
private LocalDateTime completedAt;
@Column(name = "retry_count", nullable = false)
@Builder.Default
private int retryCount = 0;
@Column(name = "max_retry_count", nullable = false)
@Builder.Default
private int maxRetryCount = 3;
// ==== 비즈니스 로직 ==== //
/**
@ -97,4 +105,30 @@ public class Job extends BaseTimeEntity {
this.errorMessage = errorMessage;
this.completedAt = LocalDateTime.now();
}
/**
* 재시도 가능 여부 확인
*/
public boolean canRetry() {
return this.retryCount < this.maxRetryCount;
}
/**
* 재시도 카운트 증가
*/
public void incrementRetryCount() {
this.retryCount++;
}
/**
* 재시도 준비 (상태를 PENDING으로 변경)
*/
public void prepareRetry() {
if (!canRetry()) {
throw new IllegalStateException("최대 재시도 횟수를 초과했습니다.");
}
incrementRetryCount();
this.status = JobStatus.PENDING;
this.errorMessage = null;
}
}

View File

@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage;
import com.kt.event.eventservice.application.service.NotificationService;
import com.kt.event.eventservice.domain.entity.AiRecommendation;
import com.kt.event.eventservice.domain.entity.Event;
import com.kt.event.eventservice.domain.entity.Job;
import com.kt.event.eventservice.domain.repository.EventRepository;
import com.kt.event.eventservice.domain.repository.JobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* AI 이벤트 생성 작업 메시지 구독 Consumer
*
* ai-event-generation-job 토픽의 메시지를 구독하여 처리합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@ -22,6 +35,9 @@ import org.springframework.stereotype.Component;
public class AIJobKafkaConsumer {
private final ObjectMapper objectMapper;
private final JobRepository jobRepository;
private final EventRepository eventRepository;
private final NotificationService notificationService;
/**
* AI 이벤트 생성 작업 메시지 수신 처리
@ -74,29 +90,120 @@ public class AIJobKafkaConsumer {
*
* @param message AI 이벤트 생성 작업 메시지
*/
private void processAIEventGenerationJob(AIEventGenerationJobMessage message) {
switch (message.getStatus()) {
case "COMPLETED":
log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}",
message.getJobId(), message.getUserId());
// TODO: AI 추천 결과를 캐시 또는 DB에 저장
// TODO: 사용자에게 알림 전송
break;
@Transactional
protected void processAIEventGenerationJob(AIEventGenerationJobMessage message) {
try {
UUID jobId = UUID.fromString(message.getJobId());
case "FAILED":
log.error("AI 작업 실패 처리 - JobId: {}, Error: {}",
message.getJobId(), message.getErrorMessage());
// TODO: 실패 로그 저장 사용자 알림
break;
// Job 조회
Job job = jobRepository.findById(jobId).orElse(null);
if (job == null) {
log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId());
return;
}
case "PROCESSING":
log.info("AI 작업 진행 중 - JobId: {}", message.getJobId());
// TODO: 작업 상태 업데이트
break;
UUID eventId = job.getEventId();
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}",
message.getJobId(), message.getStatus());
// Event 조회 (모든 케이스에서 사용)
Event event = eventRepository.findById(eventId).orElse(null);
switch (message.getStatus()) {
case "COMPLETED":
log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}",
message.getJobId(), message.getUserId());
// Job 상태 업데이트
if (message.getAiRecommendation() != null) {
// AI 추천 데이터를 JSON 문자열로 저장 (또는 별도 처리)
String recommendationData = objectMapper.writeValueAsString(message.getAiRecommendation());
job.complete(recommendationData);
} else {
job.complete("AI 추천 완료");
}
jobRepository.save(job);
// Event 조회 AI 추천 저장
if (event != null && message.getAiRecommendation() != null) {
var aiData = message.getAiRecommendation();
// AiRecommendation 엔티티 생성 Event에 추가
AiRecommendation aiRecommendation = AiRecommendation.builder()
.eventName(aiData.getEventTitle())
.description(aiData.getEventDescription())
.promotionType(aiData.getEventType())
.targetAudience(aiData.getTargetKeywords() != null ?
String.join(", ", aiData.getTargetKeywords()) : null)
.build();
event.addAiRecommendation(aiRecommendation);
eventRepository.save(event);
log.info("AI 추천 저장 완료 - EventId: {}, RecommendationTitle: {}",
eventId, aiData.getEventTitle());
// 사용자에게 알림 전송
UUID userId = event.getUserId();
notificationService.notifyJobCompleted(
userId,
jobId,
"AI_RECOMMENDATION",
"AI 추천이 완료되었습니다."
);
} else {
if (event == null) {
log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId);
}
}
break;
case "FAILED":
log.error("AI 작업 실패 처리 - JobId: {}, Error: {}",
message.getJobId(), message.getErrorMessage());
// Job 상태 업데이트
job.fail(message.getErrorMessage());
jobRepository.save(job);
// 사용자에게 실패 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobFailed(
userId,
jobId,
"AI_RECOMMENDATION",
"AI 추천에 실패했습니다: " + message.getErrorMessage()
);
}
break;
case "PROCESSING":
log.info("AI 작업 진행 중 - JobId: {}", message.getJobId());
// Job 상태 업데이트
job.start();
jobRepository.save(job);
// 사용자에게 진행 상태 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobProgress(
userId,
jobId,
"AI_RECOMMENDATION",
job.getProgress()
);
}
break;
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}",
message.getJobId(), message.getStatus());
}
} catch (Exception e) {
log.error("AI 작업 처리 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
throw new RuntimeException(e);
}
}
}

View File

@ -1,5 +1,6 @@
package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -26,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
public class AIJobKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Value("${app.kafka.topics.ai-event-generation-job:ai-event-generation-job}")
private String aiEventGenerationJobTopic;
@ -33,9 +35,9 @@ public class AIJobKafkaProducer {
/**
* AI 이벤트 생성 작업 메시지 발행
*
* @param jobId 작업 ID
* @param userId 사용자 ID
* @param eventId 이벤트 ID
* @param jobId 작업 ID (UUID String)
* @param userId 사용자 ID (UUID String)
* @param eventId 이벤트 ID (UUID String)
* @param storeName 매장명
* @param storeCategory 매장 업종
* @param storeDescription 매장 설명
@ -43,7 +45,7 @@ public class AIJobKafkaProducer {
*/
public void publishAIGenerationJob(
String jobId,
Long userId,
String userId,
String eventId,
String storeName,
String storeCategory,
@ -67,8 +69,11 @@ public class AIJobKafkaProducer {
*/
public void publishMessage(AIEventGenerationJobMessage message) {
try {
// JSON 문자열로 변환
String jsonMessage = objectMapper.writeValueAsString(message);
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), message);
kafkaTemplate.send(aiEventGenerationJobTopic, message.getJobId(), jsonMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {

View File

@ -2,6 +2,12 @@ package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage;
import com.kt.event.eventservice.application.service.NotificationService;
import com.kt.event.eventservice.domain.entity.Event;
import com.kt.event.eventservice.domain.entity.GeneratedImage;
import com.kt.event.eventservice.domain.entity.Job;
import com.kt.event.eventservice.domain.repository.EventRepository;
import com.kt.event.eventservice.domain.repository.JobRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
@ -10,11 +16,18 @@ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
/**
* 이미지 생성 작업 메시지 구독 Consumer
*
* image-generation-job 토픽의 메시지를 구독하여 처리합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@ -22,6 +35,10 @@ import org.springframework.stereotype.Component;
public class ImageJobKafkaConsumer {
private final ObjectMapper objectMapper;
private final JobRepository jobRepository;
private final EventRepository eventRepository;
private final NotificationService notificationService;
private final ImageJobKafkaProducer imageJobKafkaProducer;
/**
* 이미지 생성 작업 메시지 수신 처리
@ -74,32 +91,136 @@ public class ImageJobKafkaConsumer {
*
* @param message 이미지 생성 작업 메시지
*/
private void processImageGenerationJob(ImageGenerationJobMessage message) {
switch (message.getStatus()) {
case "COMPLETED":
log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}",
message.getJobId(), message.getEventId(), message.getImageUrl());
// TODO: 생성된 이미지 URL을 캐시 또는 DB에 저장
// TODO: 이벤트 엔티티에 이미지 URL 업데이트
// TODO: 사용자에게 알림 전송
break;
@Transactional
protected void processImageGenerationJob(ImageGenerationJobMessage message) {
try {
UUID jobId = UUID.fromString(message.getJobId());
UUID eventId = UUID.fromString(message.getEventId());
case "FAILED":
log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}",
message.getJobId(), message.getEventId(), message.getErrorMessage());
// TODO: 실패 로그 저장 사용자 알림
// TODO: 재시도 로직 또는 기본 이미지 사용
break;
// Job 조회
Job job = jobRepository.findById(jobId).orElse(null);
if (job == null) {
log.warn("Job을 찾을 수 없습니다 - JobId: {}", message.getJobId());
return;
}
case "PROCESSING":
log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}",
message.getJobId(), message.getEventId());
// TODO: 작업 상태 업데이트
break;
// Event 조회 (모든 케이스에서 사용)
Event event = eventRepository.findById(eventId).orElse(null);
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}",
message.getJobId(), message.getEventId(), message.getStatus());
switch (message.getStatus()) {
case "COMPLETED":
log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}",
message.getJobId(), message.getEventId(), message.getImageUrl());
// Job 상태 업데이트
job.complete(message.getImageUrl());
jobRepository.save(job);
// Event 조회
if (event != null) {
// GeneratedImage 엔티티 생성 Event에 추가
GeneratedImage generatedImage = GeneratedImage.builder()
.imageUrl(message.getImageUrl())
.build();
event.addGeneratedImage(generatedImage);
eventRepository.save(event);
log.info("이미지 저장 완료 - EventId: {}, ImageURL: {}",
eventId, message.getImageUrl());
// 사용자에게 알림 전송
UUID userId = event.getUserId();
notificationService.notifyJobCompleted(
userId,
jobId,
"IMAGE_GENERATION",
"이미지 생성이 완료되었습니다."
);
} else {
log.warn("Event를 찾을 수 없습니다 - EventId: {}", eventId);
}
break;
case "FAILED":
log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}",
message.getJobId(), message.getEventId(), message.getErrorMessage());
// 재시도 로직
if (job.canRetry()) {
log.info("이미지 생성 재시도 - JobId: {}, RetryCount: {}/{}",
jobId, job.getRetryCount() + 1, job.getMaxRetryCount());
// 재시도 준비
job.prepareRetry();
jobRepository.save(job);
// 재시도 메시지 발행
if (event != null) {
String prompt = String.format("이벤트: %s (재시도 %d/%d)",
event.getEventName() != null ? event.getEventName() : "이벤트",
job.getRetryCount(),
job.getMaxRetryCount());
imageJobKafkaProducer.publishImageGenerationJob(
jobId.toString(),
message.getUserId(),
eventId.toString(),
prompt
);
log.info("이미지 생성 재시도 메시지 발행 완료 - JobId: {}", jobId);
}
} else {
// 최대 재시도 횟수 초과 - 완전 실패 처리
log.error("이미지 생성 최대 재시도 횟수 초과 - JobId: {}, RetryCount: {}",
jobId, job.getRetryCount());
job.fail(message.getErrorMessage());
jobRepository.save(job);
// 사용자에게 실패 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobFailed(
userId,
jobId,
"IMAGE_GENERATION",
"이미지 생성에 실패했습니다: " + message.getErrorMessage()
);
}
}
break;
case "PROCESSING":
log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}",
message.getJobId(), message.getEventId());
// Job 상태 업데이트
job.start();
jobRepository.save(job);
// 사용자에게 진행 상태 알림 전송
if (event != null) {
UUID userId = event.getUserId();
notificationService.notifyJobProgress(
userId,
jobId,
"IMAGE_GENERATION",
job.getProgress()
);
}
break;
default:
log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}",
message.getJobId(), message.getEventId(), message.getStatus());
}
} catch (Exception e) {
log.error("이미지 작업 처리 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
throw e;
}
}
}

View File

@ -0,0 +1,93 @@
package com.kt.event.eventservice.infrastructure.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
/**
* 이미지 생성 작업 메시지 발행 Producer
*
* image-generation-job 토픽에 이미지 생성 작업 메시지를 발행합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ImageJobKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Value("${app.kafka.topics.image-generation-job:image-generation-job}")
private String imageGenerationJobTopic;
/**
* 이미지 생성 작업 메시지 발행
*
* @param jobId 작업 ID (UUID)
* @param userId 사용자 ID (UUID)
* @param eventId 이벤트 ID (UUID)
* @param prompt 이미지 생성 프롬프트
*/
public void publishImageGenerationJob(
String jobId,
String userId,
String eventId,
String prompt) {
ImageGenerationJobMessage message = ImageGenerationJobMessage.builder()
.jobId(jobId)
.userId(userId)
.eventId(eventId)
.prompt(prompt)
.status("PENDING")
.createdAt(LocalDateTime.now())
.build();
publishMessage(message);
}
/**
* 이미지 생성 작업 메시지 발행
*
* @param message ImageGenerationJobMessage 객체
*/
public void publishMessage(ImageGenerationJobMessage message) {
try {
// JSON 문자열로 변환
String jsonMessage = objectMapper.writeValueAsString(message);
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(imageGenerationJobTopic, message.getJobId(), jsonMessage);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("이미지 생성 작업 메시지 발행 성공 - Topic: {}, JobId: {}, EventId: {}, Offset: {}",
imageGenerationJobTopic,
message.getJobId(),
message.getEventId(),
result.getRecordMetadata().offset());
} else {
log.error("이미지 생성 작업 메시지 발행 실패 - Topic: {}, JobId: {}, Error: {}",
imageGenerationJobTopic,
message.getJobId(),
ex.getMessage(), ex);
}
});
} catch (Exception e) {
log.error("이미지 생성 작업 메시지 발행 중 예외 발생 - JobId: {}, Error: {}",
message.getJobId(), e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,46 @@
package com.kt.event.eventservice.infrastructure.notification;
import com.kt.event.eventservice.application.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* 로깅 기반 알림 서비스 구현
*
* 현재는 로그로만 알림을 기록하며, 추후 WebSocket, SSE, Push Notification 등으로 확장 가능합니다.
*
* @author Event Service Team
* @version 1.0.0
* @since 2025-10-29
*/
@Slf4j
@Service
public class LoggingNotificationService implements NotificationService {
@Override
public void notifyJobCompleted(UUID userId, UUID jobId, String jobType, String message) {
log.info("📢 [작업 완료 알림] UserId: {}, JobId: {}, JobType: {}, Message: {}",
userId, jobId, jobType, message);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
// : webSocketTemplate.convertAndSendToUser(userId.toString(), "/queue/notifications", notification);
}
@Override
public void notifyJobFailed(UUID userId, UUID jobId, String jobType, String errorMessage) {
log.error("📢 [작업 실패 알림] UserId: {}, JobId: {}, JobType: {}, Error: {}",
userId, jobId, jobType, errorMessage);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
}
@Override
public void notifyJobProgress(UUID userId, UUID jobId, String jobType, int progress) {
log.info("📢 [작업 진행 알림] UserId: {}, JobId: {}, JobType: {}, Progress: {}%",
userId, jobId, jobType, progress);
// TODO: WebSocket, SSE, 또는 Push Notification으로 실시간 알림 전송
}
}