Implement pre-scheduling scheme generation

This commit is contained in:
Sydonian 2023-09-22 17:23:55 +08:00
parent 507fe137e5
commit 6f34785cd6
15 changed files with 1246 additions and 550 deletions

View File

@ -20,8 +20,8 @@ type Reporter struct {
reportNow chan bool
}
func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) Reporter {
return Reporter{
func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) *Reporter {
return &Reporter{
advisorID: advisorID,
reportInterval: reportInterval,
taskStatus: make(map[string]advtsk.TaskStatus),

View File

@ -5,8 +5,12 @@ import (
"sort"
"github.com/inhies/go-bytesize"
"github.com/samber/lo"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/common/utils/math"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
@ -29,394 +33,396 @@ const (
LoadedWeight float64 = 2
)
var ErrScheduleWaiting = fmt.Errorf("no appropriate scheduling node found, please wait")
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")
type Scheduler interface {
Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error)
Schedule(info *jobmod.NormalJob) (*jobmod.JobScheduleScheme, error)
}
type DefaultSchedule struct {
type candidateSlwNode struct {
SlwNode uopsdk.SlwNode
IsPreScheduledNode bool // whether this node was chosen during pre-scheduling
Resource resourcesDetail
Files filesDetail
}
type CandidateSlwNodeInfo struct {
slwNodeID uopsdk.SlwNodeID
resourceLevel int
resourceScore float64
nodeScore float64
scheme jobmod.JobScheduleScheme
type resourcesDetail struct {
CPU resourceDetail
GPU resourceDetail
NPU resourceDetail
MLU resourceDetail
Storage resourceDetail
Memory resourceDetail
TotalScore float64
AvgScore float64
MaxLevel int
}
func NewDefaultSchedule() *DefaultSchedule {
return &DefaultSchedule{}
}
// Candidate node ordering rules:
// 1. Sort by resourceLevel in ascending order.
// 2. If both resourceLevels are 1, sort by nodeScore in descending order.
// 3. If both resourceLevels are 2 or 3, sort by resourceScore in descending order.
type ByResourceLevel []CandidateSlwNodeInfo
func (a ByResourceLevel) Len() int { return len(a) }
func (a ByResourceLevel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByResourceLevel) Less(i, j int) bool {
if a[i].resourceLevel < a[j].resourceLevel {
return true
} else if a[i].resourceLevel == a[j].resourceLevel {
switch a[i].resourceLevel {
case ResourceLevel1:
return a[i].nodeScore > a[j].nodeScore
case ResourceLevel2:
return a[i].resourceScore > a[j].resourceScore
case ResourceLevel3:
return a[i].resourceScore > a[j].resourceScore
}
}
return false
}
func (s *DefaultSchedule) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
JobQueue := make(chan jobmod.NormalJob, 100)
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
var candidateSlwNodeInfos []CandidateSlwNodeInfo
// Query which computing centers are available as candidates, and assign each node a resource level
resp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
if err != nil {
return nil, err
}
slwNodeScores, err := s.ComputeAllSlwNodeScore(info.Files)
if err != nil {
return nil, err
}
for _, node := range resp.Nodes {
rl, rs, err := s.computeResourceScore(info.Info.Resources, node.ID)
if err != nil {
return nil, err
}
var nodeScore float64
var scheme jobmod.JobScheduleScheme
score, ok := slwNodeScores[node.ID]
if ok {
nodeScore = score.CodeScore.cachedScore +
score.CodeScore.loadedScore +
score.DatasetScore.cachedScore +
score.DatasetScore.loadedScore +
score.ImageScore.cachedScore +
score.ImageScore.loadedScore
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: node.ID,
Code: jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.CodeScore.Isloaded)},
Dataset: jobmod.FileScheduleScheme{Action: s.SchemeStgAction(score.DatasetScore.Isloaded)},
Image: jobmod.FileScheduleScheme{Action: s.SchemeImageAction(score.DatasetScore.Isloaded)},
}
} else {
nodeScore = 0
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: node.ID,
Code: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Dataset: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Image: jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
}
}
slwNodeInfo := CandidateSlwNodeInfo{
slwNodeID: node.ID,
resourceLevel: rl,
resourceScore: rs,
nodeScore: nodeScore,
scheme: scheme,
}
candidateSlwNodeInfos = append(candidateSlwNodeInfos, slwNodeInfo)
}
sort.Sort(ByResourceLevel(candidateSlwNodeInfos))
if candidateSlwNodeInfos[0].resourceLevel == ResourceLevel3 {
//TODO put the job into a queue and reschedule it when appropriate
JobQueue <- info
return nil, ErrScheduleWaiting
} else {
return &candidateSlwNodeInfos[0].scheme, nil
}
}
func (s *DefaultSchedule) SchemeStgAction(isLoaded bool) jobmod.FileScheduleAction {
if isLoaded {
return jobmod.ActionNo
}
return jobmod.ActionLoad
}
func (s *DefaultSchedule) SchemeImageAction(isLoaded bool) jobmod.FileScheduleAction {
if isLoaded {
return jobmod.ActionNo
}
return jobmod.ActionImportImage
}
type ResourceInfo struct {
Type string
type resourceDetail struct {
Level int
Score float64
}
// Classify the node's resource level and compute its resource score
func (s *DefaultSchedule) computeResourceScore(resources schsdk.JobResourcesInfo, slwNodeID uopsdk.SlwNodeID) (int, float64, error) {
type filesDetail struct {
Dataset fileDetail
Code fileDetail
Image fileDetail
TotalScore float64
}
type fileDetail struct {
CachingScore float64
LoadingScore float64
IsLoaded bool // for storage: whether the data has been loaded onto this node; for image: whether the image has been imported into this computing center
}
type CandidateSlwNodeArr []*candidateSlwNode
func (a CandidateSlwNodeArr) Len() int { return len(a) }
func (a CandidateSlwNodeArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CandidateSlwNodeArr) Less(i, j int) bool {
n1 := a[i]
n2 := a[j]
// If this node was chosen during pre-scheduling, prefer it as long as its resources satisfy the requirements
if n1.IsPreScheduledNode && n1.Resource.MaxLevel < ResourceLevel3 {
return true
}
if n2.IsPreScheduledNode && n2.Resource.MaxLevel < ResourceLevel3 {
return false
}
// Compare resource levels first; a lower level means the requirements are better satisfied
if n1.Resource.MaxLevel < n2.Resource.MaxLevel {
return true
}
if n1.Resource.MaxLevel > n2.Resource.MaxLevel {
return false
}
// When levels are equal, compare individual scores
switch n1.Resource.MaxLevel {
case ResourceLevel1:
// A higher total file score means this node already holds more complete data files, so it is preferred
return n1.Files.TotalScore > n2.Files.TotalScore
case ResourceLevel2:
// A higher average resource score means more spare resources, so the node is preferred
return n1.Resource.AvgScore > n2.Resource.AvgScore
case ResourceLevel3:
// A higher average resource score means more spare resources, so the node is preferred
return n1.Resource.AvgScore > n2.Resource.AvgScore
}
return false
}
type DefaultScheduler struct {
}
func NewDefaultSchedule() *DefaultScheduler {
return &DefaultScheduler{}
}
func (s *DefaultScheduler) Schedule(job *jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return 0, 0, fmt.Errorf("new collector client: %w", err)
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
res, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID))
allSlwNodes := make(map[uopsdk.SlwNodeID]*candidateSlwNode)
// Query which computing centers are available
getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
if err != nil {
return 0, 0, err
return nil, err
}
resourceLevels := []ResourceInfo{}
for _, r := range res.Datas {
switch r.(type) {
if len(getNodesResp.Nodes) == 0 {
return nil, ErrNoAvailableScheme
}
case *uopsdk.CPUResourceData:
availCPU := r.(*uopsdk.CPUResourceData).Available
if resources.CPU > 0 {
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeCPU,
Level: s.computeSingleLevel(float64(availCPU.Value), resources.CPU),
Score: (float64(availCPU.Value) / resources.CPU) * CpuResourceWeight,
})
}
case *uopsdk.NPUResourceData:
availNPU := r.(*uopsdk.NPUResourceData).Available
if resources.NPU > 0 {
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeNPU,
Level: s.computeSingleLevel(float64(availNPU.Value), resources.NPU),
Score: (float64(availNPU.Value) / resources.NPU) * CpuResourceWeight,
})
}
case *uopsdk.GPUResourceData:
availGPU := r.(*uopsdk.GPUResourceData).Available
if resources.GPU > 0 {
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeGPU,
Level: s.computeSingleLevel(float64(availGPU.Value), resources.GPU),
Score: (float64(availGPU.Value) / resources.GPU) * CpuResourceWeight,
})
}
case *uopsdk.MLUResourceData:
availMLU := r.(*uopsdk.MLUResourceData).Available
if resources.MLU > 0 {
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeMLU,
Level: s.computeSingleLevel(float64(availMLU.Value), resources.MLU),
Score: (float64(availMLU.Value) / resources.MLU) * CpuResourceWeight,
})
}
case *uopsdk.StorageResourceData:
availStorage := r.(*uopsdk.StorageResourceData).Available
if resources.Storage > 0 {
bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
if err != nil {
return 0, 0, err
}
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeStorage,
Level: s.computeSingleLevel(float64(bytesStorage), float64(resources.Storage)),
Score: (float64(bytesStorage) / float64(resources.Storage)) * StgResourceWeight,
})
}
case *uopsdk.MemoryResourceData:
availMemory := r.(*uopsdk.MemoryResourceData).Available
if resources.Memory > 0 {
bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
if err != nil {
return 0, 0, err
}
resourceLevels = append(resourceLevels, ResourceInfo{
Type: uopsdk.ResourceTypeMemory,
Level: s.computeSingleLevel(float64(bytesMemory), float64(resources.Memory)),
Score: (float64(bytesMemory) / float64(resources.Memory)) * StgResourceWeight,
})
}
default:
fmt.Println("Unknown Resource Type")
for _, slwNode := range getNodesResp.Nodes {
allSlwNodes[slwNode.ID] = &candidateSlwNode{
SlwNode: slwNode,
IsPreScheduledNode: slwNode.ID == job.TargetSlwNodeID,
}
}
// Compute the resource level
sort.Slice(resourceLevels, func(i, j int) bool {
return resourceLevels[i].Level > resourceLevels[j].Level
})
resourceLevel := resourceLevels[0].Level
// Compute the scores
err = s.calcFileScore(job.Files, allSlwNodes)
if err != nil {
return nil, err
}
err = s.calcResourceScore(job, allSlwNodes)
if err != nil {
return nil, err
}
allSlwNodesArr := lo.Values(allSlwNodes)
sort.Sort(CandidateSlwNodeArr(allSlwNodesArr))
targetNode := allSlwNodesArr[0]
if targetNode.Resource.MaxLevel == ResourceLevel3 {
return nil, ErrNoAvailableScheme
}
scheme := s.makeSchemeForNode(targetNode)
return &scheme, nil
}
func (s *DefaultScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jobmod.JobScheduleScheme {
scheme := jobmod.JobScheduleScheme{
TargetSlwNodeID: targetSlwNode.SlwNode.ID,
}
if !targetSlwNode.Files.Dataset.IsLoaded {
scheme.Dataset.Action = jobmod.ActionLoad
}
if !targetSlwNode.Files.Code.IsLoaded {
scheme.Code.Action = jobmod.ActionLoad
}
if !targetSlwNode.Files.Image.IsLoaded {
scheme.Image.Action = jobmod.ActionImportImage
}
return scheme
}
func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
for _, slwNode := range allSlwNodes {
res, err := s.calcOneResourceScore(job.Info.Resources, slwNode.SlwNode.ID)
if err != nil {
return err
}
slwNode.Resource = *res
}
return nil
}
// Classify the node's resource level and compute its resource score
func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID uopsdk.SlwNodeID) (*resourcesDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID))
if err != nil {
return nil, err
}
var resDetail resourcesDetail
// Compute the resource scores
totalScore := 0.0
resourceScore := 0.0
if len(resourceLevels) > 0 {
for _, resource := range resourceLevels {
totalScore += resource.Score
maxLevel := 0
resKinds := 0
if requires.CPU > 0 {
res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.CPU.Level = ResourceLevel3
resDetail.CPU.Score = 0
} else {
resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU)
resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight
}
resourceScore = totalScore / float64(len(resourceLevels))
maxLevel = math.Max(maxLevel, resDetail.CPU.Level)
totalScore += resDetail.CPU.Score
resKinds++
}
return resourceLevel, resourceScore, nil
if requires.GPU > 0 {
res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.GPU.Level = ResourceLevel3
resDetail.GPU.Score = 0
} else {
resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU)
resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.GPU.Level)
totalScore += resDetail.GPU.Score
resKinds++
}
if requires.NPU > 0 {
res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.NPU.Level = ResourceLevel3
resDetail.NPU.Score = 0
} else {
resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU)
resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.NPU.Level)
totalScore += resDetail.NPU.Score
resKinds++
}
if requires.MLU > 0 {
res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.MLU.Level = ResourceLevel3
resDetail.MLU.Score = 0
} else {
resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU)
resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.MLU.Level)
totalScore += resDetail.MLU.Score
resKinds++
}
if requires.Storage > 0 {
res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas)
if res == nil {
resDetail.Storage.Level = ResourceLevel3
resDetail.Storage.Score = 0
} else {
bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
if err != nil {
return nil, err
}
resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage))
resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.Storage.Level)
totalScore += resDetail.Storage.Score
resKinds++
}
if requires.Memory > 0 {
res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas)
if res == nil {
resDetail.Memory.Level = ResourceLevel3
resDetail.Memory.Score = 0
} else {
bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
if err != nil {
return nil, err
}
resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory))
resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.Memory.Level)
totalScore += resDetail.Memory.Score
resKinds++
}
if resKinds == 0 {
return &resDetail, nil
}
resDetail.TotalScore = totalScore
resDetail.AvgScore = totalScore / float64(resKinds)
resDetail.MaxLevel = maxLevel
return &resDetail, nil
}
func (s *DefaultSchedule) computeSingleLevel(avail float64, need float64) int {
if avail >= 1.5*need {
func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int {
if avai >= 1.5*need {
return ResourceLevel1
} else if avail >= need {
return ResourceLevel2
} else {
return ResourceLevel3
}
}
type SlwNodeScore struct {
CodeScore stgScore
DatasetScore stgScore
ImageScore stgScore
}
if avai >= need {
return ResourceLevel2
}
type stgScore struct {
cachedScore float64
loadedScore float64
Isloaded bool // for storage: whether the data has been loaded onto this node; for image: whether the image has been imported into this computing center
return ResourceLevel3
}
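// For example (illustrative numbers): with a requirement of 4 CPU cores,
// 6 or more available cores yield ResourceLevel1, 4 or 5 yield ResourceLevel2,
// and fewer than 4 yield ResourceLevel3.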
// Compute each node's score
func (s *DefaultSchedule) ComputeAllSlwNodeScore(files jobmod.JobFiles) (map[uopsdk.SlwNodeID]SlwNodeScore, error) {
colCli, err := globals.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
// Only count data weights for storage services on the computing centers returned by the ops platform
stgNodeToSlwNode := make(map[int64]*candidateSlwNode)
for _, slwNode := range allSlwNodes {
stgNodeToSlwNode[slwNode.SlwNode.StgNodeID] = slwNode
}
defer colCli.Close()
slwNodeScores := make(map[uopsdk.SlwNodeID]SlwNodeScore)
// Compute the code-related scores
codeStgScores, err := s.computeAllStgScore(files.Code.PackageID)
codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, stgNodeToSlwNode)
if err != nil {
return nil, err
return fmt.Errorf("calc code file score: %w", err)
}
for id, code := range codeStgScores {
slwNodeScores[id] = SlwNodeScore{
CodeScore: code,
}
for id, score := range codeFileScores {
allSlwNodes[id].Files.Code = *score
}
// Compute the dataset-related scores
datasetStgScores, err := s.computeAllStgScore(files.Dataset.PackageID)
datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, stgNodeToSlwNode)
if err != nil {
return nil, err
return fmt.Errorf("calc dataset file score: %w", err)
}
for id, dataset := range datasetStgScores {
if exist, ok := slwNodeScores[id]; ok {
exist.DatasetScore = dataset
slwNodeScores[id] = exist
} else {
slwNodeScores[id] = SlwNodeScore{
DatasetScore: dataset,
}
}
for id, score := range datasetFileScores {
allSlwNodes[id].Files.Dataset = *score
}
// Compute the image-related scores
magCli, err := globals.ManagerMQPool.Acquire()
imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allSlwNodes, stgNodeToSlwNode)
if err != nil {
return nil, fmt.Errorf("new manager client: %w", err)
return fmt.Errorf("calc image file score: %w", err)
}
defer magCli.Close()
// TODO the mapping between internal ImageID and computing-center ImageID is one-to-many
res, err := magCli.GetImageInfo(manager.NewGetImageInfo(files.Image.ImageID))
if err != nil {
return nil, err
for id, score := range imageFileScores {
allSlwNodes[id].Files.Image = *score
}
imageStgScores, err := s.computeAllStgScore(files.Image.PackageID)
if err != nil {
return nil, err
}
for id, image := range imageStgScores {
// TODO the id used here is wrong; the check should be based on the computing-center id
var isLoaded bool
for _, info := range res.ImportingInfos {
if id == info.SlwNodeID {
imageResp, err := colCli.GetImageList(collector.NewGetImageList(id))
if err != nil {
return nil, err
}
for _, imageID := range imageResp.ImageIDs {
if imageID == info.SlwNodeImageID {
isLoaded = true
break
}
}
}
}
if exist, ok := slwNodeScores[id]; ok {
exist.ImageScore = stgScore{
cachedScore: image.cachedScore,
loadedScore: image.loadedScore,
Isloaded: isLoaded,
}
slwNodeScores[id] = exist
} else {
slwNodeScores[id] = SlwNodeScore{
ImageScore: stgScore{
cachedScore: image.cachedScore,
loadedScore: image.loadedScore,
Isloaded: isLoaded,
},
}
}
for _, slwNode := range allSlwNodes {
slwNode.Files.TotalScore = slwNode.Files.Code.CachingScore +
slwNode.Files.Code.LoadingScore +
slwNode.Files.Dataset.CachingScore +
slwNode.Files.Dataset.LoadingScore +
slwNode.Files.Image.CachingScore +
slwNode.Files.Image.LoadingScore
}
return slwNodeScores, nil
return nil
}
// Compute the package's score on each node
func (s *DefaultSchedule) computeAllStgScore(packageID int64) (map[int64]stgScore, error) {
colCli, err := globals.CollectorMQPool.Acquire()
func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
stgScores := make(map[int64]stgScore)
slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
if err != nil {
return nil, err
}
for _, nodeInfo := range cachedResp.NodeInfos {
stgScores[nodeInfo.NodeID] = stgScore{
for _, stgNodeCacheInfo := range cachedResp.NodeInfos {
slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID]
if !ok {
continue
}
slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{
//TODO the calculation may differ depending on the caching method
cachedScore: float64(nodeInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
}
}
@ -425,18 +431,90 @@ func (s *DefaultSchedule) computeAllStgScore(packageID int64) (map[int64]stgScor
return nil, err
}
for _, nodeID := range loadedResp.StgNodeIDs {
if exist, ok := stgScores[nodeID]; ok {
exist.loadedScore = 1 * LoadedWeight
exist.Isloaded = true
stgScores[nodeID] = exist
} else {
stgScores[nodeID] = stgScore{
loadedScore: 1 * LoadedWeight,
Isloaded: true,
}
for _, stgNodeID := range loadedResp.StgNodeIDs {
slwNode, ok := stgNodeToSlwNode[stgNodeID]
if !ok {
continue
}
fsc, ok := slwNodeFileScores[slwNode.SlwNode.ID]
if !ok {
fsc = &fileDetail{}
slwNodeFileScores[slwNode.SlwNode.ID] = fsc
}
fsc.LoadingScore = 1 * LoadedWeight
fsc.IsLoaded = true
}
return slwNodeFileScores, nil
}
// Compute the image's score on each node
func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
magCli, err := schglb.ManagerMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new manager client: %w", err)
}
defer magCli.Close()
imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID))
if err != nil {
return nil, fmt.Errorf("getting image info: %w", err)
}
slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID))
if err != nil {
return nil, err
}
for _, stgNodeCacheInfo := range cachedResp.NodeInfos {
slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID]
if !ok {
continue
}
slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{
//TODO the calculation may differ depending on the caching method
CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
}
}
return stgScores, nil
// For an image, LoadingScore reflects whether it has been imported into the computing center
for _, importing := range imageInfoResp.ImportingInfos {
_, ok := allSlwNodes[importing.SlwNodeID]
if !ok {
continue
}
fsc, ok := slwNodeFileScores[importing.SlwNodeID]
if !ok {
fsc = &fileDetail{}
slwNodeFileScores[importing.SlwNodeID] = fsc
}
fsc.LoadingScore = 1 * LoadedWeight
fsc.IsLoaded = true
}
return slwNodeFileScores, nil
}
func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T {
for _, data := range all {
if ret, ok := data.(T); ok {
return ret
}
}
var def T
return def
}
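As a rough sketch of how the CandidateSlwNodeArr ordering above behaves, here is a minimal in-package snippet with illustrative values:

    nodes := []*candidateSlwNode{
        {Resource: resourcesDetail{MaxLevel: ResourceLevel2, AvgScore: 3.0}},
        {Resource: resourcesDetail{MaxLevel: ResourceLevel1}, Files: filesDetail{TotalScore: 5.0}},
        {IsPreScheduledNode: true, Resource: resourcesDetail{MaxLevel: ResourceLevel2, AvgScore: 1.0}},
    }
    sort.Sort(CandidateSlwNodeArr(nodes))
    // Resulting order: the pre-scheduled node first (its level is below ResourceLevel3),
    // then the ResourceLevel1 node, then the remaining ResourceLevel2 node.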

View File

@ -0,0 +1,85 @@
package scheduler
import (
"context"
"sync"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type schedulingJob struct {
Job jobmod.NormalJob
Callback *future.SetValueFuture[*jobmod.JobScheduleScheme]
}
type Service struct {
scheduler Scheduler
jobs []*schedulingJob
lock sync.Mutex
hasNewJob chan bool
}
func NewService(scheduler Scheduler) *Service {
return &Service{
scheduler: scheduler,
hasNewJob: make(chan bool),
}
}
func (s *Service) MakeScheme(job jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
s.lock.Lock()
callback := future.NewSetValue[*jobmod.JobScheduleScheme]()
s.jobs = append(s.jobs, &schedulingJob{
Job: job,
Callback: callback,
})
s.lock.Unlock()
select {
case s.hasNewJob <- true:
default:
}
return callback.WaitValue(context.Background())
}
func (s *Service) Serve() error {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.tryMakeScheme()
case <-s.hasNewJob:
s.tryMakeScheme()
}
}
}
func (s *Service) tryMakeScheme() {
s.lock.Lock()
defer s.lock.Unlock()
for i, job := range s.jobs {
scheme, err := s.scheduler.Schedule(&job.Job)
if err == nil {
job.Callback.SetValue(scheme)
s.jobs[i] = nil
continue
}
if err == ErrNoAvailableScheme {
continue
}
job.Callback.SetError(err)
s.jobs[i] = nil
}
s.jobs = lo.Reject(s.jobs, func(item *schedulingJob, idx int) bool { return item == nil })
}
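A minimal usage sketch of this service, assuming a populated jobmod.NormalJob value named job:

    svc := NewService(NewDefaultSchedule())
    go svc.Serve() // retries queued jobs every 5 seconds, or immediately when a new job arrives

    scheme, err := svc.MakeScheme(job) // blocks until a scheme is produced or scheduling fails
    if err != nil {
        // handle the error
    }
    _ = scheme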

View File

@ -1,19 +1,13 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"github.com/inhies/go-bytesize"
)
type MakeScheduleScheme struct {
@ -33,11 +27,9 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
scheme, err := t.do(task.ID(), ctx)
if err != nil {
//TODO if the task fails, the failed field of the reported status may need to be adjusted
ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus(err.Error(), jobmod.JobScheduleScheme{}))
} else {
// Report the schedule scheme to the manager
ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus("", scheme))
ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus("", *scheme))
}
ctx.reporter.ReportNow()
@ -46,184 +38,15 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
})
}
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (jobmod.JobScheduleScheme, error) {
var scheme jobmod.JobScheduleScheme
isAvailable, err := t.CheckResourceAvailability()
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (*jobmod.JobScheduleScheme, error) {
scheme, err := ctx.scheduleSvc.MakeScheme(t.Job)
if err != nil {
return scheme, err
}
var defaultSchedule scheduler.DefaultSchedule
if isAvailable {
// Check whether the code, dataset and image have already been scheduled to this center, and generate the schedule scheme
resp, err := defaultSchedule.ComputeAllSlwNodeScore(t.Job.Files)
score, ok := resp[t.Job.TargetSlwNodeID]
if ok {
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: t.Job.TargetSlwNodeID,
Code: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.CodeScore.Isloaded)},
Dataset: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.DatasetScore.Isloaded)},
Image: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeImageAction(score.DatasetScore.Isloaded)},
}
} else {
scheme = jobmod.JobScheduleScheme{
TargetSlwNodeID: t.Job.TargetSlwNodeID,
Code: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Dataset: jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
Image: jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
}
}
if err != nil {
return scheme, err
}
// Re-run pre-scheduling to find the best node
} else {
s, err := defaultSchedule.Schedule(t.Job)
scheme = *s
if err != nil {
return scheme, err
}
return nil, err
}
return scheme, nil
}
// Check whether the pre-scheduled node has sufficient resources
func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return false, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
neededCPU := t.Job.Info.Resources.CPU
if neededCPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeCPU,
))
if err != nil {
return false, err
}
availCPU := resp.Data.(*uopsdk.CPUResourceData).Available
if float64(availCPU.Value) < 1.5*neededCPU {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient CPU resources, want: %f, available: %d%s", 1.5*neededCPU, availCPU.Value, availCPU.Unit)
return false, nil
}
}
neededNPU := t.Job.Info.Resources.NPU
if neededNPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeNPU,
))
if err != nil {
return false, err
}
availNPU := resp.Data.(*uopsdk.NPUResourceData).Available
if float64(availNPU.Value) < 1.5*neededNPU {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient NPU resources, want: %f, available: %d%s", 1.5*neededNPU, availNPU.Value, availNPU.Unit)
return false, nil
}
}
neededGPU := t.Job.Info.Resources.GPU
if neededGPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeGPU,
))
if err != nil {
return false, err
}
availGPU := resp.Data.(*uopsdk.GPUResourceData).Available
if float64(availGPU.Value) < 1.5*neededGPU {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient GPU resources, want: %f, available: %d%s", 1.5*neededGPU, availGPU.Value, availGPU.Unit)
return false, nil
}
}
neededMLU := t.Job.Info.Resources.MLU
if neededMLU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeMLU,
))
if err != nil {
return false, err
}
availMLU := resp.Data.(*uopsdk.MLUResourceData).Available
if float64(availMLU.Value) < 1.5*neededMLU {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient MLU resources, want: %f, available: %d%s", 1.5*neededMLU, availMLU.Value, availMLU.Unit)
return false, nil
}
}
neededStorage := t.Job.Info.Resources.Storage
if neededStorage > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeStorage,
))
if err != nil {
return false, err
}
availStorage := resp.Data.(*uopsdk.StorageResourceData).Available
bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
if err != nil {
return false, err
}
if int64(bytesStorage) < int64(1.5*float64(neededStorage)) {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient storage resources, want: %s, available: %f%s", bytesize.New(1.5*float64(neededStorage)), availStorage.Value, availStorage.Unit)
return false, nil
}
}
neededMemory := t.Job.Info.Resources.Memory
if neededMemory > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.Job.TargetSlwNodeID,
uopsdk.ResourceTypeMemory,
))
if err != nil {
return false, err
}
availMemory := resp.Data.(*uopsdk.MemoryResourceData).Available
bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
if err != nil {
return false, err
}
if int64(bytesMemory) < int64(1.5*float64(neededMemory)) {
logger.WithField("JobID", t.Job.JobID).
Infof("insufficient memory resources, want: %s, available: %f%s", bytesize.New(1.5*float64(neededMemory)), availMemory.Value, availMemory.Unit)
return false, nil
}
}
return true, nil
}
func init() {
Register(NewMakeScheduleScheme)
}

View File

@ -7,11 +7,13 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/task"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
reporter "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
)
type TaskContext struct {
reporter *reporter.Reporter
reporter *reporter.Reporter
scheduleSvc *scheduler.Service
}
// Must be called explicitly after the Task finishes; the completing function is invoked while the Manager holds its lock
@ -28,10 +30,11 @@ type Task = task.Task[TaskContext]
type CompleteOption = task.CompleteOption
func NewManager(reporter *reporter.Reporter) Manager {
func NewManager(reporter *reporter.Reporter, scheduleSvc *scheduler.Service) Manager {
return Manager{
Manager: task.NewManager(TaskContext{
reporter: reporter,
reporter: reporter,
scheduleSvc: scheduleSvc,
}),
}
}

View File

@ -9,6 +9,7 @@ import (
"gitlink.org.cn/cloudream/scheduler/advisor/internal/config"
myglbs "gitlink.org.cn/cloudream/scheduler/advisor/internal/globals"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/services"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/task"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
@ -31,7 +32,9 @@ func main() {
rpter := reporter.NewReporter(myglbs.AdvisorID, time.Second*time.Duration(config.Cfg().ReportIntervalSec))
taskMgr := task.NewManager(&rpter)
schSvc := scheduler.NewService(scheduler.NewDefaultSchedule())
taskMgr := task.NewManager(rpter, schSvc)
mqSvr, err := advmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
if err != nil {
@ -45,7 +48,9 @@ func main() {
// Start the services
go serveMQServer(mqSvr)
go serveReporter(&rpter)
go serveReporter(rpter)
go serveScheduleService(schSvc)
forever := make(chan bool)
<-forever
@ -72,3 +77,14 @@ func serveReporter(rpt *reporter.Reporter) {
logger.Info("rpt stopped")
}
func serveScheduleService(svc *scheduler.Service) {
logger.Info("start serving scheduler service")
err := svc.Serve()
if err != nil {
logger.Errorf("scheduler service with error: %s", err.Error())
}
logger.Info("scheduler service stopped")
}

View File

@ -0,0 +1,700 @@
package prescheduler
import (
"fmt"
"sort"
"github.com/inhies/go-bytesize"
"github.com/samber/lo"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/common/utils/math"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
const (
// Resource levels assigned to each node:
// ResourceLevel1 means every resource type has at least 1.5x the required amount
ResourceLevel1 = 1
// ResourceLevel2 means Level1 is not met, but every resource type has at least 1x the required amount
ResourceLevel2 = 2
// ResourceLevel3 means some resource type has less than 1x the required amount
ResourceLevel3 = 3
CpuResourceWeight float64 = 1
StgResourceWeight float64 = 1.2
CachingWeight float64 = 1
LoadedWeight float64 = 2
)
var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait")
type candidateSlwNode struct {
SlwNode uopsdk.SlwNode
IsReferencedJobTarget bool // whether this node is the one chosen by the job that this job depends on
Resource resourcesDetail
Files filesDetail
}
type resourcesDetail struct {
CPU resourceDetail
GPU resourceDetail
NPU resourceDetail
MLU resourceDetail
Storage resourceDetail
Memory resourceDetail
TotalScore float64
AvgScore float64
MaxLevel int
}
type resourceDetail struct {
Level int
Score float64
}
type filesDetail struct {
Dataset fileDetail
Code fileDetail
Image fileDetail
TotalScore float64
}
type fileDetail struct {
CachingScore float64
LoadingScore float64
IsLoaded bool // for storage: whether the data has been loaded onto this node; for image: whether the image has been imported into this computing center
}
type schedulingJob struct {
Job schsdk.JobInfo
Afters []string
}
type CandidateSlwNodeArr []*candidateSlwNode
func (a CandidateSlwNodeArr) Len() int { return len(a) }
func (a CandidateSlwNodeArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a CandidateSlwNodeArr) Less(i, j int) bool {
n1 := a[i]
n2 := a[j]
// Prefer co-locating with the job it depends on, provided that node's resources are sufficient
if n1.IsReferencedJobTarget && n1.Resource.MaxLevel < ResourceLevel3 {
return true
}
if n2.IsReferencedJobTarget && n2.Resource.MaxLevel < ResourceLevel3 {
return false
}
// Compare resource levels first; a lower level means the requirements are better satisfied
if n1.Resource.MaxLevel < n2.Resource.MaxLevel {
return true
}
if n1.Resource.MaxLevel > n2.Resource.MaxLevel {
return false
}
// When levels are equal, compare individual scores
switch n1.Resource.MaxLevel {
case ResourceLevel1:
// A higher total file score means this node already holds more complete data files, so it is preferred
return n1.Files.TotalScore > n2.Files.TotalScore
case ResourceLevel2:
// A higher average resource score means more spare resources, so the node is preferred
return n1.Resource.AvgScore > n2.Resource.AvgScore
case ResourceLevel3:
// A higher average resource score means more spare resources, so the node is preferred
return n1.Resource.AvgScore > n2.Resource.AvgScore
}
return false
}
type DefaultPreScheduler struct {
}
func NewDefaultPreScheduler() *DefaultPreScheduler {
return &DefaultPreScheduler{}
}
func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) {
jobSetScheme := &jobmod.JobSetPreScheduleScheme{
JobSchemes: make(map[string]jobmod.JobScheduleScheme),
}
filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme)
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
// Query which computing centers are available
getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
if err != nil {
return nil, nil, fmt.Errorf("getting all slw node info: %w", err)
}
slwNodes := make(map[uopsdk.SlwNodeID]uopsdk.SlwNode)
for _, node := range getNodesResp.Nodes {
slwNodes[node.ID] = node
}
// First, from each job's configuration, collect the LocalIDs of the jobs it depends on
var schJobs []schedulingJob
for _, job := range info.Jobs {
j := schedulingJob{
Job: job,
}
if norJob, ok := job.(*schsdk.NormalJobInfo); ok {
if resFile, ok := norJob.Files.Dataset.(*schsdk.ResourceJobFileInfo); ok {
j.Afters = append(j.Afters, resFile.ResourceLocalJobID)
}
if resFile, ok := norJob.Files.Code.(*schsdk.ResourceJobFileInfo); ok {
j.Afters = append(j.Afters, resFile.ResourceLocalJobID)
}
} else if resJob, ok := job.(*schsdk.ResourceJobInfo); ok {
j.Afters = append(j.Afters, resJob.TargetLocalJobID)
}
schJobs = append(schJobs, j)
}
// Then order the jobs by their references
schJobs, ok := s.orderByReference(schJobs)
if !ok {
return nil, nil, fmt.Errorf("circular reference detected between jobs in the job set")
}
// After ordering, generate schedule schemes in that order
for _, job := range schJobs {
if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok {
scheme, err := s.scheduleForNormalJob(info, &job, slwNodes, jobSetScheme.JobSchemes)
if err != nil {
return nil, nil, err
}
jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme
// Check the data file configuration and generate the local file upload scheme
s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetSlwNodeID, filesUploadSchemes, slwNodes)
}
// Resource jobs currently do not need a schedule scheme
}
return jobSetScheme, &schsdk.JobSetFilesUploadScheme{
LocalFileSchemes: lo.Values(filesUploadSchemes),
}, nil
}
func (s *DefaultPreScheduler) orderByReference(jobs []schedulingJob) ([]schedulingJob, bool) {
type jobOrder struct {
Job schedulingJob
Afters []string
}
var jobOrders []jobOrder
for _, job := range jobs {
od := jobOrder{
Job: job,
}
od.Afters = append(od.Afters, job.Afters...)
jobOrders = append(jobOrders, od)
}
// Then do the ordering
var orderedJob []schedulingJob
for {
rm := 0
for i, jo := range jobOrders {
// Find a job with no remaining dependencies and take it out
if len(jo.Afters) == 0 {
orderedJob = append(orderedJob, jo.Job)
// Remove other jobs' references to it (update by index so the change sticks)
for i2 := range jobOrders {
jobOrders[i2].Afters = lo.Reject(jobOrders[i2].Afters, func(item string, idx int) bool { return item == jo.Job.Job.GetLocalJobID() })
}
rm++
continue
}
jobOrders[i-rm] = jobOrders[i]
}
jobOrders = jobOrders[:len(jobOrders)-rm]
if len(jobOrders) == 0 {
break
}
// If a full pass finds no dependency-free job, there is a circular reference and ordering fails
if rm == 0 {
return nil, false
}
}
return orderedJob, true
}
func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, slwNodes map[uopsdk.SlwNodeID]uopsdk.SlwNode, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) {
allSlwNodes := make(map[uopsdk.SlwNodeID]*candidateSlwNode)
// Initialize candidate node info
for _, slwNode := range slwNodes {
caNode := &candidateSlwNode{
SlwNode: slwNode,
}
// Check whether this node is the one chosen by a job that this job references
for _, af := range job.Afters {
resJob := findJobInfo[*schsdk.ResourceJobInfo](jobSet.Jobs, af)
if resJob == nil {
return nil, fmt.Errorf("resource job %s not found in the job set", af)
}
// Since the jobs are already ordered by reference, this lookup should normally succeed
scheme, ok := jobSchemes[resJob.TargetLocalJobID]
if !ok {
continue
}
if scheme.TargetSlwNodeID == slwNode.ID {
caNode.IsReferencedJobTarget = true
break
}
}
allSlwNodes[slwNode.ID] = caNode
}
norJob := job.Job.(*schsdk.NormalJobInfo)
// Compute the file possession scores
err := s.calcFileScore(norJob.Files, allSlwNodes)
if err != nil {
return nil, err
}
// Compute the spare-resource scores
err = s.calcResourceScore(norJob, allSlwNodes)
if err != nil {
return nil, err
}
allSlwNodesArr := lo.Values(allSlwNodes)
sort.Sort(CandidateSlwNodeArr(allSlwNodesArr))
targetNode := allSlwNodesArr[0]
if targetNode.Resource.MaxLevel == ResourceLevel3 {
return nil, ErrNoAvailableScheme
}
scheme := s.makeSchemeForNode(targetNode)
return &scheme, nil
}
func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetSlwNodeID uopsdk.SlwNodeID, schemes map[string]schsdk.LocalFileUploadScheme, slwNodes map[uopsdk.SlwNodeID]uopsdk.SlwNode) {
if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
stgNodeID := slwNodes[targetSlwNodeID].StgNodeID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
LocalPath: localFile.LocalPath,
UploadToStgNodeID: &stgNodeID,
}
}
}
if localFile, ok := norJob.Files.Code.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
stgNodeID := slwNodes[targetSlwNodeID].StgNodeID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
LocalPath: localFile.LocalPath,
UploadToStgNodeID: &stgNodeID,
}
}
}
if localFile, ok := norJob.Files.Image.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
stgNodeID := slwNodes[targetSlwNodeID].StgNodeID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
LocalPath: localFile.LocalPath,
UploadToStgNodeID: &stgNodeID,
}
}
}
}
func (s *DefaultPreScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jobmod.JobScheduleScheme {
scheme := jobmod.JobScheduleScheme{
TargetSlwNodeID: targetSlwNode.SlwNode.ID,
}
if !targetSlwNode.Files.Dataset.IsLoaded {
scheme.Dataset.Action = jobmod.ActionLoad
}
if !targetSlwNode.Files.Code.IsLoaded {
scheme.Code.Action = jobmod.ActionLoad
}
if !targetSlwNode.Files.Image.IsLoaded {
scheme.Image.Action = jobmod.ActionImportImage
}
return scheme
}
func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
for _, slwNode := range allSlwNodes {
res, err := s.calcOneResourceScore(job.Resources, slwNode.SlwNode.ID)
if err != nil {
return err
}
slwNode.Resource = *res
}
return nil
}
// Classify the node's resource level and compute its resource score
func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID uopsdk.SlwNodeID) (*resourcesDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID))
if err != nil {
return nil, err
}
var resDetail resourcesDetail
// Compute the resource scores
totalScore := 0.0
maxLevel := 0
resKinds := 0
if requires.CPU > 0 {
res := findResuorce[*uopsdk.CPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.CPU.Level = ResourceLevel3
resDetail.CPU.Score = 0
} else {
resDetail.CPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.CPU)
resDetail.CPU.Score = (float64(res.Available.Value) / requires.CPU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.CPU.Level)
totalScore += resDetail.CPU.Score
resKinds++
}
if requires.GPU > 0 {
res := findResuorce[*uopsdk.GPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.GPU.Level = ResourceLevel3
resDetail.GPU.Score = 0
} else {
resDetail.GPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.GPU)
resDetail.GPU.Score = (float64(res.Available.Value) / requires.GPU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.GPU.Level)
totalScore += resDetail.GPU.Score
resKinds++
}
if requires.NPU > 0 {
res := findResuorce[*uopsdk.NPUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.NPU.Level = ResourceLevel3
resDetail.NPU.Score = 0
} else {
resDetail.NPU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.NPU)
resDetail.NPU.Score = (float64(res.Available.Value) / requires.NPU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.NPU.Level)
totalScore += resDetail.NPU.Score
resKinds++
}
if requires.MLU > 0 {
res := findResuorce[*uopsdk.MLUResourceData](getResDataResp.Datas)
if res == nil {
resDetail.MLU.Level = ResourceLevel3
resDetail.MLU.Score = 0
} else {
resDetail.MLU.Level = s.calcResourceLevel(float64(res.Available.Value), requires.MLU)
resDetail.MLU.Score = (float64(res.Available.Value) / requires.MLU) * CpuResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.MLU.Level)
totalScore += resDetail.MLU.Score
resKinds++
}
if requires.Storage > 0 {
res := findResuorce[*uopsdk.StorageResourceData](getResDataResp.Datas)
if res == nil {
resDetail.Storage.Level = ResourceLevel3
resDetail.Storage.Score = 0
} else {
bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
if err != nil {
return nil, err
}
resDetail.Storage.Level = s.calcResourceLevel(float64(bytes), float64(requires.Storage))
resDetail.Storage.Score = (float64(bytes) / float64(requires.Storage)) * StgResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.Storage.Level)
totalScore += resDetail.Storage.Score
resKinds++
}
if requires.Memory > 0 {
res := findResuorce[*uopsdk.MemoryResourceData](getResDataResp.Datas)
if res == nil {
resDetail.Memory.Level = ResourceLevel3
resDetail.Memory.Score = 0
} else {
bytes, err := bytesize.Parse(fmt.Sprintf("%f%s", res.Available.Value, res.Available.Unit))
if err != nil {
return nil, err
}
resDetail.Memory.Level = s.calcResourceLevel(float64(bytes), float64(requires.Memory))
resDetail.Memory.Score = (float64(bytes) / float64(requires.Memory)) * StgResourceWeight
}
maxLevel = math.Max(maxLevel, resDetail.Memory.Level)
totalScore += resDetail.Memory.Score
resKinds++
}
if resKinds == 0 {
return &resDetail, nil
}
resDetail.TotalScore = totalScore
resDetail.AvgScore = totalScore / float64(resKinds)
resDetail.MaxLevel = maxLevel
return &resDetail, nil
}
func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int {
if avai >= 1.5*need {
return ResourceLevel1
}
if avai >= need {
return ResourceLevel2
}
return ResourceLevel3
}
// Compute each node's score
func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
// Only count data weights for storage services on the computing centers returned by the ops platform
stgNodeToSlwNode := make(map[int64]*candidateSlwNode)
for _, slwNode := range allSlwNodes {
stgNodeToSlwNode[slwNode.SlwNode.StgNodeID] = slwNode
}
// Compute the code-related scores
if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok {
codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, stgNodeToSlwNode)
if err != nil {
return fmt.Errorf("calc code file score: %w", err)
}
for id, score := range codeFileScores {
allSlwNodes[id].Files.Code = *score
}
}
// Compute the dataset-related scores
if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok {
datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, stgNodeToSlwNode)
if err != nil {
return fmt.Errorf("calc dataset file score: %w", err)
}
for id, score := range datasetFileScores {
allSlwNodes[id].Files.Dataset = *score
}
}
// Compute the image-related scores
if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok {
imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allSlwNodes, stgNodeToSlwNode)
if err != nil {
return fmt.Errorf("calc image file score: %w", err)
}
for id, score := range imageFileScores {
allSlwNodes[id].Files.Image = *score
}
}
for _, slwNode := range allSlwNodes {
slwNode.Files.TotalScore = slwNode.Files.Code.CachingScore +
slwNode.Files.Code.LoadingScore +
slwNode.Files.Dataset.CachingScore +
slwNode.Files.Dataset.LoadingScore +
slwNode.Files.Image.CachingScore +
slwNode.Files.Image.LoadingScore
}
return nil
}
// Compute the package's score on each node
func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
if err != nil {
return nil, err
}
for _, stgNodeCacheInfo := range cachedResp.NodeInfos {
slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID]
if !ok {
continue
}
slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{
//TODO the calculation may differ depending on the caching method
CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
}
}
loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(0, packageID))
if err != nil {
return nil, err
}
for _, stgNodeID := range loadedResp.StgNodeIDs {
slwNode, ok := stgNodeToSlwNode[stgNodeID]
if !ok {
continue
}
fsc, ok := slwNodeFileScores[slwNode.SlwNode.ID]
if !ok {
fsc = &fileDetail{}
slwNodeFileScores[slwNode.SlwNode.ID] = fsc
}
fsc.LoadingScore = 1 * LoadedWeight
fsc.IsLoaded = true
}
return slwNodeFileScores, nil
}
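// Example (illustrative numbers): a node caching 2 GiB of a 4 GiB package gets
// CachingScore = 0.5 * CachingWeight = 0.5; if the package is also loaded onto that
// node's storage, LoadingScore = 1 * LoadedWeight = 2 and IsLoaded is set to true.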
// Compute the image's score on each node
func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
}
defer colCli.Close()
magCli, err := schglb.ManagerMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new manager client: %w", err)
}
defer magCli.Close()
imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID))
if err != nil {
return nil, fmt.Errorf("getting image info: %w", err)
}
slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID))
if err != nil {
return nil, err
}
for _, stgNodeCacheInfo := range cachedResp.NodeInfos {
slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID]
if !ok {
continue
}
slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{
//TODO the calculation may differ depending on the caching method
CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight,
}
}
// For an image, LoadingScore reflects whether it has been imported into the computing center
for _, importing := range imageInfoResp.ImportingInfos {
_, ok := allSlwNodes[importing.SlwNodeID]
if !ok {
continue
}
fsc, ok := slwNodeFileScores[importing.SlwNodeID]
if !ok {
fsc = &fileDetail{}
slwNodeFileScores[importing.SlwNodeID] = fsc
}
fsc.LoadingScore = 1 * LoadedWeight
fsc.IsLoaded = true
}
return slwNodeFileScores, nil
}
func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T {
for _, data := range all {
if ret, ok := data.(T); ok {
return ret
}
}
var def T
return def
}
func findJobInfo[T schsdk.JobInfo](jobs []schsdk.JobInfo, localJobID string) T {
for _, job := range jobs {
if ret, ok := job.(T); ok && job.GetLocalJobID() == localJobID {
return ret
}
}
var def T
return def
}
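The dependency ordering used by orderByReference above is essentially a topological sort over the Afters lists. A self-contained sketch of the same idea using plain string IDs (illustrative only, not the actual job types):

    // orderIDs repeatedly extracts IDs whose dependency list is empty and removes them
    // from the remaining dependency lists; a pass that removes nothing means there is a
    // circular reference.
    func orderIDs(afters map[string][]string) ([]string, bool) {
        var ordered []string
        for len(afters) > 0 {
            removed := 0
            for id, deps := range afters {
                if len(deps) > 0 {
                    continue
                }
                ordered = append(ordered, id)
                delete(afters, id)
                for id2, deps2 := range afters {
                    var kept []string
                    for _, d := range deps2 {
                        if d != id {
                            kept = append(kept, d)
                        }
                    }
                    afters[id2] = kept
                }
                removed++
            }
            if removed == 0 {
                return nil, false
            }
        }
        return ordered, true
    }

    // For example, orderIDs(map[string][]string{"resource": {"train"}, "train": nil})
    // returns ([train resource], true).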

View File

@ -6,16 +6,5 @@ import (
)
type PreScheduler interface {
Schedule(info schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error)
}
type DefaultPreScheduler struct {
}
func NewDefaultPreScheduler() *DefaultPreScheduler {
return &DefaultPreScheduler{}
}
func (s *DefaultPreScheduler) Schedule(info schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error) {
Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *schsdk.JobSetFilesUploadScheme, error)
}

View File

@ -24,7 +24,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs
}
defer mgrCli.Close()
schScheme, uploadScheme, err := svc.preScheduler.Schedule(info)
schScheme, uploadScheme, err := svc.preScheduler.Schedule(&info)
if err != nil {
return "", nil, fmt.Errorf("")
}

View File

@ -18,7 +18,7 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge
defer uniOpsCli.Close()
var resp uopsdk.ResourceData
switch msg.ResourceType {
switch msg.Type {
case uopsdk.ResourceTypeCPU:
resp, err = uniOpsCli.GetCPUData(uopsdk.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,

View File

@ -3,7 +3,6 @@ package jobmod
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type NormalJob struct {
@ -41,5 +40,5 @@ type PackageJobFile struct {
type ImageJobFile struct {
PackageID int64 `json:"packageID"`
ImageID schmod.ImageID `json:"imageID"`
ImageID schsdk.ImageID `json:"imageID"`
}

View File

@ -1,15 +1,16 @@
package schmod
import uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
)
type ExecutorID string
type AdvisorID string
type ImageID string
type ImageInfo struct {
ImageID ImageID `json:"imageID"`
ImageID schsdk.ImageID `json:"imageID"`
PackageID int64 `json:"packageID"` // image file package
ImportingInfos []ImageImportingInfo `json:"importingInfos"` // which nodes this image has been imported into
}

View File

@ -16,18 +16,18 @@ var _ = Register(Service.GetOneResourceData)
type GetOneResourceData struct {
mq.MessageBodyBase
SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
ResourceType string `json:"type"`
SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
Type uopsdk.ResourceType `json:"type"`
}
type GetOneResourceDataResp struct {
mq.MessageBodyBase
Data uopsdk.ResourceData `json:"data"`
}
func NewGetOneResourceData(nodeID uopsdk.SlwNodeID, resourceType string) *GetOneResourceData {
func NewGetOneResourceData(nodeID uopsdk.SlwNodeID, typ uopsdk.ResourceType) *GetOneResourceData {
return &GetOneResourceData{
SlwNodeID: nodeID,
ResourceType: resourceType,
SlwNodeID: nodeID,
Type: typ,
}
}
func NewGetOneResourceDataResp(data uopsdk.ResourceData) *GetOneResourceDataResp {

View File

@ -2,6 +2,7 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
@ -46,19 +47,19 @@ var _ = Register(Service.GetImageInfo)
type GetImageInfo struct {
mq.MessageBodyBase
ImageID schmod.ImageID `json:"imageID"`
ImageID schsdk.ImageID `json:"imageID"`
}
type GetImageInfoResp struct {
mq.MessageBodyBase
schmod.ImageInfo
}
func NewGetImageInfo(imageID schmod.ImageID) *GetImageInfo {
func NewGetImageInfo(imageID schsdk.ImageID) *GetImageInfo {
return &GetImageInfo{
ImageID: imageID,
}
}
func NewGetImageInfoResp(imageID schmod.ImageID, packageID int64, importingInfo []schmod.ImageImportingInfo) *GetImageInfoResp {
func NewGetImageInfoResp(imageID schsdk.ImageID, packageID int64, importingInfo []schmod.ImageImportingInfo) *GetImageInfoResp {
return &GetImageInfoResp{
ImageInfo: schmod.ImageInfo{
ImageID: imageID,

View File

@ -4,12 +4,13 @@ import (
"fmt"
"sync"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
)
type Manager struct {
infos map[schmod.ImageID]*schmod.ImageInfo
infos map[schsdk.ImageID]*schmod.ImageInfo
imageIDIndex int64
lock sync.Mutex
@ -17,11 +18,11 @@ type Manager struct {
func NewManager() (*Manager, error) {
return &Manager{
infos: make(map[schmod.ImageID]*schmod.ImageInfo),
infos: make(map[schsdk.ImageID]*schmod.ImageInfo),
}, nil
}
func (m *Manager) GetImageInfo(imageID schmod.ImageID) (*schmod.ImageInfo, error) {
func (m *Manager) GetImageInfo(imageID schsdk.ImageID) (*schmod.ImageInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
@ -33,7 +34,7 @@ func (m *Manager) GetImageInfo(imageID schmod.ImageID) (*schmod.ImageInfo, error
return info, nil
}
func (m *Manager) GetImageImportingInfo(imageID schmod.ImageID, slwNodeID uopsdk.SlwNodeID) (*schmod.ImageImportingInfo, error) {
func (m *Manager) GetImageImportingInfo(imageID schsdk.ImageID, slwNodeID uopsdk.SlwNodeID) (*schmod.ImageImportingInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
@ -55,7 +56,7 @@ func (m *Manager) CreateImage(packageID int64) (*schmod.ImageInfo, error) {
m.lock.Lock()
defer m.lock.Unlock()
imageID := schmod.ImageID(fmt.Sprintf("%d", m.imageIDIndex))
imageID := schsdk.ImageID(fmt.Sprintf("%d", m.imageIDIndex))
m.imageIDIndex++
info := &schmod.ImageInfo{
@ -68,7 +69,7 @@ func (m *Manager) CreateImage(packageID int64) (*schmod.ImageInfo, error) {
return info, nil
}
func (m *Manager) AddImageImportingInfo(imageID schmod.ImageID, slwNodeID uopsdk.SlwNodeID, slwNodeImageID uopsdk.SlwNodeImageID) error {
func (m *Manager) AddImageImportingInfo(imageID schsdk.ImageID, slwNodeID uopsdk.SlwNodeID, slwNodeImageID uopsdk.SlwNodeImageID) error {
m.lock.Lock()
defer m.lock.Unlock()