Signed-off-by: jagger <cossjie@foxmail.com>
This commit is contained in:
jagger 2025-06-11 16:46:03 +08:00
parent 5bef68e391
commit 4529c5686d
14 changed files with 209 additions and 43 deletions

View File

@ -1403,6 +1403,7 @@ type ResourceSpecReq {
Name string `form:"name,optional"`
Status string `form:"status,optional"`
changeType string `form:"changeType,optional"`
Tag string `form:"tag,optional"` // 标签0: 训练1推理-1通用
PageInfo
}
@ -1431,6 +1432,7 @@ type ResourceSpec {
ClusterId string `json:"clusterId" gorm:"column:cluster_id"`
CostPerUnit float64 `json:"costPerUnit" gorm:"column:cost_per_unit"`
CostType string `json:"costType" gorm:"column:cost_type"` //计费类型hourly, daily, monthly,perUse
Tag string `json:"tag" gorm:"tag"` // 标签0: 训练1推理-1通用
UserId string `json:"userId" gorm:"column:user_id"`
CreateTime string `json:"createTime" gorm:"column:create_time"`
UpdateTime string `json:"updateTime" gorm:"column:update_time"`

View File

@ -204,10 +204,13 @@ service pcm {
@handler deleteResourceSpecHandler
delete /core/ai/resourceSpec/delete/:id (DeletePathId) returns (CommonResp)
// @doc "获取任务"
// @handler getResourceRange
// get /core/ai/resourceSpec/range (ResourceRangeReq) returns (ResourceRangeResp)
@doc "获取资源规格列表"
@handler pageResourceRange
get /core/ai/resourceSpec/page (ResourceSpecReq) returns (PageResult)
@doc "获取资源规格范围"
@handler resourceRange
get /core/ai/resourceSpec/range (ResourceSpecReq) returns (ListResult)
//集群资源规格----- 结束
}

View File

@ -0,0 +1,24 @@
package core
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func PageResourceRangeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ResourceSpecReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := core.NewPageResourceRangeLogic(r.Context(), svcCtx)
resp, err := l.PageResourceRange(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package core
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func ResourceRangeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ResourceSpecReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := core.NewResourceRangeLogic(r.Context(), svcCtx)
resp, err := l.ResourceRange(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -399,6 +399,18 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/ai/resourceSpec/edit",
Handler: core.EditResourceSpecHandler(serverCtx),
},
{
// 获取资源规格列表
Method: http.MethodGet,
Path: "/core/ai/resourceSpec/page",
Handler: core.PageResourceRangeHandler(serverCtx),
},
{
// 获取资源规格范围
Method: http.MethodGet,
Path: "/core/ai/resourceSpec/range",
Handler: core.ResourceRangeHandler(serverCtx),
},
{
// 同步指定资源规格
Method: http.MethodPut,

View File

@ -33,6 +33,7 @@ type APIResponse struct {
ClusterId string `json:"ClusterId"`
ClusterType string `json:"clusterType"`
Region string `json:"region"`
Tag string `json:"tag"`
Resources []Resource `json:"resources"`
Msg string `json:"msg"`
}
@ -70,9 +71,10 @@ func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecRe
// 获取集群资源数据
startTime := time.Now()
apiResources, err := l.FetchClusterResources(req.ClusterId)
apiResources, err := l.FetchClusterResources(req.ClusterId, req.Tag)
log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(startTime))
if err != nil {
log.Error().Msgf("调用第三方接口获取集群资源失败: %v", err)
return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
}
@ -85,9 +87,11 @@ func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecRe
return l.queryDatabaseResults(req)
}
func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string) ([]APIResponse, error) {
func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string, tag string) ([]APIResponse, error) {
queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{})
resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{
Type: tag,
})
if err != nil {
return nil, fmt.Errorf("query resources failed: %w", err)
}
@ -153,6 +157,7 @@ func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]m
var dbSpecs []models.TResourceSpec
if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
Where("cluster_id = ?", ClusterId).
Where("tag = ?", response.Tag).
Find(&dbSpecs).Error; err != nil {
return nil, nil, fmt.Errorf("database query failed: %w", err)
}
@ -163,7 +168,7 @@ func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]m
if res.Resource.Name == "" || res.Resource.Type == "" {
continue
}
spec := l.convertToResourceSpec(ClusterId, res)
spec := l.convertToResourceSpec(ClusterId, res, response.Tag)
apiSpecs = append(apiSpecs, spec)
}
@ -179,7 +184,7 @@ func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []mod
apiMap := make(map[string]models.TResourceSpec)
for _, spec := range apiSpecs {
key := resourceKey(spec.Type, spec.Name)
key := resourceKey(spec.Type, spec.Name, spec.Tag)
apiMap[key] = spec
}
@ -196,6 +201,7 @@ func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []mod
// 初始化时间字段
newSpec.CreateTime = time.Now()
newSpec.UpdateTime = time.Now()
newSpec.Tag = apiSpec.Tag
createSpecs = append(createSpecs, &newSpec)
continue
}
@ -222,26 +228,6 @@ func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []mod
return fmt.Errorf("failed to batch create resources: %w", err)
}
// 准备关联资源数据
var baseResources []models.TBaseResourceSpec
for _, spec := range createSpecs {
for i := range spec.BaseResourceSpecs {
br := &spec.BaseResourceSpecs[i]
br.ResourceSpecId = spec.Id
br.CreateTime = time.Now()
br.UpdateTime = time.Now()
baseResources = append(baseResources, *br)
}
}
// 批量插入关联资源
if len(baseResources) > 0 {
if err := tx.CreateInBatches(baseResources, 100).Error; err != nil {
tx.Rollback()
return fmt.Errorf("failed to batch create base resources: %w", err)
}
}
if err := tx.Commit().Error; err != nil {
return fmt.Errorf("transaction commit failed: %w", err)
}
@ -284,8 +270,8 @@ func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []mod
return nil
}
func resourceKey(resType, name string) string {
return fmt.Sprintf("%s::%s", resType, name)
func resourceKey(resType, name, tag string) string {
return fmt.Sprintf("%s::%s::%s", resType, name, tag)
}
func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
@ -331,11 +317,11 @@ func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec)
// 比较基础资源
oldBaseMap := make(map[string]models.TBaseResourceSpec)
for _, br := range old.BaseResourceSpecs {
oldBaseMap[resourceKey(br.Type, br.Name)] = br
oldBaseMap[resourceKey(br.Type, br.Name, old.Tag)] = br
}
for _, newBr := range new.BaseResourceSpecs {
key := resourceKey(newBr.Type, newBr.Name)
key := resourceKey(newBr.Type, newBr.Name, new.Tag)
oldBr, exists := oldBaseMap[key]
if !exists ||
oldBr.TotalValue != newBr.TotalValue ||
@ -348,11 +334,12 @@ func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec)
return len(oldBaseMap) > 0
}
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string) models.TResourceSpec {
spec := models.TResourceSpec{
SourceKey: resourceKey(res.Resource.Type, res.Resource.Name),
SourceKey: resourceKey(res.Resource.Type, res.Resource.Name, tag),
Type: res.Resource.Type,
Name: res.Resource.Name,
Tag: tag,
TotalCount: int64(res.Resource.Total.Value),
AvailableCount: int64(res.Resource.Available.Value),
ClusterId: ClusterId,
@ -402,6 +389,7 @@ func (l *CompareResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecR
func (l *CompareResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)).
Where("tag = ?", req.Tag).
Where("deleted_at IS NULL")
if req.Status != "" {

View File

@ -0,0 +1,78 @@
package core
import (
"context"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type PageResourceRangeLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewPageResourceRangeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PageResourceRangeLogic {
return &PageResourceRangeLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *PageResourceRangeLogic) PageResourceRange(req *types.ResourceSpecReq) (resp *types.PageResult, err error) {
if req.PageNum <= 0 || req.PageSize <= 0 {
return nil, fmt.Errorf("invalid pagination parameters: PageNum=%d, PageSize=%d", req.PageNum, req.PageSize)
}
result := &types.PageResult{
PageNum: req.PageNum,
PageSize: req.PageSize,
}
query := l.buildBaseQuery(req)
if err := query.Count(&result.Total).Error; err != nil {
return nil, fmt.Errorf("failed to count records: %w", err)
}
var specs []*models.TResourceSpec
if err := query.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
Scopes(paginate(req.PageNum, req.PageSize)).
Find(&specs).Error; err != nil {
return nil, fmt.Errorf("failed to query resources: %w", err)
}
result.List = specs
return result, nil
}
func (l *PageResourceRangeLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
Where("deleted_at IS NULL")
if req.ClusterId != "" {
query = query.Where("cluster_id = ?", req.ClusterId)
}
if req.Tag != "" {
query = query.Where("tag = ?", req.Tag)
}
if req.Status != "" {
query = query.Where("status = ?", req.Status)
}
if req.ChangeType != "" {
query = query.Where("change_type = ?", req.ChangeType)
}
if req.Type != "" {
query = query.Where("type = ?", req.Type)
}
if req.Name != "" {
query = query.Where("name LIKE ?", "%"+req.Name+"%")
}
return query
}

View File

@ -0,0 +1,30 @@
package core
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type ResourceRangeLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewResourceRangeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ResourceRangeLogic {
return &ResourceRangeLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ResourceRangeLogic) ResourceRange(req *types.ResourceSpecReq) (resp *types.ListResult, err error) {
// todo: add your logic here and delete this line
return
}

View File

@ -43,7 +43,7 @@ func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (re
// 获取集群资源数据
startTime := time.Now()
compareLogic := NewCompareResourceSpecLogic(l.ctx, l.svcCtx)
apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10))
apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10), mainSpec.Tag)
log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(startTime))
if err != nil {
return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
@ -81,11 +81,11 @@ func (l *SyncResourceSpecLogic) updateResource(existing *models.TResourceSpec, n
return fmt.Errorf("failed to update resource: %w", err)
}
return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs)
return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs, newSpec.Tag)
})
}
func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec, tag string) error {
// 处理基础资源更新
var existingResources []models.TBaseResourceSpec
if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
@ -94,14 +94,14 @@ func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, new
existingMap := make(map[string]models.TBaseResourceSpec)
for _, r := range existingResources {
key := resourceKey(r.Type, r.Name)
key := resourceKey(r.Type, r.Name, tag)
existingMap[key] = r
}
// 处理更新和新增
for i, newRes := range newResources {
newRes.ResourceSpecId = specID
key := resourceKey(newRes.Type, newRes.Name)
key := resourceKey(newRes.Type, newRes.Name, tag)
if existing, exists := existingMap[key]; exists {
newRes.Id = existing.Id

View File

@ -21,6 +21,7 @@ type ResourceSpec struct {
ClusterId string `json:"clusterID"`
ClusterType string `json:"clusterType"`
Region string `json:"region"`
Tag string `json:"tag"`
Resources []interface{} `json:"resources"`
Msg string `json:"msg"`
}

View File

@ -1005,7 +1005,7 @@ func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string)
var VMemorysum int64 = 0
var RunningTaskNum int64 = 0
var BalanceValue float64 = -1
var RateValue float64 = 0.930000
var RateValue = 0.930000
var StorageValue int64 = 1024
var AvailableValue int64 = 886
@ -1083,7 +1083,7 @@ func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string)
resUsage.Resources = append(resUsage.Resources, RunningTaskRes)
resUsage.Resources = append(resUsage.Resources, BalanceRes)
resUsage.Resources = append(resUsage.Resources, RateRes)
resUsage.Region = "Train"
resUsage.Tag = "Train"
case "Inference":
req := &modelarts.ListSpecificationsReq{
@ -1135,7 +1135,7 @@ func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string)
cres.BaseResources = append(cres.BaseResources, UsageMEMORY)
cres.BaseResources = append(cres.BaseResources, UsageVMEMORY)
cres.BaseResources = append(cres.BaseResources, Storage)
resUsage.Region = "Inference"
resUsage.Tag = "Inference"
resUsage.Resources = append(resUsage.Resources, cres)
}
}

