Merge pull request 'updated loadpriority' (#370) from tzwang/pcm-coordinator:master into master

This commit is contained in:
tzwang 2024-12-18 18:03:33 +08:00
commit 7d2d42bf24
4 changed files with 89 additions and 21 deletions

View File

@ -1,7 +1,12 @@
package strategy
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
)
type LoadPriority struct {
replicas int32
Replicas int32
Clusters []*CLusterLoad
}
@ -11,6 +16,31 @@ type CLusterLoad struct {
TaskPredictedNum int64
}
func NewLoadPriority() *LoadPriority {
return &LoadPriority{}
func NewLoadPriority(replicas int32, resources []*collector.ResourceSpec) *LoadPriority {
var clusters []*CLusterLoad
for _, resource := range resources {
if resource.ClusterId == "" {
continue
}
cluster := &CLusterLoad{
ClusterId: resource.ClusterId,
}
for _, res := range resource.Resources {
r, ok := res.(*collector.Usage)
if !ok {
continue
}
switch r.Type {
case storeLink.RUNNINGTASK:
num, ok := r.Total.Value.(int64)
if !ok {
continue
}
cluster.TaskRunningNum = num
}
}
clusters = append(clusters, cluster)
}
return &LoadPriority{Replicas: replicas, Clusters: clusters}
}

View File

@ -346,6 +346,15 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
}
}()
go func() {
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: 1},
}
ch <- rate
}()
go func() {
wg.Wait()
close(ch)

View File

@ -1005,12 +1005,12 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype
func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
var wg sync.WaitGroup
wg.Add(4)
var uwg sync.WaitGroup
wg.Add(2)
uwg.Add(4)
var ch = make(chan *collector.Usage, 2)
var qCh = make(chan *collector.Usage, 2)
var uCh = make(chan *collector.Usage)
defer close(uCh)
resUsage := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(s.participantId, 10),
@ -1041,12 +1041,15 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
queChargeRate, _ := strconv.ParseFloat(data.QueChargeRate, 64)
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate},
}
go func() {
defer uwg.Done()
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate},
}
uCh <- rate
uCh <- rate
}()
var freeNodes int64
var cpuPerNode int64
@ -1080,8 +1083,32 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
if err != nil {
return
}
run := &collector.Usage{}
run.Type = strings.ToUpper(RUNNINGTASK)
if len(jobList.Jobs) == 0 {
go func() {
defer uwg.Done()
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: 0,
}
uCh <- run
}()
return
} else {
go func() {
defer uwg.Done()
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: len(jobList.Jobs),
}
uCh <- run
}()
}
var cpureqed atomic.Int64
@ -1091,19 +1118,18 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
jwg.Add(1)
job := j
go func() {
defer jwg.Done()
h := http.Request{}
jreq := &hpcAC.JobDetailReq{
JobId: job.JobId,
}
detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq)
if err != nil || detail.Data == nil {
jwg.Done()
return
}
cpureqed.Add(int64(detail.Data.ProcNumReq))
dcureqed.Add(int64(detail.Data.DcuNumReq))
jwg.Done()
}()
}
jwg.Wait()
@ -1139,7 +1165,7 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
// 查询用户共享存储配额及使用量
go func() {
defer wg.Done()
defer uwg.Done()
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
@ -1167,7 +1193,7 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
// 查询用户信息
go func() {
defer wg.Done()
defer uwg.Done()
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
@ -1187,11 +1213,16 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
}()
go func() {
for v := range uCh {
resources = append(resources, v)
}
uwg.Wait()
close(uCh)
}()
for v := range uCh {
resources = append(resources, v)
}
wg.Wait()
if len(qCh) == 0 {
for v := range ch {
v.Available = v.Total
@ -1203,8 +1234,6 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS
}
}
wg.Wait()
resUsage.Resources = resources
return resUsage, nil

View File

@ -73,7 +73,7 @@ const (
DEPLOY_INSTANCE_PREFIEX = "infer"
BALANCE = "balance"
RATE = "rate"
PERHOUR = "rmb-per-hour"
PERHOUR = "per-hour"
NUMBER = "number"
KILOBYTE = "kb"
CPUCORE = "core"