refactor UpdateHpcTaskStatus; streamline task synchronization logic, improve error handling, and enhance status reporting

Signed-off-by: jagger <cossjie@foxmail.com>
This commit is contained in:
jagger 2025-07-30 11:24:40 +08:00
parent 3185981438
commit 80f07495d5
1 changed files with 214 additions and 70 deletions

View File

@ -1,6 +1,8 @@
package status
import (
"context"
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
"github.com/zeromicro/go-zero/core/logx"
@ -10,8 +12,9 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"net/http"
"gorm.io/gorm"
"strconv"
"time"
)
func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error {
@ -38,84 +41,225 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc
}
// UpdateHpcTaskStatus 更新超算任务状态,并通知中间件
//func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
// svc.Scheduler.HpcService.TaskSyncLock.Lock()
// defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()
// taskHpcs := make([]*models.TaskHpc, 0)
// sqlStr := `SELECT *
// FROM task_hpc
// WHERE
// job_id != ''
// AND (
// status NOT IN ('Failed', 'Completed', 'Cancelled')
// OR start_time < created_time
// )
// ORDER BY created_time DESC
// LIMIT 10`
// db := svc.DbEngin.Raw(sqlStr).Scan(&taskHpcs)
// if db.Error != nil {
// logx.Errorf(db.Error.Error())
// return
// }
// for _, hpc := range taskHpcs {
// //更新task表的超算任务状态
// task := &types.TaskModel{}
// tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task)
// if tx.Error != nil {
// logx.Errorf(tx.Error.Error())
// break
// }
// clusterId := utils.Int64ToString(hpc.ClusterId)
// h := http.Request{}
// hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// switch hpcTask.Status {
// case constants.Running:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// }
// case constants.Failed:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
// _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
// }
// case constants.Completed:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
// _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
// }
// default:
// if hpc.Status != hpcTask.Status {
// svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending")
// hpc.Status = hpcTask.Status
// task.Status = hpcTask.Status
// }
// }
// //task.Id=hpcTask.
// task.StartTime = hpcTask.Start
// task.EndTime = hpcTask.End
// hpc.StartTime = hpcTask.Start
// hpc.EndTime = hpcTask.End
// logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime)
// err = svc.Scheduler.HpcStorages.UpdateTask(task)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc)
// if err != nil {
// logx.Errorf(err.Error())
// break
// }
// }
//}
// UpdateHpcTaskStatus HPC 任务状态同步函数
func UpdateHpcTaskStatus(svc *svc.ServiceContext) {
svc.Scheduler.HpcService.TaskSyncLock.Lock()
defer svc.Scheduler.HpcService.TaskSyncLock.Unlock()
taskList := make([]*models.TaskHpc, 0)
sqlStr := `SELECT *
FROM task_hpc
WHERE
job_id != ''
AND (
status NOT IN ('Failed', 'Completed', 'Cancelled')
OR start_time < created_time
)
ORDER BY created_time DESC
LIMIT 10`
db := svc.DbEngin.Raw(sqlStr).Scan(&taskList)
if db.Error != nil {
logx.Errorf(db.Error.Error())
// 1. 查询需要同步的 HPC 任务
var hpcTasks []*models.TaskHpc
sqlStr := `SELECT * FROM task_hpc WHERE job_id != '' AND status NOT IN ('Failed', 'Completed', 'Cancelled') OR start_time < created_time ORDER BY created_time DESC LIMIT 10`
if err := svc.DbEngin.Raw(sqlStr).Scan(&hpcTasks).Error; err != nil {
logx.Errorf("Failed to query HPC tasks for sync: %v", err)
return
}
for _, hpc := range taskList {
//更新task表的超算任务状态
task := &types.TaskModel{}
tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
break
if len(hpcTasks) == 0 {
return
}
// 2. 批量获取关联的 Task 模型
taskIDs := make([]int64, len(hpcTasks))
for i, hpc := range hpcTasks {
taskIDs[i] = hpc.TaskId
}
taskMap := make(map[int64]*types.TaskModel)
var tasks []*types.TaskModel
if err := svc.DbEngin.Model(&models.Task{}).Where("id IN ?", taskIDs).Find(&tasks).Error; err != nil {
logx.Errorf("Failed to batch query tasks: %v", err)
return
}
for _, task := range tasks {
taskMap[task.Id] = task
}
// 3. 遍历 HPC 任务并更新状态
for _, hpc := range hpcTasks {
task, ok := taskMap[hpc.TaskId]
if !ok {
logx.Errorf("Task with ID %d not found for HPC task %d, skipping", hpc.TaskId, hpc.Id)
continue
}
clusterId := utils.Int64ToString(hpc.ClusterId)
h := http.Request{}
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId, clusterId)
// 使用带超时的 Context防止 API 调用阻塞
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
adapterIDStr := strconv.FormatInt(hpc.AdapterId, 10)
adapter, adapterExists := svc.Scheduler.HpcService.HpcExecutorAdapterMap[adapterIDStr]
if !adapterExists {
logx.Errorf("HPC adapter with ID %s not found, skipping task %s", adapterIDStr, hpc.Name)
continue
}
// 4. 从 HPC 集群获取最新状态
hpcTaskInfo, err := adapter.GetTask(ctx, hpc.JobId, utils.Int64ToString(hpc.ClusterId))
if err != nil {
logx.Errorf(err.Error())
break
logx.Errorf("Failed to get task status from HPC executor for job %s: %v", hpc.JobId, err)
continue // 继续处理下一个任务
}
switch hpcTask.Status {
case constants.Running:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
}
case constants.Failed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
logx.Infof("[%v]:任务执行失败,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败")
}
case constants.Completed:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
logx.Infof("[%v]:任务执行完成,发送通知, 任务状态: [%v]", hpcTask, hpcTask.Status)
_ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成")
}
default:
if hpc.Status != hpcTask.Status {
svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending")
hpc.Status = hpcTask.Status
task.Status = hpcTask.Status
}
// 如果状态没有变化,则跳过
if hpc.Status == hpcTaskInfo.Status {
continue
}
task.StartTime = hpcTask.Start
task.EndTime = hpcTask.End
hpc.StartTime = hpcTask.Start
hpc.EndTime = hpcTask.End
logx.Info("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime)
err = svc.Scheduler.HpcStorages.UpdateTask(task)
// 5. 准备更新
previousStatus := hpc.Status
hpc.Status = hpcTaskInfo.Status
hpc.StartTime = hpcTaskInfo.Start
hpc.EndTime = hpcTaskInfo.End
task.Status = hpcTaskInfo.Status
task.StartTime = hpcTaskInfo.Start
task.EndTime = hpcTaskInfo.End
logx.Infof("HPC task status change detected for job %s: %s -> %s", hpc.JobId, previousStatus, hpc.Status)
// 6. 在事务中更新数据库
err = svc.DbEngin.Transaction(func(tx *gorm.DB) error {
if err := tx.Save(task).Error; err != nil {
return fmt.Errorf("failed to update task table: %w", err)
}
if err := tx.Save(hpc).Error; err != nil {
return fmt.Errorf("failed to update hpc_task table: %w", err)
}
return nil
})
if err != nil {
logx.Errorf(err.Error())
break
logx.Errorf("Failed to update database in transaction for job %s: %v", hpc.JobId, err)
// 事务失败,回滚状态,继续处理下一个任务
hpc.Status = previousStatus
task.Status = previousStatus
continue
}
err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc)
if err != nil {
logx.Errorf(err.Error())
break
// 7. 根据新状态执行后续操作 (通知、报告等)
handleStatusChange(svc, task, hpc, hpcTaskInfo.Status)
}
}
// handleStatusChange runs the follow-up actions for an HPC task whose status
// has just become newStatus.
//
// For Running, Failed, Completed and Pending it records a notice through
// HpcStorages.AddNoticeInfo and logs the transition. For Failed and Completed
// it additionally calls reportHpcStatusMessages (success flag false and true
// respectively); an error from that call is logged, not returned. Any other
// status is logged as unhandled and nothing else happens.
func handleStatusChange(svc *svc.ServiceContext, task *types.TaskModel, hpc *models.TaskHpc, newStatus string) {
	// followUp describes what has to happen for one recognised status.
	type followUp struct {
		kind    string // notice type passed to AddNoticeInfo
		message string // notice text, reused as the report message
		report  bool   // whether reportHpcStatusMessages must be called
		success bool   // success flag handed to the report
	}

	var action followUp
	switch newStatus {
	case constants.Running:
		action = followUp{kind: "running", message: "任务运行中"}
	case constants.Failed:
		action = followUp{kind: "failed", message: "任务失败", report: true, success: false}
	case constants.Completed:
		action = followUp{kind: "completed", message: "任务完成", report: true, success: true}
	case constants.Pending:
		action = followUp{kind: "pending", message: "任务pending"}
	default:
		// Unknown statuses are only logged; no notice or report is sent.
		logx.Errorf("Unhandled HPC task status '%s' for job %s", newStatus, hpc.JobId)
		return
	}

	// Record the notice for this transition.
	svc.Scheduler.HpcStorages.AddNoticeInfo(
		strconv.FormatInt(hpc.AdapterId, 10),
		hpc.AdapterName,
		strconv.FormatInt(hpc.ClusterId, 10),
		hpc.ClusterName,
		hpc.Name,
		action.kind,
		action.message,
	)
	logx.Infof("[%s]: 任务状态变更为 [%s],发送通知。", hpc.Name, newStatus)

	// Only some statuses are reported.
	if !action.report {
		return
	}
	err := reportHpcStatusMessages(svc, task, hpc, action.success, action.message)
	if err != nil {
		logx.Errorf("Failed to report HPC status for job %s: %v", hpc.JobId, err)
	}
}