优化MQAPI的声明方式

This commit is contained in:
Sydonian 2023-09-12 09:45:58 +08:00
parent 30324ff351
commit a7206a446c
17 changed files with 248 additions and 191 deletions

View File

@ -14,7 +14,7 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "new unifyOps client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
@ -45,12 +45,12 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge
SlwNodeID: msg.SlwNodeID,
})
default:
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "invalid resource type")
return nil, mq.Failed(errorcode.OperationFailed, "invalid resource type")
}
if err != nil {
logger.Warnf("get resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetOneResourceDataResp](errorcode.OperationFailed, "get resource data failed")
return nil, mq.Failed(errorcode.OperationFailed, "get resource data failed")
}
return mq.ReplyOK(colmq.NewGetOneResourceDataResp(resp))
@ -60,7 +60,7 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "new unifyOps client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
@ -69,7 +69,7 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge
})
if err != nil {
logger.Warnf("get all resource data failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllResourceDataResp](errorcode.OperationFailed, "get all resource data failed")
return nil, mq.Failed(errorcode.OperationFailed, "get all resource data failed")
}
return mq.ReplyOK(colmq.NewGetAllResourceDataResp(*resps))

View File

@ -50,14 +50,14 @@ func (svc *Service) GetAllSlwNodeInfo(msg *colmq.GetAllSlwNodeInfo) (*colmq.GetA
uniOpsCli, err := globals.UnifyOpsPool.Acquire()
if err != nil {
logger.Warnf("new unifyOps client, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllSlwNodeInfoResp](errorcode.OperationFailed, "new unifyOps client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed")
}
defer uniOpsCli.Close()
resp, err := uniOpsCli.GetAllSlwNodeInfo()
if err != nil {
logger.Warnf("get slwNode info failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.GetAllSlwNodeInfoResp](errorcode.OperationFailed, "get slwNode info failed")
return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed")
}
// TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的所以这里进行两个数据源的合并

View File

@ -13,7 +13,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedStgNodesResp](errorcode.OperationFailed, "new storage client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
@ -23,7 +23,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
})
if err != nil {
logger.Warnf("get package cached stg nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetCachedStgNodesResp](errorcode.OperationFailed, "get package cached stg nodes failed")
return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType))
@ -33,7 +33,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
stgCli, err := globals.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedStgNodesResp](errorcode.OperationFailed, "new storage client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed")
}
defer stgCli.Close()
@ -43,7 +43,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes
})
if err != nil {
logger.Warnf("get package loaded stg nodes failed, err: %s", err.Error())
return mq.ReplyFailed[colmq.PackageGetLoadedStgNodesResp](errorcode.OperationFailed, "get package loaded stg nodes failed")
return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetLoadedStgNodesResp(resp.NodeIDs))

View File

@ -52,7 +52,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register 将Service中的一个接口函数作为指定类型消息的处理函数同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
@ -62,7 +62,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T
// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()

View File

