opt bytes.Index and humanize.IBytes

This commit is contained in:
yinhang.sun 2024-12-25 11:56:34 +08:00
parent 43f6d16236
commit dd86fbe6e4
1 changed files with 43 additions and 41 deletions

View File

@ -1,7 +1,6 @@
package reader
import (
"RedisShake/internal/client/proto"
"bufio"
"bytes"
"context"
@ -14,6 +13,8 @@ import (
"strings"
"time"
"RedisShake/internal/client/proto"
"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/entry"
@ -38,6 +39,8 @@ type SyncReaderOptions struct {
Sentinel client.SentinelOptions `mapstructure:"sentinel"`
}
const RDB_EOF_MARKER_LEN = 40
type State string
const (
@ -65,18 +68,14 @@ type syncStandaloneReader struct {
Status State `json:"status"`
// rdb info
RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`
// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}
// version info
@ -319,7 +318,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
}
timeStart = time.Now()
log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
if err != nil {
log.Panicf(err.Error())
}
@ -345,39 +344,46 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
buf := make([]byte, bufSize)
marker = strings.Split(marker, ":")[1]
if len(marker) != 40 {
if len(marker) != RDB_EOF_MARKER_LEN {
log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker)
}
log.Infof("meet EOF begin marker: %s", marker)
bMarker := []byte(marker)
goon := true
for goon {
n, err := r.client.Read(buf[:bufSize])
var lastBytes []byte
for {
if lastBytes != nil { // add previous tail
copy(buf, lastBytes)
}
nread, err := r.client.Read(buf[len(lastBytes):])
if err != nil {
log.Panicf(err.Error())
}
buffer := buf[:n]
if bytes.Contains(buffer, bMarker) {
bufLen := len(lastBytes) + nread
nwrite := 0
if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) {
log.Infof("meet EOF end marker.")
// replace it
fi := bytes.Index(buffer, bMarker)
if len(buffer[fi+40:]) > 0 {
log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:]))
}
buffer = buffer[:fi]
goon = false
}
_, err = wt.Write(buffer)
if err != nil {
// Write all buf without EOF marker and break
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
break
}
r.stat.RdbFileSizeBytes += int64(n)
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes))
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
if bufLen >= RDB_EOF_MARKER_LEN {
// left RDB_EOF_MARKER_LEN bytes to next round
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round
} else {
// save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN
lastBytes = buf[:bufLen]
}
r.stat.RdbFileSizeBytes += uint64(nwrite)
r.stat.RdbReceivedBytes += uint64(nwrite)
}
}
@ -387,8 +393,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
r.stat.RdbFileSizeBytes = uint64(length)
remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
@ -408,8 +413,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbReceivedBytes += uint64(n)
}
}
@ -428,7 +432,6 @@ func (r *syncStandaloneReader) receiveAOF() {
log.Panicf(err.Error())
}
r.stat.AofReceivedBytes += int64(n)
r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes))
aofWriter.Write(buf[:n])
r.stat.AofReceivedOffset += int64(n)
}
@ -441,7 +444,6 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentHuman = humanize.IBytes(uint64(offset))
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB(r.ctx)
@ -532,16 +534,16 @@ func (r *syncStandaloneReader) Status() interface{} {
func (r *syncStandaloneReader) StatusString() string {
if r.stat.Status == kSyncRdb {
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbSentHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(uint64(r.stat.RdbSentBytes)), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
if r.stat.Status == kSyncAof {
return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
}
if r.stat.Status == kReceiveRdb {
if r.isDiskless {
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, r.stat.RdbReceivedHuman)
return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes))
}
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, r.stat.RdbReceivedHuman, r.stat.RdbFileSizeHuman)
return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
}
return string(r.stat.Status)
}