fix:(rdb_restore_command_behavior is rewrite. del key befor write key)
This commit is contained in:
parent
bbca45b9de
commit
f0909d478f
|
@ -1,6 +1,7 @@
|
|||
package rdb
|
||||
|
||||
import (
|
||||
"RedisShake/internal/config"
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
|
@ -201,6 +202,15 @@ func (ld *Loader) parseRDBEntry(ctx context.Context, rd *bufio.Reader) {
|
|||
return
|
||||
default:
|
||||
key := structure.ReadString(rd)
|
||||
// if rdb_restore_command_behavior is rewrite. first del key. otherwise list type key maybe write repeat
|
||||
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
|
||||
argv := []string{"DEL", key}
|
||||
ld.ch <- &entry.Entry{
|
||||
DbId: ld.nowDBId,
|
||||
Argv: argv,
|
||||
}
|
||||
}
|
||||
|
||||
o := types.ParseObject(rd, typeByte, key)
|
||||
cmdC := o.Rewrite()
|
||||
for cmd := range cmdC {
|
||||
|
|
|
@ -24,7 +24,6 @@ func (o *HashObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
|
|||
func (o *HashObject) Rewrite() <-chan RedisCmd {
|
||||
go func() {
|
||||
defer close(o.cmdC)
|
||||
o.cmdC <- RedisCmd{"del", o.key}
|
||||
switch o.typeByte {
|
||||
case rdbTypeHash:
|
||||
o.readHash()
|
||||
|
|
|
@ -30,7 +30,6 @@ func (o *ListObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
|
|||
func (o *ListObject) Rewrite() <-chan RedisCmd {
|
||||
go func() {
|
||||
defer close(o.cmdC)
|
||||
o.cmdC <- RedisCmd{"del", o.key}
|
||||
switch o.typeByte {
|
||||
case rdbTypeList:
|
||||
o.readList()
|
||||
|
|
|
@ -140,7 +140,6 @@ func (o *BloomObject) Rewrite() <-chan RedisCmd {
|
|||
} else {
|
||||
h = getEncodedHeader(&o.sb, true, true)
|
||||
}
|
||||
cmdC <- RedisCmd{"del", o.key}
|
||||
cmd := RedisCmd{"BF.LOADCHUNK", o.key, "1", h}
|
||||
cmdC <- cmd
|
||||
curIter := uint64(1)
|
||||
|
|
|
@ -24,7 +24,6 @@ func (o *SetObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
|
|||
func (o *SetObject) Rewrite() <-chan RedisCmd {
|
||||
go func() {
|
||||
defer close(o.cmdC)
|
||||
o.cmdC <- RedisCmd{"del", o.key}
|
||||
switch o.typeByte {
|
||||
case rdbTypeSet:
|
||||
o.readSet()
|
||||
|
|
|
@ -57,7 +57,6 @@ func (o *StreamObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
|
|||
func (o *StreamObject) Rewrite() <-chan RedisCmd {
|
||||
go func() {
|
||||
defer close(o.cmdC)
|
||||
o.cmdC <- RedisCmd{"del", o.key}
|
||||
switch o.typeByte {
|
||||
case rdbTypeStreamListpacks:
|
||||
o.readStream()
|
||||
|
|
|
@ -26,7 +26,6 @@ func (o *TairHashObject) Rewrite() <-chan RedisCmd {
|
|||
dictSizeStr := structure.ReadModuleUnsigned(rd)
|
||||
key := structure.ReadModuleString(rd)
|
||||
size, _ := strconv.Atoi(dictSizeStr)
|
||||
cmdC <- RedisCmd{"del", key}
|
||||
for i := 0; i < size; i++ {
|
||||
skey := structure.ReadModuleString(rd)
|
||||
version := structure.ReadModuleUnsigned(rd)
|
||||
|
|
|
@ -25,7 +25,6 @@ func (o *TairZsetObject) Rewrite() <-chan RedisCmd {
|
|||
cmdC := o.cmdC
|
||||
go func() {
|
||||
defer close(cmdC)
|
||||
cmdC <- RedisCmd{"del", o.key}
|
||||
length, _ := strconv.Atoi(structure.ReadModuleUnsigned(rd))
|
||||
scoreNum, _ := strconv.Atoi(structure.ReadModuleUnsigned(rd))
|
||||
for i := 0; i < length; i++ {
|
||||
|
|
|
@ -25,7 +25,6 @@ func (o *ZsetObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
|
|||
func (o *ZsetObject) Rewrite() <-chan RedisCmd {
|
||||
go func() {
|
||||
defer close(o.cmdC)
|
||||
o.cmdC <- RedisCmd{"del", o.key}
|
||||
switch o.typeByte {
|
||||
case rdbTypeZSet:
|
||||
o.readZset()
|
||||
|
|
|
@ -252,6 +252,16 @@ func (r *scanStandaloneReader) restore() {
|
|||
}
|
||||
if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen {
|
||||
log.Warnf("key=[%s] dump len=[%d] too large, split it. This is not a good practice in Redis.", key, len(dump))
|
||||
// fix: if TargetRedisProtoMaxBulkLen is small. list type key maybe write repeat
|
||||
if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" {
|
||||
argv := []string{"DEL", key}
|
||||
|
||||
r.ch <- &entry.Entry{
|
||||
DbId: dbId,
|
||||
Argv: argv,
|
||||
}
|
||||
}
|
||||
|
||||
typeByte := dump[0]
|
||||
anotherReader := strings.NewReader(dump[1 : len(dump)-10])
|
||||
o := types.ParseObject(anotherReader, typeByte, key)
|
||||
|
|
Loading…
Reference in New Issue