@ -15,56 +15,54 @@ type ResourceService interface {
var _ = Register(Service.GetOneResourceData)
type GetOneResourceData struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
ResourceType string `json:"type"`
}
type GetOneResourceDataResp struct {
mq.MessageBodyBase
Data models.ResourceData `json:"data"`
}
func NewGetOneResourceData(nodeId int64, resourceType string) GetOneResourceData {
return GetOneResourceData{
func NewGetOneResourceData(nodeId int64, resourceType string) *GetOneResourceData {
return &GetOneResourceData{
SlwNodeID: nodeId,
ResourceType: resourceType,
}
}
type GetOneResourceDataResp struct {
Data models.ResourceData `json:"data"`
}
func NewGetOneResourceDataResp(data models.ResourceData) GetOneResourceDataResp {
return GetOneResourceDataResp{
func NewGetOneResourceDataResp(data models.ResourceData) *GetOneResourceDataResp {
return &GetOneResourceDataResp{
Data: data,
}
}
func (c *Client) GetOneResourceData(msg GetOneResourceData, opts ...mq.RequestOption) (*GetOneResourceDataResp, error) {
return mq.Request[GetOneResourceDataResp](c.rabbitCli, msg, opts...)
func (c *Client) GetOneResourceData(msg *GetOneResourceData, opts ...mq.RequestOption) (*GetOneResourceDataResp, error) {
return mq.Request(Service.GetOneResourceData, c.rabbitCli, msg, opts...)
}
// 根据nodeID获取节点全部资源信息
var _ = Register(Service.GetAllResourceData)
type GetAllResourceData struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
}
func NewGetAllResourceData(nodeId int64) GetAllResourceData {
return GetAllResourceData{
SlwNodeID: nodeId,
}
}
type GetAllResourceDataResp struct {
mq.MessageBodyBase
Datas []models.ResourceData `json:"datas"`
}
func NewGetAllResourceDataResp(datas []models.ResourceData) GetAllResourceDataResp {
return GetAllResourceDataResp{
func NewGetAllResourceData(nodeId int64) *GetAllResourceData {
return &GetAllResourceData{
SlwNodeID: nodeId,
}
}
func NewGetAllResourceDataResp(datas []models.ResourceData) *GetAllResourceDataResp {
return &GetAllResourceDataResp{
Datas: datas,
}
}
func (c *Client) GetAllResourceData(msg GetAllResourceData, opts ...mq.RequestOption) (*GetAllResourceDataResp, error) {
return mq.Request[GetAllResourceDataResp](c.rabbitCli, msg, opts...)
func (c *Client) GetAllResourceData(msg *GetAllResourceData, opts ...mq.RequestOption) (*GetAllResourceDataResp, error) {
return mq.Request(Service.GetAllResourceData, c.rabbitCli, msg, opts...)
}
func init() {

View File

@ -56,7 +56,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register 将Service中的一个接口函数作为指定类型消息的处理函数同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
@ -66,7 +66,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T
// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()

View File

@ -15,42 +15,47 @@ type SlwService interface {
var _ = Register(Service.GetSlwNodeInfo)
type GetSlwNodeInfo struct {
mq.MessageBodyBase
SlwNodeID int64 `json:"slwNodeID"`
}
type GetSlwNodeInfoResp struct {
mq.MessageBodyBase
models.SlwNode
}
func NewGetSlwNodeInfo(slwNodeID int64) GetSlwNodeInfo {
return GetSlwNodeInfo{
func NewGetSlwNodeInfo(slwNodeID int64) *GetSlwNodeInfo {
return &GetSlwNodeInfo{
SlwNodeID: slwNodeID,
}
}
func NewGetSlwNodeInfoResp(node models.SlwNode) GetSlwNodeInfoResp {
return GetSlwNodeInfoResp{
func NewGetSlwNodeInfoResp(node models.SlwNode) *GetSlwNodeInfoResp {
return &GetSlwNodeInfoResp{
SlwNode: node,
}
}
func (c *Client) GetSlwNodeInfo(msg GetSlwNodeInfo, opts ...mq.RequestOption) (*[]GetSlwNodeInfoResp, error) {
return mq.Request[[]GetSlwNodeInfoResp](c.rabbitCli, msg, opts...)
func (c *Client) GetSlwNodeInfo(msg *GetSlwNodeInfo, opts ...mq.RequestOption) (*GetSlwNodeInfoResp, error) {
return mq.Request(Service.GetSlwNodeInfo, c.rabbitCli, msg, opts...)
}
// 获取所有节点信息
var _ = Register(Service.GetAllSlwNodeInfo)
type GetAllSlwNodeInfo struct{}
type GetAllSlwNodeInfo struct {
mq.MessageBodyBase
}
type GetAllSlwNodeInfoResp struct {
mq.MessageBodyBase
Nodes []models.SlwNode `json:"nodes"`
}
func NewGetAllSlwNodeInfo() GetAllSlwNodeInfoResp {
return GetAllSlwNodeInfoResp{}
func NewGetAllSlwNodeInfo() *GetAllSlwNodeInfoResp {
return &GetAllSlwNodeInfoResp{}
}
func NewGetAllSlwNodeInfoResp(nodes []models.SlwNode) GetAllSlwNodeInfoResp {
return GetAllSlwNodeInfoResp{
func NewGetAllSlwNodeInfoResp(nodes []models.SlwNode) *GetAllSlwNodeInfoResp {
return &GetAllSlwNodeInfoResp{
Nodes: nodes,
}
}
func (c *Client) GetAllSlwNodeInfo(msg GetAllSlwNodeInfo, opts ...mq.RequestOption) (*[]GetAllSlwNodeInfoResp, error) {
return mq.Request[[]GetAllSlwNodeInfoResp](c.rabbitCli, msg, opts...)
func (c *Client) GetAllSlwNodeInfo(msg *GetAllSlwNodeInfo, opts ...mq.RequestOption) (*GetAllSlwNodeInfoResp, error) {
return mq.Request(Service.GetAllSlwNodeInfo, c.rabbitCli, msg, opts...)
}

View File

@ -15,21 +15,23 @@ type StorageService interface {
var _ = Register(Service.PackageGetCachedStgNodes)
type PackageGetCachedStgNodes struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetCachedStgNodesResp struct {
mq.MessageBodyBase
models.PackageCachingInfo
}
func NewPackageGetCachedStgNodes(userID int64, packageID int64) PackageGetCachedStgNodes {
return PackageGetCachedStgNodes{
func NewPackageGetCachedStgNodes(userID int64, packageID int64) *PackageGetCachedStgNodes {
return &PackageGetCachedStgNodes{
UserID: userID,
PackageID: packageID,
}
}
func NewPackageGetCachedStgNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) PackageGetCachedStgNodesResp {
return PackageGetCachedStgNodesResp{
func NewPackageGetCachedStgNodesResp(nodeInfos []models.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp {
return &PackageGetCachedStgNodesResp{
PackageCachingInfo: models.PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
@ -37,32 +39,34 @@ func NewPackageGetCachedStgNodesResp(nodeInfos []models.NodePackageCachingInfo,
},
}
}
func (c *Client) PackageGetCachedStgNodes(msg PackageGetCachedStgNodes, opts ...mq.RequestOption) (*PackageGetCachedStgNodesResp, error) {
return mq.Request[PackageGetCachedStgNodesResp](c.rabbitCli, msg, opts...)
func (c *Client) PackageGetCachedStgNodes(msg *PackageGetCachedStgNodes, opts ...mq.RequestOption) (*PackageGetCachedStgNodesResp, error) {
return mq.Request(Service.PackageGetCachedStgNodes, c.rabbitCli, msg, opts...)
}
// 获取package的存储分布情况
var _ = Register(Service.PackageGetLoadedStgNodes)
type PackageGetLoadedStgNodes struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetLoadedStgNodesResp struct {
mq.MessageBodyBase
StgNodeIDs []int64 `json:"stgNodeIDs"`
}
func NewPackageGetLoadedStgNodes(userID int64, packageID int64) PackageGetLoadedStgNodes {
return PackageGetLoadedStgNodes{
func NewPackageGetLoadedStgNodes(userID int64, packageID int64) *PackageGetLoadedStgNodes {
return &PackageGetLoadedStgNodes{
UserID: userID,
PackageID: packageID,
}
}
func NewPackageGetLoadedStgNodesResp(nodeIDs []int64) PackageGetLoadedStgNodesResp {
return PackageGetLoadedStgNodesResp{
func NewPackageGetLoadedStgNodesResp(nodeIDs []int64) *PackageGetLoadedStgNodesResp {
return &PackageGetLoadedStgNodesResp{
StgNodeIDs: nodeIDs,
}
}
func (c *Client) PackageGetLoadedStgNodes(msg PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) {
return mq.Request[PackageGetLoadedStgNodesResp](c.rabbitCli, msg, opts...)
func (c *Client) PackageGetLoadedStgNodes(msg *PackageGetLoadedStgNodes, opts ...mq.RequestOption) (*PackageGetLoadedStgNodesResp, error) {
return mq.Request(Service.PackageGetLoadedStgNodes, c.rabbitCli, msg, opts...)
}

View File

@ -15,142 +15,144 @@ type PCMService interface {
}
// 启动上传(并注册)镜像任务
var _ = Register(PCMService.StartUploadImage)
var _ = Register(Service.StartUploadImage)
type StartUploadImage struct {
mq.MessageBodyBase
NodeID int64 `json:"nodeID"`
ImagePath string `json:"imagePath"`
}
type StartUploadImageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartUploadImage(nodeID int64, imagePath string) StartUploadImage {
return StartUploadImage{
func NewStartUploadImage(nodeID int64, imagePath string) *StartUploadImage {
return &StartUploadImage{
NodeID: nodeID,
ImagePath: imagePath,
}
}
func NewStartUploadImageResp(taskID string) StartUploadImageResp {
return StartUploadImageResp{
func NewStartUploadImageResp(taskID string) *StartUploadImageResp {
return &StartUploadImageResp{
TaskID: taskID,
}
}
func (c *Client) StartUploadImage(msg StartUploadImage, opts ...mq.RequestOption) (*StartUploadImageResp, error) {
return mq.Request[StartUploadImageResp](c.rabbitCli, msg, opts...)
func (c *Client) StartUploadImage(msg *StartUploadImage, opts ...mq.RequestOption) (*StartUploadImageResp, error) {
return mq.Request(Service.StartUploadImage, c.rabbitCli, msg, opts...)
}
// 查询镜像列表
var _ = Register(PCMService.GetImageList)
var _ = Register(Service.GetImageList)
type GetImageList struct {
mq.MessageBodyBase
NodeID int64 `json:"nodeID"`
}
func NewGetImageList(nodeID int64) GetImageList {
return GetImageList{
NodeID: nodeID,
}
}
type GetImageListResp struct {
mq.MessageBodyBase
ImageIDs []int64 `json:"imageIDs"`
}
func NewGetImageListResp(imageIDs []int64) GetImageListResp {
return GetImageListResp{
func NewGetImageList(nodeID int64) *GetImageList {
return &GetImageList{
NodeID: nodeID,
}
}
func NewGetImageListResp(imageIDs []int64) *GetImageListResp {
return &GetImageListResp{
ImageIDs: imageIDs,
}
}
func (c *Client) GetImageList(msg GetImageList, opts ...mq.RequestOption) (*GetImageListResp, error) {
return mq.Request[GetImageListResp](c.rabbitCli, msg, opts...)
func (c *Client) GetImageList(msg *GetImageList, opts ...mq.RequestOption) (*GetImageListResp, error) {
return mq.Request(Service.GetImageList, c.rabbitCli, msg, opts...)
}
// 删除镜像
var _ = Register(PCMService.DeleteImage)
var _ = Register(Service.DeleteImage)
type DeleteImage struct {
mq.MessageBodyBase
NodeID int64 `json:"nodeID"`
PCMJobID int64 `json:"pcmJobID"`
}
type DeleteImageResp struct {
mq.MessageBodyBase
Result string `json:"result"`
}
func NewDeleteImage(nodeID int64, pcmJobID int64) DeleteImage {
return DeleteImage{
func NewDeleteImage(nodeID int64, pcmJobID int64) *DeleteImage {
return &DeleteImage{
NodeID: nodeID,
PCMJobID: pcmJobID,
}
}
type DeleteImageResp struct {
Result string `json:"result"`
}
func NewDeleteImageResp(result string) DeleteImageResp {
return DeleteImageResp{
func NewDeleteImageResp(result string) *DeleteImageResp {
return &DeleteImageResp{
Result: result,
}
}
func (c *Client) DeleteImage(msg DeleteImage, opts ...mq.RequestOption) (*DeleteImageResp, error) {
return mq.Request[DeleteImageResp](c.rabbitCli, msg, opts...)
func (c *Client) DeleteImage(msg *DeleteImage, opts ...mq.RequestOption) (*DeleteImageResp, error) {
return mq.Request(Service.DeleteImage, c.rabbitCli, msg, opts...)
}
// 启动提交任务
var _ = Register(PCMService.StartScheduleTask)
var _ = Register(Service.StartScheduleTask)
type StartScheduleTask struct {
mq.MessageBodyBase
NodeID int64 `json:"nodeID"`
Envs []map[string]string `json:"envs"`
ImageID int64 `json:"imageID"`
CMDLine string `json:"cmdLine"`
}
type StartScheduleTaskResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartScheduleTask(nodeID int64, envs []map[string]string, imageID int64, cmdLine string) StartScheduleTask {
return StartScheduleTask{
func NewStartScheduleTask(nodeID int64, envs []map[string]string, imageID int64, cmdLine string) *StartScheduleTask {
return &StartScheduleTask{
NodeID: nodeID,
Envs: envs,
ImageID: imageID,
CMDLine: cmdLine,
}
}
func NewStartScheduleTaskResp(taskID string) StartScheduleTaskResp {
return StartScheduleTaskResp{
func NewStartScheduleTaskResp(taskID string) *StartScheduleTaskResp {
return &StartScheduleTaskResp{
TaskID: taskID,
}
}
func (c *Client) StartScheduleTask(msg StartUploadImage, opts ...mq.RequestOption) (*StartScheduleTaskResp, error) {
return mq.Request[StartScheduleTaskResp](c.rabbitCli, msg, opts...)
func (c *Client) StartScheduleTask(msg *StartScheduleTask, opts ...mq.RequestOption) (*StartScheduleTaskResp, error) {
return mq.Request(Service.StartScheduleTask, c.rabbitCli, msg, opts...)
}
// 删除任务
var _ = Register(PCMService.DeleteTask)
var _ = Register(Service.DeleteTask)
type DeleteTask struct {
mq.MessageBodyBase
NodeID int64 `json:"nodeID"`
PCMJobID int64 `json:"pcmJobID"`
}
type DeleteTaskResp struct {
mq.MessageBodyBase
Result string `json:"result"`
}
func NewDeleteTask(nodeID int64, pcmJobID int64) DeleteTask {
return DeleteTask{
func NewDeleteTask(nodeID int64, pcmJobID int64) *DeleteTask {
return &DeleteTask{
NodeID: nodeID,
PCMJobID: pcmJobID,
}
}
type DeleteTaskResp struct {
Result string `json:"result"`
}
func NewDeleteTaskResp(result string) DeleteTaskResp {
return DeleteTaskResp{
func NewDeleteTaskResp(result string) *DeleteTaskResp {
return &DeleteTaskResp{
Result: result,
}
}
func (c *Client) DeleteTask(msg DeleteTask, opts ...mq.RequestOption) (*DeleteTaskResp, error) {
return mq.Request[DeleteTaskResp](c.rabbitCli, msg, opts...)
func (c *Client) DeleteTask(msg *DeleteTask, opts ...mq.RequestOption) (*DeleteTaskResp, error) {
return mq.Request(Service.DeleteTask, c.rabbitCli, msg, opts...)
}

View File

@ -54,7 +54,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register 将Service中的一个接口函数作为指定类型消息的处理函数同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
@ -64,7 +64,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T
// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()

View File

@ -14,36 +14,39 @@ type StorageService interface {
}
// 启动存储系统调度文件任务
var _ = Register(StorageService.StartStorageLoadPackage)
var _ = Register(Service.StartStorageLoadPackage)
type StartStorageLoadPackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StorageID int64 `json:"storageID"`
}
type StartStorageLoadPackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartStorageLoadPackage(packageID int64, userID int64) StartStorageLoadPackage {
return StartStorageLoadPackage{
func NewStartStorageLoadPackage(packageID int64, userID int64) *StartStorageLoadPackage {
return &StartStorageLoadPackage{
PackageID: packageID,
UserID: userID,
}
}
func NewStartStorageLoadPackageResp(taskID string) StartStorageLoadPackageResp {
return StartStorageLoadPackageResp{
func NewStartStorageLoadPackageResp(taskID string) *StartStorageLoadPackageResp {
return &StartStorageLoadPackageResp{
TaskID: taskID,
}
}
func (c *Client) StartStorageLoadPackage(msg StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) {
return mq.Request[StartStorageLoadPackageResp](c.rabbitCli, msg, opts...)
func (c *Client) StartStorageLoadPackage(msg *StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) {
return mq.Request(Service.StartStorageLoadPackage, c.rabbitCli, msg, opts...)
}
// 启动存储系统从存储服务上传文件任务
var _ = Register(StorageService.StartStorageCreatePackage)
var _ = Register(Service.StartStorageCreatePackage)
type StartStorageCreatePackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
Path string `json:"path"`
@ -52,11 +55,12 @@ type StartStorageCreatePackage struct {
Redundancy models.TypedRedundancyInfo `json:"redundancy"`
}
type StartStorageCreatePackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) StartStorageCreatePackage {
return StartStorageCreatePackage{
func NewStartStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy models.TypedRedundancyInfo) *StartStorageCreatePackage {
return &StartStorageCreatePackage{
UserID: userID,
StorageID: storageID,
Path: filePath,
@ -65,39 +69,41 @@ func NewStartStorageCreatePackage(userID int64, storageID int64, filePath string
Redundancy: redundancy,
}
}
func NewStartStorageCreatePackageResp(taskID string) StartStorageCreatePackageResp {
return StartStorageCreatePackageResp{
func NewStartStorageCreatePackageResp(taskID string) *StartStorageCreatePackageResp {
return &StartStorageCreatePackageResp{
TaskID: taskID,
}
}
func (c *Client) StartStorageCreatePackage(msg StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) {
return mq.Request[StartStorageCreatePackageResp](c.rabbitCli, msg, opts...)
func (c *Client) StartStorageCreatePackage(msg *StartStorageCreatePackage, opts ...mq.RequestOption) (*StartStorageCreatePackageResp, error) {
return mq.Request(Service.StartStorageCreatePackage, c.rabbitCli, msg, opts...)
}
// 启动存储系统调度文件到某个节点的缓存的任务
var _ = Register(StorageService.StartCacheMovePackage)
var _ = Register(Service.StartCacheMovePackage)
type StartCacheMovePackage struct {
mq.MessageBodyBase
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
NodeID int64 `json:"nodeID"`
}
type StartCacheMovePackageResp struct {
mq.MessageBodyBase
TaskID string `json:"taskID"`
}
func NewStartCacheMovePackage(userID int64, packageID int64, nodeID int64) StartCacheMovePackage {
return StartCacheMovePackage{
func NewStartCacheMovePackage(userID int64, packageID int64, nodeID int64) *StartCacheMovePackage {
return &StartCacheMovePackage{
UserID: userID,
PackageID: packageID,
NodeID: nodeID,
}
}
func NewStartCacheMovePackageResp(taskID string) StartCacheMovePackageResp {
return StartCacheMovePackageResp{
func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp {
return &StartCacheMovePackageResp{
TaskID: taskID,
}
}
func (c *Client) StartCacheMovePackage(msg StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) {
return mq.Request[StartCacheMovePackageResp](c.rabbitCli, msg, opts...)
func (c *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) {
return mq.Request(Service.StartCacheMovePackage, c.rabbitCli, msg, opts...)
}

View File

@ -14,32 +14,34 @@ type AdvisorService interface {
var _ = Register(Service.ReportAdvisorTaskStatus)
type ReportAdvisorTaskStatus struct {
mq.MessageBodyBase
AdvisorID string `json:"advisorID"`
TaskStatus []AdvisorTaskStatus `json:"taskStatus"`
}
type ReportAdvisorTaskStatusResp struct {
mq.MessageBodyBase
}
type AdvisorTaskStatus struct {
TaskID string
Status advtsk.TaskStatus
}
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) ReportAdvisorTaskStatus {
return ReportAdvisorTaskStatus{
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus {
return &ReportAdvisorTaskStatus{
AdvisorID: advisorID,
TaskStatus: taskStatus,
}
}
func NewReportAdvisorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
func NewReportAdvisorTaskStatusResp() *ReportExecutorTaskStatusResp {
return &ReportExecutorTaskStatusResp{}
}
func NewAdvisorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) AdvisorTaskStatus {
func NewAdvisorTaskStatus(taskID string, status exectsk.TaskStatus) AdvisorTaskStatus {
return AdvisorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportAdvisorTaskStatus(msg ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) {
return mq.Request[ReportAdvisorTaskStatusResp](c.rabbitCli, msg, opts...)
func (c *Client) ReportAdvisorTaskStatus(msg *ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) {
return mq.Request(Service.ReportAdvisorTaskStatus, c.rabbitCli, msg, opts...)
}

View File

@ -13,32 +13,34 @@ type ExecutorService interface {
var _ = Register(Service.ReportExecutorTaskStatus)
type ReportExecutorTaskStatus struct {
mq.MessageBodyBase
ExecutorID string `json:"executorID"`
TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
}
type ReportExecutorTaskStatusResp struct {
mq.MessageBodyBase
}
type ExecutorTaskStatus struct {
TaskID string
Status exectsk.TaskStatus
}
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) ReportExecutorTaskStatus {
return ReportExecutorTaskStatus{
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
return &ReportExecutorTaskStatus{
ExecutorID: executorID,
TaskStatus: taskStatus,
}
}
func NewReportExecutorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
return &ReportExecutorTaskStatusResp{}
}
func NewExecutorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) ExecutorTaskStatus {
func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTaskStatus {
return ExecutorTaskStatus{
TaskID: taskID,
Status: status,
}
}
func (c *Client) ReportExecutorTaskStatus(msg ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request[ReportExecutorTaskStatusResp](c.rabbitCli, msg, opts...)
func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request(Service.ReportExecutorTaskStatus, c.rabbitCli, msg, opts...)
}

View File

@ -12,50 +12,54 @@ type ImageService interface {
var _ = Register(Service.CreateImage)
type CreateImage struct {
mq.MessageBodyBase
SlwNodeImageID string `json:"slwNodeImageID"` // 算力中心的镜像ID
PackageID int64 `json:"packageID"` // 镜像文件
}
type CreateImageResp struct {
mq.MessageBodyBase
ImageID string `json:"imageID"`
}
func NewCreateImage(slwNodeImageID string, packageID int64) CreateImage {
return CreateImage{
func NewCreateImage(slwNodeImageID string, packageID int64) *CreateImage {
return &CreateImage{
SlwNodeImageID: slwNodeImageID,
PackageID: packageID,
}
}
func NewCreateImageResp(imageID string) CreateImageResp {
return CreateImageResp{
func NewCreateImageResp(imageID string) *CreateImageResp {
return &CreateImageResp{
ImageID: imageID,
}
}
func (c *Client) CreateImage(msg CreateImage, opts ...mq.RequestOption) (*CreateImageResp, error) {
return mq.Request[CreateImageResp](c.rabbitCli, msg, opts...)
func (c *Client) CreateImage(msg *CreateImage, opts ...mq.RequestOption) (*CreateImageResp, error) {
return mq.Request(Service.CreateImage, c.rabbitCli, msg, opts...)
}
// 查询镜像信息
var _ = Register(Service.GetImageInfo)
type GetImageInfo struct {
mq.MessageBodyBase
ImageID string `json:"imageID"`
}
type GetImageInfoResp struct {
mq.MessageBodyBase
SlwNodeImageID string `json:"slwNodeImageID"` // 算力中心的镜像ID
PackageID int64 `json:"packageID"` // 镜像文件
}
func NewGetImageInfo(imageID string) GetImageInfo {
return GetImageInfo{
func NewGetImageInfo(imageID string) *GetImageInfo {
return &GetImageInfo{
ImageID: imageID,
}
}
func NewGetImageInfoResp(slwNodeImageID string, packageID int64) GetImageInfoResp {
return GetImageInfoResp{
func NewGetImageInfoResp(slwNodeImageID string, packageID int64) *GetImageInfoResp {
return &GetImageInfoResp{
SlwNodeImageID: slwNodeImageID,
PackageID: packageID,
}
}
func (c *Client) GetImageInfo(msg GetImageInfo, opts ...mq.RequestOption) (*GetImageInfoResp, error) {
return mq.Request[GetImageInfoResp](c.rabbitCli, msg, opts...)
func (c *Client) GetImageInfo(msg *GetImageInfo, opts ...mq.RequestOption) (*GetImageInfoResp, error) {
return mq.Request(Service.GetImageInfo, c.rabbitCli, msg, opts...)
}

View File

@ -10,81 +10,115 @@ type JobService interface {
SubmitJobSet(msg *SubmitJobSet) (*SubmitJobSetResp, *mq.CodeMessage)
JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded) (*JobSetLocalFileUploadedResp, *mq.CodeMessage)
GetJob(msg *GetJob) (*GetJobResp, *mq.CodeMessage)
GetJobSetJobs(msg *GetJobSetJobs) (*GetJobSetJobsResp, *mq.CodeMessage)
}
// 提交任务集
var _ = Register(Service.SubmitJobSet)
type SubmitJobSet struct {
mq.MessageBodyBase
JobSet models.JobSetInfo `json:"jobSet"`
PreScheduleScheme jobmod.JobSetPreScheduleScheme `json:"preScheduleScheme"`
}
type SubmitJobSetResp struct {
mq.MessageBodyBase
JobSetID string `json:"jobSetID"`
}
func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme jobmod.JobSetPreScheduleScheme) SubmitJobSet {
return SubmitJobSet{
func NewSubmitJobSet(jobSet models.JobSetInfo, preScheduleScheme jobmod.JobSetPreScheduleScheme) *SubmitJobSet {
return &SubmitJobSet{
JobSet: jobSet,
PreScheduleScheme: preScheduleScheme,
}
}
func NewSubmitJobSetResp(jobSetID string) SubmitJobSetResp {
return SubmitJobSetResp{
func NewSubmitJobSetResp(jobSetID string) *SubmitJobSetResp {
return &SubmitJobSetResp{
JobSetID: jobSetID,
}
}
func (c *Client) SubmitJobSet(msg SubmitJobSet, opts ...mq.RequestOption) (*SubmitJobSetResp, error) {
return mq.Request[SubmitJobSetResp](c.rabbitCli, msg, opts...)
func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*SubmitJobSetResp, error) {
return mq.Request(Service.SubmitJobSet, c.rabbitCli, msg, opts...)
}
// JobSet中需要使用的一个文件上传完成
var _ = Register(Service.JobSetLocalFileUploaded)
type JobSetLocalFileUploaded struct {
mq.MessageBodyBase
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因
PackageID int64 `json:"packageID"` // 如果上传文件成功那么这个字段是上传之后得到的PackageID
}
type JobSetLocalFileUploadedResp struct {
mq.MessageBodyBase
}
func NewJobSetLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) JobSetLocalFileUploaded {
return JobSetLocalFileUploaded{
func NewJobSetLocalFileUploaded(jobSetID string, localPath string, err string, packageID int64) *JobSetLocalFileUploaded {
return &JobSetLocalFileUploaded{
JobSetID: jobSetID,
LocalPath: localPath,
Error: err,
PackageID: packageID,
}
}
func NewJobSetLocalFileUploadedResp() JobSetLocalFileUploadedResp {
return JobSetLocalFileUploadedResp{}
func NewJobSetLocalFileUploadedResp() *JobSetLocalFileUploadedResp {
return &JobSetLocalFileUploadedResp{}
}
func (c *Client) JobSetLocalFileUploaded(msg JobSetLocalFileUploaded, opts ...mq.RequestOption) (*JobSetLocalFileUploadedResp, error) {
return mq.Request[JobSetLocalFileUploadedResp](c.rabbitCli, msg, opts...)
func (c *Client) JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded, opts ...mq.RequestOption) (*JobSetLocalFileUploadedResp, error) {
return mq.Request(Service.JobSetLocalFileUploaded, c.rabbitCli, msg, opts...)
}
// 获取任务数据
type GetJob struct {
mq.MessageBodyBase
JobID string `json:"jobID"`
}
type GetJobResp struct {
mq.MessageBodyBase
Job jobmod.Job `json:"job"`
}
func NewGetJob(jobID string) GetJob {
return GetJob{
func NewGetJob(jobID string) *GetJob {
return &GetJob{
JobID: jobID,
}
}
func NewGetJobResp(job jobmod.Job) GetJobResp {
return GetJobResp{
func NewGetJobResp(job jobmod.Job) *GetJobResp {
return &GetJobResp{
Job: job,
}
}
func (c *Client) GetJob(msg GetJob, opts ...mq.RequestOption) (*GetJobResp, error) {
return mq.Request[GetJobResp](c.rabbitCli, msg, opts...)
func (c *Client) GetJob(msg *GetJob, opts ...mq.RequestOption) (*GetJobResp, error) {
return mq.Request(Service.GetJob, c.rabbitCli, msg, opts...)
}
// 获取指定任务集中的所有任务数据
type GetJobSetJobs struct {
mq.MessageBodyBase
JobSetID string `json:"jobSetID"`
}
type GetJobSetJobsResp struct {
mq.MessageBodyBase
Jobs []jobmod.Job `json:"jobs"`
}
func NewGetJobSetJobs(jobSetID string) *GetJobSetJobs {
return &GetJobSetJobs{
JobSetID: jobSetID,
}
}
func NewGetJobSetJobsResp(jobs []jobmod.Job) *GetJobSetJobsResp {
return &GetJobSetJobsResp{
Jobs: jobs,
}
}
func (c *Client) GetJobSetJobs(msg *GetJobSetJobs, opts ...mq.RequestOption) (*GetJobSetJobsResp, error) {
return mq.Request(Service.GetJobSetJobs, c.rabbitCli, msg, opts...)
}
func init() {

View File

@ -58,7 +58,7 @@ var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register 将Service中的一个接口函数作为指定类型消息的处理函数同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
@ -68,7 +68,7 @@ func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*T
// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型
// TODO 需要约束Service实现了TSvc接口
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()

View File

@ -19,7 +19,7 @@ func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageList
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return mq.ReplyFailed[execmq.GetImageListResp](errorcode.OperationFailed, "new pcm client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
@ -28,7 +28,7 @@ func (svc *Service) GetImageList(msg *execmq.GetImageList) (*execmq.GetImageList
})
if err != nil {
logger.Warnf("get image list failed, err: %s", err.Error())
return mq.ReplyFailed[execmq.GetImageListResp](errorcode.OperationFailed, "get image list failed")
return nil, mq.Failed(errorcode.OperationFailed, "get image list failed")
}
return mq.ReplyOK(execmq.NewGetImageListResp(resp.ImageIDs))
@ -38,7 +38,7 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return mq.ReplyFailed[execmq.DeleteImageResp](errorcode.OperationFailed, "new pcm client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
@ -48,7 +48,7 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes
})
if err != nil {
logger.Warnf("delete image failed, err: %s", err.Error())
return mq.ReplyFailed[execmq.DeleteImageResp](errorcode.OperationFailed, "delete image failed")
return nil, mq.Failed(errorcode.OperationFailed, "delete image failed")
}
return mq.ReplyOK(execmq.NewDeleteImageResp(resp.Result))
}
@ -62,7 +62,7 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp,
pcmCli, err := globals.PCMPool.Acquire()
if err != nil {
logger.Warnf("new pcm client, err: %s", err.Error())
return mq.ReplyFailed[execmq.DeleteTaskResp](errorcode.OperationFailed, "new pcm client failed")
return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed")
}
defer pcmCli.Close()
@ -72,7 +72,7 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp,
})
if err != nil {
logger.Warnf("delete task failed, err: %s", err.Error())
return mq.ReplyFailed[execmq.DeleteTaskResp](errorcode.OperationFailed, "delete task failed")
return nil, mq.Failed(errorcode.OperationFailed, "delete task failed")
}
return mq.ReplyOK(execmq.NewDeleteTaskResp(resp.Result))
}