[ADD] JobExplorer 기반의 Job 실행 중 여부 체크 로직 추가 및 KubeflowRunBatchConfig 주석 개선

main
bjkim 8 months ago
parent 5f9f29c43a
commit e90a59ec29

@ -1,28 +1,50 @@
package kr.re.etri.autoflow.batch;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Slf4j
@Configuration
@EnableScheduling
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job runSyncJob; // ✅ Spring Batch의 Job 타입
private final JobExplorer jobExplorer; // 실행 중 Job 확인용
private final Job runSyncJob;
@Scheduled(fixedRate = 60000) // 1분마다 실행
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지
.toJobParameters();
/**
* Kubeflow run Job
* - 1
* - Job
*/
@Scheduled(fixedDelay = 60000) // 이전 실행이 끝난 후 1분 뒤 다시 실행
public void runJob() {
try {
boolean running = !jobExplorer.findRunningJobExecutions("runSyncJob").isEmpty();
jobLauncher.run(runSyncJob, params);
if (running) {
log.info("runSyncJob is already running. Skipping this schedule.");
return;
}
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
log.info("Launching runSyncJob...");
jobLauncher.run(runSyncJob, params);
} catch (Exception e) {
log.error("Error launching runSyncJob", e);
}
}
}
}

@ -66,6 +66,10 @@ public class KubeflowRunBatchConfig {
.build();
}
/**
* Reader: Kubeflow API run .
* DB Processor .
*/
@Bean
public ItemReader<KubeflowRunRequest> runReader() {
return new ItemReader<>() {
@ -87,19 +91,11 @@ public class KubeflowRunBatchConfig {
.block();
if (response == null || response.getRuns().isEmpty()) {
return null; // 데이터 없음 -> Step 종료
return null; // 데이터 없음 Step 종료
}
// DB에 이미 존재하는 run_id는 제거
runs = response.getRuns().stream()
.filter(r -> !kubeflowRunRepository.existsByRunId(r.getRun_id()))
.collect(Collectors.toList());
runs = response.getRuns();
index = 0;
if (runs.isEmpty()) {
return null;
}
}
return runs.get(index++);
@ -107,21 +103,26 @@ public class KubeflowRunBatchConfig {
};
}
/**
* Processor: DB .
*/
@Bean
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
Cache runIdCache = cacheManager.getCache("runIdCache");
Set<String> seenRunIds = new HashSet<>(); // chunk 내 중복 방지
Set<String> seenRunIds = new HashSet<>();
return dto -> {
String runId = dto.getRun_id();
// chunk 내 중복
if (seenRunIds.contains(runId)) {
return null; // chunk 내 중복 skip
return null;
}
// 캐시 또는 DB 중복 검사
if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) {
runIdCache.put(runId, true);
return null; // 이미 캐시 혹은 DB 존재
return null;
}
seenRunIds.add(runId);
@ -132,16 +133,19 @@ public class KubeflowRunBatchConfig {
entity.setDisplayName(dto.getDisplay_name());
entity.setStorageState(dto.getStorage_state());
entity.setDescription(dto.getDescription());
entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id());
entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id());
if (dto.getPipeline_version_reference() != null) {
entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id());
entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id());
}
entity.setServiceAccount(dto.getService_account());
entity.setCreatedAt(Instant.parse(dto.getCreated_at()));
entity.setScheduledAt(Instant.parse(dto.getScheduled_at()));
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
entity.setState(dto.getState());
runIdCache.put(runId, true); // 캐시에 추가
runIdCache.put(runId, true);
return entity;
};
}

Loading…
Cancel
Save