You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
791 lines
34 KiB
791 lines
34 KiB
|
4 weeks ago
|
package kr.re.etri.autoflow.service;
|
||
|
|
|
||
|
|
import io.minio.ListBucketsArgs;
|
||
|
|
import io.minio.MinioClient;
|
||
|
|
import lombok.RequiredArgsConstructor;
|
||
|
|
import lombok.extern.slf4j.Slf4j;
|
||
|
|
import org.springframework.beans.factory.annotation.Value;
|
||
|
|
import org.springframework.http.ResponseEntity;
|
||
|
|
import org.springframework.stereotype.Service;
|
||
|
|
import org.springframework.web.client.RestTemplate;
|
||
|
|
|
||
|
|
import java.nio.charset.StandardCharsets;
|
||
|
|
import java.time.Instant;
|
||
|
|
import java.util.ArrayList;
|
||
|
|
import java.util.Comparator;
|
||
|
|
import java.util.HashMap;
|
||
|
|
import java.util.HashSet;
|
||
|
|
import java.util.LinkedHashMap;
|
||
|
|
import java.util.List;
|
||
|
|
import java.util.Map;
|
||
|
|
import java.util.LinkedHashSet;
|
||
|
|
import java.util.Set;
|
||
|
|
import java.util.regex.Matcher;
|
||
|
|
import java.util.regex.Pattern;
|
||
|
|
|
||
|
|
/**
 * Admin-facing system status lookup (KFP, MLflow, MinIO).
 * Health results are cached for 30 seconds to reduce the load on each module.
 *
 * Current health criteria:
 * - KFP/MLflow: healthy when an HTTP request to the configured URL returns 2xx
 *   (this only verifies that the API is alive).
 * - MinIO: healthy when listBuckets() succeeds with the application's account.
 *
 * Checking whether the related Pods are Running requires listing Pods through the Kubernetes API
 * (e.g. set admin.k8s.enabled=true plus namespace/label settings, then call
 * CoreV1Api.listNamespacedPod and check phase == "Running"; this needs the
 * kubernetes-client-api dependency and cluster access permissions).
 */
