[UPDATE] Kubeflow 및 MLflow 실험 등록 로직 개선: 기존 실험 조회 및 `created_at` 파싱 안정화

feature/apply-patched-updates
bjkim 4 weeks ago
parent db91eee239
commit 6ecd8e137c

@ -23,6 +23,7 @@ import reactor.core.publisher.Mono;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Instant; import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZoneOffset; import java.time.ZoneOffset;
@ -30,6 +31,7 @@ import java.time.format.DateTimeFormatter;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.web.reactive.function.client.WebClientResponseException;
@Tag(name = "Experiments", description = "Kubeflow 및 MLflow Experiment API") @Tag(name = "Experiments", description = "Kubeflow 및 MLflow Experiment API")
@RestController @RestController
@ -154,16 +156,61 @@ public class ExperimentsController {
.bodyValue(kubeflowPayload) .bodyValue(kubeflowPayload)
.retrieve() .retrieve()
.bodyToMono(Map.class) .bodyToMono(Map.class)
.onErrorResume(WebClientResponseException.Conflict.class, e -> {
log.info("Kubeflow experiment 가 이미 존재합니다 (409 Conflict). 기존 experiment를 조회합니다.");
return webClientBuilder.build()
.get()
.uri(kubeflowBaseUrl + "/apis/v2beta1/experiments")
.retrieve()
.bodyToMono(Map.class)
.flatMap(listResp -> {
if (listResp != null && listResp.containsKey("experiments")) {
List<Map<String, Object>> experiments = (List<Map<String, Object>>) listResp.get("experiments");
for (Map<String, Object> exp : experiments) {
String expName = (String) exp.get("display_name");
if (saved.getDisplayName().equals(expName)) {
log.info("기존 Kubeflow experiment 발견: {}", exp);
Map<String, Object> mockResp = new HashMap<>();
String expId = exp.get("id") != null ? exp.get("id").toString() :
(exp.get("experiment_id") != null ? exp.get("experiment_id").toString() : null);
mockResp.put("experiment_id", expId);
mockResp.put("created_at", exp.get("created_at"));
return Mono.just(mockResp);
}
}
}
return Mono.error(new RuntimeException("Kubeflow experiment 가 존재한다고 하나 목록에서 일치하는 이름을 찾을 수 없습니다.", e));
});
})
.flatMap(kubeflowResp -> { .flatMap(kubeflowResp -> {
if (kubeflowResp.containsKey("experiment_id")) { if (kubeflowResp.containsKey("experiment_id") && kubeflowResp.get("experiment_id") != null) {
saved.setKubeFlowId((String) kubeflowResp.get("experiment_id")); saved.setKubeFlowId((String) kubeflowResp.get("experiment_id"));
} else if (kubeflowResp.containsKey("id") && kubeflowResp.get("id") != null) {
saved.setKubeFlowId((String) kubeflowResp.get("id"));
} }
if (kubeflowResp.containsKey("created_at")) {
saved.setKubeflowCreatedAt( if (kubeflowResp.containsKey("created_at") && kubeflowResp.get("created_at") != null) {
Instant.parse((String) kubeflowResp.get("created_at")) try {
.atZone(ZoneId.of("Asia/Seoul")) String createdAtStr = (String) kubeflowResp.get("created_at");
.toLocalDateTime() saved.setKubeflowCreatedAt(
); Instant.parse(createdAtStr)
.atZone(ZoneId.of("Asia/Seoul"))
.toLocalDateTime()
);
} catch (Exception parseEx) {
log.warn("Kubeflow created_at 파싱 실패 (Instant), OffsetDateTime 파싱 시도", parseEx);
try {
String createdAtStr = (String) kubeflowResp.get("created_at");
saved.setKubeflowCreatedAt(
OffsetDateTime.parse(createdAtStr)
.atZoneSameInstant(ZoneId.of("Asia/Seoul"))
.toLocalDateTime()
);
} catch (Exception parseEx2) {
log.error("Kubeflow created_at 최종 파싱 실패, 현재 시간으로 설정", parseEx2);
saved.setKubeflowCreatedAt(LocalDateTime.now());
}
}
} }
log.info("Kubeflow experiment 등록 완료: {}", kubeflowResp); log.info("Kubeflow experiment 등록 완료: {}", kubeflowResp);
@ -171,7 +218,6 @@ public class ExperimentsController {
// 2⃣ MLflow 등록 // 2⃣ MLflow 등록
Map<String, Object> mlflowPayload = new HashMap<>(); Map<String, Object> mlflowPayload = new HashMap<>();
mlflowPayload.put("name", saved.getDisplayName()); mlflowPayload.put("name", saved.getDisplayName());
//mlflowPayload.put("artifact_location", "/default/artifacts");
return webClientBuilder.build() return webClientBuilder.build()
.post() .post()
@ -181,6 +227,29 @@ public class ExperimentsController {
.bodyValue(mlflowPayload) .bodyValue(mlflowPayload)
.retrieve() .retrieve()
.bodyToMono(Map.class) .bodyToMono(Map.class)
.onErrorResume(WebClientResponseException.BadRequest.class, ex -> {
log.info("MLflow experiment 가 이미 존재할 가능성이 있습니다 (400 Bad Request). 이름으로 조회합니다.");
try {
String encodedName = URLEncoder.encode(saved.getDisplayName(), StandardCharsets.UTF_8.toString());
return webClientBuilder.build()
.get()
.uri(mlflowBaseUrl + "/api/2.0/mlflow/experiments/get-by-name?experiment_name=" + encodedName)
.headers(headers -> headers.setBasicAuth(mlflowUser, mlflowPassword))
.retrieve()
.bodyToMono(Map.class)
.flatMap(getByNameResp -> {
if (getByNameResp != null && getByNameResp.containsKey("experiment")) {
Map<String, Object> exp = (Map<String, Object>) getByNameResp.get("experiment");
Map<String, Object> mockCreateResp = new HashMap<>();
mockCreateResp.put("experiment_id", exp.get("experiment_id"));
return Mono.just(mockCreateResp);
}
return Mono.error(new RuntimeException("MLflow experiment 가 존재한다고 하나 이름으로 찾을 수 없습니다.", ex));
});
} catch (Exception e) {
return Mono.error(e);
}
})
.flatMap(createResp -> { .flatMap(createResp -> {
log.info("MLflow experiment 등록 완료: {}", createResp); log.info("MLflow experiment 등록 완료: {}", createResp);
String mlflowExpId = (String) createResp.get("experiment_id"); String mlflowExpId = (String) createResp.get("experiment_id");
@ -206,7 +275,14 @@ public class ExperimentsController {
}); });
}); });
}) })
.doOnError(e -> log.error("Experiment 등록 실패", e)); .doOnError(e -> {
if (e instanceof WebClientResponseException) {
WebClientResponseException we = (WebClientResponseException) e;
log.error("Experiment 등록 중 외부 API 오류 발생. 상태코드={}, 응답바디={}", we.getStatusCode(), we.getResponseBodyAsString(), we);
} else {
log.error("Experiment 등록 실패", e);
}
});
} }
@Operation(summary = "Experiment 수정") @Operation(summary = "Experiment 수정")

Loading…
Cancel
Save