feat: diskless sync (#810)

This commit is contained in:
jijijijijichild 2024-05-27 16:03:22 +08:00 committed by GitHub
parent df23dde27a
commit f9316b5dc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 17 deletions

View File

@ -2,6 +2,7 @@ package reader
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
@ -32,6 +33,7 @@ type SyncReaderOptions struct {
SyncRdb bool `mapstructure:"sync_rdb" default:"true"`
SyncAof bool `mapstructure:"sync_aof" default:"true"`
PreferReplica bool `mapstructure:"prefer_replica" default:"false"`
TryDiskless bool `mapstructure:"try_diskless" default:"false"`
}
type State string
@ -97,9 +99,9 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
go func() {
r.sendReplconfListenPort()
r.sendPSync()
go r.sendReplconfAck() // start sent replconf ack
rdbFilePath := r.receiveRDB()
startOffset := r.stat.AofReceivedOffset
go r.sendReplconfAck() // start sent replconf ack
go r.receiveAOF(r.rd)
if r.opts.SyncRdb {
r.sendRDB(rdbFilePath)
@ -125,6 +127,13 @@ func (r *syncStandaloneReader) sendReplconfListenPort() {
}
func (r *syncStandaloneReader) sendPSync() {
if r.opts.TryDiskless {
argv := []interface{}{"REPLCONF", "CAPA", "EOF"}
reply := r.client.DoWithStringReply(argv...)
if reply != "OK" {
log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply)
}
}
// send PSync
argv := []interface{}{"PSYNC", "?", "-1"}
if config.Opt.Advanced.AwsPSync != "" {
@ -147,6 +156,7 @@ func (r *syncStandaloneReader) sendPSync() {
if bytes[0] != '\n' {
break
}
r.rd.ReadByte()
}
reply := r.client.ReceiveString()
masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2])
@ -161,6 +171,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
r.stat.Status = kWaitBgsave
timeStart := time.Now()
// format: \n\n\n$<length>\r\n<rdb>
// if source support repl-diskless-sync: \n\n\n$EOF:<40 characters EOF marker>\r\nstream data<EOF marker>
for {
b, err := r.rd.ReadByte()
if err != nil {
@ -175,18 +186,11 @@ func (r *syncStandaloneReader) receiveRDB() string {
break
}
log.Debugf("[%s] source db bgsave finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
lengthStr, err := r.rd.ReadString('\n')
marker, err := r.rd.ReadString('\n')
if err != nil {
log.Panicf(err.Error())
}
lengthStr = strings.TrimSpace(lengthStr)
length, err := strconv.ParseInt(lengthStr, 10, 64)
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
marker = strings.TrimSpace(marker)
// create rdb file
rdbFilePath, err := filepath.Abs(r.stat.Name + "/dump.rdb")
@ -202,6 +206,70 @@ func (r *syncStandaloneReader) receiveRDB() string {
// receive rdb
r.stat.Status = kReceiveRdb
if strings.HasPrefix(marker, "EOF") {
log.Infof("[%s] source db supoort diskless sync capability.", r.stat.Name)
r.receiveRDBWithDiskless(marker, rdbFileHandle)
} else {
r.receiveRDBWithoutDiskless(marker, rdbFileHandle)
}
err = rdbFileHandle.Close()
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] save RDB finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
return rdbFilePath
}
func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Writer) {
const bufSize int64 = 32 * 1024 * 1024 // 32MB
buf := make([]byte, bufSize)
marker = strings.Split(marker, ":")[1]
if len(marker) != 40 {
log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker)
}
log.Infof("meet EOF begin marker: %s", marker)
bMarker := []byte(marker)
goon := true
for goon {
n, err := r.rd.Read(buf[:bufSize])
if err != nil {
log.Panicf(err.Error())
}
buffer := buf[:n]
if bytes.Contains(buffer, bMarker) {
log.Infof("meet EOF end marker.")
// replace it
fi := bytes.Index(buffer, bMarker)
if len(buffer[fi+40:]) > 0 {
log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:]))
}
buffer = buffer[:fi]
goon = false
}
_, err = wt.Write(buffer)
if err != nil {
log.Panicf(err.Error())
}
r.stat.RdbFileSizeBytes += int64(n)
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes))
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
}
}
func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Writer) {
length, err := strconv.ParseInt(marker, 10, 64)
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
buf := make([]byte, bufSize)
@ -215,7 +283,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
log.Panicf(err.Error())
}
remainder -= int64(n)
_, err = rdbFileHandle.Write(buf[:n])
_, err = wt.Write(buf[:n])
if err != nil {
log.Panicf(err.Error())
}
@ -223,12 +291,6 @@ func (r *syncStandaloneReader) receiveRDB() string {
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
}
err = rdbFileHandle.Close()
if err != nil {
log.Panicf(err.Error())
}
log.Debugf("[%s] save RDB finished. timeUsed=[%.2f]s", r.stat.Name, time.Since(timeStart).Seconds())
return rdbFilePath
}
func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {

View File

@ -10,6 +10,7 @@ tls = false
sync_rdb = true # set to false if you don't want to sync rdb
sync_aof = true # set to false if you don't want to sync aof
prefer_replica = true # set to true if you want to sync from replica node
try_diskless = false # set to true if you want to sync by socket and source repl-diskless-sync=yes
#[scan_reader]
#cluster = false # set to true if source is a redis cluster