diff --git a/common/globals/pools.go b/common/globals/pools.go index 40c150e..0f44ec2 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor" cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" @@ -31,6 +32,12 @@ var BlockChainPool blockchain.Pool var BlockChainConfig *blockchain.Config +var UploadCache map[string][]schmod.UploadStatus + +func InitUploadCache() { + UploadCache = make(map[string][]schmod.UploadStatus) +} + func InitBlockChainPool(cfg *blockchain.Config) { BlockChainPool = blockchain.NewPool(cfg) BlockChainConfig = cfg diff --git a/common/models/models.go b/common/models/models.go index 71d0a85..a71e434 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -2,6 +2,7 @@ package schmod import ( "fmt" + sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" "time" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" @@ -159,3 +160,10 @@ const ( // FileHash string `json:"fileHash"` // FileSize int64 `json:"fileSize"` //} + +type UploadStatus struct { + ID int64 `json:"id"` + Status string `json:"status"` + Message string `json:"message"` + UploadInfo sch.UploadInfo `json:"uploadInfo"` +} diff --git a/log/schedulerclient.log b/log/schedulerclient.log index 1aa7ed6..50b3c60 100644 --- a/log/schedulerclient.log +++ b/log/schedulerclient.log @@ -2515,3 +2515,63 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the 2025-02-17 14:57:16 [WARN] [HTTP:JobSet.Binding] getting service list: no package found 2025-02-17 16:01:26 [INFO] start serving http at: :7891 2025-02-17 16:05:19 [DEBU] uploading job +2025-02-17 16:10:00 
[INFO] start serving http at: :7891 +2025-02-17 16:10:16 [DEBU] uploading job +2025-02-17 16:10:21 [ERRO] upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID +2025-02-17 16:10:26 [INFO] jobID: %s change state from %s to %s0&{1 0xc000166230 code 0xc000534330 {1 0}} &{0xc0001fe500} +2025-02-17 16:10:27 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-17 16:10:27 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID +2025-02-17 16:10:27 [INFO] job set 0 completed +2025-02-17 16:11:31 [DEBU] uploading job +2025-02-17 16:12:19 [ERRO] upload data: code: 500, message: +2025-02-17 16:12:19 [INFO] jobID: %s change state from %s to %s1&{1 0xc00008c280 code 0xc0000080d8 {1 0}} &{0xc0004c45a0} +2025-02-17 16:12:19 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed +2025-02-17 16:12:19 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-17 16:12:19 [INFO] job set 1 completed +2025-02-17 16:12:37 [DEBU] uploading job +2025-02-17 16:12:37 [ERRO] upload data: code: 500, message: +2025-02-17 16:12:37 [INFO] jobID: %s change state from %s to %s2&{1 0xc00008c7d0 code 0xc000535518 {1 0}} &{0xc0001fe1e0} +2025-02-17 16:12:37 [INFO] [JobID:2] state changed: *state2.DataUpload -> *state.Completed +2025-02-17 16:12:37 [INFO] [JobID:2] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-17 16:12:37 [INFO] job set 2 completed +2025-02-17 16:20:23 [DEBU] uploading job +2025-02-17 16:20:23 [WARN] object is nil +2025-02-17 16:20:24 [ERRO] blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期 +2025-02-17 16:20:24 [INFO] jobID: %s change state from %s to %s3&{1 0xc000200400 dataset 0xc0005341e0 {1 0}} 
&{0xc000249c60} +2025-02-17 16:20:24 [INFO] [JobID:3] state changed: *state2.DataUpload -> *state.Completed +2025-02-17 16:20:24 [INFO] [JobID:3] [LastState:*state2.DataUpload] job failed with: blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期 +2025-02-17 16:20:24 [INFO] job set 3 completed +2025-02-17 16:20:35 [DEBU] uploading job +2025-02-17 16:20:35 [ERRO] upload data: code: 500, message: +2025-02-17 16:20:35 [INFO] jobID: %s change state from %s to %s4&{1 0xc00008c280 code 0xc0000080f0 {1 0}} &{0xc0001feaa0} +2025-02-17 16:20:35 [INFO] [JobID:4] state changed: *state2.DataUpload -> *state.Completed +2025-02-17 16:20:35 [INFO] [JobID:4] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-17 16:20:35 [INFO] job set 4 completed +2025-02-18 10:41:53 [INFO] start serving http at: :7891 +2025-02-18 10:42:49 [WARN] [HTTP:JobSet.Binding] getting service list: no package found +2025-02-18 10:45:33 [WARN] [HTTP:JobSet.Binding] getting service list: no package found +2025-02-18 10:49:28 [INFO] start serving http at: :7891 +2025-02-18 10:55:26 [INFO] start serving http at: :7891 +2025-02-18 10:55:56 [WARN] [HTTP:JobSet.Binding] getting service list: no package found +2025-02-18 14:20:53 [INFO] start serving http at: :7891 +2025-02-18 14:21:25 [WARN] [HTTP:JobSet.QueryBinding] getting service list: json: cannot unmarshal string into Go struct field CodeBinding.imageID of type int64 +2025-02-18 14:40:41 [INFO] start serving http at: :7891 +2025-02-18 14:41:06 [DEBU] uploading job +2025-02-18 14:41:54 [INFO] start serving http at: :7891 +2025-02-18 14:41:58 [DEBU] uploading job +2025-02-18 14:42:28 [INFO] start serving http at: :7891 +2025-02-18 14:42:31 [DEBU] uploading job +2025-02-18 14:43:01 [INFO] start serving http at: :7891 +2025-02-18 14:43:05 [DEBU] uploading job +2025-02-18 14:43:16 [INFO] start serving http at: :7891 +2025-02-18 14:43:17 [DEBU] uploading job +2025-02-18 14:44:21 [INFO] start serving http at: 
:7891 +2025-02-18 14:44:28 [DEBU] uploading job +2025-02-18 14:45:06 [INFO] start serving http at: :7891 +2025-02-18 14:47:35 [DEBU] uploading job +2025-02-18 14:48:12 [INFO] start serving http at: :7891 +2025-02-18 14:48:14 [DEBU] uploading job +2025-02-18 14:49:12 [INFO] start serving http at: :7891 +2025-02-18 14:49:42 [DEBU] uploading job +2025-02-18 14:50:45 [INFO] start serving http at: :7891 +2025-02-18 14:50:48 [DEBU] uploading job diff --git a/schedulerMiddleware/internal/cmdline/serve.go b/schedulerMiddleware/internal/cmdline/serve.go index 291746a..1556c32 100644 --- a/schedulerMiddleware/internal/cmdline/serve.go +++ b/schedulerMiddleware/internal/cmdline/serve.go @@ -51,6 +51,7 @@ func serve(configPath string, address string) { schglb.InitUploaderPool(&config.Cfg().Uploader) schglb.InitBlockChainPool(&config.Cfg().BlockChain) schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage) + schglb.InitUploadCache() dbSvc, err := db.NewDB(&config.Cfg().DB) if err != nil { diff --git a/schedulerMiddleware/internal/http/jobset.go b/schedulerMiddleware/internal/http/jobset.go index 89c520a..78f08f3 100644 --- a/schedulerMiddleware/internal/http/jobset.go +++ b/schedulerMiddleware/internal/http/jobset.go @@ -9,9 +9,12 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader" "gitlink.org.cn/cloudream/common/utils/serder" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task" "io" "net/http" + "strconv" ) type JobSetService struct { @@ -206,6 +209,58 @@ func (s *JobSetService) Upload(ctx *gin.Context) { } } +type UploadStatusReq struct { + Operate string `json:"operate"` + ID int64 `json:"id"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + DataType string `json:"dataType"` +} + +func (s 
*JobSetService) UploadStatus(ctx *gin.Context) { + log := logger.WithField("HTTP", "JobSet.UploadStatus") + + bodyData, err := io.ReadAll(ctx.Request.Body) + if err != nil { + log.Warnf("reading request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed")) + return + } + req, err := serder.JSONToObjectEx[UploadStatusReq](bodyData) + if err != nil { + log.Warnf("parsing request body: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed")) + return + } + + key := strconv.FormatInt(int64(req.UserID), 10) + strconv.FormatInt(int64(req.PackageID), 10) + req.DataType + if req.Operate == sch.Query { + if val, ok := schglb.UploadCache[key]; ok { + ctx.JSON(http.StatusOK, OK(val)) + return + } else { + ctx.JSON(http.StatusOK, OK([]schmod.UploadStatus{})) + return + } + } else if req.Operate == sch.Delete { + if val, ok := schglb.UploadCache[key]; ok { + for i, s := range val { + if s.ID == req.ID { + val = append(val[:i], val[i+1:]...) 
+ schglb.UploadCache[key] = val + break + } + } + ctx.JSON(http.StatusOK, OK("ok")) + return + } else { + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "not found")) + return + } + } + +} + type CreateFolderReq struct { PackageID cdssdk.PackageID `json:"packageID"` Path string `json:"path"` diff --git a/schedulerMiddleware/internal/http/server.go b/schedulerMiddleware/internal/http/server.go index 0112def..1743f01 100644 --- a/schedulerMiddleware/internal/http/server.go +++ b/schedulerMiddleware/internal/http/server.go @@ -39,6 +39,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.POST("/jobSet/notifyUploaded", s.JobSetSvc().Upload) + s.engine.POST("/jobSet/uploadStatus", s.JobSetSvc().UploadStatus) s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit) s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded) s.engine.POST("/jobSet/queryUploaded", s.JobSetSvc().QueryUploaded) diff --git a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go index cefbbcd..7a5d8fe 100644 --- a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go +++ b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go @@ -26,6 +26,7 @@ type DataUpload struct { uploadInfo sch.UploadInfo dataType string blockChainToken string + uploadID int64 //storages []cdssdk.StorageID task *jobTask.JobTask[sch.TaskMessage] lock sync.Mutex @@ -62,6 +63,7 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { // 通过本地上传 case *sch.LocalUploadInfo: objectIDs = info.ObjectIDs + s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!") // 通过URL上传 case *sch.RemoteUploadInfo: @@ -88,36 +90,22 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { } uploadResp, err := uploaderCli.Upload(req) if err != nil { - message := sch.TaskMessage{ - Status: sch.FailedStatus, - Message: 
fmt.Sprintf("upload data: %w", err), - } - s.task.Send(message) + s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("upload data: %s", err.Error())) return fmt.Errorf("upload data: %w", err) } if uploadResp.JsonData != "" { err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, uploadResp.JsonData, -1) if err != nil { - message := sch.TaskMessage{ - Status: sch.FailedStatus, - Message: fmt.Sprintf("update package: %w", err), - } - s.task.Send(message) + s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("update package: %s", err.Error())) return fmt.Errorf("update package: %w", err) } } objectIDs = uploadResp.ObjectIDs - + s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!") } - message := sch.TaskMessage{ - Status: sch.SuccessStatus, - Message: "upload success!", - } - s.task.Send(message) - // 传入存证 blockChains, err := s.blockChain(objectIDs) if err != nil { @@ -132,6 +120,36 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { return nil } +func (s *DataUpload) sendStatus(userID cdssdk.UserID, packageID cdssdk.PackageID, dataType string, UploadInfo sch.UploadInfo, status string, msg string) { + + key := strconv.FormatInt(int64(userID), 10) + strconv.FormatInt(int64(packageID), 10) + dataType + if val, ok := schglb.UploadCache[key]; ok { + val = append(val, schmod.UploadStatus{ + ID: s.uploadID, + Status: status, + Message: msg, + UploadInfo: UploadInfo, + }) + schglb.UploadCache[key] = val + } else { + schglb.UploadCache[key] = []schmod.UploadStatus{ + { + ID: s.uploadID, + Status: status, + Message: msg, + UploadInfo: UploadInfo, + }, + } + } + s.uploadID++ + + message := sch.TaskMessage{ + Status: status, + Message: msg, + } + s.task.Send(message) +} + func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.BlockChain, error) { cdsCli, err := schglb.CloudreamStoragePool.Acquire() if err != 
nil { diff --git a/schedulerMiddleware/internal/services/jobset.go b/schedulerMiddleware/internal/services/jobset.go index b420d12..725c5da 100644 --- a/schedulerMiddleware/internal/services/jobset.go +++ b/schedulerMiddleware/internal/services/jobset.go @@ -441,13 +441,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI switch bd := info.(type) { case *sch.DatasetBinding: - pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID) + pkg, err := svc.queryPackage(bd.PackageID) if err != nil { return err } - if pkg.PackageID == 0 { - return fmt.Errorf("no package found") - } filePath := sch.Split + sch.DATASET + sch.Split + pkg.PackageName ds := &dataset.Dataset{Name: bd.Name, Description: bd.Description, FilePath: filePath, Category: dataset.CommonValue(bd.Category)} @@ -467,13 +464,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData)) packageID = bd.PackageID case *sch.CodeBinding: - pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID) + pkg, err := svc.queryPackage(bd.PackageID) if err != nil { return err } - if pkg.PackageID == 0 { - return fmt.Errorf("no package found") - } filePath := sch.Split + sch.CODE + sch.Split + pkg.PackageName engine := ` @@ -509,13 +503,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), "") packageID = bd.PackageID case *sch.ModelBinding: - pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID) + pkg, err := svc.queryPackage(bd.PackageID) if err != nil { return err } - if pkg.PackageID == 0 { - return fmt.Errorf("no package found") - } filePath := sch.Split + sch.MODEL + sch.Split + pkg.PackageName engine := "TensorFlow" @@ -565,6 +556,26 @@ func (svc *JobSetService) DataBinding(id 
uploadersdk.DataID, userID cdssdk.UserI return nil } +func (svc *JobSetService) queryPackage(packageID cdssdk.PackageID) (*uploadersdk.PackageDAO, error) { + pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), packageID) + if err != nil { + return nil, err + } + if pkg.PackageID == 0 { + return nil, fmt.Errorf("no package found") + } + + if pkg.BindingID != -1 { + binding, err := svc.db.UploadData().GetBindingsByID(svc.db.DefCtx(), pkg.BindingID) + if err != nil { + return nil, err + } + return nil, fmt.Errorf("binding already exists, name: %s", binding.Name) + } + + return &pkg, nil +} + func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string, jsonData string) uploadersdk.Binding { bindingData := uploadersdk.Binding{ ID: id,