update tasksync #513

Merged
tzwang merged 1 commits from tzwang/pcm-coordinator:master into master 2025-06-24 15:47:45 +08:00
5 changed files with 39 additions and 13 deletions

View File

@ -59,4 +59,23 @@ func AddCronGroup(svc *svc.ServiceContext) {
status.UpdateHpcTaskStatus(svc)
})
//更新推理任务状态
svc.Cron.AddFunc("*/5 * * * * ?", func() {
tasks, err := svc.Scheduler.AiStorages.GetInferDeployInstanceListLastMonth()
if err != nil {
logx.Error(err)
}
svc.Scheduler.AiService.Si.UpdateDeployInstanceStatusBatch(tasks, true)
})
//更新训练任务状态
svc.Cron.AddFunc("*/10 * * * * ?", func() {
tasks, err := svc.Scheduler.AiStorages.AllTaskLastMonth()
if err != nil {
logx.Error(err)
}
go svc.Scheduler.AiService.St.UpdateTaskStatus(tasks)
go svc.Scheduler.AiService.St.UpdateAiTaskStatus(tasks)
})
}

View File

@ -171,12 +171,17 @@ func (s *AiStorage) UpdateTask(task *types.TaskModel) error {
return nil
}
func (s *AiStorage) AllTask() ([]*types.TaskModel, error) {
func (s *AiStorage) AllTaskLastMonth() ([]*types.TaskModel, error) {
var list []*types.TaskModel
// 构建数据库查询
db := s.DbEngin.Model(&types.TaskModel{}).Table("task")
now := time.Now()
lastMonth := now.AddDate(0, -1, 0)
db = db.Where("created_time >= ?", lastMonth)
// 查询任务列表
if err := db.Order("created_time desc").Find(&list).Error; err != nil {
return nil, result.NewDefaultError(err.Error())
@ -565,9 +570,12 @@ func (s *AiStorage) GetInstanceListByDeployTaskId(id int64) ([]*models.AiInferDe
return list, nil
}
func (s *AiStorage) GetInferDeployInstanceList() ([]*models.AiInferDeployInstance, error) {
func (s *AiStorage) GetInferDeployInstanceListLastMonth() ([]*models.AiInferDeployInstance, error) {
var list []*models.AiInferDeployInstance
tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance").Scan(&list)
now := time.Now()
lastMonth := now.AddDate(0, -1, 0)
tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance where `create_time` >= ?", lastMonth).Scan(&list)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error

View File

@ -252,7 +252,7 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
}
func UpdateAutoStoppedInstance(svc *svc.ServiceContext) {
list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList()
list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceListLastMonth()
if err != nil {
return
}

View File

@ -272,7 +272,7 @@ func (s *SyncInfer) UpdateDeployInstanceStatus(instance *models.AiInferDeployIns
}
func (s *SyncInfer) UpdateAutoStoppedInstance() {
list, err := s.aiStorages.GetInferDeployInstanceList()
list, err := s.aiStorages.GetInferDeployInstanceListLastMonth()
if err != nil {
return
}

View File

@ -48,6 +48,7 @@ func (s *SyncTrain) UpdateAiTaskStatus(tasklist []*types.TaskModel) {
return
}
buffer := make(chan bool, 10)
for _, task := range list {
aiTaskList, err := s.aiStorages.GetAiTaskListById(task.Id)
if err != nil {
@ -58,7 +59,8 @@ func (s *SyncTrain) UpdateAiTaskStatus(tasklist []*types.TaskModel) {
continue
}
go s.updateAiTask(aiTaskList)
buffer <- true
go s.updateAiTask(aiTaskList, buffer)
}
}
@ -87,23 +89,19 @@ func (s *SyncTrain) UpdateTaskStatus(tasklist []*types.TaskModel) {
continue
}
logx.Errorf("############ Report Status Message Before switch %s", task.Status)
if len(aiTask) == 1 {
logx.Errorf("############ Report Status Message Switch %s", aiTask[0].Status)
switch aiTask[0].Status {
case constants.Completed:
task.Status = constants.Succeeded
logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
logx.Errorf("############ Report Status Message: %s", task.Status)
err = s.reportStatusMessages(task, aiTask[0], true)
if err != nil {
logx.Errorf("reportStatusMessages Error %s", err.Error())
}
case constants.Failed:
task.Status = constants.Failed
logx.Errorf("############ Report Status Message Before Sending %s", task.Status)
logx.Errorf("############ Report Status Message: %s", task.Status)
err = s.reportStatusMessages(task, aiTask[0], false)
if err != nil {
logx.Errorf("reportStatusMessages Error %s", err.Error())
@ -124,7 +122,7 @@ func (s *SyncTrain) UpdateTaskStatus(tasklist []*types.TaskModel) {
}
}
func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi) {
func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi, ch chan bool) {
var wg sync.WaitGroup
for _, aitask := range aiTaskList {
t := aitask
@ -187,6 +185,7 @@ func (s *SyncTrain) updateAiTask(aiTaskList []*models.TaskAi) {
}()
}
wg.Wait()
<-ch
}
func (s *SyncTrain) reportStatusMessages(task *types.TaskModel, aiTask *models.TaskAi, status bool) error {