修改子算法接口参数

This commit is contained in:
JeshuaRen 2025-02-21 15:35:43 +08:00
parent 318d4ffcb3
commit 190bc58fdb
8 changed files with 111 additions and 188 deletions

View File

@ -46,6 +46,11 @@
"name": "ModelArts",
"APIKey": "",
"URL": "https://fwoofrxezczr.test.jointcloud.net:443"
},
{
"name": "JCS",
"APIKey": "",
"URL": "http://121.36.5.116:32010"
}
]
}

View File

@ -3,10 +3,12 @@
"type": "children",
"param": {
"clusterID": "1865927992266461184",
"name": "hello",
"name": "zeeTest",
"description": "asd",
"packageID": 1161,
"filePath": "/1111.txt",
"packageID": 1151,
"filePath": "/222.txt",
"packageName": "anzetest",
"bootstrapObjectID": 50085,
"parentImageID": 5,
"imageID": "1234"
}

View File

@ -2618,3 +2618,31 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the
2025-02-21 14:20:53 [WARN] [HTTP:JobSet.CreateFolder] creating folder: code parent package is not exists
2025-02-21 14:22:35 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: code: BadArgument, message: missing argument or invalid argument
2025-02-21 14:25:36 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: code: BadArgument, message: missing argument or invalid argument
2025-02-21 14:37:20 [INFO] start serving http at: :7891
2025-02-21 14:38:09 [DEBU] uploading job
2025-02-21 14:38:42 [DEBU] uploading job
2025-02-21 14:40:35 [INFO] start serving http at: :7891
2025-02-21 14:40:43 [DEBU] uploading job
2025-02-21 14:40:55 [ERRO] upload to pcm-hub client error: 调用jcs创建package失败: bucket is not avaiable to the user
2025-02-21 14:40:55 [INFO] jobID: %s change state from %s to %s0&{5 0xc00025a050 code 0 0xc000112be8 {{} {1 0}} 0xc0001129f0} &{0xc00051e080}
2025-02-21 14:40:55 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-21 14:40:55 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload to pcm-hub client error: 调用jcs创建package失败: bucket is not avaiable to the user
2025-02-21 14:40:55 [INFO] job set 0 completed
2025-02-21 14:45:25 [INFO] start serving http at: :7891
2025-02-21 14:45:46 [DEBU] uploading job
2025-02-21 14:48:53 [INFO] start serving http at: :7891
2025-02-21 14:48:59 [DEBU] uploading job
2025-02-21 14:59:29 [ERRO] upload to pcm-hub client error: 调用jcs创建package失败: bucket is not avaiable to the user
2025-02-21 14:59:29 [INFO] jobID: %s change state from %s to %s0&{5 0xc0001ce840 code 0 0xc000400378 {{} {1 0}} 0xc000654330} &{0xc00061e160}
2025-02-21 14:59:29 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-21 14:59:29 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload to pcm-hub client error: 调用jcs创建package失败: bucket is not avaiable to the user
2025-02-21 14:59:29 [INFO] job set 0 completed
2025-02-21 15:00:39 [INFO] start serving http at: :7891
2025-02-21 15:16:10 [INFO] start serving http at: :7891
2025-02-21 15:16:28 [DEBU] uploading job
2025-02-21 15:33:24 [ERRO] list objects: Post "http://localhost:32010/object/listByIDs": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it.
2025-02-21 15:33:24 [ERRO] blockchain: list objects: Post "http://localhost:32010/object/listByIDs": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it.
2025-02-21 15:33:24 [INFO] jobID: %s change state from %s to %s0&{5 1161 0xc0002ae050 code 1 0xc00014aa50 {{} {1 0}} 0xc00014aaf8} &{0xc0001f2320}
2025-02-21 15:33:24 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-21 15:33:24 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: blockchain: list objects: Post "http://localhost:32010/object/listByIDs": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it.
2025-02-21 15:33:24 [INFO] job set 0 completed

View File

