Sydonian 2023-11-02 11:22:13 +08:00
parent 85e7a58512
commit 3cb7095095
17 changed files with 100 additions and 103 deletions

View File

@@ -134,7 +134,7 @@ func (s *DefaultScheduler) Schedule(job *jobmod.NormalJob) (*jobmod.JobScheduleS
 	}
 	defer schglb.CollectorMQPool.Release(colCli)
-	allSlwNodes := make(map[uopsdk.SlwNodeID]*candidateSlwNode)
+	allSlwNodes := make(map[schsdk.SlwNodeID]*candidateSlwNode)
 	// Query which computing centers are available
 	getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo())
@@ -202,7 +202,7 @@ func (s *DefaultScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jo
 	return scheme
 }
-func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
+func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error {
 	for _, slwNode := range allSlwNodes {
 		res, err := s.calcOneResourceScore(job.Info.Resources, slwNode.SlwNode.ID)
 		if err != nil {
@@ -216,7 +216,7 @@ func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allSlwNodes
 }
 // Rank node resource levels and compute the resource score
-func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID uopsdk.SlwNodeID) (*resourcesDetail, error) {
+func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID schsdk.SlwNodeID) (*resourcesDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
@@ -359,7 +359,7 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int {
 }
 // Compute each node's score
-func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
+func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error {
 	// Only weigh data on storage services at the computing centers returned by the operations controller
 	stgNodeToSlwNode := make(map[int64]*candidateSlwNode)
 	for _, slwNode := range allSlwNodes {
@@ -406,14 +406,14 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allSlwNodes map[
 }
 // Compute the package's score on each node
-func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
+func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
 	}
 	defer schglb.CollectorMQPool.Release(colCli)
-	slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
+	slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail)
 	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
 	if err != nil {
@@ -457,7 +457,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNod
 }
 // Compute the image's score on each node
-func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
+func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
@@ -475,7 +475,7 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNode
 		return nil, fmt.Errorf("getting image info: %w", err)
 	}
-	slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
+	slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail)
 	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID))
 	if err != nil {
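
The signature churn in the scheduler is a pure package move: the SlwNodeID map keys and parameters previously imported from uopsdk now come from schsdk. The schsdk declarations themselves are not part of this commit; a minimal sketch of what they presumably look like (type names taken from the diff, underlying types assumed: SlwNodeImageID appears string-backed, judging by the schsdk.SlwNodeImageID("") conversion later in this commit):

// Assumed sketch only; these declarations are not shown in this commit.
package schsdk

// SlwNodeID identifies a computing (slw) node. Assumed int64-backed, since it
// replaces message fields previously declared as `SlwNodeID int64`.
type SlwNodeID int64

// SlwNodeImageID identifies an image registered on a slw node. Assumed
// string-backed, because this commit constructs schsdk.SlwNodeImageID("").
type SlwNodeImageID string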

View File

@@ -144,7 +144,7 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP
 		return nil, nil, fmt.Errorf("getting all slw node info: %w", err)
 	}
-	slwNodes := make(map[uopsdk.SlwNodeID]uopsdk.SlwNode)
+	slwNodes := make(map[schsdk.SlwNodeID]uopsdk.SlwNode)
 	for _, node := range getNodesResp.Nodes {
 		slwNodes[node.ID] = node
 	}
@@ -256,8 +256,8 @@ func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulin
 	return orderedJob, true
 }
-func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, slwNodes map[uopsdk.SlwNodeID]uopsdk.SlwNode, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) {
-	allSlwNodes := make(map[uopsdk.SlwNodeID]*candidateSlwNode)
+func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, slwNodes map[schsdk.SlwNodeID]uopsdk.SlwNode, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) {
+	allSlwNodes := make(map[schsdk.SlwNodeID]*candidateSlwNode)
 	// Initialize candidate node info
 	for _, slwNode := range slwNodes {
@@ -313,7 +313,7 @@ func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, jo
 	return &scheme, nil
 }
-func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetSlwNodeID uopsdk.SlwNodeID, schemes map[string]schsdk.LocalFileUploadScheme, slwNodes map[uopsdk.SlwNodeID]uopsdk.SlwNode) {
+func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetSlwNodeID schsdk.SlwNodeID, schemes map[string]schsdk.LocalFileUploadScheme, slwNodes map[schsdk.SlwNodeID]uopsdk.SlwNode) {
 	if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok {
 		if _, ok := schemes[localFile.LocalPath]; !ok {
 			stgNodeID := slwNodes[targetSlwNodeID].StgNodeID
@@ -373,7 +373,7 @@ func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targe
 	return scheme
 }
-func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
+func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error {
 	for _, slwNode := range allSlwNodes {
 		res, err := s.calcOneResourceScore(job.Resources, slwNode.SlwNode.ID)
 		if err != nil {
@@ -387,7 +387,7 @@ func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allSl
 }
 // Rank node resource levels and compute the resource score
-func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID uopsdk.SlwNodeID) (*resourcesDetail, error) {
+func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID schsdk.SlwNodeID) (*resourcesDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
@@ -530,7 +530,7 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int
 }
 // Compute each node's score
-func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode) error {
+func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error {
 	// Only weigh data on storage services at the available computing centers returned by the operations controller
 	stgNodeToSlwNode := make(map[int64]*candidateSlwNode)
 	for _, slwNode := range allSlwNodes {
@@ -584,14 +584,14 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNod
 }
 // Compute the package's score on each node
-func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
+func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
 	}
 	defer schglb.CollectorMQPool.Release(colCli)
-	slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
+	slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail)
 	// TODO UserID
 	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID))
@@ -637,7 +637,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw
 }
 // Compute the image's score on each node
-func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[uopsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[uopsdk.SlwNodeID]*fileDetail, error) {
+func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) {
 	colCli, err := schglb.CollectorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new collector client: %w", err)
@@ -655,7 +655,7 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwN
 		return nil, fmt.Errorf("getting image info: %w", err)
 	}
-	slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail)
+	slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail)
 	cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID))
 	if err != nil {

View File

@@ -2,6 +2,7 @@ package config
 import (
 	log "gitlink.org.cn/cloudream/common/pkgs/logger"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 	stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
 	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 	c "gitlink.org.cn/cloudream/common/utils/config"
@@ -28,7 +29,7 @@ func Cfg() *Config {
 }
 type SlwNodeConfig struct {
-	SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
 	StgNodeID int64            `json:"stgNodeID"`
 	StorageID int64            `json:"StorageID"`
 }
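
For reference, a hypothetical round-trip check for the updated SlwNodeConfig, assuming schsdk.SlwNodeID is int64-backed; note that the third field keeps its capitalized "StorageID" JSON key while the others are lowerCamelCase:

package main

import (
	"encoding/json"
	"fmt"

	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)

// SlwNodeConfig is copied from the diff above.
type SlwNodeConfig struct {
	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
	StgNodeID int64            `json:"stgNodeID"`
	StorageID int64            `json:"StorageID"`
}

func main() {
	raw := `{"slwNodeID": 1, "stgNodeID": 2, "StorageID": 3}` // hypothetical values
	var cfg SlwNodeConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {SlwNodeID:1 StgNodeID:2 StorageID:3}
}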

View File

@@ -18,12 +18,12 @@ func (svc *Service) GetImageList(msg *colmq.GetImageList) (*colmq.GetImageListRe
 	defer schglb.PCMPool.Release(pcmCli)
 	resp, err := pcmCli.GetImageList(pcmsdk.GetImageListReq{
-		SlwNodeID: msg.SlwNodeID,
+		PartID: msg.SlwNodeID,
 	})
 	if err != nil {
 		logger.Warnf("get image list failed, err: %s", err.Error())
 		return nil, mq.Failed(errorcode.OperationFailed, "get image list failed")
 	}
-	return mq.ReplyOK(colmq.NewGetImageListResp(resp.ImageIDs))
+	return mq.ReplyOK(colmq.NewGetImageListResp(resp.Images))
 }

View File

@@ -5,7 +5,6 @@ import (
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
 	"gitlink.org.cn/cloudream/common/pkgs/types"
 	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
 type FileScheduleAction string
@@ -24,7 +23,7 @@ type FileScheduleScheme struct {
 // Job scheduling scheme
 type JobScheduleScheme struct {
-	TargetSlwNodeID uopsdk.SlwNodeID `json:"targetSlwNodeID"`
+	TargetSlwNodeID schsdk.SlwNodeID `json:"targetSlwNodeID"`
 	Dataset FileScheduleScheme `json:"dataset"`
 	Code    FileScheduleScheme `json:"code"`
 	Image   FileScheduleScheme `json:"image"`

View File

@@ -2,14 +2,13 @@ package jobmod
 import (
 	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
 type NormalJob struct {
 	JobBase
 	Info  schsdk.NormalJobInfo `json:"info"`  // Job description provided at submission time
 	Files JobFiles             `json:"files"` // Files the job needs
-	TargetSlwNodeID uopsdk.SlwNodeID `json:"targetSlwNodeID"` // ID of the computing center that will run this job
+	TargetSlwNodeID schsdk.SlwNodeID `json:"targetSlwNodeID"` // ID of the computing center that will run this job
 	OutputFullPath string `json:"outputFullPath"` // Full output path for the program's results
 }

View File

@@ -2,7 +2,6 @@ package schmod
 import (
 	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
 type ExecutorID string
@@ -16,6 +15,6 @@ type ImageInfo struct {
 }
 type ImageImportingInfo struct {
-	SlwNodeID      uopsdk.SlwNodeID      `json:"slwNodeID"`
-	SlwNodeImageID uopsdk.SlwNodeImageID `json:"slwNodeImageID"`
+	SlwNodeID      schsdk.SlwNodeID      `json:"slwNodeID"`
+	SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"`
 }

View File

@@ -2,6 +2,8 @@ package collector
 import (
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
+	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 )
 type PCMService interface {
@@ -13,21 +15,21 @@ var _ = Register(Service.GetImageList)
 type GetImageList struct {
 	mq.MessageBodyBase
-	SlwNodeID int64 `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
 }
 type GetImageListResp struct {
 	mq.MessageBodyBase
-	ImageIDs []int64 `json:"imageIDs"`
+	Images []pcmsdk.Image `json:"images"`
 }
-func NewGetImageList(slwNodeID int64) *GetImageList {
+func NewGetImageList(slwNodeID schsdk.SlwNodeID) *GetImageList {
 	return &GetImageList{
 		SlwNodeID: slwNodeID,
 	}
 }
-func NewGetImageListResp(imageIDs []int64) *GetImageListResp {
+func NewGetImageListResp(images []pcmsdk.Image) *GetImageListResp {
 	return &GetImageListResp{
-		ImageIDs: imageIDs,
+		Images: images,
 	}
 }
 func (c *Client) GetImageList(msg *GetImageList, opts ...mq.RequestOption) (*GetImageListResp, error) {
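
A hypothetical caller-side sketch of the revised round trip: the request now carries a typed schsdk.SlwNodeID, and the reply returns whole pcmsdk.Image values instead of bare int64 IDs (the Image fields themselves are not visible in this diff, so the example only prints them):

// listImages is a hypothetical helper; the pool and package names are taken
// from the surrounding code in this commit.
func listImages(nodeID schsdk.SlwNodeID) error {
	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return fmt.Errorf("new collector client: %w", err)
	}
	defer schglb.CollectorMQPool.Release(colCli)

	resp, err := colCli.GetImageList(collector.NewGetImageList(nodeID))
	if err != nil {
		return fmt.Errorf("getting image list: %w", err)
	}
	for _, img := range resp.Images {
		fmt.Printf("%+v\n", img) // pcmsdk.Image fields are not shown in this diff
	}
	return nil
}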

View File

@@ -2,6 +2,7 @@ package collector
 import (
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
@@ -16,7 +17,7 @@ var _ = Register(Service.GetOneResourceData)
 type GetOneResourceData struct {
 	mq.MessageBodyBase
-	SlwNodeID uopsdk.SlwNodeID    `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID    `json:"slwNodeID"`
 	Type      uopsdk.ResourceType `json:"type"`
 }
 type GetOneResourceDataResp struct {
@@ -24,7 +25,7 @@ type GetOneResourceDataResp struct {
 	Data uopsdk.ResourceData `json:"data"`
 }
-func NewGetOneResourceData(nodeID uopsdk.SlwNodeID, typ uopsdk.ResourceType) *GetOneResourceData {
+func NewGetOneResourceData(nodeID schsdk.SlwNodeID, typ uopsdk.ResourceType) *GetOneResourceData {
 	return &GetOneResourceData{
 		SlwNodeID: nodeID,
 		Type:      typ,
@@ -44,14 +45,14 @@ var _ = Register(Service.GetAllResourceData)
 type GetAllResourceData struct {
 	mq.MessageBodyBase
-	SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
 }
 type GetAllResourceDataResp struct {
 	mq.MessageBodyBase
 	Datas []uopsdk.ResourceData `json:"datas"`
 }
-func NewGetAllResourceData(nodeId uopsdk.SlwNodeID) *GetAllResourceData {
+func NewGetAllResourceData(nodeId schsdk.SlwNodeID) *GetAllResourceData {
 	return &GetAllResourceData{
 		SlwNodeID: nodeId,
 	}

View File

@@ -2,6 +2,7 @@ package collector
 import (
 	"gitlink.org.cn/cloudream/common/pkgs/mq"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
@@ -16,14 +17,14 @@ var _ = Register(Service.GetSlwNodeInfo)
 type GetSlwNodeInfo struct {
 	mq.MessageBodyBase
-	SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
 }
 type GetSlwNodeInfoResp struct {
 	mq.MessageBodyBase
 	uopsdk.SlwNode
 }
-func NewGetSlwNodeInfo(slwNodeID uopsdk.SlwNodeID) *GetSlwNodeInfo {
+func NewGetSlwNodeInfo(slwNodeID schsdk.SlwNodeID) *GetSlwNodeInfo {
 	return &GetSlwNodeInfo{
 		SlwNodeID: slwNodeID,
 	}

View File

@@ -1,6 +1,10 @@
 package executor
-import "gitlink.org.cn/cloudream/common/pkgs/mq"
+import (
+	"gitlink.org.cn/cloudream/common/pkgs/mq"
+	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+)
 type PCMService interface {
 	DeleteImage(msg *DeleteImage) (*DeleteImageResp, *mq.CodeMessage)
@@ -13,24 +17,21 @@ var _ = Register(Service.DeleteImage)
 type DeleteImage struct {
 	mq.MessageBodyBase
-	SlwNodeID int64 `json:"slwNodeID"`
-	PCMJobID  int64 `json:"pcmJobID"`
+	SlwNodeID schsdk.SlwNodeID      `json:"slwNodeID"`
+	ImageID   schsdk.SlwNodeImageID `json:"imageID"`
 }
 type DeleteImageResp struct {
 	mq.MessageBodyBase
-	Result string `json:"result"`
 }
-func NewDeleteImage(slwNodeID int64, pcmJobID int64) *DeleteImage {
+func NewDeleteImage(slwNodeID schsdk.SlwNodeID, imageID schsdk.SlwNodeImageID) *DeleteImage {
 	return &DeleteImage{
 		SlwNodeID: slwNodeID,
-		PCMJobID:  pcmJobID,
+		ImageID:   imageID,
 	}
 }
-func NewDeleteImageResp(result string) *DeleteImageResp {
-	return &DeleteImageResp{
-		Result: result,
-	}
+func NewDeleteImageResp() *DeleteImageResp {
+	return &DeleteImageResp{}
 }
 func (c *Client) DeleteImage(msg *DeleteImage, opts ...mq.RequestOption) (*DeleteImageResp, error) {
 	return mq.Request(Service.DeleteImage, c.rabbitCli, msg, opts...)
@@ -41,24 +42,21 @@ var _ = Register(Service.DeleteTask)
 type DeleteTask struct {
 	mq.MessageBodyBase
-	SlwNodeID int64 `json:"slwNodeID"`
-	PCMJobID  int64 `json:"pcmJobID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
+	TaskID    pcmsdk.TaskID    `json:"taskID"`
 }
 type DeleteTaskResp struct {
 	mq.MessageBodyBase
-	Result string `json:"result"`
 }
-func NewDeleteTask(slwNodeID int64, pcmJobID int64) *DeleteTask {
+func NewDeleteTask(slwNodeID schsdk.SlwNodeID, taskID pcmsdk.TaskID) *DeleteTask {
 	return &DeleteTask{
 		SlwNodeID: slwNodeID,
-		PCMJobID:  pcmJobID,
+		TaskID:    taskID,
 	}
 }
-func NewDeleteTaskResp(result string) *DeleteTaskResp {
-	return &DeleteTaskResp{
-		Result: result,
-	}
+func NewDeleteTaskResp() *DeleteTaskResp {
+	return &DeleteTaskResp{}
 }
 func (c *Client) DeleteTask(msg *DeleteTask, opts ...mq.RequestOption) (*DeleteTaskResp, error) {
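
A hypothetical client-side sketch of the reworked delete calls, inside the same executor package: both now take typed IDs instead of two raw int64 values, and since the Result payload was dropped from both responses, success is simply a non-error reply:

// deleteImageAndTask is a hypothetical helper built only from the constructors
// and client methods visible in this diff.
func deleteImageAndTask(cli *Client, nodeID schsdk.SlwNodeID,
	imageID schsdk.SlwNodeImageID, taskID pcmsdk.TaskID) error {
	if _, err := cli.DeleteImage(NewDeleteImage(nodeID, imageID)); err != nil {
		return fmt.Errorf("deleting image: %w", err)
	}
	if _, err := cli.DeleteTask(NewDeleteTask(nodeID, taskID)); err != nil {
		return fmt.Errorf("deleting task: %w", err)
	}
	return nil
}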

View File

@@ -2,26 +2,24 @@ package task
 import (
 	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 )
 var _ = Register[*ScheduleTask, *ScheduleTaskStatus]()
 type ScheduleTask struct {
 	TaskInfoBase
-	SlwNodeID      uopsdk.SlwNodeID      `json:"slwNodeID"`
-	Envs           []schsdk.EnvVar       `json:"envs"`
-	SlwNodeImageID uopsdk.SlwNodeImageID `json:"slwNodeImageID"`
+	SlwNodeID      schsdk.SlwNodeID      `json:"slwNodeID"`
+	Envs           []schsdk.KVPair       `json:"envs"`
+	SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"`
 	CMDLine string `json:"cmdLine"`
 }
 type ScheduleTaskStatus struct {
 	TaskStatusBase
-	Status   string `json:"status"`
-	Error    string `json:"error"`
-	PCMJobID int64  `json:"pcmJobID"`
+	Status string `json:"status"`
+	Error  string `json:"error"`
 }
-func NewScheduleTask(slwNodeID uopsdk.SlwNodeID, envs []schsdk.EnvVar, slwNodeImageID uopsdk.SlwNodeImageID, cmdLine string) *ScheduleTask {
+func NewScheduleTask(slwNodeID schsdk.SlwNodeID, envs []schsdk.KVPair, slwNodeImageID schsdk.SlwNodeImageID, cmdLine string) *ScheduleTask {
 	return &ScheduleTask{
 		SlwNodeID: slwNodeID,
 		Envs:      envs,
@@ -30,10 +28,9 @@ func NewScheduleTask(slwNodeID uopsdk.SlwNodeID, envs []schsdk.EnvVar, slwNodeIm
 	}
 }
-func NewScheduleTaskStatus(status string, err string, pcmJobID int64) *ScheduleTaskStatus {
+func NewScheduleTaskStatus(status string, err string) *ScheduleTaskStatus {
 	return &ScheduleTaskStatus{
-		Status:   status,
-		Error:    err,
-		PCMJobID: pcmJobID,
+		Status: status,
+		Error:  err,
 	}
 }
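
Constructing the revised task, as a hypothetical sketch: Envs is now []schsdk.KVPair (previously []schsdk.EnvVar) whose fields are not visible in this diff, so the example passes nil rather than guessing them:

t := NewScheduleTask(
	schsdk.SlwNodeID(1),              // typed node ID, formerly uopsdk.SlwNodeID
	nil,                              // []schsdk.KVPair; field names not shown in this diff
	schsdk.SlwNodeImageID("img-123"), // hypothetical image ID value
	"python train.py",                // hypothetical command line
)
// Note: ScheduleTaskStatus no longer carries PCMJobID; it reports only Status and Error.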

View File

@@ -1,28 +1,30 @@
 package task
-import uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
+import (
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
+)
 var _ = Register[*UploadImage, *UploadImageStatus]()
 type UploadImage struct {
 	TaskInfoBase
-	SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
+	SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
 	ImagePath string           `json:"imagePath"`
 }
 type UploadImageStatus struct {
 	TaskStatusBase
 	Status string `json:"status"`
 	Error  string `json:"error"`
-	ImageID uopsdk.SlwNodeImageID `json:"imageID"`
+	ImageID schsdk.SlwNodeImageID `json:"imageID"`
 }
-func NewUploadImage(slwNodeID uopsdk.SlwNodeID, imagePath string) *UploadImage {
+func NewUploadImage(slwNodeID schsdk.SlwNodeID, imagePath string) *UploadImage {
 	return &UploadImage{
 		SlwNodeID: slwNodeID,
 		ImagePath: imagePath,
 	}
 }
-func NewUploadImageStatus(status string, err string, imageID uopsdk.SlwNodeImageID) *UploadImageStatus {
+func NewUploadImageStatus(status string, err string, imageID schsdk.SlwNodeImageID) *UploadImageStatus {
 	return &UploadImageStatus{
 		Status: status,
 		Error:  err,

View File

@@ -17,15 +17,15 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
 	}
 	defer schglb.PCMPool.Release(pcmCli)
-	resp, err := pcmCli.DeleteImage(pcmsdk.DeleteImageReq{
-		SlwNodeID: msg.SlwNodeID,
-		PCMJobID:  msg.PCMJobID,
+	err = pcmCli.DeleteImage(pcmsdk.DeleteImageReq{
+		PartID:  msg.SlwNodeID,
+		ImageID: msg.ImageID,
 	})
 	if err != nil {
 		logger.Warnf("delete image failed, err: %s", err.Error())
 		return nil, mq.Failed(errorcode.OperationFailed, "delete image failed")
 	}
-	return mq.ReplyOK(execmq.NewDeleteImageResp(resp.Result))
+	return mq.ReplyOK(execmq.NewDeleteImageResp())
 }
 func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, *mq.CodeMessage) {
@@ -36,13 +36,13 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp,
 	}
 	defer schglb.PCMPool.Release(pcmCli)
-	resp, err := pcmCli.DeleteTask(pcmsdk.DeleteTaskReq{
-		SlwNodeID: msg.SlwNodeID,
-		PCMJobID:  msg.PCMJobID,
+	err = pcmCli.DeleteTask(pcmsdk.DeleteTaskReq{
+		PartID: msg.SlwNodeID,
+		TaskID: msg.TaskID,
 	})
 	if err != nil {
 		logger.Warnf("delete task failed, err: %s", err.Error())
 		return nil, mq.Failed(errorcode.OperationFailed, "delete task failed")
 	}
-	return mq.ReplyOK(execmq.NewDeleteTaskResp(resp.Result))
+	return mq.ReplyOK(execmq.NewDeleteTaskResp())
 }

View File

@@ -30,7 +30,7 @@ func (t *PCMScheduleTask) Execute(task *task.Task[TaskContext], ctx TaskContext,
 	err := t.do(task.ID(), ctx)
 	if err != nil {
 		// TODO: if the task fails, adjust the reported "failed" status field as needed
-		ctx.reporter.Report(task.ID(), exectsk.NewScheduleTaskStatus("failed", err.Error(), 0))
+		ctx.reporter.Report(task.ID(), exectsk.NewScheduleTaskStatus("failed", err.Error()))
 	}
 	ctx.reporter.ReportNow()
@@ -46,11 +46,11 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
 	}
 	defer schglb.PCMPool.Release(pcmCli)
-	resp, err := pcmCli.ScheduleTask(pcmsdk.ScheduleTaskReq{
-		SlwNodeID: t.SlwNodeID,
-		Envs:      t.Envs,
-		ImageID:   t.SlwNodeImageID,
-		CMDLine:   t.CMDLine,
+	resp, err := pcmCli.SubmitTask(pcmsdk.SubmitTaskReq{
+		PartID:  t.SlwNodeID,
+		Envs:    t.Envs,
+		ImageID: t.SlwNodeImageID,
+		CMD:     t.CMDLine,
 	})
 	if err != nil {
@@ -59,23 +59,22 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error {
 	var prevStatus string
 	for {
-		tsResp, err := pcmCli.GetTaskStatus(pcmsdk.GetTaskStatusReq{
-			SlwNodeID: t.SlwNodeID,
-			PCMJobID:  resp.PCMJobID,
+		tsResp, err := pcmCli.GetTask(pcmsdk.GetTaskReq{
+			PartID: t.SlwNodeID,
+			TaskID: resp.TaskID,
 		})
 		if err != nil {
 			return err
 		}
-		if tsResp.Status != prevStatus {
-			ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.Status, "", resp.PCMJobID))
+		if tsResp.TaskStatus != prevStatus {
+			ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.TaskStatus, ""))
 		}
-		prevStatus = tsResp.Status
+		prevStatus = tsResp.TaskStatus
-		// TODO: adjust according to what the API returns in result
 		// Use the returned result to decide whether the task has finished; if so, break the loop and end the task
-		if tsResp.Status == "Completed" {
+		if tsResp.TaskStatus == pcmsdk.TaskStatusSuccess {
 			return nil
 		}
 	}
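
The completion check also moves off the magic string "Completed" onto a pcmsdk constant. Because the code above compares tsResp.TaskStatus with a plain string variable (var prevStatus string) and passes it to NewScheduleTaskStatus(status string, ...), the constant is presumably string-typed; a sketch of that assumption:

// Assumed sketch; the real declaration and literal are not shown in this diff.
package pcmsdk

// TaskStatusSuccess is the terminal success status returned by GetTask.
const TaskStatusSuccess = "Succeeded" // hypothetical value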

View File

@@ -7,6 +7,7 @@ import (
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	"gitlink.org.cn/cloudream/common/pkgs/task"
 	pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
+	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
 	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
 	exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
 )
@@ -29,7 +30,7 @@ func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext,
 	err := t.do(task.ID(), ctx)
 	if err != nil {
 		// TODO: if the task fails, adjust the reported "failed" status field as needed
-		ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), 0))
+		ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), schsdk.SlwNodeImageID("")))
 	}
 	ctx.reporter.ReportNow()
@@ -53,7 +54,6 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error {
 		return err
 	}
-	// TODO: adjust according to what the API returns in result
 	ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID))
 	return nil
 }

View File

@@ -5,7 +5,6 @@ import (
 	"sync"
 	schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
-	uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
 	schmod "gitlink.org.cn/cloudream/scheduler/common/models"
 )
@@ -34,7 +33,7 @@ func (m *Manager) GetImageInfo(imageID schsdk.ImageID) (*schmod.ImageInfo, error
 	return info, nil
 }
-func (m *Manager) GetImageImportingInfo(imageID schsdk.ImageID, slwNodeID uopsdk.SlwNodeID) (*schmod.ImageImportingInfo, error) {
+func (m *Manager) GetImageImportingInfo(imageID schsdk.ImageID, slwNodeID schsdk.SlwNodeID) (*schmod.ImageImportingInfo, error) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
@@ -69,7 +68,7 @@ func (m *Manager) CreateImage(packageID int64) (*schmod.ImageInfo, error) {
 	return info, nil
 }
-func (m *Manager) AddImageImportingInfo(imageID schsdk.ImageID, slwNodeID uopsdk.SlwNodeID, slwNodeImageID uopsdk.SlwNodeImageID) error {
+func (m *Manager) AddImageImportingInfo(imageID schsdk.ImageID, slwNodeID schsdk.SlwNodeID, slwNodeImageID schsdk.SlwNodeImageID) error {
 	m.lock.Lock()
 	defer m.lock.Unlock()
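
With the typed IDs in place, recording and then querying an import might look like this hypothetical sketch (mgr is an existing *Manager and imageID an existing schsdk.ImageID, both from the surrounding code):

// Hypothetical usage; error handling trimmed to the essentials.
nodeID := schsdk.SlwNodeID(1)
if err := mgr.AddImageImportingInfo(imageID, nodeID, schsdk.SlwNodeImageID("img-1")); err != nil {
	return err
}
impInfo, err := mgr.GetImageImportingInfo(imageID, nodeID)
if err != nil {
	return err
}
fmt.Printf("%+v\n", impInfo) // *schmod.ImageImportingInfo with schsdk-typed fields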