From 9dfbb5866b6abd944ecc98ed754a76918e587eb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=B0=95=EC=84=B8=EC=9B=90?= Date: Fri, 31 Oct 2025 11:41:05 +0900 Subject: [PATCH] =?UTF-8?q?AI=20Service=20Kafka=20Consumer=20=EB=A1=9C?= =?UTF-8?q?=EA=B9=85=20=EB=B0=8F=20=EC=84=A4=EC=A0=95=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Kafka Consumer에 상세 로깅 추가 (Topic, Partition, Offset, 메시지 내용) - Consumer Factory 초기화 로그 추가 - DefaultErrorHandler를 통한 에러 로깅 강화 - JsonDeserializer USE_TYPE_INFO_HEADERS 명시적 설정 이 변경으로 Kafka 메시지 수신 과정을 더 명확하게 추적할 수 있습니다. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../com/kt/ai/config/KafkaConsumerConfig.java | 22 +++++++++++++++++++ .../kt/ai/kafka/consumer/AIJobConsumer.java | 15 +++++++++---- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/ai-service/src/main/java/com/kt/ai/config/KafkaConsumerConfig.java b/ai-service/src/main/java/com/kt/ai/config/KafkaConsumerConfig.java index 23df4d9..fa801e9 100644 --- a/ai-service/src/main/java/com/kt/ai/config/KafkaConsumerConfig.java +++ b/ai-service/src/main/java/com/kt/ai/config/KafkaConsumerConfig.java @@ -1,6 +1,7 @@ package com.kt.ai.config; import com.kt.ai.kafka.message.AIJobMessage; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; @@ -26,6 +27,7 @@ import java.util.Map; * @author AI Service Team * @since 1.0.0 */ +@Slf4j @EnableKafka @Configuration public class KafkaConsumerConfig { @@ -41,6 +43,12 @@ public class KafkaConsumerConfig { */ @Bean public ConsumerFactory consumerFactory() { + log.info("========================================"); + log.info("Kafka Consumer Factory 초기화 시작"); + log.info("Bootstrap Servers: {}", bootstrapServers); + log.info("Consumer Group ID: {}", groupId); + log.info("========================================"); + Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); @@ -57,7 +65,9 @@ public class KafkaConsumerConfig { props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, AIJobMessage.class.getName()); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); + props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); + log.info("✅ Kafka Consumer Factory 설정 완료"); return new DefaultKafkaConsumerFactory<>(props); } @@ -67,10 +77,22 @@ public class KafkaConsumerConfig { */ @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + log.info("Kafka Listener Container Factory 초기화"); + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + + // 에러 핸들러 추가 + factory.setCommonErrorHandler(new org.springframework.kafka.listener.DefaultErrorHandler((record, exception) -> { + log.error("❌ Kafka 메시지 처리 중 에러 발생"); + log.error("Topic: {}, Partition: {}, Offset: {}", + record.topic(), record.partition(), record.offset()); + log.error("Error: ", exception); + })); + + log.info("✅ Kafka Listener Container Factory 설정 완료"); return factory; } } diff --git a/ai-service/src/main/java/com/kt/ai/kafka/consumer/AIJobConsumer.java b/ai-service/src/main/java/com/kt/ai/kafka/consumer/AIJobConsumer.java index 2b82f8a..0844544 100644 --- a/ai-service/src/main/java/com/kt/ai/kafka/consumer/AIJobConsumer.java +++ b/ai-service/src/main/java/com/kt/ai/kafka/consumer/AIJobConsumer.java @@ -38,21 +38,28 @@ public class AIJobConsumer { @Payload AIJobMessage message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.OFFSET) Long offset, + @Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition, Acknowledgment acknowledgment ) { try { - log.info("Kafka 메시지 수신: topic={}, offset={}, jobId={}, eventId={}", - topic, offset, message.getJobId(), message.getEventId()); + log.info("========================================"); + log.info("Kafka 메시지 수신 시작"); + log.info("Topic: {}, Partition: {}, Offset: {}", topic, partition, offset); + log.info("JobId: {}, EventId: {}", message.getJobId(), message.getEventId()); + log.info("Industry: {}, Region: {}", message.getIndustry(), message.getRegion()); + log.info("Objective: {}, StoreName: {}", message.getObjective(), message.getStoreName()); + log.info("========================================"); // AI 추천 생성 aiRecommendationService.generateRecommendations(message); // Manual ACK acknowledgment.acknowledge(); - log.info("Kafka 메시지 처리 완료: jobId={}", message.getJobId()); + log.info("✅ Kafka 메시지 처리 완료: jobId={}", message.getJobId()); } catch (Exception e) { - log.error("Kafka 메시지 처리 실패: jobId={}", message.getJobId(), e); + log.error("❌ Kafka 메시지 처리 실패: jobId={}, errorMessage={}", + message != null ? message.getJobId() : "NULL", e.getMessage(), e); // DLQ로 이동하거나 재시도 로직 추가 가능 acknowledgment.acknowledge(); // 실패한 메시지도 ACK (DLQ로 이동) }