task submit and status sync test finished

This commit is contained in:
zhouqunjie 2024-03-24 20:20:17 +08:00
parent 78961315a9
commit b46c813e99
13 changed files with 664 additions and 339 deletions

View File

@ -20,6 +20,7 @@ func (c *client) Token(options ClientOptions) (Token, error) {
}
func (c *client) Job(options JobOptions) (Job, error) {
job, _ := newJob(c, &options)
return job, nil
}

View File

@ -6,29 +6,34 @@ type JobOptions struct {
}
type SubmitJobReq struct {
Apptype string `json:"apptype,omitempty"`
Appname string `json:"appname,omitempty"`
StrJobManagerID int64 `json:"strJobManagerID,omitempty"`
MapAppJobInfo *MapAppJobInfo `json:"mapAppJobInfo,omitempty"`
}
type SubmitJobResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}
type MapAppJobInfo struct {
GapCmdFile string `json:"GAP_CMD_FILE,omitempty"` //命令行内容
GapNnode string `json:"GAP_NNODE,omitempty"` //节点个数当指定该参数时GAP_NODE_STRING必须为""
GapNodeString string `json:"GAP_NODE_STRING,omitempty"` //指定节点当指定该参数时GAP_NNODE必须为""
GapSubmitType string `json:"GAP_SUBMIT_TYPE,omitempty"` //cmd命令行模式
GapJobName string `json:"GAP_JOB_NAME,omitempty"` //作业名称
GapWorkDir string `json:"GAP_WORK_DIR,omitempty"` //工作路径
GapQueue string `json:"GAP_QUEUE,omitempty"` ///队列名称
GapNproc string `json:"GAP_NPROC,omitempty"` ///总核心数GAP_NPROC和GAP_PPN选其一填写
GapPpn string `json:"GAP_PPN,omitempty"` //CPU核心/节点GAP_NPROC和GAP_PPN选其一填写
GapNgpu string `json:"GAP_NGPU,omitempty"` //GPU卡数/节点
GapNdcu string `json:"GAP_NDCU,omitempty"` //DCU卡数/节点
GapJobMem string `json:"GAP_JOB_MEM,omitempty"` //每个节点内存值单位为MB/GB
GapWallTime string `json:"GAP_WALL_TIME,omitempty"` //最大运行时长HH:MM:ss
GapExclusive string `json:"GAP_EXCLUSIVE,omitempty"` // 是否独占节点1为独占空为非独占
GapAppname string `json:"GAP_APPNAME,omitempty"` //BASE基础应用支持填写具体的应用英文名称
GapMultiSub string `json:"GAP_MULTI_SUB,omitempty"` //作业组长度建议为小于等于50的正整数
GapCmdFile string `json:"GAP_CMD_FILE,omitempty"` //命令行内容
GapNnode string `json:"GAP_NNODE,omitempty"` //节点个数当指定该参数时GAP_NODE_STRING必须为""
GapNodeString string `json:"GAP_NODE_STRING,omitempty"` //指定节点当指定该参数时GAP_NNODE必须为""
GapSubmitType string `json:"GAP_SUBMIT_TYPE,omitempty"` //cmd命令行模式
GapJobName string `json:"GAP_JOB_NAME,omitempty"` //作业名称
GapWorkDir string `json:"GAP_WORK_DIR,omitempty"` //工作路径
GapQueue string `json:"GAP_QUEUE,omitempty"` ///队列名称
GapNproc string `json:"GAP_NPROC,omitempty"` ///总核心数GAP_NPROC和GAP_PPN选其一填写
//GapPpn string `json:"GAP_PPN,omitempty"` //CPU核心/节点GAP_NPROC和GAP_PPN选其一填写
//GapNgpu string `json:"GAP_NGPU,omitempty"` //GPU卡数/节点
GapNdcu string `json:"GAP_NDCU,omitempty"` //DCU卡数/节点
//GapJobMem string `json:"GAP_JOB_MEM,omitempty"` //每个节点内存值单位为MB/GB
GapWallTime string `json:"GAP_WALL_TIME,omitempty"` //最大运行时长HH:MM:ss
//GapExclusive string `json:"GAP_EXCLUSIVE,omitempty"` // 是否独占节点1为独占空为非独占
GapAppType string `json:"GAP_APPTYPE,omitempty"` //BASIS 应用类型
GapAppName string `json:"GAP_APPNAME,omitempty"` //BASE基础应用支持填写具体的应用英文名称
//GapMultiSub string `json:"GAP_MULTI_SUB,omitempty"` //作业组长度建议为小于等于50的正整数
GapStdOutFile string `json:"GAP_STD_OUT_FILE,omitempty"` //工作路径/std.out.%j
GapStdErrFile string `json:"GAP_STD_ERR_FILE,omitempty"` //工作路径/std.err.%j
}
@ -58,7 +63,35 @@ type ListJobReq struct {
// GetJobReq 作业详情请求体
type GetJobReq struct {
jobId string //作业id
JobId string //作业id
}
type GetJobResp struct {
Code string `json:"code"`
Msg string `json:"msg"`
Data struct {
Total int `json:"total"`
List []struct {
JobId string `json:"jobId"`
JobName string `json:"jobName"`
JobStatus string `json:"jobStatus"`
Queue string `json:"queue"`
User string `json:"user"`
NodeUsed string `json:"nodeUsed"`
ProcNumUsed int `json:"procNumUsed"`
JobStartTime string `json:"jobStartTime"`
JobRunTime string `json:"jobRunTime"`
JobVncSessionInfo interface{} `json:"jobVncSessionInfo"`
JobmanagerId string `json:"jobmanagerId"`
JobmanagerName string `json:"jobmanagerName"`
JobmanagerType string `json:"jobmanagerType"`
ErrorPath string `json:"errorPath"`
OutputPath string `json:"outputPath"`
WorkDir string `json:"workDir"`
Reason string `json:"reason"`
AppType string `json:"appType"`
} `json:"list"`
} `json:"data"`
}
// CancelJobReq 作业取消请求体
@ -68,7 +101,7 @@ type CancelJobReq struct {
type Job interface {
ListJob(listJobReq ListJobReq) string
GetJob(getJobReq GetJobReq) string
SubmitJob(submitJobReq SubmitJobReq) string
GetJob(getJobReq GetJobReq) (getJobResp GetJobResp)
SubmitJob(submitJobReq SubmitJobReq) (submitJobResp SubmitJobResp)
CancelJob(cancelJobReq CancelJobReq) string
}

View File

@ -28,7 +28,7 @@ func newJob(client *client, options *JobOptions) (*job, error) {
func (j *job) ListJob(listJobReq ListJobReq) string {
jobUrl := "/hpc/openapi/v2/jobs?"
clusterId := 1638523853
clusterId := 1637920656
params := map[string]string{
"strClusterIDList": strconv.FormatInt(int64(clusterId), 10),
}
@ -38,9 +38,19 @@ func (j *job) ListJob(listJobReq ListJobReq) string {
return string(result.Body())
}
func (j *job) GetJob(getJobReq GetJobReq) string {
//TODO implement me
panic("implement me")
func (j *job) GetJob(getJobReq GetJobReq) (getJobResp GetJobResp) {
jobUrl := "/hpc/openapi/v2/jobs?"
clusterId := 1637920656
params := map[string]string{
"strClusterIDList": strconv.FormatInt(int64(clusterId), 10),
"jobId": getJobReq.JobId,
}
httpClient := resty.New().R()
result, _ := httpClient.SetHeader("token", j.client.token).SetQueryParams(params).Get(j.client.baseEndpoint + jobUrl)
json.Unmarshal(result.Body(), &getJobResp)
return getJobResp
}
func (j *job) CancelJob(cancelJobReq CancelJobReq) string {
@ -48,11 +58,13 @@ func (j *job) CancelJob(cancelJobReq CancelJobReq) string {
panic("implement me")
}
func (j *job) SubmitJob(submitJobReq SubmitJobReq) string {
func (j *job) SubmitJob(submitJobReq SubmitJobReq) (submitJobResp SubmitJobResp) {
jobSubmitUrl := "/hpc/openapi/v2/apptemplates/{apptype}/{appname}/job"
jobSubmitUrl = strings.Replace(jobSubmitUrl, "{apptype}", submitJobReq.Apptype, -1)
jobSubmitUrl = strings.Replace(jobSubmitUrl, "{appname}", submitJobReq.Appname, -1)
jobSubmitUrl = strings.Replace(jobSubmitUrl, "{apptype}", submitJobReq.MapAppJobInfo.GapAppType, -1)
jobSubmitUrl = strings.Replace(jobSubmitUrl, "{appname}", submitJobReq.MapAppJobInfo.GapAppName, -1)
httpClient := resty.New().R()
jsonStr, _ := json.Marshal(submitJobReq)
params := map[string]string{
"content-type": "application/json",
@ -60,5 +72,6 @@ func (j *job) SubmitJob(submitJobReq SubmitJobReq) string {
}
result, _ := httpClient.SetHeaders(params).SetBody(jsonStr).Post(j.client.baseEndpoint + jobSubmitUrl)
return string(result.Body())
json.Unmarshal(result.Body(), &submitJobResp)
return submitJobResp
}

View File

@ -1,6 +1,7 @@
package ac
import (
"github.com/go-resty/resty/v2"
"log"
"sync"
)
@ -33,21 +34,20 @@ func newToken(client *client, options *ClientOptions) (*token, error) {
return token, nil
}
func (t *token) GetToken(options ClientOptions) string {
//var respAC hpcAC.ACTokenResp
//var tokenString string
//httpClient := resty.New().R()
//params := map[string]string{
// "user": options.User,
// "password": options.Password,
// "orgId": options.OrgId,
//}
//
//httpClient.SetHeaders(params).SetResult(&respAC).Post(options.TokenUrl)
//for _, dt := range respAC.Data {
// if dt.ClusterId == "11276" {
// tokenString = dt.Token
// }
//}
tokenString := ""
var respAC ACTokenResp
var tokenString string
httpClient := resty.New().R()
params := map[string]string{
"user": options.User,
"password": options.Password,
"orgId": options.OrgId,
}
httpClient.SetHeaders(params).SetResult(&respAC).Post(options.TokenUrl)
for _, dt := range respAC.Data {
if dt.ClusterId == "11276" {
tokenString = dt.Token
}
}
return tokenString
}

View File

@ -1,24 +1,21 @@
package config
type Config struct {
AdapterId int `yaml:"AdapterId"`
Adapter Adapter `yaml:"Adapter"`
CoreClientConf CoreClientConf `yaml:"CoreClientConf"`
ACClientConf ACClientConf `yaml:"ACClientConf"`
SlurmClientConf SlurmClientConf `yaml:"SlurmClientConf"`
ParateraClientConf ParateraClientConf `yaml:"ParateraClientConf"`
AdapterId string `yaml:"AdapterId"`
CoreUrl string `yaml:"CoreUrl"`
}
type Adapter struct {
Name string `yaml:"Name"`
Address string `yaml:"Address"`
RpcAddress string `yaml:"RpcAddress"`
Type string `yaml:"Type"`
TenantId int `yaml:"TenantId"`
TenantName string `yaml:"TenantName"`
Labels Labels `yaml:"Labels"`
MetricsUrl string `yaml:"MetricsUrl"`
}
//
//type Adapter struct {
// Name string `yaml:"Name"`
// Address string `yaml:"Address"`
// RpcAddress string `yaml:"RpcAddress"`
// Type string `yaml:"Type"`
// TenantId int `yaml:"TenantId"`
// TenantName string `yaml:"TenantName"`
// Labels Labels `yaml:"Labels"`
// MetricsUrl string `yaml:"MetricsUrl"`
//}
type Labels struct {
Cloud string `yaml:"cloud"`

View File

@ -1,164 +1,152 @@
package cron
import (
"encoding/json"
"github.com/go-resty/resty/v2"
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-hpc/ac"
"gitlink.org.cn/JointCloud/pcm-hpc/config"
"gitlink.org.cn/JointCloud/pcm-hpc/paratera"
"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
)
var SlurmClient slurm.Client
var ParateraClient paratera.Client
var AcClient ac.Client
var AdapterId int64
var CoreUrl string
func PullTaskInfo() {
yamlFile, err := ioutil.ReadFile("etc/hpc.yaml")
if err != nil {
log.Fatalf("error: %v", err)
}
//pull task from coordinator
var config config.Config
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
log.Fatalf("error: %v", err)
}
httpClient := resty.New().R()
SlurmClient, _ = slurm.NewClient(slurm.ClientOptions{
RestUserName: config.SlurmClientConf.RestUserName,
CmdUserName: config.SlurmClientConf.CmdUserName,
Password: config.SlurmClientConf.Password,
Token: config.SlurmClientConf.Token,
URL: config.SlurmClientConf.URL,
AdaptMode: config.SlurmClientConf.AdaptMode,
ClientVersion: config.SlurmClientConf.ClientVersion,
})
ParateraClient, _ = paratera.NewClient(paratera.ClientOptions{
Url: config.ParateraClientConf.Url,
TokenType: config.ParateraClientConf.TokenType,
ThirdParty: config.ParateraClientConf.ThirdParty,
Phone: config.ParateraClientConf.Phone,
Password: config.ParateraClientConf.Password,
})
AcClient, _ = ac.NewClient(ac.ClientOptions{
ClusterUrl: config.ACClientConf.ClusterUrl,
TokenUrl: config.ACClientConf.TokenUrl,
StateUrl: config.ACClientConf.StateUrl,
User: config.ACClientConf.User,
Password: config.ACClientConf.Password,
OrgId: config.ACClientConf.OrgId,
EndPoint: config.ACClientConf.EndPoint,
Token: config.ACClientConf.Token,
ClusterID: config.ACClientConf.ClusterID,
BaseEndpoint: config.ACClientConf.BaseEndpoint,
})
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetQueryParam("adapterId", strconv.FormatInt(AdapterId, 10)).
Get(CoreUrl + "/pcm/v1/core/pullTaskInfo")
opt := coreClient.Options{
Url: config.CoreClientConf.Url,
DataSource: config.CoreClientConf.DataSource,
}
coreCli, _ := coreClient.NewClient(opt)
taskOpt := coreClient.TaskOptions{}
coreTask, _ := coreCli.Task(taskOpt)
var oldHpcList []*coreClient.HpcInfo
adapterId := config.AdapterId
// 查询core端分发下来的任务列表
pullReq := coreClient.PullTaskInfoReq{
AdapterId: int64(adapterId),
}
var resp coreClient.PullTaskInfoResp
json.Unmarshal(result.Body(), &resp)
taskList, err := coreTask.PullTaskInfo(pullReq)
if resp.HpcInfoList != nil && len(resp.HpcInfoList) != 0 {
var oldHpcInfoList []coreClient.HpcInfo
if taskList != nil && len(taskList.HpcInfoList) != 0 {
//tool.Convert(&taskList.HpcInfoList, &oldCloudList)
if err != nil {
return
}
// 遍历执行任务操作
for index, _ := range taskList.HpcInfoList {
// 删除任务
if taskList.HpcInfoList[index].Status == "WaitDelete" {
}
// 执行任务
if taskList.HpcInfoList[index].Status == "Saved" {
switch taskList.HpcInfoList[index].ClusterType {
for _, hpcInfo := range resp.HpcInfoList {
// submit the saved task
if hpcInfo.Status == "Saved" {
switch hpcInfo.ClusterType {
case "ac":
jober, _ := AcClient.Job(ac.JobOptions{})
submitReq := ac.SubmitJobReq{
Apptype: taskList.HpcInfoList[index].AppType,
Appname: taskList.HpcInfoList[index].AppName,
StrJobManagerID: 1637920656,
MapAppJobInfo: &ac.MapAppJobInfo{
GapCmdFile: taskList.HpcInfoList[index].CmdScript,
GapNnode: taskList.HpcInfoList[index].NNode,
GapSubmitType: taskList.HpcInfoList[index].SubmitType,
GapJobName: taskList.HpcInfoList[index].Name,
GapWorkDir: taskList.HpcInfoList[index].WorkDir,
GapQueue: taskList.HpcInfoList[index].Queue,
GapWallTime: taskList.HpcInfoList[index].WallTime,
GapAppname: taskList.HpcInfoList[index].AppName,
GapStdOutFile: taskList.HpcInfoList[index].StdOutFile,
GapStdErrFile: taskList.HpcInfoList[index].StdErrFile,
GapCmdFile: hpcInfo.CmdScript,
GapNnode: hpcInfo.NNode,
GapSubmitType: hpcInfo.SubmitType,
GapJobName: hpcInfo.Name,
GapWorkDir: hpcInfo.WorkDir,
GapQueue: hpcInfo.Queue,
GapWallTime: hpcInfo.WallTime,
GapAppType: hpcInfo.AppType,
GapNdcu: strconv.Itoa(1),
GapAppName: hpcInfo.AppName,
GapStdOutFile: hpcInfo.StdOutFile,
GapStdErrFile: hpcInfo.StdErrFile,
},
}
jober.SubmitJob(submitReq)
respAC := jober.SubmitJob(submitReq)
hpcInfo.JobId = respAC.Data
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Issued"
case "slurm":
jober, _ := SlurmClient.Job(slurm.JobOptions{})
submitReq := slurm.JobOptions{
Script: taskList.HpcInfoList[index].CmdScript,
Script: hpcInfo.CmdScript,
Job: &slurm.JobProperties{
Account: taskList.HpcInfoList[index].Account,
Name: taskList.HpcInfoList[index].Name,
Nodes: make([]uint32, taskList.HpcInfoList[index].AllocNodes),
CurrentWorkingDirectory: taskList.HpcInfoList[index].WorkDir,
StandardOutput: taskList.HpcInfoList[index].StdOutFile,
StandardError: taskList.HpcInfoList[index].StdErrFile,
Account: hpcInfo.Account,
Name: hpcInfo.Name,
NTasks: 1,
//Nodes: make([]uint32, hpcInfo.AllocNodes),
CurrentWorkingDirectory: hpcInfo.WorkDir,
StandardOutput: hpcInfo.StdOutFile,
StandardError: hpcInfo.StdErrFile,
StandardInput: hpcInfo.StdInput,
//todo
Environment: map[string]string{"PATH": "/bin:/usr/bin/:/usr/local/bin/",
"LD_LIBRARY_PATH": "/lib/:/lib64/:/usr/local/lib"},
},
Jobs: nil,
}
jober.SubmitJob(submitReq)
submitJobResp := jober.SubmitJob(submitReq)
hpcInfo.JobId = strconv.Itoa(submitJobResp.JobId)
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Issued"
case "paratera":
jober, _ := ParateraClient.Job(paratera.JobOptions{})
sbs := paratera.Sbs{
JobGroupName: taskList.HpcInfoList[index].Queue,
JobName: taskList.HpcInfoList[index].Name,
JobGroupName: hpcInfo.AppName,
JobName: hpcInfo.Name,
SubmitProfile: paratera.SubmitProfile{
BootScript: taskList.HpcInfoList[index].CmdScript,
WorkingDir: taskList.HpcInfoList[index].CmdScript,
BootScript: hpcInfo.CmdScript,
WorkingDir: hpcInfo.WorkDir,
},
}
jober.SubmitSbsJob(sbs)
sbsResp := jober.SubmitSbsJob(sbs)
hpcInfo.JobId = sbsResp.Jid
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Issued"
}
} else { //if state is not "saved" ,then get state from participant and sync to core
switch hpcInfo.ClusterType {
case "ac":
jober, _ := AcClient.Job(ac.JobOptions{})
resp := jober.GetJob(ac.GetJobReq{JobId: hpcInfo.JobId})
hpcInfo.Status = resp.Data.List[0].JobStatus
case "slurm":
jober, _ := SlurmClient.Job(slurm.JobOptions{})
resp := jober.GetJob(slurm.GetJobReq{JobId: hpcInfo.JobId})
hpcInfo.Status = resp.Jobs[0].JobState
case "paratera":
jober, _ := ParateraClient.Job(paratera.JobOptions{})
resp := jober.GetJob(paratera.GetJobReq{JobId: hpcInfo.JobId})
hpcInfo.Status = resp.Status
}
}
}
// 同步信息到core端
// push submitted mark to coordinator
PushReq := coreClient.PushTaskInfoReq{
AdapterId: int64(adapterId),
HpcInfoList: make([]*coreClient.HpcInfo, 0),
AdapterId: AdapterId,
HpcInfoList: resp.HpcInfoList,
}
for _, newHpc := range taskList.HpcInfoList {
for _, oldHpc := range oldHpcList {
if oldHpc.JobId == newHpc.JobId && oldHpc.Status != newHpc.Status {
PushReq.HpcInfoList = append(PushReq.HpcInfoList, newHpc)
}
}
}
if len(PushReq.HpcInfoList) != 0 {
_, err := coreTask.PushTaskInfo(PushReq)
if err != nil {
return
}
url := CoreUrl + "/pcm/v1/core/pushTaskInfo"
method := "POST"
jsonStr, _ := json.Marshal(PushReq)
payload := strings.NewReader(string(jsonStr))
client := &http.Client{}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
client.Do(req)
}
}
}
func PushTaskInfo() {
httpClient := resty.New().R()
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetQueryParam("adapterId", strconv.FormatInt(AdapterId, 10)).
Get(CoreUrl + "/pcm/v1/core/pullTaskInfo")
var resp coreClient.PullTaskInfoResp
json.Unmarshal(result.Body(), &resp)
}

33
cron/types.go Normal file
View File

@ -0,0 +1,33 @@
package cron
type ClusterResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
List []struct {
Id string `json:"id"`
AdapterId string `json:"adapterId"`
Name string `json:"name"`
Nickname string `json:"nickname"`
Description string `json:"description"`
Server string `json:"server"`
MonitorServer string `json:"monitorServer,omitempty"`
Label string `json:"label"`
AuthType string `json:"authType"`
CreateTime string `json:"createTime"`
Token string `json:"token,omitempty"`
Region string `json:"region,omitempty"`
ProjectId string `json:"projectId,omitempty"`
Version string `json:"version,omitempty"`
OwnerId string `json:"ownerId,omitempty"`
ProducerDict string `json:"producerDict,omitempty"`
RegionDict string `json:"regionDict,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
} `json:"list"`
Total int `json:"total"`
PageNum int `json:"pageNum"`
PageSize int `json:"pageSize"`
} `json:"data"`
TraceId string `json:"traceId"`
}

View File

@ -1,43 +1,3 @@
AdapterId: 1706858330967773111
CoreUrl: "http://127.0.0.1:8999"
Adapter:
Name: ali
Address: http://121.89.194.135:6443
RpcAddress: pcm-participant-kubernetes-service:2003
Type: "CLOUD"
TenantId: 3
TenantName: ali
Labels: { "cloud": "ali" }
MetricsUrl: http://121.89.194.135:31965
CoreClientConf:
Url: "localhost:8999"
DataSource: ""
ACClientConf:
ClusterUrl: ""
TokenUrl: ""
StateUrl: ""
User: ""
Password: ""
OrgId: ""
EndPoint: ""
Token: ""
ClusterID: ""
BaseEndpoint: ""
SlurmClientConf:
RestUserName: "slurmrestd"
CmdUserName: "root"
Password: ""
Token: ""
URL: ""
AdaptMode: "rest"
ClientVersion: "v0.0.38"
ParateraClientConf:
Url: "https://cloud.paratera.com"
TokenType: "TOKEN"
ThirdParty: "NMPHONE"
Phone: "13278887558"
Password: ""

138
main.go
View File

@ -1,19 +1,91 @@
package main
import (
"fmt"
"encoding/json"
"github.com/go-resty/resty/v2"
"github.com/robfig/cron"
taskcron "gitlink.org.cn/JointCloud/pcm-hpc/cron"
"gitlink.org.cn/JointCloud/pcm-hpc/ac"
confHpc "gitlink.org.cn/JointCloud/pcm-hpc/config"
cronPCM "gitlink.org.cn/JointCloud/pcm-hpc/cron"
"gitlink.org.cn/JointCloud/pcm-hpc/paratera"
"time"
"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"strconv"
)
func init() {
//load yaml file to get coreUrl and adapterId
yamlFile, err := ioutil.ReadFile("etc/hpc.yaml")
if err != nil {
log.Fatalf("error: %v", err)
}
var config confHpc.Config
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
log.Fatalf("error: %v", err)
}
httpClient := resty.New().R()
params := map[string]string{
"type": "2",
"pageNum": "1",
"pageSize": "10",
"adapterId": config.AdapterId,
}
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetQueryParams(params).Get(config.CoreUrl + "/pcm/v1/adapter/cluster/list")
cronPCM.CoreUrl = config.CoreUrl
adapterId, _ := strconv.ParseInt(config.AdapterId, 10, 64)
cronPCM.AdapterId = adapterId
var resp cronPCM.ClusterResp
json.Unmarshal(result.Body(), &resp)
for _, cluster := range resp.Data.List {
switch cluster.Label {
case "slurm":
cronPCM.SlurmClient, _ = slurm.NewClient(slurm.ClientOptions{
RestUserName: cluster.Username,
CmdUserName: cluster.Username,
Password: cluster.Password,
Token: cluster.Token,
URL: cluster.Server,
AdaptMode: cluster.AuthType,
ClientVersion: cluster.Version,
})
case "ac":
cronPCM.AcClient, _ = ac.NewClient(ac.ClientOptions{
ClusterUrl: "https://wuzh02.hpccube.com:65051/hpc/openapi/v2/cluster",
TokenUrl: "https://ac.sugon.com/ac/openapi/v2/tokens",
StateUrl: "https://ac.sugon.com/ac/openapi/v2/tokens/state",
User: cluster.Username,
Password: cluster.Password,
OrgId: "c8befbc1301665ba2dc5b2826f8dca1e",
EndPoint: "https://api01.hpccube.com:65106",
Token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjb21wdXRlVXNlciI6ImFjZ25ubWZid28iLCJhY2NvdW50U3RhdHVzIjoiVHJpYWwiLCJjcmVhdG9yIjoiYWMiLCJyb2xlIjoiMSIsImV4cGlyZVRpbWUiOiIxNzEwOTkwNjUyNzYwIiwiY2x1c3RlcklkIjoiMTEyNzYiLCJpbnZva2VyIjoiYzhiZWZiYzEzMDE2NjViYTJkYzViMjgyNmY4ZGNhMWUiLCJ1c2VyIjoiYWNnbm5tZmJ3byIsInVzZXJJZCI6IjExNzkxMTQ4MjAwIn0.OKASpJvVetAqhEQPhxcpDEeUbir5bJjCYDrPBbPyXbs",
ClusterID: cluster.AdapterId,
BaseEndpoint: "https://wuzh02.hpccube.com:65051"})
case "paratera":
cronPCM.ParateraClient, _ = paratera.NewClient(paratera.ClientOptions{
Url: cluster.Server,
TokenType: "TOKEN",
ThirdParty: cluster.ProjectId,
Phone: cluster.Username,
Password: cluster.Password})
}
}
}
func main() {
c := cron.New()
// pull task list from coordinator
c.AddFunc("*/5 * * * * ?", func() {
taskcron.PullTaskInfo()
cronPCM.PullTaskInfo()
})
// 启动定时任务
c.Start()
@ -49,28 +121,28 @@ func main() {
fmt.Printf("方法调用时间为 %d 毫秒\n", duration)
****/
start := time.Now().UnixNano() / int64(time.Millisecond)
cli, _ := paratera.NewClient(paratera.ClientOptions{
Url: "https://cloud.paratera.com",
TokenType: "TOKEN",
ThirdParty: "NMPHONE",
Phone: "13278887558",
Password: "eb78bbb29cfb2b31a54751873c3dab5f",
})
job, _ := cli.Job(paratera.JobOptions{})
//sbs job submit
sbs := paratera.Sbs{
JobGroupName: "vasp",
JobName: "sbsjob",
SubmitProfile: paratera.SubmitProfile{
BootScript: "jobsubmit.sh",
WorkingDir: "prn:pcs:sftp:CSTC9:CSTC9:sc30314:file:/public4/home/sc56516/jobs/VNCView_20240203180350",
},
}
result := job.SubmitSbsJob(sbs)
//start := time.Now().UnixNano() / int64(time.Millisecond)
//cli, _ := paratera.NewClient(paratera.ClientOptions{
// Url: "https://cloud.paratera.com",
// TokenType: "TOKEN",
// ThirdParty: "NMPHONE",
// Phone: "13278887558",
// Password: "eb78bbb29cfb2b31a54751873c3dab5f",
//})
//
//job, _ := cli.Job(paratera.JobOptions{})
//
////sbs job submit
//sbs := paratera.Sbs{
// JobGroupName: "vasp",
// JobName: "sbsjob",
// SubmitProfile: paratera.SubmitProfile{
// BootScript: "jobsubmit.sh",
// WorkingDir: "prn:pcs:sftp:CSTC9:CSTC9:sc30314:file:/public4/home/sc56516/jobs/VNCView_20240203180350",
// },
//}
//
//result := job.SubmitSbsJob(sbs)
//abs job submit
/*submitProfiles := make([]paratera.SubmitProfiles, 0)
@ -95,11 +167,11 @@ func main() {
SubmitProfiles: submitProfiles,
}
result := job.SubmitQueueAbsJob(queueAbs)*/
str := fmt.Sprintf("%v", result)
fmt.Println(str)
end := time.Now().UnixNano() / int64(time.Millisecond)
duration := end - start
fmt.Printf("方法调用时间为 %d 毫秒\n", duration)
//
//str := fmt.Sprintf("%v", result)
//fmt.Println(str)
//end := time.Now().UnixNano() / int64(time.Millisecond)
//
//duration := end - start
//fmt.Printf("方法调用时间为 %d 毫秒\n", duration)
}

View File

@ -19,6 +19,35 @@ type SubmitProfile struct {
WorkingDir string `json:"workingDir"`
}
type SbsResp struct {
Jid string `json:"jid"`
}
type GetJobResp struct {
Jid string `json:"jid"`
AppCode string `json:"appCode"`
Version string `json:"version"`
JobName string `json:"jobName"`
Uid string `json:"uid"`
JobUser string `json:"jobUser"`
Zone string `json:"zone"`
Cluster string `json:"cluster"`
RawJobId int `json:"rawJobId"`
Status string `json:"status"`
Slots interface{} `json:"slots"`
Gpu interface{} `json:"gpu"`
Queue interface{} `json:"queue"`
Walltime interface{} `json:"walltime"`
StartTime interface{} `json:"startTime"`
EndTime interface{} `json:"endTime"`
ExpectedEndTime interface{} `json:"expectedEndTime"`
Model string `json:"model"`
Nodes interface{} `json:"nodes"`
SubmitTime int64 `json:"submitTime"`
DeleteStatus int `json:"deleteStatus"`
OutPutPath interface{} `json:"outPutPath"`
LogPath interface{} `json:"logPath"`
}
// QueueAbs 模板提交任务(限定队列)
type QueueAbs struct {
App string `json:"app"`
@ -59,7 +88,7 @@ type ListJobReq struct {
// GetJobReq 作业详情请求体
type GetJobReq struct {
jobId string //作业id
JobId string //作业id
}
// CancelJobReq 作业取消请求体
@ -69,8 +98,8 @@ type CancelJobReq struct {
type Job interface {
ListJob(listJobReq ListJobReq) string
GetJob(getJobReq GetJobReq) string
SubmitSbsJob(sbs Sbs) string
GetJob(getJobReq GetJobReq) (getJobResp GetJobResp)
SubmitSbsJob(sbs Sbs) (sbsResp SbsResp)
SubmitAbsJob(abs Abs) string
SubmitQueueAbsJob(abs QueueAbs) string
CancelJob(cancelJobReq CancelJobReq) string

View File

@ -1,6 +1,7 @@
package paratera
import (
"encoding/json"
"github.com/go-resty/resty/v2"
"log"
"strconv"
@ -43,20 +44,27 @@ func (j *job) ListJob(listJobReq ListJobReq) string {
return result.String()
}
func (j *job) GetJob(getJobReq GetJobReq) string {
//TODO implement me
panic("implement me")
}
func (j *job) SubmitSbsJob(sbs Sbs) string {
func (j *job) GetJob(getJobReq GetJobReq) (getJobResp GetJobResp) {
httpClient := resty.New().R()
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetHeader("para_token", j.client.token).
Get(j.client.url + "/api/pcs/jobs/" + getJobReq.JobId)
json.Unmarshal(result.Body(), &getJobResp)
return getJobResp
}
func (j *job) SubmitSbsJob(sbs Sbs) (sbsResp SbsResp) {
httpClient := resty.New().R()
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetHeader("para_token", j.client.token).
SetHeader("X-SLURM-USER-TOKEN", j.client.token).
SetBody(sbs).
Post(j.client.url + "/api/pcs/jobs/sbs")
return result.String()
json.Unmarshal(result.Body(), &sbsResp)
return sbsResp
}
func (j *job) SubmitAbsJob(abs Abs) string {

View File

@ -1,87 +1,235 @@
package slurm
type JobOptions struct {
Script string
Job *JobProperties
Jobs *JobProperties
Script string `json:"script"`
Job *JobProperties `json:"job"`
}
type JobProperties struct {
Account string
AccountGatherFrequency string
Argv []string
Array string
BatchFeatures string
BeginTime uint32
BurstBuffer string
ClusterConstraint string
Comment string
Constraints string
CoreSpecification uint32
CoresPerSocket uint32
CpuBinding string
CpuBindingHint string
CpuFrequency string
CpusPerGpu string
CpusPerTask uint32
CurrentWorkingDirectory string
Deadline string
DelayBoot uint32
Dependency string
Distribution string
Environment map[string]string
Exclusive string
GetUserEnvironment bool
Gres string
GresFlags string
GpuBinding string
GpuFrequency string
Gpus string
GpusPerNode string
GpusPerSocket string
GpusPerTask string
Hold bool
KillOnInvalidDependency bool
Licenses string
MailType string
MailUser string
McsLabel string
MemoryBinding string
MemoryPerCpu uint32
MemoryPerGpu uint32
MemoryPerNode uint32
MinimumCpusPerNode uint32
MinimumNodes bool
Name string
Nice string
NoKill bool
Nodes []uint32
OpenMode string
Partition string
Priority string
Qos string
Requeue bool
Reservation string
Signal string
SocketsPerNode uint32
SpreadJob bool
StandardError string
StandardInput string
StandardOutput string
Tasks uint32
TasksPerCore uint32
TasksPerNode uint32
TasksPerSocket uint32
ThreadSpecification uint32
ThreadsPerCore uint32
TimeLimit uint32
TimeMinimum uint32
WaitAllNodes bool
Wckey string
Account string `json:"account"`
//AccountGatherFrequency string `json:"account_gather_frequency"`
//Argv []string `json:"argv"`
//Array string `json:"array"`
//BatchFeatures string `json:"batch_features"`
//BeginTime uint32 `json:"begin_time"`
//BurstBuffer string `json:"burst_buffer"`
//ClusterConstraint string `json:"cluster_constraint"`
//Comment string `json:"comment"`
//Constraints string `json:"constraints"`
//CoreSpecification uint32 `json:"core_specification"`
//CoresPerSocket uint32 `json:"cores_per_socket"`
//CpuBinding string `json:"cpu_binding"`
//CpuBindingHint string `json:"cpu_binding_hint"`
//CpuFrequency string `json:"cpu_frequency"`
//CpusPerGpu string `json:"cpus_per_gpu"`
//CpusPerTask uint32 `json:"cpus_per_task"`
CurrentWorkingDirectory string `json:"current_working_directory"`
//Deadline string `json:"deadline"`
//DelayBoot uint32 `json:"delay_boot"`
//Dependency string `json:"dependency"`
//Distribution string `json:"distribution"`
Environment map[string]string `json:"environment"`
//Exclusive string `json:"exclusive"`
//GetUserEnvironment bool `json:"get_user_environment"`
//Gres string `json:"gres"`
//GresFlags string `json:"gres_flags"`
//GpuBinding string `json:"gpu_binding"`
//GpuFrequency string `json:"gpu_frequency"`
//Gpus string `json:"gpus"`
//GpusPerNode string `json:"gpus_per_node"`
//GpusPerSocket string `json:"gpus_per_socket"`
//GpusPerTask string `json:"gpus_per_task"`
//Hold bool `json:"hold"`
//KillOnInvalidDependency bool `json:"kill_on_invalid_dependency"`
//Licenses string `json:"licenses"`
//MailType string `json:"mail_type"`
//MailUser string `json:"mail_user"`
//McsLabel string `json:"mcs_label"`
//MemoryBinding string `json:"memory_binding"`
//MemoryPerCpu uint32 `json:"memory_per_cpu"`
//MemoryPerGpu uint32 `json:"memory_per_gpu"`
//MemoryPerNode uint32 `json:"memory_per_node"`
//MinimumCpusPerNode uint32 `json:"minimum_cpus_per_node"`
//MinimumNodes bool `json:"minimum_nodes"`
Name string `json:"name"`
//Nice string `json:"nice"`
//NoKill bool `json:"no_kill"`
//Nodes []uint32 `json:"nodes"`
//OpenMode string `json:"open_mode"`
//Partition string `json:"partition"`
//Priority string `json:"priority"`
//Qos string `json:"qos"`
//Requeue bool `json:"requeue"`
//Reservation string `json:"reservation"`
//Signal string `json:"signal"`
//SocketsPerNode uint32 `json:"sockets_per_node"`
//SpreadJob bool `json:"spread_job"`
StandardError string `json:"standard_error"`
StandardInput string `json:"standard_input"`
StandardOutput string `json:"standard_output"`
NTasks uint32 `json:"ntasks"`
//TasksPerCore uint32 `json:"tasks_per_core"`
//TasksPerNode uint32 `json:"tasks_per_node"`
//TasksPerSocket uint32 `json:"tasks_per_socket"`
//ThreadSpecification uint32 `json:"thread_specification"`
//ThreadsPerCore uint32 `json:"threads_per_core"`
//TimeLimit uint32 `json:"time_limit"`
//TimeMinimum uint32 `json:"time_minimum"`
//WaitAllNodes bool `json:"wait_all_nodes"`
//Wckey string `json:"wckey"`
}
type SubmitJobResp struct {
Meta struct {
Plugin struct {
Type string `json:"type"`
Name string `json:"name"`
} `json:"plugin"`
Slurm struct {
Version struct {
Major int `json:"major"`
Micro int `json:"micro"`
Minor int `json:"minor"`
} `json:"version"`
Release string `json:"release"`
} `json:"Slurm"`
} `json:"meta"`
Errors []interface{} `json:"errors"`
JobId int `json:"job_id"`
StepId string `json:"step_id"`
JobSubmitUserMsg string `json:"job_submit_user_msg"`
}
type GetJobReq struct {
JobId string `json:"job_id"`
}
type GetJobResp struct {
Errors []struct {
Error string `json:"error"`
Errno int `json:"errno"`
} `json:"errors"`
Jobs []struct {
Account string `json:"account"`
AccrueTime int64 `json:"accrue_time"`
AdminComment string `json:"admin_comment"`
ArrayJobId string `json:"array_job_id"`
ArrayTaskId string `json:"array_task_id"`
ArrayMaxTasks string `json:"array_max_tasks"`
ArrayTaskString string `json:"array_task_string"`
AssociationId string `json:"association_id"`
BatchFeatures string `json:"batch_features"`
BatchFlag bool `json:"batch_flag"`
BatchHost string `json:"batch_host"`
Flags []string `json:"flags"`
BurstBuffer string `json:"burst_buffer"`
BurstBufferState string `json:"burst_buffer_state"`
Cluster string `json:"cluster"`
ClusterFeatures string `json:"cluster_features"`
Command string `json:"command"`
Comment string `json:"comment"`
Contiguous bool `json:"contiguous"`
CoreSpec string `json:"core_spec"`
ThreadSpec string `json:"thread_spec"`
CoresPerSocket string `json:"cores_per_socket"`
BillableTres string `json:"billable_tres"`
CpusPerTask string `json:"cpus_per_task"`
CpuFrequencyMinimum string `json:"cpu_frequency_minimum"`
CpuFrequencyMaximum string `json:"cpu_frequency_maximum"`
CpuFrequencyGovernor string `json:"cpu_frequency_governor"`
CpusPerTres string `json:"cpus_per_tres"`
Deadline string `json:"deadline"`
DelayBoot string `json:"delay_boot"`
Dependency string `json:"dependency"`
DerivedExitCode string `json:"derived_exit_code"`
EligibleTime int64 `json:"eligible_time"`
EndTime int64 `json:"end_time"`
ExcludedNodes string `json:"excluded_nodes"`
ExitCode int `json:"exit_code"`
Features string `json:"features"`
FederationOrigin string `json:"federation_origin"`
FederationSiblingsActive string `json:"federation_siblings_active"`
FederationSiblingsViable string `json:"federation_siblings_viable"`
GresDetail []string `json:"gres_detail"`
GroupId string `json:"group_id"`
JobId string `json:"job_id"`
JobResources struct {
Nodes string `json:"nodes"`
AllocatedCpus int `json:"allocated_cpus"`
AllocatedHosts int `json:"allocated_hosts"`
AllocatedNodes []struct {
Memory int `json:"memory"`
} `json:"allocated_nodes"`
} `json:"job_resources"`
JobState string `json:"job_state"`
LastSchedEvaluation string `json:"last_sched_evaluation"`
Licenses string `json:"licenses"`
MaxCpus string `json:"max_cpus"`
MaxNodes string `json:"max_nodes"`
McsLabel string `json:"mcs_label"`
MemoryPerTres string `json:"memory_per_tres"`
Name string `json:"name"`
Nodes string `json:"nodes"`
Nice string `json:"nice"`
TasksPerCore string `json:"tasks_per_core"`
TasksPerSocket string `json:"tasks_per_socket"`
TasksPerBoard string `json:"tasks_per_board"`
Cpus string `json:"cpus"`
NodeCount string `json:"node_count"`
Tasks string `json:"tasks"`
HetJobId string `json:"het_job_id"`
HetJobIdSet string `json:"het_job_id_set"`
HetJobOffset string `json:"het_job_offset"`
Partition string `json:"partition"`
MemoryPerNode string `json:"memory_per_node"`
MemoryPerCpu string `json:"memory_per_cpu"`
MinimumCpusPerNode string `json:"minimum_cpus_per_node"`
MinimumTmpDiskPerNode string `json:"minimum_tmp_disk_per_node"`
PreemptTime int64 `json:"preempt_time"`
PreSusTime int64 `json:"pre_sus_time"`
Priority string `json:"priority"`
Profile []string `json:"profile"`
Qos string `json:"qos"`
Reboot bool `json:"reboot"`
RequiredNodes string `json:"required_nodes"`
Requeue bool `json:"requeue"`
ResizeTime int64 `json:"resize_time"`
RestartCnt string `json:"restart_cnt"`
ResvName string `json:"resv_name"`
Shared string `json:"shared"`
ShowFlags []string `json:"show_flags"`
SocketsPerBoard string `json:"sockets_per_board"`
SocketsPerNode string `json:"sockets_per_node"`
StartTime int64 `json:"start_time"`
StateDescription string `json:"state_description"`
StateReason string `json:"state_reason"`
StandardError string `json:"standard_error"`
StandardInput string `json:"standard_input"`
StandardOutput string `json:"standard_output"`
SubmitTime int64 `json:"submit_time"`
SuspendTime int64 `json:"suspend_time"`
SystemComment string `json:"system_comment"`
TimeLimit string `json:"time_limit"`
TimeMinimum string `json:"time_minimum"`
ThreadsPerCore string `json:"threads_per_core"`
TresBind string `json:"tres_bind"`
TresFreq string `json:"tres_freq"`
TresPerJob string `json:"tres_per_job"`
TresPerNode string `json:"tres_per_node"`
TresPerSocket string `json:"tres_per_socket"`
TresPerTask string `json:"tres_per_task"`
TresReqStr string `json:"tres_req_str"`
TresAllocStr string `json:"tres_alloc_str"`
UserId string `json:"user_id"`
UserName string `json:"user_name"`
Wckey string `json:"wckey"`
CurrentWorkingDirectory string `json:"current_working_directory"`
} `json:"jobs"`
}
type Job interface {
GetJob(getJobReq GetJobReq) (getJobResp GetJobResp)
ListJob() string
ListDbJob() string
SubmitJob(options JobOptions) string
SubmitJob(options JobOptions) (submitJobResp SubmitJobResp)
}

View File

@ -1,8 +1,13 @@
package slurm
import (
"encoding/json"
"github.com/go-resty/resty/v2"
"io"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
)
@ -31,7 +36,17 @@ func (j *job) ListJob() string {
Get(j.client.url + "/slurm/" + j.client.clientVersion + "/jobs")
return result.String()
}
func (j *job) GetJob(getJobReq GetJobReq) (getJobResp GetJobResp) {
httpClient := resty.New().R()
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetHeader("X-SLURM-USER-NAME", j.client.restUsername).
SetHeader("X-SLURM-USER-TOKEN", j.client.token).
Get(j.client.url + "/slurm/" + j.client.clientVersion + "/job/" + getJobReq.JobId)
json.Unmarshal(result.Body(), &getJobResp)
return getJobResp
}
func (j *job) ListDbJob() string {
httpClient := resty.New().R()
@ -42,14 +57,42 @@ func (j *job) ListDbJob() string {
return result.String()
}
func (j *job) SubmitJob(jobOptions JobOptions) string {
func (j *job) SubmitJob(jobOptions JobOptions) (submitJobResp SubmitJobResp) {
httpClient := resty.New().R()
url := j.client.url + "/slurm/" + j.client.clientVersion + "/job/submit"
result, _ := httpClient.SetHeader("Content-Type", "application/json").
SetHeader("X-SLURM-USER-NAME", j.client.restUsername).
SetHeader("X-SLURM-USER-TOKEN", j.client.token).
SetBody(jobOptions).
Post(url)
return result.String()
method := "POST"
jsonStr, err := json.Marshal(jobOptions)
payload := strings.NewReader(string(jsonStr))
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("X-SLURM-USER-NAME", j.client.restUsername)
req.Header.Add("X-SLURM-USER-TOKEN", j.client.token)
res, _ := client.Do(req)
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
}
}(res.Body)
body, _ := ioutil.ReadAll(res.Body)
err = json.Unmarshal(body, &submitJobResp)
if err != nil {
return SubmitJobResp{}
}
return submitJobResp
//httpClient := resty.New().R()
//
//
//result, _ := httpClient.SetHeader("Content-Type", "application/json").
// SetHeader("X-SLURM-USER-NAME", j.client.restUsername).
// SetHeader("X-SLURM-USER-TOKEN", j.client.token).
// SetBody(jobOptions).
// Post(url)
//return result.String()
}