From 45f370a944a8f2d8431a23cde0dfd769625eb6e3 Mon Sep 17 00:00:00 2001 From: merrycoral Date: Mon, 27 Oct 2025 11:16:50 +0900 Subject: [PATCH] =?UTF-8?q?=ED=94=84=EB=A1=9C=EC=A0=9D=ED=8A=B8=20?= =?UTF-8?q?=EA=B5=AC=EC=84=B1=20=EA=B0=9C=EC=84=A0=20=EB=B0=8F=20Kafka=20?= =?UTF-8?q?=EC=9D=B8=ED=94=84=EB=9D=BC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 변경사항 ### 보안 강화 - gradle.properties를 .gitignore에 추가 (로컬 환경 설정 제외) ### 공통 모듈 - ErrorCode에 Legacy 호환용 에러 코드 추가 (NOT_FOUND, INVALID_INPUT_VALUE) ### Event Service - hibernate-types 라이브러리 제거 (Hibernate 6 네이티브 지원으로 대체) ### Kafka 인프라 추가 - Kafka 메시지 DTO 3개 추가 * AIEventGenerationJobMessage: AI 이벤트 생성 작업 메시지 * EventCreatedMessage: 이벤트 생성 완료 메시지 * ImageGenerationJobMessage: 이미지 생성 작업 메시지 - Kafka Producer/Consumer 3개 추가 * EventKafkaProducer: 이벤트 메시지 발행 * AIJobKafkaConsumer: AI 작업 메시지 소비 * ImageJobKafkaConsumer: 이미지 작업 메시지 소비 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .gitignore | 13 +++ .../kt/event/common/exception/ErrorCode.java | 4 + event-service/build.gradle | 4 +- .../kafka/AIEventGenerationJobMessage.java | 95 ++++++++++++++++ .../dto/kafka/EventCreatedMessage.java | 57 ++++++++++ .../dto/kafka/ImageGenerationJobMessage.java | 75 +++++++++++++ .../kafka/AIJobKafkaConsumer.java | 102 +++++++++++++++++ .../kafka/EventKafkaProducer.java | 78 +++++++++++++ .../kafka/ImageJobKafkaConsumer.java | 105 ++++++++++++++++++ 9 files changed, 531 insertions(+), 2 deletions(-) create mode 100644 event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/EventCreatedMessage.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/EventKafkaProducer.java create mode 100644 event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java diff --git a/.gitignore b/.gitignore index 2a41541..32a0a86 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,16 @@ build/ tmp/ temp/ *.tmp + +# Kubernetes Secrets (민감한 정보 포함) +k8s/**/secret.yaml +k8s/**/*-secret.yaml +k8s/**/*-prod.yaml +k8s/**/*-dev.yaml +k8s/**/*-local.yaml + +# IntelliJ 실행 프로파일 (민감한 환경 변수 포함 가능) +.run/*.run.xml + +# Gradle (로컬 환경 설정) +gradle.properties diff --git a/common/src/main/java/com/kt/event/common/exception/ErrorCode.java b/common/src/main/java/com/kt/event/common/exception/ErrorCode.java index bd422c5..f43e24b 100644 --- a/common/src/main/java/com/kt/event/common/exception/ErrorCode.java +++ b/common/src/main/java/com/kt/event/common/exception/ErrorCode.java @@ -18,6 +18,10 @@ public enum ErrorCode { COMMON_004("COMMON_004", "서버 내부 오류가 발생했습니다"), COMMON_005("COMMON_005", "지원하지 않는 작업입니다"), + // 일반 에러 상수 (Legacy 호환용) + NOT_FOUND("NOT_FOUND", "요청한 리소스를 찾을 수 없습니다"), + INVALID_INPUT_VALUE("INVALID_INPUT_VALUE", "유효하지 않은 입력값입니다"), + // 인증/인가 에러 (AUTH_XXX) AUTH_001("AUTH_001", "인증에 실패했습니다"), AUTH_002("AUTH_002", "유효하지 않은 토큰입니다"), diff --git a/event-service/build.gradle b/event-service/build.gradle index 340d5ca..af3323a 100644 --- a/event-service/build.gradle +++ b/event-service/build.gradle @@ -11,6 +11,6 @@ dependencies { // Jackson for JSON implementation 'com.fasterxml.jackson.core:jackson-databind' - // Hibernate UUID generator - implementation 'com.vladmihalcea:hibernate-types-60:2.21.1' + // Hibernate 6 네이티브로 배열 타입 지원하므로 별도 라이브러리 불필요 + // implementation 'com.vladmihalcea:hibernate-types-60:2.21.1' } diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java new file mode 100644 index 0000000..966778f --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/AIEventGenerationJobMessage.java @@ -0,0 +1,95 @@ +package com.kt.event.eventservice.application.dto.kafka; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * AI 이벤트 생성 작업 메시지 DTO + * + * ai-event-generation-job 토픽에서 구독하는 메시지 형식 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class AIEventGenerationJobMessage { + + /** + * 작업 ID + */ + @JsonProperty("job_id") + private String jobId; + + /** + * 사용자 ID + */ + @JsonProperty("user_id") + private Long userId; + + /** + * 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED) + */ + @JsonProperty("status") + private String status; + + /** + * AI 추천 결과 데이터 + */ + @JsonProperty("ai_recommendation") + private AIRecommendationData aiRecommendation; + + /** + * 에러 메시지 (실패 시) + */ + @JsonProperty("error_message") + private String errorMessage; + + /** + * 작업 생성 일시 + */ + @JsonProperty("created_at") + private LocalDateTime createdAt; + + /** + * 작업 완료/실패 일시 + */ + @JsonProperty("completed_at") + private LocalDateTime completedAt; + + /** + * AI 추천 데이터 내부 클래스 + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + public static class AIRecommendationData { + + @JsonProperty("event_title") + private String eventTitle; + + @JsonProperty("event_description") + private String eventDescription; + + @JsonProperty("event_type") + private String eventType; + + @JsonProperty("target_keywords") + private List targetKeywords; + + @JsonProperty("recommended_benefits") + private List recommendedBenefits; + + @JsonProperty("start_date") + private String startDate; + + @JsonProperty("end_date") + private String endDate; + } +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/EventCreatedMessage.java b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/EventCreatedMessage.java new file mode 100644 index 0000000..d971374 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/EventCreatedMessage.java @@ -0,0 +1,57 @@ +package com.kt.event.eventservice.application.dto.kafka; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * 이벤트 생성 완료 메시지 DTO + * + * event-created 토픽에 발행되는 메시지 형식 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class EventCreatedMessage { + + /** + * 이벤트 ID + */ + @JsonProperty("event_id") + private Long eventId; + + /** + * 사용자 ID + */ + @JsonProperty("user_id") + private Long userId; + + /** + * 이벤트 제목 + */ + @JsonProperty("title") + private String title; + + /** + * 이벤트 생성 일시 + */ + @JsonProperty("created_at") + private LocalDateTime createdAt; + + /** + * 이벤트 타입 (COUPON, DISCOUNT, GIFT, POINT 등) + */ + @JsonProperty("event_type") + private String eventType; + + /** + * 메시지 타임스탬프 + */ + @JsonProperty("timestamp") + private LocalDateTime timestamp; +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java new file mode 100644 index 0000000..dd52243 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/application/dto/kafka/ImageGenerationJobMessage.java @@ -0,0 +1,75 @@ +package com.kt.event.eventservice.application.dto.kafka; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + +/** + * 이미지 생성 작업 메시지 DTO + * + * image-generation-job 토픽에서 구독하는 메시지 형식 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class ImageGenerationJobMessage { + + /** + * 작업 ID + */ + @JsonProperty("job_id") + private String jobId; + + /** + * 이벤트 ID + */ + @JsonProperty("event_id") + private Long eventId; + + /** + * 사용자 ID + */ + @JsonProperty("user_id") + private Long userId; + + /** + * 작업 상태 (PENDING, PROCESSING, COMPLETED, FAILED) + */ + @JsonProperty("status") + private String status; + + /** + * 생성된 이미지 URL + */ + @JsonProperty("image_url") + private String imageUrl; + + /** + * 이미지 생성 프롬프트 + */ + @JsonProperty("prompt") + private String prompt; + + /** + * 에러 메시지 (실패 시) + */ + @JsonProperty("error_message") + private String errorMessage; + + /** + * 작업 생성 일시 + */ + @JsonProperty("created_at") + private LocalDateTime createdAt; + + /** + * 작업 완료/실패 일시 + */ + @JsonProperty("completed_at") + private LocalDateTime completedAt; +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java new file mode 100644 index 0000000..f4f1608 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/AIJobKafkaConsumer.java @@ -0,0 +1,102 @@ +package com.kt.event.eventservice.infrastructure.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.kt.event.eventservice.application.dto.kafka.AIEventGenerationJobMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +/** + * AI 이벤트 생성 작업 메시지 구독 Consumer + * + * ai-event-generation-job 토픽의 메시지를 구독하여 처리합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AIJobKafkaConsumer { + + private final ObjectMapper objectMapper; + + /** + * AI 이벤트 생성 작업 메시지 수신 처리 + * + * @param message AI 이벤트 생성 작업 메시지 + * @param partition 파티션 번호 + * @param offset 오프셋 + * @param acknowledgment 수동 커밋용 Acknowledgment + */ + @KafkaListener( + topics = "${app.kafka.topics.ai-event-generation-job}", + groupId = "${spring.kafka.consumer.group-id}", + containerFactory = "kafkaListenerContainerFactory" + ) + public void consumeAIEventGenerationJob( + @Payload String payload, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.OFFSET) long offset, + Acknowledgment acknowledgment + ) { + try { + log.info("AI 이벤트 생성 작업 메시지 수신 - Partition: {}, Offset: {}", partition, offset); + + // JSON을 객체로 변환 + AIEventGenerationJobMessage message = objectMapper.readValue( + payload, + AIEventGenerationJobMessage.class + ); + + log.info("AI 작업 메시지 파싱 완료 - JobId: {}, UserId: {}, Status: {}", + message.getJobId(), message.getUserId(), message.getStatus()); + + // 메시지 처리 로직 + processAIEventGenerationJob(message); + + // 수동 커밋 + acknowledgment.acknowledge(); + log.info("AI 이벤트 생성 작업 메시지 처리 완료 - JobId: {}", message.getJobId()); + + } catch (Exception e) { + log.error("AI 이벤트 생성 작업 메시지 처리 중 오류 발생 - Partition: {}, Offset: {}, Error: {}", + partition, offset, e.getMessage(), e); + // 에러 발생 시에도 커밋 (재처리 방지, DLQ 사용 권장) + acknowledgment.acknowledge(); + } + } + + /** + * AI 이벤트 생성 작업 처리 + * + * @param message AI 이벤트 생성 작업 메시지 + */ + private void processAIEventGenerationJob(AIEventGenerationJobMessage message) { + switch (message.getStatus()) { + case "COMPLETED": + log.info("AI 작업 완료 처리 - JobId: {}, UserId: {}", + message.getJobId(), message.getUserId()); + // TODO: AI 추천 결과를 캐시 또는 DB에 저장 + // TODO: 사용자에게 알림 전송 + break; + + case "FAILED": + log.error("AI 작업 실패 처리 - JobId: {}, Error: {}", + message.getJobId(), message.getErrorMessage()); + // TODO: 실패 로그 저장 및 사용자 알림 + break; + + case "PROCESSING": + log.info("AI 작업 진행 중 - JobId: {}", message.getJobId()); + // TODO: 작업 상태 업데이트 + break; + + default: + log.warn("알 수 없는 작업 상태 - JobId: {}, Status: {}", + message.getJobId(), message.getStatus()); + } + } +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/EventKafkaProducer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/EventKafkaProducer.java new file mode 100644 index 0000000..a409831 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/EventKafkaProducer.java @@ -0,0 +1,78 @@ +package com.kt.event.eventservice.infrastructure.kafka; + +import com.kt.event.eventservice.application.dto.kafka.EventCreatedMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.concurrent.CompletableFuture; + +/** + * 이벤트 생성 메시지 발행 Producer + * + * event-created 토픽에 이벤트 생성 완료 메시지를 발행합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class EventKafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + @Value("${app.kafka.topics.event-created}") + private String eventCreatedTopic; + + /** + * 이벤트 생성 완료 메시지 발행 + * + * @param eventId 이벤트 ID + * @param userId 사용자 ID + * @param title 이벤트 제목 + * @param eventType 이벤트 타입 + */ + public void publishEventCreated(Long eventId, Long userId, String title, String eventType) { + EventCreatedMessage message = EventCreatedMessage.builder() + .eventId(eventId) + .userId(userId) + .title(title) + .eventType(eventType) + .createdAt(LocalDateTime.now()) + .timestamp(LocalDateTime.now()) + .build(); + + publishEventCreatedMessage(message); + } + + /** + * 이벤트 생성 메시지 발행 + * + * @param message EventCreatedMessage 객체 + */ + public void publishEventCreatedMessage(EventCreatedMessage message) { + try { + CompletableFuture> future = + kafkaTemplate.send(eventCreatedTopic, message.getEventId().toString(), message); + + future.whenComplete((result, ex) -> { + if (ex == null) { + log.info("이벤트 생성 메시지 발행 성공 - Topic: {}, EventId: {}, Offset: {}", + eventCreatedTopic, + message.getEventId(), + result.getRecordMetadata().offset()); + } else { + log.error("이벤트 생성 메시지 발행 실패 - Topic: {}, EventId: {}, Error: {}", + eventCreatedTopic, + message.getEventId(), + ex.getMessage(), ex); + } + }); + } catch (Exception e) { + log.error("이벤트 생성 메시지 발행 중 예외 발생 - EventId: {}, Error: {}", + message.getEventId(), e.getMessage(), e); + } + } +} diff --git a/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java new file mode 100644 index 0000000..f66f3e7 --- /dev/null +++ b/event-service/src/main/java/com/kt/event/eventservice/infrastructure/kafka/ImageJobKafkaConsumer.java @@ -0,0 +1,105 @@ +package com.kt.event.eventservice.infrastructure.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.kt.event.eventservice.application.dto.kafka.ImageGenerationJobMessage; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Component; + +/** + * 이미지 생성 작업 메시지 구독 Consumer + * + * image-generation-job 토픽의 메시지를 구독하여 처리합니다. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class ImageJobKafkaConsumer { + + private final ObjectMapper objectMapper; + + /** + * 이미지 생성 작업 메시지 수신 처리 + * + * @param payload 메시지 페이로드 (JSON) + * @param partition 파티션 번호 + * @param offset 오프셋 + * @param acknowledgment 수동 커밋용 Acknowledgment + */ + @KafkaListener( + topics = "${app.kafka.topics.image-generation-job}", + groupId = "${spring.kafka.consumer.group-id}", + containerFactory = "kafkaListenerContainerFactory" + ) + public void consumeImageGenerationJob( + @Payload String payload, + @Header(KafkaHeaders.RECEIVED_PARTITION) int partition, + @Header(KafkaHeaders.OFFSET) long offset, + Acknowledgment acknowledgment + ) { + try { + log.info("이미지 생성 작업 메시지 수신 - Partition: {}, Offset: {}", partition, offset); + + // JSON을 객체로 변환 + ImageGenerationJobMessage message = objectMapper.readValue( + payload, + ImageGenerationJobMessage.class + ); + + log.info("이미지 작업 메시지 파싱 완료 - JobId: {}, EventId: {}, Status: {}", + message.getJobId(), message.getEventId(), message.getStatus()); + + // 메시지 처리 로직 + processImageGenerationJob(message); + + // 수동 커밋 + acknowledgment.acknowledge(); + log.info("이미지 생성 작업 메시지 처리 완료 - JobId: {}", message.getJobId()); + + } catch (Exception e) { + log.error("이미지 생성 작업 메시지 처리 중 오류 발생 - Partition: {}, Offset: {}, Error: {}", + partition, offset, e.getMessage(), e); + // 에러 발생 시에도 커밋 (재처리 방지, DLQ 사용 권장) + acknowledgment.acknowledge(); + } + } + + /** + * 이미지 생성 작업 처리 + * + * @param message 이미지 생성 작업 메시지 + */ + private void processImageGenerationJob(ImageGenerationJobMessage message) { + switch (message.getStatus()) { + case "COMPLETED": + log.info("이미지 작업 완료 처리 - JobId: {}, EventId: {}, ImageURL: {}", + message.getJobId(), message.getEventId(), message.getImageUrl()); + // TODO: 생성된 이미지 URL을 캐시 또는 DB에 저장 + // TODO: 이벤트 엔티티에 이미지 URL 업데이트 + // TODO: 사용자에게 알림 전송 + break; + + case "FAILED": + log.error("이미지 작업 실패 처리 - JobId: {}, EventId: {}, Error: {}", + message.getJobId(), message.getEventId(), message.getErrorMessage()); + // TODO: 실패 로그 저장 및 사용자 알림 + // TODO: 재시도 로직 또는 기본 이미지 사용 + break; + + case "PROCESSING": + log.info("이미지 작업 진행 중 - JobId: {}, EventId: {}", + message.getJobId(), message.getEventId()); + // TODO: 작업 상태 업데이트 + break; + + default: + log.warn("알 수 없는 작업 상태 - JobId: {}, EventId: {}, Status: {}", + message.getJobId(), message.getEventId(), message.getStatus()); + } + } +}