View File

@ -781,6 +781,7 @@ func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collec
var resources []interface{}
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
Tag: resrcType,
}
//clres := &collector.ClusterResource{}
creationRequirelUrl := o.host + CreationRequirelUrl

View File

@ -4627,6 +4627,7 @@ type ResourceSpec struct {
ClusterId string `json:"clusterId" gorm:"column:cluster_id"`
CostPerUnit float64 `json:"costPerUnit" gorm:"column:cost_per_unit"`
CostType string `json:"costType" gorm:"column:cost_type"` //计费类型hourly, daily, monthly,perUse
Tag string `json:"tag" gorm:"tag"` // 标签0: 训练1推理-1通用
UserId string `json:"userId" gorm:"column:user_id"`
CreateTime string `json:"createTime" gorm:"column:create_time"`
UpdateTime string `json:"updateTime" gorm:"column:update_time"`
@ -4639,6 +4640,7 @@ type ResourceSpecReq struct {
Name string `form:"name,optional"`
Status string `form:"status,optional"`
ChangeType string `form:"changeType,optional"`
Tag string `form:"tag,optional"` // 标签0: 训练1推理-1通用
PageInfo
}

View File

@ -39,7 +39,7 @@ type (
TResourceSpec struct {
Id int64 `db:"id" json:"id"` // 主键id
SourceKey string `db:"source_key" json:"source_key"` // 数据源标识type-name
SourceKey string `db:"source_key" json:"sourceKey"` // 数据源标识type-name
Type string `db:"type" json:"type"` // 类型名称
Name string `db:"name" json:"name"` // 规格名称
TotalCount int64 `db:"total_count" json:"totalCount"` // 资源总量
@ -50,6 +50,7 @@ type (
ClusterId int64 `db:"cluster_id" json:"clusterId,string"` // 集群ID
CostPerUnit float64 `db:"cost_per_unit" json:"costPerUnit"` // 单位时间积分消耗
CostType string `db:"cost_type" json:"costType"` // 计费类型hourly, daily, monthly,perUse
Tag string `db:"tag" json:"tag"` // 标签Train: 训练Inference推理
UserId int64 `db:"user_id" json:"userId"` // 用户ID
CreateTime time.Time `db:"create_time" json:"createTime"` // 创建时间
UpdateTime time.Time `db:"update_time" json:"updateTime"` // 更新时间