diff --git a/internal/logic/inference/createinferencetasklogic.go b/internal/logic/inference/createinferencetasklogic.go index 3a34220d1..ceccdb439 100644 --- a/internal/logic/inference/createinferencetasklogic.go +++ b/internal/logic/inference/createinferencetasklogic.go @@ -7,6 +7,7 @@ import ( "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "strconv" @@ -78,6 +79,10 @@ func (l *CreateInferenceTaskLogic) CreateInferenceTask(req *types.CreateInferenc err = l.createInferenceTask(taskId, adapterClusterMap, opt) if err != nil { + if len(assignedClusters) != 0 { + _ = status.ReportStatus(l.svcCtx, taskName, strconv.FormatInt(taskId, 10), assignedClusters[0].ClusterId, "", false, "") + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } return nil, err } diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 75d1c3c16..19df353d0 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -261,13 +261,6 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass return err } - //report msg - report := &jcs.JobStatusReportReq{ - TaskName: "", - TaskID: strconv.FormatInt(taskId, 10), - Messages: make([]*jcs.ReportMessage, 0), - } - var errmsg string for _, err := range errs { e := (err).(struct { @@ -284,14 +277,23 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass return errors.New("database add failed: " + err.Error()) } - //add report msg - jobMsg := &jcs.ReportMessage{ + //report msg + report := &jcs.TrainReportMessage{ + Type: "Train", + TaskName: "", + TaskID: strconv.FormatInt(taskId, 10), Status: false, Message: msg, ClusterID: e.clusterId, Output: "", } - report.Messages = append(report.Messages, jobMsg) + + //report status + _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) + + logx.Errorf(errors.New(errmsg).Error()) + return errors.New(errmsg) + } for _, s := range results { as.option.ComputeCard = s.Card //execute card @@ -313,18 +315,19 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass } } //add report msg - jobMsg := &jcs.ReportMessage{ + report := &jcs.TrainReportMessage{ + Type: "Train", + TaskName: "", + TaskID: strconv.FormatInt(taskId, 10), Status: false, Message: s.Msg, ClusterID: s.ClusterId, Output: "", } - report.Messages = append(report.Messages, jobMsg) + //report status + _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) } - //report status - _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) - logx.Errorf(errors.New(errmsg).Error()) return errors.New(errmsg) } diff --git a/internal/scheduler/service/utils/jcs/middleware.go b/internal/scheduler/service/utils/jcs/middleware.go index 1f131b859..33fa43662 100644 --- a/internal/scheduler/service/utils/jcs/middleware.go +++ b/internal/scheduler/service/utils/jcs/middleware.go @@ -10,18 +10,30 @@ import ( ) type JobStatusReportReq struct { - TaskName string `json:"taskName"` - TaskID string `json:"taskID"` - Messages []*ReportMessage `json:"messages"` + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Messages []interface{} `json:"messages"` } -type ReportMessage struct { +type TrainReportMessage struct { + Type string `json:"type"` + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` Status bool `json:"status"` Message string `json:"message"` ClusterID string `json:"clusterID"` Output string `json:"output"` } +type InferReportMessage struct { + Type string `json:"type"` + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Status bool `json:"status"` + Message string `json:"message"` + ClusterID string `json:"clusterID"` + Url string `json:"url"` +} -func StatusReport(url string, report *JobStatusReportReq) error { +func StatusReport(url string, report interface{}) error { resp := struct { Code string `json:"code"` Msg string `json:"message"` @@ -49,7 +61,7 @@ func StatusReport(url string, report *JobStatusReportReq) error { return nil } -func TempSaveReportToTask(store *database.AiStorage, task *types.TaskModel, report *JobStatusReportReq) error { +func TempSaveReportToTask(store *database.AiStorage, task *types.TaskModel, report interface{}) error { jsonBytes, err := json.Marshal(report) task.Result = string(jsonBytes) diff --git a/internal/scheduler/service/utils/status/deployInstance.go b/internal/scheduler/service/utils/status/deployInstance.go index 4faeca630..55b49deec 100644 --- a/internal/scheduler/service/utils/status/deployInstance.go +++ b/internal/scheduler/service/utils/status/deployInstance.go @@ -1,6 +1,7 @@ package status import ( + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" @@ -97,6 +98,11 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } return } + url := ins.InferUrl + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "") + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } instance.Status = constants.Running case "stopped": if instance.Status == constants.Stopped { @@ -120,6 +126,11 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } return } + url := ins.InferUrl + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "") + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } instance.Status = constants.Running case "stopped": if instance.Status == constants.Stopped { @@ -130,6 +141,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe return } instance.Status = constants.Stopped + case "failed": + if instance.Status == constants.Failed { + if ch != nil { + <-ch + return + } + return + } + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status) + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } + instance.Status = constants.Failed default: instance.Status = ins.Status } @@ -166,6 +190,11 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } return } + url := ins.InferUrl + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), url, true, "") + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } instance.Status = constants.Running case "STOPPED": if instance.Status == constants.Stopped { @@ -184,6 +213,10 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } return } + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status) + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } instance.Status = constants.Failed case "FAILED": if instance.Status == constants.Failed { @@ -193,6 +226,10 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe } return } + err := ReportStatus(svc, instance.InstanceName, strconv.FormatInt(instance.DeployInstanceTaskId, 10), strconv.FormatInt(instance.ClusterId, 10), "", false, ins.Status) + if err != nil { + logx.Errorf("############ Report Infer Task Status Message Error %s", err.Error()) + } instance.Status = constants.Failed default: instance.Status = ins.Status diff --git a/internal/scheduler/service/utils/status/hpc_task_sync.go b/internal/scheduler/service/utils/status/hpc_task_sync.go index ebe0c6e29..fd4f165fa 100644 --- a/internal/scheduler/service/utils/status/hpc_task_sync.go +++ b/internal/scheduler/service/utils/status/hpc_task_sync.go @@ -14,19 +14,16 @@ import ( ) func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpcTask *models.TaskHpc, status bool, message string) error { - report := &jcs.JobStatusReportReq{ - TaskName: task.Name, - TaskID: strconv.FormatInt(task.Id, 10), - Messages: make([]*jcs.ReportMessage, 0), - } - - jobMsg := &jcs.ReportMessage{ + report := &jcs.TrainReportMessage{ + Type: "Train", + TaskName: task.Name, + TaskID: strconv.FormatInt(task.Id, 10), Status: status, Message: message, ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10), Output: hpcTask.WorkDir, } - report.Messages = append(report.Messages, jobMsg) + marshal, _ := jsoniter.MarshalToString(report) log.Debug().Msgf("通知中间件任务状态参数: [%v]", marshal) err := jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) diff --git a/internal/scheduler/service/utils/status/statusSync.go b/internal/scheduler/service/utils/status/statusSync.go index 50d1c276f..f142fa247 100644 --- a/internal/scheduler/service/utils/status/statusSync.go +++ b/internal/scheduler/service/utils/status/statusSync.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -191,12 +192,12 @@ func (s *TaskStatus) updateAiTask(aiTaskList []*models.TaskAi) { } func (s *TaskStatus) reportStatusMessages(task *types.TaskModel, aiTask *models.TaskAi) error { - report := &jcs.JobStatusReportReq{ + report := &jcs.TrainReportMessage{ + Type: "Train", TaskName: task.Name, TaskID: strconv.FormatInt(task.Id, 10), - Messages: make([]*jcs.ReportMessage, 0), } - //add report msg + var output string switch aiTask.ClusterName { case "openI": @@ -205,13 +206,10 @@ func (s *TaskStatus) reportStatusMessages(task *types.TaskModel, aiTask *models. output = aiTask.Output } - jobMsg := &jcs.ReportMessage{ - Status: true, - Message: "", - ClusterID: strconv.FormatInt(aiTask.ClusterId, 10), - Output: output, - } - report.Messages = append(report.Messages, jobMsg) + report.Status = true + report.Message = "" + report.ClusterID = strconv.FormatInt(aiTask.ClusterId, 10) + report.Output = output err := jcs.StatusReport(s.config.JcsMiddleware.JobStatusReportUrl, report) if err != nil { @@ -224,3 +222,21 @@ func (s *TaskStatus) reportStatusMessages(task *types.TaskModel, aiTask *models. } return nil } + +func ReportStatus(svc *svc.ServiceContext, taskName string, taskId string, clusterId string, url string, status bool, msg string) error { + report := &jcs.InferReportMessage{ + Type: "Inference", + TaskName: taskName, + TaskID: taskId, + Status: status, + Message: msg, + ClusterID: clusterId, + Url: url, + } + + err := jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) + if err != nil { + return err + } + return nil +} diff --git a/internal/scheduler/service/utils/status/taskStatusSync.go b/internal/scheduler/service/utils/status/taskStatusSync.go index b34539e52..f91cab6cd 100644 --- a/internal/scheduler/service/utils/status/taskStatusSync.go +++ b/internal/scheduler/service/utils/status/taskStatusSync.go @@ -166,12 +166,12 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { } func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error { - report := &jcs.JobStatusReportReq{ + report := &jcs.TrainReportMessage{ + Type: "Train", TaskName: task.Name, TaskID: strconv.FormatInt(task.Id, 10), - Messages: make([]*jcs.ReportMessage, 0), } - //add report msg + var output string switch aiTask.ClusterName { case "openI": @@ -180,17 +180,17 @@ func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask output = aiTask.Output } - jobMsg := &jcs.ReportMessage{ - Status: true, - Message: "", - ClusterID: strconv.FormatInt(aiTask.ClusterId, 10), - Output: output, + report.Status = true + report.Message = "" + report.ClusterID = strconv.FormatInt(aiTask.ClusterId, 10) + report.Output = output + + err := jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) + if err != nil { + return err } - report.Messages = append(report.Messages, jobMsg) - _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) - - err := jcs.TempSaveReportToTask(svc.Scheduler.AiStorages, task, report) + err = jcs.TempSaveReportToTask(svc.Scheduler.AiStorages, task, report) if err != nil { return err }