调整绑定、上传、调度等接口

This commit is contained in:
JeshuaRen 2025-02-21 14:30:37 +08:00
parent d541338a55
commit 318d4ffcb3
12 changed files with 526 additions and 297 deletions

View File

@ -37,8 +37,10 @@
"pcmHub": [
{
"name": "OpenI",
"APIKey": "",
"URL": "https://qaygilqyldcc.test.jointcloud.net:443"
"APIKey": "8cff1d2db9171462c02901d086d13221389fd082",
"URL": "https://qaygilqyldcc.test.jointcloud.net:443",
"username": "nudt-ysz",
"password": "nudt@123"
},
{
"name": "ModelArts",

View File

@ -1,59 +1,13 @@
{
"userID": 5,
"jobSetInfo": {
"jobs": [
{
"localJobID": "1",
"name": "trainingtask-suzxflhfijpl-01",
"description": "01",
"type": "PCM",
"files": {
"dataset": {
"type": "Binding",
"bindingID": 52
},
"model": {
"type": "Binding",
"bindingID": 55
},
"code": {
"type": "Binding",
"bindingID": 54
},
"image": {
"type": "Binding",
"bindingID": 53
}
},
"jobResources": {
"scheduleStrategy": "dataLocality",
"clusters": [
{
"clusterID": "1777240145309732864",
"runtime": {
"command": "",
"envs": [],
"params": []
},
"resources": [
{
"type": "CPU",
"number": 896
},
{
"type": "GPU"
},
{
"type": "RAM"
},
{
"type": "VRAM"
}
]
}
]
}
}
]
"type": "children",
"param": {
"clusterID": "1865927992266461184",
"name": "hello",
"description": "asd",
"packageID": 1161,
"filePath": "/1111.txt",
"parentImageID": 5,
"imageID": "1234"
}
}

View File

@ -7,7 +7,6 @@ import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
@ -26,7 +25,7 @@ var ManagerMQPool mgrmq.Pool
var PCMSchePool pcmsch.Pool
var UploaderPool uploadersdk.Pool
//var UploaderPool uploadersdk.Pool
var BlockChainPool blockchain.Pool
@ -43,9 +42,9 @@ func InitBlockChainPool(cfg *blockchain.Config) {
BlockChainConfig = cfg
}
func InitUploaderPool(cfg *uploadersdk.Config) {
UploaderPool = uploadersdk.NewPool(cfg)
}
//func InitUploaderPool(cfg *uploadersdk.Config) {
// UploaderPool = uploadersdk.NewPool(cfg)
//}
func InitPCMSchePool(cfg *pcmsch.Config) {
PCMSchePool = pcmsch.NewPool(cfg)

View File

@ -142,7 +142,7 @@ func (db *UploadDataDB) QueryPackageByBindingID(ctx SQLContext, id uploadersdk.D
return res, err
}
func (db *UploadDataDB) InsertPackage(ctx SQLContext, newPackage uploadersdk.Package) error {
func (db *UploadDataDB) InsertPackage(ctx SQLContext, newPackage uploadersdk.Package, clusters []uploadersdk.Cluster) error {
// 查询是否存在
if err := ctx.Table("package").Where("package_id = ?", newPackage.PackageID).First(&uploadersdk.PackageDAO{}).Error; err == nil {
@ -166,10 +166,19 @@ func (db *UploadDataDB) InsertPackage(ctx SQLContext, newPackage uploadersdk.Pac
BindingID: -1,
}
// 插入新包
if err := ctx.Table("package").Create(&dao).Error; err != nil {
tx := ctx.Begin()
if err = tx.Table("package").Create(&dao).Error; err != nil {
tx.Rollback()
return err
}
for _, cluster := range clusters {
if err = tx.Table("uploadedCluster").Create(&cluster).Error; err != nil {
tx.Rollback()
return err
}
}
tx.Commit()
return nil
}
@ -277,6 +286,16 @@ func (db *UploadDataDB) GetClusterMapping(ctx SQLContext) (map[schsdk.ClusterID]
return ret, nil
}
// GetClusterByID looks up the ClusterMapping rows for the given cluster IDs.
// It returns the (possibly empty) slice of mappings found in the
// "ClusterMapping" table; IDs that have no mapping row are simply absent
// from the result rather than reported as an error.
func (db *UploadDataDB) GetClusterByID(ctx SQLContext, clusterIDs []schsdk.ClusterID) ([]uploadersdk.ClusterMapping, error) {
	var ret []uploadersdk.ClusterMapping
	// Single IN-list query; Find does not fail on zero matching rows.
	if err := ctx.Table("ClusterMapping").Where("cluster_id in ?", clusterIDs).Find(&ret).Error; err != nil {
		return ret, err
	}
	return ret, nil
}
func (db *UploadDataDB) InsertUploadedCluster(ctx SQLContext, cluster uploadersdk.Cluster) error {
if err := ctx.Table("uploadedCluster").Create(&cluster).Error; err != nil {
@ -315,25 +334,30 @@ func (db *UploadDataDB) InsertClonePackage(ctx SQLContext, dao uploadersdk.Packa
return nil
}
func (db *UploadDataDB) GetClonePackageByPkgID(ctx SQLContext, packageID cdssdk.PackageID, clonePackageID cdssdk.PackageID) ([]uploadersdk.PackageCloneVO, error) {
func (db *UploadDataDB) GetParentClonePackageByPkgID(ctx SQLContext, packageID cdssdk.PackageID) (*uploadersdk.PackageCloneVO, error) {
var res uploadersdk.PackageCloneVO
// 获取父算法
err := ctx.Table("packageClone").
Where("parent_package_id = ? and clone_package_id = ?", packageID, packageID).
Preload("ClusterMapping").
Find(&res).Error
if err != nil {
return nil, err
}
return &res, nil
}
func (db *UploadDataDB) GetChildrenClonePackageByPkgID(ctx SQLContext, packageID cdssdk.PackageID) ([]uploadersdk.PackageCloneVO, error) {
var res []uploadersdk.PackageCloneVO
if packageID == -1 {
err := ctx.Table("packageClone").
Where("parent_package_id = ? and clone_package_id != ?", packageID, clonePackageID).
//Preload("ClusterMapping").
Find(&res).Error
if err != nil {
return nil, err
}
} else {
err := ctx.Table("packageClone").
Where("parent_package_id = ? and clone_package_id != ?", packageID, clonePackageID).
Preload("ClusterMapping").
Find(&res).Error
if err != nil {
return nil, err
}
// 获取子算法
err := ctx.Table("packageClone").
Where("parent_package_id = ? and clone_package_id != ?", packageID, packageID).
Preload("ClusterMapping").
Find(&res).Error
if err != nil {
return nil, err
}
return res, nil
@ -522,23 +546,40 @@ func (db *UploadDataDB) DeleteBindingsByID(ctx SQLContext, IDs []int64) error {
return err
}
if err := tx.Table("bindingCluster").Where("binding_id in ?", IDs).Delete(&uploadersdk.BindingCluster{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (db *UploadDataDB) InsertOrUpdateBinding(ctx SQLContext, data uploadersdk.Binding) (*uploadersdk.DataID, error) {
func (db *UploadDataDB) GetBindingByName(ctx SQLContext, userID cdssdk.UserID, name string, dataType string) (*uploadersdk.Binding, error) {
// 根据name查询是否存在
var existData uploadersdk.Binding
err := ctx.Table("BindingData").Where("user_id = ? and name = ? and data_type = ?", data.UserID, data.Name, data.DataType).Find(&existData).Error
err := ctx.Table("BindingData").Where("user_id = ? and name = ? and data_type = ?", userID, name, dataType).Find(&existData).Error
if err != nil {
return nil, err
}
// 如果存在,则报错
if existData.ID != 0 {
return nil, fmt.Errorf("the binding name is already exists")
}
return &existData, nil
}
err = ctx.Table("BindingData").Clauses(clause.OnConflict{
func (db *UploadDataDB) InsertOrUpdateBinding(ctx SQLContext, data uploadersdk.Binding, bindingClusters []uploadersdk.BindingCluster, packageID cdssdk.PackageID) (*uploadersdk.DataID, error) {
//// 根据name查询是否存在
//existData, err := db.GetBindingByName(ctx, data.UserID, data.Name, data.DataType)
//if err != nil {
// return nil, err
//}
//// 如果存在,则报错
//if existData.ID != 0 {
// return nil, fmt.Errorf("the binding name is already exists")
//}
tx := ctx.Begin()
// 插入BindingData
err := tx.Table("BindingData").Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ID"}}, // 指定冲突列
DoUpdates: clause.Assignments(map[string]interface{}{
"name": data.Name,
@ -547,13 +588,36 @@ func (db *UploadDataDB) InsertOrUpdateBinding(ctx SQLContext, data uploadersdk.B
}).Create(&data).Error
if err != nil {
tx.Rollback()
return nil, err
}
if data.ID == 0 {
tx.Rollback()
return nil, fmt.Errorf("insert failed: ID is 0")
}
// 插入bindingCluster
for _, bc := range bindingClusters {
bc.BindingID = data.ID
err = tx.Table("bindingCluster").Create(&bc).Error
if err != nil {
tx.Rollback()
return nil, err
}
}
// 更新package的binding_id
pkgDao := uploadersdk.PackageDAO{
BindingID: data.ID,
}
if err := tx.Table("package").Where("package_id = ?", packageID).Updates(&pkgDao).Error; err != nil {
tx.Rollback()
return nil, err
}
tx.Commit()
return &data.ID, nil
}

View File

@ -2609,3 +2609,12 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the
2025-02-19 15:19:22 [WARN] [HTTP:JobSet.Binding] getting service list: the binding name is already exists
2025-02-19 15:24:36 [INFO] start serving http at: :7891
2025-02-19 17:04:17 [INFO] start serving http at: :7891
2025-02-19 17:11:21 [INFO] start serving http at: :7891
2025-02-19 17:11:34 [WARN] [HTTP:JobSet.CreateFolder] creating folder: delete package: code: BadArgument, message: missing argument or invalid argument
2025-02-21 11:30:13 [INFO] start serving http at: :7891
2025-02-21 11:34:12 [WARN] [HTTP:JobSet.CreateFolder] creating folder: code package already exists
2025-02-21 11:38:03 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: code: BadArgument, message: missing argument or invalid argument
2025-02-21 14:20:38 [INFO] start serving http at: :7891
2025-02-21 14:20:53 [WARN] [HTTP:JobSet.CreateFolder] creating folder: code parent package is not exists
2025-02-21 14:22:35 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: code: BadArgument, message: missing argument or invalid argument
2025-02-21 14:25:36 [WARN] [HTTP:JobSet.CreateFolder] creating folder: clone package: code: BadArgument, message: missing argument or invalid argument

View File

@ -48,7 +48,7 @@ func serve(configPath string, address string) {
}
schglb.InitPCMSchePool(&config.Cfg().PCMScheduler)
schglb.InitUploaderPool(&config.Cfg().Uploader)
//schglb.InitUploaderPool(&config.Cfg().Uploader)
schglb.InitBlockChainPool(&config.Cfg().BlockChain)
schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
schglb.InitUploadCache()
@ -68,10 +68,9 @@ func serve(configPath string, address string) {
hubConfig := &pcmHubConfig.Config{
Platforms: config.Cfg().PCMHub,
}
hubClient, err := pcmHubClient.NewClient(hubConfig)
if err != nil {
logger.Fatalf("new pcm hub client failed, err: %s", err.Error())
}
//hubClient, err := pcmHubClient.NewClient(hubConfig)
hubClient := pcmHubClient.InitGeneralClient(hubConfig, -1)
svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient)
if err != nil {
logger.Fatalf("new service failed, err: %s", err.Error())

View File

@ -434,32 +434,6 @@ func (s *JobSetService) Binding(ctx *gin.Context) {
ctx.JSON(http.StatusOK, OK("success"))
}
func (s *JobSetService) ChildrenBinding(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Binding")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[BindingReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
err = s.svc.JobSetSvc().DataBinding(req.ID, req.UserID, req.Info)
if err != nil {
log.Warnf("getting service list: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "binding data failed, error: "+err.Error()))
return
}
ctx.JSON(http.StatusOK, OK("success"))
}
type RemoveBindingReq struct {
PackageIDs []cdssdk.PackageID `json:"packageIDs"`
BindingIDs []int64 `json:"bindingIDs"`

View File

@ -3,6 +3,7 @@ package state2
import (
"encoding/json"
"fmt"
pcmHub "gitlink.org.cn/JointCloud/pcm-hub/client"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/sdks/blockchain"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
@ -22,14 +23,16 @@ type DataSchedule struct {
taskID sch.TaskID
scheduleData []sch.ScheduleData
blockChainToken string
hubClient *pcmHub.GeneralClient
}
func NewDataSchedule(userID cdssdk.UserID, taskID sch.TaskID, scheduleData []sch.ScheduleData, blockChainToken string) *DataSchedule {
func NewDataSchedule(userID cdssdk.UserID, taskID sch.TaskID, scheduleData []sch.ScheduleData, blockChainToken string, hubClient *pcmHub.GeneralClient) *DataSchedule {
return &DataSchedule{
userID: userID,
taskID: taskID,
scheduleData: scheduleData,
blockChainToken: blockChainToken,
hubClient: hubClient,
}
}
@ -44,11 +47,11 @@ func (s *DataSchedule) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
}
func (s *DataSchedule) do(rtx jobmgr.JobStateRunContext) ([]sch.DataScheduleResults, error) {
uploaderCli, err := schglb.UploaderPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new scheduler client: %w", err)
}
defer schglb.UploaderPool.Release(uploaderCli)
//uploaderCli, err := schglb.UploaderPool.Acquire()
//if err != nil {
// return nil, fmt.Errorf("new scheduler client: %w", err)
//}
//defer schglb.UploaderPool.Release(uploaderCli)
// 从数据库中获取集群映射
clusterMapping, err := rtx.Mgr.DB.UploadData().GetClusterMapping(rtx.Mgr.DB.DefCtx())
@ -108,36 +111,37 @@ func (s *DataSchedule) do(rtx jobmgr.JobStateRunContext) ([]sch.DataScheduleResu
ScheduleUrls: scheduleUrls,
}
}
println(scheduleTarget)
// 发送调度请求
req := uploadersdk.DataScheduleReq{
PackageID: data.PackageID,
DataType: data.DataType,
ScheduleTarget: scheduleTarget,
}
scheduleResult, err := uploaderCli.DataSchedule(req)
if err != nil {
return nil, fmt.Errorf("schedule data: %w", err)
}
if len(errResults) > 0 {
scheduleResult = append(scheduleResult, errResults...)
}
results = append(results, sch.DataScheduleResults{
DataType: data.DataType,
Results: scheduleResult,
})
// 写入到存证服务
blockChains, err := s.blockChain(scheduleResult)
if err != nil {
return nil, fmt.Errorf("blockchain: %w", err)
}
if blockChains != nil && len(blockChains) > 0 {
err = rtx.Mgr.DB.UploadData().InsertBlockchains(rtx.Mgr.DB.DefCtx(), blockChains)
if err != nil {
return nil, fmt.Errorf("insert blockchains: %w", err)
}
}
//req := uploadersdk.DataScheduleReq{
// PackageID: data.PackageID,
// DataType: data.DataType,
// ScheduleTarget: scheduleTarget,
//}
//scheduleResult, err := uploaderCli.DataSchedule(req)
//if err != nil {
// return nil, fmt.Errorf("schedule data: %w", err)
//}
//if len(errResults) > 0 {
// scheduleResult = append(scheduleResult, errResults...)
//}
//results = append(results, sch.DataScheduleResults{
// DataType: data.DataType,
// Results: scheduleResult,
//})
//
//// 写入到存证服务
//blockChains, err := s.blockChain(scheduleResult)
//if err != nil {
// return nil, fmt.Errorf("blockchain: %w", err)
//}
//if blockChains != nil && len(blockChains) > 0 {
// err = rtx.Mgr.DB.UploadData().InsertBlockchains(rtx.Mgr.DB.DefCtx(), blockChains)
// if err != nil {
// return nil, fmt.Errorf("insert blockchains: %w", err)
// }
//}
}
return results, nil
}

View File

@ -3,6 +3,8 @@ package state2
import (
"encoding/json"
"fmt"
pcmHub "gitlink.org.cn/JointCloud/pcm-hub/client"
"gitlink.org.cn/JointCloud/pcm-hub/storagekit/types"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/sdks/blockchain"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
@ -28,17 +30,20 @@ type DataUpload struct {
blockChainToken string
uploadID int64
//storages []cdssdk.StorageID
task *jobTask.JobTask[sch.TaskMessage]
lock sync.Mutex
task *jobTask.JobTask[sch.TaskMessage]
lock sync.Mutex
hubClient *pcmHub.GeneralClient
}
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) *DataUpload {
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage], hubClient *pcmHub.GeneralClient) *DataUpload {
hubClient.UserId = int(userID)
return &DataUpload{
userID: userID,
uploadInfo: uploadInfo,
dataType: dataType,
blockChainToken: blockChainToken,
task: task,
hubClient: hubClient,
}
}
@ -67,45 +72,69 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
// 通过URL上传
case *sch.RemoteUploadInfo:
uploaderCli, err := schglb.UploaderPool.Acquire()
if err != nil {
return fmt.Errorf("new scheduler client: %w", err)
}
defer schglb.UploaderPool.Release(uploaderCli)
//uploaderCli, err := schglb.UploaderPool.Acquire()
//if err != nil {
// return fmt.Errorf("new scheduler client: %w", err)
//}
//defer schglb.UploaderPool.Release(uploaderCli)
req := uploadersdk.UploadReq{
//req := uploadersdk.UploadReq{
// DataType: s.dataType,
// Source: &uploadersdk.UrlSource{
// Type: sch.StorageTypeURL,
// Url: info.Url,
// },
// Target: &uploadersdk.UrlTarget{
// Type: sch.StorageTypeURL,
// ClusterID: uploadersdk.ClusterID(info.Cluster),
// JCSUploadInfo: cdsapi.ObjectUploadInfo{
// UserID: s.userID,
// PackageID: info.PackageID,
// },
// },
//}
param := types.UploadReq{
DataType: s.dataType,
Source: &uploadersdk.UrlSource{
Type: sch.StorageTypeURL,
Url: info.Url,
Source: types.Source{
Url: info.Url,
},
Target: &uploadersdk.UrlTarget{
Type: sch.StorageTypeURL,
ClusterID: uploadersdk.ClusterID(info.Cluster),
JCSUploadInfo: cdsapi.ObjectUploadInfo{
UserID: s.userID,
PackageID: info.PackageID,
Target: types.Target{
ClusterId: string(info.Cluster),
JCSUploadInfo: types.ObjectUploadInfo{
UserID: int(s.userID),
PackageID: uint(info.PackageID),
},
},
}
uploadResp, err := uploaderCli.Upload(req)
uploadResp, err := s.hubClient.DataUpload(param)
if err != nil {
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("upload data: %s", err.Error()))
return fmt.Errorf("upload data: %w", err)
return fmt.Errorf("upload to pcm-hub client error: %w", err)
}
//jsonData, err := json.Marshal(uploadResp.JsonData)
//println(jsonData)
if uploadResp.JsonData != "" {
pkgDao := uploadersdk.PackageDAO{
JsonData: uploadResp.JsonData,
}
err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, pkgDao)
if err != nil {
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("update package: %w", err))
return fmt.Errorf("update package: %w", err)
}
//uploadResp, err := uploaderCli.Upload(req)
//if err != nil {
// s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("upload data: %s", err.Error()))
// return fmt.Errorf("upload data: %w", err)
//}
//if uploadResp.JsonData != "" {
// pkgDao := uploadersdk.PackageDAO{
// JsonData: uploadResp.JsonData,
// }
// err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), cdssdk.PackageID(uploadResp.PackageID), pkgDao)
// if err != nil {
// s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.FailedStatus, fmt.Sprintf("update package: %w", err))
// return fmt.Errorf("update package: %w", err)
// }
//}
for _, id := range uploadResp.ObjectIDs {
objectIDs = append(objectIDs, cdssdk.ObjectID(id))
}
objectIDs = uploadResp.ObjectIDs
s.sendStatus(s.userID, info.PackageID, s.dataType, info, sch.SuccessStatus, "upload success!")
}

View File

@ -2,6 +2,7 @@ package state2
import (
"fmt"
pcmHub "gitlink.org.cn/JointCloud/pcm-hub/client"
"gitlink.org.cn/cloudream/common/pkgs/logger"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
@ -16,13 +17,16 @@ type PCMJobCreate struct {
jobInfo *schsdk.PCMJobInfo
blockChainToken string
userID cdssdk.UserID
hubClient *pcmHub.GeneralClient
}
func NewPCMJobCreate(userID cdssdk.UserID, info *schsdk.PCMJobInfo, blockChainToken string) *PCMJobCreate {
func NewPCMJobCreate(userID cdssdk.UserID, info *schsdk.PCMJobInfo, blockChainToken string, hubClient *pcmHub.GeneralClient) *PCMJobCreate {
hubClient.UserId = int(userID)
return &PCMJobCreate{
userID: userID,
jobInfo: info,
blockChainToken: blockChainToken,
hubClient: hubClient,
}
}
@ -38,7 +42,7 @@ func (s *PCMJobCreate) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
logger.Error(err.Error())
rtx.Mgr.ChangeState(jo, state.FailureComplete(err))
} else {
rtx.Mgr.ChangeState(jo, NewDataSchedule(s.userID, scheduleData.TaskID, scheduleData.ScheduleDatas, s.blockChainToken))
rtx.Mgr.ChangeState(jo, NewDataSchedule(s.userID, scheduleData.TaskID, scheduleData.ScheduleDatas, s.blockChainToken, s.hubClient))
}
}
@ -87,25 +91,25 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er
dataDistribute.Code = append(dataDistribute.Code, sch.CodeDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
DataName: data.PackageName,
})
case sch.DATASET:
dataDistribute.Dataset = append(dataDistribute.Dataset, sch.DatasetDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
DataName: data.PackageName,
})
case sch.MODEL:
dataDistribute.Model = append(dataDistribute.Model, sch.ModelDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
DataName: data.PackageName,
})
case sch.IMAGE:
dataDistribute.Image = append(dataDistribute.Image, sch.ImageDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
DataName: data.PackageName,
})
}
}

View File

@ -8,7 +8,6 @@ import (
"gitlink.org.cn/JointCloud/pcm-hub/aikit/common/dataset"
"gitlink.org.cn/JointCloud/pcm-hub/aikit/common/model"
jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task"
"math/rand"
"sort"
"strconv"
@ -104,7 +103,7 @@ func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams,
jo := job.NewNormalJob(schsdk.NormalJobInfo{})
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken, task),
InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken, task, svc.hubClient),
})
jobSetID := svc.jobMgr.SubmitJobSet(jobs)
@ -125,7 +124,7 @@ func (svc *JobSetService) Submit(userID cdssdk.UserID, jobSet schsdk.JobSetInfo,
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
//InitState: state.NewPreSchuduling(preSch),
InitState: state2.NewPCMJobCreate(userID, info, token),
InitState: state2.NewPCMJobCreate(userID, info, token, svc.hubClient),
})
case *schsdk.NormalJobInfo:
@ -433,100 +432,230 @@ func (svc *JobSetService) QueryUploaded(queryParams sch.QueryData) ([]uploadersd
func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserID, info sch.DataBinding) error {
var bindingData uploadersdk.Binding
var bindingClusters []uploadersdk.BindingCluster
var packageID cdssdk.PackageID
//isCode := false
//mapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
//if err != nil {
// return err
//}
switch bd := info.(type) {
case *sch.DatasetBinding:
pkg, err := svc.queryPackage(bd.PackageID)
pkg, clusterMap, err := svc.queryAndVerify(bd.PackageID, bd.ClusterIDs, userID, bd.Name, sch.DATASET)
if err != nil {
return err
}
filePath := sch.Split + sch.DATASET + sch.Split + pkg.PackageName
ds := &dataset.Dataset{Name: bd.Name, Description: bd.Description, FilePath: filePath, Category: dataset.CommonValue(bd.Category)}
resp, err := svc.hubClient.BindDataset("ModelArts", ds)
param := dataScheduleParam{
BindingClusterIDs: bd.ClusterIDs,
ClusterMap: clusterMap,
PackageID: bd.PackageID,
UploadedClusters: pkg.UploadedCluster,
UserID: userID,
}
// 如果目标集群没有数据,则需要进行数据调度
err = svc.dataSchedule(param)
if err != nil {
return err
}
jsonData, err := json.Marshal(resp.Data)
if err != nil {
return err
for _, clusterID := range bd.ClusterIDs {
clusterInfo, ok := clusterMap[clusterID]
if !ok {
return fmt.Errorf("cluster %s not found", clusterID)
}
filePath := clusterInfo.StoragePath + sch.Split + sch.DATASET + sch.Split + pkg.PackageName
datasetParam := &dataset.Dataset{
Name: bd.Name,
Description: bd.Description,
FilePath: filePath,
Category: dataset.CommonValue(bd.Category),
}
var jsonData []byte
switch clusterInfo.ClusterName {
case sch.PlatformModelArts:
resp, err := svc.hubClient.ModelArts.BindDataset(datasetParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
case sch.PlatformOpenI:
datasetParam.RepoName = ""
resp, err := svc.hubClient.OpenI.BindDataset(datasetParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
}
bindingClusters = append(bindingClusters, uploadersdk.BindingCluster{
ClusterID: uploadersdk.ClusterID(clusterID),
Status: sch.SuccessStatus,
JsonData: string(jsonData),
})
}
content, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData))
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content))
packageID = bd.PackageID
case *sch.CodeBinding:
pkg, err := svc.queryPackage(bd.PackageID)
pkg, clusterMap, err := svc.queryAndVerify(bd.PackageID, bd.ClusterIDs, userID, bd.Name, sch.CODE)
if err != nil {
return err
}
filePath := sch.Split + sch.CODE + sch.Split + pkg.PackageName
engine := `
{
"EngineName": "Ascend-Powered-Engine",
"EngineVersion": "pytorch_1.11.0-cann_6.3.2-py_3.7-euler_2.8.3-aarch64-d910",
"ImageUrl": "atelier/pytorch_1_11_ascend:pytorch_1.11.0-cann_6.3.2-py_3.7-euler_2.8.3-aarch64-d910-20230727154652-7d74011",
"InstallSysPackages": true
}
`
code := &algorithm.Algorithm{Name: bd.Name, Description: bd.Description, Engine: engine, CodeDir: filePath, BootFile: filePath + sch.Split + bd.FilePath, Branch: "main"}
resp, err := svc.hubClient.BindAlgorithm("ModelArts", code)
if err != nil {
return err
param := dataScheduleParam{
BindingClusterIDs: bd.ClusterIDs,
ClusterMap: clusterMap,
PackageID: bd.PackageID,
UploadedClusters: pkg.UploadedCluster,
UserID: userID,
}
jsonData, err := json.Marshal(resp.Data)
// 如果目标集群没有数据,则需要进行数据调度
err = svc.dataSchedule(param)
if err != nil {
return err
}
//isCode = true
for _, clusterID := range bd.ClusterIDs {
clusterInfo, ok := clusterMap[clusterID]
if !ok {
return fmt.Errorf("cluster %s not found", clusterID)
}
filePath := clusterInfo.StoragePath + sch.Split + sch.CODE + sch.Split + pkg.PackageName
codeParam := &algorithm.Algorithm{
Name: bd.Name,
Description: bd.Description,
CodeDir: filePath,
//Engine: "",
BootFile: "",
}
var jsonData []byte
switch clusterInfo.ClusterName {
case sch.PlatformModelArts:
resp, err := svc.hubClient.ModelArts.BindAlgorithm(codeParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
case sch.PlatformOpenI:
codeParam.Branch = "master"
resp, err := svc.hubClient.OpenI.BindAlgorithm(codeParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
}
bindingClusters = append(bindingClusters, uploadersdk.BindingCluster{
ClusterID: uploadersdk.ClusterID(clusterID),
Status: sch.SuccessStatus,
JsonData: string(jsonData),
})
}
content, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData))
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content))
packageID = bd.PackageID
case *sch.ImageBinding:
content, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), "")
//content, err := json.Marshal(bd)
//if err != nil {
// return err
//}
//bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), "")
//packageID = bd.PackageID
return fmt.Errorf("not support image binding")
case *sch.ModelBinding:
pkg, err := svc.queryPackage(bd.PackageID)
pkg, clusterMap, err := svc.queryAndVerify(bd.PackageID, bd.ClusterIDs, userID, bd.Name, sch.MODEL)
if err != nil {
return err
}
filePath := sch.Split + sch.MODEL + sch.Split + pkg.PackageName
engine := "TensorFlow"
md := &model.Model{Name: bd.Name, Description: bd.Description, Type: bd.Category, Version: bd.Version, Engine: model.CommonValue(engine), FilePath: filePath}
resp, err := svc.hubClient.BindModel("ModelArts", md)
param := dataScheduleParam{
BindingClusterIDs: bd.ClusterIDs,
ClusterMap: clusterMap,
PackageID: bd.PackageID,
UploadedClusters: pkg.UploadedCluster,
UserID: userID,
}
// 如果目标集群没有数据,则需要进行数据调度
err = svc.dataSchedule(param)
if err != nil {
return err
}
jsonData, err := json.Marshal(resp.Data)
if err != nil {
return err
for _, clusterID := range bd.ClusterIDs {
clusterInfo, ok := clusterMap[clusterID]
if !ok {
return fmt.Errorf("cluster %s not found", clusterID)
}
filePath := clusterInfo.StoragePath + sch.Split + sch.MODEL + sch.Split + pkg.PackageName
modelParam := &model.Model{
Name: bd.Name,
Description: bd.Description,
Type: bd.ModelType,
FilePath: filePath,
//Engine: model.CommonValue(),
//Version:
}
var jsonData []byte
switch clusterInfo.ClusterName {
case sch.PlatformModelArts:
resp, err := svc.hubClient.ModelArts.BindModel(modelParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
case sch.PlatformOpenI:
modelParam.RepoName = ""
resp, err := svc.hubClient.OpenI.BindModel(modelParam)
if err != nil {
return err
}
jsonData, err = json.Marshal(resp.Data)
if err != nil {
return err
}
}
bindingClusters = append(bindingClusters, uploadersdk.BindingCluster{
ClusterID: uploadersdk.ClusterID(clusterID),
Status: sch.SuccessStatus,
JsonData: string(jsonData),
})
}
content, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData))
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content))
packageID = bd.PackageID
}
@ -534,61 +663,113 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI
bindingData.AccessLevel = sch.PrivateAccess
}
bindingData.CreateTime = time.Now()
bindingID, err := svc.db.UploadData().InsertOrUpdateBinding(svc.db.DefCtx(), bindingData)
_, err := svc.db.UploadData().InsertOrUpdateBinding(svc.db.DefCtx(), bindingData, bindingClusters, packageID)
if err != nil {
return err
}
pkgDao := uploadersdk.PackageDAO{
BindingID: *bindingID,
}
err = svc.db.UploadData().UpdatePackage(svc.db.DefCtx(), packageID, pkgDao)
if err != nil {
return err
}
// 算法类型需要进行版本管理
//if isCode {
// pkg, err := svc.db.UploadData().QueryPackageByID(svc.db.DefCtx(), packageID)
// if err != nil {
// return err
// }
// err = svc.db.UploadData().InsertClonePackage(svc.db.DefCtx(), packageID, packageID, pkg.PackageName, 1)
// if err != nil {
// return err
// }
//}
return nil
}
func (svc *JobSetService) queryPackage(pacakgeID cdssdk.PackageID) (*uploadersdk.PackageDAO, error) {
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), pacakgeID)
if err != nil {
return nil, err
}
if pkg.PackageID == 0 {
return nil, fmt.Errorf("no package found")
// dataScheduleParam bundles the inputs dataSchedule needs to decide which
// clusters still require a copy of a package's data.
type dataScheduleParam struct {
	UserID            cdssdk.UserID    // owner of the package being scheduled
	PackageID         cdssdk.PackageID // package whose data may need to be moved
	BindingClusterIDs []schsdk.ClusterID // clusters the binding targets
	UploadedClusters  []uploadersdk.Cluster // clusters that already hold the package data
	ClusterMap        map[schsdk.ClusterID]uploadersdk.ClusterMapping // cluster ID -> storage mapping info
}
// dataSchedule copies package data to every binding target cluster that does
// not already hold it. For each such cluster it asks pcm-hub to load the
// package into the cluster's storage and, on success, records the cluster in
// the uploadedCluster table. It returns on the first error, leaving clusters
// scheduled so far already recorded (no rollback of earlier iterations).
func (svc *JobSetService) dataSchedule(param dataScheduleParam) error {
	// Select the target clusters that do not yet have the data.
	var clusters []uploadersdk.ClusterMapping
	for _, cid := range param.BindingClusterIDs {
		isMatch := false
		for _, cluster := range param.UploadedClusters {
			if cid == cluster.ClusterID {
				isMatch = true
				break
			}
		}
		if !isMatch {
			// NOTE(review): IDs missing from ClusterMap are skipped silently —
			// confirm that is intended rather than an error condition.
			clusterInfo, ok := param.ClusterMap[cid]
			if ok {
				clusters = append(clusters, clusterInfo)
			}
		}
	}
	// Perform the data scheduling for each selected cluster.
	for _, cluster := range clusters {
		// Ask pcm-hub to load the package into this cluster's storage.
		_, err := svc.hubClient.LoadPackage(uint(param.PackageID), uint(param.UserID), uint(cluster.StorageID), "")
		if err != nil {
			logger.Error("data schedule failed, error: ", err.Error())
			return err
		}
		// Record the successfully scheduled cluster in uploadedCluster.
		// (The uploadersdk.Cluster below deliberately shadows the loop variable.)
		cluster := uploadersdk.Cluster{
			PackageID: param.PackageID,
			ClusterID: cluster.ClusterID,
			StorageID: cluster.StorageID,
		}
		err = svc.db.UploadData().InsertUploadedCluster(svc.db.DefCtx(), cluster)
		if err != nil {
			logger.Error("insert uploadedCluster failed, error: ", err.Error())
			return err
		}
	}
	return nil
}
func (svc *JobSetService) queryAndVerify(pacakgeID cdssdk.PackageID, clusterIDs []schsdk.ClusterID, userID cdssdk.UserID, name string, dataType string) (*uploadersdk.PackageDAO, map[schsdk.ClusterID]uploadersdk.ClusterMapping, error) {
// 查询是否已经绑定
existBinding, err := svc.db.UploadData().GetBindingByName(svc.db.DefCtx(), userID, name, dataType)
if err != nil {
return nil, nil, err
}
if existBinding.ID != 0 {
return nil, nil, fmt.Errorf("name %s already exists", name)
}
// 查询package
pkg, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), pacakgeID)
if err != nil {
return nil, nil, err
}
if pkg.PackageID == 0 {
return nil, nil, fmt.Errorf("no package found")
}
// 如果这个package已经被绑定则不允许再绑定
if pkg.BindingID != -1 {
binding, err := svc.db.UploadData().GetBindingsByID(svc.db.DefCtx(), pkg.BindingID)
if err != nil {
return nil, err
return nil, nil, err
}
return nil, fmt.Errorf("binding already exists, name: " + binding.Name)
return nil, nil, fmt.Errorf("binding already exists, name: " + binding.Name)
}
return &pkg, nil
clusterMap := make(map[schsdk.ClusterID]uploadersdk.ClusterMapping)
clusters, err := svc.db.UploadData().GetClusterByID(svc.db.DefCtx(), clusterIDs)
if err != nil {
return nil, nil, err
}
for _, cluster := range clusters {
clusterMap[cluster.ClusterID] = cluster
}
return &pkg, clusterMap, nil
}
func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string, jsonData string) uploadersdk.Binding {
func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string) uploadersdk.Binding {
bindingData := uploadersdk.Binding{
ID: id,
Name: name,
DataType: dataType,
UserID: userID,
Content: content,
JsonData: jsonData,
//JsonData: jsonData,
}
return bindingData
}
@ -838,23 +1019,23 @@ func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataT
}
// 对Package进行预调度并写入到数据库中
err = svc.packageScheduler(pkg.PackageID, uploadPriority)
clusters, err := svc.packageScheduler(pkg.PackageID, uploadPriority)
if err != nil {
return err
}
// 写入数据库存档
err = svc.JobSetSvc().db.UploadData().InsertPackage(svc.db.DefCtx(), pkg)
err = svc.JobSetSvc().db.UploadData().InsertPackage(svc.db.DefCtx(), pkg, clusters)
if err != nil {
return err
}
return nil
}
func (svc *JobSetService) packageScheduler(packageID cdssdk.PackageID, uploadPriority sch.UploadPriority) error {
func (svc *JobSetService) packageScheduler(packageID cdssdk.PackageID, uploadPriority sch.UploadPriority) ([]uploadersdk.Cluster, error) {
clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
if err != nil {
return fmt.Errorf("query cluster mapping error: %w", err)
return nil, fmt.Errorf("query cluster mapping error: %w", err)
}
var clusters []uploadersdk.Cluster
@ -863,12 +1044,12 @@ func (svc *JobSetService) packageScheduler(packageID cdssdk.PackageID, uploadPri
// 进行预调度
clusterID, err := svc.preScheduler.ScheduleJob(uploadPriority.ResourcePriorities, clusterMapping)
if err != nil {
return fmt.Errorf("pre scheduling: %w", err)
return nil, fmt.Errorf("pre scheduling: %w", err)
}
storageID, ok := clusterMapping[*clusterID]
if !ok {
return fmt.Errorf("cluster %d not found", clusterID)
return nil, fmt.Errorf("cluster %d not found", clusterID)
}
cluster := uploadersdk.Cluster{
PackageID: packageID,
@ -894,17 +1075,17 @@ func (svc *JobSetService) packageScheduler(packageID cdssdk.PackageID, uploadPri
}
if len(clusters) == 0 {
return errors.New("no storage is available")
return nil, errors.New("no storage is available")
}
for _, clst := range clusters {
err := svc.db.UploadData().InsertUploadedCluster(svc.db.DefCtx(), clst)
if err != nil {
return err
}
}
//for _, clst := range clusters {
// err := svc.db.UploadData().InsertUploadedCluster(svc.db.DefCtx(), clst)
// if err != nil {
// return nil, err
// }
//}
return nil
return clusters, nil
}
func (svc *JobSetService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error {
@ -1147,17 +1328,20 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
Name: param.PackageName,
BucketID: param.BucketID,
}
pkgs, err := svc.db.UploadData().GetClonePackageByPkgID(svc.db.DefCtx(), param.PackageID, -1)
parentPkg, err := svc.db.UploadData().GetParentClonePackageByPkgID(svc.db.DefCtx(), param.PackageID)
if err != nil {
return nil, fmt.Errorf("query clone package: %w", err)
}
if len(pkgs) != 0 {
return nil, fmt.Errorf("code package already exists")
}
// 如果子算法列表已经存在数据则执行clone操作新增子算法
if cloneType == sch.ChildrenType {
switch cloneType {
case sch.ChildrenType:
// 判断父算法是否已经创建
if parentPkg.ParentPackageID == 0 {
return nil, fmt.Errorf("code parent package is not exists")
}
// 复制package
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
@ -1165,7 +1349,7 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
version := strconv.FormatInt(time.Now().Unix(), 10) + "_" + string(rune(rand.Intn(1000)))
version := strconv.FormatInt(time.Now().Unix(), 10)
packageName := fmt.Sprintf("%s_%s", param.PackageName, version)
cloneReq := cdsapi.PackageClone{
PackageID: param.PackageID,
@ -1179,6 +1363,11 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
}
clonePackageID = cloneResp.Package.PackageID
pkg = cloneResp.Package
case sch.ParentType:
// 判断父算法是否已经创建
if parentPkg.ParentPackageID != 0 {
return nil, fmt.Errorf("code parent package alread exists")
}
}
packageCloneDAO := uploadersdk.PackageCloneDAO{
@ -1205,6 +1394,7 @@ func (svc *JobSetService) ClonePackage(userID cdssdk.UserID, param uploadersdk.P
func (svc *JobSetService) QueryClonePackage(packageID cdssdk.PackageID, userID cdssdk.UserID, dataType string) ([]uploadersdk.PackageCloneVO, error) {
// 获取父算法列表
if packageID == -1 {
pkgs, err := svc.db.UploadData().GetCloneParentPackage(svc.db.DefCtx(), userID, dataType)
if err != nil {
@ -1214,7 +1404,8 @@ func (svc *JobSetService) QueryClonePackage(packageID cdssdk.PackageID, userID c
return pkgs, nil
}
pkgs, err := svc.db.UploadData().GetClonePackageByPkgID(svc.db.DefCtx(), packageID, packageID)
// 获取子算法
pkgs, err := svc.db.UploadData().GetChildrenClonePackageByPkgID(svc.db.DefCtx(), packageID)
if err != nil {
return nil, fmt.Errorf("query children package: %w", err)
}

View File

@ -11,11 +11,11 @@ type Service struct {
preScheduler prescheduler2.PreScheduler
jobMgr *jobmgr.Manager
db *db.DB
hubClient *hub.Client
hubClient *hub.GeneralClient
//JobTask *jobTask.JobTask[string]
}
func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB, hubClient *hub.Client) (*Service, error) {
func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB, hubClient *hub.GeneralClient) (*Service, error) {
return &Service{
preScheduler: preScheduler,
jobMgr: jobMgr,