|
|
|
@ -5,7 +5,6 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest;
|
|
|
|
import kr.re.etri.autoflow.payload.response.KubeflowRunResponse;
|
|
|
|
import kr.re.etri.autoflow.payload.response.KubeflowRunResponse;
|
|
|
|
import kr.re.etri.autoflow.repository.KubeflowRunRepository;
|
|
|
|
import kr.re.etri.autoflow.repository.KubeflowRunRepository;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
import org.springframework.batch.core.Job;
|
|
|
|
import org.springframework.batch.core.Job;
|
|
|
|
import org.springframework.batch.core.Step;
|
|
|
|
import org.springframework.batch.core.Step;
|
|
|
|
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
|
|
|
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
|
|
|
@ -25,8 +24,8 @@ import org.springframework.web.reactive.function.client.WebClient;
|
|
|
|
|
|
|
|
|
|
|
|
import java.time.Instant;
|
|
|
|
import java.time.Instant;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
|
|
|
|
|
|
|
@Slf4j
|
|
|
|
|
|
|
|
@Configuration
|
|
|
|
@Configuration
|
|
|
|
@EnableBatchProcessing
|
|
|
|
@EnableBatchProcessing
|
|
|
|
@RequiredArgsConstructor
|
|
|
|
@RequiredArgsConstructor
|
|
|
|
@ -35,9 +34,12 @@ public class KubeflowRunBatchConfig {
|
|
|
|
private final JobRepository jobRepository;
|
|
|
|
private final JobRepository jobRepository;
|
|
|
|
private final PlatformTransactionManager transactionManager;
|
|
|
|
private final PlatformTransactionManager transactionManager;
|
|
|
|
private final KubeflowRunRepository kubeflowRunRepository;
|
|
|
|
private final KubeflowRunRepository kubeflowRunRepository;
|
|
|
|
private final CacheManager cacheManager;
|
|
|
|
private final CacheManager cacheManager; // 주입
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 최신 데이터 몇 개만 가져올지
|
|
|
|
private static final int PAGE_SIZE = 10;
|
|
|
|
private static final int PAGE_SIZE = 10;
|
|
|
|
|
|
|
|
|
|
|
|
private static final String SORT_BY = "created_at DESC";
|
|
|
|
private static final String SORT_BY = "created_at DESC";
|
|
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
@Bean
|
|
|
|
@ -46,7 +48,7 @@ public class KubeflowRunBatchConfig {
|
|
|
|
.baseUrl("http://192.168.10.135:32473")
|
|
|
|
.baseUrl("http://192.168.10.135:32473")
|
|
|
|
.exchangeStrategies(ExchangeStrategies.builder()
|
|
|
|
.exchangeStrategies(ExchangeStrategies.builder()
|
|
|
|
.codecs(configurer -> configurer.defaultCodecs()
|
|
|
|
.codecs(configurer -> configurer.defaultCodecs()
|
|
|
|
.maxInMemorySize(16 * 1024 * 1024))
|
|
|
|
.maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼
|
|
|
|
.build());
|
|
|
|
.build());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -72,21 +74,11 @@ public class KubeflowRunBatchConfig {
|
|
|
|
return new ItemReader<>() {
|
|
|
|
return new ItemReader<>() {
|
|
|
|
private List<KubeflowRunRequest> runs = List.of();
|
|
|
|
private List<KubeflowRunRequest> runs = List.of();
|
|
|
|
private int index = 0;
|
|
|
|
private int index = 0;
|
|
|
|
private int apiCallCount = 0;
|
|
|
|
|
|
|
|
private int readCount = 0;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public KubeflowRunRequest read() {
|
|
|
|
public KubeflowRunRequest read() {
|
|
|
|
readCount++;
|
|
|
|
// 페이지 끝나면 항상 최신 10개 fetch
|
|
|
|
|
|
|
|
|
|
|
|
if (readCount > PAGE_SIZE * 2) {
|
|
|
|
|
|
|
|
log.info("Reader - 최대 {}건 읽어서 강제 종료", PAGE_SIZE * 2);
|
|
|
|
|
|
|
|
return null; // Step 종료
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (index >= runs.size()) {
|
|
|
|
if (index >= runs.size()) {
|
|
|
|
log.info("Reader - API 호출 시작 (지금까지 read 횟수: {})", readCount);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
WebClient client = webClientBuilder().build();
|
|
|
|
WebClient client = webClientBuilder().build();
|
|
|
|
KubeflowRunResponse response = client.get()
|
|
|
|
KubeflowRunResponse response = client.get()
|
|
|
|
.uri(uriBuilder -> uriBuilder
|
|
|
|
.uri(uriBuilder -> uriBuilder
|
|
|
|
@ -98,21 +90,16 @@ public class KubeflowRunBatchConfig {
|
|
|
|
.bodyToMono(KubeflowRunResponse.class)
|
|
|
|
.bodyToMono(KubeflowRunResponse.class)
|
|
|
|
.block();
|
|
|
|
.block();
|
|
|
|
|
|
|
|
|
|
|
|
apiCallCount++;
|
|
|
|
|
|
|
|
runs = response != null ? response.getRuns() : List.of();
|
|
|
|
runs = response != null ? response.getRuns() : List.of();
|
|
|
|
index = 0;
|
|
|
|
index = 0;
|
|
|
|
|
|
|
|
|
|
|
|
log.info("Reader - API 호출 완료 (총 호출 횟수: {}, 가져온 데이터 수: {})", apiCallCount, runs.size());
|
|
|
|
// 데이터가 없으면 Step 종료
|
|
|
|
|
|
|
|
|
|
|
|
if (runs.isEmpty()) {
|
|
|
|
if (runs.isEmpty()) {
|
|
|
|
log.info("Reader - 더 이상 가져올 데이터 없음 → Step 종료");
|
|
|
|
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
KubeflowRunRequest item = runs.get(index++);
|
|
|
|
return runs.get(index++);
|
|
|
|
log.debug("Reader - {}번째 아이템 반환: {}", readCount, item.getRun_id());
|
|
|
|
|
|
|
|
return item;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
};
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -122,21 +109,22 @@ public class KubeflowRunBatchConfig {
|
|
|
|
Cache runIdCache = cacheManager.getCache("runIdCache");
|
|
|
|
Cache runIdCache = cacheManager.getCache("runIdCache");
|
|
|
|
|
|
|
|
|
|
|
|
return dto -> {
|
|
|
|
return dto -> {
|
|
|
|
log.debug("Processor - runId: {}", dto.getRun_id());
|
|
|
|
String runId = dto.getRun_id();
|
|
|
|
|
|
|
|
|
|
|
|
if (runIdCache.get(dto.getRun_id()) != null) {
|
|
|
|
// 캐시 체크
|
|
|
|
log.debug("Processor - 이미 캐시에 존재 (skip)");
|
|
|
|
if (runIdCache.get(runId) != null) {
|
|
|
|
return null;
|
|
|
|
return null; // 이미 캐시에 존재하면 skip
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (kubeflowRunRepository.existsByRunId(dto.getRun_id())) {
|
|
|
|
// DB 체크
|
|
|
|
log.debug("Processor - DB에 이미 존재 (skip)");
|
|
|
|
if (kubeflowRunRepository.existsByRunId(runId)) {
|
|
|
|
runIdCache.put(dto.getRun_id(), true);
|
|
|
|
runIdCache.put(runId, true); // DB에 존재하면 캐시에 추가
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 신규 엔티티 생성
|
|
|
|
KubeflowRunEntity entity = new KubeflowRunEntity();
|
|
|
|
KubeflowRunEntity entity = new KubeflowRunEntity();
|
|
|
|
entity.setRunId(dto.getRun_id());
|
|
|
|
entity.setRunId(runId);
|
|
|
|
entity.setExperimentId(dto.getExperiment_id());
|
|
|
|
entity.setExperimentId(dto.getExperiment_id());
|
|
|
|
entity.setDisplayName(dto.getDisplay_name());
|
|
|
|
entity.setDisplayName(dto.getDisplay_name());
|
|
|
|
entity.setStorageState(dto.getStorage_state());
|
|
|
|
entity.setStorageState(dto.getStorage_state());
|
|
|
|
@ -149,7 +137,8 @@ public class KubeflowRunBatchConfig {
|
|
|
|
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
|
|
|
|
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
|
|
|
|
entity.setState(dto.getState());
|
|
|
|
entity.setState(dto.getState());
|
|
|
|
|
|
|
|
|
|
|
|
runIdCache.put(dto.getRun_id(), true);
|
|
|
|
// 캐시에 추가
|
|
|
|
|
|
|
|
runIdCache.put(runId, true);
|
|
|
|
|
|
|
|
|
|
|
|
return entity;
|
|
|
|
return entity;
|
|
|
|
};
|
|
|
|
};
|
|
|
|
@ -157,10 +146,6 @@ public class KubeflowRunBatchConfig {
|
|
|
|
|
|
|
|
|
|
|
|
@Bean
|
|
|
|
@Bean
|
|
|
|
public ItemWriter<KubeflowRunEntity> runWriter() {
|
|
|
|
public ItemWriter<KubeflowRunEntity> runWriter() {
|
|
|
|
return items -> {
|
|
|
|
return items -> kubeflowRunRepository.saveAll(items);
|
|
|
|
log.info("Writer - {}개 아이템 저장 시작", items.size());
|
|
|
|
|
|
|
|
kubeflowRunRepository.saveAll(items);
|
|
|
|
|
|
|
|
log.info("Writer - 저장 완료");
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|