You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
autoflow-server-mgmt/src/main/java/kr/re/etri/autoflow/service/MinioAttachmentService.java

685 lines
26 KiB

package kr.re.etri.autoflow.service;
import io.minio.GetObjectArgs;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import io.minio.RemoveObjectArgs;
import io.minio.StatObjectArgs;
import jakarta.transaction.Transactional;
import kr.re.etri.autoflow.entity.MinioAttachmentEntity;
import kr.re.etri.autoflow.payload.request.BaseSearchRequest;
import kr.re.etri.autoflow.payload.request.ProjectBaseSearchRequest;
import kr.re.etri.autoflow.payload.request.ScriptMergeRequest;
import kr.re.etri.autoflow.repository.MinioAttachmentRepository;
import kr.re.etri.autoflow.specification.MinioAttachmentSpecification;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.*;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.support.ServletUriComponentsBuilder;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
@Slf4j
@Service
@RequiredArgsConstructor
@Transactional
public class MinioAttachmentService {

    /** Date format ({@code yyyy-MM-dd}) accepted for the search start/end dates. */
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Object-key prefix under which compiled KFP YAML files are stored. */
    private static final String COMPILED_YAML_PREFIX = "compiled/";

    /** Content type used for generated Python scripts. */
    private static final String PYTHON_CONTENT_TYPE = "text/x-python";

    private final MinioClient minioClient;
    private final MinioAttachmentRepository minioAttachmentRepository;
    private final MinioAttachmentSpecification minioAttachmentSpecification;

    @Value("${minio.bucket}")
    private String bucketName;

    @Value("${minio.endpoint}")
    private String minioEndpoint;

    @Value("${minio.access-key:}")
    private String minioAccessKey;

    @Value("${minio.secret-key:}")
    private String minioSecretKey;

    /** MinIO address reachable from pipeline pods (when empty, the client supplies it directly). */
    @Value("${minio.endpoint.pod:}")
    private String minioEndpointPod;

    /**
     * Python executable used to compile KFP .py files into YAML
     * ({@code python} is recommended on Windows, {@code python3} on Linux).
     */
    @Value("${kfp.compile.python-command:python3}")
    private String pythonCommand;

    /**
     * Uploads a multipart file to MinIO and saves a matching attachment row in the DB.
     *
     * <p>The object key is {@code [path/]<random UUID>-<original file name>}.
     *
     * @param file        the uploaded file
     * @param path        optional object-key prefix; {@code null} or empty stores at the bucket root
     * @param refId       reference ID stored on the entity
     * @param refType     reference type stored on the entity
     * @param title       title; the original file name is used when {@code null}
     * @param description description stored on the entity
     * @param version     version number stored as-is
     * @param regUserId   registering user ID
     * @param projectId   project ID stored on the entity
     * @return the saved attachment entity
     * @throws Exception if reading the file, the MinIO upload, or the DB save fails
     */
    public MinioAttachmentEntity uploadFile(MultipartFile file,
                                            String path,
                                            Long refId,
                                            String refType,
                                            String title,
                                            String description,
                                            Integer version,
                                            String regUserId,
                                            Long projectId
    ) throws Exception {
        String storedName = UUID.randomUUID() + "-" + file.getOriginalFilename();
        String objectName = buildObjectName(path, storedName);

        // Upload to MinIO. The declared multipart size is used because InputStream.available()
        // is only an estimate (and is capped at Integer.MAX_VALUE), which could truncate uploads.
        try (InputStream is = file.getInputStream()) {
            uploadObject(objectName, is, file.getSize(), file.getContentType());
        }

        // Save to the DB.
        MinioAttachmentEntity attachment = MinioAttachmentEntity.builder()
                .refId(refId)
                .refType(refType)
                .originalName(file.getOriginalFilename())
                .storedName(storedName)
                .contentType(file.getContentType())
                .size(file.getSize())
                .storagePath(objectName)
                .title(title != null ? title : file.getOriginalFilename())
                .version(version)
                .description(description)
                .regUserId(regUserId)
                .projectId(projectId)
                .build();
        return minioAttachmentRepository.save(attachment);
    }

    /**
     * Returns every attachment row.
     *
     * @return all attachments
     */
    public List<MinioAttachmentEntity> findAll() {
        return minioAttachmentRepository.findAll();
    }

