[추가] Run 생성 및 Spring Batch 통합을 통한 runSyncJob 트리거를 위한 비동기 이벤트 기반 흐름

main
bjkim 9 months ago
parent 53c011a682
commit b99b509c28

@ -23,8 +23,8 @@ import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.*;
import java.util.Optional; import java.util.stream.Collectors;
@Configuration @Configuration
@EnableBatchProcessing @EnableBatchProcessing
@ -34,12 +34,9 @@ public class KubeflowRunBatchConfig {
private final JobRepository jobRepository; private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager; private final PlatformTransactionManager transactionManager;
private final KubeflowRunRepository kubeflowRunRepository; private final KubeflowRunRepository kubeflowRunRepository;
private final CacheManager cacheManager; // 주입 private final CacheManager cacheManager;
// 최신 데이터 몇 개만 가져올지
private static final int PAGE_SIZE = 10; private static final int PAGE_SIZE = 10;
private static final String SORT_BY = "created_at DESC"; private static final String SORT_BY = "created_at DESC";
@Bean @Bean
@ -48,7 +45,7 @@ public class KubeflowRunBatchConfig {
.baseUrl("http://192.168.10.135:32473") .baseUrl("http://192.168.10.135:32473")
.exchangeStrategies(ExchangeStrategies.builder() .exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs() .codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼 .maxInMemorySize(16 * 1024 * 1024))
.build()); .build());
} }
@ -77,7 +74,6 @@ public class KubeflowRunBatchConfig {
@Override @Override
public KubeflowRunRequest read() { public KubeflowRunRequest read() {
// 페이지 끝나면 항상 최신 10개 fetch
if (index >= runs.size()) { if (index >= runs.size()) {
WebClient client = webClientBuilder().build(); WebClient client = webClientBuilder().build();
KubeflowRunResponse response = client.get() KubeflowRunResponse response = client.get()
@ -90,10 +86,17 @@ public class KubeflowRunBatchConfig {
.bodyToMono(KubeflowRunResponse.class) .bodyToMono(KubeflowRunResponse.class)
.block(); .block();
runs = response != null ? response.getRuns() : List.of(); if (response == null || response.getRuns().isEmpty()) {
return null; // 데이터 없음 -> Step 종료
}
// DB에 이미 존재하는 run_id는 제거
runs = response.getRuns().stream()
.filter(r -> !kubeflowRunRepository.existsByRunId(r.getRun_id()))
.collect(Collectors.toList());
index = 0; index = 0;
// 데이터가 없으면 Step 종료
if (runs.isEmpty()) { if (runs.isEmpty()) {
return null; return null;
} }
@ -107,22 +110,22 @@ public class KubeflowRunBatchConfig {
@Bean @Bean
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() { public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
Cache runIdCache = cacheManager.getCache("runIdCache"); Cache runIdCache = cacheManager.getCache("runIdCache");
Set<String> seenRunIds = new HashSet<>(); // chunk 내 중복 방지
return dto -> { return dto -> {
String runId = dto.getRun_id(); String runId = dto.getRun_id();
// 캐시 체크 if (seenRunIds.contains(runId)) {
if (runIdCache.get(runId) != null) { return null; // chunk 내 중복 skip
return null; // 이미 캐시에 존재하면 skip
} }
// DB 체크 if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) {
if (kubeflowRunRepository.existsByRunId(runId)) { runIdCache.put(runId, true);
runIdCache.put(runId, true); // DB에 존재하면 캐시에 추가 return null; // 이미 캐시 혹은 DB 존재
return null;
} }
// 신규 엔티티 생성 seenRunIds.add(runId);
KubeflowRunEntity entity = new KubeflowRunEntity(); KubeflowRunEntity entity = new KubeflowRunEntity();
entity.setRunId(runId); entity.setRunId(runId);
entity.setExperimentId(dto.getExperiment_id()); entity.setExperimentId(dto.getExperiment_id());
@ -137,8 +140,7 @@ public class KubeflowRunBatchConfig {
entity.setFinishedAt(Instant.parse(dto.getFinished_at())); entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
entity.setState(dto.getState()); entity.setState(dto.getState());
// 캐시에 추가 runIdCache.put(runId, true); // 캐시에 추가
runIdCache.put(runId, true);
return entity; return entity;
}; };
@ -146,6 +148,6 @@ public class KubeflowRunBatchConfig {
@Bean @Bean
public ItemWriter<KubeflowRunEntity> runWriter() { public ItemWriter<KubeflowRunEntity> runWriter() {
return items -> kubeflowRunRepository.saveAll(items); return kubeflowRunRepository::saveAll;
} }
} }

@ -0,0 +1,4 @@
package kr.re.etri.autoflow.payload.request;
// 이벤트 클래스
public record RunCreatedEvent(String runId) {}

@ -1,9 +1,15 @@
package kr.re.etri.autoflow.service; package kr.re.etri.autoflow.service;
import kr.re.etri.autoflow.payload.request.CreateRunRequest; import kr.re.etri.autoflow.payload.request.CreateRunRequest;
import kr.re.etri.autoflow.payload.request.RunCreatedEvent;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.http.HttpEntity; import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
@ -20,6 +26,7 @@ import reactor.core.publisher.Mono;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -33,6 +40,9 @@ public class PipelineUploadService {
private final WebClient webClient; private final WebClient webClient;
@Autowired
private ApplicationEventPublisher eventPublisher;
/** /**
* Pipeline * Pipeline
@ -71,24 +81,26 @@ public class PipelineUploadService {
* runRequest display_name, pipeline_version_reference, runtime_config * runRequest display_name, pipeline_version_reference, runtime_config
*/ */
public Map<String, Object> createRun(CreateRunRequest runRequest) { public Map<String, Object> createRun(CreateRunRequest runRequest) {
if (runRequest.getDisplay_name() == null || runRequest.getDisplay_name().isBlank()) { // Run 생성 결과를 동기적으로 받아 반환
throw new IllegalArgumentException("Run display_name cannot be empty"); Map result = webClient.post()
}
try {
Mono<Map> responseMono = webClient.post()
.uri(kubeflowBaseUrl + "/apis/v2beta1/runs") .uri(kubeflowBaseUrl + "/apis/v2beta1/runs")
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(runRequest)) .bodyValue(runRequest)
.retrieve() .retrieve()
.bodyToMono(Map.class); .bodyToMono(Map.class)
.block(); // 반환은 Map이므로 여기서는 block 유지
return responseMono.block(); // 동기 호출 // 이벤트 발행만 비동기로 처리
} catch (Exception e) { if (result != null && result.get("run_id") != null) {
throw new RuntimeException("Kubeflow Run creation failed", e); String runId = (String) result.get("run_id");
// 이벤트 발행을 비동기 스레드에서 실행
CompletableFuture.runAsync(() -> eventPublisher.publishEvent(new RunCreatedEvent(runId)));
} }
return result;
} }
/** /**
* Experiments * Experiments
*/ */

@ -0,0 +1,35 @@
package kr.re.etri.autoflow.service;
import kr.re.etri.autoflow.payload.request.RunCreatedEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
public class RunCreatedListener {
private final JobLauncher jobLauncher;
private final Job runSyncJob;
@Async
@EventListener
public void onRunCreated(RunCreatedEvent event) {
try {
jobLauncher.run(
runSyncJob,
new JobParametersBuilder()
.addString("runId", event.runId())
.addLong("time", System.currentTimeMillis()) // 중복 실행 방지용
.toJobParameters()
);
} catch (Exception e) {
// 로깅 처리
}
}
}
Loading…
Cancel
Save