// Forked from JointCloud/pcm-coordinator (340 lines, 11 KiB, Go).
/*
|
|
|
|
Copyright (c) [2023] [pcm]
|
|
[pcm-coordinator] is licensed under Mulan PSL v2.
|
|
You can use this software according to the terms and conditions of the Mulan PSL v2.
|
|
You may obtain a copy of Mulan PSL v2 at:
|
|
http://license.coscl.org.cn/MulanPSL2
|
|
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
See the Mulan PSL v2 for more details.
|
|
|
|
*/
|
|
|
|
package tracker
|
|
|
|
import (
|
|
"context"
|
|
"github.com/prometheus/alertmanager/api/v2/client"
|
|
"github.com/prometheus/alertmanager/cli"
|
|
"github.com/prometheus/client_golang/api"
|
|
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
var (
|
|
ClusterCpuUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_cpu_utilisation",
|
|
Help: "Cluster CPU Utilisation Rate.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterCpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_cpu_avail",
|
|
Help: "Cluster CPU Available.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterCpuTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_cpu_total",
|
|
Help: "Cluster CPU Total.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterMemoryUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_memory_utilisation",
|
|
Help: "Cluster Memory Utilisation Rate.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterMemoryAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_memory_avail",
|
|
Help: "Cluster Memory Available.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterMemoryTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_memory_total",
|
|
Help: "Cluster Memory Total.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterDiskUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_disk_utilisation",
|
|
Help: "Cluster Disk Utilisation Rate.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterDiskAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_disk_avail",
|
|
Help: "Cluster Disk Available.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterDiskTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_disk_total",
|
|
Help: "Cluster Disk Total.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterPodUtilisationGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_pod_utilisation",
|
|
Help: "Cluster Pod Utilisation.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterPodCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_pod_count",
|
|
Help: "Cluster Pod Count.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterPodTotalGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_pod_total",
|
|
Help: "Cluster Pod total.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterCpuCoreHoursGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_cpu_core_hours",
|
|
Help: "Cluster Cpu Core Hours.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterCardsAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_cards_avail",
|
|
Help: "Cluster Cards Available.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
ClusterGpuAvailGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
|
Name: "cluster_gpu_avail",
|
|
Help: "Cluster Gpu Available.",
|
|
}, []string{"cluster_name", "adapter_id"})
|
|
|
|
metrics = []prometheus.Collector{
|
|
ClusterCpuUtilisationGauge,
|
|
ClusterCpuAvailGauge,
|
|
ClusterCpuTotalGauge,
|
|
ClusterMemoryUtilisationGauge,
|
|
ClusterMemoryAvailGauge,
|
|
ClusterMemoryTotalGauge,
|
|
ClusterDiskUtilisationGauge,
|
|
ClusterDiskAvailGauge,
|
|
ClusterDiskTotalGauge,
|
|
ClusterPodUtilisationGauge,
|
|
ClusterPodCountGauge,
|
|
ClusterPodTotalGauge,
|
|
}
|
|
)
|
|
|
|
// ClusterLoadRecord is a snapshot of one cluster's resource load.
//
// SyncClusterLoad copies each field into the matching cluster_* gauge.
// ClusterName and AdapterId (formatted in base 10) become the
// "cluster_name" and "adapter_id" label values.
//
// NOTE(review): encoding/json ignores the ",optional" tag option. It looks
// like a go-zero request-parsing hint — confirm. Units for the CPU, memory
// and disk fields are not visible in this file — confirm with the producer.
type ClusterLoadRecord struct {
	AdapterId         int64   `json:"adapterId,optional"`
	ClusterName       string  `json:"clusterName,optional"`
	CpuAvail          float64 `json:"cpuAvail,optional"`
	CpuTotal          float64 `json:"cpuTotal,optional"`
	CpuUtilisation    float64 `json:"cpuUtilisation,optional"`
	MemoryAvail       float64 `json:"memoryAvail,optional"`
	MemoryUtilisation float64 `json:"memoryUtilisation,optional"`
	MemoryTotal       float64 `json:"memoryTotal,optional"`
	DiskAvail         float64 `json:"diskAvail,optional"`
	DiskTotal         float64 `json:"diskTotal,optional"`
	DiskUtilisation   float64 `json:"diskUtilisation,optional"`
	PodsUtilisation   float64 `json:"podsUtilisation,optional"`
	PodsCount         int64   `json:"podsCount,optional"`
	PodsTotal         int64   `json:"podsTotal,optional"`
}
|
|
|
|
func init() {
|
|
prometheus.MustRegister(metrics...)
|
|
}
|
|
|
|
// Prometheus wraps the Prometheus HTTP API client used by the query helpers
// in this file. Construct it with NewPrometheus.
type Prometheus struct {
	// prometheus is not referenced by any method in this file.
	// NOTE(review): confirm whether it is used elsewhere or can be removed.
	prometheus Interface
	// client issues the instant and range PromQL queries.
	client v1.API
}
|
|
|
|
// NewPrometheus 初始化Prometheus客户端
|
|
func NewPrometheus(address string) (Prometheus, error) {
|
|
cfg := api.Config{
|
|
Address: address,
|
|
}
|
|
|
|
promClient, err := api.NewClient(cfg)
|
|
return Prometheus{client: v1.NewAPI(promClient)}, err
|
|
}
|
|
|
|
func NewAlertClient(address string) *client.AlertmanagerAPI {
|
|
alertManagerClient := cli.NewAlertmanagerClient(&url.URL{Host: address})
|
|
return alertManagerClient
|
|
}
|
|
|
|
// ParseTime converts a decimal Unix timestamp in whole seconds (e.g.
// "1700000000") into a time.Time.
//
// If timestamp is not a valid base-10 int64, it returns the current time
// together with the strconv parse error.
func ParseTime(timestamp string) (time.Time, error) {
	secs, err := strconv.ParseInt(timestamp, 10, 64)
	if err == nil {
		return time.Unix(secs, 0), nil
	}
	return time.Now(), err
}
|
|
|
|
func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
|
|
var res []Metric
|
|
var mtx sync.Mutex
|
|
var wg sync.WaitGroup
|
|
opts := NewQueryOptions()
|
|
o.Apply(opts)
|
|
|
|
for _, metric := range metrics {
|
|
wg.Add(1)
|
|
go func(metric string) {
|
|
parsedResp := Metric{MetricName: metric}
|
|
startTimestamp, err := ParseTime(start)
|
|
if err != nil {
|
|
return
|
|
}
|
|
endTimestamp, err := ParseTime(end)
|
|
if err != nil {
|
|
return
|
|
}
|
|
timeRange := v1.Range{
|
|
Start: startTimestamp,
|
|
End: endTimestamp,
|
|
Step: step,
|
|
}
|
|
p.client.Rules(context.Background())
|
|
value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
|
|
if err != nil {
|
|
parsedResp.Error = err.Error()
|
|
} else {
|
|
parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
|
|
}
|
|
|
|
mtx.Lock()
|
|
res = append(res, parsedResp)
|
|
mtx.Unlock()
|
|
|
|
wg.Done()
|
|
}(metric)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return res
|
|
}
|
|
|
|
func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
|
|
res := MetricData{MetricType: MetricTypeMatrix}
|
|
|
|
data, _ := value.(model.Matrix)
|
|
|
|
for _, v := range data {
|
|
if metricFilter != nil && !metricFilter(v.Metric) {
|
|
continue
|
|
}
|
|
mv := MetricValue{
|
|
Metadata: make(map[string]string),
|
|
}
|
|
|
|
for k, v := range v.Metric {
|
|
mv.Metadata[string(k)] = string(v)
|
|
}
|
|
|
|
for _, k := range v.Values {
|
|
mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)})
|
|
}
|
|
|
|
res.MetricValues = append(res.MetricValues, mv)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
|
|
var res []Metric
|
|
var mtx sync.Mutex
|
|
var wg sync.WaitGroup
|
|
|
|
opts := NewQueryOptions()
|
|
o.Apply(opts)
|
|
|
|
for _, metric := range metrics {
|
|
wg.Add(1)
|
|
go func(metric string) {
|
|
parsedResp := Metric{MetricName: metric}
|
|
|
|
value, _, err := p.client.Query(context.Background(), makeExpr(metric, *opts), ts)
|
|
if err != nil {
|
|
parsedResp.Error = err.Error()
|
|
} else {
|
|
parsedResp.MetricData = parseQueryResp(value, genMetricFilter(o))
|
|
}
|
|
|
|
mtx.Lock()
|
|
res = append(res, parsedResp)
|
|
mtx.Unlock()
|
|
|
|
wg.Done()
|
|
}(metric)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
return res
|
|
}
|
|
func parseQueryResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
|
|
res := MetricData{MetricType: MetricTypeVector}
|
|
|
|
data, _ := value.(model.Vector)
|
|
|
|
for _, v := range data {
|
|
if metricFilter != nil && !metricFilter(v.Metric) {
|
|
continue
|
|
}
|
|
mv := MetricValue{
|
|
Metadata: make(map[string]string),
|
|
}
|
|
|
|
for k, v := range v.Metric {
|
|
mv.Metadata[string(k)] = string(v)
|
|
}
|
|
|
|
mv.Sample = &Point{float64(v.Timestamp) / 1000, float64(v.Value)}
|
|
|
|
res.MetricValues = append(res.MetricValues, mv)
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func genMetricFilter(o QueryOption) func(metric model.Metric) bool {
|
|
if o != nil {
|
|
if po, ok := o.(PodOption); ok {
|
|
if po.NamespacedResourcesFilter != "" {
|
|
namespacedPodsMap := make(map[string]struct{})
|
|
for _, s := range strings.Split(po.NamespacedResourcesFilter, "|") {
|
|
namespacedPodsMap[s] = struct{}{}
|
|
}
|
|
return func(metric model.Metric) bool {
|
|
if len(metric) == 0 {
|
|
return false
|
|
}
|
|
_, ok := namespacedPodsMap[string(metric["namespace"])+"/"+string(metric["pod"])]
|
|
return ok
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return func(metric model.Metric) bool {
|
|
return true
|
|
}
|
|
}
|
|
|
|
func (p Prometheus) GetRawData(expr string, o QueryOption) (model.Value, error) {
|
|
opts := NewQueryOptions()
|
|
o.Apply(opts)
|
|
value, _, err := p.client.Query(context.Background(), makeExpr(expr, *opts), time.Now())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return value, nil
|
|
}
|
|
|
|
func SyncClusterLoad(record ClusterLoadRecord) {
|
|
ClusterCpuUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuUtilisation)
|
|
ClusterCpuAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuAvail)
|
|
ClusterCpuTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.CpuTotal)
|
|
|
|
ClusterMemoryUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryUtilisation)
|
|
ClusterMemoryAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryAvail)
|
|
ClusterMemoryTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.MemoryTotal)
|
|
|
|
ClusterDiskUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskUtilisation)
|
|
ClusterDiskAvailGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskAvail)
|
|
ClusterDiskTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.DiskTotal)
|
|
|
|
ClusterPodUtilisationGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(record.PodsUtilisation)
|
|
ClusterPodCountGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsCount))
|
|
ClusterPodTotalGauge.WithLabelValues(record.ClusterName, strconv.FormatInt(record.AdapterId, 10)).Set(float64(record.PodsTotal))
|
|
}
|