update target's timestamp in redis support batch

This commit is contained in:
Ulric Qin 2025-05-15 06:21:29 +08:00
parent 798f9e5536
commit ec6a4f134a
5 changed files with 101 additions and 15 deletions

View File

@ -86,7 +86,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
metas := metas.New(redis)
idents := idents.New(ctx, redis)
idents := idents.New(ctx, redis, config.Pushgw)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()

View File

@ -54,7 +54,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
idents := idents.New(ctx, redis)
idents := idents.New(ctx, redis, config.Pushgw)
metas := metas.New(redis)
writers := writer.NewWriters(config.Pushgw)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)

View File

@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/logger"
@ -17,16 +18,18 @@ import (
type Set struct {
sync.Mutex
items map[string]struct{}
redis storage.Redis
ctx *ctx.Context
items map[string]struct{}
redis storage.Redis
ctx *ctx.Context
configs pconf.Pushgw
}
func New(ctx *ctx.Context, redis storage.Redis) *Set {
func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
set := &Set{
items: make(map[string]struct{}),
redis: redis,
ctx: ctx,
items: make(map[string]struct{}),
redis: redis,
ctx: ctx,
configs: configs,
}
set.Init()
@ -95,9 +98,9 @@ type TargetUpdate struct {
}
func (s *Set) UpdateTargets(lst []string, now int64) error {
err := updateTargetsUpdateTs(lst, now, s.redis)
err := s.updateTargetsUpdateTs(lst, now, s.redis)
if err != nil {
logger.Errorf("failed to update targets:%v update_ts: %v", lst, err)
logger.Errorf("update_ts: failed to update targets: %v error: %v", lst, err)
}
if !s.ctx.IsCenter {
@ -141,7 +144,7 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
return nil
}
func updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
return fmt.Errorf("redis is nil")
}
@ -160,6 +163,68 @@ func updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
newMap[models.WrapIdentUpdateTime(ident)] = hostUpdateTime
}
err := storage.MSet(context.Background(), redis, newMap)
return s.updateTargetTsInRedis(newMap, redis)
}
func (s *Set) updateTargetTsInRedis(newMap map[string]interface{}, redis storage.Redis) (err error) {
if len(newMap) == 0 {
return nil
}
timeout := time.Duration(s.configs.UpdateTargetTimeoutMills) * time.Millisecond
batchSize := s.configs.UpdateTargetBatchSize
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if len(newMap) <= batchSize {
// 如果 newMap 的内容小于等于 batchSize则直接执行 MSet
return s.writeTargetTsInRedis(ctx, redis, newMap)
}
i := 0
batchMap := make(map[string]interface{}, batchSize)
for mapKey := range newMap {
batchMap[mapKey] = newMap[mapKey]
if (i+1)%batchSize == 0 {
if e := s.writeTargetTsInRedis(ctx, redis, batchMap); e != nil {
err = e
}
batchMap = make(map[string]interface{}, batchSize)
}
i++
}
if len(batchMap) > 0 {
if e := s.writeTargetTsInRedis(ctx, redis, batchMap); e != nil {
err = e
}
}
return err
}
func (s *Set) writeTargetTsInRedis(ctx context.Context, redis storage.Redis, content map[string]interface{}) error {
retryCount := s.configs.UpdateTargetRetryCount
retryInterval := time.Duration(s.configs.UpdateTargetRetryIntervalMills) * time.Millisecond
keys := make([]string, 0, len(content))
for k := range content {
keys = append(keys, k)
}
for i := 0; i < retryCount; i++ {
err := storage.MSet(ctx, redis, content)
logger.Debugf("update_ts: write target ts in redis, keys: %v, retryCount: %d, retryInterval: %v, error: %v", keys, retryCount, retryInterval, err)
if err == nil {
return nil
} else {
logger.Errorf("update_ts: failed to write target ts in redis: %v, keys: %v, retry %d/%d", err, keys, i+1, retryCount)
}
if i < retryCount-1 {
time.Sleep(retryInterval)
}
}
return fmt.Errorf("failed to write target ts in redis after %d retries, keys: %v", retryCount, keys)
}

View File

@ -14,6 +14,11 @@ import (
)
type Pushgw struct {
UpdateTargetRetryCount int
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int
@ -33,7 +38,7 @@ type WriterGlobalOpt struct {
QueuePopSize int
QueueNumber int // 每个 writer 固定数量的队列
QueueWaterMark float64 // 队列将满,开始丢弃数据的水位,比如 0.8
AllQueueMaxSize int64
AllQueueMaxSize int64 // 自动计算得到,无需配置
AllQueueMaxSizeInterval int
RetryCount int
RetryInterval int64
@ -103,6 +108,22 @@ type RelabelConfig struct {
}
func (p *Pushgw) PreCheck() {
if p.UpdateTargetRetryCount <= 0 {
p.UpdateTargetRetryCount = 3
}
if p.UpdateTargetRetryIntervalMills <= 0 {
p.UpdateTargetRetryIntervalMills = 500
}
if p.UpdateTargetTimeoutMills <= 0 {
p.UpdateTargetTimeoutMills = 3000
}
if p.UpdateTargetBatchSize <= 0 {
p.UpdateTargetBatchSize = 20
}
if p.BusiGroupLabelKey == "" {
p.BusiGroupLabelKey = "busigroup"
}

View File

@ -42,7 +42,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
}
idents := idents.New(ctx, redis)
idents := idents.New(ctx, redis, config.Pushgw)
metas := metas.New(redis)
stats := memsto.NewSyncStats()