From b70c45167ceb69f6c7fcd3ba52438f0842fefd54 Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 6 Nov 2024 19:06:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=B6=85=E7=AE=97=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=8F=90=E4=BA=A4=20=E8=BF=94=E5=9B=9EjobId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/types.go | 3 +- desc/core/pcm-core.api | 5 +- desc/hpc/pcm-hpc.api | 5 +- .../logic/adapters/getadapterrelationlogic.go | 2 +- internal/logic/core/pulltaskinfologic.go | 1 - internal/logic/hpc/canceljoblogic.go | 1 - internal/logic/hpc/commithpctasklogic.go | 74 ++++++++++++++----- internal/types/types.go | 10 +-- pkg/models/taskhpcmodel.go | 29 -------- pkg/models/taskhpcmodel_gen.go | 14 +--- 10 files changed, 70 insertions(+), 74 deletions(-) delete mode 100644 pkg/models/taskhpcmodel.go diff --git a/client/types.go b/client/types.go index 43b063511..fe971d001 100644 --- a/client/types.go +++ b/client/types.go @@ -112,7 +112,8 @@ type HpcInfo struct { StdOutFile string `json:"stdOutFile"` // 工作路径/std.err.%j StdErrFile string `json:"stdErrFile"` // 工作路径/std.err.%j StdInput string `json:"stdInput"` - Environment string `json:"environment"` + EnvPath string `json:"envPath"` + EnvLdPath string `json:"envLdPath"` DeletedFlag int64 `json:"deletedFlag"` // 是否删除(0-否,1-是) CreatedBy int64 `json:"createdBy"` // 创建人 CreatedTime time.Time `json:"createdTime"` // 创建时间 diff --git a/desc/core/pcm-core.api b/desc/core/pcm-core.api index 873b38db3..e9fc40f16 100644 --- a/desc/core/pcm-core.api +++ b/desc/core/pcm-core.api @@ -902,7 +902,7 @@ type ( } ClusterInfo { Id string `json:"id,omitempty" db:"id"` - AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` + AdapterId int64 `json:"adapterId,omitempty" db:"adapter_id"` Name string `json:"name,omitempty" db:"name"` Nickname string `json:"nickname,omitempty" db:"nickname"` Description string `json:"description,omitempty" db:"description"` @@ -923,7 +923,8 @@ type ( RegionDict string `json:"regionDict,omitempty" db:"region_dict"` Location string `json:"location,omitempty" db:"location"` CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - Environment string `json:"environment,omitempty" db:"environment"` + EnvPath string `json:"envPath,omitempty" db:"env_path"` + EnvLdPath string `json:"envLdPath,omitempty" db:"env_ld_path"` WorkDir string `json:"workDir,omitempty" db:"work_dir"` } ) diff --git a/desc/hpc/pcm-hpc.api b/desc/hpc/pcm-hpc.api index 6ddcb8747..a1d67390d 100644 --- a/desc/hpc/pcm-hpc.api +++ b/desc/hpc/pcm-hpc.api @@ -33,9 +33,8 @@ type ( } commitHpcTaskResp { - TaskId int64 `json:"taskId"` - Code int32 `json:"code"` - Msg string `json:"msg"` + ClusterId int64 `json:"clusterId"` + JobId string `json:"jobId"` } ) diff --git a/internal/logic/adapters/getadapterrelationlogic.go b/internal/logic/adapters/getadapterrelationlogic.go index 3e44e8b64..f4270e514 100644 --- a/internal/logic/adapters/getadapterrelationlogic.go +++ b/internal/logic/adapters/getadapterrelationlogic.go @@ -58,7 +58,7 @@ func (l *GetAdapterRelationLogic) GetAdapterRelation(req *types.AdapterRelationQ cr = &types.ClusterRelationInfo{} utils.Convert(&v, &cr) cr.CId = c.Id - cr.CAdapterId = c.AdapterId + cr.CAdapterId = string(c.AdapterId) cr.CName = c.Name cr.CNickname = c.Nickname cr.CDescription = c.Description diff --git a/internal/logic/core/pulltaskinfologic.go b/internal/logic/core/pulltaskinfologic.go index 671e07707..6e44d7de9 100644 --- a/internal/logic/core/pulltaskinfologic.go +++ b/internal/logic/core/pulltaskinfologic.go @@ -49,7 +49,6 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie } var clusterType string l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType) - utils.Convert(hpcInfo.Environment, &resp.HpcInfoList[i].Environment) resp.HpcInfoList[i].ClusterType = clusterType } } diff --git a/internal/logic/hpc/canceljoblogic.go b/internal/logic/hpc/canceljoblogic.go index dc2699ee4..843b3e4ff 100644 --- a/internal/logic/hpc/canceljoblogic.go +++ b/internal/logic/hpc/canceljoblogic.go @@ -23,7 +23,6 @@ func NewCancelJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CancelJ } func (l *CancelJobLogic) CancelJob(req *types.CancelJobReq) error { - // todo: add your logic here and delete this line var clusterInfo *types.ClusterInfo tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&clusterInfo) if tx.Error != nil { diff --git a/internal/logic/hpc/commithpctasklogic.go b/internal/logic/hpc/commithpctasklogic.go index 03ae41762..9b5e24045 100644 --- a/internal/logic/hpc/commithpctasklogic.go +++ b/internal/logic/hpc/commithpctasklogic.go @@ -5,6 +5,8 @@ import ( "errors" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-hpc/slurm" + "strconv" "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" @@ -33,9 +35,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo) if len(clusterInfo.Id) == 0 { - resp.Code = 400 - resp.Msg = "no cluster found" - return resp, nil + return resp, errors.New("cluster not found") } // 构建主任务结构体 @@ -43,7 +43,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t Name: req.Name, Description: req.Description, CommitTime: time.Now(), - Status: "Saved", + Status: "Running", AdapterTypeDict: "2", } @@ -53,22 +53,18 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t return nil, tx.Error } - var clusterName string - var adapterId int64 var adapterName string - l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", req.ClusterId).Scan(&clusterName) - l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", req.ClusterId).Scan(&adapterId) - l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName) + l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName) if len(adapterName) == 0 || adapterName == "" { return nil, errors.New("no corresponding adapter found") } hpcInfo := models.TaskHpc{ TaskId: taskModel.Id, - AdapterId: adapterId, + AdapterId: clusterInfo.AdapterId, AdapterName: adapterName, ClusterId: req.ClusterId, - ClusterName: clusterName, + ClusterName: clusterInfo.Name, Name: taskModel.Name, CmdScript: req.CmdScript, StartTime: time.Now().String(), @@ -83,21 +79,26 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t Account: clusterInfo.Username, StdInput: req.StdInput, Partition: req.Partition, - Environment: clusterInfo.Environment, CreatedTime: time.Now(), UpdatedTime: time.Now(), - Status: "Saved", + Status: "Running", } hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir tx = l.svcCtx.DbEngin.Create(&hpcInfo) if tx.Error != nil { return nil, tx.Error } + // 提交job到指定集群 + jobId, err := submitJob(&hpcInfo, &clusterInfo) + if err != nil { + return nil, err + } + // 保存操作记录 noticeInfo := clientCore.NoticeInfo{ - AdapterId: adapterId, + AdapterId: clusterInfo.AdapterId, AdapterName: adapterName, ClusterId: req.ClusterId, - ClusterName: clusterName, + ClusterName: clusterInfo.Name, NoticeType: "create", TaskName: req.Name, Incident: "任务创建中", @@ -108,9 +109,46 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t logx.Errorf("Task creation failure, err: %v", result.Error) } resp = &types.CommitHpcTaskResp{ - Code: 200, - Msg: "success", - TaskId: taskModel.Id, + JobId: string(jobId), } + return resp, nil } + +func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo) (string, error) { + + client, err := slurm.NewClient(slurm.ClientOptions{ + URL: clusterInfo.Server, + ClientVersion: clusterInfo.Version, + RestUserName: clusterInfo.Username, + Token: clusterInfo.Token}) + if err != nil { + return "", err + } + job, err := client.Job(slurm.JobOptions{}) + if err != nil { + return "", err + } + // 封装请求参数 + submitReq := slurm.JobOptions{ + Script: hpcInfo.CmdScript, + Job: &slurm.JobProperties{ + Account: hpcInfo.Account, + Name: hpcInfo.Name, + NTasks: 1, + CurrentWorkingDirectory: hpcInfo.WorkDir, + Partition: hpcInfo.Partition, + Environment: map[string]string{"PATH": clusterInfo.EnvPath, + "LD_LIBRARY_PATH": clusterInfo.EnvLdPath}, + }, + } + submitReq.Job.StandardOutput = submitReq.Job.CurrentWorkingDirectory + "/%j.out" + submitReq.Job.StandardError = submitReq.Job.CurrentWorkingDirectory + "/%j.err" + + jobResp, err := job.SubmitJob(submitReq) + if err != nil { + return "", err + } + jobId := strconv.Itoa(jobResp.JobId) + return jobId, nil +} diff --git a/internal/types/types.go b/internal/types/types.go index 6d026f46f..608482b17 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -835,7 +835,7 @@ type ClusterCreateReq struct { type ClusterInfo struct { Id string `json:"id,omitempty" db:"id"` - AdapterId string `json:"adapterId,omitempty" db:"adapter_id"` + AdapterId int64 `json:"adapterId,omitempty" db:"adapter_id"` Name string `json:"name,omitempty" db:"name"` Nickname string `json:"nickname,omitempty" db:"nickname"` Description string `json:"description,omitempty" db:"description"` @@ -856,7 +856,8 @@ type ClusterInfo struct { RegionDict string `json:"regionDict,omitempty" db:"region_dict"` Location string `json:"location,omitempty" db:"location"` CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - Environment string `json:"environment,omitempty" db:"environment"` + EnvPath string `json:"envPath,omitempty" db:"env_path"` + EnvLdPath string `json:"envLdPath,omitempty" db:"env_ld_path"` WorkDir string `json:"workDir,omitempty" db:"work_dir"` } @@ -1298,9 +1299,8 @@ type CommitHpcTaskReq struct { } type CommitHpcTaskResp struct { - TaskId int64 `json:"taskId"` - Code int32 `json:"code"` - Msg string `json:"msg"` + ClusterId int64 `json:"clusterId"` + JobId string `json:"jobId"` } type HpcOverViewReq struct { diff --git a/pkg/models/taskhpcmodel.go b/pkg/models/taskhpcmodel.go deleted file mode 100644 index 0088303be..000000000 --- a/pkg/models/taskhpcmodel.go +++ /dev/null @@ -1,29 +0,0 @@ -package models - -import "github.com/zeromicro/go-zero/core/stores/sqlx" - -var _ TaskHpcModel = (*customTaskHpcModel)(nil) - -type ( - // TaskHpcModel is an interface to be customized, add more methods here, - // and implement the added methods in customTaskHpcModel. - TaskHpcModel interface { - taskHpcModel - withSession(session sqlx.Session) TaskHpcModel - } - - customTaskHpcModel struct { - *defaultTaskHpcModel - } -) - -// NewTaskHpcModel returns a model for the database table. -func NewTaskHpcModel(conn sqlx.SqlConn) TaskHpcModel { - return &customTaskHpcModel{ - defaultTaskHpcModel: newTaskHpcModel(conn), - } -} - -func (m *customTaskHpcModel) withSession(session sqlx.Session) TaskHpcModel { - return NewTaskHpcModel(sqlx.NewSqlConnFromSession(session)) -} diff --git a/pkg/models/taskhpcmodel_gen.go b/pkg/models/taskhpcmodel_gen.go index cc4c458a0..e99455f2e 100644 --- a/pkg/models/taskhpcmodel_gen.go +++ b/pkg/models/taskhpcmodel_gen.go @@ -72,12 +72,12 @@ type ( StdErrFile string `db:"std_err_file"` // 工作路径/std.err.%j StdInput string `db:"std_input"` Partition string `db:"partition"` - Environment string `db:"environment"` DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) CreatedBy int64 `db:"created_by"` // 创建人 CreatedTime time.Time `db:"created_time"` // 创建时间 UpdatedBy int64 `db:"updated_by"` // 更新人 UpdatedTime time.Time `db:"updated_time"` // 更新时间 + } ) @@ -108,18 +108,6 @@ func (m *defaultTaskHpcModel) FindOne(ctx context.Context, id int64) (*TaskHpc, } } -func (m *defaultTaskHpcModel) Insert(ctx context.Context, data *TaskHpc) (sql.Result, error) { - query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskHpcRowsExpectAutoSet) - ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.JobId, data.ClusterId, data.Name, data.Status, data.CmdScript, data.StartTime, data.RunningTime, data.DerivedEs, data.Cluster, data.BlockId, data.AllocNodes, data.AllocCpu, data.CardCount, data.Version, data.Account, data.WorkDir, data.AssocId, data.ExitCode, data.WallTime, data.Result, data.DeletedAt, data.YamlString, data.AppType, data.AppName, data.Queue, data.SubmitType, data.NNode, data.StdOutFile, data.StdErrFile, data.StdInput, data.Environment, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime) - return ret, err -} - -func (m *defaultTaskHpcModel) Update(ctx context.Context, data *TaskHpc) error { - query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskHpcRowsWithPlaceHolder) - _, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.JobId, data.ClusterId, data.Name, data.Status, data.CmdScript, data.StartTime, data.RunningTime, data.DerivedEs, data.Cluster, data.BlockId, data.AllocNodes, data.AllocCpu, data.CardCount, data.Version, data.Account, data.WorkDir, data.AssocId, data.ExitCode, data.WallTime, data.Result, data.DeletedAt, data.YamlString, data.AppType, data.AppName, data.Queue, data.SubmitType, data.NNode, data.StdOutFile, data.StdErrFile, data.StdInput, data.Environment, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id) - return err -} - func (m *defaultTaskHpcModel) tableName() string { return m.table }