package schedule

import (
	"context"
	"errors"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

const (
	ADAPTERID                 = "1777144940459986944" // heterogeneous adapter id
	QUERY_TRAIN_RESOURCES     = "train_resources"
	QUERY_INFERENCE_RESOURCES = "inference_resources"
)

// QueryResourcesLogic queries available training/inference resources across
// the clusters of the heterogeneous adapter.
type QueryResourcesLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

func NewQueryResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryResourcesLogic {
	return &QueryResourcesLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

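// Example (hypothetical wiring, for illustration only): a handler layer could
// drive this logic as below. Field names come from types.QueryResourcesReq;
// leaving ClusterIDs empty queries every cluster of the adapter.
//
//	l := NewQueryResourcesLogic(ctx, svcCtx)
//	resp, err := l.QueryResources(&types.QueryResourcesReq{Type: "Train"})
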
// QueryResources answers a resource query, serving from the local cache when
// no specific clusters are requested and falling back to live collector
// queries otherwise.
func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp *types.QueryResourcesResp, err error) {
	resp = &types.QueryResourcesResp{}

	if len(req.ClusterIDs) == 0 {
		cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
		if err != nil {
			return nil, err
		}

		// try the local cache first; "Train" and unknown types share the
		// training cache
		var resources interface{}
		switch req.Type {
		case "Inference":
			resources = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_INFERENCE_RESOURCES]
		default:
			resources = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
		}

		if specs, ok := resources.([]*collector.ResourceSpec); ok {
			resp.Data = handleEmptyResourceUsage(cs.List, specs)
			return resp, nil
		}

		// cache miss: query every cluster of the adapter live
		rus, err := l.QueryResourcesByClusterId(cs.List, req.Type)
		if err != nil {
			return nil, err
		}
		resp.Data = handleEmptyResourceUsage(cs.List, rus)
	} else {
		var clusters []types.ClusterInfo
		for _, id := range req.ClusterIDs {
			cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
			if err != nil {
				return nil, err
			}
			clusters = append(clusters, *cluster)
		}

		if len(clusters) == 0 {
			return nil, errors.New("no clusters found")
		}

		rus, err := l.QueryResourcesByClusterId(clusters, req.Type)
		if err != nil {
			return nil, err
		}
		resp.Data = handleEmptyResourceUsage(clusters, rus)
	}

	return resp, nil
}

// QueryResourcesByClusterId fans out to the collector of each cluster
// concurrently and gathers the resource specs; clusters that fail or do not
// answer within 10 seconds are skipped (handleEmptyResourceUsage fills in
// placeholders for them).
func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo, resrcType string) ([]*collector.ResourceSpec, error) {
	var clusters []types.ClusterInfo
	if len(clusterinfos) == 0 {
		cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
		if err != nil {
			return nil, err
		}
		clusters = cs.List
	} else {
		clusters = clusterinfos
	}

	var ulist []*collector.ResourceSpec
	ch := make(chan *collector.ResourceSpec, len(clusters))

	var wg sync.WaitGroup
	for _, cluster := range clusters {
		wg.Add(1)
		c := cluster // fresh copy for the goroutine below
		go func() {
			defer wg.Done()
			// buffered so the inner goroutine can always send and exit,
			// even after the 10-second timeout has fired
			done := make(chan bool, 1)
			var u *collector.ResourceSpec
			var err error
			go func() {
				col, found := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(c.AdapterId, 10)][c.Id]
				if !found {
					done <- true
					return
				}

				u, err = col.GetResourceSpecs(l.ctx, resrcType)
				if err != nil {
					logx.Error(err)
					done <- true
					return
				}

				done <- true
			}()

			select {
			case <-done:
				if u != nil {
					ch <- u
				}
			case <-time.After(10 * time.Second):
				return
			}
		}()
	}
	wg.Wait()
	close(ch)

	for v := range ch {
		ulist = append(ulist, v)
	}

	return ulist, nil
}

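// queryOneWithContext is an illustrative alternative (hypothetical, not
// called by the logic above): it bounds a single collector call with
// context.WithTimeout instead of a done channel plus time.After. This assumes
// GetResourceSpecs honors context cancellation; if it does not, the call
// still runs to completion in the background after the deadline.
func (l *QueryResourcesLogic) queryOneWithContext(c types.ClusterInfo, resrcType string) (*collector.ResourceSpec, error) {
	ctx, cancel := context.WithTimeout(l.ctx, 10*time.Second)
	defer cancel()

	col, found := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(c.AdapterId, 10)][c.Id]
	if !found {
		return nil, errors.New("no collector registered for cluster " + c.Id)
	}

	return col.GetResourceSpecs(ctx, resrcType)
}
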
// handleEmptyResourceUsage returns exactly one ResourceSpec per cluster in
// list, substituting a placeholder entry for every cluster whose resources
// are missing or empty.
func handleEmptyResourceUsage(list []types.ClusterInfo, ulist []*collector.ResourceSpec) []*collector.ResourceSpec {
	var rus []*collector.ResourceSpec
	m := make(map[string]*collector.ResourceSpec)
	for _, u := range ulist {
		if u == nil {
			continue
		}
		m[u.ClusterId] = u
	}

	for _, c := range list {
		r, ok := m[c.Id]
		if !ok || r == nil || len(r.Resources) == 0 {
			rus = append(rus, &collector.ResourceSpec{
				ClusterId: c.Id,
				Resources: nil,
				Msg:       "resources unavailable, please retry later",
			})
			continue
		}

		// attach the cluster type when the cluster name maps to a known type
		if t, ok := storeLink.ClusterTypeMap[strings.Title(c.Name)]; ok {
			r.ClusterType = t
		}
		rus = append(rus, r)
	}

	return rus
}

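// Note: strings.Title (used above) has been deprecated since Go 1.18. A
// sketch of an equivalent lookup via golang.org/x/text, assuming that
// dependency is acceptable (behavior differs slightly, since cases.Title
// also lowercases the remainder of each word):
//
//	import (
//		"golang.org/x/text/cases"
//		"golang.org/x/text/language"
//	)
//
//	t, ok := storeLink.ClusterTypeMap[cases.Title(language.English).String(c.Name)]
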
// checkCachingCondition reports whether every cluster produced a non-empty
// resource spec, i.e. whether the result set is complete enough to cache.
func checkCachingCondition(clusters []types.ClusterInfo, specs []*collector.ResourceSpec) bool {
	var count int
	for _, spec := range specs {
		if spec.Resources != nil {
			count++
		}
	}

	return count == len(clusters)
}

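// cacheIfComplete is a minimal sketch (hypothetical, not part of the original
// file) of how checkCachingCondition could gate refreshing LocalCache after a
// full live query. It assumes LocalCache is a plain map keyed by the QUERY_*
// constants and that callers serialize writes to it; a concurrently accessed
// cache would need its own locking.
func (l *QueryResourcesLogic) cacheIfComplete(key string, clusters []types.ClusterInfo, specs []*collector.ResourceSpec) {
	if checkCachingCondition(clusters, specs) {
		l.svcCtx.Scheduler.AiService.LocalCache[key] = specs
	}
}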