forked from JointCloud/pcm-coordinator
调度结构修改
This commit is contained in:
parent
a07fd6bad5
commit
35dc202d29
|
@ -17,7 +17,6 @@ package scheduler
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
|
||||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
|
||||
|
@ -38,10 +37,15 @@ func NewCloudScheduler() *cloudScheduler {
|
|||
}
|
||||
|
||||
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
|
||||
//参数为空返回 nil
|
||||
if len(providers) == 0 || task == nil {
|
||||
return nil, errors.New("算法获取参数为空")
|
||||
}
|
||||
////参数为空,返回 nil
|
||||
//if len(providers) == 0 || task == nil {
|
||||
// return nil, errors.New("算法获取参数为空")
|
||||
//}
|
||||
//
|
||||
////仅有一个provider,返回nil
|
||||
//if len(providers) == 1 {
|
||||
// return nil, nil
|
||||
//}
|
||||
|
||||
//调度算法
|
||||
strategy := algo.NewK8sStrategy(task, providers...)
|
||||
|
|
|
@ -83,6 +83,9 @@ func (s *scheduler) AssignAndSchedule() error {
|
|||
|
||||
// ParticipantIds 返回唯一值
|
||||
if len(s.participantIds) == 1 {
|
||||
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
|
||||
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
|
||||
}
|
||||
s.task.ParticipantId = s.participantIds[0]
|
||||
return nil
|
||||
}
|
||||
|
@ -93,16 +96,21 @@ func (s *scheduler) AssignAndSchedule() error {
|
|||
return err
|
||||
}
|
||||
|
||||
//集群数量不满足,指定到标签匹配后第一个集群
|
||||
if len(providerList) < 2 {
|
||||
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
|
||||
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
|
||||
}
|
||||
s.task.ParticipantId = s.participantIds[0]
|
||||
return nil
|
||||
}
|
||||
|
||||
//调度算法
|
||||
strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if strategy == nil {
|
||||
s.task.ParticipantId = s.participantIds[0]
|
||||
return nil
|
||||
}
|
||||
|
||||
//调度结果
|
||||
err = s.assignReplicasToResult(strategy, providerList)
|
||||
if err != nil {
|
||||
|
@ -131,23 +139,38 @@ func (s *scheduler) SaveToDb() error {
|
|||
|
||||
func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) {
|
||||
task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin)
|
||||
// 查询集群是否可用
|
||||
err := s.checkAvailableParticipants(&providerList)
|
||||
// 过滤可用集群
|
||||
err := s.filterAvailableProviders(&providerList)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
//可用集群为0
|
||||
if len(providerList) == 0 {
|
||||
return nil, nil, errors.New("未能获取可用集群")
|
||||
}
|
||||
|
||||
return task, providerList, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) error {
|
||||
func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool {
|
||||
|
||||
workingIds, err := s.getAvailableParticipantIds()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return contains(workingIds, int64(id))
|
||||
}
|
||||
|
||||
func (s *scheduler) getAvailableParticipantIds() ([]int64, error) {
|
||||
|
||||
resp, err := s.participantRpc.ListParticipant(context.Background(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.Code != 200 {
|
||||
return errors.New("集群列表查询失败")
|
||||
return nil, errors.New("集群列表查询失败")
|
||||
}
|
||||
|
||||
var workingIds []int64
|
||||
|
@ -158,9 +181,19 @@ func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) e
|
|||
workingIds = append(workingIds, e.ParticipantId)
|
||||
}
|
||||
|
||||
return workingIds, nil
|
||||
}
|
||||
|
||||
func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error {
|
||||
|
||||
workingIds, err := s.getAvailableParticipantIds()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var tempList []*algo.Provider
|
||||
for _, provider := range *providerList {
|
||||
if contains(workingIds, provider.Pid) {
|
||||
if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) {
|
||||
tempList = append(tempList, provider)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue