diff --git a/internal/cron/cron.go b/internal/cron/cron.go index f06f9395a..505241b97 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -52,13 +52,7 @@ func AddCronGroup(svc *svc.ServiceContext) { //更新hpc任务状态 svc.Cron.AddFunc("*/5 * * * * ?", func() { - list, err := GetHpcTaskList(svc) - if err != nil { - logx.Errorf(err.Error()) - return - } - status.UpdateTaskStatusByHpc(svc, list) - status.UpdateTaskHpcStatus(svc, list) + status.UpdateHpcTaskStatus(svc) }) } diff --git a/internal/logic/hpc/commithpctasklogic.go b/internal/logic/hpc/commithpctasklogic.go index abcd10b96..f4c3d4607 100644 --- a/internal/logic/hpc/commithpctasklogic.go +++ b/internal/logic/hpc/commithpctasklogic.go @@ -6,7 +6,6 @@ import ( jsoniter "github.com/json-iterator/go" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" - "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "strconv" @@ -85,7 +84,6 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t Backend: req.Backend, OperateType: req.OperateType, CmdScript: req.Parameters["cmdScript"], - StartTime: time.Now().Format(constants.Layout), CardCount: cardCount, WorkDir: req.Parameters["workDir"], WallTime: req.Parameters["wallTime"], @@ -155,5 +153,6 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t if updates.Error != nil { return nil, updates.Error } + resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10) return resp, nil } diff --git a/internal/middleware/logmiddleware.go b/internal/middleware/logmiddleware.go index 41a589b9b..4754cf86e 100644 --- a/internal/middleware/logmiddleware.go +++ b/internal/middleware/logmiddleware.go @@ -20,7 +20,7 @@ func LogMiddleware(next http.HandlerFunc) http.HandlerFunc { proxy := &responseProxy{w: w} requestLog(r) 
next(proxy, r) - logx.Infof("LogMiddleware response uri:%s jsonResult :%+v", r.RequestURI, string(proxy.body)) + logx.Debugf("LogMiddleware response uri:%s jsonResult :%+v", r.RequestURI, string(proxy.body)) } } @@ -42,15 +42,15 @@ func (p *responseProxy) WriteHeader(statusCode int) { func requestLog(r *http.Request) { // 打印所有header - logx.Infof("LogMiddleware request uri:%s header :%+v", r.RequestURI, r.Header) + logx.Debugf("LogMiddleware request uri:%s header :%+v", r.RequestURI, r.Header) // json日志 if withJsonBody(r) { requestDump, err := httputil.DumpRequest(r, true) - logx.Infof("LogMiddleware request uri:%s jsonParams :%+v, err:%+v", r.RequestURI, string(requestDump), err) + logx.Debugf("LogMiddleware request uri:%s jsonParams :%+v, err:%+v", r.RequestURI, string(requestDump), err) } else { // form表单日志和其他 formParams, err := httpx.GetFormValues(r) - logx.Infof("LogMiddleware request uri:%s formParams :%+v, err:%+v", r.RequestURI, formParams, err) + logx.Debugf("LogMiddleware request uri:%s formParams :%+v, err:%+v", r.RequestURI, formParams, err) } } diff --git a/internal/scheduler/service/utils/status/hpc_task_sync.go b/internal/scheduler/service/utils/status/hpc_task_sync.go index 7159f47d3..2728f7c3d 100644 --- a/internal/scheduler/service/utils/status/hpc_task_sync.go +++ b/internal/scheduler/service/utils/status/hpc_task_sync.go @@ -1,9 +1,7 @@ package status import ( - "fmt" jsoniter "github.com/json-iterator/go" - "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs" @@ -11,23 +9,20 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "net/http" "strconv" - "sync" ) -func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, 
hpcTask *models.TaskHpc) error { +func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error { report := &jcs.JobStatusReportReq{ TaskName: task.Name, - TaskID: hpcTask.JobId, + TaskID: strconv.FormatInt(task.Id, 10), Messages: make([]*jcs.ReportMessage, 0), } jobMsg := &jcs.ReportMessage{ - Status: true, - Message: "", + Status: status, + Message: message, ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10), Output: hpcTask.WorkDir, } @@ -42,148 +37,79 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc return nil } -// 更新主表的超算任务状态 -func UpdateTaskStatusByHpc(svc *svc.ServiceContext, tasklist []*types.TaskModel) { +// UpdateHpcTaskStatus refreshes up to ten of the newest task_hpc rows that have a job id and are not yet Failed, Completed or Cancelled. +// For each row it asks the row's executor adapter for the job state, records a notice when the status changed, reports Failed and Completed transitions to the middleware, and saves status and start/end times to both the task and task_hpc tables. +// A row that cannot be loaded, queried or saved is logged and skipped so the remaining rows are still processed. +func UpdateHpcTaskStatus(svc *svc.ServiceContext) { svc.Scheduler.HpcService.TaskSyncLock.Lock() defer svc.Scheduler.HpcService.TaskSyncLock.Unlock() - - for _, task := range tasklist { - hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id) + taskList := make([]*models.TaskHpc, 0) + sqlStr := `select * from task_hpc where job_id!='' and status not in('Failed','Completed','Cancelled') order by created_time desc limit 10` + db := svc.DbEngin.Raw(sqlStr).Scan(&taskList) + if db.Error != nil { + logx.Errorf(db.Error.Error()) + return + } + for _, hpc := range taskList { + // Load the matching row of the task table; its status is updated together with the task_hpc row. + task := &types.TaskModel{} + tx := svc.DbEngin.Model(models.Task{}).Where("id", hpc.TaskId).Scan(&task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + continue + } + h := http.Request{} + hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(hpc.AdapterId, 10)].GetTask(h.Context(), hpc.JobId) if err != nil { logx.Errorf(err.Error()) - return - } - if len(hpcTaskList) == 0 { - break + continue } - logx.Errorf("############ Report Status Message Before switch %s", task.Status) - if len(hpcTaskList) == 1 { - logx.Errorf("############ Report Status Message Switch %s", 
hpcTaskList[0].Status) - switch hpcTaskList[0].Status { - - case constants.Completed: - task.Status = constants.Succeeded - logx.Errorf("############ Report Status Message Before Sending %s", task.Status) - - _ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) - case constants.Running: - task.Status = constants.Running - logx.Errorf("############ Report Status Message Before Sending %s", task.Status) - - _ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) - case constants.Failed: - task.Status = constants.Failed - logx.Errorf("############ Report Status Message Before Sending %s", task.Status) - - _ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) - default: - task.Status = hpcTaskList[0].Status + // The previous implementation guarded against GetTask returning a nil task without an error; keep that guard. + if hpcTask == nil { + continue + } + switch hpcTask.Status { + case constants.Running: + if hpc.Status != hpcTask.Status { + svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "running", "任务运行中") + hpc.Status = hpcTask.Status + task.Status = hpcTask.Status } - - task.StartTime = hpcTaskList[0].StartTime - task.EndTime = hpcTaskList[0].EndTime - err := svc.Scheduler.HpcStorages.UpdateTask(task) - if err != nil { - return + case constants.Failed: + if hpc.Status != hpcTask.Status { + svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "failed", "任务失败") + hpc.Status = hpcTask.Status + task.Status = hpcTask.Status + _ = reportHpcStatusMessages(svc, task, hpc, false, "任务失败") } - break - } - logx.Errorf("############ Report Status Message After switch %s", task.Status) - for i := len(hpcTaskList) - 1; i >= 0; i-- { - if hpcTaskList[i].StartTime == "" { - task.Status = hpcTaskList[i].Status - hpcTaskList = append(hpcTaskList[:i], hpcTaskList[i+1:]...) 
+ case constants.Completed: + if hpc.Status != hpcTask.Status { + svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "completed", "任务完成") + hpc.Status = hpcTask.Status + task.Status = hpcTask.Status + _ = reportHpcStatusMessages(svc, task, hpc, true, "任务完成") + } + default: + if hpc.Status != hpcTask.Status { + svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(hpc.AdapterId, 10), hpc.AdapterName, strconv.FormatInt(hpc.ClusterId, 10), hpc.ClusterName, hpc.Name, "pending", "任务pending") + hpc.Status = hpcTask.Status + task.Status = hpcTask.Status } } - if len(hpcTaskList) == 0 { - err := svc.Scheduler.HpcStorages.UpdateTask(task) - if err != nil { - break - } - break - } - } -} - -// UpdateTaskHpcStatus 更新task_hpc表的任务状态 -func UpdateTaskHpcStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { - svc.Scheduler.HpcService.TaskSyncLock.Lock() - defer svc.Scheduler.HpcService.TaskSyncLock.Unlock() - for _, task := range tasklist { - hpcTaskList, err := svc.Scheduler.HpcStorages.GetHpcTaskListById(task.Id) + task.StartTime = hpcTask.Start + task.EndTime = hpcTask.End + hpc.StartTime = hpcTask.Start + hpc.EndTime = hpcTask.End + logx.Infof("# task 开始时间: %v, 结束时间: %v", task.StartTime, task.EndTime) + err = svc.Scheduler.HpcStorages.UpdateTask(task) if err != nil { logx.Errorf(err.Error()) - return + continue } - if len(hpcTaskList) == 0 { - return + err = svc.Scheduler.HpcStorages.UpdateHpcTask(hpc) + if err != nil { + logx.Errorf(err.Error()) + continue } - updateHpcTask(svc, hpcTaskList...) 
} - -} - -func updateHpcTask(svc *svc.ServiceContext, hpcTaskList ...*models.TaskHpc) { - var wg sync.WaitGroup - for _, hpc := range hpcTaskList { - t := hpc - if t.Status == constants.Completed || t.Status == constants.Failed || t.JobId == "" || t.Status == constants.Cancelled { - continue - } - wg.Add(1) - go func() { - h := http.Request{} - hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)].GetTask(h.Context(), t.JobId) - if err != nil { - if status.Code(err) == codes.DeadlineExceeded { - msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - - msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - if hpcTask == nil { - wg.Done() - return - } - switch hpcTask.Status { - case constants.Running: - if t.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中") - t.Status = hpcTask.Status - } - case constants.Failed: - if t.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败") - t.Status = hpcTask.Status - } - case constants.Completed: - if t.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成") - t.Status = hpcTask.Status - } - default: - if t.Status != hpcTask.Status { - svc.Scheduler.HpcStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, 
strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending") - t.Status = hpcTask.Status - } - } - t.StartTime = hpcTask.Start - t.EndTime = hpcTask.End - err = svc.Scheduler.HpcStorages.UpdateHpcTask(t) - if err != nil { - msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) - logx.Errorf(errors.New(msg).Error()) - wg.Done() - return - } - wg.Done() - }() - } - wg.Wait() }