|
||
|
|
@Slf4j
|
||
|
|
@Service
|
||
|
|
@RequiredArgsConstructor
|
||
|
|
public class AdminService {
|
||
|
|
|
||
|
|
private static final int CACHE_SECONDS = 30;
|
||
|
|
private static final String KFP_ARCHIVE_BUCKET = "mlpipeline";
|
||
|
|
private static final Pattern WAIT_LOG_ARCHIVE_KEY =
|
||
|
|
Pattern.compile("\\bkey:\\s*([\\w\\-./]+/main\\.log)\\b");
|
||
|
|
|
||
|
|
private final MinioAttachmentService minioAttachmentService;
|
||
|
|
private final RestTemplate restTemplate;
|
||
|
|
private final KubernetesPodHealthService podHealthService;
|
||
|
|
private final PipelineUploadService pipelineUploadService;
|
||
|
|
private final ArgoServerLogService argoServerLogService;
|
||
|
|
|
||
|
|
@Value("${kubeflow.url:}")
|
||
|
|
private String kubeflowUrl;
|
||
|
|
@Value("${mlflow.url:}")
|
||
|
|
private String mlflowUrl;
|
||
|
|
/** true면 KFP ml-pipeline v1beta1 노드 로그 API를 kubectl보다 먼저 시도 (클러스터 내부 조회, UI와 동일 경로) */
|
||
|
|
@Value("${admin.k8s.prefer-kfp-api-for-logs:true}")
|
||
|
|
private boolean preferKfpApiForLogs;
|
||
|
|
/** true면 Argo Server 로그 API를 KFP/K8s보다 먼저 시도 (삭제된 Pod는 MinIO 아카이브) */
|
||
|
|
@Value("${admin.logs.prefer-argo-server:true}")
|
||
|
|
private boolean preferArgoServer;
|
||
|
|
|
||
|
|
private volatile Map<String, Object> cachedStatus;
|
||
|
|
private volatile long cachedAt;
|
||
|
|
|
||
|
|
/**
|
||
|
|
* KFP, MLflow, MinIO 상태를 조회. 30초 캐시 적용.
|
||
|
|
*/
|
||
|
|
public Map<String, Object> getStatus() {
|
||
|
|
long now = System.currentTimeMillis();
|
||
|
|
if (cachedStatus != null && (now - cachedAt) < CACHE_SECONDS * 1000L) {
|
||
|
|
return new HashMap<>(cachedStatus);
|
||
|
|
}
|
||
|
|
Map<String, Object> status = new HashMap<>();
|
||
|
|
status.put("kfp", checkKfp());
|
||
|
|
status.put("mlflow", checkMlflow());
|
||
|
|
status.put("minio", checkMinio());
|
||
|
|
status.put("updatedAt", Instant.now().toString());
|
||
|
|
cachedStatus = status;
|
||
|
|
cachedAt = now;
|
||
|
|
return new HashMap<>(status);
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Kubernetes Pod 상태만 조회 (별도 버튼용).
|
||
|
|
* admin.k8s.enabled=false 여도 kfp/mlflow/minio 키는 항상 반환해 프론트에서 "Pod 없음" 대신 사유 표시.
|
||
|
|
*/
|
||
|
|
public Map<String, Object> getPodStatus() {
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||
|
|
out.put("namespace", "");
|
||
|
|
out.put("message", "admin.k8s.enabled=false");
|
||
|
|
Map<String, Object> disabled = new HashMap<>();
|
||
|
|
disabled.put("ok", false);
|
||
|
|
disabled.put("message", "admin.k8s.enabled=true 설정 후 Pod 상태 조회 가능");
|
||
|
|
disabled.put("running", 0);
|
||
|
|
disabled.put("total", 0);
|
||
|
|
disabled.put("pods", java.util.Collections.emptyList());
|
||
|
|
out.put("kfp", disabled);
|
||
|
|
out.put("mlflow", new HashMap<>(disabled));
|
||
|
|
out.put("minio", new HashMap<>(disabled));
|
||
|
|
out.put("updatedAt", Instant.now().toString());
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
out.put("namespace", podHealthService.getNamespace());
|
||
|
|
out.put("kfp", podHealthService.getKfpPodStatus());
|
||
|
|
out.put("mlflow", podHealthService.getMlflowPodStatus());
|
||
|
|
out.put("minio", podHealthService.getMinioPodStatus());
|
||
|
|
out.put("updatedAt", Instant.now().toString());
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
private Map<String, Object> checkKfp() {
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
String base = (kubeflowUrl != null ? kubeflowUrl.replaceAll("/+$", "") : "").trim();
|
||
|
|
if (base.isBlank()) {
|
||
|
|
out.put("status", "skip");
|
||
|
|
out.put("message", "kubeflow.url 미설정");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
String url = base + "/apis/v2beta1/healthz";
|
||
|
|
try {
|
||
|
|
ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
|
||
|
|
out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
|
||
|
|
out.put("message", "HTTP " + resp.getStatusCode().value());
|
||
|
|
} catch (Exception e) {
|
||
|
|
out.put("status", "error");
|
||
|
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||
|
|
log.debug("[Admin] KFP health check failed: {}", e.getMessage());
|
||
|
|
}
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
private Map<String, Object> checkMlflow() {
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
String base = (mlflowUrl != null ? mlflowUrl.replaceAll("/+$", "") : "").trim();
|
||
|
|
if (base.isBlank()) {
|
||
|
|
out.put("status", "skip");
|
||
|
|
out.put("message", "mlflow.url 미설정");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
String url = base + "/health";
|
||
|
|
try {
|
||
|
|
ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
|
||
|
|
out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
|
||
|
|
out.put("message", "HTTP " + resp.getStatusCode().value());
|
||
|
|
} catch (Exception e) {
|
||
|
|
out.put("status", "error");
|
||
|
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||
|
|
log.debug("[Admin] MLflow health check failed: {}", e.getMessage());
|
||
|
|
}
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* MinIO 상태를 스크립트/첨부에서 쓰는 동일 계정(accessKey/secretKey)으로 확인.
|
||
|
|
* listBuckets() 호출로 연결·인증이 정상인지 검사.
|
||
|
|
*/
|
||
|
|
private Map<String, Object> checkMinio() {
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
String endpoint = (minioAttachmentService.getMinioEndpoint() != null
|
||
|
|
? minioAttachmentService.getMinioEndpoint().replaceAll("/+$", "") : "").trim();
|
||
|
|
if (endpoint.isBlank()) {
|
||
|
|
out.put("status", "skip");
|
||
|
|
out.put("message", "MinIO endpoint 미설정");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
String accessKey = minioAttachmentService.getMinioAccessKey();
|
||
|
|
String secretKey = minioAttachmentService.getMinioSecretKey();
|
||
|
|
if (accessKey == null) accessKey = "";
|
||
|
|
if (secretKey == null) secretKey = "";
|
||
|
|
try {
|
||
|
|
MinioClient client = MinioClient.builder()
|
||
|
|
.endpoint(endpoint)
|
||
|
|
.credentials(accessKey, secretKey)
|
||
|
|
.build();
|
||
|
|
client.listBuckets(ListBucketsArgs.builder().build());
|
||
|
|
out.put("status", "ok");
|
||
|
|
out.put("message", "연결 정상 (동일 계정)");
|
||
|
|
} catch (Exception e) {
|
||
|
|
out.put("status", "error");
|
||
|
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||
|
|
log.debug("[Admin] MinIO health check failed: {}", e.getMessage());
|
||
|
|
}
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* 재시작 요청 수신. 실제 재시작은 K8s API 등 외부에서 수행하도록 설계.
|
||
|
|
* 현재는 요청 로깅 후 안내 메시지 반환.
|
||
|
|
*/
|
||
|
|
public Map<String, Object> requestRestart(String service) {
|
||
|
|
log.info("[Admin] restart requested for service: {}", service);
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
if (service == null || !java.util.Set.of("kfp", "mlflow", "minio").contains(service.toLowerCase())) {
|
||
|
|
out.put("success", false);
|
||
|
|
out.put("message", "지원하지 않는 서비스입니다. (kfp, mlflow, minio 중 하나)");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
out.put("success", true);
|
||
|
|
out.put("message", "재시작 요청이 접수되었습니다. 실제 재시작은 Kubernetes 등에서 별도 설정이 필요합니다.");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* runId(KFP run id)에 해당하는 파이프라인 실행 Pod 목록. Executions 상세 로그 버튼용.
|
||
|
|
*/
|
||
|
|
public Map<String, Object> getPodsByRunId(String runId) {
|
||
|
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||
|
|
Map<String, Object> out = new HashMap<>();
|
||
|
|
out.put("namespace", "");
|
||
|
|
out.put("pods", java.util.Collections.emptyList());
|
||
|
|
out.put("message", "admin.k8s.enabled=false");
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
Map<String, Object> out = podHealthService.listPodsByRunId(runId);
|
||
|
|
@SuppressWarnings("unchecked")
|
||
|
|
List<Map<String, String>> pods = (List<Map<String, String>>) out.get("pods");
|
||
|
|
if (pods != null && !pods.isEmpty()) {
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
|
||
|
|
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
|
||
|
|
if (tasks.isEmpty()) {
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
String ns = podHealthService.getNamespace();
|
||
|
|
List<Map<String, String>> fromKfp = new ArrayList<>();
|
||
|
|
Set<String> seen = new HashSet<>();
|
||
|
|
for (Map<String, Object> t : tasks) {
|
||
|
|
String pod = firstString(t.get("podName"), t.get("pod_name"));
|
||
|
|
if (pod == null || pod.isBlank() || !seen.add(pod)) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
Map<String, String> e = new LinkedHashMap<>();
|
||
|
|
e.put("name", pod);
|
||
|
|
e.put("phase", kfpStateToPhase(t.get("state")));
|
||
|
|
e.put("displayName", firstString(t.get("displayName"), t.get("display_name")));
|
||
|
|
fromKfp.add(e);
|
||
|
|
}
|
||
|
|
if (!fromKfp.isEmpty()) {
|
||
|
|
out.put("namespace", ns);
|
||
|
|
out.put("pods", fromKfp);
|
||
|
|
out.put("message", fromKfp.size() + " pods (KFP run_details.task_details)");
|
||
|
|
out.remove("hint");
|
||
|
|
}
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Pod 로그 조회. admin.k8s.enabled=true 일 때만 동작.
|
||
|
|
*/
|
||
|
|
public String getPodLog(String namespace, String podName, String container, Integer tailLines) {
|
||
|
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||
|
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||
|
|
}
|
||
|
|
return podHealthService.getPodLog(namespace, podName, container, tailLines);
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Run 로그. 기본(allSteps=false): KFP UI처럼 <b>한 스텝</b>만 — 실패한 스텝이 있으면 그 Pod, 없으면 마지막 스텝 Pod.
|
||
|
|
* allSteps=true 이면 task_details 순서대로 전체 Pod 로그.
|
||
|
|
* podName 지정 시 해당 Pod만. stepName 지정 시 displayName 부분 일치 스텝.
|
||
|
|
* Pod는 {@code admin.k8s.pipeline-pod-namespaces} 순으로 네임스페이스 탐색.
|
||
|
|
*/
|
||
|
|
public String getPodLogsByRunId(
|
||
|
|
String runId,
|
||
|
|
Integer tailLines,
|
||
|
|
boolean allSteps,
|
||
|
|
String podNameParam,
|
||
|
|
String stepNameParam,
|
||
|
|
String workflowNameParam,
|
||
|
|
String workflowNamespaceParam) {
|
||
|
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||
|
|
if (argoServerLogService != null
|
||
|
|
&& argoServerLogService.isConfigured()
|
||
|
|
&& runId != null
|
||
|
|
&& !runId.isBlank()
|
||
|
|
&& podNameParam != null
|
||
|
|
&& !podNameParam.isBlank()) {
|
||
|
|
String wfNs =
|
||
|
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||
|
|
? workflowNamespaceParam.trim()
|
||
|
|
: "kubeflow";
|
||
|
|
String wfName =
|
||
|
|
workflowNameParam != null && !workflowNameParam.isBlank()
|
||
|
|
? workflowNameParam.trim()
|
||
|
|
: ArgoServerLogService.guessWorkflowNameFromExecutorPod(podNameParam.trim());
|
||
|
|
if (wfName != null) {
|
||
|
|
ArgoServerLogService.ArgoLogFetchResult al =
|
||
|
|
argoServerLogService.fetchWorkflowPodLog(
|
||
|
|
wfNs, wfName, podNameParam.trim(), normalizeTail(tailLines));
|
||
|
|
if (isSubstantialArgoLogBody(al != null ? al.logText : null)) {
|
||
|
|
return "-- Argo Server /api/v1/workflows/"
|
||
|
|
+ wfNs
|
||
|
|
+ "/"
|
||
|
|
+ wfName
|
||
|
|
+ "/log (kubectl 없음) --\n\n"
|
||
|
|
+ al.logText;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (runId != null && !runId.isBlank()
|
||
|
|
&& podNameParam != null && !podNameParam.isBlank()
|
||
|
|
&& kubeflowUrl != null && !kubeflowUrl.isBlank()) {
|
||
|
|
String kfpOnly = tryKfpMlPipelineNodeLog(runId, podNameParam.trim());
|
||
|
|
if (kfpOnly != null) {
|
||
|
|
return "-- KFP ml-pipeline API (kubectl 없이) v1beta1 노드 로그 --\n\n" + kfpOnly;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||
|
|
}
|
||
|
|
if (runId == null || runId.isBlank()) {
|
||
|
|
return "runId가 없습니다.";
|
||
|
|
}
|
||
|
|
if (podNameParam != null && !podNameParam.isBlank()) {
|
||
|
|
KubernetesPodHealthService.ExecutorPodResolution res0 =
|
||
|
|
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
|
||
|
|
String wfNsPod =
|
||
|
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||
|
|
? workflowNamespaceParam.trim()
|
||
|
|
: res0.namespace;
|
||
|
|
ArgoServerLogService.ArgoLogFetchResult argoDebug = null;
|
||
|
|
if (preferArgoServer && argoServerLogService.isConfigured()) {
|
||
|
|
argoDebug =
|
||
|
|
tryArgoWorkflowLogForStep(
|
||
|
|
wfNsPod,
|
||
|
|
workflowNameParam,
|
||
|
|
res0.podName,
|
||
|
|
podNameParam.trim(),
|
||
|
|
tailLines);
|
||
|
|
if (argoDebug != null && isSubstantialArgoLogBody(argoDebug.logText)) {
|
||
|
|
return "-- Argo Server API (Pod 종료 후 MinIO 아카이브 포함) | wf="
|
||
|
|
+ wfNsPod
|
||
|
|
+ " | pod="
|
||
|
|
+ res0.podName
|
||
|
|
+ " --\n\n"
|
||
|
|
+ argoDebug.logText;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (preferKfpApiForLogs) {
|
||
|
|
String kfpFirst = tryKfpMlPipelineNodeLog(runId, res0.podName, podNameParam.trim());
|
||
|
|
if (kfpFirst != null) {
|
||
|
|
return "-- KFP ml-pipeline API (UI와 동일) | node: " + res0.podName
|
||
|
|
+ (podNameParam.trim().equals(res0.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
|
||
|
|
+ " --\n\n" + kfpFirst;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
KubernetesPodHealthService.ExecutorPodResolution res =
|
||
|
|
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
|
||
|
|
String logText = podHealthService.readPodLogInNamespace(
|
||
|
|
res.namespace, res.podName, normalizeTail(tailLines));
|
||
|
|
if (logText != null && logText.startsWith("로그 조회 실패")) {
|
||
|
|
KubernetesPodHealthService.PipelinePodLogOutcome fb =
|
||
|
|
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
|
||
|
|
logText = fb.logText;
|
||
|
|
}
|
||
|
|
String out = "-- kubectl logs " + res.podName + " -n " + res.namespace
|
||
|
|
+ (podNameParam.trim().equals(res.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
|
||
|
|
+ " --\n\n" + (logText != null ? logText : "");
|
||
|
|
String archived = tryFetchArchivedMainLogFromWait(logText);
|
||
|
|
if (archived != null) {
|
||
|
|
out = out + "\n\n" + archived;
|
||
|
|
}
|
||
|
|
if ((logText == null || logText.trim().isEmpty() || logText.startsWith("로그 조회 실패"))
|
||
|
|
&& argoDebug != null
|
||
|
|
&& argoDebug.url != null
|
||
|
|
&& !argoDebug.url.isBlank()) {
|
||
|
|
out =
|
||
|
|
out
|
||
|
|
+ "\n\n-- Argo Server 시도(진단) --\n"
|
||
|
|
+ "url: "
|
||
|
|
+ argoDebug.url
|
||
|
|
+ "\nstatus: "
|
||
|
|
+ (argoDebug.statusCode != null ? argoDebug.statusCode : "(none)")
|
||
|
|
+ "\nerror: "
|
||
|
|
+ (argoDebug.error != null ? argoDebug.error : "(none)")
|
||
|
|
+ "\n(설정: argo.server.url / 필요시 argo.server.token)";
|
||
|
|
}
|
||
|
|
return out;
|
||
|
|
}
|
||
|
|
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
|
||
|
|
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
|
||
|
|
if (tasks.isEmpty()) {
|
||
|
|
String hint =
|
||
|
|
"KFP Run에 task_details/pod_name이 없거나 API 조회 실패입니다. "
|
||
|
|
+ "멀티유저 환경이면 application.properties 에 "
|
||
|
|
+ "admin.k8s.pipeline-pod-namespaces=파이프라인Pod가있는NS 를 설정하세요.\n";
|
||
|
|
return hint + podHealthService.aggregatePodLogsByRunId(runId, tailLines);
|
||
|
|
}
|
||
|
|
if (stepNameParam != null && !stepNameParam.isBlank()) {
|
||
|
|
String hint = stepNameParam.trim().toLowerCase();
|
||
|
|
for (Map<String, Object> t : tasks) {
|
||
|
|
String dn = firstString(t.get("displayName"), t.get("display_name"));
|
||
|
|
if (dn != null && dn.toLowerCase().contains(hint)) {
|
||
|
|
return formatSingleTaskLog(runId, t, tailLines, workflowNameParam, workflowNamespaceParam);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return "스텝 표시명에 \"" + stepNameParam + "\" 가 포함된 task를 찾지 못했습니다. (allSteps=true 로 전체 확인 가능)";
|
||
|
|
}
|
||
|
|
if (allSteps) {
|
||
|
|
List<String> podNames = new ArrayList<>();
|
||
|
|
List<String> stepNames = new ArrayList<>();
|
||
|
|
Set<String> seen = new HashSet<>();
|
||
|
|
for (Map<String, Object> t : tasks) {
|
||
|
|
String pod = firstString(t.get("podName"), t.get("pod_name"));
|
||
|
|
if (pod == null || pod.isBlank() || !seen.add(pod)) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
podNames.add(pod);
|
||
|
|
stepNames.add(firstString(t.get("displayName"), t.get("display_name")));
|
||
|
|
}
|
||
|
|
return podHealthService.aggregatePodLogsForKfpTasks(runId, podNames, stepNames, tailLines);
|
||
|
|
}
|
||
|
|
Map<String, Object> chosen = pickPrimaryKfpTaskForLog(tasks);
|
||
|
|
return formatSingleTaskLog(runId, chosen, tailLines, workflowNameParam, workflowNamespaceParam);
|
||
|
|
}
|
||
|
|
|
||
|
|
private static Integer normalizeTail(Integer tailLines) {
|
||
|
|
if (tailLines == null) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
if (tailLines <= 0) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
return tailLines;
|
||
|
|
}
|
||
|
|
|
||
|
|
private String formatSingleTaskLog(
|
||
|
|
String runId,
|
||
|
|
Map<String, Object> task,
|
||
|
|
Integer tailLines,
|
||
|
|
String workflowNameParam,
|
||
|
|
String workflowNamespaceParam) {
|
||
|
|
String pod = firstString(task.get("podName"), task.get("pod_name"));
|
||
|
|
String step = firstString(task.get("displayName"), task.get("display_name"));
|
||
|
|
if (pod == null || pod.isBlank()) {
|
||
|
|
return "선택된 task에 pod_name이 없습니다.";
|
||
|
|
}
|
||
|
|
KubernetesPodHealthService.ExecutorPodResolution res =
|
||
|
|
podHealthService.resolveKfpExecutorImplPod(runId, pod.trim());
|
||
|
|
String wfNsEff =
|
||
|
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||
|
|
? workflowNamespaceParam.trim()
|
||
|
|
: res.namespace;
|
||
|
|
if (preferArgoServer && argoServerLogService.isConfigured()) {
|
||
|
|
ArgoServerLogService.ArgoLogFetchResult argoLog =
|
||
|
|
tryArgoWorkflowLogForStep(
|
||
|
|
wfNsEff, workflowNameParam, res.podName, pod.trim(), tailLines);
|
||
|
|
if (argoLog != null && isSubstantialArgoLogBody(argoLog.logText)) {
|
||
|
|
StringBuilder a = new StringBuilder();
|
||
|
|
a.append("-- Argo Server workflow log (실시간·아카이브 자동) --\n");
|
||
|
|
a.append("namespace: ").append(wfNsEff).append(" | pod: ").append(res.podName);
|
||
|
|
if (!pod.trim().equals(res.podName)) {
|
||
|
|
a.append(" | KFP pod_name: ").append(pod.trim());
|
||
|
|
}
|
||
|
|
a.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
|
||
|
|
a.append(argoLog.logText != null ? argoLog.logText : "");
|
||
|
|
return a.toString();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (preferKfpApiForLogs) {
|
||
|
|
String kfpLog = tryKfpMlPipelineNodeLog(runId, res.podName, pod.trim());
|
||
|
|
if (kfpLog != null) {
|
||
|
|
StringBuilder k = new StringBuilder();
|
||
|
|
k.append("-- KFP ml-pipeline API v1beta1/runs/.../nodes/{node_id}/log (UI와 동일 백엔드) --\n");
|
||
|
|
k.append("node_id: ").append(res.podName);
|
||
|
|
if (!pod.trim().equals(res.podName)) {
|
||
|
|
k.append(" | KFP task pod_name: ").append(pod.trim());
|
||
|
|
}
|
||
|
|
k.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
|
||
|
|
k.append(kfpLog);
|
||
|
|
return k.toString();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
String logText = podHealthService.readPodLogInNamespace(
|
||
|
|
res.namespace, res.podName, normalizeTail(tailLines));
|
||
|
|
if (logText != null && logText.startsWith("로그 조회 실패")) {
|
||
|
|
KubernetesPodHealthService.PipelinePodLogOutcome fb =
|
||
|
|
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
|
||
|
|
logText = fb.logText;
|
||
|
|
}
|
||
|
|
StringBuilder sb = new StringBuilder();
|
||
|
|
sb.append("-- kubectl logs ").append(res.podName).append(" -n ").append(res.namespace);
|
||
|
|
sb.append(" (KFP 로그와 동일) | Step: ").append(step != null ? step : "(이름 없음)");
|
||
|
|
if (!pod.trim().equals(res.podName)) {
|
||
|
|
sb.append(" | KFP API pod_name: ").append(pod.trim());
|
||
|
|
}
|
||
|
|
sb.append(" --\n\n");
|
||
|
|
sb.append(logText != null ? logText : "");
|
||
|
|
String archived = tryFetchArchivedMainLogFromWait(logText);
|
||
|
|
if (archived != null) {
|
||
|
|
sb.append("\n\n").append(archived);
|
||
|
|
}
|
||
|
|
return sb.toString();
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* KFP UI가 쓰는 것과 같은 ml-pipeline 노드 로그. node_id 후보 순서대로 시도.
|
||
|
|
*/
|
||
|
|
private String tryKfpMlPipelineNodeLog(String runId, String... nodeIdsOrdered) {
|
||
|
|
if (runId == null || runId.isBlank() || kubeflowUrl == null || kubeflowUrl.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
LinkedHashSet<String> seen = new LinkedHashSet<>();
|
||
|
|
for (String id : nodeIdsOrdered) {
|
||
|
|
if (id == null || id.isBlank()) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
String t = id.trim();
|
||
|
|
if (!seen.add(t)) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
String body = pipelineUploadService.getV1beta1RunNodeLog(runId, t);
|
||
|
|
if (isSubstantialKfpV1LogBody(body)) {
|
||
|
|
return body;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
|
||
|
|
private static boolean isSubstantialKfpV1LogBody(String s) {
|
||
|
|
if (s == null) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
String t = s.trim();
|
||
|
|
if (t.length() < 15) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
String low = t.toLowerCase();
|
||
|
|
if (low.startsWith("failed to get")) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
private ArgoServerLogService.ArgoLogFetchResult tryArgoWorkflowLogForStep(
|
||
|
|
String workflowNamespace,
|
||
|
|
String workflowNameOverride,
|
||
|
|
String implPodName,
|
||
|
|
String apiPodHint,
|
||
|
|
Integer tailLines) {
|
||
|
|
if (!preferArgoServer || argoServerLogService == null || !argoServerLogService.isConfigured()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
if (workflowNamespace == null || workflowNamespace.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
String wfNs = workflowNamespace.trim();
|
||
|
|
String wfName =
|
||
|
|
workflowNameOverride != null && !workflowNameOverride.isBlank()
|
||
|
|
? workflowNameOverride.trim()
|
||
|
|
: null;
|
||
|
|
if (wfName == null && podHealthService != null && podHealthService.isEnabled()) {
|
||
|
|
wfName = podHealthService.getArgoWorkflowNameFromPod(wfNs, implPodName);
|
||
|
|
}
|
||
|
|
if (wfName == null || wfName.isBlank()) {
|
||
|
|
wfName = ArgoServerLogService.guessWorkflowNameFromExecutorPod(implPodName);
|
||
|
|
}
|
||
|
|
if (wfName == null || wfName.isBlank()) {
|
||
|
|
wfName = ArgoServerLogService.guessWorkflowNameFromExecutorPod(apiPodHint);
|
||
|
|
}
|
||
|
|
if (wfName == null || wfName.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
return argoServerLogService.fetchWorkflowPodLog(
|
||
|
|
wfNs, wfName, implPodName, normalizeTail(tailLines));
|
||
|
|
}
|
||
|
|
|
||
|
|
private static boolean isSubstantialArgoLogBody(String s) {
|
||
|
|
if (s == null) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
String t = s.trim();
|
||
|
|
if (t.length() < 12) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
String low = t.toLowerCase();
|
||
|
|
if (low.startsWith("rpc error") || low.startsWith("failed to retrieve")) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* KFP/Argo executor impl Pod에서 kubectl로 잡힌 wait 로그에 main.log 아카이브 key가 찍히면,
|
||
|
|
* MinIO(mlpipeline 버킷)에서 해당 객체를 읽어와 "실제 실행 로그(main)"를 보여준다.
|
||
|
|
*/
|
||
|
|
private String tryFetchArchivedMainLogFromWait(String logText) {
|
||
|
|
if (logText == null || logText.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
String t = logText.trim();
|
||
|
|
if (!t.startsWith("-- container=wait --")) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
Matcher m = WAIT_LOG_ARCHIVE_KEY.matcher(t);
|
||
|
|
if (!m.find()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
String key = m.group(1);
|
||
|
|
if (key == null || key.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
try {
|
||
|
|
byte[] bytes = minioAttachmentService.downloadFile(KFP_ARCHIVE_BUCKET, key.trim());
|
||
|
|
String body = new String(bytes, StandardCharsets.UTF_8);
|
||
|
|
if (body.isBlank()) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
return "-- MinIO archived main.log (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + key.trim() + ") --\n\n" + body;
|
||
|
|
} catch (Exception e) {
|
||
|
|
return "-- MinIO archived main.log 읽기 실패 (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + key.trim() + "): "
|
||
|
|
+ (e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName())
|
||
|
|
+ " --";
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/** 실패 스텝 중 시간상 마지막 것, 없으면 생성 순 마지막 스텝 */
|
||
|
|
private static Map<String, Object> pickPrimaryKfpTaskForLog(List<Map<String, Object>> tasks) {
|
||
|
|
List<Map<String, Object>> failed = new ArrayList<>();
|
||
|
|
for (Map<String, Object> t : tasks) {
|
||
|
|
if (kfpTaskStateFailed(t)) {
|
||
|
|
failed.add(t);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (!failed.isEmpty()) {
|
||
|
|
return failed.get(failed.size() - 1);
|
||
|
|
}
|
||
|
|
return tasks.get(tasks.size() - 1);
|
||
|
|
}
|
||
|
|
|
||
|
|
private static boolean kfpTaskStateFailed(Map<String, Object> task) {
|
||
|
|
Object st = task.get("state");
|
||
|
|
if (st instanceof Map) {
|
||
|
|
Object rs = ((Map<?, ?>) st).get("runtimeState");
|
||
|
|
if (rs == null) {
|
||
|
|
rs = ((Map<?, ?>) st).get("runtime_state");
|
||
|
|
}
|
||
|
|
if (rs != null) {
|
||
|
|
st = rs;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (st instanceof Number) {
|
||
|
|
return ((Number) st).intValue() == 5;
|
||
|
|
}
|
||
|
|
String s = String.valueOf(st).toUpperCase();
|
||
|
|
return s.contains("FAILED") || s.contains("ERROR") || s.contains("CANCEL");
|
||
|
|
}
|
||
|
|
|
||
|
|
@SuppressWarnings("unchecked")
|
||
|
|
private static Map<String, Object> unwrapKfpTaskMap(Object node) {
|
||
|
|
if (!(node instanceof Map)) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
Map<String, Object> m = (Map<String, Object>) node;
|
||
|
|
Object inner = m.get("task");
|
||
|
|
if (inner instanceof Map) {
|
||
|
|
return (Map<String, Object>) inner;
|
||
|
|
}
|
||
|
|
return m;
|
||
|
|
}
|
||
|
|
|
||
|
|
/** task_details 항목에서 pod가 있는 태스크를 모음(평면 배열 + 중첩 child). Pod당 최신 항목 유지. */
|
||
|
|
@SuppressWarnings("unchecked")
|
||
|
|
private static void mergeKfpTaskWithPod(Object node, Map<String, Map<String, Object>> byPod) {
|
||
|
|
Map<String, Object> task = unwrapKfpTaskMap(node);
|
||
|
|
if (task == null) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
String pod = firstString(task.get("podName"), task.get("pod_name"));
|
||
|
|
if (pod != null && !pod.isBlank()) {
|
||
|
|
byPod.put(pod.trim(), task);
|
||
|
|
}
|
||
|
|
Object child = task.get("childTasks");
|
||
|
|
if (child == null) {
|
||
|
|
child = task.get("child_tasks");
|
||
|
|
}
|
||
|
|
if (!(child instanceof List)) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
for (Object c : (List<?>) child) {
|
||
|
|
if (!(c instanceof Map)) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
Map<String, Object> cm = (Map<String, Object>) c;
|
||
|
|
if (cm.get("task") instanceof Map) {
|
||
|
|
mergeKfpTaskWithPod(cm.get("task"), byPod);
|
||
|
|
} else if (firstString(cm.get("podName"), cm.get("pod_name")) != null) {
|
||
|
|
mergeKfpTaskWithPod(cm, byPod);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
@SuppressWarnings("unchecked")
|
||
|
|
private static List<Map<String, Object>> kfpTasksWithPods(Map<String, Object> run) {
|
||
|
|
if (run == null) {
|
||
|
|
return List.of();
|
||
|
|
}
|
||
|
|
Object rd = run.get("runDetails");
|
||
|
|
if (rd == null) {
|
||
|
|
rd = run.get("run_details");
|
||
|
|
}
|
||
|
|
if (!(rd instanceof Map)) {
|
||
|
|
return List.of();
|
||
|
|
}
|
||
|
|
Object td = ((Map<?, ?>) rd).get("taskDetails");
|
||
|
|
if (td == null) {
|
||
|
|
td = ((Map<?, ?>) rd).get("task_details");
|
||
|
|
}
|
||
|
|
if (!(td instanceof List)) {
|
||
|
|
return List.of();
|
||
|
|
}
|
||
|
|
Map<String, Map<String, Object>> byPod = new LinkedHashMap<>();
|
||
|
|
for (Object item : (List<?>) td) {
|
||
|
|
mergeKfpTaskWithPod(item, byPod);
|
||
|
|
}
|
||
|
|
List<Map<String, Object>> flat = new ArrayList<>(byPod.values());
|
||
|
|
flat.sort(Comparator.comparing(m -> {
|
||
|
|
Object ct = m.get("createTime");
|
||
|
|
if (ct == null) {
|
||
|
|
ct = m.get("create_time");
|
||
|
|
}
|
||
|
|
return ct != null ? ct.toString() : "";
|
||
|
|
}));
|
||
|
|
return flat;
|
||
|
|
}
|
||
|
|
|
||
|
|
private static String firstString(Object a, Object b) {
|
||
|
|
if (a != null) {
|
||
|
|
String s = String.valueOf(a).trim();
|
||
|
|
if (!s.isEmpty()) {
|
||
|
|
return s;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if (b != null) {
|
||
|
|
String s = String.valueOf(b).trim();
|
||
|
|
if (!s.isEmpty()) {
|
||
|
|
return s;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
|
||
|
|
@SuppressWarnings("unchecked")
|
||
|
|
private static String kfpStateToPhase(Object state) {
|
||
|
|
if (state == null) {
|
||
|
|
return null;
|
||
|
|
}
|
||
|
|
if (state instanceof Map) {
|
||
|
|
Object rs = ((Map<?, ?>) state).get("runtimeState");
|
||
|
|
if (rs == null) {
|
||
|
|
rs = ((Map<?, ?>) state).get("runtime_state");
|
||
|
|
}
|
||
|
|
if (rs != null) {
|
||
|
|
state = rs;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
String s = state.toString();
|
||
|
|
if (s.contains("RUNNING")) {
|
||
|
|
return "Running";
|
||
|
|
}
|
||
|
|
if (s.contains("SUCCEEDED")) {
|
||
|
|
return "Succeeded";
|
||
|
|
}
|
||
|
|
if (s.contains("FAILED") || s.contains("ERROR")) {
|
||
|
|
return "Failed";
|
||
|
|
}
|
||
|
|
if (s.contains("PENDING") || s.contains("SKIPPED")) {
|
||
|
|
return s.contains("SKIPPED") ? "Skipped" : "Pending";
|
||
|
|
}
|
||
|
|
return s;
|
||
|
|
}
|
||
|
|
|
||
|
|
/** 관리자 페이지: 동일 카드(KFP/MLflow/MinIO)에 속한 Pod들 로그를 한 문자열로 */
|
||
|
|
public String getPodLogsAggregate(String namespace, java.util.List<String> podNames, Integer tailLines) {
|
||
|
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||
|
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||
|
|
}
|
||
|
|
return podHealthService.aggregatePodLogsByNames(namespace, podNames, tailLines);
|
||
|
|
}
|
||
|
|
}
|