From f637cfafdadd1002a81fe9c089dfb460e5f56a54 Mon Sep 17 00:00:00 2001 From: bjkim Date: Tue, 19 May 2026 18:27:59 +0900 Subject: [PATCH] =?UTF-8?q?[ADD]=20MinIO=20=EC=84=A4=EC=A0=95=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20=EB=B0=8F=20=EC=97=94=EB=93=9C=ED=8F=AC=EC=9D=B8?= =?UTF-8?q?=ED=8A=B8,=20=ED=8C=8C=EC=9D=BC=20=EC=97=85=EB=A1=9C=EB=93=9C/?= =?UTF-8?q?=EB=8B=A4=EC=9A=B4=EB=A1=9C=EB=93=9C,=20=EC=82=AD=EC=A0=9C,=20U?= =?UTF-8?q?RL=20=EC=83=9D=EC=84=B1=20=EB=A1=9C=EC=A7=81=20=EA=B0=9C?= =?UTF-8?q?=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/PipelineUploadService.java | 280 ++++++++++++++---- .../service/storage/MinioStorageProvider.java | 227 ++++++++++++-- src/main/resources/application-aws.properties | 6 + 3 files changed, 432 insertions(+), 81 deletions(-) diff --git a/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java b/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java index cd13eb8..90d6877 100644 --- a/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java +++ b/src/main/java/kr/re/etri/autoflow/service/PipelineUploadService.java @@ -4,27 +4,21 @@ import kr.re.etri.autoflow.payload.request.CreateRunRequest; import kr.re.etri.autoflow.payload.request.RunCreatedEvent; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.batch.core.Job; -import org.springframework.batch.core.JobParametersBuilder; -import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; +import org.springframework.core.io.InputStreamResource; +import org.springframework.http.*; import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.client.RestTemplate; import org.springframework.web.multipart.MultipartFile; -import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.util.UriComponentsBuilder; -import reactor.core.publisher.Mono; import java.io.IOException; +import java.net.URI; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -36,117 +30,289 @@ public class PipelineUploadService { private final RestTemplate restTemplate; @Value("${kubeflow.url}") - private String kubeflowBaseUrl; // 예: http://192.168.10.135:32473/ + private String kubeflowBaseUrl; private final WebClient webClient; @Autowired private ApplicationEventPublisher eventPublisher; - /** * Pipeline 업로드 */ - public Map uploadPipeline(MultipartFile file, - String name, - String displayName, - String description, - String namespace) { + public Map uploadPipeline( + MultipartFile file, + String name, + String displayName, + String description, + String namespace + ) { + try { - MultiValueMap body = new LinkedMultiValueMap<>(); - body.add("uploadfile", new MultipartInputStreamFileResource(file.getInputStream(), file.getOriginalFilename())); + + log.info(""" + ===== Pipeline Upload Start ===== + filename={} + name={} + displayName={} + description={} + namespace={} + kubeflowBaseUrl={} + """, + file.getOriginalFilename(), + name, + displayName, + description, + namespace, + kubeflowBaseUrl + ); + + MultiValueMap body = + new LinkedMultiValueMap<>(); + + body.add( + "uploadfile", + new MultipartInputStreamFileResource( + file.getInputStream(), + file.getOriginalFilename() + ) + ); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.MULTIPART_FORM_DATA); - HttpEntity> requestEntity = new HttpEntity<>(body, headers); + HttpEntity> requestEntity = + new HttpEntity<>(body, headers); + + String normalizedBaseUrl = normalizeBaseUrl(kubeflowBaseUrl); - UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(kubeflowBaseUrl + "apis/v2beta1/pipelines/upload"); + URI uri = UriComponentsBuilder + .fromHttpUrl(normalizedBaseUrl) + .path("/apis/v2beta1/pipelines/upload") + .queryParamIfPresent("name", + optional(name)) + .queryParamIfPresent("display_name", + optional(displayName)) + .queryParamIfPresent("description", + optional(description)) + .queryParamIfPresent("namespace", + optional(namespace)) + .build(true) + .toUri(); - if (name != null && !name.isBlank()) builder.queryParam("name", name); - if (displayName != null && !displayName.isBlank()) builder.queryParam("display_name", displayName); - if (description != null && !description.isBlank()) builder.queryParam("description", description); - if (namespace != null && !namespace.isBlank()) builder.queryParam("namespace", namespace); + log.info(""" + ===== Final Upload URI ===== + uri={} + """, + uri + ); + + ResponseEntity response = + restTemplate.postForEntity( + uri, + requestEntity, + Map.class + ); + + log.info(""" + ===== Pipeline Upload Success ===== + status={} + body={} + """, + response.getStatusCode(), + response.getBody() + ); - ResponseEntity response = restTemplate.postForEntity(builder.toUriString(), requestEntity, Map.class); return response.getBody(); } catch (IOException e) { - throw new RuntimeException("Pipeline upload failed", e); + + log.error(""" + ===== Pipeline Upload Failed ===== + filename={} + kubeflowBaseUrl={} + """, + file.getOriginalFilename(), + kubeflowBaseUrl, + e + ); + + throw new RuntimeException( + "Pipeline upload failed", + e + ); } } /** * Run 생성 - * runRequest에는 display_name, pipeline_version_reference, runtime_config 등이 포함되어야 함 */ - public Map createRun(CreateRunRequest runRequest) { - // Run 생성 결과를 동기적으로 받아 반환 + public Map createRun( + CreateRunRequest runRequest + ) { + + String normalizedBaseUrl = + normalizeBaseUrl(kubeflowBaseUrl); + + String uri = normalizedBaseUrl + + "/apis/v2beta1/runs"; + + log.info("Create Run URI={}", uri); + Map result = webClient.post() - .uri(kubeflowBaseUrl + "/apis/v2beta1/runs") + .uri(uri) .contentType(MediaType.APPLICATION_JSON) .bodyValue(runRequest) .retrieve() .bodyToMono(Map.class) - .block(); // 반환은 Map이므로 여기서는 block 유지 + .block(); - // 이벤트 발행만 비동기로 처리 - if (result != null && result.get("run_id") != null) { - String runId = (String) result.get("run_id"); - // 이벤트 발행을 비동기 스레드에서 실행 - CompletableFuture.runAsync(() -> eventPublisher.publishEvent(new RunCreatedEvent(runId))); + if (result != null && + result.get("run_id") != null) { + + String runId = + (String) result.get("run_id"); + + CompletableFuture.runAsync(() -> + eventPublisher.publishEvent( + new RunCreatedEvent(runId) + ) + ); } return result; } - /** * Experiments 조회 */ - public Map listExperiments(String namespace, int pageSize, String pageToken) { + public Map listExperiments( + String namespace, + int pageSize, + String pageToken + ) { + try { - UriComponentsBuilder builder = UriComponentsBuilder - .fromHttpUrl(kubeflowBaseUrl + "/apis/v2beta1/experiments"); - - if (namespace != null && !namespace.isBlank()) { - builder.queryParam("namespace", namespace); - } - if (pageSize > 0) { - builder.queryParam("page_size", pageSize); - } - if (pageToken != null && !pageToken.isBlank()) { - builder.queryParam("page_token", pageToken); - } + + String normalizedBaseUrl = + normalizeBaseUrl(kubeflowBaseUrl); + + URI uri = UriComponentsBuilder + .fromHttpUrl(normalizedBaseUrl) + .path("/apis/v2beta1/experiments") + .queryParamIfPresent( + "namespace", + optional(namespace) + ) + .queryParamIfPresent( + "page_token", + optional(pageToken) + ) + .queryParam( + "page_size", + pageSize + ) + .build(true) + .toUri(); + + log.info("List Experiments URI={}", uri); return webClient.get() - .uri(builder.toUriString()) + .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Map.class) .block(); } catch (Exception e) { - throw new RuntimeException("Kubeflow Experiments 조회 실패", e); + + log.error("Experiment list failed", e); + + throw new RuntimeException( + "Kubeflow Experiments 조회 실패", + e + ); } } /** * Experiment 단건 조회 */ - public Map getExperimentById(String experimentId) { + public Map getExperimentById( + String experimentId + ) { + try { - String url = kubeflowBaseUrl + "/apis/v2beta1/experiments/" + experimentId; + + String normalizedBaseUrl = + normalizeBaseUrl(kubeflowBaseUrl); + + URI uri = UriComponentsBuilder + .fromHttpUrl(normalizedBaseUrl) + .path("/apis/v2beta1/experiments/{id}") + .buildAndExpand(experimentId) + .toUri(); + + log.info("Get Experiment URI={}", uri); return webClient.get() - .uri(url) + .uri(uri) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Map.class) .block(); } catch (Exception e) { - throw new RuntimeException("Kubeflow experiment 조회 실패: " + experimentId, e); + + log.error( + "Experiment 조회 실패. experimentId={}", + experimentId, + e + ); + + throw new RuntimeException( + "Kubeflow experiment 조회 실패: " + + experimentId, + e + ); + } + } + + private String normalizeBaseUrl( + String baseUrl + ) { + + if (baseUrl == null || + baseUrl.isBlank()) { + + throw new IllegalArgumentException( + "kubeflow.url is empty" + ); + } + + baseUrl = baseUrl.trim(); + + if (baseUrl.endsWith("/")) { + baseUrl = + baseUrl.substring( + 0, + baseUrl.length() - 1 + ); } + + return baseUrl; + } + + private java.util.Optional optional( + String value + ) { + + if (value == null || + value.isBlank()) { + + return java.util.Optional.empty(); + } + + return java.util.Optional.of(value); } -} +} \ No newline at end of file diff --git a/src/main/java/kr/re/etri/autoflow/service/storage/MinioStorageProvider.java b/src/main/java/kr/re/etri/autoflow/service/storage/MinioStorageProvider.java index fbb1637..01bbb8e 100644 --- a/src/main/java/kr/re/etri/autoflow/service/storage/MinioStorageProvider.java +++ b/src/main/java/kr/re/etri/autoflow/service/storage/MinioStorageProvider.java @@ -5,16 +5,20 @@ import io.minio.MinioClient; import io.minio.PutObjectArgs; import io.minio.RemoveObjectArgs; import kr.re.etri.autoflow.common.MinioType; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Service; +import org.springframework.web.util.UriComponentsBuilder; import java.io.File; import java.io.FileOutputStream; import java.io.InputStream; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; +@Slf4j @Service("storageProvider") @ConditionalOnProperty(name = "storage.provider", havingValue = "minio", matchIfMissing = true) public class MinioStorageProvider implements StorageProvider { @@ -32,27 +36,78 @@ public class MinioStorageProvider implements StorageProvider { } private MinioClient getClientByType(String type) { - if (type == null) return defaultMinioClient; + if (type == null) { + return defaultMinioClient; + } + MinioType config = MinioType.of(type); + + String endpoint = normalizeEndpoint(config.getEndpoint()); + + log.info("Creating MinIO client. type={}, endpoint={}", type, endpoint); + return MinioClient.builder() - .endpoint(config.getEndpoint()) + .endpoint(endpoint) .credentials(config.getAccessKey(), config.getSecretKey()) .build(); } private String getBucketByType(String bucketName, String type) { - if (bucketName != null && !bucketName.isEmpty()) return bucketName; + if (bucketName != null && !bucketName.isEmpty()) { + return bucketName; + } + if (type != null) { return MinioType.of(type).getBucket(); } + return defaultBucket; } + private String normalizeEndpoint(String endpoint) { + if (endpoint == null || endpoint.isBlank()) { + throw new IllegalArgumentException("MinIO endpoint is empty"); + } + + endpoint = endpoint.trim(); + + if (!endpoint.startsWith("http://") && + !endpoint.startsWith("https://")) { + + endpoint = "http://" + endpoint; + } + + return endpoint.replaceAll("/+$", ""); + } + @Override - public void uploadFile(String bucketName, String objectName, InputStream is, String contentType, long size, String type) throws Exception { + public void uploadFile( + String bucketName, + String objectName, + InputStream is, + String contentType, + long size, + String type + ) throws Exception { + MinioClient client = getClientByType(type); String targetBucket = getBucketByType(bucketName, type); + log.info(""" + Uploading file to MinIO + bucket={} + objectName={} + contentType={} + size={} + type={} + """, + targetBucket, + objectName, + contentType, + size, + type + ); + client.putObject( PutObjectArgs.builder() .bucket(targetBucket) @@ -61,18 +116,37 @@ public class MinioStorageProvider implements StorageProvider { .contentType(contentType) .build() ); + + log.info("MinIO upload completed. bucket={}, object={}", + targetBucket, + objectName); } @Override - public void uploadFileToDefault(String objectName, InputStream is, String contentType, long size) throws Exception { + public void uploadFileToDefault( + String objectName, + InputStream is, + String contentType, + long size + ) throws Exception { + uploadFile(null, objectName, is, contentType, size, null); } @Override - public byte[] downloadFile(String bucketName, String objectName, String type) throws Exception { + public byte[] downloadFile( + String bucketName, + String objectName, + String type + ) throws Exception { + MinioClient client = getClientByType(type); String targetBucket = getBucketByType(bucketName, type); + log.info("Downloading file from MinIO. bucket={}, object={}", + targetBucket, + objectName); + try (InputStream is = client.getObject( GetObjectArgs.builder() .bucket(targetBucket) @@ -89,37 +163,75 @@ public class MinioStorageProvider implements StorageProvider { } @Override - public File downloadFileToServer(String bucketName, String objectName, String localPath, String type) throws Exception { + public File downloadFileToServer( + String bucketName, + String objectName, + String localPath, + String type + ) throws Exception { + MinioClient client = getClientByType(type); String targetBucket = getBucketByType(bucketName, type); - String cleanObjectName = objectName.replaceFirst("^mlflow-artifacts:/", "") - .replaceAll("^/+", "").replaceAll("/+$", ""); + String cleanObjectName = objectName + .replaceFirst("^mlflow-artifacts:/", "") + .replaceAll("^/+", "") + .replaceAll("/+$", ""); + + String fileName = Paths.get(cleanObjectName) + .getFileName() + .toString(); - String fileName = Paths.get(cleanObjectName).getFileName().toString(); File localDir = new File(localPath); - if (!localDir.exists()) localDir.mkdirs(); + + if (!localDir.exists()) { + localDir.mkdirs(); + } + File localFile = new File(localDir, fileName); - try (InputStream is = client.getObject( - GetObjectArgs.builder() - .bucket(targetBucket) - .object(cleanObjectName) - .build() + log.info(""" + Downloading file to server + bucket={} + object={} + localFile={} + """, + targetBucket, + cleanObjectName, + localFile.getAbsolutePath() ); - FileOutputStream fos = new FileOutputStream(localFile)) { + + try ( + InputStream is = client.getObject( + GetObjectArgs.builder() + .bucket(targetBucket) + .object(cleanObjectName) + .build() + ); + FileOutputStream fos = new FileOutputStream(localFile) + ) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { fos.write(buffer, 0, bytesRead); } } + return localFile; } @Override - public String readYamlText(String bucketName, String objectName, String type) throws Exception { + public String readYamlText( + String bucketName, + String objectName, + String type + ) throws Exception { + byte[] bytes = downloadFile(bucketName, objectName, type); + return new String(bytes, StandardCharsets.UTF_8); } @@ -129,10 +241,19 @@ public class MinioStorageProvider implements StorageProvider { } @Override - public void deleteFile(String bucketName, String objectName, String type) throws Exception { + public void deleteFile( + String bucketName, + String objectName, + String type + ) throws Exception { + MinioClient client = getClientByType(type); String targetBucket = getBucketByType(bucketName, type); + log.info("Deleting MinIO object. bucket={}, object={}", + targetBucket, + objectName); + client.removeObject( RemoveObjectArgs.builder() .bucket(targetBucket) @@ -147,9 +268,67 @@ public class MinioStorageProvider implements StorageProvider { } @Override - public String getFileUrl(String bucketName, String objectName, String type) { - String targetBucket = getBucketByType(bucketName, type); - String endpoint = (type != null) ? MinioType.of(type).getEndpoint() : minioEndpoint; - return String.format("%s/%s/%s", endpoint, targetBucket, objectName); + public String getFileUrl( + String bucketName, + String objectName, + String type + ) { + + try { + + String targetBucket = getBucketByType(bucketName, type); + + String endpoint = (type != null) + ? MinioType.of(type).getEndpoint() + : minioEndpoint; + + endpoint = normalizeEndpoint(endpoint); + + log.info(""" + Generating file URL + endpoint={} + bucket={} + object={} + type={} + """, + endpoint, + targetBucket, + objectName, + type + ); + + URI uri = UriComponentsBuilder + .fromUriString(endpoint) + .pathSegment(targetBucket) + .path(objectName.startsWith("/") + ? objectName + : "/" + objectName) + .build(true) + .toUri(); + + String finalUrl = uri.toString(); + + log.info("Generated file URL={}", finalUrl); + + return finalUrl; + + } catch (Exception e) { + + log.error(""" + Failed to generate MinIO file URL + bucketName={} + objectName={} + type={} + minioEndpoint={} + """, + bucketName, + objectName, + type, + minioEndpoint, + e + ); + + throw e; + } } -} +} \ No newline at end of file diff --git a/src/main/resources/application-aws.properties b/src/main/resources/application-aws.properties index ed7adb2..27ff1fe 100644 --- a/src/main/resources/application-aws.properties +++ b/src/main/resources/application-aws.properties @@ -37,3 +37,9 @@ spring.servlet.multipart.max-request-size=500MB server.forward-headers-strategy=native kubeflow.url=http://ml-pipeline-ui.kubeflow.svc.cluster.local:80 + +# MinIO Configuration for K3s +minio.endpoint=http://minio.minio.svc.cluster.local:9000 +minio.access-key=HpaY4yx33VhsIE18nh1b +minio.secret-key=SDuToOgDZdSKR032j895mHZyDOqQaB88Wpg9RjMk +minio.bucket=mlpipeline