pcm-coordinator/pkg/tracker/tracker.go

340 lines
11 KiB
Go

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package tracker
import (
"context"
"github.com/prometheus/alertmanager/api/v2/client"
"github.com/prometheus/alertmanager/cli"
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"net/url"
"strconv"
"strings"
"sync"
"time"
)
var (
ClusterCpuUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_cpu_utilisation",
Help: "Cluster CPU Utilisation Rate.",
}, []string{"cluster_name", "adapter_id"})
ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_cpu_avail",
Help: "Cluster CPU Available.",
}, []string{"cluster_name", "adapter_id"})
ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_cpu_total",
Help: "Cluster CPU Total.",
}, []string{"cluster_name", "adapter_id"})
ClusterMemoryUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_memory_utilisation",
Help: "Cluster Memory Utilisation Rate.",
}, []string{"cluster_name", "adapter_id"})
ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_memory_avail",
Help: "Cluster Memory Available.",
}, []string{"cluster_name", "adapter_id"})
ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_memory_total",
Help: "Cluster Memory Total.",
}, []string{"cluster_name", "adapter_id"})
ClusterDiskUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_disk_utilisation",
Help: "Cluster Disk Utilisation Rate.",
}, []string{"cluster_name", "adapter_id"})
ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_disk_avail",
Help: "Cluster Disk Available.",
}, []string{"cluster_name", "adapter_id"})
ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_disk_total",
Help: "Cluster Disk Total.",
}, []string{"cluster_name", "adapter_id"})
ClusterPodUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_pod_utilisation",
Help: "Cluster Pod Utilisation.",
}, []string{"cluster_name", "adapter_id"})
ClusterPodCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_pod_count",
Help: "Cluster Pod Count.",
}, []string{"cluster_name", "adapter_id"})
ClusterPodTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_pod_total",
Help: "Cluster Pod total.",
}, []string{"cluster_name", "adapter_id"})
ClusterCpuCoreHoursGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_cpu_core_hours",
Help: "Cluster Cpu Core Hours.",
}, []string{"cluster_name", "adapter_id"})
ClusterCardsAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_cards_avail",
Help: "Cluster Cards Available.",
}, []string{"cluster_name", "adapter_id"})
ClusterGpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_gpu_avail",
Help: "Cluster Gpu Available.",
}, []string{"cluster_name", "adapter_id"})
metrics = []prometheus.Collector{
ClusterCpuUtilisationGauge,
ClusterCpuAvailGauge,
ClusterCpuTotalGauge,
ClusterMemoryUtilisationGauge,
ClusterMemoryAvailGauge,
ClusterMemoryTotalGauge,
ClusterDiskUtilisationGauge,
ClusterDiskAvailGauge,
ClusterDiskTotalGauge,
ClusterPodUtilisationGauge,
ClusterPodCountGauge,
ClusterPodTotalGauge,
}
)
type ClusterLoadRecord struct {
AdapterId int64 `json:"adapterId,optional"`
ClusterName string `json:"clusterName,optional"`
CpuAvail float64 `json:"cpuAvail,optional"`
CpuTotal float64 `json:"cpuTotal,optional"`
CpuUtilisation float64 `json:"cpuUtilisation,optional"`
MemoryAvail float64 `json:"memoryAvail,optional"`
MemoryUtilisation float64 `json:"memoryUtilisation,optional"`
MemoryTotal float64 `json:"memoryTotal,optional"`
DiskAvail float64 `json:"diskAvail,optional"`
DiskTotal float64 `json:"diskTotal,optional"`
DiskUtilisation float64 `json:"diskUtilisation,optional"`
PodsUtilisation float64 `json:"podsUtilisation,optional"`
PodsCount int64 `json:"podsCount,optional"`
PodsTotal int64 `json:"podsTotal,optional"`
}
func init() {
prometheus.MustRegister(metrics...)
}
type Prometheus struct {
prometheus Interface
client v1.API
}
// NewPrometheus 初始化Prometheus客户端
func NewPrometheus(address string) (Prometheus, error) {
cfg := api.Config{
Address: address,
}
promClient, err := api.NewClient(cfg)
return Prometheus{client: v1.NewAPI(promClient)}, err
}
func NewAlertClient(address string) *client.AlertmanagerAPI {
alertManagerClient := cli.NewAlertmanagerClient(&url.URL{Host: address})
return alertManagerClient
}
func ParseTime(timestamp string) (time.Time, error) {
// Parse time params
startInt, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return time.Now(), err
}
return time.Unix(startInt, 0), nil
}
func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
var res []Metric
var mtx sync.Mutex
var wg sync.WaitGroup
opts := NewQueryOptions()
o.Apply(opts)
for _, metric := range metrics {
wg.Add(1)
go func(metric string) {
parsedResp := Metric{MetricName: metric}
startTimestamp, err := ParseTime(start)
if err != nil {
return
}
endTimestamp, err := ParseTime(end)
if err != nil {
return
}
timeRange := v1.Range{
Start: startTimestamp,
End: endTimestamp,
Step: step,
}
p.client.Rules(context.Background())
value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
if err != nil {
parsedResp.Error = err.Error()
} else {
parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
}
mtx.Lock()
res = append(res, parsedResp)
mtx.Unlock()
wg.Done()
}(metric)
}
wg.Wait()
return res
}
func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
res := MetricData{MetricType: MetricTypeMatrix}
data, _ := value.(model.Matrix)
for _, v := range data {
if metricFilter != nil && !metricFilter(v.Metric) {
continue
}
mv := MetricValue{
Metadata: make(map[string]string),
}
for k, v := range v.Metric {
mv.Metadata[string(k)] = string(v)
}
for _, k := range v.Values {
mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)})
}
res.MetricValues = append(res.MetricValues, mv)
}
return res
}
func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
var res []Metric
var mtx sync.Mutex
var wg sync.WaitGroup
opts := NewQueryOptions()
o.Apply(opts)
for _, metric := range metrics {
wg.Add(1)
go func(metric string) {
parsedResp := Metric{MetricName: metric}
value, _, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
if err != nil {
parsedResp.Error = err.Error()
} else {
parsedResp.MetricData = parseQueryResp(value, genMetricFilter(o))
}
mtx.Lock()
res = append(res, parsedResp)
mtx.Unlock()
wg.Done()
}(metric)
}
wg.Wait()
return res
}
func parseQueryResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
res := MetricData{MetricType: MetricTypeVector}
data, _ := value.(model.Vector)
for _, v := range data {
if metricFilter != nil && !metricFilter(v.Metric) {
continue
}
mv := MetricValue{
Metadata: make(map[string]string),
}
for k, v := range v.Metric {
mv.Metadata[string(k)] = string(v)
}
mv.Sample = &Point{float64(v.Timestamp) / 1000, float64(v.Value)}
res.MetricValues = append(res.MetricValues, mv)
}
return res
}
func genMetricFilter(o QueryOption) func(metric model.Metric) bool {
if o != nil {
if po, ok := o.(PodOption); ok {
if po.NamespacedResourcesFilter != "" {
namespacedPodsMap := make(map[string]struct{})
for _, s := range strings.Split(po.NamespacedResourcesFilter, "|") {
namespacedPodsMap[s] = struct{}{}
}
return func(metric model.Metric) bool {
if len(metric) == 0 {
return false
}
_, ok := namespacedPodsMap[string(metric["namespace"])+"/"+string(metric["pod"])]
return ok
}
}
}
}
return func(metric model.Metric) bool {
return true
}
}
func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) {
opts := NewQueryOptions()
o.Apply(opts)
value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now())
if err != nil {
return nil, err
}
return value, nil
}
func SyncClusterLoad(record ClusterLoadRecord) {
ClusterCpuUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuUtilisation)
ClusterCpuAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuAvail)
ClusterCpuTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuTotal)
ClusterMemoryUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryUtilisation)
ClusterMemoryAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryAvail)
ClusterMemoryTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryTotal)
ClusterDiskUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskUtilisation)
ClusterDiskAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskAvail)
ClusterDiskTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskTotal)
ClusterPodUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.PodsUtilisation)
ClusterPodCountGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsCount))
ClusterPodTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsTotal))
}