    /**
     * Looks up an attachment by its ID.
     *
     * @param id attachment ID
     * @return the attachment, or empty if none exists
     */
    public Optional<MinioAttachmentEntity> findById(Long id) {
        return minioAttachmentRepository.findById(id);
    }

    /**
     * Searches attachments with paging and sorting.
     *
     * <p>{@code request.getPage()} is 1-based; values {@code <= 0} map to the first page. When
     * {@code request.getProjectId()} is set, results are additionally restricted to that project.
     *
     * @param request paging, sorting, keyword and date-range parameters
     * @param refType reference type passed to the search specification
     * @param refId   reference ID passed to the search specification
     * @return one page of matching attachments
     * @throws IllegalArgumentException if a start/end date is not in {@code yyyy-MM-dd} format
     */
    public Page<MinioAttachmentEntity> search(ProjectBaseSearchRequest request, String refType, Integer refId) {
        int pageIndex = request.getPage() > 0 ? request.getPage() - 1 : 0;
        Pageable pageable = PageRequest.of(
                pageIndex,
                request.getSize(),
                Sort.by(Sort.Direction.fromString(request.getSortDirection()), request.getSortField())
        );

        LocalDate startDate = parseDate(request.getStartDate());
        LocalDate endDate = parseDate(request.getEndDate());

        Specification<MinioAttachmentEntity> spec =
                minioAttachmentSpecification.searchByConditions(
                        refType,
                        refId,
                        request.getSearchType(),
                        request.getKeyword(),
                        startDate,
                        endDate
                );
        if (request.getProjectId() != null) {
            spec = spec.and((root, query, cb) ->
                    cb.equal(root.get("projectId"), request.getProjectId())
            );
        }
        return minioAttachmentRepository.findAll(spec, pageable);
    }

    /**
     * Returns the attachments whose IDs are in {@code ids}.
     *
     * @param ids attachment IDs
     * @return the attachments found
     */
    public List<MinioAttachmentEntity> findAllByIds(List<Long> ids) {
        return minioAttachmentRepository.findAllById(ids);
    }

    /**
     * Parses a {@code yyyy-MM-dd} date, treating {@code null}/blank input as "no date".
     *
     * @throws IllegalArgumentException if the text is not in {@code yyyy-MM-dd} format
     */
    private LocalDate parseDate(String dateStr) {
        if (dateStr == null || dateStr.isBlank()) {
            return null;
        }
        try {
            return LocalDate.parse(dateStr, DATE_FORMATTER);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("날짜 형식이 잘못되었습니다. yyyy-MM-dd 형식이어야 합니다: " + dateStr);
        }
    }

    /**
     * Reads a whole MinIO object into memory.
     *
     * @param bucketName bucket to read from
     * @param objectName object key
     * @return the object's bytes
     * @throws RuntimeException wrapping any MinIO/I/O failure
     */
    public byte[] downloadFile(String bucketName, String objectName) {
        try (InputStream is = minioClient.getObject(
                GetObjectArgs.builder().bucket(bucketName).object(objectName).build()
        )) {
            return is.readAllBytes();
        } catch (Exception e) {
            throw new RuntimeException("MinIO 파일 다운로드 실패: " + objectName, e);
        }
    }

    /**
     * Reads a MinIO object as UTF-8 text (used for YAML and also for .py sources).
     *
     * @param bucketName bucket to read from
     * @param objectName object key
     * @return the object's content decoded as UTF-8
     * @throws RuntimeException wrapping any MinIO/I/O failure
     */
    public String readYamlText(String bucketName, String objectName) {
        try (InputStream is = minioClient.getObject(
                GetObjectArgs.builder().bucket(bucketName).object(objectName).build()
        )) {
            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException("MinIO YAML 읽기 실패: " + objectName, e);
        }
    }

