You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
autoflow-server-mgmt/src/main/java/kr/re/etri/autoflow/service/AdminService.java

798 lines
34 KiB

package kr.re.etri.autoflow.service;
import io.minio.ListBucketsArgs;
import io.minio.MinioClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
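/**
 * Admin service that reports the status of the platform back-ends (KFP, MLflow, MinIO).
 * Status results are cached for 30 seconds.
 *
 * Health criteria:
 * - KFP/MLflow: the health URL answers with HTTP 2xx -> "정상" (healthy; the API is reachable).
 * - MinIO: listBuckets() succeeds -> "정상" (healthy).
 *
 * Pod "Running" state is a separate check that queries Pods through the Kubernetes API.
 * (Example: with admin.k8s.enabled=true, Pods are looked up by namespace/label via
 * CoreV1Api.listNamespacedPod and counted when phase == "Running".
 * This relies on the kubernetes-client-api dependency.)
 */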
@Slf4j
@Service
@RequiredArgsConstructor
public class AdminService {
private static final int CACHE_SECONDS = 30;
private static final String KFP_ARCHIVE_BUCKET = "mlpipeline";
private static final Pattern WAIT_LOG_ARCHIVE_KEY =
Pattern.compile("\\bkey:\\s*([\\w\\-./]+/main\\.log)\\b");
private final RestTemplate restTemplate;
private final KubernetesPodHealthService podHealthService;
private final PipelineUploadService pipelineUploadService;
private final ArgoServerLogService argoServerLogService;
@Value("${kubeflow.url:}")
private String kubeflowUrl;
@Value("${mlflow.url:}")
private String mlflowUrl;
@Value("${minio.endpoint:}")
private String minioEndpoint;
@Value("${minio.access-key:}")
private String minioAccessKey;
@Value("${minio.secret-key:}")
private String minioSecretKey;
/** true면 KFP ml-pipeline v1beta1 노드 로그 API를 kubectl보다 먼저 시도 (클러스터 내부 조회, UI와 동일 경로) */
@Value("${admin.k8s.prefer-kfp-api-for-logs:true}")
private boolean preferKfpApiForLogs;
/** true면 Argo Server 로그 API를 KFP/K8s보다 먼저 시도 (삭제된 Pod는 MinIO 아카이브) */
@Value("${admin.logs.prefer-argo-server:true}")
private boolean preferArgoServer;
private volatile Map<String, Object> cachedStatus;
private volatile long cachedAt;
/**
 * Returns the health status of KFP, MLflow and MinIO.
 *
 * <p>A result younger than {@code CACHE_SECONDS} seconds is answered from the cache;
 * otherwise the three checks are executed again and the cache is replaced. Callers always
 * receive their own shallow copy of the status map.
 *
 * @return map with the keys {@code kfp}, {@code mlflow}, {@code minio} and {@code updatedAt}
 */
public Map<String, Object> getStatus() {
    final long now = System.currentTimeMillis();
    final long maxAgeMillis = CACHE_SECONDS * 1000L;
    boolean cacheIsFresh = cachedStatus != null && (now - cachedAt) < maxAgeMillis;
    if (cacheIsFresh) {
        return new HashMap<>(cachedStatus);
    }

    Map<String, Object> fresh = new HashMap<>();
    Map<String, Object> kfp = checkKfp();
    fresh.put("kfp", kfp);
    Map<String, Object> mlflow = checkMlflow();
    fresh.put("mlflow", mlflow);
    Map<String, Object> minio = checkMinio();
    fresh.put("minio", minio);
    String timestamp = Instant.now().toString();
    fresh.put("updatedAt", timestamp);

    // Publish the new snapshot, then its timestamp (measured before the checks ran).
    cachedStatus = fresh;
    cachedAt = now;
    return new HashMap<>(fresh);
}
/**
 * Returns the Kubernetes Pod status for the KFP, MLflow and MinIO cards.
 *
 * <p>When the pod health service is missing or disabled, every card is filled with a
 * placeholder entry ({@code ok=false}, zero pods) and an empty namespace.
 *
 * @return map with the keys {@code namespace}, {@code kfp}, {@code mlflow}, {@code minio},
 *     {@code updatedAt} and, in the disabled case, {@code message}
 */
public Map<String, Object> getPodStatus() {
    Map<String, Object> result = new HashMap<>();
    boolean k8sEnabled = podHealthService != null && podHealthService.isEnabled();
    if (k8sEnabled) {
        result.put("namespace", podHealthService.getNamespace());
        result.put("kfp", podHealthService.getKfpPodStatus());
        result.put("mlflow", podHealthService.getMlflowPodStatus());
        result.put("minio", podHealthService.getMinioPodStatus());
        result.put("updatedAt", Instant.now().toString());
        return result;
    }

    result.put("namespace", "");
    result.put("message", "admin.k8s.enabled=false");

    Map<String, Object> placeholder = new HashMap<>();
    placeholder.put("ok", false);
    placeholder.put("message", "admin.k8s.enabled=true 설정 후 Pod 상태 조회 가능");
    placeholder.put("running", 0);
    placeholder.put("total", 0);
    placeholder.put("pods", java.util.Collections.emptyList());

    // kfp gets the placeholder itself; the other two cards get independent shallow copies.
    result.put("kfp", placeholder);
    result.put("mlflow", new HashMap<>(placeholder));
    result.put("minio", new HashMap<>(placeholder));
    result.put("updatedAt", Instant.now().toString());
    return result;
}
/**
 * Probes the KFP API server through {@code <kubeflow.url>/apis/v2beta1/healthz}.
 *
 * @return map with {@code status} ({@code skip} when no URL is configured, {@code ok} for an
 *     HTTP 2xx answer, {@code error} otherwise) and a human-readable {@code message}
 */
private Map<String, Object> checkKfp() {
    Map<String, Object> out = new HashMap<>();
    // Trim BEFORE stripping trailing slashes: with the opposite order a configured value such as
    // "http://host/ " keeps its slash and the probe URL becomes "http://host//apis/...".
    // The second trim keeps the old result for inputs like "http://host /".
    String base = kubeflowUrl != null ? kubeflowUrl.trim().replaceAll("/+$", "").trim() : "";
    if (base.isBlank()) {
        out.put("status", "skip");
        out.put("message", "kubeflow.url 미설정");
        return out;
    }
    String url = base + "/apis/v2beta1/healthz";
    try {
        ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
        out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
        out.put("message", "HTTP " + resp.getStatusCode().value());
    } catch (Exception e) {
        // Health probe boundary: any failure is reported as an "error" status, never thrown.
        out.put("status", "error");
        out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
        log.debug("[Admin] KFP health check failed: {}", e.getMessage());
    }
    return out;
}
/**
 * Probes the MLflow server through {@code <mlflow.url>/health}.
 *
 * @return map with {@code status} ({@code skip} when no URL is configured, {@code ok} for an
 *     HTTP 2xx answer, {@code error} otherwise) and a human-readable {@code message}
 */
private Map<String, Object> checkMlflow() {
    Map<String, Object> out = new HashMap<>();
    // Trim BEFORE stripping trailing slashes: with the opposite order a configured value such as
    // "http://host/ " keeps its slash and the probe URL becomes "http://host//health".
    // The second trim keeps the old result for inputs like "http://host /".
    String base = mlflowUrl != null ? mlflowUrl.trim().replaceAll("/+$", "").trim() : "";
    if (base.isBlank()) {
        out.put("status", "skip");
        out.put("message", "mlflow.url 미설정");
        return out;
    }
    String url = base + "/health";
    try {
        ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
        out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
        out.put("message", "HTTP " + resp.getStatusCode().value());
    } catch (Exception e) {
        // Health probe boundary: any failure is reported as an "error" status, never thrown.
        out.put("status", "error");
        out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
        log.debug("[Admin] MLflow health check failed: {}", e.getMessage());
    }
    return out;
}
/**
 * Checks MinIO connectivity and credentials (access key / secret key) by building a client
 * for the configured endpoint and calling {@code listBuckets()}.
 *
 * @return map with {@code status} ({@code skip} when no endpoint is configured, {@code ok}
 *     when the bucket listing succeeds, {@code error} otherwise) and a {@code message}
 */
private Map<String, Object> checkMinio() {
    Map<String, Object> out = new HashMap<>();
    // Trim BEFORE stripping trailing slashes: with the opposite order a configured value such as
    // "http://host:9000/ " would keep its trailing slash.
    // The second trim keeps the old result for inputs like "http://host:9000 /".
    String endpoint = minioEndpoint != null
            ? minioEndpoint.trim().replaceAll("/+$", "").trim()
            : "";
    if (endpoint.isBlank()) {
        out.put("status", "skip");
        out.put("message", "MinIO endpoint 미설정");
        return out;
    }
    // Null credentials are passed as empty strings.
    String accessKey = minioAccessKey != null ? minioAccessKey : "";
    String secretKey = minioSecretKey != null ? minioSecretKey : "";
    try {
        MinioClient client = MinioClient.builder()
                .endpoint(endpoint)
                .credentials(accessKey, secretKey)
                .build();
        client.listBuckets(ListBucketsArgs.builder().build());
        out.put("status", "ok");
        out.put("message", "연결 정상 (동일 계정)");
    } catch (Exception e) {
        // Health probe boundary: any failure is reported as an "error" status, never thrown.
        out.put("status", "error");
        out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
        log.debug("[Admin] MinIO health check failed: {}", e.getMessage());
    }
    return out;
}
/**
 * Accepts a restart request for one of the managed services.
 *
 * <p>This method only validates the service name and logs the request; it performs no
 * restart itself (the success message says that the real restart has to be set up separately,
 * e.g. in Kubernetes).
 *
 * @param service service name, matched case-insensitively against {@code kfp}, {@code mlflow}
 *     and {@code minio}
 * @return map with {@code success} and a user-facing {@code message}
 */
public Map<String, Object> requestRestart(String service) {
    log.info("[Admin] restart requested for service: {}", service);
    Map<String, Object> out = new HashMap<>();
    // Locale.ROOT keeps the comparison independent of the JVM default locale; with a Turkish
    // default locale "MINIO".toLowerCase() would become "mınıo" and be rejected.
    boolean supported = service != null
            && java.util.Set.of("kfp", "mlflow", "minio")
                    .contains(service.toLowerCase(java.util.Locale.ROOT));
    if (!supported) {
        out.put("success", false);
        out.put("message", "지원하지 않는 서비스입니다. (kfp, mlflow, minio 중 하나)");
        return out;
    }
    out.put("success", true);
    out.put("message", "재시작 요청이 접수되었습니다. 실제 재시작은 Kubernetes 등에서 별도 설정이 필요합니다.");
    return out;
}
/**
 * Lists the Pods that belong to a KFP run id.
 *
 * <p>The pod health service is asked first. If it reports no pods, the list is rebuilt from
 * the task details of the KFP run (one entry per distinct pod name, with the KFP state mapped
 * to a pod phase).
 *
 * @param runId KFP run id
 * @return map with {@code namespace}, {@code pods} and {@code message}
 */
public Map<String, Object> getPodsByRunId(String runId) {
    boolean k8sEnabled = podHealthService != null && podHealthService.isEnabled();
    if (!k8sEnabled) {
        Map<String, Object> disabled = new HashMap<>();
        disabled.put("namespace", "");
        disabled.put("pods", java.util.Collections.emptyList());
        disabled.put("message", "admin.k8s.enabled=false");
        return disabled;
    }

    Map<String, Object> result = podHealthService.listPodsByRunId(runId);
    @SuppressWarnings("unchecked")
    List<Map<String, String>> clusterPods = (List<Map<String, String>>) result.get("pods");
    boolean foundInCluster = clusterPods != null && !clusterPods.isEmpty();
    if (foundInCluster) {
        return result;
    }

    // Nothing found in the cluster: fall back to what the KFP run itself reports.
    List<Map<String, Object>> tasks = kfpTasksWithPods(pipelineUploadService.getKfpRunById(runId));
    if (tasks.isEmpty()) {
        return result;
    }

    String namespace = podHealthService.getNamespace();
    Set<String> alreadyListed = new HashSet<>();
    List<Map<String, String>> podsFromKfp = new ArrayList<>();
    for (Map<String, Object> task : tasks) {
        String podName = firstString(task.get("podName"), task.get("pod_name"));
        boolean usable = podName != null && !podName.isBlank() && alreadyListed.add(podName);
        if (usable) {
            Map<String, String> entry = new LinkedHashMap<>();
            entry.put("name", podName);
            entry.put("phase", kfpStateToPhase(task.get("state")));
            entry.put("displayName", firstString(task.get("displayName"), task.get("display_name")));
            podsFromKfp.add(entry);
        }
    }

    if (podsFromKfp.isEmpty()) {
        return result;
    }
    result.put("namespace", namespace);
    result.put("pods", podsFromKfp);
    result.put("message", podsFromKfp.size() + " pods (KFP run_details.task_details)");
    result.remove("hint");
    return result;
}
/**
 * Reads the log of a single Pod through the pod health service.
 *
 * @param namespace namespace of the Pod
 * @param podName name of the Pod
 * @param container container name, passed through unchanged
 * @param tailLines number of trailing lines, passed through unchanged
 * @return the log text, or a fixed notice when the pod health service is missing or disabled
 */
public String getPodLog(String namespace, String podName, String container, Integer tailLines) {
    boolean k8sEnabled = podHealthService != null && podHealthService.isEnabled();
    return k8sEnabled
            ? podHealthService.getPodLog(namespace, podName, container, tailLines)
            : "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
}
/**
 * Returns log text for a KFP run, trying several log sources in a fixed order.
 *
 * <p>Selection of the step/Pod:
 * <ul>
 *   <li>{@code podNameParam} set: logs of exactly that Pod (after executor-pod resolution).</li>
 *   <li>{@code stepNameParam} set: the first task whose display name contains the given text
 *       (case-insensitive).</li>
 *   <li>{@code allSteps=true}: aggregated logs of every distinct Pod in the run's task details.</li>
 *   <li>otherwise: one "primary" task chosen by {@code pickPrimaryKfpTaskForLog}.</li>
 * </ul>
 *
 * <p>When the pod health service is missing or disabled, only the Argo Server API and the KFP
 * node-log API are attempted, and only if both {@code runId} and {@code podNameParam} are given.
 *
 * @param runId KFP run id
 * @param tailLines number of trailing lines; {@code null} or non-positive is normalized to
 *     {@code null} by {@code normalizeTail} where it is applied
 * @param allSteps whether to aggregate the logs of all steps
 * @param podNameParam optional explicit Pod name
 * @param stepNameParam optional display-name fragment of the step
 * @param workflowNameParam optional Argo workflow name override
 * @param workflowNamespaceParam optional Argo workflow namespace override
 * @return log text prefixed with a header naming the source, or an explanatory message
 */
public String getPodLogsByRunId(
String runId,
Integer tailLines,
boolean allSteps,
String podNameParam,
String stepNameParam,
String workflowNameParam,
String workflowNamespaceParam) {
// --- K8s access disabled: only the remote log APIs can be used ---
if (podHealthService == null || !podHealthService.isEnabled()) {
// 1) Argo Server, when configured and both run id and pod name are known.
if (argoServerLogService != null
&& argoServerLogService.isConfigured()
&& runId != null
&& !runId.isBlank()
&& podNameParam != null
&& !podNameParam.isBlank()) {
// Workflow namespace defaults to "kubeflow" when the caller gives none.
String wfNs =
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
? workflowNamespaceParam.trim()
: "kubeflow";
// Without an explicit workflow name it is guessed from the pod name.
String wfName =
workflowNameParam != null && !workflowNameParam.isBlank()
? workflowNameParam.trim()
: ArgoServerLogService.guessWorkflowNameFromExecutorPod(podNameParam.trim());
if (wfName != null) {
ArgoServerLogService.ArgoLogFetchResult al =
argoServerLogService.fetchWorkflowPodLog(
wfNs, wfName, podNameParam.trim(), normalizeTail(tailLines));
if (isSubstantialArgoLogBody(al != null ? al.logText : null)) {
return "-- Argo Server /api/v1/workflows/"
+ wfNs
+ "/"
+ wfName
+ "/log (kubectl 없음) --\n\n"
+ al.logText;
}
}
}
// 2) KFP ml-pipeline node-log API, when a Kubeflow URL is configured.
if (runId != null && !runId.isBlank()
&& podNameParam != null && !podNameParam.isBlank()
&& kubeflowUrl != null && !kubeflowUrl.isBlank()) {
String kfpOnly = tryKfpMlPipelineNodeLog(runId, podNameParam.trim());
if (kfpOnly != null) {
return "-- KFP ml-pipeline API (kubectl 없이) v1beta1 노드 로그 --\n\n" + kfpOnly;
}
}
// 3) Nothing usable: report that pod logs are disabled.
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
}
if (runId == null || runId.isBlank()) {
return "runId가 없습니다.";
}
// --- Explicit pod name: Argo Server -> KFP API -> direct pod log ---
if (podNameParam != null && !podNameParam.isBlank()) {
KubernetesPodHealthService.ExecutorPodResolution res0 =
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
// Caller-supplied workflow namespace wins over the resolved pod namespace.
String wfNsPod =
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
? workflowNamespaceParam.trim()
: res0.namespace;
// Kept even when the Argo body is unusable so its url/status/error can be appended below.
ArgoServerLogService.ArgoLogFetchResult argoDebug = null;
// NOTE(review): argoServerLogService is dereferenced here without the null check used in
// the disabled branch above and in tryArgoWorkflowLogForStep — confirm it is never null.
if (preferArgoServer && argoServerLogService.isConfigured()) {
argoDebug =
tryArgoWorkflowLogForStep(
wfNsPod,
workflowNameParam,
res0.podName,
podNameParam.trim(),
tailLines);
if (argoDebug != null && isSubstantialArgoLogBody(argoDebug.logText)) {
return "-- Argo Server API (Pod 종료 후 MinIO 아카이브 포함) | wf="
+ wfNsPod
+ " | pod="
+ res0.podName
+ " --\n\n"
+ argoDebug.logText;
}
}
if (preferKfpApiForLogs) {
// Tries the resolved pod name first, then the name as requested.
String kfpFirst = tryKfpMlPipelineNodeLog(runId, res0.podName, podNameParam.trim());
if (kfpFirst != null) {
return "-- KFP ml-pipeline API (UI와 동일) | node: " + res0.podName
+ (podNameParam.trim().equals(res0.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
+ " --\n\n" + kfpFirst;
}
}
// NOTE(review): same arguments as the res0 lookup above; the resolution is performed again.
KubernetesPodHealthService.ExecutorPodResolution res =
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
String logText = podHealthService.readPodLogInNamespace(
res.namespace, res.podName, normalizeTail(tailLines));
// A result starting with the failure marker triggers the pipeline-pod fallback read.
if (logText != null && logText.startsWith("로그 조회 실패")) {
KubernetesPodHealthService.PipelinePodLogOutcome fb =
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
logText = fb.logText;
}
String out = "-- kubectl logs " + res.podName + " -n " + res.namespace
+ (podNameParam.trim().equals(res.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
+ " --\n\n" + (logText != null ? logText : "");
// If the text is a wait-container log naming an archived main.log key, append that archive.
String archived = tryFetchArchivedMainLogFromWait(logText);
if (archived != null) {
out = out + "\n\n" + archived;
}
// No usable pod log: append diagnostics of the earlier Argo Server attempt, if any.
if ((logText == null || logText.trim().isEmpty() || logText.startsWith("로그 조회 실패"))
&& argoDebug != null
&& argoDebug.url != null
&& !argoDebug.url.isBlank()) {
out =
out
+ "\n\n-- Argo Server 시도(진단) --\n"
+ "url: "
+ argoDebug.url
+ "\nstatus: "
+ (argoDebug.statusCode != null ? argoDebug.statusCode : "(none)")
+ "\nerror: "
+ (argoDebug.error != null ? argoDebug.error : "(none)")
+ "\n(설정: argo.server.url / 필요시 argo.server.token)";
}
return out;
}
// --- No pod name: work from the task details of the KFP run ---
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
if (tasks.isEmpty()) {
// No tasks with pods: prepend a configuration hint to the run-id based aggregation.
String hint =
"KFP Run에 task_details/pod_name이 없거나 API 조회 실패입니다. "
+ "멀티유저 환경이면 application.properties 에 "
+ "admin.k8s.pipeline-pod-namespaces=파이프라인Pod가있는NS 를 설정하세요.\n";
return hint + podHealthService.aggregatePodLogsByRunId(runId, tailLines);
}
if (stepNameParam != null && !stepNameParam.isBlank()) {
// Case-insensitive substring match on the display name; the first match wins.
String hint = stepNameParam.trim().toLowerCase();
for (Map<String, Object> t : tasks) {
String dn = firstString(t.get("displayName"), t.get("display_name"));
if (dn != null && dn.toLowerCase().contains(hint)) {
return formatSingleTaskLog(runId, t, tailLines, workflowNameParam, workflowNamespaceParam);
}
}
return "스텝 표시명에 \"" + stepNameParam + "\" 가 포함된 task를 찾지 못했습니다. (allSteps=true 로 전체 확인 가능)";
}
if (allSteps) {
// Parallel lists: podNames.get(i) belongs to stepNames.get(i); duplicate pods are skipped.
List<String> podNames = new ArrayList<>();
List<String> stepNames = new ArrayList<>();
Set<String> seen = new HashSet<>();
for (Map<String, Object> t : tasks) {
String pod = firstString(t.get("podName"), t.get("pod_name"));
if (pod == null || pod.isBlank() || !seen.add(pod)) {
continue;
}
podNames.add(pod);
stepNames.add(firstString(t.get("displayName"), t.get("display_name")));
}
return podHealthService.aggregatePodLogsForKfpTasks(runId, podNames, stepNames, tailLines);
}
// Default: a single primary task.
Map<String, Object> chosen = pickPrimaryKfpTaskForLog(tasks);
return formatSingleTaskLog(runId, chosen, tailLines, workflowNameParam, workflowNamespaceParam);
}
/**
 * Normalizes a tail-line count.
 *
 * @param tailLines requested number of trailing lines, may be {@code null}
 * @return {@code tailLines} when it is positive, otherwise {@code null}
 */
private static Integer normalizeTail(Integer tailLines) {
    boolean usable = tailLines != null && tailLines > 0;
    return usable ? tailLines : null;
}
/**
 * Builds the log text for one KFP task, trying the log sources in this order:
 * Argo Server (when {@code preferArgoServer}), the KFP ml-pipeline node-log API (when
 * {@code preferKfpApiForLogs}), then a direct pod log read with a fallback read and an
 * optional appended MinIO archive.
 *
 * @param runId KFP run id
 * @param task task map; its pod name is read from {@code podName}/{@code pod_name} and its
 *     step name from {@code displayName}/{@code display_name}
 * @param tailLines number of trailing lines, normalized with {@code normalizeTail}
 * @param workflowNameParam optional Argo workflow name override
 * @param workflowNamespaceParam optional Argo workflow namespace override
 * @return log text prefixed with a header naming the source, or a message when the task has
 *     no pod name
 */
private String formatSingleTaskLog(
String runId,
Map<String, Object> task,
Integer tailLines,
String workflowNameParam,
String workflowNamespaceParam) {
String pod = firstString(task.get("podName"), task.get("pod_name"));
String step = firstString(task.get("displayName"), task.get("display_name"));
if (pod == null || pod.isBlank()) {
return "선택된 task에 pod_name이 없습니다.";
}
KubernetesPodHealthService.ExecutorPodResolution res =
podHealthService.resolveKfpExecutorImplPod(runId, pod.trim());
// Caller-supplied workflow namespace wins over the resolved pod namespace.
String wfNsEff =
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
? workflowNamespaceParam.trim()
: res.namespace;
// 1) Argo Server workflow log.
// NOTE(review): argoServerLogService is dereferenced without the null check that
// tryArgoWorkflowLogForStep performs — confirm it is never null.
if (preferArgoServer && argoServerLogService.isConfigured()) {
ArgoServerLogService.ArgoLogFetchResult argoLog =
tryArgoWorkflowLogForStep(
wfNsEff, workflowNameParam, res.podName, pod.trim(), tailLines);
if (argoLog != null && isSubstantialArgoLogBody(argoLog.logText)) {
StringBuilder a = new StringBuilder();
a.append("-- Argo Server workflow log (실시간·아카이브 자동) --\n");
a.append("namespace: ").append(wfNsEff).append(" | pod: ").append(res.podName);
// Mention the KFP-reported pod name only when it differs from the resolved one.
if (!pod.trim().equals(res.podName)) {
a.append(" | KFP pod_name: ").append(pod.trim());
}
a.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
a.append(argoLog.logText != null ? argoLog.logText : "");
return a.toString();
}
}
// 2) KFP ml-pipeline node-log API: resolved pod name first, then the KFP-reported name.
if (preferKfpApiForLogs) {
String kfpLog = tryKfpMlPipelineNodeLog(runId, res.podName, pod.trim());
if (kfpLog != null) {
StringBuilder k = new StringBuilder();
k.append("-- KFP ml-pipeline API v1beta1/runs/.../nodes/{node_id}/log (UI와 동일 백엔드) --\n");
k.append("node_id: ").append(res.podName);
if (!pod.trim().equals(res.podName)) {
k.append(" | KFP task pod_name: ").append(pod.trim());
}
k.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
k.append(kfpLog);
return k.toString();
}
}
// 3) Direct pod log read in the resolved namespace.
String logText = podHealthService.readPodLogInNamespace(
res.namespace, res.podName, normalizeTail(tailLines));
// A result starting with the failure marker triggers the pipeline-pod fallback read.
if (logText != null && logText.startsWith("로그 조회 실패")) {
KubernetesPodHealthService.PipelinePodLogOutcome fb =
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
logText = fb.logText;
}
StringBuilder sb = new StringBuilder();
sb.append("-- kubectl logs ").append(res.podName).append(" -n ").append(res.namespace);
sb.append(" (KFP 로그와 동일) | Step: ").append(step != null ? step : "(이름 없음)");
if (!pod.trim().equals(res.podName)) {
sb.append(" | KFP API pod_name: ").append(pod.trim());
}
sb.append(" --\n\n");
sb.append(logText != null ? logText : "");
// If the text is a wait-container log naming an archived main.log key, append that archive.
String archived = tryFetchArchivedMainLogFromWait(logText);
if (archived != null) {
sb.append("\n\n").append(archived);
}
return sb.toString();
}
/**
 * Fetches a node log through the KFP ml-pipeline v1beta1 API, trying the given node ids in
 * order and returning the first body that looks like a real log.
 *
 * @param runId KFP run id
 * @param nodeIdsOrdered candidate node ids; null, blank and repeated ids are skipped
 * @return the first substantial log body, or {@code null} when the run id or the Kubeflow URL
 *     is missing or no candidate produced a usable body
 */
private String tryKfpMlPipelineNodeLog(String runId, String... nodeIdsOrdered) {
    boolean runMissing = runId == null || runId.isBlank();
    boolean urlMissing = kubeflowUrl == null || kubeflowUrl.isBlank();
    if (runMissing || urlMissing) {
        return null;
    }

    Set<String> attempted = new LinkedHashSet<>();
    for (String candidate : nodeIdsOrdered) {
        boolean hasText = candidate != null && !candidate.isBlank();
        if (hasText) {
            String nodeId = candidate.trim();
            // add() is false for an id that was already tried.
            if (attempted.add(nodeId)) {
                String body = pipelineUploadService.getV1beta1RunNodeLog(runId, nodeId);
                if (isSubstantialKfpV1LogBody(body)) {
                    return body;
                }
            }
        }
    }
    return null;
}
/**
 * Decides whether a KFP v1beta1 node-log response looks like real log content.
 *
 * <p>Rejected are: {@code null}, bodies shorter than 15 characters after trimming, JSON-looking
 * bodies (starting with <code>{</code>) that mention {@code "error"} or {@code Error}, and bodies
 * starting with "failed to get" in any letter case.
 *
 * @param s response body, may be {@code null}
 * @return {@code true} when the body should be shown as a log
 */
private static boolean isSubstantialKfpV1LogBody(String s) {
    if (s == null) {
        return false;
    }
    String t = s.trim();
    if (t.length() < 15) {
        return false;
    }
    if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
        return false;
    }
    // Locale.ROOT: with a Turkish default locale "FAILED" lower-cases to "faıled" (dotless i)
    // and the prefix check would silently stop matching.
    String low = t.toLowerCase(java.util.Locale.ROOT);
    return !low.startsWith("failed to get");
}
/**
 * Fetches the log of one step Pod through the Argo Server API.
 *
 * <p>The workflow name is taken from the first source that yields a non-blank value: the
 * explicit override, the pod health service lookup (when enabled), a guess from
 * {@code implPodName}, and finally a guess from {@code apiPodHint}.
 *
 * @param workflowNamespace namespace of the workflow
 * @param workflowNameOverride optional explicit workflow name
 * @param implPodName pod whose log is requested
 * @param apiPodHint alternative pod name used only for guessing the workflow name
 * @param tailLines number of trailing lines, normalized with {@code normalizeTail}
 * @return the fetch result, or {@code null} when Argo is not preferred/configured, the
 *     namespace is blank or no workflow name could be determined
 */
private ArgoServerLogService.ArgoLogFetchResult tryArgoWorkflowLogForStep(
        String workflowNamespace,
        String workflowNameOverride,
        String implPodName,
        String apiPodHint,
        Integer tailLines) {
    boolean argoUsable =
            preferArgoServer && argoServerLogService != null && argoServerLogService.isConfigured();
    if (!argoUsable) {
        return null;
    }
    if (workflowNamespace == null || workflowNamespace.isBlank()) {
        return null;
    }
    String namespace = workflowNamespace.trim();

    String workflowName = null;
    if (workflowNameOverride != null && !workflowNameOverride.isBlank()) {
        workflowName = workflowNameOverride.trim();
    }
    if (workflowName == null && podHealthService != null && podHealthService.isEnabled()) {
        workflowName = podHealthService.getArgoWorkflowNameFromPod(namespace, implPodName);
    }
    // Last resort: guess from the pod names, resolved pod first.
    for (String podCandidate : new String[] {implPodName, apiPodHint}) {
        if (workflowName == null || workflowName.isBlank()) {
            workflowName = ArgoServerLogService.guessWorkflowNameFromExecutorPod(podCandidate);
        }
    }
    if (workflowName == null || workflowName.isBlank()) {
        return null;
    }
    return argoServerLogService.fetchWorkflowPodLog(
            namespace, workflowName, implPodName, normalizeTail(tailLines));
}
/**
 * Decides whether an Argo Server log response looks like real log content.
 *
 * <p>Rejected are: {@code null}, bodies shorter than 12 characters after trimming, JSON-looking
 * bodies (starting with <code>{</code>) that mention {@code "error"} or {@code Error}, and bodies
 * starting with "rpc error" or "failed to retrieve" in any letter case.
 *
 * @param s response body, may be {@code null}
 * @return {@code true} when the body should be shown as a log
 */
private static boolean isSubstantialArgoLogBody(String s) {
    if (s == null) {
        return false;
    }
    String t = s.trim();
    if (t.length() < 12) {
        return false;
    }
    if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
        return false;
    }
    // Locale.ROOT: with a Turkish default locale an upper-case 'I' lower-cases to a dotless
    // 'ı', so "FAILED TO RETRIEVE ..." would no longer match the prefix.
    String low = t.toLowerCase(java.util.Locale.ROOT);
    return !(low.startsWith("rpc error") || low.startsWith("failed to retrieve"));
}
/**
 * Looks for an archived {@code main.log} object key inside a wait-container log and, when one
 * is found, downloads that object from the MinIO bucket {@code KFP_ARCHIVE_BUCKET}.
 *
 * @param logText pod log text; only text starting (after trimming) with
 *     {@code -- container=wait --} is inspected
 * @return the archived log with a header line, a one-line failure notice when the download
 *     fails, or {@code null} when there is nothing to fetch or the object is blank
 */
private String tryFetchArchivedMainLogFromWait(String logText) {
    if (logText == null || logText.isBlank()) {
        return null;
    }
    String trimmedLog = logText.trim();
    if (!trimmedLog.startsWith("-- container=wait --")) {
        return null;
    }
    Matcher keyMatcher = WAIT_LOG_ARCHIVE_KEY.matcher(trimmedLog);
    String key = keyMatcher.find() ? keyMatcher.group(1) : null;
    if (key == null || key.isBlank()) {
        return null;
    }
    String objectKey = key.trim();
    try {
        MinioClient client = MinioClient.builder()
                .endpoint(minioEndpoint)
                .credentials(minioAccessKey, minioSecretKey)
                .build();
        io.minio.GetObjectArgs request = io.minio.GetObjectArgs.builder()
                .bucket(KFP_ARCHIVE_BUCKET)
                .object(objectKey)
                .build();
        try (java.io.InputStream stream = client.getObject(request)) {
            String body = new String(stream.readAllBytes(), StandardCharsets.UTF_8);
            if (body.isBlank()) {
                return null;
            }
            return "-- MinIO archived main.log (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + objectKey + ") --\n\n" + body;
        }
    } catch (Exception e) {
        // Best effort: a failed download becomes a notice line instead of an exception.
        String reason = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName();
        return "-- MinIO archived main.log 읽기 실패 (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + objectKey + "): "
                + reason
                + " --";
    }
}
/**
 * Picks the task whose log is shown by default: the last failed task in list order, or the
 * last task of the list when none failed.
 *
 * @param tasks non-empty task list
 * @return the chosen task
 */
private static Map<String, Object> pickPrimaryKfpTaskForLog(List<Map<String, Object>> tasks) {
    Map<String, Object> lastFailed = null;
    for (Map<String, Object> candidate : tasks) {
        if (kfpTaskStateFailed(candidate)) {
            lastFailed = candidate;
        }
    }
    return lastFailed != null ? lastFailed : tasks.get(tasks.size() - 1);
}
/**
 * Tells whether a KFP task is in a failed state.
 *
 * <p>The {@code state} entry may be a nested map (then {@code runtimeState} /
 * {@code runtime_state} is used when present), a number (the value 5 counts as failed) or
 * any other value whose text contains FAILED, ERROR or CANCEL in any letter case.
 *
 * @param task task map
 * @return {@code true} when the task counts as failed
 */
private static boolean kfpTaskStateFailed(Map<String, Object> task) {
    Object st = task.get("state");
    if (st instanceof Map) {
        Object rs = ((Map<?, ?>) st).get("runtimeState");
        if (rs == null) {
            rs = ((Map<?, ?>) st).get("runtime_state");
        }
        if (rs != null) {
            st = rs;
        }
    }
    if (st instanceof Number) {
        return ((Number) st).intValue() == 5;
    }
    // Locale.ROOT: with a Turkish default locale "failed" upper-cases to "FAİLED" (dotted İ)
    // and the failure would go undetected.
    String s = String.valueOf(st).toUpperCase(java.util.Locale.ROOT);
    return s.contains("FAILED") || s.contains("ERROR") || s.contains("CANCEL");
}
/**
 * Returns the task map behind a task-details node.
 *
 * @param node a task-details element
 * @return the value of the node's {@code task} entry when that is a map, the node itself when
 *     it is a map without such an entry, or {@code null} when the node is not a map
 */
@SuppressWarnings("unchecked")
private static Map<String, Object> unwrapKfpTaskMap(Object node) {
    if (node instanceof Map) {
        Map<String, Object> outer = (Map<String, Object>) node;
        Object wrapped = outer.get("task");
        return wrapped instanceof Map ? (Map<String, Object>) wrapped : outer;
    }
    return null;
}
/**
 * Collects tasks that carry a pod name from a task-details node and its nested child tasks.
 * A later task with the same pod name replaces the earlier entry.
 *
 * @param node task-details element (flat task map or a wrapper with a {@code task} entry)
 * @param byPod accumulator keyed by trimmed pod name
 */
@SuppressWarnings("unchecked")
private static void mergeKfpTaskWithPod(Object node, Map<String, Map<String, Object>> byPod) {
    Map<String, Object> task = unwrapKfpTaskMap(node);
    if (task == null) {
        return;
    }

    String podName = firstString(task.get("podName"), task.get("pod_name"));
    boolean hasPod = podName != null && !podName.isBlank();
    if (hasPod) {
        byPod.put(podName.trim(), task);
    }

    Object children = task.get("childTasks");
    if (children == null) {
        children = task.get("child_tasks");
    }
    if (children instanceof List) {
        for (Object element : (List<?>) children) {
            if (element instanceof Map) {
                Map<String, Object> childMap = (Map<String, Object>) element;
                Object wrappedTask = childMap.get("task");
                if (wrappedTask instanceof Map) {
                    // Wrapper element: descend into the wrapped task.
                    mergeKfpTaskWithPod(wrappedTask, byPod);
                } else if (firstString(childMap.get("podName"), childMap.get("pod_name")) != null) {
                    // Flat element that names a pod itself.
                    mergeKfpTaskWithPod(childMap, byPod);
                }
            }
        }
    }
}
/**
 * Extracts the tasks that have a pod name from a KFP run map.
 *
 * <p>Reads {@code runDetails}/{@code run_details} and inside it
 * {@code taskDetails}/{@code task_details}, keeps one task per pod name and sorts the result
 * by the text of {@code createTime}/{@code create_time} (missing values sort as empty text).
 *
 * @param run KFP run map, may be {@code null}
 * @return the sorted tasks, or an empty list when the expected structure is absent
 */
@SuppressWarnings("unchecked")
private static List<Map<String, Object>> kfpTasksWithPods(Map<String, Object> run) {
    if (run == null) {
        return List.of();
    }

    Object runDetails = run.get("runDetails");
    if (runDetails == null) {
        runDetails = run.get("run_details");
    }
    if (!(runDetails instanceof Map)) {
        return List.of();
    }
    Map<?, ?> details = (Map<?, ?>) runDetails;

    Object taskDetails = details.get("taskDetails");
    if (taskDetails == null) {
        taskDetails = details.get("task_details");
    }
    if (!(taskDetails instanceof List)) {
        return List.of();
    }

    // LinkedHashMap keeps encounter order, so the sort below starts from that order.
    Map<String, Map<String, Object>> tasksByPod = new LinkedHashMap<>();
    for (Object entry : (List<?>) taskDetails) {
        mergeKfpTaskWithPod(entry, tasksByPod);
    }

    List<Map<String, Object>> ordered = new ArrayList<>(tasksByPod.values());
    ordered.sort(Comparator.comparing((Map<String, Object> task) -> {
        Object created = task.get("createTime");
        if (created == null) {
            created = task.get("create_time");
        }
        return created == null ? "" : created.toString();
    }));
    return ordered;
}
/**
 * Returns the trimmed text of the first argument that is non-null and not empty after
 * trimming.
 *
 * @param a preferred value
 * @param b fallback value
 * @return the trimmed text of {@code a} or {@code b}, or {@code null} when neither qualifies
 */
private static String firstString(Object a, Object b) {
    for (Object candidate : new Object[] {a, b}) {
        if (candidate == null) {
            continue;
        }
        String text = String.valueOf(candidate).trim();
        if (!text.isEmpty()) {
            return text;
        }
    }
    return null;
}
/**
 * Maps a KFP task state to a pod-phase style label.
 *
 * <p>A map state is reduced to its {@code runtimeState}/{@code runtime_state} entry when one
 * is present. The state text is then matched (case-sensitively, first match wins) against
 * RUNNING, SUCCEEDED, FAILED, ERROR, SKIPPED and PENDING.
 *
 * @param state raw state value, may be {@code null}
 * @return the mapped label, the unchanged state text when nothing matches, or {@code null}
 *     for a {@code null} state
 */
@SuppressWarnings("unchecked")
private static String kfpStateToPhase(Object state) {
    if (state == null) {
        return null;
    }
    Object effective = state;
    if (state instanceof Map) {
        Map<?, ?> stateMap = (Map<?, ?>) state;
        Object runtime = stateMap.get("runtimeState");
        if (runtime == null) {
            runtime = stateMap.get("runtime_state");
        }
        if (runtime != null) {
            effective = runtime;
        }
    }
    String text = effective.toString();
    // Ordered rule table: marker -> label.
    String[][] rules = {
        {"RUNNING", "Running"},
        {"SUCCEEDED", "Succeeded"},
        {"FAILED", "Failed"},
        {"ERROR", "Failed"},
        {"SKIPPED", "Skipped"},
        {"PENDING", "Pending"},
    };
    for (String[] rule : rules) {
        if (text.contains(rule[0])) {
            return rule[1];
        }
    }
    return text;
}
/**
 * Admin page: returns the logs of the Pods that belong to one card (KFP/MLflow/MinIO) as a
 * single string.
 *
 * @param namespace namespace of the Pods
 * @param podNames names of the Pods to read
 * @param tailLines number of trailing lines, passed through unchanged
 * @return the aggregated log text, or a fixed notice when the pod health service is missing
 *     or disabled
 */
public String getPodLogsAggregate(String namespace, java.util.List<String> podNames, Integer tailLines) {
    boolean k8sEnabled = podHealthService != null && podHealthService.isEnabled();
    return k8sEnabled
            ? podHealthService.aggregatePodLogsByNames(namespace, podNames, tailLines)
            : "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
}
}