From 5cd481679186a4829b59f3adfcc1f30b8b64041a Mon Sep 17 00:00:00 2001 From: bjkim Date: Mon, 29 Sep 2025 15:15:18 +0900 Subject: [PATCH] =?UTF-8?q?[ADD]=20Caffeine=20=EA=B8=B0=EB=B0=98=20?= =?UTF-8?q?=EC=BA=90=EC=8B=9C=20=EA=B5=AC=EC=84=B1=20=EC=B6=94=EA=B0=80=20?= =?UTF-8?q?=EB=B0=8F=20KubeflowRunBatchConfig=EC=97=90=20=EC=BA=90?= =?UTF-8?q?=EC=8B=B1=20=EB=A1=9C=EC=A7=81=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle.kts | 3 +- .../batch/KubeflowRunBatchConfig.java | 57 +++++++++++++++---- .../re/etri/autoflow/common/CacheConfig.java | 27 +++++++++ 3 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 src/main/java/kr/re/etri/autoflow/common/CacheConfig.java diff --git a/build.gradle.kts b/build.gradle.kts index 3707146..129e90a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -69,7 +69,8 @@ dependencies { implementation("io.minio:minio:8.5.17") -} + implementation("org.springframework.boot:spring-boot-starter-cache") // 캐시 지원 + implementation("com.github.ben-manes.caffeine:caffeine:3.2.2")} // Java 컴파일 인코딩 및 파라미터 리플렉션 지원 tasks.withType { diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java index acd0697..d97da3b 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -5,6 +5,7 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest; import kr.re.etri.autoflow.payload.response.KubeflowRunResponse; import kr.re.etri.autoflow.repository.KubeflowRunRepository; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; @@ -14,6 +15,8 @@ import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; @@ -22,8 +25,8 @@ import org.springframework.web.reactive.function.client.WebClient; import java.time.Instant; import java.util.List; -import java.util.Optional; +@Slf4j @Configuration @EnableBatchProcessing @RequiredArgsConstructor @@ -32,10 +35,9 @@ public class KubeflowRunBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final KubeflowRunRepository kubeflowRunRepository; + private final CacheManager cacheManager; - // 최신 데이터 몇 개만 가져올지 private static final int PAGE_SIZE = 10; - private static final String SORT_BY = "created_at DESC"; @Bean @@ -44,7 +46,7 @@ public class KubeflowRunBatchConfig { .baseUrl("http://192.168.10.135:32473") .exchangeStrategies(ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() - .maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼 + .maxInMemorySize(16 * 1024 * 1024)) .build()); } @@ -70,43 +72,67 @@ public class KubeflowRunBatchConfig { return new ItemReader<>() { private List runs = List.of(); private int index = 0; + private int apiCallCount = 0; + private int readCount = 0; @Override public KubeflowRunRequest read() { - // 페이지 끝나면 항상 최신 10개 fetch + readCount++; + + if (readCount > PAGE_SIZE * 2) { + log.info("Reader - 최대 {}건 읽어서 강제 종료", PAGE_SIZE * 2); + return null; // Step 종료 + } + if (index >= runs.size()) { + log.info("Reader - API 호출 시작 (지금까지 read 횟수: {})", readCount); + WebClient client = webClientBuilder().build(); KubeflowRunResponse response = client.get() .uri(uriBuilder -> uriBuilder .path("/apis/v2beta1/runs") .queryParam("page_size", PAGE_SIZE) - .queryParam("sort_by", "created_at desc") + .queryParam("sort_by", SORT_BY) .build()) .retrieve() .bodyToMono(KubeflowRunResponse.class) .block(); + apiCallCount++; runs = response != null ? response.getRuns() : List.of(); index = 0; - // 데이터가 없으면 Step 종료 + log.info("Reader - API 호출 완료 (총 호출 횟수: {}, 가져온 데이터 수: {})", apiCallCount, runs.size()); + if (runs.isEmpty()) { + log.info("Reader - 더 이상 가져올 데이터 없음 → Step 종료"); return null; } } - return runs.get(index++); + KubeflowRunRequest item = runs.get(index++); + log.debug("Reader - {}번째 아이템 반환: {}", readCount, item.getRun_id()); + return item; } }; } - @Bean public ItemProcessor runProcessor() { + Cache runIdCache = cacheManager.getCache("runIdCache"); + return dto -> { - // runId 기준 중복 체크 + log.debug("Processor - runId: {}", dto.getRun_id()); + + if (runIdCache.get(dto.getRun_id()) != null) { + log.debug("Processor - 이미 캐시에 존재 (skip)"); + return null; + } + if (kubeflowRunRepository.existsByRunId(dto.getRun_id())) { - return null; // 중복이면 skip + log.debug("Processor - DB에 이미 존재 (skip)"); + runIdCache.put(dto.getRun_id(), true); + return null; } KubeflowRunEntity entity = new KubeflowRunEntity(); @@ -122,12 +148,19 @@ public class KubeflowRunBatchConfig { entity.setScheduledAt(Instant.parse(dto.getScheduled_at())); entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setState(dto.getState()); + + runIdCache.put(dto.getRun_id(), true); + return entity; }; } @Bean public ItemWriter runWriter() { - return items -> kubeflowRunRepository.saveAll(items); + return items -> { + log.info("Writer - {}개 아이템 저장 시작", items.size()); + kubeflowRunRepository.saveAll(items); + log.info("Writer - 저장 완료"); + }; } } diff --git a/src/main/java/kr/re/etri/autoflow/common/CacheConfig.java b/src/main/java/kr/re/etri/autoflow/common/CacheConfig.java new file mode 100644 index 0000000..c6a5fc5 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/common/CacheConfig.java @@ -0,0 +1,27 @@ +package kr.re.etri.autoflow.common; + +import com.github.benmanes.caffeine.cache.Caffeine; +import org.springframework.cache.CacheManager; +import org.springframework.cache.caffeine.CaffeineCacheManager; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.TimeUnit; + +@Configuration +public class CacheConfig { + + @Bean + public Caffeine caffeineConfig() { + return Caffeine.newBuilder() + .maximumSize(10_000) // 최대 1만 개 + .expireAfterWrite(1, TimeUnit.HOURS); // 1시간 후 만료 + } + + @Bean + public CacheManager cacheManager(Caffeine caffeine) { + CaffeineCacheManager cacheManager = new CaffeineCacheManager("runIdCache"); + cacheManager.setCaffeine(caffeine); + return cacheManager; + } +}