pcm-coordinator/api/internal/logic/ai/getcenteroverviewlogic.go

package ai

import (
	"context"
	"strconv"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
)
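
// GetCenterOverviewLogic serves the AI center-overview endpoint: it counts
// adapters, tasks, and accelerator cards, and sums FP16 compute (TOPS)
// across the clusters of every adapter of the type queried below.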
type GetCenterOverviewLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}
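
// NewGetCenterOverviewLogic builds the logic with a request-scoped context
// and a context-aware logger, following the standard go-zero pattern.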
func NewGetCenterOverviewLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetCenterOverviewLogic {
	return &GetCenterOverviewLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}
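
// GetCenterOverview assembles the overview response from values already
// persisted in storage, kicks off an asynchronous refresh of per-cluster
// resources, and waits at most one second for that refresh before returning.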
func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverviewResp, err error) {
	resp = &types.CenterOverviewResp{}
	var mu sync.RWMutex
	// Buffered so the background refresh can signal completion even after the
	// timeout branch below has already returned (otherwise the sender in
	// updateClusterResource would block forever and leak a goroutine).
	ch := make(chan struct{}, 1)
	var centerNum int32
	var taskNum int32
	var cardNum int32
	var totalTops float64

	adapterList, err := l.svcCtx.Scheduler.AiStorages.GetAdaptersByType("1")
	if err != nil {
		return nil, err
	}
	centerNum = int32(len(adapterList))
	resp.CenterNum = centerNum

	// Refresh cluster resources in the background; the possibly stale values
	// read below are returned as-is if the refresh does not finish in time.
	go l.updateClusterResource(&mu, ch, adapterList)

	for _, adapter := range adapterList {
		taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)
		if err != nil {
			continue
		}
		taskNum += int32(len(taskList))
	}
	resp.TaskNum = taskNum

	for _, adapter := range adapterList {
		clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
		if err != nil {
			continue
		}
		for _, cluster := range clusters.List {
			mu.RLock()
			clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(cluster.Id)
			mu.RUnlock()
			if err != nil {
				continue
			}
			cardNum += int32(clusterResource.CardTotal)
			totalTops += clusterResource.CardTopsTotal
		}
	}
	resp.CardNum = cardNum
	resp.PowerInTops = totalTops

	// Give the background refresh up to one second; either way the response
	// assembled above is what gets returned.
	select {
	case <-ch:
		return resp, nil
	case <-time.After(1 * time.Second):
		return resp, nil
	}
}
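
// updateClusterResource fans out one goroutine per cluster to pull live
// resource stats from that cluster's collector, then inserts or updates the
// stored TClusterResource row; it signals ch once every goroutine finishes.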
func (l *GetCenterOverviewLogic) updateClusterResource(mu *sync.RWMutex, ch chan<- struct{}, list []*types.AdapterInfo) {
	var wg sync.WaitGroup
	for _, adapter := range list {
		adapter := adapter // capture the loop variable for the goroutines below (pre-Go 1.22 semantics)
		clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
		if err != nil {
			continue
		}
		for _, cluster := range clusters.List {
			c := cluster
			mu.RLock()
			clusterResource, err := l.svcCtx.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
			mu.RUnlock()
			if err != nil {
				continue
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				collector, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
				if !ok {
					return
				}
				stat, err := collector.GetResourceStats(l.ctx)
				if err != nil || stat == nil {
					return
				}
				clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
				if err != nil {
					return
				}
				// Aggregate card counts and FP16 compute across the available card types.
				var cardTotal int64
				var topsTotal float64
				for _, card := range stat.CardsAvail {
					cardTotal += int64(card.CardNum)
					topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
				}
				mu.Lock()
				defer mu.Unlock()
				if (models.TClusterResource{} == *clusterResource) {
					// No stored record for this cluster yet: create one.
					err = l.svcCtx.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType,
						float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal),
						stat.MemAvail, stat.MemTotal, stat.DiskAvail, stat.DiskTotal,
						float64(stat.GpuAvail), float64(stat.GpuTotal), cardTotal, topsTotal)
					if err != nil {
						return
					}
				} else {
					// Existing record: update it in place with the fresh stats.
					clusterResource.CardTotal = cardTotal
					clusterResource.CardTopsTotal = topsTotal
					clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
					clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
					clusterResource.MemAvail = stat.MemAvail
					clusterResource.MemTotal = stat.MemTotal
					clusterResource.DiskAvail = stat.DiskAvail
					clusterResource.DiskTotal = stat.DiskTotal
					if err := l.svcCtx.Scheduler.AiStorages.UpdateClusterResources(clusterResource); err != nil {
						return
					}
				}
			}()
		}
	}
	wg.Wait()
	ch <- struct{}{} // buffered channel: never blocks, even if the caller timed out
}
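
// A minimal sketch of how the generated go-zero handler layer would invoke
// this logic. The handler name and wiring below follow the standard goctl
// pattern and are assumptions, not taken from this file:
//
//	func GetCenterOverviewHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
//		return func(w http.ResponseWriter, r *http.Request) {
//			l := ai.NewGetCenterOverviewLogic(r.Context(), svcCtx)
//			resp, err := l.GetCenterOverview()
//			if err != nil {
//				httpx.ErrorCtx(r.Context(), w, err)
//			} else {
//				httpx.OkJsonCtx(r.Context(), w, resp)
//			}
//		}
//	}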