diff --git a/common/globals/pool.go b/common/globals/pools.go similarity index 50% rename from common/globals/pool.go rename to common/globals/pools.go index 7c496ba..22ac8a4 100644 --- a/common/globals/pool.go +++ b/common/globals/pools.go @@ -3,8 +3,21 @@ package globals import ( cldstg "gitlink.org.cn/cloudream/common/api/storage" "gitlink.org.cn/cloudream/common/api/unifyops" + scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" + cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" + execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" ) +var ExecutorMQPool *execmq.Pool + +var CollectorMQPool *cltmq.Pool + +func InitMQPool(cfg *scmq.Config) { + ExecutorMQPool = execmq.NewPool(cfg) + + CollectorMQPool = cltmq.NewPool(cfg) +} + var CloudreamStoragePool *cldstg.Pool func InitCloudreamStoragePool(cfg *cldstg.Config) { diff --git a/common/pkgs/mq/collector/client.go b/common/pkgs/mq/collector/client.go index f2ae95d..93f395c 100644 --- a/common/pkgs/mq/collector/client.go +++ b/common/pkgs/mq/collector/client.go @@ -7,10 +7,9 @@ import ( type Client struct { rabbitCli *mq.RabbitMQClient - id int64 } -func NewClient(id int64, cfg *mymq.Config) (*Client, error) { +func NewClient(cfg *mymq.Config) (*Client, error) { rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") if err != nil { return nil, err @@ -18,10 +17,43 @@ func NewClient(id int64, cfg *mymq.Config) (*Client, error) { return &Client{ rabbitCli: rabbitCli, - id: id, }, nil } func (c *Client) Close() { c.rabbitCli.Close() } + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + mqcfg *mymq.Config +} + +func NewPool(mqcfg *mymq.Config) *Pool { + return &Pool{ + mqcfg: mqcfg, + } +} +func (p *Pool) Acquire() (*PoolClient, error) { + cli, err := NewClient(p.mqcfg) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Client.Close() +} diff --git a/common/pkgs/mq/dcontroller/client.go b/common/pkgs/mq/dcontroller/client.go deleted file mode 100644 index 23de74b..0000000 --- a/common/pkgs/mq/dcontroller/client.go +++ /dev/null @@ -1,27 +0,0 @@ -package dcontroller - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" -) - -type Client struct { - rabbitCli *mq.RabbitMQClient - id int64 -} - -func NewClient(id int64, cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") - if err != nil { - return nil, err - } - - return &Client{ - rabbitCli: rabbitCli, - id: id, - }, nil -} - -func (c *Client) Close() { - c.rabbitCli.Close() -} diff --git a/common/pkgs/mq/dcontroller/apis.go b/common/pkgs/mq/executor/apis.go similarity index 99% rename from common/pkgs/mq/dcontroller/apis.go rename to common/pkgs/mq/executor/apis.go index 313ef8a..a4eaa7d 100644 --- a/common/pkgs/mq/dcontroller/apis.go +++ b/common/pkgs/mq/executor/apis.go @@ -1,4 +1,4 @@ -package dcontroller +package executor import ( "gitlink.org.cn/cloudream/common/models" diff --git a/common/pkgs/mq/executor/client.go b/common/pkgs/mq/executor/client.go new file mode 100644 index 0000000..8e70cbf --- /dev/null +++ b/common/pkgs/mq/executor/client.go @@ -0,0 +1,59 @@ +package executor + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" +) + +type Client struct { + rabbitCli *mq.RabbitMQClient +} + +func NewClient(cfg *mymq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") + if err != nil { + return nil, err + } + + return &Client{ + rabbitCli: rabbitCli, + }, nil +} + +func (c *Client) Close() { + c.rabbitCli.Close() +} + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + mqcfg *mymq.Config +} + +func NewPool(mqcfg *mymq.Config) *Pool { + return &Pool{ + mqcfg: mqcfg, + } +} +func (p *Pool) Acquire() (*PoolClient, error) { + cli, err := NewClient(p.mqcfg) + if err != nil { + return nil, err + } + + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + cli.Client.Close() +} diff --git a/common/pkgs/mq/dcontroller/server.go b/common/pkgs/mq/executor/server.go similarity index 99% rename from common/pkgs/mq/dcontroller/server.go rename to common/pkgs/mq/executor/server.go index e8dadcd..e36d701 100644 --- a/common/pkgs/mq/dcontroller/server.go +++ b/common/pkgs/mq/executor/server.go @@ -1,4 +1,4 @@ -package dcontroller +package executor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" diff --git a/executor/internal/services/apis.go b/executor/internal/services/apis.go index 231430b..8a76753 100644 --- a/executor/internal/services/apis.go +++ b/executor/internal/services/apis.go @@ -4,16 +4,16 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/mq" - dctrlmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/dcontroller" + execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" schtsk "gitlink.org.cn/cloudream/scheduler/executor/internal/task" ) -func (svc *Service) StartStorageLoadPackage(msg *dctrlmq.StartStorageLoadPackage) (*dctrlmq.StartStorageLoadPackageResp, *mq.CodeMessage) { +func (svc *Service) StartStorageLoadPackage(msg *execmq.StartStorageLoadPackage) (*execmq.StartStorageLoadPackageResp, *mq.CodeMessage) { tsk := svc.taskManager.StartNew(schtsk.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID)) - return mq.ReplyOK(dctrlmq.NewStartStorageLoadPackageResp(tsk.ID())) + return mq.ReplyOK(execmq.NewStartStorageLoadPackageResp(tsk.ID())) } -func (svc *Service) WaitStorageLoadPackage(msg *dctrlmq.WaitStorageLoadPackage) (*dctrlmq.WaitStorageLoadPackageResp, *mq.CodeMessage) { +func (svc *Service) WaitStorageLoadPackage(msg *execmq.WaitStorageLoadPackage) (*execmq.WaitStorageLoadPackageResp, *mq.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" @@ -21,17 +21,17 @@ func (svc *Service) WaitStorageLoadPackage(msg *dctrlmq.WaitStorageLoadPackage) errMsg = tsk.Error().Error() } - return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(true, errMsg)) + return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(true, errMsg)) } - return mq.ReplyOK(dctrlmq.NewWaitStorageLoadPackageResp(false, "")) + return mq.ReplyOK(execmq.NewWaitStorageLoadPackageResp(false, "")) } -func (svc *Service) StartStorageCreatePackage(msg *dctrlmq.StartStorageCreatePackage) (*dctrlmq.StartStorageCreatePackageResp, *mq.CodeMessage) { +func (svc *Service) StartStorageCreatePackage(msg *execmq.StartStorageCreatePackage) (*execmq.StartStorageCreatePackageResp, *mq.CodeMessage) { tsk := svc.taskManager.StartNew(schtsk.NewStorageCreatePackage(msg.UserID, msg.StorageID, msg.Path, msg.BucketID, msg.Name, msg.Redundancy)) - return mq.ReplyOK(dctrlmq.NewStartStorageCreatePackageResp(tsk.ID())) + return mq.ReplyOK(execmq.NewStartStorageCreatePackageResp(tsk.ID())) } -func (svc *Service) WaitStorageCreatePackage(msg *dctrlmq.WaitStorageCreatePackage) (*dctrlmq.WaitStorageCreatePackageResp, *mq.CodeMessage) { +func (svc *Service) WaitStorageCreatePackage(msg *execmq.WaitStorageCreatePackage) (*execmq.WaitStorageCreatePackageResp, *mq.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { tskBody := tsk.Body().(*schtsk.StorageCreatePackage) @@ -41,17 +41,17 @@ func (svc *Service) WaitStorageCreatePackage(msg *dctrlmq.WaitStorageCreatePacka errMsg = tsk.Error().Error() } - return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID)) + return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(true, errMsg, tskBody.Result.PackageID)) } - return mq.ReplyOK(dctrlmq.NewWaitStorageCreatePackageResp(false, "", 0)) + return mq.ReplyOK(execmq.NewWaitStorageCreatePackageResp(false, "", 0)) } -func (svc *Service) StartCacheMovePackage(msg *dctrlmq.StartCacheMovePackage) (*dctrlmq.StartCacheMovePackageResp, *mq.CodeMessage) { +func (svc *Service) StartCacheMovePackage(msg *execmq.StartCacheMovePackage) (*execmq.StartCacheMovePackageResp, *mq.CodeMessage) { tsk := svc.taskManager.StartNew(schtsk.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.NodeID)) - return mq.ReplyOK(dctrlmq.NewStartCacheMovePackageResp(tsk.ID())) + return mq.ReplyOK(execmq.NewStartCacheMovePackageResp(tsk.ID())) } -func (svc *Service) WaitCacheMovePackage(msg *dctrlmq.WaitCacheMovePackage) (*dctrlmq.WaitCacheMovePackageResp, *mq.CodeMessage) { +func (svc *Service) WaitCacheMovePackage(msg *execmq.WaitCacheMovePackage) (*execmq.WaitCacheMovePackageResp, *mq.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" @@ -59,7 +59,7 @@ func (svc *Service) WaitCacheMovePackage(msg *dctrlmq.WaitCacheMovePackage) (*dc errMsg = tsk.Error().Error() } - return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(true, errMsg)) + return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(true, errMsg)) } - return mq.ReplyOK(dctrlmq.NewWaitCacheMovePackageResp(false, "")) + return mq.ReplyOK(execmq.NewWaitCacheMovePackageResp(false, "")) } diff --git a/executor/main.go b/executor/main.go index a82fd6e..041e63c 100644 --- a/executor/main.go +++ b/executor/main.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/scheduler/common/globals" - dctrlmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/dcontroller" + execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" "gitlink.org.cn/cloudream/scheduler/executor/internal/config" "gitlink.org.cn/cloudream/scheduler/executor/internal/services" "gitlink.org.cn/cloudream/scheduler/executor/internal/task" @@ -29,7 +29,7 @@ func main() { taskMgr := task.NewManager() - mqSvr, err := dctrlmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ) + mqSvr, err := execmq.NewServer(services.NewService(&taskMgr), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new dcontroller server failed, err: %s", err.Error()) } @@ -45,7 +45,7 @@ func main() { <-forever } -func serveMQServer(server *dctrlmq.Server) { +func serveMQServer(server *execmq.Server) { logger.Info("start serving mq server") err := server.Serve()