[ADD] Caffeine 기반 캐시 구성 추가 및 KubeflowRunBatchConfig에 캐싱 로직 적용

main
bjkim 9 months ago
parent c134b6a361
commit 5cd4816791

@ -69,7 +69,8 @@ dependencies {
implementation("io.minio:minio:8.5.17")
}
implementation("org.springframework.boot:spring-boot-starter-cache") // 캐시 지원
implementation("com.github.ben-manes.caffeine:caffeine:3.2.2")}
// Java 컴파일 인코딩 및 파라미터 리플렉션 지원
tasks.withType<JavaCompile> {

@ -5,6 +5,7 @@ import kr.re.etri.autoflow.payload.request.KubeflowRunRequest;
import kr.re.etri.autoflow.payload.response.KubeflowRunResponse;
import kr.re.etri.autoflow.repository.KubeflowRunRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
@ -14,6 +15,8 @@ import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@ -22,8 +25,8 @@ import org.springframework.web.reactive.function.client.WebClient;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
@Slf4j
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
@ -32,10 +35,9 @@ public class KubeflowRunBatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final KubeflowRunRepository kubeflowRunRepository;
private final CacheManager cacheManager;
// 최신 데이터 몇 개만 가져올지
private static final int PAGE_SIZE = 10;
private static final String SORT_BY = "created_at DESC";
@Bean
@ -44,7 +46,7 @@ public class KubeflowRunBatchConfig {
.baseUrl("http://192.168.10.135:32473")
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼
.maxInMemorySize(16 * 1024 * 1024))
.build());
}
@ -70,43 +72,67 @@ public class KubeflowRunBatchConfig {
return new ItemReader<>() {
private List<KubeflowRunRequest> runs = List.of();
private int index = 0;
private int apiCallCount = 0;
private int readCount = 0;
@Override
public KubeflowRunRequest read() {
// 페이지 끝나면 항상 최신 10개 fetch
readCount++;
if (readCount > PAGE_SIZE * 2) {
log.info("Reader - 최대 {}건 읽어서 강제 종료", PAGE_SIZE * 2);
return null; // Step 종료
}
if (index >= runs.size()) {
log.info("Reader - API 호출 시작 (지금까지 read 횟수: {})", readCount);
WebClient client = webClientBuilder().build();
KubeflowRunResponse response = client.get()
.uri(uriBuilder -> uriBuilder
.path("/apis/v2beta1/runs")
.queryParam("page_size", PAGE_SIZE)
.queryParam("sort_by", "created_at desc")
.queryParam("sort_by", SORT_BY)
.build())
.retrieve()
.bodyToMono(KubeflowRunResponse.class)
.block();
apiCallCount++;
runs = response != null ? response.getRuns() : List.of();
index = 0;
// 데이터가 없으면 Step 종료
log.info("Reader - API 호출 완료 (총 호출 횟수: {}, 가져온 데이터 수: {})", apiCallCount, runs.size());
if (runs.isEmpty()) {
log.info("Reader - 더 이상 가져올 데이터 없음 → Step 종료");
return null;
}
}
return runs.get(index++);
KubeflowRunRequest item = runs.get(index++);
log.debug("Reader - {}번째 아이템 반환: {}", readCount, item.getRun_id());
return item;
}
};
}
@Bean
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
Cache runIdCache = cacheManager.getCache("runIdCache");
return dto -> {
// runId 기준 중복 체크
log.debug("Processor - runId: {}", dto.getRun_id());
if (runIdCache.get(dto.getRun_id()) != null) {
log.debug("Processor - 이미 캐시에 존재 (skip)");
return null;
}
if (kubeflowRunRepository.existsByRunId(dto.getRun_id())) {
return null; // 중복이면 skip
log.debug("Processor - DB에 이미 존재 (skip)");
runIdCache.put(dto.getRun_id(), true);
return null;
}
KubeflowRunEntity entity = new KubeflowRunEntity();
@ -122,12 +148,19 @@ public class KubeflowRunBatchConfig {
entity.setScheduledAt(Instant.parse(dto.getScheduled_at()));
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
entity.setState(dto.getState());
runIdCache.put(dto.getRun_id(), true);
return entity;
};
}
@Bean
public ItemWriter<KubeflowRunEntity> runWriter() {
return items -> kubeflowRunRepository.saveAll(items);
return items -> {
log.info("Writer - {}개 아이템 저장 시작", items.size());
kubeflowRunRepository.saveAll(items);
log.info("Writer - 저장 완료");
};
}
}

@ -0,0 +1,27 @@
package kr.re.etri.autoflow.common;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.TimeUnit;
@Configuration
public class CacheConfig {
@Bean
public Caffeine<Object, Object> caffeineConfig() {
return Caffeine.newBuilder()
.maximumSize(10_000) // 최대 1만 개
.expireAfterWrite(1, TimeUnit.HOURS); // 1시간 후 만료
}
@Bean
public CacheManager cacheManager(Caffeine<Object, Object> caffeine) {
CaffeineCacheManager cacheManager = new CaffeineCacheManager("runIdCache");
cacheManager.setCaffeine(caffeine);
return cacheManager;
}
}
Loading…
Cancel
Save