|
|
|
|
@ -22,6 +22,7 @@ import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
|
|
|
|
|
|
import java.time.Instant;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
|
|
|
|
|
@Configuration
|
|
|
|
|
@EnableBatchProcessing
|
|
|
|
|
@ -33,7 +34,9 @@ public class KubeflowRunBatchConfig {
|
|
|
|
|
private final KubeflowRunRepository kubeflowRunRepository;
|
|
|
|
|
|
|
|
|
|
// 최신 데이터 몇 개만 가져올지
|
|
|
|
|
private static final int PAGE_SIZE = 50;
|
|
|
|
|
private static final int PAGE_SIZE = 10;
|
|
|
|
|
|
|
|
|
|
private static final String SORT_BY = "created_at DESC";
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
public WebClient.Builder webClientBuilder() {
|
|
|
|
|
@ -65,35 +68,39 @@ public class KubeflowRunBatchConfig {
|
|
|
|
|
@Bean
|
|
|
|
|
public ItemReader<KubeflowRunRequest> runReader() {
|
|
|
|
|
return new ItemReader<>() {
|
|
|
|
|
private boolean read = false;
|
|
|
|
|
private List<KubeflowRunRequest> runs;
|
|
|
|
|
private List<KubeflowRunRequest> runs = List.of();
|
|
|
|
|
private int index = 0;
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public KubeflowRunRequest read() {
|
|
|
|
|
if (!read) {
|
|
|
|
|
// 페이지 끝나면 항상 최신 10개 fetch
|
|
|
|
|
if (index >= runs.size()) {
|
|
|
|
|
WebClient client = webClientBuilder().build();
|
|
|
|
|
KubeflowRunResponse response = client.get()
|
|
|
|
|
.uri(uriBuilder -> uriBuilder
|
|
|
|
|
.path("/apis/v2beta1/runs")
|
|
|
|
|
.queryParam("page_size", PAGE_SIZE)
|
|
|
|
|
.queryParam("sort_by", "created_at desc")
|
|
|
|
|
.build())
|
|
|
|
|
.retrieve()
|
|
|
|
|
.bodyToMono(KubeflowRunResponse.class)
|
|
|
|
|
.block();
|
|
|
|
|
|
|
|
|
|
runs = response != null ? response.getRuns() : List.of();
|
|
|
|
|
read = true;
|
|
|
|
|
}
|
|
|
|
|
index = 0;
|
|
|
|
|
|
|
|
|
|
if (runs != null && index < runs.size()) {
|
|
|
|
|
return runs.get(index++);
|
|
|
|
|
// 데이터가 없으면 Step 종료
|
|
|
|
|
if (runs.isEmpty()) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return null;
|
|
|
|
|
|
|
|
|
|
return runs.get(index++);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
|
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
|
|
|
|
|
return dto -> {
|
|
|
|
|
|