pcm-coordinator/internal/logic/core/syncresourcespeclogic.go

137 lines
4.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package core
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gorm.io/gorm"
"strconv"
"time"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type SyncResourceSpecLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewSyncResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncResourceSpecLogic {
return &SyncResourceSpecLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (resp *types.CommonResp, err error) {
var mainSpec models.TResourceSpec
if err := l.svcCtx.DbEngin.Where("id = ? AND deleted_at IS NULL", req.Id).
First(&mainSpec).
Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, errors.Errorf("资源规格不存在 (ID: %s)", req.Id)
}
return nil, errors.Wrapf(err, "查询资源规格失败 (ID: %s)", req.Id)
}
// 获取集群资源数据
startTime := time.Now()
compareLogic := NewCompareResourceSpecLogic(l.ctx, l.svcCtx)
apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10), mainSpec.Tag)
log.Debug().Msgf("调用获取ai训练资源接口耗时 %v", time.Since(startTime))
if err != nil {
log.Error().Err(err).Msg("同步集群资源失败")
return nil, fmt.Errorf("同步集群资源失败,请稍后重试")
}
for _, response := range apiResources {
// 转换API响应到数据库模型
_, apiSpecs, err := compareLogic.processAPIResponse(response, req.UserId)
if err != nil {
return nil, err
}
// 同步资源到数据库
for _, spec := range apiSpecs {
if spec.SourceKey == mainSpec.SourceKey {
err := l.updateResource(&mainSpec, spec)
if err != nil {
return nil, err
}
}
}
}
return nil, nil
}
func (l *SyncResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error {
return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
updates := map[string]interface{}{
"type": newSpec.Type,
"total_count": newSpec.TotalCount,
"available_count": newSpec.AvailableCount,
"change_type": ChangeTypeNormal,
"update_time": time.Now(),
}
if err := tx.Model(existing).Updates(updates).Error; err != nil {
return fmt.Errorf("failed to update resource: %w", err)
}
return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs, newSpec.Tag)
})
}
func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec, tag string) error {
// 处理基础资源更新
var existingResources []models.TBaseResourceSpec
if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
return fmt.Errorf("failed to query base resources: %w", err)
}
existingMap := make(map[string]models.TBaseResourceSpec)
for _, r := range existingResources {
key := resourceKey(r.Type, r.Name, tag)
existingMap[key] = r
}
// 处理更新和新增
for i, newRes := range newResources {
newRes.ResourceSpecId = specID
key := resourceKey(newRes.Type, newRes.Name, tag)
if existing, exists := existingMap[key]; exists {
newRes.Id = existing.Id
newRes.CreateTime = existing.CreateTime
if err := tx.Save(&newRes).Error; err != nil {
return fmt.Errorf("failed to update base resource: %w", err)
}
} else {
if err := tx.Create(&newRes).Error; err != nil {
return fmt.Errorf("failed to create base resource: %w", err)
}
}
newResources[i] = newRes
}
// 处理删除
currentIDs := make(map[int64]struct{})
for _, r := range newResources {
currentIDs[r.Id] = struct{}{}
}
for _, existing := range existingResources {
if _, exists := currentIDs[existing.Id]; !exists {
if err := tx.Delete(&existing).Error; err != nil {
return fmt.Errorf("failed to delete base resource: %w", err)
}
}
}
return nil
}