diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..7e8c9ec
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/scheduler.iml b/.idea/scheduler.iml
new file mode 100644
index 0000000..5e764c4
--- /dev/null
+++ b/.idea/scheduler.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go
index 313a71e..4dd6c5e 100644
--- a/advisor/internal/scheduler/scheduler.go
+++ b/advisor/internal/scheduler/scheduler.go
@@ -382,13 +382,13 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int {
// 计算节点得分情况
func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsdk.CCID]*candidate) error {
// 只计算运控返回的计算中心上的存储服务的数据权重
- cdsNodeToCC := make(map[cdssdk.NodeID]*candidate)
+ cdsStgToCC := make(map[cdssdk.StorageID]*candidate)
for _, cc := range allCCs {
- cdsNodeToCC[cc.CC.CDSNodeID] = cc
+ cdsStgToCC[cc.CC.CDSStorageID] = cc
}
//计算code相关得分
- codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsNodeToCC)
+ codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc code file score: %w", err)
}
@@ -397,7 +397,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}
//计算dataset相关得分
- datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsNodeToCC)
+ datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc dataset file score: %w", err)
}
@@ -406,7 +406,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}
//计算image相关得分
- imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsNodeToCC)
+ imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc image file score: %w", err)
}
@@ -427,7 +427,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}
// 计算package在各节点的得分情况
-func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
@@ -441,8 +441,8 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
return nil, err
}
- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}
@@ -453,13 +453,13 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
}
}
- loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
+ loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID))
if err != nil {
return nil, err
}
- for _, cdsNodeID := range loadedResp.StgNodeIDs {
- cc, ok := cdsNodeToCC[cdsNodeID]
+ for _, cdsNodeID := range loadedResp.StgIDs {
+ cc, ok := cdsStgToCC[cdsNodeID]
if !ok {
continue
}
@@ -478,7 +478,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
}
// 计算package在各节点的得分情况
-func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
@@ -504,8 +504,8 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map
return nil, err
}
- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsNodeToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}
diff --git a/collector/internal/config/config.go b/collector/internal/config/config.go
index 0408383..8d3ecfe 100644
--- a/collector/internal/config/config.go
+++ b/collector/internal/config/config.go
@@ -2,7 +2,7 @@ package config
import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
@@ -11,7 +11,7 @@ import (
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
UnifyOps uopsdk.Config `json:"unifyOps"`
}
diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go
index 761fadb..41725e6 100644
--- a/collector/internal/mq/storage.go
+++ b/collector/internal/mq/storage.go
@@ -4,7 +4,7 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
@@ -17,7 +17,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- resp, err := stgCli.Package().GetCachedNodes(cdssdk.PackageGetCachedNodesReq{
+ resp, err := stgCli.Package().GetCachedStorages(cdsapi.PackageGetCachedStoragesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
@@ -26,10 +26,10 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed")
}
- return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize))
+ return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.StorageInfos, resp.PackageSize))
}
-func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) {
+func (svc *Service) PackageGetLoadedStgs(msg *colmq.PackageGetLoadedStgs) (*colmq.PackageGetLoadedStgsResp, *mq.CodeMessage) {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
@@ -37,7 +37,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- resp, err := stgCli.Package().GetLoadedNodes(cdssdk.PackageGetLoadedNodesReq{
+ resp, err := stgCli.Package().GetLoadedStorages(cdsapi.PackageGetLoadedStoragesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
@@ -46,5 +46,5 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed")
}
- return mq.ReplyOK(colmq.NewPackageGetLoadedStgNodesResp(resp.NodeIDs))
+ return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(resp.StorageIDs))
}
diff --git a/common/models/models.go b/common/models/models.go
index 0de6419..d00e140 100644
--- a/common/models/models.go
+++ b/common/models/models.go
@@ -23,8 +23,6 @@ type ComputingCenter struct {
UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID" db:"UOPSlwNodeID"`
// 计算中心在PCM系统的ID
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID" db:"PCMParticipantID"`
- // 计算中心在存储系统的ID
- CDSNodeID cdssdk.NodeID `json:"cdsNodeID" db:"CDSNodeID"`
// 此算力中心的存储服务对应在存储系统中的ID
CDSStorageID cdssdk.StorageID `json:"cdsStorageID" db:"CDSStorageID"`
// 计算中心名称
diff --git a/common/pkgs/db/db.go b/common/pkgs/db/db.go
index 6b75d8f..381c8df 100644
--- a/common/pkgs/db/db.go
+++ b/common/pkgs/db/db.go
@@ -10,6 +10,9 @@ import (
"gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config"
)
+// TODO 迁移到Gorm
+// TODO ComputingCenter去掉了CDSNodeID字段,需要修改DB的结构
+
type DB struct {
d *sqlx.DB
}
diff --git a/common/pkgs/mq/advisor/server.go b/common/pkgs/mq/advisor/server.go
index 078d6c7..9d6a6ea 100644
--- a/common/pkgs/mq/advisor/server.go
+++ b/common/pkgs/mq/advisor/server.go
@@ -44,7 +44,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
-func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}
diff --git a/common/pkgs/mq/collector/server.go b/common/pkgs/mq/collector/server.go
index ab9b0eb..f5d4d4d 100644
--- a/common/pkgs/mq/collector/server.go
+++ b/common/pkgs/mq/collector/server.go
@@ -50,7 +50,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
-func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}
diff --git a/common/pkgs/mq/collector/storage.go b/common/pkgs/mq/collector/storage.go
index fd17b79..33eda7d 100644
--- a/common/pkgs/mq/collector/storage.go
+++ b/common/pkgs/mq/collector/storage.go
@@ -8,7 +8,7 @@ import (
type StorageService interface {
PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes) (*PackageGetCachedStgNodesResp, *mq.CodeMessage)
- PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes) (*PackageGetLoadedStgNodesResp, *mq.CodeMessage)
+ PackageGetLoadedStgs(msg *PackageGetLoadedStgs) (*PackageGetLoadedStgsResp, *mq.CodeMessage)
}
// 获取package的缓存分布情况
@@ -30,11 +30,11 @@ func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageI
PackageID: packageID,
}
}
-func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp {
+func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.StoragePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp {
return &PackageGetCachedStgNodesResp{
PackageCachingInfo: cdssdk.PackageCachingInfo{
- NodeInfos: nodeInfos,
- PackageSize: packageSize,
+ StorageInfos: nodeInfos,
+ PackageSize: packageSize,
},
}
}
@@ -43,29 +43,29 @@ func (c *Client) PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes, opts ..
}
// 获取package的存储分布情况
-var _ = Register(Service.PackageGetLoadedStgNodes)
+var _ = Register(Service.PackageGetLoadedStgs)
-type PackageGetLoadedStgNodes struct {
+type PackageGetLoadedStgs struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}
-type PackageGetLoadedStgNodesResp struct {
+type PackageGetLoadedStgsResp struct {
mq.MessageBodyBase
- StgNodeIDs []cdssdk.NodeID `json:"stgNodeIDs"`
+ StgIDs []cdssdk.StorageID `json:"stgNodeIDs"`
}
-func NewPackageGetLoadedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgNodes {
- return &PackageGetLoadedStgNodes{
+func NewPackageGetLoadedStgs(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgs {
+ return &PackageGetLoadedStgs{
UserID: userID,
PackageID: packageID,
}
}
-func NewPackageGetLoadedStgNodesResp(nodeIDs []cdssdk.NodeID) *PackageGetLoadedStgNodesResp {
- return &PackageGetLoadedStgNodesResp{
- StgNodeIDs: nodeIDs,
+func NewPackageGetLoadedStgsResp(stgIDs []cdssdk.StorageID) *PackageGetLoadedStgsResp {
+ return &PackageGetLoadedStgsResp{
+ StgIDs: stgIDs,
}
}
-func (c *Client) PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) {
- return mq.Request(Service.PackageGetLoadedStgNodes, c.rabbitCli, msg, opts...)
+func (c *Client) PackageGetLoadedStgs(msg *PackageGetLoadedStgs, opts ...mq.RequestOption) (*PackageGetLoadedStgsResp, error) {
+ return mq.Request(Service.PackageGetLoadedStgs, c.rabbitCli, msg, opts...)
}
diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go
index f79c074..5d520c4 100644
--- a/common/pkgs/mq/executor/server.go
+++ b/common/pkgs/mq/executor/server.go
@@ -47,7 +47,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
-func (s *Server) Serve() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}
diff --git a/common/pkgs/mq/executor/sugoncloud.go b/common/pkgs/mq/executor/sugoncloud.go
index 25ce820..403228d 100644
--- a/common/pkgs/mq/executor/sugoncloud.go
+++ b/common/pkgs/mq/executor/sugoncloud.go
@@ -27,7 +27,7 @@ func (c *HttpClient) CreateSugonInstance(token string, config map[string]interfa
Token: token,
}
body, err := json.Marshal(config)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: body,
Header: header,
})
diff --git a/common/pkgs/mq/executor/task.go b/common/pkgs/mq/executor/task.go
index bbf2d1e..c1c61c3 100644
--- a/common/pkgs/mq/executor/task.go
+++ b/common/pkgs/mq/executor/task.go
@@ -57,7 +57,7 @@ func (c *HttpClient) SubmitTask(req *StartTask) (*StartTaskResp, error) {
}
data, err := serder.ObjectToJSONEx(req)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: data,
})
if err != nil {
@@ -128,7 +128,7 @@ func (c *HttpClient) OperateTask(req *TaskOperateInfo) (*TaskOperateResp, error)
}
data, err := serder.ObjectToJSONEx(req)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: data,
})
if err != nil {
diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go
index 5414bf3..455d6ef 100644
--- a/common/pkgs/mq/executor/task/cache_move_package.go
+++ b/common/pkgs/mq/executor/task/cache_move_package.go
@@ -6,18 +6,18 @@ type CacheMovePackage struct {
TaskInfoBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
- StgNodeID cdssdk.NodeID `json:"stgNodeID"`
+ StgID cdssdk.StorageID `json:"stgID"`
}
type CacheMovePackageStatus struct {
TaskStatusBase
Error string `json:"error"`
}
-func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage {
+func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CacheMovePackage {
return &CacheMovePackage{
UserID: userID,
PackageID: packageID,
- StgNodeID: stgNodeID,
+ StgID: stgID,
}
}
func NewCacheMovePackageStatus(err string) *CacheMovePackageStatus {
diff --git a/common/pkgs/mq/executor/task/storage_move_object.go b/common/pkgs/mq/executor/task/storage_move_object.go
index a008601..fe4f920 100644
--- a/common/pkgs/mq/executor/task/storage_move_object.go
+++ b/common/pkgs/mq/executor/task/storage_move_object.go
@@ -1,10 +1,12 @@
package task
-import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+import (
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
+)
type StorageMoveObject struct {
TaskInfoBase
- ObjectMove cdssdk.ObjectMove `json:"objectMove"`
+ ObjectMove cdsapi.ObjectMove `json:"objectMove"`
}
type StorageMoveObjectStatus struct {
@@ -12,7 +14,7 @@ type StorageMoveObjectStatus struct {
Error string `json:"error"`
}
-func NewStorageMoveObject(objectMove cdssdk.ObjectMove) *StorageMoveObject {
+func NewStorageMoveObject(objectMove cdsapi.ObjectMove) *StorageMoveObject {
return &StorageMoveObject{
ObjectMove: objectMove,
}
diff --git a/common/pkgs/mq/manager/server.go b/common/pkgs/mq/manager/server.go
index f8368ce..6f3ed0a 100644
--- a/common/pkgs/mq/manager/server.go
+++ b/common/pkgs/mq/manager/server.go
@@ -54,7 +54,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}
-func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}
diff --git a/common/pkgs/prescheduler/calc_score.go b/common/pkgs/prescheduler/calc_score.go
index d0d37ce..c80a7ad 100644
--- a/common/pkgs/prescheduler/calc_score.go
+++ b/common/pkgs/prescheduler/calc_score.go
@@ -2,6 +2,7 @@ package prescheduler
import (
"fmt"
+
"github.com/inhies/go-bytesize"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -171,14 +172,14 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int
// 计算节点得分情况
func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error {
// 只计算运控返回的可用计算中心上的存储服务的数据权重
- cdsNodeToCC := make(map[cdssdk.NodeID]*candidate)
+ cdsStgToCC := make(map[cdssdk.StorageID]*candidate)
for _, cc := range allCCs {
- cdsNodeToCC[cc.CC.CDSNodeID] = cc
+ cdsStgToCC[cc.CC.CDSStorageID] = cc
}
//计算code相关得分
if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok {
- codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC)
+ codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc code file score: %w", err)
}
@@ -189,7 +190,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma
//计算dataset相关得分
if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok {
- datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC)
+ datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc dataset file score: %w", err)
}
@@ -201,7 +202,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma
//计算image相关得分
if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok {
//计算image相关得分
- imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC)
+ imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc image file score: %w", err)
}
@@ -223,7 +224,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma
}
// 计算package在各节点的得分情况
-func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
@@ -238,8 +239,8 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
return nil, err
}
- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}
@@ -251,13 +252,13 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
}
// TODO UserID
- loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
+ loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID))
if err != nil {
return nil, err
}
- for _, cdsNodeID := range loadedResp.StgNodeIDs {
- cc, ok := cdsNodeToCC[cdsNodeID]
+ for _, cdsStgID := range loadedResp.StgIDs {
+ cc, ok := cdsStgToCC[cdsStgID]
if !ok {
continue
}
@@ -276,7 +277,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
}
// 计算package在各节点的得分情况
-func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)
@@ -302,8 +303,8 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs
return nil, err
}
- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}
diff --git a/common/pkgs/prescheduler/default_prescheduler.go b/common/pkgs/prescheduler/default_prescheduler.go
index 0db18e3..079b40c 100644
--- a/common/pkgs/prescheduler/default_prescheduler.go
+++ b/common/pkgs/prescheduler/default_prescheduler.go
@@ -471,30 +471,30 @@ func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[s
func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) {
if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}
if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}
if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}
diff --git a/executor/internal/config/config.go b/executor/internal/config/config.go
index 92d306c..21d42dd 100644
--- a/executor/internal/config/config.go
+++ b/executor/internal/config/config.go
@@ -4,7 +4,7 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
c "gitlink.org.cn/cloudream/common/utils/config"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
@@ -19,7 +19,7 @@ type Config struct {
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Application Application `json:"application"`
Rclone schsdk.Rclone `json:"rclone"`
diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go
index 78b1953..abfdc31 100644
--- a/executor/internal/task/cache_move_package.go
+++ b/executor/internal/task/cache_move_package.go
@@ -2,8 +2,9 @@ package task
import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@@ -38,10 +39,10 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{
+ _, err = stgCli.CacheMovePackage(cdsapi.CacheMovePackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
- NodeID: t.StgNodeID,
+ StorageID: t.StgID,
})
if err != nil {
return err
diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go
index e14eae6..4787bf0 100644
--- a/executor/internal/task/storage_create_package.go
+++ b/executor/internal/task/storage_create_package.go
@@ -2,8 +2,9 @@ package task
import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@@ -37,7 +38,7 @@ func (t *StorageCreatePackage) do(task *Task, ctx TaskContext) error {
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- resp, err := stgCli.StorageCreatePackage(cdssdk.StorageCreatePackageReq{
+ resp, err := stgCli.StorageCreatePackage(cdsapi.StorageCreatePackageReq{
UserID: t.UserID,
StorageID: t.StorageID,
Path: t.Path,
diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go
index ff4df89..6e8684c 100644
--- a/executor/internal/task/storage_load_package.go
+++ b/executor/internal/task/storage_load_package.go
@@ -2,8 +2,9 @@ package task
import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
@@ -39,7 +40,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) {
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- resp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
+ resp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
StorageID: t.StorageID,
diff --git a/log/advisor.log b/log/advisor.log
new file mode 100644
index 0000000..b9d065a
--- /dev/null
+++ b/log/advisor.log
@@ -0,0 +1,9 @@
+2024-04-16 10:03:59 [INFO] start serving mq server
+2024-04-16 10:03:59 [INFO] start serving scheduler service
+2024-04-16 10:03:59 [INFO] start serving reporter
+2024-04-16 10:04:14 [INFO] start serving mq server
+2024-04-16 10:04:14 [INFO] start serving reporter
+2024-04-16 10:04:14 [INFO] start serving scheduler service
+2024-04-16 10:04:31 [INFO] start serving scheduler service
+2024-04-16 10:04:31 [INFO] start serving reporter
+2024-04-16 10:04:31 [INFO] start serving mq server
diff --git a/log/collector.log b/log/collector.log
new file mode 100644
index 0000000..d19e8e6
--- /dev/null
+++ b/log/collector.log
@@ -0,0 +1 @@
+2024-04-16 10:03:21 [INFO] start serving command server
diff --git a/manager/internal/config/config.go b/manager/internal/config/config.go
index 119503c..279f93a 100644
--- a/manager/internal/config/config.go
+++ b/manager/internal/config/config.go
@@ -3,7 +3,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
@@ -13,7 +13,7 @@ type Config struct {
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
DB db.Config `json:"db"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
ReportTimeoutSecs int `json:"reportTimeoutSecs"`
CDSRclone schsdk.Rclone `json:"CDSRclone"`
}
diff --git a/manager/internal/jobmgr/job/state/adjusting.go b/manager/internal/jobmgr/job/state/adjusting.go
index 514223c..e7b4cc7 100644
--- a/manager/internal/jobmgr/job/state/adjusting.go
+++ b/manager/internal/jobmgr/job/state/adjusting.go
@@ -5,11 +5,11 @@ import (
"errors"
"fmt"
"sync"
- "time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
@@ -128,7 +128,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState
if scheme.Action == jobmod.ActionMove {
logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID)
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
@@ -169,12 +169,15 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState
func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
if scheme.Action == jobmod.ActionImportImage {
+ // TODO 镜像文件位置需要重新设计
+ return fmt.Errorf("not implemented yet")
+
if file.PackageID == nil {
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID)
}
// TODO UserID
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
@@ -194,7 +197,7 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu
defer schglb.CloudreamStoragePool.Release(stgCli)
// TODO UserID
- pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
+ pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
@@ -207,27 +210,27 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu
return fmt.Errorf("there must be only 1 object in the package which will be imported")
}
- taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
- if err != nil {
- return fmt.Errorf("moving package: %w", err)
- }
+ // taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
+ // if err != nil {
+ // return fmt.Errorf("moving package: %w", err)
+ // }
- fut2 := taskStatus2.Receive()
- status2 := <-fut2.Chan()
- if err != nil {
- return fmt.Errorf("uploading image: %w", err)
- }
+ // fut2 := taskStatus2.Receive()
+ // status2 := <-fut2.Chan()
+ // if err != nil {
+ // return fmt.Errorf("uploading image: %w", err)
+ // }
- uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus)
- if uploadStatus.Error != "" {
- return fmt.Errorf("uploading image: %s", uploadStatus.Error)
- }
+ // uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus)
+ // if uploadStatus.Error != "" {
+ // return fmt.Errorf("uploading image: %s", uploadStatus.Error)
+ // }
- // TODO 镜像名称
- err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
- if err != nil {
- return fmt.Errorf("creating image info: %w", err)
- }
+ // // TODO 镜像名称
+ // err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
+ // if err != nil {
+ // return fmt.Errorf("creating image info: %w", err)
+ // }
return nil
}
diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go
index eaa524d..8673d6f 100644
--- a/manager/internal/jobmgr/job/state/executing.go
+++ b/manager/internal/jobmgr/job/state/executing.go
@@ -3,15 +3,17 @@ package state
import (
"context"
"fmt"
+ "path/filepath"
+ "time"
+
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdsapi "gitlink.org.cn/cloudream/common/sdks/storage"
+ cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
- "path/filepath"
- "time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
@@ -51,7 +53,7 @@ func (s *JobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmo
func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
// TODO UserID
- userID := cdsapi.UserID(1)
+ userID := cdssdk.UserID(1)
err := error(nil)
switch runningJob := jo.Body.(type) {
@@ -133,13 +135,13 @@ func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
return err
}
-func getDataSetPathByID(packageID cdsapi.PackageID) string {
+func getDataSetPathByID(packageID cdssdk.PackageID) string {
// TODO 临时使用,这个路径应该来自于CDS
dataSetPath := filepath.Join("packages", "1", fmt.Sprintf("%v", packageID))
return dataSetPath
}
-func loadDatasetPackage(userID cdsapi.UserID, packageID cdsapi.PackageID, storageID cdsapi.StorageID) (string, error) {
+func loadDatasetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return "", err
@@ -200,7 +202,7 @@ func (s *JobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd strin
}
}
-func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdsapi.StorageID, userID cdsapi.UserID) (string, error) {
+func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdssdk.StorageID, userID cdssdk.UserID) (string, error) {
objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID)
if err != nil {
logger.Error(err.Error())
@@ -231,7 +233,7 @@ func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, c
return tskStatus.InstanceID, nil
}
-func (s *JobExecuting) submitFinetuningTask(userID cdsapi.UserID, rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdsapi.StorageID, runningJob *job.NormalJob) error {
+func (s *JobExecuting) submitFinetuningTask(userID cdssdk.UserID, rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdssdk.StorageID, runningJob *job.NormalJob) error {
objectStorage, modelInfo, err := getModelInfoAndObjectStorage(rtx, runningJob.Info.ModelJobInfo.ModelID, storageID)
if err != nil {
@@ -265,7 +267,7 @@ func (s *JobExecuting) submitFinetuningTask(userID cdsapi.UserID, rtx jobmgr.Job
}
func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter,
- storageID cdsapi.StorageID, userID cdsapi.UserID, envs []schsdk.KVPair) error {
+ storageID cdssdk.StorageID, userID cdssdk.UserID, envs []schsdk.KVPair) error {
modelJobInfo := runningJob.Info.ModelJobInfo
@@ -370,7 +372,7 @@ func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *job
}
}
-func getModelInfoAndObjectStorage(rtx jobmgr.JobStateRunContext, modelID schsdk.ModelID, storageID cdsapi.StorageID) (*schmod.ObjectStorage, *schmod.ModelResource, error) {
+func getModelInfoAndObjectStorage(rtx jobmgr.JobStateRunContext, modelID schsdk.ModelID, storageID cdssdk.StorageID) (*schmod.ObjectStorage, *schmod.ModelResource, error) {
objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID)
if err != nil {
logger.Error(err.Error())
@@ -426,7 +428,7 @@ func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, dataSetPath string, output
return cmd, envs
}
-func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdsapi.UserID) (*schmod.ComputingCenter, *cdsapi.StorageGetResp, error) {
+func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdsapi.StorageGetResp, error) {
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil {
return nil, nil, fmt.Errorf("getting computing center info: %w", err)
@@ -470,7 +472,7 @@ func (s *DataReturnJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.
func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error {
reJob := jo.Body.(*job.DataReturnJob)
- userID := cdsapi.UserID(1)
+ userID := cdssdk.UserID(1)
log := logger.WithType[JobExecuting]("State").WithField("JobID", jo.JobID)
diff --git a/manager/internal/jobmgr/job/state/multiInstance_update.go b/manager/internal/jobmgr/job/state/multiInstance_update.go
index e06090e..9644ff4 100644
--- a/manager/internal/jobmgr/job/state/multiInstance_update.go
+++ b/manager/internal/jobmgr/job/state/multiInstance_update.go
@@ -3,10 +3,12 @@ package state
import (
"context"
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"
@@ -77,12 +79,12 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
}
userID := cdssdk.UserID(1)
- getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
+ getStg, err := stgCli.StorageGet(cdsapi.StorageGet{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})
- loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
+ loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: userID,
PackageID: dtrJob.DataReturnPackageID,
StorageID: getStg.StorageID,
diff --git a/manager/internal/jobmgr/job/state/prescheduling.go b/manager/internal/jobmgr/job/state/prescheduling.go
index 3a8dd12..9fd83e1 100644
--- a/manager/internal/jobmgr/job/state/prescheduling.go
+++ b/manager/internal/jobmgr/job/state/prescheduling.go
@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
@@ -148,9 +148,9 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
}
if scheme.Action == jobmod.ActionMove {
- logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSNodeID)
+ logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID)
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
@@ -223,12 +223,15 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
}
if scheme.Action == jobmod.ActionImportImage {
+ // TODO 需要重新设计镜像导入流程
+ return fmt.Errorf("not implemented")
+
if file.PackageID == nil {
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID)
}
// TODO UserID
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
@@ -248,7 +251,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
defer schglb.CloudreamStoragePool.Release(stgCli)
// TODO UserID
- pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
+ pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
@@ -261,7 +264,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
return fmt.Errorf("there must be only 1 object in the package which will be imported")
}
- taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
+ taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
diff --git a/manager/internal/mq/job.go b/manager/internal/mq/job.go
index 9c557f7..11169dd 100644
--- a/manager/internal/mq/job.go
+++ b/manager/internal/mq/job.go
@@ -198,7 +198,7 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService
var jobSetServiceInfos []schsdk.JobSetServiceInfo
for _, jo := range jobs {
- var cdsNodeID *cdssdk.NodeID
+ var cdsNodeID cdssdk.StorageID
norJob, ok := jo.Body.(*jobmod.NormalJobDump)
if !ok {
@@ -212,20 +212,16 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get cdsNodeID failed by CCID: %s", err.Error()))
}
- cdsNodeID = &computingCenter.CDSNodeID
-
- } else {
- //返回空指针,表明查询任务不在执行状态,没有id
- cdsNodeID = nil
+ cdsNodeID = computingCenter.CDSStorageID
}
norJobInfo := jo.Info.(*schsdk.NormalJobInfo)
for _, servicePortInfo := range norJobInfo.Services.ServicePortInfos {
jobSetServiceInfo := schsdk.JobSetServiceInfo{
- Name: servicePortInfo.Name,
- Port: servicePortInfo.Port,
- CDSNodeID: cdsNodeID,
- LocalJobID: norJobInfo.LocalJobID,
+ Name: servicePortInfo.Name,
+ Port: servicePortInfo.Port,
+ CDSStorageID: cdsNodeID,
+ LocalJobID: norJobInfo.LocalJobID,
}
jobSetServiceInfos = append(jobSetServiceInfos, jobSetServiceInfo)
}