    /**
     * Uploads a new file as the next version of an existing attachment.
     *
     * <p>The existing row is left untouched. A new row is created with the same refId/refType
     * and a version one higher than the current highest version for that refId/refType.
     *
     * @param id          ID of an existing attachment whose refId/refType the new version shares
     * @param projectId   project ID stored on the new row
     * @param file        the new file content
     * @param path        optional object-key prefix; {@code null} or empty stores at the bucket root
     * @param title       new title; the existing title is kept when {@code null}
     * @param description new description; the existing description is kept when {@code null}
     * @param regUserId   registering user ID
     * @return the newly saved attachment version
     * @throws IllegalArgumentException if no attachment with {@code id} exists
     * @throws Exception                if the upload or DB save fails
     */
    public MinioAttachmentEntity updateFile(
            Long id,
            Long projectId,
            MultipartFile file,
            String path,
            String title,
            String description,
            String regUserId
    ) throws Exception {
        // Look up the existing entity.
        MinioAttachmentEntity existing = minioAttachmentRepository.findById(id)
                .orElseThrow(() -> new IllegalArgumentException("첨부파일을 찾을 수 없습니다. ID=" + id));

        // Determine the next version number.
        Integer latestVersion = minioAttachmentRepository
                .findTopByRefIdAndRefTypeOrderByVersionDesc(existing.getRefId(), existing.getRefType())
                .map(MinioAttachmentEntity::getVersion)
                .orElse(0);
        int newVersion = latestVersion + 1;

        // Upload the new file (declared size, not InputStream.available()).
        String storedName = UUID.randomUUID() + "-" + file.getOriginalFilename();
        String objectName = buildObjectName(path, storedName);
        try (InputStream is = file.getInputStream()) {
            uploadObject(objectName, is, file.getSize(), file.getContentType());
        }

        // Create a new entity for the new version; previous rows are kept as-is.
        MinioAttachmentEntity newAttachment = MinioAttachmentEntity.builder()
                .projectId(projectId)
                .refId(existing.getRefId())
                .refType(existing.getRefType())
                .originalName(file.getOriginalFilename())
                .storedName(storedName)
                .contentType(file.getContentType())
                .size(file.getSize())
                .storagePath(objectName)
                .title(title != null ? title : existing.getTitle())
                .description(description != null ? description : existing.getDescription())
                .version(newVersion)
                .regUserId(regUserId)
                .build();
        return minioAttachmentRepository.save(newAttachment);
    }

    /**
     * Saves an attachment row without touching MinIO.
     *
     * @param entity entity to save
     * @return the saved entity
     */
    public MinioAttachmentEntity create(MinioAttachmentEntity entity) {
        return minioAttachmentRepository.save(entity);
    }

    /**
     * Copies all properties of {@code dto} onto the stored attachment, except {@code id} and
     * {@code regDt}, and saves it. Null properties in {@code dto} are copied too.
     *
     * @param id  attachment ID
     * @param dto source of the new property values
     * @return the saved entity, or empty if no attachment with {@code id} exists
     */
    public Optional<MinioAttachmentEntity> update(Long id, MinioAttachmentEntity dto) {
        return minioAttachmentRepository.findById(id)
                .map(entity -> {
                    BeanUtils.copyProperties(dto, entity, "id", "regDt"); // exclude ID and registration date
                    return minioAttachmentRepository.save(entity);
                });
    }

    /**
     * Deletes the attachment's MinIO object and then its DB row.
     *
     * @param id attachment ID
     * @return {@code true} on success; {@code false} if the attachment does not exist or if either
     *         delete step throws (the failure is logged)
     */
    @Transactional
    public boolean delete(Long id) {
        Optional<MinioAttachmentEntity> attachmentOpt = minioAttachmentRepository.findById(id);
        if (attachmentOpt.isEmpty()) {
            return false;
        }
        MinioAttachmentEntity attachment = attachmentOpt.get();
        try {
            // Delete the MinIO object first, then the DB row.
            minioClient.removeObject(
                    RemoveObjectArgs.builder()
                            .bucket(bucketName)
                            .object(attachment.getStoragePath())
                            .build()
            );
            minioAttachmentRepository.deleteById(id);
            return true;
        } catch (Exception e) {
            log.error("MinIO 파일 삭제 실패: {}", attachment.getStoragePath(), e);
            return false;
        }
    }

    /**
     * Builds a {@code <endpoint>/<bucket>/<objectName>} URL. The object name is not URL-encoded.
     *
     * @param objectName object key
     * @return the object URL
     */
    public String getFileUrl(String objectName) {
        return String.format("%s/%s/%s", minioEndpoint, bucketName, objectName);
    }

    /** Returns the configured MinIO endpoint (used when pipeline pods access MinIO directly). */
    public String getMinioEndpoint() {
        return minioEndpoint;
    }

    /** Returns the configured MinIO bucket (used when pipeline pods access MinIO directly). */
    public String getMinioBucket() {
        return bucketName;
    }

    /** Returns the configured MinIO access key (may be empty). */
    public String getMinioAccessKey() {
        return minioAccessKey;
    }

