增加任务完整执行过程所需的服务接口

This commit is contained in:
Sydonian 2023-09-08 16:39:16 +08:00
parent d71de178ea
commit ebbfabd689
29 changed files with 802 additions and 388 deletions

View File

@ -1,9 +1,12 @@
package prescheduler
import "gitlink.org.cn/cloudream/common/models"
import (
"gitlink.org.cn/cloudream/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type PreScheduler interface {
Schedule(info models.JobSetInfo) (*models.JobSetPreScheduleScheme, *models.JobSetFilesUploadScheme, error)
Schedule(info models.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *models.JobSetFilesUploadScheme, error)
}
type DefaultPreScheduler struct {
@ -13,6 +16,6 @@ func NewDefaultPreScheduler() *DefaultPreScheduler {
return &DefaultPreScheduler{}
}
func (s *DefaultPreScheduler) Schedule(info models.JobSetInfo) (*models.JobSetPreScheduleScheme, *models.JobSetFilesUploadScheme, error) {
func (s *DefaultPreScheduler) Schedule(info models.JobSetInfo) (*jobmod.JobSetPreScheduleScheme, *models.JobSetFilesUploadScheme, error) {
}

View File

@ -9,10 +9,11 @@ import (
)
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cldstg.Config `json:"cloudreamStorage"`
UnifyOps uniops.Config `json:"unifyOps"`
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ"`
CloudreamStorage cldstg.Config `json:"cloudreamStorage"`
UnifyOps uniops.Config `json:"unifyOps"`
SlwNodes []SlwNodeConfig `json:"slwNodes"`
// PCM cldstg.Config `json:"pcm"`
}
@ -25,3 +26,9 @@ func Init() error {
func Cfg() *Config {
return &cfg
}
type SlwNodeConfig struct {
SlwNodeID int64 `json:"slwNodeID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
}

View File

@ -1,4 +1,4 @@
package services
package mq
import (
"gitlink.org.cn/cloudream/common/api/unifyops"
@ -10,23 +10,6 @@ import (
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNodeInfoResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetSlwNodeInfoResp](errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resps, err := uniOpsCli.GetSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetSlwNodeInfoResp](errorcode.OperationFailed, "get slwNode info failed")
}
return mq.ReplyOK(colmq.NewGetSlwNodeInfoResp(resps.Nodes))
}
func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.GetOneResourceDataResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
@ -38,28 +21,28 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge
var resp models.ResourceData
switch msg.ResourceType {
case models.ResourceTypeCPU:
resp, err = uniOpsCli.GetCPUData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetCPUData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
case models.ResourceTypeNPU:
resp, err = uniOpsCli.GetNPUData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetNPUData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
case models.ResourceTypeGPU:
resp, err = uniOpsCli.GetGPUData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetGPUData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
case models.ResourceTypeMLU:
resp, err = uniOpsCli.GetMLUData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetMLUData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
case models.ResourceTypeStorage:
resp, err = uniOpsCli.GetStorageData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetStorageData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
case models.ResourceTypeMemory:
resp, err = uniOpsCli.GetMemoryData(unifyops.Node{
NodeId: msg.NodeId,
resp, err = uniOpsCli.GetMemoryData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
default:
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "invalid resource type")
@ -81,8 +64,8 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge
}
defer uniOpsCli.Close()
resps, err := uniOpsCli.GetIndicatorData(unifyops.Node{
NodeId: msg.NodeId,
resps, err := uniOpsCli.GetIndicatorData(unifyops.GetOneResourceDataReq{
SlwNodeID: msg.SlwNodeID,
})
if err != nil {
logger.Warnf("get all resource data failed, err: %s", err.Error())

View File

@ -1,4 +1,4 @@
package services
package mq
type Service struct {
}

View File

@ -0,0 +1,76 @@
package mq
import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/collector/internal/config"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNodeInfoResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resp, err := uniOpsCli.GetAllSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
}
node, ok := lo.Find(resp.Nodes, func(item models.SlwNode) bool { return item.ID == msg.SlwNodeID })
if !ok {
logger.WithField("SlwNodeID", msg.SlwNodeID).
Warnf("slw node not found")
return nil, mq.Failed(errorcode.OperationFailed, "slw node not found")
}
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并
nodeConfig, ok := lo.Find(config.Cfg().SlwNodes, func(item config.SlwNodeConfig) bool { return item.SlwNodeID == msg.SlwNodeID })
if !ok {
logger.WithField("SlwNodeID", msg.SlwNodeID).
Warnf("config not found for this slw node")
return nil, mq.Failed(errorcode.OperationFailed, "config not found for this slw node")
}
node.StgNodeID = nodeConfig.StgNodeID
node.StorageID = nodeConfig.StorageID
return mq.ReplyOK(colmq.NewGetSlwNodeInfoResp(node))
}
func (svc *Service) GetAllSlwNodeInfo(msg *colmq.GetAllSlwNodeInfo) (*colmq.GetAllSlwNodeInfoResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllSlwNodeInfoResp](errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resp, err := uniOpsCli.GetAllSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllSlwNodeInfoResp](errorcode.OperationFailed, "get slwNode info failed")
}
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并
for i, node := range resp.Nodes {
nodeConfig, ok := lo.Find(config.Cfg().SlwNodes, func(item config.SlwNodeConfig) bool { return item.SlwNodeID == node.ID })
if !ok {
continue
}
node.StgNodeID = nodeConfig.StgNodeID
node.StorageID = nodeConfig.StorageID
resp.Nodes[i] = node
}
return mq.ReplyOK(colmq.NewGetAllSlwNodeInfoResp(resp.Nodes))
}

View File

@ -0,0 +1,50 @@
package mq
import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes) (*colmq.PackageGetCachedStgNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedStgNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetCachedNodes(storage.PackageGetCachedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package cached stg nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedStgNodesResp](errorcode.OperationFailed, "get package cached stg nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType))
}
func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedStgNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetLoadedNodes(storage.PackageGetLoadedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package loaded stg nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedStgNodesResp](errorcode.OperationFailed, "get package loaded stg nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetLoadedStgNodesResp(resp.NodeIDs))
}

View File

@ -1,50 +0,0 @@
package services
import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
func (svc *Service) PackageGetCachedNodes(msg *colmq.PackageGetCachedNodes) (*colmq.PackageGetCachedNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetCachedNodes(storage.PackageGetCachedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package cached nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedNodesResp](errorcode.OperationFailed, "get package cached nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetCachedNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType))
}
func (svc *Service) PackageGetLoadedNodes(msg *colmq.PackageGetLoadedNodes) (*colmq.PackageGetLoadedNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetLoadedNodes(storage.PackageGetLoadedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package loaded nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedNodesResp](errorcode.OperationFailed, "get package loaded nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetLoadedNodesResp(resp.NodeIDs))
}

View File

@ -6,7 +6,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/collector/internal/config"
"gitlink.org.cn/cloudream/scheduler/collector/internal/services"
"gitlink.org.cn/cloudream/scheduler/collector/internal/mq"
"gitlink.org.cn/cloudream/scheduler/common/globals"
colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
)
@ -27,7 +27,7 @@ func main() {
globals.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
globals.IniUnifyOpsPool(&config.Cfg().UnifyOps)
mqSvr, err := colmq.NewServer(services.NewService(), &config.Cfg().RabbitMQ)
mqSvr, err := colmq.NewServer(mq.NewService(), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new collector server failed, err: %s", err.Error())
}

View File

@ -1,11 +1,46 @@
package job
import "gitlink.org.cn/cloudream/common/models"
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/reflect"
)
const (
FileScheduleActionNo = "No"
FileScheduleActionMove = "Move"
FileScheduleActionLoad = "Load"
)
type FileScheduleScheme struct {
Action string `json:"action"`
TargetStorageID int64 `json:"targetStorageID"`
}
const (
ImageScheduleActionNo = "No"
ImageScheduleActionImport = "Import"
)
type ImageScheduleScheme struct {
Action string `json:"action"`
}
type JobScheduleScheme struct {
TargetSlwNodeID int64 `json:"targetSlwNodeID"`
Dataset FileScheduleScheme `json:"dataset"`
Code FileScheduleScheme `json:"code"`
Image ImageScheduleScheme `json:"image"`
}
type JobSetPreScheduleScheme struct {
JobSchemes map[string]JobScheduleScheme `json:"jobSchemes"` // 任务的预调度方案。Key为LocalJobIDs
}
type JobSet struct {
JobSetID string // 全局唯一的任务集ID
Jobs []JobSetJobRef // 任务集中包含的任务,只是一个引用
PreScheduleScheme models.JobSetPreScheduleScheme
PreScheduleScheme JobSetPreScheduleScheme
}
type JobSetJobRef struct {
@ -13,6 +48,13 @@ type JobSetJobRef struct {
LocalJobID string // 在当前任务集内的任务ID
}
type Job interface{}
var JobTypeUnion = types.NewTypeUnion[Job](
reflect.TypeOf[NormalJob](),
reflect.TypeOf[ResourceJob](),
)
type NormalJob struct {
JobSetID string // 任务集ID
JobID string // 全局唯一任务ID

View File

@ -1,152 +0,0 @@
package collector
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
// 获取package的缓存分布情况
var _ = Register(Service.PackageGetCachedNodes)
type PackageGetCachedNodes struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
func NewPackageGetCachedNodes(userID int64, packageID int64) PackageGetCachedNodes {
return PackageGetCachedNodes{
UserID: userID,
PackageID: packageID,
}
}
type PackageGetCachedNodesResp struct {
models.PackageCachingInfo
}
func NewPackageGetCachedNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) PackageGetCachedNodesResp {
return PackageGetCachedNodesResp{
PackageCachingInfo: models.PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
RedunancyType: redunancyType,
},
}
}
func (c *Client) PackageGetCachedNodes(msg PackageGetCachedNodes, opts ...mq.RequestOption) (*PackageGetCachedNodesResp, error) {
return mq.Request[PackageGetCachedNodesResp](c.rabbitCli, msg, opts...)
}
// 获取package的存储分布情况
var _ = Register(Service.PackageGetLoadedNodes)
type PackageGetLoadedNodes struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
func NewPackageGetLoadedNodes(userID int64, packageID int64) PackageGetLoadedNodes {
return PackageGetLoadedNodes{
UserID: userID,
PackageID: packageID,
}
}
type PackageGetLoadedNodesResp struct {
NodeIDs []int64 `json:"nodeIDs"`
}
func NewPackageGetLoadedNodesResp(nodeIDs []int64) PackageGetLoadedNodesResp {
return PackageGetLoadedNodesResp{
NodeIDs: nodeIDs,
}
}
func (c *Client) PackageGetLoadedNodes(msg PackageGetLoadedNodes, opts ...mq.RequestOption) (*PackageGetLoadedNodesResp, error) {
return mq.Request[PackageGetLoadedNodesResp](c.rabbitCli, msg, opts...)
}
// 获取节点信息
var _ = Register(Service.GetSlwNodeInfo)
type GetSlwNodeInfo struct {
}
func NewGetSlwNodeInfo() GetSlwNodeInfoResp {
return GetSlwNodeInfoResp{}
}
type GetSlwNodeInfoResp struct {
Nodes []models.SlwNode `json:"nodes"`
}
func NewGetSlwNodeInfoResp(nodes []models.SlwNode) GetSlwNodeInfoResp {
return GetSlwNodeInfoResp{
Nodes: nodes,
}
}
func (c *Client) GetSlwNodeInfo(msg GetSlwNodeInfo, opts ...mq.RequestOption) (*[]GetSlwNodeInfoResp, error) {
return mq.Request[[]GetSlwNodeInfoResp](c.rabbitCli, msg, opts...)
}
// 根据nodeID和类型名称获取节点中某一类型的信息
var _ = Register(Service.GetOneResourceData)
type GetOneResourceData struct {
NodeId int64 `json:"nodeId"`
ResourceType string `json:"type"`
}
func NewGetOneResourceData(nodeId int64, resourceType string) GetOneResourceData {
return GetOneResourceData{
NodeId: nodeId,
ResourceType: resourceType,
}
}
type GetOneResourceDataResp struct {
Data models.ResourceData `json:"data"`
}
func NewGetOneResourceDataResp(data models.ResourceData) GetOneResourceDataResp {
return GetOneResourceDataResp{
Data: data,
}
}
func (c *Client) GetOneResourceData(msg GetOneResourceData, opts ...mq.RequestOption) (*GetOneResourceDataResp, error) {
return mq.Request[GetOneResourceDataResp](c.rabbitCli, msg, opts...)
}
// 根据nodeID获取节点全部资源信息
var _ = Register(Service.GetAllResourceData)
type GetAllResourceData struct {
NodeId int64 `json:"nodeId"`
}
func NewGetAllResourceData(nodeId int64) GetAllResourceData {
return GetAllResourceData{
NodeId: nodeId,
}
}
type GetAllResourceDataResp struct {
Datas []models.ResourceData `json:"datas"`
}
func NewGetAllResourceDataResp(datas []models.ResourceData) GetAllResourceDataResp {
return GetAllResourceDataResp{
Datas: datas,
}
}
func (c *Client) GetAllResourceData(msg GetAllResourceData, opts ...mq.RequestOption) (*GetAllResourceDataResp, error) {
return mq.Request[GetAllResourceDataResp](c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(models.ResourceDataTypeUnion)
}

View File

@ -0,0 +1,72 @@
package collector
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
type ResourceService interface {
GetOneResourceData(msg *GetOneResourceData) (*GetOneResourceDataResp, *mq.CodeMessage)
GetAllResourceData(msg *GetAllResourceData) (*GetAllResourceDataResp, *mq.CodeMessage)
}
// 根据nodeID和类型名称获取节点中某一类型的信息
var _ = Register(Service.GetOneResourceData)
type GetOneResourceData struct {
SlwNodeID int64 `json:"slwNodeID"`
ResourceType string `json:"type"`
}
func NewGetOneResourceData(nodeId int64, resourceType string) GetOneResourceData {
return GetOneResourceData{
SlwNodeID: nodeId,
ResourceType: resourceType,
}
}
type GetOneResourceDataResp struct {
Data models.ResourceData `json:"data"`
}
func NewGetOneResourceDataResp(data models.ResourceData) GetOneResourceDataResp {
return GetOneResourceDataResp{
Data: data,
}
}
func (c *Client) GetOneResourceData(msg GetOneResourceData, opts ...mq.RequestOption) (*GetOneResourceDataResp, error) {
return mq.Request[GetOneResourceDataResp](c.rabbitCli, msg, opts...)
}
// 根据nodeID获取节点全部资源信息
var _ = Register(Service.GetAllResourceData)
type GetAllResourceData struct {
SlwNodeID int64 `json:"slwNodeID"`
}
func NewGetAllResourceData(nodeId int64) GetAllResourceData {
return GetAllResourceData{
SlwNodeID: nodeId,
}
}
type GetAllResourceDataResp struct {
Datas []models.ResourceData `json:"datas"`
}
func NewGetAllResourceDataResp(datas []models.ResourceData) GetAllResourceDataResp {
return GetAllResourceDataResp{
Datas: datas,
}
}
func (c *Client) GetAllResourceData(msg GetAllResourceData, opts ...mq.RequestOption) (*GetAllResourceDataResp, error) {
return mq.Request[GetAllResourceDataResp](c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(models.ResourceDataTypeUnion)
}

View File

@ -10,12 +10,11 @@ const (
)
type Service interface {
PackageGetCachedNodes(msg *PackageGetCachedNodes) (*PackageGetCachedNodesResp, *mq.CodeMessage)
PackageGetLoadedNodes(msg *PackageGetLoadedNodes) (*PackageGetLoadedNodesResp, *mq.CodeMessage)
ResourceService
GetSlwNodeInfo(msg *GetSlwNodeInfo) (*GetSlwNodeInfoResp, *mq.CodeMessage)
GetOneResourceData(msg *GetOneResourceData) (*GetOneResourceDataResp, *mq.CodeMessage)
GetAllResourceData(msg *GetAllResourceData) (*GetAllResourceDataResp, *mq.CodeMessage)
SlwService
StorageService
}
type Server struct {

View File

@ -0,0 +1,56 @@
package collector
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
type SlwService interface {
GetSlwNodeInfo(msg *GetSlwNodeInfo) (*GetSlwNodeInfoResp, *mq.CodeMessage)
GetAllSlwNodeInfo(msg *GetAllSlwNodeInfo) (*GetAllSlwNodeInfoResp, *mq.CodeMessage)
}
// 获取单个节点的信息
var _ = Register(Service.GetSlwNodeInfo)
type GetSlwNodeInfo struct {
SlwNodeID int64 `json:"slwNodeID"`
}
type GetSlwNodeInfoResp struct {
models.SlwNode
}
func NewGetSlwNodeInfo(slwNodeID int64) GetSlwNodeInfo {
return GetSlwNodeInfo{
SlwNodeID: slwNodeID,
}
}
func NewGetSlwNodeInfoResp(node models.SlwNode) GetSlwNodeInfoResp {
return GetSlwNodeInfoResp{
SlwNode: node,
}
}
func (c *Client) GetSlwNodeInfo(msg GetSlwNodeInfo, opts ...mq.RequestOption) (*[]GetSlwNodeInfoResp, error) {
return mq.Request[[]GetSlwNodeInfoResp](c.rabbitCli, msg, opts...)
}
// 获取所有节点信息
var _ = Register(Service.GetAllSlwNodeInfo)
type GetAllSlwNodeInfo struct{}
type GetAllSlwNodeInfoResp struct {
Nodes []models.SlwNode `json:"nodes"`
}
func NewGetAllSlwNodeInfo() GetAllSlwNodeInfoResp {
return GetAllSlwNodeInfoResp{}
}
func NewGetAllSlwNodeInfoResp(nodes []models.SlwNode) GetAllSlwNodeInfoResp {
return GetAllSlwNodeInfoResp{
Nodes: nodes,
}
}
func (c *Client) GetAllSlwNodeInfo(msg GetAllSlwNodeInfo, opts ...mq.RequestOption) (*[]GetAllSlwNodeInfoResp, error) {
return mq.Request[[]GetAllSlwNodeInfoResp](c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,68 @@
package collector
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
type StorageService interface {
PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes) (*PackageGetCachedStgNodesResp, *mq.CodeMessage)
PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes) (*PackageGetLoadedStgNodesResp, *mq.CodeMessage)
}
// 获取package的缓存分布情况
var _ = Register(Service.PackageGetCachedStgNodes)
type PackageGetCachedStgNodes struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetCachedStgNodesResp struct {
models.PackageCachingInfo
}
func NewPackageGetCachedStgNodes(userID int64, packageID int64) PackageGetCachedStgNodes {
return PackageGetCachedStgNodes{
UserID: userID,
PackageID: packageID,
}
}
func NewPackageGetCachedStgNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) PackageGetCachedStgNodesResp {
return PackageGetCachedStgNodesResp{
PackageCachingInfo: models.PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
RedunancyType: redunancyType,
},
}
}
func (c *Client) PackageGetCachedStgNodes(msg PackageGetCachedStgNodes, opts ...mq.RequestOption) (*PackageGetCachedStgNodesResp, error) {
return mq.Request[PackageGetCachedStgNodesResp](c.rabbitCli, msg, opts...)
}
// 获取package的存储分布情况
var _ = Register(Service.PackageGetLoadedStgNodes)
type PackageGetLoadedStgNodes struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetLoadedStgNodesResp struct {
StgNodeIDs []int64 `json:"stgNodeIDs"`
}
func NewPackageGetLoadedStgNodes(userID int64, packageID int64) PackageGetLoadedStgNodes {
return PackageGetLoadedStgNodes{
UserID: userID,
PackageID: packageID,
}
}
func NewPackageGetLoadedStgNodesResp(nodeIDs []int64) PackageGetLoadedStgNodesResp {
return PackageGetLoadedStgNodesResp{
StgNodeIDs: nodeIDs,
}
}
func (c *Client) PackageGetLoadedStgNodes(msg PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) {
return mq.Request[PackageGetLoadedStgNodesResp](c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,45 @@
package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type AdvisorService interface {
ReportAdvisorTaskStatus(msg *ReportAdvisorTaskStatus) (*ReportAdvisorTaskStatusResp, *mq.CodeMessage)
}
// 接收advisor上报的存活状态及任务执行情况
var _ = Register(Service.ReportAdvisorTaskStatus)
type ReportAdvisorTaskStatus struct {
AdvisorID string `json:"advisorID"`
TaskStatus []AdvisorTaskStatus `json:"taskStatus"`
}
type ReportAdvisorTaskStatusResp struct {
}
type AdvisorTaskStatus struct {
TaskID string
Status advtsk.TaskStatus
}
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) ReportAdvisorTaskStatus {
return ReportAdvisorTaskStatus{
AdvisorID: advisorID,
TaskStatus: taskStatus,
}
}
func NewReportAdvisorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewAdvisorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) AdvisorTaskStatus {
return AdvisorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportAdvisorTaskStatus(msg ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) {
return mq.Request[ReportAdvisorTaskStatusResp](c.rabbitCli, msg, opts...)
}

View File

@ -1,125 +0,0 @@
package manager
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// 接收executor上报的存活状态及任务执行情况
var _ = Register(Service.ReportExecutorTaskStatus)
type ReportExecutorTaskStatus struct {
ExecutorID string `json:"executorID"`
TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
}
type ReportExecutorTaskStatusResp struct {
}
type ExecutorTaskStatus struct {
TaskID string
Status exectsk.TaskStatus
}
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) ReportExecutorTaskStatus {
return ReportExecutorTaskStatus{
ExecutorID: executorID,
TaskStatus: taskStatus,
}
}
func NewReportExecutorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewExecutorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) ExecutorTaskStatus {
return ExecutorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportExecutorTaskStatus(msg ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request[ReportExecutorTaskStatusResp](c.rabbitCli, msg, opts...)
}
// 接收advisor上报的存活状态及任务执行情况
var _ = Register(Service.ReportAdvisorTaskStatus)
type ReportAdvisorTaskStatus struct {
AdvisorID string `json:"advisorID"`
TaskStatus []AdvisorTaskStatus `json:"taskStatus"`
}
type ReportAdvisorTaskStatusResp struct {
}
type AdvisorTaskStatus struct {
TaskID string
Status advtsk.TaskStatus
}
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) ReportAdvisorTaskStatus {
return ReportAdvisorTaskStatus{
AdvisorID: advisorID,
TaskStatus: taskStatus,
}
}
func NewReportAdvisorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewAdvisorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) AdvisorTaskStatus {
return AdvisorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportAdvisorTaskStatus(msg ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) {
return mq.Request[ReportAdvisorTaskStatusResp](c.rabbitCli, msg, opts...)
}
// 提交任务集
type SubmitJobSet struct {
JobSet models.JobSetInfo `json:"jobSet"`
PreScheduleScheme models.JobSetPreScheduleScheme `json:"preScheduleScheme"`
}
type SubmitJobSetResp struct {
JobSetID string `json:"jobSetID"`
}
func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme models.JobSetPreScheduleScheme) SubmitJobSet {
return SubmitJobSet{
JobSet: jobSet,
PreScheduleScheme: preScheduleScheme,
}
}
func NewSubmitJobSetResp(jobSetID string) SubmitJobSetResp {
return SubmitJobSetResp{
JobSetID: jobSetID,
}
}
func (c *Client) SubmitJobSet(msg SubmitJobSet, opts ...mq.RequestOption) (*SubmitJobSetResp, error) {
return mq.Request[SubmitJobSetResp](c.rabbitCli, msg, opts...)
}
// JobSet中需要使用的一个文件上传完成
type JobSetLocalFileUploaded struct {
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因
PackageID int64 `json:"packageID"` // 如果上传文件成功那么这个字段是上传之后得到的PackageID
}
type JobSetLocalFileUploadedResp struct {
}
func NewJobSetLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) JobSetLocalFileUploaded {
return JobSetLocalFileUploaded{
JobSetID: jobSetID,
LocalPath: localPath,
Error: err,
PackageID: packageID,
}
}
func NewJobSetLocalFileUploadedResp() JobSetLocalFileUploadedResp {
return JobSetLocalFileUploadedResp{}
}
func (c *Client) JobSetLocalFileUploaded(msg JobSetLocalFileUploaded, opts ...mq.RequestOption) (*JobSetLocalFileUploadedResp, error) {
return mq.Request[JobSetLocalFileUploadedResp](c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,44 @@
package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
type ExecutorService interface {
ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
}
// 接收executor上报的存活状态及任务执行情况
var _ = Register(Service.ReportExecutorTaskStatus)
type ReportExecutorTaskStatus struct {
ExecutorID string `json:"executorID"`
TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
}
type ReportExecutorTaskStatusResp struct {
}
type ExecutorTaskStatus struct {
TaskID string
Status exectsk.TaskStatus
}
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) ReportExecutorTaskStatus {
return ReportExecutorTaskStatus{
ExecutorID: executorID,
TaskStatus: taskStatus,
}
}
func NewReportExecutorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewExecutorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) ExecutorTaskStatus {
return ExecutorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportExecutorTaskStatus(msg ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request[ReportExecutorTaskStatusResp](c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,61 @@
package manager
import "gitlink.org.cn/cloudream/common/pkgs/mq"
type ImageService interface {
CreateImage(msg *CreateImage) (*CreateImageResp, *mq.CodeMessage)
GetImageInfo(msg *GetImageInfo) (*GetImageInfoResp, *mq.CodeMessage)
}
// 创建镜像
var _ = Register(Service.CreateImage)
type CreateImage struct {
SlwNodeImageID string `json:"slwNodeImageID"` // 算力中心的镜像ID
PackageID int64 `json:"packageID"` // 镜像文件
}
type CreateImageResp struct {
ImageID string `json:"imageID"`
}
func NewCreateImage(slwNodeImageID string, packageID int64) CreateImage {
return CreateImage{
SlwNodeImageID: slwNodeImageID,
PackageID: packageID,
}
}
func NewCreateImageResp(imageID string) CreateImageResp {
return CreateImageResp{
ImageID: imageID,
}
}
func (c *Client) CreateImage(msg CreateImage, opts ...mq.RequestOption) (*CreateImageResp, error) {
return mq.Request[CreateImageResp](c.rabbitCli, msg, opts...)
}
// 查询镜像信息
var _ = Register(Service.GetImageInfo)
type GetImageInfo struct {
ImageID string `json:"imageID"`
}
type GetImageInfoResp struct {
SlwNodeImageID string `json:"slwNodeImageID"` // 算力中心的镜像ID
PackageID int64 `json:"packageID"` // 镜像文件
}
func NewGetImageInfo(imageID string) GetImageInfo {
return GetImageInfo{
ImageID: imageID,
}
}
func NewGetImageInfoResp(slwNodeImageID string, packageID int64) GetImageInfoResp {
return GetImageInfoResp{
SlwNodeImageID: slwNodeImageID,
PackageID: packageID,
}
}
func (c *Client) GetImageInfo(msg GetImageInfo, opts ...mq.RequestOption) (*GetImageInfoResp, error) {
return mq.Request[GetImageInfoResp](c.rabbitCli, msg, opts...)
}

View File

@ -0,0 +1,92 @@
package manager
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type JobService interface {
SubmitJobSet(msg *SubmitJobSet) (*SubmitJobSetResp, *mq.CodeMessage)
JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded) (*JobSetLocalFileUploadedResp, *mq.CodeMessage)
}
// 提交任务集
var _ = Register(Service.SubmitJobSet)
type SubmitJobSet struct {
JobSet models.JobSetInfo `json:"jobSet"`
PreScheduleScheme jobmod.JobSetPreScheduleScheme `json:"preScheduleScheme"`
}
type SubmitJobSetResp struct {
JobSetID string `json:"jobSetID"`
}
func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme jobmod.JobSetPreScheduleScheme) SubmitJobSet {
return SubmitJobSet{
JobSet: jobSet,
PreScheduleScheme: preScheduleScheme,
}
}
func NewSubmitJobSetResp(jobSetID string) SubmitJobSetResp {
return SubmitJobSetResp{
JobSetID: jobSetID,
}
}
func (c *Client) SubmitJobSet(msg SubmitJobSet, opts ...mq.RequestOption) (*SubmitJobSetResp, error) {
return mq.Request[SubmitJobSetResp](c.rabbitCli, msg, opts...)
}
// JobSet中需要使用的一个文件上传完成
var _ = Register(Service.JobSetLocalFileUploaded)
type JobSetLocalFileUploaded struct {
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因
PackageID int64 `json:"packageID"` // 如果上传文件成功那么这个字段是上传之后得到的PackageID
}
type JobSetLocalFileUploadedResp struct {
}
func NewJobSetLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) JobSetLocalFileUploaded {
return JobSetLocalFileUploaded{
JobSetID: jobSetID,
LocalPath: localPath,
Error: err,
PackageID: packageID,
}
}
func NewJobSetLocalFileUploadedResp() JobSetLocalFileUploadedResp {
return JobSetLocalFileUploadedResp{}
}
func (c *Client) JobSetLocalFileUploaded(msg JobSetLocalFileUploaded, opts ...mq.RequestOption) (*JobSetLocalFileUploadedResp, error) {
return mq.Request[JobSetLocalFileUploadedResp](c.rabbitCli, msg, opts...)
}
// 获取任务数据
type GetJob struct {
JobID string `json:"jobID"`
}
type GetJobResp struct {
Job jobmod.Job `json:"job"`
}
func NewGetJob(jobID string) GetJob {
return GetJob{
JobID: jobID,
}
}
func NewGetJobResp(job jobmod.Job) GetJobResp {
return GetJobResp{
Job: job,
}
}
func (c *Client) GetJob(msg GetJob, opts ...mq.RequestOption) (*GetJobResp, error) {
return mq.Request[GetJobResp](c.rabbitCli, msg, opts...)
}
func init() {
mq.RegisterUnionType(jobmod.JobTypeUnion)
}

View File

@ -10,13 +10,13 @@ const (
)
type Service interface {
SubmitJobSet(msg *SubmitJobSet) (*SubmitJobSetResp, *mq.CodeMessage)
AdvisorService
JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded) (*JobSetLocalFileUploadedResp, *mq.CodeMessage)
ExecutorService
ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
ImageService
ReportAdvisorTaskStatus(msg *ReportAdvisorTaskStatus) (*ReportAdvisorTaskStatusResp, *mq.CodeMessage)
JobService
}
type Server struct {

4
go.mod
View File

@ -6,6 +6,8 @@ replace gitlink.org.cn/cloudream/common v0.0.0 => ../common
require (
github.com/gin-gonic/gin v1.9.1
github.com/google/uuid v1.3.0
github.com/samber/lo v1.38.1
gitlink.org.cn/cloudream/common v0.0.0
google.golang.org/grpc v1.54.0
)
@ -22,7 +24,6 @@ require (
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/imdario/mergo v0.3.15 // indirect
@ -34,7 +35,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/samber/lo v1.36.0 // indirect
github.com/sirupsen/logrus v1.9.2 // indirect
github.com/streadway/amqp v1.1.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect

9
go.sum
View File

@ -47,7 +47,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
@ -59,13 +58,12 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.36.0 h1:4LaOxH1mHnbDGhTVE0i1z8v/lWaQW8AIfOD3HU4mSaw=
github.com/samber/lo v1.36.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sirupsen/logrus v1.9.2 h1:oxx1eChJGI6Uks2ZC4W1zpLlVgqB8ner4EuQwV4Ik1Y=
github.com/sirupsen/logrus v1.9.2/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU=
@ -83,7 +81,6 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
@ -115,8 +112,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,22 @@
package config
import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/config"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
type Config struct {
Logger logger.Config `json:"logger"`
RabbitMQ scmq.Config `json:"rabbitMQ"`
}
var cfg Config
func Init() error {
return config.DefaultLoad("client", &cfg)
}
func Cfg() *Config {
return &cfg
}

View File

@ -0,0 +1,10 @@
package mq
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
func (svc *Service) ReportAdvisorTaskStatus(msg *mgrmq.ReportAdvisorTaskStatus) (*mgrmq.ReportAdvisorTaskStatusResp, *mq.CodeMessage) {
}

View File

@ -0,0 +1,10 @@
package mq
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
func (svc *Service) ReportExecutorTaskStatus(msg *mgrmq.ReportExecutorTaskStatus) (*mgrmq.ReportExecutorTaskStatusResp, *mq.CodeMessage) {
}

View File

@ -0,0 +1,14 @@
package mq
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
func (svc *Service) CreateImage(msg *mgrmq.CreateImage) (*mgrmq.CreateImageResp, *mq.CodeMessage) {
}
func (svc *Service) GetImageInfo(msg *mgrmq.GetImageInfo) (*mgrmq.GetImageInfoResp, *mq.CodeMessage) {
}

View File

@ -0,0 +1,20 @@
package mq
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
// 提交任务集
func (svc *Service) SubmitJobSet(msg *mgrmq.SubmitJobSet) (*mgrmq.SubmitJobSetResp, *mq.CodeMessage) {
}
// 任务集中某个文件上传完成
func (svc *Service) JobSetLocalFileUploaded(msg *mgrmq.JobSetLocalFileUploaded) (*mgrmq.JobSetLocalFileUploadedResp, *mq.CodeMessage) {
}
func (svc *Service) GetJob(msg *mgrmq.GetJob) (*mgrmq.GetJobResp, *mq.CodeMessage) {
}

View File

@ -0,0 +1,8 @@
package mq
type Service struct {
}
func NewService() (*Service, error) {
return &Service{}, nil
}

62
manager/main.go Normal file
View File

@ -0,0 +1,62 @@
package main
import (
"fmt"
"os"
_ "google.golang.org/grpc/balancer/grpclb"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/common/globals"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
"gitlink.org.cn/cloudream/scheduler/manager/internal/config"
mqsvc "gitlink.org.cn/cloudream/scheduler/manager/internal/mq"
)
func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}
err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}
globals.InitMQPool(&config.Cfg().RabbitMQ)
svc, err := mqsvc.NewService()
if err != nil {
fmt.Printf("new service: %s", err.Error())
os.Exit(1)
}
mqSvr, err := mgrmq.NewServer(svc, &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new manager mq server: %s", err.Error())
}
mqSvr.OnError = func(err error) {
logger.Warnf("manager server err: %s", err.Error())
}
// 启动服务
go serveMQServer(mqSvr)
forever := make(chan bool)
<-forever
}
func serveMQServer(server *mgrmq.Server) {
logger.Info("start serving mq server")
err := server.Serve()
if err != nil {
logger.Errorf("mq server stopped with error: %s", err.Error())
}
logger.Info("mq server stopped")
}