Merge pull request 'fix noticeinfo bugs' (#221) from tzwang/pcm-coordinator:master into master

Former-commit-id: 738c04b5d9
This commit is contained in:
tzwang 2024-06-07 18:55:57 +08:00
commit 9cd2e072b2
4 changed files with 81 additions and 7 deletions

View File

@ -114,7 +114,28 @@ func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
wg.Done()
return
}
t.Status = trainingTask.Status
switch trainingTask.Status {
case constants.Running:
if t.Status != trainingTask.Status {
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "running", "任务运行中")
t.Status = trainingTask.Status
}
case constants.Failed:
if t.Status != trainingTask.Status {
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "failed", "任务失败")
t.Status = trainingTask.Status
}
case constants.Completed:
if t.Status != trainingTask.Status {
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "completed", "任务完成")
t.Status = trainingTask.Status
}
default:
if t.Status != trainingTask.Status {
svc.Scheduler.AiStorages.AddNoticeInfo(strconv.FormatInt(t.AdapterId, 10), t.AdapterName, strconv.FormatInt(t.ClusterId, 10), t.ClusterName, t.Name, "pending", "任务pending")
t.Status = trainingTask.Status
}
}
t.StartTime = trainingTask.Start
t.EndTime = trainingTask.End
err = svc.Scheduler.AiStorages.UpdateAiTask(t)

View File

@ -64,7 +64,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
synergystatus = 1
}
strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(req.AiOption.Strategy)
if err != nil {
return nil, err
}
adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(rs[0].AdapterId)
if err != nil {
return nil, err
}
id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(req.AiOption.TaskName, strategyCode, synergystatus)
if err != nil {
return nil, err
@ -84,11 +90,13 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type
clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
if err != nil {
return nil, err
}
l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中")
resp.Results = append(resp.Results, scheResult)
}

View File

@ -2,6 +2,7 @@ package database
import (
"github.com/zeromicro/go-zero/core/logx"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@ -46,6 +47,16 @@ func (s *AiStorage) GetClusterNameById(id string) (string, error) {
return name, nil
}
func (s *AiStorage) GetAdapterNameById(id string) (string, error) {
var name string
tx := s.DbEngin.Raw("select `name` from t_adapter where `id` = ?", id).Scan(&name)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return "", tx.Error
}
return name, nil
}
func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
var list []types.AdapterInfo
var ids []string
@ -102,7 +113,7 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6
return taskModel.Id, nil
}
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, clusterName string, jobId string, status string, msg string) error {
func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error {
// 构建主任务结构体
aId, err := strconv.ParseInt(option.AdapterId, 10, 64)
if err != nil {
@ -116,6 +127,7 @@ func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId
aiTaskModel := models.TaskAi{
TaskId: taskId,
AdapterId: aId,
AdapterName: adapterName,
ClusterId: cId,
ClusterName: clusterName,
Name: option.TaskName,
@ -281,3 +293,28 @@ func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
}
return strategy, nil
}
func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterId string, clusterName string, taskName string, noticeType string, incident string) {
aId, err := strconv.ParseInt(adapterId, 10, 64)
if err != nil {
return
}
cId, err := strconv.ParseInt(clusterId, 10, 64)
if err != nil {
return
}
noticeInfo := clientCore.NoticeInfo{
AdapterId: aId,
AdapterName: adapterName,
ClusterId: cId,
ClusterName: clusterName,
NoticeType: noticeType,
TaskName: taskName,
Incident: incident,
CreatedTime: time.Now(),
}
result := s.DbEngin.Table("t_notice").Create(&noticeInfo)
if result.Error != nil {
logx.Errorf("Task creation failure, err: %v", result.Error)
}
}

View File

@ -45,6 +45,8 @@ type AiScheduler struct {
}
type AiResult struct {
AdapterId string
TaskName string
JobId string
ClusterId string
Strategy string
@ -190,6 +192,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
result, _ = convertType(resp)
mu.Unlock()
result.AdapterId = opt.AdapterId
result.TaskName = opt.TaskName
result.Replica = c.Replicas
result.ClusterId = c.ClusterId
result.Strategy = as.option.StrategyName
@ -222,6 +226,10 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}
adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId)
if err != nil {
return nil, err
}
var errmsg string
for _, err := range errs {
@ -234,7 +242,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
clusterName, _ := as.AiStorages.GetClusterNameById(e.clusterId)
err := as.AiStorages.SaveAiTask(taskId, as.option, e.clusterId, clusterName, "", constants.Failed, msg)
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, e.clusterId, clusterName, "", constants.Failed, msg)
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}
@ -246,14 +254,14 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
if s.Msg != "" {
msg := fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, "", constants.Failed, msg)
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, "", constants.Failed, msg)
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}
} else {
msg := fmt.Sprintf("clusterId: %v , submitted successfully, jobId: %v \n", s.ClusterId, s.JobId)
errmsg += msg
err := as.AiStorages.SaveAiTask(taskId, as.option, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
err := as.AiStorages.SaveAiTask(taskId, as.option, adapterName, s.ClusterId, clusterName, s.JobId, constants.Saved, msg)
if err != nil {
return nil, errors.New("database add failed: " + err.Error())
}