diff --git a/internal/logic/cloud/commitgeneraltasklogic.go b/internal/logic/cloud/commitgeneraltasklogic.go index e07fc804c..87f232459 100644 --- a/internal/logic/cloud/commitgeneraltasklogic.go +++ b/internal/logic/cloud/commitgeneraltasklogic.go @@ -5,6 +5,7 @@ import ( "context" "github.com/pkg/errors" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -69,7 +70,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er utils.Convert(&req, &opt) sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) - results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, scheduler.JOINT_CLOUD_MODE) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err diff --git a/internal/logic/core/commitvmtasklogic.go b/internal/logic/core/commitvmtasklogic.go index 7e79514b5..3d0c1cb88 100644 --- a/internal/logic/core/commitvmtasklogic.go +++ b/internal/logic/core/commitvmtasklogic.go @@ -4,6 +4,7 @@ import ( "context" "fmt" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" @@ -62,7 +63,7 @@ func (l *CommitVmTaskLogic) CommitVmTask(req *types.CommitVmTaskReq) (resp *type return nil, err } // 3、Return scheduling results - results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(vmSchdl, scheduler.JOINT_CLOUD_MODE) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return nil, err diff --git a/internal/logic/schedule/scheduleruntasklogic.go b/internal/logic/schedule/scheduleruntasklogic.go index 21d305c81..c68adeb6e 100644 --- a/internal/logic/schedule/scheduleruntasklogic.go +++ b/internal/logic/schedule/scheduleruntasklogic.go @@ -42,6 +42,11 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ return nil, err } + //schedule, err := l.svcCtx.Scheduler.AssignAndSchedule() + //if err != nil { + // return nil, err + //} + adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID) if err != nil { return nil, err diff --git a/internal/logic/schedule/schedulesubmitlogic.go b/internal/logic/schedule/schedulesubmitlogic.go index 064588f17..c5934a8ae 100644 --- a/internal/logic/schedule/schedulesubmitlogic.go +++ b/internal/logic/schedule/schedulesubmitlogic.go @@ -2,6 +2,7 @@ package schedule import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" @@ -51,7 +52,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type return nil, err } - results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE) if err != nil { return nil, err } diff --git a/internal/mqs/ScheduleAi.go b/internal/mqs/ScheduleAi.go index 748c64217..2406a90f1 100644 --- a/internal/mqs/ScheduleAi.go +++ b/internal/mqs/ScheduleAi.go @@ -16,6 +16,7 @@ package mqs import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) @@ -40,7 +41,7 @@ func (l *AiQueue) Consume(val string) error { aiSchdl, _ := schedulers.NewAiScheduler(l.ctx, val, l.svcCtx.Scheduler, nil) // 调度算法 - _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl) + _, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, scheduler.JOINT_CLOUD_MODE) if err != nil { return err } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 5df4c0f0e..37ad5a7ee 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -29,6 +29,11 @@ import ( "sync" ) +const ( + JOINT_CLOUD_MODE = iota + 1 + STORAGE_SCHEDULE_MODE +) + type Scheduler struct { task *response.TaskInfo participantIds []int64 @@ -43,7 +48,7 @@ type Scheduler struct { type SubSchedule interface { GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) PickOptimalStrategy() (strategy.Strategy, error) - AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) + AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) } func NewScheduler(subSchedule SubSchedule, val string, dbEngin *gorm.DB) (*Scheduler, error) { @@ -126,7 +131,7 @@ func (s *Scheduler) TempAssign() error { return nil } -func (s *Scheduler) AssignAndSchedule(ss SubSchedule) (interface{}, error) { +func (s *Scheduler) AssignAndSchedule(ss SubSchedule, mode int) (interface{}, error) { //choose strategy strategy, err := ss.PickOptimalStrategy() if err != nil { @@ -140,7 +145,7 @@ func (s *Scheduler) AssignAndSchedule(ss SubSchedule) (interface{}, error) { } //assign tasks to clusters - resp, err := ss.AssignTask(clusters) + resp, err := ss.AssignTask(clusters, mode) if err != nil { return nil, err } diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index beca0aa60..8fc3b21cc 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -140,7 +140,7 @@ func (as *AiScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return nil, errors.New("no strategy has been chosen") } -func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { +func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) { if clusters == nil { return nil, errors.New("clusters is nil") } @@ -173,7 +173,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa wg.Add(1) go func() { opt, _ := cloneAiOption(as.option) - resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt) + resp, err := executorMap[c.ClusterId].Execute(as.ctx, opt, mode) if err != nil { e := struct { err error @@ -227,6 +227,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa return nil, err } + // aiTasks adapterName, err := as.AiStorages.GetAdapterNameById(as.option.AdapterId) if err != nil { return nil, err diff --git a/internal/scheduler/schedulers/cloudScheduler.go b/internal/scheduler/schedulers/cloudScheduler.go index cbd862b22..19b8b1783 100644 --- a/internal/scheduler/schedulers/cloudScheduler.go +++ b/internal/scheduler/schedulers/cloudScheduler.go @@ -114,7 +114,7 @@ func (as *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) { return nil, errors.New("no strategy has been chosen") } -func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { +func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) { if clusters == nil { return nil, errors.New("clusters is nil") } diff --git a/internal/scheduler/schedulers/hpcScheduler.go b/internal/scheduler/schedulers/hpcScheduler.go index 4ca0e04a3..0fb0ff5fb 100644 --- a/internal/scheduler/schedulers/hpcScheduler.go +++ b/internal/scheduler/schedulers/hpcScheduler.go @@ -50,6 +50,6 @@ func (h *HpcScheduler) genTaskAndProviders(task *response.TaskInfo) (*providerPr return nil, nil } -func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { +func (h *HpcScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) { return nil, nil } diff --git a/internal/scheduler/schedulers/vmScheduler.go b/internal/scheduler/schedulers/vmScheduler.go index 0163d6931..172d483db 100644 --- a/internal/scheduler/schedulers/vmScheduler.go +++ b/internal/scheduler/schedulers/vmScheduler.go @@ -114,7 +114,7 @@ func (vm *VmScheduler) genTaskAndProviders() (*providerPricing.Task, []*provider return nil, providerList, nil } -func (as *VmScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) { +func (as *VmScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int) (interface{}, error) { //TODO implement me if clusters == nil { return nil, errors.New("clusters is nil") diff --git a/internal/scheduler/service/executor/aiExecutor.go b/internal/scheduler/service/executor/aiExecutor.go index 5cdf65a9b..e252d16d9 100644 --- a/internal/scheduler/service/executor/aiExecutor.go +++ b/internal/scheduler/service/executor/aiExecutor.go @@ -6,5 +6,5 @@ import ( ) type AiExecutor interface { - Execute(ctx context.Context, option *option.AiOption) (interface{}, error) + Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) } diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index 5c0bf9613..ca044212c 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -570,7 +570,7 @@ func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*co return &task, nil } -func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { +func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { err := m.GenerateSubmitParams(ctx, option) if err != nil { return nil, err diff --git a/internal/storeLink/octopus.go b/internal/storeLink/octopus.go index 852e9c3b4..ecc7bdc7c 100644 --- a/internal/storeLink/octopus.go +++ b/internal/storeLink/octopus.go @@ -555,7 +555,7 @@ func (o *OctopusLink) GetTrainingTask(ctx context.Context, taskId string) (*coll return &task, nil } -func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { +func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { err := o.GenerateSubmitParams(ctx, option) if err != nil { return nil, err diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index c95d1f556..637a141e8 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -47,7 +47,7 @@ func NewOpenI(host string, id int64, name string, token string) *OpenI { } } -func (o OpenI) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { +func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { return nil, errors.New("failed to implement") } diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index 5a0a43692..b0a3afb44 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -587,7 +587,7 @@ func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*colle return &task, nil } -func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { +func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { err := s.GenerateSubmitParams(ctx, option) if err != nil { return nil, err diff --git a/internal/storeLink/template.go b/internal/storeLink/template.go index b9c3d2873..f5147016f 100644 --- a/internal/storeLink/template.go +++ b/internal/storeLink/template.go @@ -28,7 +28,7 @@ func NewTemplate(host string, id int64, name string, token string) *Template { } // Execute 执行任务 -func (o Template) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { +func (o Template) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { return nil, errors.New("failed to implement") }