feat: improve reader implementation for better performance and reliability (#899)
This commit is contained in:
parent
0a47f7b74a
commit
71820fd2ff
|
@ -261,13 +261,24 @@ func (r *Redis) ReceiveString() string {
|
|||
return reply.(string)
|
||||
}
|
||||
|
||||
func (r *Redis) BufioReader() *bufio.Reader {
|
||||
return r.reader
|
||||
func (r *Redis) Peek() (byte, error) {
|
||||
bytes, err := r.protoReader.Peek(1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return bytes[0], nil
|
||||
}
|
||||
|
||||
func (r *Redis) SetBufioReader(rd *bufio.Reader) {
|
||||
r.reader = rd
|
||||
r.protoReader = proto.NewReader(r.reader)
|
||||
func (r *Redis) Read(p []byte) (int, error) {
|
||||
return r.reader.Read(p)
|
||||
}
|
||||
|
||||
func (r *Redis) ReadByte() (byte, error) {
|
||||
return r.reader.ReadByte()
|
||||
}
|
||||
|
||||
func (r *Redis) ReadString(delim byte) (string, error) {
|
||||
return r.reader.ReadString(delim)
|
||||
}
|
||||
|
||||
func (r *Redis) Close() {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package reader
|
||||
|
||||
import (
|
||||
"RedisShake/internal/client/proto"
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
|
@ -54,8 +55,6 @@ type syncStandaloneReader struct {
|
|||
ch chan *entry.Entry
|
||||
DbId int
|
||||
|
||||
rd *bufio.Reader
|
||||
|
||||
stat struct {
|
||||
Name string `json:"name"`
|
||||
Address string `json:"address"`
|
||||
|
@ -80,26 +79,23 @@ type syncStandaloneReader struct {
|
|||
}
|
||||
|
||||
// version info
|
||||
SupportPSYNC bool
|
||||
isDiskless bool
|
||||
isDiskless bool
|
||||
}
|
||||
|
||||
func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
|
||||
r := new(syncStandaloneReader)
|
||||
r.opts = opts
|
||||
r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica)
|
||||
r.rd = r.client.BufioReader()
|
||||
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
|
||||
r.stat.Address = opts.Address
|
||||
r.stat.Status = kHandShake
|
||||
r.stat.Dir = utils.GetAbsPath(r.stat.Name)
|
||||
utils.CreateEmptyDir(r.stat.Dir)
|
||||
|
||||
r.SupportPSYNC = r.supportPSYNC()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *syncStandaloneReader) supportPSYNC() bool {
|
||||
func (r *syncStandaloneReader) supportPSync() bool {
|
||||
reply := r.client.DoWithStringReply("info", "server")
|
||||
for _, line := range strings.Split(reply, "\n") {
|
||||
if strings.HasPrefix(line, "redis_version:") {
|
||||
|
@ -120,9 +116,15 @@ func (r *syncStandaloneReader) supportPSYNC() bool {
|
|||
}
|
||||
|
||||
func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
|
||||
if !r.SupportPSYNC {
|
||||
if r.supportPSync() { // Redis version >= 2.8
|
||||
return r.StartReadWithPSync(ctx)
|
||||
} else { // Redis version < 2.8
|
||||
return r.StartReadWithSync(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// StartReadWithPSync is used in Redis version >= 2.8
|
||||
func (r *syncStandaloneReader) StartReadWithPSync(ctx context.Context) []chan *entry.Entry {
|
||||
r.ctx = ctx
|
||||
r.ch = make(chan *entry.Entry, 1024)
|
||||
go func() {
|
||||
|
@ -131,7 +133,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr
|
|||
rdbFilePath := r.receiveRDB()
|
||||
startOffset := r.stat.AofReceivedOffset
|
||||
go r.sendReplconfAck() // start sent replconf ack
|
||||
go r.receiveAOF(r.rd)
|
||||
go r.receiveAOF()
|
||||
if r.opts.SyncRdb {
|
||||
r.sendRDB(rdbFilePath)
|
||||
}
|
||||
|
@ -145,16 +147,15 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr
|
|||
return []chan *entry.Entry{r.ch}
|
||||
}
|
||||
|
||||
// StartReadWithSync is only used in Redis version < 2.8
|
||||
func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry {
|
||||
r.ctx = ctx
|
||||
r.ch = make(chan *entry.Entry, 1024)
|
||||
go func() {
|
||||
//r.sendReplconfListenPort()
|
||||
r.sendSync()
|
||||
rdbFilePath := r.receiveRDB()
|
||||
startOffset := r.stat.AofReceivedOffset
|
||||
//go r.sendReplconfAck() // start sent replconf ack
|
||||
go r.receiveAOF(r.rd)
|
||||
go r.receiveAOF()
|
||||
if r.opts.SyncRdb {
|
||||
r.sendRDB(rdbFilePath)
|
||||
}
|
||||
|
@ -226,14 +227,17 @@ func (r *syncStandaloneReader) sendPSync() {
|
|||
runtime.Goexit() // stop goroutine
|
||||
default:
|
||||
}
|
||||
bytes, err := r.rd.Peek(1)
|
||||
peakByte, err := r.client.Peek()
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
if bytes[0] != '\n' {
|
||||
if peakByte != '\n' {
|
||||
break
|
||||
}
|
||||
r.rd.ReadByte()
|
||||
_, err = r.client.ReadByte()
|
||||
if err != nil {
|
||||
log.Panicf("[%s] pop byte failed. error=[%v]", r.stat.Name, err)
|
||||
}
|
||||
}
|
||||
reply := r.client.ReceiveString()
|
||||
masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2])
|
||||
|
@ -267,14 +271,17 @@ func (r *syncStandaloneReader) sendSync() {
|
|||
runtime.Goexit() // stop goroutine
|
||||
default:
|
||||
}
|
||||
bytes, err := r.rd.Peek(1)
|
||||
peekByte, err := r.client.Peek()
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
if bytes[0] != '\n' {
|
||||
if peekByte != '\n' {
|
||||
break
|
||||
}
|
||||
r.rd.ReadByte()
|
||||
_, err = r.client.ReadByte()
|
||||
if err != nil {
|
||||
log.Panicf("[%s] pop byte failed. error=[%v]", r.stat.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -285,7 +292,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
|
|||
// format: \n\n\n$<length>\r\n<rdb>
|
||||
// if source support repl-diskless-sync: \n\n\n$EOF:<40 characters EOF marker>\r\nstream data<EOF marker>
|
||||
for {
|
||||
b, err := r.rd.ReadByte()
|
||||
b, err := r.client.ReadByte()
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
|
@ -298,7 +305,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
|
|||
break
|
||||
}
|
||||
log.Debugf("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
|
||||
marker, err := r.rd.ReadString('\n')
|
||||
marker, err := r.client.ReadString('\n')
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
|
@ -344,7 +351,7 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
|
|||
bMarker := []byte(marker)
|
||||
goon := true
|
||||
for goon {
|
||||
n, err := r.rd.Read(buf[:bufSize])
|
||||
n, err := r.client.Read(buf[:bufSize])
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
|
@ -390,7 +397,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
|
|||
if remainder < readOnce {
|
||||
readOnce = remainder
|
||||
}
|
||||
n, err := r.rd.Read(buf[:readOnce])
|
||||
n, err := r.client.Read(buf[:readOnce])
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
|
@ -405,7 +412,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
|
|||
}
|
||||
}
|
||||
|
||||
func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
|
||||
func (r *syncStandaloneReader) receiveAOF() {
|
||||
log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name)
|
||||
aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset)
|
||||
defer aofWriter.Close()
|
||||
|
@ -415,7 +422,7 @@ func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
|
|||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
n, err := rd.Read(buf)
|
||||
n, err := r.client.Read(buf)
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
|
@ -444,64 +451,73 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
|
|||
}
|
||||
|
||||
func (r *syncStandaloneReader) sendAOF(offset int64) {
|
||||
time.Sleep(1 * time.Second) // wait for receiveAOF create aof file
|
||||
aofReader := rotate.NewAOFReader(r.stat.Name, r.stat.Dir, offset)
|
||||
defer aofReader.Close()
|
||||
r.client.SetBufioReader(bufio.NewReader(aofReader))
|
||||
protoReader := proto.NewReader(bufio.NewReader(aofReader))
|
||||
for {
|
||||
if err := r.ctx.Err(); err != nil {
|
||||
log.Infof("[%s] sendAOF exit", r.stat.Name)
|
||||
return
|
||||
}
|
||||
|
||||
iArgv, err := protoReader.ReadReply()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
} else {
|
||||
log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
argv := client.ArrayString(iArgv, nil)
|
||||
r.stat.AofSentOffset = aofReader.Offset()
|
||||
// select
|
||||
if strings.EqualFold(argv[0], "select") {
|
||||
DbId, err := strconv.Atoi(argv[1])
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
r.DbId = DbId
|
||||
continue
|
||||
}
|
||||
// ping
|
||||
if strings.EqualFold(argv[0], "ping") {
|
||||
continue
|
||||
}
|
||||
// replconf @AWS
|
||||
if strings.EqualFold(argv[0], "replconf") {
|
||||
continue
|
||||
}
|
||||
// opinfo @Aliyun
|
||||
if strings.EqualFold(argv[0], "opinfo") {
|
||||
continue
|
||||
}
|
||||
// txn
|
||||
if strings.EqualFold(argv[0], "multi") || strings.EqualFold(argv[0], "exec") {
|
||||
continue
|
||||
}
|
||||
// sentinel
|
||||
if strings.EqualFold(argv[0], "publish") && strings.EqualFold(argv[1], "__sentinel__:hello") {
|
||||
continue
|
||||
}
|
||||
|
||||
e := entry.NewEntry()
|
||||
e.Argv = argv
|
||||
e.DbId = r.DbId
|
||||
r.ch <- e
|
||||
}
|
||||
}
|
||||
|
||||
// sendReplconfAck sends replconf ack to master to maintain heartbeat between redis-shake and source redis.
|
||||
func (r *syncStandaloneReader) sendReplconfAck() {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
argv := client.ArrayString(r.client.Receive())
|
||||
r.stat.AofSentOffset = aofReader.Offset()
|
||||
// select
|
||||
if strings.EqualFold(argv[0], "select") {
|
||||
DbId, err := strconv.Atoi(argv[1])
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
r.DbId = DbId
|
||||
continue
|
||||
}
|
||||
// ping
|
||||
if strings.EqualFold(argv[0], "ping") {
|
||||
continue
|
||||
}
|
||||
// replconf @AWS
|
||||
if strings.EqualFold(argv[0], "replconf") {
|
||||
continue
|
||||
}
|
||||
// opinfo @Aliyun
|
||||
if strings.EqualFold(argv[0], "opinfo") {
|
||||
continue
|
||||
}
|
||||
// txn
|
||||
if strings.EqualFold(argv[0], "multi") || strings.EqualFold(argv[0], "exec") {
|
||||
continue
|
||||
}
|
||||
// sentinel
|
||||
if strings.EqualFold(argv[0], "publish") && strings.EqualFold(argv[1], "__sentinel__:hello") {
|
||||
continue
|
||||
}
|
||||
|
||||
e := entry.NewEntry()
|
||||
e.Argv = argv
|
||||
e.DbId = r.DbId
|
||||
r.ch <- e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendReplconfAck send replconf ack to master to keep heartbeat between redis-shake and source redis.
|
||||
func (r *syncStandaloneReader) sendReplconfAck() {
|
||||
ticker := time.NewTicker(time.Millisecond * 100)
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
default:
|
||||
case <-ticker.C:
|
||||
if r.stat.AofReceivedOffset != 0 {
|
||||
r.client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10))
|
||||
}
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
package rotate
|
||||
|
||||
import (
|
||||
"RedisShake/internal/log"
|
||||
"RedisShake/internal/utils"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"RedisShake/internal/log"
|
||||
"RedisShake/internal/utils"
|
||||
)
|
||||
|
||||
type AOFReader struct {
|
||||
|
@ -39,30 +37,38 @@ func (r *AOFReader) openFile(offset int64) {
|
|||
log.Debugf("[%s] open file for read. filename=[%s]", r.name, r.filepath)
|
||||
}
|
||||
|
||||
func (r *AOFReader) readNextFile(offset int64) {
|
||||
func (r *AOFReader) readNextFile(offset int64) bool {
|
||||
filepath := fmt.Sprintf("%s/%d.aof", r.dir, r.offset)
|
||||
if utils.IsExist(filepath) {
|
||||
r.Close()
|
||||
err := os.Remove(r.filepath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
r.openFile(offset)
|
||||
if r.filepath == filepath {
|
||||
return false
|
||||
}
|
||||
if !utils.IsExist(filepath) {
|
||||
return false
|
||||
}
|
||||
r.Close()
|
||||
err := os.Remove(r.filepath)
|
||||
if err != nil {
|
||||
log.Panicf("[%s] remove file failed. filename=[%s], err=[%v]", r.name, r.filepath, err)
|
||||
return false
|
||||
}
|
||||
r.openFile(offset)
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *AOFReader) Read(buf []byte) (n int, err error) {
|
||||
n, err = r.file.Read(buf)
|
||||
for err == io.EOF {
|
||||
if r.filepath != fmt.Sprintf("%s/%d.aof", r.dir, r.offset) {
|
||||
r.readNextFile(r.offset)
|
||||
if err == io.EOF {
|
||||
if !r.readNextFile(r.offset) {
|
||||
return n, io.EOF
|
||||
}
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
_, err = r.file.Seek(0, 1)
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
}
|
||||
n, err = r.file.Read(buf)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
log.Panicf(err.Error())
|
||||
|
|
Loading…
Reference in New Issue