From 28ddbebc0662947aa02f5bfda47bda098b6b4d25 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 8 Nov 2024 15:13:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=80=E4=BA=9B=E7=BC=96?= =?UTF-8?q?=E8=AF=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- advisor/internal/scheduler/scheduler.go | 28 +++++------ collector/internal/config/config.go | 4 +- collector/internal/mq/storage.go | 12 ++--- common/globals/pools.go | 10 ++-- common/models/models.go | 2 - common/pkgs/db/db.go | 3 ++ common/pkgs/mq/advisor/server.go | 2 +- common/pkgs/mq/collector/server.go | 2 +- common/pkgs/mq/collector/storage.go | 30 ++++++------ common/pkgs/mq/executor/server.go | 3 +- common/pkgs/mq/executor/sugoncloud.go | 2 +- common/pkgs/mq/executor/task.go | 4 +- .../mq/executor/task/cache_move_package.go | 6 +-- .../mq/executor/task/storage_move_object.go | 8 ++-- common/pkgs/mq/manager/server.go | 2 +- common/pkgs/prescheduler/calc_score.go | 29 ++++++------ .../pkgs/prescheduler/default_prescheduler.go | 18 +++---- executor/internal/config/config.go | 4 +- executor/internal/task/cache_move_package.go | 7 +-- .../internal/task/storage_create_package.go | 5 +- .../internal/task/storage_load_package.go | 5 +- manager/internal/config/config.go | 4 +- .../internal/jobmgr/job/state/adjusting.go | 47 ++++++++++--------- .../internal/jobmgr/job/state/executing.go | 12 +++-- .../jobmgr/job/state/multiInstance_update.go | 6 ++- .../jobmgr/job/state/prescheduling.go | 15 +++--- manager/internal/mq/job.go | 16 +++---- 27 files changed, 150 insertions(+), 136 deletions(-) diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 313a71e..4dd6c5e 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -382,13 +382,13 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int { // 计算节点得分情况 func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) + cdsStgToCC := make(map[cdssdk.StorageID]*candidate) for _, cc := range allCCs { - cdsNodeToCC[cc.CC.CDSNodeID] = cc + cdsStgToCC[cc.CC.CDSStorageID] = cc } //计算code相关得分 - codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsNodeToCC) + codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsStgToCC) if err != nil { return fmt.Errorf("calc code file score: %w", err) } @@ -397,7 +397,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd } //计算dataset相关得分 - datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsNodeToCC) + datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsStgToCC) if err != nil { return fmt.Errorf("calc dataset file score: %w", err) } @@ -406,7 +406,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd } //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsNodeToCC) + imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsStgToCC) if err != nil { return fmt.Errorf("calc image file score: %w", err) } @@ -427,7 +427,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -441,8 +441,8 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN return nil, err } - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.StorageInfos { + cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID] if !ok { continue } @@ -453,13 +453,13 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN } } - loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) + loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID)) if err != nil { return nil, err } - for _, cdsNodeID := range loadedResp.StgNodeIDs { - cc, ok := cdsNodeToCC[cdsNodeID] + for _, cdsNodeID := range loadedResp.StgIDs { + cc, ok := cdsStgToCC[cdsNodeID] if !ok { continue } @@ -478,7 +478,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -504,8 +504,8 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map return nil, err } - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.StorageInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.StorageID] if !ok { continue } diff --git a/collector/internal/config/config.go b/collector/internal/config/config.go index 0408383..8d3ecfe 100644 --- a/collector/internal/config/config.go +++ b/collector/internal/config/config.go @@ -2,7 +2,7 @@ package config import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" c "gitlink.org.cn/cloudream/common/utils/config" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" @@ -11,7 +11,7 @@ import ( type Config struct { Logger log.Config `json:"logger"` RabbitMQ mymq.Config `json:"rabbitMQ"` - CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` + CloudreamStorage cdsapi.Config `json:"cloudreamStorage"` UnifyOps uopsdk.Config `json:"unifyOps"` } diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go index 761fadb..41725e6 100644 --- a/collector/internal/mq/storage.go +++ b/collector/internal/mq/storage.go @@ -4,7 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" ) @@ -17,7 +17,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.Package().GetCachedNodes(cdssdk.PackageGetCachedNodesReq{ + resp, err := stgCli.Package().GetCachedStorages(cdsapi.PackageGetCachedStoragesReq{ PackageID: msg.PackageID, UserID: msg.UserID, }) @@ -26,10 +26,10 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed") } - return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize)) + return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.StorageInfos, resp.PackageSize)) } -func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) { +func (svc *Service) PackageGetLoadedStgs(msg *colmq.PackageGetLoadedStgs) (*colmq.PackageGetLoadedStgsResp, *mq.CodeMessage) { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { logger.Warnf("new storage client, err: %s", err.Error()) @@ -37,7 +37,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.Package().GetLoadedNodes(cdssdk.PackageGetLoadedNodesReq{ + resp, err := stgCli.Package().GetLoadedStorages(cdsapi.PackageGetLoadedStoragesReq{ PackageID: msg.PackageID, UserID: msg.UserID, }) @@ -46,5 +46,5 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed") } - return mq.ReplyOK(colmq.NewPackageGetLoadedStgNodesResp(resp.NodeIDs)) + return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(resp.StorageIDs)) } diff --git a/common/globals/pools.go b/common/globals/pools.go index b350c50..8bedc62 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -3,7 +3,7 @@ package schglb import ( pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor" @@ -27,12 +27,12 @@ func InitMQPool(cfg *scmq.Config) { ManagerMQPool = mgrmq.NewPool(cfg) } -var CloudreamStoragePool cdssdk.Pool -var CloudreamStorageConfig *cdssdk.Config +var CloudreamStoragePool cdsapi.Pool +var CloudreamStorageConfig *cdsapi.Config -func InitCloudreamStoragePool(cfg *cdssdk.Config) { +func InitCloudreamStoragePool(cfg *cdsapi.Config) { CloudreamStorageConfig = cfg - CloudreamStoragePool = cdssdk.NewPool(cfg) + CloudreamStoragePool = cdsapi.NewPool(cfg) } var UnifyOpsPool uopsdk.Pool diff --git a/common/models/models.go b/common/models/models.go index 0de6419..d00e140 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -23,8 +23,6 @@ type ComputingCenter struct { UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID" db:"UOPSlwNodeID"` // 计算中心在PCM系统的ID PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID" db:"PCMParticipantID"` - // 计算中心在存储系统的ID - CDSNodeID cdssdk.NodeID `json:"cdsNodeID" db:"CDSNodeID"` // 此算力中心的存储服务对应在存储系统中的ID CDSStorageID cdssdk.StorageID `json:"cdsStorageID" db:"CDSStorageID"` // 计算中心名称 diff --git a/common/pkgs/db/db.go b/common/pkgs/db/db.go index 6b75d8f..381c8df 100644 --- a/common/pkgs/db/db.go +++ b/common/pkgs/db/db.go @@ -10,6 +10,9 @@ import ( "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config" ) +// TODO 迁移到Gorm +// TODO ComputingCenter去掉了CDSNodeID字段,需要修改DB的结构 + type DB struct { d *sqlx.DB } diff --git a/common/pkgs/mq/advisor/server.go b/common/pkgs/mq/advisor/server.go index 078d6c7..9d6a6ea 100644 --- a/common/pkgs/mq/advisor/server.go +++ b/common/pkgs/mq/advisor/server.go @@ -44,7 +44,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] { return s.rabbitSvr.Start() } diff --git a/common/pkgs/mq/collector/server.go b/common/pkgs/mq/collector/server.go index ab9b0eb..f5d4d4d 100644 --- a/common/pkgs/mq/collector/server.go +++ b/common/pkgs/mq/collector/server.go @@ -50,7 +50,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] { return s.rabbitSvr.Start() } diff --git a/common/pkgs/mq/collector/storage.go b/common/pkgs/mq/collector/storage.go index fd17b79..33eda7d 100644 --- a/common/pkgs/mq/collector/storage.go +++ b/common/pkgs/mq/collector/storage.go @@ -8,7 +8,7 @@ import ( type StorageService interface { PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes) (*PackageGetCachedStgNodesResp, *mq.CodeMessage) - PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes) (*PackageGetLoadedStgNodesResp, *mq.CodeMessage) + PackageGetLoadedStgs(msg *PackageGetLoadedStgs) (*PackageGetLoadedStgsResp, *mq.CodeMessage) } // 获取package的缓存分布情况 @@ -30,11 +30,11 @@ func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageI PackageID: packageID, } } -func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp { +func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.StoragePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp { return &PackageGetCachedStgNodesResp{ PackageCachingInfo: cdssdk.PackageCachingInfo{ - NodeInfos: nodeInfos, - PackageSize: packageSize, + StorageInfos: nodeInfos, + PackageSize: packageSize, }, } } @@ -43,29 +43,29 @@ func (c *Client) PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes, opts .. } // 获取package的存储分布情况 -var _ = Register(Service.PackageGetLoadedStgNodes) +var _ = Register(Service.PackageGetLoadedStgs) -type PackageGetLoadedStgNodes struct { +type PackageGetLoadedStgs struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` PackageID cdssdk.PackageID `json:"packageID"` } -type PackageGetLoadedStgNodesResp struct { +type PackageGetLoadedStgsResp struct { mq.MessageBodyBase - StgNodeIDs []cdssdk.NodeID `json:"stgNodeIDs"` + StgIDs []cdssdk.StorageID `json:"stgNodeIDs"` } -func NewPackageGetLoadedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgNodes { - return &PackageGetLoadedStgNodes{ +func NewPackageGetLoadedStgs(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgs { + return &PackageGetLoadedStgs{ UserID: userID, PackageID: packageID, } } -func NewPackageGetLoadedStgNodesResp(nodeIDs []cdssdk.NodeID) *PackageGetLoadedStgNodesResp { - return &PackageGetLoadedStgNodesResp{ - StgNodeIDs: nodeIDs, +func NewPackageGetLoadedStgsResp(stgIDs []cdssdk.StorageID) *PackageGetLoadedStgsResp { + return &PackageGetLoadedStgsResp{ + StgIDs: stgIDs, } } -func (c *Client) PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) { - return mq.Request(Service.PackageGetLoadedStgNodes, c.rabbitCli, msg, opts...) +func (c *Client) PackageGetLoadedStgs(msg *PackageGetLoadedStgs, opts ...mq.RequestOption) (*PackageGetLoadedStgsResp, error) { + return mq.Request(Service.PackageGetLoadedStgs, c.rabbitCli, msg, opts...) } diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go index c4299c5..5d520c4 100644 --- a/common/pkgs/mq/executor/server.go +++ b/common/pkgs/mq/executor/server.go @@ -2,6 +2,7 @@ package executor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) @@ -46,7 +47,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Serve() error { +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] { return s.rabbitSvr.Start() } diff --git a/common/pkgs/mq/executor/sugoncloud.go b/common/pkgs/mq/executor/sugoncloud.go index 25ce820..403228d 100644 --- a/common/pkgs/mq/executor/sugoncloud.go +++ b/common/pkgs/mq/executor/sugoncloud.go @@ -27,7 +27,7 @@ func (c *HttpClient) CreateSugonInstance(token string, config map[string]interfa Token: token, } body, err := json.Marshal(config) - resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{ + resp, err := http2.PostJSON(targetURL, http2.RequestParam{ Body: body, Header: header, }) diff --git a/common/pkgs/mq/executor/task.go b/common/pkgs/mq/executor/task.go index bbf2d1e..c1c61c3 100644 --- a/common/pkgs/mq/executor/task.go +++ b/common/pkgs/mq/executor/task.go @@ -57,7 +57,7 @@ func (c *HttpClient) SubmitTask(req *StartTask) (*StartTaskResp, error) { } data, err := serder.ObjectToJSONEx(req) - resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{ + resp, err := http2.PostJSON(targetURL, http2.RequestParam{ Body: data, }) if err != nil { @@ -128,7 +128,7 @@ func (c *HttpClient) OperateTask(req *TaskOperateInfo) (*TaskOperateResp, error) } data, err := serder.ObjectToJSONEx(req) - resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{ + resp, err := http2.PostJSON(targetURL, http2.RequestParam{ Body: data, }) if err != nil { diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go index 5414bf3..455d6ef 100644 --- a/common/pkgs/mq/executor/task/cache_move_package.go +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -6,18 +6,18 @@ type CacheMovePackage struct { TaskInfoBase UserID cdssdk.UserID `json:"userID"` PackageID cdssdk.PackageID `json:"packageID"` - StgNodeID cdssdk.NodeID `json:"stgNodeID"` + StgID cdssdk.StorageID `json:"stgID"` } type CacheMovePackageStatus struct { TaskStatusBase Error string `json:"error"` } -func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage { +func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CacheMovePackage { return &CacheMovePackage{ UserID: userID, PackageID: packageID, - StgNodeID: stgNodeID, + StgID: stgID, } } func NewCacheMovePackageStatus(err string) *CacheMovePackageStatus { diff --git a/common/pkgs/mq/executor/task/storage_move_object.go b/common/pkgs/mq/executor/task/storage_move_object.go index a008601..fe4f920 100644 --- a/common/pkgs/mq/executor/task/storage_move_object.go +++ b/common/pkgs/mq/executor/task/storage_move_object.go @@ -1,10 +1,12 @@ package task -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +import ( + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" +) type StorageMoveObject struct { TaskInfoBase - ObjectMove cdssdk.ObjectMove `json:"objectMove"` + ObjectMove cdsapi.ObjectMove `json:"objectMove"` } type StorageMoveObjectStatus struct { @@ -12,7 +14,7 @@ type StorageMoveObjectStatus struct { Error string `json:"error"` } -func NewStorageMoveObject(objectMove cdssdk.ObjectMove) *StorageMoveObject { +func NewStorageMoveObject(objectMove cdsapi.ObjectMove) *StorageMoveObject { return &StorageMoveObject{ ObjectMove: objectMove, } diff --git a/common/pkgs/mq/manager/server.go b/common/pkgs/mq/manager/server.go index f8368ce..6f3ed0a 100644 --- a/common/pkgs/mq/manager/server.go +++ b/common/pkgs/mq/manager/server.go @@ -54,7 +54,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { +func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] { return s.rabbitSvr.Start() } diff --git a/common/pkgs/prescheduler/calc_score.go b/common/pkgs/prescheduler/calc_score.go index d0d37ce..c80a7ad 100644 --- a/common/pkgs/prescheduler/calc_score.go +++ b/common/pkgs/prescheduler/calc_score.go @@ -2,6 +2,7 @@ package prescheduler import ( "fmt" + "github.com/inhies/go-bytesize" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -171,14 +172,14 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int // 计算节点得分情况 func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的可用计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) + cdsStgToCC := make(map[cdssdk.StorageID]*candidate) for _, cc := range allCCs { - cdsNodeToCC[cc.CC.CDSNodeID] = cc + cdsStgToCC[cc.CC.CDSStorageID] = cc } //计算code相关得分 if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { - codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC) if err != nil { return fmt.Errorf("calc code file score: %w", err) } @@ -189,7 +190,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma //计算dataset相关得分 if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { - datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) + datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC) if err != nil { return fmt.Errorf("calc dataset file score: %w", err) } @@ -201,7 +202,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma //计算image相关得分 if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) + imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsStgToCC) if err != nil { return fmt.Errorf("calc image file score: %w", err) } @@ -223,7 +224,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -238,8 +239,8 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c return nil, err } - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.StorageInfos { + cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID] if !ok { continue } @@ -251,13 +252,13 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c } // TODO UserID - loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID)) + loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID)) if err != nil { return nil, err } - for _, cdsNodeID := range loadedResp.StgNodeIDs { - cc, ok := cdsNodeToCC[cdsNodeID] + for _, cdsStgID := range loadedResp.StgIDs { + cc, ok := cdsStgToCC[cdsStgID] if !ok { continue } @@ -276,7 +277,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -302,8 +303,8 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs return nil, err } - for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { - cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.StorageInfos { + cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID] if !ok { continue } diff --git a/common/pkgs/prescheduler/default_prescheduler.go b/common/pkgs/prescheduler/default_prescheduler.go index 0db18e3..079b40c 100644 --- a/common/pkgs/prescheduler/default_prescheduler.go +++ b/common/pkgs/prescheduler/default_prescheduler.go @@ -471,30 +471,30 @@ func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[s func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID + cdsNodeID := ccs[targetCCID].CDSStorageID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, + LocalPath: localFile.LocalPath, + UploadToCDStorageID: cdsNodeID, } } } if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID + cdsNodeID := ccs[targetCCID].CDSStorageID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, + LocalPath: localFile.LocalPath, + UploadToCDStorageID: cdsNodeID, } } } if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - cdsNodeID := ccs[targetCCID].CDSNodeID + cdsNodeID := ccs[targetCCID].CDSStorageID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ - LocalPath: localFile.LocalPath, - UploadToCDSNodeID: &cdsNodeID, + LocalPath: localFile.LocalPath, + UploadToCDStorageID: cdsNodeID, } } } diff --git a/executor/internal/config/config.go b/executor/internal/config/config.go index 92d306c..21d42dd 100644 --- a/executor/internal/config/config.go +++ b/executor/internal/config/config.go @@ -4,7 +4,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" c "gitlink.org.cn/cloudream/common/utils/config" schmod "gitlink.org.cn/cloudream/scheduler/common/models" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" @@ -19,7 +19,7 @@ type Config struct { Logger log.Config `json:"logger"` ReportIntervalSec int `json:"reportIntervalSec"` RabbitMQ mymq.Config `json:"rabbitMQ"` - CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` + CloudreamStorage cdsapi.Config `json:"cloudreamStorage"` PCM pcmsdk.Config `json:"pcm"` Application Application `json:"application"` Rclone schsdk.Rclone `json:"rclone"` diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index 78b1953..abfdc31 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -2,8 +2,9 @@ package task import ( "fmt" + "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -38,10 +39,10 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer schglb.CloudreamStoragePool.Release(stgCli) - _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ + _, err = stgCli.CacheMovePackage(cdsapi.CacheMovePackageReq{ UserID: t.UserID, PackageID: t.PackageID, - NodeID: t.StgNodeID, + StorageID: t.StgID, }) if err != nil { return err diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index e14eae6..4787bf0 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -2,8 +2,9 @@ package task import ( "fmt" + "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -37,7 +38,7 @@ func (t *StorageCreatePackage) do(task *Task, ctx TaskContext) error { } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.StorageCreatePackage(cdssdk.StorageCreatePackageReq{ + resp, err := stgCli.StorageCreatePackage(cdsapi.StorageCreatePackageReq{ UserID: t.UserID, StorageID: t.StorageID, Path: t.Path, diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go index ff4df89..6e8684c 100644 --- a/executor/internal/task/storage_load_package.go +++ b/executor/internal/task/storage_load_package.go @@ -2,8 +2,9 @@ package task import ( "fmt" + "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -39,7 +40,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) { } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{ + resp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{ UserID: t.UserID, PackageID: t.PackageID, StorageID: t.StorageID, diff --git a/manager/internal/config/config.go b/manager/internal/config/config.go index 119503c..279f93a 100644 --- a/manager/internal/config/config.go +++ b/manager/internal/config/config.go @@ -3,7 +3,7 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/config" db "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" @@ -13,7 +13,7 @@ type Config struct { Logger logger.Config `json:"logger"` RabbitMQ scmq.Config `json:"rabbitMQ"` DB db.Config `json:"db"` - CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` + CloudreamStorage cdsapi.Config `json:"cloudreamStorage"` ReportTimeoutSecs int `json:"reportTimeoutSecs"` CDSRclone schsdk.Rclone `json:"CDSRclone"` } diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go index 514223c..e7b4cc7 100644 --- a/manager/internal/jobmgr/job/state/adjusting.go +++ b/manager/internal/jobmgr/job/state/adjusting.go @@ -5,11 +5,11 @@ import ( "errors" "fmt" "sync" - "time" "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" @@ -128,7 +128,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState if scheme.Action == jobmod.ActionMove { logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID) - taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } @@ -169,12 +169,15 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error { if scheme.Action == jobmod.ActionImportImage { + // TODO 镜像文件位置需要重新设计 + return fmt.Errorf("not implemented yet") + if file.PackageID == nil { return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID) } // TODO UserID - taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } @@ -194,7 +197,7 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu defer schglb.CloudreamStoragePool.Release(stgCli) // TODO UserID - pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID}) + pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID}) if err != nil { return fmt.Errorf("getting package objects: %w", err) } @@ -207,27 +210,27 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu return fmt.Errorf("there must be only 1 object in the package which will be imported") } - taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) - if err != nil { - return fmt.Errorf("moving package: %w", err) - } + // taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) + // if err != nil { + // return fmt.Errorf("moving package: %w", err) + // } - fut2 := taskStatus2.Receive() - status2 := <-fut2.Chan() - if err != nil { - return fmt.Errorf("uploading image: %w", err) - } + // fut2 := taskStatus2.Receive() + // status2 := <-fut2.Chan() + // if err != nil { + // return fmt.Errorf("uploading image: %w", err) + // } - uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus) - if uploadStatus.Error != "" { - return fmt.Errorf("uploading image: %s", uploadStatus.Error) - } + // uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus) + // if uploadStatus.Error != "" { + // return fmt.Errorf("uploading image: %s", uploadStatus.Error) + // } - // TODO 镜像名称 - err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) - if err != nil { - return fmt.Errorf("creating image info: %w", err) - } + // // TODO 镜像名称 + // err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now()) + // if err != nil { + // return fmt.Errorf("creating image info: %w", err) + // } return nil } diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index 12b86c9..8673d6f 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -3,15 +3,17 @@ package state import ( "context" "fmt" + "path/filepath" + "time" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schmod "gitlink.org.cn/cloudream/scheduler/common/models" "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" - "path/filepath" - "time" "gitlink.org.cn/cloudream/common/pkgs/logger" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" @@ -146,7 +148,7 @@ func loadDatasetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storag } defer schglb.CloudreamStoragePool.Release(stgCli) - loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{ + loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{ UserID: userID, PackageID: packageID, StorageID: storageID, @@ -426,7 +428,7 @@ func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, dataSetPath string, output return cmd, envs } -func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdssdk.StorageGetResp, error) { +func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdsapi.StorageGetResp, error) { ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID) if err != nil { return nil, nil, fmt.Errorf("getting computing center info: %w", err) @@ -437,7 +439,7 @@ func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, return nil, nil, fmt.Errorf("new cds client: %w", err) } defer schglb.CloudreamStoragePool.Release(stgCli) - getStg, err := stgCli.StorageGet(cdssdk.StorageGet{ + getStg, err := stgCli.StorageGet(cdsapi.StorageGet{ UserID: userID, StorageID: ccInfo.CDSStorageID, }) diff --git a/manager/internal/jobmgr/job/state/multiInstance_update.go b/manager/internal/jobmgr/job/state/multiInstance_update.go index e06090e..9644ff4 100644 --- a/manager/internal/jobmgr/job/state/multiInstance_update.go +++ b/manager/internal/jobmgr/job/state/multiInstance_update.go @@ -3,10 +3,12 @@ package state import ( "context" "fmt" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" @@ -77,12 +79,12 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) } userID := cdssdk.UserID(1) - getStg, err := stgCli.StorageGet(cdssdk.StorageGet{ + getStg, err := stgCli.StorageGet(cdsapi.StorageGet{ UserID: userID, StorageID: ccInfo.CDSStorageID, }) - loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{ + loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{ UserID: userID, PackageID: dtrJob.DataReturnPackageID, StorageID: getStg.StorageID, diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go index 3a8dd12..9fd83e1 100644 --- a/manager/internal/jobmgr/job/state/prescheduling.go +++ b/manager/internal/jobmgr/job/state/prescheduling.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" @@ -148,9 +148,9 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS } if scheme.Action == jobmod.ActionMove { - logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSNodeID) + logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID) - taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } @@ -223,12 +223,15 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta } if scheme.Action == jobmod.ActionImportImage { + // TODO 需要重新设计镜像导入流程 + return fmt.Errorf("not implemented") + if file.PackageID == nil { return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID) } // TODO UserID - taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo) + taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } @@ -248,7 +251,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta defer schglb.CloudreamStoragePool.Release(stgCli) // TODO UserID - pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID}) + pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID}) if err != nil { return fmt.Errorf("getting package objects: %w", err) } @@ -261,7 +264,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta return fmt.Errorf("there must be only 1 object in the package which will be imported") } - taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) + taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo) if err != nil { return fmt.Errorf("moving package: %w", err) } diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go index 9c557f7..11169dd 100644 --- a/manager/internal/mq/job.go +++ b/manager/internal/mq/job.go @@ -198,7 +198,7 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService var jobSetServiceInfos []schsdk.JobSetServiceInfo for _, jo := range jobs { - var cdsNodeID *cdssdk.NodeID + var cdsNodeID cdssdk.StorageID norJob, ok := jo.Body.(*jobmod.NormalJobDump) if !ok { @@ -212,20 +212,16 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get cdsNodeID failed by CCID: %s", err.Error())) } - cdsNodeID = &computingCenter.CDSNodeID - - } else { - //返回空指针,表明查询任务不在执行状态,没有id - cdsNodeID = nil + cdsNodeID = computingCenter.CDSStorageID } norJobInfo := jo.Info.(*schsdk.NormalJobInfo) for _, servicePortInfo := range norJobInfo.Services.ServicePortInfos { jobSetServiceInfo := schsdk.JobSetServiceInfo{ - Name: servicePortInfo.Name, - Port: servicePortInfo.Port, - CDSNodeID: cdsNodeID, - LocalJobID: norJobInfo.LocalJobID, + Name: servicePortInfo.Name, + Port: servicePortInfo.Port, + CDSStorageID: cdsNodeID, + LocalJobID: norJobInfo.LocalJobID, } jobSetServiceInfos = append(jobSetServiceInfos, jobSetServiceInfo) }