    /** Returns the configured MinIO secret key (may be empty). */
    public String getMinioSecretKey() {
        return minioSecretKey;
    }

    /** Returns the MinIO endpoint reachable from pods (written into generated YAML); never {@code null}. */
    public String getMinioEndpointPod() {
        return minioEndpointPod != null ? minioEndpointPod : "";
    }

    /**
     * Generates a KFP pipeline script that runs {@code scripts} sequentially.
     *
     * <p>Each step downloads its script from MinIO and runs it with {@code --input_dir}
     * (the previous step's output, empty for the first step) and {@code --output_dir}
     * ({@code /workspace/outputs/step_N}).
     *
     * <p>NOTE(review): the MinIO access/secret keys are written as default pipeline parameter
     * values into the generated script, which is itself stored in MinIO. Consider injecting
     * them at run time instead.
     *
     * @param scripts      ordered step scripts (must be non-empty to produce a valid pipeline body)
     * @param pipelineName value for {@code @dsl.pipeline(name=...)}
     * @return the generated Python source
     */
    private String buildMasterScript(List<MinioAttachmentEntity> scripts, String pipelineName) {
        // Generate the pipeline body (one call per step).
        StringBuilder body = new StringBuilder();
        String prevOutput = "\"\""; // the first step gets input_dir=""
        for (int i = 0; i < scripts.size(); i++) {
            MinioAttachmentEntity att = scripts.get(i);
            String label = att.getTitle() != null ? att.getTitle() : att.getOriginalName();
            if (label == null) {
                label = "";
            }
            int stepIndex = i + 1;
            String stepVar = "step" + stepIndex;
            String outputDir = "/workspace/outputs/step_" + stepIndex;

            body.append("    ").append(stepVar).append(" = run_script_component(\n")
                    .append("        minio_endpoint_pod=minio_endpoint_pod,\n")
                    .append("        minio_bucket=minio_bucket,\n")
                    .append("        minio_access_key=minio_access_key,\n")
                    .append("        minio_secret_key=minio_secret_key,\n")
                    .append("        object_name=\"").append(escapePythonString(att.getStoragePath())).append("\",\n")
                    .append("        step_name=\"step-").append(stepIndex).append('-')
                    .append(escapePythonString(label)).append("\",\n")
                    .append("        input_dir=").append(prevOutput).append(",\n")
                    .append("        output_dir=\"").append(outputDir).append("\",\n")
                    .append("    )\n\n");
            prevOutput = stepVar + ".output";
        }

        String bucket = getMinioBucket() != null ? getMinioBucket() : "mlpipeline";
        String accessKey = getMinioAccessKey() != null ? getMinioAccessKey() : "";
        String secretKey = getMinioSecretKey() != null ? getMinioSecretKey() : "";
        // Trim whitespace/newlines, drop the scheme and any trailing slashes
        // (the Minio() client expects host[:port]).
        String endpointPod = getMinioEndpointPod().trim()
                .replaceFirst("^https?://", "")
                .replaceAll("/+$", "");

        String template = """
                #!/usr/bin/env python3
                # -*- coding: utf-8 -*-
                from kfp import dsl


                @dsl.component(base_image="python:3.10")
                def run_script_component(
                    minio_endpoint_pod: str,
                    minio_bucket: str,
                    minio_access_key: str,
                    minio_secret_key: str,
                    object_name: str,
                    step_name: str,
                    input_dir: str = "",
                    output_dir: str = "/workspace/outputs",
                ) -> str:
                    import os
                    import subprocess
                    import sys

                    # Normalize connection parameters
                    minio_endpoint_pod = (minio_endpoint_pod or "").strip().rstrip("/")
                    minio_bucket = (minio_bucket or "").strip().strip("/")
                    os.makedirs(output_dir, exist_ok=True)

                    # Install the MinIO Python SDK
                    print("[MASTER] installing minio client...")
                    subprocess.run([sys.executable, "-m", "pip", "install", "minio"], check=True)
                    from minio import Minio

                    # Download this step's script from MinIO
                    client = Minio(
                        minio_endpoint_pod,
                        access_key=minio_access_key,
                        secret_key=minio_secret_key,
                        secure=False,
                    )
                    client.fget_object(minio_bucket, object_name, "script.py")

                    # Build and run the step command
                    cmd_parts = []
                    if input_dir:
                        cmd_parts.append(f"export INPUT_DIR={input_dir}")
                    cmd_parts.append(f"python script.py --input_dir \\"$INPUT_DIR\\" --output_dir \\"{output_dir}\\"")
                    cmd = " && ".join(cmd_parts)
                    print(f"[MASTER] STEP {step_name} 실행: {cmd}")
                    result = subprocess.run(cmd, shell=True)
                    if result.returncode != 0:
                        raise RuntimeError(f"STEP {step_name} 실패 (exit={result.returncode})")
                    print(f"[MASTER] STEP {step_name} 완료, output_dir={output_dir}")
                    return output_dir


                @dsl.pipeline(name="%s")
                def pipeline(
                    mlflow_experiment_name: str = "",
                    minio_endpoint_pod: str = "%s",
                    minio_bucket: str = "%s",
                    minio_access_key: str = "%s",
                    minio_secret_key: str = "%s",
                ):
                %s
                """;

        return String.format(
                template,
                escapePythonString(pipelineName),
                escapePythonString(endpointPod),
                escapePythonString(bucket),
                escapePythonString(accessKey),
                escapePythonString(secretKey),
                body.toString()
        );
    }

