diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java index 7b5d245..43be944 100644 --- a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -23,8 +23,8 @@ import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import java.time.Instant; -import java.util.List; -import java.util.Optional; +import java.util.*; +import java.util.stream.Collectors; @Configuration @EnableBatchProcessing @@ -34,12 +34,9 @@ public class KubeflowRunBatchConfig { private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final KubeflowRunRepository kubeflowRunRepository; - private final CacheManager cacheManager; // 주입 + private final CacheManager cacheManager; - - // 최신 데이터 몇 개만 가져올지 private static final int PAGE_SIZE = 10; - private static final String SORT_BY = "created_at DESC"; @Bean @@ -48,7 +45,7 @@ public class KubeflowRunBatchConfig { .baseUrl("http://192.168.10.135:32473") .exchangeStrategies(ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() - .maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼 + .maxInMemorySize(16 * 1024 * 1024)) .build()); } @@ -77,7 +74,6 @@ public class KubeflowRunBatchConfig { @Override public KubeflowRunRequest read() { - // 페이지 끝나면 항상 최신 10개 fetch if (index >= runs.size()) { WebClient client = webClientBuilder().build(); KubeflowRunResponse response = client.get() @@ -90,10 +86,17 @@ public class KubeflowRunBatchConfig { .bodyToMono(KubeflowRunResponse.class) .block(); - runs = response != null ? response.getRuns() : List.of(); + if (response == null || response.getRuns().isEmpty()) { + return null; // 데이터 없음 -> Step 종료 + } + + // DB에 이미 존재하는 run_id는 제거 + runs = response.getRuns().stream() + .filter(r -> !kubeflowRunRepository.existsByRunId(r.getRun_id())) + .collect(Collectors.toList()); + index = 0; - // 데이터가 없으면 Step 종료 if (runs.isEmpty()) { return null; } @@ -107,22 +110,22 @@ public class KubeflowRunBatchConfig { @Bean public ItemProcessor runProcessor() { Cache runIdCache = cacheManager.getCache("runIdCache"); + Set seenRunIds = new HashSet<>(); // chunk 내 중복 방지 return dto -> { String runId = dto.getRun_id(); - // 캐시 체크 - if (runIdCache.get(runId) != null) { - return null; // 이미 캐시에 존재하면 skip + if (seenRunIds.contains(runId)) { + return null; // chunk 내 중복 skip } - // DB 체크 - if (kubeflowRunRepository.existsByRunId(runId)) { - runIdCache.put(runId, true); // DB에 존재하면 캐시에 추가 - return null; + if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) { + runIdCache.put(runId, true); + return null; // 이미 캐시 혹은 DB 존재 } - // 신규 엔티티 생성 + seenRunIds.add(runId); + KubeflowRunEntity entity = new KubeflowRunEntity(); entity.setRunId(runId); entity.setExperimentId(dto.getExperiment_id()); @@ -137,8 +140,7 @@ public class KubeflowRunBatchConfig { entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setState(dto.getState()); - // 캐시에 추가 - runIdCache.put(runId, true); + runIdCache.put(runId, true); // 캐시에 추가 return entity; }; @@ -146,6 +148,6 @@ public class KubeflowRunBatchConfig { @Bean public ItemWriter runWriter() { - return items -> kubeflowRunRepository.saveAll(items); + return kubeflowRunRepository::saveAll; } } diff --git a/src/main/java/kr/re/etri/autoflow/payload/request/RunCreatedEvent.java b/src/main/java/kr/re/etri/autoflow/payload/request/RunCreatedEvent.java new file mode 100644 index 0000000..12a4bf8 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/payload/request/RunCreatedEvent.java @@ -0,0 +1,4 @@ +package kr.re.etri.autoflow.payload.request; + +// 이벤트 클래스 +public record RunCreatedEvent(String runId) {} diff --git a/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java b/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java index 9ec2341..cd13eb8 100644 --- a/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java +++ b/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java @@ -1,9 +1,15 @@ package kr.re.etri.autoflow.service; import kr.re.etri.autoflow.payload.request.CreateRunRequest; +import kr.re.etri.autoflow.payload.request.RunCreatedEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -20,6 +26,7 @@ import reactor.core.publisher.Mono; import java.io.IOException; import java.util.Map; +import java.util.concurrent.CompletableFuture; @Service @RequiredArgsConstructor @@ -33,6 +40,9 @@ public class PipelineUploadService { private final WebClient webClient; + @Autowired + private ApplicationEventPublisher eventPublisher; + /** * Pipeline 업로드 @@ -71,24 +81,26 @@ public class PipelineUploadService { * runRequest에는 display_name, pipeline_version_reference, runtime_config 등이 포함되어야 함 */ public Map createRun(CreateRunRequest runRequest) { - if (runRequest.getDisplay_name() == null || runRequest.getDisplay_name().isBlank()) { - throw new IllegalArgumentException("Run display_name cannot be empty"); + // Run 생성 결과를 동기적으로 받아 반환 + Map result = webClient.post() + .uri(kubeflowBaseUrl + "/apis/v2beta1/runs") + .contentType(MediaType.APPLICATION_JSON) + .bodyValue(runRequest) + .retrieve() + .bodyToMono(Map.class) + .block(); // 반환은 Map이므로 여기서는 block 유지 + + // 이벤트 발행만 비동기로 처리 + if (result != null && result.get("run_id") != null) { + String runId = (String) result.get("run_id"); + // 이벤트 발행을 비동기 스레드에서 실행 + CompletableFuture.runAsync(() -> eventPublisher.publishEvent(new RunCreatedEvent(runId))); } - try { - Mono responseMono = webClient.post() - .uri(kubeflowBaseUrl + "/apis/v2beta1/runs") - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(runRequest)) - .retrieve() - .bodyToMono(Map.class); - - return responseMono.block(); // 동기 호출 - } catch (Exception e) { - throw new RuntimeException("Kubeflow Run creation failed", e); - } + return result; } + /** * Experiments 조회 */ diff --git a/src/main/java/kr/re/etri/autoflow/service/RunCreatedListener.java b/src/main/java/kr/re/etri/autoflow/service/RunCreatedListener.java new file mode 100644 index 0000000..862450f --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/service/RunCreatedListener.java @@ -0,0 +1,35 @@ +package kr.re.etri.autoflow.service; + +import kr.re.etri.autoflow.payload.request.RunCreatedEvent; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class RunCreatedListener { + + private final JobLauncher jobLauncher; + private final Job runSyncJob; + + @Async + @EventListener + public void onRunCreated(RunCreatedEvent event) { + try { + jobLauncher.run( + runSyncJob, + new JobParametersBuilder() + .addString("runId", event.runId()) + .addLong("time", System.currentTimeMillis()) // 중복 실행 방지용 + .toJobParameters() + ); + } catch (Exception e) { + // 로깅 처리 + } + } +}