[ADD] 외부 데이터셋 다운로드 및 S3 저장 API 추가, Controller 및 Service 로직 구현

main
bjkim 8 months ago
parent 17f1a1670d
commit dc37d07b80

@ -7,13 +7,18 @@ import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import kr.re.etri.autoflow.entity.MinioAttachmentEntity;
import kr.re.etri.autoflow.service.DatasetService; import kr.re.etri.autoflow.service.DatasetService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
@Slf4j
@Tag(name = "Dataset API", description = "외부 데이터셋 목록 조회 API (Python requests 코드 기반)") @Tag(name = "Dataset API", description = "외부 데이터셋 목록 조회 API (Python requests 코드 기반)")
@RestController @RestController
@RequestMapping("/api/datasets") @RequestMapping("/api/datasets")
@ -35,26 +40,43 @@ public class ExternalDataSetController {
public ResponseEntity<?> getDatasetList( public ResponseEntity<?> getDatasetList(
@Parameter(description = "프로젝트 인덱스", example = "10") @Parameter(description = "프로젝트 인덱스", example = "10")
@RequestParam(required = false) Integer ds_prj_idx, @RequestParam(required = false) Integer ds_prj_idx,
@Parameter(description = "검색 키워드", example = "traffic") @Parameter(description = "검색 키워드", example = "")
@RequestParam(required = false) String search_keyword, @RequestParam(required = false) String search_keyword,
@Parameter(description = "그룹 이름", example = "AIGroup") @Parameter(description = "그룹 이름", example = "")
@RequestParam(required = false) String grp_name @RequestParam(required = false) String grp_name
) { ) {
Map<String, Object> result = datasetService.getDatasetList(ds_prj_idx, search_keyword, grp_name); Map<String, Object> result = datasetService.getDatasetList(ds_prj_idx, search_keyword, grp_name);
return ResponseEntity.ok(result); return ResponseEntity.ok(result);
} }
@PostMapping("/download") @Operation(summary = "외부 데이터셋 다운로드 후 S3 저장", description = "Kubeflow 등 외부 API로부터 데이터셋을 다운로드하고 MinIO에 업로드합니다.")
@Operation( @PostMapping("/dataset/save")
summary = "데이터셋 다운로드", public ResponseEntity<Map<String, Object>> saveDatasetToS3(
description = "외부 API 서버로 POST 요청을 보내 ZIP 파일을 다운로드합니다.", @RequestParam String datasetName,
parameters = { @RequestParam(required = false) String path,
@Parameter(name = "datasetName", description = "다운로드할 데이터셋 이름", in = ParameterIn.QUERY, required = true) @RequestParam(required = false) Long refId,
} @RequestParam(defaultValue = "DATASET") String refType,
) @RequestParam(required = false) String title,
public ResponseEntity<?> downloadDataset( @RequestParam(required = false) String description,
@RequestParam("datasetName") String datasetName @RequestParam(defaultValue = "1") Integer version,
@RequestParam String regUserId,
@RequestParam Long projectId
) { ) {
return datasetService.downloadDataset(datasetName); try {
MinioAttachmentEntity saved = datasetService.downloadDataset(
datasetName, path, refId, refType, title, description, version, regUserId, projectId
);
Map<String, Object> response = new HashMap<>();
response.put("attachment", saved);
//response.put("minioUrl", minioAttachmentService.getFileUrl(saved.getStoragePath()));
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("데이터셋 저장 실패", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Map.of("error", e.getMessage()));
}
} }
} }