    /**
     * Escapes text for use inside a double-quoted Python string literal
     * (backslash first, then quote and line breaks).
     */
    private static String escapePythonString(String value) {
        return value.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r");
    }

    /**
     * Generates a KFP master pipeline that chains {@code scripts}, uploads it under
     * {@code scripts/merged/} and saves it as a new attachment (version 1).
     *
     * @param req     merge request (title, ref info, description, user, project)
     * @param scripts ordered scripts to chain
     * @return the saved attachment for the generated script
     * @throws IllegalArgumentException if {@code scripts} is null or empty
     * @throws Exception                if the upload or DB save fails
     */
    public MinioAttachmentEntity createMergedMaster(ScriptMergeRequest req, List<MinioAttachmentEntity> scripts) throws Exception {
        if (scripts == null || scripts.isEmpty()) {
            throw new IllegalArgumentException("머지할 스크립트가 없습니다.");
        }

        boolean hasTitle = req.getTitle() != null && !req.getTitle().isBlank();
        String pipelineName = hasTitle ? req.getTitle() : "merged-training-script";
        String originalName = hasTitle ? req.getTitle() + ".py" : "merged_pipeline.py";

        byte[] bytes = buildMasterScript(scripts, pipelineName).getBytes(StandardCharsets.UTF_8);

        Long refId = req.getRefId() != null ? req.getRefId() : 0L;
        String refType = (req.getRefType() != null && !req.getRefType().isBlank())
                ? req.getRefType()
                : "TRAINING_SCRIPT";
        // Unlike pipelineName/originalName, a blank but non-null title is kept as-is here.
        String title = req.getTitle() != null ? req.getTitle() : "Merged Script";

        // Same flow as uploadFile, but from an in-memory byte array.
        String storedName = UUID.randomUUID() + "-" + originalName;
        String objectName = buildObjectName("scripts/merged", storedName);
        try (InputStream is = new ByteArrayInputStream(bytes)) {
            uploadObject(objectName, is, bytes.length, PYTHON_CONTENT_TYPE);
        }

        MinioAttachmentEntity attachment = MinioAttachmentEntity.builder()
                .refId(refId)
                .refType(refType)
                .originalName(originalName)
                .storedName(storedName)
                .contentType(PYTHON_CONTENT_TYPE)
                .size((long) bytes.length)
                .storagePath(objectName)
                .title(title)
                .version(1)
                .description(req.getDescription())
                .regUserId(req.getRegUserId())
                .projectId(req.getProjectId())
                .build();
        return minioAttachmentRepository.save(attachment);
    }

