diff --git a/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java index 3133dcd..f7ee018 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java +++ b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java @@ -15,9 +15,9 @@ import org.springframework.scheduling.annotation.Scheduled; public class BatchScheduler { private final JobLauncher jobLauncher; - private final Job runSyncJob; // ✅ Spring Batch의 Job 타입 + private final Job runSyncJob; // Spring Batch의 Job 타입 - @Scheduled(fixedDelay = 60000) // 1분마다 실행 + @Scheduled(fixedDelay = 5000) // 1분마다 실행 public void runJob() throws Exception { JobParameters params = new JobParametersBuilder() .addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지 diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java index 3b81fce..cfea56c 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -5,9 +5,11 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest; import kr.re.etri.autoflow.payload.response.KubeflowRunResponse; import kr.re.etri.autoflow.repository.KubeflowRunRepository; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; @@ -18,17 +20,20 @@ import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; +import java.time.Duration; import java.time.Instant; import java.util.*; -import java.util.stream.Collectors; @Configuration @EnableBatchProcessing @RequiredArgsConstructor +@Slf4j public class KubeflowRunBatchConfig { private final JobRepository jobRepository; @@ -36,13 +41,17 @@ public class KubeflowRunBatchConfig { private final KubeflowRunRepository kubeflowRunRepository; private final CacheManager cacheManager; - private static final int PAGE_SIZE = 10; + private static final int PAGE_SIZE = 50; private static final String SORT_BY = "created_at DESC"; @Bean public WebClient.Builder webClientBuilder() { + HttpClient httpClient = HttpClient.create() + .responseTimeout(Duration.ofSeconds(30)); // 응답 제한 + return WebClient.builder() .baseUrl("http://192.168.10.135:32473") + .clientConnector(new ReactorClientHttpConnector(httpClient)) .exchangeStrategies(ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() .maxInMemorySize(16 * 1024 * 1024)) @@ -66,19 +75,16 @@ public class KubeflowRunBatchConfig { .build(); } - /** - * Reader: Kubeflow API에서 run 목록을 불러오고 그대로 반환. - * DB 존재 여부는 Processor에서 필터링. - */ @Bean + @StepScope public ItemReader runReader() { return new ItemReader<>() { - private List runs = List.of(); + private List runs = null; // null이면 아직 API 호출 안 함 private int index = 0; @Override public KubeflowRunRequest read() { - if (index >= runs.size()) { + if (runs == null) { WebClient client = webClientBuilder().build(); KubeflowRunResponse response = client.get() .uri(uriBuilder -> uriBuilder @@ -91,11 +97,17 @@ public class KubeflowRunBatchConfig { .block(); if (response == null || response.getRuns().isEmpty()) { - return null; // 데이터 없음 → Step 종료 + log.info("KubeflowRunBatch: 데이터 없음, 종료"); + runs = Collections.emptyList(); + return null; } runs = response.getRuns(); - index = 0; + log.info("KubeflowRunBatch: {}건 조회 완료", runs.size()); + } + + if (index >= runs.size()) { + return null; // 모든 Item 처리 완료 시 null 반환 } return runs.get(index++); @@ -103,56 +115,38 @@ public class KubeflowRunBatchConfig { }; } - /** - * Processor: 캐시 및 DB를 이용해 중복 제거 후 엔티티 생성. - */ @Bean public ItemProcessor runProcessor() { - Cache runIdCache = cacheManager.getCache("runIdCache"); - Set seenRunIds = new HashSet<>(); - return dto -> { - String runId = dto.getRun_id(); - - // chunk 내 중복 - if (seenRunIds.contains(runId)) { - return null; - } + KubeflowRunEntity entity = kubeflowRunRepository.findByRunId(dto.getRun_id()) + .orElseGet(KubeflowRunEntity::new); // 없으면 새로 생성 - // 캐시 또는 DB 중복 검사 - assert runIdCache != null; - if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) { - runIdCache.put(runId, true); - return null; - } - - seenRunIds.add(runId); - - KubeflowRunEntity entity = new KubeflowRunEntity(); - entity.setRunId(runId); + // 모든 필드 업데이트 + entity.setRunId(dto.getRun_id()); entity.setExperimentId(dto.getExperiment_id()); entity.setDisplayName(dto.getDisplay_name()); entity.setStorageState(dto.getStorage_state()); entity.setDescription(dto.getDescription()); - if (dto.getPipeline_version_reference() != null) { entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id()); entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id()); } - entity.setServiceAccount(dto.getService_account()); entity.setCreatedAt(Instant.parse(dto.getCreated_at())); entity.setScheduledAt(Instant.parse(dto.getScheduled_at())); entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setState(dto.getState()); - runIdCache.put(runId, true); return entity; }; } + @Bean public ItemWriter runWriter() { - return kubeflowRunRepository::saveAll; + return items -> { + kubeflowRunRepository.saveAll(items); + log.info("KubeflowRunBatch: {}건 DB 저장 완료", items.size()); + }; } }