Merge pull request 'bug fix' (#336) from zhangweiii/pcm-coordinator:master into master

This commit is contained in:
zhangweiii 2024-11-06 19:07:29 +08:00
commit 816f7a2cbc
10 changed files with 70 additions and 74 deletions

View File

@ -112,7 +112,8 @@ type HpcInfo struct {
StdOutFile string `json:"stdOutFile"` // 工作路径/std.err.%j
StdErrFile string `json:"stdErrFile"` // 工作路径/std.err.%j
StdInput string `json:"stdInput"`
Environment string `json:"environment"`
EnvPath string `json:"envPath"`
EnvLdPath string `json:"envLdPath"`
DeletedFlag int64 `json:"deletedFlag"` // 是否删除0-否1-是)
CreatedBy int64 `json:"createdBy"` // 创建人
CreatedTime time.Time `json:"createdTime"` // 创建时间

View File

@ -902,7 +902,7 @@ type (
}
ClusterInfo {
Id string `json:"id,omitempty" db:"id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
AdapterId int64 `json:"adapterId,omitempty" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
@ -923,7 +923,8 @@ type (
RegionDict string `json:"regionDict,omitempty" db:"region_dict"`
Location string `json:"location,omitempty" db:"location"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
Environment string `json:"environment,omitempty" db:"environment"`
EnvPath string `json:"envPath,omitempty" db:"env_path"`
EnvLdPath string `json:"envLdPath,omitempty" db:"env_ld_path"`
WorkDir string `json:"workDir,omitempty" db:"work_dir"`
}
)

View File

@ -33,9 +33,8 @@ type (
}
commitHpcTaskResp {
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
ClusterId int64 `json:"clusterId"`
JobId string `json:"jobId"`
}
)

View File

@ -58,7 +58,7 @@ func (l *GetAdapterRelationLogic) GetAdapterRelation(req *types.AdapterRelationQ
cr = &types.ClusterRelationInfo{}
utils.Convert(&v, &cr)
cr.CId = c.Id
cr.CAdapterId = c.AdapterId
cr.CAdapterId = string(c.AdapterId)
cr.CName = c.Name
cr.CNickname = c.Nickname
cr.CDescription = c.Description

View File

@ -49,7 +49,6 @@ func (l *PullTaskInfoLogic) PullTaskInfo(req *clientCore.PullTaskInfoReq) (*clie
}
var clusterType string
l.svcCtx.DbEngin.Raw("SELECT label FROM `t_cluster` where id = ? ", hpcInfo.ClusterId).Scan(&clusterType)
utils.Convert(hpcInfo.Environment, &resp.HpcInfoList[i].Environment)
resp.HpcInfoList[i].ClusterType = clusterType
}
}

View File

@ -23,7 +23,6 @@ func NewCancelJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CancelJ
}
func (l *CancelJobLogic) CancelJob(req *types.CancelJobReq) error {
// todo: add your logic here and delete this line
var clusterInfo *types.ClusterInfo
tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&clusterInfo)
if tx.Error != nil {

View File

@ -5,6 +5,8 @@ import (
"errors"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
"strconv"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
@ -33,9 +35,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
if len(clusterInfo.Id) == 0 {
resp.Code = 400
resp.Msg = "no cluster found"
return resp, nil
return resp, errors.New("cluster not found")
}
// 构建主任务结构体
@ -43,7 +43,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Name: req.Name,
Description: req.Description,
CommitTime: time.Now(),
Status: "Saved",
Status: "Running",
AdapterTypeDict: "2",
}
@ -53,22 +53,18 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
return nil, tx.Error
}
var clusterName string
var adapterId int64
var adapterName string
l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", req.ClusterId).Scan(&clusterName)
l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", req.ClusterId).Scan(&adapterId)
l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName)
l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName)
if len(adapterName) == 0 || adapterName == "" {
return nil, errors.New("no corresponding adapter found")
}
hpcInfo := models.TaskHpc{
TaskId: taskModel.Id,
AdapterId: adapterId,
AdapterId: clusterInfo.AdapterId,
AdapterName: adapterName,
ClusterId: req.ClusterId,
ClusterName: clusterName,
ClusterName: clusterInfo.Name,
Name: taskModel.Name,
CmdScript: req.CmdScript,
StartTime: time.Now().String(),
@ -83,21 +79,26 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Account: clusterInfo.Username,
StdInput: req.StdInput,
Partition: req.Partition,
Environment: clusterInfo.Environment,
CreatedTime: time.Now(),
UpdatedTime: time.Now(),
Status: "Saved",
Status: "Running",
}
hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir
tx = l.svcCtx.DbEngin.Create(&hpcInfo)
if tx.Error != nil {
return nil, tx.Error
}
// 提交job到指定集群
jobId, err := submitJob(&hpcInfo, &clusterInfo)
if err != nil {
return nil, err
}
// 保存操作记录
noticeInfo := clientCore.NoticeInfo{
AdapterId: adapterId,
AdapterId: clusterInfo.AdapterId,
AdapterName: adapterName,
ClusterId: req.ClusterId,
ClusterName: clusterName,
ClusterName: clusterInfo.Name,
NoticeType: "create",
TaskName: req.Name,
Incident: "任务创建中",
@ -108,9 +109,46 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
logx.Errorf("Task creation failure, err: %v", result.Error)
}
resp = &types.CommitHpcTaskResp{
Code: 200,
Msg: "success",
TaskId: taskModel.Id,
JobId: string(jobId),
}
return resp, nil
}
func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo) (string, error) {
client, err := slurm.NewClient(slurm.ClientOptions{
URL: clusterInfo.Server,
ClientVersion: clusterInfo.Version,
RestUserName: clusterInfo.Username,
Token: clusterInfo.Token})
if err != nil {
return "", err
}
job, err := client.Job(slurm.JobOptions{})
if err != nil {
return "", err
}
// 封装请求参数
submitReq := slurm.JobOptions{
Script: hpcInfo.CmdScript,
Job: &slurm.JobProperties{
Account: hpcInfo.Account,
Name: hpcInfo.Name,
NTasks: 1,
CurrentWorkingDirectory: hpcInfo.WorkDir,
Partition: hpcInfo.Partition,
Environment: map[string]string{"PATH": clusterInfo.EnvPath,
"LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
},
}
submitReq.Job.StandardOutput = submitReq.Job.CurrentWorkingDirectory + "/%j.out"
submitReq.Job.StandardError = submitReq.Job.CurrentWorkingDirectory + "/%j.err"
jobResp, err := job.SubmitJob(submitReq)
if err != nil {
return "", err
}
jobId := strconv.Itoa(jobResp.JobId)
return jobId, nil
}

View File

@ -835,7 +835,7 @@ type ClusterCreateReq struct {
type ClusterInfo struct {
Id string `json:"id,omitempty" db:"id"`
AdapterId string `json:"adapterId,omitempty" db:"adapter_id"`
AdapterId int64 `json:"adapterId,omitempty" db:"adapter_id"`
Name string `json:"name,omitempty" db:"name"`
Nickname string `json:"nickname,omitempty" db:"nickname"`
Description string `json:"description,omitempty" db:"description"`
@ -856,7 +856,8 @@ type ClusterInfo struct {
RegionDict string `json:"regionDict,omitempty" db:"region_dict"`
Location string `json:"location,omitempty" db:"location"`
CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
Environment string `json:"environment,omitempty" db:"environment"`
EnvPath string `json:"envPath,omitempty" db:"env_path"`
EnvLdPath string `json:"envLdPath,omitempty" db:"env_ld_path"`
WorkDir string `json:"workDir,omitempty" db:"work_dir"`
}
@ -1298,9 +1299,8 @@ type CommitHpcTaskReq struct {
}
type CommitHpcTaskResp struct {
TaskId int64 `json:"taskId"`
Code int32 `json:"code"`
Msg string `json:"msg"`
ClusterId int64 `json:"clusterId"`
JobId string `json:"jobId"`
}
type HpcOverViewReq struct {

View File

@ -1,29 +0,0 @@
package models
import "github.com/zeromicro/go-zero/core/stores/sqlx"
var _ TaskHpcModel = (*customTaskHpcModel)(nil)
type (
// TaskHpcModel is an interface to be customized, add more methods here,
// and implement the added methods in customTaskHpcModel.
TaskHpcModel interface {
taskHpcModel
withSession(session sqlx.Session) TaskHpcModel
}
customTaskHpcModel struct {
*defaultTaskHpcModel
}
)
// NewTaskHpcModel returns a model for the database table.
func NewTaskHpcModel(conn sqlx.SqlConn) TaskHpcModel {
return &customTaskHpcModel{
defaultTaskHpcModel: newTaskHpcModel(conn),
}
}
func (m *customTaskHpcModel) withSession(session sqlx.Session) TaskHpcModel {
return NewTaskHpcModel(sqlx.NewSqlConnFromSession(session))
}

View File

@ -72,12 +72,12 @@ type (
StdErrFile string `db:"std_err_file"` // 工作路径/std.err.%j
StdInput string `db:"std_input"`
Partition string `db:"partition"`
Environment string `db:"environment"`
DeletedFlag int64 `db:"deleted_flag"` // 是否删除0-否1-是)
CreatedBy int64 `db:"created_by"` // 创建人
CreatedTime time.Time `db:"created_time"` // 创建时间
UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间
}
)
@ -108,18 +108,6 @@ func (m *defaultTaskHpcModel) FindOne(ctx context.Context, id int64) (*TaskHpc,
}
}
func (m *defaultTaskHpcModel) Insert(ctx context.Context, data *TaskHpc) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, taskHpcRowsExpectAutoSet)
ret, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.JobId, data.ClusterId, data.Name, data.Status, data.CmdScript, data.StartTime, data.RunningTime, data.DerivedEs, data.Cluster, data.BlockId, data.AllocNodes, data.AllocCpu, data.CardCount, data.Version, data.Account, data.WorkDir, data.AssocId, data.ExitCode, data.WallTime, data.Result, data.DeletedAt, data.YamlString, data.AppType, data.AppName, data.Queue, data.SubmitType, data.NNode, data.StdOutFile, data.StdErrFile, data.StdInput, data.Environment, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime)
return ret, err
}
func (m *defaultTaskHpcModel) Update(ctx context.Context, data *TaskHpc) error {
query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, taskHpcRowsWithPlaceHolder)
_, err := m.conn.ExecCtx(ctx, query, data.TaskId, data.JobId, data.ClusterId, data.Name, data.Status, data.CmdScript, data.StartTime, data.RunningTime, data.DerivedEs, data.Cluster, data.BlockId, data.AllocNodes, data.AllocCpu, data.CardCount, data.Version, data.Account, data.WorkDir, data.AssocId, data.ExitCode, data.WallTime, data.Result, data.DeletedAt, data.YamlString, data.AppType, data.AppName, data.Queue, data.SubmitType, data.NNode, data.StdOutFile, data.StdErrFile, data.StdInput, data.Environment, data.DeletedFlag, data.CreatedBy, data.CreatedTime, data.UpdatedBy, data.UpdatedTime, data.Id)
return err
}
func (m *defaultTaskHpcModel) tableName() string {
return m.table
}