feat: support parallel writer (#847)

* feat: support parallel writer in sync & scan mode

* feat: parallel parse, filter, function

Signed-off-by: OxalisCu <2127298698@qq.com>

* feat: parallel entry allocation

Signed-off-by: OxalisCu <2127298698@qq.com>

* feat: separate ops log and status monitoring

---------

Signed-off-by: OxalisCu <2127298698@qq.com>
OxalisCu 2024-10-17 17:14:46 +08:00 committed by GitHub
parent 0f42759419
commit 66ca8e71f0
13 changed files with 167 additions and 116 deletions
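
The change is driven by two interface revisions, which every file below follows. The signatures, as they appear in the diffs:

// Reader: was one merged channel; now one channel per source, so
// cluster readers return a slice instead of fanning in.
StartRead(ctx context.Context) chan *entry.Entry   // before
StartRead(ctx context.Context) []chan *entry.Entry // after

// Writer: gains StartWrite, which runs the send loop in its own
// goroutine behind a buffered channel; Write becomes a channel send.
StartWrite(ctx context.Context) (ch chan *entry.Entry) // new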

View File

@@ -5,6 +5,7 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"os/signal"
+	"sync/atomic"
 	"syscall"
 	"time"
@@ -116,42 +117,95 @@ func main() {
 	default:
 		log.Panicf("no writer config entry found")
 	}
 	// create status
-	status.Init(theReader, theWriter)
+	if config.Opt.Advanced.StatusPort != 0 {
+		status.Init(theReader, theWriter)
+	}
+
+	// create log entry count
+	logEntryCount := status.EntryCount{
+		ReadCount:  0,
+		WriteCount: 0,
+	}
 	log.Infof("start syncing...")
-	ch := theReader.StartRead(ctx)
 	go waitShutdown(cancel)
+	chrs := theReader.StartRead(ctx)
+	theWriter.StartWrite(ctx)
+
+	readerDone := make(chan bool)
+	for _, chr := range chrs {
+		go func(ch chan *entry.Entry) {
+			for e := range ch {
+				// calc arguments
+				e.Parse()
+				// update reader status
+				if config.Opt.Advanced.StatusPort != 0 {
+					status.AddReadCount(e.CmdName)
+				}
+				// update log entry count
+				atomic.AddUint64(&logEntryCount.ReadCount, 1)
+				// filter
+				if !filter.Filter(e) {
+					log.Debugf("skip command: %v", e)
+					continue
+				}
+				// run lua function
+				log.Debugf("function before: %v", e)
+				entries := luaRuntime.RunFunction(e)
+				log.Debugf("function after: %v", entries)
+				// write
+				for _, theEntry := range entries {
+					theEntry.Parse()
+					theWriter.Write(theEntry)
+					// update writer status
+					if config.Opt.Advanced.StatusPort != 0 {
+						status.AddWriteCount(theEntry.CmdName)
+					}
+					// update log entry count
+					atomic.AddUint64(&logEntryCount.WriteCount, 1)
+				}
+			}
+			readerDone <- true
+		}(chr)
+	}
+
+	// calculate ops and log to screen
+	go func() {
+		if config.Opt.Advanced.LogInterval <= 0 {
+			log.Infof("log interval is 0, will not log to screen")
+			return
+		}
+		ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
+		defer ticker.Stop()
+		for range ticker.C {
+			logEntryCount.UpdateOPS()
+			log.Infof("%s, %s", logEntryCount.String(), theReader.StatusString())
+		}
+	}()
+
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
+	readerCnt := len(chrs)
 Loop:
 	for {
 		select {
-		case e, ok := <-ch:
-			if !ok {
-				// ch has been closed, exit the loop
+		case done := <-readerDone:
+			if done {
+				readerCnt--
+			}
+			if readerCnt == 0 {
 				break Loop
 			}
-			// calc arguments
-			e.Parse()
-			status.AddReadCount(e.CmdName)
-			// filter
-			if !filter.Filter(e) {
-				log.Debugf("skip command: %v", e)
-				continue
-			}
-			log.Debugf("function before: %v", e)
-			entries := luaRuntime.RunFunction(e)
-			log.Debugf("function after: %v", entries)
-			for _, theEntry := range entries {
-				theEntry.Parse()
-				theWriter.Write(theEntry)
-				status.AddWriteCount(theEntry.CmdName)
-			}
 		case <-ticker.C:
 			pingEntry := entry.NewEntry()
 			pingEntry.DbId = 0
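
Worth noting in the new Loop: shutdown is now detected by counting done signals (readerCnt) rather than by a single closed channel. A self-contained sketch of the same fan-out-and-count pattern, with stand-in types (int in place of *entry.Entry, process in place of the parse/filter/write chain):

package main

import "fmt"

// fanOut mirrors the new main.go loop: one goroutine per reader channel,
// each signalling on done after its channel is drained.
func fanOut(chs []chan int, process func(int)) chan bool {
	done := make(chan bool)
	for _, ch := range chs {
		go func(c chan int) {
			for v := range c {
				process(v)
			}
			done <- true
		}(ch)
	}
	return done
}

func main() {
	chs := []chan int{make(chan int, 1), make(chan int, 1)}
	done := fanOut(chs, func(v int) { fmt.Println("got", v) })
	for i, c := range chs {
		c <- i
		close(c)
	}
	// count completions instead of waiting for one channel to close,
	// exactly like the readerCnt bookkeeping in the Loop above
	for n := len(chs); n > 0; n-- {
		<-done
	}
}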

View File

@@ -66,7 +66,7 @@ func NewAOFReader(opts *AOFReaderOptions) Reader {
 	return r
 }
 
-func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *aofReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	//init entry
 	r.ch = make(chan *entry.Entry, 1024)
@@ -101,5 +101,5 @@ func (r *aofReader) StartRead(ctx context.Context) chan *entry.Entry {
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }

View File

@@ -8,5 +8,5 @@ import (
 
 type Reader interface {
 	status.Statusable
-	StartRead(ctx context.Context) chan *entry.Entry
+	StartRead(ctx context.Context) []chan *entry.Entry
 }

View File

@@ -8,6 +8,7 @@ import (
 	"RedisShake/internal/log"
 	"RedisShake/internal/rdb"
 	"RedisShake/internal/utils"
+
 	"github.com/dustin/go-humanize"
 )
@@ -41,7 +42,7 @@ func NewRDBReader(opts *RdbReaderOptions) Reader {
 	return r
 }
 
-func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *rdbReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	log.Infof("[%s] start read", r.stat.Name)
 	r.ch = make(chan *entry.Entry, 1024)
 	updateFunc := func(offset int64) {
@@ -58,7 +59,7 @@ func (r *rdbReader) StartRead(ctx context.Context) chan *entry.Entry {
 		close(r.ch)
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *rdbReader) Status() interface{} {

View File

@@ -3,7 +3,6 @@ package reader
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/utils"
@@ -26,23 +25,13 @@ func NewScanClusterReader(ctx context.Context, opts *ScanReaderOptions) Reader {
 	return rd
 }
 
-func (rd *scanClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
-	ch := make(chan *entry.Entry, 1024)
-	var wg sync.WaitGroup
+func (rd *scanClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
+	chs := make([]chan *entry.Entry, 0)
 	for _, r := range rd.readers {
-		wg.Add(1)
-		go func(r Reader) {
-			for e := range r.StartRead(ctx) {
-				ch <- e
-			}
-			wg.Done()
-		}(r)
+		ch := r.StartRead(ctx)
+		chs = append(chs, ch[0])
 	}
-	go func() {
-		wg.Wait()
-		close(ch)
-	}()
-	return ch
+	return chs
 }
 
 func (rd *scanClusterReader) Status() interface{} {
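
For contrast, the pattern deleted here (and in the sync cluster reader below) is the classic WaitGroup fan-in: drain every shard channel into one output and close it once all inputs finish. A generic sketch of that removed pattern, not RedisShake code (Go 1.18+ for the type parameter):

package main

import (
	"fmt"
	"sync"
)

// merge is the fan-in the cluster readers used to do: every input channel
// drains into one output, which closes when all inputs are done.
func merge[T any](ins []chan T) chan T {
	out := make(chan T, 1024)
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(c chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	a, b := make(chan int, 1), make(chan int, 1)
	a <- 1
	b <- 2
	close(a)
	close(b)
	for v := range merge([]chan int{a, b}) {
		fmt.Println(v)
	}
}

Dropping the merge removes one copy hop and lets main.go process each shard in its own goroutine while keeping per-shard ordering intact.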

View File

@@ -85,7 +85,7 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade
 	return r
 }
 
-func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	r.ctx = ctx
 	if r.opts.Scan {
 		go r.scan()
@@ -95,7 +95,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
 	}
 	go r.dump()
 	go r.restore()
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *scanStandaloneReader) subscript() {

View File

@@ -3,7 +3,6 @@ package reader
 import (
 	"context"
 	"fmt"
-	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/log"
@@ -30,23 +29,13 @@ func NewSyncClusterReader(ctx context.Context, opts *SyncReaderOptions) Reader {
 	return rd
 }
 
-func (rd *syncClusterReader) StartRead(ctx context.Context) chan *entry.Entry {
-	ch := make(chan *entry.Entry, 1024)
-	var wg sync.WaitGroup
+func (rd *syncClusterReader) StartRead(ctx context.Context) []chan *entry.Entry {
+	chs := make([]chan *entry.Entry, 0)
 	for _, r := range rd.readers {
-		wg.Add(1)
-		go func(r Reader) {
-			defer wg.Done()
-			for e := range r.StartRead(ctx) {
-				ch <- e
-			}
-		}(r)
+		ch := r.StartRead(ctx)
+		chs = append(chs, ch[0])
 	}
-	go func() {
-		wg.Wait()
-		close(ch)
-	}()
-	return ch
+	return chs
 }
 
 func (rd *syncClusterReader) Status() interface{} {

View File

@@ -93,7 +93,7 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
 	return r
 }
 
-func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry {
+func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
 	r.ctx = ctx
 	r.ch = make(chan *entry.Entry, 1024)
 	go func() {
@@ -113,7 +113,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
 		close(r.ch)
 	}()
 
-	return r.ch
+	return []chan *entry.Entry{r.ch}
 }
 
 func (r *syncStandaloneReader) sendReplconfListenPort() {

View File

@@ -18,7 +18,7 @@ type EntryCount struct {
 }
 
 // call this function every second
-func (e *EntryCount) updateOPS() {
+func (e *EntryCount) UpdateOPS() {
 	nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
 	if e.lastUpdateTimestampSec != 0 {
 		timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
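
Only the renamed signature and the first lines of UpdateOPS are visible in this hunk (the method is exported so main.go can call it on its own EntryCount). A hedged sketch of the OPS computation those lines imply, delta in the counters divided by elapsed seconds; every field here other than ReadCount, WriteCount, and lastUpdateTimestampSec is a guessed stand-in:

package main

import (
	"fmt"
	"time"
)

// EntryCount mirrors status.EntryCount as far as the diff shows it; the
// lastReadCount/lastWriteCount/readOPS/writeOPS fields are assumed.
type EntryCount struct {
	ReadCount  uint64
	WriteCount uint64

	lastReadCount          uint64
	lastWriteCount         uint64
	lastUpdateTimestampSec float64
	readOPS, writeOPS      float64
}

// call this function every second
func (e *EntryCount) UpdateOPS() {
	nowTimestampSec := float64(time.Now().UnixNano()) / 1e9
	if e.lastUpdateTimestampSec != 0 {
		timeIntervalSec := nowTimestampSec - e.lastUpdateTimestampSec
		e.readOPS = float64(e.ReadCount-e.lastReadCount) / timeIntervalSec
		e.writeOPS = float64(e.WriteCount-e.lastWriteCount) / timeIntervalSec
	}
	e.lastReadCount, e.lastWriteCount = e.ReadCount, e.WriteCount
	e.lastUpdateTimestampSec = nowTimestampSec
}

func main() {
	var e EntryCount
	e.UpdateOPS() // establish the baseline timestamp
	e.ReadCount = 500
	time.Sleep(100 * time.Millisecond)
	e.UpdateOPS()
	fmt.Printf("read ops ~ %.0f/s\n", e.readOPS) // ~5000/s
}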

View File

@@ -2,9 +2,6 @@ package status
 import (
 	"time"
-
-	"RedisShake/internal/config"
-	"RedisShake/internal/log"
 )
 
 type Statusable interface {
@@ -32,9 +29,6 @@ var theWriter Statusable
 func AddReadCount(cmd string) {
 	ch <- func() {
-		if stat.PerCmdEntriesCount == nil {
-			stat.PerCmdEntriesCount = make(map[string]EntryCount)
-		}
 		cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
 		if !ok {
 			cmdEntryCount = EntryCount{}
@@ -48,9 +42,6 @@ func AddReadCount(cmd string) {
 func AddWriteCount(cmd string) {
 	ch <- func() {
-		if stat.PerCmdEntriesCount == nil {
-			stat.PerCmdEntriesCount = make(map[string]EntryCount)
-		}
 		cmdEntryCount, ok := stat.PerCmdEntriesCount[cmd]
 		if !ok {
 			cmdEntryCount = EntryCount{}
@@ -68,6 +59,11 @@ func Init(r Statusable, w Statusable) {
 	setStatusPort()
 	stat.Time = time.Now().Format("2006-01-02 15:04:05")
+
+	// init per cmd entries count
+	if stat.PerCmdEntriesCount == nil {
+		stat.PerCmdEntriesCount = make(map[string]EntryCount)
+	}
 	// for update reader/writer stat
 	go func() {
 		ticker := time.NewTicker(1 * time.Second)
@@ -81,29 +77,14 @@ func Init(r Statusable, w Statusable) {
 			stat.Consistent = lastConsistent && theReader.StatusConsistent() && theWriter.StatusConsistent()
 			lastConsistent = stat.Consistent
 			// update OPS
-			stat.TotalEntriesCount.updateOPS()
+			stat.TotalEntriesCount.UpdateOPS()
 			for _, cmdEntryCount := range stat.PerCmdEntriesCount {
-				cmdEntryCount.updateOPS()
+				cmdEntryCount.UpdateOPS()
 			}
 		}
 	}()
-
-	// for log to screen
-	go func() {
-		if config.Opt.Advanced.LogInterval <= 0 {
-			log.Infof("log interval is 0, will not log to screen")
-			return
-		}
-		ticker := time.NewTicker(time.Duration(config.Opt.Advanced.LogInterval) * time.Second)
-		defer ticker.Stop()
-		for range ticker.C {
-			ch <- func() {
-				log.Infof("%s, %s", stat.TotalEntriesCount.String(), theReader.StatusString())
-			}
-		}
-	}()
-
 	// run all func in ch
 	go func() {
 		for f := range ch {

View File

@@ -3,10 +3,12 @@ package writer
 import (
 	"RedisShake/internal/entry"
 	"RedisShake/internal/status"
+	"context"
 )
 
 type Writer interface {
 	status.Statusable
 	Write(entry *entry.Entry)
+	StartWrite(ctx context.Context) (ch chan *entry.Entry)
 	Close()
 }
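
A minimal sketch of what the extended interface asks of an implementation, using a hypothetical stringWriter (Entry stands in for entry.Entry; the real implementations are in the next two files):

package main

import (
	"context"
	"fmt"
	"sync"
)

type Entry struct{ Cmd string } // stand-in for entry.Entry

type stringWriter struct {
	ch chan *Entry
	wg sync.WaitGroup
}

// StartWrite owns the send loop, like redisStandaloneWriter below.
func (w *stringWriter) StartWrite(ctx context.Context) chan *Entry {
	w.ch = make(chan *Entry, 1024)
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for e := range w.ch {
			fmt.Println("write:", e.Cmd) // a real writer sends to Redis here
		}
	}()
	return w.ch
}

// Write is now just a channel send; the loop goroutine does the work.
func (w *stringWriter) Write(e *Entry) { w.ch <- e }

// Close drains: close the channel, then wait for the loop to finish.
func (w *stringWriter) Close() {
	close(w.ch)
	w.wg.Wait()
}

func main() {
	w := &stringWriter{}
	w.StartWrite(context.Background())
	w.Write(&Entry{Cmd: "SET k v"})
	w.Close()
}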

View File

@@ -2,6 +2,7 @@ package writer
 import (
 	"context"
+	"sync"
 
 	"RedisShake/internal/entry"
 	"RedisShake/internal/log"
@@ -14,18 +15,22 @@ type RedisClusterWriter struct {
 	addresses []string
 	writers   []Writer
 	router    [KeySlots]Writer
+	ch        chan *entry.Entry
+	chWg      sync.WaitGroup
 	stat      []interface{}
 }
 
 func NewRedisClusterWriter(ctx context.Context, opts *RedisWriterOptions) Writer {
 	rw := new(RedisClusterWriter)
 	rw.loadClusterNodes(ctx, opts)
+	rw.ch = make(chan *entry.Entry, 1024)
 	log.Infof("redisClusterWriter connected to redis cluster successful. addresses=%v", rw.addresses)
 	return rw
 }
 
 func (r *RedisClusterWriter) Close() {
+	r.chWg.Wait()
+	close(r.ch)
 	for _, writer := range r.writers {
 		writer.Close()
 	}
@@ -54,6 +59,20 @@ func (r *RedisClusterWriter) loadClusterNodes(ctx context.Context, opts *RedisWr
 	}
 }
 
+func (r *RedisClusterWriter) StartWrite(ctx context.Context) chan *entry.Entry {
+	chs := make(map[string]chan *entry.Entry, len(r.writers))
+	for _, w := range r.writers {
+		stat := w.Status().(struct {
+			Name              string `json:"name"`
+			UnansweredBytes   int64  `json:"unanswered_bytes"`
+			UnansweredEntries int64  `json:"unanswered_entries"`
+		})
+		chs[stat.Name] = w.StartWrite(ctx)
+	}
+	return nil
+}
+
 func (r *RedisClusterWriter) Write(entry *entry.Entry) {
 	if len(entry.Slots) == 0 {
 		for _, writer := range r.writers {
@@ -61,7 +80,6 @@ func (r *RedisClusterWriter) Write(entry *entry.Entry) {
 		}
 		return
 	}
-
 	lastSlot := -1
 	for _, slot := range entry.Slots {
 		if lastSlot == -1 {
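
The new StartWrite recovers each shard writer's name by type-asserting the Status() result to an anonymous struct; the assertion only succeeds when the dynamic type matches field-for-field, struct tags included. Note that the cluster writer returns nil rather than a merged channel: routing stays in Write, and starting each shard writer's loop is the real effect of the for loop. A self-contained illustration of the assertion pattern:

package main

import "fmt"

type statusable interface{ Status() interface{} }

type shard struct{ name string }

// Status returns an anonymous struct, as the standalone writer's stat does.
func (s shard) Status() interface{} {
	return struct {
		Name              string `json:"name"`
		UnansweredBytes   int64  `json:"unanswered_bytes"`
		UnansweredEntries int64  `json:"unanswered_entries"`
	}{Name: s.name}
}

func main() {
	var w statusable = shard{name: "writer_127.0.0.1_6379"}
	// The asserted type must repeat the fields, types, order, and tags exactly.
	stat := w.Status().(struct {
		Name              string `json:"name"`
		UnansweredBytes   int64  `json:"unanswered_bytes"`
		UnansweredEntries int64  `json:"unanswered_entries"`
	})
	fmt.Println(stat.Name) // writer_127.0.0.1_6379
}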

View File

@@ -34,8 +34,10 @@ type redisStandaloneWriter struct {
 	DbId int
 
 	chWaitReply chan *entry.Entry
-	chWg        sync.WaitGroup
+	chWaitWg    sync.WaitGroup
 	offReply    bool
+	ch          chan *entry.Entry
+	chWg        sync.WaitGroup
 
 	stat struct {
 		Name string `json:"name"`
@@ -49,13 +51,14 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
 	rw.address = opts.Address
 	rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
 	rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
+	rw.ch = make(chan *entry.Entry, 1024)
 	if opts.OffReply {
 		log.Infof("turn off the reply of write")
 		rw.offReply = true
 		rw.client.Send("CLIENT", "REPLY", "OFF")
 	} else {
 		rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit)
-		rw.chWg.Add(1)
+		rw.chWaitWg.Add(1)
 		go rw.processReply()
 	}
 	return rw
@@ -63,29 +66,43 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
 func (w *redisStandaloneWriter) Close() {
 	if !w.offReply {
-		close(w.chWaitReply)
+		close(w.ch)
 		w.chWg.Wait()
+		close(w.chWaitReply)
+		w.chWaitWg.Wait()
 	}
 }
 
-func (w *redisStandaloneWriter) Write(e *entry.Entry) {
-	// switch db if we need
-	if w.DbId != e.DbId {
-		w.switchDbTo(e.DbId)
-	}
-
-	// send
-	bytes := e.Serialize()
-	for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
-		time.Sleep(1 * time.Nanosecond)
-	}
-	log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
-	if !w.offReply {
-		w.chWaitReply <- e
-		atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
-		atomic.AddInt64(&w.stat.UnansweredEntries, 1)
-	}
-	w.client.SendBytes(bytes)
+func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry {
+	w.chWg = sync.WaitGroup{}
+	w.chWg.Add(1)
+	go func() {
+		for e := range w.ch {
+			// switch db if we need
+			if w.DbId != e.DbId {
+				w.switchDbTo(e.DbId)
+			}
+			// send
+			bytes := e.Serialize()
+			for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
+				time.Sleep(1 * time.Nanosecond)
+			}
+			log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
+			if !w.offReply {
+				w.chWaitReply <- e
+				atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
+				atomic.AddInt64(&w.stat.UnansweredEntries, 1)
+			}
+			w.client.SendBytes(bytes)
+		}
+		w.chWg.Done()
+	}()
+
+	return w.ch
+}
+
+func (w *redisStandaloneWriter) Write(e *entry.Entry) {
+	w.ch <- e
 }
 
 func (w *redisStandaloneWriter) switchDbTo(newDbId int) {
@@ -124,7 +141,7 @@ func (w *redisStandaloneWriter) processReply() {
 		atomic.AddInt64(&w.stat.UnansweredBytes, -e.SerializedSize)
 		atomic.AddInt64(&w.stat.UnansweredEntries, -1)
 	}
-	w.chWg.Done()
+	w.chWaitWg.Done()
 }
 
 func (w *redisStandaloneWriter) Status() interface{} {
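
The send loop moved into StartWrite keeps the writer's existing backpressure: spin with a nanosecond sleep until outstanding bytes fit under TargetRedisClientMaxQuerybufLen. A stand-alone sketch of that throttle; maxQuerybufLen and the drain goroutine are assumptions standing in for w.stat.UnansweredBytes and processReply:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Assumed window size; the real limit is
// config.Opt.Advanced.TargetRedisClientMaxQuerybufLen.
const maxQuerybufLen = 1 << 20

// send blocks until the outstanding-bytes window has room, then claims it,
// mirroring the loop inside redisStandaloneWriter.StartWrite.
func send(unanswered *int64, size int64) {
	for size+atomic.LoadInt64(unanswered) > maxQuerybufLen {
		time.Sleep(1 * time.Nanosecond) // yield until replies drain the window
	}
	atomic.AddInt64(unanswered, size)
	// ... the real writer serializes the entry and writes to the socket here ...
}

func main() {
	var unanswered int64
	// Simulated processReply: replies shrink the window over time.
	go func() {
		for {
			time.Sleep(time.Millisecond)
			atomic.AddInt64(&unanswered, -(64 << 10))
		}
	}()
	for i := 0; i < 4; i++ {
		send(&unanswered, 512<<10)
		fmt.Println("sent chunk", i)
	}
}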