更新函数名

This commit is contained in:
songjc 2023-09-11 08:57:55 +08:00
parent efddc9a8dc
commit fab24f13b5
13 changed files with 226 additions and 169 deletions

View File

@ -1,14 +1,22 @@
package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/common/utils/convertto"
"gitlink.org.cn/cloudream/scheduler/common/globals"
"gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
// GetScheduleScheme is the task that validates a job's pre-scheduled node.
// Its CheckResourceAvailability method compares the job's resource
// requirements against the resources the collector reports for that node.
type GetScheduleScheme struct {
// Job is the job whose Info.Resources requirements are checked.
Job job.NormalJob
// preAdjustNodeID is the node ID sent to the collector when querying
// available resources (presumably the node chosen before any adjustment — confirm).
preAdjustNodeID int64
}
func NewGetScheduleScheme() *GetScheduleScheme {
@ -23,7 +31,9 @@ func (t *GetScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContex
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), exectsk.NewScheduleTaskStatus("failed", err.Error(), 0))
ctx.reporter.Report(task.ID(), advtsk.NewTaskStatus("failed", err.Error(), true, advtsk.AdjustedScheme{}))
} else {
ctx.reporter.Report(task.ID(), advtsk.NewTaskStatus("failed", err.Error(), false, advtsk.AdjustedScheme{}))
}
ctx.reporter.ReportNow()
@ -33,41 +43,138 @@ func (t *GetScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContex
}
// do runs the resource check for the pre-scheduled node and returns the
// check's error, if any. Both outcome branches are still empty placeholders,
// so nil is returned whenever the check itself succeeds. The taskID and ctx
// parameters are only referenced by the commented-out legacy code below.
func (t *GetScheduleScheme) do(taskID string, ctx TaskContext) error {
// pcmCli, err := globals.PCMPool.Acquire()
// if err != nil {
// return fmt.Errorf("new pcm client: %w", err)
// }
// defer pcmCli.Close()
isAvailable, err := t.CheckResourceAvailability()
if err != nil {
return err
}
// resp, err := pcmCli.ScheduleTask(pcm.ScheduleTaskReq{
// })
// if err != nil {
// return err
// }
// var prevStatus string
// for {
// tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{
// NodeID: t.nodeID,
// PCMJobID: resp.PCMJobID,
// })
// if err != nil {
// return err
// }
// if tsResp.Status != prevStatus {
// ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.Status, "", resp.PCMJobID))
// }
// prevStatus = tsResp.Status
// // TODO adjust according to what the interface's result actually returns
// // Decide from the returned result whether the task has finished; if so, leave the loop and end the task
// if tsResp.Status == "Completed" {
// return nil
// }
// }
if isAvailable {
// Confirm whether code, dataset and image have already been scheduled to this center
} else {
// Re-run the pre-scheduling scheme to find the optimal node
}
return nil
}
// CheckResourceAvailability reports whether the pre-scheduled node has enough
// resources for the job.
//
// For each resource kind the job requests (CPU, NPU, GPU, MLU, storage,
// memory; a request <= 0 is skipped), the collector is queried for the node
// identified by preAdjustNodeID, and the available amount must be at least
// 1.5 times the requested amount.
//
// Returns:
//   - (true, nil) when every requested resource is sufficient;
//   - (false, nil) as soon as one requested resource is insufficient;
//   - (false, err) when the collector client cannot be acquired, a query
//     fails, or the collector returns data of an unexpected type.
func (t *GetScheduleScheme) CheckResourceAvailability() (bool, error) {
	// Available resources must exceed the request by this factor.
	const resourceHeadroom = 1.5

	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return false, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	neededCPU := t.Job.Info.Resources.CPU
	if neededCPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeCPU,
		})
		if err != nil {
			return false, fmt.Errorf("getting cpu resource data: %w", err)
		}

		// Checked assertion: an unexpected payload must not panic the task.
		cpuData, ok := resp.Data.(models.CPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected cpu resource data type %T", resp.Data)
		}

		availCPU := cpuData.Available.Value
		if float64(availCPU) < resourceHeadroom*neededCPU {
			fmt.Printf("Schedule Scheme is wrong: Insufficient cpu")
			return false, nil
		}
	}

	neededNPU := t.Job.Info.Resources.NPU
	if neededNPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeNPU,
		})
		if err != nil {
			return false, fmt.Errorf("getting npu resource data: %w", err)
		}

		npuData, ok := resp.Data.(models.NPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected npu resource data type %T", resp.Data)
		}

		availNPU := npuData.Available.Value
		if float64(availNPU) < resourceHeadroom*neededNPU {
			fmt.Printf("Schedule Scheme is wrong: Insufficient npu")
			return false, nil
		}
	}

	neededGPU := t.Job.Info.Resources.GPU
	if neededGPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeGPU,
		})
		if err != nil {
			return false, fmt.Errorf("getting gpu resource data: %w", err)
		}

		gpuData, ok := resp.Data.(models.GPUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected gpu resource data type %T", resp.Data)
		}

		availGPU := gpuData.Available.Value
		if float64(availGPU) < resourceHeadroom*neededGPU {
			fmt.Printf("Schedule Scheme is wrong: Insufficient gpu")
			return false, nil
		}
	}

	neededMLU := t.Job.Info.Resources.MLU
	if neededMLU > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeMLU,
		})
		if err != nil {
			return false, fmt.Errorf("getting mlu resource data: %w", err)
		}

		mluData, ok := resp.Data.(models.MLUResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected mlu resource data type %T", resp.Data)
		}

		availMLU := mluData.Available.Value
		if float64(availMLU) < resourceHeadroom*neededMLU {
			fmt.Printf("Schedule Scheme is wrong: Insufficient mlu")
			return false, nil
		}
	}

	neededStorage := t.Job.Info.Resources.Storage
	if neededStorage > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeStorage,
		})
		if err != nil {
			return false, fmt.Errorf("getting storage resource data: %w", err)
		}

		storageData, ok := resp.Data.(models.StorageResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected storage resource data type %T", resp.Data)
		}

		availStorage := storageData.Available.Value
		// NOTE(review): assumes the collector reports storage in GB and the job
		// request is in bytes, as the GBToBytes conversion suggests — confirm.
		bytesStorage := convertto.GBToBytes(availStorage)
		if bytesStorage < int64(resourceHeadroom*float64(neededStorage)) {
			fmt.Printf("Schedule Scheme is wrong: Insufficient storage")
			return false, nil
		}
	}

	neededMemory := t.Job.Info.Resources.Memory
	if neededMemory > 0 {
		resp, err := colCli.GetOneResourceData(collector.GetOneResourceData{
			NodeId:       t.preAdjustNodeID,
			ResourceType: models.ResourceTypeMemory,
		})
		if err != nil {
			return false, fmt.Errorf("getting memory resource data: %w", err)
		}

		memoryData, ok := resp.Data.(models.MemoryResourceData)
		if !ok {
			return false, fmt.Errorf("unexpected memory resource data type %T", resp.Data)
		}

		availMemory := memoryData.Available.Value
		// NOTE(review): same unit assumption as storage (GB reported, bytes requested) — confirm.
		bytesMemory := convertto.GBToBytes(availMemory)
		if bytesMemory < int64(resourceHeadroom*float64(neededMemory)) {
			fmt.Printf("Schedule Scheme is wrong: Insufficient memory")
			return false, nil
		}
	}

	return true, nil
}