@ -1,17 +1,30 @@
package kr.re.etri.autoflow.service; package kr.re.etri.autoflow.service;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import kr.re.etri.autoflow.entity.MinioAttachmentEntity;
import kr.re.etri.autoflow.repository.MinioAttachmentRepository;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.*; import org.springframework.http.*;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder; import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import java.io.*;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -20,9 +33,21 @@ public class DatasetService {
private final RestTemplate restTemplate; private final RestTemplate restTemplate;
private final MinioAttachmentRepository minioAttachmentRepository;
private static final String BASE_URL = "http://52.14.11.43:18010"; private static final String BASE_URL = "http://52.14.11.43:18010";
private static final String LIST_ENDPOINT = "/export/dataset_list"; private static final String LIST_ENDPOINT = "/export/dataset_list";
private final MinioClient minioClient;
@Value("${minio.bucket}")
private String bucketName;
@Value("${minio.endpoint}")
private String minioEndpoint;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, Object> getDatasetList(Integer ds_prj_idx, String search_keyword, String grp_name) { public Map<String, Object> getDatasetList(Integer ds_prj_idx, String search_keyword, String grp_name) {
try { try {
@ -86,48 +111,110 @@ public class DatasetService {
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build(); .build();
public ResponseEntity<?> downloadDataset(String datasetName) { public MinioAttachmentEntity downloadDataset(
String datasetName,
String path,
Long refId,
String refType,
String title,
String description,
Integer version,
String regUserId,
Long projectId
) {
String apiUrl = baseUrl + DOWNLOAD_ENDPOINT; String apiUrl = baseUrl + DOWNLOAD_ENDPOINT;
log.info("외부 API 요청 URL: {}", apiUrl); log.info("외부 API 요청 URL: {}", apiUrl);
log.info("요청 데이터셋 이름: {}", datasetName); log.info("요청 데이터셋 이름: {}", datasetName);
// 요청 본문(JSON) String storedName = UUID.randomUUID() + "-" + datasetName + ".zip";
String requestBody = String.format("{\"dataset_name\":\"%s\"}", datasetName); String objectName = (path == null || path.isEmpty()) ? storedName : path + "/" + storedName;
try { try {
// WebClient로 외부 API POST 요청 Flux<DataBuffer> dataBufferFlux = webClient.post()
return webClient.post()
.uri(apiUrl) .uri(apiUrl)
.bodyValue(requestBody) .bodyValue(Map.of("dataset_name", datasetName))
.exchangeToMono(response -> { .retrieve()
if (response.statusCode().equals(HttpStatus.OK)) { .bodyToFlux(DataBuffer.class)
// 파일 스트림 수신 .doOnError(e -> log.error("다운로드 중 오류 발생", e));
return response.bodyToMono(byte[].class)
.map(bytes -> { PipedOutputStream pos = new PipedOutputStream();
ByteArrayResource resource = new ByteArrayResource(bytes); PipedInputStream pis = new PipedInputStream(pos, 10 * 1024 * 1024);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_OCTET_STREAM); CountDownLatch latch = new CountDownLatch(1);
headers.setContentDisposition(ContentDisposition.attachment()
.filename(datasetName + ".zip", StandardCharsets.UTF_8) // 파일 크기 저장용
.build()); long[] totalBytes = {0};
return new ResponseEntity<>(resource, headers, HttpStatus.OK);
}); Thread uploadThread = new Thread(() -> {
} else { try {
// 오류 응답 처리 minioClient.putObject(
return response.bodyToMono(String.class) PutObjectArgs.builder()
.map(errorBody -> { .bucket(bucketName)
log.error("다운로드 실패 (HTTP {}): {}", response.statusCode(), errorBody); .object(objectName)
return ResponseEntity.status(response.statusCode()) .stream(pis, -1, 10 * 1024 * 1024)
.contentType(MediaType.APPLICATION_JSON) .contentType("application/zip")
.body(errorBody); .build()
);
log.info("MinIO 업로드 완료: {}", objectName);
} catch (Exception e) {
log.error("MinIO 업로드 실패", e);
throw new RuntimeException(e);
} finally {
try { pis.close(); } catch (Exception ignored) {}
latch.countDown();
}
}); });
uploadThread.start();
dataBufferFlux.subscribe(
buffer -> {
try {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
pos.write(bytes);
pos.flush();
totalBytes[0] += bytes.length;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
DataBufferUtils.release(buffer);
} }
}) },
.block(); error -> {
log.error("다운로드 중 오류 발생", error);
try { pos.close(); } catch (Exception ignored) {}
latch.countDown();
},
() -> {
try { pos.close(); } catch (Exception ignored) {}
latch.countDown();
log.info("다운로드 완료: {} MB", totalBytes[0] / (1024 * 1024));
}
);
latch.await();
// DB 저장 시 size 컬럼 필수
MinioAttachmentEntity attachment = MinioAttachmentEntity.builder()
.refId(refId)
.refType(refType)
.originalName(datasetName + ".zip")
.storedName(storedName)
.contentType("application/zip")
.storagePath(objectName)
.title(title != null ? title : datasetName)
.version(version)
.description(description)
.regUserId(regUserId)
.projectId(projectId)
.size(totalBytes[0])
.build();
return minioAttachmentRepository.save(attachment);
} catch (Exception e) { } catch (Exception e) {
log.error("API 요청 중 오류 발생", e); log.error("외부 API 다운로드 및 MinIO 업로드 실패", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR) throw new RuntimeException("데이터셋 저장 실패: " + e.getMessage(), e);
.body("API 요청 중 오류 발생: " + e.getMessage());
} }
} }
} }

Loading…
Cancel
Save