MQ 설정 변경 (kafka -> EventHub)

This commit is contained in:
djeon
2025-10-24 10:35:59 +09:00
parent b790d1b31a
commit d9261bad2c
7 changed files with 419 additions and 179 deletions
@@ -1,91 +1,40 @@
package com.unicorn.hgzero.meeting.infra.config;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Event Hub (Kafka) 설정
* 이벤트 발행을 위한 Kafka 설정
* Azure EventHub 설정
* 이벤트 발행을 위한 EventHub 설정
*/
@Configuration
@Slf4j
public class EventHubConfig {
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private String bootstrapServers;
@Value("${eventhub.connection-string}")
private String connectionString;
@Value("${spring.kafka.producer.client-id:meeting-service}")
private String clientId;
@Value("${eventhub.name}")
private String eventHubName;
@Value("${spring.kafka.producer.acks:all}")
private String acks;
@Value("${spring.kafka.producer.retries:3}")
private Integer retries;
@Value("${spring.kafka.producer.batch-size:16384}")
private Integer batchSize;
@Value("${spring.kafka.producer.linger-ms:5}")
private Integer lingerMs;
@Value("${spring.kafka.producer.buffer-memory:33554432}")
private Long bufferMemory;
/**
* Kafka Producer 설정
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// 기본 설정
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 성능 및 안정성 설정
configProps.put(ProducerConfig.ACKS_CONFIG, acks);
configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
// 중복 방지 설정
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// 압축 설정
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
log.info("Kafka Producer 설정 완료 - bootstrapServers: {}, clientId: {}",
bootstrapServers, clientId);
return new DefaultKafkaProducerFactory<>(configProps);
@PostConstruct
public void init() {
log.info("Initializing Azure EventHub configuration with hub name: {}", eventHubName);
}
/**
* Kafka Template
* EventHub Producer Client 생성
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
// 메시지 전송 결과 로깅
template.setProducerListener(new org.springframework.kafka.support.LoggingProducerListener<>());
log.info("Kafka Template 설정 완료");
return template;
@Bean(name = "eventProducer")
public EventHubProducerClient eventProducer() {
log.info("Creating EventHub producer for hub: {}", eventHubName);
return new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
}
}
@@ -0,0 +1,30 @@
package com.unicorn.hgzero.meeting.infra.event.constant;
/**
* EventHub 이벤트 관련 상수 정의
* Azure EventHub에서 사용하는 이벤트 타입과 토픽 정의
*/
public class EventHubConstants {
// 이벤트 타입 상수
public static final String EVENT_TYPE_MEETING_STARTED = "MEETING_STARTED";
public static final String EVENT_TYPE_MEETING_ENDED = "MEETING_ENDED";
public static final String EVENT_TYPE_TODO_ASSIGNED = "TODO_ASSIGNED";
public static final String EVENT_TYPE_TODO_COMPLETED = "TODO_COMPLETED";
public static final String EVENT_TYPE_MINUTES_FINALIZED = "MINUTES_FINALIZED";
public static final String EVENT_TYPE_NOTIFICATION_REQUEST = "NOTIFICATION_REQUEST";
// 토픽 이름 상수
public static final String TOPIC_MEETING = "meeting";
public static final String TOPIC_TODO = "todo";
public static final String TOPIC_MINUTES = "minutes";
public static final String TOPIC_NOTIFICATION = "notification";
// 속성 키 상수
public static final String PROPERTY_TYPE = "type";
public static final String PROPERTY_TOPIC = "topic";
private EventHubConstants() {
// 유틸리티 클래스는 인스턴스화 방지
}
}
@@ -1,102 +1,69 @@
package com.unicorn.hgzero.meeting.infra.event.publisher;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.unicorn.hgzero.meeting.infra.event.constant.EventHubConstants;
import com.unicorn.hgzero.meeting.infra.event.dto.MeetingStartedEvent;
import com.unicorn.hgzero.meeting.infra.event.dto.MeetingEndedEvent;
import com.unicorn.hgzero.meeting.infra.event.dto.TodoAssignedEvent;
import com.unicorn.hgzero.meeting.infra.event.dto.NotificationRequestEvent;
import java.time.LocalDate;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* Event Hub 이벤트 발행 구현체
* Kafka를 통한 이벤트 발행
* Azure EventHub 이벤트 발행 구현체
* Azure EventHub를 통한 이벤트 발행
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class EventHubPublisher implements EventPublisher {
private final KafkaTemplate<String, String> kafkaTemplate;
private final EventHubProducerClient eventProducer;
private final ObjectMapper objectMapper;
@Value("${app.event.topic.meeting-started:meeting-started}")
private String meetingStartedTopic;
@Value("${app.event.topic.meeting-ended:meeting-ended}")
private String meetingEndedTopic;
@Value("${app.event.topic.todo-assigned:todo-assigned}")
private String todoAssignedTopic;
@Value("${app.event.topic.notification-request:notification-request}")
private String notificationRequestTopic;
@Value("${app.event.topic.todo-completed:todo-completed}")
private String todoCompletedTopic;
@Value("${app.event.topic.minutes-finalized:minutes-finalized}")
private String minutesFinalizedTopic;
public EventHubPublisher(
@Qualifier("eventProducer") EventHubProducerClient eventProducer,
ObjectMapper objectMapper) {
this.eventProducer = eventProducer;
this.objectMapper = objectMapper;
}
@Override
public void publishMeetingStarted(MeetingStartedEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(meetingStartedTopic, event.getMeetingId(), payload);
log.info("회의 시작 이벤트 발행 완료 - meetingId: {}", event.getMeetingId());
} catch (Exception e) {
log.error("회의 시작 이벤트 발행 실패 - meetingId: {}", event.getMeetingId(), e);
throw new RuntimeException("회의 시작 이벤트 발행 실패", e);
}
publishEvent(event, event.getMeetingId(),
EventHubConstants.TOPIC_MEETING,
EventHubConstants.EVENT_TYPE_MEETING_STARTED);
}
@Override
public void publishMeetingEnded(MeetingEndedEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(meetingEndedTopic, event.getMeetingId(), payload);
log.info("회의 종료 이벤트 발행 완료 - meetingId: {}", event.getMeetingId());
} catch (Exception e) {
log.error("회의 종료 이벤트 발행 실패 - meetingId: {}", event.getMeetingId(), e);
throw new RuntimeException("회의 종료 이벤트 발행 실패", e);
}
publishEvent(event, event.getMeetingId(),
EventHubConstants.TOPIC_MEETING,
EventHubConstants.EVENT_TYPE_MEETING_ENDED);
}
@Override
public void publishTodoAssigned(TodoAssignedEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(todoAssignedTopic, event.getTodoId(), payload);
log.info("Todo 할당 이벤트 발행 완료 - todoId: {}", event.getTodoId());
} catch (Exception e) {
log.error("Todo 할당 이벤트 발행 실패 - todoId: {}", event.getTodoId(), e);
throw new RuntimeException("Todo 할당 이벤트 발행 실패", e);
}
publishEvent(event, event.getTodoId(),
EventHubConstants.TOPIC_TODO,
EventHubConstants.EVENT_TYPE_TODO_ASSIGNED);
}
@Override
public void publishNotificationRequest(NotificationRequestEvent event) {
try {
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(notificationRequestTopic, event.getRecipientId(), payload);
log.info("알림 요청 이벤트 발행 완료 - type: {}, recipientId: {}",
event.getNotificationType(), event.getRecipientId());
} catch (Exception e) {
log.error("알림 요청 이벤트 발행 실패 - type: {}, recipientId: {}",
event.getNotificationType(), event.getRecipientId(), e);
throw new RuntimeException("알림 요청 이벤트 발행 실패", e);
}
publishEvent(event, event.getRecipientId(),
EventHubConstants.TOPIC_NOTIFICATION,
EventHubConstants.EVENT_TYPE_NOTIFICATION_REQUEST);
}
// 편의 메서드 구현
@Override
public void publishTodoAssigned(String todoId, String title, String assigneeId, String assigneeName,
public void publishTodoAssigned(String todoId, String title, String assigneeId, String assigneeName,
String assignedBy, String assignedByName, LocalDate dueDate) {
TodoAssignedEvent event = TodoAssignedEvent.builder()
.todoId(todoId)
@@ -108,60 +75,85 @@ public class EventHubPublisher implements EventPublisher {
.dueDate(dueDate)
.assignedAt(LocalDateTime.now())
.build();
publishTodoAssigned(event);
}
@Override
public void publishTodoCompleted(String todoId, String title, String assigneeId, String assigneeName,
String completedBy, String completedByName) {
try {
// Todo 완료 이벤트는 NotificationRequestEvent로 발행
NotificationRequestEvent event = NotificationRequestEvent.builder()
.notificationType("TODO_COMPLETED")
.recipientId(assigneeId)
.recipientName(assigneeName)
.title("Todo 완료")
.message(String.format("Todo '%s'가 완료되었습니다", title))
.relatedEntityId(todoId)
.relatedEntityType("TODO")
.requestedBy(completedBy)
.requestedByName(completedByName)
.eventTime(LocalDateTime.now())
.build();
String completedBy, String completedByName) {
NotificationRequestEvent event = NotificationRequestEvent.builder()
.notificationType("TODO_COMPLETED")
.recipientId(assigneeId)
.recipientName(assigneeName)
.title("Todo 완료")
.message(String.format("Todo '%s'가 완료되었습니다", title))
.relatedEntityId(todoId)
.relatedEntityType("TODO")
.requestedBy(completedBy)
.requestedByName(completedByName)
.eventTime(LocalDateTime.now())
.build();
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(todoCompletedTopic, todoId, payload);
log.info("Todo 완료 이벤트 발행 완료 - todoId: {}", todoId);
} catch (Exception e) {
log.error("Todo 완료 이벤트 발행 실패 - todoId: {}", todoId, e);
throw new RuntimeException("Todo 완료 이벤트 발행 실패", e);
}
publishEvent(event, todoId,
EventHubConstants.TOPIC_TODO,
EventHubConstants.EVENT_TYPE_TODO_COMPLETED);
}
@Override
public void publishMinutesFinalized(String minutesId, String title, String finalizedBy, String finalizedByName) {
try {
// 회의록 확정 이벤트는 NotificationRequestEvent로 발행
NotificationRequestEvent event = NotificationRequestEvent.builder()
.notificationType("MINUTES_FINALIZED")
.recipientId(finalizedBy)
.recipientName(finalizedByName)
.title("회의록 확정")
.message(String.format("회의록 '%s'가 확정되었습니다", title))
.relatedEntityId(minutesId)
.relatedEntityType("MINUTES")
.requestedBy(finalizedBy)
.requestedByName(finalizedByName)
.eventTime(LocalDateTime.now())
.build();
NotificationRequestEvent event = NotificationRequestEvent.builder()
.notificationType("MINUTES_FINALIZED")
.recipientId(finalizedBy)
.recipientName(finalizedByName)
.title("회의록 확정")
.message(String.format("회의록 '%s'가 확정되었습니다", title))
.relatedEntityId(minutesId)
.relatedEntityType("MINUTES")
.requestedBy(finalizedBy)
.requestedByName(finalizedByName)
.eventTime(LocalDateTime.now())
.build();
publishEvent(event, minutesId,
EventHubConstants.TOPIC_MINUTES,
EventHubConstants.EVENT_TYPE_MINUTES_FINALIZED);
}
/**
* 이벤트 발행 공통 메서드
*
* @param event 발행할 이벤트 객체
* @param partitionKey 파티션 키 (이벤트 ID 등)
* @param topic 토픽 이름
* @param eventType 이벤트 타입
* @param <T> 이벤트 타입
*/
private <T> void publishEvent(T event, String partitionKey, String topic, String eventType) {
try {
CreateBatchOptions options = new CreateBatchOptions()
.setPartitionKey(partitionKey);
EventDataBatch batch = eventProducer.createBatch(options);
String eventJson = objectMapper.writeValueAsString(event);
EventData eventData = new EventData(eventJson);
// 이벤트 속성 설정
eventData.getProperties().put(EventHubConstants.PROPERTY_TYPE, eventType);
eventData.getProperties().put(EventHubConstants.PROPERTY_TOPIC, topic);
if (!batch.tryAdd(eventData)) {
throw new RuntimeException("이벤트 크기가 너무 큽니다");
}
eventProducer.send(batch);
log.info("이벤트 발행 완료: topic={}, type={}, partitionKey={}",
topic, eventType, partitionKey);
String payload = objectMapper.writeValueAsString(event);
kafkaTemplate.send(minutesFinalizedTopic, minutesId, payload);
log.info("회의록 확정 이벤트 발행 완료 - minutesId: {}", minutesId);
} catch (Exception e) {
log.error("회의록 확정 이벤트 발행 실패 - minutesId: {}", minutesId, e);
throw new RuntimeException("회의록 확정 이벤트 발행 실패", e);
log.error("이벤트 발행 실패: topic={}, type={}, partitionKey={}, error={}",
topic, eventType, partitionKey, e.getMessage(), e);
throw new RuntimeException("이벤트 발행 중 오류가 발생했습니다", e);
}
}
}
}
+7 -1
View File
@@ -125,5 +125,11 @@ api:
# Azure EventHub Configuration
eventhub:
connection-string: ${EVENTHUB_CONNECTION_STRING:}
name: ${EVENTHUB_NAME:hgzero-eventhub-name}
name: ${EVENTHUB_NAME:hgzero-events}
consumer-group: ${EVENTHUB_CONSUMER_GROUP:$Default}
# Azure Storage Configuration (for EventHub checkpoints)
azure:
storage:
connection-string: ${AZURE_STORAGE_CONNECTION_STRING:}
container: ${AZURE_STORAGE_CONTAINER:hgzero-checkpoints}