pcm-hpc/cron/taskcron.go

165 lines
5.3 KiB
Go

package cron
import (
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-hpc/ac"
"gitlink.org.cn/JointCloud/pcm-hpc/config"
"gitlink.org.cn/JointCloud/pcm-hpc/paratera"
"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
)
// Package-level cluster clients. PullTaskInfo (re)assigns all three from
// etc/hpc.yaml on every run before submitting tasks through them.
var (
	// SlurmClient submits tasks whose ClusterType is "slurm".
	SlurmClient slurm.Client
	// ParateraClient submits tasks whose ClusterType is "paratera".
	ParateraClient paratera.Client
	// AcClient submits tasks whose ClusterType is "ac".
	AcClient ac.Client
)
// PullTaskInfo runs one cron tick of the HPC adapter:
//
//  1. loads etc/hpc.yaml and (re)initialises AcClient, SlurmClient and
//     ParateraClient from it;
//  2. pulls the task list the coordinator (core) has dispatched to this
//     adapter's AdapterId;
//  3. submits every task in status "Saved" to the cluster named by its
//     ClusterType;
//  4. pushes back to core every task whose Status changed during this tick.
//
// Failures are logged. They abort the current tick, or only the affected
// task, but never terminate the process, so the next tick can retry.
func PullTaskInfo() {
	conf, err := loadHpcConfig()
	if err != nil {
		log.Printf("load hpc config: %v", err)
		return
	}
	clientErrs := initHpcClusterClients(conf)

	coreCli, err := coreClient.NewClient(coreClient.Options{
		Url:        conf.CoreClientConf.Url,
		DataSource: conf.CoreClientConf.DataSource,
	})
	if err != nil {
		log.Printf("init core client: %v", err)
		return
	}
	coreTask, err := coreCli.Task(coreClient.TaskOptions{})
	if err != nil {
		log.Printf("init core task client: %v", err)
		return
	}

	adapterID := int64(conf.AdapterId)

	// Query the task list that core has dispatched to this adapter.
	taskList, err := coreTask.PullTaskInfo(coreClient.PullTaskInfoReq{AdapterId: adapterID})
	if err != nil {
		log.Printf("pull task info for adapter %d: %v", adapterID, err)
		return
	}
	if taskList == nil || len(taskList.HpcInfoList) == 0 {
		return
	}

	// Snapshot each entry by value before executing anything, so that status
	// changes made to the (pointer) entries below can be detected afterwards.
	previous := make([]coreClient.HpcInfo, len(taskList.HpcInfoList))
	for i, hpc := range taskList.HpcInfoList {
		if hpc != nil {
			previous[i] = *hpc
		}
	}

	// Iterate over the tasks and carry out the requested operation.
	for _, hpc := range taskList.HpcInfoList {
		if hpc == nil {
			continue
		}
		switch hpc.Status {
		case "WaitDelete":
			// TODO: task deletion is not implemented yet.
		case "Saved":
			submitHpcTask(hpc, clientErrs)
		}
	}

	// Sync changed task information back to core.
	// NOTE(review): nothing in this file writes a new Status back into the
	// entries after submission, so this only fires once submitters do.
	pushReq := coreClient.PushTaskInfoReq{
		AdapterId:   adapterID,
		HpcInfoList: make([]*coreClient.HpcInfo, 0),
	}
	for i, hpc := range taskList.HpcInfoList {
		if hpc != nil && hpc.Status != previous[i].Status {
			pushReq.HpcInfoList = append(pushReq.HpcInfoList, hpc)
		}
	}
	if len(pushReq.HpcInfoList) == 0 {
		return
	}
	if _, err := coreTask.PushTaskInfo(pushReq); err != nil {
		log.Printf("push task info for adapter %d: %v", adapterID, err)
	}
}

// loadHpcConfig reads and parses etc/hpc.yaml, resolved relative to the
// process working directory. On error the returned Config is the zero value.
func loadHpcConfig() (config.Config, error) {
	data, err := ioutil.ReadFile("etc/hpc.yaml")
	if err != nil {
		return config.Config{}, err
	}
	var conf config.Config
	if err := yaml.Unmarshal(data, &conf); err != nil {
		return config.Config{}, err
	}
	return conf, nil
}

