forked from JointCloud/pcm-coordinator
Merge pull request '超算提交任务更新' (#441) from zhouqj_dev into master
This commit is contained in:
commit
75343be0cb
|
@ -10,37 +10,33 @@ info(
|
|||
|
||||
type (
|
||||
commitHpcTaskReq {
|
||||
ClusterId string `json:"clusterId,optional"`
|
||||
Name string `json:"name"`
|
||||
Account string `json:"account,optional"`
|
||||
Backend string `json:"backend"`
|
||||
ClusterId string `json:"clusterId"`
|
||||
App string `json:"app"`
|
||||
Description string `json:"description,optional"`
|
||||
TenantId int64 `json:"tenantId,optional"`
|
||||
TaskId int64 `json:"taskId,optional"`
|
||||
AdapterIds []string `json:"adapterIds,optional"`
|
||||
MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||
CardCount int64 `json:"cardCount,optional"`
|
||||
WorkDir string `json:"workDir,optional"` //paratera:workingDir
|
||||
WallTime string `json:"wallTime,optional"`
|
||||
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
|
||||
AppType string `json:"appType,optional"`
|
||||
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
|
||||
Queue string `json:"queue,optional"`
|
||||
NNode string `json:"nNode,optional"`
|
||||
SubmitType string `json:"submitType,optional"`
|
||||
StdInput string `json:"stdInput,optional"`
|
||||
ClusterType string `json:"clusterType,optional"`
|
||||
Partition string `json:"partition"`
|
||||
UserId int64 `json:"userId,optional"`
|
||||
Token string `json:"token,optional"`
|
||||
UserIp string `json:"userIp,optional"`
|
||||
}
|
||||
|
||||
commitHpcTaskResp {
|
||||
ClusterId int64 `json:"clusterId"`
|
||||
JobId string `json:"jobId"`
|
||||
OperateType string `json:"operateType,optional"`
|
||||
Parameters map[string]string `json:"parameters"`
|
||||
CustomParams map[string]string `json:"customParams"`
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
type (
|
||||
commitHpcTaskResp {
|
||||
Code int `json:"code"`
|
||||
Data Data `json:"data"`
|
||||
Msg string `json:"msg"`
|
||||
TraceId string `json:"trace_id"`
|
||||
}
|
||||
Data {
|
||||
Backend string `json:"backend"`
|
||||
JobInfo map[string]string `json:"jobInfo"`
|
||||
}
|
||||
|
||||
)
|
||||
|
||||
|
||||
type (
|
||||
hpcOverViewReq {
|
||||
}
|
||||
|
@ -160,7 +156,6 @@ type (
|
|||
InstanceType int32 `form:"instanceType,optional"`
|
||||
InstanceClass string `form:"instanceClass,optional"`
|
||||
InstanceName string `form:"instanceName,optional"`
|
||||
PageInfo
|
||||
}
|
||||
HpcInstanceCenterResp {
|
||||
InstanceCenterList []HpcInstanceCenterList `json:"instanceCenterList" copier:"InstanceCenterList"`
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
||||
|
@ -21,15 +22,15 @@ func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
|||
}
|
||||
// 获取ip信息
|
||||
ip := utils.GetClientIP(r)
|
||||
req.UserIp = ip
|
||||
req.Parameters["UserIp"] = ip
|
||||
// 获取token信息
|
||||
token := r.Header.Get("Authorization")
|
||||
req.Token = token
|
||||
req.Parameters["Token"] = token
|
||||
// 获取用户信息
|
||||
userStr := r.Header.Get("User")
|
||||
user := &models.JccUserInfo{}
|
||||
json.Unmarshal([]byte(userStr), user)
|
||||
req.UserId = user.Id
|
||||
req.Parameters["UserId"] = strconv.FormatInt(user.Id, 10)
|
||||
|
||||
l := hpc.NewCommitHpcTaskLogic(r.Context(), svcCtx)
|
||||
resp, err := l.CommitHpcTask(&req)
|
||||
|
|
|
@ -6,9 +6,6 @@ import (
|
|||
"github.com/go-resty/resty/v2"
|
||||
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
|
||||
v1 "gitlink.org.cn/JointCloud/pcm-hpc/routers/v1"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -32,6 +29,27 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com
|
|||
}
|
||||
}
|
||||
|
||||
type JobSpec struct {
|
||||
Name string // 应用名称: BWA/lammps
|
||||
Backend string // 后端类型:slurm/sugonac
|
||||
App string
|
||||
OperateType string // 应用内操作类型: bwa:构建索引/对比序列
|
||||
Parameters map[string]string // 通用参数
|
||||
CustomParams map[string]string // 各平台自定义参数
|
||||
}
|
||||
type ResultParticipant struct {
|
||||
Code int `json:"code"`
|
||||
Data struct {
|
||||
Backend string `json:"backend"`
|
||||
JobInfo struct {
|
||||
JobDir string `json:"jobDir"`
|
||||
JobId string `json:"jobId"`
|
||||
} `json:"jobInfo"`
|
||||
} `json:"data"`
|
||||
Msg string `json:"msg"`
|
||||
TraceId string `json:"trace_id"`
|
||||
}
|
||||
|
||||
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
|
||||
|
||||
var clusterInfo types.ClusterInfo
|
||||
|
@ -42,13 +60,14 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
|
|||
}
|
||||
|
||||
// 构建主任务结构体
|
||||
userId, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
|
||||
taskModel := models.Task{
|
||||
Name: req.Name,
|
||||
Description: req.Description,
|
||||
CommitTime: time.Now(),
|
||||
Status: "Running",
|
||||
AdapterTypeDict: "2",
|
||||
UserId: req.UserId,
|
||||
UserId: userId,
|
||||
}
|
||||
|
||||
// 保存任务数据到数据库
|
||||
|
@ -65,6 +84,8 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
|
|||
return nil, errors.New("no corresponding adapter found")
|
||||
}
|
||||
clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
|
||||
cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
|
||||
timelimit, _ := strconv.ParseInt(req.Parameters["timeLimit"], 10, 64)
|
||||
hpcInfo := models.TaskHpc{
|
||||
TaskId: taskModel.Id,
|
||||
AdapterId: clusterInfo.AdapterId,
|
||||
|
@ -72,24 +93,27 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
|
|||
ClusterId: clusterId,
|
||||
ClusterName: clusterInfo.Name,
|
||||
Name: taskModel.Name,
|
||||
CmdScript: req.CmdScript,
|
||||
Backend: req.Backend,
|
||||
OperateType: req.OperateType,
|
||||
CmdScript: req.Parameters["cmdScript"],
|
||||
StartTime: time.Now().String(),
|
||||
CardCount: req.CardCount,
|
||||
WorkDir: req.WorkDir,
|
||||
WallTime: req.WallTime,
|
||||
AppType: req.AppType,
|
||||
AppName: req.AppName,
|
||||
Queue: req.Queue,
|
||||
SubmitType: req.SubmitType,
|
||||
NNode: req.NNode,
|
||||
CardCount: cardCount,
|
||||
WorkDir: req.Parameters["workDir"],
|
||||
WallTime: req.Parameters["wallTime"],
|
||||
AppType: req.Parameters["appType"],
|
||||
AppName: req.Parameters["appName"],
|
||||
Queue: req.Parameters["queue"],
|
||||
SubmitType: req.Parameters["submitType"],
|
||||
NNode: req.Parameters["nNode"],
|
||||
Account: clusterInfo.Username,
|
||||
StdInput: req.StdInput,
|
||||
Partition: req.Partition,
|
||||
StdInput: req.Parameters["stdInput"],
|
||||
Partition: req.Parameters["partition"],
|
||||
CreatedTime: time.Now(),
|
||||
UpdatedTime: time.Now(),
|
||||
Status: "Running",
|
||||
TimeLimit: timelimit,
|
||||
}
|
||||
hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir
|
||||
hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
|
||||
tx = l.svcCtx.DbEngin.Create(&hpcInfo)
|
||||
if tx.Error != nil {
|
||||
return nil, tx.Error
|
||||
|
@ -109,64 +133,46 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
|
|||
if result.Error != nil {
|
||||
logx.Errorf("Task creation failure, err: %v", result.Error)
|
||||
}
|
||||
resp = &types.CommitHpcTaskResp{
|
||||
JobId: string(""),
|
||||
}
|
||||
// 数据上链
|
||||
// 查询资源价格
|
||||
var price int64
|
||||
l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&price)
|
||||
l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)
|
||||
|
||||
bytes, _ := json.Marshal(taskModel)
|
||||
remoteUtil.Evidence(remoteUtil.EvidenceParam{
|
||||
UserIp: req.UserIp,
|
||||
Url: l.svcCtx.Config.BlockChain.Url,
|
||||
ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
|
||||
FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
|
||||
Type: l.svcCtx.Config.BlockChain.Type,
|
||||
Token: req.Token,
|
||||
Amount: price,
|
||||
Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
|
||||
})
|
||||
//bytes, _ := json.Marshal(taskModel)
|
||||
//remoteUtil.Evidence(remoteUtil.EvidenceParam{
|
||||
// UserIp: req.Parameters["UserIp"],
|
||||
// Url: l.svcCtx.Config.BlockChain.Url,
|
||||
// ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
|
||||
// FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
|
||||
// Type: l.svcCtx.Config.BlockChain.Type,
|
||||
// Token: req.Parameters["Token"],
|
||||
// Amount: price,
|
||||
// Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
|
||||
//})
|
||||
// 提交job到指定集群
|
||||
logx.Info("提交job到指定集群")
|
||||
go func() {
|
||||
submitJob(&hpcInfo, &clusterInfo, server)
|
||||
}()
|
||||
resp, _ = submitJob(req, server)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo, adapterAddress string) (int, error) {
|
||||
SubmitJobReq := v1.SubmitJobReq{
|
||||
Server: clusterInfo.Server,
|
||||
Version: clusterInfo.Version,
|
||||
Username: clusterInfo.Username,
|
||||
Token: clusterInfo.Token,
|
||||
JobOptions: v1.JobOptions{
|
||||
Script: hpcInfo.CmdScript,
|
||||
Job: &v1.JobProperties{
|
||||
Account: hpcInfo.Account,
|
||||
Name: hpcInfo.Name,
|
||||
NTasks: 1,
|
||||
CurrentWorkingDirectory: hpcInfo.WorkDir,
|
||||
Partition: hpcInfo.Partition,
|
||||
Environment: map[string]string{"PATH": clusterInfo.EnvPath,
|
||||
"LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
|
||||
StandardOutput: hpcInfo.WorkDir + "/job.out",
|
||||
StandardError: hpcInfo.WorkDir + "/job.err",
|
||||
},
|
||||
},
|
||||
func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.CommitHpcTaskResp, err error) {
|
||||
req.Parameters["jobName"] = req.Name
|
||||
reqParticipant := JobSpec{
|
||||
Name: req.Name,
|
||||
Backend: req.Backend,
|
||||
App: req.App,
|
||||
OperateType: req.OperateType,
|
||||
Parameters: req.Parameters,
|
||||
CustomParams: req.CustomParams,
|
||||
}
|
||||
var resp v1.SubmitJobResp
|
||||
httpClient := resty.New().R()
|
||||
logx.Info("远程调用p端接口开始")
|
||||
_, err := httpClient.SetHeader("Content-Type", "application/json").
|
||||
SetBody(SubmitJobReq).
|
||||
httpClient.SetHeader("Content-Type", "application/json").
|
||||
SetBody(reqParticipant).
|
||||
SetResult(&resp).
|
||||
Post(adapterAddress + "/api/v1/job/submit")
|
||||
Post(adapterAddress + "/api/v1/jobs")
|
||||
logx.Info("远程调用p端接口完成")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return resp.JobId, nil
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
|
|
@ -1307,34 +1307,26 @@ type ResourceCostRecord struct {
|
|||
}
|
||||
|
||||
type CommitHpcTaskReq struct {
|
||||
ClusterId string `json:"clusterId,optional"`
|
||||
Name string `json:"name"`
|
||||
Account string `json:"account,optional"`
|
||||
Description string `json:"description,optional"`
|
||||
TenantId int64 `json:"tenantId,optional"`
|
||||
TaskId int64 `json:"taskId,optional"`
|
||||
AdapterIds []string `json:"adapterIds,optional"`
|
||||
MatchLabels map[string]string `json:"matchLabels,optional"`
|
||||
CardCount int64 `json:"cardCount,optional"`
|
||||
WorkDir string `json:"workDir,optional"` //paratera:workingDir
|
||||
WallTime string `json:"wallTime,optional"`
|
||||
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
|
||||
AppType string `json:"appType,optional"`
|
||||
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
|
||||
Queue string `json:"queue,optional"`
|
||||
NNode string `json:"nNode,optional"`
|
||||
SubmitType string `json:"submitType,optional"`
|
||||
StdInput string `json:"stdInput,optional"`
|
||||
ClusterType string `json:"clusterType,optional"`
|
||||
Partition string `json:"partition"`
|
||||
UserId int64 `json:"userId,optional"`
|
||||
Token string `json:"token,optional"`
|
||||
UserIp string `json:"userIp,optional"`
|
||||
Name string `json:"name"`
|
||||
Backend string `json:"backend"` //
|
||||
ClusterId string `json:"clusterId"`
|
||||
App string `json:"app"`
|
||||
Description string `json:"description,optional"`
|
||||
OperateType string `json:"operateType,optional"`
|
||||
Parameters map[string]string `json:"parameters"`
|
||||
CustomParams map[string]string `json:"customParams"`
|
||||
}
|
||||
|
||||
type CommitHpcTaskResp struct {
|
||||
ClusterId int64 `json:"clusterId"`
|
||||
JobId string `json:"jobId"`
|
||||
Code int `json:"code"`
|
||||
Data Data `json:"data"`
|
||||
Msg string `json:"msg"`
|
||||
TraceId string `json:"trace_id"`
|
||||
}
|
||||
|
||||
type Data struct {
|
||||
Backend string `json:"backend"`
|
||||
JobInfo map[string]string `json:"jobInfo"`
|
||||
}
|
||||
|
||||
type HpcOverViewReq struct {
|
||||
|
|
|
@ -44,6 +44,8 @@ type (
|
|||
ClusterId int64 `db:"cluster_id"` //集群id
|
||||
ClusterName string `db:"cluster_name"` //集群名称
|
||||
Name string `db:"name"` // 名称
|
||||
Backend string `db:"backend"` // 平台类型
|
||||
OperateType string `db:"operate_type"` // 操作类型
|
||||
Status string `db:"status"` // 状态
|
||||
CmdScript string `db:"cmd_script"`
|
||||
StartTime string `db:"start_time"` // 开始时间
|
||||
|
@ -78,6 +80,7 @@ type (
|
|||
UpdatedBy int64 `db:"updated_by"` // 更新人
|
||||
UpdatedTime time.Time `db:"updated_time"` // 更新时间
|
||||
UserId int64 `db:"user_id"`
|
||||
TimeLimit int64 `db:"time_limit"`
|
||||
}
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue