[ADD] Kubeflow 실행 동기화를 위한 Spring Batch 설정, 레포지토리, 엔티티 및 DTO 추가

main
bjkim 9 months ago
parent 0beef25bd5
commit 41598ca440

@ -31,6 +31,12 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-security")
implementation("org.springframework.boot:spring-boot-starter-validation")
// https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core
implementation("org.springframework.batch:spring-batch-core:5.2.3")
// implementation("org.springframework.boot:spring-boot-starter-batch:5.2.0")
testImplementation("org.springframework.batch:spring-batch-test:5.2.3")
// JWT
implementation("io.jsonwebtoken:jjwt-api:0.11.5")
implementation("org.springframework.boot:spring-boot-starter-actuator")
@ -38,7 +44,7 @@ dependencies {
runtimeOnly("io.jsonwebtoken:jjwt-jackson:0.11.5")
// OpenAPI UI
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9")
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.13")
// MariaDB 드라이버
runtimeOnly("org.mariadb.jdbc:mariadb-java-client:3.1.4")
@ -58,7 +64,7 @@ dependencies {
testImplementation("org.springframework.security:spring-security-test")
//배포시 주석 처리 해야함(sql 디버깅용)
implementation("com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.12.0")
//implementation("com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.12.0")
implementation("io.minio:minio:8.5.17")

@ -0,0 +1,28 @@
package kr.re.etri.autoflow.batch;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableScheduling
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job runSyncJob; // ✅ Spring Batch의 Job 타입
@Scheduled(fixedRate = 60000) // 1분마다 실행
public void runJob() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지
.toJobParameters();
jobLauncher.run(runSyncJob, params);
}
}

@ -0,0 +1,126 @@
package kr.re.etri.autoflow.batch;
import kr.re.etri.autoflow.entity.KubeflowRunEntity;
import kr.re.etri.autoflow.payload.request.KubeflowRunRequest;
import kr.re.etri.autoflow.payload.response.KubeflowRunResponse;
import kr.re.etri.autoflow.repository.KubeflowRunRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import java.time.Instant;
import java.util.List;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class KubeflowRunBatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final KubeflowRunRepository kubeflowRunRepository;
// 최신 데이터 몇 개만 가져올지
private static final int PAGE_SIZE = 50;
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder()
.baseUrl("http://192.168.10.135:32473")
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer.defaultCodecs()
.maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼
.build());
}
@Bean
public Job runSyncJob() {
return new JobBuilder("runSyncJob", jobRepository)
.start(runStep())
.build();
}
@Bean
public Step runStep() {
return new StepBuilder("runStep", jobRepository)
.<KubeflowRunRequest, KubeflowRunEntity>chunk(10, transactionManager)
.reader(runReader())
.processor(runProcessor())
.writer(runWriter())
.build();
}
@Bean
public ItemReader<KubeflowRunRequest> runReader() {
return new ItemReader<>() {
private boolean read = false;
private List<KubeflowRunRequest> runs;
private int index = 0;
@Override
public KubeflowRunRequest read() {
if (!read) {
WebClient client = webClientBuilder().build();
KubeflowRunResponse response = client.get()
.uri(uriBuilder -> uriBuilder
.path("/apis/v2beta1/runs")
.queryParam("page_size", PAGE_SIZE)
.build())
.retrieve()
.bodyToMono(KubeflowRunResponse.class)
.block();
runs = response != null ? response.getRuns() : List.of();
read = true;
}
if (runs != null && index < runs.size()) {
return runs.get(index++);
}
return null;
}
};
}
@Bean
public ItemProcessor<KubeflowRunRequest, KubeflowRunEntity> runProcessor() {
return dto -> {
// DB에서 이미 존재하는 runId 체크
if (kubeflowRunRepository.existsById(dto.getRun_id())) {
return null; // 중복이면 skip
}
KubeflowRunEntity entity = new KubeflowRunEntity();
entity.setRunId(dto.getRun_id());
entity.setExperimentId(dto.getExperiment_id());
entity.setDisplayName(dto.getDisplay_name());
entity.setStorageState(dto.getStorage_state());
entity.setDescription(dto.getDescription());
entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id());
entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id());
entity.setServiceAccount(dto.getService_account());
entity.setCreatedAt(Instant.parse(dto.getCreated_at()));
entity.setScheduledAt(Instant.parse(dto.getScheduled_at()));
entity.setFinishedAt(Instant.parse(dto.getFinished_at()));
entity.setState(dto.getState());
return entity;
};
}
@Bean
public ItemWriter<KubeflowRunEntity> runWriter() {
return items -> kubeflowRunRepository.saveAll(items);
}
}

