修改模块名
This commit is contained in:
parent
50b5f07e55
commit
b8c42c44d6
|
@ -3,8 +3,21 @@ package globals
|
||||||
import (
|
import (
|
||||||
cldstg "gitlink.org.cn/cloudream/common/api/storage"
|
cldstg "gitlink.org.cn/cloudream/common/api/storage"
|
||||||
"gitlink.org.cn/cloudream/common/api/unifyops"
|
"gitlink.org.cn/cloudream/common/api/unifyops"
|
||||||
|
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
|
||||||
|
cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
|
||||||
|
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ExecutorMQPool *execmq.Pool
|
||||||
|
|
||||||
|
var CollectorMQPool *cltmq.Pool
|
||||||
|
|
||||||
|
func InitMQPool(cfg *scmq.Config) {
|
||||||
|
ExecutorMQPool = execmq.NewPool(cfg)
|
||||||
|
|
||||||
|
CollectorMQPool = cltmq.NewPool(cfg)
|
||||||
|
}
|
||||||
|
|
||||||
var CloudreamStoragePool *cldstg.Pool
|
var CloudreamStoragePool *cldstg.Pool
|
||||||
|
|
||||||
func InitCloudreamStoragePool(cfg *cldstg.Config) {
|
func InitCloudreamStoragePool(cfg *cldstg.Config) {
|
|
@ -7,10 +7,9 @@ import (
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
rabbitCli *mq.RabbitMQClient
|
rabbitCli *mq.RabbitMQClient
|
||||||
id int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(id int64, cfg *mymq.Config) (*Client, error) {
|
func NewClient(cfg *mymq.Config) (*Client, error) {
|
||||||
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "")
|
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -18,10 +17,43 @@ func NewClient(id int64, cfg *mymq.Config) (*Client, error) {
|
||||||
|
|
||||||
return &Client{
|
return &Client{
|
||||||
rabbitCli: rabbitCli,
|
rabbitCli: rabbitCli,
|
||||||
id: id,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) Close() {
|
func (c *Client) Close() {
|
||||||
c.rabbitCli.Close()
|
c.rabbitCli.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PoolClient struct {
|
||||||
|
*Client
|
||||||
|
owner *Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *PoolClient) Close() {
|
||||||
|
c.owner.Release(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Pool struct {
|
||||||
|
mqcfg *mymq.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPool(mqcfg *mymq.Config) *Pool {
|
||||||
|
return &Pool{
|
||||||
|
mqcfg: mqcfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (p *Pool) Acquire() (*PoolClient, error) {
|
||||||
|
cli, err := NewClient(p.mqcfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PoolClient{
|
||||||
|
Client: cli,
|
||||||
|
owner: p,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) Release(cli *PoolClient) {
|
||||||
|
cli.Client.Close()
|
||||||
|
}
|
||||||
|
|
|
@ -1,27 +0,0 @@
|
||||||
package dcontroller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
||||||
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
rabbitCli *mq.RabbitMQClient
|
|
||||||
id int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewClient(id int64, cfg *mymq.Config) (*Client, error) {
|
|
||||||
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Client{
|
|
||||||
rabbitCli: rabbitCli,
|
|
||||||
id: id,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Close() {
|
|
||||||
c.rabbitCli.Close()
|
|
||||||
}
|
|
|
@ -1,4 +1,4 @@
|
||||||
package dcontroller
|
package executor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gitlink.org.cn/cloudream/common/models"
|
"gitlink.org.cn/cloudream/common/models"
|
|
@ -0,0 +1,59 @@
|
||||||
|
package executor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||||
|
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
rabbitCli *mq.RabbitMQClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewClient(cfg *mymq.Config) (*Client, error) {
|
||||||
|
rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Client{
|
||||||
|
rabbitCli: rabbitCli,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) Close() {
|
||||||
|
c.rabbitCli.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
type PoolClient struct {
|
||||||
|
*Client
|
||||||
|
owner *Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *PoolClient) Close() {
|
||||||
|
c.owner.Release(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Pool struct {
|
||||||
|
mqcfg *mymq.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPool(mqcfg *mymq.Config) *Pool {
|
||||||
|
return &Pool{
|
||||||
|
mqcfg: mqcfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
func (p *Pool) Acquire() (*PoolClient, error) {
|
||||||
|
cli, err := NewClient(p.mqcfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PoolClient{
|
||||||
|
Client: cli,
|
||||||
|
owner: p,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Pool) Release(cli *PoolClient) {
|
||||||
|
cli.Client.Close()
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package dcontroller
|
package executor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
@ -4,16 +4,16 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
||||||
dctrlmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/dcontroller"
|
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||||
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
|
schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (svc *Service) StartStorageLoadPackage(msg *dctrlmq.StartStorageLoadPackage) (*dctrlmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
|
func (svc *Service) StartStorageLoadPackage(msg *execmq.StartStorageLoadPackage) (*execmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
|
tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
|
||||||
return mq.ReplyOK(dctrlmq.NewStartStorageLoadPackageResp(tsk.ID()))
|
return mq.ReplyOK(execmq.NewStartStorageLoadPackageResp(tsk.ID()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *Service) WaitStorageLoadPackage(msg *dctrlmq.WaitStorageLoadPackage) (*dctrlmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
|
func (svc *Service) WaitStorageLoadPackage(msg *execmq.WaitStorageLoadPackage) (*execmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.FindByID(msg.TaskID)
|
tsk := svc.taskManager.FindByID(msg.TaskID)
|
||||||
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
||||||
errMsg := ""
|
errMsg := ""
|
||||||
|
@ -21,17 +21,17 @@ func (svc *Service) WaitStorageLoadPackage(msg *dctrlmq.WaitStorageLoadPackage)
|
||||||
errMsg = tsk.Error().Error()
|
errMsg = tsk.Error().Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(true, errMsg))
|
return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(true, errMsg))
|
||||||
}
|
}
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(false, ""))
|
return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(false, ""))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *Service) StartStorageCreatePackage(msg *dctrlmq.StartStorageCreatePackage) (*dctrlmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
|
func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePackage) (*execmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy))
|
tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy))
|
||||||
return mq.ReplyOK(dctrlmq.NewStartStorageCreatePackageResp(tsk.ID()))
|
return mq.ReplyOK(execmq.NewStartStorageCreatePackageResp(tsk.ID()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *Service) WaitStorageCreatePackage(msg *dctrlmq.WaitStorageCreatePackage) (*dctrlmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
|
func (svc *Service) WaitStorageCreatePackage(msg *execmq.WaitStorageCreatePackage) (*execmq.WaitStorageCreatePackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.FindByID(msg.TaskID)
|
tsk := svc.taskManager.FindByID(msg.TaskID)
|
||||||
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
||||||
tskBody := tsk.Body().(*schtsk.StorageCreatePackage)
|
tskBody := tsk.Body().(*schtsk.StorageCreatePackage)
|
||||||
|
@ -41,17 +41,17 @@ func (svc *Service) WaitStorageCreatePackage(msg *dctrlmq.WaitStorageCreatePacka
|
||||||
errMsg = tsk.Error().Error()
|
errMsg = tsk.Error().Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID))
|
return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID))
|
||||||
}
|
}
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(false, "", 0))
|
return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(false, "", 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *Service) StartCacheMovePackage(msg *dctrlmq.StartCacheMovePackage) (*dctrlmq.StartCacheMovePackageResp, *mq.CodeMessage) {
|
func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
|
tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID))
|
||||||
return mq.ReplyOK(dctrlmq.NewStartCacheMovePackageResp(tsk.ID()))
|
return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (svc *Service) WaitCacheMovePackage(msg *dctrlmq.WaitCacheMovePackage) (*dctrlmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
|
func (svc *Service) WaitCacheMovePackage(msg *execmq.WaitCacheMovePackage) (*execmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
|
||||||
tsk := svc.taskManager.FindByID(msg.TaskID)
|
tsk := svc.taskManager.FindByID(msg.TaskID)
|
||||||
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
|
||||||
errMsg := ""
|
errMsg := ""
|
||||||
|
@ -59,7 +59,7 @@ func (svc *Service) WaitCacheMovePackage(msg *dctrlmq.WaitCacheMovePackage) (*dc
|
||||||
errMsg = tsk.Error().Error()
|
errMsg = tsk.Error().Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(true, errMsg))
|
return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(true, errMsg))
|
||||||
}
|
}
|
||||||
return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(false, ""))
|
return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(false, ""))
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ import (
|
||||||
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||||
"gitlink.org.cn/cloudream/scheduler/common/globals"
|
"gitlink.org.cn/cloudream/scheduler/common/globals"
|
||||||
dctrlmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/dcontroller"
|
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
|
||||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
|
"gitlink.org.cn/cloudream/scheduler/executor/internal/config"
|
||||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
|
"gitlink.org.cn/cloudream/scheduler/executor/internal/services"
|
||||||
"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
|
"gitlink.org.cn/cloudream/scheduler/executor/internal/task"
|
||||||
|
@ -29,7 +29,7 @@ func main() {
|
||||||
|
|
||||||
taskMgr := task.NewManager()
|
taskMgr := task.NewManager()
|
||||||
|
|
||||||
mqSvr, err := dctrlmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
|
mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatalf("new dcontroller server failed, err: %s", err.Error())
|
logger.Fatalf("new dcontroller server failed, err: %s", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -45,7 +45,7 @@ func main() {
|
||||||
<-forever
|
<-forever
|
||||||
}
|
}
|
||||||
|
|
||||||
func serveMQServer(server *dctrlmq.Server) {
|
func serveMQServer(server *execmq.Server) {
|
||||||
logger.Info("start serving mq server")
|
logger.Info("start serving mq server")
|
||||||
|
|
||||||
err := server.Serve()
|
err := server.Serve()
|
||||||
|
|
Loading…
Reference in New Issue