Merge pull request 'collector init' (#1) from feature_sjc into master

This commit is contained in:
baohan 2023-08-28 09:25:44 +08:00
commit a3f026eb8f
5 changed files with 257 additions and 0 deletions

27
internal/config/config.go Normal file
View File

@ -0,0 +1,27 @@
package config
import (
cldstg "gitlink.org.cn/cloudream/common/api/storage"
uniops "gitlink.org.cn/cloudream/common/api/unifyops"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq"
)
type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ mymq.Config `json:"rabbitMQ`
CloudreamStorage cldstg.Config `json:"cloudreamStorage"`
UnifyOps uniops.Config `json:"unifyOps"`
// PCM cldstg.Config `json:"pcm"`
}
var cfg Config
func Init() error {
return c.DefaultLoad("collector", &cfg)
}
func Cfg() *Config {
return &cfg
}

View File

@ -0,0 +1,8 @@
package services
type Service struct {
}
func NewService() *Service {
return &Service{}
}

View File

@ -0,0 +1,50 @@
package services
import (
"gitlink.org.cn/cloudream/common/api/storage"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/scheduler-common/globals"
colmq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq/collector"
)
func (svc *Service) PackageGetCachedNodes(msg *colmq.PackageGetCachedNodes) (*colmq.PackageGetCachedNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetCachedNodes(storage.PackageGetCachedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package cached nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedNodesResp](errorcode.OperationFailed, "get package cached nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetCachedNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType))
}
func (svc *Service) PackageGetLoadedNodes(msg *colmq.PackageGetLoadedNodes) (*colmq.PackageGetLoadedNodesResp, *mq.CodeMessage) {
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedNodesResp](errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
resp, err := stgCli.PackageGetLoadedNodes(storage.PackageGetLoadedNodesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package loaded nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedNodesResp](errorcode.OperationFailed, "get package loaded nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetLoadedNodesResp(resp.NodeIDs))
}

View File

@ -0,0 +1,117 @@
package services
import (
"gitlink.org.cn/cloudream/common/api/unifyops"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/scheduler-common/globals"
colmq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq/collector"
)
func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNodeInfoResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetSlwNodeInfoResp](errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resps, err := uniOpsCli.GetSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetSlwNodeInfoResp](errorcode.OperationFailed, "get slwNode info failed")
}
return mq.ReplyOK(colmq.NewGetSlwNodeInfoResp(resps.Nodes))
}
func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.GetOneResourceDataResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
var resp models.ResourceData
switch msg.ResourceType {
case models.ResourceTypeCPU:
resp, err = uniOpsCli.GetCPUData(unifyops.Node{
NodeId: msg.NodeId,
})
case models.ResourceTypeNPU:
resp, err = uniOpsCli.GetNPUData(unifyops.Node{
NodeId: msg.NodeId,
})
case models.ResourceTypeGPU:
resp, err = uniOpsCli.GetGPUData(unifyops.Node{
NodeId: msg.NodeId,
})
case models.ResourceTypeMLU:
resp, err = uniOpsCli.GetMLUData(unifyops.Node{
NodeId: msg.NodeId,
})
case models.ResourceTypeStorage:
resp, err = uniOpsCli.GetStorageData(unifyops.Node{
NodeId: msg.NodeId,
})
case models.ResourceTypeMemory:
resp, err = uniOpsCli.GetMemoryData(unifyops.Node{
NodeId: msg.NodeId,
})
default:
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "invalid resource type")
}
if err != nil {
logger.Warnf("get resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "get resource data failed")
}
return mq.ReplyOK(colmq.NewGetOneResourceDataResp(resp))
}
func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.GetAllResourceDataResp, *mq.CodeMessage) {
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resps, err := uniOpsCli.GetIndicatorData(unifyops.Node{
NodeId: msg.NodeId,
})
if err != nil {
logger.Warnf("get all resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "get all resource data failed")
}
var resourceTypeToModel = map[string]models.ResourceData{
models.ResourceTypeCPU: &models.CPUResourceData{},
models.ResourceTypeNPU: &models.NPUResourceData{},
models.ResourceTypeGPU: &models.GPUResourceData{},
models.ResourceTypeMLU: &models.MLUResourceData{},
models.ResourceTypeStorage: &models.StorageResourceData{},
models.ResourceTypeMemory: &models.MemoryResourceData{},
}
var datas []models.ResourceData
for _, resp := range *resps {
data, exists := resourceTypeToModel[resp.Name]
if !exists {
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "invalid resource type")
}
if err = serder.AnyToAny(resp, data); err != nil {
logger.Warnf("get all resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "get all resource data failed")
}
datas = append(datas, data)
}
return mq.ReplyOK(colmq.NewGetAllResourceDataResp(datas))
}

55
main.go Normal file
View File

@ -0,0 +1,55 @@
package main
import (
"fmt"
"os"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler-collector/internal/config"
"gitlink.org.cn/cloudream/scheduler-collector/internal/services"
"gitlink.org.cn/cloudream/scheduler-common/globals"
colmq "gitlink.org.cn/cloudream/scheduler-common/pkgs/mq/collector"
)
func main() {
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}
err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}
globals.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
globals.IniUnifyOpsPool(&config.Cfg().UnifyOps)
mqSvr, err := colmq.NewServer(services.NewService(), &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new collector server failed, err: %s", err.Error())
}
mqSvr.OnError = func(err error) {
logger.Warnf("collector server err: %s", err.Error())
}
// 启动服务
go serveColServer(mqSvr)
forever := make(chan bool)
<-forever
}
func serveColServer(server *colmq.Server) {
logger.Info("start serving command server")
err := server.Serve()
if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}
logger.Info("command server stopped")
}