update tasksync #511
|
@ -2,7 +2,6 @@ package core
|
|||
|
||||
import (
|
||||
"context"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
|
||||
|
@ -14,17 +13,16 @@ import (
|
|||
|
||||
type PageListTaskLogic struct {
|
||||
logx.Logger
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
taskStatus *status.TaskStatus
|
||||
ctx context.Context
|
||||
svcCtx *svc.ServiceContext
|
||||
}
|
||||
|
||||
func NewPageListTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PageListTaskLogic {
|
||||
return &PageListTaskLogic{
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
taskStatus: status.NewTaskStatus(svcCtx.Scheduler.AiStorages, svcCtx.Scheduler.AiService.AiCollectorAdapterMap, &svcCtx.Config),
|
||||
Logger: logx.WithContext(ctx),
|
||||
ctx: ctx,
|
||||
svcCtx: svcCtx,
|
||||
//taskStatus: status.NewTaskStatus(svcCtx.Scheduler.AiStorages, svcCtx.Scheduler.AiService.AiCollectorAdapterMap, &svcCtx.Config),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,8 +62,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
|
|||
}
|
||||
|
||||
// 更新智算任务状态
|
||||
go l.taskStatus.UpdateTaskStatus(list)
|
||||
go l.taskStatus.UpdateAiTaskStatus(list)
|
||||
go l.svcCtx.Scheduler.AiService.St.UpdateTaskStatus(list)
|
||||
go l.svcCtx.Scheduler.AiService.St.UpdateAiTaskStatus(list)
|
||||
|
||||
// 计算每个任务的运行时间x
|
||||
for _, model := range list {
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
|
@ -73,7 +72,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
|
|||
list := common.ConcatMultipleSlices(slices)
|
||||
|
||||
if len(list) != 0 {
|
||||
go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true)
|
||||
go l.svcCtx.Scheduler.AiService.Si.UpdateDeployInstanceStatusBatch(list, true)
|
||||
}
|
||||
|
||||
resp.List = &deployTasks
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
|
@ -83,7 +82,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta
|
|||
<-buf
|
||||
return
|
||||
}
|
||||
if status.CheckStopStatus(in) {
|
||||
if l.svcCtx.Scheduler.AiService.Si.CheckStopStatus(in) {
|
||||
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId)
|
||||
if !success {
|
||||
e := struct {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
|
@ -84,7 +83,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc
|
|||
<-buf
|
||||
return
|
||||
}
|
||||
if status.CheckRunningStatus(in) {
|
||||
if l.svcCtx.Scheduler.AiService.Si.CheckRunningStatus(in) {
|
||||
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
|
||||
if !success {
|
||||
e := struct {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"strconv"
|
||||
|
@ -38,7 +37,7 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if status.CheckRunningStatus(in) {
|
||||
if l.svcCtx.Scheduler.AiService.Si.CheckRunningStatus(in) {
|
||||
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, in.InstanceId)
|
||||
if !success {
|
||||
return nil, errors.New("stop instance failed")
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
|
||||
"gorm.io/gorm"
|
||||
|
@ -170,6 +171,20 @@ func (s *AiStorage) UpdateTask(task *types.TaskModel) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) AllTask() ([]*types.TaskModel, error) {
|
||||
var list []*types.TaskModel
|
||||
|
||||
// 构建数据库查询
|
||||
db := s.DbEngin.Model(&types.TaskModel{}).Table("task")
|
||||
|
||||
// 查询任务列表
|
||||
if err := db.Order("created_time desc").Find(&list).Error; err != nil {
|
||||
return nil, result.NewDefaultError(err.Error())
|
||||
}
|
||||
|
||||
return list, nil
|
||||
}
|
||||
|
||||
func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName string, clusterId string, clusterName string, jobId string, status string, msg string) error {
|
||||
var aiOpt *option.AiOption
|
||||
switch (opt).(type) {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task/tasksync"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink/octopusHttp"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
|
||||
|
@ -35,6 +36,8 @@ type AiService struct {
|
|||
LocalCache map[string]interface{}
|
||||
Conf *config.Config
|
||||
TaskSyncLock sync.Mutex
|
||||
St *tasksync.SyncTrain
|
||||
Si *tasksync.SyncInfer
|
||||
}
|
||||
|
||||
func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) {
|
||||
|
@ -65,6 +68,12 @@ func NewAiService(conf *config.Config, storages *database.AiStorage, localCache
|
|||
aiService.InferenceAdapterMap[id] = inferMap
|
||||
}
|
||||
|
||||
st := tasksync.NewTrainTask(storages, aiService.AiCollectorAdapterMap, conf)
|
||||
si := tasksync.NewInferTask(storages, aiService.InferenceAdapterMap, conf)
|
||||
|
||||
aiService.St = st
|
||||
aiService.Si = si
|
||||
|
||||
return aiService, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/zeromicro/go-zero/core/logx"
|
||||
common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
|
||||
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
|
||||
|
@ -95,7 +96,10 @@ type OctopusHttp struct {
|
|||
}
|
||||
|
||||
func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
|
||||
token, _ := NewToken(server, host, user, pwd)
|
||||
token, err := NewToken(server, host, user, pwd)
|
||||
if err != nil {
|
||||
logx.Infof("Init OctopusHttp, id: %d, host: %s, token error: %s \n", id, host, err)
|
||||
}
|
||||
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
|
||||
}
|
||||
|
||||
|
|
|
@ -111,10 +111,14 @@ func NewServiceContext(c config.Config) *ServiceContext {
|
|||
hpcStorage := &database.HpcStorage{DbEngin: dbEngin}
|
||||
cache := make(map[string]interface{}, 0)
|
||||
aiService, err := service.NewAiService(&c, storage, cache)
|
||||
if err != nil {
|
||||
logx.Error(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
hpcService, err := service.NewHpcService(&c, hpcStorage, cache)
|
||||
if err != nil {
|
||||
logx.Error(err.Error())
|
||||
return nil
|
||||
panic(err)
|
||||
}
|
||||
scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
|
||||
return &ServiceContext{
|
||||
|
|
Loading…
Reference in New Issue