[ADD] MinIO 설정 추가 및 엔드포인트, 파일 업로드/다운로드, 삭제, URL 생성 로직 개선

main
bjkim 4 weeks ago
parent 05f7dc9016
commit f637cfafda

@ -4,27 +4,21 @@ import kr.re.etri.autoflow.payload.request.CreateRunRequest;
import kr.re.etri.autoflow.payload.request.RunCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.core.io.InputStreamResource;
import org.springframework.http.*;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@ -36,117 +30,289 @@ public class PipelineUploadService {
private final RestTemplate restTemplate;
@Value("${kubeflow.url}")
private String kubeflowBaseUrl; // 예: http://192.168.10.135:32473/
private String kubeflowBaseUrl;
private final WebClient webClient;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* Pipeline
*/
public Map uploadPipeline(MultipartFile file,
public Map uploadPipeline(
MultipartFile file,
String name,
String displayName,
String description,
String namespace) {
String namespace
) {
try {
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("uploadfile", new MultipartInputStreamFileResource(file.getInputStream(), file.getOriginalFilename()));
log.info("""
===== Pipeline Upload Start =====
filename={}
name={}
displayName={}
description={}
namespace={}
kubeflowBaseUrl={}
""",
file.getOriginalFilename(),
name,
displayName,
description,
namespace,
kubeflowBaseUrl
);
MultiValueMap<String, Object> body =
new LinkedMultiValueMap<>();
body.add(
"uploadfile",
new MultipartInputStreamFileResource(
file.getInputStream(),
file.getOriginalFilename()
)
);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
HttpEntity<MultiValueMap<String, Object>> requestEntity =
new HttpEntity<>(body, headers);
String normalizedBaseUrl = normalizeBaseUrl(kubeflowBaseUrl);
UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(kubeflowBaseUrl + "apis/v2beta1/pipelines/upload");
URI uri = UriComponentsBuilder
.fromHttpUrl(normalizedBaseUrl)
.path("/apis/v2beta1/pipelines/upload")
.queryParamIfPresent("name",
optional(name))
.queryParamIfPresent("display_name",
optional(displayName))
.queryParamIfPresent("description",
optional(description))
.queryParamIfPresent("namespace",
optional(namespace))
.build(true)
.toUri();
if (name != null && !name.isBlank()) builder.queryParam("name", name);
if (displayName != null && !displayName.isBlank()) builder.queryParam("display_name", displayName);
if (description != null && !description.isBlank()) builder.queryParam("description", description);
if (namespace != null && !namespace.isBlank()) builder.queryParam("namespace", namespace);
log.info("""
===== Final Upload URI =====
uri={}
""",
uri
);
ResponseEntity<Map> response =
restTemplate.postForEntity(
uri,
requestEntity,
Map.class
);
log.info("""
===== Pipeline Upload Success =====
status={}
body={}
""",
response.getStatusCode(),
response.getBody()
);
ResponseEntity<Map> response = restTemplate.postForEntity(builder.toUriString(), requestEntity, Map.class);
return response.getBody();
} catch (IOException e) {
throw new RuntimeException("Pipeline upload failed", e);
log.error("""
===== Pipeline Upload Failed =====
filename={}
kubeflowBaseUrl={}
""",
file.getOriginalFilename(),
kubeflowBaseUrl,
e
);
throw new RuntimeException(
"Pipeline upload failed",
e
);
}
}
/**
* Run
* runRequest display_name, pipeline_version_reference, runtime_config
*/
public Map<String, Object> createRun(CreateRunRequest runRequest) {
// Run 생성 결과를 동기적으로 받아 반환
public Map<String, Object> createRun(
CreateRunRequest runRequest
) {
String normalizedBaseUrl =
normalizeBaseUrl(kubeflowBaseUrl);
String uri = normalizedBaseUrl +
"/apis/v2beta1/runs";
log.info("Create Run URI={}", uri);
Map result = webClient.post()
.uri(kubeflowBaseUrl + "/apis/v2beta1/runs")
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(runRequest)
.retrieve()
.bodyToMono(Map.class)
.block(); // 반환은 Map이므로 여기서는 block 유지
.block();
// 이벤트 발행만 비동기로 처리
if (result != null && result.get("run_id") != null) {
String runId = (String) result.get("run_id");
// 이벤트 발행을 비동기 스레드에서 실행
CompletableFuture.runAsync(() -> eventPublisher.publishEvent(new RunCreatedEvent(runId)));
if (result != null &&
result.get("run_id") != null) {
String runId =
(String) result.get("run_id");
CompletableFuture.runAsync(() ->
eventPublisher.publishEvent(
new RunCreatedEvent(runId)
)
);
}
return result;
}
/**
* Experiments
*/
public Map listExperiments(String namespace, int pageSize, String pageToken) {
public Map listExperiments(
String namespace,
int pageSize,
String pageToken
) {
try {
UriComponentsBuilder builder = UriComponentsBuilder
.fromHttpUrl(kubeflowBaseUrl + "/apis/v2beta1/experiments");
if (namespace != null && !namespace.isBlank()) {
builder.queryParam("namespace", namespace);
}
if (pageSize > 0) {
builder.queryParam("page_size", pageSize);
}
if (pageToken != null && !pageToken.isBlank()) {
builder.queryParam("page_token", pageToken);
}
String normalizedBaseUrl =
normalizeBaseUrl(kubeflowBaseUrl);
URI uri = UriComponentsBuilder
.fromHttpUrl(normalizedBaseUrl)
.path("/apis/v2beta1/experiments")
.queryParamIfPresent(
"namespace",
optional(namespace)
)
.queryParamIfPresent(
"page_token",
optional(pageToken)
)
.queryParam(
"page_size",
pageSize
)
.build(true)
.toUri();
log.info("List Experiments URI={}", uri);
return webClient.get()
.uri(builder.toUriString())
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Map.class)
.block();
} catch (Exception e) {
throw new RuntimeException("Kubeflow Experiments 조회 실패", e);
log.error("Experiment list failed", e);
throw new RuntimeException(
"Kubeflow Experiments 조회 실패",
e
);
}
}
/**
* Experiment
*/
public Map getExperimentById(String experimentId) {
public Map getExperimentById(
String experimentId
) {
try {
String url = kubeflowBaseUrl + "/apis/v2beta1/experiments/" + experimentId;
String normalizedBaseUrl =
normalizeBaseUrl(kubeflowBaseUrl);
URI uri = UriComponentsBuilder
.fromHttpUrl(normalizedBaseUrl)
.path("/apis/v2beta1/experiments/{id}")
.buildAndExpand(experimentId)
.toUri();
log.info("Get Experiment URI={}", uri);
return webClient.get()
.uri(url)
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(Map.class)
.block();
} catch (Exception e) {
throw new RuntimeException("Kubeflow experiment 조회 실패: " + experimentId, e);
log.error(
"Experiment 조회 실패. experimentId={}",
experimentId,
e
);
throw new RuntimeException(
"Kubeflow experiment 조회 실패: "
+ experimentId,
e
);
}
}
private String normalizeBaseUrl(
String baseUrl
) {
if (baseUrl == null ||
baseUrl.isBlank()) {
throw new IllegalArgumentException(
"kubeflow.url is empty"
);
}
baseUrl = baseUrl.trim();
if (baseUrl.endsWith("/")) {
baseUrl =
baseUrl.substring(
0,
baseUrl.length() - 1
);
}
return baseUrl;
}
private java.util.Optional<String> optional(
String value
) {
if (value == null ||
value.isBlank()) {
return java.util.Optional.empty();
}
return java.util.Optional.of(value);
}
}

@ -5,16 +5,20 @@ import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.RemoveObjectArgs;
import kr.re.etri.autoflow.common.MinioType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.web.util.UriComponentsBuilder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
@Slf4j
@Service("storageProvider")
@ConditionalOnProperty(name = "storage.provider", havingValue = "minio", matchIfMissing = true)
public class MinioStorageProvider implements StorageProvider {
@ -32,27 +36,78 @@ public class MinioStorageProvider implements StorageProvider {
}
private MinioClient getClientByType(String type) {
if (type == null) return defaultMinioClient;
if (type == null) {
return defaultMinioClient;
}
MinioType config = MinioType.of(type);
String endpoint = normalizeEndpoint(config.getEndpoint());
log.info("Creating MinIO client. type={}, endpoint={}", type, endpoint);
return MinioClient.builder()
.endpoint(config.getEndpoint())
.endpoint(endpoint)
.credentials(config.getAccessKey(), config.getSecretKey())
.build();
}
private String getBucketByType(String bucketName, String type) {
if (bucketName != null && !bucketName.isEmpty()) return bucketName;
if (bucketName != null && !bucketName.isEmpty()) {
return bucketName;
}
if (type != null) {
return MinioType.of(type).getBucket();
}
return defaultBucket;
}
private String normalizeEndpoint(String endpoint) {
if (endpoint == null || endpoint.isBlank()) {
throw new IllegalArgumentException("MinIO endpoint is empty");
}
endpoint = endpoint.trim();
if (!endpoint.startsWith("http://") &&
!endpoint.startsWith("https://")) {
endpoint = "http://" + endpoint;
}
return endpoint.replaceAll("/+$", "");
}
@Override
public void uploadFile(String bucketName, String objectName, InputStream is, String contentType, long size, String type) throws Exception {
public void uploadFile(
String bucketName,
String objectName,
InputStream is,
String contentType,
long size,
String type
) throws Exception {
MinioClient client = getClientByType(type);
String targetBucket = getBucketByType(bucketName, type);
log.info("""
Uploading file to MinIO
bucket={}
objectName={}
contentType={}
size={}
type={}
""",
targetBucket,
objectName,
contentType,
size,
type
);
client.putObject(
PutObjectArgs.builder()
.bucket(targetBucket)
@ -61,18 +116,37 @@ public class MinioStorageProvider implements StorageProvider {
.contentType(contentType)
.build()
);
log.info("MinIO upload completed. bucket={}, object={}",
targetBucket,
objectName);
}
@Override
public void uploadFileToDefault(String objectName, InputStream is, String contentType, long size) throws Exception {
public void uploadFileToDefault(
String objectName,
InputStream is,
String contentType,
long size
) throws Exception {
uploadFile(null, objectName, is, contentType, size, null);
}
@Override
public byte[] downloadFile(String bucketName, String objectName, String type) throws Exception {
public byte[] downloadFile(
String bucketName,
String objectName,
String type
) throws Exception {
MinioClient client = getClientByType(type);
String targetBucket = getBucketByType(bucketName, type);
log.info("Downloading file from MinIO. bucket={}, object={}",
targetBucket,
objectName);
try (InputStream is = client.getObject(
GetObjectArgs.builder()
.bucket(targetBucket)
@ -89,37 +163,75 @@ public class MinioStorageProvider implements StorageProvider {
}
@Override
public File downloadFileToServer(String bucketName, String objectName, String localPath, String type) throws Exception {
public File downloadFileToServer(
String bucketName,
String objectName,
String localPath,
String type
) throws Exception {
MinioClient client = getClientByType(type);
String targetBucket = getBucketByType(bucketName, type);
String cleanObjectName = objectName.replaceFirst("^mlflow-artifacts:/", "")
.replaceAll("^/+", "").replaceAll("/+$", "");
String cleanObjectName = objectName
.replaceFirst("^mlflow-artifacts:/", "")
.replaceAll("^/+", "")
.replaceAll("/+$", "");
String fileName = Paths.get(cleanObjectName)
.getFileName()
.toString();
String fileName = Paths.get(cleanObjectName).getFileName().toString();
File localDir = new File(localPath);
if (!localDir.exists()) localDir.mkdirs();
if (!localDir.exists()) {
localDir.mkdirs();
}
File localFile = new File(localDir, fileName);
try (InputStream is = client.getObject(
log.info("""
Downloading file to server
bucket={}
object={}
localFile={}
""",
targetBucket,
cleanObjectName,
localFile.getAbsolutePath()
);
try (
InputStream is = client.getObject(
GetObjectArgs.builder()
.bucket(targetBucket)
.object(cleanObjectName)
.build()
);
FileOutputStream fos = new FileOutputStream(localFile)) {
FileOutputStream fos = new FileOutputStream(localFile)
) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
fos.write(buffer, 0, bytesRead);
}
}
return localFile;
}
@Override
public String readYamlText(String bucketName, String objectName, String type) throws Exception {
public String readYamlText(
String bucketName,
String objectName,
String type
) throws Exception {
byte[] bytes = downloadFile(bucketName, objectName, type);
return new String(bytes, StandardCharsets.UTF_8);
}
@ -129,10 +241,19 @@ public class MinioStorageProvider implements StorageProvider {
}
@Override
public void deleteFile(String bucketName, String objectName, String type) throws Exception {
public void deleteFile(
String bucketName,
String objectName,
String type
) throws Exception {
MinioClient client = getClientByType(type);
String targetBucket = getBucketByType(bucketName, type);
log.info("Deleting MinIO object. bucket={}, object={}",
targetBucket,
objectName);
client.removeObject(
RemoveObjectArgs.builder()
.bucket(targetBucket)
@ -147,9 +268,67 @@ public class MinioStorageProvider implements StorageProvider {
}
@Override
public String getFileUrl(String bucketName, String objectName, String type) {
public String getFileUrl(
String bucketName,
String objectName,
String type
) {
try {
String targetBucket = getBucketByType(bucketName, type);
String endpoint = (type != null) ? MinioType.of(type).getEndpoint() : minioEndpoint;
return String.format("%s/%s/%s", endpoint, targetBucket, objectName);
String endpoint = (type != null)
? MinioType.of(type).getEndpoint()
: minioEndpoint;
endpoint = normalizeEndpoint(endpoint);
log.info("""
Generating file URL
endpoint={}
bucket={}
object={}
type={}
""",
endpoint,
targetBucket,
objectName,
type
);
URI uri = UriComponentsBuilder
.fromUriString(endpoint)
.pathSegment(targetBucket)
.path(objectName.startsWith("/")
? objectName
: "/" + objectName)
.build(true)
.toUri();
String finalUrl = uri.toString();
log.info("Generated file URL={}", finalUrl);
return finalUrl;
} catch (Exception e) {
log.error("""
Failed to generate MinIO file URL
bucketName={}
objectName={}
type={}
minioEndpoint={}
""",
bucketName,
objectName,
type,
minioEndpoint,
e
);
throw e;
}
}
}

@ -37,3 +37,9 @@ spring.servlet.multipart.max-request-size=500MB
server.forward-headers-strategy=native
kubeflow.url=http://ml-pipeline-ui.kubeflow.svc.cluster.local:80
# MinIO Configuration for K3s
minio.endpoint=http://minio.minio.svc.cluster.local:9000
minio.access-key=HpaY4yx33VhsIE18nh1b
minio.secret-key=SDuToOgDZdSKR032j895mHZyDOqQaB88Wpg9RjMk
minio.bucket=mlpipeline

Loading…
Cancel
Save