    /**
     * Compiles a stored .py attachment into KFP YAML and uploads the YAML to
     * {@code compiled/<id>_<baseName>.yaml}.
     *
     * <p>Runs the bundled {@code /scripts/compile_kfp_pipeline.py} helper with the configured
     * Python command, so the server needs Python 3 with {@code kfp} installed. Per the original
     * notes, the script presumably must define a {@code @dsl.pipeline} function named
     * {@code pipeline} or {@code my_pipeline}. TODO confirm against the helper script.
     *
     * @param id attachment ID of a .py file
     * @return {@code yamlStoragePath} (MinIO object key) and {@code yamlContent} (YAML text)
     * @throws IllegalArgumentException if the attachment does not exist or is not a .py file
     * @throws IllegalStateException    if the helper script resource is missing
     * @throws RuntimeException         if compilation fails or produces no YAML
     * @throws Exception                on I/O, MinIO, or interruption failures
     */
    public Map<String, String> compilePyToKfpYaml(Long id) throws Exception {
        MinioAttachmentEntity attachment = minioAttachmentRepository.findById(id)
                .orElseThrow(() -> new IllegalArgumentException("첨부파일을 찾을 수 없습니다. ID=" + id));
        String originalName = attachment.getOriginalName();
        if (!isPythonFile(originalName)) {
            throw new IllegalArgumentException("Python(.py) 파일만 컴파일할 수 있습니다. 파일: " + originalName);
        }

        String pyContent = readYamlText(bucketName, attachment.getStoragePath());
        Path tempDir = Files.createTempDirectory("kfp_compile_");
        try {
            Path pyPath = tempDir.resolve("pipeline.py");
            Files.writeString(pyPath, pyContent, StandardCharsets.UTF_8);
            Path yamlPath = tempDir.resolve("output.yaml");
            Path helperPath = tempDir.resolve("compile_kfp_pipeline.py");

            copyCompileHelper(helperPath);
            runCompiler(helperPath, pyPath, yamlPath);

            if (!Files.exists(yamlPath)) {
                throw new RuntimeException("컴파일 결과 YAML 파일이 생성되지 않았습니다.");
            }
            String yamlContent = Files.readString(yamlPath, StandardCharsets.UTF_8);

            String yamlObjectName = compiledYamlObjectName(id, originalName);
            byte[] yamlBytes = yamlContent.getBytes(StandardCharsets.UTF_8);
            uploadObject(yamlObjectName, new ByteArrayInputStream(yamlBytes), yamlBytes.length, "application/x-yaml");

            Map<String, String> result = new HashMap<>();
            result.put("yamlStoragePath", yamlObjectName);
            result.put("yamlContent", yamlContent);
            return result;
        } finally {
            deleteRecursivelyQuietly(tempDir);
        }
    }

    /** Copies the bundled KFP compile helper script to {@code target}. */
    private void copyCompileHelper(Path target) throws IOException {
        try (InputStream is = MinioAttachmentService.class.getResourceAsStream("/scripts/compile_kfp_pipeline.py")) {
            if (is == null) {
                throw new IllegalStateException("컴파일 헬퍼 스크립트를 찾을 수 없습니다. (scripts/compile_kfp_pipeline.py)");
            }
            Files.copy(is, target);
        }
    }

