AI Service Kafka Consumer 로깅 및 설정 개선

- Kafka Consumer에 상세 로깅 추가 (Topic, Partition, Offset, 메시지 내용)
- Consumer Factory 초기화 로그 추가
- DefaultErrorHandler를 통한 에러 로깅 강화
- JsonDeserializer USE_TYPE_INFO_HEADERS 명시적 설정

이 변경으로 Kafka 메시지 수신 과정을 더 명확하게 추적할 수 있습니다.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
박세원 2025-10-31 11:41:05 +09:00
parent 2314156afe
commit 9dfbb5866b
2 changed files with 33 additions and 4 deletions

View File

@@ -1,6 +1,7 @@
package com.kt.ai.config; package com.kt.ai.config;
import com.kt.ai.kafka.message.AIJobMessage; import com.kt.ai.kafka.message.AIJobMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@@ -26,6 +27,7 @@ import java.util.Map;
* @author AI Service Team * @author AI Service Team
* @since 1.0.0 * @since 1.0.0
*/ */
@Slf4j
@EnableKafka @EnableKafka
@Configuration @Configuration
public class KafkaConsumerConfig { public class KafkaConsumerConfig {
@@ -41,6 +43,12 @@ public class KafkaConsumerConfig {
*/ */
@Bean @Bean
public ConsumerFactory<String, AIJobMessage> consumerFactory() { public ConsumerFactory<String, AIJobMessage> consumerFactory() {
log.info("========================================");
log.info("Kafka Consumer Factory 초기화 시작");
log.info("Bootstrap Servers: {}", bootstrapServers);
log.info("Consumer Group ID: {}", groupId);
log.info("========================================");
Map<String, Object> props = new HashMap<>(); Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -57,7 +65,9 @@ public class KafkaConsumerConfig {
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName()); props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, AIJobMessage.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, AIJobMessage.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
log.info("✅ Kafka Consumer Factory 설정 완료");
return new DefaultKafkaConsumerFactory<>(props); return new DefaultKafkaConsumerFactory<>(props);
} }
@@ -67,10 +77,22 @@ public class KafkaConsumerConfig {
*/ */
@Bean @Bean
public ConcurrentKafkaListenerContainerFactory<String, AIJobMessage> kafkaListenerContainerFactory() { public ConcurrentKafkaListenerContainerFactory<String, AIJobMessage> kafkaListenerContainerFactory() {
log.info("Kafka Listener Container Factory 초기화");
ConcurrentKafkaListenerContainerFactory<String, AIJobMessage> factory = ConcurrentKafkaListenerContainerFactory<String, AIJobMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>(); new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 에러 핸들러 추가
factory.setCommonErrorHandler(new org.springframework.kafka.listener.DefaultErrorHandler((record, exception) -> {
log.error("❌ Kafka 메시지 처리 중 에러 발생");
log.error("Topic: {}, Partition: {}, Offset: {}",
record.topic(), record.partition(), record.offset());
log.error("Error: ", exception);
}));
log.info("✅ Kafka Listener Container Factory 설정 완료");
return factory; return factory;
} }
} }

View File

@@ -38,21 +38,28 @@ public class AIJobConsumer {
@Payload AIJobMessage message, @Payload AIJobMessage message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) Long offset, @Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.RECEIVED_PARTITION) Integer partition,
Acknowledgment acknowledgment Acknowledgment acknowledgment
) { ) {
try { try {
log.info("Kafka 메시지 수신: topic={}, offset={}, jobId={}, eventId={}", log.info("========================================");
topic, offset, message.getJobId(), message.getEventId()); log.info("Kafka 메시지 수신 시작");
log.info("Topic: {}, Partition: {}, Offset: {}", topic, partition, offset);
log.info("JobId: {}, EventId: {}", message.getJobId(), message.getEventId());
log.info("Industry: {}, Region: {}", message.getIndustry(), message.getRegion());
log.info("Objective: {}, StoreName: {}", message.getObjective(), message.getStoreName());
log.info("========================================");
// AI 추천 생성 // AI 추천 생성
aiRecommendationService.generateRecommendations(message); aiRecommendationService.generateRecommendations(message);
// Manual ACK // Manual ACK
acknowledgment.acknowledge(); acknowledgment.acknowledge();
log.info("Kafka 메시지 처리 완료: jobId={}", message.getJobId()); log.info("Kafka 메시지 처리 완료: jobId={}", message.getJobId());
} catch (Exception e) { } catch (Exception e) {
log.error("Kafka 메시지 처리 실패: jobId={}", message.getJobId(), e); log.error("❌ Kafka 메시지 처리 실패: jobId={}, errorMessage={}",
message != null ? message.getJobId() : "NULL", e.getMessage(), e);
// DLQ로 이동하거나 재시도 로직 추가 가능 // DLQ로 이동하거나 재시도 로직 추가 가능
acknowledgment.acknowledge(); // 실패한 메시지도 ACK (DLQ로 이동) acknowledgment.acknowledge(); // 실패한 메시지도 ACK (DLQ로 이동)
} }