From e90a59ec29eed91567e3a9b687c8761c6cbf3a47 Mon Sep 17 00:00:00 2001 From: bjkim Date: Tue, 14 Oct 2025 09:35:53 +0900 Subject: [PATCH] =?UTF-8?q?[ADD]=20JobExplorer=20=EA=B8=B0=EB=B0=98?= =?UTF-8?q?=EC=9D=98=20Job=20=EC=8B=A4=ED=96=89=20=EC=A4=91=20=EC=97=AC?= =?UTF-8?q?=EB=B6=80=20=EC=B2=B4=ED=81=AC=20=EB=A1=9C=EC=A7=81=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20=EB=B0=8F=20KubeflowRunBatchConfig=20=EC=A3=BC?= =?UTF-8?q?=EC=84=9D=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../etri/autoflow/batch/BatchScheduler.java | 38 +++++++++++++++---- .../batch/KubeflowRunBatchConfig.java | 38 ++++++++++--------- 2 files changed, 51 insertions(+), 25 deletions(-) diff --git a/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java index 329ecd1..1e8e366 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java +++ b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java @@ -1,28 +1,50 @@ package kr.re.etri.autoflow.batch; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.explore.JobExplorer; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; +@Slf4j @Configuration @EnableScheduling @RequiredArgsConstructor public class BatchScheduler { private final JobLauncher jobLauncher; - private final Job runSyncJob; // ✅ Spring Batch의 Job 타입 + private final JobExplorer jobExplorer; // 실행 중 Job 확인용 + private final Job runSyncJob; - @Scheduled(fixedRate = 60000) // 1분마다 실행 - public void runJob() throws Exception { - JobParameters params = new JobParametersBuilder() - .addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지 - .toJobParameters(); + /** + * Kubeflow run 동기화 배치 Job + * - 1분마다 실행 시도 + * - 실행 중인 Job이 있으면 스킵 + */ + @Scheduled(fixedDelay = 60000) // 이전 실행이 끝난 후 1분 뒤 다시 실행 + public void runJob() { + try { + boolean running = !jobExplorer.findRunningJobExecutions("runSyncJob").isEmpty(); - jobLauncher.run(runSyncJob, params); + if (running) { + log.info("runSyncJob is already running. Skipping this schedule."); + return; + } + + JobParameters params = new JobParametersBuilder() + .addLong("timestamp", System.currentTimeMillis()) + .toJobParameters(); + + log.info("Launching runSyncJob..."); + jobLauncher.run(runSyncJob, params); + + } catch (Exception e) { + log.error("Error launching runSyncJob", e); + } } -} +} \ No newline at end of file diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java index 43be944..068b022 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -66,6 +66,10 @@ public class KubeflowRunBatchConfig { .build(); } + /** + * Reader: Kubeflow API에서 run 목록을 불러오고 그대로 반환. + * DB 존재 여부는 Processor에서 필터링. + */ @Bean public ItemReader runReader() { return new ItemReader<>() { @@ -87,19 +91,11 @@ public class KubeflowRunBatchConfig { .block(); if (response == null || response.getRuns().isEmpty()) { - return null; // 데이터 없음 -> Step 종료 + return null; // 데이터 없음 → Step 종료 } - // DB에 이미 존재하는 run_id는 제거 - runs = response.getRuns().stream() - .filter(r -> !kubeflowRunRepository.existsByRunId(r.getRun_id())) - .collect(Collectors.toList()); - + runs = response.getRuns(); index = 0; - - if (runs.isEmpty()) { - return null; - } } return runs.get(index++); @@ -107,21 +103,26 @@ public class KubeflowRunBatchConfig { }; } + /** + * Processor: 캐시 및 DB를 이용해 중복 제거 후 엔티티 생성. + */ @Bean public ItemProcessor runProcessor() { Cache runIdCache = cacheManager.getCache("runIdCache"); - Set seenRunIds = new HashSet<>(); // chunk 내 중복 방지 + Set seenRunIds = new HashSet<>(); return dto -> { String runId = dto.getRun_id(); + // chunk 내 중복 if (seenRunIds.contains(runId)) { - return null; // chunk 내 중복 skip + return null; } + // 캐시 또는 DB 중복 검사 if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) { runIdCache.put(runId, true); - return null; // 이미 캐시 혹은 DB 존재 + return null; } seenRunIds.add(runId); @@ -132,16 +133,19 @@ public class KubeflowRunBatchConfig { entity.setDisplayName(dto.getDisplay_name()); entity.setStorageState(dto.getStorage_state()); entity.setDescription(dto.getDescription()); - entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id()); - entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id()); + + if (dto.getPipeline_version_reference() != null) { + entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id()); + entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id()); + } + entity.setServiceAccount(dto.getService_account()); entity.setCreatedAt(Instant.parse(dto.getCreated_at())); entity.setScheduledAt(Instant.parse(dto.getScheduled_at())); entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setState(dto.getState()); - runIdCache.put(runId, true); // 캐시에 추가 - + runIdCache.put(runId, true); return entity; }; }