@ -101,72 +101,10 @@ func (s *JobSetService) LocalFileUploaded(ctx *gin.Context) {
type UploadReq struct {
UserID cdssdk.UserID `json:"userID"`
PackageID cdssdk.PackageID `json:"packageID"`
UploadParams sch.UploadParams `json:"uploadParams"`
}
//type UploadResp struct {
// JobSetID schsdk.JobSetID `json:"jobSetID"`
// LocalPath string `json:"localPath"`
// StorageIDs []cdssdk.StorageID `json:"storageIDs"`
// BucketID cdssdk.BucketID `json:"bucketID"`
//}
func (s *JobSetService) Upload2(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Upload")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[UploadReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
blockChainToken := ctx.Request.Header.Get("Authorization")
task := jobTask.NewJobTask[sch.TaskMessage]()
_, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken, task)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "upload file failed, error: "+err.Error()))
return
}
//ctx.JSON(http.StatusOK, OK("success"))
flusher, ok := ctx.Writer.(http.Flusher)
if !ok {
http.Error(ctx.Writer, "Streaming not supported", http.StatusInternalServerError)
return
}
ctx.Writer.Header().Set("Cache-Control", "no-cache")
ctx.Writer.Header().Set("Content-Type", "text/event-stream")
for {
fut := task.Receive()
receive := <-fut.Chan()
if receive.Value.Status == sch.FailedStatus || receive.Value.Status == sch.SuccessStatus {
//i := 1
data := "event: message\ndata: " + receive.Value.Message + "\n\n"
_, err = ctx.Writer.Write([]byte(data))
println(receive.Value.Message)
if err != nil {
break
}
flusher.Flush()
ctx.Writer.CloseNotify()
return
}
}
}
func (s *JobSetService) Upload(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Upload")
@ -186,7 +124,7 @@ func (s *JobSetService) Upload(ctx *gin.Context) {
blockChainToken := ctx.Request.Header.Get("Authorization")
task := jobTask.NewJobTask[sch.TaskMessage]()
_, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken, task)
_, err = s.svc.JobSetSvc().Upload(req.UserID, req.PackageID, req.UploadParams, blockChainToken, task)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "upload file failed, error: "+err.Error()))

View File

