From 53c011a682aefdee9af72c809a5d0c592ed5c5fd Mon Sep 17 00:00:00 2001 From: bjkim Date: Mon, 29 Sep 2025 15:40:35 +0900 Subject: [PATCH] [REFACTOR] Remove unused `@Slf4j` annotation and simplify logging in `KubeflowRunBatchConfig`, streamline reader and processor logic --- .../batch/KubeflowRunBatchConfig.java | 57 +++++++------------ 1 file changed, 21 insertions(+), 36 deletions(-) diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java index d97da3b..7b5d245 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -5,7 +5,6 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest; import kr.re.etri.autoflow.payload.response.KubeflowRunResponse; import kr.re.etri.autoflow.repository.KubeflowRunRepository; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; @@ -25,8 +24,8 @@ import org.springframework.web.reactive.function.client.WebClient; import java.time.Instant; import java.util.List; +import java.util.Optional; -@Slf4j @Configuration @EnableBatchProcessing @RequiredArgsConstructor @@ -35,9 +34,12 @@ public class KubeflowRunBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final KubeflowRunRepository kubeflowRunRepository; - private final CacheManager cacheManager; + private final CacheManager cacheManager; // 주입 + + // 최신 데이터 몇 개만 가져올지 private static final int PAGE_SIZE = 10; + private static final String SORT_BY = "created_at DESC"; @Bean @@ -46,7 +48,7 @@ public class KubeflowRunBatchConfig { .baseUrl("http://192.168.10.135:32473") .exchangeStrategies(ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() - .maxInMemorySize(16 * 1024 * 1024)) + .maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼 .build()); } @@ -72,21 +74,11 @@ public class KubeflowRunBatchConfig { return new ItemReader<>() { private List runs = List.of(); private int index = 0; - private int apiCallCount = 0; - private int readCount = 0; @Override public KubeflowRunRequest read() { - readCount++; - - if (readCount > PAGE_SIZE * 2) { - log.info("Reader - 최대 {}건 읽어서 강제 종료", PAGE_SIZE * 2); - return null; // Step 종료 - } - + // 페이지 끝나면 항상 최신 10개 fetch if (index >= runs.size()) { - log.info("Reader - API 호출 시작 (지금까지 read 횟수: {})", readCount); - WebClient client = webClientBuilder().build(); KubeflowRunResponse response = client.get() .uri(uriBuilder -> uriBuilder @@ -98,21 +90,16 @@ public class KubeflowRunBatchConfig { .bodyToMono(KubeflowRunResponse.class) .block(); - apiCallCount++; runs = response != null ? response.getRuns() : List.of(); index = 0; - log.info("Reader - API 호출 완료 (총 호출 횟수: {}, 가져온 데이터 수: {})", apiCallCount, runs.size()); - + // 데이터가 없으면 Step 종료 if (runs.isEmpty()) { - log.info("Reader - 더 이상 가져올 데이터 없음 → Step 종료"); return null; } } - KubeflowRunRequest item = runs.get(index++); - log.debug("Reader - {}번째 아이템 반환: {}", readCount, item.getRun_id()); - return item; + return runs.get(index++); } }; } @@ -122,21 +109,22 @@ public class KubeflowRunBatchConfig { Cache runIdCache = cacheManager.getCache("runIdCache"); return dto -> { - log.debug("Processor - runId: {}", dto.getRun_id()); + String runId = dto.getRun_id(); - if (runIdCache.get(dto.getRun_id()) != null) { - log.debug("Processor - 이미 캐시에 존재 (skip)"); - return null; + // 캐시 체크 + if (runIdCache.get(runId) != null) { + return null; // 이미 캐시에 존재하면 skip } - if (kubeflowRunRepository.existsByRunId(dto.getRun_id())) { - log.debug("Processor - DB에 이미 존재 (skip)"); - runIdCache.put(dto.getRun_id(), true); + // DB 체크 + if (kubeflowRunRepository.existsByRunId(runId)) { + runIdCache.put(runId, true); // DB에 존재하면 캐시에 추가 return null; } + // 신규 엔티티 생성 KubeflowRunEntity entity = new KubeflowRunEntity(); - entity.setRunId(dto.getRun_id()); + entity.setRunId(runId); entity.setExperimentId(dto.getExperiment_id()); entity.setDisplayName(dto.getDisplay_name()); entity.setStorageState(dto.getStorage_state()); @@ -149,7 +137,8 @@ public class KubeflowRunBatchConfig { entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setState(dto.getState()); - runIdCache.put(dto.getRun_id(), true); + // 캐시에 추가 + runIdCache.put(runId, true); return entity; }; @@ -157,10 +146,6 @@ public class KubeflowRunBatchConfig { @Bean public ItemWriter runWriter() { - return items -> { - log.info("Writer - {}개 아이템 저장 시작", items.size()); - kubeflowRunRepository.saveAll(items); - log.info("Writer - 저장 완료"); - }; + return items -> kubeflowRunRepository.saveAll(items); } }