JCC-CSScheduler/advisor/internal/task/schedule_scheme.go

195 lines
5.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/common/globals"
"gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"github.com/inhies/go-bytesize"
)
// MakeScheduleScheme is a task that checks whether the node chosen for a job
// still has enough free resources before a schedule scheme is produced.
type MakeScheduleScheme struct {
// Job is the job whose requested resources (Job.Info.Resources) are compared
// against the node's available resources.
Job job.NormalJob
// preAdjustNodeID is the ID of the node whose resource data is queried from
// the collector in CheckResourceAvailability.
preAdjustNodeID int64
}
// NewMakeScheduleScheme returns a pointer to a zero-valued MakeScheduleScheme.
// Neither Job nor preAdjustNodeID is populated here.
func NewMakeScheduleScheme() *MakeScheduleScheme {
	return new(MakeScheduleScheme)
}
// Execute runs the task body, reports the outcome through ctx.reporter, flushes
// the report immediately, and finally calls complete with the task error and a
// one-minute removing delay.
func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
	log := logger.WithType[MakeScheduleScheme]("Task")
	log.Debugf("begin")
	defer log.Debugf("end")

	runErr := t.do(task.ID(), ctx)

	// The error text is empty and the flag is false when the task succeeded.
	errText, hasErr := "", runErr != nil
	if hasErr {
		errText = runErr.Error()
	}

	// TODO: adjust the reported status field depending on how the task failed.
	// NOTE(review): the status string is "failed" on the success path as well;
	// the original code marked that branch as "to be modified" — confirm the
	// intended status value for a successful run.
	ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", errText, hasErr, advtsk.AdjustedScheme{}))
	ctx.reporter.ReportNow()

	complete(runErr, CompleteOption{RemovingDelay: time.Minute})
}
// do performs the task's work: it checks resource availability and returns any
// error from that check. The follow-up actions for both outcomes are still
// placeholders, so taskID and ctx are currently unused.
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) error {
	available, err := t.CheckResourceAvailability()
	switch {
	case err != nil:
		return err
	case available:
		// TODO: confirm that code, dataset and image have already been
		// scheduled to this center.
	default:
		// TODO: re-run the pre-scheduling scheme to find the best node.
	}
	return nil
}
// CheckResourceAvailability reports whether the node identified by
// t.preAdjustNodeID has enough available resources for t.Job.
//
// For each resource the job requests with a positive amount (CPU, NPU, GPU,
// MLU, storage, memory, checked in that order) the node's available amount is
// fetched from the collector and compared against the requested amount
// multiplied by a headroom factor of 1.5.
//
// It returns (true, nil) when every requested resource passes, (false, nil) as
// soon as one resource is insufficient (the shortfall is logged at info
// level), and (false, err) when the collector client cannot be acquired, a
// query fails, the response payload has an unexpected type, or an available
// size cannot be parsed.
func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
	// headroom is the safety factor applied to every requested amount.
	const headroom = 1.5

	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return false, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	neededCPU := t.Job.Info.Resources.CPU
	if neededCPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeCPU,
		))
		if err != nil {
			return false, fmt.Errorf("get CPU resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		// Checked assertion: an unexpected payload becomes an error, not a panic.
		data, ok := resp.Data.(models.CPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected CPU resource data type %T", resp.Data)
		}

		availCPU := data.Available
		if float64(availCPU.Value) < headroom*neededCPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient CPU resources, want: %f, available: %d%s", headroom*neededCPU, availCPU.Value, availCPU.Unit)
			return false, nil
		}
	}

	neededNPU := t.Job.Info.Resources.NPU
	if neededNPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeNPU,
		))
		if err != nil {
			return false, fmt.Errorf("get NPU resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		data, ok := resp.Data.(models.NPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected NPU resource data type %T", resp.Data)
		}

		availNPU := data.Available
		if float64(availNPU.Value) < headroom*neededNPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient NPU resources, want: %f, available: %d%s", headroom*neededNPU, availNPU.Value, availNPU.Unit)
			return false, nil
		}
	}

	neededGPU := t.Job.Info.Resources.GPU
	if neededGPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeGPU,
		))
		if err != nil {
			return false, fmt.Errorf("get GPU resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		data, ok := resp.Data.(models.GPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected GPU resource data type %T", resp.Data)
		}

		availGPU := data.Available
		if float64(availGPU.Value) < headroom*neededGPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient GPU resources, want: %f, available: %d%s", headroom*neededGPU, availGPU.Value, availGPU.Unit)
			return false, nil
		}
	}

	neededMLU := t.Job.Info.Resources.MLU
	if neededMLU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeMLU,
		))
		if err != nil {
			return false, fmt.Errorf("get MLU resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		data, ok := resp.Data.(models.MLUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected MLU resource data type %T", resp.Data)
		}

		availMLU := data.Available
		if float64(availMLU.Value) < headroom*neededMLU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient MLU resources, want: %f, available: %d%s", headroom*neededMLU, availMLU.Value, availMLU.Unit)
			return false, nil
		}
	}

	neededStorage := t.Job.Info.Resources.Storage
	if neededStorage > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeStorage,
		))
		if err != nil {
			return false, fmt.Errorf("get storage resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		data, ok := resp.Data.(models.StorageResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected storage resource data type %T", resp.Data)
		}

		availStorage := data.Available
		// The collector reports a value plus a unit; convert it to a byte
		// count so it can be compared with the requested amount.
		bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
		if err != nil {
			return false, fmt.Errorf("parse available storage size: %w", err)
		}

		if int64(bytesStorage) < int64(headroom*float64(neededStorage)) {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient storage resources, want: %s, available: %f%s", bytesize.New(headroom*float64(neededStorage)), availStorage.Value, availStorage.Unit)
			return false, nil
		}
	}

	neededMemory := t.Job.Info.Resources.Memory
	if neededMemory > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.preAdjustNodeID,
			models.ResourceTypeMemory,
		))
		if err != nil {
			return false, fmt.Errorf("get memory resource data of node %d: %w", t.preAdjustNodeID, err)
		}

		data, ok := resp.Data.(models.MemoryResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected memory resource data type %T", resp.Data)
		}

		availMemory := data.Available
		bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
		if err != nil {
			return false, fmt.Errorf("parse available memory size: %w", err)
		}

		if int64(bytesMemory) < int64(headroom*float64(neededMemory)) {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient memory resources, want: %s, available: %f%s", bytesize.New(headroom*float64(neededMemory)), availMemory.Value, availMemory.Unit)
			return false, nil
		}
	}

	return true, nil
}