View File

@ -3,20 +3,19 @@ package advisor
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/models/job"
)
// 获取调度方案
var _ = Register(Service.StartGetScheduleScheme)
type StartGetScheduleScheme struct {
// UserID int64 `json:"userID"`
// PackageID int64 `json:"packageID"`
Job job.NormalJob `json:"job"`
}
func NewStartGetScheduleScheme() StartGetScheduleScheme {
func NewStartGetScheduleScheme(job job.NormalJob) StartGetScheduleScheme {
return StartGetScheduleScheme{
// UserID: userID,
// PackageID: packageID,
Job: job,
}
}

View File

@ -1,73 +1,21 @@
package task
type TaskStatus interface{}
type TaskStatusConst interface {
TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus
}
type ScheduleTaskStatus struct {
type TaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
PCMJobID int64 `json:"pcmJobID"`
IsAdjustment bool `json:"isAdjustment"`
AdjustedScheme AdjustedScheme `json:"adjustedScheme"`
}
func NewScheduleTaskStatus(status string, err string, pcmJobID int64) ScheduleTaskStatus {
return ScheduleTaskStatus{
Status: status,
Error: err,
PCMJobID: pcmJobID,
}
}
type UploadImageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
ImageID int64 `json:"imageID"`
}
func NewUploadImageTaskStatus(status string, err string, imageID int64) UploadImageTaskStatus {
return UploadImageTaskStatus{
Status: status,
Error: err,
ImageID: imageID,
}
}
type CacheMovePackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
}
func NewCacheMovePackageTaskStatus(status string, err string) CacheMovePackageTaskStatus {
return CacheMovePackageTaskStatus{
Status: status,
Error: err,
}
}
type CreatePackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
}
func NewCreatePackageTaskStatus(status string, err string, packageID int64) CreatePackageTaskStatus {
return CreatePackageTaskStatus{
Status: status,
Error: err,
PackageID: packageID,
}
}
type LoadPackageTaskStatus struct {
Status string `json:"status"`
Error string `json:"error"`
}
func NewLoadPackageTaskStatus(status string, err string) LoadPackageTaskStatus {
return LoadPackageTaskStatus{
// AdjustedScheme carries a single node ID and is serialized as the
// "adjustedScheme" field of TaskStatus.
// NOTE(review): presumably the node selected after a schedule adjustment — confirm.
type AdjustedScheme struct {
NodeID int64 `json:"nodeID"`
}
// NewTaskStatus builds a TaskStatus by copying the status text, the error
// text, the adjustment flag and the adjusted scheme into the fields of the
// same names.
func NewTaskStatus(status string, err string, isAdjustment bool, adjustedScheme AdjustedScheme) TaskStatus {
	var ts TaskStatus
	ts.Status = status
	ts.Error = err
	ts.IsAdjustment = isAdjustment
	ts.AdjustedScheme = adjustedScheme
	return ts
}

View File

@ -18,16 +18,16 @@ type PCMService interface {
var _ = Register(PCMService.StartUploadImage)
type StartUploadImage struct {
NodeID int64 `json:"nodeID"`
SlwNodeID int64 `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
}
type StartUploadImageResp struct {
TaskID string `json:"taskID"`
}
func NewStartUploadImage(nodeID int64, imagePath string) StartUploadImage {
func NewStartUploadImage(slwNodeID int64, imagePath string) StartUploadImage {
return StartUploadImage{
NodeID: nodeID,
SlwNodeID: slwNodeID,
ImagePath: imagePath,
}
}
@ -44,12 +44,12 @@ func (c *Client) StartUploadImage(msg StartUploadImage, opts ...mq.RequestOption
var _ = Register(PCMService.GetImageList)
type GetImageList struct {
NodeID int64 `json:"nodeID"`
SlwNodeID int64 `json:"slwNodeID"`
}
func NewGetImageList(nodeID int64) GetImageList {
func NewGetImageList(slwNodeID int64) GetImageList {
return GetImageList{
NodeID: nodeID,
SlwNodeID: slwNodeID,
}
}
@ -71,13 +71,13 @@ func (c *Client) GetImageList(msg GetImageList, opts ...mq.RequestOption) (*GetI
var _ = Register(PCMService.DeleteImage)
type DeleteImage struct {
NodeID int64 `json:"nodeID"`
SlwNodeID int64 `json:"slwNodeID"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewDeleteImage(nodeID int64, pcmJobID int64) DeleteImage {
func NewDeleteImage(slwNodeID int64, pcmJobID int64) DeleteImage {
return DeleteImage{
NodeID: nodeID,
SlwNodeID: slwNodeID,
PCMJobID: pcmJobID,
}
}
@ -100,7 +100,7 @@ func (c *Client) DeleteImage(msg DeleteImage, opts ...mq.RequestOption) (*Delete
var _ = Register(PCMService.StartScheduleTask)
type StartScheduleTask struct {
NodeID int64 `json:"nodeID"`
SlwNodeID int64 `json:"slwNodeID"`
Envs []map[string]string `json:"envs"`
ImageID int64 `json:"imageID"`
CMDLine string `json:"cmdLine"`
@ -109,9 +109,9 @@ type StartScheduleTaskResp struct {
TaskID string `json:"taskID"`
}
func NewStartScheduleTask(nodeID int64, envs []map[string]string, imageID int64, cmdLine string) StartScheduleTask {
func NewStartScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) StartScheduleTask {
return StartScheduleTask{
NodeID: nodeID,
SlwNodeID: slwNodeID,
Envs: envs,
ImageID: imageID,
CMDLine: cmdLine,
@ -130,13 +130,13 @@ func (c *Client) StartScheduleTask(msg StartUploadImage, opts ...mq.RequestOptio
var _ = Register(PCMService.DeleteTask)
type DeleteTask struct {
NodeID int64 `json:"nodeID"`
SlwNodeID int64 `json:"slwNodeID"`
PCMJobID int64 `json:"pcmJobID"`
}
func NewDeleteTask(nodeID int64, pcmJobID int64) DeleteTask {
func NewDeleteTask(slwNodeID int64, pcmJobID int64) DeleteTask {
return DeleteTask{
NodeID: nodeID,
SlwNodeID: slwNodeID,
PCMJobID: pcmJobID,
}
}

View File

@ -80,17 +80,17 @@ var _ = Register(StorageService.StartCacheMovePackage)
type StartCacheMovePackage struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
NodeID int64 `json:"nodeID"`
StgNodeID int64 `json:"stgNodeID"`
}
type StartCacheMovePackageResp struct {
TaskID string `json:"taskID"`
}
func NewStartCacheMovePackage(userID int64, packageID int64, nodeID int64) StartCacheMovePackage {
func NewStartCacheMovePackage(userID int64, packageID int64, stgNodeID int64) StartCacheMovePackage {
return StartCacheMovePackage{
UserID: userID,
PackageID: packageID,
NodeID: nodeID,
StgNodeID: stgNodeID,
}
}
func NewStartCacheMovePackageResp(taskID string) StartCacheMovePackageResp {

View File

@ -61,10 +61,10 @@ func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus
TaskStatus: taskStatus,
}
}
func NewReportAdvisorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
func NewReportAdvisorTaskStatusResp() ReportAdvisorTaskStatusResp {
return ReportAdvisorTaskStatusResp{}
}
func NewAdvisorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) AdvisorTaskStatus {
func NewAdvisorTaskStatus(taskID string, status advtsk.TaskStatus) AdvisorTaskStatus {
return AdvisorTaskStatus{
TaskID: taskID,
Status: status,

View File

@ -11,7 +11,7 @@ import (
)
func (svc *Service) StartUploadImage(msg *execmq.StartUploadImage) (*execmq.StartUploadImageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMUploadImage(msg.NodeID, msg.ImagePath))
tsk := svc.taskManager.StartNew(schtsk.NewPCMUploadImage(msg.SlwNodeID, msg.ImagePath))
return mq.ReplyOK(execmq.NewStartUploadImageResp(tsk.ID()))
}
@ -24,7 +24,7 @@ func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageList
defer pcmCli.Close()
resp, err := pcmCli.GetImageList(pcm.GetImageListReq{
NodeID: msg.NodeID,
SlwNodeID: msg.SlwNodeID,
})
if err != nil {
logger.Warnf("get image list failed, err: %s", err.Error())
@ -43,7 +43,7 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
defer pcmCli.Close()
resp, err := pcmCli.DeleteImage(pcm.DeleteImageReq{
NodeID: msg.NodeID,
SlwNodeID: msg.SlwNodeID,
PCMJobID: msg.PCMJobID,
})
if err != nil {
@ -54,7 +54,7 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
}
func (svc *Service) StartScheduleTask(msg *execmq.StartScheduleTask) (*execmq.StartScheduleTaskResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewPCMScheduleTask(msg.NodeID, msg.Envs, msg.ImageID, msg.CMDLine))
tsk := svc.taskManager.StartNew(schtsk.NewPCMScheduleTask(msg.SlwNodeID, msg.Envs, msg.ImageID, msg.CMDLine))
return mq.ReplyOK(execmq.NewStartScheduleTaskResp(tsk.ID()))
}
@ -67,7 +67,7 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp,
defer pcmCli.Close()
resp, err := pcmCli.DeleteTask(pcm.DeleteTaskReq{
NodeID: msg.NodeID,
SlwNodeID: msg.SlwNodeID,
PCMJobID: msg.PCMJobID,
})
if err != nil {

View File

@ -17,7 +17,7 @@ func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePack
}
func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StgNodeID))
// tsk := svc.taskManager.StartNew(task.TaskBody[schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)])
return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID()))
}

View File

@ -14,14 +14,14 @@ import (
type CacheMovePackage struct {
userID int64
packageID int64
nodeID int64
stgNodeID int64
}
func NewCacheMovePackage(userID int64, packageID int64, nodeID int64) *CacheMovePackage {
func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
nodeID: nodeID,
stgNodeID: stgNodeID,
}
}
@ -54,6 +54,6 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
return stgCli.CacheMovePackage(storage.CacheMovePackageReq{
UserID: t.userID,
PackageID: t.packageID,
NodeID: t.packageID,
StgNodeID: t.stgNodeID,
})
}

View File

@ -13,15 +13,15 @@ import (
)
type PCMScheduleTask struct {
nodeID int64
slwNodeID int64
envs []map[string]string
imageID int64
cmdLine string
}
func NewPCMScheduleTask(nodeID int64, envs []map[string]string, imageID int64, cmdLine string) *PCMScheduleTask {
func NewPCMScheduleTask(slwNodeID int64, envs []map[string]string, imageID int64, cmdLine string) *PCMScheduleTask {
return &PCMScheduleTask{
nodeID: nodeID,
slwNodeID: slwNodeID,
envs: envs,
imageID: imageID,
cmdLine: cmdLine,
@ -53,7 +53,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
defer pcmCli.Close()
resp, err := pcmCli.ScheduleTask(pcm.ScheduleTaskReq{
NodeID: t.nodeID,
SlwNodeID: t.slwNodeID,
Envs: t.envs,
ImageID: t.imageID,
CMDLine: t.cmdLine,
@ -66,7 +66,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
var prevStatus string
for {
tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{
NodeID: t.nodeID,
SlwNodeID: t.slwNodeID,
PCMJobID: resp.PCMJobID,
})
if err != nil {

View File

@ -12,13 +12,13 @@ import (
)
type PCMUploadImage struct {
nodeID int64
slwNodeID int64
imagePath string
}
func NewPCMUploadImage(nodeID int64, imagePath string) *PCMUploadImage {
func NewPCMUploadImage(slwNodeID int64, imagePath string) *PCMUploadImage {
return &PCMUploadImage{
nodeID: nodeID,
slwNodeID: slwNodeID,
imagePath: imagePath,
}
}
@ -48,7 +48,7 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
defer pcmCli.Close()
resp, err := pcmCli.UploadImage(pcm.UploadImageReq{
NodeID: t.nodeID,
SlwNodeID: t.slwNodeID,
ImagePath: t.imagePath,
})
if err != nil {

1
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect

2
go.sum
View File

@ -41,6 +41,8 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=