@ -0,0 +1,34 @@
package kr.re.etri.autoflow.entity;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
// Entity (DB 저장용)
@Entity
@Table(name = "tb_runs")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KubeflowRunEntity {
@Id
private String runId;
private String experimentId;
private String displayName;
private String storageState;
private String description;
private String pipelineId;
private String pipelineVersionId;
private String serviceAccount;
private Instant createdAt;
private Instant scheduledAt;
private Instant finishedAt;
private String state;
}

@ -0,0 +1,18 @@
package kr.re.etri.autoflow.payload.request;
import lombok.Data;
@Data
public class KubeflowRunRequest {
private String run_id;
private String experiment_id;
private String display_name;
private String storage_state;
private String description;
private PipelineVersionRef pipeline_version_reference;
private String service_account;
private String created_at;
private String scheduled_at;
private String finished_at;
private String state;
}

@ -0,0 +1,9 @@
package kr.re.etri.autoflow.payload.request;
import lombok.Data;
@Data
public class PipelineVersionRef {
private String pipeline_id;
private String pipeline_version_id;
}

@ -0,0 +1,13 @@
package kr.re.etri.autoflow.payload.response;
import kr.re.etri.autoflow.payload.request.KubeflowRunRequest;
import lombok.Data;
import java.util.List;
@Data
public class KubeflowRunResponse {
private List<KubeflowRunRequest> runs;
private int total_size;
private String next_page_token;
}

@ -0,0 +1,7 @@
package kr.re.etri.autoflow.repository;
import kr.re.etri.autoflow.entity.KubeflowRunEntity;
import org.springframework.data.jpa.repository.JpaRepository;
public interface KubeflowRunRepository extends JpaRepository<KubeflowRunEntity, String> {
}

@ -1,5 +1,5 @@
#????? ?? ??
server.port = 80
server.port = 8080
spring.profiles.active=local
@ -15,6 +15,8 @@ spring.sql.init.mode=always
spring.jpa.defer-datasource-initialization=true
#spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mariadb.sql
#spring.batch.jdbc.initialize-schema=always
# App Properties
cuuva.app.jwtCookieName=cuuva-jwt
@ -23,7 +25,7 @@ cuuva.app.jwtSecret= 275511b31c520562d69802ce4a913773102563891563a24062f44b3f312
## For test
cuuva.app.jwtExpirationMs= 900000
cuuva.app.jwtRefreshExpirationMs= 86400000
cuuva.app.jwtRefreshExpirationMs= 8640000
spring.jpa.show-sql=true

@ -35,3 +35,79 @@ INSERT INTO `tb_user_roles` VALUES
(1,7),
(2,6),
(3,5);
-- 테이블 생성 (이미 존재하면 생성 안 함)
CREATE TABLE IF NOT EXISTS BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
CONSTRAINT JOB_INST_UN UNIQUE (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL,
END_TIME DATETIME(6) DEFAULT NULL,
STATUS VARCHAR(10),
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED DATETIME(6),
CONSTRAINT JOB_INST_EXEC_FK FOREIGN KEY (JOB_INSTANCE_ID)
REFERENCES BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL,
PARAMETER_NAME VARCHAR(100) NOT NULL,
PARAMETER_TYPE VARCHAR(100) NOT NULL,
PARAMETER_VALUE VARCHAR(2500),
IDENTIFYING CHAR(1) NOT NULL,
CONSTRAINT JOB_EXEC_PARAMS_FK FOREIGN KEY (JOB_EXECUTION_ID)
REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL,
END_TIME DATETIME(6) DEFAULT NULL,
STATUS VARCHAR(10),
COMMIT_COUNT BIGINT,
READ_COUNT BIGINT,
FILTER_COUNT BIGINT,
WRITE_COUNT BIGINT,
READ_SKIP_COUNT BIGINT,
WRITE_SKIP_COUNT BIGINT,
PROCESS_SKIP_COUNT BIGINT,
ROLLBACK_COUNT BIGINT,
EXIT_CODE VARCHAR(2500),
EXIT_MESSAGE VARCHAR(2500),
LAST_UPDATED DATETIME(6),
CONSTRAINT JOB_EXEC_STEP_FK FOREIGN KEY (JOB_EXECUTION_ID)
REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
CONSTRAINT STEP_EXEC_CTX_FK FOREIGN KEY (STEP_EXECUTION_ID)
REFERENCES BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT,
CONSTRAINT JOB_EXEC_CTX_FK FOREIGN KEY (JOB_EXECUTION_ID)
REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;

Loading…
Cancel
Save