Merge pull request 'update tasksync' (#511) from tzwang/pcm-coordinator:master into master

This commit is contained in:
tzwang 2025-06-23 17:47:01 +08:00
commit 5882083278
9 changed files with 46 additions and 20 deletions

View File

@ -2,7 +2,6 @@ package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
@ -14,17 +13,16 @@ import (
type PageListTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
taskStatus *status.TaskStatus
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewPageListTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PageListTaskLogic {
return &PageListTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
taskStatus: status.NewTaskStatus(svcCtx.Scheduler.AiStorages, svcCtx.Scheduler.AiService.AiCollectorAdapterMap, &svcCtx.Config),
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
//taskStatus: status.NewTaskStatus(svcCtx.Scheduler.AiStorages, svcCtx.Scheduler.AiService.AiCollectorAdapterMap, &svcCtx.Config),
}
}
@ -64,8 +62,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
}
// 更新智算任务状态
go l.taskStatus.UpdateTaskStatus(list)
go l.taskStatus.UpdateAiTaskStatus(list)
go l.svcCtx.Scheduler.AiService.St.UpdateTaskStatus(list)
go l.svcCtx.Scheduler.AiService.St.UpdateAiTaskStatus(list)
// 计算每个任务的运行时间x
for _, model := range list {

View File

@ -5,7 +5,6 @@ import (
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -73,7 +72,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
list := common.ConcatMultipleSlices(slices)
if len(list) != 0 {
go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true)
go l.svcCtx.Scheduler.AiService.Si.UpdateDeployInstanceStatusBatch(list, true)
}
resp.List = &deployTasks

View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -83,7 +82,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta
<-buf
return
}
if status.CheckStopStatus(in) {
if l.svcCtx.Scheduler.AiService.Si.CheckStopStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId)
if !success {
e := struct {

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -84,7 +83,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc
<-buf
return
}
if status.CheckRunningStatus(in) {
if l.svcCtx.Scheduler.AiService.Si.CheckRunningStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
if !success {
e := struct {

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
@ -38,7 +37,7 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan
return nil, err
}
if status.CheckRunningStatus(in) {
if l.svcCtx.Scheduler.AiService.Si.CheckRunningStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, in.InstanceId)
if !success {
return nil, errors.New("stop instance failed")

View File

@ -7,6 +7,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
@ -170,6 +171,20 @@ func (s *AiStorage) UpdateTask(task *types.TaskModel) error {
return nil
}
func (s *AiStorage) AllTask() ([]*types.TaskModel, error) {
var list []*types.TaskModel
// 构建数据库查询
db := s.DbEngin.Model(&types.TaskModel{}).Table("task")
// 查询任务列表
if err := db.Order("created_time desc").Find(&list).Error; err != nil {
return nil, result.NewDefaultError(err.Error())
}
return list, nil
}
func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error {
var aiOpt *option.AiOption
switch (opt).(type) {

View File

@ -8,6 +8,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task/tasksync"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink/octopusHttp"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@ -35,6 +36,8 @@ type AiService struct {
LocalCache map[string]interface{}
Conf *config.Config
TaskSyncLock sync.Mutex
St *tasksync.SyncTrain
Si *tasksync.SyncInfer
}
func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) {
@ -65,6 +68,12 @@ func NewAiService(conf *config.Config, storages *database.AiStorage, localCache
aiService.InferenceAdapterMap[id] = inferMap
}
st := tasksync.NewTrainTask(storages, aiService.AiCollectorAdapterMap, conf)
si := tasksync.NewInferTask(storages, aiService.InferenceAdapterMap, conf)
aiService.St = st
aiService.Si = si
return aiService, nil
}

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
@ -95,7 +96,10 @@ type OctopusHttp struct {
}
func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
token, _ := NewToken(server, host, user, pwd)
token, err := NewToken(server, host, user, pwd)
if err != nil {
logx.Infof("Init OctopusHttp, id: %d, host: %s, token error: %s \n", id, host, err)
}
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
}

View File

@ -111,10 +111,14 @@ func NewServiceContext(c config.Config) *ServiceContext {
hpcStorage := &database.HpcStorage{DbEngin: dbEngin}
cache := make(map[string]interface{}, 0)
aiService, err := service.NewAiService(&c, storage, cache)
if err != nil {
logx.Error(err.Error())
panic(err)
}
hpcService, err := service.NewHpcService(&c, hpcStorage, cache)
if err != nil {
logx.Error(err.Error())
return nil
panic(err)
}
scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
return &ServiceContext{