Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhouqunjie 2023-11-22 18:13:08 +08:00
commit 9905536031
17 changed files with 322 additions and 103 deletions

View File

@ -8,6 +8,21 @@ info(
)
/******************find datasetList start*************************/
type ControllerMetricsReq {
ParticipantId int64 `form:"participantId"`
Namespace string `form:"namespace"`
Pods string `form:"pods"`
Steps string `form:"steps"`
Start string `form:"start"`
End string `form:"end"`
}
type ControllerMetricsResp {
Data interface{} `json:"data"`
}
type ApplyReq {
YamlString string `json:"yamlString" copier:"yamlString"`
}

View File

@ -141,6 +141,10 @@ service pcm {
@doc "yaml删除"
@handler deleteYamlHandler
get /cloud/DeleteYaml (ApplyReq) returns (DeleteResp)
@doc "控制器监控"
@handler controllerMetricsHandler
get /cloud/controller/Metrics (ControllerMetricsReq) returns (ControllerMetricsResp)
}
//智算二级接口

View File

@ -2,15 +2,12 @@ NacosConfig:
DataId: pcm-core-api.yaml
Group: DEFAULT_GROUP
ServerConfigs:
# - IpAddr: 127.0.0.1
# Port: 8848
# - IpAddr: 10.101.15.7
# Port: 8848
- IpAddr: 119.45.100.73
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: nacos.jcce.dev
Port: 8848
ClientConfig:
NamespaceId: tzwang
# NamespaceId: test
NamespaceId: test
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:

View File

@ -24,7 +24,7 @@ func AddCronGroup(svc *svc.ServiceContext) {
SyncParticipantRpc(svc)
})
// 删除三天前的监控信息
svc.Cron.AddFunc("*/5 * * * * ?", func() {
svc.Cron.AddFunc("0 0 0 ? * ? ", func() {
ClearMetricsData(svc)
})

View File

@ -19,6 +19,7 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
)
@ -30,14 +31,22 @@ func SyncParticipantRpc(svc *svc.ServiceContext) {
}
for _, participant := range participants {
// 初始化p端rpc客户端
if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil {
switch participant.Type {
case constants.CLOUD:
// 初始化p端rpc客户端
svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{
Endpoints: []string{participant.RpcAddress},
NonBlock: true,
}))
// 初始化p端prometheus client
promClient, err := tracker.NewPrometheus(participant.MetricsUrl)
if err != nil {
return
}
svc.PromClient[participant.Id] = promClient
}
}
}

View File

@ -0,0 +1,25 @@
package cloud
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/cloud"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
)
func ControllerMetricsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ControllerMetricsReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := cloud.NewControllerMetricsLogic(r.Context(), svcCtx)
resp, err := l.ControllerMetrics(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -162,6 +162,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/cloud/DeleteYaml",
Handler: cloud.DeleteYamlHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/cloud/controller/Metrics",
Handler: cloud.ControllerMetricsHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)

View File

@ -0,0 +1,35 @@
package cloud
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
"time"
"github.com/zeromicro/go-zero/core/logx"
)
type ControllerMetricsLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewControllerMetricsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ControllerMetricsLogic {
return &ControllerMetricsLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ControllerMetricsLogic) ControllerMetrics(req *types.ControllerMetricsReq) (resp *types.ControllerMetricsResp, err error) {
resp = &types.ControllerMetricsResp{}
metrics := l.svcCtx.PromClient[req.ParticipantId].GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, req.Start, req.End, 10*time.Minute, tracker.ControllerOption{
PodsName: req.Pods,
Namespace: req.Namespace,
})
resp.Data = metrics
return resp, nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/zeromicro/go-zero/zrpc"
"gitlink.org.cn/jcce-pcm/pcm-ac/hpcacclient"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/config"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gitlink.org.cn/jcce-pcm/pcm-participant-ceph/cephclient"
@ -56,6 +57,7 @@ type ServiceContext struct {
Downloader *s3manager.Downloader
Uploader *s3manager.Uploader
K8sRpc map[int64]kubernetesclient.Kubernetes
PromClient map[int64]tracker.Prometheus
ParticipantRpc participantservice.ParticipantService
}
@ -103,6 +105,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)),
OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)),
K8sRpc: make(map[int64]kubernetesclient.Kubernetes),
PromClient: make(map[int64]tracker.Prometheus),
ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)),
DockerClient: dockerClient,
Downloader: downloader,

