From 41598ca440f19f9d3c77d31fb67d6cd487a86fae Mon Sep 17 00:00:00 2001 From: bjkim Date: Wed, 24 Sep 2025 10:58:21 +0900 Subject: [PATCH] =?UTF-8?q?[ADD]=20Kubeflow=20=EC=8B=A4=ED=96=89=20?= =?UTF-8?q?=EB=8F=99=EA=B8=B0=ED=99=94=EB=A5=BC=20=EC=9C=84=ED=95=9C=20Spr?= =?UTF-8?q?ing=20Batch=20=EC=84=A4=EC=A0=95,=20=EB=A0=88=ED=8F=AC=EC=A7=80?= =?UTF-8?q?=ED=86=A0=EB=A6=AC,=20=EC=97=94=ED=8B=B0=ED=8B=B0=20=EB=B0=8F?= =?UTF-8?q?=20DTO=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle.kts | 10 +- .../etri/autoflow/batch/BatchScheduler.java | 28 ++++ .../batch/KubeflowRunBatchConfig.java | 126 ++++++++++++++++++ .../autoflow/entity/KubeflowRunEntity.java | 34 +++++ .../payload/request/KubeflowRunRequest.java | 18 +++ .../payload/request/PipelineVersionRef.java | 9 ++ .../payload/response/KubeflowRunResponse.java | 13 ++ .../repository/KubeflowRunRepository.java | 7 + src/main/resources/application.properties | 6 +- src/main/resources/data.sql | 76 +++++++++++ 10 files changed, 323 insertions(+), 4 deletions(-) create mode 100644 src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java create mode 100644 src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java create mode 100644 src/main/java/kr/re/etri/autoflow/entity/KubeflowRunEntity.java create mode 100644 src/main/java/kr/re/etri/autoflow/payload/request/KubeflowRunRequest.java create mode 100644 src/main/java/kr/re/etri/autoflow/payload/request/PipelineVersionRef.java create mode 100644 src/main/java/kr/re/etri/autoflow/payload/response/KubeflowRunResponse.java create mode 100644 src/main/java/kr/re/etri/autoflow/repository/KubeflowRunRepository.java diff --git a/build.gradle.kts b/build.gradle.kts index 20a4644..bfcf2a7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -31,6 +31,12 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-security") implementation("org.springframework.boot:spring-boot-starter-validation") + // https://mvnrepository.com/artifact/org.springframework.batch/spring-batch-core + implementation("org.springframework.batch:spring-batch-core:5.2.3") +// implementation("org.springframework.boot:spring-boot-starter-batch:5.2.0") + testImplementation("org.springframework.batch:spring-batch-test:5.2.3") + + // JWT implementation("io.jsonwebtoken:jjwt-api:0.11.5") implementation("org.springframework.boot:spring-boot-starter-actuator") @@ -38,7 +44,7 @@ dependencies { runtimeOnly("io.jsonwebtoken:jjwt-jackson:0.11.5") // OpenAPI UI - implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.9") + implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.13") // MariaDB 드라이버 runtimeOnly("org.mariadb.jdbc:mariadb-java-client:3.1.4") @@ -58,7 +64,7 @@ dependencies { testImplementation("org.springframework.security:spring-security-test") //배포시 주석 처리 해야함(sql 디버깅용) - implementation("com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.12.0") + //implementation("com.github.gavlyukovskiy:p6spy-spring-boot-starter:1.12.0") implementation("io.minio:minio:8.5.17") diff --git a/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java new file mode 100644 index 0000000..329ecd1 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/batch/BatchScheduler.java @@ -0,0 +1,28 @@ +package kr.re.etri.autoflow.batch; + +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.Scheduled; + +@Configuration +@EnableScheduling +@RequiredArgsConstructor +public class BatchScheduler { + + private final JobLauncher jobLauncher; + private final Job runSyncJob; // ✅ Spring Batch의 Job 타입 + + @Scheduled(fixedRate = 60000) // 1분마다 실행 + public void runJob() throws Exception { + JobParameters params = new JobParametersBuilder() + .addLong("timestamp", System.currentTimeMillis()) // 중복 실행 방지 + .toJobParameters(); + + jobLauncher.run(runSyncJob, params); + } +} diff --git a/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java new file mode 100644 index 0000000..915ab96 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/batch/KubeflowRunBatchConfig.java @@ -0,0 +1,126 @@ +package kr.re.etri.autoflow.batch; + +import kr.re.etri.autoflow.entity.KubeflowRunEntity; +import kr.re.etri.autoflow.payload.request.KubeflowRunRequest; +import kr.re.etri.autoflow.payload.response.KubeflowRunResponse; +import kr.re.etri.autoflow.repository.KubeflowRunRepository; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.reactive.function.client.ExchangeStrategies; +import org.springframework.web.reactive.function.client.WebClient; + +import java.time.Instant; +import java.util.List; + +@Configuration +@EnableBatchProcessing +@RequiredArgsConstructor +public class KubeflowRunBatchConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final KubeflowRunRepository kubeflowRunRepository; + + // 최신 데이터 몇 개만 가져올지 + private static final int PAGE_SIZE = 50; + + @Bean + public WebClient.Builder webClientBuilder() { + return WebClient.builder() + .baseUrl("http://192.168.10.135:32473") + .exchangeStrategies(ExchangeStrategies.builder() + .codecs(configurer -> configurer.defaultCodecs() + .maxInMemorySize(16 * 1024 * 1024)) // 16MB 버퍼 + .build()); + } + + @Bean + public Job runSyncJob() { + return new JobBuilder("runSyncJob", jobRepository) + .start(runStep()) + .build(); + } + + @Bean + public Step runStep() { + return new StepBuilder("runStep", jobRepository) + .chunk(10, transactionManager) + .reader(runReader()) + .processor(runProcessor()) + .writer(runWriter()) + .build(); + } + + @Bean + public ItemReader runReader() { + return new ItemReader<>() { + private boolean read = false; + private List runs; + private int index = 0; + + @Override + public KubeflowRunRequest read() { + if (!read) { + WebClient client = webClientBuilder().build(); + KubeflowRunResponse response = client.get() + .uri(uriBuilder -> uriBuilder + .path("/apis/v2beta1/runs") + .queryParam("page_size", PAGE_SIZE) + .build()) + .retrieve() + .bodyToMono(KubeflowRunResponse.class) + .block(); + + runs = response != null ? response.getRuns() : List.of(); + read = true; + } + + if (runs != null && index < runs.size()) { + return runs.get(index++); + } + return null; + } + }; + } + + @Bean + public ItemProcessor runProcessor() { + return dto -> { + // DB에서 이미 존재하는 runId 체크 + if (kubeflowRunRepository.existsById(dto.getRun_id())) { + return null; // 중복이면 skip + } + + KubeflowRunEntity entity = new KubeflowRunEntity(); + entity.setRunId(dto.getRun_id()); + entity.setExperimentId(dto.getExperiment_id()); + entity.setDisplayName(dto.getDisplay_name()); + entity.setStorageState(dto.getStorage_state()); + entity.setDescription(dto.getDescription()); + entity.setPipelineId(dto.getPipeline_version_reference().getPipeline_id()); + entity.setPipelineVersionId(dto.getPipeline_version_reference().getPipeline_version_id()); + entity.setServiceAccount(dto.getService_account()); + entity.setCreatedAt(Instant.parse(dto.getCreated_at())); + entity.setScheduledAt(Instant.parse(dto.getScheduled_at())); + entity.setFinishedAt(Instant.parse(dto.getFinished_at())); + entity.setState(dto.getState()); + return entity; + }; + } + + @Bean + public ItemWriter runWriter() { + return items -> kubeflowRunRepository.saveAll(items); + } +} diff --git a/src/main/java/kr/re/etri/autoflow/entity/KubeflowRunEntity.java b/src/main/java/kr/re/etri/autoflow/entity/KubeflowRunEntity.java new file mode 100644 index 0000000..eb5979e --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/entity/KubeflowRunEntity.java @@ -0,0 +1,34 @@ +package kr.re.etri.autoflow.entity; + +import jakarta.persistence.Entity; +import jakarta.persistence.Id; +import jakarta.persistence.Table; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +// Entity (DB 저장용) +@Entity +@Table(name = "tb_runs") +@Data +@NoArgsConstructor +@AllArgsConstructor +public class KubeflowRunEntity { + @Id + private String runId; + + private String experimentId; + private String displayName; + private String storageState; + private String description; + private String pipelineId; + private String pipelineVersionId; + private String serviceAccount; + + private Instant createdAt; + private Instant scheduledAt; + private Instant finishedAt; + private String state; +} diff --git a/src/main/java/kr/re/etri/autoflow/payload/request/KubeflowRunRequest.java b/src/main/java/kr/re/etri/autoflow/payload/request/KubeflowRunRequest.java new file mode 100644 index 0000000..33a0b67 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/payload/request/KubeflowRunRequest.java @@ -0,0 +1,18 @@ +package kr.re.etri.autoflow.payload.request; + +import lombok.Data; + +@Data +public class KubeflowRunRequest { + private String run_id; + private String experiment_id; + private String display_name; + private String storage_state; + private String description; + private PipelineVersionRef pipeline_version_reference; + private String service_account; + private String created_at; + private String scheduled_at; + private String finished_at; + private String state; +} \ No newline at end of file diff --git a/src/main/java/kr/re/etri/autoflow/payload/request/PipelineVersionRef.java b/src/main/java/kr/re/etri/autoflow/payload/request/PipelineVersionRef.java new file mode 100644 index 0000000..dd4af76 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/payload/request/PipelineVersionRef.java @@ -0,0 +1,9 @@ +package kr.re.etri.autoflow.payload.request; + +import lombok.Data; + +@Data +public class PipelineVersionRef { + private String pipeline_id; + private String pipeline_version_id; +} \ No newline at end of file diff --git a/src/main/java/kr/re/etri/autoflow/payload/response/KubeflowRunResponse.java b/src/main/java/kr/re/etri/autoflow/payload/response/KubeflowRunResponse.java new file mode 100644 index 0000000..13513a4 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/payload/response/KubeflowRunResponse.java @@ -0,0 +1,13 @@ +package kr.re.etri.autoflow.payload.response; + +import kr.re.etri.autoflow.payload.request.KubeflowRunRequest; +import lombok.Data; + +import java.util.List; + +@Data +public class KubeflowRunResponse { + private List runs; + private int total_size; + private String next_page_token; +} \ No newline at end of file diff --git a/src/main/java/kr/re/etri/autoflow/repository/KubeflowRunRepository.java b/src/main/java/kr/re/etri/autoflow/repository/KubeflowRunRepository.java new file mode 100644 index 0000000..f9c9072 --- /dev/null +++ b/src/main/java/kr/re/etri/autoflow/repository/KubeflowRunRepository.java @@ -0,0 +1,7 @@ +package kr.re.etri.autoflow.repository; + +import kr.re.etri.autoflow.entity.KubeflowRunEntity; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface KubeflowRunRepository extends JpaRepository { +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4cdde91..3619011 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,5 +1,5 @@ #????? ?? ?? -server.port = 80 +server.port = 8080 spring.profiles.active=local @@ -15,6 +15,8 @@ spring.sql.init.mode=always spring.jpa.defer-datasource-initialization=true +#spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mariadb.sql +#spring.batch.jdbc.initialize-schema=always # App Properties cuuva.app.jwtCookieName=cuuva-jwt @@ -23,7 +25,7 @@ cuuva.app.jwtSecret= 275511b31c520562d69802ce4a913773102563891563a24062f44b3f312 ## For test cuuva.app.jwtExpirationMs= 900000 -cuuva.app.jwtRefreshExpirationMs= 86400000 +cuuva.app.jwtRefreshExpirationMs= 8640000 spring.jpa.show-sql=true diff --git a/src/main/resources/data.sql b/src/main/resources/data.sql index 79d7cb6..4b7f74b 100644 --- a/src/main/resources/data.sql +++ b/src/main/resources/data.sql @@ -35,3 +35,79 @@ INSERT INTO `tb_user_roles` VALUES (1,7), (2,6), (3,5); + + +-- 테이블 생성 (이미 존재하면 생성 안 함) +CREATE TABLE IF NOT EXISTS BATCH_JOB_INSTANCE ( + JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_NAME VARCHAR(100) NOT NULL, + JOB_KEY VARCHAR(32) NOT NULL, + CONSTRAINT JOB_INST_UN UNIQUE (JOB_NAME, JOB_KEY) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT, + JOB_INSTANCE_ID BIGINT NOT NULL, + CREATE_TIME DATETIME(6) NOT NULL, + START_TIME DATETIME(6) DEFAULT NULL, + END_TIME DATETIME(6) DEFAULT NULL, + STATUS VARCHAR(10), + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED DATETIME(6), + CONSTRAINT JOB_INST_EXEC_FK FOREIGN KEY (JOB_INSTANCE_ID) + REFERENCES BATCH_JOB_INSTANCE(JOB_INSTANCE_ID) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_PARAMS ( + JOB_EXECUTION_ID BIGINT NOT NULL, + PARAMETER_NAME VARCHAR(100) NOT NULL, + PARAMETER_TYPE VARCHAR(100) NOT NULL, + PARAMETER_VALUE VARCHAR(2500), + IDENTIFYING CHAR(1) NOT NULL, + CONSTRAINT JOB_EXEC_PARAMS_FK FOREIGN KEY (JOB_EXECUTION_ID) + REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + VERSION BIGINT NOT NULL, + STEP_NAME VARCHAR(100) NOT NULL, + JOB_EXECUTION_ID BIGINT NOT NULL, + CREATE_TIME DATETIME(6) NOT NULL, + START_TIME DATETIME(6) DEFAULT NULL, + END_TIME DATETIME(6) DEFAULT NULL, + STATUS VARCHAR(10), + COMMIT_COUNT BIGINT, + READ_COUNT BIGINT, + FILTER_COUNT BIGINT, + WRITE_COUNT BIGINT, + READ_SKIP_COUNT BIGINT, + WRITE_SKIP_COUNT BIGINT, + PROCESS_SKIP_COUNT BIGINT, + ROLLBACK_COUNT BIGINT, + EXIT_CODE VARCHAR(2500), + EXIT_MESSAGE VARCHAR(2500), + LAST_UPDATED DATETIME(6), + CONSTRAINT JOB_EXEC_STEP_FK FOREIGN KEY (JOB_EXECUTION_ID) + REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS BATCH_STEP_EXECUTION_CONTEXT ( + STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + CONSTRAINT STEP_EXEC_CTX_FK FOREIGN KEY (STEP_EXECUTION_ID) + REFERENCES BATCH_STEP_EXECUTION(STEP_EXECUTION_ID) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS BATCH_JOB_EXECUTION_CONTEXT ( + JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY, + SHORT_CONTEXT VARCHAR(2500) NOT NULL, + SERIALIZED_CONTEXT TEXT, + CONSTRAINT JOB_EXEC_CTX_FK FOREIGN KEY (JOB_EXECUTION_ID) + REFERENCES BATCH_JOB_EXECUTION(JOB_EXECUTION_ID) +) ENGINE=InnoDB; +