mirror of
https://github.com/ktds-dg0501/kt-event-marketing.git
synced 2026-06-13 11:39:11 +00:00
Swagger 관련 변경사항 롤백 및 정리
This commit is contained in:
@@ -0,0 +1,23 @@
|
||||
package com.kt.distribution;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.openfeign.EnableFeignClients;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
|
||||
/**
|
||||
* Distribution Service Application
|
||||
* 다중 채널 배포 관리 서비스
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@SpringBootApplication
|
||||
@EnableKafka
|
||||
@EnableFeignClients
|
||||
public class DistributionApplication {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(DistributionApplication.class, args);
|
||||
}
|
||||
}
|
||||
+86
@@ -0,0 +1,86 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
|
||||
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
|
||||
import io.github.resilience4j.retry.annotation.Retry;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Abstract Channel Adapter
|
||||
* 공통 로직 및 Resilience4j 적용
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class AbstractChannelAdapter implements ChannelAdapter {
|
||||
|
||||
/**
|
||||
* 채널로 배포 실행 (Resilience4j 적용)
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @return ChannelDistributionResult
|
||||
*/
|
||||
@Override
|
||||
@CircuitBreaker(name = "channelApi", fallbackMethod = "fallback")
|
||||
@Retry(name = "channelApi")
|
||||
@Bulkhead(name = "channelApi")
|
||||
public ChannelDistributionResult distribute(DistributionRequest request) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
log.info("Starting distribution to channel: {}, eventId: {}",
|
||||
getChannelType(), request.getEventId());
|
||||
|
||||
// 실제 외부 API 호출 (구현체에서 구현)
|
||||
ChannelDistributionResult result = executeDistribution(request);
|
||||
result.setExecutionTimeMs(System.currentTimeMillis() - startTime);
|
||||
|
||||
log.info("Distribution completed successfully: channel={}, eventId={}, executionTime={}ms",
|
||||
getChannelType(), request.getEventId(), result.getExecutionTimeMs());
|
||||
|
||||
return result;
|
||||
|
||||
} catch (Exception e) {
|
||||
long executionTime = System.currentTimeMillis() - startTime;
|
||||
log.error("Distribution failed: channel={}, eventId={}, error={}",
|
||||
getChannelType(), request.getEventId(), e.getMessage(), e);
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(getChannelType())
|
||||
.success(false)
|
||||
.errorMessage(e.getMessage())
|
||||
.executionTimeMs(executionTime)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 실제 외부 API 호출 로직 (구현체에서 구현)
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @return ChannelDistributionResult
|
||||
*/
|
||||
protected abstract ChannelDistributionResult executeDistribution(DistributionRequest request);
|
||||
|
||||
/**
|
||||
* Fallback 메서드 (Circuit Breaker Open 시)
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @param throwable Throwable
|
||||
* @return ChannelDistributionResult
|
||||
*/
|
||||
protected ChannelDistributionResult fallback(DistributionRequest request, Throwable throwable) {
|
||||
log.warn("Fallback triggered for channel: {}, eventId: {}, reason: {}",
|
||||
getChannelType(), request.getEventId(), throwable.getMessage());
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(getChannelType())
|
||||
.success(false)
|
||||
.errorMessage("Circuit Breaker Open: " + throwable.getMessage())
|
||||
.executionTimeMs(0)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
|
||||
/**
|
||||
* Channel Adapter Interface
|
||||
* 각 채널별 배포 API를 호출하는 인터페이스
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
public interface ChannelAdapter {
|
||||
|
||||
/**
|
||||
* 지원하는 채널 타입
|
||||
*
|
||||
* @return ChannelType
|
||||
*/
|
||||
ChannelType getChannelType();
|
||||
|
||||
/**
|
||||
* 채널로 배포 실행
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @return ChannelDistributionResult
|
||||
*/
|
||||
ChannelDistributionResult distribute(DistributionRequest request);
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 지니TV Adapter
|
||||
* 지니TV 광고 등록 API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class GiniTvAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.ginitv.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.GINITV;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling GiniTV API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
String distributionId = "GTIV-" + UUID.randomUUID().toString();
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.GINITV)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(10000) // TV 광고 노출 수
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Instagram Adapter
|
||||
* Instagram 포스팅 API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class InstagramAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.instagram.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.INSTAGRAM;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling Instagram API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
String distributionId = "INSTA-" + UUID.randomUUID().toString();
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.INSTAGRAM)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(3000) // 팔로워 수 기반 예상 노출
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Kakao Channel Adapter
|
||||
* Kakao Channel 포스팅 API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class KakaoAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.kakao.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.KAKAO;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling Kakao API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
String distributionId = "KAKAO-" + UUID.randomUUID().toString();
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.KAKAO)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(4000) // 채널 친구 수 기반
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Naver Blog Adapter
|
||||
* Naver Blog 포스팅 API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NaverAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.naver.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.NAVER;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling Naver API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
String distributionId = "NAVER-" + UUID.randomUUID().toString();
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.NAVER)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(2000) // 블로그 방문자 수 기반
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 링고비즈 Adapter
|
||||
* 링고비즈 연결음 업데이트 API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class RingoBizAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.ringobiz.url}")
|
||||
private String apiUrl;
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.RINGOBIZ;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling RingoBiz API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
String distributionId = "RBIZ-" + UUID.randomUUID().toString();
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.RINGOBIZ)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(1000) // 연결음 사용자 수
|
||||
.build();
|
||||
}
|
||||
}
|
||||
+72
@@ -0,0 +1,72 @@
|
||||
package com.kt.distribution.adapter;
|
||||
|
||||
import com.kt.distribution.dto.ChannelDistributionResult;
|
||||
import com.kt.distribution.dto.ChannelType;
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* 우리동네TV Adapter
|
||||
* 우리동네TV API 호출
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class UriDongNeTvAdapter extends AbstractChannelAdapter {
|
||||
|
||||
@Value("${channel.apis.uridongnetv.url}")
|
||||
private String apiUrl;
|
||||
|
||||
private final RestTemplate restTemplate = new RestTemplate();
|
||||
|
||||
@Override
|
||||
public ChannelType getChannelType() {
|
||||
return ChannelType.URIDONGNETV;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelDistributionResult executeDistribution(DistributionRequest request) {
|
||||
log.debug("Calling UriDongNeTV API: url={}, eventId={}", apiUrl, request.getEventId());
|
||||
|
||||
// 외부 API 호출 준비
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("eventId", request.getEventId());
|
||||
payload.put("title", request.getTitle());
|
||||
payload.put("videoUrl", request.getImageUrl()); // 이미지를 영상으로 변환한 URL
|
||||
payload.put("radius", getChannelSetting(request, "radius", "500m"));
|
||||
payload.put("timeSlot", getChannelSetting(request, "timeSlot", "evening"));
|
||||
|
||||
// TODO: 실제 API 호출 (현재는 Mock)
|
||||
// ResponseEntity<Map> response = restTemplate.postForEntity(apiUrl + "/distribute", payload, Map.class);
|
||||
|
||||
// Mock 응답
|
||||
String distributionId = "UDTV-" + UUID.randomUUID().toString();
|
||||
int estimatedReach = 5000;
|
||||
|
||||
return ChannelDistributionResult.builder()
|
||||
.channel(ChannelType.URIDONGNETV)
|
||||
.success(true)
|
||||
.distributionId(distributionId)
|
||||
.estimatedReach(estimatedReach)
|
||||
.build();
|
||||
}
|
||||
|
||||
private String getChannelSetting(DistributionRequest request, String key, String defaultValue) {
|
||||
if (request.getChannelSettings() != null) {
|
||||
Map<String, Object> settings = request.getChannelSettings().get(ChannelType.URIDONGNETV.name());
|
||||
if (settings != null && settings.containsKey(key)) {
|
||||
return settings.get(key).toString();
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.kt.distribution.config;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
|
||||
/**
|
||||
* Kafka Configuration
|
||||
* Kafka Producer 설정
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class KafkaConfig {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, Object> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
|
||||
configProps.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, Object> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.kt.distribution.config;
|
||||
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.web.servlet.config.annotation.CorsRegistry;
|
||||
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
|
||||
|
||||
/**
|
||||
* Web Configuration
|
||||
* CORS 설정 및 기타 웹 관련 설정
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-24
|
||||
*/
|
||||
@Configuration
|
||||
public class WebConfig implements WebMvcConfigurer {
|
||||
|
||||
/**
|
||||
* CORS 설정
|
||||
* - 모든 origin 허용 (개발 환경)
|
||||
* - 모든 HTTP 메서드 허용
|
||||
* - Credentials 허용
|
||||
*/
|
||||
@Override
|
||||
public void addCorsMappings(CorsRegistry registry) {
|
||||
registry.addMapping("/**")
|
||||
.allowedOriginPatterns("*")
|
||||
.allowedMethods("GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS")
|
||||
.allowedHeaders("*")
|
||||
.allowCredentials(true)
|
||||
.maxAge(3600);
|
||||
}
|
||||
}
|
||||
+71
@@ -0,0 +1,71 @@
|
||||
package com.kt.distribution.controller;
|
||||
|
||||
import com.kt.distribution.dto.DistributionRequest;
|
||||
import com.kt.distribution.dto.DistributionResponse;
|
||||
import com.kt.distribution.dto.DistributionStatusResponse;
|
||||
import com.kt.distribution.service.DistributionService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* Distribution Controller
|
||||
* POST /api/distribution/distribute - 다중 채널 배포 실행
|
||||
* GET /api/distribution/{eventId}/status - 배포 상태 조회
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("/api/distribution")
|
||||
@RequiredArgsConstructor
|
||||
public class DistributionController {
|
||||
|
||||
private final DistributionService distributionService;
|
||||
|
||||
/**
|
||||
* 다중 채널 배포 실행
|
||||
* UFR-DIST-010: 다중채널배포
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @return DistributionResponse
|
||||
*/
|
||||
@PostMapping("/distribute")
|
||||
public ResponseEntity<DistributionResponse> distribute(@RequestBody DistributionRequest request) {
|
||||
log.info("Received distribution request: eventId={}, channels={}",
|
||||
request.getEventId(), request.getChannels());
|
||||
|
||||
DistributionResponse response = distributionService.distribute(request);
|
||||
|
||||
log.info("Distribution request processed: eventId={}, success={}, successCount={}, failureCount={}",
|
||||
response.getEventId(), response.isSuccess(),
|
||||
response.getSuccessCount(), response.getFailureCount());
|
||||
|
||||
return ResponseEntity.ok(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 상태 조회
|
||||
* UFR-DIST-020: 배포상태조회
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @return DistributionStatusResponse
|
||||
*/
|
||||
@GetMapping("/{eventId}/status")
|
||||
public ResponseEntity<DistributionStatusResponse> getDistributionStatus(@PathVariable String eventId) {
|
||||
log.info("Received distribution status request: eventId={}", eventId);
|
||||
|
||||
DistributionStatusResponse response = distributionService.getDistributionStatus(eventId);
|
||||
|
||||
log.info("Distribution status retrieved: eventId={}, overallStatus={}",
|
||||
eventId, response.getOverallStatus());
|
||||
|
||||
if ("NOT_FOUND".equals(response.getOverallStatus())) {
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
|
||||
return ResponseEntity.ok(response);
|
||||
}
|
||||
}
|
||||
+49
@@ -0,0 +1,49 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 채널별 배포 결과
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ChannelDistributionResult {
|
||||
|
||||
/**
|
||||
* 채널 타입
|
||||
*/
|
||||
private ChannelType channel;
|
||||
|
||||
/**
|
||||
* 배포 성공 여부
|
||||
*/
|
||||
private boolean success;
|
||||
|
||||
/**
|
||||
* 배포 ID (성공 시)
|
||||
*/
|
||||
private String distributionId;
|
||||
|
||||
/**
|
||||
* 예상 노출 수 (성공 시)
|
||||
*/
|
||||
private Integer estimatedReach;
|
||||
|
||||
/**
|
||||
* 에러 메시지 (실패 시)
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* 배포 소요 시간 (ms)
|
||||
*/
|
||||
private long executionTimeMs;
|
||||
}
|
||||
@@ -0,0 +1,100 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 채널별 배포 상태 DTO
|
||||
*
|
||||
* 각 채널의 배포 진행 상태 및 결과 정보를 담습니다.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ChannelStatus {
|
||||
|
||||
/**
|
||||
* 채널 타입
|
||||
*/
|
||||
private ChannelType channel;
|
||||
|
||||
/**
|
||||
* 채널별 배포 상태
|
||||
* - PENDING: 대기 중
|
||||
* - IN_PROGRESS: 진행 중
|
||||
* - COMPLETED: 완료
|
||||
* - FAILED: 실패
|
||||
*/
|
||||
private String status;
|
||||
|
||||
/**
|
||||
* 진행률 (0-100, IN_PROGRESS 상태일 때 사용)
|
||||
*/
|
||||
private Integer progress;
|
||||
|
||||
/**
|
||||
* 채널별 배포 ID (우리동네TV, 지니TV 등)
|
||||
*/
|
||||
private String distributionId;
|
||||
|
||||
/**
|
||||
* 예상 노출 수 (우리동네TV, 지니TV)
|
||||
*/
|
||||
private Integer estimatedViews;
|
||||
|
||||
/**
|
||||
* 업데이트 완료 시각 (링고비즈)
|
||||
*/
|
||||
private LocalDateTime updateTimestamp;
|
||||
|
||||
/**
|
||||
* 광고 ID (지니TV)
|
||||
*/
|
||||
private String adId;
|
||||
|
||||
/**
|
||||
* 노출 스케줄 (지니TV)
|
||||
*/
|
||||
private List<String> impressionSchedule;
|
||||
|
||||
/**
|
||||
* 게시물 URL (Instagram, Naver Blog)
|
||||
*/
|
||||
private String postUrl;
|
||||
|
||||
/**
|
||||
* 게시물 ID (Instagram)
|
||||
*/
|
||||
private String postId;
|
||||
|
||||
/**
|
||||
* 메시지 ID (Kakao Channel)
|
||||
*/
|
||||
private String messageId;
|
||||
|
||||
/**
|
||||
* 완료 시각
|
||||
*/
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
/**
|
||||
* 오류 메시지 (실패 시)
|
||||
*/
|
||||
private String errorMessage;
|
||||
|
||||
/**
|
||||
* 재시도 횟수
|
||||
*/
|
||||
private Integer retries;
|
||||
|
||||
/**
|
||||
* 마지막 재시도 시각
|
||||
*/
|
||||
private LocalDateTime lastRetryAt;
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
/**
|
||||
* 배포 채널 타입
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
public enum ChannelType {
|
||||
URIDONGNETV("우리동네TV"),
|
||||
RINGOBIZ("링고비즈"),
|
||||
GINITV("지니TV"),
|
||||
INSTAGRAM("Instagram"),
|
||||
NAVER("Naver Blog"),
|
||||
KAKAO("Kakao Channel");
|
||||
|
||||
private final String displayName;
|
||||
|
||||
ChannelType(String displayName) {
|
||||
this.displayName = displayName;
|
||||
}
|
||||
|
||||
public String getDisplayName() {
|
||||
return displayName;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 배포 요청 DTO
|
||||
* POST /api/distribution/distribute
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DistributionRequest {
|
||||
|
||||
/**
|
||||
* 이벤트 ID
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 이벤트 제목
|
||||
*/
|
||||
private String title;
|
||||
|
||||
/**
|
||||
* 이벤트 설명
|
||||
*/
|
||||
private String description;
|
||||
|
||||
/**
|
||||
* 이미지 URL (CDN)
|
||||
*/
|
||||
private String imageUrl;
|
||||
|
||||
/**
|
||||
* 배포할 채널 목록
|
||||
*/
|
||||
private List<ChannelType> channels;
|
||||
|
||||
/**
|
||||
* 채널별 추가 설정 (Optional)
|
||||
* 예: { "URIDONGNETV": { "radius": "1km", "timeSlot": "evening" } }
|
||||
*/
|
||||
private Map<String, Map<String, Object>> channelSettings;
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 배포 응답 DTO
|
||||
* POST /api/distribution/distribute
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DistributionResponse {
|
||||
|
||||
/**
|
||||
* 이벤트 ID
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 배포 성공 여부 (모든 채널 또는 일부 채널 성공)
|
||||
*/
|
||||
private boolean success;
|
||||
|
||||
/**
|
||||
* 채널별 배포 결과
|
||||
*/
|
||||
private List<ChannelDistributionResult> channelResults;
|
||||
|
||||
/**
|
||||
* 성공한 채널 수
|
||||
*/
|
||||
private int successCount;
|
||||
|
||||
/**
|
||||
* 실패한 채널 수
|
||||
*/
|
||||
private int failureCount;
|
||||
|
||||
/**
|
||||
* 배포 완료 시각
|
||||
*/
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
/**
|
||||
* 전체 배포 소요 시간 (ms)
|
||||
*/
|
||||
private long totalExecutionTimeMs;
|
||||
|
||||
/**
|
||||
* 메시지
|
||||
*/
|
||||
private String message;
|
||||
}
|
||||
+52
@@ -0,0 +1,52 @@
|
||||
package com.kt.distribution.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 배포 상태 조회 응답 DTO
|
||||
*
|
||||
* 특정 이벤트의 전체 배포 상태 및 채널별 상세 상태 정보를 담습니다.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DistributionStatusResponse {
|
||||
|
||||
/**
|
||||
* 이벤트 ID
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 전체 배포 상태
|
||||
* - PENDING: 대기 중
|
||||
* - IN_PROGRESS: 진행 중
|
||||
* - COMPLETED: 완료
|
||||
* - PARTIAL_FAILURE: 부분 성공
|
||||
* - FAILED: 실패
|
||||
* - NOT_FOUND: 배포 이력 없음
|
||||
*/
|
||||
private String overallStatus;
|
||||
|
||||
/**
|
||||
* 배포 시작 시각
|
||||
*/
|
||||
private LocalDateTime startedAt;
|
||||
|
||||
/**
|
||||
* 배포 완료 시각
|
||||
*/
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
/**
|
||||
* 채널별 배포 상태 목록
|
||||
*/
|
||||
private List<ChannelStatus> channels;
|
||||
}
|
||||
+48
@@ -0,0 +1,48 @@
|
||||
package com.kt.distribution.event;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Distribution Completed Event
|
||||
* 배포 완료 시 Kafka로 발행하는 이벤트
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DistributionCompletedEvent {
|
||||
|
||||
/**
|
||||
* 이벤트 ID
|
||||
*/
|
||||
private String eventId;
|
||||
|
||||
/**
|
||||
* 배포 완료된 채널 목록
|
||||
*/
|
||||
private List<String> distributedChannels;
|
||||
|
||||
/**
|
||||
* 배포 완료 시각
|
||||
*/
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
/**
|
||||
* 성공한 채널 수
|
||||
*/
|
||||
private int successCount;
|
||||
|
||||
/**
|
||||
* 실패한 채널 수
|
||||
*/
|
||||
private int failureCount;
|
||||
}
|
||||
+65
@@ -0,0 +1,65 @@
|
||||
package com.kt.distribution.repository;
|
||||
|
||||
import com.kt.distribution.dto.DistributionStatusResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 배포 상태 저장소
|
||||
*
|
||||
* 메모리 기반으로 배포 상태를 관리합니다.
|
||||
* 실제 운영 환경에서는 Redis 또는 데이터베이스를 사용하여 영구 저장하는 것을 권장합니다.
|
||||
*/
|
||||
@Slf4j
|
||||
@Repository
|
||||
public class DistributionStatusRepository {
|
||||
|
||||
/**
|
||||
* 이벤트 ID를 키로 배포 상태를 저장하는 메모리 저장소
|
||||
*/
|
||||
private final Map<String, DistributionStatusResponse> distributionStatuses = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 배포 상태 저장
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @param status 배포 상태
|
||||
*/
|
||||
public void save(String eventId, DistributionStatusResponse status) {
|
||||
log.debug("Saving distribution status: eventId={}, overallStatus={}", eventId, status.getOverallStatus());
|
||||
distributionStatuses.put(eventId, status);
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 상태 조회
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @return 배포 상태 (없으면 Optional.empty())
|
||||
*/
|
||||
public Optional<DistributionStatusResponse> findByEventId(String eventId) {
|
||||
log.debug("Finding distribution status: eventId={}", eventId);
|
||||
return Optional.ofNullable(distributionStatuses.get(eventId));
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 상태 삭제
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
*/
|
||||
public void delete(String eventId) {
|
||||
log.debug("Deleting distribution status: eventId={}", eventId);
|
||||
distributionStatuses.remove(eventId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 모든 배포 상태 삭제 (테스트용)
|
||||
*/
|
||||
public void deleteAll() {
|
||||
log.debug("Deleting all distribution statuses");
|
||||
distributionStatuses.clear();
|
||||
}
|
||||
}
|
||||
+261
@@ -0,0 +1,261 @@
|
||||
package com.kt.distribution.service;
|
||||
|
||||
import com.kt.distribution.adapter.ChannelAdapter;
|
||||
import com.kt.distribution.dto.*;
|
||||
import com.kt.distribution.event.DistributionCompletedEvent;
|
||||
import com.kt.distribution.repository.DistributionStatusRepository;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Distribution Service
|
||||
* 다중 채널 병렬 배포 및 Kafka Event 발행
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class DistributionService {
|
||||
|
||||
private final List<ChannelAdapter> channelAdapters;
|
||||
private final Optional<KafkaEventPublisher> kafkaEventPublisher;
|
||||
private final DistributionStatusRepository statusRepository;
|
||||
|
||||
@Autowired
|
||||
public DistributionService(List<ChannelAdapter> channelAdapters,
|
||||
Optional<KafkaEventPublisher> kafkaEventPublisher,
|
||||
DistributionStatusRepository statusRepository) {
|
||||
this.channelAdapters = channelAdapters;
|
||||
this.kafkaEventPublisher = kafkaEventPublisher;
|
||||
this.statusRepository = statusRepository;
|
||||
}
|
||||
|
||||
// 병렬 실행을 위한 ExecutorService (채널별 스레드 풀)
|
||||
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
||||
|
||||
/**
|
||||
* 다중 채널 병렬 배포
|
||||
*
|
||||
* @param request DistributionRequest
|
||||
* @return DistributionResponse
|
||||
*/
|
||||
public DistributionResponse distribute(DistributionRequest request) {
|
||||
LocalDateTime startedAt = LocalDateTime.now();
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
log.info("Starting multi-channel distribution: eventId={}, channels={}",
|
||||
request.getEventId(), request.getChannels());
|
||||
|
||||
// 배포 시작 상태 저장 (IN_PROGRESS)
|
||||
saveInProgressStatus(request.getEventId(), request.getChannels(), startedAt);
|
||||
|
||||
// 채널 어댑터 매핑 (타입별)
|
||||
Map<String, ChannelAdapter> adapterMap = channelAdapters.stream()
|
||||
.collect(Collectors.toMap(
|
||||
adapter -> adapter.getChannelType().name(),
|
||||
adapter -> adapter
|
||||
));
|
||||
|
||||
// 병렬 배포 실행
|
||||
List<CompletableFuture<ChannelDistributionResult>> futures = request.getChannels().stream()
|
||||
.map(channelType -> {
|
||||
ChannelAdapter adapter = adapterMap.get(channelType.name());
|
||||
if (adapter == null) {
|
||||
log.warn("No adapter found for channel: {}", channelType);
|
||||
return CompletableFuture.completedFuture(
|
||||
ChannelDistributionResult.builder()
|
||||
.channel(channelType)
|
||||
.success(false)
|
||||
.errorMessage("Adapter not found")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
// 비동기 실행
|
||||
return CompletableFuture.supplyAsync(
|
||||
() -> adapter.distribute(request),
|
||||
executorService
|
||||
);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// 모든 배포 완료 대기
|
||||
CompletableFuture<Void> allOf = CompletableFuture.allOf(
|
||||
futures.toArray(new CompletableFuture[0])
|
||||
);
|
||||
|
||||
allOf.join(); // 블로킹 대기 (최대 1분 목표)
|
||||
|
||||
// 결과 수집
|
||||
List<ChannelDistributionResult> results = futures.stream()
|
||||
.map(CompletableFuture::join)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
long totalExecutionTime = System.currentTimeMillis() - startTime;
|
||||
LocalDateTime completedAt = LocalDateTime.now();
|
||||
|
||||
// 성공/실패 카운트
|
||||
long successCount = results.stream().filter(ChannelDistributionResult::isSuccess).count();
|
||||
long failureCount = results.size() - successCount;
|
||||
|
||||
log.info("Multi-channel distribution completed: eventId={}, successCount={}, failureCount={}, totalTime={}ms",
|
||||
request.getEventId(), successCount, failureCount, totalExecutionTime);
|
||||
|
||||
// 배포 완료 상태 저장 (COMPLETED/PARTIAL_FAILURE/FAILED)
|
||||
saveCompletedStatus(request.getEventId(), results, startedAt, completedAt, successCount, failureCount);
|
||||
|
||||
// Kafka Event 발행
|
||||
publishDistributionCompletedEvent(request.getEventId(), results);
|
||||
|
||||
// 응답 생성
|
||||
return DistributionResponse.builder()
|
||||
.eventId(request.getEventId())
|
||||
.success(successCount > 0) // 1개 이상 성공하면 성공으로 간주
|
||||
.channelResults(results)
|
||||
.successCount((int) successCount)
|
||||
.failureCount((int) failureCount)
|
||||
.completedAt(completedAt)
|
||||
.totalExecutionTimeMs(totalExecutionTime)
|
||||
.message(String.format("Distribution completed: %d succeeded, %d failed",
|
||||
successCount, failureCount))
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 상태 조회
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @return 배포 상태
|
||||
*/
|
||||
public DistributionStatusResponse getDistributionStatus(String eventId) {
|
||||
return statusRepository.findByEventId(eventId)
|
||||
.orElse(DistributionStatusResponse.builder()
|
||||
.eventId(eventId)
|
||||
.overallStatus("NOT_FOUND")
|
||||
.channels(List.of())
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 시작 상태 저장 (IN_PROGRESS)
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @param channels 배포 채널 목록
|
||||
* @param startedAt 시작 시각
|
||||
*/
|
||||
private void saveInProgressStatus(String eventId, List<ChannelType> channels, LocalDateTime startedAt) {
|
||||
List<ChannelStatus> channelStatuses = channels.stream()
|
||||
.map(channelType -> ChannelStatus.builder()
|
||||
.channel(channelType)
|
||||
.status("PENDING")
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
DistributionStatusResponse status = DistributionStatusResponse.builder()
|
||||
.eventId(eventId)
|
||||
.overallStatus("IN_PROGRESS")
|
||||
.startedAt(startedAt)
|
||||
.channels(channelStatuses)
|
||||
.build();
|
||||
|
||||
statusRepository.save(eventId, status);
|
||||
}
|
||||
|
||||
/**
|
||||
* 배포 완료 상태 저장
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @param results 배포 결과
|
||||
* @param startedAt 시작 시각
|
||||
* @param completedAt 완료 시각
|
||||
* @param successCount 성공 개수
|
||||
* @param failureCount 실패 개수
|
||||
*/
|
||||
private void saveCompletedStatus(String eventId, List<ChannelDistributionResult> results,
|
||||
LocalDateTime startedAt, LocalDateTime completedAt,
|
||||
long successCount, long failureCount) {
|
||||
// 전체 상태 결정
|
||||
String overallStatus;
|
||||
if (successCount == 0) {
|
||||
overallStatus = "FAILED";
|
||||
} else if (failureCount == 0) {
|
||||
overallStatus = "COMPLETED";
|
||||
} else {
|
||||
overallStatus = "PARTIAL_FAILURE";
|
||||
}
|
||||
|
||||
// ChannelDistributionResult → ChannelStatus 변환
|
||||
List<ChannelStatus> channelStatuses = results.stream()
|
||||
.map(this::convertToChannelStatus)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
DistributionStatusResponse status = DistributionStatusResponse.builder()
|
||||
.eventId(eventId)
|
||||
.overallStatus(overallStatus)
|
||||
.startedAt(startedAt)
|
||||
.completedAt(completedAt)
|
||||
.channels(channelStatuses)
|
||||
.build();
|
||||
|
||||
statusRepository.save(eventId, status);
|
||||
}
|
||||
|
||||
/**
|
||||
* ChannelDistributionResult를 ChannelStatus로 변환
|
||||
*
|
||||
* @param result 배포 결과
|
||||
* @return 채널 상태
|
||||
*/
|
||||
private ChannelStatus convertToChannelStatus(ChannelDistributionResult result) {
|
||||
return ChannelStatus.builder()
|
||||
.channel(result.getChannel())
|
||||
.status(result.isSuccess() ? "COMPLETED" : "FAILED")
|
||||
.distributionId(result.getDistributionId())
|
||||
.estimatedViews(result.getEstimatedReach())
|
||||
.completedAt(LocalDateTime.now())
|
||||
.errorMessage(result.getErrorMessage())
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* DistributionCompleted 이벤트 발행
|
||||
*
|
||||
* @param eventId 이벤트 ID
|
||||
* @param results 채널별 배포 결과
|
||||
*/
|
||||
private void publishDistributionCompletedEvent(String eventId, List<ChannelDistributionResult> results) {
|
||||
if (kafkaEventPublisher.isEmpty()) {
|
||||
log.warn("KafkaEventPublisher not available - skipping event publishing");
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> distributedChannels = results.stream()
|
||||
.filter(ChannelDistributionResult::isSuccess)
|
||||
.map(result -> result.getChannel().name())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
long successCount = results.stream().filter(ChannelDistributionResult::isSuccess).count();
|
||||
long failureCount = results.size() - successCount;
|
||||
|
||||
DistributionCompletedEvent event = DistributionCompletedEvent.builder()
|
||||
.eventId(eventId)
|
||||
.distributedChannels(distributedChannels)
|
||||
.completedAt(LocalDateTime.now())
|
||||
.successCount((int) successCount)
|
||||
.failureCount((int) failureCount)
|
||||
.build();
|
||||
|
||||
kafkaEventPublisher.get().publishDistributionCompleted(event);
|
||||
}
|
||||
}
|
||||
+62
@@ -0,0 +1,62 @@
|
||||
package com.kt.distribution.service;
|
||||
|
||||
import com.kt.distribution.event.DistributionCompletedEvent;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Kafka Event Publisher
|
||||
* DistributionCompleted 이벤트를 Kafka로 발행
|
||||
*
|
||||
* @author System Architect
|
||||
* @since 2025-10-23
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnProperty(name = "spring.kafka.enabled", havingValue = "true", matchIfMissing = true)
|
||||
@RequiredArgsConstructor
|
||||
public class KafkaEventPublisher {
|
||||
|
||||
private final KafkaTemplate<String, Object> kafkaTemplate;
|
||||
|
||||
@Value("${kafka.topics.distribution-completed}")
|
||||
private String distributionCompletedTopic;
|
||||
|
||||
/**
|
||||
* 배포 완료 이벤트 발행
|
||||
*
|
||||
* @param event DistributionCompletedEvent
|
||||
*/
|
||||
public void publishDistributionCompleted(DistributionCompletedEvent event) {
|
||||
try {
|
||||
log.info("Publishing DistributionCompletedEvent: eventId={}, successCount={}, failureCount={}",
|
||||
event.getEventId(), event.getSuccessCount(), event.getFailureCount());
|
||||
|
||||
CompletableFuture<SendResult<String, Object>> future =
|
||||
kafkaTemplate.send(distributionCompletedTopic, event.getEventId(), event);
|
||||
|
||||
future.whenComplete((result, ex) -> {
|
||||
if (ex == null) {
|
||||
log.info("DistributionCompletedEvent published successfully: topic={}, partition={}, offset={}",
|
||||
distributionCompletedTopic,
|
||||
result.getRecordMetadata().partition(),
|
||||
result.getRecordMetadata().offset());
|
||||
} else {
|
||||
log.error("Failed to publish DistributionCompletedEvent: eventId={}, error={}",
|
||||
event.getEventId(), ex.getMessage(), ex);
|
||||
}
|
||||
});
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Error publishing DistributionCompletedEvent: eventId={}, error={}",
|
||||
event.getEventId(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user