diff --git a/common/assets/confs/advisor.config.json b/common/assets/confs/advisor.config.json index 89cf278..8172759 100644 --- a/common/assets/confs/advisor.config.json +++ b/common/assets/confs/advisor.config.json @@ -15,7 +15,11 @@ "address": "localhost:5672", "account": "cloudream", "password": "123456", - "vhost": "/" + "vhost": "/", + "param": { + "retryNum": 5, + "retryInterval": 5000 + } }, "cloudreamStorage": { "url": "http://121.36.5.116:7890" diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 714b814..1901eb7 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -15,7 +15,11 @@ "address": "localhost:5672", "account": "cloudream", "password": "123456", - "vhost": "/" + "vhost": "/", + "param": { + "retryNum": 5, + "retryInterval": 5000 + } }, "cloudreamStorage": { "url": "http://121.36.5.116:7890" diff --git a/common/assets/confs/collector.config.json b/common/assets/confs/collector.config.json index 49a36ec..6e27803 100644 --- a/common/assets/confs/collector.config.json +++ b/common/assets/confs/collector.config.json @@ -15,7 +15,11 @@ "address": "localhost:5672", "account": "cloudream", "password": "123456", - "vhost": "/" + "vhost": "/", + "param": { + "retryNum": 5, + "retryInterval": 5000 + } }, "cloudreamStorage": { "url": "http://121.36.5.116:7890" diff --git a/common/assets/confs/executor.config.json b/common/assets/confs/executor.config.json index 7ba8615..620dc5f 100644 --- a/common/assets/confs/executor.config.json +++ b/common/assets/confs/executor.config.json @@ -13,7 +13,11 @@ "address": "101.201.215.196:5672", "account": "cloudream", "password": "123456", - "vhost": "/" + "vhost": "/", + "param": { + "retryNum": 5, + "retryInterval": 5000 + } }, "cloudreamStorage": { "url": "http://121.36.5.116:7890" diff --git a/common/assets/confs/manager.config.json b/common/assets/confs/manager.config.json index 73ff695..c050ee7 100644 --- a/common/assets/confs/manager.config.json +++ b/common/assets/confs/manager.config.json @@ -15,7 +15,11 @@ "address": "localhost:5672", "account": "cloudream", "password": "123456", - "vhost": "/" + "vhost": "/", + "param": { + "retryNum": 5, + "retryInterval": 5000 + } }, "db": { "address": "101.201.215.196:3306", diff --git a/common/globals/pools.go b/common/globals/pools.go index b350c50..8bedc62 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -3,7 +3,7 @@ package schglb import ( pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor" @@ -27,12 +27,12 @@ func InitMQPool(cfg *scmq.Config) { ManagerMQPool = mgrmq.NewPool(cfg) } -var CloudreamStoragePool cdssdk.Pool -var CloudreamStorageConfig *cdssdk.Config +var CloudreamStoragePool cdsapi.Pool +var CloudreamStorageConfig *cdsapi.Config -func InitCloudreamStoragePool(cfg *cdssdk.Config) { +func InitCloudreamStoragePool(cfg *cdsapi.Config) { CloudreamStorageConfig = cfg - CloudreamStoragePool = cdssdk.NewPool(cfg) + CloudreamStoragePool = cdsapi.NewPool(cfg) } var UnifyOpsPool uopsdk.Pool diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go index c4299c5..f79c074 100644 --- a/common/pkgs/mq/executor/server.go +++ b/common/pkgs/mq/executor/server.go @@ -2,6 +2,7 @@ package executor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/common/utils/sync2" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) @@ -46,7 +47,7 @@ func (s *Server) Stop() { s.rabbitSvr.Close() } -func (s *Server) Serve() error { +func (s *Server) Serve() *sync2.UnboundChannel[mq.RabbitMQLogEvent] { return s.rabbitSvr.Start() } diff --git a/manager/internal/jobmgr/job/state/executing.go b/manager/internal/jobmgr/job/state/executing.go index 12b86c9..eaa524d 100644 --- a/manager/internal/jobmgr/job/state/executing.go +++ b/manager/internal/jobmgr/job/state/executing.go @@ -6,7 +6,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdsapi "gitlink.org.cn/cloudream/common/sdks/storage" schmod "gitlink.org.cn/cloudream/scheduler/common/models" "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor" "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" @@ -51,7 +51,7 @@ func (s *JobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmo func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { // TODO UserID - userID := cdssdk.UserID(1) + userID := cdsapi.UserID(1) err := error(nil) switch runningJob := jo.Body.(type) { @@ -133,20 +133,20 @@ func (s *JobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { return err } -func getDataSetPathByID(packageID cdssdk.PackageID) string { +func getDataSetPathByID(packageID cdsapi.PackageID) string { // TODO 临时使用,这个路径应该来自于CDS dataSetPath := filepath.Join("packages", "1", fmt.Sprintf("%v", packageID)) return dataSetPath } -func loadDatasetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) { +func loadDatasetPackage(userID cdsapi.UserID, packageID cdsapi.PackageID, storageID cdsapi.StorageID) (string, error) { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { return "", err } defer schglb.CloudreamStoragePool.Release(stgCli) - loadPackageResp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{ + loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{ UserID: userID, PackageID: packageID, StorageID: storageID, @@ -200,7 +200,7 @@ func (s *JobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd strin } } -func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdssdk.StorageID, userID cdssdk.UserID) (string, error) { +func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdsapi.StorageID, userID cdsapi.UserID) (string, error) { objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID) if err != nil { logger.Error(err.Error()) @@ -231,7 +231,7 @@ func (s *JobExecuting) submitDataPreprocessTask(rtx jobmgr.JobStateRunContext, c return tskStatus.InstanceID, nil } -func (s *JobExecuting) submitFinetuningTask(userID cdssdk.UserID, rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdssdk.StorageID, runningJob *job.NormalJob) error { +func (s *JobExecuting) submitFinetuningTask(userID cdsapi.UserID, rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, storageID cdsapi.StorageID, runningJob *job.NormalJob) error { objectStorage, modelInfo, err := getModelInfoAndObjectStorage(rtx, runningJob.Info.ModelJobInfo.ModelID, storageID) if err != nil { @@ -265,7 +265,7 @@ func (s *JobExecuting) submitFinetuningTask(userID cdssdk.UserID, rtx jobmgr.Job } func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job, runningJob *job.InstanceJob, ccInfo schmod.ComputingCenter, - storageID cdssdk.StorageID, userID cdssdk.UserID, envs []schsdk.KVPair) error { + storageID cdsapi.StorageID, userID cdsapi.UserID, envs []schsdk.KVPair) error { modelJobInfo := runningJob.Info.ModelJobInfo @@ -370,7 +370,7 @@ func (s *JobExecuting) submitInstanceTask(rtx jobmgr.JobStateRunContext, jo *job } } -func getModelInfoAndObjectStorage(rtx jobmgr.JobStateRunContext, modelID schsdk.ModelID, storageID cdssdk.StorageID) (*schmod.ObjectStorage, *schmod.ModelResource, error) { +func getModelInfoAndObjectStorage(rtx jobmgr.JobStateRunContext, modelID schsdk.ModelID, storageID cdsapi.StorageID) (*schmod.ObjectStorage, *schmod.ModelResource, error) { objectStorage, err := rtx.Mgr.DB.ObjectStorage().GetObjectStorageByStorageID(rtx.Mgr.DB.SQLCtx(), storageID) if err != nil { logger.Error(err.Error()) @@ -426,7 +426,7 @@ func getRuntimeCommand(runtime schsdk.JobRuntimeInfo, dataSetPath string, output return cmd, envs } -func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdssdk.UserID) (*schmod.ComputingCenter, *cdssdk.StorageGetResp, error) { +func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, userID cdsapi.UserID) (*schmod.ComputingCenter, *cdsapi.StorageGetResp, error) { ccInfo, err := rtx.Mgr.DB.ComputingCenter().GetByID(rtx.Mgr.DB.SQLCtx(), targetCCID) if err != nil { return nil, nil, fmt.Errorf("getting computing center info: %w", err) @@ -437,7 +437,7 @@ func getCCInfoAndStgInfo(rtx jobmgr.JobStateRunContext, targetCCID schsdk.CCID, return nil, nil, fmt.Errorf("new cds client: %w", err) } defer schglb.CloudreamStoragePool.Release(stgCli) - getStg, err := stgCli.StorageGet(cdssdk.StorageGet{ + getStg, err := stgCli.StorageGet(cdsapi.StorageGet{ UserID: userID, StorageID: ccInfo.CDSStorageID, }) @@ -470,7 +470,7 @@ func (s *DataReturnJobExecuting) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr. func (s *DataReturnJobExecuting) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) error { reJob := jo.Body.(*job.DataReturnJob) - userID := cdssdk.UserID(1) + userID := cdsapi.UserID(1) log := logger.WithType[JobExecuting]("State").WithField("JobID", jo.JobID)