@ -112,7 +112,7 @@ func (s *DataSchedule) do(rtx jobmgr.JobStateRunContext) ([]sch.DataScheduleResu
}
}
println(scheduleTarget)
// 发送调度请求
//req := uploadersdk.DataScheduleReq{
// PackageID: data.PackageID,
@ -183,7 +183,7 @@ func (s *DataSchedule) blockChain(results []sch.DataScheduleResult) ([]*uploader
fileHash := obj.FileHash[4:]
var args = make(map[string]string)
args["userID"] = strconv.FormatInt(int64(s.userID), 10)
args["UserID"] = strconv.FormatInt(int64(s.userID), 10)
//args["fileName"] = fileName
args["fileHash"] = string(fileHash)
//args["fileSize"] = strconv.FormatInt(obj.Size, 10)

View File

@ -24,27 +24,28 @@ import (
)
type DataUpload struct {
userID cdssdk.UserID
uploadInfo sch.UploadInfo
dataType string
blockChainToken string
UserID cdssdk.UserID
PackageID cdssdk.PackageID
UploadInfo sch.UploadInfo
DataType string
BlockChainToken string
uploadID int64
//storages []cdssdk.StorageID
task *jobTask.JobTask[sch.TaskMessage]
lock sync.Mutex
hubClient *pcmHub.GeneralClient
Task *jobTask.JobTask[sch.TaskMessage]
lock sync.Mutex
HubClient *pcmHub.GeneralClient
}
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage], hubClient *pcmHub.GeneralClient) *DataUpload {
hubClient.UserId = int(userID)
return &DataUpload{
userID: userID,
uploadInfo: uploadInfo,
dataType: dataType,
blockChainToken: blockChainToken,
task: task,
hubClient: hubClient,
}
func NewDataUpload(param *DataUpload) *DataUpload {
//return &DataUpload{
// UserID: param.UserID,
// PackageID: param.PackageID,
// UploadInfo: param.UploadInfo,
// DataType: param.DataType,
// BlockChainToken: param.BlockChainToken,
// Task: param.Task,
// HubClient: param.HubClient,
//}
return param
}
func (s *DataUpload) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
@ -64,78 +65,43 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
var objectIDs []cdssdk.ObjectID
switch info := s.uploadInfo.(type) {
switch info := s.UploadInfo.(type) {
// 通过本地上传
case *sch.LocalUploadInfo:
objectIDs = info.ObjectIDs
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!")
s.sendStatus(s.UserID, s.PackageID, s.DataType, info, sch.SuccessStatus, "upload success!")
// 通过URL上传
case *sch.RemoteUploadInfo:
//uploaderCli, err := schglb.UploaderPool.Acquire()
//if err != nil {
// return fmt.Errorf("new scheduler client: %w", err)
//}
//defer schglb.UploaderPool.Release(uploaderCli)
//req := uploadersdk.UploadReq{
// DataType: s.dataType,
// Source: &uploadersdk.UrlSource{
// Type: sch.StorageTypeURL,
// Url: info.Url,
// },
// Target: &uploadersdk.UrlTarget{
// Type: sch.StorageTypeURL,
// ClusterID: uploadersdk.ClusterID(info.Cluster),
// JCSUploadInfo: cdsapi.ObjectUploadInfo{
// UserID: s.userID,
// PackageID: info.PackageID,
// },
// },
//}
branch := "master"
if info.Branch != "" {
branch = info.Branch
}
param := types.UploadReq{
DataType: s.dataType,
DataType: s.DataType,
Source: types.Source{
Url: info.Url,
Url: info.Url,
Branch: branch,
},
Target: types.Target{
ClusterId: string(info.Cluster),
JCSUploadInfo: types.ObjectUploadInfo{
UserID: int(s.userID),
PackageID: uint(info.PackageID),
UserID: int(s.UserID),
PackageID: uint(s.PackageID),
},
},
}
uploadResp, err := s.hubClient.DataUpload(param)
uploadResp, err := s.HubClient.DataUpload(param)
if err != nil {
return fmt.Errorf("upload to pcm-hub client error: %w", err)
}
//jsonData, err := json.Marshal(uploadResp.JsonData)
//println(jsonData)
//uploadResp, err := uploaderCli.Upload(req)
//if err != nil {
// s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("upload data: %s", err.Error()))
// return fmt.Errorf("upload data: %w", err)
//}
//if uploadResp.JsonData != "" {
// pkgDao := uploadersdk.PackageDAO{
// JsonData: uploadResp.JsonData,
// }
// err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), cdssdk.PackageID(uploadResp.PackageID), pkgDao)
// if err != nil {
// s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("update package: %w", err))
// return fmt.Errorf("update package: %w", err)
// }
//}
for _, id := range uploadResp.ObjectIDs {
objectIDs = append(objectIDs, cdssdk.ObjectID(id))
}
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!")
s.sendStatus(s.UserID, s.PackageID, s.DataType, info, sch.SuccessStatus, "upload success!")
}
// 传入存证
@ -179,7 +145,7 @@ func (s *DataUpload) sendStatus(userID cdssdk.UserID, packageID cdssdk.PackageID
Status: status,
Message: msg,
}
s.task.Send(message)
s.Task.Send(message)
}
func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.BlockChain, error) {
@ -192,7 +158,7 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
//objects, err := cdsCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{})
objects, err := cdsCli.Object().ListByIDs(cdsapi.ObjectListByIDs{
ObjectIDs: objectIDs,
UserID: s.userID,
UserID: s.UserID,
})
if err != nil {
logger.Error(fmt.Errorf("list objects: %w", err))
@ -226,8 +192,8 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
fileHash := obj.FileHash[4:]
var args = make(map[string]string)
args["userID"] = strconv.FormatInt(int64(s.userID), 10)
args["type"] = s.dataType
args["UserID"] = strconv.FormatInt(int64(s.UserID), 10)
args["type"] = s.DataType
args["fileName"] = fileName
args["fileHash"] = string(fileHash)
args["fileSize"] = strconv.FormatInt(obj.Size, 10)
@ -246,7 +212,7 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
Type: schmod.BlockChain_Upload,
Args: argsArr,
}
err = bcCli.BlockChainInvoke(req, s.blockChainToken)
err = bcCli.BlockChainInvoke(req, s.BlockChainToken)
if err != nil {
return nil, fmt.Errorf("invoke blockchain: %w", err)
}

View File

@ -123,7 +123,7 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er
resp, err := schCli.CreateJob(req)
if err != nil {
return nil, fmt.Errorf("create task: %w", err)
return nil, fmt.Errorf("create Task: %w", err)
}
return resp, nil

View File

@ -52,58 +52,36 @@ func (svc *JobSetService) PreScheduler(jobSet schsdk.JobSetInfo) (*jobmod.JobSet
return schScheme, uploadScheme, nil
}
func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) (*schsdk.JobSetID, error) {
func (svc *JobSetService) Upload(userID cdssdk.UserID, packageID cdssdk.PackageID, params sch.UploadParams, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) (*schsdk.JobSetID, error) {
logger.Debugf("uploading job")
// 查询数据库里维护的集群
//ccs, err := svc.db.ComputingCenter().GetAll(svc.db.DefCtx())
//if err != nil {
// logger.Warnf("getting all computing center: %s", err.Error())
// return nil, nil, err
//}
// 根据packageID 查询出对应的bucketID
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), packageID)
if err != nil {
logger.Warnf("getting upload data: %s", err.Error())
return nil, err
}
if pkg.PackageID == 0 {
return nil, errors.New("packageID is not found")
}
svc.hubClient.UserId = int(userID)
svc.hubClient.BucketID = int(pkg.BucketID)
// 获取集群与存储的对应关系
//clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
//if err != nil {
// return nil, nil, fmt.Errorf("query cluster mapping error: %w", err)
//}
//
//var storages []cdssdk.StorageID
//switch uploadPriority := params.UploadPriority.(type) {
//case *sch.Preferences:
// // 进行预调度
// clusterID, err := svc.preScheduler.ScheduleJob(uploadPriority.ResourcePriorities, clusterMapping)
// if err != nil {
// return nil, nil, fmt.Errorf("pre scheduling: %w", err)
// }
//
// storageID, ok := clusterMapping[*clusterID]
// if !ok {
// return nil, nil, fmt.Errorf("cluster %d not found", clusterID)
// }
//
// storages = append(storages, storageID)
//case *sch.SpecifyCluster:
// // 指定集群
// for _, clusterID := range uploadPriority.Clusters {
// storageID, ok := clusterMapping[clusterID]
// if !ok {
// logger.Warnf("cluster %d not found", clusterID)
// continue
// }
// storages = append(storages, storageID)
// }
//}
//
//if len(storages) == 0 {
// return nil, nil, errors.New("no storage is available")
//}
upload := state2.DataUpload{
UserID: userID,
PackageID: packageID,
BlockChainToken: blockChainToken,
Task: task,
HubClient: svc.hubClient,
DataType: params.DataType,
UploadInfo: params.UploadInfo,
}
var jobs []jobmgr.SubmittingJob
jo := job.NewNormalJob(schsdk.NormalJobInfo{})
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken, task, svc.hubClient),
InitState: state2.NewDataUpload(&upload),
})
jobSetID := svc.jobMgr.SubmitJobSet(jobs)
@ -1326,7 +1304,7 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
pkg := cdssdk.Package{
PackageID: param.PackageID,
Name: param.PackageName,
BucketID: param.BucketID,
//BucketID: param.BucketID,
}
parentPkg, err := svc.db.UploadData().GetParentClonePackageByPkgID(svc.db.DefCtx(), param.PackageID)
@ -1341,7 +1319,13 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
if parentPkg.ParentPackageID == 0 {
return nil, fmt.Errorf("code parent package is not exists")
}
// 查询package用于获取bucketID
queryPkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), param.PackageID)
if err != nil {
return nil, fmt.Errorf("query package: %w", err)
}
// 复制package
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
@ -1354,7 +1338,7 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
cloneReq := cdsapi.PackageClone{
PackageID: param.PackageID,
Name: packageName,
BucketID: param.BucketID,
BucketID: queryPkg.BucketID,
UserID: userID,
}
cloneResp, err := cdsCli.Package().Clone(cloneReq)