From a9e69e8fd7e0d53306de35be55ecf3482292b6e8 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 21 Feb 2025 17:22:42 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/assets/confs/test.json | 23 ++++++------ log/schedulerclient.log | 19 ++++++++++ schedulerMiddleware/internal/cmdline/serve.go | 8 +++-- schedulerMiddleware/internal/http/jobset.go | 36 +++++++++---------- .../internal/services/jobset.go | 15 +++++++- 5 files changed, 69 insertions(+), 32 deletions(-) diff --git a/common/assets/confs/test.json b/common/assets/confs/test.json index fa0bb45..f2d1baf 100644 --- a/common/assets/confs/test.json +++ b/common/assets/confs/test.json @@ -1,15 +1,16 @@ { "userID": 5, - "type": "children", - "param": { - "clusterID": "1865927992266461184", - "name": "zeeTest", - "description": "asd", - "packageID": 1151, - "filePath": "/222.txt", - "packageName": "anzetest", - "bootstrapObjectID": 50085, - "parentImageID": 5, - "imageID": "1234" + "packageID": 1191, + "uploadParams": { + "dataType": "code", + "uploadInfo": { + "type": "url", + "url": "https://gitlink.org.cn/keytoolazy/adblock_auto.git", + "clusterID": [ + "1790300942428540928" + ], + "packageID": 1191, + "dataName": "adblock_auto" + } } } \ No newline at end of file diff --git a/log/schedulerclient.log b/log/schedulerclient.log index 90cd5f6..6ba249a 100644 --- a/log/schedulerclient.log +++ b/log/schedulerclient.log @@ -2646,3 +2646,22 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the 2025-02-21 15:33:24 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed 2025-02-21 15:33:24 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: blockchain: list objects: Post "http://localhost:32010/object/listByIDs": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it. 2025-02-21 15:33:24 [INFO] job set 0 completed +2025-02-21 15:33:36 [INFO] start serving http at: :7891 +2025-02-21 15:34:35 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: Post "http://localhost:32010/package/clone": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it. +2025-02-21 15:40:09 [INFO] start serving http at: :7891 +2025-02-21 15:40:27 [DEBU] uploading job +2025-02-21 15:41:22 [ERRO] blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期 +2025-02-21 15:41:22 [INFO] jobID: %s change state from %s to %s0&{5 1161 0xc0002420a0 code 1 0xc000464390 {{} {1 0}} 0xc0004b8990} &{0xc000226640} +2025-02-21 15:41:22 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-21 15:41:22 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期 +2025-02-21 15:41:22 [INFO] job set 0 completed +2025-02-21 16:25:33 [INFO] start serving http at: :7891 +2025-02-21 16:28:19 [DEBU] uploading job +2025-02-21 16:32:03 [INFO] start serving http at: :7891 +2025-02-21 16:32:06 [DEBU] uploading job +2025-02-21 16:33:22 [INFO] start serving http at: :7891 +2025-02-21 16:33:55 [DEBU] uploading job +2025-02-21 16:36:30 [INFO] start serving http at: :7891 +2025-02-21 16:37:27 [DEBU] uploading job +2025-02-21 16:37:35 [INFO] start serving http at: :7891 +2025-02-21 16:39:13 [DEBU] uploading job diff --git a/schedulerMiddleware/internal/cmdline/serve.go b/schedulerMiddleware/internal/cmdline/serve.go index 071907f..6e22c85 100644 --- a/schedulerMiddleware/internal/cmdline/serve.go +++ b/schedulerMiddleware/internal/cmdline/serve.go @@ -68,8 +68,12 @@ func serve(configPath string, address string) { hubConfig := &pcmHubConfig.Config{ Platforms: config.Cfg().PCMHub, } - //hubClient, err := pcmHubClient.NewClient(hubConfig) - hubClient := pcmHubClient.InitGeneralClient(hubConfig, -1) + + hubClient, err := pcmHubClient.InitGeneralClient(hubConfig, -1) + if err != nil { + logger.Fatalf("new hub client failed, err: %s", err.Error()) + os.Exit(1) + } svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient) if err != nil { diff --git a/schedulerMiddleware/internal/http/jobset.go b/schedulerMiddleware/internal/http/jobset.go index 4181569..659b01f 100644 --- a/schedulerMiddleware/internal/http/jobset.go +++ b/schedulerMiddleware/internal/http/jobset.go @@ -50,7 +50,7 @@ func (s *JobSetService) Submit(ctx *gin.Context) { req, err := serder.JSONToObjectEx[JobSetSubmitReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -117,7 +117,7 @@ func (s *JobSetService) Upload(ctx *gin.Context) { req, err := serder.JSONToObjectEx[UploadReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -167,7 +167,7 @@ func (s *JobSetService) UploadStatus(ctx *gin.Context) { req, err := serder.JSONToObjectEx[UploadStatusReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -216,7 +216,7 @@ func (s *JobSetService) CreateFolder(ctx *gin.Context) { req, err := serder.JSONToObjectEx[CreateFolderReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -248,7 +248,7 @@ func (s *JobSetService) DeleteFile(ctx *gin.Context) { req, err := serder.JSONToObjectEx[DeleteFileReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -281,7 +281,7 @@ func (s *JobSetService) DeleteFolder(ctx *gin.Context) { req, err := serder.JSONToObjectEx[DeleteFolderReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -320,7 +320,7 @@ func (s *JobSetService) QueryUploaded(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryUploadedReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -358,7 +358,7 @@ func (s *JobSetService) Binding(ctx *gin.Context) { req, err := serder.JSONToObjectEx[BindingReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -389,7 +389,7 @@ func (s *JobSetService) RemoveBinding(ctx *gin.Context) { req, err := serder.JSONToObjectEx[RemoveBindingReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -425,7 +425,7 @@ func (s *JobSetService) QueryBinding(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryBindingReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -460,7 +460,7 @@ func (s *JobSetService) CreatePackage(ctx *gin.Context) { req, err := serder.JSONToObjectEx[PackageCreate](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -492,7 +492,7 @@ func (s *JobSetService) DeletePackage(ctx *gin.Context) { req, err := serder.JSONToObjectEx[PackageDelete](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -527,7 +527,7 @@ func (s *JobSetService) QueryResource(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryResourceReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -564,7 +564,7 @@ func (s *JobSetService) ResourceRange(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryResourceRangeReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -603,7 +603,7 @@ func (s *JobSetService) QueryImages(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryImagesReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -642,7 +642,7 @@ func (s *JobSetService) ClonePackage(ctx *gin.Context) { req, err := serder.JSONToObjectEx[ClonePackageReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -681,7 +681,7 @@ func (s *JobSetService) QueryClonePackage(ctx *gin.Context) { req, err := serder.JSONToObjectEx[QueryClonePackageReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } @@ -716,7 +716,7 @@ func (s *JobSetService) RemoveClonePackage(ctx *gin.Context) { req, err := serder.JSONToObjectEx[RemoveClonePackageReq](bodyData) if err != nil { log.Warnf("parsing request body: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed, error: "+err.Error())) return } diff --git a/schedulerMiddleware/internal/services/jobset.go b/schedulerMiddleware/internal/services/jobset.go index 56af49a..8556b9f 100644 --- a/schedulerMiddleware/internal/services/jobset.go +++ b/schedulerMiddleware/internal/services/jobset.go @@ -521,6 +521,15 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI var jsonData []byte switch clusterInfo.ClusterName { case sch.PlatformModelArts: + engine := algorithm.Engine{ + EngineName: "Ascend-Powered-Engine", + EngineVersion: "", // 从URL里提取 + ImageUrl: "", // 从数据库中获取 + InstallSysPackages: true, + } + + println(engine) + resp, err := svc.hubClient.ModelArts.BindAlgorithm(codeParam) if err != nil { return err @@ -530,11 +539,15 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI return err } case sch.PlatformOpenI: - codeParam.Branch = "master" + //openi不需要传Engine resp, err := svc.hubClient.OpenI.BindAlgorithm(codeParam) if err != nil { return err } + err = svc.hubClient.UploadOpenIAlgorithm(codeParam, int(bd.PackageID)) + if err != nil { + return err + } jsonData, err = json.Marshal(resp.Data) if err != nil { return err