[MODIFY] BatchScheduler 실행 주기 단축 및 KubeflowRunBatchConfig WebClient와 Reader 개선, 로그 추가

main
bjkim 8 months ago
parent 56de5c1f72
commit 2584ec2b84

@ -15,9 +15,9 @@ import org.springframework.scheduling.annotation.Scheduled;
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job runSyncJob; // Spring Batch의 Job 타입
private final Job runSyncJob; // Spring Batch의 Job 타입
@Scheduled(fixedDelay = 60000) // 1분마다 실행
@Scheduled(fixedDelay = 5000) // 1분마다 실행
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지

@ -5,9 +5,11 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest;
import kr.re.etri.autoflow.payload.response.KubeflowRunResponse;
import kr.re.etri.autoflow.repository.KubeflowRunRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
@ -18,17 +20,20 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
@Slf4j
public class KubeflowRunBatchConfig {
private final JobRepository jobRepository;
@ -36,13 +41,17 @@ public class KubeflowRunBatchConfig {
private final KubeflowRunRepository kubeflowRunRepository;
private final CacheManager cacheManager;
private static final int PAGE_SIZE = 10;
private static final int PAGE_SIZE = 50;
private static final String SORT_BY = "created_at DESC";
@Bean
public WebClient.Builder webClientBuilder() {
HttpClient httpClient = HttpClient.create()
.responseTimeout(Duration.ofSeconds(30)); // 응답 제한
return WebClient.builder()
.baseUrl("http://192.168.10.135:32473")
.clientConnector(new ReactorClientHttpConnector(httpClient))
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024))
@ -66,19 +75,16 @@ public class KubeflowRunBatchConfig {
.build();
}
/**
* Reader: Kubeflow API run .
* DB Processor .
*/
@Bean
@StepScope
public ItemReader<KubeflowRunRequest> runReader() {
return new ItemReader<>() {
private List<KubeflowRunRequest> runs = List.of();
private List<KubeflowRunRequest> runs = null; // null이면 아직 API 호출 안 함
private int index = 0;
@Override
public KubeflowRunRequest read() {
if (index >= runs.size()) {
if (runs == null) {
WebClient client = webClientBuilder().build();
KubeflowRunResponse response = client.get()
.uri(uriBuilder -> uriBuilder
@ -91,11 +97,17 @@ public class KubeflowRunBatchConfig {
.block();
if (response == null || response.getRuns().isEmpty()) {
return null; // 데이터 없음 → Step 종료
log.info("KubeflowRunBatch: 데이터 없음, 종료");
runs = Collections.emptyList();
return null;
}
runs = response.getRuns();
index = 0;
log.info("KubeflowRunBatch: {}건 조회 완료", runs.size());
}
if (index >= runs.size()) {
return null; // 모든 Item 처리 완료 시 null 반환
}
return runs.get(index++);
@ -103,56 +115,38 @@ public class KubeflowRunBatchConfig {
};
}
/**
* Processor: DB .
*/
@Bean
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
Cache runIdCache = cacheManager.getCache("runIdCache");
Set<String> seenRunIds = new HashSet<>();
return dto -> {
String runId = dto.getRun_id();
// chunk 내 중복
if (seenRunIds.contains(runId)) {
return null;
}
// 캐시 또는 DB 중복 검사
assert runIdCache != null;
if (runIdCache.get(runId) != null || kubeflowRunRepository.existsByRunId(runId)) {
runIdCache.put(runId, true);
return null;
}
seenRunIds.add(runId);
KubeflowRunEntity entity = kubeflowRunRepository.findByRunId(dto.getRun_id())
.orElseGet(KubeflowRunEntity::new); // 없으면 새로 생성
KubeflowRunEntity entity = new KubeflowRunEntity();
entity.setRunId(runId);
// 모든 필드 업데이트
entity.setRunId(dto.getRun_id());
entity.setExperimentId(dto.getExperiment_id());
entity.setDisplayName(dto.getDisplay_name());
entity.setStorageState(dto.getStorage_state());
entity.setDescription(dto.getDescription());
if (dto.getPipeline_version_reference() != null) {
entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id());
entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id());
}
entity.setServiceAccount(dto.getService_account());
entity.setCreatedAt(Instant.parse(dto.getCreated_at()));
entity.setScheduledAt(Instant.parse(dto.getScheduled_at()));
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
entity.setState(dto.getState());
runIdCache.put(runId, true);
return entity;
};
}
@Bean
public ItemWriter<KubeflowRunEntity> runWriter() {
return kubeflowRunRepository::saveAll;
return items -> {
kubeflowRunRepository.saveAll(items);
log.info("KubeflowRunBatch: {}건 DB 저장 완료", items.size());
};
}
}

Loading…
Cancel
Save