优化调度方案

This commit is contained in:
songjc 2023-09-19 15:04:45 +08:00
parent a58793e734
commit 20425ea3ad
2 changed files with 258 additions and 233 deletions

View File

@ -34,22 +34,46 @@ type Scheduler interface {
Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error)
}
type Rescheduling struct {
type DefaultSchedule struct {
}
type SlwNodeInfo struct {
type CandidateSlwNodeInfo struct {
slwNodeID int64
resourceLevel int
resourceScore float64
nodeScore float64
scheme *jobmod.JobScheduleScheme
scheme jobmod.JobScheduleScheme
}
func NewRescheduling() *Rescheduling {
return &Rescheduling{}
func NewDefaultSchedule() *DefaultSchedule {
return &DefaultSchedule{}
}
func (r *Rescheduling) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
// ByResourceLevel implements sort.Interface over candidate nodes.
// Ordering rules:
//  1. ascending by resourceLevel;
//  2. when both candidates are ResourceLevel1, descending by nodeScore;
//  3. when both candidates are ResourceLevel2 or ResourceLevel3, descending by resourceScore.
type ByResourceLevel []CandidateSlwNodeInfo

// Len reports the number of candidates.
func (a ByResourceLevel) Len() int {
	return len(a)
}

// Swap exchanges the candidates stored at positions i and j.
func (a ByResourceLevel) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}

// Less reports whether the candidate at i must be ordered before the one at j.
func (a ByResourceLevel) Less(i, j int) bool {
	levelI, levelJ := a[i].resourceLevel, a[j].resourceLevel

	// Different levels: the lower level always comes first.
	if levelI != levelJ {
		return levelI < levelJ
	}

	// Same level: the tie-break key depends on which level it is.
	switch levelI {
	case ResourceLevel1:
		return a[i].nodeScore > a[j].nodeScore
	case ResourceLevel2, ResourceLevel3:
		return a[i].resourceScore > a[j].resourceScore
	default:
		// Levels outside the known set have no defined order.
		return false
	}
}
func (s *DefaultSchedule) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
JobQueue := make(chan jobmod.NormalJob, 100)
colCli, err := globals.CollectorMQPool.Acquire()
@ -58,104 +82,94 @@ func (r *Rescheduling) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleSchem
}
defer colCli.Close()
var backupSlwNodeInfos []SlwNodeInfo
var candidateSlwNodeInfos []CandidateSlwNodeInfo
// 查询有哪些算力中心可作为备选,并为各节点划分资源等级
resp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
if err != nil {
return nil, err
}
slwNodeScores, err := s.ComputeAllSlwNodeScore(info.Files)
if err != nil {
return nil, err
}
for _, node := range resp.Nodes {
rl, rs, err := r.CheckResourceAvailability(info.Info.Resources, node.ID)
rl, rs, err := s.computeResourceScore(info.Info.Resources, node.ID)
if err != nil {
return nil, err
}
slwNodeInfo := SlwNodeInfo{
var nodeScore float64
var scheme jobmod.JobScheduleScheme
score, ok := slwNodeScores[node.ID]
if ok {
nodeScore = score.CodeScore.cachedScore +
score.CodeScore.loadedScore +
score.DatasetScore.cachedScore +
score.DatasetScore.loadedScore +
score.ImageScore.cachedScore +
score.ImageScore.loadedScore
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: node.ID,
Code: jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.CodeScore.Isloaded)},
Dataset: jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.DatasetScore.Isloaded)},
Image: jobmod.FileScheduleScheme{Action: s.SchemeImageAction(score.DatasetScore.Isloaded)},
}
} else {
nodeScore = 0
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: node.ID,
Code: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Dataset: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Image: jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
}
}
slwNodeInfo := CandidateSlwNodeInfo{
slwNodeID: node.ID,
resourceLevel: rl,
resourceScore: rs,
nodeScore: nodeScore,
scheme: scheme,
}
backupSlwNodeInfos = append(backupSlwNodeInfos, slwNodeInfo)
candidateSlwNodeInfos = append(candidateSlwNodeInfos, slwNodeInfo)
}
// 遍历各节点,并筛选出满足资源条件的节点
var matchingSlwNodes []SlwNodeInfo
for _, slwNode := range backupSlwNodeInfos {
if slwNode.resourceLevel == ResourceLevel1 {
matchingSlwNodes = append(matchingSlwNodes, slwNode)
}
}
sort.Sort(ByResourceLevel(candidateSlwNodeInfos))
// 如果只有一个满足条件的节点, 直接选择该节点,并将存储资源调度到该节点上
if len(matchingSlwNodes) == 1 {
_, scheme, err := r.ComputeSlwNodeScore(info.Files, matchingSlwNodes[0].slwNodeID)
if err != nil {
return nil, err
}
return scheme, nil
//如果有两个及以上节点满足条件,计算各节点的分数, 并选择得分最高的一个
} else if len(matchingSlwNodes) > 1 {
for _, slwNode := range matchingSlwNodes {
score, scheme, err := r.ComputeSlwNodeScore(info.Files, slwNode.slwNodeID)
if err != nil {
return nil, err
}
slwNode.nodeScore = score
slwNode.scheme = scheme
}
sort.Slice(matchingSlwNodes, func(i, j int) bool {
return matchingSlwNodes[i].nodeScore > matchingSlwNodes[j].nodeScore
})
return matchingSlwNodes[0].scheme, nil
// 如果没有满足条件的节点, 在ResourceLevel为2的节点中进行选择
if candidateSlwNodeInfos[0].resourceLevel == ResourceLevel3 {
//TODO 放入队列中,等待合适时机重新调度
JobQueue <- info
return nil, ErrScheduleWaiting
} else {
var secondaryMatchingSlwNodes []SlwNodeInfo
for _, slwNode := range backupSlwNodeInfos {
if slwNode.resourceLevel == ResourceLevel2 {
secondaryMatchingSlwNodes = append(secondaryMatchingSlwNodes, slwNode)
}
}
// 如果只有一个满足条件的节点, 直接选择该节点,并将存储资源调度到该节点上
if len(secondaryMatchingSlwNodes) == 1 {
_, scheme, err := r.ComputeSlwNodeScore(info.Files, secondaryMatchingSlwNodes[0].slwNodeID)
if err != nil {
return nil, err
}
return scheme, nil
//如果有两个及以上节点满足条件,选择资源最充足的一个
} else if len(secondaryMatchingSlwNodes) > 1 {
sort.Slice(secondaryMatchingSlwNodes, func(i, j int) bool {
return secondaryMatchingSlwNodes[i].resourceScore > secondaryMatchingSlwNodes[j].resourceScore
})
_, scheme, err := r.ComputeSlwNodeScore(info.Files, secondaryMatchingSlwNodes[0].slwNodeID)
if err != nil {
return nil, err
}
return scheme, nil
//若仍没有符合条件的节点,放入队列中等待
} else {
//TODO 放入队列中,等待合适时机重新调度
JobQueue <- info
return nil, ErrScheduleWaiting
}
return &candidateSlwNodeInfos[0].scheme, nil
}
}
// SchemeStgAction selects the schedule action for a storage package:
// jobmod.ActionLoad when the package is not loaded yet, jobmod.ActionNo
// when isLoaded is true.
func (s *DefaultSchedule) SchemeStgAction(isLoaded bool) jobmod.FileScheduleAction {
	if !isLoaded {
		return jobmod.ActionLoad
	}
	return jobmod.ActionNo
}
// SchemeImageAction selects the schedule action for an image:
// jobmod.ActionNo when isLoaded is true, jobmod.ActionImportImage otherwise.
func (s *DefaultSchedule) SchemeImageAction(isLoaded bool) jobmod.FileScheduleAction {
	switch {
	case isLoaded:
		return jobmod.ActionNo
	default:
		return jobmod.ActionImportImage
	}
}
// ResourceInfo records the evaluation of one requested resource type on a node.
type ResourceInfo struct {
// Type is the resource type identifier (filled with a models.ResourceType* value).
Type string
// Level is the resource level computed for this resource by computeSingleLevel.
Level int
// Score is the available/requested ratio multiplied by the resource weight.
Score float64
}
// 划分节点资源等级,并计算资源得分
func (r *Rescheduling) CheckResourceAvailability(resources models.JobResourcesInfo, slwNodeID int64) (int, float64, error) {
func (s *DefaultSchedule) computeResourceScore(resources models.JobResourcesInfo, slwNodeID int64) (int, float64, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return 0, 0, fmt.Errorf("new collector client: %w", err)
@ -167,15 +181,7 @@ func (r *Rescheduling) CheckResourceAvailability(resources models.JobResourcesIn
return 0, 0, err
}
var CPUScore float64
var NPUScore float64
var GPUScore float64
var MLUScore float64
var StorageScore float64
var MemoryScore float64
resourceLevel := ResourceLevel1
resourceCount := 0
resourceLevels := []ResourceInfo{}
for _, r := range res.Datas {
switch r.(type) {
@ -183,105 +189,74 @@ func (r *Rescheduling) CheckResourceAvailability(resources models.JobResourcesIn
availCPU := r.(*models.CPUResourceData).Available
if resources.CPU > 0 {
resourceCount++
if float64(availCPU.Value) < 1.5*resources.CPU && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(availCPU.Value) < resources.CPU {
resourceLevel = ResourceLevel3
}
CPUScore = (float64(availCPU.Value) / resources.CPU) * CpuResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeCPU,
Level: s.computeSingleLevel(float64(availCPU.Value), resources.CPU),
Score: (float64(availCPU.Value) / resources.CPU) * CpuResourceWeight,
})
}
case *models.NPUResourceData:
availNPU := r.(*models.NPUResourceData).Available
if resources.NPU > 0 {
resourceCount++
if float64(availNPU.Value) < 1.5*resources.NPU && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(availNPU.Value) < resources.NPU {
resourceLevel = ResourceLevel3
}
NPUScore = (float64(availNPU.Value) / resources.NPU) * CpuResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeNPU,
Level: s.computeSingleLevel(float64(availNPU.Value), resources.NPU),
Score: (float64(availNPU.Value) / resources.NPU) * CpuResourceWeight,
})
}
case *models.GPUResourceData:
availGPU := r.(*models.GPUResourceData).Available
if resources.GPU > 0 {
resourceCount++
if float64(availGPU.Value) < 1.5*resources.GPU && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(availGPU.Value) < resources.GPU {
resourceLevel = ResourceLevel3
}
GPUScore = (float64(availGPU.Value) / resources.GPU) * CpuResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeGPU,
Level: s.computeSingleLevel(float64(availGPU.Value), resources.GPU),
Score: (float64(availGPU.Value) / resources.GPU) * CpuResourceWeight,
})
}
case *models.MLUResourceData:
availMLU := r.(*models.MLUResourceData).Available
if resources.MLU > 0 {
resourceCount++
if float64(availMLU.Value) < 1.5*resources.MLU && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(availMLU.Value) < resources.MLU {
resourceLevel = ResourceLevel3
}
MLUScore = (float64(availMLU.Value) / resources.MLU) * CpuResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeMLU,
Level: s.computeSingleLevel(float64(availMLU.Value), resources.MLU),
Score: (float64(availMLU.Value) / resources.MLU) * CpuResourceWeight,
})
}
case *models.StorageResourceData:
availStorage := r.(*models.StorageResourceData).Available
if resources.Storage > 0 {
resourceCount++
bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
if err != nil {
return 0, 0, err
}
if float64(bytesStorage) < 1.5*float64(resources.Storage) && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(bytesStorage) < float64(resources.Storage) {
resourceLevel = ResourceLevel3
}
StorageScore = (float64(bytesStorage) / float64(resources.Storage)) * StgResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeStorage,
Level: s.computeSingleLevel(float64(bytesStorage), float64(resources.Storage)),
Score: (float64(bytesStorage) / float64(resources.Storage)) * StgResourceWeight,
})
}
case *models.MemoryResourceData:
availMemory := r.(*models.MemoryResourceData).Available
if resources.Memory > 0 {
resourceCount++
bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
if err != nil {
return 0, 0, err
}
if float64(bytesMemory) < 1.5*float64(resources.Memory) && resourceLevel == ResourceLevel1 {
resourceLevel = ResourceLevel2
}
if float64(bytesMemory) < float64(resources.Memory) {
resourceLevel = ResourceLevel3
}
MemoryScore = (float64(bytesMemory) / float64(resources.Memory)) * StgResourceWeight
resourceLevels = append(resourceLevels, ResourceInfo{
Type: models.ResourceTypeMemory,
Level: s.computeSingleLevel(float64(bytesMemory), float64(resources.Memory)),
Score: (float64(bytesMemory) / float64(resources.Memory)) * StgResourceWeight,
})
}
default:
@ -289,141 +264,176 @@ func (r *Rescheduling) CheckResourceAvailability(resources models.JobResourcesIn
}
}
// 计算资源权值之和
resourceScore := (CPUScore + NPUScore + GPUScore + MLUScore + StorageScore + MemoryScore) / float64(resourceCount)
// 计算资源等级
sort.Slice(resourceLevels, func(i, j int) bool {
return resourceLevels[i].Level > resourceLevels[j].Level
})
resourceLevel := resourceLevels[0].Level
//计算资源得分
totalScore := 0.0
resourceScore := 0.0
if len(resourceLevels) > 0 {
for _, resource := range resourceLevels {
totalScore += resource.Score
}
resourceScore = totalScore / float64(len(resourceLevels))
}
return resourceLevel, resourceScore, nil
}
// 计算节点得分情况,并生成调度方案
func (r *Rescheduling) ComputeSlwNodeScore(files jobmod.JobFiles, slwNodeID int64) (float64, *jobmod.JobScheduleScheme, error) {
var scheme *jobmod.JobScheduleScheme
// computeSingleLevel grades a single resource by comparing the available
// amount with the requested amount:
//   - ResourceLevel1 when avail is at least 1.5 times need;
//   - ResourceLevel2 when avail covers need but not 1.5 times need;
//   - ResourceLevel3 when avail is below need.
func (s *DefaultSchedule) computeSingleLevel(avail float64, need float64) int {
	switch {
	case avail >= 1.5*need:
		return ResourceLevel1
	case avail >= need:
		return ResourceLevel2
	default:
		return ResourceLevel3
	}
}
// SlwNodeScore groups the storage scores of one node, one entry per kind of
// job file.
type SlwNodeScore struct {
// CodeScore is the score computed for the job's code package.
CodeScore stgScore
// DatasetScore is the score computed for the job's dataset package.
DatasetScore stgScore
// ImageScore is the score computed for the job's image package.
ImageScore stgScore
}
// stgScore describes how much of a package is already present on a node.
type stgScore struct {
// cachedScore is the cached share of the package (file size / package size)
// multiplied by CachingWeight.
cachedScore float64
// loadedScore is set to 1 * LoadedWeight when the package is loaded on the
// node and is zero otherwise.
loadedScore float64
Isloaded bool // reports whether the storage package has already been scheduled to this node; for an image, whether the image has already been loaded into the computing center
}
// 计算节点得分情况
func (s *DefaultSchedule) ComputeAllSlwNodeScore(files jobmod.JobFiles) (map[int64]SlwNodeScore, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, fmt.Errorf("new collector client: %w", err)
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
//计算code相关得分
codeScore, isLoading, err := r.ComputeStgScore(files.Code.PackageID, slwNodeID)
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, err
}
slwNodeScores := make(map[int64]SlwNodeScore)
if isLoading {
scheme.Code = jobmod.FileScheduleScheme{
Action: jobmod.ActionLoad,
}
} else {
scheme.Code = jobmod.FileScheduleScheme{
Action: jobmod.ActionNo,
//计算code相关得分
codeStgScores, err := s.computeAllStgScore(files.Code.PackageID)
if err != nil {
return nil, err
}
for id, code := range codeStgScores {
slwNodeScores[id] = SlwNodeScore{
CodeScore: code,
}
}
//计算dataset相关得分
datasetScore, isLoading, err := r.ComputeStgScore(files.Dataset.PackageID, slwNodeID)
datasetStgScores, err := s.computeAllStgScore(files.Dataset.PackageID)
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, err
return nil, err
}
if isLoading {
scheme.Dataset = jobmod.FileScheduleScheme{
Action: jobmod.ActionLoad,
}
} else {
scheme.Dataset = jobmod.FileScheduleScheme{
Action: jobmod.ActionNo,
for id, dataset := range datasetStgScores {
if exist, ok := slwNodeScores[id]; ok {
exist.DatasetScore = dataset
slwNodeScores[id] = exist
} else {
slwNodeScores[id] = SlwNodeScore{
DatasetScore: dataset,
}
}
}
//计算image相关得分
imageScore, _, err := r.ComputeStgScore(files.Image.PackageID, slwNodeID)
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, err
}
magCli, err := globals.ManagerMQPool.Acquire()
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, fmt.Errorf("new manager client: %w", err)
return nil, fmt.Errorf("new manager client: %w", err)
}
defer magCli.Close()
// TODO 内部ImageID和算力中心ImageID对应关系为一对多
res, err := magCli.GetImageInfo(manager.NewGetImageInfo(files.Image.ImageID))
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, err
return nil, err
}
imageResp, err := colCli.GetImageList(collector.NewGetImageList(slwNodeID))
imageStgScores, err := s.computeAllStgScore(files.Image.PackageID)
if err != nil {
return 0, &jobmod.JobScheduleScheme{}, err
return nil, err
}
isImport := true
for _, imageID := range imageResp.ImageIDs {
if imageID == res.SlwNodeImageID {
isImport = false
break
for id, image := range imageStgScores {
// TODO 此处id错误根据算力中心id做判断待修改
var isLoaded bool
if id == res.SlwNodeImageID {
imageResp, err := colCli.GetImageList(collector.NewGetImageList(id))
if err != nil {
return nil, err
}
for _, imageID := range imageResp.ImageIDs {
if imageID == res.SlwNodeImageID {
isLoaded = true
break
}
}
}
if exist, ok := slwNodeScores[id]; ok {
exist.ImageScore = stgScore{
cachedScore: image.cachedScore,
loadedScore: image.loadedScore,
Isloaded: isLoaded,
}
slwNodeScores[id] = exist
} else {
slwNodeScores[id] = SlwNodeScore{
ImageScore: stgScore{
cachedScore: image.cachedScore,
loadedScore: image.loadedScore,
Isloaded: isLoaded,
},
}
}
}
if isImport {
scheme.Image = jobmod.FileScheduleScheme{
Action: jobmod.ActionImportImage,
}
} else {
scheme.Image = jobmod.FileScheduleScheme{
Action: jobmod.ActionNo,
}
}
slwNodeScore := codeScore + datasetScore + imageScore
scheme.TargetSlwNodeID = slwNodeID
return slwNodeScore, scheme, nil
return slwNodeScores, nil
}
// 计算package在该节点下的得分情况
func (r *Rescheduling) ComputeStgScore(packageID int64, slwNodeID int64) (float64, bool, error) {
// 计算package在各节点的得分情况
func (s *DefaultSchedule) computeAllStgScore(packageID int64) (map[int64]stgScore, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return 0, false, fmt.Errorf("new collector client: %w", err)
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
stgScores := make(map[int64]stgScore)
cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
if err != nil {
return 0, false, err
return nil, err
}
var cachedScore float64
for _, nodeInfo := range cachedResp.NodeInfos {
if nodeInfo.NodeID == slwNodeID {
stgScores[nodeInfo.NodeID] = stgScore{
//TODO 根据缓存方式不同,可能会有不同的计算方式
cachedScore = float64(nodeInfo.FileSize/cachedResp.PackageSize) * CachingWeight
break
cachedScore: float64(nodeInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
}
}
loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(0, packageID))
if err != nil {
return 0, false, err
return nil, err
}
var loadedScore float64
for _, nodeID := range loadedResp.StgNodeIDs {
if nodeID == slwNodeID {
loadedScore = 1 * LoadedWeight
break
if exist, ok := stgScores[nodeID]; ok {
exist.loadedScore = 1 * LoadedWeight
exist.Isloaded = true
stgScores[nodeID] = exist
} else {
stgScores[nodeID] = stgScore{
loadedScore: 1 * LoadedWeight,
Isloaded: true,
}
}
}
var isLoading bool
if loadedScore == 0 {
isLoading = true
} else {
isLoading = false
}
return cachedScore + loadedScore, isLoading, nil
return stgScores, nil
}

View File

@ -50,19 +50,34 @@ func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) error {
if err != nil {
return err
}
var rescheduling scheduler.Rescheduling
var defaultSchedule scheduler.DefaultSchedule
var scheme jobmod.JobScheduleScheme
if isAvailable {
// 确认code、dataset、image是否已经调度到该中心并生成调度方案
_, s, err := rescheduling.ComputeSlwNodeScore(t.Job.Files, t.Job.TargetSlwNodeID)
scheme = *s
resp, err := defaultSchedule.ComputeAllSlwNodeScore(t.Job.Files)
score, ok := resp[t.Job.TargetSlwNodeID]
if ok {
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: t.Job.TargetSlwNodeID,
Code: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.CodeScore.Isloaded)},
Dataset: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.DatasetScore.Isloaded)},
Image: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeImageAction(score.DatasetScore.Isloaded)},
}
} else {
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: t.Job.TargetSlwNodeID,
Code: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Dataset: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Image: jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
}
}
if err != nil {
return err
}
// 重新执行预调度方案,寻找最优节点
} else {
s, err := rescheduling.Schedule(t.Job)
s, err := defaultSchedule.Schedule(t.Job)
scheme = *s
if err != nil {
return err