JCC-CSScheduler/advisor/internal/scheduler/scheduler.go

442 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package scheduler
import (
"fmt"
"sort"
"github.com/inhies/go-bytesize"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
// Resource levels assigned to a node. Candidates are sorted by level in
// ascending order, so a smaller value ranks first.
const (
	// ResourceLevel1 means every resource type offers at least 1.5 times the
	// requested amount.
	ResourceLevel1 = iota + 1
	// ResourceLevel2 means level 1 is not reached, but every resource type
	// offers at least 1 times the requested amount.
	ResourceLevel2
	// ResourceLevel3 means some resource type offers less than 1 times the
	// requested amount.
	ResourceLevel3
)

// Weights used when computing resource scores and package placement scores.
const (
	CpuResourceWeight float64 = 1
	StgResourceWeight float64 = 1.2
	CachingWeight     float64 = 1
	LoadedWeight      float64 = 2
)
// ErrScheduleWaiting reports that no suitable node is available right now.
// Schedule returns it when the best-ranked candidate is only ResourceLevel3.
var ErrScheduleWaiting = fmt.Errorf("no appropriate scheduling node found, please wait")
// Scheduler chooses a schedule scheme for a normal job.
type Scheduler interface {
	// Schedule returns the scheme selected for info, or an error when no
	// scheme could be produced.
	Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error)
}
// DefaultSchedule is the receiver of the scheduling methods defined in this
// file. It has no fields.
type DefaultSchedule struct{}
// CandidateSlwNodeInfo describes one candidate node considered by Schedule,
// together with the data used to rank it against the other candidates.
type CandidateSlwNodeInfo struct {
	// slwNodeID identifies the candidate node.
	slwNodeID int64
	// resourceLevel is one of ResourceLevel1, ResourceLevel2 or ResourceLevel3.
	resourceLevel int
	// resourceScore is the node's resource score; nodeScore is the sum of the
	// cached and loaded scores of the job's code, dataset and image on the node.
	resourceScore, nodeScore float64
	// scheme is the schedule scheme returned if this node is selected.
	scheme jobmod.JobScheduleScheme
}
// NewDefaultSchedule returns a pointer to a new DefaultSchedule.
func NewDefaultSchedule() *DefaultSchedule {
	return new(DefaultSchedule)
}
// ByResourceLevel implements sort.Interface for candidate nodes.
//
// Ordering rules:
//  1. Sort by resourceLevel in ascending order.
//  2. When both candidates are ResourceLevel1, sort by nodeScore in descending order.
//  3. When both candidates are ResourceLevel2 or both are ResourceLevel3, sort
//     by resourceScore in descending order.
type ByResourceLevel []CandidateSlwNodeInfo

// Len returns the number of candidates.
func (a ByResourceLevel) Len() int {
	return len(a)
}

