refactor: optimize rule datasource set (#2288)

Co-authored-by: Xu Bin <140785332+Reditiny@users.noreply.github.com>
Yening Qin 2024-11-13 20:28:50 +08:00 committed by GitHub
parent 79f3404810
commit 78417b1d5b
16 changed files with 450 additions and 225 deletions

View File

@ -102,7 +102,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
naming := naming.NewNaming(ctx, alertc.Heartbeat, alertStats)
writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats, datasourceCache)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats)

View File

@ -96,8 +96,7 @@ func (s *Scheduler) syncAlertRules() {
ruleType := rule.GetRuleType()
if rule.IsPrometheusRule() || rule.IsLokiRule() || rule.IsTdengineRule() {
datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)
datasourceIds = append(datasourceIds, s.tdengineClients.Hit(rule.DatasourceIdsJson)...)
datasourceIds := s.datasourceCache.GetIDsByDsCateAndQueries(rule.Cate, rule.DatasourceQueries)
for _, dsId := range datasourceIds {
if !naming.DatasourceHashRing.IsHit(strconv.FormatInt(dsId, 10), fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue
@ -133,7 +132,8 @@ func (s *Scheduler) syncAlertRules() {
} else {
// if the rule is not processed by the prometheus engine, create it as an externalRule
for _, dsId := range rule.DatasourceIdsJson {
dsIds := s.datasourceCache.GetIDsByDsCateAndQueries(rule.Cate, rule.DatasourceQueries)
for _, dsId := range dsIds {
ds := s.datasourceCache.GetById(dsId)
if ds == nil {
logger.Debugf("datasource %d not found", dsId)

View File

@ -26,9 +26,11 @@ type Scheduler struct {
writers *writer.WritersType
stats *astats.Stats
datasourceCache *memsto.DatasourceCacheType
}
func NewScheduler(aconf aconf.Alert, rrc *memsto.RecordingRuleCacheType, promClients *prom.PromClientMap, writers *writer.WritersType, stats *astats.Stats) *Scheduler {
func NewScheduler(aconf aconf.Alert, rrc *memsto.RecordingRuleCacheType, promClients *prom.PromClientMap, writers *writer.WritersType, stats *astats.Stats, datasourceCache *memsto.DatasourceCacheType) *Scheduler {
scheduler := &Scheduler{
aconf: aconf,
recordRules: make(map[string]*RecordRuleContext),
@ -39,6 +41,8 @@ func NewScheduler(aconf aconf.Alert, rrc *memsto.RecordingRuleCacheType, promCli
writers: writers,
stats: stats,
datasourceCache: datasourceCache,
}
go scheduler.LoopSyncRules(context.Background())
@ -67,7 +71,7 @@ func (s *Scheduler) syncRecordRules() {
continue
}
datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)
datasourceIds := s.datasourceCache.GetIDsByDsCateAndQueries("prometheus", rule.DatasourceQueries)
for _, dsId := range datasourceIds {
if !naming.DatasourceHashRing.IsHit(strconv.FormatInt(dsId, 10), fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue

View File

@ -184,6 +184,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.POST("/query-range-batch", rt.promBatchQueryRange)
pages.POST("/query-instant-batch", rt.promBatchQueryInstant)
pages.GET("/datasource/brief", rt.datasourceBriefs)
pages.POST("/datasource/query", rt.datasourceQuery)
pages.POST("/ds-query", rt.QueryData)
pages.POST("/logs-query", rt.QueryLog)
@ -197,6 +198,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.POST("/query-range-batch", rt.auth(), rt.promBatchQueryRange)
pages.POST("/query-instant-batch", rt.auth(), rt.promBatchQueryInstant)
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
pages.POST("/ds-query", rt.auth(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.QueryLog)

View File

@ -77,6 +77,11 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
ars[i].FillSeverities()
if len(ars[i].DatasourceQueries) != 0 {
ars[i].DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ars[i].Cate, ars[i].DatasourceQueries)
}
rids = append(rids, ars[i].Id)
names = append(names, ars[i].UpdateBy)
}
@ -123,6 +128,10 @@ func (rt *Router) alertRulesGetByService(c *gin.Context) {
cache := make(map[int64]*models.UserGroup)
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
if len(ars[i].DatasourceQueries) != 0 {
ars[i].DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ars[i].Cate, ars[i].DatasourceQueries)
}
}
}
ginx.NewRender(c).Data(ars, err)
@ -157,6 +166,14 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "input json is empty")
}
for i := range lst {
if len(lst[i].DatasourceQueries) == 0 {
lst[i].DatasourceQueries = []models.DatasourceQuery{
models.DataSourceQueryAll,
}
}
}
bgid := ginx.UrlParamInt64(c, "id")
reterr := rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language"))
@ -164,9 +181,9 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
}
type promRuleForm struct {
Payload string `json:"payload" binding:"required"`
DatasourceIds []int64 `json:"datasource_ids" binding:"required"`
Disabled int `json:"disabled" binding:"gte=0,lte=1"`
Payload string `json:"payload" binding:"required"`
DatasourceQueries []models.DatasourceQuery `json:"datasource_queries" binding:"required"`
Disabled int `json:"disabled" binding:"gte=0,lte=1"`
}
func (rt *Router) alertRuleAddByImportPromRule(c *gin.Context) {
@ -185,7 +202,7 @@ func (rt *Router) alertRuleAddByImportPromRule(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "input yaml is empty")
}
lst := models.DealPromGroup(pr.Groups, f.DatasourceIds, f.Disabled)
lst := models.DealPromGroup(pr.Groups, f.DatasourceQueries, f.Disabled)
username := c.MustGet("username").(string)
bgid := ginx.UrlParamInt64(c, "id")
ginx.NewRender(c).Data(rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language")), nil)
@ -398,6 +415,16 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
}
}
if f.Action == "datasource_change" {
// update the rule's datasources
if datasourceQueries, has := f.Fields["datasource_queries"]; has {
bytes, err := json.Marshal(datasourceQueries)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
continue
}
}
for k, v := range f.Fields {
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
}
@ -417,6 +444,10 @@ func (rt *Router) alertRuleGet(c *gin.Context) {
return
}
if len(ar.DatasourceQueries) != 0 {
ar.DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ar.Cate, ar.DatasourceQueries)
}
err = ar.FillNotifyGroups(rt.Ctx, make(map[int64]*models.UserGroup))
ginx.Dangerous(err)
@ -623,7 +654,7 @@ func (rt *Router) cloneToMachine(c *gin.Context) {
newRule.CreateAt = now
newRule.RuleConfig = alertRules[i].RuleConfig
exist, err := models.AlertRuleExists(rt.Ctx, 0, newRule.GroupId, newRule.DatasourceIdsJson, newRule.Name)
exist, err := models.AlertRuleExists(rt.Ctx, 0, newRule.GroupId, newRule.Name)
if err != nil {
errMsg[f.IdentList[j]] = err.Error()
continue

View File

@ -251,3 +251,37 @@ func (rt *Router) getDatasourceIds(c *gin.Context) {
ginx.NewRender(c).Data(datasourceIds, err)
}
type datasourceQueryForm struct {
Cate string `json:"datasource_cate"`
DatasourceQueries []models.DatasourceQuery `json:"datasource_queries"`
}
type datasourceQueryResp struct {
ID int64 `json:"id"`
Name string `json:"name"`
}
func (rt *Router) datasourceQuery(c *gin.Context) {
var dsf datasourceQueryForm
ginx.BindJSON(c, &dsf)
datasources, err := models.GetDatasourcesGetsByTypes(rt.Ctx, []string{dsf.Cate})
ginx.Dangerous(err)
nameToID := make(map[string]int64)
IDToName := make(map[int64]string)
for _, ds := range datasources {
nameToID[ds.Name] = ds.Id
IDToName[ds.Id] = ds.Name
}
ids := models.GetDatasourceIDsByDatasourceQueries(dsf.DatasourceQueries, IDToName, nameToID)
var req []datasourceQueryResp
for _, id := range ids {
req = append(req, datasourceQueryResp{
ID: id,
Name: IDToName[id],
})
}
ginx.NewRender(c).Data(req, err)
}
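
A minimal sketch of the request/response shape for the new POST /datasource/query endpoint, with local mirrors of the structs above so it compiles standalone; the datasource names and ids are illustrative, not from this commit.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of models.DatasourceQuery and datasourceQueryForm, redeclared
// here only so this sketch is self-contained.
type datasourceQuery struct {
	MatchType int           `json:"match_type"`
	Op        string        `json:"op"`
	Values    []interface{} `json:"values"`
}

type queryForm struct {
	Cate              string            `json:"datasource_cate"`
	DatasourceQueries []datasourceQuery `json:"datasource_queries"`
}

func main() {
	// Fuzzy-match every prometheus datasource whose name matches the glob "prom-*".
	body, _ := json.Marshal(queryForm{
		Cate: "prometheus",
		DatasourceQueries: []datasourceQuery{
			{MatchType: 1, Op: "in", Values: []interface{}{"prom-*"}},
		},
	})
	fmt.Println(string(body))
	// {"datasource_cate":"prometheus","datasource_queries":[{"match_type":1,"op":"in","values":["prom-*"]}]}
	// The handler would respond with something like: [{"id":1,"name":"prom-a"},{"id":2,"name":"prom-b"}]
}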

View File

@ -3,8 +3,6 @@ package router
import (
"encoding/json"
"net/http"
"strconv"
"strings"
"time"
"github.com/ccfos/nightingale/v6/models"
@ -74,6 +72,14 @@ func (rt *Router) recordingRuleAddByFE(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "input json is empty")
}
for i := range lst {
if len(lst[i].DatasourceQueries) == 0 {
lst[i].DatasourceQueries = []models.DatasourceQuery{
models.DataSourceQueryAll,
}
}
}
bgid := ginx.UrlParamInt64(c, "id")
reterr := make(map[string]string)
for i := 0; i < count; i++ {
@ -137,23 +143,10 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
f.Fields["update_by"] = c.MustGet("username").(string)
f.Fields["update_at"] = time.Now().Unix()
if _, ok := f.Fields["datasource_ids"]; ok {
// datasource_ids = "1 2 3"
idsStr := strings.Fields(f.Fields["datasource_ids"].(string))
ids := make([]int64, 0)
for _, idStr := range idsStr {
id, err := strconv.ParseInt(idStr, 10, 64)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "datasource_ids error")
}
ids = append(ids, id)
}
bs, err := json.Marshal(ids)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "datasource_ids error")
}
f.Fields["datasource_ids"] = string(bs)
if datasourceQueries, ok := f.Fields["datasource_queries"]; ok {
bytes, err := json.Marshal(datasourceQueries)
ginx.Dangerous(err)
f.Fields["datasource_queries"] = string(bytes)
}
for i := 0; i < len(f.Ids); i++ {

View File

@ -23,7 +23,9 @@ type DatasourceCacheType struct {
DatasourceFilter func([]*models.Datasource, *models.User) []*models.Datasource
sync.RWMutex
ds map[int64]*models.Datasource // key: id
ds map[int64]*models.Datasource // key: id value: datasource
CateToIDs map[string]map[int64]*models.Datasource // key1: cate key2: id value: datasource
CateToNames map[string]map[string]int64 // key1: cate key2: name value: id
}
func NewDatasourceCache(ctx *ctx.Context, stats *Stats) *DatasourceCacheType {
@ -33,6 +35,8 @@ func NewDatasourceCache(ctx *ctx.Context, stats *Stats) *DatasourceCacheType {
ctx: ctx,
stats: stats,
ds: make(map[int64]*models.Datasource),
CateToIDs: make(map[string]map[int64]*models.Datasource),
CateToNames: make(map[string]map[string]int64),
DatasourceCheckHook: func(ctx *gin.Context) bool { return false },
DatasourceFilter: func(ds []*models.Datasource, user *models.User) []*models.Datasource { return ds },
}
@ -40,6 +44,12 @@ func NewDatasourceCache(ctx *ctx.Context, stats *Stats) *DatasourceCacheType {
return ds
}
func (d *DatasourceCacheType) GetIDsByDsCateAndQueries(cate string, datasourceQueries []models.DatasourceQuery) []int64 {
d.RLock()
defer d.RUnlock()
return models.GetDatasourceIDsByDatasourceQueries(datasourceQueries, d.CateToIDs[cate], d.CateToNames[cate])
}
func (d *DatasourceCacheType) StatChanged(total, lastUpdated int64) bool {
if d.statTotal == total && d.statLastUpdated == lastUpdated {
return false
@ -49,8 +59,22 @@ func (d *DatasourceCacheType) StatChanged(total, lastUpdated int64) bool {
}
func (d *DatasourceCacheType) Set(ds map[int64]*models.Datasource, total, lastUpdated int64) {
cateToDs := make(map[string]map[int64]*models.Datasource)
cateToNames := make(map[string]map[string]int64)
for _, datasource := range ds {
if _, exists := cateToDs[datasource.PluginType]; !exists {
cateToDs[datasource.PluginType] = make(map[int64]*models.Datasource)
}
cateToDs[datasource.PluginType][datasource.Id] = datasource
if _, exists := cateToNames[datasource.PluginType]; !exists {
cateToNames[datasource.PluginType] = make(map[string]int64)
}
cateToNames[datasource.PluginType][datasource.Name] = datasource.Id
}
d.Lock()
d.CateToIDs = cateToDs
d.ds = ds
d.CateToNames = cateToNames
d.Unlock()
// only one goroutine used, so no need lock
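
For reference, the shapes of the two rebuilt indexes, with illustrative values (two prometheus datasources named prom-a/prom-b with ids 1/2):

// After Set() processes {1: prom-a, 2: prom-b}, both of PluginType "prometheus":
//   d.CateToIDs   == {"prometheus": {1: <*Datasource prom-a>, 2: <*Datasource prom-b>}}
//   d.CateToNames == {"prometheus": {"prom-a": 1, "prom-b": 2}}
// GetIDsByDsCateAndQueries(cate, queries) resolves queries against exactly these
// per-cate indexes, replacing the per-client Hit() lookups removed further below.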
@ -99,20 +123,20 @@ func (d *DatasourceCacheType) syncDatasources() error {
return nil
}
m, err := models.DatasourceGetMap(d.ctx)
ds, err := models.DatasourceGetMap(d.ctx)
if err != nil {
dumper.PutSyncRecord("datasources", start.Unix(), -1, -1, "failed to query records: "+err.Error())
return errors.WithMessage(err, "failed to call DatasourceGetMap")
}
d.Set(m, stat.Total, stat.LastUpdated)
d.Set(ds, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
d.stats.GaugeCronDuration.WithLabelValues("sync_datasources").Set(float64(ms))
d.stats.GaugeSyncNumber.WithLabelValues("sync_datasources").Set(float64(len(m)))
d.stats.GaugeSyncNumber.WithLabelValues("sync_datasources").Set(float64(len(ds)))
logger.Infof("timer: sync datasources done, cost: %dms, number: %d", ms, len(m))
dumper.PutSyncRecord("datasources", start.Unix(), ms, len(m), "success")
logger.Infof("timer: sync datasources done, cost: %dms, number: %d", ms, len(ds))
dumper.PutSyncRecord("datasources", start.Unix(), ms, len(ds), "success")
return nil
}

View File

@ -13,6 +13,7 @@ import (
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/tidwall/match"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
@ -45,55 +46,56 @@ const (
type AlertRule struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
DatasourceIds string `json:"-" gorm:"datasource_ids"` // datasource ids
DatasourceIdsJson []int64 `json:"datasource_ids" gorm:"-"` // for fe
Cluster string `json:"cluster"` // take effect by clusters, separated by space
Name string `json:"name"` // rule name
Note string `json:"note"` // will be sent in notify
Prod string `json:"prod"` // product empty means n9e
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` // for fe
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
Severities []int `json:"severities" gorm:"-"` // 1: Emergency 2: Warning 3: Notice
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
PromQl string `json:"prom_ql"` // just one ql
RuleConfig string `json:"-" gorm:"rule_config"` // rule config
RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe
EventRelabelConfig []*pconf.RelabelConfig `json:"event_relabel_config" gorm:"-"` // event relabel config
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
EnableStime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableStimeJSON string `json:"enable_stime" gorm:"-"` // for fe
EnableStimesJSON []string `json:"enable_stimes" gorm:"-"` // for fe
EnableEtime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableEtimeJSON string `json:"enable_etime" gorm:"-"` // for fe
EnableEtimesJSON []string `json:"enable_etimes" gorm:"-"` // for fe
EnableDaysOfWeek string `json:"-"` // eg: "0 1 2 3 4 5 6 ; 0 1 2"
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
EnableDaysOfWeeksJSON [][]string `json:"enable_days_of_weeks" gorm:"-"` // for fe
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // split by space: 233 43
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
RecoverDuration int64 `json:"recover_duration"` // unit: s
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"` // sop url
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
ExtraConfig string `json:"-" gorm:"extra_config"` // extra config
ExtraConfigJSON interface{} `json:"extra_config" gorm:"-"` // for fe
GroupId int64 `json:"group_id"` // busi group id
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
DatasourceIds string `json:"-" gorm:"datasource_ids"`
DatasourceIdsJson []int64 `json:"datasource_ids,omitempty" gorm:"-"` // alert rule list page use this field
DatasourceQueries []DatasourceQuery `json:"datasource_queries" gorm:"datasource_queries;type:text;serializer:json"` // datasource queries
Cluster string `json:"cluster"` // take effect by clusters, separated by space
Name string `json:"name"` // rule name
Note string `json:"note"` // will be sent in notify
Prod string `json:"prod"` // product empty means n9e
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` // for fe
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
Severities []int `json:"severities" gorm:"-"` // 1: Emergency 2: Warning 3: Notice
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
PromQl string `json:"prom_ql"` // just one ql
RuleConfig string `json:"-" gorm:"rule_config"` // rule config
RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe
EventRelabelConfig []*pconf.RelabelConfig `json:"event_relabel_config" gorm:"-"` // event relabel config
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
EnableStime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableStimeJSON string `json:"enable_stime" gorm:"-"` // for fe
EnableStimesJSON []string `json:"enable_stimes" gorm:"-"` // for fe
EnableEtime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableEtimeJSON string `json:"enable_etime" gorm:"-"` // for fe
EnableEtimesJSON []string `json:"enable_etimes" gorm:"-"` // for fe
EnableDaysOfWeek string `json:"-"` // eg: "0 1 2 3 4 5 6 ; 0 1 2"
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
EnableDaysOfWeeksJSON [][]string `json:"enable_days_of_weeks" gorm:"-"` // for fe
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // split by space: 233 43
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
RecoverDuration int64 `json:"recover_duration"` // unit: s
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"` // sop url
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
ExtraConfig string `json:"-" gorm:"extra_config"` // extra config
ExtraConfigJSON interface{} `json:"extra_config" gorm:"-"` // for fe
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
@ -174,11 +176,11 @@ type Trigger struct {
Exp string `json:"exp"`
Severity int `json:"severity"`
Type string `json:"type,omitempty"`
Duration int `json:"duration,omitempty"`
Percent int `json:"percent,omitempty"`
Joins []Join `json:"joins"`
JoinRef string `json:"join_ref"`
Type string `json:"type,omitempty"`
Duration int `json:"duration,omitempty"`
Percent int `json:"percent,omitempty"`
Joins []Join `json:"joins"`
JoinRef string `json:"join_ref"`
RecoverConfig RecoverConfig `json:"recover_config"`
}
@ -188,6 +190,132 @@ type Join struct {
On []string `json:"on"`
}
var DataSourceQueryAll = DatasourceQuery{
MatchType: 2,
Op: "in",
Values: []interface{}{DatasourceIdAll},
}
type DatasourceQuery struct {
MatchType int `json:"match_type"`
Op string `json:"op"`
Values []interface{} `json:"values"`
}
// GetDatasourceIDsByDatasourceQueries resolves datasourceQueries to datasource IDs.
// A query matches either exactly (by id) or fuzzily (by name glob), with "in" and "not in" operators.
// idMap is the full set of datasources the queries select from.
// nameMap maps every datasource name to its id and is used for fuzzy name matching.
func GetDatasourceIDsByDatasourceQueries[T any](datasourceQueries []DatasourceQuery, idMap map[int64]T, nameMap map[string]int64) []int64 {
if len(datasourceQueries) == 0 {
return nil
}
// all queries are intersected; the initial set is the full id set
curIDs := make(map[int64]struct{})
for id := range idMap {
curIDs[id] = struct{}{}
}
for i := range datasourceQueries {
// each query narrows dsIDs starting from the current curIDs
dsIDs := make(map[int64]struct{})
q := datasourceQueries[i]
if q.MatchType == 0 {
// exact match: match by id
idValues := make([]int64, 0, len(q.Values))
for _, raw := range q.Values {
var val int64
switch v := raw.(type) {
case int64:
val = v
case int:
val = int64(v)
case float64:
val = int64(v)
case float32:
val = int64(v)
case int8:
val = int64(v)
case int16:
val = int64(v)
case int32:
val = int64(v)
default:
continue
}
idValues = append(idValues, val)
}
if q.Op == "in" {
if len(idValues) == 1 && idValues[0] == DatasourceIdAll {
for id := range curIDs {
dsIDs[id] = struct{}{}
}
} else {
for idx := range idValues {
if _, exist := curIDs[idValues[idx]]; exist {
dsIDs[idValues[idx]] = struct{}{}
}
}
}
} else if q.Op == "not in" {
for idx := range idValues {
delete(curIDs, idValues[idx])
}
dsIDs = curIDs
}
} else if q.MatchType == 1 {
// fuzzy match: match by datasource name
if q.Op == "in" {
for dsName, dsID := range nameMap {
if _, exist := curIDs[dsID]; exist {
for idx := range q.Values {
if _, ok := q.Values[idx].(string); !ok {
continue
}
if match.Match(dsName, q.Values[idx].(string)) {
dsIDs[nameMap[dsName]] = struct{}{}
}
}
}
}
} else if q.Op == "not in" {
for dsName := range nameMap {
for idx := range q.Values {
if _, ok := q.Values[idx].(string); !ok {
continue
}
if match.Match(dsName, q.Values[idx].(string)) {
delete(curIDs, nameMap[dsName])
}
}
}
dsIDs = curIDs
}
} else if q.MatchType == 2 {
// all datasources
for id := range curIDs {
dsIDs[id] = struct{}{}
}
}
curIDs = dsIDs
if len(curIDs) == 0 {
break
}
}
dsIds := make([]int64, 0, len(curIDs))
for c := range curIDs {
dsIds = append(dsIds, c)
}
return dsIds
}
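
A self-contained sketch of the intersection semantics, assuming illustrative ids and names; the helper is generic over the idMap value type:

package main

import (
	"fmt"

	"github.com/ccfos/nightingale/v6/models"
)

func main() {
	// Full set of candidate datasources for one cate; values may be any type T.
	idMap := map[int64]string{1: "prom-a", 2: "prom-b", 3: "prom-dev"}
	nameMap := map[string]int64{"prom-a": 1, "prom-b": 2, "prom-dev": 3}

	queries := []models.DatasourceQuery{
		{MatchType: 1, Op: "in", Values: []interface{}{"prom-*"}},     // fuzzy name glob -> {1, 2, 3}
		{MatchType: 0, Op: "not in", Values: []interface{}{int64(3)}}, // exact id drop   -> {1, 2}
	}

	// Queries are intersected left to right, starting from every id in idMap.
	fmt.Println(models.GetDatasourceIDsByDatasourceQueries(queries, idMap, nameMap)) // [1 2] (order unspecified)
}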
func GetHostsQuery(queries []HostQuery) []map[string]interface{} {
var query []map[string]interface{}
for _, q := range queries {
@ -286,9 +414,9 @@ func (ar *AlertRule) Verify() error {
return fmt.Errorf("GroupId(%d) invalid", ar.GroupId)
}
if IsAllDatasource(ar.DatasourceIdsJson) {
ar.DatasourceIdsJson = []int64{0}
}
//if IsAllDatasource(ar.DatasourceIdsJson) {
// ar.DatasourceIdsJson = []int64{0}
//}
if str.Dangerous(ar.Name) {
return errors.New("Name has invalid characters")
@ -342,7 +470,7 @@ func (ar *AlertRule) Add(ctx *ctx.Context) error {
return err
}
exists, err := AlertRuleExists(ctx, 0, ar.GroupId, ar.DatasourceIdsJson, ar.Name)
exists, err := AlertRuleExists(ctx, 0, ar.GroupId, ar.Name)
if err != nil {
return err
}
@ -360,7 +488,7 @@ func (ar *AlertRule) Add(ctx *ctx.Context) error {
func (ar *AlertRule) Update(ctx *ctx.Context, arf AlertRule) error {
if ar.Name != arf.Name {
exists, err := AlertRuleExists(ctx, ar.Id, ar.GroupId, ar.DatasourceIdsJson, arf.Name)
exists, err := AlertRuleExists(ctx, ar.Id, ar.GroupId, arf.Name)
if err != nil {
return err
}
@ -509,11 +637,30 @@ func (ar *AlertRule) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interfa
return DB(ctx).Model(ar).Updates(fields).Error
}
// for v5 rule
func (ar *AlertRule) FillDatasourceIds() error {
if ar.DatasourceIds != "" {
json.Unmarshal([]byte(ar.DatasourceIds), &ar.DatasourceIdsJson)
return nil
func (ar *AlertRule) FillDatasourceQueries() error {
// backward compatibility: convert legacy datasourceIds into datasourceQueries
if len(ar.DatasourceQueries) == 0 && len(ar.DatasourceIds) != 0 {
datasourceQueries := DatasourceQuery{
MatchType: 0,
Op: "in",
Values: make([]interface{}, 0),
}
var values []int
if ar.DatasourceIds != "" {
json.Unmarshal([]byte(ar.DatasourceIds), &values)
}
for i := range values {
if values[i] == 0 {
// 0 means all datasources
datasourceQueries.MatchType = 2
break
}
datasourceQueries.Values = append(datasourceQueries.Values, values[i])
}
ar.DatasourceQueries = []DatasourceQuery{datasourceQueries}
}
return nil
}
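
What the back-fill produces for legacy rows, shown with illustrative values:

package main

import (
	"fmt"

	"github.com/ccfos/nightingale/v6/models"
)

func main() {
	legacy := models.AlertRule{DatasourceIds: "[1,2]"}
	_ = legacy.FillDatasourceQueries()
	fmt.Printf("%+v\n", legacy.DatasourceQueries) // [{MatchType:0 Op:in Values:[1 2]}]

	all := models.AlertRule{DatasourceIds: "[0]"}
	_ = all.FillDatasourceQueries()
	fmt.Printf("%+v\n", all.DatasourceQueries) // [{MatchType:2 Op:in Values:[]}]; match_type 2 selects all datasources
}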
@ -632,14 +779,6 @@ func (ar *AlertRule) FE2DB() error {
}
ar.AlgoParams = string(algoParamsByte)
if len(ar.DatasourceIdsJson) > 0 {
idsByte, err := json.Marshal(ar.DatasourceIdsJson)
if err != nil {
return fmt.Errorf("marshal datasource_ids err:%v", err)
}
ar.DatasourceIds = string(idsByte)
}
if ar.RuleConfigJson == nil {
query := PromQuery{
PromQl: ar.PromQl,
@ -711,8 +850,12 @@ func (ar *AlertRule) DB2FE() error {
json.Unmarshal([]byte(ar.RuleConfig), &ruleConfig)
ar.EventRelabelConfig = ruleConfig.EventRelabelConfig
err := ar.FillDatasourceIds()
return err
err := ar.FillDatasourceQueries()
if err != nil {
return err
}
return nil
}
func AlertRuleDels(ctx *ctx.Context, ids []int64, bgid ...int64) error {
@ -726,7 +869,7 @@ func AlertRuleDels(ctx *ctx.Context, ids []int64, bgid ...int64) error {
return ret.Error
}
// the rule was really deleted; delete its related active alerts too, since they can never recover and, with the rule gone, nobody cares about them anymore
if ret.RowsAffected > 0 {
DB(ctx).Where("rule_id = ?", ids[i]).Delete(new(AlertCurEvent))
}
@ -735,7 +878,7 @@ func AlertRuleDels(ctx *ctx.Context, ids []int64, bgid ...int64) error {
return nil
}
func AlertRuleExists(ctx *ctx.Context, id, groupId int64, datasourceIds []int64, name string) (bool, error) {
func AlertRuleExists(ctx *ctx.Context, id, groupId int64, name string) (bool, error) {
session := DB(ctx).Where("id <> ? and group_id = ? and name = ?", id, groupId, name)
var lst []AlertRule
@ -747,15 +890,6 @@ func AlertRuleExists(ctx *ctx.Context, id, groupId int64, datasourceIds []int64,
return false, nil
}
// match cluster
for _, r := range lst {
r.FillDatasourceIds()
for _, id := range r.DatasourceIdsJson {
if MatchDatasource(datasourceIds, id) {
return true, nil
}
}
}
return false, nil
}

View File

@ -388,12 +388,12 @@ func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
}
}
ret := make(map[int64]*Datasource)
ds := make(map[int64]*Datasource)
for i := 0; i < len(lst); i++ {
ret[lst[i].Id] = lst[i]
ds[lst[i].Id] = lst[i]
}
return ret, nil
return ds, nil
}
func DatasourceStatistics(ctx *ctx.Context) (*Statistics, error) {

View File

@ -186,7 +186,8 @@ func InsertPermPoints(db *gorm.DB) {
}
type AlertRule struct {
ExtraConfig string `gorm:"type:text;column:extra_config"` // extra config
ExtraConfig string `gorm:"type:text;column:extra_config"` // extra config
DatasourceQueries []models.DatasourceQuery `json:"datasource_queries" gorm:"datasource_queries;type:text;serializer:json"` // datasource queries
}
type AlertSubscribe struct {
@ -203,9 +204,10 @@ type AlertMute struct {
}
type RecordingRule struct {
QueryConfigs string `gorm:"type:text;not null;column:query_configs"` // query_configs
DatasourceIds string `gorm:"column:datasource_ids;type:varchar(255);default:'';comment:datasource ids"`
CronPattern string `gorm:"column:cron_pattern;type:varchar(255);default:'';comment:cron pattern"`
QueryConfigs string `gorm:"type:text;not null;column:query_configs"` // query_configs
DatasourceIds string `gorm:"column:datasource_ids;type:varchar(255);default:'';comment:datasource ids"`
CronPattern string `gorm:"column:cron_pattern;type:varchar(255);default:'';comment:cron pattern"`
DatasourceQueries []models.DatasourceQuery `json:"datasource_queries" gorm:"datasource_queries;type:text;serializer:json"` // datasource queries
}
type AlertingEngines struct {

View File

@ -31,7 +31,7 @@ func convertInterval(interval string) int {
return int(duration.Seconds())
}
func ConvertAlert(rule PromRule, interval string, datasourceIds []int64, disabled int) AlertRule {
func ConvertAlert(rule PromRule, interval string, datasourceQueries []DatasourceQuery, disabled int) AlertRule {
annotations := rule.Annotations
appendTags := []string{}
severity := 2
@ -55,29 +55,31 @@ func ConvertAlert(rule PromRule, interval string, datasouceIds []int64, disabled
}
}
return AlertRule{
Name: ruleName,
Severity: severity,
DatasourceIdsJson: datasouceIds,
Disabled: disabled,
PromForDuration: convertInterval(rule.For),
PromQl: rule.Expr,
PromEvalInterval: convertInterval(interval),
EnableStimeJSON: "00:00",
EnableEtimeJSON: "23:59",
ar := AlertRule{
Name: rule.Alert,
Severity: severity,
Disabled: disabled,
PromForDuration: convertInterval(rule.For),
PromQl: rule.Expr,
PromEvalInterval: convertInterval(interval),
EnableStimeJSON: "00:00",
EnableEtimeJSON: "23:59",
EnableDaysOfWeekJSON: []string{
"1", "2", "3", "4", "5", "6", "0",
},
EnableInBG: AlertRuleEnableInGlobalBG,
NotifyRecovered: AlertRuleNotifyRecovered,
NotifyRepeatStep: AlertRuleNotifyRepeatStep60Min,
RecoverDuration: AlertRuleRecoverDuration0Sec,
AnnotationsJSON: annotations,
AppendTagsJSON: appendTags,
EnableInBG: AlertRuleEnableInGlobalBG,
NotifyRecovered: AlertRuleNotifyRecovered,
NotifyRepeatStep: AlertRuleNotifyRepeatStep60Min,
RecoverDuration: AlertRuleRecoverDuration0Sec,
AnnotationsJSON: annotations,
AppendTagsJSON: appendTags,
DatasourceQueries: datasourceQueries,
}
return ar
}
func DealPromGroup(promRule []PromRuleGroup, dataSourceIds []int64, disabled int) []AlertRule {
func DealPromGroup(promRule []PromRuleGroup, dataSourceQueries []DatasourceQuery, disabled int) []AlertRule {
var alertRules []AlertRule
for _, group := range promRule {
@ -88,7 +90,7 @@ func DealPromGroup(promRule []PromRuleGroup, dataSourceIds []int64, disabled int
for _, rule := range group.Rules {
if rule.Alert != "" {
alertRules = append(alertRules,
ConvertAlert(rule, interval, dataSourceIds, disabled))
ConvertAlert(rule, interval, dataSourceQueries, disabled))
}
}
}

View File

@ -21,7 +21,7 @@ func TestConvertAlert(t *testing.T) {
t.Errorf("Failed to Unmarshal, err: %s", err)
}
t.Logf("jobMissing: %+v", jobMissing[0])
convJobMissing := models.ConvertAlert(jobMissing[0], "30s", []int64{1}, 0)
convJobMissing := models.ConvertAlert(jobMissing[0], "30s", []models.DatasourceQuery{}, 0)
if convJobMissing.PromEvalInterval != 30 {
t.Errorf("PromEvalInterval is expected to be 30, but got %d",
convJobMissing.PromEvalInterval)
@ -45,7 +45,7 @@ func TestConvertAlert(t *testing.T) {
description: "Prometheus rule evaluation took more time than the scheduled interval. It indicates a slower storage backend access or too complex query.\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
`), &ruleEvaluationSlow)
t.Logf("ruleEvaluationSlow: %+v", ruleEvaluationSlow[0])
convRuleEvaluationSlow := models.ConvertAlert(ruleEvaluationSlow[0], "1m", []int64{1}, 0)
convRuleEvaluationSlow := models.ConvertAlert(ruleEvaluationSlow[0], "1m", []models.DatasourceQuery{}, 0)
if convRuleEvaluationSlow.PromEvalInterval != 60 {
t.Errorf("PromEvalInterval is expected to be 60, but got %d",
convJobMissing.PromEvalInterval)
@ -69,7 +69,7 @@ func TestConvertAlert(t *testing.T) {
description: "A Prometheus target has disappeared. An exporter might be crashed.\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
`), &targetMissing)
t.Logf("targetMissing: %+v", targetMissing[0])
convTargetMissing := models.ConvertAlert(targetMissing[0], "1h", []int64{1}, 0)
convTargetMissing := models.ConvertAlert(targetMissing[0], "1h", []models.DatasourceQuery{}, 0)
if convTargetMissing.PromEvalInterval != 3600 {
t.Errorf("PromEvalInterval is expected to be 3600, but got %d",
convTargetMissing.PromEvalInterval)

View File

@ -16,25 +16,25 @@ import (
// A RecordingRule records its vector expression into new timeseries.
type RecordingRule struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
DatasourceIds string `json:"-" gorm:"datasource_ids"` // datasource ids
DatasourceIdsJson []int64 `json:"datasource_ids" gorm:"-"` // for fe
Cluster string `json:"cluster"` // take effect by cluster, separated by space
Name string `json:"name"` // new metric name
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromQl string `json:"prom_ql"` // just one ql for promql
QueryConfigs string `json:"-" gorm:"query_configs"` // query_configs
QueryConfigsJson []QueryConfig `json:"query_configs" gorm:"-"` // query_configs for fe
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
CronPattern string `json:"cron_pattern"`
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Note string `json:"note"` // note
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
DatasourceIds string `json:"-" gorm:"datasource_ids,omitempty"` // datasource ids
DatasourceQueries []DatasourceQuery `json:"datasource_queries,omitempty" gorm:"datasource_queries;type:text;serializer:json"` // datasource queries
Cluster string `json:"cluster"` // take effect by cluster, separated by space
Name string `json:"name"` // new metric name
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromQl string `json:"prom_ql"` // just one ql for promql
QueryConfigs string `json:"-" gorm:"query_configs"` // query_configs
QueryConfigsJson []QueryConfig `json:"query_configs" gorm:"-"` // query_configs for fe
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
CronPattern string `json:"cron_pattern"`
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Note string `json:"note"` // note
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
}
type QueryConfig struct {
@ -46,9 +46,10 @@ type QueryConfig struct {
}
type Query struct {
DatasourceIds []int64 `json:"datasource_ids"`
Cate string `json:"cate"`
Config interface{} `json:"config"`
DatasourceIds []int64 `json:"datasource_ids"`
DatasourceQueries []DatasourceQuery `json:"datasource_queries"`
Cate string `json:"cate"`
Config interface{} `json:"config"`
}
func (re *RecordingRule) TableName() string {
@ -57,8 +58,6 @@ func (re *RecordingRule) TableName() string {
func (re *RecordingRule) FE2DB() {
re.AppendTags = strings.Join(re.AppendTagsJSON, " ")
idsByte, _ := json.Marshal(re.DatasourceIdsJson)
re.DatasourceIds = string(idsByte)
queryConfigsByte, _ := json.Marshal(re.QueryConfigsJson)
re.QueryConfigs = string(queryConfigsByte)
@ -66,9 +65,28 @@ func (re *RecordingRule) FE2DB() {
func (re *RecordingRule) DB2FE() error {
re.AppendTagsJSON = strings.Fields(re.AppendTags)
json.Unmarshal([]byte(re.DatasourceIds), &re.DatasourceIdsJson)
re.FillDatasourceQueries()
json.Unmarshal([]byte(re.QueryConfigs), &re.QueryConfigsJson)
// legacy rules do not contain the DatasourceQueries field; convert DatasourceIds into DatasourceQueries
for i := range re.QueryConfigsJson {
for j := range re.QueryConfigsJson[i].Queries {
if len(re.QueryConfigsJson[i].Queries[j].DatasourceQueries) == 0 {
values := make([]interface{}, 0, len(re.QueryConfigsJson[i].Queries[j].DatasourceIds))
for _, dsID := range re.QueryConfigsJson[i].Queries[j].DatasourceIds {
values = append(values, dsID)
}
re.QueryConfigsJson[i].Queries[j].DatasourceQueries = []DatasourceQuery{
{
MatchType: 0,
Op: "in",
Values: values,
},
}
}
}
}
if re.CronPattern == "" && re.PromEvalInterval != 0 {
re.CronPattern = fmt.Sprintf("@every %ds", re.PromEvalInterval)
@ -77,14 +95,42 @@ func (re *RecordingRule) DB2FE() error {
return nil
}
func (re *RecordingRule) FillDatasourceQueries() error {
// backward compatibility: convert legacy datasourceIds into datasourceQueries
if len(re.DatasourceQueries) == 0 && len(re.DatasourceIds) != 0 {
datasourceQueries := DatasourceQuery{
MatchType: 0,
Op: "in",
Values: make([]interface{}, 0),
}
var values []int64
if re.DatasourceIds != "" {
json.Unmarshal([]byte(re.DatasourceIds), &values)
}
for i := range values {
if values[i] == 0 {
// 0 means all datasources
datasourceQueries.MatchType = 2
break
}
datasourceQueries.Values = append(datasourceQueries.Values, values[i])
}
re.DatasourceQueries = []DatasourceQuery{datasourceQueries}
}
return nil
}
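
The same back-fill also happens one level deeper: DB2FE above converts each nested Query.DatasourceIds inside query_configs into Query.DatasourceQueries. An illustrative before/after of the stored JSON:

// Stored legacy query_configs (illustrative):
//   [{"queries":[{"cate":"prometheus","datasource_ids":[1,2],"config":{}}]}]
// After DB2FE, each nested query additionally carries:
//   "datasource_queries":[{"match_type":0,"op":"in","values":[1,2]}]
// The nested back-fill does not special-case id 0, but a legacy [0] still resolves
// to "all datasources", because GetDatasourceIDsByDatasourceQueries treats a single
// exact "in" on DatasourceIdAll as the full set (the len(idValues) == 1 check).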
func (re *RecordingRule) Verify() error {
if re.GroupId < 0 {
return fmt.Errorf("GroupId(%d) invalid", re.GroupId)
}
if IsAllDatasource(re.DatasourceIdsJson) {
re.DatasourceIdsJson = []int64{0}
}
//if IsAllDatasource(re.DatasourceIdsJson) {
// re.DatasourceIdsJson = []int64{0}
//}
if re.PromQl != "" && !model.MetricNameRE.MatchString(re.Name) {
return errors.New("Name has invalid chreacters")

View File

@ -3,7 +3,6 @@ package prom
import (
"sync"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/prom"
)
@ -62,29 +61,6 @@ func (pc *PromClientMap) IsNil(datasourceId int64) bool {
return c == nil
}
// Hit computes the list of effective clusters from the currently valid datasource ids and the rule's configured datasource ids
func (pc *PromClientMap) Hit(datasourceIds []int64) []int64 {
pc.RLock()
defer pc.RUnlock()
dsIds := make([]int64, 0, len(pc.ReaderClients))
if len(datasourceIds) == 1 && datasourceIds[0] == models.DatasourceIdAll {
for c := range pc.ReaderClients {
dsIds = append(dsIds, c)
}
return dsIds
}
for dsId := range pc.ReaderClients {
for _, id := range datasourceIds {
if id == dsId {
dsIds = append(dsIds, id)
continue
}
}
}
return dsIds
}
func (pc *PromClientMap) Reset() {
pc.Lock()
defer pc.Unlock()

View File

@ -80,29 +80,6 @@ func (pc *TdengineClientMap) IsNil(datasourceId int64) bool {
return c == nil
}
// Hit computes the list of effective clusters from the currently valid datasource ids and the rule's configured datasource ids
func (pc *TdengineClientMap) Hit(datasourceIds []int64) []int64 {
pc.RLock()
defer pc.RUnlock()
dsIds := make([]int64, 0, len(pc.ReaderClients))
if len(datasourceIds) == 1 && datasourceIds[0] == models.DatasourceIdAll {
for c := range pc.ReaderClients {
dsIds = append(dsIds, c)
}
return dsIds
}
for dsId := range pc.ReaderClients {
for _, id := range datasourceIds {
if id == dsId {
dsIds = append(dsIds, id)
continue
}
}
}
return dsIds
}
func (pc *TdengineClientMap) Reset() {
pc.Lock()
defer pc.Unlock()