diff --git a/advisor/internal/reporter/reporter.go b/advisor/internal/reporter/reporter.go index 5426474..1d71fe7 100644 --- a/advisor/internal/reporter/reporter.go +++ b/advisor/internal/reporter/reporter.go @@ -48,7 +48,7 @@ func (r *Reporter) Serve() error { if err != nil { return fmt.Errorf("new manager client: %w", err) } - defer magCli.Close() + defer schglb.ManagerMQPool.Release(magCli) ticker := time.NewTicker(r.reportInterval) defer ticker.Stop() diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index ece7b86..656728b 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -132,7 +132,7 @@ func (s *DefaultScheduler) Schedule(job *jobmod.NormalJob) (*jobmod.JobScheduleS if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) allSlwNodes := make(map[uopsdk.SlwNodeID]*candidateSlwNode) @@ -215,7 +215,7 @@ func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID)) if err != nil { @@ -405,7 +405,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNod if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail) @@ -456,13 +456,13 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNode if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) magCli, err := schglb.ManagerMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new manager client: %w", err) } - defer magCli.Close() + defer schglb.ManagerMQPool.Release(magCli) imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID)) if err != nil { diff --git a/client/internal/prescheduler/default_prescheduler.go b/client/internal/prescheduler/default_prescheduler.go index 78800bd..da0f4b9 100644 --- a/client/internal/prescheduler/default_prescheduler.go +++ b/client/internal/prescheduler/default_prescheduler.go @@ -136,7 +136,7 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP if err != nil { return nil, nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) // 查询有哪些算力中心可用 getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo()) @@ -378,7 +378,7 @@ func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesI if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID)) if err != nil { @@ -575,7 +575,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) slwNodeFileScores := make(map[uopsdk.SlwNodeID]*fileDetail) @@ -626,13 +626,13 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwN if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) magCli, err := schglb.ManagerMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new manager client: %w", err) } - defer magCli.Close() + defer schglb.ManagerMQPool.Release(magCli) imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID)) if err != nil { diff --git a/client/internal/services/jobset.go b/client/internal/services/jobset.go index 30d807f..3d316eb 100644 --- a/client/internal/services/jobset.go +++ b/client/internal/services/jobset.go @@ -22,7 +22,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs if err != nil { return "", nil, fmt.Errorf("new manager client: %w", err) } - defer mgrCli.Close() + defer schglb.ManagerMQPool.Release(mgrCli) schScheme, uploadScheme, err := svc.preScheduler.Schedule(&info) if err != nil { @@ -43,7 +43,7 @@ func (svc *JobSetService) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath if err != nil { return fmt.Errorf("new manager client: %w", err) } - defer mgrCli.Close() + defer schglb.ManagerMQPool.Release(mgrCli) _, err = mgrCli.JobSetLocalFileUploaded(mgrmq.NewJobSetLocalFileUploaded(jobSetID, localPath, errMsg, packageID)) if err != nil { diff --git a/collector/internal/mq/pcm.go b/collector/internal/mq/pcm.go index 58bc20d..c11c548 100644 --- a/collector/internal/mq/pcm.go +++ b/collector/internal/mq/pcm.go @@ -15,7 +15,7 @@ func (svc *Service) GetImageList(msg *colmq.GetImageList) (*colmq.GetImageListRe logger.Warnf("new pcm client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed") } - defer pcmCli.Close() + defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.GetImageList(pcmsdk.GetImageListReq{ SlwNodeID: msg.SlwNodeID, diff --git a/collector/internal/mq/resource.go b/collector/internal/mq/resource.go index f2a4851..158f08a 100644 --- a/collector/internal/mq/resource.go +++ b/collector/internal/mq/resource.go @@ -15,7 +15,7 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge logger.Warnf("new unifyOps client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed") } - defer uniOpsCli.Close() + defer schglb.UnifyOpsPool.Release(uniOpsCli) var resp uopsdk.ResourceData switch msg.Type { @@ -61,7 +61,7 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge logger.Warnf("new unifyOps client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed") } - defer uniOpsCli.Close() + defer schglb.UnifyOpsPool.Release(uniOpsCli) resps, err := uniOpsCli.GetIndicatorData(uopsdk.GetOneResourceDataReq{ SlwNodeID: msg.SlwNodeID, diff --git a/collector/internal/mq/slw.go b/collector/internal/mq/slw.go index 763ef75..416c5b8 100644 --- a/collector/internal/mq/slw.go +++ b/collector/internal/mq/slw.go @@ -17,7 +17,7 @@ func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNode logger.Warnf("new unifyOps client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed") } - defer uniOpsCli.Close() + defer schglb.UnifyOpsPool.Release(uniOpsCli) resp, err := uniOpsCli.GetAllSlwNodeInfo() if err != nil { @@ -52,7 +52,7 @@ func (svc *Service) GetAllSlwNodeInfo(msg *colmq.GetAllSlwNodeInfo) (*colmq.GetA logger.Warnf("new unifyOps client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new unifyOps client failed") } - defer uniOpsCli.Close() + defer schglb.UnifyOpsPool.Release(uniOpsCli) resp, err := uniOpsCli.GetAllSlwNodeInfo() if err != nil { diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go index 4b5d7b0..2df0ec9 100644 --- a/collector/internal/mq/storage.go +++ b/collector/internal/mq/storage.go @@ -15,7 +15,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes logger.Warnf("new storage client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed") } - defer stgCli.Close() + defer schglb.CloudreamStoragePool.Release(stgCli) resp, err := stgCli.PackageGetCachedNodes(stgsdk.PackageGetCachedNodesReq{ PackageID: msg.PackageID, @@ -35,7 +35,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes logger.Warnf("new storage client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed") } - defer stgCli.Close() + defer schglb.CloudreamStoragePool.Release(stgCli) resp, err := stgCli.PackageGetLoadedNodes(stgsdk.PackageGetLoadedNodesReq{ PackageID: msg.PackageID, diff --git a/common/globals/pools.go b/common/globals/pools.go index f2eea6c..1c9b176 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -11,13 +11,13 @@ import ( mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" ) -var ExecutorMQPool *exemq.Pool +var ExecutorMQPool exemq.Pool -var AdvisorMQPool *advmq.Pool +var AdvisorMQPool advmq.Pool -var CollectorMQPool *cltmq.Pool +var CollectorMQPool cltmq.Pool -var ManagerMQPool *mgrmq.Pool +var ManagerMQPool mgrmq.Pool func InitMQPool(cfg *scmq.Config) { ExecutorMQPool = exemq.NewPool(cfg) @@ -26,19 +26,19 @@ func InitMQPool(cfg *scmq.Config) { ManagerMQPool = mgrmq.NewPool(cfg) } -var CloudreamStoragePool *stgsdk.Pool +var CloudreamStoragePool stgsdk.Pool func InitCloudreamStoragePool(cfg *stgsdk.Config) { CloudreamStoragePool = stgsdk.NewPool(cfg) } -var UnifyOpsPool *uopsdk.Pool +var UnifyOpsPool uopsdk.Pool func IniUnifyOpsPool(cfg *uopsdk.Config) { UnifyOpsPool = uopsdk.NewPool(cfg) } -var PCMPool *pcmsdk.Pool +var PCMPool pcmsdk.Pool func InitPCMPool(cfg *pcmsdk.Config) { PCMPool = pcmsdk.NewPool(cfg) diff --git a/common/pkgs/mq/advisor/client.go b/common/pkgs/mq/advisor/client.go index ea84058..4af467e 100644 --- a/common/pkgs/mq/advisor/client.go +++ b/common/pkgs/mq/advisor/client.go @@ -2,15 +2,15 @@ package advisor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" + schmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") +func NewClient(cfg *schmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), ServerQueueName, "") if err != nil { return nil, err } @@ -24,36 +24,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) +type pool struct { + mqcfg *schmq.Config } -type Pool struct { - mqcfg *mymq.Config -} - -func NewPool(mqcfg *mymq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *schmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/collector/client.go b/common/pkgs/mq/collector/client.go index 93f395c..8b40349 100644 --- a/common/pkgs/mq/collector/client.go +++ b/common/pkgs/mq/collector/client.go @@ -2,15 +2,15 @@ package collector import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" + schmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") +func NewClient(cfg *schmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), ServerQueueName, "") if err != nil { return nil, err } @@ -24,36 +24,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) +type pool struct { + mqcfg *schmq.Config } -type Pool struct { - mqcfg *mymq.Config -} - -func NewPool(mqcfg *mymq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *schmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/executor/client.go b/common/pkgs/mq/executor/client.go index 8e70cbf..8836dcb 100644 --- a/common/pkgs/mq/executor/client.go +++ b/common/pkgs/mq/executor/client.go @@ -2,15 +2,15 @@ package executor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" + schmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Client struct { - rabbitCli *mq.RabbitMQClient + rabbitCli *mq.RabbitMQTransport } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") +func NewClient(cfg *schmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), ServerQueueName, "") if err != nil { return nil, err } @@ -24,36 +24,24 @@ func (c *Client) Close() { c.rabbitCli.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) +type pool struct { + mqcfg *schmq.Config } -type Pool struct { - mqcfg *mymq.Config -} - -func NewPool(mqcfg *mymq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *schmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/manager/advisor.go b/common/pkgs/mq/manager/advisor.go index f5ab5e6..b9860dd 100644 --- a/common/pkgs/mq/manager/advisor.go +++ b/common/pkgs/mq/manager/advisor.go @@ -44,5 +44,5 @@ func NewAdvisorTaskStatus(taskID string, status exectsk.TaskStatus) AdvisorTaskS } } func (c *Client) ReportAdvisorTaskStatus(msg *ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) { - return mq.Request(Service.ReportAdvisorTaskStatus, c.rabbitCli, msg, opts...) + return mq.Request(Service.ReportAdvisorTaskStatus, c.roundTripper, msg, opts...) } diff --git a/common/pkgs/mq/manager/client.go b/common/pkgs/mq/manager/client.go index 32dc418..9731335 100644 --- a/common/pkgs/mq/manager/client.go +++ b/common/pkgs/mq/manager/client.go @@ -2,58 +2,52 @@ package manager import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" + schmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Client struct { - rabbitCli *mq.RabbitMQClient + roundTripper mq.RoundTripper } -func NewClient(cfg *mymq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQClient(cfg.MakeConnectingURL(), ServerQueueName, "") +func NewClient(cfg *schmq.Config) (*Client, error) { + rabbitCli, err := mq.NewRabbitMQTransport(cfg.MakeConnectingURL(), ServerQueueName, "") if err != nil { return nil, err } return &Client{ - rabbitCli: rabbitCli, + roundTripper: rabbitCli, + }, nil +} + +func NewClientWithRoundTripper(rd mq.RoundTripper) (*Client, error) { + return &Client{ + roundTripper: rd, }, nil } func (c *Client) Close() { - c.rabbitCli.Close() + c.roundTripper.Close() } -type PoolClient struct { - *Client - owner *Pool +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) } -func (c *PoolClient) Close() { - c.owner.Release(c) +type pool struct { + mqcfg *schmq.Config } -type Pool struct { - mqcfg *mymq.Config -} - -func NewPool(mqcfg *mymq.Config) *Pool { - return &Pool{ +func NewPool(mqcfg *schmq.Config) Pool { + return &pool{ mqcfg: mqcfg, } } -func (p *Pool) Acquire() (*PoolClient, error) { - cli, err := NewClient(p.mqcfg) - if err != nil { - return nil, err - } - - return &PoolClient{ - Client: cli, - owner: p, - }, nil +func (p *pool) Acquire() (*Client, error) { + return NewClient(p.mqcfg) } -func (p *Pool) Release(cli *PoolClient) { - cli.Client.Close() +func (p *pool) Release(cli *Client) { + cli.Close() } diff --git a/common/pkgs/mq/manager/executor.go b/common/pkgs/mq/manager/executor.go index 4ec6d17..a18f51a 100644 --- a/common/pkgs/mq/manager/executor.go +++ b/common/pkgs/mq/manager/executor.go @@ -43,7 +43,7 @@ func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTas } } func (c *Client) ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) { - return mq.Request(Service.ReportExecutorTaskStatus, c.rabbitCli, msg, opts...) + return mq.Request(Service.ReportExecutorTaskStatus, c.roundTripper, msg, opts...) } func init() { diff --git a/common/pkgs/mq/manager/image.go b/common/pkgs/mq/manager/image.go index 0341546..50855a9 100644 --- a/common/pkgs/mq/manager/image.go +++ b/common/pkgs/mq/manager/image.go @@ -69,5 +69,5 @@ func NewGetImageInfoResp(imageID schsdk.ImageID, packageID int64, importingInfo } } func (c *Client) GetImageInfo(msg *GetImageInfo, opts ...mq.RequestOption) (*GetImageInfoResp, error) { - return mq.Request(Service.GetImageInfo, c.rabbitCli, msg, opts...) + return mq.Request(Service.GetImageInfo, c.roundTripper, msg, opts...) } diff --git a/common/pkgs/mq/manager/job.go b/common/pkgs/mq/manager/job.go index f735a83..ddc32ea 100644 --- a/common/pkgs/mq/manager/job.go +++ b/common/pkgs/mq/manager/job.go @@ -41,7 +41,7 @@ func NewSubmitJobSetResp(jobSetID schsdk.JobSetID) *SubmitJobSetResp { } } func (c *Client) SubmitJobSet(msg *SubmitJobSet, opts ...mq.RequestOption) (*SubmitJobSetResp, error) { - return mq.Request(Service.SubmitJobSet, c.rabbitCli, msg, opts...) + return mq.Request(Service.SubmitJobSet, c.roundTripper, msg, opts...) } // JobSet中需要使用的一个文件上传完成 @@ -70,7 +70,7 @@ func NewJobSetLocalFileUploadedResp() *JobSetLocalFileUploadedResp { return &JobSetLocalFileUploadedResp{} } func (c *Client) JobSetLocalFileUploaded(msg *JobSetLocalFileUploaded, opts ...mq.RequestOption) (*JobSetLocalFileUploadedResp, error) { - return mq.Request(Service.JobSetLocalFileUploaded, c.rabbitCli, msg, opts...) + return mq.Request(Service.JobSetLocalFileUploaded, c.roundTripper, msg, opts...) } // 获取任务数据 @@ -94,7 +94,7 @@ func NewGetJobResp(job jobmod.Job) *GetJobResp { } } func (c *Client) GetJob(msg *GetJob, opts ...mq.RequestOption) (*GetJobResp, error) { - return mq.Request(Service.GetJob, c.rabbitCli, msg, opts...) + return mq.Request(Service.GetJob, c.roundTripper, msg, opts...) } /* diff --git a/executor/internal/reporter/reporter.go b/executor/internal/reporter/reporter.go index 79589c8..e0da653 100644 --- a/executor/internal/reporter/reporter.go +++ b/executor/internal/reporter/reporter.go @@ -48,7 +48,7 @@ func (r *Reporter) Serve() error { if err != nil { return fmt.Errorf("new manager client: %w", err) } - defer magCli.Close() + defer schglb.ManagerMQPool.Release(magCli) ticker := time.NewTicker(r.reportInterval) defer ticker.Stop() diff --git a/executor/internal/services/pcm.go b/executor/internal/services/pcm.go index 5dbf4a9..de9ffa1 100644 --- a/executor/internal/services/pcm.go +++ b/executor/internal/services/pcm.go @@ -15,7 +15,7 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes logger.Warnf("new pcm client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed") } - defer pcmCli.Close() + defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.DeleteImage(pcmsdk.DeleteImageReq{ SlwNodeID: msg.SlwNodeID, @@ -34,7 +34,7 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, logger.Warnf("new pcm client, err: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "new pcm client failed") } - defer pcmCli.Close() + defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.DeleteTask(pcmsdk.DeleteTaskReq{ SlwNodeID: msg.SlwNodeID, diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index 9c348bc..247368e 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -44,7 +44,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) ([]stgsdk.ObjectCacheInfo, error) if err != nil { return nil, fmt.Errorf("new cloudream storage client: %w", err) } - defer stgCli.Close() + defer schglb.CloudreamStoragePool.Release(stgCli) resp, err := stgCli.CacheMovePackage(stgsdk.CacheMovePackageReq{ UserID: t.UserID, diff --git a/executor/internal/task/pcm_schedule_task.go b/executor/internal/task/pcm_schedule_task.go index 50a8f08..da009c3 100644 --- a/executor/internal/task/pcm_schedule_task.go +++ b/executor/internal/task/pcm_schedule_task.go @@ -44,7 +44,7 @@ func (t *PCMScheduleTask) do(taskID string, ctx TaskContext) error { if err != nil { return fmt.Errorf("new pcm client: %w", err) } - defer pcmCli.Close() + defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.ScheduleTask(pcmsdk.ScheduleTaskReq{ SlwNodeID: t.SlwNodeID, diff --git a/executor/internal/task/pcm_upload_img.go b/executor/internal/task/pcm_upload_img.go index b9730f8..c6f4ee5 100644 --- a/executor/internal/task/pcm_upload_img.go +++ b/executor/internal/task/pcm_upload_img.go @@ -43,7 +43,7 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error { if err != nil { return fmt.Errorf("new pcm client: %w", err) } - defer pcmCli.Close() + defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.UploadImage(pcmsdk.UploadImageReq{ SlwNodeID: t.SlwNodeID, diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index 0427391..55c2ac2 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -43,7 +43,7 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { if err != nil { return fmt.Errorf("new cloudream storage client: %w", err) } - defer stgCli.Close() + defer schglb.CloudreamStoragePool.Release(stgCli) resp, err := stgCli.StorageCreatePackage(stgsdk.StorageCreatePackageReq{ UserID: t.UserID, diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go index a125a13..6e611bb 100644 --- a/executor/internal/task/storage_load_package.go +++ b/executor/internal/task/storage_load_package.go @@ -45,7 +45,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) error { if err != nil { return fmt.Errorf("new cloudream storage client: %w", err) } - defer stgCli.Close() + defer schglb.CloudreamStoragePool.Release(stgCli) return stgCli.StorageLoadPackage(stgsdk.StorageLoadPackageReq{ UserID: t.UserID, diff --git a/manager/internal/advisormgr/advisormgr.go b/manager/internal/advisormgr/advisormgr.go index d126334..58fcc63 100644 --- a/manager/internal/advisormgr/advisormgr.go +++ b/manager/internal/advisormgr/advisormgr.go @@ -31,7 +31,7 @@ type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string) type Manager struct { advisors map[schmod.AdvisorID]*AdvisorInfo lock sync.Mutex - advCli *advmq.PoolClient + advCli *advmq.Client onTaskUpdated OnTaskUpdatedCallbackFn onTaskTimeout OnTimeoutCallbackFn diff --git a/manager/internal/executormgr/executormgr.go b/manager/internal/executormgr/executormgr.go index 1ef2097..789c45e 100644 --- a/manager/internal/executormgr/executormgr.go +++ b/manager/internal/executormgr/executormgr.go @@ -32,7 +32,7 @@ type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string) type Manager struct { executors map[schmod.ExecutorID]*ExecutorInfo lock sync.Mutex - exeCli *exemq.PoolClient + exeCli *exemq.Client onTaskUpdated OnTaskUpdatedCallbackFn onTaskTimeout OnTimeoutCallbackFn diff --git a/manager/internal/jobmgr/adjusting_handler.go b/manager/internal/jobmgr/adjusting_handler.go index bde3e84..566c003 100644 --- a/manager/internal/jobmgr/adjusting_handler.go +++ b/manager/internal/jobmgr/adjusting_handler.go @@ -57,7 +57,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState())) return } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(adjustingState.Scheme.TargetSlwNodeID)) if err != nil { diff --git a/manager/internal/jobmgr/executing_handler.go b/manager/internal/jobmgr/executing_handler.go index 76b6bf0..24a7030 100644 --- a/manager/internal/jobmgr/executing_handler.go +++ b/manager/internal/jobmgr/executing_handler.go @@ -147,7 +147,7 @@ func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.state)) return } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(tarNorJob.TargetSlwNodeID)) if err != nil { diff --git a/manager/internal/jobmgr/prescheduling_handler.go b/manager/internal/jobmgr/prescheduling_handler.go index 8f6e836..edac298 100644 --- a/manager/internal/jobmgr/prescheduling_handler.go +++ b/manager/internal/jobmgr/prescheduling_handler.go @@ -59,7 +59,7 @@ func (h *PreSchedulingHandler) Handle(job jobmod.Job) { h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState())) return } - defer colCli.Close() + defer schglb.CollectorMQPool.Release(colCli) getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(preSchState.Scheme.TargetSlwNodeID)) if err != nil {