feat: alert rule annotation support prom query template func (#2314)

Co-authored-by: Xu Bin <140785332+Reditiny@users.noreply.github.com>
This commit is contained in:
Yening Qin 2024-11-22 16:58:00 +08:00 committed by GitHub
parent c13ecd780b
commit da9ea67cee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 93 additions and 33 deletions

View File

@ -65,6 +65,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx)
dispatch.InitRegisterQueryFunc(promClients)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()

View File

@ -1,6 +1,7 @@
package dispatch
import (
"context"
"encoding/json"
"fmt"
"strings"
@ -13,8 +14,10 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
)
@ -27,6 +30,18 @@ type Consumer struct {
promClients *prom.PromClientMap
}
// InitRegisterQueryFunc registers with tplx the function used to run a PromQL
// query against a datasource, resolving the reader client through promClients.
//
// The registered function returns nil when promClients reports no client for
// datasourceID, or when the query itself fails. The hook signature has no
// error return, so a failed query cannot be propagated to the caller and is
// reported as "no data" instead.
func InitRegisterQueryFunc(promClients *prom.PromClientMap) {
	tplx.RegisterQueryFunc(func(datasourceID int64, promql string) model.Value {
		if promClients.IsNil(datasourceID) {
			return nil
		}

		readerClient := promClients.GetCli(datasourceID)
		// Warnings are dropped on purpose: the hook can only hand back a value.
		value, _, err := readerClient.Query(context.Background(), promql, time.Now())
		if err != nil {
			// A value returned alongside a non-nil error is not trustworthy,
			// so never pass it on.
			return nil
		}
		return value
	})
}
// 创建一个 Consumer 实例
func NewConsumer(alerting aconf.Alerting, ctx *ctx.Context, dispatch *Dispatch, promClients *prom.PromClientMap) *Consumer {
return &Consumer{
@ -169,7 +184,7 @@ func (e *Consumer) queryRecoveryVal(event *models.AlertCurEvent) {
logger.Errorf("rule_eval:%s promql:%s, warnings:%v", getKey(event), promql, warnings)
}
anomalyPoints := common.ConvertAnomalyPoints(value)
anomalyPoints := models.ConvertAnomalyPoints(value)
if len(anomalyPoints) == 0 {
logger.Warningf("rule_eval:%s promql:%s, result is empty", getKey(event), promql)
event.AnnotationsJSON["recovery_promql_error"] = fmt.Sprintf("promql:%s error:%s", promql, "result is empty")

View File

@ -138,8 +138,8 @@ func (arw *AlertRuleWorker) Eval() {
typ := cachedRule.GetRuleType()
var (
anomalyPoints []common.AnomalyPoint
recoverPoints []common.AnomalyPoint
anomalyPoints []models.AnomalyPoint
recoverPoints []models.AnomalyPoint
err error
)
@ -167,7 +167,7 @@ func (arw *AlertRuleWorker) Eval() {
}
if arw.Inhibit {
pointsMap := make(map[string]common.AnomalyPoint)
pointsMap := make(map[string]models.AnomalyPoint)
for _, point := range recoverPoints {
// 对于恢复的事件,合并处理
tagHash := process.TagHash(point)
@ -211,8 +211,8 @@ func (arw *AlertRuleWorker) Stop() {
}
func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]models.AnomalyPoint, error) {
var lst []models.AnomalyPoint
var severity int
var rule *models.PromRuleConfig
@ -273,7 +273,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]common.Ano
}
logger.Debugf("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)
points := common.ConvertAnomalyPoints(value)
points := models.ConvertAnomalyPoints(value)
for i := 0; i < len(points); i++ {
points[i].Severity = query.Severity
points[i].Query = promql
@ -301,10 +301,10 @@ type sample struct {
// 每个节点先查询无参数的 query, 即 mem_used_percent > curVal, 得到满足值变量的所有结果
// 结果中有满足本节点参数变量的值,加入异常点列表
// 参数变量的值不满足的组合,需要覆盖上层筛选中产生的异常点
func (arw *AlertRuleWorker) VarFilling(query models.PromQuery, readerClient promsdk.API) map[string]common.AnomalyPoint {
func (arw *AlertRuleWorker) VarFilling(query models.PromQuery, readerClient promsdk.API) map[string]models.AnomalyPoint {
fullQuery := removeVal(query.PromQl)
// 存储所有的异常点key 为参数变量的组合,可以实现子筛选对上一层筛选的覆盖
anomalyPoints := make(map[string]common.AnomalyPoint)
anomalyPoints := make(map[string]models.AnomalyPoint)
// 统一变量配置格式
VarConfigForCalc := &models.ChildVarConfig{
ParamVal: make([]map[string]models.ParamQuery, 1),
@ -371,7 +371,7 @@ func (arw *AlertRuleWorker) VarFilling(query models.PromQuery, readerClient prom
curRealQuery = strings.Replace(curRealQuery, fmt.Sprintf("$%s", paramKey), val, -1)
}
if _, ok := paramPermutation[strings.Join(cur, "-")]; ok {
anomalyPoints[strings.Join(cur, "-")] = common.AnomalyPoint{
anomalyPoints[strings.Join(cur, "-")] = models.AnomalyPoint{
Key: seqVals[i].Metric.String(),
Timestamp: seqVals[i].Timestamp.Unix(),
Value: float64(seqVals[i].Value),
@ -591,10 +591,10 @@ func combine(paramKeys []string, paraMap map[string][]string, index int, current
}
}
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint, error) {
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]models.AnomalyPoint, []models.AnomalyPoint, error) {
// 获取查询和规则判断条件
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
points := []models.AnomalyPoint{}
recoverPoints := []models.AnomalyPoint{}
ruleConfig := strings.TrimSpace(rule.RuleConfig)
if ruleConfig == "" {
logger.Warningf("rule_eval:%d promql is blank", rule.Id)
@ -654,8 +654,8 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
return points, recoverPoints, nil
}
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.AnomalyPoint, error) {
var lst []models.AnomalyPoint
var severity int
var rule *models.HostRuleConfig
@ -721,7 +721,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.Ano
}
m["ident"] = target.Ident
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
}
case "offset":
idents, exists := arw.Processor.TargetsOfAlertRuleCache.Get(arw.Processor.EngineName, arw.Rule.Id)
@ -768,7 +768,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.Ano
}
m["ident"] = host
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(offset), trigger.Severity))
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(offset), trigger.Severity))
}
case "pct_target_miss":
t := now - int64(trigger.Duration)
@ -789,16 +789,16 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.Ano
logger.Debugf("rule_eval:%s missTargets:%v", arw.Key(), missTargets)
pct := float64(len(missTargets)) / float64(len(idents)) * 100
if pct >= float64(trigger.Percent) {
lst = append(lst, common.NewAnomalyPoint(trigger.Type, nil, now, pct, trigger.Severity))
lst = append(lst, models.NewAnomalyPoint(trigger.Type, nil, now, pct, trigger.Severity))
}
}
}
return lst, nil
}
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]models.AnomalyPoint, []models.AnomalyPoint) {
points := []models.AnomalyPoint{}
recoverPoints := []models.AnomalyPoint{}
if len(ruleQuery.Triggers) == 0 {
return points, recoverPoints
@ -871,7 +871,7 @@ func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes
values += fmt.Sprintf("%s:%v ", k, v)
}
point := common.AnomalyPoint{
point := models.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),

View File

@ -126,7 +126,7 @@ func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64,
return p
}
func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inhibit bool) {
func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inhibit bool) {
// 有可能rule的一些配置已经发生变化比如告警接收人、callbacks等
// 这些信息的修改是不会引起worker restart的但是确实会影响告警处理逻辑
// 所以这里直接从memsto.AlertRuleCache中获取并覆盖
@ -178,7 +178,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
}
}
func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, now int64, ruleHash string) *models.AlertCurEvent {
func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, now int64, ruleHash string) *models.AlertCurEvent {
p.fillTags(anomalyPoint)
p.mayHandleIdent()
hash := Hash(p.rule.Id, p.datasourceId, anomalyPoint)
@ -559,7 +559,7 @@ func (p *Processor) RecoverAlertCurEventFromDb() {
p.pendingsUseByRecover = NewAlertCurEventMap(pendingsUseByRecoverMap)
}
func (p *Processor) fillTags(anomalyPoint common.AnomalyPoint) {
func (p *Processor) fillTags(anomalyPoint models.AnomalyPoint) {
// handle series tags
tagsMap := make(map[string]string)
for label, value := range anomalyPoint.Labels {
@ -649,10 +649,10 @@ func labelMapToArr(m map[string]string) []string {
return labelStrings
}
func Hash(ruleId, datasourceId int64, vector common.AnomalyPoint) string {
func Hash(ruleId, datasourceId int64, vector models.AnomalyPoint) string {
return str.MD5(fmt.Sprintf("%d_%s_%d_%d_%s", ruleId, vector.Labels.String(), datasourceId, vector.Severity, vector.Query))
}
func TagHash(vector common.AnomalyPoint) string {
func TagHash(vector models.AnomalyPoint) string {
return str.MD5(vector.Labels.String())
}

View File

@ -6,7 +6,6 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/dispatch"
"github.com/ccfos/nightingale/v6/alert/mute"
"github.com/ccfos/nightingale/v6/alert/naming"
@ -92,7 +91,7 @@ func (rt *Router) eventPersist(c *gin.Context) {
type eventForm struct {
Alert bool `json:"alert"`
AnomalyPoints []common.AnomalyPoint `json:"vectors"`
AnomalyPoints []models.AnomalyPoint `json:"vectors"`
RuleId int64 `json:"rule_id"`
DatasourceId int64 `json:"datasource_id"`
Inhibit bool `json:"inhibit"`

View File

@ -6,6 +6,7 @@ import (
"github.com/ccfos/nightingale/v6/alert"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/dispatch"
"github.com/ccfos/nightingale/v6/alert/process"
alertrt "github.com/ccfos/nightingale/v6/alert/router"
"github.com/ccfos/nightingale/v6/center/cconf"
@ -103,6 +104,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
sso := sso.Init(config.Center, ctx, configCache)
promClients := prom.NewPromClient(ctx)
dispatch.InitRegisterQueryFunc(promClients)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()

View File

@ -7,6 +7,7 @@ import (
"github.com/ccfos/nightingale/v6/alert"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/dispatch"
"github.com/ccfos/nightingale/v6/alert/process"
alertrt "github.com/ccfos/nightingale/v6/alert/router"
"github.com/ccfos/nightingale/v6/center/metas"
@ -73,6 +74,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
taskTplsCache := memsto.NewTaskTplCache(ctx)
promClients := prom.NewPromClient(ctx)
dispatch.InitRegisterQueryFunc(promClients)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()

View File

@ -115,8 +115,18 @@ func (e *AlertCurEvent) ParseRule(field string) error {
"{{$value := .TriggerValue}}",
}
templateFuncMapCopy := tplx.NewTemplateFuncMap()
templateFuncMapCopy["query"] = func(promql string, param ...int64) []AnomalyPoint {
datasourceId := e.DatasourceId
if len(param) > 0 {
datasourceId = param[0]
}
value := tplx.Query(datasourceId, promql)
return ConvertAnomalyPoints(value)
}
text := strings.Join(append(defs, f), "")
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(template.FuncMap(tplx.TemplateFuncMap)).Parse(text)
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(templateFuncMapCopy).Parse(text)
if err != nil {
e.AnnotationsJSON[k] = fmt.Sprintf("failed to parse annotations: %v", err)
continue

View File

@ -1,11 +1,10 @@
package common
package models
import (
"fmt"
"math"
"strings"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/unit"
"github.com/prometheus/common/model"
)
@ -20,7 +19,7 @@ type AnomalyPoint struct {
Query string `json:"query"`
Values string `json:"values"`
ValuesUnit map[string]unit.FormattedValue `json:"values_unit"`
RecoverConfig models.RecoverConfig `json:"recover_config"`
RecoverConfig RecoverConfig `json:"recover_config"`
}
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {

View File

@ -27,6 +27,15 @@ type sample struct {
Value float64
}
// QueryFunc is the signature of the query hook: it receives an int64 and a
// string (Query passes a datasource ID and a PromQL expression) and returns a
// model.Value.
type QueryFunc func(int64, string) model.Value
// queryFunc holds the hook installed by RegisterQueryFunc; as declared here it
// starts out nil.
var queryFunc QueryFunc
// RegisterQueryFunc stores f as the package-level queryFunc. The hook is
// injected from outside this package to avoid a circular import.
func RegisterQueryFunc(f QueryFunc) {
queryFunc = f
}
type queryResult []*sample
type queryResultByLabelSorter struct {
@ -564,3 +573,13 @@ func convertToFloat(i interface{}) (float64, error) {
return 0, fmt.Errorf("can't convert %T to float", v)
}
}
// Query runs promql against the datasource identified by datasourceID using
// the function installed through RegisterQueryFunc and returns its result
// unchanged.
//
// It returns nil when no query function has been registered, rather than
// panicking on a nil function call.
func Query(datasourceID int64, promql string) model.Value {
	if queryFunc == nil {
		return nil
	}
	return queryFunc(datasourceID, promql)
}

View File

@ -54,6 +54,15 @@ var TemplateFuncMap = template.FuncMap{
"printf": Printf,
}
// NewTemplateFuncMap returns a new template.FuncMap containing every entry of
// TemplateFuncMap. The copy is shallow and made on every call, so callers can
// add or override entries in the result without mutating the shared
// TemplateFuncMap.
func NewTemplateFuncMap() template.FuncMap {
	// The final size is known up front, so allocate it once.
	m := make(template.FuncMap, len(TemplateFuncMap))
	for name, fn := range TemplateFuncMap {
		m[name] = fn
	}
	return m
}
// ReplaceTemplateUseHtml replaces variables in a template string with values.
//
// It accepts the following parameters: