优化任务提交等接口

This commit is contained in:
JeshuaRen 2025-01-25 08:01:57 +08:00
parent a03436cc10
commit 3b1c478e1d
19 changed files with 3209 additions and 301 deletions

View File

@ -30,21 +30,22 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
}
func (svc *Service) PackageGetLoadedStgs(msg *colmq.PackageGetLoadedStgs) (*colmq.PackageGetLoadedStgsResp, *mq.CodeMessage) {
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
logger.Warnf("new storage client, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed")
}
defer schglb.CloudreamStoragePool.Release(stgCli)
resp, err := stgCli.Package().GetLoadedStorages(cdsapi.PackageGetLoadedStoragesReq{
PackageID: msg.PackageID,
UserID: msg.UserID,
})
if err != nil {
logger.Warnf("get package loaded stg nodes failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed")
}
return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(resp.StorageIDs))
//stgCli, err := schglb.CloudreamStoragePool.Acquire()
//if err != nil {
// logger.Warnf("new storage client, err: %s", err.Error())
// return nil, mq.Failed(errorcode.OperationFailed, "new storage client failed")
//}
//defer schglb.CloudreamStoragePool.Release(stgCli)
//
//resp, err := stgCli.Package().GetLoadedStorages(cdsapi.PackageGetLoadedStoragesReq{
// PackageID: msg.PackageID,
// UserID: msg.UserID,
//})
//if err != nil {
// logger.Warnf("get package loaded stg nodes failed, err: %s", err.Error())
// return nil, mq.Failed(errorcode.OperationFailed, "get package loaded stg nodes failed")
//}
//
//return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(resp.StorageIDs))
return mq.ReplyOK(colmq.NewPackageGetLoadedStgsResp(nil))
}

View File

