From fbf1d9d6e09c78e60688da0ec3333d19b7145830 Mon Sep 17 00:00:00 2001 From: UNGGU0704 Date: Tue, 17 Jun 2025 10:28:25 +0900 Subject: [PATCH] =?UTF-8?q?Add:=20Azure=20Evnenthub=EB=B0=8F=20redis=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- store/build.gradle | 2 +- .../ExternalIntegrationInteractor.java | 115 +++++++++++- .../in/ExternalIntegrationUseCase.java | 3 + .../biz/usecase/out/ExternalPlatformPort.java | 5 + .../hi/store/infra/config/RedisConfig.java | 42 +++++ .../gateway/ExternalPlatformAdapter.java | 166 +++++++++++++++++- 6 files changed, 321 insertions(+), 12 deletions(-) create mode 100644 store/src/main/java/com/ktds/hi/store/infra/config/RedisConfig.java diff --git a/store/build.gradle b/store/build.gradle index 239c9b6..0bdee20 100644 --- a/store/build.gradle +++ b/store/build.gradle @@ -4,5 +4,5 @@ dependencies { // External API Integration implementation 'org.springframework.boot:spring-boot-starter-data-redis' implementation 'org.springframework.boot:spring-boot-starter-webflux' - + implementation 'com.azure:azure-messaging-eventhubs:5.15.0' } diff --git a/store/src/main/java/com/ktds/hi/store/biz/service/ExternalIntegrationInteractor.java b/store/src/main/java/com/ktds/hi/store/biz/service/ExternalIntegrationInteractor.java index 0fce318..a08fc0f 100644 --- a/store/src/main/java/com/ktds/hi/store/biz/service/ExternalIntegrationInteractor.java +++ b/store/src/main/java/com/ktds/hi/store/biz/service/ExternalIntegrationInteractor.java @@ -1,5 +1,9 @@ package com.ktds.hi.store.biz.service; +import com.azure.messaging.eventhubs.EventData; +import com.azure.messaging.eventhubs.EventHubClientBuilder; +import com.azure.messaging.eventhubs.EventHubProducerClient; +import com.fasterxml.jackson.databind.ObjectMapper; import com.ktds.hi.store.biz.usecase.in.ExternalIntegrationUseCase; import com.ktds.hi.store.biz.usecase.out.ExternalPlatformPort; import com.ktds.hi.store.biz.usecase.out.EventPort; @@ -11,9 +15,13 @@ import com.ktds.hi.common.exception.BusinessException; import com.ktds.hi.common.exception.ExternalServiceException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.time.Duration; +import java.util.*; + /** * 외부 연동 인터랙터 클래스 * 외부 플랫폼 연동 비즈니스 로직을 구현 @@ -26,6 +34,8 @@ public class ExternalIntegrationInteractor implements ExternalIntegrationUseCase private final ExternalPlatformPort externalPlatformPort; private final EventPort eventPort; + private final RedisTemplate redisTemplate; + private final ObjectMapper objectMapper; @Override public ExternalSyncResponse syncReviews(Long storeId, ExternalSyncRequest request) { @@ -223,11 +233,110 @@ public class ExternalIntegrationInteractor implements ExternalIntegrationUseCase private void publishSyncEvent(Long storeId, String platform, int syncedCount) { try { - // 동기화 이벤트 발행 로직 - log.info("동기화 이벤트 발행: storeId={}, platform={}, syncedCount={}", + // 기존 Event Hub 설정 그대로 유지 ⭐ + EventHubProducerClient producer = new EventHubClientBuilder() + .connectionString(System.getenv("EVENTHUB_CONNECTION_STRING"), "review-sync") + .buildProducerClient(); + + // Redis에서 실제 리뷰 데이터 조회 + Map eventPayload = createEventPayloadFromRedis(storeId, platform, syncedCount); + String payloadJson = objectMapper.writeValueAsString(eventPayload); + + EventData eventData = new EventData(payloadJson); + + // 메타데이터 추가 + eventData.getProperties().put("storeId", storeId.toString()); + eventData.getProperties().put("platform", platform); + eventData.getProperties().put("eventType", "EXTERNAL_REVIEW_SYNC"); + + producer.send(Arrays.asList(eventData)); + + // 성공 시 Redis에서 해당 데이터 삭제 또는 상태 변경 + markAsProcessedInRedis(storeId, platform); + + log.info("동기화 이벤트 발행 완료: storeId={}, platform={}, syncedCount={}", storeId, platform, syncedCount); + } catch (Exception e) { - log.warn("동기화 이벤트 발행 실패: {}", e.getMessage()); + log.error("동기화 이벤트 발행 실패: storeId={}, platform={}, error={}", + storeId, platform, e.getMessage(), e); + + // 실패 시 재시도 큐로 이동 + moveToRetryQueue(storeId, platform, e.getMessage()); + } + } + + /** + * Redis에서 이벤트 페이로드 생성 (새로 추가) + */ + private Map createEventPayloadFromRedis(Long storeId, String platform, int syncedCount) { + Map payload = new HashMap<>(); + payload.put("eventType", "EXTERNAL_REVIEW_SYNC"); + payload.put("storeId", storeId); + payload.put("platform", platform); + payload.put("syncedCount", syncedCount); + payload.put("timestamp", System.currentTimeMillis()); + + // Redis에서 실제 리뷰 데이터 조회 + List> reviews = externalPlatformPort.getTempReviews(storeId, platform); + payload.put("reviews", reviews); + + return payload; + } + + /** + * Redis에서 처리 완료 표시 (새로 추가) + */ + private void markAsProcessedInRedis(Long storeId, String platform) { + try { + String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform); + Set keys = redisTemplate.keys(pattern); + + if (keys != null) { + for (String key : keys) { + redisTemplate.delete(key); + } + } + + log.info("Redis에서 처리 완료된 데이터 삭제: storeId={}, platform={}", storeId, platform); + + } catch (Exception e) { + log.warn("Redis 데이터 정리 실패: storeId={}, platform={}", storeId, platform); + } + } + + /** + * 재시도 큐로 이동 (새로 추가) + */ + private void moveToRetryQueue(Long storeId, String platform, String errorMessage) { + try { + String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform); + Set keys = redisTemplate.keys(pattern); + + if (keys != null && !keys.isEmpty()) { + String pendingKey = keys.iterator().next(); + Map cacheData = (Map) redisTemplate.opsForValue().get(pendingKey); + + if (cacheData != null) { + // 재시도 횟수 증가 + Integer retryCount = (Integer) cacheData.getOrDefault("retryCount", 0); + cacheData.put("retryCount", retryCount + 1); + cacheData.put("lastError", errorMessage); + cacheData.put("status", "RETRY"); + + // 재시도 큐로 이동 + String retryKey = pendingKey.replace("pending", "retry"); + redisTemplate.opsForValue().set(retryKey, cacheData, Duration.ofHours(12)); + + // 원본 삭제 + redisTemplate.delete(pendingKey); + + log.info("재시도 큐로 이동: retryKey={}, retryCount={}", retryKey, retryCount + 1); + } + } + + } catch (Exception e) { + log.error("재시도 큐 이동 실패: storeId={}, platform={}", storeId, platform); } } diff --git a/store/src/main/java/com/ktds/hi/store/biz/usecase/in/ExternalIntegrationUseCase.java b/store/src/main/java/com/ktds/hi/store/biz/usecase/in/ExternalIntegrationUseCase.java index 756c5e5..cc12616 100644 --- a/store/src/main/java/com/ktds/hi/store/biz/usecase/in/ExternalIntegrationUseCase.java +++ b/store/src/main/java/com/ktds/hi/store/biz/usecase/in/ExternalIntegrationUseCase.java @@ -5,6 +5,9 @@ import com.ktds.hi.store.infra.dto.ExternalSyncResponse; import com.ktds.hi.store.infra.dto.ExternalConnectRequest; import com.ktds.hi.store.infra.dto.ExternalConnectResponse; +import java.util.List; +import java.util.Map; + /** * 외부 연동 유스케이스 인터페이스 * 외부 플랫폼 연동 관련 비즈니스 로직을 정의 diff --git a/store/src/main/java/com/ktds/hi/store/biz/usecase/out/ExternalPlatformPort.java b/store/src/main/java/com/ktds/hi/store/biz/usecase/out/ExternalPlatformPort.java index 570d06d..0a3c8f2 100644 --- a/store/src/main/java/com/ktds/hi/store/biz/usecase/out/ExternalPlatformPort.java +++ b/store/src/main/java/com/ktds/hi/store/biz/usecase/out/ExternalPlatformPort.java @@ -1,5 +1,8 @@ package com.ktds.hi.store.biz.usecase.out; +import java.util.List; +import java.util.Map; + /** * 외부 플랫폼 포트 인터페이스 * 외부 플랫폼 연동 기능을 정의 @@ -93,4 +96,6 @@ public interface ExternalPlatformPort { * @return 연동 해제 성공 여부 */ boolean disconnectPlatform(Long storeId, String platform); + + public List> getTempReviews(Long storeId, String platform); } \ No newline at end of file diff --git a/store/src/main/java/com/ktds/hi/store/infra/config/RedisConfig.java b/store/src/main/java/com/ktds/hi/store/infra/config/RedisConfig.java new file mode 100644 index 0000000..9e5b7de --- /dev/null +++ b/store/src/main/java/com/ktds/hi/store/infra/config/RedisConfig.java @@ -0,0 +1,42 @@ +package com.ktds.hi.store.infra.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +@Configuration +public class RedisConfig { + + @Value("${data.redis.host}") + private String redisHost; + + @Value("${data.redis.port}") + private int redisPort; + + @Value("${data.redis.password}") + private String redisPassword; + + @Bean + public RedisConnectionFactory redisConnectionFactory() { + LettuceConnectionFactory factory = new LettuceConnectionFactory(redisHost, redisPort); + if (!redisPassword.isEmpty()) { + factory.setPassword(redisPassword); + } + return factory; + } + + @Bean + public RedisTemplate redisTemplate() { + RedisTemplate template = new RedisTemplate<>(); + template.setConnectionFactory(redisConnectionFactory()); + template.setDefaultSerializer(new GenericJackson2JsonRedisSerializer()); + template.setKeySerializer(new StringRedisSerializer()); + return template; + } +} \ No newline at end of file diff --git a/store/src/main/java/com/ktds/hi/store/infra/gateway/ExternalPlatformAdapter.java b/store/src/main/java/com/ktds/hi/store/infra/gateway/ExternalPlatformAdapter.java index 9cfa7d8..4a04429 100644 --- a/store/src/main/java/com/ktds/hi/store/infra/gateway/ExternalPlatformAdapter.java +++ b/store/src/main/java/com/ktds/hi/store/infra/gateway/ExternalPlatformAdapter.java @@ -1,13 +1,22 @@ package com.ktds.hi.store.infra.gateway; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.ktds.hi.store.biz.usecase.out.ExternalPlatformPort; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import java.time.Duration; +import java.util.*; + /** * 외부 플랫폼 어댑터 클래스 * External Platform Port를 구현하여 외부 API 연동 기능을 제공 @@ -18,6 +27,8 @@ import org.springframework.web.client.RestTemplate; public class ExternalPlatformAdapter implements ExternalPlatformPort { private final RestTemplate restTemplate; + private final RedisTemplate redisTemplate; + private final ObjectMapper objectMapper; @Value("${external-api.naver.client-id:}") private String naverClientId; @@ -64,14 +75,23 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort { log.info("카카오 리뷰 동기화 시작: storeId={}, externalStoreId={}", storeId, externalStoreId); try { - // 카카오 API 호출 (Mock) + // 기존 API 호출 로직 그대로 유지 ⭐ + String url = "http://kakao-review-api.20.249.191.180.nip.io/analyze"; + + Map requestBody = new HashMap<>(); + requestBody.put("store_id", storeId); + requestBody.put("days_limit", 360); + requestBody.put("max_time", 300); + HttpHeaders headers = new HttpHeaders(); - headers.set("Authorization", "KakaoAK " + kakaoApiKey); + headers.setContentType(MediaType.APPLICATION_JSON); - // Mock 응답 - int syncedCount = 12; + HttpEntity> entity = new HttpEntity<>(requestBody, headers); + + ResponseEntity response = restTemplate.postForEntity(url, entity, String.class); + + int syncedCount = parseAndStoreToRedis(storeId, "KAKAO", response.getBody()); - log.info("카카오 리뷰 동기화 완료: storeId={}, syncedCount={}", storeId, syncedCount); return syncedCount; } catch (Exception e) { @@ -80,6 +100,8 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort { } } + + @Override public int syncGoogleReviews(Long storeId, String externalStoreId) { log.info("구글 리뷰 동기화 시작: storeId={}, externalStoreId={}", storeId, externalStoreId); @@ -284,11 +306,28 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort { } /** - * 외부 연동 정보 저장 + * 외부 연동 정보 저장 Redis 활용 */ private void saveExternalConnection(Long storeId, String platform, String username) { - // 실제로는 ExternalPlatformEntity에 연동 정보 저장 - log.info("외부 연동 정보 저장: storeId={}, platform={}, username={}", storeId, platform, username); + try { + String connectionKey = String.format("external:connection:%d:%s", storeId, platform); + + Map connectionData = new HashMap<>(); + connectionData.put("storeId", storeId); + connectionData.put("platform", platform); + connectionData.put("username", username); + connectionData.put("connectedAt", System.currentTimeMillis()); + connectionData.put("isActive", true); + + redisTemplate.opsForValue().set(connectionKey, connectionData, Duration.ofDays(30)); + + log.info("외부 연동 정보 Redis 저장 완료: storeId={}, platform={}, username={}", + storeId, platform, username); + + } catch (Exception e) { + log.error("외부 연동 정보 저장 실패: storeId={}, platform={}, error={}", + storeId, platform, e.getMessage()); + } } /** @@ -298,4 +337,115 @@ public class ExternalPlatformAdapter implements ExternalPlatformPort { // 실제로는 ExternalPlatformEntity에서 연동 정보 제거 log.info("외부 연동 정보 제거: storeId={}, platform={}", storeId, platform); } + + /** + * 카카오 응답 파싱 및 Redis 저장 (새로 추가) + */ + private int parseAndStoreToRedis(Long storeId, String platform, String responseBody) { + try { + log.info("카카오 API 응답: {}", responseBody); + + // JSON 파싱 + JsonNode rootNode = objectMapper.readTree(responseBody); + JsonNode reviewsNode = rootNode.get("reviews"); + + if (reviewsNode == null || !reviewsNode.isArray()) { + return 0; + } + + // 리뷰 데이터 변환 + List> parsedReviews = new ArrayList<>(); + for (JsonNode reviewNode : reviewsNode) { + Map review = new HashMap<>(); + review.put("reviewId", reviewNode.get("review_id").asText()); + review.put("content", reviewNode.get("content").asText()); + review.put("rating", reviewNode.get("rating").asInt()); + review.put("authorName", reviewNode.get("author_name").asText()); + review.put("reviewDate", reviewNode.get("review_date").asText()); + review.put("platform", platform); + parsedReviews.add(review); + } + + if (!parsedReviews.isEmpty()) { + // Redis에 저장 (TTL: 24시간) + String redisKey = String.format("external:reviews:pending:%d:%s:%d", + storeId, platform, System.currentTimeMillis()); + + Map cacheData = new HashMap<>(); + cacheData.put("storeId", storeId); + cacheData.put("platform", platform); + cacheData.put("reviews", parsedReviews); + cacheData.put("status", "PENDING"); + cacheData.put("createdAt", System.currentTimeMillis()); + cacheData.put("retryCount", 0); + + redisTemplate.opsForValue().set(redisKey, cacheData, Duration.ofHours(24)); + + log.info("Redis에 리뷰 데이터 저장 완료: key={}, count={}", redisKey, parsedReviews.size()); + + // 동기화 상태 업데이트 + updateSyncStatus(storeId, platform, "SUCCESS", parsedReviews.size()); + } + + return parsedReviews.size(); + + } catch (Exception e) { + log.error("카카오 응답 파싱 및 Redis 저장 실패: {}", e.getMessage()); + updateSyncStatus(storeId, platform, "FAILED", 0); + return 0; + } + } + + /** + * 동기화 상태 Redis에 저장 + */ + private void updateSyncStatus(Long storeId, String platform, String status, int count) { + try { + String statusKey = String.format("external:sync:status:%d:%s", storeId, platform); + + Map statusData = new HashMap<>(); + statusData.put("storeId", storeId); + statusData.put("platform", platform); + statusData.put("status", status); + statusData.put("syncedCount", count); + statusData.put("timestamp", System.currentTimeMillis()); + + redisTemplate.opsForValue().set(statusKey, statusData, Duration.ofDays(1)); + + } catch (Exception e) { + log.warn("동기화 상태 저장 실패: storeId={}, platform={}", storeId, platform); + } + } + + @Override + public List> getTempReviews(Long storeId, String platform) { + try { + // Redis에서 최신 pending 데이터 조회 + String pattern = String.format("external:reviews:pending:%d:%s:*", storeId, platform); + Set keys = redisTemplate.keys(pattern); + + if (keys != null && !keys.isEmpty()) { + // 가장 최신 키 선택 (타임스탬프 기준) + String latestKey = keys.stream() + .max(Comparator.comparing(key -> Long.parseLong(key.substring(key.lastIndexOf(':') + 1)))) + .orElse(null); + + if (latestKey != null) { + Map cacheData = (Map) redisTemplate.opsForValue().get(latestKey); + if (cacheData != null) { + return (List>) cacheData.get("reviews"); + } + } + } + + return new ArrayList<>(); + + } catch (Exception e) { + log.error("Redis에서 임시 리뷰 데이터 조회 실패: storeId={}, platform={}", storeId, platform); + return new ArrayList<>(); + } + } + + + } \ No newline at end of file