diff --git a/internal/client/redis.go b/internal/client/redis.go index d830ece..4516b3f 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -22,9 +22,9 @@ type Redis struct { writer *bufio.Writer protoReader *proto.Reader protoWriter *proto.Writer - timer *time.Timer - sendCount uint64 - mu sync.Mutex + timer *time.Timer + sendBytes uint64 + mu sync.Mutex } func NewSentinelMasterClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { @@ -200,10 +200,10 @@ func (r *Redis) SendBytesBuff(buf []byte) { if err != nil { log.Panicf(err.Error()) } - r.flushBuff() + r.flushBuff(len(buf)) } -func (r *Redis) flushBuff() { +func (r *Redis) resetTimer() { if !r.timer.Stop() { select { case <-r.timer.C: @@ -211,10 +211,16 @@ func (r *Redis) flushBuff() { } } r.timer.Reset(time.Second) - if atomic.AddUint64(&r.sendCount, 1)%100 != 0 { +} + +func (r *Redis) flushBuff(l int) { + // if the data size is too small, no need to flush + if atomic.AddUint64(&r.sendBytes, uint64(l)) > 64*1024 { + r.flush() + r.resetTimer() return } - r.flush() + r.resetTimer() } func (r *Redis) flush() { @@ -222,7 +228,7 @@ func (r *Redis) flush() { if err != nil { log.Panicf(err.Error()) } - atomic.StoreUint64(&r.sendCount, 0) + atomic.StoreUint64(&r.sendBytes, 0) } func (r *Redis) autoFlush(ctx context.Context) { @@ -231,7 +237,7 @@ func (r *Redis) autoFlush(ctx context.Context) { case <-ctx.Done(): return case <-r.timer.C: - if atomic.LoadUint64(&r.sendCount) > 0 { + if atomic.LoadUint64(&r.sendBytes) > 0 { r.mu.Lock() err := r.writer.Flush() r.mu.Unlock() diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 9ab4c40..542f4dc 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -80,7 +80,8 @@ type syncStandaloneReader struct { } // version info - SupportPSYNC bool + SupportPSYNC bool + isDiskless bool } func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { @@ -94,21 +95,20 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade r.stat.Dir = utils.GetAbsPath(r.stat.Name) utils.CreateEmptyDir(r.stat.Dir) - r.SupportPSYNC = r.supportPSYNC(); + r.SupportPSYNC = r.supportPSYNC() return r } - func (r *syncStandaloneReader) supportPSYNC() bool { reply := r.client.DoWithStringReply("info", "server") for _, line := range strings.Split(reply, "\n") { if strings.HasPrefix(line, "redis_version:") { version := strings.Split(line, ":")[1] - parts := strings.Split(version,"."); - if len(parts) > 2{ - v1,_ := strconv.Atoi(parts[0]); - v2,_ := strconv.Atoi(parts[1]); - if v1 * 1000 + v2 < 2008{ + parts := strings.Split(version, ".") + if len(parts) > 2 { + v1, _ := strconv.Atoi(parts[0]) + v2, _ := strconv.Atoi(parts[1]) + if v1*1000+v2 < 2008 { return false } } @@ -116,12 +116,12 @@ func (r *syncStandaloneReader) supportPSYNC() bool { } } - return true; + return true } func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry { - if !r.SupportPSYNC{ - return r.StartReadWithSync(ctx); + if !r.SupportPSYNC { + return r.StartReadWithSync(ctx) } r.ctx = ctx r.ch = make(chan *entry.Entry, 1024) @@ -206,6 +206,8 @@ func (r *syncStandaloneReader) sendPSync() { reply := r.client.DoWithStringReply(argv...) if reply != "OK" { log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply) + } else { + r.isDiskless = true } } r.checkBgsaveInProgress() @@ -518,6 +520,12 @@ func (r *syncStandaloneReader) StatusString() string { if r.stat.Status == kSyncAof { return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset) } + if r.stat.Status == kReceiveRdb { + if r.isDiskless { + return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman) + } + return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman) + } return string(r.stat.Status) }