增加调度方案

This commit is contained in:
songjc 2023-09-18 08:55:31 +08:00
parent a6bab2d048
commit 176e098b3a
9 changed files with 542 additions and 83 deletions

View File

@ -0,0 +1,429 @@
package scheduler
import (
"fmt"
"sort"
"github.com/inhies/go-bytesize"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
const (
	// Resource levels assigned to each candidate node:
	// ResourceLevel1: every requested resource type is available at >= 1.5x
	// the requested amount.
	ResourceLevel1 = 1
	// ResourceLevel2: Level1 not met, but every requested resource type is
	// available at >= 1x the requested amount.
	ResourceLevel2 = 2
	// ResourceLevel3: at least one requested resource type is available at
	// less than 1x the requested amount.
	ResourceLevel3 = 3

	// Weights applied when combining per-resource and per-package scores.
	CpuResourceWeight float64 = 1
	StgResourceWeight float64 = 1.2
	CachingWeight     float64 = 1
	LoadedWeight      float64 = 2
)
// ErrScheduleWaiting is returned when no node currently satisfies the job's
// resource requirements; the job should wait and be rescheduled later.
var ErrScheduleWaiting = fmt.Errorf("no appropriate scheduling node found, please wait")

// Scheduler produces a scheduling scheme for a normal job.
type Scheduler interface {
	Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error)
}

// Rescheduling is a Scheduler implementation that re-evaluates candidate
// slw nodes by resource level and locality score.
type Rescheduling struct {
}