    /**
     * Runs {@code <python> helper py yaml}. If the exit code is non-zero, throws a
     * RuntimeException carrying the combined stdout/stderr.
     */
    private void runCompiler(Path helperPath, Path pyPath, Path yamlPath) throws IOException, InterruptedException {
        String python = (pythonCommand != null && !pythonCommand.isBlank()) ? pythonCommand : "python3";
        ProcessBuilder pb = new ProcessBuilder(
                python,
                helperPath.toAbsolutePath().toString(),
                pyPath.toAbsolutePath().toString(),
                yamlPath.toAbsolutePath().toString()
        );
        pb.redirectErrorStream(true);
        Process process = pb.start();

        StringBuilder out = new StringBuilder();
        int exit;
        try {
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    out.append(line).append("\n");
                }
            }
            exit = process.waitFor();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            throw e;
        } finally {
            // Don't leave an orphaned interpreter behind if reading/waiting failed.
            if (process.isAlive()) {
                process.destroyForcibly();
            }
        }

        if (exit != 0) {
            log.warn("KFP 컴파일 stderr/stdout: {}", out);
            throw new RuntimeException("스크립트 컴파일 실패: " + (out.length() > 0 ? out.toString().trim() : "exit " + exit));
        }
    }

    /** Best-effort recursive delete of {@code root}. Failures are logged at debug level and ignored. */
    private static void deleteRecursivelyQuietly(Path root) {
        if (root == null || !Files.exists(root)) {
            return;
        }
        // Files.walk holds an open directory stream and must be closed.
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()) // children before their parent directories
                    .forEach(path -> {
                        try {
                            Files.deleteIfExists(path);
                        } catch (IOException e) {
                            log.debug("temp 삭제 무시: {}", path, e);
                        }
                    });
        } catch (Exception e) {
            log.debug("temp 정리 중 오류 무시", e);
        }
    }

    /** Returns whether {@code fileName} ends with {@code .py} (case-insensitive). */
    private static boolean isPythonFile(String fileName) {
        return fileName != null && fileName.toLowerCase(Locale.ROOT).endsWith(".py");
    }

    /**
     * Derives the MinIO key of the compiled YAML for an attachment.
     * Only a lowercase {@code .py} suffix is stripped, which keeps keys of existing compiled files stable.
     */
    private static String compiledYamlObjectName(Long id, String originalName) {
        String baseName = originalName.replaceAll("\\.py$", "");
        return COMPILED_YAML_PREFIX + id + "_" + baseName + ".yaml";
    }

    /**
     * Returns the MinIO key of the compiled YAML for the given attachment, if that object exists.
     * Any stat failure, including connectivity errors, is treated as "not compiled".
     *
     * @param id attachment ID of a .py file
     * @return the YAML object key, or empty if it cannot be stat'ed
     * @throws IllegalArgumentException if the attachment does not exist or is not a .py file
     */
    public Optional<String> findCompiledYamlPath(Long id) {
        MinioAttachmentEntity attachment = minioAttachmentRepository.findById(id)
                .orElseThrow(() -> new IllegalArgumentException("첨부파일을 찾을 수 없습니다. ID=" + id));
        String originalName = attachment.getOriginalName();
        if (!isPythonFile(originalName)) {
            throw new IllegalArgumentException("Python(.py) 파일만 컴파일 대상입니다. 파일: " + originalName);
        }
        String yamlObjectName = compiledYamlObjectName(id, originalName);
        try {
            minioClient.statObject(
                    StatObjectArgs.builder()
                            .bucket(bucketName)
                            .object(yamlObjectName)
                            .build()
            );
            return Optional.of(yamlObjectName);
        } catch (Exception e) {
            log.debug("컴파일된 YAML이 존재하지 않음: id={}, objectName={}", id, yamlObjectName, e);
            return Optional.empty();
        }
    }

    /**
     * Bundles an attachment's original file and a compiled YAML object into a ZIP.
     *
     * <p>NOTE(review): {@code yamlObjectName} is not validated, so any object in the bucket can be
     * bundled. Confirm that callers restrict it (e.g. to the {@code compiled/} prefix).
     *
     * @param attachmentId   attachment ID (normally the .py source)
     * @param yamlObjectName MinIO key of the YAML (e.g. {@code compiled/5_xxx.yaml})
     * @return the ZIP bytes, containing the original file name and the YAML's last path segment
     * @throws IllegalArgumentException if the attachment does not exist or {@code yamlObjectName} is blank
     * @throws Exception                if downloading or zipping fails
     */
    public byte[] downloadCompiledBundle(Long attachmentId, String yamlObjectName) throws Exception {
        MinioAttachmentEntity attachment = minioAttachmentRepository.findById(attachmentId)
                .orElseThrow(() -> new IllegalArgumentException("첨부파일을 찾을 수 없습니다. ID=" + attachmentId));
        if (yamlObjectName == null || yamlObjectName.isBlank()) {
            throw new IllegalArgumentException("yamlObjectName이 없습니다.");
        }

        byte[] pyBytes = downloadFile(bucketName, attachment.getStoragePath());
        byte[] yamlBytes = downloadFile(bucketName, yamlObjectName);

        String pyFileName = attachment.getOriginalName() != null ? attachment.getOriginalName() : "script.py";
        // lastIndexOf returns -1 when there is no '/', so this yields the whole name in that case.
        String yamlFileName = yamlObjectName.substring(yamlObjectName.lastIndexOf('/') + 1);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(baos)) {
            zos.putNextEntry(new ZipEntry(pyFileName));
            zos.write(pyBytes);
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry(yamlFileName));
            zos.write(yamlBytes);
            zos.closeEntry();
        }
        return baos.toByteArray();
    }

    /** Joins an optional prefix and a stored name into a MinIO object key. */
    private static String buildObjectName(String path, String storedName) {
        return (path == null || path.isEmpty()) ? storedName : path + "/" + storedName;
    }

    /** Uploads exactly {@code size} bytes from {@code in} to {@code objectName} in the configured bucket. */
    private void uploadObject(String objectName, InputStream in, long size, String contentType) throws Exception {
        minioClient.putObject(
                PutObjectArgs.builder()
                        .bucket(bucketName)
                        .object(objectName)
                        .stream(in, size, -1)
                        .contentType(contentType)
                        .build()
        );
    }
}