forked from JointCloud/pcm-coordinator
parent
c806fc4e3c
commit
0e1df3b20f
@@ -936,7 +936,10 @@ type (
		EnvPath      string `json:"envPath,omitempty" db:"env_path"`
		EnvLdPath    string `json:"envLdPath,omitempty" db:"env_ld_path"`
		WorkDir      string `json:"workDir,omitempty" db:"work_dir"`
		Address      string `json:"address,omitempty" db:"address"`
		ProxyAddress string `json:"proxyAddress,omitempty" db:"proxy_address"`
		ProxyEnable  string `json:"proxyEnable,omitempty" db:"proxy_enable"`
		Driver       string `json:"driver,omitempty" db:"driver"`
	}
)
@@ -189,4 +189,38 @@ type(
	HpcTaskLogReq{
		TaskId string `path:"taskId"`
	}
)

type (
	HpcAppTemplateInfo {
		Id          int64  `json:"id"`
		Name        string `json:"name"`
		App         string `json:"app"`
		AppType     string `json:"app_type"`
		ClusterId   int64  `json:"cluster_id"`
		Content     string `json:"content"`
		Description string `json:"description"`
		Status      int32  `json:"status"`
		CreateTime  string `json:"create_time"`
		UpdateTime  string `json:"update_time"`
	}
	// App string `json:"app"`
	//Backend string `json:"backend" binding:"required,oneof=slurm sugonac"` // backend type: slurm/sugonac
	//Partition string // partition/queue name
	//TaskId string `json:"taskId"`
	//ClusterId string `json:"clusterId"`
	//JobName string `json:"jobName"`
	//ScriptContent string `json:"scriptContent"`
	//ScriptDir string `json:"scriptDir"`
	//Parameters map[string]string `json:"parameters"`
	//TimeLimit time.Duration // job time limit
	SubmitHpcTaskReq {
		App           string            `json:"app"`
		ClusterId     string            `json:"clusterId"`
		JobName       string            `json:"jobName"`
		ScriptContent string            `json:"scriptContent"`
		Parameters    map[string]string `json:"parameters"`
		Backend       string            `json:"backend"`
	}
)
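For orientation only: a minimal sketch of how the new SubmitHpcTaskReq might be populated once a job script has been rendered. The field names and types come from the definition above; the app name, cluster id, script content, and parameter values are invented for illustration.

	req := types.SubmitHpcTaskReq{
		App:           "lammps",     // hypothetical application name
		ClusterId:     "1790300734", // hypothetical cluster id
		JobName:       "lammps-bench-001",
		ScriptContent: "#!/bin/bash\n#SBATCH --job-name=lammps-bench-001\nsrun lmp -in in.lj",
		Parameters:    map[string]string{"partition": "normal", "nodes": "2", "ntasks": "64"},
		Backend:       "slurm",
	}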
@@ -203,7 +203,7 @@ service pcm {
		@doc "删除资源规格"
		@handler deleteResourceSpecHandler
		delete /core/ai/resourceSpec/delete/:id (DeletePathId) returns (CommonResp)
		//cluster resource specs ----- end
		//cluster resource specs ----- end
	}

	//hpc second-level interfaces
@@ -444,7 +444,7 @@ service pcm {
		@doc "文本识别"
		@handler ChatHandler
		post /ai/chat (ChatReq) returns (ChatResult)
		/******chat end***********/
		/******chat end***********/
	}

	//screen interfaces
@@ -1133,4 +1133,5 @@ service pcm {

		@handler scheduleSituationHandler
		get /monitoring/schedule/situation returns (scheduleSituationResp)
	}
}
File diff suppressed because it is too large
@@ -21,6 +21,7 @@ type TaskHPCResult struct {
	ID        uint   `gorm:"column:id"`         // maps to t.id
	JobID     string `gorm:"column:job_id"`     // maps to hpc.job_id
	AdapterId string `gorm:"column:adapter_id"` // maps to hpc.adapter_id
	ClusterId string `gorm:"column:cluster_id"` // maps to hpc.cluster_id
}

func NewCancelJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CancelJobLogic {
@@ -67,7 +68,7 @@ func (l *CancelJobLogic) CancelJob(req *types.CancelJobReq) error {
	//return nil
	var hpcR TaskHPCResult
	tx := l.svcCtx.DbEngin.Raw(
		"SELECT t.id, hpc.job_id ,hpc.adapter_id FROM task t "+
		"SELECT t.id, hpc.job_id ,hpc.adapter_id, hpc.cluster_id FROM task t "+
			"INNER JOIN task_hpc hpc ON t.id = hpc.task_id "+
			"WHERE adapter_type_dict = 2 AND t.id = ?",
		req.TaskId,
@@ -84,7 +85,7 @@ func (l *CancelJobLogic) CancelJob(req *types.CancelJobReq) error {
		return fmt.Errorf("adapter not found")
	}
	// cancel the job
	err := l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].CancelTask(l.ctx, hpcR.JobID)
	err := l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].CancelTask(l.ctx, hpcR.JobID, hpcR.ClusterId)
	if err != nil {
		return err
	}
@@ -2,19 +2,22 @@ package hpc

import (
	"context"
	"errors"
	"fmt"
	jsoniter "github.com/json-iterator/go"
	"github.com/pkg/errors"
	"github.com/rs/zerolog/log"
	"github.com/zeromicro/go-zero/core/logx"
	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	"strconv"
	"strings"
	"sync"
	"text/template"
	"time"

	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"

	"github.com/zeromicro/go-zero/core/logx"
)

type CommitHpcTaskLogic struct {
@@ -24,6 +27,25 @@ type CommitHpcTaskLogic struct {
	hpcService *service.HpcService
}

const (
	statusSaved     = "Saved"
	statusDeploying = "Deploying"
	adapterTypeHPC  = "2"
)

type JobRequest struct {
	App         string                 `json:"app"`
	Common      CommonParams           `json:"common"`
	AppSpecific map[string]interface{} `json:"appSpecific"`
}
type CommonParams struct {
	JobName   string `json:"jobName"`
	Partition string `json:"partition"`
	Nodes     string `json:"nodes"`
	NTasks    string `json:"ntasks"`
	Time      string `json:"time,omitempty"`
}

func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
	cache := make(map[string]interface{}, 10)
	hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache)
@@ -38,57 +60,126 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com
	}
}

func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
	req.Parameters["jobName"] = generateJobName(req)
	reqStr, _ := jsoniter.MarshalToString(req)
	yaml := utils.StringToYaml(reqStr)

// New: cache parsed template objects
var templateCache = sync.Map{}

func (l *CommitHpcTaskLogic) getClusterInfo(clusterID string) (*types.ClusterInfo, *types.AdapterInfo, error) {
	var clusterInfo types.ClusterInfo
	l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)

	if len(clusterInfo.Id) == 0 {
		return resp, errors.New("cluster not found")
	if err := l.svcCtx.DbEngin.Table("t_cluster").Where("id = ?", clusterID).First(&clusterInfo).Error; err != nil {
		return nil, nil, fmt.Errorf("cluster query failed: %w", err)
	}

	// build the main task struct
	userId, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
	taskModel := models.Task{
		Id:              utils.GenSnowflakeID(),
		Name:            req.Name,
		Description:     req.Description,
		CommitTime:      time.Now(),
		Status:          "Saved",
		AdapterTypeDict: "2",
		UserId:          userId,
		YamlString:      *yaml,
	}

	// save the task record to the database
	tx := l.svcCtx.DbEngin.Create(&taskModel)
	if tx.Error != nil {
		return nil, tx.Error
	if clusterInfo.Id == "" {
		return nil, nil, errors.New("cluster not found")
	}

	var adapterInfo types.AdapterInfo
	l.svcCtx.DbEngin.Raw("SELECT * FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterInfo)
	if adapterInfo.Id == "" {
		return resp, errors.New("adapter not found")
	if err := l.svcCtx.DbEngin.Table("t_adapter").Where("id = ?", clusterInfo.AdapterId).First(&adapterInfo).Error; err != nil {
		return nil, nil, fmt.Errorf("adapter query failed: %w", err)
	}
	clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
	cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
	timelimit, _ := strconv.ParseInt(req.Parameters["timeLimit"], 10, 64)
	hpcInfo := models.TaskHpc{

	if adapterInfo.Id == "" {
		return nil, nil, errors.New("adapter not found")
	}

	return &clusterInfo, &adapterInfo, nil
}

func (l *CommitHpcTaskLogic) RenderJobScript(templateContent string, req *JobRequest) (string, error) {
	// use the cached template if one has already been parsed
	tmpl, ok := templateCache.Load(templateContent)
	if !ok {
		parsedTmpl, err := template.New("jobScript").Parse(templateContent)
		if err != nil {
			return "", fmt.Errorf("template parse failed: %w", err)
		}
		templateCache.Store(templateContent, parsedTmpl)
		tmpl = parsedTmpl
	}

	params := map[string]interface{}{
		"Common": req.Common,
		"App":    req.AppSpecific,
	}

	var buf strings.Builder
	if err := tmpl.(*template.Template).Execute(&buf, params); err != nil {
		return "", fmt.Errorf("template render failed: %w", err)
	}
	return buf.String(), nil
}

func ConvertToJobRequest(job *types.CommitHpcTaskReq) (JobRequest, error) {
	required := []string{"jobName", "partition", "nodes", "ntasks"}
	for _, field := range required {
		if job.Parameters[field] == "" {
			return JobRequest{}, fmt.Errorf("%s is empty", field)
		}
	}

	return JobRequest{
		App: job.App,
		Common: CommonParams{
			JobName:   job.Parameters["jobName"],
			Partition: job.Parameters["partition"],
			Nodes:     job.Parameters["nodes"],
			NTasks:    job.Parameters["ntasks"],
			Time:      job.Parameters["time"],
		},
		AppSpecific: utils.MpaStringToInterface(job.Parameters),
	}, nil
}

func (l *CommitHpcTaskLogic) SaveHpcTaskToDB(req *types.CommitHpcTaskReq, jobScript, jobId, workDir string) (taskId string, err error) {
	// use a transaction to keep the writes consistent
	tx := l.svcCtx.DbEngin.Begin()
	defer func() {
		if r := recover(); r != nil {
			tx.Rollback()
			err = fmt.Errorf("transaction panic: %v", r)
		} else if err != nil {
			tx.Rollback()
		}
	}()

	userID, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
	taskID := utils.GenSnowflakeID()
	taskModel := models.Task{
		Id:              taskID,
		Name:            req.Name,
		Description:     req.Description,
		CommitTime:      time.Now(),
		Status:          statusSaved,
		AdapterTypeDict: adapterTypeHPC,
		UserId:          userID,
	}

	if err = tx.Table("task").Create(&taskModel).Error; err != nil {
		return "", fmt.Errorf("failed to create task: %w", err)
	}

	clusterInfo, adapterInfo, err := l.getClusterInfo(req.ClusterId)
	if err != nil {
		return "", err
	}

	paramsJSON, err := jsoniter.MarshalToString(req)
	if err != nil {
		return "", fmt.Errorf("failed to marshal parameters: %w", err)
	}

	clusterID := utils.StringToInt64(clusterInfo.Id)
	hpcTask := models.TaskHpc{
		Id: utils.GenSnowflakeID(),
		TaskId: taskModel.Id,
		TaskId: taskID,
		AdapterId: clusterInfo.AdapterId,
		AdapterName: adapterInfo.Name,
		ClusterId: clusterId,
		ClusterId: clusterID,
		ClusterName: clusterInfo.Name,
		Name: taskModel.Name,
		Backend: req.Backend,
		OperateType: req.OperateType,
		CmdScript: req.Parameters["cmdScript"],
		CardCount: cardCount,
		WorkDir: req.Parameters["workDir"],
		WallTime: req.Parameters["wallTime"],
		AppType: req.Parameters["appType"],
		AppName: req.App,
@@ -100,70 +191,101 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
		Partition: req.Parameters["partition"],
		CreatedTime: time.Now(),
		UpdatedTime: time.Now(),
		Status: "Deploying",
		TimeLimit: timelimit,
		UserId: userId,
		YamlString: *yaml,
		Status: statusDeploying,
		UserId: userID,
		Params: paramsJSON,
		Script: jobScript,
		JobId: jobId,
		WorkDir: workDir,
	}
	hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
	tx = l.svcCtx.DbEngin.Create(&hpcInfo)
	if tx.Error != nil {
		return nil, tx.Error

	if err = tx.Table("task_hpc").Create(&hpcTask).Error; err != nil {
		return "", fmt.Errorf("failed to create HPC task: %w", err)
	}
	// save the operation notice record

	noticeInfo := clientCore.NoticeInfo{
		AdapterId: clusterInfo.AdapterId,
		AdapterName: adapterInfo.Name,
		ClusterId: clusterId,
		ClusterId: clusterID,
		ClusterName: clusterInfo.Name,
		NoticeType: "create",
		TaskName: req.Name,
		TaskId: taskModel.Id,
		TaskId: taskID,
		Incident: "任务创建中",
		CreatedTime: time.Now(),
	}
	result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
	if result.Error != nil {
		logx.Errorf("Task creation failure, err: %v", result.Error)
	}
	// write the record on-chain
	// look up the resource price
	//var price int64
	//l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)

	//bytes, _ := json.Marshal(taskModel)
	//remoteUtil.Evidence(remoteUtil.EvidenceParam{
	// UserIp: req.Parameters["UserIp"],
	// Url: l.svcCtx.Config.BlockChain.Url,
	// ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
	// FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
	// Type: l.svcCtx.Config.BlockChain.Type,
	// Token: req.Parameters["Token"],
	// Amount: price,
	// Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
	//})
	// submit the job to the target cluster
	logx.Info("提交job到指定集群")
	resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(l.ctx, *req)
	if err = tx.Table("t_notice").Create(&noticeInfo).Error; err != nil {
		return "", fmt.Errorf("failed to create notice: %w", err)
	}

	if err = tx.Commit().Error; err != nil {
		return "", fmt.Errorf("transaction commit failed: %w", err)
	}

	return utils.Int64ToString(taskID), nil
}

func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
	jobName := generateJobName(req)
	req.Parameters["jobName"] = jobName

	// fetch cluster and adapter info
	clusterInfo, adapterInfo, err := l.getClusterInfo(req.ClusterId)
	if err != nil {
		logx.Errorf("提交Hpc到指定集群失败, err: %v", err)
		return nil, err
	}
	// update the task status
	updates := l.svcCtx.DbEngin.Model(&hpcInfo).Updates(models.TaskHpc{
		Id: hpcInfo.Id,
		JobId: resp.Data.JobInfo["jobId"],
		WorkDir: resp.Data.JobInfo["jobDir"],
	})
	if updates.Error != nil {
		return nil, updates.Error

	// fetch the app template
	var templateInfo types.HpcAppTemplateInfo
	tx := l.svcCtx.DbEngin.Table("hpc_app_template").
		Where("cluster_id = ? and app = ? ", req.ClusterId, req.App)
	if req.OperateType != "" {
		tx.Where("app_type = ?", req.OperateType)
	}
	resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10)
	logx.Infof("提交job到指定集群成功, resp: %v", resp)
	if err := tx.First(&templateInfo).Error; err != nil {
		return nil, fmt.Errorf("failed to get template: %w", err)
	}

	// convert the request parameters
	jobRequest, err := ConvertToJobRequest(req)
	if err != nil {
		return nil, fmt.Errorf("invalid job request: %w", err)
	}

	// render the job script
	script, err := l.RenderJobScript(templateInfo.Content, &jobRequest)
	if err != nil {
		return nil, fmt.Errorf("script rendering failed: %w", err)
	}

	q, _ := jsoniter.MarshalToString(script)
	submitQ := types.SubmitHpcTaskReq{
		App: req.App,
		ClusterId: req.ClusterId,
		JobName: jobName,
		ScriptContent: script,
		Parameters: req.Parameters,
		Backend: req.Backend,
	}
	log.Info().Msgf("Submitting HPC task to cluster %s with params: %s", clusterInfo.Name, q)
	resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(l.ctx, submitQ)
	if err != nil {
		return nil, fmt.Errorf("task submission failed: %w", err)
	}

	jobID := resp.Data.JobInfo["jobId"]
	workDir := resp.Data.JobInfo["jobDir"]
	taskID, err := l.SaveHpcTaskToDB(req, script, jobID, workDir)
	if err != nil {
		log.Error().Msgf("Failed to save task to DB: %v", err)
		return nil, fmt.Errorf("db save failed: %w", err)
	}

	resp.Data.JobInfo["taskId"] = taskID
	return resp, nil
}

// generateJobName builds the jobName according to the request
func generateJobName(req *types.CommitHpcTaskReq) string {
	if req.OperateType == "" {
		return req.Name
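As a reading aid: RenderJobScript above executes the stored template with a map whose keys are "Common" (the CommonParams struct) and "App" (the raw request parameters). A hypothetical hpc_app_template content for a Slurm backend might therefore look like the sketch below; the #SBATCH directives and the cmdScript parameter are assumptions, only the {{.Common.*}} and {{.App.*}} access paths are fixed by the code above.

	// Hypothetical template body (not taken from the repository):
	const exampleTemplate = `#!/bin/bash
#SBATCH --job-name={{.Common.JobName}}
#SBATCH --partition={{.Common.Partition}}
#SBATCH --nodes={{.Common.Nodes}}
#SBATCH --ntasks={{.Common.NTasks}}
{{if .Common.Time}}#SBATCH --time={{.Common.Time}}{{end}}
srun {{.App.cmdScript}}
`
	// script, err := l.RenderJobScript(exampleTemplate, &jobRequest)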
@@ -36,7 +36,7 @@ func NewGetHpcTaskLogLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Get
func (l *GetHpcTaskLogLogic) GetHpcTaskLog(req *types.HpcTaskLogReq) (resp interface{}, err error) {
	var hpcR TaskHPCResult
	tx := l.svcCtx.DbEngin.Raw(
		"SELECT t.id, hpc.job_id ,hpc.adapter_id FROM task t "+
		"SELECT t.id, hpc.job_id ,hpc.adapter_id ,hpc.cluster_id FROM task t "+
			"INNER JOIN task_hpc hpc ON t.id = hpc.task_id "+
			"WHERE adapter_type_dict = 2 AND t.id = ?",
		req.TaskId,
@@ -53,7 +53,7 @@ func (l *GetHpcTaskLogLogic) GetHpcTaskLog(req *types.HpcTaskLogReq) (resp inter
		return nil, fmt.Errorf("adapter not found")
	}
	// fetch the job logs
	resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].GetTaskLogs(l.ctx, hpcR.JobID)
	resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].GetTaskLogs(l.ctx, hpcR.JobID, hpcR.ClusterId)
	if err != nil {
		return nil, err
	}
@@ -7,10 +7,10 @@ import (
)

type HPCCollector interface {
	GetTask(ctx context.Context, taskId string) (*Task, error)
	SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error)
	CancelTask(ctx context.Context, jobId string) error
	GetTaskLogs(ctx context.Context, jobId string) (interface{}, error)
	GetTask(ctx context.Context, taskId string, clusterId string) (*Task, error)
	SubmitTask(ctx context.Context, req types.SubmitHpcTaskReq) (*types.CommitHpcTaskResp, error)
	CancelTask(ctx context.Context, jobId string, clusterId string) error
	GetTaskLogs(ctx context.Context, jobId string, clusterId string) (interface{}, error)
}

type JobInfo struct {
@@ -23,11 +23,10 @@ type ParticipantHpc struct {
}

const (
	BackendSlurm = "slurm"
	JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
	JobDetailUrl = "/api/v1/jobs/detail/{clusterId}/{jobId}"
	SubmitTaskUrl = "/api/v1/jobs"
	CancelTaskUrl = "/api/v1/jobs/cancel/{backend}/{jobId}"
	JobLogUrl = "/api/v1/jobs/logs/{backend}/{jobId}"
	CancelTaskUrl = "/api/v1/jobs/cancel/{clusterId}/{jobId}"
	JobLogUrl = "/api/v1/jobs/logs/{clusterId}/{jobId}"
)

func NewHpc(host string, id int64, platform string) *ParticipantHpc {
@@ -39,13 +38,19 @@ func NewHpc(host string, id int64, platform string) *ParticipantHpc {
	}
}

func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) {
func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string, clusterId string) (*collector.Task, error) {
	reqUrl := c.host + JobDetailUrl
	hpcResp := &collector.HpcJobDetailResp{}
	httpClient := resty.New().R()
	_, err := httpClient.SetHeader("Content-Type", "application/json").
		SetPathParam("jobId", taskId).
		SetPathParam("backend", "slurm").
	_, err := httpClient.SetHeaders(
		map[string]string{
			"Content-Type": "application/json",
			"traceId":      result.TraceIDFromContext(ctx),
		}).
		SetPathParams(map[string]string{
			"clusterId": clusterId,
			"jobId":     taskId,
		}).
		SetResult(&hpcResp).
		Get(reqUrl)
	if err != nil {
@@ -83,7 +88,7 @@ func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector
	return &resp, nil
}

func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.SubmitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
	reqUrl := c.host + SubmitTaskUrl
	resp := types.CommitHpcTaskResp{}
	logx.WithContext(ctx).Infof("提交任务到超算集群, url: %s, req: %+v", reqUrl, req)
@@ -104,7 +109,7 @@ func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTask
	return &resp, nil
}

func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string) error {
func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string, clusterId string) error {
	reqUrl := c.host + CancelTaskUrl
	resp := types.CommonResp{}
	logx.WithContext(ctx).Infof("取消超算集群任务, url: %s, jobId: %s", reqUrl, jobId)
@@ -114,8 +119,8 @@ func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string) error {
		"Content-Type": "application/json",
		"traceId":      result.TraceIDFromContext(ctx),
	}).SetPathParams(map[string]string{
		"backend": BackendSlurm,
		"jobId":   jobId,
		"clusterId": clusterId,
		"jobId":     jobId,
	}).SetResult(&resp).Delete(reqUrl)
	if err != nil {
		return err
@@ -126,7 +131,7 @@ func (c *ParticipantHpc) CancelTask(ctx context.Context, jobId string) error {
	return nil
}

func (c *ParticipantHpc) GetTaskLogs(ctx context.Context, jobId string) (interface{}, error) {
func (c *ParticipantHpc) GetTaskLogs(ctx context.Context, jobId string, clusterId string) (interface{}, error) {
	logx.WithContext(ctx).Infof("获取超算集群任务日志, url: %s, jobId: %s", JobLogUrl, jobId)
	if jobId == "" {
		return nil, fmt.Errorf("jobId is empty")
@@ -137,8 +142,8 @@ func (c *ParticipantHpc) GetTaskLogs(ctx context.Context, jobId string) (interfa
		"Content-Type": "application/json",
		"traceId":      result.TraceIDFromContext(ctx),
	}).SetPathParams(map[string]string{
		"backend": BackendSlurm,
		"jobId":   jobId,
		"clusterId": clusterId,
		"jobId":     jobId,
	}).SetResult(&resp)
	})
	if err != nil {
@@ -9,6 +9,7 @@ import (
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	"net/http"
	"strconv"
)
@@ -64,8 +65,9 @@ func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
			logx.Errorf(tx.Error.Error())
			break
		}
		clusterId := utils.Int64ToString(hpc.ClusterId)
		h := http.Request{}
		hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId)
		hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId)
		if err != nil {
			logx.Errorf(err.Error())
			break
internal/types/types.go (11557): file diff suppressed because it is too large
@@ -0,0 +1,29 @@
package models

import "github.com/zeromicro/go-zero/core/stores/sqlx"

var _ HpcAppTemplateModel = (*customHpcAppTemplateModel)(nil)

type (
	// HpcAppTemplateModel is an interface to be customized, add more methods here,
	// and implement the added methods in customHpcAppTemplateModel.
	HpcAppTemplateModel interface {
		hpcAppTemplateModel
		withSession(session sqlx.Session) HpcAppTemplateModel
	}

	customHpcAppTemplateModel struct {
		*defaultHpcAppTemplateModel
	}
)

// NewHpcAppTemplateModel returns a model for the database table.
func NewHpcAppTemplateModel(conn sqlx.SqlConn) HpcAppTemplateModel {
	return &customHpcAppTemplateModel{
		defaultHpcAppTemplateModel: newHpcAppTemplateModel(conn),
	}
}

func (m *customHpcAppTemplateModel) withSession(session sqlx.Session) HpcAppTemplateModel {
	return NewHpcAppTemplateModel(sqlx.NewSqlConnFromSession(session))
}
@@ -0,0 +1,96 @@
// Code generated by goctl. DO NOT EDIT.
// versions:
// goctl version: 1.8.1

package models

import (
	"context"
	"database/sql"
	"fmt"
	"gorm.io/gorm"
	"strings"
	"time"

	"github.com/zeromicro/go-zero/core/stores/builder"
	"github.com/zeromicro/go-zero/core/stores/sqlx"
	"github.com/zeromicro/go-zero/core/stringx"
)

var (
	hpcAppTemplateFieldNames          = builder.RawFieldNames(&HpcAppTemplate{})
	hpcAppTemplateRows                = strings.Join(hpcAppTemplateFieldNames, ",")
	hpcAppTemplateRowsExpectAutoSet   = strings.Join(stringx.Remove(hpcAppTemplateFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), ",")
	hpcAppTemplateRowsWithPlaceHolder = strings.Join(stringx.Remove(hpcAppTemplateFieldNames, "`id`", "`create_at`", "`create_time`", "`created_at`", "`update_at`", "`update_time`", "`updated_at`"), "=?,") + "=?"
)

type (
	hpcAppTemplateModel interface {
		Insert(ctx context.Context, data *HpcAppTemplate) (sql.Result, error)
		FindOne(ctx context.Context, id int64) (*HpcAppTemplate, error)
		Update(ctx context.Context, data *HpcAppTemplate) error
		Delete(ctx context.Context, id int64) error
	}

	defaultHpcAppTemplateModel struct {
		conn  sqlx.SqlConn
		table string
	}

	HpcAppTemplate struct {
		Id          int64          `db:"id"`
		Name        string         `db:"name"`        // template name
		App         string         `db:"app"`         // application name
		AppType     string         `db:"app_type"`    // application type
		ClusterId   int64          `db:"cluster_id"`  // cluster id
		Content     string         `db:"content"`     // template content
		Description string         `db:"description"` // template description
		Status      int64          `db:"status"`      // status: 0-disabled, 1-enabled
		CreateTime  time.Time      `db:"create_time" json:"createTime"` // creation time
		UpdateTime  time.Time      `db:"update_time" json:"updateTime"` // update time
		DeletedAt   gorm.DeletedAt `db:"deleted_at" json:"-"`           // deletion time
	}
)

func newHpcAppTemplateModel(conn sqlx.SqlConn) *defaultHpcAppTemplateModel {
	return &defaultHpcAppTemplateModel{
		conn:  conn,
		table: "`hpc_app_template`",
	}
}

func (m *defaultHpcAppTemplateModel) Delete(ctx context.Context, id int64) error {
	query := fmt.Sprintf("delete from %s where `id` = ?", m.table)
	_, err := m.conn.ExecCtx(ctx, query, id)
	return err
}

func (m *defaultHpcAppTemplateModel) FindOne(ctx context.Context, id int64) (*HpcAppTemplate, error) {
	query := fmt.Sprintf("select %s from %s where `id` = ? limit 1", hpcAppTemplateRows, m.table)
	var resp HpcAppTemplate
	err := m.conn.QueryRowCtx(ctx, &resp, query, id)
	switch err {
	case nil:
		return &resp, nil
	case sqlx.ErrNotFound:
		return nil, ErrNotFound
	default:
		return nil, err
	}
}

func (m *defaultHpcAppTemplateModel) Insert(ctx context.Context, data *HpcAppTemplate) (sql.Result, error) {
	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?)", m.table, hpcAppTemplateRowsExpectAutoSet)
	ret, err := m.conn.ExecCtx(ctx, query, data.Name, data.App, data.AppType, data.ClusterId, data.Content, data.Description, data.Status, data.DeletedAt)
	return ret, err
}

func (m *defaultHpcAppTemplateModel) Update(ctx context.Context, data *HpcAppTemplate) error {
	query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, hpcAppTemplateRowsWithPlaceHolder)
	_, err := m.conn.ExecCtx(ctx, query, data.Name, data.App, data.AppType, data.ClusterId, data.Content, data.Description, data.Status, data.DeletedAt, data.Id)
	return err
}

func (m *defaultHpcAppTemplateModel) tableName() string {
	return m.table
}
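A minimal usage sketch for the generated model above. The constructor and the FindOne signature are taken from the file as shown; the MySQL DSN and the template id are placeholders.

	conn := sqlx.NewMysql("user:pass@tcp(127.0.0.1:3306)/pcm") // placeholder DSN
	tplModel := models.NewHpcAppTemplateModel(conn)
	tpl, err := tplModel.FindOne(context.Background(), 1) // hypothetical template id
	if err != nil {
		// sqlx.ErrNotFound is surfaced as ErrNotFound by FindOne
		logx.Errorf("template lookup failed: %v", err)
	} else {
		logx.Infof("template %s for app %s on cluster %d", tpl.Name, tpl.App, tpl.ClusterId)
	}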
@@ -82,6 +82,9 @@ type (
		UpdatedTime time.Time `db:"updated_time"` // update time
		UserId      int64     `db:"user_id"`
		TimeLimit   int64     `db:"time_limit"`
		Params      string    `db:"params"`      // rendering parameters
		Script      string    `db:"script"`      // generated script
		TemplateId  int64     `db:"template_id"` // template ID
	}
)
@@ -48,3 +48,21 @@ func IsValidHostAddress(input string) bool {
	}
	return re.MatchString(input)
}

func MpaStringToInterface(m map[string]string) map[string]interface{} {
	result := make(map[string]interface{})
	for k, v := range m {
		result[k] = v
	}
	return result
}

func MapInterfaceToString(m map[string]interface{}) map[string]string {
	result := make(map[string]string)
	for k, v := range m {
		if str, ok := v.(string); ok {
			result[k] = str
		}
	}
	return result
}
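A small usage sketch for the two helpers above (the parameter values are placeholders): MpaStringToInterface widens every value to interface{}, while MapInterfaceToString keeps only values that are actually strings, so a round trip over string-only input returns the same keys.

	params := map[string]string{"partition": "normal", "nodes": "2"} // placeholder values
	generic := utils.MpaStringToInterface(params)                    // map[string]interface{}
	back := utils.MapInterfaceToString(generic)                      // map[string]string with the same entries
	fmt.Println(len(back) == len(params))                            // true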