// SlwNodeInfo holds per-node state accumulated during scheduling.
type SlwNodeInfo struct {
	slwNodeID     int64
	resourceLevel int // one of ResourceLevel1/2/3
	resourceScore float64
	nodeScore     float64
	scheme        *jobmod.JobScheduleScheme
}
// NewRescheduling constructs a Rescheduling scheduler.
func NewRescheduling() *Rescheduling {
	return new(Rescheduling)
}
// Schedule picks the best slw node for the job and returns the corresponding
// scheduling scheme.
//
// Strategy:
//  1. Grade every candidate node into a resource level (see ResourceLevel*).
//  2. Prefer Level1 nodes; among several, pick the highest node score
//     (code/dataset/image locality, see ComputeSlwNodeScore).
//  3. Otherwise fall back to Level2 nodes, picking the highest resource score.
//  4. If no node qualifies, return ErrScheduleWaiting so the job can be
//     queued and retried later.
func (r *Rescheduling) Schedule(info jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
	// TODO: placeholder queue for jobs that cannot be scheduled yet. NOTE: a
	// channel local to this call is discarded on return, so the enqueued job
	// is lost — a real implementation must share this queue with a
	// re-scheduling worker.
	JobQueue := make(chan jobmod.NormalJob, 100)

	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return nil, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	// Query all candidate slw nodes and grade each one's resource level.
	resp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
	if err != nil {
		return nil, err
	}
	var backupSlwNodeInfos []SlwNodeInfo
	for _, node := range resp.Nodes {
		rl, rs, err := r.CheckResourceAvailability(info.Info.Resources, node.ID)
		if err != nil {
			return nil, err
		}
		backupSlwNodeInfos = append(backupSlwNodeInfos, SlwNodeInfo{
			slwNodeID:     node.ID,
			resourceLevel: rl,
			resourceScore: rs,
		})
	}

	// Keep only nodes whose every requested resource is at >= 1.5x.
	var matchingSlwNodes []SlwNodeInfo
	for _, slwNode := range backupSlwNodeInfos {
		if slwNode.resourceLevel == ResourceLevel1 {
			matchingSlwNodes = append(matchingSlwNodes, slwNode)
		}
	}

	switch {
	// Exactly one qualifying node: use it directly and schedule storage to it.
	case len(matchingSlwNodes) == 1:
		_, scheme, err := r.ComputeSlwNodeScore(info.Files, matchingSlwNodes[0].slwNodeID)
		if err != nil {
			return nil, err
		}
		return scheme, nil

	// Two or more: score each node and take the highest.
	case len(matchingSlwNodes) > 1:
		// BUG FIX: iterate by index. The original ranged by value, mutating a
		// copy, so every nodeScore/scheme assignment was lost and the sort
		// below compared all-zero scores.
		for i := range matchingSlwNodes {
			score, scheme, err := r.ComputeSlwNodeScore(info.Files, matchingSlwNodes[i].slwNodeID)
			if err != nil {
				return nil, err
			}
			matchingSlwNodes[i].nodeScore = score
			matchingSlwNodes[i].scheme = scheme
		}
		sort.Slice(matchingSlwNodes, func(i, j int) bool {
			return matchingSlwNodes[i].nodeScore > matchingSlwNodes[j].nodeScore
		})
		return matchingSlwNodes[0].scheme, nil

	// No Level1 node: fall back to Level2 nodes.
	default:
		var secondaryMatchingSlwNodes []SlwNodeInfo
		for _, slwNode := range backupSlwNodeInfos {
			if slwNode.resourceLevel == ResourceLevel2 {
				secondaryMatchingSlwNodes = append(secondaryMatchingSlwNodes, slwNode)
			}
		}
		if len(secondaryMatchingSlwNodes) == 0 {
			// TODO: enqueue and retry at a better time; see JobQueue note.
			JobQueue <- info
			return nil, ErrScheduleWaiting
		}
		// Pick the node with the most abundant resources. (Sorting a
		// single-element slice is a no-op, so this also covers the
		// exactly-one-node case of the original.)
		sort.Slice(secondaryMatchingSlwNodes, func(i, j int) bool {
			return secondaryMatchingSlwNodes[i].resourceScore > secondaryMatchingSlwNodes[j].resourceScore
		})
		_, scheme, err := r.ComputeSlwNodeScore(info.Files, secondaryMatchingSlwNodes[0].slwNodeID)
		if err != nil {
			return nil, err
		}
		return scheme, nil
	}
}
// 划分节点资源等级,并计算资源得分
// CheckResourceAvailability grades node slwNodeID against the job's resource
// request and computes its resource score.
//
// Returns:
//   - resource level: ResourceLevel1/2/3 (see the constant docs)
//   - resource score: average of the weighted available/requested ratios
//     over all requested resource types
func (r *Rescheduling) CheckResourceAvailability(resources models.JobResourcesInfo, slwNodeID int64) (int, float64, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return 0, 0, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	res, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID))
	if err != nil {
		return 0, 0, err
	}

	var CPUScore, NPUScore, GPUScore, MLUScore, StorageScore, MemoryScore float64
	resourceLevel := ResourceLevel1
	resourceCount := 0

	// downgrade lowers resourceLevel based on available vs. needed amounts:
	// below 1.5x drops Level1 to Level2; below 1x drops to Level3.
	downgrade := func(avail, needed float64) {
		if avail < 1.5*needed && resourceLevel == ResourceLevel1 {
			resourceLevel = ResourceLevel2
		}
		if avail < needed {
			resourceLevel = ResourceLevel3
		}
	}

	for _, data := range res.Datas {
		// Typed switch: one assertion per case, and the loop variable no
		// longer shadows the method receiver r as in the original.
		switch d := data.(type) {
		case *models.CPUResourceData:
			if resources.CPU > 0 {
				resourceCount++
				avail := float64(d.Available.Value)
				downgrade(avail, resources.CPU)
				CPUScore = (avail / resources.CPU) * CpuResourceWeight
			}
		case *models.NPUResourceData:
			if resources.NPU > 0 {
				resourceCount++
				avail := float64(d.Available.Value)
				downgrade(avail, resources.NPU)
				// NOTE(review): NPU/GPU/MLU all use CpuResourceWeight in the
				// original; kept as-is — confirm this is intentional.
				NPUScore = (avail / resources.NPU) * CpuResourceWeight
			}
		case *models.GPUResourceData:
			if resources.GPU > 0 {
				resourceCount++
				avail := float64(d.Available.Value)
				downgrade(avail, resources.GPU)
				GPUScore = (avail / resources.GPU) * CpuResourceWeight
			}
		case *models.MLUResourceData:
			if resources.MLU > 0 {
				resourceCount++
				avail := float64(d.Available.Value)
				downgrade(avail, resources.MLU)
				MLUScore = (avail / resources.MLU) * CpuResourceWeight
			}
		case *models.StorageResourceData:
			if resources.Storage > 0 {
				resourceCount++
				// Storage is reported as value+unit; normalize to bytes.
				bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", d.Available.Value, d.Available.Unit))
				if err != nil {
					return 0, 0, err
				}
				downgrade(float64(bytesStorage), float64(resources.Storage))
				StorageScore = (float64(bytesStorage) / float64(resources.Storage)) * StgResourceWeight
			}
		case *models.MemoryResourceData:
			if resources.Memory > 0 {
				resourceCount++
				bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", d.Available.Value, d.Available.Unit))
				if err != nil {
					return 0, 0, err
				}
				downgrade(float64(bytesMemory), float64(resources.Memory))
				MemoryScore = (float64(bytesMemory) / float64(resources.Memory)) * StgResourceWeight
			}
		default:
			fmt.Println("Unknown Resource Type")
		}
	}

	// Average the weighted scores over the requested resource types.
	// BUG FIX: the original divided unconditionally, producing NaN when the
	// job requested nothing or the node reported no matching resource data.
	if resourceCount == 0 {
		return resourceLevel, 0, nil
	}
	resourceScore := (CPUScore + NPUScore + GPUScore + MLUScore + StorageScore + MemoryScore) / float64(resourceCount)
	return resourceLevel, resourceScore, nil
}
// 计算节点得分情况,并生成调度方案
// ComputeSlwNodeScore scores node slwNodeID for the job's files (code,
// dataset, image) and builds the scheduling scheme describing which action
// each file needs on that node (load / import image / none).
//
// On error it returns a non-nil empty scheme rather than nil: at least one
// caller dereferences the scheme before checking the error, so this stays
// non-nil for backward compatibility.
func (r *Rescheduling) ComputeSlwNodeScore(files jobmod.JobFiles, slwNodeID int64) (float64, *jobmod.JobScheduleScheme, error) {
	// BUG FIX: the original declared `var scheme *jobmod.JobScheduleScheme`
	// (nil) and then assigned scheme.Code, which panics with a nil pointer
	// dereference. Allocate the scheme up front.
	scheme := &jobmod.JobScheduleScheme{}

	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	// Code package: schedule a load unless it is already loaded on the node.
	codeScore, isLoading, err := r.ComputeStgScore(files.Code.PackageID, slwNodeID)
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, err
	}
	scheme.Code = jobmod.FileScheduleScheme{Action: jobmod.ActionNo}
	if isLoading {
		scheme.Code = jobmod.FileScheduleScheme{Action: jobmod.ActionLoad}
	}

	// Dataset package: same rule as code.
	datasetScore, isLoading, err := r.ComputeStgScore(files.Dataset.PackageID, slwNodeID)
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, err
	}
	scheme.Dataset = jobmod.FileScheduleScheme{Action: jobmod.ActionNo}
	if isLoading {
		scheme.Dataset = jobmod.FileScheduleScheme{Action: jobmod.ActionLoad}
	}

	// Image: import is needed unless the node's image list already contains
	// the image's slw-node image ID.
	imageScore, _, err := r.ComputeStgScore(files.Image.PackageID, slwNodeID)
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, err
	}
	magCli, err := globals.ManagerMQPool.Acquire()
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, fmt.Errorf("new manager client: %w", err)
	}
	defer magCli.Close()
	res, err := magCli.GetImageInfo(manager.NewGetImageInfo(files.Image.ImageID))
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, err
	}
	imageResp, err := colCli.GetImageList(collector.NewGetImageList(slwNodeID))
	if err != nil {
		return 0, &jobmod.JobScheduleScheme{}, err
	}
	isImport := true
	for _, imageID := range imageResp.ImageIDs {
		if imageID == res.SlwNodeImageID {
			isImport = false
			break
		}
	}
	scheme.Image = jobmod.FileScheduleScheme{Action: jobmod.ActionNo}
	if isImport {
		scheme.Image = jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage}
	}

	scheme.TargetSlwNodeID = slwNodeID
	return codeScore + datasetScore + imageScore, scheme, nil
}
// 计算package在该节点下的得分情况
// ComputeStgScore scores how "close" package packageID already is to node
// slwNodeID: the cached fraction of the package plus a bonus when the
// package is already loaded there.
//
// The second return value is true when the package is NOT yet loaded on the
// node, i.e. the caller should include a load action in the scheme.
func (r *Rescheduling) ComputeStgScore(packageID int64, slwNodeID int64) (float64, bool, error) {
	colCli, err := globals.CollectorMQPool.Acquire()
	if err != nil {
		return 0, false, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
	if err != nil {
		return 0, false, err
	}
	var cachedScore float64
	for _, nodeInfo := range cachedResp.NodeInfos {
		if nodeInfo.NodeID == slwNodeID {
			// TODO: different caching methods may need different scoring.
			// BUG FIX: the original computed float64(FileSize/PackageSize);
			// if those fields are integer types the quotient truncates every
			// partial cache to 0. Converting each operand first is correct
			// for both integer and float fields. Also guard a zero package
			// size so the division cannot blow up.
			if cachedResp.PackageSize > 0 {
				cachedScore = float64(nodeInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight
			}
			break
		}
	}

	loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(0, packageID))
	if err != nil {
		return 0, false, err
	}
	var loadedScore float64
	for _, nodeID := range loadedResp.StgNodeIDs {
		if nodeID == slwNodeID {
			loadedScore = 1 * LoadedWeight
			break
		}
	}

	// Not loaded on this node yet => the caller must schedule a load.
	isLoading := loadedScore == 0
	return cachedScore + loadedScore, isLoading, nil
}

View File

@ -7,8 +7,9 @@ import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
"gitlink.org.cn/cloudream/scheduler/common/globals"
"gitlink.org.cn/cloudream/scheduler/common/models/job"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
@ -16,8 +17,8 @@ import (
)
type MakeScheduleScheme struct {
Job job.NormalJob
preAdjustNodeID int64
Job jobmod.NormalJob
Scheme jobmod.JobScheduleScheme
}
func NewMakeScheduleScheme() *MakeScheduleScheme {
@ -32,10 +33,10 @@ func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskConte
err := t.do(task.ID(), ctx)
if err != nil {
//TODO 若任务失败上报的状态failed字段根据情况修改
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", err.Error(), true, advtsk.AdjustedScheme{}))
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", err.Error(), true, jobmod.JobScheduleScheme{}))
} else {
///////// 修改
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", "", false, advtsk.AdjustedScheme{}))
// 将调度方案上报给manager
ctx.reporter.Report(task.ID(), advtsk.NewScheduleSchemeTaskStatus("failed", "", false, jobmod.JobScheduleScheme{}))
}
ctx.reporter.ReportNow()
@ -49,12 +50,26 @@ func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) error {
if err != nil {
return err
}
var rescheduling scheduler.Rescheduling
var scheme jobmod.JobScheduleScheme
if isAvailable {
// 确认code、dataset、image是否已经调度到该中心
} else {
// 确认code、dataset、image是否已经调度到该中心并生成调度方案
_, s, err := rescheduling.ComputeSlwNodeScore(t.Job.Files, t.Job.TargetSlwNodeID)
scheme = *s
if err != nil {
return err
}
// 重新执行预调度方案,寻找最优节点
} else {
s, err := rescheduling.Schedule(t.Job)
scheme = *s
if err != nil {
return err
}
}
t.Scheme = scheme
return nil
}
@ -69,14 +84,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededCPU := t.Job.Info.Resources.CPU
if neededCPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeCPU,
))
if err != nil {
return false, err
}
availCPU := resp.Data.(models.CPUResourceData).Available
availCPU := resp.Data.(*models.CPUResourceData).Available
if float64(availCPU.Value) < 1.5*neededCPU {
logger.WithField("JobID", t.Job.JobID).
@ -88,14 +103,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededNPU := t.Job.Info.Resources.NPU
if neededNPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeNPU,
))
if err != nil {
return false, err
}
availNPU := resp.Data.(models.NPUResourceData).Available
availNPU := resp.Data.(*models.NPUResourceData).Available
if float64(availNPU.Value) < 1.5*neededNPU {
logger.WithField("JobID", t.Job.JobID).
@ -107,14 +122,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededGPU := t.Job.Info.Resources.GPU
if neededGPU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeGPU,
))
if err != nil {
return false, err
}
availGPU := resp.Data.(models.GPUResourceData).Available
availGPU := resp.Data.(*models.GPUResourceData).Available
if float64(availGPU.Value) < 1.5*neededGPU {
logger.WithField("JobID", t.Job.JobID).
@ -126,14 +141,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededMLU := t.Job.Info.Resources.MLU
if neededMLU > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeMLU,
))
if err != nil {
return false, err
}
availMLU := resp.Data.(models.MLUResourceData).Available
availMLU := resp.Data.(*models.MLUResourceData).Available
if float64(availMLU.Value) < 1.5*neededMLU {
logger.WithField("JobID", t.Job.JobID).
@ -145,14 +160,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededStorage := t.Job.Info.Resources.Storage
if neededStorage > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeStorage,
))
if err != nil {
return false, err
}
availStorage := resp.Data.(models.StorageResourceData).Available
availStorage := resp.Data.(*models.StorageResourceData).Available
bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
if err != nil {
@ -169,14 +184,14 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
neededMemory := t.Job.Info.Resources.Memory
if neededMemory > 0 {
resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
t.preAdjustNodeID,
t.Job.TargetSlwNodeID,
models.ResourceTypeMemory,
))
if err != nil {
return false, err
}
availMemory := resp.Data.(models.MemoryResourceData).Available
availMemory := resp.Data.(*models.MemoryResourceData).Available
bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
if err != nil {
@ -188,7 +203,6 @@ func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
Infof("insufficient memory resources, want: %s, available: %f%s", bytesize.New(1.5*float64(neededMemory)), availMemory.Value, availMemory.Unit)
return false, nil
}
}
return true, nil
}

