From 6a1655e4781d52b5686d1ea8a246c5ed916e0dc7 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 24 Jun 2025 15:47:37 +0800 Subject: [PATCH] update tasksync --- internal/cron/cron.go | 19 +++++++++++++++++++ internal/scheduler/database/aiStorage.go | 14 +++++++++++--- .../service/utils/status/deployInstance.go | 2 +- .../service/utils/task/tasksync/infer.go | 2 +- .../service/utils/task/tasksync/train.go | 15 +++++++-------- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/internal/cron/cron.go b/internal/cron/cron.go index c3b2ef1d3..e1d43bd31 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -59,4 +59,23 @@ func AddCronGroup(svc *svc.ServiceContext) { status.UpdateHpcTaskStatus(svc) }) + //更新推理任务状态 + svc.Cron.AddFunc("*/5 * * * * ?", func() { + tasks, err := svc.Scheduler.AiStorages.GetInferDeployInstanceListLastMonth() + if err != nil { + logx.Error(err) + } + svc.Scheduler.AiService.Si.UpdateDeployInstanceStatusBatch(tasks, true) + }) + + //更新训练任务状态 + svc.Cron.AddFunc("*/10 * * * * ?", func() { + tasks, err := svc.Scheduler.AiStorages.AllTaskLastMonth() + if err != nil { + logx.Error(err) + } + go svc.Scheduler.AiService.St.UpdateTaskStatus(tasks) + go svc.Scheduler.AiService.St.UpdateAiTaskStatus(tasks) + }) + } diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index df3e2c0ef..e32bb02ef 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -171,12 +171,17 @@ func (s *AiStorage) UpdateTask(task *types.TaskModel) error { return nil } -func (s *AiStorage) AllTask() ([]*types.TaskModel, error) { +func (s *AiStorage) AllTaskLastMonth() ([]*types.TaskModel, error) { var list []*types.TaskModel // 构建数据库查询 db := s.DbEngin.Model(&types.TaskModel{}).Table("task") + now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + + db = db.Where("created_time >= ?", lastMonth) + // 查询任务列表 if err := db.Order("created_time desc").Find(&list).Error; err != nil { return nil, result.NewDefaultError(err.Error()) @@ -565,9 +570,12 @@ func (s *AiStorage) GetInstanceListByDeployTaskId(id int64) ([]*models.AiInferDe return list, nil } -func (s *AiStorage) GetInferDeployInstanceList() ([]*models.AiInferDeployInstance, error) { +func (s *AiStorage) GetInferDeployInstanceListLastMonth() ([]*models.AiInferDeployInstance, error) { var list []*models.AiInferDeployInstance - tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance").Scan(&list) + now := time.Now() + lastMonth := now.AddDate(0, -1, 0) + + tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance where `create_time` >= ?", lastMonth).Scan(&list) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return nil, tx.Error diff --git a/internal/scheduler/service/utils/status/deployInstance.go b/internal/scheduler/service/utils/status/deployInstance.go index 8e550fb69..b758dc1b6 100644 --- a/internal/scheduler/service/utils/status/deployInstance.go +++ b/internal/scheduler/service/utils/status/deployInstance.go @@ -252,7 +252,7 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } func UpdateAutoStoppedInstance(svc *svc.ServiceContext) { - list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList() + list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceListLastMonth() if err != nil { return } diff --git a/internal/scheduler/service/utils/task/tasksync/infer.go b/internal/scheduler/service/utils/task/tasksync/infer.go index 773088857..03abc9f24 100644 --- a/internal/scheduler/service/utils/task/tasksync/infer.go +++ b/internal/scheduler/service/utils/task/tasksync/infer.go @@ -272,7 +272,7 @@ func (s *SyncInfer) UpdateDeployInstanceStatus(instance *models.AiInferDeployIns } func (s *SyncInfer) UpdateAutoStoppedInstance() { - list, err := s.aiStorages.GetInferDeployInstanceList() + list, err := s.aiStorages.GetInferDeployInstanceListLastMonth() if err != nil { return } diff --git a/internal/scheduler/service/utils/task/tasksync/train.go b/internal/scheduler/service/utils/task/tasksync/train.go index 79e816eae..eeabfce33 100644 --- a/internal/scheduler/service/utils/task/tasksync/train.go +++ b/internal/scheduler/service/utils/task/tasksync/train.go @@ -48,6 +48,7 @@ func (s *SyncTrain) UpdateAiTaskStatus(tasklist []*types.TaskModel) { return } + buffer := make(chan bool, 10) for _, task := range list { aiTaskList, err := s.aiStorages.GetAiTaskListById(task.Id) if err != nil { @@ -58,7 +59,8 @@ func (s *SyncTrain) UpdateAiTaskStatus(tasklist []*types.TaskModel) { continue } - go s.updateAiTask(aiTaskList) + buffer <- true + go s.updateAiTask(aiTaskList, buffer) } } @@ -87,23 +89,19 @@ func (s *SyncTrain) UpdateTaskStatus(tasklist []*types.TaskModel) { continue } - logx.Errorf("############ Report Status Message Before switch %s", task.Status) if len(aiTask) == 1 { - logx.Errorf("############ Report Status Message Switch %s", aiTask[0].Status) switch aiTask[0].Status { case constants.Completed: task.Status = constants.Succeeded - logx.Errorf("############ Report Status Message Before Sending %s", task.Status) - + logx.Errorf("############ Report Status Message: %s", task.Status) err = s.reportStatusMessages(task, aiTask[0], true) if err != nil { logx.Errorf("reportStatusMessages Error %s", err.Error()) } case constants.Failed: task.Status = constants.Failed - logx.Errorf("############ Report Status Message Before Sending %s", task.Status) - + logx.Errorf("############ Report Status Message: %s", task.Status) err = s.reportStatusMessages(task, aiTask[0], false) if err != nil { logx.Errorf("reportStatusMessages Error %s", err.Error()) @@ -124,7 +122,7 @@ func (s *SyncTrain) UpdateTaskStatus(tasklist []*types.TaskModel) { } } -func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi) { +func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi, ch chan bool) { var wg sync.WaitGroup for _, aitask := range aiTaskList { t := aitask @@ -187,6 +185,7 @@ func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi) { }() } wg.Wait() + <-ch } func (s *SyncTrain) reportStatusMessages(task *types.TaskModel, aiTask *models.TaskAi, status bool) error {