// initHpcClusterClients (re)creates the package-level SlurmClient,
// ParateraClient and AcClient from conf. The returned map is keyed by the
// ClusterType value tasks use ("slurm", "paratera", "ac") and holds the error
// each constructor reported. Clusters that initialised cleanly have no entry.
func initHpcClusterClients(conf config.Config) map[string]error {
	clientErrs := make(map[string]error)
	var err error

	SlurmClient, err = slurm.NewClient(slurm.ClientOptions{
		RestUserName:  conf.SlurmClientConf.RestUserName,
		CmdUserName:   conf.SlurmClientConf.CmdUserName,
		Password:      conf.SlurmClientConf.Password,
		Token:         conf.SlurmClientConf.Token,
		URL:           conf.SlurmClientConf.URL,
		AdaptMode:     conf.SlurmClientConf.AdaptMode,
		ClientVersion: conf.SlurmClientConf.ClientVersion,
	})
	if err != nil {
		log.Printf("init slurm client: %v", err)
		clientErrs["slurm"] = err
	}

	ParateraClient, err = paratera.NewClient(paratera.ClientOptions{
		Url:        conf.ParateraClientConf.Url,
		TokenType:  conf.ParateraClientConf.TokenType,
		ThirdParty: conf.ParateraClientConf.ThirdParty,
		Phone:      conf.ParateraClientConf.Phone,
		Password:   conf.ParateraClientConf.Password,
	})
	if err != nil {
		log.Printf("init paratera client: %v", err)
		clientErrs["paratera"] = err
	}

	AcClient, err = ac.NewClient(ac.ClientOptions{
		ClusterUrl:   conf.ACClientConf.ClusterUrl,
		TokenUrl:     conf.ACClientConf.TokenUrl,
		StateUrl:     conf.ACClientConf.StateUrl,
		User:         conf.ACClientConf.User,
		Password:     conf.ACClientConf.Password,
		OrgId:        conf.ACClientConf.OrgId,
		EndPoint:     conf.ACClientConf.EndPoint,
		Token:        conf.ACClientConf.Token,
		ClusterID:    conf.ACClientConf.ClusterID,
		BaseEndpoint: conf.ACClientConf.BaseEndpoint,
	})
	if err != nil {
		log.Printf("init ac client: %v", err)
		clientErrs["ac"] = err
	}

	return clientErrs
}

// submitHpcTask submits one task to the cluster named by hpc.ClusterType
// ("ac", "slurm" or "paratera"). It logs and skips the task in three cases:
// its cluster client failed to initialise (per clientErrs), its job client
// cannot be created, or its cluster type is not recognised.
func submitHpcTask(hpc *coreClient.HpcInfo, clientErrs map[string]error) {
	if err := clientErrs[hpc.ClusterType]; err != nil {
		log.Printf("skip task %q: %s client unavailable: %v", hpc.Name, hpc.ClusterType, err)
		return
	}

	// NOTE(review): the SubmitJob/SubmitSbsJob results are discarded, so
	// submission failures are not reported. TODO: check them once their
	// return signatures are confirmed.
	switch hpc.ClusterType {
	case "ac":
		jober, err := AcClient.Job(ac.JobOptions{})
		if err != nil {
			log.Printf("create ac job client for task %q: %v", hpc.Name, err)
			return
		}
		jober.SubmitJob(ac.SubmitJobReq{
			Apptype: hpc.AppType,
			Appname: hpc.AppName,
			MapAppJobInfo: &ac.MapAppJobInfo{
				GapCmdFile:    hpc.CmdScript,
				GapNnode:      hpc.NNode,
				GapSubmitType: hpc.SubmitType,
				GapJobName:    hpc.Name,
				GapWorkDir:    hpc.WorkDir,
				GapQueue:      hpc.Queue,
				GapWallTime:   hpc.WallTime,
				GapAppname:    hpc.AppName,
				GapStdOutFile: hpc.StdOutFile,
				GapStdErrFile: hpc.StdErrFile,
			},
		})
	case "slurm":
		jober, err := SlurmClient.Job(slurm.JobOptions{})
		if err != nil {
			log.Printf("create slurm job client for task %q: %v", hpc.Name, err)
			return
		}
		jober.SubmitJob(slurm.JobOptions{
			Script: hpc.CmdScript,
			Job: &slurm.JobProperties{
				Account: hpc.Account,
				Name:    hpc.Name,
				// NOTE(review): this sends AllocNodes zero-valued entries, not a
				// node count. Confirm against the JobProperties.Nodes semantics.
				Nodes:                   make([]uint32, hpc.AllocNodes),
				CurrentWorkingDirectory: hpc.WorkDir,
				StandardOutput:          hpc.StdOutFile,
				StandardError:           hpc.StdErrFile,
				// TODO: the environment is hard-coded rather than taken from the task.
				Environment: map[string]string{
					"PATH":            "/bin:/usr/bin/:/usr/local/bin/",
					"LD_LIBRARY_PATH": "/lib/:/lib64/:/usr/local/lib",
				},
			},
			Jobs: nil,
		})
	case "paratera":
		jober, err := ParateraClient.Job(paratera.JobOptions{})
		if err != nil {
			log.Printf("create paratera job client for task %q: %v", hpc.Name, err)
			return
		}
		jober.SubmitSbsJob(paratera.Sbs{
			JobGroupName: hpc.Queue,
			JobName:      hpc.Name,
			SubmitProfile: paratera.SubmitProfile{
				BootScript: hpc.CmdScript,
				// The working directory comes from WorkDir, as in the ac and slurm
				// branches (this was previously set to CmdScript by mistake).
				WorkingDir: hpc.WorkDir,
			},
		})
	default:
		log.Printf("skip task %q: unknown cluster type %q", hpc.Name, hpc.ClusterType)
	}
}