Add: Azure Evnenthub및 redis 추가
This commit is contained in:
parent
8ef8fa9e95
commit
fbf1d9d6e0
@ -4,5 +4,5 @@ dependencies {
|
||||
// External API Integration
|
||||
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
|
||||
implementation 'org.springframework.boot:spring-boot-starter-webflux'
|
||||
|
||||
implementation 'com.azure:azure-messaging-eventhubs:5.15.0'
|
||||
}
|
||||
|
||||
@ -1,5 +1,9 @@
|
||||
package com.ktds.hi.store.biz.service;
|
||||
|
||||
import com.azure.messaging.eventhubs.EventData;
|
||||
import com.azure.messaging.eventhubs.EventHubClientBuilder;
|
||||
import com.azure.messaging.eventhubs.EventHubProducerClient;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.ktds.hi.store.biz.usecase.in.ExternalIntegrationUseCase;
|
||||
import com.ktds.hi.store.biz.usecase.out.ExternalPlatformPort;
|
||||
import com.ktds.hi.store.biz.usecase.out.EventPort;
|
||||
@ -11,9 +15,13 @@ import com.ktds.hi.common.exception.BusinessException;
|
||||
import com.ktds.hi.common.exception.ExternalServiceException;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 외부 연동 인터랙터 클래스
|
||||
* 외부 플랫폼 연동 비즈니스 로직을 구현
|
||||
@ -26,6 +34,8 @@ public class ExternalIntegrationInteractor implements ExternalIntegrationUseCase
|
||||
|
||||
private final ExternalPlatformPort externalPlatformPort;
|
||||
private final EventPort eventPort;
|
||||
private final RedisTemplate<String, Object> redisTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Override
|
||||
public ExternalSyncResponse syncReviews(Long storeId, ExternalSyncRequest request) {
|
||||
@ -223,11 +233,110 @@ public class ExternalIntegrationInteractor implements ExternalIntegrationUseCase
|
||||
|
||||
private void publishSyncEvent(Long storeId, String platform, int syncedCount) {
|
||||
try {
|
||||
// 동기화 이벤트 발행 로직
|
||||
log.info("동기화 이벤트 발행: storeId={}, platform={}, syncedCount={}",
|
||||
// 기존 Event Hub 설정 그대로 유지 ⭐
|
||||
EventHubProducerClient producer = new EventHubClientBuilder()
|
||||
.connectionString(System.getenv("EVENTHUB_CONNECTION_STRING"), "review-sync")
|
||||
.buildProducerClient();
|
||||
|
||||
// Redis에서 실제 리뷰 데이터 조회
|
||||
Map<String, Object> eventPayload = createEventPayloadFromRedis(storeId, platform, syncedCount);
|
||||
String payloadJson = objectMapper.writeValueAsString(eventPayload);
|
||||
|
||||
EventData eventData = new EventData(payloadJson);
|
||||
|
||||
// 메타데이터 추가
|
||||
eventData.getProperties().put("storeId", storeId.toString());
|
||||
eventData.getProperties().put("platform", platform);
|
||||
eventData.getProperties().put("eventType", "EXTERNAL_REVIEW_SYNC");
|
||||
|
||||
producer.send(Arrays.asList(eventData));
|
||||
|
||||
// 성공 시 Redis에서 해당 데이터 삭제 또는 상태 변경
|
||||
markAsProcessedInRedis(storeId, platform);
|
||||
|
||||
log.info("동기화 이벤트 발행 완료: storeId={}, platform={}, syncedCount={}",
|
||||
storeId, platform, syncedCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("동기화 이벤트 발행 실패: {}", e.getMessage());
|
||||
log.error("동기화 이벤트 발행 실패: storeId={}, platform={}, error={}",
|
||||
storeId, platform, e.getMessage(), e);
|
||||
|
||||
// 실패 시 재시도 큐로 이동
|
||||
moveToRetryQueue(storeId, platform, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Redis에서 이벤트 페이로드 생성 (새로 추가)
|
||||
*/
|
||||
private Map<String, Object> createEventPayloadFromRedis(Long storeId, String platform, int syncedCount) {
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("eventType", "EXTERNAL_REVIEW_SYNC");
|
||||
payload.put("storeId", storeId);
|
||||
payload.put("platform", platform);
|
||||
payload.put("syncedCount", syncedCount);
|
||||
payload.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
// Redis에서 실제 리뷰 데이터 조회
|
||||
List<Map<String, Object>> reviews = externalPlatformPort.getTempReviews(storeId, platform);
|
||||
payload.put("reviews", reviews);
|
||||
|
||||
return payload;
|
||||
}
|
||||
|
||||
/**
|
||||
* Redis에서 처리 완료 표시 (새로 추가)
|
||||
*/
|
||||
private void markAsProcessedInRedis(Long storeId, String platform) {
|
||||
try {
|
||||
String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform);
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
|
||||
if (keys != null) {
|
||||
for (String key : keys) {
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Redis에서 처리 완료된 데이터 삭제: storeId={}, platform={}", storeId, platform);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("Redis 데이터 정리 실패: storeId={}, platform={}", storeId, platform);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 재시도 큐로 이동 (새로 추가)
|
||||
*/
|
||||
private void moveToRetryQueue(Long storeId, String platform, String errorMessage) {
|
||||
try {
|
||||
String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform);
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
String pendingKey = keys.iterator().next();
|
||||
Map<String, Object> cacheData = (Map<String, Object>) redisTemplate.opsForValue().get(pendingKey);
|
||||
|
||||
if (cacheData != null) {
|
||||
// 재시도 횟수 증가
|
||||
Integer retryCount = (Integer) cacheData.getOrDefault("retryCount", 0);
|
||||
cacheData.put("retryCount", retryCount + 1);
|
||||
cacheData.put("lastError", errorMessage);
|
||||
cacheData.put("status", "RETRY");
|
||||
|
||||
// 재시도 큐로 이동
|
||||
String retryKey = pendingKey.replace("pending", "retry");
|
||||
redisTemplate.opsForValue().set(retryKey, cacheData, Duration.ofHours(12));
|
||||
|
||||
// 원본 삭제
|
||||
redisTemplate.delete(pendingKey);
|
||||
|
||||
log.info("재시도 큐로 이동: retryKey={}, retryCount={}", retryKey, retryCount + 1);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("재시도 큐 이동 실패: storeId={}, platform={}", storeId, platform);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,9 @@ import com.ktds.hi.store.infra.dto.ExternalSyncResponse;
|
||||
import com.ktds.hi.store.infra.dto.ExternalConnectRequest;
|
||||
import com.ktds.hi.store.infra.dto.ExternalConnectResponse;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 외부 연동 유스케이스 인터페이스
|
||||
* 외부 플랫폼 연동 관련 비즈니스 로직을 정의
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
package com.ktds.hi.store.biz.usecase.out;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 외부 플랫폼 포트 인터페이스
|
||||
* 외부 플랫폼 연동 기능을 정의
|
||||
@ -93,4 +96,6 @@ public interface ExternalPlatformPort {
|
||||
* @return 연동 해제 성공 여부
|
||||
*/
|
||||
boolean disconnectPlatform(Long storeId, String platform);
|
||||
|
||||
public List<Map<String, Object>> getTempReviews(Long storeId, String platform);
|
||||
}
|
||||
@ -0,0 +1,42 @@
|
||||
package com.ktds.hi.store.infra.config;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
|
||||
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
||||
|
||||
@Configuration
|
||||
public class RedisConfig {
|
||||
|
||||
@Value("${data.redis.host}")
|
||||
private String redisHost;
|
||||
|
||||
@Value("${data.redis.port}")
|
||||
private int redisPort;
|
||||
|
||||
@Value("${data.redis.password}")
|
||||
private String redisPassword;
|
||||
|
||||
@Bean
|
||||
public RedisConnectionFactory redisConnectionFactory() {
|
||||
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisHost, redisPort);
|
||||
if (!redisPassword.isEmpty()) {
|
||||
factory.setPassword(redisPassword);
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RedisTemplate<String, Object> redisTemplate() {
|
||||
RedisTemplate<String, Object> template = new RedisTemplate<>();
|
||||
template.setConnectionFactory(redisConnectionFactory());
|
||||
template.setDefaultSerializer(new GenericJackson2JsonRedisSerializer());
|
||||
template.setKeySerializer(new StringRedisSerializer());
|
||||
return template;
|
||||
}
|
||||
}
|
||||
@ -1,13 +1,22 @@
|
||||
package com.ktds.hi.store.infra.gateway;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.ktds.hi.store.biz.usecase.out.ExternalPlatformPort;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 외부 플랫폼 어댑터 클래스
|
||||
* External Platform Port를 구현하여 외부 API 연동 기능을 제공
|
||||
@ -18,6 +27,8 @@ import org.springframework.web.client.RestTemplate;
|
||||
public class ExternalPlatformAdapter implements ExternalPlatformPort {
|
||||
|
||||
private final RestTemplate restTemplate;
|
||||
private final RedisTemplate<String, Object> redisTemplate;
|
||||
private final ObjectMapper objectMapper;
|
||||
|
||||
@Value("${external-api.naver.client-id:}")
|
||||
private String naverClientId;
|
||||
@ -64,14 +75,23 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort {
|
||||
log.info("카카오 리뷰 동기화 시작: storeId={}, externalStoreId={}", storeId, externalStoreId);
|
||||
|
||||
try {
|
||||
// 카카오 API 호출 (Mock)
|
||||
// 기존 API 호출 로직 그대로 유지 ⭐
|
||||
String url = "http://kakao-review-api.20.249.191.180.nip.io/analyze";
|
||||
|
||||
Map<String, Object> requestBody = new HashMap<>();
|
||||
requestBody.put("store_id", storeId);
|
||||
requestBody.put("days_limit", 360);
|
||||
requestBody.put("max_time", 300);
|
||||
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.set("Authorization", "KakaoAK " + kakaoApiKey);
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
// Mock 응답
|
||||
int syncedCount = 12;
|
||||
HttpEntity<Map<String, Object>> entity = new HttpEntity<>(requestBody, headers);
|
||||
|
||||
ResponseEntity<String> response = restTemplate.postForEntity(url, entity, String.class);
|
||||
|
||||
int syncedCount = parseAndStoreToRedis(storeId, "KAKAO", response.getBody());
|
||||
|
||||
log.info("카카오 리뷰 동기화 완료: storeId={}, syncedCount={}", storeId, syncedCount);
|
||||
return syncedCount;
|
||||
|
||||
} catch (Exception e) {
|
||||
@ -80,6 +100,8 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int syncGoogleReviews(Long storeId, String externalStoreId) {
|
||||
log.info("구글 리뷰 동기화 시작: storeId={}, externalStoreId={}", storeId, externalStoreId);
|
||||
@ -284,11 +306,28 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort {
|
||||
}
|
||||
|
||||
/**
|
||||
* 외부 연동 정보 저장
|
||||
* 외부 연동 정보 저장 Redis 활용
|
||||
*/
|
||||
private void saveExternalConnection(Long storeId, String platform, String username) {
|
||||
// 실제로는 ExternalPlatformEntity에 연동 정보 저장
|
||||
log.info("외부 연동 정보 저장: storeId={}, platform={}, username={}", storeId, platform, username);
|
||||
try {
|
||||
String connectionKey = String.format("external:connection:%d:%s", storeId, platform);
|
||||
|
||||
Map<String, Object> connectionData = new HashMap<>();
|
||||
connectionData.put("storeId", storeId);
|
||||
connectionData.put("platform", platform);
|
||||
connectionData.put("username", username);
|
||||
connectionData.put("connectedAt", System.currentTimeMillis());
|
||||
connectionData.put("isActive", true);
|
||||
|
||||
redisTemplate.opsForValue().set(connectionKey, connectionData, Duration.ofDays(30));
|
||||
|
||||
log.info("외부 연동 정보 Redis 저장 완료: storeId={}, platform={}, username={}",
|
||||
storeId, platform, username);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("외부 연동 정보 저장 실패: storeId={}, platform={}, error={}",
|
||||
storeId, platform, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -298,4 +337,115 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort {
|
||||
// 실제로는 ExternalPlatformEntity에서 연동 정보 제거
|
||||
log.info("외부 연동 정보 제거: storeId={}, platform={}", storeId, platform);
|
||||
}
|
||||
|
||||
/**
|
||||
* 카카오 응답 파싱 및 Redis 저장 (새로 추가)
|
||||
*/
|
||||
private int parseAndStoreToRedis(Long storeId, String platform, String responseBody) {
|
||||
try {
|
||||
log.info("카카오 API 응답: {}", responseBody);
|
||||
|
||||
// JSON 파싱
|
||||
JsonNode rootNode = objectMapper.readTree(responseBody);
|
||||
JsonNode reviewsNode = rootNode.get("reviews");
|
||||
|
||||
if (reviewsNode == null || !reviewsNode.isArray()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// 리뷰 데이터 변환
|
||||
List<Map<String, Object>> parsedReviews = new ArrayList<>();
|
||||
for (JsonNode reviewNode : reviewsNode) {
|
||||
Map<String, Object> review = new HashMap<>();
|
||||
review.put("reviewId", reviewNode.get("review_id").asText());
|
||||
review.put("content", reviewNode.get("content").asText());
|
||||
review.put("rating", reviewNode.get("rating").asInt());
|
||||
review.put("authorName", reviewNode.get("author_name").asText());
|
||||
review.put("reviewDate", reviewNode.get("review_date").asText());
|
||||
review.put("platform", platform);
|
||||
parsedReviews.add(review);
|
||||
}
|
||||
|
||||
if (!parsedReviews.isEmpty()) {
|
||||
// Redis에 저장 (TTL: 24시간)
|
||||
String redisKey = String.format("external:reviews:pending:%d:%s:%d",
|
||||
storeId, platform, System.currentTimeMillis());
|
||||
|
||||
Map<String, Object> cacheData = new HashMap<>();
|
||||
cacheData.put("storeId", storeId);
|
||||
cacheData.put("platform", platform);
|
||||
cacheData.put("reviews", parsedReviews);
|
||||
cacheData.put("status", "PENDING");
|
||||
cacheData.put("createdAt", System.currentTimeMillis());
|
||||
cacheData.put("retryCount", 0);
|
||||
|
||||
redisTemplate.opsForValue().set(redisKey, cacheData, Duration.ofHours(24));
|
||||
|
||||
log.info("Redis에 리뷰 데이터 저장 완료: key={}, count={}", redisKey, parsedReviews.size());
|
||||
|
||||
// 동기화 상태 업데이트
|
||||
updateSyncStatus(storeId, platform, "SUCCESS", parsedReviews.size());
|
||||
}
|
||||
|
||||
return parsedReviews.size();
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("카카오 응답 파싱 및 Redis 저장 실패: {}", e.getMessage());
|
||||
updateSyncStatus(storeId, platform, "FAILED", 0);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 동기화 상태 Redis에 저장
|
||||
*/
|
||||
private void updateSyncStatus(Long storeId, String platform, String status, int count) {
|
||||
try {
|
||||
String statusKey = String.format("external:sync:status:%d:%s", storeId, platform);
|
||||
|
||||
Map<String, Object> statusData = new HashMap<>();
|
||||
statusData.put("storeId", storeId);
|
||||
statusData.put("platform", platform);
|
||||
statusData.put("status", status);
|
||||
statusData.put("syncedCount", count);
|
||||
statusData.put("timestamp", System.currentTimeMillis());
|
||||
|
||||
redisTemplate.opsForValue().set(statusKey, statusData, Duration.ofDays(1));
|
||||
|
||||
} catch (Exception e) {
|
||||
log.warn("동기화 상태 저장 실패: storeId={}, platform={}", storeId, platform);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Map<String, Object>> getTempReviews(Long storeId, String platform) {
|
||||
try {
|
||||
// Redis에서 최신 pending 데이터 조회
|
||||
String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform);
|
||||
Set<String> keys = redisTemplate.keys(pattern);
|
||||
|
||||
if (keys != null && !keys.isEmpty()) {
|
||||
// 가장 최신 키 선택 (타임스탬프 기준)
|
||||
String latestKey = keys.stream()
|
||||
.max(Comparator.comparing(key -> Long.parseLong(key.substring(key.lastIndexOf(':') + 1))))
|
||||
.orElse(null);
|
||||
|
||||
if (latestKey != null) {
|
||||
Map<String, Object> cacheData = (Map<String, Object>) redisTemplate.opsForValue().get(latestKey);
|
||||
if (cacheData != null) {
|
||||
return (List<Map<String, Object>>) cacheData.get("reviews");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new ArrayList<>();
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Redis에서 임시 리뷰 데이터 조회 실패: storeId={}, platform={}", storeId, platform);
|
||||
return new ArrayList<>();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user