新增上传状态接口

This commit is contained in:
JeshuaRen 2025-02-18 14:57:17 +08:00
parent afa768a8db
commit 9cd704cdd9
8 changed files with 190 additions and 29 deletions

View File

@ -8,6 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
cltmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
@ -31,6 +32,12 @@ var BlockChainPool blockchain.Pool
var BlockChainConfig *blockchain.Config
var UploadCache map[string][]schmod.UploadStatus
func InitUploadCache() {
UploadCache = make(map[string][]schmod.UploadStatus)
}
func InitBlockChainPool(cfg *blockchain.Config) {
BlockChainPool = blockchain.NewPool(cfg)
BlockChainConfig = cfg

View File

@ -2,6 +2,7 @@ package schmod
import (
"fmt"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
"time"
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
@ -159,3 +160,10 @@ const (
// FileHash string `json:"fileHash"`
// FileSize int64 `json:"fileSize"`
//}
type UploadStatus struct {
ID int64 `json:"id"`
Status string `json:"status"`
Message string `json:"message"`
UploadInfo sch.UploadInfo `json:"uploadInfo"`
}

View File

@ -2515,3 +2515,63 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the
2025-02-17 14:57:16 [WARN] [HTTP:JobSet.Binding] getting service list: no package found
2025-02-17 16:01:26 [INFO] start serving http at: :7891
2025-02-17 16:05:19 [DEBU] uploading job
2025-02-17 16:10:00 [INFO] start serving http at: :7891
2025-02-17 16:10:16 [DEBU] uploading job
2025-02-17 16:10:21 [ERRO] upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID
2025-02-17 16:10:26 [INFO] jobID: %s change state from %s to %s0&{1 0xc000166230 code 0xc000534330 {1 0}} &{0xc0001fe500}
2025-02-17 16:10:27 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-17 16:10:27 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID
2025-02-17 16:10:27 [INFO] job set 0 completed
2025-02-17 16:11:31 [DEBU] uploading job
2025-02-17 16:12:19 [ERRO] upload data: code: 500, message:
2025-02-17 16:12:19 [INFO] jobID: %s change state from %s to %s1&{1 0xc00008c280 code 0xc0000080d8 {1 0}} &{0xc0004c45a0}
2025-02-17 16:12:19 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed
2025-02-17 16:12:19 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-17 16:12:19 [INFO] job set 1 completed
2025-02-17 16:12:37 [DEBU] uploading job
2025-02-17 16:12:37 [ERRO] upload data: code: 500, message:
2025-02-17 16:12:37 [INFO] jobID: %s change state from %s to %s2&{1 0xc00008c7d0 code 0xc000535518 {1 0}} &{0xc0001fe1e0}
2025-02-17 16:12:37 [INFO] [JobID:2] state changed: *state2.DataUpload -> *state.Completed
2025-02-17 16:12:37 [INFO] [JobID:2] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-17 16:12:37 [INFO] job set 2 completed
2025-02-17 16:20:23 [DEBU] uploading job
2025-02-17 16:20:23 [WARN] object is nil
2025-02-17 16:20:24 [ERRO] blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期
2025-02-17 16:20:24 [INFO] jobID: %s change state from %s to %s3&{1 0xc000200400 dataset 0xc0005341e0 {1 0}} &{0xc000249c60}
2025-02-17 16:20:24 [INFO] [JobID:3] state changed: *state2.DataUpload -> *state.Completed
2025-02-17 16:20:24 [INFO] [JobID:3] [LastState:*state2.DataUpload] job failed with: blockchain: invoke blockchain: code: 401, message: 暂未登录或token已经过期
2025-02-17 16:20:24 [INFO] job set 3 completed
2025-02-17 16:20:35 [DEBU] uploading job
2025-02-17 16:20:35 [ERRO] upload data: code: 500, message:
2025-02-17 16:20:35 [INFO] jobID: %s change state from %s to %s4&{1 0xc00008c280 code 0xc0000080f0 {1 0}} &{0xc0001feaa0}
2025-02-17 16:20:35 [INFO] [JobID:4] state changed: *state2.DataUpload -> *state.Completed
2025-02-17 16:20:35 [INFO] [JobID:4] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-17 16:20:35 [INFO] job set 4 completed
2025-02-18 10:41:53 [INFO] start serving http at: :7891
2025-02-18 10:42:49 [WARN] [HTTP:JobSet.Binding] getting service list: no package found
2025-02-18 10:45:33 [WARN] [HTTP:JobSet.Binding] getting service list: no package found
2025-02-18 10:49:28 [INFO] start serving http at: :7891
2025-02-18 10:55:26 [INFO] start serving http at: :7891
2025-02-18 10:55:56 [WARN] [HTTP:JobSet.Binding] getting service list: no package found
2025-02-18 14:20:53 [INFO] start serving http at: :7891
2025-02-18 14:21:25 [WARN] [HTTP:JobSet.QueryBinding] getting service list: json: cannot unmarshal string into Go struct field CodeBinding.imageID of type int64
2025-02-18 14:40:41 [INFO] start serving http at: :7891
2025-02-18 14:41:06 [DEBU] uploading job
2025-02-18 14:41:54 [INFO] start serving http at: :7891
2025-02-18 14:41:58 [DEBU] uploading job
2025-02-18 14:42:28 [INFO] start serving http at: :7891
2025-02-18 14:42:31 [DEBU] uploading job
2025-02-18 14:43:01 [INFO] start serving http at: :7891
2025-02-18 14:43:05 [DEBU] uploading job
2025-02-18 14:43:16 [INFO] start serving http at: :7891
2025-02-18 14:43:17 [DEBU] uploading job
2025-02-18 14:44:21 [INFO] start serving http at: :7891
2025-02-18 14:44:28 [DEBU] uploading job
2025-02-18 14:45:06 [INFO] start serving http at: :7891
2025-02-18 14:47:35 [DEBU] uploading job
2025-02-18 14:48:12 [INFO] start serving http at: :7891
2025-02-18 14:48:14 [DEBU] uploading job
2025-02-18 14:49:12 [INFO] start serving http at: :7891
2025-02-18 14:49:42 [DEBU] uploading job
2025-02-18 14:50:45 [INFO] start serving http at: :7891
2025-02-18 14:50:48 [DEBU] uploading job

View File

@ -51,6 +51,7 @@ func serve(configPath string, address string) {
schglb.InitUploaderPool(&config.Cfg().Uploader)
schglb.InitBlockChainPool(&config.Cfg().BlockChain)
schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
schglb.InitUploadCache()
dbSvc, err := db.NewDB(&config.Cfg().DB)
if err != nil {

View File

@ -9,9 +9,12 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
"gitlink.org.cn/cloudream/common/utils/serder"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task"
"io"
"net/http"
"strconv"
)
type JobSetService struct {
@ -206,6 +209,58 @@ func (s *JobSetService) Upload(ctx *gin.Context) {
}
}
type UploadStatusReq struct {
Operate string `json:"operate"`
ID int64 `json:"id"`
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
DataType string `json:"dataType"`
}
func (s *JobSetService) UploadStatus(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Upload")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[UploadStatusReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
key := strconv.FormatInt(int64(req.UserID), 10) + strconv.FormatInt(int64(req.PackageID), 10) + req.DataType
if req.Operate == sch.Query {
if val, ok := schglb.UploadCache[key]; ok {
ctx.JSON(http.StatusOK, OK(val))
return
} else {
ctx.JSON(http.StatusOK, OK([]schmod.UploadStatus{}))
return
}
} else if req.Operate == sch.Delete {
if val, ok := schglb.UploadCache[key]; ok {
for _, s := range val {
if s.ID == req.ID {
val = append(val[:s.ID], val[s.ID+1:]...)
schglb.UploadCache[key] = val
break
}
}
ctx.JSON(http.StatusOK, OK("ok"))
return
} else {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "not found"))
return
}
}
}
type CreateFolderReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
Path string `json:"path"`

View File

@ -39,6 +39,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() {
s.engine.POST("/jobSet/notifyUploaded", s.JobSetSvc().Upload)
s.engine.POST("/jobSet/uploadStatus", s.JobSetSvc().UploadStatus)
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.POST("/jobSet/queryUploaded", s.JobSetSvc().QueryUploaded)

View File

@ -26,6 +26,7 @@ type DataUpload struct {
uploadInfo sch.UploadInfo
dataType string
blockChainToken string
uploadID int64
//storages []cdssdk.StorageID
task *jobTask.JobTask[sch.TaskMessage]
lock sync.Mutex
@ -62,6 +63,7 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
// 通过本地上传
case *sch.LocalUploadInfo:
objectIDs = info.ObjectIDs
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!")
// 通过URL上传
case *sch.RemoteUploadInfo:
@ -88,36 +90,22 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
}
uploadResp, err := uploaderCli.Upload(req)
if err != nil {
message := sch.TaskMessage{
Status: sch.FailedStatus,
Message: fmt.Sprintf("upload data: %w", err),
}
s.task.Send(message)
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("upload data: %s", err.Error()))
return fmt.Errorf("upload data: %w", err)
}
if uploadResp.JsonData != "" {
err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, uploadResp.JsonData, -1)
if err != nil {
message := sch.TaskMessage{
Status: sch.FailedStatus,
Message: fmt.Sprintf("update package: %w", err),
}
s.task.Send(message)
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("update package: %w", err))
return fmt.Errorf("update package: %w", err)
}
}
objectIDs = uploadResp.ObjectIDs
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!")
}
message := sch.TaskMessage{
Status: sch.SuccessStatus,
Message: "upload success!",
}
s.task.Send(message)
// 传入存证
blockChains, err := s.blockChain(objectIDs)
if err != nil {
@ -132,6 +120,36 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
return nil
}
func (s *DataUpload) sendStatus(userID cdssdk.UserID, packageID cdssdk.PackageID, dataType string, UploadInfo sch.UploadInfo, status string, msg string) {
key := strconv.FormatInt(int64(userID), 10) + strconv.FormatInt(int64(packageID), 10) + dataType
if val, ok := schglb.UploadCache[key]; ok {
val = append(val, schmod.UploadStatus{
ID: s.uploadID,
Status: status,
Message: msg,
UploadInfo: UploadInfo,
})
schglb.UploadCache[key] = val
} else {
schglb.UploadCache[key] = []schmod.UploadStatus{
{
ID: s.uploadID,
Status: status,
Message: msg,
UploadInfo: UploadInfo,
},
}
}
s.uploadID++
message := sch.TaskMessage{
Status: status,
Message: msg,
}
s.task.Send(message)
}
func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.BlockChain, error) {
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {

View File

@ -441,13 +441,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI
switch bd := info.(type) {
case *sch.DatasetBinding:
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID)
pkg, err := svc.queryPackage(bd.PackageID)
if err != nil {
return err
}
if pkg.PackageID == 0 {
return fmt.Errorf("no package found")
}
filePath := sch.Split + sch.DATASET + sch.Split + pkg.PackageName
ds := &dataset.Dataset{Name: bd.Name, Description: bd.Description, FilePath: filePath, Category: dataset.CommonValue(bd.Category)}
@ -467,13 +464,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData))
packageID = bd.PackageID
case *sch.CodeBinding:
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID)
pkg, err := svc.queryPackage(bd.PackageID)
if err != nil {
return err
}
if pkg.PackageID == 0 {
return fmt.Errorf("no package found")
}
filePath := sch.Split + sch.CODE + sch.Split + pkg.PackageName
engine := `
@ -509,13 +503,10 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), "")
packageID = bd.PackageID
case *sch.ModelBinding:
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageID)
pkg, err := svc.queryPackage(bd.PackageID)
if err != nil {
return err
}
if pkg.PackageID == 0 {
return fmt.Errorf("no package found")
}
filePath := sch.Split + sch.MODEL + sch.Split + pkg.PackageName
engine := "TensorFlow"
@ -565,6 +556,26 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI
return nil
}
func (svc *JobSetService) queryPackage(pacakgeID cdssdk.PackageID) (*uploadersdk.PackageDAO, error) {
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), pacakgeID)
if err != nil {
return nil, err
}
if pkg.PackageID == 0 {
return nil, fmt.Errorf("no package found")
}
if pkg.BindingID != -1 {
binding, err := svc.db.UploadData().GetBindingsByID(svc.db.DefCtx(), pkg.BindingID)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("binding already exists, name: " + binding.Name)
}
return &pkg, nil
}
func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string, jsonData string) uploadersdk.Binding {
bindingData := uploadersdk.Binding{
ID: id,