pcm-coordinator/internal/logic/core/compareresourcespeclogic.go

365 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package core
import (
"context"
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/rs/zerolog/log"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// CompareResourceSpecLogic carries the request context and service
// dependencies used to compare a cluster's reported resource specs with the
// rows stored in the database.
type CompareResourceSpecLogic struct {
// Logger is the embedded go-zero logger; the constructor binds it to ctx.
logx.Logger
// ctx is the request context, passed on to the resource query logic.
ctx context.Context
// svcCtx provides shared service dependencies, including the database handle (DbEngin).
svcCtx *svc.ServiceContext
}
// Change-type markers written to the change_type column of resource spec rows.
const (
ChangeTypeNormal = 0 // resource spec is unchanged
ChangeTypeModified = 1 // resource spec has changed
ChangeTypeDeleted = 2 // resource has been deleted
)
// APIResponse is one element of the resource query result as decoded by
// decodeAPIResponse; input keys are matched through the json tags.
type APIResponse struct {
// ClusterId identifies the cluster. It is compared against the requested
// cluster ID and later converted to int64 for database lookups.
// NOTE(review): this tag is capitalized ("ClusterId") unlike the other
// camelCase tags — confirm it matches the upstream payload.
ClusterId string `json:"ClusterId"`
ClusterType string `json:"clusterType"`
Region string `json:"region"`
// Tag is used together with the cluster ID to select stored specs and is
// part of every resource key.
Tag string `json:"tag"`
// Resources lists the reported resources; responses where this is nil are
// dropped by FetchClusterResources.
Resources []Resource `json:"resources"`
Msg string `json:"msg"`
}
// Resource pairs one reported resource with the base resources listed under it.
type Resource struct {
// Resource is the primary entry; its type and name must both be non-empty
// for the resource to be converted into a spec.
Resource ResourceDetail `json:"resource"`
// BaseResources are converted into the spec's BaseResourceSpecs.
BaseResources []ResourceDetail `json:"baseResources"`
}
// ResourceDetail describes a single resource by type and name together with
// its total and available amounts.
type ResourceDetail struct {
Type string `json:"type"`
Name string `json:"name"`
// Total is the full amount reported for the resource.
Total Metric `json:"total"`
// Available is the amount reported as currently available.
Available Metric `json:"available"`
}
// Metric is a numeric value together with the unit it is expressed in.
type Metric struct {
Unit string `json:"unit"`
Value float64 `json:"value"`
}
// NewCompareResourceSpecLogic builds a CompareResourceSpecLogic bound to ctx
// and svcCtx, with a logger derived from ctx.
func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic {
	logic := new(CompareResourceSpecLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// CompareResourceSpec fetches the resources currently reported for the
// cluster named in req and synchronizes them into the database.
//
// An empty req.ClusterId is a no-op that returns (nil, nil). On success the
// returned page result is also nil: the method is called for its effect on
// the database. A fetch failure is logged in full but surfaced to the caller
// as a fixed message without the underlying cause; a sync failure is wrapped.
func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.FetchResourceSpecReq) (resp *types.PageResult, err error) {
	if len(req.ClusterId) == 0 {
		return nil, nil
	}

	// Fetch the cluster's resource data and record how long the call took.
	began := time.Now()
	fetched, fetchErr := l.FetchClusterResources(req.ClusterId, req.Tag)
	log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(began))
	if fetchErr != nil {
		log.Error().Msgf("调用第三方接口获取集群资源失败: %v", fetchErr)
		return nil, fmt.Errorf("调用第三方接口获取集群资源失败")
	}

	// Synchronize the fetched resources into the database.
	syncErr := l.syncResourcesToDB(fetched)
	if syncErr != nil {
		return nil, fmt.Errorf("failed to sync resources: %w", syncErr)
	}
	return nil, nil
}
// FetchClusterResources queries resources of the given tag through the
// schedule query logic, decodes the result into APIResponse values and keeps
// only the entries that belong to clusterId and carry a non-nil resource list.
//
// It returns an error when the query or decoding fails, or when no entry
// remains after filtering.
func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string, tag string) ([]APIResponse, error) {
	queryReq := &types.QueryResourcesReq{Type: tag}
	result, err := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx).QueryResources(queryReq)
	if err != nil {
		return nil, fmt.Errorf("query resources failed: %w", err)
	}

	var decoded []APIResponse
	if decodeErr := decodeAPIResponse(result.Data, &decoded); decodeErr != nil {
		return nil, fmt.Errorf("decode response failed: %w", decodeErr)
	}

	// Keep only the responses of the requested cluster.
	var matched []APIResponse
	for i := range decoded {
		if decoded[i].ClusterId != clusterId || decoded[i].Resources == nil {
			continue
		}
		matched = append(matched, decoded[i])
	}
	if len(matched) < 1 {
		return nil, fmt.Errorf("no resources found for cluster ID: %s", clusterId)
	}
	return matched, nil
}
// decodeAPIResponse decodes the loosely typed input into output using
// mapstructure, matching keys through the structs' json tags.
//
// ErrorUnused is enabled, so any input key without a matching field makes the
// decode fail. Two decode hooks are installed: RFC 3339 strings to time
// values, and comma-separated strings to slices.
func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
	hooks := mapstructure.ComposeDecodeHookFunc(
		mapstructure.StringToTimeHookFunc(time.RFC3339),
		mapstructure.StringToSliceHookFunc(","),
	)

	var config mapstructure.DecoderConfig
	config.Result = output
	config.TagName = "json"
	config.ErrorUnused = true
	config.DecodeHook = hooks

	decoder, err := mapstructure.NewDecoder(&config)
	if err != nil {
		return fmt.Errorf("failed to create decoder: %w", err)
	}

	err = decoder.Decode(input)
	if err != nil {
		return fmt.Errorf("decoding error: %w", err)
	}
	return nil
}
// syncResourcesToDB reconciles every API response with the database, one
// response at a time, and stops at the first failure. Errors from loading or
// converting a response are returned as-is; errors from applying the changes
// are wrapped.
func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
	for i := range apiResponses {
		// Load the stored specs and convert the response into spec models.
		stored, reported, err := l.processAPIResponse(apiResponses[i])
		if err != nil {
			return err
		}

		// Apply creations, change flags and deletion marks.
		err = l.handleResourceChanges(stored, reported)
		if err != nil {
			return fmt.Errorf("failed to handle resource changes: %w", err)
		}
	}
	return nil
}
// processAPIResponse prepares both sides of the comparison for one response.
//
// The first result holds the specs stored for the response's cluster and tag
// (with BaseResourceSpecs preloaded); the second holds the specs converted
// from the response. Reported resources with an empty name or type are
// skipped.
func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
	clusterID := utils.StringToInt64(response.ClusterId)

	var stored []models.TResourceSpec
	query := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).
		Preload("BaseResourceSpecs").
		Where("cluster_id = ?", clusterID).
		Where("tag = ?", response.Tag)
	if err := query.Find(&stored).Error; err != nil {
		return nil, nil, fmt.Errorf("database query failed: %w", err)
	}

	var reported []models.TResourceSpec
	for _, item := range response.Resources {
		// Only resources that carry both a type and a name are converted.
		if item.Resource.Name != "" && item.Resource.Type != "" {
			reported = append(reported, l.convertToResourceSpec(clusterID, item, response.Tag))
		}
	}
	return stored, reported, nil
}
// handleResourceChanges reconciles the specs stored in the database (dbSpecs)
// with the specs reported by the API (apiSpecs).
//
// Stored specs are indexed by their SourceKey and reported specs by
// resourceKey(Type, Name, Tag); reported specs sharing a key collapse into one
// entry. Each key is then classified:
//
//   - reported only: the spec is inserted as a new row;
//   - present on both sides: the row's change_type is set to
//     ChangeTypeModified or ChangeTypeNormal depending on isSpecChanged, and
//     its update_time is refreshed (the stored values themselves are not
//     overwritten);
//   - stored only: the row's change_type is set to ChangeTypeDeleted.
//
// All writes run inside a single transaction, so a failure part-way through
// leaves the database untouched instead of half-synchronized. It returns a
// wrapped error describing the step that failed.
func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
	dbMap := make(map[string]models.TResourceSpec, len(dbSpecs))
	for _, spec := range dbSpecs {
		dbMap[spec.SourceKey] = spec
	}
	apiMap := make(map[string]models.TResourceSpec, len(apiSpecs))
	for _, spec := range apiSpecs {
		apiMap[resourceKey(spec.Type, spec.Name, spec.Tag)] = spec
	}

	var (
		createSpecs []*models.TResourceSpec
		modifiedIDs []int64
		normalIDs   []int64
		deletedIDs  []int64
	)

	// One timestamp for the whole pass keeps CreateTime/UpdateTime consistent
	// across every row touched by this synchronization.
	now := time.Now()

	// Phase 1: classify every key without touching the database.
	for key, apiSpec := range apiMap {
		dbSpec, exists := dbMap[key]
		if !exists {
			// Copy into a fresh variable so each appended pointer is distinct.
			newSpec := apiSpec
			newSpec.CreateTime = now
			newSpec.UpdateTime = now
			createSpecs = append(createSpecs, &newSpec)
			continue
		}
		if l.isSpecChanged(dbSpec, apiSpec) {
			modifiedIDs = append(modifiedIDs, dbSpec.Id)
		} else {
			normalIDs = append(normalIDs, dbSpec.Id)
		}
	}
	for key, dbSpec := range dbMap {
		if _, exists := apiMap[key]; !exists {
			deletedIDs = append(deletedIDs, dbSpec.Id)
		}
	}

	if len(createSpecs)+len(modifiedIDs)+len(normalIDs)+len(deletedIDs) == 0 {
		return nil
	}

	// Phase 2: apply all writes atomically.
	tx := l.svcCtx.DbEngin.Begin()
	if tx.Error != nil {
		return fmt.Errorf("failed to start transaction: %w", tx.Error)
	}
	committed := false
	defer func() {
		// Any early return below must not leave the transaction open.
		if !committed {
			tx.Rollback()
		}
	}()

	// Insert new resources (and their associated base resources) in batches.
	if len(createSpecs) > 0 {
		if err := tx.CreateInBatches(createSpecs, 100).Error; err != nil {
			return fmt.Errorf("failed to batch create resources: %w", err)
		}
	}

	// Flag resources whose spec differs from the stored one.
	if len(modifiedIDs) > 0 {
		if err := tx.Model(&models.TResourceSpec{}).
			Where("id IN ?", modifiedIDs).
			Updates(map[string]interface{}{
				"change_type": ChangeTypeModified,
				"update_time": now,
			}).Error; err != nil {
			return fmt.Errorf("batch update modified failed: %w", err)
		}
	}

	// Reset resources whose spec matches the stored one.
	if len(normalIDs) > 0 {
		if err := tx.Model(&models.TResourceSpec{}).
			Where("id IN ?", normalIDs).
			Updates(map[string]interface{}{
				"change_type": ChangeTypeNormal,
				"update_time": now,
			}).Error; err != nil {
			return fmt.Errorf("batch update normal failed: %w", err)
		}
	}

	// Mark resources that are no longer reported, in one statement rather
	// than one UPDATE per row. Only change_type is written, as before.
	if len(deletedIDs) > 0 {
		if err := tx.Model(&models.TResourceSpec{}).
			Where("id IN ?", deletedIDs).
			Update("change_type", ChangeTypeDeleted).Error; err != nil {
			return fmt.Errorf("batch mark deleted failed: %w", err)
		}
	}

	if err := tx.Commit().Error; err != nil {
		return fmt.Errorf("transaction commit failed: %w", err)
	}
	committed = true
	return nil
}
// resourceKey builds the lookup key for a resource by joining its type, name
// and tag with "::" separators, in that order.
func resourceKey(resType, name, tag string) string {
	const sep = "::"
	// fmt.Sprint inserts no spaces between operands when they are strings.
	return fmt.Sprint(resType, sep, name, sep, tag)
}
// createNewResource inserts spec inside its own database transaction and
// wraps any insert error.
func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
	insert := func(tx *gorm.DB) error {
		result := tx.Create(spec)
		if result.Error != nil {
			return fmt.Errorf("failed to create resource: %w", result.Error)
		}
		return nil
	}
	return l.svcCtx.DbEngin.Transaction(insert)
}
// updateResource flags a resource spec change: inside a transaction it sets
// change_type to changeType and refreshes update_time on the row identified
// by existing. The newSpec argument is accepted but not used.
func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error {
	apply := func(tx *gorm.DB) error {
		fields := map[string]interface{}{
			"change_type": changeType,
			"update_time": time.Now(),
		}
		result := tx.Model(existing).Updates(fields)
		if result.Error != nil {
			return fmt.Errorf("failed to update resource: %w", result.Error)
		}
		return nil
	}
	return l.svcCtx.DbEngin.Transaction(apply)
}
// markResourceDeleted sets change_type to ChangeTypeDeleted on the resource
// spec row with the given id and returns the database error, if any.
func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error {
	result := l.svcCtx.DbEngin.
		Model(&models.TResourceSpec{}).
		Where("id = ?", id).
		Update("change_type", ChangeTypeDeleted)
	return result.Error
}
// isSpecChanged reports whether the reported spec (new) differs from the
// stored spec (old).
//
// The specs differ when TotalCount, AvailableCount or Region differ, or when
// their base resources differ: a base resource present on only one side, or
// one whose TotalValue or AvailableValue changed. Base resources are matched
// by resourceKey(Type, Name, Tag) using the tag of the spec they belong to.
//
// NOTE(review): convertToResourceSpec does not populate Region, so a stored
// row with a non-empty Region would always compare as changed — confirm this
// is intended.
func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
	sameTopLevel := old.TotalCount == new.TotalCount &&
		old.AvailableCount == new.AvailableCount &&
		old.Region == new.Region
	if !sameTopLevel {
		return true
	}

	// Index the stored base resources; entries are removed as they are
	// matched, so whatever remains exists only on the stored side.
	pending := make(map[string]models.TBaseResourceSpec)
	for i := range old.BaseResourceSpecs {
		base := old.BaseResourceSpecs[i]
		pending[resourceKey(base.Type, base.Name, old.Tag)] = base
	}

	for i := range new.BaseResourceSpecs {
		candidate := new.BaseResourceSpecs[i]
		key := resourceKey(candidate.Type, candidate.Name, new.Tag)
		previous, found := pending[key]
		switch {
		case !found:
			return true
		case previous.TotalValue != candidate.TotalValue:
			return true
		case previous.AvailableValue != candidate.AvailableValue:
			return true
		}
		delete(pending, key)
	}
	return len(pending) != 0
}
// convertToResourceSpec maps one reported resource onto a TResourceSpec for
// the given cluster and tag.
//
// The spec's SourceKey is resourceKey(type, name, tag), its counts are the
// reported float values converted to int64, and its ChangeType starts as
// ChangeTypeNormal. Every base resource becomes a TBaseResourceSpec carrying
// the reported values and units. Timestamps are set to the current time.
func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string) models.TResourceSpec {
	primary := res.Resource

	var spec models.TResourceSpec
	spec.SourceKey = resourceKey(primary.Type, primary.Name, tag)
	spec.Type = primary.Type
	spec.Name = primary.Name
	spec.Tag = tag
	spec.TotalCount = int64(primary.Total.Value)
	spec.AvailableCount = int64(primary.Available.Value)
	spec.ClusterId = ClusterId
	spec.CreateTime = time.Now()
	spec.UpdateTime = time.Now()
	spec.ChangeType = ChangeTypeNormal

	for i := range res.BaseResources {
		base := res.BaseResources[i]

		var baseSpec models.TBaseResourceSpec
		baseSpec.Type = base.Type
		baseSpec.Name = base.Name
		baseSpec.TotalValue = base.Total.Value
		baseSpec.TotalUnit = base.Total.Unit
		baseSpec.AvailableValue = base.Available.Value
		baseSpec.AvailableUnit = base.Available.Unit
		baseSpec.CreateTime = time.Now()
		baseSpec.UpdateTime = time.Now()

		spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, baseSpec)
	}
	return spec
}