View File

@ -3325,6 +3325,19 @@ type ShowNodeDetailsResp struct {
ErrorMsg string `json:"errorMsg,omitempty"`
}
type ControllerMetricsReq struct {
ParticipantId int64 `form:"participantId"`
Namespace string `form:"namespace"`
Pods string `form:"pods"`
Steps string `form:"steps"`
Start string `form:"start"`
End string `form:"end"`
}
type ControllerMetricsResp struct {
Data interface{} `json:"data"`
}
type ApplyReq struct {
YamlString string `json:"yamlString" copier:"yamlString"`
}

View File

@ -17,7 +17,6 @@ package scheduler
import (
"bytes"
"encoding/json"
"errors"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
@ -38,10 +37,15 @@ func NewCloudScheduler() *cloudScheduler {
}
func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error) {
//参数为空返回 nil
if len(providers) == 0 || task == nil {
return nil, errors.New("算法获取参数为空")
}
////参数为空,返回 nil
//if len(providers) == 0 || task == nil {
// return nil, errors.New("算法获取参数为空")
//}
//
////仅有一个provider返回nil
//if len(providers) == 1 {
// return nil, nil
//}
//调度算法
strategy := algo.NewK8sStrategy(task, providers...)

View File

@ -83,6 +83,9 @@ func (s *scheduler) AssignAndSchedule() error {
// ParticipantIds 返回唯一值
if len(s.participantIds) == 1 {
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
}
s.task.ParticipantId = s.participantIds[0]
return nil
}
@ -93,16 +96,21 @@ func (s *scheduler) AssignAndSchedule() error {
return err
}
//集群数量不满足,指定到标签匹配后第一个集群
if len(providerList) < 2 {
if !s.checkIfParticipantAvailable(ParticipantId(s.participantIds[0])) {
return errors.Errorf("集群 %d 不可用", s.participantIds[0])
}
s.task.ParticipantId = s.participantIds[0]
return nil
}
//调度算法
strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...)
if err != nil {
return err
}
if strategy == nil {
s.task.ParticipantId = s.participantIds[0]
return nil
}
//调度结果
err = s.assignReplicasToResult(strategy, providerList)
if err != nil {
@ -131,23 +139,38 @@ func (s *scheduler) SaveToDb() error {
func (s *scheduler) obtainParamsforStrategy() (*algo.Task, []*algo.Provider, error) {
task, providerList := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin)
// 查询集群是否可用
err := s.checkAvailableParticipants(&providerList)
// 过滤可用集群
err := s.filterAvailableProviders(&providerList)
if err != nil {
return nil, nil, err
}
//可用集群为0
if len(providerList) == 0 {
return nil, nil, errors.New("未能获取可用集群")
}
return task, providerList, nil
}
func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) error {
func (s *scheduler) checkIfParticipantAvailable(id ParticipantId) bool {
workingIds, err := s.getAvailableParticipantIds()
if err != nil {
return false
}
return contains(workingIds, int64(id))
}
func (s *scheduler) getAvailableParticipantIds() ([]int64, error) {
resp, err := s.participantRpc.ListParticipant(context.Background(), nil)
if err != nil {
return err
return nil, err
}
if resp.Code != 200 {
return errors.New("集群列表查询失败")
return nil, errors.New("集群列表查询失败")
}
var workingIds []int64
@ -158,9 +181,19 @@ func (s *scheduler) checkAvailableParticipants(providerList *[]*algo.Provider) e
workingIds = append(workingIds, e.ParticipantId)
}
return workingIds, nil
}
func (s *scheduler) filterAvailableProviders(providerList *[]*algo.Provider) error {
workingIds, err := s.getAvailableParticipantIds()
if err != nil {
return err
}
var tempList []*algo.Provider
for _, provider := range *providerList {
if contains(workingIds, provider.Pid) {
if contains(workingIds, provider.Pid) && contains(s.participantIds, provider.Pid) {
tempList = append(tempList, provider)
}
}

View File

@ -20,7 +20,7 @@ type Interface interface {
//GetMetric(expr string, time time.Time) Metric
//GetMetricOverTime(expr string, start, end time.Time, step time.Duration) Metric
GetNamedMetrics(metrics []string, time time.Time, opt QueryOption) []Metric
//GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt QueryOption) []Metric
GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, opt QueryOption) []Metric
//GetMetadata(namespace string) []Metadata
//GetMetricLabelSet(expr string, start, end time.Time) []map[string]string
//

View File

@ -202,7 +202,7 @@ var promQLTemplates = map[string]string{
"workload_statefulset_unavailable_replicas_ratio": `namespace:statefulset_unavailable_replicas:ratio{$1}`,
// pod
"pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`,
"pod_cpu_usage": `round(sum by (namespace, pod) (irate(container_cpu_usage_seconds_total{job="kubelet", pod!="", image!=""}[5m])) * on (namespace, pod) group_left(owner_kind,owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}, 0.001)`,
"pod_memory_usage": `sum by (namespace, pod) (container_memory_usage_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
"pod_memory_usage_wo_cache": `sum by (namespace, pod) (container_memory_working_set_bytes{job="kubelet", pod!="", image!=""}) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
"pod_net_bytes_transmitted": `sum by (namespace, pod) (irate(container_network_transmit_bytes_total{pod!="", interface!~"^(cali.+|tunl.+|dummy.+|kube.+|flannel.+|cni.+|docker.+|veth.+|lo.*)", job="kubelet"}[5m])) * on (namespace, pod) group_left(owner_kind, owner_name) kube_pod_owner{$1} * on (namespace, pod) group_left(node) kube_pod_info{$2}`,
@ -274,8 +274,8 @@ func makeExpr(metric string, opts QueryOptions) string {
return makeWorkspaceMetricExpr(tmpl, opts)
case LevelNamespace:
return makeNamespaceMetricExpr(tmpl, opts)
case LevelWorkload:
return makeWorkloadMetricExpr(metric, tmpl, opts)
case LevelController:
return makeControllerMetricExpr(tmpl, opts)
case LevelPod:
return makePodMetricExpr(tmpl, opts)
case LevelContainer:
@ -324,40 +324,20 @@ func makeNamespaceMetricExpr(tmpl string, o QueryOptions) string {
// For monitoring the specific namespaces
// GET /namespaces/{namespace} or
// GET /namespaces
if o.NamespaceName != "" {
namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.NamespaceName)
if o.Namespace != "" {
namespaceSelector = fmt.Sprintf(`namespace="%s"`, o.Namespace)
} else {
namespaceSelector = fmt.Sprintf(`namespace=~"%s"`, o.ResourceFilter)
}
return strings.Replace(tmpl, "$1", namespaceSelector, -1)
}
func makeWorkloadMetricExpr(metric, tmpl string, o QueryOptions) string {
var kindSelector, workloadSelector string
func makeControllerMetricExpr(tmpl string, o QueryOptions) string {
var namespace, podName string
switch o.WorkloadKind {
case "deployment":
o.WorkloadKind = Deployment
case "statefulset":
o.WorkloadKind = StatefulSet
case "daemonset":
o.WorkloadKind = DaemonSet
default:
o.WorkloadKind = ".*"
}
workloadSelector = fmt.Sprintf(`namespace="%s", workload=~"%s:(%s)"`, o.NamespaceName, o.WorkloadKind, o.ResourceFilter)
if strings.Contains(metric, "deployment") {
kindSelector = fmt.Sprintf(`namespace="%s", deployment!="", deployment=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
if strings.Contains(metric, "statefulset") {
kindSelector = fmt.Sprintf(`namespace="%s", statefulset!="", statefulset=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
if strings.Contains(metric, "daemonset") {
kindSelector = fmt.Sprintf(`namespace="%s", daemonset!="", daemonset=~"%s"`, o.NamespaceName, o.ResourceFilter)
}
return strings.NewReplacer("$1", workloadSelector, "$2", kindSelector).Replace(tmpl)
namespace = fmt.Sprintf(`namespace="%s"`, o.Namespace)
podName = fmt.Sprintf(`pod=~"%s"`, o.PodName)
return strings.NewReplacer("$1", namespace, "$2", podName).Replace(tmpl)
}
func makePodMetricExpr(tmpl string, o QueryOptions) string {
@ -365,26 +345,16 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string {
// For monitoriong pods of the specific workload
// GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods
if o.WorkloadName != "" {
switch o.WorkloadKind {
case "deployment":
workloadSelector = fmt.Sprintf(`owner_kind="ReplicaSet", owner_name=~"^%s-[^-]{1,10}$"`, o.WorkloadName)
case "statefulset":
workloadSelector = fmt.Sprintf(`owner_kind="StatefulSet", owner_name="%s"`, o.WorkloadName)
case "daemonset":
workloadSelector = fmt.Sprintf(`owner_kind="DaemonSet", owner_name="%s"`, o.WorkloadName)
}
}
// For monitoring pods in the specific namespace
// GET /namespaces/{namespace}/workloads/{kind}/{workload}/pods or
// GET /namespaces/{namespace}/pods/{pod} or
// GET /namespaces/{namespace}/pods
if o.NamespaceName != "" {
if o.Namespace != "" {
if o.PodName != "" {
podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.NamespaceName)
podSelector = fmt.Sprintf(`pod="%s", namespace="%s"`, o.PodName, o.Namespace)
} else {
podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.NamespaceName)
podSelector = fmt.Sprintf(`pod=~"%s", namespace="%s"`, o.ResourceFilter, o.Namespace)
}
} else {
var namespaces, pods []string
@ -445,9 +415,9 @@ func makePodMetricExpr(tmpl string, o QueryOptions) string {
func makeContainerMetricExpr(tmpl string, o QueryOptions) string {
var containerSelector string
if o.ContainerName != "" {
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.NamespaceName, o.ContainerName)
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container="%s"`, o.PodName, o.Namespace, o.ContainerName)
} else {
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.NamespaceName, o.ResourceFilter)
containerSelector = fmt.Sprintf(`pod="%s", namespace="%s", container=~"%s"`, o.PodName, o.Namespace, o.ResourceFilter)
}
return strings.Replace(tmpl, "$1", containerSelector, -1)
}
@ -458,11 +428,11 @@ func makePVCMetricExpr(tmpl string, o QueryOptions) string {
// For monitoring persistentvolumeclaims in the specific namespace
// GET /namespaces/{namespace}/persistentvolumeclaims/{persistentvolumeclaim} or
// GET /namespaces/{namespace}/persistentvolumeclaims
if o.NamespaceName != "" {
if o.Namespace != "" {
if o.PersistentVolumeClaimName != "" {
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.NamespaceName, o.PersistentVolumeClaimName)
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim="%s"`, o.Namespace, o.PersistentVolumeClaimName)
} else {
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.NamespaceName, o.ResourceFilter)
pvcSelector = fmt.Sprintf(`namespace="%s", persistentvolumeclaim=~"%s"`, o.Namespace, o.ResourceFilter)
}
return strings.Replace(tmpl, "$1", pvcSelector, -1)
}

View File

@ -29,7 +29,7 @@ const (
LevelNamespace
LevelApplication
LevelOpenpitrix
LevelWorkload
LevelController
LevelService
LevelPod
LevelContainer
@ -44,7 +44,7 @@ var MeteringLevelMap = map[string]int{
"LevelWorkspace": LevelWorkspace,
"LevelNamespace": LevelNamespace,
"LevelApplication": LevelApplication,
"LevelWorkload": LevelWorkload,
"LevelController": LevelController,
"LevelService": LevelService,
"LevelPod": LevelPod,
"LevelContainer": LevelContainer,
@ -70,10 +70,11 @@ type QueryOptions struct {
ResourceFilter string
NodeName string
WorkspaceName string
NamespaceName string
Namespace string
WorkloadKind string
WorkloadName string
OwnerName string
PodName string
PodsName string
ContainerName string
StorageClassName string
PersistentVolumeClaimName string
@ -140,7 +141,7 @@ func (no NamespaceOption) Apply(o *QueryOptions) {
o.Level = LevelNamespace
o.ResourceFilter = no.ResourceFilter
o.WorkspaceName = no.WorkspaceName
o.NamespaceName = no.NamespaceName
o.Namespace = no.NamespaceName
o.PVCFilter = no.PVCFilter
o.StorageClassName = no.StorageClassName
}
@ -180,16 +181,16 @@ type ApplicationOption struct {
func (ao ApplicationOption) Apply(o *QueryOptions) {
o.Level = LevelApplication
o.NamespaceName = ao.NamespaceName
o.Namespace = ao.NamespaceName
o.ApplicationName = ao.Application
o.StorageClassName = ao.StorageClassName
app_components := strings.Join(ao.ApplicationComponents[:], "|")
if len(app_components) > 0 {
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, app_components)
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, app_components)
} else {
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.NamespaceName, ".*")
o.ResourceFilter = fmt.Sprintf(`namespace="%s", workload=~"%s"`, o.Namespace, ".*")
}
}
@ -200,9 +201,9 @@ type WorkloadOption struct {
}
func (wo WorkloadOption) Apply(o *QueryOptions) {
o.Level = LevelWorkload
o.Level = LevelController
o.ResourceFilter = wo.ResourceFilter
o.NamespaceName = wo.NamespaceName
o.Namespace = wo.NamespaceName
o.WorkloadKind = wo.WorkloadKind
}
@ -226,15 +227,15 @@ type ServiceOption struct {
func (so ServiceOption) Apply(o *QueryOptions) {
o.Level = LevelService
o.NamespaceName = so.NamespaceName
o.Namespace = so.NamespaceName
o.ServiceName = so.ServiceName
pod_names := strings.Join(so.PodNames, "|")
if len(pod_names) > 0 {
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.NamespaceName)
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, pod_names, o.Namespace)
} else {
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.NamespaceName)
o.ResourceFilter = fmt.Sprintf(`pod=~"%s", namespace="%s"`, ".*", o.Namespace)
}
}
@ -248,17 +249,33 @@ type PodOption struct {
PodName string
}
type ControllerOption struct {
PodsName string
Namespace string
Kind string
OwnerName string
}
func (po PodOption) Apply(o *QueryOptions) {
o.Level = LevelPod
o.NamespacedResourcesFilter = po.NamespacedResourcesFilter
o.ResourceFilter = po.ResourceFilter
o.NodeName = po.NodeName
o.NamespaceName = po.NamespaceName
o.Namespace = po.NamespaceName
o.WorkloadKind = po.WorkloadKind
o.WorkloadName = po.WorkloadName
o.OwnerName = po.WorkloadName
o.PodName = po.PodName
}
func (co ControllerOption) Apply(o *QueryOptions) {
o.Level = LevelController
o.Namespace = co.Namespace
o.WorkloadKind = co.Kind
o.OwnerName = co.OwnerName
o.PodName = co.PodsName
}
type ContainerOption struct {
ResourceFilter string
NamespaceName string
@ -269,7 +286,7 @@ type ContainerOption struct {
func (co ContainerOption) Apply(o *QueryOptions) {
o.Level = LevelContainer
o.ResourceFilter = co.ResourceFilter
o.NamespaceName = co.NamespaceName
o.Namespace = co.NamespaceName
o.PodName = co.PodName
o.ContainerName = co.ContainerName
}
@ -284,7 +301,7 @@ type PVCOption struct {
func (po PVCOption) Apply(o *QueryOptions) {
o.Level = LevelPVC
o.ResourceFilter = po.ResourceFilter
o.NamespaceName = po.NamespaceName
o.Namespace = po.NamespaceName
o.StorageClassName = po.StorageClassName
o.PersistentVolumeClaimName = po.PersistentVolumeClaimName
@ -304,7 +321,7 @@ type IngressOption struct {
func (no IngressOption) Apply(o *QueryOptions) {
o.Level = LevelIngress
o.ResourceFilter = no.ResourceFilter
o.NamespaceName = no.NamespaceName
o.Namespace = no.NamespaceName
o.Ingress = no.Ingress
o.Job = no.Job
o.PodName = no.Pod

View File

@ -19,26 +19,109 @@ import (
"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"strconv"
"strings"
"sync"
"time"
)
type prometheus struct {
client v1.API
type Prometheus struct {
prometheus Interface
client v1.API
}
// NewPrometheus 初始化Prometheus客户端
func NewPrometheus(address string) (Interface, error) {
func NewPrometheus(address string) (Prometheus, error) {
cfg := api.Config{
Address: address,
}
client, err := api.NewClient(cfg)
return prometheus{client: v1.NewAPI(client)}, err
return Prometheus{client: v1.NewAPI(client)}, err
}
func (p prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
func ParseTime(timestamp string) (time.Time, error) {
// Parse time params
startInt, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return time.Now(), err
}
return time.Unix(startInt, 0), nil
}
func (p Prometheus) GetNamedMetricsByTime(metrics []string, start, end string, step time.Duration, o QueryOption) []Metric {
var res []Metric
var mtx sync.Mutex
var wg sync.WaitGroup
opts := NewQueryOptions()
o.Apply(opts)
for _, metric := range metrics {
wg.Add(1)
go func(metric string) {
parsedResp := Metric{MetricName: metric}
startTimestamp, err := ParseTime(start)
if err != nil {
return
}
endTimestamp, err := ParseTime(end)
if err != nil {
return
}
timeRange := v1.Range{
Start: startTimestamp,
End: endTimestamp,
Step: step,
}
value, _, err := p.client.QueryRange(context.Background(), makeExpr(metric, *opts), timeRange)
if err != nil {
parsedResp.Error = err.Error()
} else {
parsedResp.MetricData = parseQueryRangeResp(value, genMetricFilter(o))
}
mtx.Lock()
res = append(res, parsedResp)
mtx.Unlock()
wg.Done()
}(metric)
}
wg.Wait()
return res
}
func parseQueryRangeResp(value model.Value, metricFilter func(metric model.Metric) bool) MetricData {
res := MetricData{MetricType: MetricTypeMatrix}
data, _ := value.(model.Matrix)
for _, v := range data {
if metricFilter != nil && !metricFilter(v.Metric) {
continue
}
mv := MetricValue{
Metadata: make(map[string]string),
}
for k, v := range v.Metric {
mv.Metadata[string(k)] = string(v)
}
for _, k := range v.Values {
mv.Series = append(mv.Series, Point{float64(k.Timestamp) / 1000, float64(k.Value)})
}
res.MetricValues = append(res.MetricValues, mv)
}
return res
}
func (p Prometheus) GetNamedMetrics(metrics []string, ts time.Time, o QueryOption) []Metric {
var res []Metric
var mtx sync.Mutex
var wg sync.WaitGroup

View File

@ -20,12 +20,18 @@ import (
)
func TestGetNamedMetrics(t *testing.T) {
client, _ := NewPrometheus("http://10.101.15.3:32585")
result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{
//client, _ := NewPrometheus("http://10.101.15.3:32585")
//result := client.GetNamedMetrics([]string{"pod_cpu_resource_limits"}, time.Now(), PodOption{
//
// PodName: "prometheus-k8s-0",
// NamespaceName: "monitoring-system",
//})
//println("zzz", result[0].MetricValues[0].Sample.Value())
PodName: "prometheus-k8s-0",
NamespaceName: "monitoring-system",
client, _ := NewPrometheus("http://10.105.20.4:30766")
result := client.GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, "1700521446", "1700551446", 10*time.Minute, ControllerOption{
PodsName: "notification-manager-deployment-78664576cb-vkptn|notification-manager-deployment-78664576cb-5m6mt",
Namespace: "kubesphere-monitoring-system",
})
println("zzz", result[0].MetricValues[0].Sample.Value())
println("zzz", result)
}