fix resource spec

Signed-off-by: jagger <cossjie@foxmail.com>
This commit is contained in:
jagger 2025-05-26 17:10:49 +08:00
parent daff6afc1b
commit 3c0d54a104
9 changed files with 240 additions and 45 deletions

View File

@ -1453,4 +1453,8 @@ type EditResourceReq {
status string `json:"status" gorm:"column:status"`
CostPerUnit string `json:"costPerUnit" gorm:"column:cost_per_unit"`
CostType string `json:"costType" gorm:"column:cost_type"` //计费类型hourly, daily, monthly,perUse
}
type SyncResourceReq {
Id []string `json:"id"`
}

View File

@ -179,9 +179,14 @@ service pcm {
@handler getClusterByIdHandler
get /core/getClusterById (getClusterByIdReq) returns (getClusterByIdResp)
@doc "获取指定集群资源规格列表"
@handler getClusterResourceSpecHandler
get /core/ai/resourceSpec/list (ResourceSpecReq) returns (PageResult)
//集群资源规格----- 开始
@doc "与Api接口对比集群资源规格"
@handler compareResourceSpecHandler
get /core/ai/resourceSpec/compare (ResourceSpecReq) returns (PageResult)
@doc "同步指定资源规格"
@handler syncResourceSpecHandler
put /core/ai/resourceSpec/sync (SyncResourceReq) returns (CommonResp)
@doc "获取指定资源规格详情"
@handler detailResourceSpecHandler
@ -198,6 +203,7 @@ service pcm {
@doc "删除资源规格"
@handler deleteResourceSpecHandler
delete /core/ai/resourceSpec/delete/:id (DeletePathId) returns (CommonResp)
//集群资源规格----- 结束
}
//hpc二级接口

View File

@ -9,7 +9,7 @@ import (
"net/http"
)
func GetClusterResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func CompareResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ResourceSpecReq
if err := httpx.Parse(r, &req); err != nil {
@ -17,8 +17,8 @@ func GetClusterResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc
return
}
l := core.NewGetClusterResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.GetClusterResourceSpec(&req)
l := core.NewCompareResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.CompareResourceSpec(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -0,0 +1,24 @@
package core
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/core"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func SyncResourceSpecHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.SyncResourceReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := core.NewSyncResourceSpecLogic(r.Context(), svcCtx)
resp, err := l.SyncResourceSpec(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -370,6 +370,12 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/ai/resourceSpec/cancelAlarm",
Handler: core.CancelResourceSpecAlarmHandler(serverCtx),
},
{
// 与Api接口对比集群资源规格
Method: http.MethodGet,
Path: "/core/ai/resourceSpec/compare",
Handler: core.CompareResourceSpecHandler(serverCtx),
},
{
// 删除资源规格
Method: http.MethodDelete,
@ -389,10 +395,10 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Handler: core.EditResourceSpecHandler(serverCtx),
},
{
// 获取指定集群资源规格列表
Method: http.MethodGet,
Path: "/core/ai/resourceSpec/list",
Handler: core.GetClusterResourceSpecHandler(serverCtx),
// 同步指定资源规格
Method: http.MethodPut,
Path: "/core/ai/resourceSpec/sync",
Handler: core.SyncResourceSpecHandler(serverCtx),
},
{
// 获取节点资产

View File

@ -2,31 +2,32 @@ package core
import (
"context"
"errors"
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
"gorm.io/gorm"
)
type GetClusterResourceSpecLogic struct {
type CompareResourceSpecLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
const (
ChangeTypeCreated = 0
ChangeTypeModified = 1
ChangeTypeDeleted = 2
ChangeTypeNormal = 0 // 资源规格正常
ChangeTypeModified = 1 // 资源规格变更
ChangeTypeDeleted = 2 // 资源被删除
)
type APIResponse struct {
@ -54,22 +55,22 @@ type Metric struct {
Value float64 `json:"value"`
}
func NewGetClusterResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetClusterResourceSpecLogic {
return &GetClusterResourceSpecLogic{
func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic {
return &CompareResourceSpecLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *GetClusterResourceSpecLogic) GetClusterResourceSpec(req *types.ResourceSpecReq) (*types.PageResult, error) {
func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecReq) (resp *types.PageResult, err error) {
if req.ClusterId == "" {
return nil, errors.New("ClusterId is required")
}
// 获取集群资源数据
startTime := time.Now()
apiResources, err := l.fetchClusterResources(req.ClusterId)
apiResources, err := l.FetchClusterResources(req.ClusterId)
log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(startTime))
if err != nil {
return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
@ -84,7 +85,7 @@ func (l *GetClusterResourceSpecLogic) GetClusterResourceSpec(req *types.Resource
return l.queryDatabaseResults(req)
}
func (l *GetClusterResourceSpecLogic) fetchClusterResources(ClusterId string) ([]APIResponse, error) {
func (l *CompareResourceSpecLogic) FetchClusterResources(ClusterId string) ([]APIResponse, error) {
queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{
ClusterIDs: []string{ClusterId},
@ -94,14 +95,18 @@ func (l *GetClusterResourceSpecLogic) fetchClusterResources(ClusterId string) ([
}
var apiResponses []APIResponse
if err := l.decodeAPIResponse(resources.Data, &apiResponses); err != nil {
if err := decodeAPIResponse(resources.Data, &apiResponses); err != nil {
return nil, fmt.Errorf("decode response failed: %w", err)
}
for _, resource := range apiResponses {
if resource.Resources == nil {
return nil, fmt.Errorf(resource.Msg)
}
}
return apiResponses, nil
}
func (l *GetClusterResourceSpecLogic) decodeAPIResponse(input interface{}, output *[]APIResponse) error {
func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
config := &mapstructure.DecoderConfig{
Result: output,
TagName: "json",
@ -124,7 +129,7 @@ func (l *GetClusterResourceSpecLogic) decodeAPIResponse(input interface{}, outpu
return nil
}
func (l *GetClusterResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
for _, response := range apiResponses {
// 转换API响应到数据库模型
dbSpecs, apiSpecs, err := l.processAPIResponse(response)
@ -140,7 +145,7 @@ func (l *GetClusterResourceSpecLogic) syncResourcesToDB(apiResponses []APIRespon
return nil
}
func (l *GetClusterResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
ClusterId := utils.StringToInt64(response.ClusterId)
var dbSpecs []models.TResourceSpec
if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
@ -151,6 +156,10 @@ func (l *GetClusterResourceSpecLogic) processAPIResponse(response APIResponse) (
var apiSpecs []models.TResourceSpec
for _, res := range response.Resources {
// 检查资源类型和名称是否存在
if res.Resource.Name == "" || res.Resource.Type == "" {
continue
}
spec := l.convertToResourceSpec(ClusterId, res)
apiSpecs = append(apiSpecs, spec)
}
@ -158,11 +167,11 @@ func (l *GetClusterResourceSpecLogic) processAPIResponse(response APIResponse) (
return dbSpecs, apiSpecs, nil
}
func (l *GetClusterResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
// 创建资源映射
dbMap := make(map[string]models.TResourceSpec)
for _, spec := range dbSpecs {
key := resourceKey(spec.Type, spec.Name)
key := spec.SourceKey
dbMap[key] = spec
}
@ -176,14 +185,20 @@ func (l *GetClusterResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []
for key, apiSpec := range apiMap {
dbSpec, exists := dbMap[key]
if !exists {
// 资源不存在于数据库,创建新资源
if err := l.createNewResource(&apiSpec); err != nil {
return err
}
continue
}
// 资源已存在于数据库检查是否有变更资源规格有变更则更新changeType为变更
if l.isSpecChanged(dbSpec, apiSpec) {
if err := l.updateResource(&dbSpec, apiSpec); err != nil {
if err := l.updateResource(&dbSpec, apiSpec, ChangeTypeModified); err != nil {
return err
}
} else {
// 如果资源存在且没有变更更新changeType为正常
if err := l.updateResource(&dbSpec, apiSpec, ChangeTypeNormal); err != nil {
return err
}
}
@ -205,7 +220,7 @@ func resourceKey(resType, name string) string {
return fmt.Sprintf("%s::%s", resType, name)
}
func (l *GetClusterResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(spec).Error; err != nil {
return fmt.Errorf("failed to create resource: %w", err)
@ -215,13 +230,12 @@ func (l *GetClusterResourceSpecLogic) createNewResource(spec *models.TResourceSp
})
}
func (l *GetClusterResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error {
// 标识资源规格变更
func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error {
return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
updates := map[string]interface{}{
"total_count": newSpec.TotalCount,
"available_count": newSpec.AvailableCount,
"change_type": ChangeTypeModified,
"update_time": time.Now(),
"change_type": changeType,
"update_time": time.Now(),
}
if err := tx.Model(existing).Updates(updates).Error; err != nil {
@ -232,7 +246,7 @@ func (l *GetClusterResourceSpecLogic) updateResource(existing *models.TResourceS
})
}
func (l *GetClusterResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
func (l *CompareResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
// 处理基础资源更新
var existingResources []models.TBaseResourceSpec
if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
@ -281,14 +295,14 @@ func (l *GetClusterResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int6
return nil
}
func (l *GetClusterResourceSpecLogic) markResourceDeleted(id int64) error {
func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error {
return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
Where("id = ?", id).
Update("change_type", ChangeTypeDeleted).
Error
}
func (l *GetClusterResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
if old.TotalCount != new.TotalCount ||
old.AvailableCount != new.AvailableCount ||
old.Region != new.Region {
@ -315,8 +329,9 @@ func (l *GetClusterResourceSpecLogic) isSpecChanged(old, new models.TResourceSpe
return len(oldBaseMap) > 0
}
func (l *GetClusterResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
spec := models.TResourceSpec{
SourceKey: resourceKey(res.Resource.Type, res.Resource.Name),
Type: res.Resource.Type,
Name: res.Resource.Name,
TotalCount: int64(res.Resource.Total.Value),
@ -324,7 +339,7 @@ func (l *GetClusterResourceSpecLogic) convertToResourceSpec(ClusterId int64, res
ClusterId: ClusterId,
CreateTime: time.Now(),
UpdateTime: time.Now(),
ChangeType: ChangeTypeCreated,
ChangeType: ChangeTypeNormal,
}
for _, br := range res.BaseResources {
@ -343,7 +358,7 @@ func (l *GetClusterResourceSpecLogic) convertToResourceSpec(ClusterId int64, res
return spec
}
func (l *GetClusterResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) {
func (l *CompareResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) {
result := &types.PageResult{
PageNum: req.PageNum,
PageSize: req.PageSize,
@ -365,7 +380,7 @@ func (l *GetClusterResourceSpecLogic) queryDatabaseResults(req *types.ResourceSp
return result, nil
}
func (l *GetClusterResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
func (l *CompareResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)).
Where("deleted_at IS NULL")

View File

@ -0,0 +1,135 @@
package core
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"strconv"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type SyncResourceSpecLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewSyncResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncResourceSpecLogic {
return &SyncResourceSpecLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (resp *types.CommonResp, err error) {
var mainSpec models.TResourceSpec
if err := l.svcCtx.DbEngin.Where("id = ? AND deleted_at IS NULL", req.Id).
First(&mainSpec).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errors.Errorf("资源规格不存在 (ID: %s)", req.Id)
}
return nil, errors.Wrapf(err, "查询资源规格失败 (ID: %s)", req.Id)
}
// 获取集群资源数据
startTime := time.Now()
compareLogic := NewCompareResourceSpecLogic(l.ctx, l.svcCtx)
apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10))
log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(startTime))
if err != nil {
return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
}
for _, response := range apiResources {
// 转换API响应到数据库模型
_, apiSpecs, err := compareLogic.processAPIResponse(response)
if err != nil {
return nil, err
}
// 同步资源到数据库
for _, spec := range apiSpecs {
if spec.SourceKey == mainSpec.SourceKey {
err := l.updateResource(&mainSpec, spec)
if err != nil {
return nil, err
}
}
}
}
return nil, nil
}
func (l *SyncResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error {
return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
updates := map[string]interface{}{
"type": newSpec.Type,
"total_count": newSpec.TotalCount,
"available_count": newSpec.AvailableCount,
"change_type": ChangeTypeNormal,
"update_time": time.Now(),
}
if err := tx.Model(existing).Updates(updates).Error; err != nil {
return fmt.Errorf("failed to update resource: %w", err)
}
return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs)
})
}
func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
// 处理基础资源更新
var existingResources []models.TBaseResourceSpec
if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
return fmt.Errorf("failed to query base resources: %w", err)
}
existingMap := make(map[string]models.TBaseResourceSpec)
for _, r := range existingResources {
key := resourceKey(r.Type, r.Name)
existingMap[key] = r
}
// 处理更新和新增
for i, newRes := range newResources {
newRes.ResourceSpecId = specID
key := resourceKey(newRes.Type, newRes.Name)
if existing, exists := existingMap[key]; exists {
newRes.Id = existing.Id
newRes.CreateTime = existing.CreateTime
if err := tx.Save(&newRes).Error; err != nil {
return fmt.Errorf("failed to update base resource: %w", err)
}
} else {
if err := tx.Create(&newRes).Error; err != nil {
return fmt.Errorf("failed to create base resource: %w", err)
}
}
newResources[i] = newRes
}
// 处理删除
currentIDs := make(map[int64]struct{})
for _, r := range newResources {
currentIDs[r.Id] = struct{}{}
}
for _, existing := range existingResources {
if _, exists := currentIDs[existing.Id]; !exists {
if err := tx.Delete(&existing).Error; err != nil {
return fmt.Errorf("failed to delete base resource: %w", err)
}
}
}
return nil
}

View File

@ -5505,6 +5505,10 @@ type SyncClusterAlertReq struct {
AlertRecordsMap map[string]interface{} `json:"alertRecordsMap"`
}
type SyncResourceReq struct {
Id []string `json:"id"`
}
type Tags struct {
}

View File

@ -39,6 +39,7 @@ type (
TResourceSpec struct {
Id int64 `db:"id" json:"id"` // 主键id
SourceKey string `db:"source_key" json:"source_key"` // 数据源标识type-name
Type string `db:"type" json:"type"` // 类型名称
Name string `db:"name" json:"name"` // 规格名称
TotalCount int64 `db:"total_count" json:"totalCount"` // 资源总量