@ -21,14 +21,15 @@
"url": "https://comnet.jointcloud.net/pcm/v1/schedule"
},
"uploader": {
"url": "https://kbguhfxfanfp.test.jointcloud.net:443/v1/storage"
"url": "https://kbguhfxfanfp.test.jointcloud.net:443/v1/storage/data"
},
"blockChain": {
"url": "https://ai4m.jointcloud.net/blockChain",
"contractAddress": "0xc860ab27901b3c2b810165a6096c64d88763617f",
"functionName": "storeEvidence",
"memberName": "pcm",
"type": "6"
"url": "https://dev.jointcloud.net/apis",
"loginUrl": "https://dev.jointcloud.net/apis",
"userName": "zhishi",
"password": "123456",
"contractAddress": "0x297df6a189fe951ef0c304e70d4ad96d5d674559",
"functionName": "storeEvidence"
},
"cloudreamStorage": {
"url": "http://localhost:32010"

View File

@ -1,34 +1,36 @@
{
"uploadParams": {
"dataType": "dataset",
"uploadInfo": {
"type": "local",
"localPath": "yuque_mind.jpeg"
"inputs": [
{
"url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/50/Full50B464DB2FDDC29D0380D9FFAB6D944FAF5C7624955D757939280590F01F3ECD?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=m4IdspV4RQcJgm4Ls9%2Fosaxr2o4%3D"
},
"dataName": "yuque_mind.jpeg",
"uploadPriority": {
"type": "preference",
"priorities": [
{
"type": "region",
"options": [
"华东区域",
"华北区域"
]
},
{
"type": "chip",
"options": [
"DCU"
]
},
{
"type": "bias",
"options": [
"网络优先"
]
}
]
{
"url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/C0/FullC036CBB7553A909F8B8877D4461924307F27ECB66CFF928EEEAFD569C3887E29?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=pQTrQSPUw3k4h4Q6yfZp3a8wQ6M%3D"
},
{
"url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/54/Full543F38D9F524238AC0239263AA0DD4B4328763818EA98A7A5F72E59748FDA27A?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=voCaGtEJ6z%2Bp8TWl8hJw4ytatnE%3D"
}
}
],
"outputs": [
"/tmp/IGsnvEIyru_0",
"/tmp/IGsnvEIyru_1",
"/tmp/IGsnvEIyru_2"
],
"coefs": [
[
186,
2,
185
],
[
186,
3,
184
],
[
1,
1,
1
]
],
"chunkSize": 5242880
}

View File

@ -144,6 +144,14 @@ const (
HuaweiCloud = "HuaweiCloud"
AliCloud = "AliCloud"
SugonCloud = "SugonCloud"
Write = "write"
Schedule = "schedule"
Update = "update"
BlockChain_Upload = "5"
BlockChain_Schedule = "6"
BlockChain_Access = "7"
)
//type FileUploadedInfo struct {

144
common/pkgs/db/access.go Normal file
View File

@ -0,0 +1,144 @@
package db
import (
"fmt"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
)
type AccessDB struct {
*DB
}
func (db *DB) Access() *AccessDB {
return &AccessDB{DB: db}
}
func (db *AccessDB) CreateAccessRequest(ctx SQLContext, ret sch.PermissionApply) error {
if err := ctx.Table("accessRequests").Create(&ret).Error; err != nil {
return err
}
return nil
}
func (db *AccessDB) GetAccessRequestByID(ctx SQLContext, id int64) (sch.AccessRequest, error) {
var ret sch.AccessRequest
if err := ctx.Table("accessRequests").Where("id = ?", id).First(&ret).Error; err != nil {
return ret, err
}
return ret, nil
}
func (db *AccessDB) GetAccessRequestByBindingID(ctx SQLContext, applicantID cdssdk.UserID, ownerID cdssdk.UserID, id int64, status []string) (sch.AccessRequest, error) {
var ret sch.AccessRequest
if applicantID != -1 {
if err := ctx.Table("accessRequests").Where("binding_id = ? and applicant_id = ? and status not in ?", id, applicantID, status).First(&ret).Error; err != nil {
return ret, err
}
}
if ownerID != -1 {
if err := ctx.Table("accessRequests").Where("binding_id = ? and data_owner_id = ? and status in ?", id, ownerID, status).First(&ret).Error; err != nil {
return ret, err
}
}
return ret, nil
}
func (db *AccessDB) GetAccessRequest(ctx SQLContext, userID cdssdk.UserID, dataType string) ([]uploadersdk.BindingAccessData, error) {
var ret []uploadersdk.BindingAccessData
sql := `
select
ar.id as ID,
bd.user_id,
bd.name,
bd.data_type,
bd.content,
bd.access_level,
ar.applicant_id,
ar.status
from
(
select
id,
data_owner_id as user_id,
applicant_id,
binding_id,
status
from
accessRequests
where data_owner_id = ?
and status in ?
) as ar
left join (
select
*
from
BindingData
where
user_id = ?
and data_type = ?
and access_level = ?
) as bd on
bd.user_id = ar.user_id
and bd.id = ar.binding_id
`
status := []string{sch.PendingStatus, sch.ApprovedStatus, sch.RejectedStatus}
err := ctx.Raw(sql, userID, status, userID, dataType, sch.ApplyAccess).Scan(&ret).Error
if err != nil {
return ret, err
}
return ret, nil
}
func (db *AccessDB) UpdateStatusByID(ctx SQLContext, id int64, status string, reason string) error {
if status == sch.RejectedStatus {
err := ctx.Table("accessRequests").Where("id = ?", id).Update("status", status).Update("reject_reason", reason).Error
if err != nil {
return err
}
} else {
err := ctx.Table("accessRequests").Where("id = ?", id).Update("status", status).Error
if err != nil {
return err
}
}
return nil
}
func (db *AccessDB) UpdateStatusByBindingID(ctx SQLContext, id uploadersdk.DataID, oldStatus string, newStatus string) error {
err := ctx.Table("accessRequests").Where("binding_id = ? and status = ?", id, oldStatus).Update("status", newStatus).Error
if err != nil {
return err
}
return nil
}
func (db *AccessDB) CreateAccessLog(ctx SQLContext, ret sch.AccessLog) error {
return ctx.Table("accessLogs").Create(ret).Error
}
func (db *AccessDB) GetUserBySsoID(ctx SQLContext, id string) (cdssdk.UserID, error) {
var ret cdssdk.UserID
sql := `
select
id
from
users
where
sso_id = ?
`
err := ctx.Raw(sql, id).Scan(&ret).Error
if err != nil {
return ret, err
}
if ret == 0 {
return ret, fmt.Errorf("user not found")
}
return ret, nil
}

View File

@ -1,10 +1,13 @@
package db
import (
"encoding/json"
"fmt"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
"gitlink.org.cn/cloudream/common/utils/serder"
"gorm.io/gorm/clause"
"strings"
"time"
@ -18,9 +21,9 @@ func (db *DB) UploadData() *UploadDataDB {
return &UploadDataDB{DB: db}
}
func (db *UploadDataDB) GetByPackageID(ctx SQLContext, packageIDs []cdssdk.PackageID, bindingIDs []int64) ([]uploadersdk.Package, error) {
var ret []uploadersdk.Package
err := ctx.Table("UploadData").Where("packageID IN ? or bindingID IN ?", packageIDs, bindingIDs).Find(&ret).Error
func (db *UploadDataDB) GetByPackageID(ctx SQLContext, packageIDs []cdssdk.PackageID, bindingIDs []int64) ([]uploadersdk.PackageDAO, error) {
var ret []uploadersdk.PackageDAO
err := ctx.Table("package").Where("package_id IN ? or binding_id IN ?", packageIDs, bindingIDs).Preload("UploadedCluster").Find(&ret).Error
return ret, err
}
@ -79,7 +82,38 @@ func (db *UploadDataDB) DeleteFolder(ctx SQLContext, packageID cdssdk.PackageID,
func (db *UploadDataDB) QueryPackage(ctx SQLContext, queryParams sch.QueryData) ([]uploadersdk.Package, error) {
var ret []uploadersdk.PackageDAO
err := ctx.Table("package").Where("user_id = ? and data_type = ?", queryParams.UserID, queryParams.DataType).Find(&ret).Error
err := ctx.Table("package").Where("user_id = ? and data_type = ?", queryParams.UserID, queryParams.DataType).Preload("UploadedCluster").Find(&ret).Error
// 将数据转换成 uploadersdk.Package
var res []uploadersdk.Package
for _, dao := range ret {
uploadPriority, err := serder.JSONToObjectEx[sch.UploadPriority]([]byte(dao.UploadPriority))
if err != nil {
return nil, err
}
pkg := uploadersdk.Package{
UserID: dao.UserID,
PackageID: dao.PackageID,
PackageName: dao.PackageName,
BucketID: dao.BucketID,
DataType: dao.DataType,
JsonData: dao.JsonData,
BindingID: dao.BindingID,
UploadPriority: uploadPriority,
CreateTime: dao.CreateTime,
UploadedCluster: dao.UploadedCluster,
}
res = append(res, pkg)
}
return res, err
}
func (db *UploadDataDB) QueryPackageByBindingID(ctx SQLContext, id uploadersdk.DataID) ([]uploadersdk.Package, error) {
var ret []uploadersdk.PackageDAO
err := ctx.Table("package").Where("binding_id = ?", id).Preload("Versions").Find(&ret).Error
// 将数据转换成 uploadersdk.Package
var res []uploadersdk.Package
@ -88,10 +122,13 @@ func (db *UploadDataDB) QueryPackage(ctx SQLContext, queryParams sch.QueryData)
UserID: dao.UserID,
PackageID: dao.PackageID,
PackageName: dao.PackageName,
BucketID: dao.BucketID,
DataType: dao.DataType,
JsonData: dao.JsonData,
BindingID: dao.BindingID,
CreateTime: dao.CreateTime,
UploadedCluster: dao.UploadedCluster,
Versions: dao.Versions,
}
res = append(res, pkg)
}
@ -103,15 +140,24 @@ func (db *UploadDataDB) InsertPackage(ctx SQLContext, newPackage uploadersdk.Pac
// 查询是否存在
if err := ctx.Table("package").Where("package_id = ?", newPackage.PackageID).First(&uploadersdk.PackageDAO{}).Error; err == nil {
return nil
return fmt.Errorf("package already exists")
}
// 将uploadPriority转成json string
uploadPriorityStr, err := json.Marshal(newPackage.UploadPriority)
if err != nil {
return fmt.Errorf("failed to marshal uploadPriority: %w", err)
}
dao := uploadersdk.PackageDAO{
PackageID: newPackage.PackageID,
PackageName: newPackage.PackageName,
DataType: newPackage.DataType,
UserID: newPackage.UserID,
BindingID: -1,
PackageID: newPackage.PackageID,
PackageName: newPackage.PackageName,
DataType: newPackage.DataType,
UserID: newPackage.UserID,
BucketID: newPackage.BucketID,
UploadPriority: string(uploadPriorityStr),
CreateTime: time.Now(),
BindingID: -1,
}
// 插入新包
@ -122,16 +168,91 @@ func (db *UploadDataDB) InsertPackage(ctx SQLContext, newPackage uploadersdk.Pac
}
func (db *UploadDataDB) DeletePackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) error {
err := ctx.Table("package").
// 开启事务
tx := ctx.Begin()
err := tx.Table("package").
Where("package_id = ? and user_id = ?", packageID, userID).
Delete(&uploadersdk.PackageDAO{}).Error
return err
if err != nil {
tx.Rollback()
return err
}
err = tx.Table("uploadedCluster").
Where("package_id = ?", packageID).
Delete(&uploadersdk.Cluster{}).Error
if err != nil {
tx.Rollback()
return err
}
err = tx.Table("folders").
Where("package_id = ?", packageID).
Delete(&uploadersdk.Folder{}).Error
if err != nil {
tx.Rollback()
return err
}
err = tx.Table("packageVersion").
Where("parent_package_id = ?", packageID).
Delete(&uploadersdk.PackageVersion{}).Error
if err != nil {
tx.Rollback()
return err
}
// 提交事务
tx.Commit()
return nil
}
func (db *UploadDataDB) QueryPackageByID(ctx SQLContext, ID cdssdk.PackageID) (uploadersdk.PackageDAO, error) {
var ret uploadersdk.PackageDAO
err := ctx.Table("package").Where("package_id = ?", ID).Omit("Objects").Preload("UploadedCluster").Find(&ret).Error
return ret, err
err := ctx.Table("package").Where("package_id = ?", ID).
Omit("Objects").Preload("Versions").
Preload("UploadedCluster").Find(&ret).Error
if err != nil {
return ret, err
}
// 判断是否查询到数据
if ret.PackageID == 0 {
sql := `
select
pv.package_id,
pv.package_name,
pkg.user_id,
pkg.bucket_id,
pkg.data_type,
pkg.json_data,
pkg.binding_id,
pkg.upload_priority
from
(
select
*
from
packageVersion
where
package_id = ?
) as pv
left join(
select
*
from
package
) as pkg
on
pv.parent_package_id = pkg.package_id
`
err = ctx.Raw(sql, ID).Scan(&ret).Error
if err != nil {
return ret, err
}
if ret.PackageID == 0 {
return ret, fmt.Errorf("package not found")
}
}
return ret, nil
}
type ClusterMappingRow struct {
@ -155,6 +276,14 @@ func (db *UploadDataDB) GetClusterMapping(ctx SQLContext) (map[schsdk.ClusterID]
return ret, nil
}
func (db *UploadDataDB) InsertUploadedCluster(ctx SQLContext, cluster uploadersdk.Cluster) error {
if err := ctx.Table("uploadedCluster").Create(&cluster).Error; err != nil {
return err
}
return nil
}
func (db *UploadDataDB) UpdatePackage(ctx SQLContext, packageID cdssdk.PackageID, jsonData string, bindingID uploadersdk.DataID) error {
if jsonData != "" {
@ -172,6 +301,30 @@ func (db *UploadDataDB) UpdatePackage(ctx SQLContext, packageID cdssdk.PackageID
return nil
}
func (db *UploadDataDB) InsertPackageVersion(ctx SQLContext, parentPackageID cdssdk.PackageID, packageID cdssdk.PackageID, packageName string, version int64) error {
if err := ctx.Table("packageVersion").Where("parent_package_id = ? and package_version = ?", parentPackageID, version).First(&uploadersdk.PackageVersion{}).Error; err == nil {
return fmt.Errorf("package %d has version %d", parentPackageID, version)
}
if err := ctx.Table("packageVersion").Create(&uploadersdk.PackageVersion{
ParentPackageID: parentPackageID,
PackageID: packageID,
PackageName: packageName,
Version: version,
}).Error; err != nil {
return err
}
return nil
}
func (db *UploadDataDB) GetMaxVersion(ctx SQLContext, parentPackageID cdssdk.PackageID) (int64, error) {
sql := "SELECT MAX(package_version) FROM packageVersion WHERE parent_package_id = ? or package_id = ?"
var ret int64
if err := ctx.Raw(sql, parentPackageID, parentPackageID).Scan(&ret).Error; err != nil {
return 0, err
}
return ret, nil
}
func (db *UploadDataDB) InsertBlockchains(ctx SQLContext, blockchains []*uploadersdk.BlockChain) error {
if err := ctx.Table("BlockChain").Create(&blockchains).Error; err != nil {
@ -181,18 +334,205 @@ func (db *UploadDataDB) InsertBlockchains(ctx SQLContext, blockchains []*uploade
return nil
}
func (db *UploadDataDB) InsertOrUpdateBinding(ctx SQLContext, data uploadersdk.BindingData) error {
err := ctx.Table("BindingData").Clauses(clause.OnConflict{
func (db *UploadDataDB) GetPrivateBindings(ctx SQLContext, userID cdssdk.UserID, dataType string) ([]uploadersdk.Binding, error) {
var rows []uploadersdk.Binding
err := ctx.Table("BindingData").Where("user_id = ? and data_type = ?", userID, dataType).Find(&rows).Error
if err != nil {
return nil, err
}
return rows, nil
}
func (db *UploadDataDB) GetApplyBindings(ctx SQLContext, userID cdssdk.UserID, level string, dataType string) ([]uploadersdk.BindingAccessData, error) {
var ret []uploadersdk.BindingAccessData
sql := `
select
bd.id as ID,
bd.user_id,
bd.name,
bd.data_type,
bd.content,
ar.applicant_id,
ar.status
from
(
select
*
from
BindingData
where
access_level = ?
and data_type = ?
and user_id != ?
) as bd
left join (
select
data_owner_id,
binding_id,
applicant_id,
status
from
accessRequests
where applicant_id = ?
and status != ?
) as ar on
bd.user_id = ar.data_owner_id
and bd.id = ar.binding_id
`
err := ctx.Raw(sql, level, dataType, userID, userID, sch.ExpiredStatus).Scan(&ret).Error
if err != nil {
return ret, err
}
return ret, nil
}
func (db *UploadDataDB) GetPublicBindings(ctx SQLContext, level string, dataType string, userID cdssdk.UserID) ([]uploadersdk.BindingAccessData, error) {
var ret []uploadersdk.BindingAccessData
sql := `
select
bd.id as ID,
bd.user_id,
bd.name,
bd.data_type,
bd.content,
bd.access_level,
ar.applicant_id,
ar.status
from
(
select
*
from
BindingData
where
access_level = ?
and data_type = ?
and user_id not in (?)
) as bd
left join (
select
data_owner_id as user_id,
applicant_id,
status
from
accessRequests
) as ar on
bd.user_id = ar.user_id
`
err := ctx.Raw(sql, level, dataType, userID).Scan(&ret).Error
if err != nil {
return ret, err
}
return ret, nil
}
func (db *UploadDataDB) GetBindingsByID(ctx SQLContext, ID uploadersdk.DataID) (*uploadersdk.Binding, error) {
var rows uploadersdk.Binding
err := ctx.Table("BindingData").Where("id = ?", ID).Find(&rows).Error
if err != nil {
return nil, err
}
return &rows, nil
}
func (db *UploadDataDB) DeleteBindingsByID(ctx SQLContext, IDs []int64) error {
tx := ctx.Begin()
if err := tx.Table("package").Where("binding_id in ?", IDs).Update("binding_id", -1).Error; err != nil {
tx.Rollback()
return err
}
err := tx.Table("accessRequests").Where("binding_id in ? and status = ?", IDs, sch.ApprovedStatus).Update("status", sch.ExpiredStatus).Error
if err != nil {
tx.Rollback()
return err
}
if err := tx.Table("BindingData").Where("id in ?", IDs).Delete(&uploadersdk.Binding{}).Error; err != nil {
tx.Rollback()
return err
}
tx.Commit()
return nil
}
func (db *UploadDataDB) InsertOrUpdateBinding(ctx SQLContext, data uploadersdk.Binding) (*uploadersdk.DataID, error) {
// 根据name查询是否存在
var existData uploadersdk.Binding
err := ctx.Table("BindingData").Where("user_id = ? and name = ? and data_type = ?", data.UserID, data.Name, data.DataType).Find(&existData).Error
if err != nil {
return nil, err
}
// 如果存在,则报错
if existData.ID != 0 {
return nil, fmt.Errorf("the binding name is already exists")
}
err = ctx.Table("BindingData").Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ID"}}, // 指定冲突列
DoUpdates: clause.Assignments(map[string]interface{}{
"bindingName": data.BindingName,
//"bindingType": data.BindingType,
"name": data.Name,
"content": data.Content,
}),
}).Create(&data).Error
if err != nil {
return err
return nil, err
}
if data.ID == 0 {
return nil, fmt.Errorf("insert failed: ID is 0")
}
return &data.ID, nil
}
func (db *UploadDataDB) UpdateBindingAccess(ctx SQLContext, id uploadersdk.DataID, level string) error {
if err := ctx.Table("BindingData").Where("id = ?", id).Update("access_level", level).Error; err != nil {
return err
}
return nil
}
func (db *UploadDataDB) GetImageByID(ctx SQLContext, IDs []int64) ([]sch.ClusterImage, error) {
sql := `
select
bd.id as id,
bd.name as name,
ig.cluster_id as ClusterID,
ig.image_id as ClusterImageID,
ig.card_type as CardType
from
(
select
id,
name
from
BindingData
where
id in (?)
) as bd
left join (
select
id,
cluster_id,
image_id,
card_type
from
clusterImage
) as ig
on
bd.id = ig.id
`
var rows []sch.ClusterImage
err := ctx.Raw(sql, IDs).Scan(&rows).Error
//err := ctx.Table("clusterImage").Where("ID = ?", ID).Find(&rows).Error
if err != nil {
return nil, err
}
return rows, nil
}

View File

@ -311,13 +311,13 @@ func matchRegion(region string, regionPriority *pcmsch.RegionPriority) bool {
}
// 匹配卡类型选择
func matchChipType(resources []pcmsch.TmpResourceData, chipPriority *pcmsch.ChipPriority) bool {
func matchChipType(resources []pcmsch.ClusterResource, chipPriority *pcmsch.ChipPriority) bool {
if chipPriority == nil || len(chipPriority.Options) == 0 {
return true
}
for _, resource := range resources {
if contains(chipPriority.Options, string(resource.Type)) {
if contains(chipPriority.Options, string(resource.Resource.Type)) {
return true
}
}
@ -373,23 +373,27 @@ func matchFunction(functionType string, biasPriority *pcmsch.BiasPriority) bool
}
// 获取剩余资源(可以基于具体资源类型进行扩展)
func getRemainingResources(resources []pcmsch.TmpResourceData) float64 {
func getRemainingResources(resources []pcmsch.ClusterResource) float64 {
var totalAvailable float64
for _, resource := range resources {
switch resource.Type {
case pcmsch.ResourceTypeCPU:
totalAvailable += resource.Available.Value * CpuResourceWeight
switch resource.Resource.Type {
case pcmsch.ResourceTypeNPU:
totalAvailable += resource.Available.Value * CpuResourceWeight
totalAvailable += resource.Resource.Available.Value * CpuResourceWeight
case pcmsch.ResourceTypeGPU:
totalAvailable += resource.Available.Value * CpuResourceWeight
totalAvailable += resource.Resource.Available.Value * CpuResourceWeight
case pcmsch.ResourceTypeMLU:
totalAvailable += resource.Available.Value * CpuResourceWeight
case pcmsch.ResourceTypeStorage:
totalAvailable += float64(resource.Available.Value) * StgResourceWeight
case pcmsch.ResourceTypeMemory:
totalAvailable += float64(resource.Available.Value) * StgResourceWeight
totalAvailable += resource.Resource.Available.Value * CpuResourceWeight
}
for _, baseResource := range resource.BaseResources {
switch baseResource.Type {
case pcmsch.ResourceTypeCPU:
totalAvailable += baseResource.Available.Value * CpuResourceWeight
case pcmsch.ResourceTypeStorage:
totalAvailable += baseResource.Available.Value * StgResourceWeight
case pcmsch.ResourceTypeMemory:
totalAvailable += baseResource.Available.Value * StgResourceWeight
}
}
}

View File

@ -2,6 +2,8 @@ package task
import (
"fmt"
"strconv"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
@ -40,16 +42,18 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) {
}
defer schglb.CloudreamStoragePool.Release(stgCli)
resp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
path := strconv.FormatInt(int64(t.PackageID), 10) + "_" + strconv.FormatInt(time.Now().Unix(), 10)
_, err = stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: t.UserID,
PackageID: t.PackageID,
StorageID: t.StorageID,
RootPath: path,
})
if err != nil {
return "", err
}
return resp.PackagePath, nil
return path, nil
}
func init() {

File diff suppressed because it is too large Load Diff

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"path/filepath"
"strconv"
"time"
"gitlink.org.cn/cloudream/common/pkgs/future"
@ -147,16 +148,19 @@ func loadDatasetPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storag
}
defer schglb.CloudreamStoragePool.Release(stgCli)
loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
rootPath := "dataset_" + strconv.FormatInt(time.Now().Unix(), 10)
_, err = stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: userID,
PackageID: packageID,
StorageID: storageID,
RootPath: rootPath,
})
if err != nil {
return "", err
}
logger.Info("load pacakge path: " + loadPackageResp.FullPath)
return loadPackageResp.FullPath, nil
logger.Info("load pacakge path: " + rootPath)
return rootPath, nil
}
func (s *JobExecuting) submitNormalTask(rtx jobmgr.JobStateRunContext, cmd string, envs []schsdk.KVPair, ccInfo schmod.ComputingCenter, pcmImgInfo schmod.PCMImage, resourceID pcmsdk.ResourceID) error {

View File

@ -3,6 +3,8 @@ package state
import (
"context"
"fmt"
"strconv"
"time"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@ -84,16 +86,19 @@ func (s *MultiInstanceUpdate) do(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job)
StorageID: ccInfo.CDSStorageID,
})
loadPackageResp, err := stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
rootPath := "model_update_" + strconv.FormatInt(time.Now().Unix(), 10)
_, err = stgCli.StorageLoadPackage(cdsapi.StorageLoadPackageReq{
UserID: userID,
PackageID: dtrJob.DataReturnPackageID,
StorageID: getStg.StorageID,
RootPath: rootPath,
})
if err != nil {
return fmt.Errorf("loading package: %w", err)
}
logger.Info("load pacakge path: " + loadPackageResp.FullPath)
fullPath = loadPackageResp.FullPath
logger.Info("load pacakge path: " + rootPath)
fullPath = rootPath
}
// 发送事件更新各个instance

View File

@ -0,0 +1,173 @@
package http
import (
"fmt"
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
"net/http"
)
type AccessService struct {
*Server
}
func (s *Server) AccessSvc() *AccessService {
return &AccessService{
Server: s,
}
}
type ApplyAccessReq struct {
PermissionApply sch.PermissionApply `json:"apply"`
}
// 访问申请
func (s *AccessService) ApplyRequestAccess(ctx *gin.Context) {
var req ApplyAccessReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
err := s.svc.AccessSvc().ApplyRequestAccess(req.PermissionApply)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK("ok"))
}
type ApprovalAccessReq struct {
PermissionApproval sch.PermissionApproval `json:"approval"`
}
func (s *AccessService) ApprovalRequestAccess(ctx *gin.Context) {
var req ApprovalAccessReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
err := s.svc.AccessSvc().ApprovalRequestAccess(req.PermissionApproval)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK("ok"))
}
type UpdateAccessRequestStatusReq struct {
UserID cdssdk.UserID `json:"userID"`
BindingID int64 `json:"bindingID"`
Status string `json:"status"`
}
// 更新访问申请状态
func (s *AccessService) UpdateAccessRequestStatus(ctx *gin.Context) {
var req UpdateAccessRequestStatusReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
err := s.svc.AccessSvc().UpdateAccessRequestStatus(req.UserID, req.BindingID, req.Status)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK("ok"))
}
type GetAccessRequestReq struct {
UserID cdssdk.UserID `json:"userID"`
DataType string `json:"dataType"`
}
type GetAccessRequestResp struct {
Datas []uploadersdk.BindingAccessData `json:"datas"`
}
// 获取访问申请列表
func (s *AccessService) GetAccessRequests(ctx *gin.Context) {
var req GetAccessRequestReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
datas, err := s.svc.AccessSvc().GetAccessRequests(req.UserID, req.DataType)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK(GetAccessRequestResp{
Datas: datas,
}))
}
type UpdateBindingDataAccessReq struct {
ID uploadersdk.DataID `json:"ID"`
Level string `json:"level"`
}
func (s *AccessService) UpdateBindingDataAccess(ctx *gin.Context) {
var req UpdateBindingDataAccessReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
err := s.svc.AccessSvc().UpdateBindingDataAccess(req.ID, req.Level)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK("ok"))
}
type QueryUserReq struct {
SsoID string `form:"ssoID"`
}
type QueryUserResp struct {
UserID cdssdk.UserID `json:"userID"`
}
func (s *AccessService) QueryUser(ctx *gin.Context) {
var req QueryUserReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}
userID, err := s.svc.AccessSvc().QueryUser(req.SsoID)
if err != nil {
log.Warnf("get access requests: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("get access requests: %v", err)))
return
}
ctx.JSON(http.StatusOK, OK(QueryUserResp{
UserID: userID,
}))
}

View File

@ -28,6 +28,11 @@ type JobSetSubmitResp struct {
FilesUploadScheme schsdk.JobSetFilesUploadScheme `json:"filesUploadScheme"`
}
type JobSetSubmitReq struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
JobSetInfo schsdk.JobSetInfo `json:"jobSetInfo" binding:"required"`
}
func (s *JobSetService) Submit(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Submit")
@ -38,21 +43,23 @@ func (s *JobSetService) Submit(ctx *gin.Context) {
return
}
jobSetInfo, err := serder.JSONToObjectEx[schsdk.JobSetInfo](bodyData)
req, err := serder.JSONToObjectEx[JobSetSubmitReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
schScheme, uploadScheme, err := s.svc.JobSetSvc().PreScheduler(jobSetInfo)
blockChainToken := ctx.Request.Header.Get("Authorization")
schScheme, uploadScheme, err := s.svc.JobSetSvc().PreScheduler(req.JobSetInfo)
if err != nil {
log.Warnf("pre-scheduling jobset: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "pre-scheduling jobset failed"))
return
}
jobsetID, err := s.svc.JobSetSvc().Submit(jobSetInfo, schScheme)
jobsetID, err := s.svc.JobSetSvc().Submit(req.UserID, req.JobSetInfo, schScheme, blockChainToken)
if err != nil {
log.Warnf("submitting jobset: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "submit jobset failed"))
@ -71,8 +78,6 @@ type JobSetLocalFileUploadedReq struct {
Error string `json:"error"`
PackageID cdssdk.PackageID `json:"packageID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
//FolderID uploadersdk.FolderID `json:"folderID"`
//UploadedInfo []schmod.FileUploadedInfo `json:"uploadedInfo"`
}
func (s *JobSetService) LocalFileUploaded(ctx *gin.Context) {
@ -95,12 +100,12 @@ type UploadReq struct {
UploadParams sch.UploadParams `json:"uploadParams"`
}
type UploadResp struct {
JobSetID schsdk.JobSetID `json:"jobSetID"`
LocalPath string `json:"localPath"`
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
BucketID cdssdk.BucketID `json:"bucketID"`
}
//type UploadResp struct {
// JobSetID schsdk.JobSetID `json:"jobSetID"`
// LocalPath string `json:"localPath"`
// StorageIDs []cdssdk.StorageID `json:"storageIDs"`
// BucketID cdssdk.BucketID `json:"bucketID"`
//}
func (s *JobSetService) Upload(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.Upload")
@ -118,25 +123,16 @@ func (s *JobSetService) Upload(ctx *gin.Context) {
return
}
jobsetID, storages, err := s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams)
blockChainToken := ctx.Request.Header.Get("Authorization")
_, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "upload file failed, error: "+err.Error()))
return
}
switch info := req.UploadParams.UploadInfo.(type) {
case *sch.LocalUploadInfo:
ctx.JSON(http.StatusOK, OK(UploadResp{
JobSetID: *jobsetID,
LocalPath: info.LocalPath,
StorageIDs: *storages,
BucketID: 1,
}))
case *sch.RemoteUploadInfo:
ctx.JSON(http.StatusOK, OK("success"))
}
ctx.JSON(http.StatusOK, OK("success"))
}
@ -197,7 +193,7 @@ func (s *JobSetService) DeleteFile(ctx *gin.Context) {
if err != nil {
log.Warnf("creating folder: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create folder failed"))
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete file failed"))
return
}
@ -282,11 +278,9 @@ func (s *JobSetService) QueryUploaded(ctx *gin.Context) {
}
type BindingReq struct {
ID uploadersdk.DataID `json:"ID"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
BindingName string `json:"bindingName" binding:"required"`
BindingType string `json:"bindingType" binding:"required"`
PacakgeIDs []cdssdk.PackageID `json:"pacakgeIDs" binding:"required"`
ID uploadersdk.DataID `json:"ID"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
Info sch.DataBinding `json:"info" binding:"required"`
}
func (s *JobSetService) Binding(ctx *gin.Context) {
@ -305,13 +299,7 @@ func (s *JobSetService) Binding(ctx *gin.Context) {
return
}
params := uploadersdk.BindingData{
ID: req.ID,
UserID: req.UserID,
BindingName: req.BindingName,
BindingType: req.BindingType,
}
err = s.svc.JobSetSvc().DataBinding(params, req.PacakgeIDs)
err = s.svc.JobSetSvc().DataBinding(req.ID, req.UserID, req.Info)
if err != nil {
log.Warnf("getting service list: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "binding data failed, error: "+err.Error()))
@ -322,7 +310,8 @@ func (s *JobSetService) Binding(ctx *gin.Context) {
}
type RemoveBindingReq struct {
UploadDatas []uploadersdk.DataID `json:"uploadDatas"`
PackageIDs []cdssdk.PackageID `json:"packageIDs"`
BindingIDs []int64 `json:"bindingIDs"`
}
func (s *JobSetService) RemoveBinding(ctx *gin.Context) {
@ -334,14 +323,14 @@ func (s *JobSetService) RemoveBinding(ctx *gin.Context) {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[BindingReq](bodyData)
req, err := serder.JSONToObjectEx[RemoveBindingReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
err = s.svc.JobSetSvc().RemoveBinding(req.PacakgeIDs)
err = s.svc.JobSetSvc().RemoveBinding(req.PackageIDs, req.BindingIDs)
if err != nil {
log.Warnf("getting service list: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "remove binding failed, error: "+err.Error()))
@ -351,10 +340,48 @@ func (s *JobSetService) RemoveBinding(ctx *gin.Context) {
ctx.JSON(http.StatusOK, OK("success"))
}
type QueryBindingReq struct {
DataType string `json:"dataType" binding:"required"`
Param sch.QueryBindingDataParam `json:"param" binding:"required"`
}
type QueryBindingResp struct {
Datas []uploadersdk.BindingDetail `json:"datas"`
}
func (s *JobSetService) QueryBinding(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.QueryBinding")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[QueryBindingReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
details, err := s.svc.JobSetSvc().QueryBinding(req.DataType, req.Param)
if err != nil {
log.Warnf("getting service list: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "binding data failed, error: "+err.Error()))
return
}
ctx.JSON(http.StatusOK, OK(QueryBindingResp{
Datas: details,
}))
}
type PackageCreate struct {
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
DataType string `json:"dataType"`
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
DataType string `json:"dataType"`
UploadPriority sch.UploadPriority `json:"uploadPriority"`
}
func (s *JobSetService) CreatePackage(ctx *gin.Context) {
@ -373,7 +400,7 @@ func (s *JobSetService) CreatePackage(ctx *gin.Context) {
return
}
err = s.svc.JobSetSvc().CreatePackage(req.UserID, req.Name, req.DataType)
err = s.svc.JobSetSvc().CreatePackage(req.UserID, req.Name, req.DataType, req.UploadPriority)
if err != nil {
log.Warnf("creating folder: %s", err.Error())
@ -415,3 +442,157 @@ func (s *JobSetService) DeletePackage(ctx *gin.Context) {
ctx.JSON(http.StatusOK, OK("success"))
}
type QueryResourceReq struct {
QueryResource sch.ResourceRange `json:"queryResource"`
}
type QueryResourceResp struct {
Resource []sch.ClusterDetail `json:"resource"`
}
func (s *JobSetService) QueryResource(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.CreateFolder")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[QueryResourceReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
resource, err := s.svc.JobSetSvc().QueryResource(req.QueryResource)
if err != nil {
log.Warnf("creating folder: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error()))
return
}
ctx.JSON(http.StatusOK, OK(QueryResourceResp{
Resource: resource,
}))
}
type QueryResourceRangeReq struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
}
type QueryResourceRangeResp struct {
ResourceRanges []sch.ResourceRange `json:"resourceRanges"`
}
func (s *JobSetService) ResourceRange(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.CreateFolder")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[QueryResourceRangeReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
println(req.UserID)
resource, err := s.svc.JobSetSvc().ResourceRange()
if err != nil {
log.Warnf("creating folder: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error()))
return
}
ctx.JSON(http.StatusOK, OK(QueryResourceRangeResp{
ResourceRanges: resource,
}))
}
type QueryImagesReq struct {
IDs []int64 `json:"ids" binding:"required"`
}
type QueryImagesResp struct {
Images []sch.ClusterImage `json:"images"`
}
func (s *JobSetService) QueryImages(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.CreateFolder")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[QueryImagesReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
images, err := s.svc.JobSetSvc().QueryImages(req.IDs)
if err != nil {
log.Warnf("creating folder: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error()))
return
}
ctx.JSON(http.StatusOK, OK(QueryImagesResp{
Images: images,
}))
}
type UpdateCodeReq struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
BucketID cdssdk.BucketID `json:"bucketID" binding:"required"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
PackageName string `json:"packageName" binding:"required"`
//Version int64 `json:"version" binding:"required"`
}
type UpdateCodeResp struct {
Package cdssdk.Package `json:"newPackage"`
}
func (s *JobSetService) UpdateCode(ctx *gin.Context) {
log := logger.WithField("HTTP", "JobSet.CreateFolder")
bodyData, err := io.ReadAll(ctx.Request.Body)
if err != nil {
log.Warnf("reading request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "read request body failed"))
return
}
req, err := serder.JSONToObjectEx[UpdateCodeReq](bodyData)
if err != nil {
log.Warnf("parsing request body: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
return
}
pkg, err := s.svc.JobSetSvc().UpdateCode(req.UserID, req.BucketID, req.PackageID, req.PackageName)
if err != nil {
log.Warnf("creating folder: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error()))
return
}
ctx.JSON(http.StatusOK, OK(UpdateCodeResp{
Package: *pkg,
}))
}

View File

@ -38,7 +38,7 @@ func (s *Server) Serve() error {
}
func (s *Server) initRouters() {
s.engine.POST("/jobSet/upload", s.JobSetSvc().Upload)
s.engine.POST("/jobSet/notifyUploaded", s.JobSetSvc().Upload)
s.engine.POST("/jobSet/submit", s.JobSetSvc().Submit)
s.engine.POST("/jobSet/localFileUploaded", s.JobSetSvc().LocalFileUploaded)
s.engine.POST("/jobSet/queryUploaded", s.JobSetSvc().QueryUploaded)
@ -46,10 +46,24 @@ func (s *Server) initRouters() {
s.engine.POST("/jobSet/createPackage", s.JobSetSvc().CreatePackage)
s.engine.POST("/jobSet/deletePackage", s.JobSetSvc().DeletePackage)
s.engine.POST("/jobSet/resourceRange", s.JobSetSvc().ResourceRange)
s.engine.POST("/jobSet/queryResource", s.JobSetSvc().QueryResource)
s.engine.POST("/jobSet/queryImages", s.JobSetSvc().QueryImages)
s.engine.POST("/jobSet/updateCode", s.JobSetSvc().UpdateCode)
s.engine.POST("/jobSet/createFolder", s.JobSetSvc().CreateFolder)
s.engine.POST("/jobSet/deleteFolder", s.JobSetSvc().DeleteFolder)
s.engine.POST("/jobSet/deleteFile", s.JobSetSvc().DeleteFile)
s.engine.POST("/jobSet/binding", s.JobSetSvc().Binding)
s.engine.POST("/jobSet/queryBinding", s.JobSetSvc().QueryBinding)
s.engine.POST("/jobSet/removeBinding", s.JobSetSvc().RemoveBinding)
s.engine.POST("/access/applyRequestAccess", s.AccessSvc().ApplyRequestAccess)
s.engine.POST("/access/approvalRequestAccess", s.AccessSvc().ApprovalRequestAccess)
s.engine.POST("/access/updateAccessRequestStatus", s.AccessSvc().UpdateAccessRequestStatus)
s.engine.POST("/access/getAccessRequests", s.AccessSvc().GetAccessRequests)
s.engine.POST("/access/updateBindingDataAccess", s.AccessSvc().UpdateBindingDataAccess)
s.engine.GET("/access/queryUser", s.AccessSvc().QueryUser)
}

View File

@ -1,24 +1,35 @@
package state2
import (
"encoding/json"
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/sdks/blockchain"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
"strconv"
"time"
)
type DataSchedule struct {
taskID sch.TaskID
scheduleData []sch.ScheduleData
userID cdssdk.UserID
taskID sch.TaskID
scheduleData []sch.ScheduleData
blockChainToken string
}
func NewDataSchedule(taskID sch.TaskID, scheduleData []sch.ScheduleData) *DataSchedule {
func NewDataSchedule(userID cdssdk.UserID, taskID sch.TaskID, scheduleData []sch.ScheduleData, blockChainToken string) *DataSchedule {
return &DataSchedule{
taskID: taskID,
scheduleData: scheduleData,
userID: userID,
taskID: taskID,
scheduleData: scheduleData,
blockChainToken: blockChainToken,
}
}
@ -48,50 +59,163 @@ func (s *DataSchedule) do(rtx jobmgr.JobStateRunContext) ([]sch.DataScheduleResu
var results []sch.DataScheduleResults
for _, data := range s.scheduleData {
var clusters []uploadersdk.Cluster
//var clusters []uploadersdk.Cluster
var errResults []sch.DataScheduleResult
// 根据clusterID获取JCS的storageID
for _, id := range data.ClusterIDs {
storageID, ok := clusterMapping[id]
if !ok {
errResults = append(errResults, sch.DataScheduleResult{
Clusters: sch.DataDetail{
// 测试,后续需要删除
data.StorageType = "jcs"
var scheduleTarget uploadersdk.ScheduleTarget
switch data.StorageType {
case sch.StorageTypeJCS:
var scheduleStorages []uploadersdk.ScheduleStorage
for _, id := range data.ClusterIDs {
storageID, ok := clusterMapping[id]
if !ok {
detail := sch.DataDetail{
ClusterID: id,
},
Msg: "cluster not found",
Status: false,
}
errResults = append(errResults, sch.DataScheduleResult{
Clusters: []sch.DataDetail{
detail,
},
Msg: "cluster not found",
Status: false,
})
logger.Error(fmt.Errorf("cluster %d not found", id))
continue
}
rootPath := strconv.FormatInt(int64(s.userID), 10) + "_" + strconv.FormatInt(int64(s.taskID), 10)
scheduleStorages = append(scheduleStorages, uploadersdk.ScheduleStorage{
StorageID: storageID,
RootPath: rootPath,
})
logger.Error(fmt.Errorf("cluster %d not found", id))
continue
}
clusters = append(clusters, uploadersdk.Cluster{
ClusterID: id,
StorageID: storageID,
})
scheduleTarget = &uploadersdk.JCSScheduleTarget{
UserID: s.userID,
ScheduleStorages: scheduleStorages,
}
case sch.StorageTypeURL:
var scheduleUrls []uploadersdk.ScheduleUrl
for _, id := range data.ClusterIDs {
scheduleUrls = append(scheduleUrls, uploadersdk.ScheduleUrl{
ClusterID: uploadersdk.ClusterID(id),
JsonData: "",
})
}
scheduleTarget = &uploadersdk.UrlScheduleTarget{
ScheduleUrls: scheduleUrls,
}
}
// 发送调度请求
req := uploadersdk.DataScheduleReq{
Clusters: clusters,
PackageID: data.PackageID,
//StorageType: data.StorageType,
PackageID: data.PackageID,
DataType: data.DataType,
ScheduleTarget: scheduleTarget,
}
scheduleResult, err := uploaderCli.DataSchedule(req)
if err != nil {
return nil, fmt.Errorf("schedule data: %w", err)
}
if len(errResults) > 0 {
scheduleResult.Results = append(scheduleResult.Results, errResults...)
scheduleResult = append(scheduleResult, errResults...)
}
results = append(results, sch.DataScheduleResults{
DataType: data.DataType,
Results: scheduleResult.Results,
Results: scheduleResult,
})
// 写入到存证服务
blockChains, err := s.blockChain(scheduleResult)
if err != nil {
return nil, fmt.Errorf("blockchain: %w", err)
}
if blockChains != nil && len(blockChains) > 0 {
err = rtx.Mgr.DB.UploadData().InsertBlockchains(rtx.Mgr.DB.DefCtx(), blockChains)
if err != nil {
return nil, fmt.Errorf("insert blockchains: %w", err)
}
}
}
return results, nil
}
func (s *DataSchedule) blockChain(results []sch.DataScheduleResult) ([]*uploadersdk.BlockChain, error) {
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return nil, fmt.Errorf("new scheduler client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
bcCli, err := schglb.BlockChainPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new blockchain client: %w", err)
}
defer schglb.BlockChainPool.Release(bcCli)
var blockChains []*uploadersdk.BlockChain
for _, res := range results {
objects, err := cdsCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{
UserID: s.userID,
PackageID: res.PackageID,
})
if err != nil {
return nil, fmt.Errorf("get package objects: %w", err)
}
for _, obj := range objects.Objects {
now := time.Now()
timestamp := now.UnixNano() / int64(time.Millisecond)
chainID := strconv.FormatInt(int64(obj.ObjectID), 10) + "_" + strconv.FormatInt(timestamp, 10)
formattedTime := now.Format("2006-01-02 15:04:05")
//paths := strings.Split(obj.Path, "/")
//fileName := paths[len(paths)-1]
// 去掉hash前四个字符
fileHash := obj.FileHash[4:]
var args = make(map[string]string)
args["userID"] = strconv.FormatInt(int64(s.userID), 10)
//args["fileName"] = fileName
args["fileHash"] = string(fileHash)
//args["fileSize"] = strconv.FormatInt(obj.Size, 10)
args["objectID"] = strconv.FormatInt(int64(obj.ObjectID), 10)
if len(res.Clusters) > 0 {
args["targetCluster"] = string(res.Clusters[0].ClusterID)
}
args["createTime"] = formattedTime
// 将map转换成json字符串
argsJson, _ := json.Marshal(args)
argsArr := []string{chainID, string(argsJson)}
req := blockchain.InvokeReq{
ContractAddress: schglb.BlockChainConfig.ContractAddress,
FunctionName: schglb.BlockChainConfig.FunctionName,
//MemberName: schglb.BlockChainConfig.MemberName,
Type: schmod.BlockChain_Schedule,
Args: argsArr,
}
err = bcCli.BlockChainInvoke(req, s.blockChainToken)
if err != nil {
return nil, fmt.Errorf("invoke blockchain: %w", err)
}
blockChains = append(blockChains, &uploadersdk.BlockChain{
ObjectID: obj.ObjectID,
BlockChainID: chainID,
BlockChainType: schmod.Schedule,
})
}
}
return blockChains, nil
}
func (s *DataSchedule) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {
return &jobmod.NormalJobReadyToExecuteDump{}
}

View File

@ -1,7 +1,6 @@
package state2
import (
"context"
"encoding/json"
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@ -11,9 +10,9 @@ import (
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/event"
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state"
"strconv"
"strings"
@ -22,19 +21,21 @@ import (
)
type DataUpload struct {
userID cdssdk.UserID
uploadInfo sch.UploadInfo
dataType string
storages []cdssdk.StorageID
lock sync.Mutex
userID cdssdk.UserID
uploadInfo sch.UploadInfo
dataType string
blockChainToken string
//storages []cdssdk.StorageID
lock sync.Mutex
}
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, storages []cdssdk.StorageID) *DataUpload {
func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string) *DataUpload {
return &DataUpload{
userID: userID,
uploadInfo: uploadInfo,
dataType: dataType,
storages: storages,
userID: userID,
uploadInfo: uploadInfo,
dataType: dataType,
blockChainToken: blockChainToken,
//storages: storages,
}
}
@ -52,43 +53,13 @@ func (s *DataUpload) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
}
func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 获取集群信息
var clusters []*uploadersdk.Cluster
for _, id := range s.storages {
clusters = append(clusters, &uploadersdk.Cluster{
StorageID: id,
})
}
var objectIDs []cdssdk.ObjectID
// 存证信息
//var fileInfos []schmod.FileUploadedInfo
//var folderID uploadersdk.FolderID
switch info := s.uploadInfo.(type) {
// 通过本地上传
case *sch.LocalUploadInfo:
// 等待上传完成
// TODO 需要设置超时机制
evt, ok := event.WaitTypeAnd[*event.LocalFileUploaded](ctx, rtx.EventSet, func(e *event.LocalFileUploaded) bool {
return e.LocalPath == info.LocalPath
})
if !ok {
return fmt.Errorf("local file %s not uploaded", info.LocalPath)
}
if evt.Error.Error() != "" {
return evt.Error
}
//packageData.PackageID = evt.PackageID
objectIDs = evt.ObjectIDs
//packageData.Name = info.LocalPath
//folderID = evt.FolderID
//fileInfos = evt.UploadedInfo
objectIDs = info.ObjectIDs
// 通过URL上传
case *sch.RemoteUploadInfo:
@ -98,17 +69,17 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
}
defer schglb.UploaderPool.Release(uploaderCli)
var targetClusters []uploadersdk.ClusterID
for _, id := range info.TargetClusters {
targetClusters = append(targetClusters, uploadersdk.ClusterID(id))
}
req := uploadersdk.UploadReq{
Type: s.dataType,
DataType: s.dataType,
Source: &uploadersdk.UrlSource{
Url: info.Url,
},
Target: &uploadersdk.UrlTarget{
Clusters: targetClusters,
ClusterID: uploadersdk.ClusterID(info.Cluster),
JCSUploadInfo: cdsapi.ObjectUploadInfo{
UserID: s.userID,
PackageID: info.PackageID,
},
},
}
uploadResp, err := uploaderCli.Upload(req)
@ -127,13 +98,6 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
}
// 将上传结果写入数据库
//packageData.UserID = 1
//dataID, err := rtx.Mgr.DB.UploadData().InsertPackage(rtx.Mgr.DB.DefCtx(), packageData, clusters, folderID)
//if err != nil {
// return fmt.Errorf("insert upload data fail: %w", err)
//}
// 传入存证
blockChains, err := s.blockChain(objectIDs)
if err != nil {
@ -155,7 +119,18 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
objects, err := cdsCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{})
//objects, err := cdsCli.Object().GetPackageObjects(cdsapi.ObjectGetPackageObjects{})
objects, err := cdsCli.Object().ListByIDs(cdsapi.ObjectListByIDs{
ObjectIDs: objectIDs,
UserID: s.userID,
})
if err != nil {
logger.Error(fmt.Errorf("list objects: %w", err))
return nil, fmt.Errorf("list objects: %w", err)
}
if objects.Objects == nil || len(objects.Objects) == 0 {
return nil, fmt.Errorf("objects is nil")
}
bcCli, err := schglb.BlockChainPool.Acquire()
if err != nil {
@ -166,10 +141,14 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
var blockChains []*uploadersdk.BlockChain
for _, obj := range objects.Objects {
if obj == nil {
logger.Warnf("object is nil")
continue
}
now := time.Now()
timestamp := now.UnixNano() / int64(time.Millisecond)
fileNo := strconv.FormatInt(int64(obj.ObjectID), 10) + "_" + strconv.FormatInt(timestamp, 10)
chainID := strconv.FormatInt(int64(obj.ObjectID), 10) + "_" + strconv.FormatInt(timestamp, 10)
formattedTime := now.Format("2006-01-02 15:04:05")
paths := strings.Split(obj.Path, "/")
fileName := paths[len(paths)-1]
@ -182,34 +161,39 @@ func (s *DataUpload) blockChain(objectIDs []cdssdk.ObjectID) ([]*uploadersdk.Blo
args["fileName"] = fileName
args["fileHash"] = string(fileHash)
args["fileSize"] = strconv.FormatInt(obj.Size, 10)
args["fileNo"] = fileNo
args["objectID"] = strconv.FormatInt(int64(obj.ObjectID), 10)
args["createTime"] = formattedTime
// 将map转换成json字符串
argsJson, _ := json.Marshal(args)
argsArr := []string{fileNo, string(argsJson)}
argsArr := []string{chainID, string(argsJson)}
req := blockchain.InvokeReq{
ContractAddress: schglb.BlockChainConfig.ContractAddress,
FunctionName: schglb.BlockChainConfig.FunctionName,
MemberName: schglb.BlockChainConfig.MemberName,
Type: schglb.BlockChainConfig.Type,
Args: argsArr,
//MemberName: schglb.BlockChainConfig.MemberName,
Type: schmod.BlockChain_Upload,
Args: argsArr,
}
err = bcCli.BlockChainInvoke(req)
err = bcCli.BlockChainInvoke(req, s.blockChainToken)
if err != nil {
return nil, fmt.Errorf("invoke blockchain: %w", err)
}
blockChains = append(blockChains, &uploadersdk.BlockChain{
ObjectID: obj.ObjectID,
BlockChainID: fileNo,
ObjectID: obj.ObjectID,
BlockChainID: chainID,
BlockChainType: schmod.Write,
//FileHash: string(fileHash),
//FileName: fileName,
//FileSize: obj.Size,
})
}
if blockChains == nil {
return nil, fmt.Errorf("blockchains is nil")
}
return blockChains, nil
}

View File

@ -13,12 +13,16 @@ import (
)
type PCMJobCreate struct {
jobInfo *schsdk.PCMJobInfo
jobInfo *schsdk.PCMJobInfo
blockChainToken string
userID cdssdk.UserID
}
func NewPCMJobCreate(info *schsdk.PCMJobInfo) *PCMJobCreate {
func NewPCMJobCreate(userID cdssdk.UserID, info *schsdk.PCMJobInfo, blockChainToken string) *PCMJobCreate {
return &PCMJobCreate{
jobInfo: info,
userID: userID,
jobInfo: info,
blockChainToken: blockChainToken,
}
}
@ -34,7 +38,7 @@ func (s *PCMJobCreate) Run(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) {
logger.Error(err.Error())
rtx.Mgr.ChangeState(jo, state.FailureComplete(err))
} else {
rtx.Mgr.ChangeState(jo, NewDataSchedule(scheduleData.TaskID, scheduleData.ScheduleDatas))
rtx.Mgr.ChangeState(jo, NewDataSchedule(s.userID, scheduleData.TaskID, scheduleData.ScheduleDatas, s.blockChainToken))
}
}
@ -49,12 +53,12 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er
// 获取所有packageID
var packages []cdssdk.PackageID
var bindingIDs []int64
collectDataID(s.jobInfo.Files.Code, packages, bindingIDs)
collectDataID(s.jobInfo.Files.Dataset, packages, bindingIDs)
collectDataID(s.jobInfo.Files.Image, packages, bindingIDs)
collectDataID(s.jobInfo.Files.Model, packages, bindingIDs)
packages, bindingIDs = collectDataID(s.jobInfo.Files.Code, packages, bindingIDs)
packages, bindingIDs = collectDataID(s.jobInfo.Files.Dataset, packages, bindingIDs)
packages, bindingIDs = collectDataID(s.jobInfo.Files.Image, packages, bindingIDs)
packages, bindingIDs = collectDataID(s.jobInfo.Files.Model, packages, bindingIDs)
if (len(packages) & len(bindingIDs)) == 0 {
if len(packages) == 0 && len(bindingIDs) == 0 {
return nil, fmt.Errorf("no packageID")
}
@ -83,26 +87,32 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er
dataDistribute.Code = append(dataDistribute.Code, sch.CodeDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
})
case sch.DATASET:
dataDistribute.Dataset = append(dataDistribute.Dataset, sch.DatasetDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
})
case sch.MODEL:
dataDistribute.Model = append(dataDistribute.Model, sch.ModelDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
})
case sch.IMAGE:
dataDistribute.Image = append(dataDistribute.Image, sch.ImageDistribute{
Clusters: clusters,
PackageID: data.PackageID,
DataName: "test",
})
}
}
req := sch.CreateJobReq{
//Name: s.jobInfo.Name,
//Description: s.jobInfo.Description,
DataDistribute: dataDistribute,
JobResources: s.jobInfo.JobResources,
}
@ -115,13 +125,16 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er
return resp, nil
}
func collectDataID(fileInfo schsdk.JobFileInfo, packageIDs []cdssdk.PackageID, bindingIDs []int64) {
func collectDataID(fileInfo schsdk.JobFileInfo, packageIDs []cdssdk.PackageID, bindingIDs []int64) ([]cdssdk.PackageID, []int64) {
switch info := fileInfo.(type) {
case *schsdk.PackageJobFileInfo:
packageIDs = append(packageIDs, info.PackageID)
case *schsdk.BindingJobFileInfo:
bindingIDs = append(bindingIDs, info.BindingID)
case *schsdk.ImageJobFileInfo:
packageIDs = append(packageIDs, cdssdk.PackageID(info.ImageID))
}
return packageIDs, bindingIDs
}
func (s *PCMJobCreate) Dump(rtx jobmgr.JobStateRunContext, jo *jobmgr.Job) jobmod.JobStateDump {

View File

@ -0,0 +1,135 @@
package services
import (
"fmt"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
"golang.org/x/exp/slices"
"time"
)
type AccessService struct {
*Service
}
func (svc *Service) AccessSvc() *AccessService {
return &AccessService{Service: svc}
}
func (svc *AccessService) LogAccess(log sch.AccessLog) error {
err := svc.db.Access().CreateAccessLog(svc.db.DefCtx(), log)
return err
}
func (svc *AccessService) GetAccessRequests(userID cdssdk.UserID, dataType string) ([]uploadersdk.BindingAccessData, error) {
ar, err := svc.db.Access().GetAccessRequest(svc.db.DefCtx(), userID, dataType)
if err != nil {
return nil, err
}
return ar, nil
}
func (svc *AccessService) UpdateAccessRequestStatus(userID cdssdk.UserID, bindingID int64, status string) error {
allowStatus := []string{sch.RevokedStatus, sch.CancelStatus, sch.ExpiredStatus}
if !slices.Contains(allowStatus, status) {
return fmt.Errorf("operation not allowed")
}
var stat []string
switch status {
case sch.RevokedStatus:
stat = append(stat, sch.PendingStatus)
case sch.CancelStatus:
stat = append(stat, sch.ApprovedStatus)
}
if len(stat) == 0 {
return fmt.Errorf("operation not allowed")
}
data, err := svc.db.Access().GetAccessRequestByBindingID(svc.db.DefCtx(), -1, userID, bindingID, stat)
if err != nil {
return err
}
if data.ID == 0 {
return fmt.Errorf("the apply is not exists")
}
err = svc.db.Access().UpdateStatusByID(svc.db.DefCtx(), int64(data.ID), status, "")
if err != nil {
return err
}
return nil
}
func (svc *AccessService) ApplyRequestAccess(permissionApply sch.PermissionApply) error {
// 判断是否已经存在,存在则报错
status := []string{sch.ExpiredStatus}
if _, err := svc.db.Access().GetAccessRequestByBindingID(svc.db.DefCtx(), permissionApply.ApplicantID, -1, permissionApply.BindingID, status); err == nil {
return fmt.Errorf("the apply is already exists")
}
permissionApply.CreatedAt = time.Now()
permissionApply.Status = sch.PendingStatus
err := svc.db.Access().CreateAccessRequest(svc.db.DefCtx(), permissionApply)
if err != nil {
return err
}
err = svc.LogAccess(sch.AccessLog{
AccessType: "apply",
OwnerID: permissionApply.OwnerID,
ApplicantID: permissionApply.ApplicantID,
BindingID: permissionApply.BindingID,
AccessTime: time.Now(),
})
if err != nil {
return err
}
return nil
}
func (svc *AccessService) ApprovalRequestAccess(permissionApproval sch.PermissionApproval) error {
status := []string{sch.ApprovedStatus, sch.RejectedStatus, sch.ExpiredStatus}
if !slices.Contains(status, permissionApproval.Status) {
return fmt.Errorf("operation not allowed")
}
err := svc.db.Access().UpdateStatusByID(svc.db.DefCtx(), permissionApproval.ID, permissionApproval.Status, permissionApproval.Reason)
if err != nil {
return err
}
return nil
}
// 修改数据的访问级别
func (svc *AccessService) UpdateBindingDataAccess(id uploadersdk.DataID, level string) error {
data, err := svc.db.UploadData().GetBindingsByID(svc.db.DefCtx(), id)
if err != nil {
return err
}
// 判断数据是否为空
if data == nil {
return fmt.Errorf("the data is not exists")
}
// 如果数据的访问级别为申请访问且修改为私有访问,则将所有申请访问的申请状态修改为已过期
if data.AccessLevel == sch.ApplyAccess && level == sch.PrivateAccess {
err = svc.db.Access().UpdateStatusByBindingID(svc.db.DefCtx(), id, sch.ApprovedStatus, sch.ExpiredStatus)
if err != nil {
return err
}
}
err = svc.db.UploadData().UpdateBindingAccess(svc.db.DefCtx(), id, level)
if err != nil {
return err
}
return nil
}
func (svc *AccessService) QueryUser(id string) (cdssdk.UserID, error) {
userID, err := svc.db.Access().GetUserBySsoID(svc.db.DefCtx(), id)
if err != nil {
return userID, err
}
return userID, err
}

View File

@ -1,6 +1,7 @@
package services
import (
"encoding/json"
"errors"
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/logger"
@ -15,6 +16,8 @@ import (
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state"
"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state2"
"sort"
"strings"
"time"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@ -43,7 +46,7 @@ func (svc *JobSetService) PreScheduler(jobSet schsdk.JobSetInfo) (*jobmod.JobSet
return schScheme, uploadScheme, nil
}
func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams) (*schsdk.JobSetID, *[]cdssdk.StorageID, error) {
func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string) (*schsdk.JobSetID, error) {
logger.Debugf("uploading job")
// 查询数据库里维护的集群
@ -54,56 +57,56 @@ func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams)
//}
// 获取集群与存储的对应关系
clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
if err != nil {
return nil, nil, fmt.Errorf("query cluster mapping error: %w", err)
}
var storages []cdssdk.StorageID
switch uploadPriority := params.UploadPriority.(type) {
case *sch.Preferences:
// 进行预调度
clusterID, err := svc.preScheduler.ScheduleJob(uploadPriority.ResourcePriorities, clusterMapping)
if err != nil {
return nil, nil, fmt.Errorf("pre scheduling: %w", err)
}
storageID, ok := clusterMapping[*clusterID]
if !ok {
return nil, nil, fmt.Errorf("cluster %d not found", clusterID)
}
storages = append(storages, storageID)
case *sch.SpecifyCluster:
// 指定集群
for _, clusterID := range uploadPriority.Clusters {
storageID, ok := clusterMapping[clusterID]
if !ok {
logger.Warnf("cluster %d not found", clusterID)
continue
}
storages = append(storages, storageID)
}
}
if len(storages) == 0 {
return nil, nil, errors.New("no storage is available")
}
//clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
//if err != nil {
// return nil, nil, fmt.Errorf("query cluster mapping error: %w", err)
//}
//
//var storages []cdssdk.StorageID
//switch uploadPriority := params.UploadPriority.(type) {
//case *sch.Preferences:
// // 进行预调度
// clusterID, err := svc.preScheduler.ScheduleJob(uploadPriority.ResourcePriorities, clusterMapping)
// if err != nil {
// return nil, nil, fmt.Errorf("pre scheduling: %w", err)
// }
//
// storageID, ok := clusterMapping[*clusterID]
// if !ok {
// return nil, nil, fmt.Errorf("cluster %d not found", clusterID)
// }
//
// storages = append(storages, storageID)
//case *sch.SpecifyCluster:
// // 指定集群
// for _, clusterID := range uploadPriority.Clusters {
// storageID, ok := clusterMapping[clusterID]
// if !ok {
// logger.Warnf("cluster %d not found", clusterID)
// continue
// }
// storages = append(storages, storageID)
// }
//}
//
//if len(storages) == 0 {
// return nil, nil, errors.New("no storage is available")
//}
var jobs []jobmgr.SubmittingJob
jo := job.NewNormalJob(schsdk.NormalJobInfo{})
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, storages),
InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken),
})
jobSetID := svc.jobMgr.SubmitJobSet(jobs)
return &jobSetID, &storages, nil
return &jobSetID, nil
}
// Submit 提交任务集
func (svc *JobSetService) Submit(jobSet schsdk.JobSetInfo, schScheme *jobmod.JobSetPreScheduleScheme) (*schsdk.JobSetID, error) {
func (svc *JobSetService) Submit(userID cdssdk.UserID, jobSet schsdk.JobSetInfo, schScheme *jobmod.JobSetPreScheduleScheme, token string) (*schsdk.JobSetID, error) {
logger.Debugf("submitting job")
var jobs []jobmgr.SubmittingJob
@ -115,7 +118,7 @@ func (svc *JobSetService) Submit(jobSet schsdk.JobSetInfo, schScheme *jobmod.Job
jobs = append(jobs, jobmgr.SubmittingJob{
Body: jo,
//InitState: state.NewPreSchuduling(preSch),
InitState: state2.NewPCMJobCreate(info),
InitState: state2.NewPCMJobCreate(userID, info, token),
})
case *schsdk.NormalJobInfo:
@ -265,7 +268,7 @@ func (svc *JobSetService) DeleteFolder(userID cdssdk.UserID, packageID cdssdk.Pa
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
list, err := cdsCli.Object().List(cdsapi.ObjectList{
list, err := cdsCli.Object().ListByPath(cdsapi.ObjectListByPath{
UserID: userID,
PackageID: packageID,
Path: path,
@ -314,31 +317,62 @@ func (svc *JobSetService) QueryUploaded(queryParams sch.QueryData) ([]uploadersd
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
queryListReq := cdsapi.ObjectList{
queryListReq := cdsapi.ObjectListByPath{
UserID: queryParams.UserID,
PackageID: queryParams.PackageID,
Path: queryParams.Path,
IsPrefix: true,
}
objList, err := cdsCli.Object().List(queryListReq)
objList, err := cdsCli.Object().ListByPath(queryListReq)
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to query uploaded data: %w", err)
}
folderMap := make(map[string]cdssdk.Object)
var modifyObjs []cdssdk.Object
for _, obj := range objList.Objects {
// 去掉obj中path从0到queryParams.Path这段字符串
obj.Path = strings.TrimPrefix(obj.Path, queryParams.Path)
pathArr := strings.Split(obj.Path, "/")
if len(pathArr) > 2 {
splitPath := "/" + pathArr[1]
folderMap[splitPath] = cdssdk.Object{
ObjectID: -1,
PackageID: obj.PackageID,
Path: pathArr[1],
Size: 0,
CreateTime: obj.CreateTime,
}
continue
}
modifyObjs = append(modifyObjs, obj)
}
folders, err := svc.db.UploadData().QueryFolder(svc.db.DefCtx(), queryParams)
if err != nil {
return nil, 0, 0, fmt.Errorf("failed to query uploaded data: %w", err)
}
for _, folder := range folders {
f := cdssdk.Object{
folder.Path = strings.TrimPrefix(folder.Path, queryParams.Path)
if folder.Path == "" {
continue
}
folderMap[folder.Path] = cdssdk.Object{
ObjectID: -1,
PackageID: folder.PackageID,
Path: folder.Path,
Size: 0,
CreateTime: folder.CreateTime,
}
objList.Objects = append(objList.Objects, f)
}
// 遍历folderMap将folderMap的值赋给objList.Objects
for _, obj := range folderMap {
modifyObjs = append(modifyObjs, obj)
}
objList.Objects = modifyObjs
// 根据orderBy字段排序
sort.Slice(objList.Objects, func(i, j int) bool {
if queryParams.OrderBy == sch.OrderByName {
@ -357,7 +391,7 @@ func (svc *JobSetService) QueryUploaded(queryParams sch.QueryData) ([]uploadersd
if queryParams.PageSize > 0 {
start := (queryParams.CurrentPage - 1) * queryParams.PageSize
end := start + queryParams.PageSize
if start >= totalNum {
if start > totalNum {
return nil, 0, 0, nil
}
if end > totalNum {
@ -374,10 +408,13 @@ func (svc *JobSetService) QueryUploaded(queryParams sch.QueryData) ([]uploadersd
}
pkg := uploadersdk.Package{
PackageID: data.PackageID,
BucketID: data.BucketID,
DataType: data.DataType,
PackageName: data.PackageName,
JsonData: data.JsonData,
BindingID: data.BindingID,
UserID: data.UserID,
CreateTime: data.CreateTime,
Objects: objList.Objects,
UploadedCluster: data.UploadedCluster,
}
@ -386,15 +423,98 @@ func (svc *JobSetService) QueryUploaded(queryParams sch.QueryData) ([]uploadersd
return datas, totalPages, totalNum, nil
}
func (svc *JobSetService) DataBinding(bindingData uploadersdk.BindingData, pacakgeIDs []cdssdk.PackageID) error {
func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserID, info sch.DataBinding) error {
err := svc.db.UploadData().InsertOrUpdateBinding(svc.db.DefCtx(), bindingData)
var bindingData uploadersdk.Binding
var packageIDs []cdssdk.PackageID
isCode := false
switch bd := info.(type) {
case *sch.DatasetBinding:
jsonData, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData))
packageIDs = bd.PackageIDs
case *sch.CodeBinding:
isCode = true
jsonData, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData))
packageIDs = []cdssdk.PackageID{bd.PackageID}
case *sch.ImageBinding:
jsonData, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData))
packageIDs = bd.PackageIDs
case *sch.ModelBinding:
jsonData, err := json.Marshal(bd)
if err != nil {
return err
}
bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData))
packageIDs = bd.PackageIDs
}
if bindingData.AccessLevel == "" {
bindingData.AccessLevel = sch.PrivateAccess
}
bindingData.CreateTime = time.Now()
bindingID, err := svc.db.UploadData().InsertOrUpdateBinding(svc.db.DefCtx(), bindingData)
if err != nil {
return err
}
for _, id := range pacakgeIDs {
err = svc.db.UploadData().UpdatePackage(svc.db.DefCtx(), id, "", bindingData.ID)
for _, id := range packageIDs {
err = svc.db.UploadData().UpdatePackage(svc.db.DefCtx(), id, "", *bindingID)
if err != nil {
return err
}
// 算法类型需要进行版本管理
if isCode {
pkg, err := svc.db.UploadData().QueryPackageByID(svc.db.DefCtx(), id)
if err != nil {
return err
}
err = svc.db.UploadData().InsertPackageVersion(svc.db.DefCtx(), id, id, pkg.PackageName, 1)
if err != nil {
return err
}
}
}
return nil
}
func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string) uploadersdk.Binding {
bindingData := uploadersdk.Binding{
ID: id,
Name: name,
DataType: dataType,
UserID: userID,
Content: content,
}
return bindingData
}
func (svc *JobSetService) RemoveBinding(pacakgeIDs []cdssdk.PackageID, bindingIDs []int64) error {
if len(pacakgeIDs) > 0 {
for _, id := range pacakgeIDs {
err := svc.db.UploadData().UpdatePackage(svc.db.DefCtx(), id, "", uploadersdk.DataID(-1))
if err != nil {
return err
}
}
}
if len(bindingIDs) > 0 {
err := svc.db.UploadData().DeleteBindingsByID(svc.db.DefCtx(), bindingIDs)
if err != nil {
return err
}
@ -403,29 +523,189 @@ func (svc *JobSetService) DataBinding(bindingData uploadersdk.BindingData, pacak
return nil
}
func (svc *JobSetService) RemoveBinding(pacakgeIDs []cdssdk.PackageID) error {
func (svc *JobSetService) DeleteBinding(IDs []int64) error {
for _, id := range pacakgeIDs {
err := svc.db.UploadData().UpdatePackage(svc.db.DefCtx(), id, "", uploadersdk.DataID(-1))
if err != nil {
return err
}
err := svc.db.UploadData().DeleteBindingsByID(svc.db.DefCtx(), IDs)
if err != nil {
return err
}
return nil
}
func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataType string) error {
func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDataParam) ([]uploadersdk.BindingDetail, error) {
switch p := param.(type) {
case *sch.PrivateLevel:
return svc.queryPrivateBinding(p.UserID, uploadersdk.DataID(p.BindingID), dataType)
case *sch.PublicLevel:
var details []uploadersdk.BindingDetail
datas, err := svc.db.UploadData().GetPublicBindings(svc.db.DefCtx(), p.Type, dataType, p.UserID)
if err != nil {
return nil, err
}
for _, data := range datas {
var info sch.DataBinding
binding := uploadersdk.Binding{
DataType: dataType,
Content: data.Content,
}
info, err = unmarshalBinding(binding)
if err != nil {
return nil, err
}
bindingDetail := uploadersdk.BindingDetail{
ID: data.ID,
UserID: data.UserID,
Name: data.Name,
Info: info,
AccessLevel: data.AccessLevel,
}
details = append(details, bindingDetail)
}
return details, nil
case *sch.ApplyLevel:
var details []uploadersdk.BindingDetail
datas, err := svc.db.UploadData().GetApplyBindings(svc.db.DefCtx(), p.UserID, p.Type, dataType)
if err != nil {
return nil, err
}
for _, data := range datas {
var info sch.DataBinding
// 只有approved状态的数据才能看到详情
if data.Status == sch.ApprovedStatus {
binding := uploadersdk.Binding{
DataType: dataType,
Content: data.Content,
}
info, err = unmarshalBinding(binding)
if err != nil {
return nil, err
}
}
bindingDetail := uploadersdk.BindingDetail{
ID: data.ID,
UserID: data.UserID,
Name: data.Name,
Info: info,
Status: data.Status,
AccessLevel: data.AccessLevel,
}
details = append(details, bindingDetail)
}
return details, nil
}
return nil, fmt.Errorf("unknown query binding data type")
}
func (svc *JobSetService) queryPrivateBinding(userID cdssdk.UserID, bindingID uploadersdk.DataID, dataType string) ([]uploadersdk.BindingDetail, error) {
var details []uploadersdk.BindingDetail
if bindingID == -1 {
datas, err := svc.db.UploadData().GetPrivateBindings(svc.db.DefCtx(), userID, dataType)
if err != nil {
return nil, err
}
for _, data := range datas {
info, err := unmarshalBinding(data)
if err != nil {
return nil, err
}
bindingDetail := uploadersdk.BindingDetail{
ID: data.ID,
UserID: data.UserID,
Info: info,
AccessLevel: data.AccessLevel,
}
details = append(details, bindingDetail)
}
return details, nil
}
data, err := svc.db.UploadData().GetBindingsByID(svc.db.DefCtx(), bindingID)
if err != nil {
return nil, err
}
info, err := unmarshalBinding(*data)
if err != nil {
return nil, err
}
packages, err := svc.db.UploadData().QueryPackageByBindingID(svc.db.DefCtx(), bindingID)
if err != nil {
return nil, err
}
detail := uploadersdk.BindingDetail{
ID: bindingID,
UserID: data.UserID,
Packages: packages,
Info: info,
AccessLevel: data.AccessLevel,
}
details = append(details, detail)
return details, nil
}
func unmarshalBinding(data uploadersdk.Binding) (sch.DataBinding, error) {
var info sch.DataBinding
switch data.DataType {
case sch.DATASET:
var content sch.DatasetBinding
err := json.Unmarshal([]byte(data.Content), &content)
if err != nil {
return nil, err
}
info = &content
case sch.CODE:
var content sch.CodeBinding
err := json.Unmarshal([]byte(data.Content), &content)
if err != nil {
return nil, err
}
info = &content
case sch.IMAGE:
var content sch.ImageBinding
err := json.Unmarshal([]byte(data.Content), &content)
if err != nil {
return nil, err
}
info = &content
case sch.MODEL:
var content sch.ModelBinding
err := json.Unmarshal([]byte(data.Content), &content)
if err != nil {
return nil, err
}
info = &content
}
return info, nil
}
func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataType string, uploadPriority sch.UploadPriority) error {
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
bucketID := cdssdk.BucketID(1)
// 创建package
newPackage, err := cdsCli.Package().Create(cdsapi.PackageCreate{
UserID: userID,
BucketID: 1,
BucketID: bucketID,
Name: name,
})
if err != nil {
@ -433,10 +713,18 @@ func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataT
}
pkg := uploadersdk.Package{
UserID: userID,
PackageID: newPackage.Package.PackageID,
PackageName: name,
DataType: dataType,
UserID: userID,
PackageID: newPackage.Package.PackageID,
PackageName: name,
BucketID: bucketID,
DataType: dataType,
UploadPriority: uploadPriority,
}
// 对Package进行预调度并写入到数据库中
err = svc.packageScheduler(pkg.PackageID, uploadPriority)
if err != nil {
return err
}
// 写入数据库存档
@ -447,6 +735,62 @@ func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataT
return nil
}
func (svc *JobSetService) packageScheduler(packageID cdssdk.PackageID, uploadPriority sch.UploadPriority) error {
clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
if err != nil {
return fmt.Errorf("query cluster mapping error: %w", err)
}
var clusters []uploadersdk.Cluster
switch uploadPriority := uploadPriority.(type) {
case *sch.Preferences:
// 进行预调度
clusterID, err := svc.preScheduler.ScheduleJob(uploadPriority.ResourcePriorities, clusterMapping)
if err != nil {
return fmt.Errorf("pre scheduling: %w", err)
}
storageID, ok := clusterMapping[*clusterID]
if !ok {
return fmt.Errorf("cluster %d not found", clusterID)
}
cluster := uploadersdk.Cluster{
PackageID: packageID,
ClusterID: *clusterID,
StorageID: storageID,
}
clusters = append(clusters, cluster)
case *sch.SpecifyCluster:
// 指定集群
for _, clusterID := range uploadPriority.Clusters {
storageID, ok := clusterMapping[clusterID]
if !ok {
logger.Warnf("cluster %d not found", clusterID)
continue
}
cluster := uploadersdk.Cluster{
PackageID: packageID,
ClusterID: clusterID,
StorageID: storageID,
}
clusters = append(clusters, cluster)
}
}
if len(clusters) == 0 {
return errors.New("no storage is available")
}
for _, clst := range clusters {
err := svc.db.UploadData().InsertUploadedCluster(svc.db.DefCtx(), clst)
if err != nil {
return err
}
}
return nil
}
func (svc *JobSetService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) error {
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
@ -465,3 +809,248 @@ func (svc *JobSetService) DeletePackage(userID cdssdk.UserID, packageID cdssdk.P
}
return nil
}
func (svc *JobSetService) QueryResource(queryResource sch.ResourceRange) ([]sch.ClusterDetail, error) {
clusterDetails, err := svc.getClusterResources()
if err != nil {
return nil, err
}
var results []sch.ClusterDetail
for _, cluster := range clusterDetails {
if cluster.Resources == nil || len(cluster.Resources) == 0 {
continue
}
ok := isAppropriateResources(cluster.Resources, queryResource)
if ok {
results = append(results, cluster)
}
}
return results, nil
}
func isAppropriateResources(resources []sch.ClusterResource, queryResource sch.ResourceRange) bool {
for _, resource := range resources {
if resource.Resource.Type == queryResource.Type {
//ok := compareResource(queryResource.GPU.Min, queryResource.GPU.Max, resource.Resource.Available.Value)
//if !ok {
// return false
//}
if resource.BaseResources == nil || len(resource.BaseResources) == 0 {
return false
}
ok := false
for _, baseResource := range resource.BaseResources {
if baseResource.Type == sch.ResourceTypeCPU {
ok = compareResource(queryResource.CPU.Min, queryResource.CPU.Max, baseResource.Available.Value)
if !ok {
return false
}
}
if baseResource.Type == sch.ResourceTypeMemory {
ok = compareResource(queryResource.Memory.Min, queryResource.Memory.Max, baseResource.Available.Value)
if !ok {
return false
}
}
if baseResource.Type == sch.ResourceTypeStorage {
ok = compareResource(queryResource.Storage.Min, queryResource.Storage.Max, baseResource.Available.Value)
if !ok {
return false
}
}
}
return true
}
}
return false
}
func compareResource(min float64, max float64, v float64) bool {
if min > max {
return false
}
if min == 0 && max == 0 {
return true
}
if v >= min && v <= max {
return true
}
return false
}
func (svc *JobSetService) ResourceRange() ([]sch.ResourceRange, error) {
clusterDetails, err := svc.getClusterResources()
if err != nil {
return nil, err
}
// 初始化一个空的 map 来存储资源类型的 Range 数据
resourceMap := make(map[sch.ResourceType]sch.ResourceRange)
// 遍历所有 ClusterDetail
for _, cluster := range clusterDetails {
var CPUValue float64
var MemValue float64
var StorageValue float64
for _, resource := range cluster.Resources {
for _, baseResource := range resource.BaseResources {
// 检查资源类型,跳过不需要统计的类型
switch baseResource.Type {
case sch.ResourceTypeCPU:
CPUValue = baseResource.Available.Value
case sch.ResourceTypeMemory:
MemValue = baseResource.Available.Value
case sch.ResourceTypeStorage:
StorageValue = baseResource.Available.Value
}
}
}
// 遍历每个 ClusterDetail 的资源列表
for _, resource := range cluster.Resources {
// 获取资源类型的 key
resourceType := resource.Resource.Type
// 获取资源的 Available Value
//availableValue := resource.Available.Value
// 获取现有的 ResourceRange
resourceRange, exists := resourceMap[resourceType]
if !exists {
// 如果该资源类型还没有添加过,初始化一个新的 Range
resourceRange = sch.ResourceRange{
Type: resourceType,
GPU: sch.Range{}, // 初始的 GPU 范围
GPUNumber: 0, // 初始的 GPU 数量
CPU: sch.Range{}, // 初始的 CPU 范围
Memory: sch.Range{}, // 初始的 Memory 范围
Storage: sch.Range{}, // 初始的 Storage 范围
}
}
if CPUValue < resourceRange.CPU.Min || resourceRange.CPU.Min == 0 {
resourceRange.CPU.Min = CPUValue
}
if CPUValue > resourceRange.CPU.Max {
resourceRange.CPU.Max = CPUValue
}
if MemValue < resourceRange.Memory.Min || resourceRange.Memory.Min == 0 {
resourceRange.Memory.Min = MemValue
}
if MemValue > resourceRange.Memory.Max {
resourceRange.Memory.Max = MemValue
}
if StorageValue < resourceRange.Storage.Min || resourceRange.Storage.Min == 0 {
resourceRange.Storage.Min = StorageValue
}
if StorageValue > resourceRange.Storage.Max {
resourceRange.Storage.Max = StorageValue
}
// 增加资源数量统计
resourceRange.GPUNumber++
// 更新 resourceMap 中对应资源类型的 ResourceRange
resourceMap[resourceType] = resourceRange
}
}
// 将 map 转换为一个 slice
var result []sch.ResourceRange
for _, rangeData := range resourceMap {
result = append(result, rangeData)
}
// 返回统计结果
return result, nil
}
func (svc *JobSetService) getClusterResources() ([]sch.ClusterDetail, error) {
schCli, err := schglb.PCMSchePool.Acquire()
if err != nil {
return nil, fmt.Errorf("new scheduler client: %w", err)
}
defer schglb.PCMSchePool.Release(schCli)
clusterMapping, err := svc.db.UploadData().GetClusterMapping(svc.db.DefCtx())
if err != nil {
return nil, fmt.Errorf("query cluster mapping: %w", err)
}
// 查询指定算力中心
clusterIDs := make([]schsdk.ClusterID, 0, len(clusterMapping))
for id := range clusterMapping {
clusterIDs = append(clusterIDs, id)
}
clusterDetails, err := schCli.GetClusterInfo(sch.GetClusterInfoReq{
IDs: clusterIDs,
})
if err != nil {
return nil, fmt.Errorf("get cluster info: %w", err)
}
if len(clusterDetails) == 0 {
return nil, errors.New("no cluster found")
}
return clusterDetails, nil
}
func (svc *JobSetService) QueryImages(IDs []int64) ([]sch.ClusterImage, error) {
images, err := svc.db.UploadData().GetImageByID(svc.db.DefCtx(), IDs)
if err != nil {
return nil, fmt.Errorf("query images: %w", err)
}
return images, nil
}
func (svc *JobSetService) UpdateCode(userID cdssdk.UserID, bucketID cdssdk.BucketID, packageID cdssdk.PackageID, packageName string) (*cdssdk.Package, error) {
// 复制package
cdsCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return nil, fmt.Errorf("new cds client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(cdsCli)
maxVersion, err := svc.db.UploadData().GetMaxVersion(svc.db.DefCtx(), packageID)
if err != nil {
return nil, fmt.Errorf("get max version: %w", err)
}
version := maxVersion + 1
packageName = fmt.Sprintf("%s_%d", packageName, version)
cloneReq := cdsapi.PackageClone{
PackageID: packageID,
Name: packageName,
BucketID: bucketID,
UserID: userID,
}
cloneResp, err := cdsCli.Package().Clone(cloneReq)
if err != nil {
return nil, fmt.Errorf("clone package: %w", err)
}
// 将package添加到version表
err = svc.db.UploadData().InsertPackageVersion(svc.db.DefCtx(), packageID, cloneResp.Package.PackageID, cloneResp.Package.Name, version)
if err != nil {
return nil, fmt.Errorf("insert package version: %w", err)
}
// 返回package
return &cloneResp.Package, nil
}