feat: support record event notify detail (#2088)

* feat: record alert notification (#2045)

* record notification

---------

Co-authored-by: wenbo <bupt.lwb@gmail.com>
Co-authored-by: wenbo <1027758873@qq.com>
This commit is contained in:
Yening Qin 2024-08-09 17:06:49 +08:00 committed by GitHub
parent 656326458f
commit 1bb590ce6d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 554 additions and 89 deletions

View File

@ -112,5 +112,5 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
go consumer.LoopConsume()
go queue.ReportQueueSize(alertStats)
go sender.InitEmailSender(notifyConfigCache)
go sender.InitEmailSender(ctx, notifyConfigCache)
}

View File

@ -242,7 +242,8 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
needSend := e.BeforeSenderHook(event)
if needSend {
for channel, uids := range notifyTarget.ToChannelUserMap() {
msgCtx := sender.BuildMessageContext(rule, []*models.AlertCurEvent{event}, uids, e.userCache, e.Astats)
msgCtx := sender.BuildMessageContext(e.ctx, rule, []*models.AlertCurEvent{event},
uids, e.userCache, e.Astats)
e.RwLock.RLock()
s := e.Senders[channel]
e.RwLock.RUnlock()
@ -269,13 +270,14 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
// handle global webhooks
if e.alerting.WebhookBatchSend {
sender.BatchSendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats)
sender.BatchSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
} else {
sender.SingleSendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats)
sender.SingleSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
}
// handle plugin call
go sender.MayPluginNotify(e.genNoticeBytes(event), e.notifyConfigCache.GetNotifyScript(), e.Astats)
go sender.MayPluginNotify(e.ctx, e.genNoticeBytes(event), e.notifyConfigCache.
GetNotifyScript(), e.Astats, event)
}
func (e *Dispatch) SendCallbacks(rule *models.AlertRule, notifyTarget *NotifyTarget, event *models.AlertCurEvent) {

View File

@ -125,7 +125,7 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
Batch: 1000,
}
PushCallbackEvent(webhookConf, event, ctx.Stats)
PushCallbackEvent(ctx.Ctx, webhookConf, event, ctx.Stats)
return
}
@ -141,16 +141,38 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
}
}
func doSend(url string, body interface{}, channel string, stats *astats.Stats) {
func doSendAndRecord(ctx *ctx.Context, url, token string, body interface{}, channel string,
stats *astats.Stats, event *models.AlertCurEvent) {
res, err := doSend(url, body, channel, stats)
NotifyRecord(ctx, event, channel, token, res, err)
}
func NotifyRecord(ctx *ctx.Context, evt *models.AlertCurEvent, channel, target, res string, err error) {
noti := models.NewNotificationRecord(evt, channel, target)
if err != nil {
noti.SetStatus(models.NotiStatusFailure)
noti.SetDetails(err.Error())
} else if res != "" {
noti.SetDetails(string(res))
}
if err := noti.Add(ctx); err != nil {
logger.Errorf("Add noti failed, err: %v", err)
}
}
func doSend(url string, body interface{}, channel string, stats *astats.Stats) (string, error) {
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
res, code, err := poster.PostJSON(url, time.Second*5, body, 3)
if err != nil {
logger.Errorf("%s_sender: result=fail url=%s code=%d error=%v req:%v response=%s", channel, url, code, err, body, string(res))
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
} else {
logger.Infof("%s_sender: result=succ url=%s code=%d req:%v response=%s", channel, url, code, body, string(res))
return "", err
}
logger.Infof("%s_sender: result=succ url=%s code=%d req:%v response=%s", channel, url, code, body, string(res))
return string(res), nil
}
type TaskCreateReply struct {
@ -158,7 +180,7 @@ type TaskCreateReply struct {
Dat int64 `json:"dat"` // task.id
}
func PushCallbackEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func PushCallbackEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
CallbackEventQueueLock.RLock()
queue := CallbackEventQueue[webhook.Url]
CallbackEventQueueLock.RUnlock()
@ -173,7 +195,7 @@ func PushCallbackEvent(webhook *models.Webhook, event *models.AlertCurEvent, sta
CallbackEventQueue[webhook.Url] = queue
CallbackEventQueueLock.Unlock()
StartConsumer(queue, webhook.Batch, webhook, stats)
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}
succ := queue.list.PushFront(event)

View File

@ -1,9 +1,10 @@
package sender
import (
"github.com/ccfos/nightingale/v6/models"
"html/template"
"strings"
"github.com/ccfos/nightingale/v6/models"
)
type dingtalkMarkdown struct {
@ -35,13 +36,13 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
return
}
urls, ats := ds.extract(ctx.Users)
urls, ats, tokens := ds.extract(ctx.Users)
if len(urls) == 0 {
return
}
message := BuildTplMessage(models.Dingtalk, ds.tpl, ctx.Events)
for _, url := range urls {
for i, url := range urls {
var body dingtalk
// NoAt in url
if strings.Contains(url, "noat=1") {
@ -66,7 +67,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
}
}
doSend(url, body, models.Dingtalk, ctx.Stats)
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Dingtalk, ctx.Stats, ctx.Events[0])
}
}
@ -96,15 +97,17 @@ func (ds *DingtalkSender) CallBack(ctx CallBackContext) {
body.Markdown.Text = message
}
doSend(ctx.CallBackURL, body, models.Dingtalk, ctx.Stats)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body,
"callback", ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
// extract urls and ats from Users
func (ds *DingtalkSender) extract(users []*models.User) ([]string, []string) {
func (ds *DingtalkSender) extract(users []*models.User) ([]string, []string, []string) {
urls := make([]string, 0, len(users))
ats := make([]string, 0, len(users))
tokens := make([]string, 0, len(users))
for _, user := range users {
if user.Phone != "" {
@ -116,7 +119,8 @@ func (ds *DingtalkSender) extract(users []*models.User) ([]string, []string) {
url = "https://oapi.dingtalk.com/robot/send?access_token=" + token
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls, ats
return urls, ats, tokens
}

View File

@ -9,13 +9,14 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
"gopkg.in/gomail.v2"
)
var mailch chan *gomail.Message
var mailch chan *EmailContext
type EmailSender struct {
subjectTpl *template.Template
@ -23,6 +24,11 @@ type EmailSender struct {
smtp aconf.SMTPConfig
}
type EmailContext struct {
event *models.AlertCurEvent
mail *gomail.Message
}
func (es *EmailSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
@ -36,7 +42,7 @@ func (es *EmailSender) Send(ctx MessageContext) {
subject = ctx.Events[0].RuleName
}
content := BuildTplMessage(models.Email, es.contentTpl, ctx.Events)
es.WriteEmail(subject, content, tos)
es.WriteEmail(subject, content, tos, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues(models.Email).Add(float64(len(tos)))
}
@ -73,7 +79,8 @@ func SendEmail(subject, content string, tos []string, stmp aconf.SMTPConfig) err
return nil
}
func (es *EmailSender) WriteEmail(subject, content string, tos []string) {
func (es *EmailSender) WriteEmail(subject, content string, tos []string,
event *models.AlertCurEvent) {
m := gomail.NewMessage()
m.SetHeader("From", es.smtp.From)
@ -81,7 +88,7 @@ func (es *EmailSender) WriteEmail(subject, content string, tos []string) {
m.SetHeader("Subject", subject)
m.SetBody("text/html", content)
mailch <- m
mailch <- &EmailContext{event, m}
}
func dialSmtp(d *gomail.Dialer) gomail.SendCloser {
@ -104,22 +111,22 @@ func dialSmtp(d *gomail.Dialer) gomail.SendCloser {
var mailQuit = make(chan struct{})
func RestartEmailSender(smtp aconf.SMTPConfig) {
func RestartEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
// Notify internal start exit
mailQuit <- struct{}{}
startEmailSender(smtp)
startEmailSender(ctx, smtp)
}
var smtpConfig aconf.SMTPConfig
func InitEmailSender(ncc *memsto.NotifyConfigCacheType) {
mailch = make(chan *gomail.Message, 100000)
go updateSmtp(ncc)
func InitEmailSender(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
mailch = make(chan *EmailContext, 100000)
go updateSmtp(ctx, ncc)
smtpConfig = ncc.GetSMTP()
startEmailSender(smtpConfig)
startEmailSender(ctx, smtpConfig)
}
func updateSmtp(ncc *memsto.NotifyConfigCacheType) {
func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
for {
time.Sleep(1 * time.Minute)
smtp := ncc.GetSMTP()
@ -127,12 +134,12 @@ func updateSmtp(ncc *memsto.NotifyConfigCacheType) {
smtpConfig.Pass != smtp.Pass || smtpConfig.User != smtp.User || smtpConfig.Port != smtp.Port ||
smtpConfig.InsecureSkipVerify != smtp.InsecureSkipVerify { //diff
smtpConfig = smtp
RestartEmailSender(smtp)
RestartEmailSender(ctx, smtp)
}
}
}
func startEmailSender(smtp aconf.SMTPConfig) {
func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
conf := smtp
if conf.Host == "" || conf.Port == 0 {
logger.Warning("SMTP configurations invalid")
@ -167,7 +174,8 @@ func startEmailSender(smtp aconf.SMTPConfig) {
}
open = true
}
if err := gomail.Send(s, m); err != nil {
var err error
if err = gomail.Send(s, m.mail); err != nil {
logger.Errorf("email_sender: failed to send: %s", err)
// close and retry
@ -184,11 +192,16 @@ func startEmailSender(smtp aconf.SMTPConfig) {
}
open = true
if err := gomail.Send(s, m); err != nil {
if err = gomail.Send(s, m.mail); err != nil {
logger.Errorf("email_sender: failed to retry send: %s", err)
}
} else {
logger.Infof("email_sender: result=succ subject=%v to=%v", m.GetHeader("Subject"), m.GetHeader("To"))
logger.Infof("email_sender: result=succ subject=%v to=%v",
m.mail.GetHeader("Subject"), m.mail.GetHeader("To"))
}
for _, to := range m.mail.GetHeader("To") {
NotifyRecord(ctx, m.event, models.Email, to, "", err)
}
size++

View File

@ -54,7 +54,8 @@ func (fs *FeishuSender) CallBack(ctx CallBackContext) {
},
}
doSend(ctx.CallBackURL, body, models.Feishu, ctx.Stats)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
@ -62,9 +63,9 @@ func (fs *FeishuSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
}
urls, ats := fs.extract(ctx.Users)
urls, ats, tokens := fs.extract(ctx.Users)
message := BuildTplMessage(models.Feishu, fs.tpl, ctx.Events)
for _, url := range urls {
for i, url := range urls {
body := feishu{
Msgtype: "text",
Content: feishuContent{
@ -77,13 +78,14 @@ func (fs *FeishuSender) Send(ctx MessageContext) {
IsAtAll: false,
}
}
doSend(url, body, models.Feishu, ctx.Stats)
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Feishu, ctx.Stats, ctx.Events[0])
}
}
func (fs *FeishuSender) extract(users []*models.User) ([]string, []string) {
func (fs *FeishuSender) extract(users []*models.User) ([]string, []string, []string) {
urls := make([]string, 0, len(users))
ats := make([]string, 0, len(users))
tokens := make([]string, 0, len(users))
for _, user := range users {
if user.Phone != "" {
@ -95,7 +97,8 @@ func (fs *FeishuSender) extract(users []*models.User) ([]string, []string) {
url = "https://open.feishu.cn/open-apis/bot/v2/hook/" + token
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls, ats
return urls, ats, tokens
}

View File

@ -134,14 +134,15 @@ func (fs *FeishuCardSender) CallBack(ctx CallBackContext) {
}
parsedURL.RawQuery = ""
doSend(parsedURL.String(), body, models.FeishuCard, ctx.Stats)
doSendAndRecord(ctx.Ctx, parsedURL.String(), parsedURL.String(), body, "callback",
ctx.Stats, ctx.Events[0])
}
func (fs *FeishuCardSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
}
urls, _ := fs.extract(ctx.Users)
urls, tokens := fs.extract(ctx.Users)
message := BuildTplMessage(models.FeishuCard, fs.tpl, ctx.Events)
color := "red"
lowerUnicode := strings.ToLower(message)
@ -156,14 +157,15 @@ func (fs *FeishuCardSender) Send(ctx MessageContext) {
body.Card.Header.Template = color
body.Card.Elements[0].Text.Content = message
body.Card.Elements[2].Elements[0].Content = SendTitle
for _, url := range urls {
doSend(url, body, models.FeishuCard, ctx.Stats)
for i, url := range urls {
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.FeishuCard,
ctx.Stats, ctx.Events[0])
}
}
func (fs *FeishuCardSender) extract(users []*models.User) ([]string, []string) {
urls := make([]string, 0, len(users))
ats := make([]string, 0)
tokens := make([]string, 0, len(users))
for i := range users {
if token, has := users[i].ExtractToken(models.FeishuCard); has {
url := token
@ -171,7 +173,8 @@ func (fs *FeishuCardSender) extract(users []*models.User) ([]string, []string) {
url = "https://open.feishu.cn/open-apis/bot/v2/hook/" + strings.TrimSpace(token)
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls, ats
return urls, tokens
}

View File

@ -27,7 +27,8 @@ func (lk *LarkSender) CallBack(ctx CallBackContext) {
},
}
doSend(ctx.CallBackURL, body, models.Lark, ctx.Stats)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}

View File

@ -55,7 +55,8 @@ func (fs *LarkCardSender) CallBack(ctx CallBackContext) {
}
parsedURL.RawQuery = ""
doSend(parsedURL.String(), body, models.LarkCard, ctx.Stats)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
}
func (fs *LarkCardSender) Send(ctx MessageContext) {

View File

@ -7,6 +7,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
@ -38,11 +39,11 @@ func (ms *MmSender) Send(ctx MessageContext) {
}
message := BuildTplMessage(models.Mm, ms.tpl, ctx.Events)
SendMM(MatterMostMessage{
SendMM(ctx.Ctx, MatterMostMessage{
Text: message,
Tokens: urls,
Stats: ctx.Stats,
})
}, ctx.Events[0])
}
func (ms *MmSender) CallBack(ctx CallBackContext) {
@ -51,11 +52,11 @@ func (ms *MmSender) CallBack(ctx CallBackContext) {
}
message := BuildTplMessage(models.Mm, ms.tpl, ctx.Events)
SendMM(MatterMostMessage{
SendMM(ctx.Ctx, MatterMostMessage{
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
})
}, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
@ -70,7 +71,7 @@ func (ms *MmSender) extract(users []*models.User) []string {
return tokens
}
func SendMM(message MatterMostMessage) {
func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurEvent) {
for i := 0; i < len(message.Tokens); i++ {
u, err := url.Parse(message.Tokens[i])
if err != nil {
@ -103,7 +104,7 @@ func SendMM(message MatterMostMessage) {
Username: username,
Text: txt + message.Text,
}
doSend(ur, body, models.Mm, message.Stats)
doSendAndRecord(ctx, ur, message.Tokens[i], body, models.Mm, message.Stats, event)
}
}
}

View File

@ -2,26 +2,30 @@ package sender
import (
"bytes"
"fmt"
"os"
"os/exec"
"time"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/sys"
)
func MayPluginNotify(noticeBytes []byte, notifyScript models.NotifyScript, stats *astats.Stats) {
func MayPluginNotify(ctx *ctx.Context, noticeBytes []byte, notifyScript models.NotifyScript,
stats *astats.Stats, event *models.AlertCurEvent) {
if len(noticeBytes) == 0 {
return
}
alertingCallScript(noticeBytes, notifyScript, stats)
alertingCallScript(ctx, noticeBytes, notifyScript, stats, event)
}
func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript, stats *astats.Stats) {
func alertingCallScript(ctx *ctx.Context, stdinBytes []byte, notifyScript models.NotifyScript,
stats *astats.Stats, event *models.AlertCurEvent) {
// not enable or no notify.py? do nothing
config := notifyScript
if !config.Enable || config.Content == "" {
@ -81,6 +85,7 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript, sta
}
err, isTimeout := sys.WrapTimeout(cmd, time.Duration(config.Timeout)*time.Second)
NotifyRecord(ctx, event, channel, cmd.String(), "", buildErr(err, isTimeout))
if isTimeout {
if err == nil {
@ -102,3 +107,11 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript, sta
logger.Infof("event_script_notify_ok: exec %s output: %s", fpath, buf.String())
}
func buildErr(err error, isTimeout bool) error {
if err == nil && !isTimeout {
return nil
} else {
return fmt.Errorf("is_timeout: %v, err: %v", isTimeout, err)
}
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
type (
@ -22,6 +23,7 @@ type (
Rule *models.AlertRule
Events []*models.AlertCurEvent
Stats *astats.Stats
Ctx *ctx.Context
}
)
@ -49,13 +51,15 @@ func NewSender(key string, tpls map[string]*template.Template, smtp ...aconf.SMT
return nil
}
func BuildMessageContext(rule *models.AlertRule, events []*models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType, stats *astats.Stats) MessageContext {
func BuildMessageContext(ctx *ctx.Context, rule *models.AlertRule, events []*models.AlertCurEvent,
uids []int64, userCache *memsto.UserCacheType, stats *astats.Stats) MessageContext {
users := userCache.GetByUserIds(uids)
return MessageContext{
Rule: rule,
Events: events,
Users: users,
Stats: stats,
Ctx: ctx,
}
}

View File

@ -6,6 +6,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
@ -35,11 +36,11 @@ func (ts *TelegramSender) CallBack(ctx CallBackContext) {
}
message := BuildTplMessage(models.Telegram, ts.tpl, ctx.Events)
SendTelegram(TelegramMessage{
SendTelegram(ctx.Ctx, TelegramMessage{
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
})
}, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
@ -51,11 +52,11 @@ func (ts *TelegramSender) Send(ctx MessageContext) {
tokens := ts.extract(ctx.Users)
message := BuildTplMessage(models.Telegram, ts.tpl, ctx.Events)
SendTelegram(TelegramMessage{
SendTelegram(ctx.Ctx, TelegramMessage{
Text: message,
Tokens: tokens,
Stats: ctx.Stats,
})
}, ctx.Events[0])
}
func (ts *TelegramSender) extract(users []*models.User) []string {
@ -68,7 +69,7 @@ func (ts *TelegramSender) extract(users []*models.User) []string {
return tokens
}
func SendTelegram(message TelegramMessage) {
func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.AlertCurEvent) {
for i := 0; i < len(message.Tokens); i++ {
if !strings.Contains(message.Tokens[i], "/") && !strings.HasPrefix(message.Tokens[i], "https://") {
logger.Errorf("telegram_sender: result=fail invalid token=%s", message.Tokens[i])
@ -92,6 +93,6 @@ func SendTelegram(message TelegramMessage) {
Text: message.Text,
}
doSend(url, body, models.Telegram, message.Stats)
doSendAndRecord(ctx, url, message.Tokens[i], body, models.Telegram, message.Stats, event)
}
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
@ -11,11 +12,12 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) bool {
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) (bool, string, error) {
channel := "webhook"
if webhook.Type == models.RuleCallback {
channel = "callback"
@ -23,12 +25,12 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
conf := webhook
if conf.Url == "" || !conf.Enable {
return false
return false, "", nil
}
bs, err := json.Marshal(event)
if err != nil {
logger.Errorf("%s alertingWebhook failed to marshal event:%+v err:%v", channel, event, err)
return false
return false, "", err
}
bf := bytes.NewBuffer(bs)
@ -36,7 +38,7 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
req, err := http.NewRequest("POST", conf.Url, bf)
if err != nil {
logger.Warningf("%s alertingWebhook failed to new reques event:%s err:%v", channel, string(bs), err)
return true
return true, "", err
}
req.Header.Set("Content-Type", "application/json")
@ -66,14 +68,15 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
var resp *http.Response
var body []byte
resp, err = client.Do(req)
if err != nil {
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
logger.Errorf("event_%s_fail, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, err)
return true
return true, "", err
}
var body []byte
if resp.Body != nil {
defer resp.Body.Close()
body, _ = io.ReadAll(resp.Body)
@ -81,18 +84,19 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
if resp.StatusCode == 429 {
logger.Errorf("event_%s_fail, url: %s, response code: %d, body: %s event:%s", channel, conf.Url, resp.StatusCode, string(body), string(bs))
return true
return true, string(body), fmt.Errorf("status code is 429")
}
logger.Debugf("event_%s_succ, url: %s, response code: %d, body: %s event:%s", channel, conf.Url, resp.StatusCode, string(body), string(bs))
return false
return false, string(body), nil
}
func SingleSendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
for _, conf := range webhooks {
retryCount := 0
for retryCount < 3 {
needRetry := sendWebhook(conf, event, stats)
needRetry, res, err := sendWebhook(conf, event, stats)
NotifyRecord(ctx, event, "webhook", conf.Url, res, err)
if !needRetry {
break
}
@ -102,10 +106,10 @@ func SingleSendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent,
}
}
func BatchSendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func BatchSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
for _, conf := range webhooks {
logger.Infof("push event:%+v to queue:%v", event, conf)
PushEvent(conf, event, stats)
PushEvent(ctx, conf, event, stats)
}
}
@ -121,7 +125,7 @@ type WebhookQueue struct {
closeCh chan struct{}
}
func PushEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
EventQueueLock.RLock()
queue := EventQueue[webhook.Url]
EventQueueLock.RUnlock()
@ -136,7 +140,7 @@ func PushEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *asta
EventQueue[webhook.Url] = queue
EventQueueLock.Unlock()
StartConsumer(queue, webhook.Batch, webhook, stats)
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}
succ := queue.list.PushFront(event)
@ -146,7 +150,7 @@ func PushEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *asta
}
}
func StartConsumer(queue *WebhookQueue, popSize int, webhook *models.Webhook, stats *astats.Stats) {
func StartConsumer(ctx *ctx.Context, queue *WebhookQueue, popSize int, webhook *models.Webhook, stats *astats.Stats) {
for {
select {
case <-queue.closeCh:
@ -161,13 +165,21 @@ func StartConsumer(queue *WebhookQueue, popSize int, webhook *models.Webhook, st
retryCount := 0
for retryCount < webhook.RetryCount {
needRetry := sendWebhook(webhook, events, stats)
needRetry, res, err := sendWebhook(webhook, events, stats)
if !needRetry {
break
}
go RecordEvents(ctx, webhook, events, stats, res, err)
retryCount++
time.Sleep(time.Second * time.Duration(webhook.RetryInterval) * time.Duration(retryCount))
}
}
}
}
func RecordEvents(ctx *ctx.Context, webhook *models.Webhook, events []*models.AlertCurEvent, stats *astats.Stats, res string, err error) {
for _, event := range events {
time.Sleep(time.Millisecond * 10)
NotifyRecord(ctx, event, "webhook", webhook.Url, res, err)
}
}

View File

@ -37,7 +37,8 @@ func (ws *WecomSender) CallBack(ctx CallBackContext) {
},
}
doSend(ctx.CallBackURL, body, models.Wecom, ctx.Stats)
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
@ -45,21 +46,22 @@ func (ws *WecomSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
}
urls := ws.extract(ctx.Users)
urls, tokens := ws.extract(ctx.Users)
message := BuildTplMessage(models.Wecom, ws.tpl, ctx.Events)
for _, url := range urls {
for i, url := range urls {
body := wecom{
Msgtype: "markdown",
Markdown: wecomMarkdown{
Content: message,
},
}
doSend(url, body, models.Wecom, ctx.Stats)
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Wecom, ctx.Stats, ctx.Events[0])
}
}
func (ws *WecomSender) extract(users []*models.User) []string {
func (ws *WecomSender) extract(users []*models.User) ([]string, []string) {
urls := make([]string, 0, len(users))
tokens := make([]string, 0, len(users))
for _, user := range users {
if token, has := user.ExtractToken(models.Wecom); has {
url := token
@ -67,7 +69,8 @@ func (ws *WecomSender) extract(users []*models.User) []string {
url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=" + token
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls
return urls, tokens
}

View File

@ -13,6 +13,7 @@ type Center struct {
UseFileAssets bool
FlashDuty FlashDuty
EventHistoryGroupView bool
CleanNotifyRecordDay int
}
type Plugin struct {

View File

@ -16,6 +16,7 @@ import (
centerrt "github.com/ccfos/nightingale/v6/center/router"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/conf"
"github.com/ccfos/nightingale/v6/cron"
"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
@ -106,6 +107,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
go version.GetGithubVersion()
go cron.CleanNotifyRecord(ctx, config.Center.CleanNotifyRecordDay)
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, config.Alert, config.Ibex, cconf.Operations, dsCache, notifyConfigCache, promClients, tdengineClients,
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)

View File

@ -352,9 +352,11 @@ func (rt *Router) Config(r *gin.Engine) {
if rt.Center.AnonymousAccess.AlertDetail {
pages.GET("/alert-cur-event/:eid", rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.notificationRecordList)
} else {
pages.GET("/alert-cur-event/:eid", rt.auth(), rt.user(), rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.auth(), rt.user(), rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.auth(), rt.user(), rt.notificationRecordList)
}
// card logic

View File

@ -0,0 +1,196 @@
package router
import (
"strings"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
type NotificationResponse struct {
SubRules []SubRule `json:"sub_rules"`
Notifies map[string][]Record `json:"notifies"`
}
type SubRule struct {
SubID int64 `json:"sub_id"`
Notifies map[string][]Record `json:"notifies"`
}
type Notify struct {
Channel string `json:"channel"`
Records []Record `json:"records"`
}
type Record struct {
Target string `json:"target"`
Username string `json:"username"`
Status int `json:"status"`
Detail string `json:"detail"`
}
func (rt *Router) notificationRecordList(c *gin.Context) {
eid := ginx.UrlParamInt64(c, "eid")
lst, err := models.NotificaitonRecordsGetByEventId(rt.Ctx, eid)
ginx.Dangerous(err)
response := buildNotificationResponse(rt.Ctx, lst)
ginx.NewRender(c).Data(response, nil)
}
func buildNotificationResponse(ctx *ctx.Context, nl []*models.NotificaitonRecord) NotificationResponse {
response := NotificationResponse{
SubRules: []SubRule{},
Notifies: make(map[string][]Record),
}
subRuleMap := make(map[int64]*SubRule)
// Collect all group IDs
groupIdSet := make(map[int64]struct{})
// map[SubId]map[Channel]map[Target]index
filter := make(map[int64]map[string]map[string]int)
for i, n := range nl {
// 对相同的 channel-target 进行合并
for _, gid := range n.GetGroupIds(ctx) {
groupIdSet[gid] = struct{}{}
}
if _, exists := filter[n.SubId]; !exists {
filter[n.SubId] = make(map[string]map[string]int)
}
if _, exists := filter[n.SubId][n.Channel]; !exists {
filter[n.SubId][n.Channel] = make(map[string]int)
}
idx, exists := filter[n.SubId][n.Channel][n.Target]
if !exists {
filter[n.SubId][n.Channel][n.Target] = i
} else {
if nl[idx].Status < n.Status {
nl[idx].Status = n.Status
}
nl[idx].Details = nl[idx].Details + ", " + n.Details
nl[i] = nil
}
}
// Fill usernames only once
usernameByTarget := fillUserNames(ctx, groupIdSet)
for _, n := range nl {
if n == nil {
continue
}
m := usernameByTarget[n.Target]
usernames := make([]string, 0, len(m))
for k := range m {
usernames = append(usernames, k)
}
if !checkChannel(n.Channel) {
// Hide sensitive information
n.Target = replaceLastEightChars(n.Target)
}
record := Record{
Target: n.Target,
Status: n.Status,
Detail: n.Details,
}
record.Username = strings.Join(usernames, ",")
if n.SubId > 0 {
// Handle SubRules
subRule, ok := subRuleMap[n.SubId]
if !ok {
newSubRule := &SubRule{
SubID: n.SubId,
}
newSubRule.Notifies = make(map[string][]Record)
newSubRule.Notifies[n.Channel] = []Record{record}
subRuleMap[n.SubId] = newSubRule
} else {
if _, exists := subRule.Notifies[n.Channel]; !exists {
subRule.Notifies[n.Channel] = []Record{record}
} else {
subRule.Notifies[n.Channel] = append(subRule.Notifies[n.Channel], record)
}
}
continue
}
if response.Notifies == nil {
response.Notifies = make(map[string][]Record)
}
if _, exists := response.Notifies[n.Channel]; !exists {
response.Notifies[n.Channel] = []Record{record}
} else {
response.Notifies[n.Channel] = append(response.Notifies[n.Channel], record)
}
}
for _, subRule := range subRuleMap {
response.SubRules = append(response.SubRules, *subRule)
}
return response
}
// check channel is one of the following: tx-sms, tx-voice, ali-sms, ali-voice, email, script
func checkChannel(channel string) bool {
switch channel {
case "tx-sms", "tx-voice", "ali-sms", "ali-voice", "email", "script":
return true
}
return false
}
func replaceLastEightChars(s string) string {
if len(s) <= 8 {
return strings.Repeat("*", len(s))
}
return s[:len(s)-8] + strings.Repeat("*", 8)
}
func fillUserNames(ctx *ctx.Context, groupIdSet map[int64]struct{}) map[string]map[string]struct{} {
userNameByTarget := make(map[string]map[string]struct{})
gids := make([]int64, 0, len(groupIdSet))
for gid := range groupIdSet {
gids = append(gids, gid)
}
users, err := models.UsersGetByGroupIds(ctx, gids)
if err != nil {
logger.Errorf("UsersGetByGroupIds failed, err: %v", err)
return userNameByTarget
}
for _, user := range users {
logger.Warningf("user: %s", user.Username)
for _, ch := range models.DefaultChannels {
target, exist := user.ExtractToken(ch)
if exist {
if _, ok := userNameByTarget[target]; !ok {
userNameByTarget[target] = make(map[string]struct{})
}
userNameByTarget[target][user.Username] = struct{}{}
}
}
}
return userNameByTarget
}

View File

@ -182,7 +182,7 @@ func (rt *Router) notifyConfigPut(c *gin.Context) {
smtp, errSmtp := SmtpValidate(text)
ginx.Dangerous(errSmtp)
go sender.RestartEmailSender(smtp)
go sender.RestartEmailSender(rt.Ctx, smtp)
}
ginx.NewRender(c).Message(nil)

View File

@ -0,0 +1,41 @@
package cron
import (
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/robfig/cron/v3"
"github.com/toolkits/pkg/logger"
)
func cleanNotifyRecord(ctx *ctx.Context, day int) {
lastWeek := time.Now().Unix() - 86400*int64(day)
err := models.DB(ctx).Model(&models.NotificaitonRecord{}).Where("created_at < ?", lastWeek).Delete(&models.NotificaitonRecord{}).Error
if err != nil {
logger.Errorf("Failed to clean notify record: %v", err)
}
}
// 每天凌晨1点执行清理任务
func CleanNotifyRecord(ctx *ctx.Context, day int) {
c := cron.New()
if day < 1 {
day = 7
}
// 使用cron表达式设置每天凌晨1点执行
_, err := c.AddFunc("0 1 * * *", func() {
cleanNotifyRecord(ctx, day)
})
if err != nil {
logger.Errorf("Failed to add clean notify record cron job: %v", err)
return
}
// 启动cron任务
c.Start()
}

View File

@ -366,6 +366,7 @@ CREATE TABLE `target` (
`host_ip` varchar(15) default '' COMMENT 'IPv4 string',
`agent_version` varchar(255) default '' COMMENT 'agent version',
`engine_name` varchar(255) default '' COMMENT 'engine_name',
`os` VARCHAR(31) DEFAULT '' COMMENT 'os type',
`update_at` bigint not null default 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`ident`),
@ -546,6 +547,18 @@ CREATE TABLE `builtin_payloads` (
KEY `idx_type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE notification_record (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`event_id` BIGINT NOT NULL,
`sub_id` BIGINT NOT NULL,
`channel` VARCHAR(255) NOT NULL,
`status` TINYINT NOT NULL DEFAULT 0,
`target` VARCHAR(1024) NOT NULL,
`details` VARCHAR(2048),
`created_at` BIGINT NOT NULL,
INDEX idx_evt (event_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `task_tpl`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,

View File

@ -86,4 +86,17 @@ ALTER TABLE alert_cur_event ADD COLUMN original_tags TEXT COMMENT 'labels key=va
ALTER TABLE alert_his_event ADD COLUMN original_tags TEXT COMMENT 'labels key=val,,k2=v2';
/* v7.1.0 */
ALTER TABLE target ADD COLUMN os VARCHAR(31) DEFAULT '' COMMENT 'os type';
ALTER TABLE target ADD COLUMN os VARCHAR(31) DEFAULT '' COMMENT 'os type';
/* v7.2.0 */
CREATE TABLE notification_record (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT,
`event_id` BIGINT NOT NULL,
`sub_id` BIGINT NOT NULL,
`channel` VARCHAR(255) NOT NULL,
`status` TINYINT NOT NULL DEFAULT 0,
`target` VARCHAR(1024) NOT NULL,
`details` VARCHAR(2048),
`created_at` BIGINT NOT NULL,
INDEX idx_evt (event_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@ -58,7 +58,7 @@ func MigrateTables(db *gorm.DB) error {
dts := []interface{}{&RecordingRule{}, &AlertRule{}, &AlertSubscribe{}, &AlertMute{},
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.BuiltinComponent{}}
&models.MetricFilter{}, &models.BuiltinComponent{}, &models.NotificaitonRecord{}}
if !columnHasIndex(db, &AlertHisEvent{}, "original_tags") ||
!columnHasIndex(db, &AlertCurEvent{}, "original_tags") {

View File

@ -0,0 +1,91 @@
package models
import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
const (
NotiStatusSuccess = iota + 1
NotiStatusFailure
)
type NotificaitonRecord struct {
Id int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement"`
EventId int64 `json:"event_id" gorm:"type:bigint;not null;index:idx_evt,priority:1;comment:event history id"`
SubId int64 `json:"sub_id" gorm:"type:bigint;comment:subscribed rule id"`
Channel string `json:"channel" gorm:"type:varchar(255);not null;comment:notification channel name"`
Status int `json:"status" gorm:"type:int;comment:notification status"` // 1-成功2-失败
Target string `json:"target" gorm:"type:varchar(1024);not null;comment:notification target"`
Details string `json:"details" gorm:"type:varchar(2048);default:'';comment:notification other info"`
CreatedAt int64 `json:"created_at" gorm:"type:bigint;not null;comment:create time"`
}
func NewNotificationRecord(event *AlertCurEvent, channel, target string) *NotificaitonRecord {
return &NotificaitonRecord{
EventId: event.Id,
SubId: event.SubRuleId,
Channel: channel,
Status: NotiStatusSuccess,
Target: target,
}
}
func (n *NotificaitonRecord) SetStatus(status int) {
if n == nil {
return
}
n.Status = status
}
func (n *NotificaitonRecord) SetDetails(details string) {
if n == nil {
return
}
n.Details = details
}
func (n *NotificaitonRecord) TableName() string {
return "notification_record"
}
func (n *NotificaitonRecord) Add(ctx *ctx.Context) error {
return Insert(ctx, n)
}
func (n *NotificaitonRecord) GetGroupIds(ctx *ctx.Context) (groupIds []int64) {
if n == nil {
return
}
if n.SubId > 0 {
if sub, err := AlertSubscribeGet(ctx, "id=?", n.SubId); err != nil {
logger.Errorf("AlertSubscribeGet failed, err: %v", err)
} else {
groupIds = str.IdsInt64(sub.UserGroupIds, " ")
}
return
}
if event, err := AlertHisEventGetById(ctx, n.EventId); err != nil {
logger.Errorf("AlertHisEventGetById failed, err: %v", err)
} else {
groupIds = str.IdsInt64(event.NotifyGroups, " ")
}
return
}
func NotificaitonRecordsGetByEventId(ctx *ctx.Context, eid int64) ([]*NotificaitonRecord, error) {
return NotificaitonRecordsGet(ctx, "event_id=?", eid)
}
func NotificaitonRecordsGet(ctx *ctx.Context, where string, args ...interface{}) ([]*NotificaitonRecord, error) {
var lst []*NotificaitonRecord
err := DB(ctx).Where(where, args...).Find(&lst).Error
if err != nil {
return nil, err
}
return lst, nil
}

View File

@ -304,6 +304,22 @@ func UserGetById(ctx *ctx.Context, id int64) (*User, error) {
return UserGet(ctx, "id=?", id)
}
func UsersGetByGroupIds(ctx *ctx.Context, groupIds []int64) ([]User, error) {
if len(groupIds) == 0 {
return nil, nil
}
userIds, err := GroupsMemberIds(ctx, groupIds)
if err != nil {
return nil, err
}
users, err := UserGetsByIds(ctx, userIds)
if err != nil {
return nil, err
}
return users, nil
}
func InitRoot(ctx *ctx.Context) {
user, err := UserGetByUsername(ctx, "root")
if err != nil {

View File

@ -38,6 +38,12 @@ func MemberIds(ctx *ctx.Context, groupId int64) ([]int64, error) {
return ids, err
}
func GroupsMemberIds(ctx *ctx.Context, groupIds []int64) ([]int64, error) {
var ids []int64
err := DB(ctx).Model(&UserGroupMember{}).Where("group_id in ?", groupIds).Pluck("user_id", &ids).Error
return ids, err
}
func UserGroupMemberCount(ctx *ctx.Context, where string, args ...interface{}) (int64, error) {
return Count(DB(ctx).Model(&UserGroupMember{}).Where(where, args...))
}