forked from JointCloud/pcm-participant
hpc init
This commit is contained in:
parent
c28a738593
commit
85c69f4464
|
@ -0,0 +1 @@
|
|||
package common
|
|
@ -0,0 +1,128 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gin-gonic/gin"
|
||||
hpc "gitlink.org.cn/JointCloud/pcm-participant/backend"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/backend/slurm"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/backend/sugonac"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/config"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
// 通用请求和响应结构体
|
||||
|
||||
type GenericSubmitRequest struct {
|
||||
Backend string `json:"backend"` // 后端类型:slurm/sugonac
|
||||
Script string `json:"script"` // 作业脚本内容
|
||||
Parameters map[string]string `json:"parameters"` // 动态参数
|
||||
Partition string `json:"partition"` // 分区/队列名称
|
||||
}
|
||||
|
||||
type JobStatusResponse struct {
|
||||
JobID string `json:"job_id"`
|
||||
Status hpc.JobStatus `json:"status"`
|
||||
StatusText string `json:"status_text"`
|
||||
Details interface{} `json:"details,omitempty"`
|
||||
}
|
||||
|
||||
type ErrorResponse struct {
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// 初始化后端客户端
|
||||
func initBackendOperator(backend string) (hpc.Operator, error) {
|
||||
cfg := config.LoadConfig("config/config.yaml")
|
||||
|
||||
switch backend {
|
||||
case "slurm":
|
||||
client, err := slurm.NewClient(&cfg.Backends.Slurm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("slurm初始化失败: %v", err)
|
||||
}
|
||||
return slurm.NewJobManager(client), nil
|
||||
|
||||
case "sugonac":
|
||||
client := sugonac.NewClient(cfg.Backends.Sugonac.BaseURL, cfg.Backends.Sugonac.Token)
|
||||
return sugonac.NewJobManager(client), nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("不支持的后端类型: %s", backend)
|
||||
}
|
||||
}
|
||||
|
||||
// 统一作业提交接口
|
||||
func SubmitHandler(c *gin.Context) {
|
||||
var req GenericSubmitRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, ErrorResponse{"无效的请求格式"})
|
||||
return
|
||||
}
|
||||
|
||||
operator, err := initBackendOperator(req.Backend)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, ErrorResponse{err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
job := &hpc.JobSpec{
|
||||
Script: req.Script,
|
||||
Parameters: req.Parameters,
|
||||
Partition: req.Partition,
|
||||
}
|
||||
|
||||
jobID, err := operator.SubmitJob(c.Request.Context(), job)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, ErrorResponse{err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"job_id": jobID,
|
||||
"backend": req.Backend,
|
||||
})
|
||||
}
|
||||
|
||||
// 统一状态查询接口
|
||||
func StatusHandler(c *gin.Context) {
|
||||
backend := c.Param("backend")
|
||||
jobID := c.Param("job_id")
|
||||
|
||||
operator, err := initBackendOperator(backend)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, ErrorResponse{err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
status, err := operator.GetJobStatus(c.Request.Context(), jobID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, ErrorResponse{err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, JobStatusResponse{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
StatusText: status.String(),
|
||||
})
|
||||
}
|
||||
func ErrorMiddleware() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
c.Next()
|
||||
if len(c.Errors) > 0 {
|
||||
c.JSON(c.Writer.Status(), ErrorResponse{
|
||||
Error: c.Errors.Last().Error(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
func SetupRouter() *gin.Engine {
|
||||
r := gin.Default()
|
||||
r.Use(ErrorMiddleware())
|
||||
|
||||
// 通用作业接口
|
||||
r.POST("/api/v1/jobs", SubmitHandler)
|
||||
r.GET("/api/v1/jobs/:backend/:job_id", StatusHandler)
|
||||
|
||||
return r
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
package hpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
// JobStatus 表示作业的状态
|
||||
type JobStatus int
|
||||
|
||||
const (
|
||||
StatusPending JobStatus = iota // 作业等待中
|
||||
StatusRunning // 作业运行中
|
||||
StatusCompleted // 作业已完成
|
||||
StatusFailed // 作业失败
|
||||
StatusCancelled // 作业已取消
|
||||
)
|
||||
|
||||
// JobInfo 表示作业的基本信息
|
||||
type JobInfo struct {
|
||||
ID string // 作业ID
|
||||
Name string // 作业名称
|
||||
Status JobStatus // 作业状态
|
||||
SubmitTime time.Time // 提交时间
|
||||
StartTime time.Time // 开始时间(可选)
|
||||
EndTime time.Time // 结束时间(可选)
|
||||
}
|
||||
|
||||
// JobSpec 表示作业的规格
|
||||
type JobSpec struct {
|
||||
Name string // 作业名称
|
||||
Script string // 作业脚本内容
|
||||
Parameters map[string]string // 动态参数
|
||||
OutputDir string // 输出目录
|
||||
TimeLimit time.Duration // 作业时间限制
|
||||
Partition string // 分区/队列名称
|
||||
CustomFlags map[string]string // 自定义参数
|
||||
}
|
||||
|
||||
// Submitter 定义作业提交接口
|
||||
type Submitter interface {
|
||||
SubmitJob(ctx context.Context, job *JobSpec) (jobID string, err error)
|
||||
}
|
||||
|
||||
// StatusChecker 定义作业状态查询接口
|
||||
type StatusChecker interface {
|
||||
GetJobStatus(ctx context.Context, jobID string) (JobStatus, error)
|
||||
CancelJob(ctx context.Context, jobID string) error
|
||||
}
|
||||
|
||||
// JobLister 定义作业列表查询接口
|
||||
type JobLister interface {
|
||||
ListJobs(ctx context.Context, filters map[string]string) ([]JobInfo, error)
|
||||
}
|
||||
|
||||
// Operator 组合所有作业操作接口
|
||||
type Operator interface {
|
||||
Submitter
|
||||
StatusChecker
|
||||
JobLister
|
||||
}
|
||||
|
||||
func (s JobStatus) String() string {
|
||||
return [...]string{
|
||||
"PENDING",
|
||||
"RUNNING",
|
||||
"COMPLETED",
|
||||
"FAILED",
|
||||
"CANCELLED",
|
||||
}[s]
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package hpc
|
||||
|
||||
import "fmt"
|
||||
|
||||
type JobError struct {
|
||||
Code string
|
||||
Message string
|
||||
Platform string // "slurm" 或 "sugonac"
|
||||
}
|
||||
|
||||
func (e *JobError) Error() string {
|
||||
return fmt.Sprintf("[%s] %s: %s", e.Platform, e.Code, e.Message)
|
||||
}
|
||||
|
||||
var (
|
||||
ErrJobNotFound = &JobError{Code: "NOT_FOUND", Message: "作业不存在"}
|
||||
ErrInvalidJobID = &JobError{Code: "INVALID_ID", Message: "无效作业ID"}
|
||||
)
|
|
@ -0,0 +1,46 @@
|
|||
package slurm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/config"
|
||||
"golang.org/x/crypto/ssh"
|
||||
"net"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
conn *ssh.Client
|
||||
config *config.SlurmConfig
|
||||
}
|
||||
|
||||
func NewClient(cfg *config.SlurmConfig) (*Client, error) {
|
||||
// 配置 SSH 客户端
|
||||
sshConfig := &ssh.ClientConfig{
|
||||
User: cfg.User,
|
||||
Auth: []ssh.AuthMethod{
|
||||
ssh.Password(cfg.Password), // 使用密码认证
|
||||
},
|
||||
HostKeyCallback: ssh.InsecureIgnoreHostKey(), // 忽略主机密钥检查
|
||||
Timeout: cfg.Timeout,
|
||||
}
|
||||
|
||||
// 连接 SSH 服务器
|
||||
conn, err := ssh.Dial("tcp",
|
||||
net.JoinHostPort(cfg.Host, fmt.Sprint(cfg.Port)),
|
||||
sshConfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("SSH 连接失败: %w", err)
|
||||
}
|
||||
|
||||
return &Client{conn: conn, config: cfg}, nil
|
||||
}
|
||||
|
||||
func (c *Client) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
func (c *Client) RunCommand(cmd string) (string, error) {
|
||||
session, err := c.conn.NewSession()
|
||||
defer session.Close()
|
||||
output, err := session.CombinedOutput(cmd)
|
||||
return string(output), err
|
||||
}
|
|
@ -0,0 +1,169 @@
|
|||
package slurm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
hpc "gitlink.org.cn/JointCloud/pcm-participant/backend"
|
||||
"html/template"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobManager struct {
|
||||
client *Client
|
||||
}
|
||||
|
||||
func NewJobManager(client *Client) *JobManager {
|
||||
return &JobManager{client: client}
|
||||
}
|
||||
|
||||
// SubmitJob 实现 hpc.Submitter 接口
|
||||
func (m *JobManager) SubmitJob(ctx context.Context, job *hpc.JobSpec) (string, error) {
|
||||
// 生成动态脚本
|
||||
scriptContent, _ := replacePlaceholders(job.Script, job.Parameters)
|
||||
|
||||
// 上传脚本到远程
|
||||
remoteScriptPath := fmt.Sprintf("/volume/home/nudt_ysz/job_%d.sh", time.Now().Unix())
|
||||
if err := m.client.UploadScript(remoteScriptPath, scriptContent); err != nil {
|
||||
return "", fmt.Errorf("上传脚本失败: %w", err)
|
||||
}
|
||||
|
||||
// 执行 sbatch 命令
|
||||
cmd := fmt.Sprintf("sbatch %s", remoteScriptPath)
|
||||
output, err := m.client.RunCommand(cmd)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("sbatch 执行失败: %w (输出: %s)", err, output)
|
||||
}
|
||||
|
||||
// 解析作业ID: "Submitted batch job 12345"
|
||||
parts := strings.Fields(output)
|
||||
if len(parts) < 4 {
|
||||
return "", fmt.Errorf("无法解析作业ID: %s", output)
|
||||
}
|
||||
return parts[3], nil
|
||||
}
|
||||
|
||||
func (m *JobManager) GetJobStatus(ctx context.Context, jobID string) (hpc.JobStatus, error) {
|
||||
cmd := fmt.Sprintf("squeue -j %s --noheader --format='%%T'", jobID)
|
||||
output, err := m.client.RunCommand(cmd)
|
||||
if err != nil {
|
||||
return hpc.StatusFailed, fmt.Errorf("squeue失败: %w", err)
|
||||
}
|
||||
|
||||
status := strings.TrimSpace(output)
|
||||
return parseSlurmStatus(status), nil
|
||||
}
|
||||
|
||||
func (m *JobManager) CancelJob(ctx context.Context, jobID string) error {
|
||||
_, err := m.client.RunCommand(fmt.Sprintf("scancel %s", jobID))
|
||||
if err != nil {
|
||||
return fmt.Errorf("scancel失败: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *JobManager) ListJobs(ctx context.Context, filters map[string]string) ([]hpc.JobInfo, error) {
|
||||
cmd := "squeue --user=$USER --noheader --format='%i %j %T %S'"
|
||||
if len(filters) > 0 {
|
||||
cmd += " " + buildSqueueFilter(filters)
|
||||
}
|
||||
output, _ := m.client.RunCommand(cmd)
|
||||
|
||||
var jobs []hpc.JobInfo
|
||||
for _, line := range strings.Split(output, "\n") {
|
||||
fields := strings.Fields(line)
|
||||
if len(fields) < 4 {
|
||||
continue
|
||||
}
|
||||
|
||||
submitTime, _ := time.Parse("2006-01-02T15:04:05", fields[3])
|
||||
jobs = append(jobs, hpc.JobInfo{
|
||||
ID: fields[0],
|
||||
Name: fields[1],
|
||||
Status: parseSlurmStatus(fields[2]),
|
||||
SubmitTime: submitTime,
|
||||
})
|
||||
}
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// 状态映射表
|
||||
var slurmStatusMap = map[string]hpc.JobStatus{
|
||||
"PD": hpc.StatusPending, // Pending
|
||||
"R": hpc.StatusRunning,
|
||||
"CA": hpc.StatusCancelled,
|
||||
"CF": hpc.StatusPending, // Configuring
|
||||
"CG": hpc.StatusRunning, // Completing
|
||||
"CD": hpc.StatusCompleted,
|
||||
"F": hpc.StatusFailed,
|
||||
"TO": hpc.StatusFailed, // Timeout
|
||||
"NF": hpc.StatusFailed, // NodeFail
|
||||
}
|
||||
|
||||
func parseSlurmStatus(status string) hpc.JobStatus {
|
||||
if s, ok := slurmStatusMap[status]; ok {
|
||||
return s
|
||||
}
|
||||
return hpc.StatusFailed
|
||||
}
|
||||
|
||||
func buildSqueueFilter(filters map[string]string) string {
|
||||
var args []string
|
||||
for k, v := range filters {
|
||||
switch k {
|
||||
case "user":
|
||||
args = append(args, "--user="+v)
|
||||
case "partition":
|
||||
args = append(args, "--partition="+v)
|
||||
case "state":
|
||||
args = append(args, "--states="+v)
|
||||
}
|
||||
}
|
||||
return strings.Join(args, " ")
|
||||
}
|
||||
|
||||
// replacePlaceholders 替换脚本中的占位符
|
||||
func replacePlaceholders(script string, params map[string]string) (string, error) {
|
||||
// 创建模板
|
||||
tpl, err := template.New("job-script").Parse(script)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("解析脚本模板失败: %w", err)
|
||||
}
|
||||
|
||||
// 渲染模板
|
||||
var buf bytes.Buffer
|
||||
if err := tpl.Execute(&buf, params); err != nil {
|
||||
return "", fmt.Errorf("渲染脚本模板失败: %w", err)
|
||||
}
|
||||
|
||||
return buf.String(), nil
|
||||
}
|
||||
|
||||
func (c *Client) UploadScript(remotePath string, content string) error {
|
||||
session, err := c.conn.NewSession()
|
||||
if err != nil {
|
||||
return fmt.Errorf("创建SSH会话失败: %w", err)
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
// 使用 scp 上传文件
|
||||
go func() {
|
||||
w, _ := session.StdinPipe()
|
||||
defer w.Close()
|
||||
|
||||
// 发送文件头
|
||||
fmt.Fprintf(w, "C0644 %d %s\n", len(content), filepath.Base(remotePath))
|
||||
io.Copy(w, bytes.NewBufferString(content))
|
||||
fmt.Fprint(w, "\x00") // 结束标记
|
||||
}()
|
||||
|
||||
// 执行 scp 命令
|
||||
if err := session.Run(fmt.Sprintf("/usr/bin/scp -qt %s", remotePath)); err != nil {
|
||||
return fmt.Errorf("上传脚本失败: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
package sugonac
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
token string
|
||||
}
|
||||
|
||||
// NewClient 创建一个新的曙光AC客户端
|
||||
func NewClient(baseURL, token string) *Client {
|
||||
return &Client{
|
||||
httpClient: &http.Client{Timeout: 30 * time.Second},
|
||||
baseURL: baseURL,
|
||||
token: token,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,190 @@
|
|||
package sugonac
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
hpc "gitlink.org.cn/JointCloud/pcm-participant/backend"
|
||||
hpc2 "gitlink.org.cn/JointCloud/pcm-participant/backend/hpc"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobManager struct {
|
||||
client *Client
|
||||
}
|
||||
|
||||
func NewJobManager(client *Client) *JobManager {
|
||||
return &JobManager{client: client}
|
||||
}
|
||||
|
||||
// SubmitJob 实现 hpc.Submitter 接口
|
||||
func (m *JobManager) SubmitJob(ctx context.Context, job *hpc.JobSpec) (string, error) {
|
||||
// 构建 API 请求体
|
||||
reqBody := map[string]interface{}{
|
||||
"job_name": job.Name,
|
||||
"script_body": job.Script,
|
||||
"parameters": job.Parameters,
|
||||
"queue": job.Partition,
|
||||
"time_limit": int(job.TimeLimit.Minutes()),
|
||||
}
|
||||
|
||||
// 发送 HTTP 请求
|
||||
resp, err := m.client.DoRequest("POST", "/api/v1/jobs", reqBody)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("API 请求失败: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 解析响应
|
||||
var result struct {
|
||||
JobID string `json:"job_id"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return "", fmt.Errorf("解析响应失败: %w", err)
|
||||
}
|
||||
|
||||
return result.JobID, nil
|
||||
}
|
||||
|
||||
func (m *JobManager) GetJobStatus(ctx context.Context, jobID string) (hpc.JobStatus, error) {
|
||||
resp, err := m.client.DoRequest("GET", fmt.Sprintf("/jobs/%s", jobID), nil)
|
||||
if err != nil {
|
||||
return hpc.StatusFailed, err
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Status string `json:"status"`
|
||||
}
|
||||
json.NewDecoder(resp.Body).Decode(&result)
|
||||
return parseSugonStatus(result.Status), nil
|
||||
}
|
||||
|
||||
func (m *JobManager) CancelJob(ctx context.Context, jobID string) error {
|
||||
resp, err := m.client.DoRequest("DELETE", fmt.Sprintf("/jobs/%s", jobID), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("取消作业失败: %s", resp.Status)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *JobManager) ListJobs(ctx context.Context, filters map[string]string) ([]hpc.JobInfo, error) {
|
||||
path := "/jobs"
|
||||
if len(filters) > 0 {
|
||||
path += "?" + buildSugonQuery(filters).Encode()
|
||||
}
|
||||
|
||||
resp, err := m.client.DoRequest("GET", path, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var apiJobs []struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Status string `json:"status"`
|
||||
Created string `json:"created_at"`
|
||||
}
|
||||
json.NewDecoder(resp.Body).Decode(&apiJobs)
|
||||
|
||||
var jobs []hpc.JobInfo
|
||||
for _, j := range apiJobs {
|
||||
t, _ := time.Parse(time.RFC3339, j.Created)
|
||||
jobs = append(jobs, hpc.JobInfo{
|
||||
ID: j.ID,
|
||||
Name: j.Name,
|
||||
Status: parseSugonStatus(j.Status),
|
||||
SubmitTime: t,
|
||||
})
|
||||
}
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
// 状态映射表
|
||||
var sugonStatusMap = map[string]hpc.JobStatus{
|
||||
"1": hpc.StatusPending, // 等待中
|
||||
"2": hpc.StatusRunning, // 运行中
|
||||
"3": hpc.StatusCompleted, // 已完成
|
||||
"4": hpc.StatusFailed, // 已失败
|
||||
"5": hpc.StatusCancelled, // 已取消
|
||||
}
|
||||
|
||||
func parseSugonStatus(status string) hpc.JobStatus {
|
||||
if s, ok := sugonStatusMap[status]; ok {
|
||||
return s
|
||||
}
|
||||
return hpc.StatusFailed
|
||||
}
|
||||
|
||||
func buildSugonQuery(filters map[string]string) url.Values {
|
||||
params := url.Values{}
|
||||
for k, v := range filters {
|
||||
switch k {
|
||||
case "queue":
|
||||
params.Add("queue_name", v)
|
||||
case "start_time":
|
||||
params.Add("start_at", v)
|
||||
case "status":
|
||||
params.Add("job_status", v)
|
||||
}
|
||||
}
|
||||
return params
|
||||
}
|
||||
|
||||
// DoRequest 发送 HTTP 请求
|
||||
func (c *Client) DoRequest(method, path string, body interface{}) (*http.Response, error) {
|
||||
// 构建请求体
|
||||
var bodyReader io.Reader
|
||||
if body != nil {
|
||||
jsonBody, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("编码请求体失败: %w", err)
|
||||
}
|
||||
bodyReader = bytes.NewReader(jsonBody)
|
||||
}
|
||||
|
||||
// 创建 HTTP 请求
|
||||
req, err := http.NewRequest(method, c.baseURL+path, bodyReader)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建请求失败: %w", err)
|
||||
}
|
||||
|
||||
// 设置请求头
|
||||
req.Header.Set("Authorization", "Bearer "+c.token)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// 发送请求
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("发送请求失败: %w", err)
|
||||
}
|
||||
|
||||
// 检查响应状态码
|
||||
if resp.StatusCode >= 400 {
|
||||
return nil, parseSugonError(resp)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// parseSugonError 解析曙光AC API 的错误响应
|
||||
func parseSugonError(resp *http.Response) error {
|
||||
var errBody struct {
|
||||
Code string `json:"error_code"`
|
||||
Message string `json:"error_msg"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&errBody); err != nil {
|
||||
return fmt.Errorf("解析错误响应失败: %w", err)
|
||||
}
|
||||
return &hpc2.JobError{
|
||||
Code: errBody.Code,
|
||||
Message: errBody.Message,
|
||||
Platform: "sugonac",
|
||||
}
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
hpc "gitlink.org.cn/JointCloud/pcm-participant/backend"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/backend/slurm"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/backend/sugonac"
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/config"
|
||||
"os"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var rootCmd = &cobra.Command{
|
||||
Use: "hpc",
|
||||
Short: "超算任务管理工具",
|
||||
Long: `hpc 是一个用于提交和管理超算任务的命令行工具,支持 Slurm 和曙光AC集群。`,
|
||||
}
|
||||
|
||||
var submitCmd = &cobra.Command{
|
||||
Use: "submit [backend] [job-script]",
|
||||
Args: cobra.ExactArgs(2),
|
||||
Short: "提交计算作业",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
backendType := args[0]
|
||||
scriptPath := args[1]
|
||||
|
||||
// 统一加载配置文件
|
||||
cfg := config.LoadConfig("config/config.yaml")
|
||||
|
||||
scriptContent, err := os.ReadFile(scriptPath)
|
||||
if err != nil {
|
||||
fmt.Printf("读取脚本失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
var operator hpc.Operator
|
||||
switch backendType {
|
||||
case "slurm":
|
||||
client, err := slurm.NewClient(&cfg.Backends.Slurm)
|
||||
if err != nil {
|
||||
fmt.Printf("初始化 Slurm 客户端失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
operator = slurm.NewJobManager(client)
|
||||
|
||||
case "sugonac":
|
||||
client := sugonac.NewClient(
|
||||
cfg.Backends.Sugonac.BaseURL,
|
||||
cfg.Backends.Sugonac.Token,
|
||||
)
|
||||
operator = sugonac.NewJobManager(client)
|
||||
|
||||
default:
|
||||
fmt.Printf("不支持的 backend 类型: %s\n", backendType)
|
||||
return
|
||||
}
|
||||
|
||||
job := &hpc.JobSpec{
|
||||
Name: "hpc_job",
|
||||
Script: string(scriptContent),
|
||||
Parameters: map[string]string{"NODE_NUM": "8"},
|
||||
}
|
||||
|
||||
jobID, err := operator.SubmitJob(cmd.Context(), job)
|
||||
if err != nil {
|
||||
fmt.Printf("作业提交失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("✅ 作业已提交 | ID: %s\n", jobID)
|
||||
},
|
||||
}
|
||||
|
||||
var listJobsCmd = &cobra.Command{
|
||||
Use: "listjobs [backend]",
|
||||
Short: "列出超算作业",
|
||||
Long: "列出指定后端集群的所有作业",
|
||||
Args: cobra.ExactArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
backendType := args[0]
|
||||
filters := make(map[string]string)
|
||||
|
||||
if status, _ := cmd.Flags().GetString("status"); status != "" {
|
||||
filters["status"] = status
|
||||
}
|
||||
if queue, _ := cmd.Flags().GetString("queue"); queue != "" {
|
||||
filters["queue"] = queue
|
||||
}
|
||||
|
||||
cfg := config.LoadConfig("config/config.yaml")
|
||||
|
||||
var operator hpc.Operator
|
||||
var err error
|
||||
|
||||
switch backendType {
|
||||
case "slurm":
|
||||
client, err := slurm.NewClient(&cfg.Backends.Slurm)
|
||||
if err != nil {
|
||||
fmt.Printf("初始化Slurm客户端失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
operator = slurm.NewJobManager(client)
|
||||
|
||||
case "sugonac":
|
||||
client := sugonac.NewClient(
|
||||
cfg.Backends.Sugonac.BaseURL,
|
||||
cfg.Backends.Sugonac.Token,
|
||||
)
|
||||
operator = sugonac.NewJobManager(client)
|
||||
|
||||
default:
|
||||
fmt.Printf("不支持的backend类型: %s\n", backendType)
|
||||
return
|
||||
}
|
||||
|
||||
jobs, err := operator.ListJobs(context.Background(), filters)
|
||||
if err != nil {
|
||||
fmt.Printf("获取作业列表失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
if jsonOutput, _ := cmd.Flags().GetBool("json"); jsonOutput {
|
||||
fmt.Println("[")
|
||||
for i, job := range jobs {
|
||||
fmt.Printf(" {\n")
|
||||
fmt.Printf(" \"ID\": \"%s\",\n", job.ID)
|
||||
fmt.Printf(" \"Name\": \"%s\",\n", job.Name)
|
||||
fmt.Printf(" \"Status\": \"%s\",\n", job.Status)
|
||||
fmt.Printf(" \"SubmitTime\": \"%s\"\n", job.SubmitTime.Format("2006-01-02 15:04:05"))
|
||||
if i < len(jobs)-1 {
|
||||
fmt.Println(" },")
|
||||
} else {
|
||||
fmt.Println(" }")
|
||||
}
|
||||
}
|
||||
fmt.Println("]")
|
||||
} else {
|
||||
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
fmt.Fprintln(w, "JOB ID\tNAME\tSTATUS\tSUBMIT TIME")
|
||||
for _, job := range jobs {
|
||||
fmt.Fprintf(w, "%s\t%s\t%s\t%s\n",
|
||||
job.ID,
|
||||
job.Name,
|
||||
job.Status,
|
||||
job.SubmitTime.Format("2006-01-02 15:04:05"))
|
||||
}
|
||||
w.Flush()
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(submitCmd)
|
||||
rootCmd.AddCommand(listJobsCmd)
|
||||
|
||||
listJobsCmd.Flags().StringP("status", "s", "", "按状态过滤作业")
|
||||
listJobsCmd.Flags().StringP("queue", "q", "", "按队列/分区过滤作业")
|
||||
listJobsCmd.Flags().Bool("json", false, "输出JSON格式")
|
||||
}
|
||||
|
||||
func main() {
|
||||
if err := rootCmd.Execute(); err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Backends struct {
|
||||
Slurm SlurmConfig `yaml:"slurm"`
|
||||
Sugonac SugonConfig `yaml:"sugonac"`
|
||||
} `yaml:"backends"`
|
||||
}
|
||||
|
||||
// SlurmConfig 定义 Slurm 集群的配置
|
||||
type SlurmConfig struct {
|
||||
Host string `yaml:"host"` // 集群主机地址
|
||||
Port int `yaml:"port"` // SSH 端口
|
||||
User string `yaml:"user"` // SSH 用户名
|
||||
Password string `yaml:"password"` // SSH 密码
|
||||
Timeout time.Duration `yaml:"timeout"` // 连接超时时间
|
||||
}
|
||||
|
||||
// SugonConfig 定义曙光AC集群的配置
|
||||
type SugonConfig struct {
|
||||
BaseURL string `yaml:"base_url"` // 曙光AC API 地址
|
||||
Token string `yaml:"token"` // 认证令牌
|
||||
}
|
||||
|
||||
// LoadConfig 从 YAML 文件加载所有后端的配置
|
||||
func LoadConfig(path string) *Config {
|
||||
// 读取配置文件
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("读取配置文件失败: %v", err))
|
||||
}
|
||||
|
||||
// 解析 YAML
|
||||
var cfg Config
|
||||
if err := yaml.Unmarshal(data, &cfg); err != nil {
|
||||
panic(fmt.Sprintf("解析配置文件失败: %v", err))
|
||||
}
|
||||
|
||||
return &cfg
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
backends:
|
||||
slurm:
|
||||
host: "192.168.87.178"
|
||||
port: 22
|
||||
user: "nudt_ysz"
|
||||
password: "Yangsz$$10" # SSH 密码
|
||||
timeout: 30s
|
||||
|
||||
sugonac:
|
||||
base_url: "https://api.sugon.cn"
|
||||
token: "xxxxxx"
|
|
@ -0,0 +1,39 @@
|
|||
module gitlink.org.cn/JointCloud/pcm-participant
|
||||
|
||||
go 1.23.3
|
||||
|
||||
require (
|
||||
github.com/gin-gonic/gin v1.10.0
|
||||
github.com/spf13/cobra v1.9.1
|
||||
golang.org/x/crypto v0.35.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bytedance/sonic v1.11.6 // indirect
|
||||
github.com/bytedance/sonic/loader v0.1.1 // indirect
|
||||
github.com/cloudwego/base64x v0.1.4 // indirect
|
||||
github.com/cloudwego/iasm v0.2.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
|
||||
github.com/gin-contrib/sse v0.1.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.20.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||
github.com/spf13/pflag v1.0.6 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||
golang.org/x/arch v0.8.0 // indirect
|
||||
golang.org/x/net v0.25.0 // indirect
|
||||
golang.org/x/sys v0.30.0 // indirect
|
||||
golang.org/x/text v0.22.0 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
)
|
|
@ -0,0 +1,17 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"gitlink.org.cn/JointCloud/pcm-participant/api"
|
||||
"log"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 设置 API 路由
|
||||
router := api.SetupRouter()
|
||||
|
||||
// 启动 API 服务
|
||||
log.Println("启动 API 服务,监听端口 8080...")
|
||||
if err := router.Run(":8080"); err != nil {
|
||||
log.Fatalf("启动 API 服务失败: %v", err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
#!/bin/bash
|
||||
#SBATCH --job-name=sleep_job # 作业名称
|
||||
#SBATCH --output=sleep_job.out # 标准输出文件
|
||||
#SBATCH --error=sleep_job.err # 标准错误文件
|
||||
#SBATCH --ntasks=1 # 任务数
|
||||
#SBATCH --time=00:01:00 # 运行时间(1分钟)
|
||||
#SBATCH --partition=P100 # 分区名称
|
||||
|
||||
echo "Starting sleep job..."
|
||||
sleep 300 # 休眠 60 秒
|
||||
echo "Sleep job completed."
|
|
@ -0,0 +1,5 @@
|
|||
import time
|
||||
|
||||
print("Starting sleep job...")
|
||||
time.sleep(60) # 休眠 60 秒
|
||||
print("Sleep job completed.")
|
Loading…
Reference in New Issue