Compare commits
8 Commits
main
...
feature/ap
| Author | SHA1 | Date |
|---|---|---|
|
|
ec78bd1210 | 31 minutes ago |
|
|
5ac6190235 | 6 days ago |
|
|
6ecd8e137c | 4 weeks ago |
|
|
db91eee239 | 4 weeks ago |
|
|
21f9059acb | 4 weeks ago |
|
|
abb2afcda0 | 4 weeks ago |
|
|
074aea2b33 | 4 weeks ago |
|
|
4b52381876 | 4 weeks ago |
@ -0,0 +1,51 @@
|
|||||||
|
# CLAUDE.md
|
||||||
|
|
||||||
|
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
|
||||||
|
|
||||||
|
## 1. Common Development Tasks
|
||||||
|
|
||||||
|
### Build and Run
|
||||||
|
|
||||||
|
- **Run the application (Spring Boot dev mode):** `./gradlew bootRun`
|
||||||
|
- **Build JAR:** `./gradlew build`
|
||||||
|
- **Run JAR:** `java -jar build/libs/autoflow-server-mgmt-0.0.1-SNAPSHOT.jar`
|
||||||
|
|
||||||
|
### Testing
|
||||||
|
|
||||||
|
- **Run all tests:** `./gradlew test`
|
||||||
|
|
||||||
|
## 2. High-Level Code Architecture and Structure
|
||||||
|
|
||||||
|
This project is the core management server for the AutoFlow system, built with Spring Boot. It manages machine learning pipelines, datasets, and integrates with external systems like Kubeflow, MLflow, and OTA.
|
||||||
|
|
||||||
|
### Key Features:
|
||||||
|
- **Authentication & Security:** JWT-based authentication with refresh tokens, cookie-based token storage, and fine-grained access control.
|
||||||
|
- **Project & Data Management:** Hierarchical management of projects, data groups, and datasets.
|
||||||
|
- **ML Pipeline & Workflow:** Integration with Kubeflow for experiment/run management and pipeline uploads; MLflow for experiment tracking.
|
||||||
|
- **File Management:** AWS S3 integration for large file storage and multipart uploads.
|
||||||
|
- **External Integrations:** OTA for external authentication/package search, Spring Batch for large data processing.
|
||||||
|
|
||||||
|
### Project Structure (`kr.re.etri.autoflow` package):
|
||||||
|
- `controllers`: API endpoints for authentication, projects, data, etc.
|
||||||
|
- `service`: Business logic implementation.
|
||||||
|
- `repository`: Data access layer using JPA.
|
||||||
|
- `entity`: Database entities.
|
||||||
|
- `security`: Spring Security and JWT handling.
|
||||||
|
- `batch`: Spring Batch job configurations.
|
||||||
|
- `payload`: Request/Response Data Transfer Objects (DTOs).
|
||||||
|
- `models`: Domain models.
|
||||||
|
- `exception`: Global exception handling.
|
||||||
|
- `common`: Common utilities and configurations.
|
||||||
|
|
||||||
|
### Technologies:
|
||||||
|
- **Language:** Java 17
|
||||||
|
- **Framework:** Spring Boot 3.5.6
|
||||||
|
- **Build Tool:** Gradle
|
||||||
|
- **Database:** MariaDB (JPA / Hibernate)
|
||||||
|
- **Security:** Spring Security, JWT
|
||||||
|
- **Storage:** AWS S3
|
||||||
|
- **Batch Processing:** Spring Batch
|
||||||
|
- **API Documentation:** Springdoc OpenAPI (Swagger UI)
|
||||||
|
|
||||||
|
### Configuration:
|
||||||
|
- The `src/main/resources/application.properties` file contains critical settings for the database, JWT secret, AWS S3, Kubeflow, and MLflow URLs. These must be configured for the application to run correctly.
|
||||||
@ -1,6 +1,7 @@
|
|||||||
#Fri May 08 16:07:48 KST 2026
|
|
||||||
distributionBase=GRADLE_USER_HOME
|
distributionBase=GRADLE_USER_HOME
|
||||||
distributionPath=wrapper/dists
|
distributionPath=wrapper/dists
|
||||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.4-bin.zip
|
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
|
||||||
|
networkTimeout=10000
|
||||||
|
validateDistributionUrl=true
|
||||||
zipStoreBase=GRADLE_USER_HOME
|
zipStoreBase=GRADLE_USER_HOME
|
||||||
zipStorePath=wrapper/dists
|
zipStorePath=wrapper/dists
|
||||||
|
|||||||
@ -0,0 +1,60 @@
|
|||||||
|
# Autoflow DB 생성 (WSL / 로컬)
|
||||||
|
|
||||||
|
## 1. MariaDB/MySQL이 이미 설치된 경우
|
||||||
|
|
||||||
|
### 1) DB와 사용자만 만들기
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# WSL 또는 로컬 터미널에서 (root 비밀번호 입력 필요)
|
||||||
|
mysql -u root -p < scripts/init-autoflow-db.sql
|
||||||
|
```
|
||||||
|
|
||||||
|
또는 MySQL 클라이언트 접속 후:
|
||||||
|
|
||||||
|
```sql
|
||||||
|
SOURCE /경로/autoflow-server-mgmt-main/autoflow-server-mgmt/scripts/init-autoflow-db.sql;
|
||||||
|
```
|
||||||
|
|
||||||
|
### 2) 테이블 + 초기 데이터 (data.sql 사용)
|
||||||
|
|
||||||
|
**WSL 프로파일(`application-wsl.properties`)에서는 이미 아래가 설정되어 있습니다.**
|
||||||
|
|
||||||
|
- `spring.jpa.hibernate.ddl-auto=update` → 엔티티 기준으로 `tb_*` 테이블 자동 생성
|
||||||
|
- `spring.sql.init.mode=always` → 기동 시 **`src/main/resources/data.sql`** 자동 실행
|
||||||
|
|
||||||
|
`data.sql`에서 하는 일:
|
||||||
|
|
||||||
|
- BATCH_* 테이블 생성 (Spring Batch용)
|
||||||
|
- 초기 데이터 INSERT: `tb_role`, `tb_user`, `tb_project`, `tb_user_roles` 등
|
||||||
|
|
||||||
|
따라서 **DB(autoflow)와 사용자(autoflow)만 만든 뒤** 백엔드를 `--spring.profiles.active=wsl`로 실행하면, 테이블 생성과 data.sql 적용이 한 번에 이루어집니다. 별도 스키마 SQL 실행은 필요 없습니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. MariaDB가 없을 때 (Docker로 설치)
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -d \
|
||||||
|
--name autoflow-mariadb \
|
||||||
|
-p 3306:3306 \
|
||||||
|
-e MARIADB_ROOT_PASSWORD=root \
|
||||||
|
-e MARIADB_DATABASE=autoflow \
|
||||||
|
-e MARIADB_USER=autoflow \
|
||||||
|
-e MARIADB_PASSWORD=autoflow \
|
||||||
|
mariadb:latest
|
||||||
|
```
|
||||||
|
|
||||||
|
이렇게 하면 `autoflow` DB와 사용자 `autoflow`/비밀번호 `autoflow`가 자동 생성됩니다.
|
||||||
|
그 다음 위 **1.2) 테이블 생성** 중 하나를 진행하면 됩니다.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. application-wsl.properties와 맞추기
|
||||||
|
|
||||||
|
현재 WSL 설정 기준:
|
||||||
|
|
||||||
|
- **URL:** `jdbc:mariadb://localhost:3306/autoflow`
|
||||||
|
- **사용자:** `autoflow`
|
||||||
|
- **비밀번호:** `autoflow`
|
||||||
|
|
||||||
|
다른 포트/비밀번호를 쓰면 `application-wsl.properties`의 `spring.datasource.*` 값을 같이 수정하면 됩니다.
|
||||||
@ -0,0 +1,20 @@
|
|||||||
|
-- autoflow DB 및 사용자 생성 (MariaDB/MySQL)
|
||||||
|
-- 실행: root 또는 DB 관리자로 실행
|
||||||
|
-- 예: mysql -u root -p < init-autoflow-db.sql
|
||||||
|
-- 또는 mysql -u root -p 후 소스 또는 붙여넣기
|
||||||
|
|
||||||
|
CREATE DATABASE IF NOT EXISTS autoflow
|
||||||
|
CHARACTER SET utf8mb4
|
||||||
|
COLLATE utf8mb4_unicode_ci;
|
||||||
|
|
||||||
|
CREATE USER IF NOT EXISTS 'autoflow'@'localhost' IDENTIFIED BY 'autoflow';
|
||||||
|
CREATE USER IF NOT EXISTS 'autoflow'@'%' IDENTIFIED BY 'autoflow';
|
||||||
|
|
||||||
|
GRANT ALL PRIVILEGES ON autoflow.* TO 'autoflow'@'localhost';
|
||||||
|
GRANT ALL PRIVILEGES ON autoflow.* TO 'autoflow'@'%';
|
||||||
|
|
||||||
|
FLUSH PRIVILEGES;
|
||||||
|
|
||||||
|
-- 확인
|
||||||
|
SELECT User, Host FROM mysql.user WHERE User = 'autoflow';
|
||||||
|
SHOW DATABASES LIKE 'autoflow';
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
package kr.re.etri.autoflow.config;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
@Component
|
||||||
|
@ConfigurationProperties(prefix = "minio")
|
||||||
|
public class MinioTypeProperties {
|
||||||
|
|
||||||
|
private TypeConfig type1 = new TypeConfig();
|
||||||
|
private TypeConfig type2 = new TypeConfig();
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public static class TypeConfig {
|
||||||
|
private String endpoint = "";
|
||||||
|
private String bucket = "";
|
||||||
|
private String accessKey = "";
|
||||||
|
private String secretKey = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
public TypeConfig getByType(String type) {
|
||||||
|
if (type != null && "type2".equalsIgnoreCase(type.trim())) {
|
||||||
|
return type2;
|
||||||
|
}
|
||||||
|
return type1;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,86 @@
|
|||||||
|
package kr.re.etri.autoflow.controllers;
|
||||||
|
|
||||||
|
import io.swagger.v3.oas.annotations.Operation;
|
||||||
|
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||||
|
import kr.re.etri.autoflow.service.AdminService;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 관리자 전용 API: 시스템 상태 조회, 조치(재시작 요청).
|
||||||
|
*/
|
||||||
|
@Tag(name = "관리자", description = "시스템 상태 조회 및 조치 API (관리자 전용)")
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/api/admin")
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class AdminController {
|
||||||
|
|
||||||
|
private final AdminService adminService;
|
||||||
|
|
||||||
|
@Operation(summary = "시스템 상태 조회", description = "KFP, MLflow, MinIO HTTP 헬스만 조회 (30초 캐시)")
|
||||||
|
@GetMapping(value = "/status", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||||
|
public ResponseEntity<Map<String, Object>> getStatus() {
|
||||||
|
return ResponseEntity.ok(adminService.getStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "Pod 상태 조회", description = "K8s Pod 상태만 별도 조회 (admin.k8s.enabled=true 시)")
|
||||||
|
@GetMapping(value = "/pods/status", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||||
|
public ResponseEntity<Map<String, Object>> getPodStatus() {
|
||||||
|
return ResponseEntity.ok(adminService.getPodStatus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "서비스 재시작 요청", description = "kfp, mlflow, minio 중 하나. 실제 재시작은 K8s 등에서 별도 설정 필요.")
|
||||||
|
@PostMapping(value = "/restart/{service}", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||||
|
public ResponseEntity<Map<String, Object>> restart(
|
||||||
|
@PathVariable String service) {
|
||||||
|
return ResponseEntity.ok(adminService.requestRestart(service));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "Run별 Pod 목록", description = "KFP runId에 해당하는 파이프라인 실행 Pod 목록 (label: pipeline/runid). Executions 로그 버튼용.")
|
||||||
|
@GetMapping(value = "/pods/by-run", produces = MediaType.APPLICATION_JSON_VALUE)
|
||||||
|
public ResponseEntity<Map<String, Object>> getPodsByRunId(
|
||||||
|
@RequestParam String runId) {
|
||||||
|
return ResponseEntity.ok(adminService.getPodsByRunId(runId));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "Pod 로그 조회", description = "지정 namespace/pod 로그. admin.k8s.enabled=true 필요.")
|
||||||
|
@GetMapping(value = "/pods/logs", produces = MediaType.TEXT_PLAIN_VALUE)
|
||||||
|
public ResponseEntity<String> getPodLog(
|
||||||
|
@RequestParam String namespace,
|
||||||
|
@RequestParam String pod,
|
||||||
|
@RequestParam(required = false) String container,
|
||||||
|
@RequestParam(required = false) Integer tailLines) {
|
||||||
|
String log = adminService.getPodLog(namespace, pod, container, tailLines);
|
||||||
|
return ResponseEntity.ok(log != null ? log : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "Run Pod 로그", description = "기본: KFP와 같이 한 스텝(실패 스텝 우선, 없으면 마지막 스텝). allSteps=true 이면 전체. podName/stepName 으로 지정 가능.")
|
||||||
|
@GetMapping(value = "/pods/logs-by-run", produces = MediaType.TEXT_PLAIN_VALUE)
|
||||||
|
public ResponseEntity<String> getPodLogsByRunId(
|
||||||
|
@RequestParam String runId,
|
||||||
|
@RequestParam(required = false) Integer tailLines,
|
||||||
|
@RequestParam(required = false, defaultValue = "false") boolean allSteps,
|
||||||
|
@RequestParam(required = false) String podName,
|
||||||
|
@RequestParam(required = false) String stepName,
|
||||||
|
@RequestParam(required = false) String workflowName,
|
||||||
|
@RequestParam(required = false) String workflowNamespace) {
|
||||||
|
String log =
|
||||||
|
adminService.getPodLogsByRunId(
|
||||||
|
runId, tailLines, allSteps, podName, stepName, workflowName, workflowNamespace);
|
||||||
|
return ResponseEntity.ok(log != null ? log : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Operation(summary = "지정 Pod 목록 로그 합침", description = "관리자 페이지 Pod 카드용. pod 파라미터 반복 전달. tailLines 0 이하면 Pod당 전체(가능한 범위).")
|
||||||
|
@GetMapping(value = "/pods/logs-aggregate", produces = MediaType.TEXT_PLAIN_VALUE)
|
||||||
|
public ResponseEntity<String> getPodLogsAggregate(
|
||||||
|
@RequestParam String namespace,
|
||||||
|
@RequestParam(name = "pod") java.util.List<String> podNames,
|
||||||
|
@RequestParam(required = false) Integer tailLines) {
|
||||||
|
String log = adminService.getPodLogsAggregate(namespace, podNames, tailLines);
|
||||||
|
return ResponseEntity.ok(log != null ? log : "");
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,94 @@
|
|||||||
|
package kr.re.etri.autoflow.entity;
|
||||||
|
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import lombok.*;
|
||||||
|
import org.hibernate.annotations.Comment;
|
||||||
|
import org.springframework.data.annotation.CreatedDate;
|
||||||
|
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
|
||||||
|
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
|
||||||
|
@Schema(description = "MinIO 첨부파일 (Dataset/TrainingScript 통합)")
|
||||||
|
@Comment("MinIO 첨부파일")
|
||||||
|
@Entity
|
||||||
|
@EntityListeners(AuditingEntityListener.class)
|
||||||
|
@Table(name = "tb_minio_attachment")
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
@Builder
|
||||||
|
public class MinioAttachmentEntity {
|
||||||
|
|
||||||
|
@Id
|
||||||
|
@GeneratedValue(strategy = GenerationType.IDENTITY)
|
||||||
|
@Schema(description = "첨부파일 ID", example = "1")
|
||||||
|
@Comment("첨부파일 ID")
|
||||||
|
private Long id;
|
||||||
|
|
||||||
|
@Schema(description = "연관 엔티티 ID", example = "1")
|
||||||
|
@Comment("연관 엔티티 ID")
|
||||||
|
@Column(nullable = false)
|
||||||
|
private Long refId;
|
||||||
|
|
||||||
|
@Schema(description = "첨부파일 종류 (DATASET / SCRIPT)", example = "DATASET")
|
||||||
|
@Comment("첨부파일 종류")
|
||||||
|
@Column(nullable = false, length = 50)
|
||||||
|
private String refType; // 구분자 (예: WORKFLOW_STEP,DATASET, TRAINING_SCRIPT)
|
||||||
|
|
||||||
|
@Schema(description = "원본 파일명", example = "step1.yaml")
|
||||||
|
@Comment("원본 파일명")
|
||||||
|
@Column(nullable = false, length = 255)
|
||||||
|
private String originalName;
|
||||||
|
|
||||||
|
@Schema(description = "저장된 파일명(UUID + ver)", example = "a1b2c3d4-step1-ver.1.yaml")
|
||||||
|
@Comment("저장된 파일명")
|
||||||
|
@Column(nullable = false, length = 255)
|
||||||
|
private String storedName;
|
||||||
|
|
||||||
|
@Schema(description = "MIME 타입", example = "application/x-yaml")
|
||||||
|
@Comment("MIME 타입")
|
||||||
|
@Column(nullable = false, length = 100)
|
||||||
|
private String contentType;
|
||||||
|
|
||||||
|
@Schema(description = "파일 크기(byte)", example = "2048")
|
||||||
|
@Comment("파일 크기")
|
||||||
|
@Column(nullable = false)
|
||||||
|
private Long size;
|
||||||
|
|
||||||
|
@Schema(description = "스토리지 경로", example = "/uploads/step1-ver.1.yaml")
|
||||||
|
@Comment("스토리지 경로")
|
||||||
|
@Column(nullable = false, length = 500)
|
||||||
|
private String storagePath;
|
||||||
|
|
||||||
|
@Schema(description = "업로더 ID", example = "admin")
|
||||||
|
@Comment("업로더 ID")
|
||||||
|
@Column(nullable = false, length = 50)
|
||||||
|
private String regUserId;
|
||||||
|
|
||||||
|
@Schema(description = "업로드 일시", example = "2025-09-17T15:00:00")
|
||||||
|
@CreatedDate
|
||||||
|
@Comment("업로드 일시")
|
||||||
|
@Column(nullable = false, updatable = false)
|
||||||
|
private LocalDateTime regDt;
|
||||||
|
|
||||||
|
@Schema(description = "파일 제목", example = "자율주행차량 데이터 셋")
|
||||||
|
@Comment("파일 제목")
|
||||||
|
@Column(length = 200)
|
||||||
|
private String title;
|
||||||
|
|
||||||
|
@Schema(description = "파일 버전", example = "1")
|
||||||
|
@Comment("파일 버전")
|
||||||
|
@Column(nullable = false)
|
||||||
|
private Integer version;
|
||||||
|
|
||||||
|
@Schema(description = "파일 설명", example = "자율주행차량 데이터 모음집입니다.")
|
||||||
|
@Comment("파일 설명")
|
||||||
|
@Column(length = 1000)
|
||||||
|
private String description;
|
||||||
|
|
||||||
|
@Schema(description = "프로젝트 아이디", example = "1", defaultValue = "0")
|
||||||
|
@Column(nullable = false)
|
||||||
|
private Long projectId;
|
||||||
|
}
|
||||||
@ -0,0 +1,32 @@
|
|||||||
|
package kr.re.etri.autoflow.payload.request;
|
||||||
|
|
||||||
|
import io.swagger.v3.oas.annotations.media.Schema;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class ScriptMergeRequest {
|
||||||
|
|
||||||
|
@Schema(description = "머지할 스크립트(첨부파일) ID 목록", example = "[1,2,3]")
|
||||||
|
private List<Long> scriptIds;
|
||||||
|
|
||||||
|
@Schema(description = "머지 결과 스크립트 제목", example = "merged-preprocess-train-eval")
|
||||||
|
private String title;
|
||||||
|
|
||||||
|
@Schema(description = "머지 결과 설명", example = "preprocess → train → eval 파이프라인")
|
||||||
|
private String description;
|
||||||
|
|
||||||
|
@Schema(description = "연관 refId (Training Script 그룹 ID 등)", example = "0", defaultValue = "0")
|
||||||
|
private Long refId;
|
||||||
|
|
||||||
|
@Schema(description = "연관 refType", example = "TRAINING_SCRIPT", defaultValue = "TRAINING_SCRIPT")
|
||||||
|
private String refType = "TRAINING_SCRIPT";
|
||||||
|
|
||||||
|
@Schema(description = "등록 사용자 ID", example = "admin")
|
||||||
|
private String regUserId;
|
||||||
|
|
||||||
|
@Schema(description = "프로젝트 ID", example = "1")
|
||||||
|
private Long projectId;
|
||||||
|
}
|
||||||
|
|
||||||
@ -0,0 +1,16 @@
|
|||||||
|
package kr.re.etri.autoflow.repository;
|
||||||
|
|
||||||
|
import kr.re.etri.autoflow.entity.MinioAttachmentEntity;
|
||||||
|
import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
|
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@Repository
|
||||||
|
public interface MinioAttachmentRepository extends JpaRepository<MinioAttachmentEntity, Long>, JpaSpecificationExecutor<MinioAttachmentEntity> {
|
||||||
|
//최신버전 파일 가져오기
|
||||||
|
Optional<MinioAttachmentEntity> findTopByRefIdAndRefTypeOrderByVersionDesc(Long refId, String refType);
|
||||||
|
List<MinioAttachmentEntity> findAllByRefId(Long refId);
|
||||||
|
}
|
||||||
@ -0,0 +1,844 @@
|
|||||||
|
package kr.re.etri.autoflow.service;
|
||||||
|
|
||||||
|
import io.minio.ListBucketsArgs;
|
||||||
|
import io.minio.MinioClient;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.http.ResponseEntity;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 관리자용 시스템 상태 조회 (KFP, MLflow, MinIO).
|
||||||
|
* 헬스 결과는 30초 캐시하여 각 모듈 부하를 줄임.
|
||||||
|
*
|
||||||
|
* 현재 판정 기준:
|
||||||
|
* - KFP/MLflow: 해당 URL로 HTTP 요청 보내 2xx 응답이 오면 "정상" (API가 살아있는지만 확인).
|
||||||
|
* - MinIO: 앱 계정으로 listBuckets() 성공 시 "정상".
|
||||||
|
*
|
||||||
|
* 관련 Pod가 Running인지 보려면 Kubernetes API로 Pod 목록 조회가 필요함.
|
||||||
|
* (예: admin.k8s.enabled=true, namespace/label 설정 후 CoreV1Api.listNamespacedPod → phase == "Running" 확인.
|
||||||
|
* kubernetes-client-api 의존성 및 클러스터 접근 권한 필요.)
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class AdminService {
|
||||||
|
|
||||||
|
private static final int CACHE_SECONDS = 30;
|
||||||
|
private static final String KFP_ARCHIVE_BUCKET = "mlpipeline";
|
||||||
|
private static final Pattern WAIT_LOG_ARCHIVE_KEY =
|
||||||
|
Pattern.compile("\\bkey:\\s*([\\w\\-./]+/main\\.log)\\b");
|
||||||
|
|
||||||
|
private final RestTemplate restTemplate;
|
||||||
|
private final KubernetesPodHealthService podHealthService;
|
||||||
|
private final PipelineUploadService pipelineUploadService;
|
||||||
|
private final ArgoServerLogService argoServerLogService;
|
||||||
|
|
||||||
|
@Value("${kubeflow.url:}")
|
||||||
|
private String kubeflowUrl;
|
||||||
|
@Value("${mlflow.url:}")
|
||||||
|
private String mlflowUrl;
|
||||||
|
@Value("${minio.endpoint:}")
|
||||||
|
private String minioEndpoint;
|
||||||
|
@Value("${minio.access-key:}")
|
||||||
|
private String minioAccessKey;
|
||||||
|
@Value("${minio.secret-key:}")
|
||||||
|
private String minioSecretKey;
|
||||||
|
|
||||||
|
/** true면 KFP ml-pipeline v1beta1 노드 로그 API를 kubectl보다 먼저 시도 (클러스터 내부 조회, UI와 동일 경로) */
|
||||||
|
@Value("${admin.k8s.prefer-kfp-api-for-logs:true}")
|
||||||
|
private boolean preferKfpApiForLogs;
|
||||||
|
/** true면 Argo Server 로그 API를 KFP/K8s보다 먼저 시도 (삭제된 Pod는 MinIO 아카이브) */
|
||||||
|
@Value("${admin.logs.prefer-argo-server:true}")
|
||||||
|
private boolean preferArgoServer;
|
||||||
|
|
||||||
|
private volatile Map<String, Object> cachedStatus;
|
||||||
|
private volatile long cachedAt;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP, MLflow, MinIO 상태를 조회. 30초 캐시 적용.
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getStatus() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (cachedStatus != null && (now - cachedAt) < CACHE_SECONDS * 1000L) {
|
||||||
|
return new HashMap<>(cachedStatus);
|
||||||
|
}
|
||||||
|
Map<String, Object> status = new HashMap<>();
|
||||||
|
status.put("kfp", checkKfp());
|
||||||
|
status.put("mlflow", checkMlflow());
|
||||||
|
status.put("minio", checkMinio());
|
||||||
|
status.put("updatedAt", Instant.now().toString());
|
||||||
|
cachedStatus = status;
|
||||||
|
cachedAt = now;
|
||||||
|
return new HashMap<>(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kubernetes Pod 상태만 조회 (별도 버튼용).
|
||||||
|
* admin.k8s.enabled=false 여도 kfp/mlflow/minio 키는 항상 반환해 프론트에서 "Pod 없음" 대신 사유 표시.
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getPodStatus() {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||||||
|
out.put("namespace", "");
|
||||||
|
out.put("message", "admin.k8s.enabled=false");
|
||||||
|
Map<String, Object> disabled = new HashMap<>();
|
||||||
|
disabled.put("ok", false);
|
||||||
|
disabled.put("message", "admin.k8s.enabled=true 설정 후 Pod 상태 조회 가능");
|
||||||
|
disabled.put("running", 0);
|
||||||
|
disabled.put("total", 0);
|
||||||
|
disabled.put("pods", java.util.Collections.emptyList());
|
||||||
|
out.put("kfp", disabled);
|
||||||
|
out.put("mlflow", new HashMap<>(disabled));
|
||||||
|
out.put("minio", new HashMap<>(disabled));
|
||||||
|
out.put("updatedAt", Instant.now().toString());
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
out.put("namespace", podHealthService.getNamespace());
|
||||||
|
out.put("kfp", podHealthService.getKfpPodStatus());
|
||||||
|
out.put("mlflow", podHealthService.getMlflowPodStatus());
|
||||||
|
out.put("minio", podHealthService.getMinioPodStatus());
|
||||||
|
out.put("updatedAt", Instant.now().toString());
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> checkKfp() {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
String base = (kubeflowUrl != null ? kubeflowUrl.replaceAll("/+$", "") : "").trim();
|
||||||
|
if (base.isBlank()) {
|
||||||
|
out.put("status", "skip");
|
||||||
|
out.put("message", "kubeflow.url 미설정");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String url = base + "/apis/v2beta1/healthz";
|
||||||
|
try {
|
||||||
|
ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
|
||||||
|
out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
|
||||||
|
out.put("message", "HTTP " + resp.getStatusCode().value());
|
||||||
|
} catch (Exception e) {
|
||||||
|
out.put("status", "error");
|
||||||
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||||||
|
log.debug("[Admin] KFP health check failed: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object> checkMlflow() {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
String base = (mlflowUrl != null ? mlflowUrl.replaceAll("/+$", "") : "").trim();
|
||||||
|
if (base.isBlank()) {
|
||||||
|
out.put("status", "skip");
|
||||||
|
out.put("message", "mlflow.url 미설정");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String url = base + "/health";
|
||||||
|
try {
|
||||||
|
ResponseEntity<String> resp = restTemplate.getForEntity(url, String.class);
|
||||||
|
out.put("status", resp.getStatusCode().is2xxSuccessful() ? "ok" : "error");
|
||||||
|
out.put("message", "HTTP " + resp.getStatusCode().value());
|
||||||
|
} catch (Exception e) {
|
||||||
|
out.put("status", "error");
|
||||||
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||||||
|
log.debug("[Admin] MLflow health check failed: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MinIO 상태를 스크립트/첨부에서 쓰는 동일 계정(accessKey/secretKey)으로 확인.
|
||||||
|
* listBuckets() 호출로 연결·인증이 정상인지 검사.
|
||||||
|
*/
|
||||||
|
private Map<String, Object> checkMinio() {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
String endpoint = (minioEndpoint != null
|
||||||
|
? minioEndpoint.replaceAll("/+$", "") : "").trim();
|
||||||
|
if (endpoint.isBlank()) {
|
||||||
|
out.put("status", "skip");
|
||||||
|
out.put("message", "MinIO endpoint 미설정");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String accessKey = minioAccessKey;
|
||||||
|
String secretKey = minioSecretKey;
|
||||||
|
if (accessKey == null) accessKey = "";
|
||||||
|
if (secretKey == null) secretKey = "";
|
||||||
|
try {
|
||||||
|
MinioClient client = MinioClient.builder()
|
||||||
|
.endpoint(endpoint)
|
||||||
|
.credentials(accessKey, secretKey)
|
||||||
|
.build();
|
||||||
|
client.listBuckets(ListBucketsArgs.builder().build());
|
||||||
|
out.put("status", "ok");
|
||||||
|
out.put("message", "연결 정상 (동일 계정)");
|
||||||
|
} catch (Exception e) {
|
||||||
|
out.put("status", "error");
|
||||||
|
out.put("message", e.getMessage() != null ? e.getMessage() : "연결 실패");
|
||||||
|
log.debug("[Admin] MinIO health check failed: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 재시작 요청 수신. 실제 재시작은 K8s API 등 외부에서 수행하도록 설계.
|
||||||
|
* 현재는 요청 로깅 후 안내 메시지 반환.
|
||||||
|
*/
|
||||||
|
public Map<String, Object> requestRestart(String service) {
|
||||||
|
log.info("[Admin] restart requested for service: {}", service);
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
if (service == null || !java.util.Set.of("kfp", "mlflow", "minio").contains(service.toLowerCase())) {
|
||||||
|
out.put("success", false);
|
||||||
|
out.put("message", "지원하지 않는 서비스입니다. (kfp, mlflow, minio 중 하나)");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
out.put("success", true);
|
||||||
|
out.put("message", "재시작 요청이 접수되었습니다. 실제 재시작은 Kubernetes 등에서 별도 설정이 필요합니다.");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* runId(KFP run id)에 해당하는 파이프라인 실행 Pod 목록. Executions 상세 로그 버튼용.
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getPodsByRunId(String runId) {
|
||||||
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
out.put("namespace", "");
|
||||||
|
out.put("pods", java.util.Collections.emptyList());
|
||||||
|
out.put("message", "admin.k8s.enabled=false");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
Map<String, Object> out = podHealthService.listPodsByRunId(runId);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
List<Map<String, String>> pods = (List<Map<String, String>>) out.get("pods");
|
||||||
|
if (pods != null && !pods.isEmpty()) {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
|
||||||
|
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
|
||||||
|
if (tasks.isEmpty()) {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String ns = podHealthService.getNamespace();
|
||||||
|
List<Map<String, String>> fromKfp = new ArrayList<>();
|
||||||
|
Set<String> seen = new HashSet<>();
|
||||||
|
for (Map<String, Object> t : tasks) {
|
||||||
|
String pod = firstString(t.get("podName"), t.get("pod_name"));
|
||||||
|
if (pod == null || pod.isBlank() || !seen.add(pod)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, String> e = new LinkedHashMap<>();
|
||||||
|
e.put("name", pod);
|
||||||
|
e.put("phase", kfpStateToPhase(t.get("state")));
|
||||||
|
e.put("displayName", firstString(t.get("displayName"), t.get("display_name")));
|
||||||
|
fromKfp.add(e);
|
||||||
|
}
|
||||||
|
if (!fromKfp.isEmpty()) {
|
||||||
|
out.put("namespace", ns);
|
||||||
|
out.put("pods", fromKfp);
|
||||||
|
out.put("message", fromKfp.size() + " pods (KFP run_details.task_details)");
|
||||||
|
out.remove("hint");
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pod 로그 조회. admin.k8s.enabled=true 일 때만 동작.
|
||||||
|
*/
|
||||||
|
public String getPodLog(String namespace, String podName, String container, Integer tailLines) {
|
||||||
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||||||
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||||||
|
}
|
||||||
|
return podHealthService.getPodLog(namespace, podName, container, tailLines);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run 로그. 기본(allSteps=false): KFP UI처럼 <b>한 스텝</b>만 — 실패한 스텝이 있으면 그 Pod, 없으면 마지막 스텝 Pod.
|
||||||
|
* allSteps=true 이면 task_details 순서대로 전체 Pod 로그.
|
||||||
|
* podName 지정 시 해당 Pod만. stepName 지정 시 displayName 부분 일치 스텝.
|
||||||
|
* Pod는 {@code admin.k8s.pipeline-pod-namespaces} 순으로 네임스페이스 탐색.
|
||||||
|
*/
|
||||||
|
public String getPodLogsByRunId(
|
||||||
|
String runId,
|
||||||
|
Integer tailLines,
|
||||||
|
boolean allSteps,
|
||||||
|
String podNameParam,
|
||||||
|
String stepNameParam,
|
||||||
|
String workflowNameParam,
|
||||||
|
String workflowNamespaceParam) {
|
||||||
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||||||
|
if (argoServerLogService != null
|
||||||
|
&& argoServerLogService.isConfigured()
|
||||||
|
&& runId != null
|
||||||
|
&& !runId.isBlank()
|
||||||
|
&& podNameParam != null
|
||||||
|
&& !podNameParam.isBlank()) {
|
||||||
|
String wfNs =
|
||||||
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||||||
|
? workflowNamespaceParam.trim()
|
||||||
|
: "kubeflow";
|
||||||
|
String wfName =
|
||||||
|
workflowNameParam != null && !workflowNameParam.isBlank()
|
||||||
|
? workflowNameParam.trim()
|
||||||
|
: ArgoServerLogService.guessWorkflowNameFromExecutorPod(podNameParam.trim());
|
||||||
|
if (wfName != null) {
|
||||||
|
ArgoServerLogService.ArgoLogFetchResult al =
|
||||||
|
argoServerLogService.fetchWorkflowPodLog(
|
||||||
|
wfNs, wfName, podNameParam.trim(), normalizeTail(tailLines));
|
||||||
|
if (isSubstantialArgoLogBody(al != null ? al.logText : null)) {
|
||||||
|
return "-- Argo Server /api/v1/workflows/"
|
||||||
|
+ wfNs
|
||||||
|
+ "/"
|
||||||
|
+ wfName
|
||||||
|
+ "/log (kubectl 없음) --\n\n"
|
||||||
|
+ al.logText;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (runId != null && !runId.isBlank()
|
||||||
|
&& podNameParam != null && !podNameParam.isBlank()
|
||||||
|
&& kubeflowUrl != null && !kubeflowUrl.isBlank()) {
|
||||||
|
String ns = (workflowNamespaceParam != null && !workflowNamespaceParam.isBlank())
|
||||||
|
? workflowNamespaceParam.trim() : null;
|
||||||
|
String kfpOnly = tryKfpMlPipelineNodeLog(runId, ns, podNameParam.trim());
|
||||||
|
if (kfpOnly != null) {
|
||||||
|
return "-- KFP ml-pipeline API (kubectl 없이) v1beta1 노드 로그 --\n\n" + kfpOnly;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||||||
|
}
|
||||||
|
if (runId == null || runId.isBlank()) {
|
||||||
|
return "runId가 없습니다.";
|
||||||
|
}
|
||||||
|
if (podNameParam != null && !podNameParam.isBlank()) {
|
||||||
|
KubernetesPodHealthService.ExecutorPodResolution res0 =
|
||||||
|
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
|
||||||
|
String wfNsPod =
|
||||||
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||||||
|
? workflowNamespaceParam.trim()
|
||||||
|
: res0.namespace;
|
||||||
|
ArgoServerLogService.ArgoLogFetchResult argoDebug = null;
|
||||||
|
if (preferArgoServer && argoServerLogService.isConfigured()) {
|
||||||
|
argoDebug =
|
||||||
|
tryArgoWorkflowLogForStep(
|
||||||
|
wfNsPod,
|
||||||
|
workflowNameParam,
|
||||||
|
res0.podName,
|
||||||
|
podNameParam.trim(),
|
||||||
|
tailLines);
|
||||||
|
if (argoDebug != null && isSubstantialArgoLogBody(argoDebug.logText)) {
|
||||||
|
return "-- Argo Server API (Pod 종료 후 MinIO 아카이브 포함) | wf="
|
||||||
|
+ wfNsPod
|
||||||
|
+ " | pod="
|
||||||
|
+ res0.podName
|
||||||
|
+ " --\n\n"
|
||||||
|
+ argoDebug.logText;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (preferKfpApiForLogs) {
|
||||||
|
String kfpFirst = tryKfpMlPipelineNodeLog(runId, res0.namespace, res0.podName, podNameParam.trim());
|
||||||
|
if (kfpFirst != null) {
|
||||||
|
return "-- KFP ml-pipeline API (UI와 동일) | node: " + res0.podName
|
||||||
|
+ (podNameParam.trim().equals(res0.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
|
||||||
|
+ " --\n\n" + kfpFirst;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
KubernetesPodHealthService.ExecutorPodResolution res =
|
||||||
|
podHealthService.resolveKfpExecutorImplPod(runId, podNameParam.trim());
|
||||||
|
String logText = podHealthService.readPodLogInNamespace(
|
||||||
|
res.namespace, res.podName, normalizeTail(tailLines));
|
||||||
|
if (logText != null && logText.startsWith("로그 조회 실패")) {
|
||||||
|
KubernetesPodHealthService.PipelinePodLogOutcome fb =
|
||||||
|
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
|
||||||
|
logText = fb.logText;
|
||||||
|
}
|
||||||
|
if (logText != null && logText.contains("code=403") && logText.contains("ApiException")) {
|
||||||
|
log.warn("[Admin] K8s Pod 로그 조회 권한이 없습니다 (403 Forbidden). Kubeflow API로 대체를 시도합니다. runId={}, podName={}", runId, podNameParam);
|
||||||
|
String resolvedTaskId = null;
|
||||||
|
try {
|
||||||
|
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
|
||||||
|
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
|
||||||
|
for (Map<String, Object> t : tasks) {
|
||||||
|
String pod = firstString(t.get("podName"), t.get("pod_name"));
|
||||||
|
if (podNameParam.trim().equals(pod) || res.podName.equals(pod)) {
|
||||||
|
resolvedTaskId = firstString(t.get("taskId"), t.get("task_id"));
|
||||||
|
if (resolvedTaskId != null && !resolvedTaskId.isBlank()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception ex) {
|
||||||
|
log.error("[Admin] KFP Run에서 Task ID 추출 실패: {}", ex.getMessage());
|
||||||
|
}
|
||||||
|
String kfpLog = tryKfpMlPipelineNodeLog(runId, res.namespace, resolvedTaskId, res.podName, podNameParam.trim());
|
||||||
|
if (kfpLog != null) {
|
||||||
|
log.info("[Admin] Kubeflow API로 로그 대체 조회 성공. nodeId/podName={}", resolvedTaskId != null ? resolvedTaskId : res.podName);
|
||||||
|
return "-- KFP ml-pipeline API (kubectl 403 Forbidden 대체) --\n\n" + kfpLog;
|
||||||
|
} else {
|
||||||
|
log.warn("[Admin] Kubeflow API로 로그 대체 조회 실패. runId={}, podName={}, resolvedTaskId={}", runId, podNameParam, resolvedTaskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String out = "-- kubectl logs " + res.podName + " -n " + res.namespace
|
||||||
|
+ (podNameParam.trim().equals(res.podName) ? "" : " (요청: " + podNameParam.trim() + ")")
|
||||||
|
+ " --\n\n" + (logText != null ? logText : "");
|
||||||
|
String archived = tryFetchArchivedMainLogFromWait(logText);
|
||||||
|
if (archived != null) {
|
||||||
|
out = out + "\n\n" + archived;
|
||||||
|
}
|
||||||
|
if ((logText == null || logText.trim().isEmpty() || logText.startsWith("로그 조회 실패"))
|
||||||
|
&& argoDebug != null
|
||||||
|
&& argoDebug.url != null
|
||||||
|
&& !argoDebug.url.isBlank()) {
|
||||||
|
out =
|
||||||
|
out
|
||||||
|
+ "\n\n-- Argo Server 시도(진단) --\n"
|
||||||
|
+ "url: "
|
||||||
|
+ argoDebug.url
|
||||||
|
+ "\nstatus: "
|
||||||
|
+ (argoDebug.statusCode != null ? argoDebug.statusCode : "(none)")
|
||||||
|
+ "\nerror: "
|
||||||
|
+ (argoDebug.error != null ? argoDebug.error : "(none)")
|
||||||
|
+ "\n(설정: argo.server.url / 필요시 argo.server.token)";
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
Map<String, Object> run = pipelineUploadService.getKfpRunById(runId);
|
||||||
|
List<Map<String, Object>> tasks = kfpTasksWithPods(run);
|
||||||
|
if (tasks.isEmpty()) {
|
||||||
|
String hint =
|
||||||
|
"KFP Run에 task_details/pod_name이 없거나 API 조회 실패입니다. "
|
||||||
|
+ "멀티유저 환경이면 application.properties 에 "
|
||||||
|
+ "admin.k8s.pipeline-pod-namespaces=파이프라인Pod가있는NS 를 설정하세요.\n";
|
||||||
|
return hint + podHealthService.aggregatePodLogsByRunId(runId, tailLines);
|
||||||
|
}
|
||||||
|
if (stepNameParam != null && !stepNameParam.isBlank()) {
|
||||||
|
String hint = stepNameParam.trim().toLowerCase();
|
||||||
|
for (Map<String, Object> t : tasks) {
|
||||||
|
String dn = firstString(t.get("displayName"), t.get("display_name"));
|
||||||
|
if (dn != null && dn.toLowerCase().contains(hint)) {
|
||||||
|
return formatSingleTaskLog(runId, t, tailLines, workflowNameParam, workflowNamespaceParam);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "스텝 표시명에 \"" + stepNameParam + "\" 가 포함된 task를 찾지 못했습니다. (allSteps=true 로 전체 확인 가능)";
|
||||||
|
}
|
||||||
|
if (allSteps) {
|
||||||
|
List<String> podNames = new ArrayList<>();
|
||||||
|
List<String> stepNames = new ArrayList<>();
|
||||||
|
Set<String> seen = new HashSet<>();
|
||||||
|
for (Map<String, Object> t : tasks) {
|
||||||
|
String pod = firstString(t.get("podName"), t.get("pod_name"));
|
||||||
|
if (pod == null || pod.isBlank() || !seen.add(pod)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
podNames.add(pod);
|
||||||
|
stepNames.add(firstString(t.get("displayName"), t.get("display_name")));
|
||||||
|
}
|
||||||
|
return podHealthService.aggregatePodLogsForKfpTasks(runId, podNames, stepNames, tailLines);
|
||||||
|
}
|
||||||
|
Map<String, Object> chosen = pickPrimaryKfpTaskForLog(tasks);
|
||||||
|
return formatSingleTaskLog(runId, chosen, tailLines, workflowNameParam, workflowNamespaceParam);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Integer normalizeTail(Integer tailLines) {
|
||||||
|
if (tailLines == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (tailLines <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return tailLines;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String formatSingleTaskLog(
|
||||||
|
String runId,
|
||||||
|
Map<String, Object> task,
|
||||||
|
Integer tailLines,
|
||||||
|
String workflowNameParam,
|
||||||
|
String workflowNamespaceParam) {
|
||||||
|
String pod = firstString(task.get("podName"), task.get("pod_name"));
|
||||||
|
String step = firstString(task.get("displayName"), task.get("display_name"));
|
||||||
|
if (pod == null || pod.isBlank()) {
|
||||||
|
return "선택된 task에 pod_name이 없습니다.";
|
||||||
|
}
|
||||||
|
KubernetesPodHealthService.ExecutorPodResolution res =
|
||||||
|
podHealthService.resolveKfpExecutorImplPod(runId, pod.trim());
|
||||||
|
String wfNsEff =
|
||||||
|
workflowNamespaceParam != null && !workflowNamespaceParam.isBlank()
|
||||||
|
? workflowNamespaceParam.trim()
|
||||||
|
: res.namespace;
|
||||||
|
if (preferArgoServer && argoServerLogService.isConfigured()) {
|
||||||
|
ArgoServerLogService.ArgoLogFetchResult argoLog =
|
||||||
|
tryArgoWorkflowLogForStep(
|
||||||
|
wfNsEff, workflowNameParam, res.podName, pod.trim(), tailLines);
|
||||||
|
if (argoLog != null && isSubstantialArgoLogBody(argoLog.logText)) {
|
||||||
|
StringBuilder a = new StringBuilder();
|
||||||
|
a.append("-- Argo Server workflow log (실시간·아카이브 자동) --\n");
|
||||||
|
a.append("namespace: ").append(wfNsEff).append(" | pod: ").append(res.podName);
|
||||||
|
if (!pod.trim().equals(res.podName)) {
|
||||||
|
a.append(" | KFP pod_name: ").append(pod.trim());
|
||||||
|
}
|
||||||
|
a.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
|
||||||
|
a.append(argoLog.logText != null ? argoLog.logText : "");
|
||||||
|
return a.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (preferKfpApiForLogs) {
|
||||||
|
String kfpLog = tryKfpMlPipelineNodeLog(runId, wfNsEff, res.podName, pod.trim());
|
||||||
|
if (kfpLog != null) {
|
||||||
|
StringBuilder k = new StringBuilder();
|
||||||
|
k.append("-- KFP ml-pipeline API v1beta1/runs/.../nodes/{node_id}/log (UI와 동일 백엔드) --\n");
|
||||||
|
k.append("node_id: ").append(res.podName);
|
||||||
|
if (!pod.trim().equals(res.podName)) {
|
||||||
|
k.append(" | KFP task pod_name: ").append(pod.trim());
|
||||||
|
}
|
||||||
|
k.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
|
||||||
|
k.append(kfpLog);
|
||||||
|
return k.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String logText = podHealthService.readPodLogInNamespace(
|
||||||
|
res.namespace, res.podName, normalizeTail(tailLines));
|
||||||
|
if (logText != null && logText.startsWith("로그 조회 실패")) {
|
||||||
|
KubernetesPodHealthService.PipelinePodLogOutcome fb =
|
||||||
|
podHealthService.readPipelinePodLog(res.podName, null, normalizeTail(tailLines));
|
||||||
|
logText = fb.logText;
|
||||||
|
}
|
||||||
|
if (logText != null && logText.contains("code=403") && logText.contains("ApiException")) {
|
||||||
|
log.warn("[Admin] K8s Pod 로그 조회 권한이 없습니다 (403 Forbidden). Kubeflow API로 대체를 시도합니다. runId={}, podName={}, step={}", runId, res.podName, step);
|
||||||
|
String taskId = firstString(task.get("taskId"), task.get("task_id"));
|
||||||
|
String kfpLog = tryKfpMlPipelineNodeLog(runId, res.namespace, taskId, res.podName, pod.trim());
|
||||||
|
if (kfpLog != null) {
|
||||||
|
log.info("[Admin] Kubeflow API로 로그 대체 조회 성공. nodeId/podName={}", taskId != null ? taskId : res.podName);
|
||||||
|
StringBuilder k = new StringBuilder();
|
||||||
|
k.append("-- KFP ml-pipeline API (kubectl 403 Forbidden 대체) --\n");
|
||||||
|
k.append("node_id: ").append(taskId != null ? taskId : res.podName);
|
||||||
|
if (!pod.trim().equals(res.podName)) {
|
||||||
|
k.append(" | KFP task pod_name: ").append(pod.trim());
|
||||||
|
}
|
||||||
|
k.append(" | Step: ").append(step != null ? step : "(이름 없음)").append(" --\n\n");
|
||||||
|
k.append(kfpLog);
|
||||||
|
return k.toString();
|
||||||
|
} else {
|
||||||
|
log.warn("[Admin] Kubeflow API로 로그 대체 조회 실패. runId={}, podName={}, taskId={}", runId, res.podName, taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("-- kubectl logs ").append(res.podName).append(" -n ").append(res.namespace);
|
||||||
|
sb.append(" (KFP 로그와 동일) | Step: ").append(step != null ? step : "(이름 없음)");
|
||||||
|
if (!pod.trim().equals(res.podName)) {
|
||||||
|
sb.append(" | KFP API pod_name: ").append(pod.trim());
|
||||||
|
}
|
||||||
|
sb.append(" --\n\n");
|
||||||
|
sb.append(logText != null ? logText : "");
|
||||||
|
String archived = tryFetchArchivedMainLogFromWait(logText);
|
||||||
|
if (archived != null) {
|
||||||
|
sb.append("\n\n").append(archived);
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP UI가 쓰는 것과 같은 ml-pipeline 노드 로그. node_id 후보 순서대로 시도.
|
||||||
|
*/
|
||||||
|
private String tryKfpMlPipelineNodeLog(String runId, String namespace, String... nodeIdsOrdered) {
|
||||||
|
if (runId == null || runId.isBlank() || kubeflowUrl == null || kubeflowUrl.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
LinkedHashSet<String> seen = new LinkedHashSet<>();
|
||||||
|
for (String id : nodeIdsOrdered) {
|
||||||
|
if (id == null || id.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String t = id.trim();
|
||||||
|
if (!seen.add(t)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String body = pipelineUploadService.getV1beta1RunNodeLog(runId, t, namespace);
|
||||||
|
if (isSubstantialKfpV1LogBody(body)) {
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isSubstantialKfpV1LogBody(String s) {
|
||||||
|
if (s == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String t = s.trim();
|
||||||
|
if (t.length() < 15) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String low = t.toLowerCase();
|
||||||
|
if (low.startsWith("failed to get")) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ArgoServerLogService.ArgoLogFetchResult tryArgoWorkflowLogForStep(
|
||||||
|
String workflowNamespace,
|
||||||
|
String workflowNameOverride,
|
||||||
|
String implPodName,
|
||||||
|
String apiPodHint,
|
||||||
|
Integer tailLines) {
|
||||||
|
if (!preferArgoServer || argoServerLogService == null || !argoServerLogService.isConfigured()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (workflowNamespace == null || workflowNamespace.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String wfNs = workflowNamespace.trim();
|
||||||
|
String wfName =
|
||||||
|
workflowNameOverride != null && !workflowNameOverride.isBlank()
|
||||||
|
? workflowNameOverride.trim()
|
||||||
|
: null;
|
||||||
|
if (wfName == null && podHealthService != null && podHealthService.isEnabled()) {
|
||||||
|
wfName = podHealthService.getArgoWorkflowNameFromPod(wfNs, implPodName);
|
||||||
|
}
|
||||||
|
if (wfName == null || wfName.isBlank()) {
|
||||||
|
wfName = ArgoServerLogService.guessWorkflowNameFromExecutorPod(implPodName);
|
||||||
|
}
|
||||||
|
if (wfName == null || wfName.isBlank()) {
|
||||||
|
wfName = ArgoServerLogService.guessWorkflowNameFromExecutorPod(apiPodHint);
|
||||||
|
}
|
||||||
|
if (wfName == null || wfName.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return argoServerLogService.fetchWorkflowPodLog(
|
||||||
|
wfNs, wfName, implPodName, normalizeTail(tailLines));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isSubstantialArgoLogBody(String s) {
|
||||||
|
if (s == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String t = s.trim();
|
||||||
|
if (t.length() < 12) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (t.startsWith("{") && (t.contains("\"error\"") || t.contains("Error"))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String low = t.toLowerCase();
|
||||||
|
if (low.startsWith("rpc error") || low.startsWith("failed to retrieve")) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP/Argo executor impl Pod에서 kubectl로 잡힌 wait 로그에 main.log 아카이브 key가 찍히면,
|
||||||
|
* MinIO(mlpipeline 버킷)에서 해당 객체를 읽어와 "실제 실행 로그(main)"를 보여준다.
|
||||||
|
*/
|
||||||
|
private String tryFetchArchivedMainLogFromWait(String logText) {
|
||||||
|
if (logText == null || logText.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String t = logText.trim();
|
||||||
|
if (!t.startsWith("-- container=wait --")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
Matcher m = WAIT_LOG_ARCHIVE_KEY.matcher(t);
|
||||||
|
if (!m.find()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String key = m.group(1);
|
||||||
|
if (key == null || key.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
MinioClient client = MinioClient.builder().endpoint(minioEndpoint).credentials(minioAccessKey, minioSecretKey).build();
|
||||||
|
try (java.io.InputStream is = client.getObject(io.minio.GetObjectArgs.builder().bucket(KFP_ARCHIVE_BUCKET).object(key.trim()).build())) {
|
||||||
|
byte[] bytes = is.readAllBytes();
|
||||||
|
String body = new String(bytes, StandardCharsets.UTF_8);
|
||||||
|
if (body.isBlank()) return null;
|
||||||
|
return "-- MinIO archived main.log (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + key.trim() + ") --\n\n" + body;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
return "-- MinIO archived main.log 읽기 실패 (bucket=" + KFP_ARCHIVE_BUCKET + ", key=" + key.trim() + "): "
|
||||||
|
+ (e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName())
|
||||||
|
+ " --";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 실패 스텝 중 시간상 마지막 것, 없으면 생성 순 마지막 스텝 */
|
||||||
|
private static Map<String, Object> pickPrimaryKfpTaskForLog(List<Map<String, Object>> tasks) {
|
||||||
|
List<Map<String, Object>> failed = new ArrayList<>();
|
||||||
|
for (Map<String, Object> t : tasks) {
|
||||||
|
if (kfpTaskStateFailed(t)) {
|
||||||
|
failed.add(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!failed.isEmpty()) {
|
||||||
|
return failed.get(failed.size() - 1);
|
||||||
|
}
|
||||||
|
return tasks.get(tasks.size() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean kfpTaskStateFailed(Map<String, Object> task) {
|
||||||
|
Object st = task.get("state");
|
||||||
|
if (st instanceof Map) {
|
||||||
|
Object rs = ((Map<?, ?>) st).get("runtimeState");
|
||||||
|
if (rs == null) {
|
||||||
|
rs = ((Map<?, ?>) st).get("runtime_state");
|
||||||
|
}
|
||||||
|
if (rs != null) {
|
||||||
|
st = rs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (st instanceof Number) {
|
||||||
|
return ((Number) st).intValue() == 5;
|
||||||
|
}
|
||||||
|
String s = String.valueOf(st).toUpperCase();
|
||||||
|
return s.contains("FAILED") || s.contains("ERROR") || s.contains("CANCEL");
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static Map<String, Object> unwrapKfpTaskMap(Object node) {
|
||||||
|
if (!(node instanceof Map)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
Map<String, Object> m = (Map<String, Object>) node;
|
||||||
|
Object inner = m.get("task");
|
||||||
|
if (inner instanceof Map) {
|
||||||
|
return (Map<String, Object>) inner;
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** task_details 항목에서 pod가 있는 태스크를 모음(평면 배열 + 중첩 child). Pod당 최신 항목 유지. */
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static void mergeKfpTaskWithPod(Object node, Map<String, Map<String, Object>> byPod) {
|
||||||
|
Map<String, Object> task = unwrapKfpTaskMap(node);
|
||||||
|
if (task == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String pod = firstString(task.get("podName"), task.get("pod_name"));
|
||||||
|
if (pod != null && !pod.isBlank()) {
|
||||||
|
byPod.put(pod.trim(), task);
|
||||||
|
}
|
||||||
|
Object child = task.get("childTasks");
|
||||||
|
if (child == null) {
|
||||||
|
child = task.get("child_tasks");
|
||||||
|
}
|
||||||
|
if (!(child instanceof List)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Object c : (List<?>) child) {
|
||||||
|
if (!(c instanceof Map)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, Object> cm = (Map<String, Object>) c;
|
||||||
|
if (cm.get("task") instanceof Map) {
|
||||||
|
mergeKfpTaskWithPod(cm.get("task"), byPod);
|
||||||
|
} else if (firstString(cm.get("podName"), cm.get("pod_name")) != null) {
|
||||||
|
mergeKfpTaskWithPod(cm, byPod);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static List<Map<String, Object>> kfpTasksWithPods(Map<String, Object> run) {
|
||||||
|
if (run == null) {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
Object rd = run.get("runDetails");
|
||||||
|
if (rd == null) {
|
||||||
|
rd = run.get("run_details");
|
||||||
|
}
|
||||||
|
if (!(rd instanceof Map)) {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
Object td = ((Map<?, ?>) rd).get("taskDetails");
|
||||||
|
if (td == null) {
|
||||||
|
td = ((Map<?, ?>) rd).get("task_details");
|
||||||
|
}
|
||||||
|
if (!(td instanceof List)) {
|
||||||
|
return List.of();
|
||||||
|
}
|
||||||
|
Map<String, Map<String, Object>> byPod = new LinkedHashMap<>();
|
||||||
|
for (Object item : (List<?>) td) {
|
||||||
|
mergeKfpTaskWithPod(item, byPod);
|
||||||
|
}
|
||||||
|
List<Map<String, Object>> flat = new ArrayList<>(byPod.values());
|
||||||
|
flat.sort(Comparator.comparing(m -> {
|
||||||
|
Object ct = m.get("createTime");
|
||||||
|
if (ct == null) {
|
||||||
|
ct = m.get("create_time");
|
||||||
|
}
|
||||||
|
return ct != null ? ct.toString() : "";
|
||||||
|
}));
|
||||||
|
return flat;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String firstString(Object a, Object b) {
|
||||||
|
if (a != null) {
|
||||||
|
String s = String.valueOf(a).trim();
|
||||||
|
if (!s.isEmpty()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (b != null) {
|
||||||
|
String s = String.valueOf(b).trim();
|
||||||
|
if (!s.isEmpty()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private static String kfpStateToPhase(Object state) {
|
||||||
|
if (state == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (state instanceof Map) {
|
||||||
|
Object rs = ((Map<?, ?>) state).get("runtimeState");
|
||||||
|
if (rs == null) {
|
||||||
|
rs = ((Map<?, ?>) state).get("runtime_state");
|
||||||
|
}
|
||||||
|
if (rs != null) {
|
||||||
|
state = rs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String s = state.toString();
|
||||||
|
if (s.contains("RUNNING")) {
|
||||||
|
return "Running";
|
||||||
|
}
|
||||||
|
if (s.contains("SUCCEEDED")) {
|
||||||
|
return "Succeeded";
|
||||||
|
}
|
||||||
|
if (s.contains("FAILED") || s.contains("ERROR")) {
|
||||||
|
return "Failed";
|
||||||
|
}
|
||||||
|
if (s.contains("PENDING") || s.contains("SKIPPED")) {
|
||||||
|
return s.contains("SKIPPED") ? "Skipped" : "Pending";
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 관리자 페이지: 동일 카드(KFP/MLflow/MinIO)에 속한 Pod들 로그를 한 문자열로 */
|
||||||
|
public String getPodLogsAggregate(String namespace, java.util.List<String> podNames, Integer tailLines) {
|
||||||
|
if (podHealthService == null || !podHealthService.isEnabled()) {
|
||||||
|
return "K8s Pod 로그는 비활성화되어 있습니다. (admin.k8s.enabled=true 설정 필요)";
|
||||||
|
}
|
||||||
|
return podHealthService.aggregatePodLogsByNames(namespace, podNames, tailLines);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,137 @@
|
|||||||
|
package kr.re.etri.autoflow.service;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.http.HttpHeaders;
|
||||||
|
import org.springframework.http.MediaType;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClient;
|
||||||
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
||||||
|
import org.springframework.web.util.UriComponentsBuilder;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Argo Server REST: Pod 살아 있으면 K8s 스트림, 종료 후에는 MinIO 아카이브까지 Argo가 처리.
|
||||||
|
* {@code GET /api/v1/workflows/{namespace}/{workflowName}/log?podName=...&logOptions.container=main&logOptions.follow=false}
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class ArgoServerLogService {
|
||||||
|
|
||||||
|
private final WebClient webClient;
|
||||||
|
|
||||||
|
public static final class ArgoLogFetchResult {
|
||||||
|
public final String url;
|
||||||
|
public final String logText;
|
||||||
|
public final Integer statusCode;
|
||||||
|
public final String error;
|
||||||
|
|
||||||
|
public ArgoLogFetchResult(String url, String logText, Integer statusCode, String error) {
|
||||||
|
this.url = url;
|
||||||
|
this.logText = logText;
|
||||||
|
this.statusCode = statusCode;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean ok() {
|
||||||
|
return logText != null && !logText.isBlank();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Value("${argo.server.url:}")
|
||||||
|
private String argoBaseUrl;
|
||||||
|
|
||||||
|
@Value("${argo.server.token:}")
|
||||||
|
private String argoToken;
|
||||||
|
|
||||||
|
@Value("${argo.server.container:main}")
|
||||||
|
private String logContainer;
|
||||||
|
|
||||||
|
public boolean isConfigured() {
|
||||||
|
return argoBaseUrl != null && !argoBaseUrl.isBlank();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP v2 executor impl Pod 이름에서 Argo Workflow 리소스 이름 추정.
|
||||||
|
* 예: {@code my-pipeline-j8prl-retry-system-container-impl-123} → {@code my-pipeline-j8prl}
|
||||||
|
*/
|
||||||
|
public static String guessWorkflowNameFromExecutorPod(String podName) {
|
||||||
|
if (podName == null || podName.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String s = podName.trim();
|
||||||
|
if (s.matches(".*-retry-system-container-impl-\\d+$")) {
|
||||||
|
return s.replaceFirst("-retry-system-container-impl-\\d+$", "");
|
||||||
|
}
|
||||||
|
if (s.matches(".*-system-container-impl-\\d+$")) {
|
||||||
|
return s.replaceFirst("-system-container-impl-\\d+$", "");
|
||||||
|
}
|
||||||
|
int last = s.lastIndexOf('-');
|
||||||
|
if (last > 0 && !s.contains("system-container")) {
|
||||||
|
String tail = s.substring(last + 1);
|
||||||
|
if (tail.matches("\\d{6,}")) {
|
||||||
|
return s.substring(0, last);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ArgoLogFetchResult fetchWorkflowPodLog(
|
||||||
|
String workflowNamespace,
|
||||||
|
String workflowName,
|
||||||
|
String podName,
|
||||||
|
Integer tailLines) {
|
||||||
|
if (!isConfigured()
|
||||||
|
|| workflowNamespace == null
|
||||||
|
|| workflowNamespace.isBlank()
|
||||||
|
|| workflowName == null
|
||||||
|
|| workflowName.isBlank()
|
||||||
|
|| podName == null
|
||||||
|
|| podName.isBlank()) {
|
||||||
|
return new ArgoLogFetchResult("", null, null, "argo.server.url 또는 파라미터 누락");
|
||||||
|
}
|
||||||
|
String base = argoBaseUrl.replaceAll("/+$", "");
|
||||||
|
UriComponentsBuilder ub =
|
||||||
|
UriComponentsBuilder.fromHttpUrl(base)
|
||||||
|
.path("/api/v1/workflows/{namespace}/{name}/log")
|
||||||
|
.queryParam("podName", podName)
|
||||||
|
.queryParam("logOptions.container", logContainer != null && !logContainer.isBlank() ? logContainer : "main")
|
||||||
|
.queryParam("logOptions.follow", "false");
|
||||||
|
if (tailLines != null && tailLines > 0) {
|
||||||
|
ub.queryParam("logOptions.tailLines", tailLines);
|
||||||
|
}
|
||||||
|
String url =
|
||||||
|
ub.encode(StandardCharsets.UTF_8)
|
||||||
|
.buildAndExpand(workflowNamespace.trim(), workflowName.trim())
|
||||||
|
.toUriString();
|
||||||
|
try {
|
||||||
|
WebClient.RequestHeadersSpec<?> spec =
|
||||||
|
webClient.get().uri(url).accept(MediaType.TEXT_PLAIN);
|
||||||
|
if (argoToken != null && !argoToken.isBlank()) {
|
||||||
|
spec = spec.header(HttpHeaders.AUTHORIZATION, "Bearer " + argoToken.trim());
|
||||||
|
}
|
||||||
|
String body =
|
||||||
|
spec.retrieve()
|
||||||
|
.bodyToMono(String.class)
|
||||||
|
.timeout(Duration.ofSeconds(180))
|
||||||
|
.block();
|
||||||
|
return new ArgoLogFetchResult(url, body, 200, null);
|
||||||
|
} catch (WebClientResponseException e) {
|
||||||
|
log.debug(
|
||||||
|
"[Argo] workflow log {} {}/{} pod={}: {}",
|
||||||
|
e.getStatusCode().value(),
|
||||||
|
workflowNamespace,
|
||||||
|
workflowName,
|
||||||
|
podName,
|
||||||
|
e.getMessage());
|
||||||
|
return new ArgoLogFetchResult(url, null, e.getStatusCode().value(), e.getResponseBodyAsString());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Argo] workflow log failed {}/{}: {}", workflowNamespace, workflowName, e.getMessage());
|
||||||
|
return new ArgoLogFetchResult(url, null, null, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,855 @@
|
|||||||
|
package kr.re.etri.autoflow.service;
|
||||||
|
|
||||||
|
import io.kubernetes.client.openapi.ApiClient;
|
||||||
|
import io.kubernetes.client.openapi.ApiException;
|
||||||
|
import io.kubernetes.client.openapi.apis.CoreV1Api;
|
||||||
|
import io.kubernetes.client.openapi.models.V1Container;
|
||||||
|
import io.kubernetes.client.openapi.models.V1Pod;
|
||||||
|
import io.kubernetes.client.openapi.models.V1PodList;
|
||||||
|
import io.kubernetes.client.util.Config;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.time.OffsetDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Kubernetes API로 Pod 목록을 조회하고 phase == "Running" 인지 확인.
|
||||||
|
* - 클러스터 내부: ServiceAccount 토큰으로 자동 접근.
|
||||||
|
* - 클러스터 외부: KUBECONFIG 또는 ~/.kube/config 사용.
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
public class KubernetesPodHealthService {
|
||||||
|
|
||||||
|
private static final String PHASE_RUNNING = "Running";
|
||||||
|
|
||||||
|
@Value("${admin.k8s.enabled:false}")
|
||||||
|
private boolean enabled;
|
||||||
|
|
||||||
|
@Value("${admin.k8s.namespace:kubeflow}")
|
||||||
|
private String namespace;
|
||||||
|
|
||||||
|
/** MLflow 관련 Pod 라벨 (예: app=mlflow-server) */
|
||||||
|
@Value("${admin.k8s.mlflow.label-selector:app=mlflow-server}")
|
||||||
|
private String mlflowLabelSelector;
|
||||||
|
|
||||||
|
/** MinIO 관련 Pod 라벨 (예: app=minio) */
|
||||||
|
@Value("${admin.k8s.minio.label-selector:app=minio}")
|
||||||
|
private String minioLabelSelector;
|
||||||
|
|
||||||
|
/** KFP(ML Pipeline API) 관련 Pod 라벨 (예: app.kubernetes.io/name=ml-pipeline) */
|
||||||
|
@Value("${admin.k8s.kfp.label-selector:app.kubernetes.io/name=ml-pipeline}")
|
||||||
|
private String kfpLabelSelector;
|
||||||
|
|
||||||
|
/** Run별 Pod 조회 시 사용하는 라벨 키. 값은 runId. (예: pipeline/runid → pipeline/runid=runId) Tekton 등은 tekton.dev/pipelineRun 등으로 다를 수 있음. */
|
||||||
|
@Value("${admin.k8s.run-pod-label:pipeline/runid}")
|
||||||
|
private String runPodLabelKey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 파이프라인 태스크 Pod가 있는 네임스페이스 후보(쉼표 구분). KFP UI 로그와 동일하려면 Run Pod가 떠 있는 NS 필요.
|
||||||
|
* 비어 있으면 {@link #namespace} 만 시도.
|
||||||
|
*/
|
||||||
|
@Value("${admin.k8s.pipeline-pod-namespaces:}")
|
||||||
|
private String pipelinePodNamespaces;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 지정한 namespace + labelSelector 에 해당하는 Pod 중 Running 개수 조회.
|
||||||
|
*
|
||||||
|
* @param labelSelector 예: "app=mlflow-server", "app in (minio,minio-mlflow)"
|
||||||
|
* @return { "running": n, "total": m, "message": "n/m Running", "ok": true if total>0 and all running }
|
||||||
|
*/
|
||||||
|
public Map<String, Object> getPodStatus(String labelSelector) {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
if (!enabled) {
|
||||||
|
out.put("ok", false);
|
||||||
|
out.put("message", "admin.k8s.enabled=false");
|
||||||
|
out.put("running", 0);
|
||||||
|
out.put("total", 0);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
if (namespace == null || namespace.isBlank()) {
|
||||||
|
out.put("ok", false);
|
||||||
|
out.put("message", "admin.k8s.namespace 미설정");
|
||||||
|
out.put("running", 0);
|
||||||
|
out.put("total", 0);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
// listNamespacedPod(namespace, pretty, allowWatchBookmarks, _continue, fieldSelector, labelSelector, limit, resourceVersion, resourceVersionMatch, timeoutSeconds, watch, sendInitialEvents)
|
||||||
|
V1PodList list = api.listNamespacedPod(
|
||||||
|
namespace,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
labelSelector != null && !labelSelector.isBlank() ? labelSelector : null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
int total = list.getItems() != null ? list.getItems().size() : 0;
|
||||||
|
long running = list.getItems() == null ? 0
|
||||||
|
: list.getItems().stream()
|
||||||
|
.map(V1Pod::getStatus)
|
||||||
|
.filter(s -> s != null && PHASE_RUNNING.equals(s.getPhase()))
|
||||||
|
.count();
|
||||||
|
out.put("total", total);
|
||||||
|
out.put("running", running);
|
||||||
|
out.put("message", running + "/" + total + " Running");
|
||||||
|
out.put("ok", total > 0 && running == total);
|
||||||
|
List<Map<String, String>> pods = new ArrayList<>();
|
||||||
|
if (list.getItems() != null) {
|
||||||
|
for (V1Pod p : list.getItems()) {
|
||||||
|
Map<String, String> entry = new HashMap<>();
|
||||||
|
entry.put("name", p.getMetadata() != null ? p.getMetadata().getName() : null);
|
||||||
|
entry.put("phase", p.getStatus() != null ? p.getStatus().getPhase() : null);
|
||||||
|
pods.add(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.put("pods", pods);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] K8s pod list failed: {}", e.getMessage());
|
||||||
|
out.put("ok", false);
|
||||||
|
out.put("message", e.getMessage() != null ? e.getMessage() : "K8s API 연결 실패");
|
||||||
|
out.put("running", 0);
|
||||||
|
out.put("total", 0);
|
||||||
|
out.put("pods", new ArrayList<>());
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** KFP 관련 Pod가 전부 Running인지 */
|
||||||
|
public Map<String, Object> getKfpPodStatus() {
|
||||||
|
return getPodStatus(kfpLabelSelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** MLflow 관련 Pod가 전부 Running인지 */
|
||||||
|
public Map<String, Object> getMlflowPodStatus() {
|
||||||
|
return getPodStatus(mlflowLabelSelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** MinIO 관련 Pod가 전부 Running인지 */
|
||||||
|
public Map<String, Object> getMinioPodStatus() {
|
||||||
|
return getPodStatus(minioLabelSelector);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEnabled() {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String ARGO_WORKFLOW_LABEL = "workflows.argoproj.io/workflow";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pod에 붙은 Argo Workflow 리소스 이름 (Workflow metadata.name).
|
||||||
|
*/
|
||||||
|
public String getArgoWorkflowNameFromPod(String podNamespace, String podName) {
|
||||||
|
if (!enabled || podNamespace == null || podNamespace.isBlank() || podName == null || podName.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
V1Pod p = api.readNamespacedPod(podName.trim(), podNamespace.trim(), null);
|
||||||
|
if (p.getMetadata() != null && p.getMetadata().getLabels() != null) {
|
||||||
|
String w = p.getMetadata().getLabels().get(ARGO_WORKFLOW_LABEL);
|
||||||
|
if (w != null && !w.isBlank()) {
|
||||||
|
return w.trim();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] Argo workflow label for pod {}: {}", podName, e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNamespace() {
|
||||||
|
return namespace;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 파이프라인 Pod 로그 조회 시 시도할 네임스페이스 순서 (앞이 우선). */
|
||||||
|
public List<String> getPipelinePodNamespaceCandidates() {
|
||||||
|
List<String> fromConfig = new ArrayList<>();
|
||||||
|
if (pipelinePodNamespaces != null && !pipelinePodNamespaces.isBlank()) {
|
||||||
|
fromConfig = Arrays.stream(pipelinePodNamespaces.split(","))
|
||||||
|
.map(String::trim)
|
||||||
|
.filter(s -> !s.isEmpty())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
if (!fromConfig.isEmpty()) {
|
||||||
|
if (namespace != null && !namespace.isBlank() && !fromConfig.contains(namespace)) {
|
||||||
|
fromConfig.add(namespace);
|
||||||
|
}
|
||||||
|
return fromConfig;
|
||||||
|
}
|
||||||
|
return (namespace != null && !namespace.isBlank()) ? List.of(namespace) : List.of();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 여러 네임스페이스를 순서대로 시도해 Pod 로그 1건 조회 (멀티유저 KFP 프로필 NS 대응).
|
||||||
|
*/
|
||||||
|
public PipelinePodLogOutcome readPipelinePodLog(String podName, String container, Integer tailLines) {
|
||||||
|
List<String> nss = getPipelinePodNamespaceCandidates();
|
||||||
|
if (nss.isEmpty()) {
|
||||||
|
return new PipelinePodLogOutcome("", "admin.k8s.namespace 미설정");
|
||||||
|
}
|
||||||
|
String lastFail = "";
|
||||||
|
for (String ns : nss) {
|
||||||
|
String log = readPodLogTail(ns, podName, container, tailLines);
|
||||||
|
if (log == null || !log.startsWith("로그 조회 실패")) {
|
||||||
|
return new PipelinePodLogOutcome(ns, log != null ? log : "");
|
||||||
|
}
|
||||||
|
lastFail = log;
|
||||||
|
}
|
||||||
|
return new PipelinePodLogOutcome(nss.get(0), lastFail + "\n(시도한 네임스페이스: " + String.join(", ", nss) + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Pod 로그 + 실제로 성공한 namespace */
|
||||||
|
public static final class PipelinePodLogOutcome {
|
||||||
|
public final String namespace;
|
||||||
|
public final String logText;
|
||||||
|
|
||||||
|
public PipelinePodLogOutcome(String namespace, String logText) {
|
||||||
|
this.namespace = namespace != null ? namespace : "";
|
||||||
|
this.logText = logText != null ? logText : "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP UI / kubectl logs ...-system-container-impl-... 와 동일한 로그를 읽을 Pod.
|
||||||
|
* KFP API task_details 의 pod_name 은 종종 system-container-driver 인데, 실제 사용자 로그는 impl Pod.
|
||||||
|
*/
|
||||||
|
public static final class ExecutorPodResolution {
|
||||||
|
public final String podName;
|
||||||
|
public final String namespace;
|
||||||
|
/** KFP API가 준 원본 Pod 이름 (driver 등) */
|
||||||
|
public final String sourcePodFromApi;
|
||||||
|
|
||||||
|
public ExecutorPodResolution(String podName, String namespace, String sourcePodFromApi) {
|
||||||
|
this.podName = podName != null ? podName : "";
|
||||||
|
this.namespace = namespace != null ? namespace : "";
|
||||||
|
this.sourcePodFromApi = sourcePodFromApi != null ? sourcePodFromApi : "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run 라벨로 네임스페이스 후보마다 Pod 목록을 모음 (이름·NS·생성시각).
|
||||||
|
*/
|
||||||
|
public List<Map<String, String>> listRunPodsAllNamespaces(String runId) {
|
||||||
|
List<Map<String, String>> out = new ArrayList<>();
|
||||||
|
if (!enabled || runId == null || runId.isBlank()) {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
Set<String> nss = new LinkedHashSet<>();
|
||||||
|
nss.addAll(getPipelinePodNamespaceCandidates());
|
||||||
|
if (namespace != null && !namespace.isBlank()) {
|
||||||
|
nss.add(namespace.trim());
|
||||||
|
}
|
||||||
|
if (nss.isEmpty()) {
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String labelKey = (runPodLabelKey != null && !runPodLabelKey.isBlank()) ? runPodLabelKey : "pipeline/runid";
|
||||||
|
String labelSelector = labelKey + "=" + runId.trim();
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
for (String ns : nss) {
|
||||||
|
try {
|
||||||
|
V1PodList list = api.listNamespacedPod(
|
||||||
|
ns,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
labelSelector,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null);
|
||||||
|
if (list.getItems() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
for (V1Pod p : list.getItems()) {
|
||||||
|
if (p.getMetadata() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, String> row = new HashMap<>();
|
||||||
|
row.put("name", p.getMetadata().getName());
|
||||||
|
row.put("namespace", ns);
|
||||||
|
if (p.getMetadata().getCreationTimestamp() != null) {
|
||||||
|
row.put("createdAt", p.getMetadata().getCreationTimestamp().toString());
|
||||||
|
}
|
||||||
|
out.add(row);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] list pods runId={} ns={}: {}", runId, ns, e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] listRunPodsAllNamespaces: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP API가 알려준 Pod 이름을, kubectl logs ...-system-container-impl-... 에 해당하는 Pod 로 바꿈.
|
||||||
|
*/
|
||||||
|
public ExecutorPodResolution resolveKfpExecutorImplPod(String runId, String podNameFromKfpApi) {
|
||||||
|
if (podNameFromKfpApi == null || podNameFromKfpApi.isBlank()) {
|
||||||
|
return new ExecutorPodResolution("", namespace, podNameFromKfpApi);
|
||||||
|
}
|
||||||
|
String reported = podNameFromKfpApi.trim();
|
||||||
|
List<Map<String, String>> runPods = listRunPodsAllNamespaces(runId);
|
||||||
|
String defaultNs = (namespace != null && !namespace.isBlank()) ? namespace : "";
|
||||||
|
|
||||||
|
if (reported.contains("system-container-impl")) {
|
||||||
|
for (Map<String, String> row : runPods) {
|
||||||
|
if (reported.equals(row.get("name"))) {
|
||||||
|
return new ExecutorPodResolution(reported, row.get("namespace"), reported);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ExecutorPodResolution(reported, defaultNs, reported);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (reported.contains("system-container-driver")) {
|
||||||
|
int idx = reported.indexOf("-system-container-driver");
|
||||||
|
if (idx > 0) {
|
||||||
|
String prefix = reported.substring(0, idx);
|
||||||
|
List<Map<String, String>> implCandidates = new ArrayList<>();
|
||||||
|
for (Map<String, String> row : runPods) {
|
||||||
|
String n = row.get("name");
|
||||||
|
if (n == null || n.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (n.contains("system-container-impl") && n.startsWith(prefix)) {
|
||||||
|
implCandidates.add(row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
implCandidates.sort(Comparator.comparing(
|
||||||
|
m -> parseCreated(m.get("createdAt")),
|
||||||
|
Comparator.nullsFirst(Comparator.naturalOrder())));
|
||||||
|
if (!implCandidates.isEmpty()) {
|
||||||
|
Map<String, String> best = pickLatestImplPod(implCandidates);
|
||||||
|
return new ExecutorPodResolution(
|
||||||
|
best.get("name"), best.get("namespace"), reported);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* KFP가 ...-j8prl-2326247768 처럼 driver/impl 이 아닌 Pod 이름만 줄 때.
|
||||||
|
* 마지막 -숫자 해시를 떼면 워크플로 인스턴스 접두어 → 같은 Run의 system-container-impl Pod 매칭.
|
||||||
|
* (kubectl: ...-retry-system-container-impl-2358626606)
|
||||||
|
*/
|
||||||
|
String wfPrefix = workflowInstancePrefixFromReportedPod(reported);
|
||||||
|
if (wfPrefix != null && !wfPrefix.isBlank()) {
|
||||||
|
List<Map<String, String>> implByPrefix = new ArrayList<>();
|
||||||
|
for (Map<String, String> row : runPods) {
|
||||||
|
String n = row.get("name");
|
||||||
|
if (n != null && n.contains("system-container-impl") && n.startsWith(wfPrefix)) {
|
||||||
|
implByPrefix.add(row);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!implByPrefix.isEmpty()) {
|
||||||
|
Map<String, String> best = pickLatestImplPod(implByPrefix);
|
||||||
|
return new ExecutorPodResolution(best.get("name"), best.get("namespace"), reported);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map<String, String> row : runPods) {
|
||||||
|
if (reported.equals(row.get("name"))) {
|
||||||
|
return new ExecutorPodResolution(reported, row.get("namespace"), reported);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new ExecutorPodResolution(reported, defaultNs, reported);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** 생성 시각 기준 가장 늦은 impl Pod (재시도 시 최신) */
|
||||||
|
private static Map<String, String> pickLatestImplPod(List<Map<String, String>> implCandidates) {
|
||||||
|
implCandidates.sort(Comparator.comparing(
|
||||||
|
m -> parseCreated(m.get("createdAt")),
|
||||||
|
Comparator.nullsFirst(Comparator.naturalOrder())));
|
||||||
|
return implCandidates.get(implCandidates.size() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 예: minimal-gpu-mlflow-train-v2-j8prl-2326247768 → minimal-gpu-mlflow-train-v2-j8prl
|
||||||
|
* (마지막 세그먼트가 숫자 해시이고 system-container 가 없을 때)
|
||||||
|
*/
|
||||||
|
private static String workflowInstancePrefixFromReportedPod(String reported) {
|
||||||
|
if (reported == null || reported.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (reported.contains("system-container")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
int last = reported.lastIndexOf('-');
|
||||||
|
if (last <= 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String tail = reported.substring(last + 1);
|
||||||
|
if (!tail.matches("\\d{6,}")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return reported.substring(0, last);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static OffsetDateTime parseCreated(String s) {
|
||||||
|
if (s == null || s.isBlank()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return OffsetDateTime.parse(s);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP run id에 해당하는 파이프라인 실행 Pod 목록 조회.
|
||||||
|
* 라벨: admin.k8s.run-pod-label=runId (기본 pipeline/runid). Tekton 등은 라벨이 다를 수 있어 설정으로 변경 가능.
|
||||||
|
*/
|
||||||
|
public Map<String, Object> listPodsByRunId(String runId) {
|
||||||
|
Map<String, Object> out = new HashMap<>();
|
||||||
|
if (!enabled) {
|
||||||
|
out.put("namespace", "");
|
||||||
|
out.put("pods", new ArrayList<Map<String, String>>());
|
||||||
|
out.put("message", "admin.k8s.enabled=false");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
if (runId == null || runId.isBlank()) {
|
||||||
|
out.put("namespace", namespace);
|
||||||
|
out.put("pods", new ArrayList<Map<String, String>>());
|
||||||
|
out.put("message", "runId 없음");
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
String labelKey = (runPodLabelKey != null && !runPodLabelKey.isBlank()) ? runPodLabelKey : "pipeline/runid";
|
||||||
|
String labelSelector = labelKey + "=" + runId.trim();
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
V1PodList list = api.listNamespacedPod(
|
||||||
|
namespace,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
labelSelector,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
List<Map<String, String>> pods = new ArrayList<>();
|
||||||
|
if (list.getItems() != null) {
|
||||||
|
for (V1Pod p : list.getItems()) {
|
||||||
|
Map<String, String> entry = new HashMap<>();
|
||||||
|
entry.put("name", p.getMetadata() != null ? p.getMetadata().getName() : null);
|
||||||
|
entry.put("phase", p.getStatus() != null ? p.getStatus().getPhase() : null);
|
||||||
|
String displayName = getStepDisplayName(p);
|
||||||
|
if (displayName != null) {
|
||||||
|
entry.put("displayName", displayName);
|
||||||
|
}
|
||||||
|
if (p.getMetadata() != null && p.getMetadata().getCreationTimestamp() != null) {
|
||||||
|
entry.put("createdAt", p.getMetadata().getCreationTimestamp().toString());
|
||||||
|
}
|
||||||
|
pods.add(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pods.sort(Comparator.comparing(m -> m.get("createdAt"), Comparator.nullsLast(Comparator.naturalOrder())));
|
||||||
|
out.put("namespace", namespace);
|
||||||
|
out.put("pods", pods);
|
||||||
|
out.put("message", pods.size() + " pods");
|
||||||
|
if (pods.isEmpty()) {
|
||||||
|
out.put("hint", "라벨 " + labelSelector + " 로 조회했으나 Pod 없음. kubectl get pods -n " + namespace + " --show-labels 로 실제 Run Pod 라벨 확인 후 application.properties 의 admin.k8s.run-pod-label 설정 변경.");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] Pods by runId failed: {}", e.getMessage());
|
||||||
|
out.put("namespace", namespace);
|
||||||
|
out.put("pods", new ArrayList<Map<String, String>>());
|
||||||
|
out.put("message", e.getMessage() != null ? e.getMessage() : "K8s API 연결 실패");
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** KFP/Argo Pod에서 그래프 모듈(스텝) 표시 이름 추출. 로그 섹션 헤더용. */
|
||||||
|
private String getStepDisplayName(V1Pod p) {
|
||||||
|
if (p == null || p.getMetadata() == null) return null;
|
||||||
|
Map<String, String> ann = p.getMetadata().getAnnotations();
|
||||||
|
if (ann != null) {
|
||||||
|
String v = ann.get("workflows.argoproj.io/template");
|
||||||
|
if (v != null && !v.isBlank()) return v;
|
||||||
|
v = ann.get("pipelines.kubeflow.org/component_ref");
|
||||||
|
if (v != null && !v.isBlank()) return v;
|
||||||
|
v = ann.get("workflows.argoproj.io/display-name");
|
||||||
|
if (v != null && !v.isBlank()) return v;
|
||||||
|
}
|
||||||
|
Map<String, String> labels = p.getMetadata().getLabels();
|
||||||
|
if (labels != null) {
|
||||||
|
String v = labels.get("pipelines.kubeflow.org/component_ref");
|
||||||
|
if (v != null && !v.isBlank()) return v;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pod 로그 조회 (최근 tailLines 줄). container 비우면 Pod 기본 컨테이너.
|
||||||
|
* KFP Run 그래프에서 모듈 클릭 시 나오는 로그와 동일한 스트림(launcher + executor stdout).
|
||||||
|
*/
|
||||||
|
/** 지정 NS/Pod 로그 (kubectl logs 와 동일). */
|
||||||
|
public String readPodLogInNamespace(String ns, String podName, Integer tailLines) {
|
||||||
|
if (!enabled || ns == null || ns.isBlank() || podName == null || podName.isBlank()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
return readPodLogTail(ns.trim(), podName.trim(), null, tailLines);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPodLog(String namespaceOverride, String podName, String container, Integer tailLines) {
|
||||||
|
if (!enabled) return "";
|
||||||
|
String ns = (namespaceOverride != null && !namespaceOverride.isBlank()) ? namespaceOverride : namespace;
|
||||||
|
if (ns == null || ns.isBlank() || podName == null || podName.isBlank()) return "";
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
// readNamespacedPodLog(name, namespace, container, follow, previous, sinceSeconds, timestamps, tailLines, limitBytes, insecureSkipTLSVerify, previous)
|
||||||
|
return api.readNamespacedPodLog(
|
||||||
|
podName,
|
||||||
|
ns,
|
||||||
|
(container != null && !container.isBlank()) ? container : null,
|
||||||
|
false,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
false,
|
||||||
|
tailLines != null ? tailLines : 500,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.debug("[Admin] Pod log failed {}: {}", podName, e.getMessage());
|
||||||
|
return "로그 조회 실패: " + (e.getMessage() != null ? e.getMessage() : "unknown");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* runId에 해당하는 모든 파이프라인 Pod 로그를 순서대로 이어 붙인 문자열.
|
||||||
|
*
|
||||||
|
* @param tailLinesPerPod Pod당 최근 줄 수. null이면 50_000줄, 0 이하이면 tail 제한 없이(가능한 경우) 전체 로그.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public String aggregatePodLogsByRunId(String runId, Integer tailLinesPerPod) {
|
||||||
|
if (!enabled) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
Map<String, Object> list = listPodsByRunId(runId);
|
||||||
|
List<Map<String, String>> pods = (List<Map<String, String>>) list.get("pods");
|
||||||
|
String ns = list.get("namespace") != null ? list.get("namespace").toString() : namespace;
|
||||||
|
if (pods == null || pods.isEmpty()) {
|
||||||
|
String msg = list.get("message") != null ? list.get("message").toString() : "";
|
||||||
|
return "이 Run에 해당하는 Pod가 없습니다.\n" + msg;
|
||||||
|
}
|
||||||
|
List<Map<String, String>> workPods = pods.stream()
|
||||||
|
.filter(p -> {
|
||||||
|
String n = p.get("name");
|
||||||
|
return n != null && n.contains("system-container-impl");
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (workPods.isEmpty()) {
|
||||||
|
workPods = pods.stream()
|
||||||
|
.filter(p -> {
|
||||||
|
String n = p.get("name");
|
||||||
|
return n != null && !n.contains("system-container-driver");
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
if (workPods.isEmpty()) {
|
||||||
|
workPods = new ArrayList<>(pods);
|
||||||
|
}
|
||||||
|
Integer tl;
|
||||||
|
if (tailLinesPerPod == null) {
|
||||||
|
tl = 50_000;
|
||||||
|
} else if (tailLinesPerPod <= 0) {
|
||||||
|
tl = null;
|
||||||
|
} else {
|
||||||
|
tl = tailLinesPerPod;
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("-- Run ").append(runId).append(" | namespace=").append(ns)
|
||||||
|
.append(" | 스텝 ").append(workPods.size())
|
||||||
|
.append("개 (system-container-impl Pod = kubectl logs / KFP UI 와 동일) --\n");
|
||||||
|
for (Map<String, String> p : workPods) {
|
||||||
|
String name = p.get("name");
|
||||||
|
if (name == null || name.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String displayName = p.get("displayName");
|
||||||
|
String phase = p.get("phase");
|
||||||
|
if (displayName != null && !displayName.isBlank()) {
|
||||||
|
sb.append("\n========== Step: ").append(displayName).append(" (Pod: ").append(name).append(")");
|
||||||
|
} else {
|
||||||
|
sb.append("\n========== Pod: ").append(name);
|
||||||
|
}
|
||||||
|
if (phase != null) {
|
||||||
|
sb.append(" [").append(phase).append("]");
|
||||||
|
}
|
||||||
|
sb.append(tl == null ? " [전체 로그]" : " [최근 " + tl + "줄]");
|
||||||
|
sb.append(" ==========\n\n");
|
||||||
|
sb.append(readPodLogTail(ns, name, null, tl));
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KFP task_details Pod 이름 → system-container-impl Pod 로 바꿔 로그 수집 (kubectl logs / KFP UI 와 동일).
|
||||||
|
*
|
||||||
|
* @param stepNames API Pod와 동일 인덱스의 스텝 표시명(null 가능)
|
||||||
|
*/
|
||||||
|
public String aggregatePodLogsForKfpTasks(
|
||||||
|
String runId,
|
||||||
|
List<String> podNamesFromKfpApi,
|
||||||
|
List<String> stepNames,
|
||||||
|
Integer tailLinesPerPod) {
|
||||||
|
if (!enabled) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
if (podNamesFromKfpApi == null || podNamesFromKfpApi.isEmpty()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
Integer tl;
|
||||||
|
if (tailLinesPerPod == null) {
|
||||||
|
tl = 50_000;
|
||||||
|
} else if (tailLinesPerPod <= 0) {
|
||||||
|
tl = null;
|
||||||
|
} else {
|
||||||
|
tl = tailLinesPerPod;
|
||||||
|
}
|
||||||
|
String rid = runId != null ? runId.trim() : "";
|
||||||
|
/*
|
||||||
|
* 여러 task의 pod_name이 driver 등으로 다르게 나와도 동일 system-container-impl 로 해석되면
|
||||||
|
* 로그 내용이 같으므로 (kubectl 대상 Pod 동일) 한 번만 조회한다.
|
||||||
|
*/
|
||||||
|
Map<String, List<Integer>> byResolvedPod = new LinkedHashMap<>();
|
||||||
|
for (int i = 0; i < podNamesFromKfpApi.size(); i++) {
|
||||||
|
String apiName = podNamesFromKfpApi.get(i);
|
||||||
|
if (apiName == null || apiName.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ExecutorPodResolution res = resolveKfpExecutorImplPod(rid, apiName.trim());
|
||||||
|
String key = res.namespace + "\0" + res.podName;
|
||||||
|
byResolvedPod.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("-- KFP task_details → system-container-impl | task ")
|
||||||
|
.append(podNamesFromKfpApi.stream().filter(s -> s != null && !s.isBlank()).count())
|
||||||
|
.append("건 → 고유 Pod ")
|
||||||
|
.append(byResolvedPod.size())
|
||||||
|
.append("개 (동일 Pod는 로그 1회) --\n");
|
||||||
|
for (List<Integer> group : byResolvedPod.values()) {
|
||||||
|
int i0 = group.get(0);
|
||||||
|
String apiName0 = podNamesFromKfpApi.get(i0).trim();
|
||||||
|
ExecutorPodResolution res = resolveKfpExecutorImplPod(rid, apiName0);
|
||||||
|
sb.append("\n========== ");
|
||||||
|
if (group.size() == 1) {
|
||||||
|
String step = (stepNames != null && i0 < stepNames.size()) ? stepNames.get(i0) : null;
|
||||||
|
if (step != null && !step.isBlank()) {
|
||||||
|
sb.append("Step: ").append(step);
|
||||||
|
} else {
|
||||||
|
sb.append("Pod");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sb.append("Steps (동일 Pod ").append(group.size()).append("건 묶음): ");
|
||||||
|
for (int g = 0; g < group.size(); g++) {
|
||||||
|
int idx = group.get(g);
|
||||||
|
String step = (stepNames != null && idx < stepNames.size()) ? stepNames.get(idx) : null;
|
||||||
|
String api = podNamesFromKfpApi.get(idx);
|
||||||
|
if (step != null && !step.isBlank()) {
|
||||||
|
sb.append(step);
|
||||||
|
} else {
|
||||||
|
sb.append(api != null ? api : "?");
|
||||||
|
}
|
||||||
|
if (g < group.size() - 1) {
|
||||||
|
sb.append(", ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.append(" | kubectl: logs ").append(res.podName).append(" -n ").append(res.namespace);
|
||||||
|
if (!apiName0.equals(res.podName)) {
|
||||||
|
sb.append(" (대표 KFP pod_name: ").append(apiName0).append(")");
|
||||||
|
}
|
||||||
|
if (group.size() > 1) {
|
||||||
|
sb.append(" | 생략: 동일 로그 (다른 task pod_name ");
|
||||||
|
for (int j = 1; j < group.size(); j++) {
|
||||||
|
sb.append(podNamesFromKfpApi.get(group.get(j)));
|
||||||
|
if (j < group.size() - 1) {
|
||||||
|
sb.append(", ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sb.append(")");
|
||||||
|
}
|
||||||
|
sb.append(tl == null ? " [전체 로그]" : " [최근 " + tl + "줄]");
|
||||||
|
sb.append(" ==========\n\n");
|
||||||
|
String logText = readPodLogTail(res.namespace, res.podName, null, tl);
|
||||||
|
if (logText != null && logText.startsWith("로그 조회 실패")) {
|
||||||
|
PipelinePodLogOutcome fb = readPipelinePodLog(res.podName, null, tl);
|
||||||
|
logText = fb.logText;
|
||||||
|
}
|
||||||
|
sb.append(logText != null ? logText : "");
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 지정 namespace의 Pod 이름 목록에 대해 로그를 순서대로 이어 붙임. (관리자 페이지 KFP/MLflow/MinIO 카드용)
|
||||||
|
*/
|
||||||
|
public String aggregatePodLogsByNames(String namespaceOverride, List<String> podNames, Integer tailLinesPerPod) {
|
||||||
|
if (!enabled) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
String ns = (namespaceOverride != null && !namespaceOverride.isBlank()) ? namespaceOverride : namespace;
|
||||||
|
if (podNames == null || podNames.isEmpty()) {
|
||||||
|
return "Pod 목록이 비어 있습니다.";
|
||||||
|
}
|
||||||
|
Integer tl;
|
||||||
|
if (tailLinesPerPod == null) {
|
||||||
|
tl = 50_000;
|
||||||
|
} else if (tailLinesPerPod <= 0) {
|
||||||
|
tl = null;
|
||||||
|
} else {
|
||||||
|
tl = tailLinesPerPod;
|
||||||
|
}
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("-- namespace=").append(ns).append(" | Pod ").append(podNames.size()).append("개 --\n");
|
||||||
|
for (String name : podNames) {
|
||||||
|
if (name == null || name.isBlank()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
sb.append("\n========== Pod: ").append(name);
|
||||||
|
sb.append(tl == null ? " [전체 로그]" : " [최근 " + tl + "줄]");
|
||||||
|
sb.append(" ==========\n\n");
|
||||||
|
sb.append(readPodLogTail(ns, name.trim(), null, tl));
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** tailLines null 이면 API에 tail 제한 없이 요청(전체 로그). 실패 시 대량 tail로 재시도. */
|
||||||
|
private String readPodLogTail(String ns, String podName, String container, Integer tailLines) {
|
||||||
|
if (ns == null || ns.isBlank() || podName == null || podName.isBlank()) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
String c = (container != null && !container.isBlank()) ? container : null;
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
return api.readNamespacedPodLog(podName, ns, c, false, null, null, null, false, tailLines, null, null);
|
||||||
|
} catch (Exception e1) {
|
||||||
|
// container 미지정인데 Pod에 컨테이너가 여러 개인 경우: 컨테이너별로 재시도
|
||||||
|
if (c == null) {
|
||||||
|
List<String> containers = listPodContainers(ns, podName);
|
||||||
|
if (!containers.isEmpty()) {
|
||||||
|
for (String cn : containers) {
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
String body = api.readNamespacedPodLog(podName, ns, cn, false, null, null, null, false, tailLines, null, null);
|
||||||
|
if (body != null && !body.isBlank()) {
|
||||||
|
return "-- container=" + cn + " --\n" + body;
|
||||||
|
}
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
// keep trying
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (tailLines == null) {
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
return api.readNamespacedPodLog(podName, ns, c, false, null, null, null, false, 2_000_000, null, null);
|
||||||
|
} catch (Exception e2) {
|
||||||
|
log.debug("[Admin] Pod log (full) failed {}: {}", podName, e2.getMessage());
|
||||||
|
return "로그 조회 실패: " + formatK8sException(e2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.debug("[Admin] Pod log failed {}: {}", podName, e1.getMessage());
|
||||||
|
return "로그 조회 실패: " + formatK8sException(e1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String formatK8sException(Exception e) {
|
||||||
|
if (e == null) {
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
if (e instanceof ApiException) {
|
||||||
|
ApiException ae = (ApiException) e;
|
||||||
|
String msg = ae.getMessage();
|
||||||
|
String body = ae.getResponseBody();
|
||||||
|
String code = String.valueOf(ae.getCode());
|
||||||
|
if (body != null && body.length() > 2000) {
|
||||||
|
body = body.substring(0, 2000) + "…(truncated)";
|
||||||
|
}
|
||||||
|
return "ApiException(code=" + code + ") "
|
||||||
|
+ (msg != null ? msg : "")
|
||||||
|
+ (body != null && !body.isBlank() ? " | body=" + body : "");
|
||||||
|
}
|
||||||
|
String msg = e.getMessage();
|
||||||
|
return e.getClass().getSimpleName() + (msg != null && !msg.isBlank() ? (": " + msg) : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Pod의 컨테이너 이름 목록(순서 보존). */
|
||||||
|
private List<String> listPodContainers(String ns, String podName) {
|
||||||
|
try {
|
||||||
|
ApiClient client = Config.defaultClient();
|
||||||
|
CoreV1Api api = new CoreV1Api(client);
|
||||||
|
V1Pod p = api.readNamespacedPod(podName, ns, null);
|
||||||
|
if (p != null
|
||||||
|
&& p.getSpec() != null
|
||||||
|
&& p.getSpec().getContainers() != null
|
||||||
|
&& !p.getSpec().getContainers().isEmpty()) {
|
||||||
|
List<String> out = new ArrayList<>();
|
||||||
|
for (V1Container c : p.getSpec().getContainers()) {
|
||||||
|
if (c != null && c.getName() != null && !c.getName().isBlank()) {
|
||||||
|
out.add(c.getName().trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// KFP/Argo executor pod: 보통 "main" 로그가 사용자 로그.
|
||||||
|
out.sort((a, b) -> Integer.compare(containerRank(a), containerRank(b)));
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int containerRank(String name) {
|
||||||
|
if (name == null) return 100;
|
||||||
|
String n = name.trim().toLowerCase();
|
||||||
|
if (n.equals("main")) return 0;
|
||||||
|
if (n.equals("user-container")) return 1;
|
||||||
|
if (n.contains("main")) return 2;
|
||||||
|
if (n.equals("wait")) return 90;
|
||||||
|
return 50;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,95 @@
|
|||||||
|
package kr.re.etri.autoflow.specification;
|
||||||
|
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import jakarta.persistence.EntityManager;
|
||||||
|
import jakarta.persistence.criteria.Predicate;
|
||||||
|
import jakarta.persistence.metamodel.Attribute;
|
||||||
|
import jakarta.persistence.metamodel.EntityType;
|
||||||
|
import jakarta.persistence.metamodel.Metamodel;
|
||||||
|
import kr.re.etri.autoflow.entity.MinioAttachmentEntity;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.jpa.domain.Specification;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
|
||||||
|
public class MinioAttachmentSpecification {
|
||||||
|
|
||||||
|
private final EntityManager entityManager;
|
||||||
|
|
||||||
|
private Set<String> stringFields;
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init() {
|
||||||
|
Metamodel metamodel = entityManager.getMetamodel();
|
||||||
|
EntityType<MinioAttachmentEntity> entityType = metamodel.entity(MinioAttachmentEntity.class);
|
||||||
|
|
||||||
|
// 문자열 타입 필드명만 추출
|
||||||
|
stringFields = entityType.getAttributes().stream()
|
||||||
|
.filter(attr -> attr.getJavaType().equals(String.class))
|
||||||
|
.map(Attribute::getName)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
log.info("MinioAttachmentEntity string fields: {}", stringFields);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Specification<MinioAttachmentEntity> searchByConditions(
|
||||||
|
String refType,
|
||||||
|
Integer refId,
|
||||||
|
String searchType,
|
||||||
|
String keyword,
|
||||||
|
LocalDate startDate,
|
||||||
|
LocalDate endDate
|
||||||
|
) {
|
||||||
|
return (root, query, cb) -> {
|
||||||
|
Predicate predicate = cb.conjunction();
|
||||||
|
|
||||||
|
// refType 조건 추가
|
||||||
|
if (refType != null && !refType.isBlank()) {
|
||||||
|
predicate = cb.and(predicate, cb.equal(root.get("refType"), refType));
|
||||||
|
}
|
||||||
|
|
||||||
|
// refId 조건 추가
|
||||||
|
if (refId != null ) {
|
||||||
|
predicate = cb.and(predicate, cb.equal(root.get("refId"), refId));
|
||||||
|
}
|
||||||
|
|
||||||
|
// keyword 검색
|
||||||
|
if (keyword != null && !keyword.isEmpty()) {
|
||||||
|
if (searchType == null || searchType.isEmpty()
|
||||||
|
|| "전체".equalsIgnoreCase(searchType)
|
||||||
|
|| "all".equalsIgnoreCase(searchType)) {
|
||||||
|
Predicate orPredicate = cb.disjunction();
|
||||||
|
for (String field : stringFields) {
|
||||||
|
orPredicate = cb.or(orPredicate,
|
||||||
|
cb.like(cb.lower(root.get(field)), "%" + keyword.toLowerCase() + "%"));
|
||||||
|
}
|
||||||
|
predicate = cb.and(predicate, orPredicate);
|
||||||
|
} else if (stringFields.contains(searchType)) {
|
||||||
|
predicate = cb.and(predicate,
|
||||||
|
cb.like(cb.lower(root.get(searchType)), "%" + keyword.toLowerCase() + "%"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 날짜 검색
|
||||||
|
if (startDate != null) {
|
||||||
|
predicate = cb.and(predicate,
|
||||||
|
cb.greaterThanOrEqualTo(root.get("regDt"), startDate.atStartOfDay()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (endDate != null) {
|
||||||
|
predicate = cb.and(predicate,
|
||||||
|
cb.lessThanOrEqualTo(root.get("regDt"), endDate.atTime(23, 59, 59)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return predicate;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,48 @@
|
|||||||
|
# WSL 로컬 환경: k3s Kubeflow/MLflow/MinIO 연동 (port-forward 기준)
|
||||||
|
# 사용: spring.profiles.active=wsl 또는 --spring.profiles.active=wsl
|
||||||
|
#
|
||||||
|
# 스크립트 컴파일(TrainingScript py→YAML) 사용 시 WSL에 한 번 설치:
|
||||||
|
# sudo apt install -y python3 python3-pip
|
||||||
|
# pip3 install kfp
|
||||||
|
|
||||||
|
# DB - WSL 내 MariaDB 또는 kubeflow/mysql port-forward 3306:3306 후 DB 생성
|
||||||
|
spring.datasource.url=jdbc:mariadb://localhost:3306/autoflow
|
||||||
|
spring.datasource.username=autoflow
|
||||||
|
spring.datasource.password=autoflow
|
||||||
|
|
||||||
|
# resource/data.sql 사용: 테이블 생성(BATCH_*) + 초기 데이터(tb_role, tb_user, tb_project 등)
|
||||||
|
# JPA가 엔티티 테이블(tb_*)을 만든 뒤 data.sql이 실행됨
|
||||||
|
spring.jpa.hibernate.ddl-auto=update
|
||||||
|
spring.sql.init.mode=always
|
||||||
|
# 재시작 시 data.sql 재실행 시 INSERT 중복/시퀀스 이미 존재 등 오류 무시
|
||||||
|
spring.sql.init.continue-on-error=true
|
||||||
|
|
||||||
|
# Kubeflow Pipelines API (WSL에서: kubectl port-forward -n kubeflow svc/ml-pipeline 8888:8888)
|
||||||
|
kubeflow.url=http://localhost:8888
|
||||||
|
|
||||||
|
# MLflow (WSL에서: kubectl port-forward -n kubeflow svc/mlflow-server 5000:5000)
|
||||||
|
mlflow.url=http://localhost:5000
|
||||||
|
mlflow.user=
|
||||||
|
mlflow.password=
|
||||||
|
|
||||||
|
# MinIO type1 = Kubeflow 파이프라인 (port-forward: svc/minio-service 9000:9000)
|
||||||
|
# 기본 단일 MinIO는 application.properties와 동일 계정(minio/minio123) 사용
|
||||||
|
minio.endpoint=http://localhost:9000
|
||||||
|
minio.access-key=minio
|
||||||
|
minio.secret-key=minio123
|
||||||
|
minio.bucket=mlpipeline
|
||||||
|
minio.endpoint.pod=http://minio-service.kubeflow.svc.cluster.local:9000
|
||||||
|
minio.type1.endpoint=http://localhost:9000
|
||||||
|
minio.type1.bucket=mlpipeline
|
||||||
|
minio.type1.access-key=minio
|
||||||
|
minio.type1.secret-key=minio123
|
||||||
|
|
||||||
|
# MinIO type2 = MLflow 아티팩트 (port-forward: svc/minio-mlflow 9001:9000)
|
||||||
|
minio.type2.endpoint=http://localhost:9001
|
||||||
|
minio.type2.bucket=mlflow
|
||||||
|
# MLflow 아티팩트용 MinIO 계정 (YOLO 파이프라인과 동일: minio-mlflow / minio-mlflow-12345)
|
||||||
|
minio.type2.access-key=minio-mlflow
|
||||||
|
minio.type2.secret-key=minio-mlflow-12345
|
||||||
|
|
||||||
|
# KFP 스크립트 컴파일: WSL에서는 python3 사용 (기본값)
|
||||||
|
kfp.compile.python-command=python3
|
||||||
@ -0,0 +1,54 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
KFP DSL .py 파이프라인을 KFP 학습 실행용 YAML로 컴파일합니다.
|
||||||
|
사용법: python compile_kfp_pipeline.py <input.py> <output.yaml>
|
||||||
|
스크립트에는 @dsl.pipeline 데코레이터가 붙은 함수가 하나 이상 있어야 합니다.
|
||||||
|
우선 'pipeline', 'my_pipeline' 이름을 찾고, 없으면 모듈 내 모든 @dsl.pipeline 함수를 시도합니다.
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import importlib.util
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
if len(sys.argv) != 3:
|
||||||
|
print("Usage: compile_kfp_pipeline.py <input.py> <output.yaml>", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
py_path = sys.argv[1]
|
||||||
|
yaml_path = sys.argv[2]
|
||||||
|
|
||||||
|
spec = importlib.util.spec_from_file_location("user_pipeline", py_path)
|
||||||
|
mod = importlib.util.module_from_spec(spec)
|
||||||
|
sys.modules["user_pipeline"] = mod
|
||||||
|
spec.loader.exec_module(mod)
|
||||||
|
|
||||||
|
from kfp import compiler
|
||||||
|
comp = compiler.Compiler()
|
||||||
|
|
||||||
|
# 1) 권장 이름 먼저
|
||||||
|
pipeline_fn = getattr(mod, "pipeline", None) or getattr(mod, "my_pipeline", None)
|
||||||
|
if pipeline_fn is not None and callable(pipeline_fn):
|
||||||
|
comp.compile(pipeline_fn, yaml_path)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 2) 모듈에서 @dsl.pipeline 데코레이터가 붙은 함수 찾기 (이름 무관)
|
||||||
|
for name in dir(mod):
|
||||||
|
if name.startswith("_"):
|
||||||
|
continue
|
||||||
|
obj = getattr(mod, name)
|
||||||
|
if not callable(obj):
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
comp.compile(obj, yaml_path)
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(
|
||||||
|
"No @dsl.pipeline function found. Define a function with @dsl.pipeline (e.g. pipeline, my_pipeline, or any name).",
|
||||||
|
file=sys.stderr,
|
||||||
|
)
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Loading…
Reference in new issue