View File

@ -0,0 +1,29 @@
package mq
import (
"gitlink.org.cn/cloudream/common/api/pcm"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
// GetImageList asks PCM for the list of image IDs available on the given
// slw node and relays the result over MQ.
func (svc *Service) GetImageList(msg *colmq.GetImageList) (*colmq.GetImageListResp, *mq.CodeMessage) {
	cli, err := globals.PCMPool.Acquire()
	if err != nil {
		logger.Warnf("new pcm client, err: %s", err.Error())
		return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed")
	}
	defer cli.Close()

	req := pcm.GetImageListReq{SlwNodeID: msg.SlwNodeID}
	listResp, err := cli.GetImageList(req)
	if err != nil {
		logger.Warnf("get image list failed, err: %s", err.Error())
		return nil, mq.Failed(errorcode.OperationFailed, "get image list failed")
	}

	return mq.ReplyOK(colmq.NewGetImageListResp(listResp.ImageIDs))
}

View File

@ -3,6 +3,7 @@ package task
import (
"gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type TaskStatus interface {
@ -20,21 +21,17 @@ var TaskStatusTypeUnion = types.NewTypeUnion[TaskStatus](
type ScheduleSchemeTaskStatus struct {
TaskStatusBase
Status string `json:"status"`
Error string `json:"error"`
IsAdjustment bool `json:"isAdjustment"`
AdjustedScheme AdjustedScheme `json:"adjustedScheme"`
Status string `json:"status"`
Error string `json:"error"`
IsRescheduling bool `json:"isRescheduling"`
RescheduleScheme jobmod.JobScheduleScheme `json:"rescheduleScheme"`
}
type AdjustedScheme struct {
NodeID int64 `json:"nodeID"`
}
func NewScheduleSchemeTaskStatus(status string, err string, isAdjustment bool, adjustedScheme AdjustedScheme) *ScheduleSchemeTaskStatus {
func NewScheduleSchemeTaskStatus(status string, err string, isRescheduling bool, rescheduleScheme jobmod.JobScheduleScheme) *ScheduleSchemeTaskStatus {
return &ScheduleSchemeTaskStatus{
Status: status,
Error: err,
IsAdjustment: isAdjustment,
AdjustedScheme: adjustedScheme,
Status: status,
Error: err,
IsRescheduling: isRescheduling,
RescheduleScheme: rescheduleScheme,
}
}

View File

@ -0,0 +1,35 @@
package collector
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
// PCMService is the collector-side MQ service surface for PCM-related
// queries.
type PCMService interface {
	GetImageList(msg *GetImageList) (*GetImageListResp, *mq.CodeMessage)
}

// Query the image list.
// Registers the GetImageList handler with the MQ dispatcher at init time.
var _ = Register(Service.GetImageList)
// GetImageList requests the image IDs available on a slw node.
type GetImageList struct {
	mq.MessageBodyBase
	SlwNodeID int64 `json:"slwNodeID"`
}

// GetImageListResp carries the image IDs returned for the queried node.
type GetImageListResp struct {
	mq.MessageBodyBase
	ImageIDs []int64 `json:"imageIDs"`
}
// NewGetImageList builds a GetImageList request for the given slw node.
func NewGetImageList(slwNodeID int64) *GetImageList {
	msg := GetImageList{}
	msg.SlwNodeID = slwNodeID
	return &msg
}
// NewGetImageListResp builds a response carrying the given image IDs.
func NewGetImageListResp(imageIDs []int64) *GetImageListResp {
	resp := GetImageListResp{}
	resp.ImageIDs = imageIDs
	return &resp
}
// GetImageList sends the request over the client's MQ connection and waits
// for the matching GetImageListResp.
func (c *Client) GetImageList(msg *GetImageList, opts ...mq.RequestOption) (*GetImageListResp, error) {
	return mq.Request(Service.GetImageList, c.rabbitCli, msg, opts...)
}

View File

@ -10,6 +10,8 @@ const (
)
type Service interface {
PCMService
ResourceService
SlwService

View File

@ -48,8 +48,8 @@ type GetAllSlwNodeInfoResp struct {
Nodes []models.SlwNode `json:"nodes"`
}
func NewGetAllSlwNodeInfo() *GetAllSlwNodeInfoResp {
return &GetAllSlwNodeInfoResp{}
func NewGetAllSlwNodeInfo() *GetAllSlwNodeInfo {
return &GetAllSlwNodeInfo{}
}
func NewGetAllSlwNodeInfoResp(nodes []models.SlwNode) *GetAllSlwNodeInfoResp {
return &GetAllSlwNodeInfoResp{

View File

@ -3,39 +3,11 @@ package executor
import "gitlink.org.cn/cloudream/common/pkgs/mq"
type PCMService interface {
GetImageList(msg *GetImageList) (*GetImageListResp, *mq.CodeMessage)
DeleteImage(msg *DeleteImage) (*DeleteImageResp, *mq.CodeMessage)
DeleteTask(msg *DeleteTask) (*DeleteTaskResp, *mq.CodeMessage)
}
// 查询镜像列表
var _ = Register(Service.GetImageList)
type GetImageList struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
}
type GetImageListResp struct {
mq.MessageBodyBase
ImageIDs []int64 `json:"imageIDs"`
}
func NewGetImageList(slwNodeID int64) *GetImageList {
return &GetImageList{
SlwNodeID: slwNodeID,
}
}
func NewGetImageListResp(imageIDs []int64) *GetImageListResp {
return &GetImageListResp{
ImageIDs: imageIDs,
}
}
func (c *Client) GetImageList(msg *GetImageList, opts ...mq.RequestOption) (*GetImageListResp, error) {
return mq.Request(Service.GetImageList, c.rabbitCli, msg, opts...)
}
// 删除镜像
var _ = Register(Service.DeleteImage)

View File

@ -9,25 +9,6 @@ import (
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
)
func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageListResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
resp, err := pcmCli.GetImageList(pcm.GetImageListReq{
SlwNodeID: msg.SlwNodeID,
})
if err != nil {
logger.Warnf("get image list failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get image list failed")
}
return mq.ReplyOK(execmq.NewGetImageListResp(resp.ImageIDs))
}
func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageResp, *mq.CodeMessage) {
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {