forked from JointCloud/pcm-coordinator
591 lines
17 KiB
Go
591 lines
17 KiB
Go
package cron
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"github.com/zeromicro/go-zero/zrpc"
|
|
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
|
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
|
|
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
|
|
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Cluster names recognised when building AI executor/collector links. A
// cluster's Name is matched against these values to choose which storeLink
// implementation is created for it.
const (
	// OCTOPUS selects the Octopus link.
	OCTOPUS = "octopus"
	// MODELARTS selects the ModelArts link.
	MODELARTS = "modelarts"
	// SHUGUANGAI selects the ShuguangAi link.
	SHUGUANGAI = "shuguangAi"
)
|
|
|
|
func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
|
|
limit := 10
|
|
offset := 0
|
|
var list []*types.TaskModel
|
|
db := svc.DbEngin.Model(&types.TaskModel{}).Table("task")
|
|
|
|
db = db.Where("deleted_at is null")
|
|
|
|
//count total
|
|
var total int64
|
|
err := db.Count(&total).Error
|
|
db.Limit(limit).Offset(offset)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = db.Order("created_time desc").Find(&list).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return list, nil
|
|
}
|
|
|
|
func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
|
|
list := make([]*types.TaskModel, len(tasklist))
|
|
copy(list, tasklist)
|
|
for i := len(list) - 1; i >= 0; i-- {
|
|
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
|
|
list = append(list[:i], list[i+1:]...)
|
|
}
|
|
}
|
|
|
|
if len(list) == 0 {
|
|
return
|
|
}
|
|
|
|
task := list[0]
|
|
for i := range list {
|
|
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
|
|
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
|
|
if latest.Before(earliest) {
|
|
task = list[i]
|
|
}
|
|
}
|
|
|
|
var aiTaskList []*models.TaskAi
|
|
tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
|
|
if len(aiTaskList) == 0 {
|
|
return
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
for _, aitask := range aiTaskList {
|
|
t := aitask
|
|
if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
h := http.Request{}
|
|
trainingTask, err := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTrainingTask(h.Context(), t.JobId)
|
|
if err != nil {
|
|
if status.Code(err) == codes.DeadlineExceeded {
|
|
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
|
logx.Errorf(errors.New(msg).Error())
|
|
wg.Done()
|
|
return
|
|
}
|
|
|
|
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
|
logx.Errorf(errors.New(msg).Error())
|
|
wg.Done()
|
|
return
|
|
}
|
|
if trainingTask == nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
switch trainingTask.Status {
|
|
case constants.Running:
|
|
if t.Status != trainingTask.Status {
|
|
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
|
|
t.Status = trainingTask.Status
|
|
}
|
|
case constants.Failed:
|
|
if t.Status != trainingTask.Status {
|
|
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
|
|
t.Status = trainingTask.Status
|
|
}
|
|
case constants.Completed:
|
|
if t.Status != trainingTask.Status {
|
|
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
|
|
t.Status = trainingTask.Status
|
|
}
|
|
default:
|
|
if t.Status != trainingTask.Status {
|
|
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
|
|
t.Status = trainingTask.Status
|
|
}
|
|
}
|
|
t.StartTime = trainingTask.Start
|
|
t.EndTime = trainingTask.End
|
|
err = svc.Scheduler.AiStorages.UpdateAiTask(t)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
|
|
logx.Errorf(errors.New(msg).Error())
|
|
wg.Done()
|
|
return
|
|
}
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
|
|
list := make([]*types.TaskModel, len(tasklist))
|
|
copy(list, tasklist)
|
|
for i := len(list) - 1; i >= 0; i-- {
|
|
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
|
|
list = append(list[:i], list[i+1:]...)
|
|
}
|
|
}
|
|
|
|
if len(list) == 0 {
|
|
return
|
|
}
|
|
|
|
task := list[0]
|
|
for i := range list {
|
|
earliest, _ := time.Parse(time.RFC3339, task.UpdatedTime)
|
|
latest, _ := time.Parse(time.RFC3339, list[i].UpdatedTime)
|
|
if latest.Before(earliest) {
|
|
task = list[i]
|
|
}
|
|
}
|
|
|
|
// Update Infer Task Status
|
|
if task.TaskTypeDict == "11" || task.TaskTypeDict == "12" {
|
|
UpdateInferTaskStatus(svc, task)
|
|
return
|
|
}
|
|
|
|
var aiTask []*models.TaskAi
|
|
tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
|
|
if len(aiTask) == 0 {
|
|
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
if len(aiTask) == 1 {
|
|
if aiTask[0].Status == constants.Completed {
|
|
task.Status = constants.Succeeded
|
|
} else {
|
|
task.Status = aiTask[0].Status
|
|
}
|
|
task.StartTime = aiTask[0].StartTime
|
|
task.EndTime = aiTask[0].EndTime
|
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
for i := len(aiTask) - 1; i >= 0; i-- {
|
|
if aiTask[i].StartTime == "" {
|
|
task.Status = aiTask[i].Status
|
|
aiTask = append(aiTask[:i], aiTask[i+1:]...)
|
|
}
|
|
}
|
|
|
|
if len(aiTask) == 0 {
|
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
|
|
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
|
|
|
|
var status string
|
|
var count int
|
|
for _, a := range aiTask {
|
|
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
|
|
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
|
|
|
|
if s.Before(start) {
|
|
start = s
|
|
}
|
|
|
|
if e.After(end) {
|
|
end = e
|
|
}
|
|
|
|
if a.Status == constants.Failed {
|
|
status = a.Status
|
|
break
|
|
}
|
|
|
|
if a.Status == constants.Pending {
|
|
status = a.Status
|
|
continue
|
|
}
|
|
|
|
if a.Status == constants.Running {
|
|
status = a.Status
|
|
continue
|
|
}
|
|
|
|
if a.Status == constants.Completed {
|
|
count++
|
|
continue
|
|
}
|
|
}
|
|
|
|
if count == len(aiTask) {
|
|
status = constants.Succeeded
|
|
}
|
|
|
|
if status != "" {
|
|
task.Status = status
|
|
task.StartTime = start.Format(constants.Layout)
|
|
task.EndTime = end.Format(constants.Layout)
|
|
}
|
|
|
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
|
|
var aiType = "1"
|
|
adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
|
logx.Errorf(errors.New(msg).Error())
|
|
return
|
|
}
|
|
if len(adapterIds) == 0 {
|
|
return
|
|
}
|
|
|
|
for _, id := range adapterIds {
|
|
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
|
|
logx.Errorf(errors.New(msg).Error())
|
|
return
|
|
}
|
|
if len(clusters.List) == 0 {
|
|
continue
|
|
}
|
|
if isAdapterExist(svc, id, len(clusters.List)) {
|
|
continue
|
|
} else {
|
|
if isAdapterEmpty(svc, id) {
|
|
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
|
|
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
|
|
} else {
|
|
UpdateClusterMaps(svc, id, clusters.List)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) {
|
|
for _, c := range clusters {
|
|
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id]
|
|
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id]
|
|
if !ok && !ok2 {
|
|
switch c.Name {
|
|
case OCTOPUS:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf))
|
|
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus
|
|
case MODELARTS:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf))
|
|
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf))
|
|
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
|
|
case SHUGUANGAI:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf))
|
|
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
|
svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai
|
|
}
|
|
} else {
|
|
continue
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
|
|
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
|
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
|
if ok && ok2 {
|
|
if len(emap) == clusterNum && len(cmap) == clusterNum {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
|
|
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
|
|
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
|
|
if !ok && !ok2 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
|
|
executorMap := make(map[string]executor.AiExecutor)
|
|
collectorMap := make(map[string]collector.AiCollector)
|
|
for _, c := range clusters {
|
|
switch c.Name {
|
|
case OCTOPUS:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
|
|
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
|
|
collectorMap[c.Id] = octopus
|
|
executorMap[c.Id] = octopus
|
|
case MODELARTS:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
|
|
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
|
|
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
|
|
collectorMap[c.Id] = modelarts
|
|
executorMap[c.Id] = modelarts
|
|
case SHUGUANGAI:
|
|
id, _ := strconv.ParseInt(c.Id, 10, 64)
|
|
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
|
|
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
|
|
collectorMap[c.Id] = sgai
|
|
executorMap[c.Id] = sgai
|
|
}
|
|
}
|
|
|
|
return executorMap, collectorMap
|
|
}
|
|
|
|
func UpdateClusterResource(svc *svc.ServiceContext) {
|
|
list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
|
|
if err != nil {
|
|
return
|
|
}
|
|
var wg sync.WaitGroup
|
|
for _, adapter := range list {
|
|
clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, cluster := range clusters.List {
|
|
c := cluster
|
|
clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
_, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
|
|
if !ok {
|
|
wg.Done()
|
|
return
|
|
}
|
|
h := http.Request{}
|
|
stat, err := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id].GetResourceStats(h.Context())
|
|
if err != nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
if stat == nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
|
|
if err != nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
var cardTotal int64
|
|
var topsTotal float64
|
|
for _, card := range stat.CardsAvail {
|
|
cardTotal += int64(card.CardNum)
|
|
topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
|
|
}
|
|
|
|
if (models.TClusterResource{} == *clusterResource) {
|
|
err = svc.Scheduler.AiStorages.SaveClusterResources(adapter.Id, c.Id, c.Name, clusterType, float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
|
|
stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
|
|
if err != nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
} else {
|
|
if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
|
|
wg.Done()
|
|
return
|
|
}
|
|
clusterResource.CardTotal = cardTotal
|
|
clusterResource.CardTopsTotal = topsTotal
|
|
clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
|
|
clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
|
|
clusterResource.MemAvail = stat.MemAvail
|
|
clusterResource.MemTotal = stat.MemTotal
|
|
clusterResource.DiskAvail = stat.DiskAvail
|
|
clusterResource.DiskTotal = stat.DiskTotal
|
|
|
|
err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource)
|
|
if err != nil {
|
|
wg.Done()
|
|
return
|
|
}
|
|
}
|
|
wg.Done()
|
|
}()
|
|
}
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func UpdateInferTaskStatus(svc *svc.ServiceContext, task *types.TaskModel) {
|
|
var aiTask []*models.TaskAi
|
|
tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
|
|
if len(aiTask) == 0 {
|
|
task.Status = constants.Failed
|
|
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
if len(aiTask) == 1 {
|
|
if aiTask[0].Status == constants.Completed {
|
|
task.StartTime = aiTask[0].StartTime
|
|
task.EndTime = aiTask[0].EndTime
|
|
task.Status = constants.Succeeded
|
|
} else {
|
|
task.StartTime = aiTask[0].StartTime
|
|
task.Status = aiTask[0].Status
|
|
}
|
|
|
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
tx = svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
//for i := len(aiTask) - 1; i >= 0; i-- {
|
|
// if aiTask[i].StartTime == "" {
|
|
// task.Status = aiTask[i].Status
|
|
// aiTask = append(aiTask[:i], aiTask[i+1:]...)
|
|
// }
|
|
//}
|
|
//
|
|
//if len(aiTask) == 0 {
|
|
// task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
// tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
|
// if tx.Error != nil {
|
|
// logx.Errorf(tx.Error.Error())
|
|
// return
|
|
// }
|
|
// return
|
|
//}
|
|
|
|
if aiTask[0].StartTime == "" {
|
|
return
|
|
}
|
|
|
|
start, _ := time.ParseInLocation(time.RFC3339, aiTask[0].StartTime, time.Local)
|
|
end, _ := time.ParseInLocation(time.RFC3339, aiTask[0].EndTime, time.Local)
|
|
var status string
|
|
var count int
|
|
for _, a := range aiTask {
|
|
if a.Status == constants.Failed {
|
|
status = a.Status
|
|
break
|
|
}
|
|
|
|
if a.Status == constants.Pending {
|
|
status = a.Status
|
|
continue
|
|
}
|
|
|
|
if a.Status == constants.Running {
|
|
status = a.Status
|
|
continue
|
|
}
|
|
|
|
if a.Status == constants.Completed {
|
|
count++
|
|
continue
|
|
}
|
|
}
|
|
|
|
if count == len(aiTask) {
|
|
status = constants.Succeeded
|
|
}
|
|
|
|
if status == constants.Succeeded {
|
|
task.Status = status
|
|
task.StartTime = start.Format(time.RFC3339)
|
|
task.EndTime = end.Format(time.RFC3339)
|
|
} else {
|
|
task.Status = status
|
|
task.StartTime = start.Format(time.RFC3339)
|
|
}
|
|
|
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
tx = svc.DbEngin.Table("task").Model(task).Updates(task)
|
|
if tx.Error != nil {
|
|
logx.Errorf(tx.Error.Error())
|
|
return
|
|
}
|
|
}
|