// Swap exchanges the candidates at positions i and j.
func (a ByResourceLevel) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the candidate at i must be ordered before the one at j.
func (a ByResourceLevel) Less(i, j int) bool {
	lhs, rhs := &a[i], &a[j]

	// Different levels: the numerically smaller level always comes first.
	if lhs.resourceLevel != rhs.resourceLevel {
		return lhs.resourceLevel < rhs.resourceLevel
	}

	// Same level: the tie-break depends on which level it is.
	switch lhs.resourceLevel {
	case ResourceLevel1:
		return lhs.nodeScore > rhs.nodeScore
	case ResourceLevel2, ResourceLevel3:
		return lhs.resourceScore > rhs.resourceScore
	default:
		return false
	}
}
// Schedule picks the node on which the job described by info should run and
// returns the schedule scheme for that node.
//
// Every node reported by the collector becomes a candidate. Each candidate is
// given a resource level and resource score (computeResourceScore) and a node
// score that sums the cached and loaded scores of the job's code, dataset and
// image (ComputeAllSlwNodeScore). The candidates are ordered with
// ByResourceLevel and the first one is selected.
//
// ErrScheduleWaiting is returned when the collector reports no node at all or
// when the best candidate is only ResourceLevel3. Errors from acquiring the
// collector client, from the collector, or from the scoring helpers are
// returned as well.
func (s *DefaultSchedule) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	// Ask which computing centers can serve as candidates.
	resp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
	if err != nil {
		return nil, err
	}
	if len(resp.Nodes) == 0 {
		// Nothing to rank; indexing the first candidate below would panic.
		return nil, ErrScheduleWaiting
	}

	slwNodeScores, err := s.ComputeAllSlwNodeScore(info.Files)
	if err != nil {
		return nil, err
	}

	candidates := make([]CandidateSlwNodeInfo, 0, len(resp.Nodes))
	for _, node := range resp.Nodes {
		level, resourceScore, err := s.computeResourceScore(info.Info.Resources, node.ID)
		if err != nil {
			return nil, err
		}

		var nodeScore float64
		var scheme jobmod.JobScheduleScheme
		if score, ok := slwNodeScores[node.ID]; ok {
			nodeScore = score.CodeScore.cachedScore +
				score.CodeScore.loadedScore +
				score.DatasetScore.cachedScore +
				score.DatasetScore.loadedScore +
				score.ImageScore.cachedScore +
				score.ImageScore.loadedScore

			scheme = jobmod.JobScheduleScheme{
				TargetSlwNodeID: node.ID,
				Code:            jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.CodeScore.Isloaded)},
				Dataset:         jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.DatasetScore.Isloaded)},
				// The image action must follow the image's own loaded state,
				// not the dataset's.
				Image: jobmod.FileScheduleScheme{Action: s.SchemeImageAction(score.ImageScore.Isloaded)},
			}
		} else {
			// No score is known for this node: everything has to be loaded or
			// imported, and the node score stays 0.
			scheme = jobmod.JobScheduleScheme{
				TargetSlwNodeID: node.ID,
				Code:            jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
				Dataset:         jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
				Image:           jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
			}
		}

		candidates = append(candidates, CandidateSlwNodeInfo{
			slwNodeID:     node.ID,
			resourceLevel: level,
			resourceScore: resourceScore,
			nodeScore:     nodeScore,
			scheme:        scheme,
		})
	}

	sort.Sort(ByResourceLevel(candidates))

	best := &candidates[0]
	if best.resourceLevel == ResourceLevel3 {
		// TODO: put the job into a queue and reschedule it at a suitable time.
		return nil, ErrScheduleWaiting
	}
	return &best.scheme, nil
}
// SchemeStgAction maps a loaded flag to a file schedule action: ActionNo when
// isLoaded is true, ActionLoad otherwise.
func (s *DefaultSchedule) SchemeStgAction(isLoaded bool) jobmod.FileScheduleAction {
	if !isLoaded {
		return jobmod.ActionLoad
	}
	return jobmod.ActionNo
}
// SchemeImageAction maps a loaded flag to a file schedule action: ActionNo when
// isLoaded is true, ActionImportImage otherwise.
func (s *DefaultSchedule) SchemeImageAction(isLoaded bool) jobmod.FileScheduleAction {
	if !isLoaded {
		return jobmod.ActionImportImage
	}
	return jobmod.ActionNo
}
// ResourceInfo holds the evaluation of a single resource type on a node.
type ResourceInfo struct {
	// Type names the resource type (one of the models.ResourceType* values).
	Type string
	// Level is the resource level computed for this resource type.
	Level int
	// Score is the weighted available/requested ratio for this resource type.
	Score float64
}
// computeResourceScore grades the node slwNodeID against the resources a job
// requests and returns its resource level and resource score.
//
// A resource type is evaluated only when the job requests it (amount > 0) and
// the node reports it. For each evaluated type a level (computeSingleLevel)
// and a weighted available/requested ratio are computed. The returned level is
// the worst (numerically largest) per-type level, and the returned score is
// the mean of the per-type scores.
//
// When no resource type can be evaluated, the node cannot be shown to satisfy
// the request, so it is conservatively reported as ResourceLevel3 with a score
// of 0.
//
// An error is returned when the collector client cannot be acquired, the
// resource query fails, or a storage/memory amount cannot be parsed.
func (s *DefaultSchedule) computeResourceScore(resources models.JobResourcesInfo, slwNodeID int64) (int, float64, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return 0, 0, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	res, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID))
	if err != nil {
		return 0, 0, err
	}

	var infos []ResourceInfo
	// record appends the evaluation of one resource type.
	record := func(resType string, avail, need, weight float64) {
		infos = append(infos, ResourceInfo{
			Type:  resType,
			Level: s.computeSingleLevel(avail, need),
			Score: avail / need * weight,
		})
	}

	for _, r := range res.Datas {
		switch data := r.(type) {
		case *models.CPUResourceData:
			if resources.CPU > 0 {
				record(models.ResourceTypeCPU, float64(data.Available.Value), resources.CPU, CpuResourceWeight)
			}

		case *models.NPUResourceData:
			if resources.NPU > 0 {
				record(models.ResourceTypeNPU, float64(data.Available.Value), resources.NPU, CpuResourceWeight)
			}

		case *models.GPUResourceData:
			if resources.GPU > 0 {
				record(models.ResourceTypeGPU, float64(data.Available.Value), resources.GPU, CpuResourceWeight)
			}

		case *models.MLUResourceData:
			if resources.MLU > 0 {
				record(models.ResourceTypeMLU, float64(data.Available.Value), resources.MLU, CpuResourceWeight)
			}

		case *models.StorageResourceData:
			if resources.Storage > 0 {
				// The available amount comes as value + unit; parse it so it
				// can be compared with the requested amount.
				bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", data.Available.Value, data.Available.Unit))
				if err != nil {
					return 0, 0, err
				}
				record(models.ResourceTypeStorage, float64(bytesStorage), float64(resources.Storage), StgResourceWeight)
			}

		case *models.MemoryResourceData:
			if resources.Memory > 0 {
				bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", data.Available.Value, data.Available.Unit))
				if err != nil {
					return 0, 0, err
				}
				record(models.ResourceTypeMemory, float64(bytesMemory), float64(resources.Memory), StgResourceWeight)
			}

		default:
			fmt.Println("Unknown Resource Type")
		}
	}

	if len(infos) == 0 {
		// Nothing was evaluated; there is no first element to read a level from.
		return ResourceLevel3, 0, nil
	}

	// The node's level is the worst per-type level; its score is the mean of
	// the per-type scores.
	worstLevel := infos[0].Level
	totalScore := 0.0
	for _, info := range infos {
		if info.Level > worstLevel {
			worstLevel = info.Level
		}
		totalScore += info.Score
	}
	return worstLevel, totalScore / float64(len(infos)), nil
}
// computeSingleLevel grades one resource type: ResourceLevel1 when avail is at
// least 1.5 times need, ResourceLevel2 when avail is at least need, and
// ResourceLevel3 otherwise.
func (s *DefaultSchedule) computeSingleLevel(avail float64, need float64) int {
	switch {
	case avail >= 1.5*need:
		return ResourceLevel1
	case avail >= need:
		return ResourceLevel2
	default:
		return ResourceLevel3
	}
}
// SlwNodeScore groups the scores of a job's code, dataset and image packages
// on one node.
type SlwNodeScore struct {
	CodeScore, DatasetScore, ImageScore stgScore
}
// stgScore is the score of one package on one node.
type stgScore struct {
	// cachedScore is the score earned by the package data cached on the node;
	// loadedScore is the score earned when the package is already loaded there.
	cachedScore, loadedScore float64
	// Isloaded reports whether the storage has already been scheduled to the
	// node; for an image it reports whether the image has already been loaded
	// into the computing center.
	Isloaded bool
}
// ComputeAllSlwNodeScore computes, per node ID, the scores of the job's code,
// dataset and image packages. A node appears in the result as soon as any of
// the three packages has a score on it; the scores of the other packages stay
// at their zero value.
//
// Errors from acquiring the collector or manager client, or from any of the
// queries, are returned to the caller.
func (s *DefaultSchedule) ComputeAllSlwNodeScore(files jobmod.JobFiles) (map[int64]SlwNodeScore, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	result := make(map[int64]SlwNodeScore)

	// Scores of the code package.
	codeScores, err := s.computeAllStgScore(files.Code.PackageID)
	if err != nil {
		return nil, err
	}
	for nodeID, sc := range codeScores {
		result[nodeID] = SlwNodeScore{CodeScore: sc}
	}

	// Scores of the dataset package.
	datasetScores, err := s.computeAllStgScore(files.Dataset.PackageID)
	if err != nil {
		return nil, err
	}
	for nodeID, sc := range datasetScores {
		// A missing entry reads as the zero value, which is then filled in.
		entry := result[nodeID]
		entry.DatasetScore = sc
		result[nodeID] = entry
	}

	// Scores of the image package.
	magCli, err := globals.ManagerMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new manager client: %w", err)
	}
	defer magCli.Close()

	// TODO: one internal ImageID maps to many computing-center ImageIDs.
	imageInfo, err := magCli.GetImageInfo(manager.NewGetImageInfo(files.Image.ImageID))
	if err != nil {
		return nil, err
	}

	imageScores, err := s.computeAllStgScore(files.Image.PackageID)
	if err != nil {
		return nil, err
	}
	for nodeID, sc := range imageScores {
		// TODO: the id used here is wrong; the check should be based on the
		// computing-center id. To be fixed.
		loaded := false
		for _, importing := range imageInfo.ImportingInfos {
			if nodeID != importing.SlwNodeID {
				continue
			}

			list, err := colCli.GetImageList(collector.NewGetImageList(nodeID))
			if err != nil {
				return nil, err
			}
			for _, imageID := range list.ImageIDs {
				if imageID == importing.SlwNodeImageID {
					loaded = true
					break
				}
			}
		}

		// The image's loaded flag comes from the image list check above, not
		// from the package score.
		entry := result[nodeID]
		entry.ImageScore = stgScore{
			cachedScore: sc.cachedScore,
			loadedScore: sc.loadedScore,
			Isloaded:    loaded,
		}
		result[nodeID] = entry
	}

	return result, nil
}
// computeAllStgScore computes the score of package packageID on every node that
// has it cached or loaded, keyed by node ID.
//
// A node that caches the package gets cachedScore = cached size / package size
// * CachingWeight. When the reported package size is not positive the ratio is
// undefined, so cachedScore is left at 0 instead of becoming NaN or Inf. A node
// that has the package loaded gets loadedScore = LoadedWeight and Isloaded set.
//
// Errors from acquiring the collector client or from either query are returned
// to the caller.
func (s *DefaultSchedule) computeAllStgScore(packageID int64) (map[int64]stgScore, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	stgScores := make(map[int64]stgScore)

	// NOTE(review): the meaning of the leading 0 argument is not visible in
	// this file; confirm against the collector API.
	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
	if err != nil {
		return nil, err
	}
	for _, nodeInfo := range cachedResp.NodeInfos {
		// TODO: different caching modes may need different formulas.
		var cachedScore float64
		if cachedResp.PackageSize > 0 {
			cachedScore = float64(nodeInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight
		}
		stgScores[nodeInfo.NodeID] = stgScore{cachedScore: cachedScore}
	}

	loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(0, packageID))
	if err != nil {
		return nil, err
	}
	for _, nodeID := range loadedResp.StgNodeIDs {
		// A missing entry reads as the zero value, so cached-only and
		// loaded-only nodes are handled by the same update.
		score := stgScores[nodeID]
		score.loadedScore = 1 * LoadedWeight
		score.Isloaded = true
		stgScores[nodeID] = score
	}

	return stgScores, nil
}