pcm-coordinator/api/internal/scheduler/schedulers/cloudScheduler.go

190 lines
6.3 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package schedulers
import (
"context"
"errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy/param"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"math"
"time"
)
type CloudScheduler struct {
yamlString string
task *response.TaskInfo
*scheduler.Scheduler
option *option.CloudOption
ctx context.Context
dbEngin *gorm.DB
promClient tracker.Prometheus
svcCtx *svc.ServiceContext
}
type CloudResult struct {
TaskId string
ClusterId string
ClusterName string
Strategy string
Replica int32
Msg string
}
func NewCloudScheduler(ctx context.Context, val string, scheduler *scheduler.Scheduler, option *option.CloudOption, dbEngin *gorm.DB, promClient tracker.Prometheus) (*CloudScheduler, error) {
return &CloudScheduler{ctx: ctx, yamlString: val, Scheduler: scheduler, option: option, dbEngin: dbEngin, promClient: promClient}, nil
}
func (as *CloudScheduler) GetNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
c := cloud.TaskCloudModel{
AdapterId: uint(participantId),
TaskId: uint(task.TaskId),
Status: "Pending",
YamlString: as.yamlString,
}
utils.Convert(task.Metadata, &c)
return c, nil
}
func (as *CloudScheduler) PickOptimalStrategy() (strategy.Strategy, error) {
if len(as.option.ClusterIds) == 1 {
return &strategy.SingleAssignment{Cluster: &strategy.AssignedCluster{ClusterId: as.option.ClusterIds[0], Replicas: 1}}, nil
}
resources, err := as.findClustersWithResources()
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, errors.New("no cluster has resources")
}
if len(resources) == 1 {
var cluster strategy.AssignedCluster
cluster.ClusterId = resources[0].ClusterId
cluster.Replicas = 1
return &strategy.SingleAssignment{Cluster: &cluster}, nil
}
params := &param.Params{Resources: resources}
switch as.option.Strategy {
case strategy.REPLICATION:
var clusterIds []string
for _, resource := range resources {
clusterIds = append(clusterIds, resource.ClusterId)
}
strategy := strategy.NewReplicationStrategy(clusterIds, as.option.Replica)
return strategy, nil
case strategy.RESOURCES_PRICING:
strategy := strategy.NewPricingStrategy(&param.ResourcePricingParams{Params: params, Replicas: as.option.Replica})
return strategy, nil
case strategy.DYNAMIC_RESOURCES:
strategy := strategy.NewDynamicResourcesStrategy(params.Resources, as.option, 1)
return strategy, nil
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strategy := strategy.NewStaticWeightStrategy(as.option.StaticWeightMap, as.option.Replica)
return strategy, nil
}
return nil, errors.New("no strategy has been chosen")
}
func (as *CloudScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interface{}, error) {
if clusters == nil {
return nil, errors.New("clusters is nil")
}
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
if len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
var results []*CloudResult
for _, cluster := range clusters {
cName := ""
as.dbEngin.Table("t_cluster").Select("name").Where("id=?", cluster.ClusterId).Find(&cName)
cr := CloudResult{
ClusterId: cluster.ClusterId,
ClusterName: cName,
Replica: cluster.Replicas,
}
cr.ClusterId = cluster.ClusterId
cr.Replica = cluster.Replicas
cr.ClusterName = cName
results = append(results, &cr)
}
return results, nil
}
func (as *CloudScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
resp := []*collector.ResourceStats{}
//查询集群资源信息
var rMetrics []tracker.Metric
metrics := []string{"cluster_cpu_utilisation", "cluster_cpu_avail", "cluster_cpu_total", "cluster_memory_total", "cluster_memory_avail", "cluster_memory_utilisation", "cluster_disk_utilisation", "cluster_disk_avail", "cluster_disk_total", "cluster_pod_utilisation"}
var clusterNames []string
as.dbEngin.Table("t_cluster").Select("name").Where("id in ?", as.option.ClusterIds).Find(&clusterNames)
for _, c := range clusterNames {
rMetrics = as.promClient.GetNamedMetrics(metrics, time.Now(), tracker.ClusterOption{ClusterName: c})
r := collector.ResourceStats{}
var cid string
as.dbEngin.Table("t_cluster").Select("id").Where("name = ?", c).Find(&cid)
r.ClusterId = cid
r.Name = c
for _, metric := range rMetrics {
if metric.MetricName == "cluster_cpu_total" {
r.CpuCoreTotal = int64(metric.MetricData.MetricValues[0].Sample.Value())
}
if metric.MetricName == "cluster_cpu_avail" {
cpuAvail := metric.MetricData.MetricValues[0].Sample.Value()
r.CpuCoreAvail = int64(math.Round(cpuAvail))
}
if metric.MetricName == "cluster_memory_total" {
r.MemTotal = metric.MetricData.MetricValues[0].Sample.Value()
}
if metric.MetricName == "cluster_memory_avail" {
r.MemAvail = metric.MetricData.MetricValues[0].Sample.Value()
}
if metric.MetricName == "cluster_disk_total" {
r.DiskTotal = metric.MetricData.MetricValues[0].Sample.Value()
}
if metric.MetricName == "cluster_disk_avail" {
r.DiskAvail = metric.MetricData.MetricValues[0].Sample.Value()
}
}
resp = append(resp, &r)
}
return resp, nil
}