Fix some compilation errors
This commit is contained in:
parent f5cb48eede
commit 28ddbebc06
@@ -382,13 +382,13 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int {
// Calculate each node's score
func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsdk.CCID]*candidate) error {
// Only compute data weights for the storage services on the computing centers returned by the operations controller
- cdsNodeToCC := make(map[cdssdk.NodeID]*candidate)
+ cdsStgToCC := make(map[cdssdk.StorageID]*candidate)
for _, cc := range allCCs {
- cdsNodeToCC[cc.CC.CDSNodeID] = cc
+ cdsStgToCC[cc.CC.CDSStorageID] = cc
}

// Calculate the code-related score
- codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsNodeToCC)
+ codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc code file score: %w", err)
}

@@ -397,7 +397,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}

// Calculate the dataset-related score
- datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsNodeToCC)
+ datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc dataset file score: %w", err)
}

@@ -406,7 +406,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}

// Calculate the image-related score
- imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsNodeToCC)
+ imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc image file score: %w", err)
}

@@ -427,7 +427,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd
}

// Calculate the package's score on each node
- func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)

@@ -441,8 +441,8 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
return nil, err
}

- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}

@@ -453,13 +453,13 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
}
}

- loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
+ loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID))
if err != nil {
return nil, err
}

- for _, cdsNodeID := range loadedResp.StgNodeIDs {
- cc, ok := cdsNodeToCC[cdsNodeID]
+ for _, cdsNodeID := range loadedResp.StgIDs {
+ cc, ok := cdsStgToCC[cdsNodeID]
if !ok {
continue
}

@@ -478,7 +478,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsN
}

// Calculate the package's score on each node
- func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)

@@ -504,8 +504,8 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map
return nil, err
}

- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsNodeToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}
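The scheduler hunks above all apply one pattern: candidates are now keyed by the storage service ID (CDSStorageID / cdssdk.StorageID) instead of the removed CDS node ID. A minimal, self-contained sketch of that re-keying, using simplified stand-in types rather than the real schsdk/cdssdk definitions:

```go
package main

import "fmt"

// Simplified stand-ins for the real cdssdk/schmod types used in the diff.
type StorageID int64
type CCID string

type ComputingCenter struct {
	CCID         CCID
	CDSStorageID StorageID // replaces the removed CDSNodeID field
}

type candidate struct {
	CC ComputingCenter
}

func main() {
	allCCs := map[CCID]*candidate{
		"cc-1": {CC: ComputingCenter{CCID: "cc-1", CDSStorageID: 101}},
		"cc-2": {CC: ComputingCenter{CCID: "cc-2", CDSStorageID: 102}},
	}

	// Index candidates by storage service ID, mirroring cdsStgToCC above.
	cdsStgToCC := make(map[StorageID]*candidate)
	for _, cc := range allCCs {
		cdsStgToCC[cc.CC.CDSStorageID] = cc
	}

	// Cached-package info now reports a StorageID, so lookups use that key.
	for _, stgID := range []StorageID{101, 999} {
		if cc, ok := cdsStgToCC[stgID]; ok {
			fmt.Printf("storage %d is served by computing center %s\n", stgID, cc.CC.CCID)
		} else {
			fmt.Printf("storage %d is not on any candidate computing center\n", stgID)
		}
	}
}
```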
@@ -2,7 +2,7 @@ package config

import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"

@@ -11,7 +11,7 @@ import (
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
UnifyOps uopsdk.Config `json:"unifyOps"`
}

@@ -4,7 +4,7 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)

@@ -17,7 +17,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- resp, err := stgCli.Package().GetCachedNodes(cdssdk.PackageGetCachedNodesReq{
+ resp, err := stgCli.Package().GetCachedStorages(cdsapi.PackageGetCachedStoragesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})

@@ -26,10 +26,10 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed")
}

- return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize))
+ return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.StorageInfos, resp.PackageSize))
}

- func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) {
+ func (svc *Service) PackageGetLoadedStgs(msg *colmq.PackageGetLoadedStgs) (*colmq.PackageGetLoadedStgsResp, *mq.CodeMessage) {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())

@@ -37,7 +37,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- resp, err := stgCli.Package().GetLoadedNodes(cdssdk.PackageGetLoadedNodesReq{
+ resp, err := stgCli.Package().GetLoadedStorages(cdsapi.PackageGetLoadedStoragesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})

@@ -46,5 +46,5 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed")
}

- return mq.ReplyOK(colmq.NewPackageGetLoadedStgNodesResp(resp.NodeIDs))
+ return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(resp.StorageIDs))
}

@@ -3,7 +3,7 @@ package schglb
import (
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"

@@ -27,12 +27,12 @@ func InitMQPool(cfg *scmq.Config) {
ManagerMQPool = mgrmq.NewPool(cfg)
}

- var CloudreamStoragePool cdssdk.Pool
- var CloudreamStorageConfig *cdssdk.Config
+ var CloudreamStoragePool cdsapi.Pool
+ var CloudreamStorageConfig *cdsapi.Config

- func InitCloudreamStoragePool(cfg *cdssdk.Config) {
+ func InitCloudreamStoragePool(cfg *cdsapi.Config) {
CloudreamStorageConfig = cfg
- CloudreamStoragePool = cdssdk.NewPool(cfg)
+ CloudreamStoragePool = cdsapi.NewPool(cfg)
}

var UnifyOpsPool uopsdk.Pool

@@ -23,8 +23,6 @@ type ComputingCenter struct {
UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID" db:"UOPSlwNodeID"`
// ID of the computing center in the PCM system
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID" db:"PCMParticipantID"`
- // ID of the computing center in the storage system
- CDSNodeID cdssdk.NodeID `json:"cdsNodeID" db:"CDSNodeID"`
// ID in the storage system of this computing center's storage service
CDSStorageID cdssdk.StorageID `json:"cdsStorageID" db:"CDSStorageID"`
// Name of the computing center

@@ -10,6 +10,9 @@ import (
"gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config"
)

+ // TODO: migrate to Gorm
+ // TODO: ComputingCenter no longer has the CDSNodeID field; the DB schema needs to be updated
+
type DB struct {
d *sqlx.DB
}

@@ -44,7 +44,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

- func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+ func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}

@@ -50,7 +50,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

- func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+ func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}

@@ -8,7 +8,7 @@ import (
type StorageService interface {
PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes) (*PackageGetCachedStgNodesResp, *mq.CodeMessage)

- PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes) (*PackageGetLoadedStgNodesResp, *mq.CodeMessage)
+ PackageGetLoadedStgs(msg *PackageGetLoadedStgs) (*PackageGetLoadedStgsResp, *mq.CodeMessage)
}

// Get the cache distribution of the package

@@ -30,11 +30,11 @@ func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageI
PackageID: packageID,
}
}
- func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp {
+ func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.StoragePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp {
return &PackageGetCachedStgNodesResp{
PackageCachingInfo: cdssdk.PackageCachingInfo{
- NodeInfos: nodeInfos,
- PackageSize: packageSize,
+ StorageInfos: nodeInfos,
+ PackageSize: packageSize,
},
}
}

@@ -43,29 +43,29 @@ func (c *Client) PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes, opts ..
}

// Get the storage distribution of the package
- var _ = Register(Service.PackageGetLoadedStgNodes)
+ var _ = Register(Service.PackageGetLoadedStgs)

- type PackageGetLoadedStgNodes struct {
+ type PackageGetLoadedStgs struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
}
- type PackageGetLoadedStgNodesResp struct {
+ type PackageGetLoadedStgsResp struct {
mq.MessageBodyBase
- StgNodeIDs []cdssdk.NodeID `json:"stgNodeIDs"`
+ StgIDs []cdssdk.StorageID `json:"stgNodeIDs"`
}

- func NewPackageGetLoadedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgNodes {
- return &PackageGetLoadedStgNodes{
+ func NewPackageGetLoadedStgs(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgs {
+ return &PackageGetLoadedStgs{
UserID: userID,
PackageID: packageID,
}
}
- func NewPackageGetLoadedStgNodesResp(nodeIDs []cdssdk.NodeID) *PackageGetLoadedStgNodesResp {
- return &PackageGetLoadedStgNodesResp{
- StgNodeIDs: nodeIDs,
+ func NewPackageGetLoadedStgsResp(stgIDs []cdssdk.StorageID) *PackageGetLoadedStgsResp {
+ return &PackageGetLoadedStgsResp{
+ StgIDs: stgIDs,
}
}
- func (c *Client) PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) {
- return mq.Request(Service.PackageGetLoadedStgNodes, c.rabbitCli, msg, opts...)
+ func (c *Client) PackageGetLoadedStgs(msg *PackageGetLoadedStgs, opts ...mq.RequestOption) (*PackageGetLoadedStgsResp, error) {
+ return mq.Request(Service.PackageGetLoadedStgs, c.rabbitCli, msg, opts...)
}
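Note that the renamed loaded-storage response keeps its old JSON field name (`stgNodeIDs`), so the wire format does not change even though the Go field now carries storage IDs. A small self-contained sketch (simplified mirror types, not the real colmq/cdssdk declarations) showing the resulting payload:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type StorageID int64

// Simplified mirror of PackageGetLoadedStgsResp from the diff above:
// the field was renamed to StgIDs, but the json tag still reads "stgNodeIDs".
type PackageGetLoadedStgsResp struct {
	StgIDs []StorageID `json:"stgNodeIDs"`
}

func main() {
	resp := PackageGetLoadedStgsResp{StgIDs: []StorageID{101, 102}}

	data, err := json.Marshal(resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // {"stgNodeIDs":[101,102]}
}
```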
@@ -2,6 +2,7 @@ package executor

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
+ "gitlink.org.cn/cloudream/common/utils/sync2"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)

@@ -46,7 +47,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

- func (s *Server) Serve() error {
+ func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}

@@ -27,7 +27,7 @@ func (c *HttpClient) CreateSugonInstance(token string, config map[string]interfa
Token: token,
}
body, err := json.Marshal(config)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: body,
Header: header,
})

@@ -57,7 +57,7 @@ func (c *HttpClient) SubmitTask(req *StartTask) (*StartTaskResp, error) {
}

data, err := serder.ObjectToJSONEx(req)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: data,
})
if err != nil {

@@ -128,7 +128,7 @@ func (c *HttpClient) OperateTask(req *TaskOperateInfo) (*TaskOperateResp, error)
}

data, err := serder.ObjectToJSONEx(req)
- resp, err := http2.PostJSONRow(targetURL, http2.RequestParam{
+ resp, err := http2.PostJSON(targetURL, http2.RequestParam{
Body: data,
})
if err != nil {

@@ -6,18 +6,18 @@ type CacheMovePackage struct {
TaskInfoBase
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
- StgNodeID cdssdk.NodeID `json:"stgNodeID"`
+ StgID cdssdk.StorageID `json:"stgID"`
}
type CacheMovePackageStatus struct {
TaskStatusBase
Error string `json:"error"`
}

- func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage {
+ func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *CacheMovePackage {
return &CacheMovePackage{
UserID: userID,
PackageID: packageID,
- StgNodeID: stgNodeID,
+ StgID: stgID,
}
}
func NewCacheMovePackageStatus(err string) {
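The CacheMovePackage task now targets a storage service rather than a CDS node, and its JSON tag changes as well (stgNodeID becomes stgID), so this executor task payload does change shape. A hedged, self-contained sketch with stand-in types (not the real exectsk/cdssdk packages):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type UserID int64
type PackageID int64
type StorageID int64

// Simplified mirror of the renamed task body shown in the diff.
type CacheMovePackage struct {
	UserID    UserID    `json:"userID"`
	PackageID PackageID `json:"packageID"`
	StgID     StorageID `json:"stgID"` // was: StgNodeID with tag "stgNodeID"
}

// Mirrors the updated constructor signature: callers now pass a StorageID.
func NewCacheMovePackage(userID UserID, packageID PackageID, stgID StorageID) *CacheMovePackage {
	return &CacheMovePackage{UserID: userID, PackageID: packageID, StgID: stgID}
}

func main() {
	t := NewCacheMovePackage(1, 42, 101)
	data, _ := json.Marshal(t)
	fmt.Println(string(data)) // {"userID":1,"packageID":42,"stgID":101}
}
```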
@@ -1,10 +1,12 @@
package task

- import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ import (
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
+ )

type StorageMoveObject struct {
TaskInfoBase
- ObjectMove cdssdk.ObjectMove `json:"objectMove"`
+ ObjectMove cdsapi.ObjectMove `json:"objectMove"`
}

type StorageMoveObjectStatus struct {

@@ -12,7 +14,7 @@ type StorageMoveObjectStatus struct {
Error string `json:"error"`
}

- func NewStorageMoveObject(objectMove cdssdk.ObjectMove) *StorageMoveObject {
+ func NewStorageMoveObject(objectMove cdsapi.ObjectMove) *StorageMoveObject {
return &StorageMoveObject{
ObjectMove: objectMove,
}

@@ -54,7 +54,7 @@ func (s *Server) Stop() {
s.rabbitSvr.Close()
}

- func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQLogEvent] {
+ func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] {
return s.rabbitSvr.Start()
}
@@ -2,6 +2,7 @@ package prescheduler

import (
"fmt"
+
"github.com/inhies/go-bytesize"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

@@ -171,14 +172,14 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int
// Calculate each node's score
func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error {
// Only compute data weights for the storage services on the available computing centers returned by the operations controller
- cdsNodeToCC := make(map[cdssdk.NodeID]*candidate)
+ cdsStgToCC := make(map[cdssdk.StorageID]*candidate)
for _, cc := range allCCs {
- cdsNodeToCC[cc.CC.CDSNodeID] = cc
+ cdsStgToCC[cc.CC.CDSStorageID] = cc
}

// Calculate the code-related score
if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok {
- codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC)
+ codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc code file score: %w", err)
}

@@ -189,7 +190,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma

// Calculate the dataset-related score
if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok {
- datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC)
+ datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc dataset file score: %w", err)
}

@@ -201,7 +202,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma
// Calculate the image-related score
if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok {
// Calculate the image-related score
- imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC)
+ imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsStgToCC)
if err != nil {
return fmt.Errorf("calc image file score: %w", err)
}

@@ -223,7 +224,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma
}

// Calculate the package's score on each node
- func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)

@@ -238,8 +239,8 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
return nil, err
}

- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}

@@ -251,13 +252,13 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
}

// TODO UserID
- loadedResp, err := colCli.PackageGetLoadedStgNodes(collector.NewPackageGetLoadedStgNodes(1, packageID))
+ loadedResp, err := colCli.PackageGetLoadedStgs(collector.NewPackageGetLoadedStgs(1, packageID))
if err != nil {
return nil, err
}

- for _, cdsNodeID := range loadedResp.StgNodeIDs {
- cc, ok := cdsNodeToCC[cdsNodeID]
+ for _, cdsStgID := range loadedResp.StgIDs {
+ cc, ok := cdsStgToCC[cdsStgID]
if !ok {
continue
}

@@ -276,7 +277,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, c
}

// Calculate the package's score on each node
- func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
+ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsStgToCC map[cdssdk.StorageID]*candidate) (map[schsdk.CCID]*fileDetail, error) {
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new collector client: %w", err)

@@ -302,8 +303,8 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs
return nil, err
}

- for _, cdsNodeCacheInfo := range cachedResp.NodeInfos {
- cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID]
+ for _, cdsNodeCacheInfo := range cachedResp.StorageInfos {
+ cc, ok := cdsStgToCC[cdsNodeCacheInfo.StorageID]
if !ok {
continue
}

@@ -471,30 +471,30 @@ func (s *DefaultPreScheduler) scheduleForSingleJob(job *schedulingJob, ccs map[s
func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(files schsdk.JobFilesInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) {
if localFile, ok := files.Dataset.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}

if localFile, ok := files.Code.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}

if localFile, ok := files.Image.(*schsdk.LocalJobFileInfo); ok {
if _, ok := schemes[localFile.LocalPath]; !ok {
- cdsNodeID := ccs[targetCCID].CDSNodeID
+ cdsNodeID := ccs[targetCCID].CDSStorageID
schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{
- LocalPath: localFile.LocalPath,
- UploadToCDSNodeID: &cdsNodeID,
+ LocalPath: localFile.LocalPath,
+ UploadToCDStorageID: cdsNodeID,
}
}
}
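In the pre-scheduler the local-upload scheme now records the destination storage service by value (UploadToCDStorageID) instead of through a pointer to a CDS node ID. A small stand-in sketch of filling the scheme map under the new field, using hypothetical simplified types rather than the real schsdk definitions:

```go
package main

import "fmt"

type StorageID int64
type CCID string

// Simplified stand-in for schsdk.LocalFileUploadScheme after this change:
// the destination is a StorageID held by value, not a *NodeID.
type LocalFileUploadScheme struct {
	LocalPath           string
	UploadToCDStorageID StorageID
}

type ComputingCenter struct {
	CDSStorageID StorageID
}

func main() {
	ccs := map[CCID]ComputingCenter{"cc-1": {CDSStorageID: 101}}
	targetCCID := CCID("cc-1")
	schemes := make(map[string]LocalFileUploadScheme)

	localPath := "/data/dataset.zip" // hypothetical local file
	if _, ok := schemes[localPath]; !ok {
		schemes[localPath] = LocalFileUploadScheme{
			LocalPath:           localPath,
			UploadToCDStorageID: ccs[targetCCID].CDSStorageID,
		}
	}

	fmt.Printf("%+v\n", schemes[localPath])
}
```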
@@ -4,7 +4,7 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
c "gitlink.org.cn/cloudream/common/utils/config"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"

@@ -19,7 +19,7 @@ type Config struct {
Logger log.Config `json:"logger"`
ReportIntervalSec int `json:"reportIntervalSec"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
PCM pcmsdk.Config `json:"pcm"`
Application Application `json:"application"`
Rclone schsdk.Rclone `json:"rclone"`

@@ -2,8 +2,9 @@ package task

import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)

@@ -38,10 +39,10 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{
+ _, err = stgCli.CacheMovePackage(cdsapi.CacheMovePackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
- NodeID: t.StgNodeID,
+ StorageID: t.StgID,
})
if err != nil {
return err

@@ -2,8 +2,9 @@ package task

import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)

@@ -37,7 +38,7 @@ func (t *StorageCreatePackage) do(task *Task, ctx TaskContext) error {
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- resp, err := stgCli.StorageCreatePackage(cdssdk.StorageCreatePackageReq{
+ resp, err := stgCli.StorageCreatePackage(cdsapi.StorageCreatePackageReq{
UserID: t.UserID,
StorageID: t.StorageID,
Path: t.Path,

@@ -2,8 +2,9 @@ package task

import (
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/logger"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)

@@ -39,7 +40,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) {
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- resp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
+ resp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
StorageID: t.StorageID,

@@ -3,7 +3,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"

@@ -13,7 +13,7 @@ type Config struct {
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
DB db.Config `json:"db"`
- CloudreamStorage cdssdk.Config `json:"cloudreamStorage"`
+ CloudreamStorage cdsapi.Config `json:"cloudreamStorage"`
ReportTimeoutSecs int `json:"reportTimeoutSecs"`
CDSRclone schsdk.Rclone `json:"CDSRclone"`
}
@@ -5,11 +5,11 @@ import (
"errors"
"fmt"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"

@@ -128,7 +128,7 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState
if scheme.Action == jobmod.ActionMove {
logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID)

- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}

@@ -169,12 +169,15 @@ func (s *Adjusting) doPackageScheduling(ctx context.Context, rtx jobmgr.JobState

func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme) error {
if scheme.Action == jobmod.ActionImportImage {
+ // TODO: the image file location needs to be redesigned
+ return fmt.Errorf("not implemented yet")
+
if file.PackageID == nil {
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID)
}

// TODO UserID
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}

@@ -194,7 +197,7 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu
defer schglb.CloudreamStoragePool.Release(stgCli)

// TODO UserID
- pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
+ pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}

@@ -207,27 +210,27 @@ func (s *Adjusting) doImageScheduling(ctx context.Context, rtx jobmgr.JobStateRu
return fmt.Errorf("there must be only 1 object in the package which will be imported")
}

- taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
- if err != nil {
- return fmt.Errorf("moving package: %w", err)
- }
+ // taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
+ // if err != nil {
+ // return fmt.Errorf("moving package: %w", err)
+ // }

- fut2 := taskStatus2.Receive()
- status2 := <-fut2.Chan()
- if err != nil {
- return fmt.Errorf("uploading image: %w", err)
- }
+ // fut2 := taskStatus2.Receive()
+ // status2 := <-fut2.Chan()
+ // if err != nil {
+ // return fmt.Errorf("uploading image: %w", err)
+ // }

- uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus)
- if uploadStatus.Error != "" {
- return fmt.Errorf("uploading image: %s", uploadStatus.Error)
- }
+ // uploadStatus := status2.Value.Status.(*exectsk.UploadImageStatus)
+ // if uploadStatus.Error != "" {
+ // return fmt.Errorf("uploading image: %s", uploadStatus.Error)
+ // }

- // TODO: image name
- err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
- if err != nil {
- return fmt.Errorf("creating image info: %w", err)
- }
+ // // TODO: image name
+ // err = rtx.Mgr.DB.PCMImage().Create(rtx.Mgr.DB.SQLCtx(), file.ImageID, targetCCID, uploadStatus.PCMImageID, uploadStatus.Name, time.Now())
+ // if err != nil {
+ // return fmt.Errorf("creating image info: %w", err)
+ // }

return nil
}
@@ -3,15 +3,17 @@ package state
import (
"context"
"fmt"
+ "path/filepath"
+ "time"
+
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
"gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr"
- "path/filepath"
- "time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"

@@ -146,7 +148,7 @@ func loadDatasetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storag
}
defer schglb.CloudreamStoragePool.Release(stgCli)

- loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
+ loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: userID,
PackageID: packageID,
StorageID: storageID,

@@ -426,7 +428,7 @@ func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, dataSetPath string, output
return cmd, envs
}

- func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdssdk.StorageGetResp, error) {
+ func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdsapi.StorageGetResp, error) {
ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID)
if err != nil {
return nil, nil, fmt.Errorf("getting computing center info: %w", err)

@@ -437,7 +439,7 @@ func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID,
return nil, nil, fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
- getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
+ getStg, err := stgCli.StorageGet(cdsapi.StorageGet{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})

@@ -3,10 +3,12 @@ package state
import (
"context"
"fmt"
+
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr"

@@ -77,12 +79,12 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
}

userID := cdssdk.UserID(1)
- getStg, err := stgCli.StorageGet(cdssdk.StorageGet{
+ getStg, err := stgCli.StorageGet(cdsapi.StorageGet{
UserID: userID,
StorageID: ccInfo.CDSStorageID,
})

- loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{
+ loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: userID,
PackageID: dtrJob.DataReturnPackageID,
StorageID: getStg.StorageID,
@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"

schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
- cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
+ "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"

@@ -148,9 +148,9 @@ func (s *PreScheduling) doPackageScheduling(ctx context.Context, rtx jobmgr.JobS
}

if scheme.Action == jobmod.ActionMove {
- logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSNodeID)
+ logger.Debugf("begin move pacakge %v to %v", file.PackageID, s.targetCCInfo.CDSStorageID)

- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}

@@ -223,12 +223,15 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
}

if scheme.Action == jobmod.ActionImportImage {
+ // TODO: the image import flow needs to be redesigned
+ return fmt.Errorf("not implemented")
+
if file.PackageID == nil {
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, s.targetCCInfo.CCID)
}

// TODO UserID
- taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSNodeID), s.targetCCInfo)
+ taskStatus, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewCacheMovePackage(1, *file.PackageID, s.targetCCInfo.CDSStorageID), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}

@@ -248,7 +251,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
defer schglb.CloudreamStoragePool.Release(stgCli)

// TODO UserID
- pkgObjs, err := stgCli.Object().GetPackageObjects(cdssdk.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
+ pkgObjs, err := stgCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{UserID: 1, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}

@@ -261,7 +264,7 @@ func (s *PreScheduling) doImageScheduling(ctx context.Context, rtx jobmgr.JobSta
return fmt.Errorf("there must be only 1 object in the package which will be imported")
}

- taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
+ taskStatus2, err := rtx.Mgr.ExecMgr.StartTask(exectsk.NewUploadImage(s.targetCCInfo.PCMParticipantID, cdsapi.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)), s.targetCCInfo)
if err != nil {
return fmt.Errorf("moving package: %w", err)
}
@@ -198,7 +198,7 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService
var jobSetServiceInfos []schsdk.JobSetServiceInfo

for _, jo := range jobs {
- var cdsNodeID *cdssdk.NodeID
+ var cdsNodeID cdssdk.StorageID

norJob, ok := jo.Body.(*jobmod.NormalJobDump)
if !ok {

@@ -212,20 +212,16 @@ func (svc *Service) GetServiceList(msg *mgrmq.GetServiceList) (*mgrmq.GetService
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get cdsNodeID failed by CCID: %s", err.Error()))
}

- cdsNodeID = &computingCenter.CDSNodeID
-
- } else {
- // Return a nil pointer to indicate the queried job is not running and has no ID
- cdsNodeID = nil
+ cdsNodeID = computingCenter.CDSStorageID
}

norJobInfo := jo.Info.(*schsdk.NormalJobInfo)
for _, servicePortInfo := range norJobInfo.Services.ServicePortInfos {
jobSetServiceInfo := schsdk.JobSetServiceInfo{
- Name: servicePortInfo.Name,
- Port: servicePortInfo.Port,
- CDSNodeID: cdsNodeID,
- LocalJobID: norJobInfo.LocalJobID,
+ Name: servicePortInfo.Name,
+ Port: servicePortInfo.Port,
+ CDSStorageID: cdsNodeID,
+ LocalJobID: norJobInfo.LocalJobID,
}
jobSetServiceInfos = append(jobSetServiceInfos, jobSetServiceInfo)
}