feat: add support for filter config (#860)

suxb201 2024-09-10 18:43:09 +08:00 committed by GitHub
parent 6093348dcc
commit 6405b4c077
26 changed files with 563 additions and 394 deletions

View File

@ -8,6 +8,7 @@
- [English Documentation](https://tair-opensource.github.io/RedisShake/en/)
![](./docs/demo.gif)
## Overview
RedisShake is a tool designed for processing and migrating Redis data. It offers the following features:
@ -50,12 +51,14 @@ sh build.sh
### Usage
Assume you have two Redis instances:
To migrate data from one Redis instance to another while skipping keys with specific prefixes, follow these steps:
1. Ensure you have two Redis instances running:
* Instance A: 127.0.0.1:6379
* Instance B: 127.0.0.1:6380
Create a new configuration file `shake.toml`:
2. Create a new configuration file `shake.toml`, and set the `block_key_prefix` parameter to skip keys with specific prefixes:
```toml
[sync_reader]
@ -63,9 +66,12 @@ address = "127.0.0.1:6379"
[redis_writer]
address = "127.0.0.1:6380"
[filter]
block_key_prefix = ["temp:", "cache:"]
```
To start RedisShake, run the following command:
3. Start RedisShake by running the following command:
```shell
./redis-shake shake.toml
@ -84,6 +90,7 @@ would like to change. We are particularly interested in:
1. Adding support for more modules
2. Enhancing support for Readers and Writers
3. Sharing your Lua scripts and best practices
4. Improving the documentation
## History

View File

@ -10,7 +10,7 @@ import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"RedisShake/internal/function"
"RedisShake/internal/filter"
"RedisShake/internal/log"
"RedisShake/internal/reader"
"RedisShake/internal/status"
@ -27,7 +27,7 @@ func main() {
utils.ChdirAndAcquireFileLock()
utils.SetNcpu()
utils.SetPprofPort()
luaRuntime := function.New(config.Opt.Function)
luaRuntime := filter.NewFunctionFilter(config.Opt.Filter.Function)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -139,6 +139,10 @@ Loop:
status.AddReadCount(e.CmdName)
// filter
if !filter.Filter(e) {
log.Debugf("skip command: %v", e)
continue
}
log.Debugf("function before: %v", e)
entries := luaRuntime.RunFunction(e)
log.Debugf("function after: %v", entries)

View File

@ -48,16 +48,18 @@ function sidebar(): DefaultTheme.SidebarItem[] {
]
},
{
text: 'Function',
text: 'Filter and Processing',
items: [
{ text: 'What is function', link: '/en/function/introduction' },
{ text: 'Best Practices', link: '/en/function/best_practices' }
{ text: 'Built-in Filter Rules', link: '/en/filter/filter' },
{ text: 'What is function', link: '/en/filter/function' },
]
},
{
text: 'Others',
items: [
{ text: 'Redis Modules', link: '/en/others/modules' },
{ text: 'How to Verify Data Consistency', link: '/en/others/consistent' },
{ text: 'Cross-version Migration', link: '/en/others/version' },
]
},
]

View File

@ -46,10 +46,10 @@ function sidebar(): DefaultTheme.SidebarItem[] {
]
},
{
text: 'Function',
text: '过滤与加工',
items: [
{ text: '什么是 function', link: '/zh/function/introduction' },
{ text: '最佳实践', link: '/zh/function/best_practices' }
{ text: '内置过滤规则', link: '/zh/filter/filter' },
{ text: '什么是 function', link: '/zh/filter/function' },
]
},
{

View File

@ -0,0 +1,40 @@
---
outline: deep
---
# Built-in Filter Rules
RedisShake provides various built-in filter rules that users can choose from according to their needs.
## Filtering Keys
RedisShake supports filtering by key name prefixes and suffixes. You can set the following options in the configuration file, for example:
```toml
allow_key_prefix = ["user:", "product:"]
allow_key_suffix = [":active", ":valid"]
block_key_prefix = ["temp:", "cache:"]
block_key_suffix = [":tmp", ":old"]
```
If these options are not set, all keys are allowed by default.
## Filtering Databases
You can specify allowed or blocked database numbers, for example:
```toml
allow_db = [0, 1, 2]
block_db = [3, 4, 5]
```
If these options are not set, all databases are allowed by default.
## Filtering Commands
RedisShake allows you to filter specific Redis commands, for example:
```toml
allow_command = ["GET", "SET"]
block_command = ["DEL", "FLUSHDB"]
```
## Filtering Command Groups
You can also filter by command groups. Available command groups include:
SERVER, STRING, CLUSTER, CONNECTION, BITMAP, LIST, SORTED_SET, GENERIC, TRANSACTIONS, SCRIPTING, TAIRHASH, TAIRSTRING, TAIRZSET, GEO, HASH, HYPERLOGLOG, PUBSUB, SET, SENTINEL, STREAM
For example:
```toml
allow_command_group = ["STRING", "HASH"]
block_command_group = ["SCRIPTING", "PUBSUB"]
```
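These rules can be combined: a key is kept only if it matches an allow rule (or both allow lists are empty) and matches none of the block rules, and the same allow/block logic applies to databases, commands, and command groups. A combined sketch (the prefixes and groups below are only illustrative):

```toml
[filter]
# keep only keys in the user namespace, except temporary ones
allow_key_prefix = ["user:"]
block_key_prefix = ["user:temp:"]
# only replicate logical database 0
allow_db = [0]
# drop Lua scripting traffic
block_command_group = ["SCRIPTING"]
```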

View File

@ -1,99 +0,0 @@
---
outline: deep
---
# Best Practices
## Filtering
### Filtering Keys
```lua
local prefix = "user:"
local prefix_len = #prefix
if string.sub(KEYS[1], 1, prefix_len) ~= prefix then
return
end
shake.call(DB, ARGV)
```
The effect is that only source data whose keys start with `user:` is written to the target. Multi-key commands such as `mset` are not taken into account.
### Filtering DBs
```lua
shake.log(DB)
if DB == 0
then
return
end
shake.call(DB, ARGV)
```
The effect is that data in the source's `db` 0 is discarded, while data from other `db`s is written to the target.
### Filtering by Data Structure Type
The `GROUP` variable can be used to determine the data structure type. Supported types include `STRING`, `LIST`, `SET`, `ZSET`, `HASH`, `SCRIPTING`, and so on.
#### Filtering Hash Data
```lua
if GROUP == "HASH" then
return
end
shake.call(DB, ARGV)
```
The effect is that the source's `hash` data is discarded, while other data is written to the target.
#### Filtering [Lua scripts](https://redis.io/docs/interact/programmability/eval-intro/)
```lua
if GROUP == "SCRIPTING" then
return
end
shake.call(DB, ARGV)
```
The effect is that Lua scripts from the source are discarded, while other data is written to the target. This is common when syncing from a master-replica setup to a cluster that does not support certain Lua scripts.
## Modifying
### Changing a Key Prefix
```lua
local prefix_old = "prefix_old_"
local prefix_new = "prefix_new_"
shake.log("old=" .. table.concat(ARGV, " "))
for i, index in ipairs(KEY_INDEXES) do
local key = ARGV[index]
if string.sub(key, 1, #prefix_old) == prefix_old then
ARGV[index] = prefix_new .. string.sub(key, #prefix_old + 1)
end
end
shake.log("new=" .. table.concat(ARGV, " "))
shake.call(DB, ARGV)
```
The effect is that the source key `prefix_old_key` is written to the target as `prefix_new_key`.
### Swapping DBs
```lua
local db1 = 1
local db2 = 2
if DB == db1 then
DB = db2
elseif DB == db2 then
DB = db1
end
shake.call(DB, ARGV)
```
The effect is that the source's `db 1` is written to the target's `db 2` and the source's `db 2` is written to the target's `db 1`; other `db`s are unchanged.

View File

@ -4,29 +4,24 @@ outline: deep
# Configuration File
RedisShake uses the [TOML](https://toml.io/cn/) language for writing, and all configuration parameters are explained in all.toml.
RedisShake configuration is written in the [TOML](https://toml.io/) language; all configuration parameters are explained in all.toml.
The configuration file is composed as follows:
```toml
function = "..."
[xxx_reader]
...
[xxx_writer]
...
[filter]
...
[advanced]
...
```
Under normal usage, you only need to write the `xxx_reader` and `xxx_writer` parts. The `filter` and `advanced` parts are for advanced usage, and users can configure them according to their needs.
## function Configuration
Refer to [What is function](../function/introduction.md).
## reader Configuration
RedisShake provides different Readers to interface with different sources, see the Reader section for configuration details:
@ -34,6 +29,7 @@ RedisShake provides different Readers to interface with different sources, see t
* [Sync Reader](../reader/sync_reader.md)
* [Scan Reader](../reader/scan_reader.md)
* [RDB Reader](../reader/rdb_reader.md)
* [AOF Reader](../reader/aof_reader.md)
## writer Configuration
@ -41,42 +37,10 @@ RedisShake provides different Writers to interface with different targets, see t
* [Redis Writer](../writer/redis_writer.md)
## filter Configuration
You can set filter rules through the configuration file. Refer to [Filter and Processing](../filter/filter.md) and [function](../filter/function.md).
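As a quick sketch (the values are illustrative; see the pages above for the full list of options), the filter section sits alongside the reader and writer sections:

```toml
[sync_reader]
address = "127.0.0.1:6379"

[redis_writer]
address = "127.0.0.1:6380"

[filter]
block_key_prefix = ["temp:", "cache:"]
block_db = [3]
```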
## advanced Configuration
```toml
[advanced]
dir = "data"
ncpu = 3 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores
pprof_port = 0 # pprof port, 0 means disable
status_port = 0 # status port, 0 means disable
# log
log_file = "shake.log"
log_level = "info" # debug, info or warn
log_interval = 5 # in seconds
# redis-shake gets key and value from rdb file, and uses RESTORE command to
# create the key in target redis. Redis RESTORE will return a "Target key name
# is busy" error when key already exists. You can use this configuration item
# to change the default behavior of restore:
# panic: redis-shake will stop when it meets a "Target key name is busy" error.
# rewrite: redis-shake will replace the key with the new value.
# skip: redis-shake will skip restoring the key when it meets a "Target key name is busy" error.
rdb_restore_command_behavior = "rewrite" # panic, rewrite or skip
# redis-shake uses pipeline to improve sending performance.
# This item limits the maximum number of commands in a pipeline.
pipeline_count_limit = 1024
# Client query buffers accumulate new commands. They are limited to a fixed
# amount by default. This amount is normally 1gb.
target_redis_client_max_querybuf_len = 1024_000_000
# In the Redis protocol, bulk requests, that is, elements representing single
# strings, are normally limited to 512 mb.
target_redis_proto_max_bulk_len = 512_000_000
# If the source is Elasticache or MemoryDB, you can set this item.
aws_psync = ""
```
Refer to the [shake.toml configuration file](https://github.com/tair-opensource/RedisShake/blob/v4/shake.toml).

Binary file not shown.


View File

@ -8,11 +8,11 @@ outline: deep
Currently, RedisShake has three migration modes: `PSync`, `RDB`, and `SCAN`, corresponding to [`sync_reader`](../reader/sync_reader.md), [`rdb_reader`](../reader/rdb_reader.md), and [`scan_reader`](../reader/scan_reader.md) respectively.
* For scenarios of recovering data from backups, you can use `rdb_reader`.
* For data migration scenarios, `sync_reader` should be the preferred choice. Some cloud vendors do not provide support for the PSync protocol, in which case `scan_reader` can be chosen.
* For long-term data synchronization scenarios, RedisShake currently cannot handle them because the PSync protocol is not reliable. When the replication connection is disconnected, RedisShake will not be able to reconnect to the source database. If the demand for availability is not high, you can use `scan_reader`. If the write volume is not large and there are no large keys, `scan_reader` can also be considered.
* For scenarios of recovering data from backups, use `rdb_reader`.
* For data migration scenarios, prefer `sync_reader`. Some cloud vendors don't provide PSync protocol support, in which case `scan_reader` can be chosen.
* For long-term data synchronization scenarios, RedisShake currently can't handle them because the PSync protocol is not reliable: when the replication connection is disconnected, RedisShake will not be able to reconnect to the source database. If availability requirements are not high, you can use `scan_reader`; it is also an option when the write volume is small and there are no large keys.
Different modes have their pros and cons, and you need to check each Reader section for more information.
Different modes have their pros and cons. Refer to each Reader section for more information.
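For example, the backup-recovery case only needs an `rdb_reader` section pointing at the dump file; a minimal sketch (the path and address are illustrative):

```toml
[rdb_reader]
filepath = "/tmp/dump.rdb"

[redis_writer]
address = "127.0.0.1:6380"
```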
## Redis Cluster Architecture
@ -22,32 +22,36 @@ When the source Redis is deployed in a cluster architecture, you can use `sync_r
When the source Redis is deployed in a sentinel architecture and RedisShake uses `sync_reader` to connect to the master, it will be treated as a slave by the master and may be elected as the new master by the sentinel.
To avoid this, you should choose a replica as the source.
To avoid this situation, you should choose a replica as the source.
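In practice this just means pointing `sync_reader` at a replica's address rather than the master's; a sketch with an illustrative address:

```toml
[sync_reader]
address = "10.0.0.12:6379"  # a replica, not the sentinel-managed master
```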
## Cloud Redis Service
## Cloud Redis Services
Mainstream cloud vendors all provide Redis services, but there are several reasons that make using RedisShake on these services more complex:
1. Engine restrictions. Some self-developed Redis-like databases do not support the PSync protocol.
2. Architecture restrictions. Many cloud vendors support proxy mode, i.e., adding a Proxy component between the user and the Redis service. Because of the existence of the Proxy component, the PSync protocol cannot be supported.
3. Security restrictions. In native Redis, the PSync protocol will basically trigger fork(2), leading to memory bloat and increased user request latency. In worse cases, it may even lead to out of memory. Although there are solutions to alleviate these issues, not all cloud vendors have invested in this area.
4. Business strategies. Many users use RedisShake to migrate off the cloud or switch clouds, so some cloud vendors do not want users to use RedisShake, thus blocking the PSync protocol.
1. Engine limitations. Some self-developed Redis-like databases don't support the PSync protocol.
2. Architecture limitations. Many cloud vendors support proxy mode, i.e., adding a Proxy component between the user and the Redis service. Due to the Proxy component, the PSync protocol can't be supported.
3. Security limitations. In native Redis, the PSync protocol typically triggers fork(2), leading to memory bloat and increased user request latency. In worse cases, it may even cause out of memory. Although there are solutions to mitigate these issues, not all cloud vendors have invested in this area.
4. Business strategies. Many users use RedisShake to migrate off the cloud or switch clouds, so some cloud vendors don't want users to use RedisShake, thus blocking the PSync protocol.
The following will introduce some RedisShake usage schemes in special scenarios based on practical experience.
The following introduces some RedisShake usage schemes in special scenarios based on practical experience.
### Alibaba Cloud Redis & Tair
Alibaba Cloud Redis and Tair both support the PSync protocol, and `sync_reader` is recommended. Users need to create an account with replication permissions. RedisShake can use this account for data synchronization. The specific creation steps can be found in [Create and manage database accounts](https://help.aliyun.com/zh/redis/user-guide/create-and-manage-database-accounts).
Alibaba Cloud Redis and Tair both support the PSync protocol, and `sync_reader` is recommended. Users need to create an account with replication permissions (able to execute PSync commands). RedisShake uses this account for data synchronization. See [Create and manage accounts](https://help.aliyun.com/zh/redis/user-guide/create-and-manage-database-accounts) for specific creation steps.
Exceptions:
1. Version 2.8 Redis instances do not support the creation of accounts with replication permissions. You need to [upgrade to a major version](https://help.aliyun.com/zh/redis/user-guide/upgrade-the-major-version-1).
2. Cluster architecture Redis and Tair instances do not support the PSync protocol under [proxy mode](https://help.aliyun.com/zh/redis/product-overview/cluster-master-replica-instances#section-h69-izd-531).
3. Read-write separation architecture does not support the PSync protocol.
1. Version 2.8 Redis instances don't support creating accounts with replication permissions. You need to [upgrade to a major version](https://help.aliyun.com/zh/redis/user-guide/upgrade-the-major-version-1).
2. Cluster architecture Redis and Tair instances don't support the PSync protocol under [proxy mode](https://help.aliyun.com/zh/redis/product-overview/cluster-master-replica-instances#section-h69-izd-531).
3. Read-write separation architecture doesn't support the PSync protocol.
In scenarios where the PSync protocol is not supported, `scan_reader` can be used. It should be noted that `scan_reader` will put significant pressure on the source database.
In scenarios where the PSync protocol is not supported, `scan_reader` can be used. Note that `scan_reader` will put significant pressure on the source database.
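A sketch of the corresponding `sync_reader` section, assuming a replication-capable account has been created as described above (the instance address, account name, and password are placeholders):

```toml
[sync_reader]
address = "r-xxxxxxxx.redis.rds.aliyuncs.com:6379"  # placeholder instance address
username = "shake_sync"                             # account with replication permission
password = "your_password"
```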
### AWS ElastiCache and MemoryDB
### AWS ElastiCache
`sync_reader` is preferred. AWS ElastiCache and MemoryDB do not enable the PSync protocol by default, but you can request to enable the PSync protocol by submitting a ticket. AWS will provide a renamed PSync command in the ticket, such as `xhma21yfkssync` and `nmfu2bl5osync`. This command has the same effect as the `psync` command, just with a different name.
Users only need to modify the `aws_psync` configuration item in the RedisShake configuration file. For a single instance, write one pair of `ip:port@cmd`. For cluster instances, write all `ip:port@cmd`, separated by commas.
Prefer `sync_reader`. AWS ElastiCache doesn't enable the PSync protocol by default, but you can request to enable it by submitting a ticket. AWS will provide renamed PSync commands in the ticket, such as `xhma21yfkssync` and `nmfu2bl5osync`. These commands have the same effect as the `psync` command, just with different names.
Users only need to modify the `aws_psync` configuration item in the RedisShake configuration file. For a single instance, write one pair of `ip:port@cmd`. For cluster instances, write all `ip:port@cmd` pairs, separated by commas.
When it is inconvenient to submit a ticket, you can use `scan_reader`. It should be noted that `scan_reader` will put significant pressure on the source database.
When it's inconvenient to submit a ticket, you can use `scan_reader`. Note that `scan_reader` will put significant pressure on the source database.
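A sketch of the relevant configuration for the ticket-enabled case, using the `ip:port@cmd` format described above (addresses and command names are illustrative):

```toml
[sync_reader]
cluster = true
address = "10.0.0.1:6379"

[advanced]
# one ip:port@cmd entry per node, comma separated
aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.2:6379@xhma21xfkssync"
```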
### AWS MemoryDB
AWS MemoryDB doesn't provide PSync permissions. You can use `scan_reader` or `rdb_reader` instead.

View File

@ -0,0 +1,20 @@
# How to Verify Data Consistency
During the incremental synchronization phase, it's often necessary to determine if the data between the source and target is consistent. Here are two empirical methods:
1. By observing logs or monitoring, when RedisShake's synchronization traffic reaches 0, it's considered that synchronization has ended and both ends are consistent.
2. Compare the number of keys between the source and destination. If they are equal, it's considered consistent.
However, if keys have expiration times, it can lead to inconsistencies in key counts. Reasons include:
1. Due to limitations in the expiration algorithm, some keys in the source may have expired but not actually been deleted. These keys might be deleted in the destination, causing the destination to have fewer keys than the source.
2. The source and destination run independently, each with its own expiration algorithm. The randomness in these algorithms can cause inconsistencies in which keys are deleted, leading to different key counts.
In practice, keys with expiration times are generally considered acceptable to be inconsistent and won't affect business operations. Therefore, you can verify consistency by checking only the number of keys without expiration times. As shown below, you should calculate and compare the value of `$keys-$expires` for both the source and destination. [[795]](https://github.com/tair-opensource/RedisShake/issues/795) [[791]](https://github.com/tair-opensource/RedisShake/issues/791)
```
127.0.0.1:6379> info keyspace
# Keyspace
db0:keys=4463175,expires=2,avg_ttl=333486
```

View File

@ -0,0 +1,14 @@
# Cross-version Migration
Common Redis-like databases are generally backward compatible, meaning there are no compatibility issues when migrating from a lower version instance to a higher version.
However, when migrating from a higher version instance to a lower version, compatibility issues may arise. For example, importing data from Redis 7.0 into Redis 4.0.
It's recommended to avoid such scenarios as much as possible due to various incompatibility issues [[794]](https://github.com/tair-opensource/RedisShake/issues/794) [[699]](https://github.com/tair-opensource/RedisShake/issues/699):
1. Binary data encoding incompatibility
2. Unsupported commands, seen in the incremental phase of SYNC
If migrating to a lower version is unavoidable, you can resolve it through the following methods:
1. For binary data encoding incompatibility, consider modifying the `target_redis_proto_max_bulk_len` parameter by setting it to 0.
The original purpose of `target_redis_proto_max_bulk_len` is to handle extremely large keys. RedisShake will convert binary data larger than `target_redis_proto_max_bulk_len` into RESP commands during synchronization. For example, a list structure with many elements will be converted into multiple `RPUSH` commands. In version downgrade scenarios, you can set it to 0, prompting RedisShake to convert all binary data into RESP commands, thus avoiding binary data encoding incompatibility issues.
2. For unsupported command issues, it's recommended to filter out the unsupported commands directly, for example with the `block_command` filter rule (see the sketch below).
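A sketch combining both workarounds; the blocked command list is only a placeholder and should be replaced with whatever the lower-version target actually rejects:

```toml
[advanced]
# replay data as plain RESP commands instead of RESTORE payloads
target_redis_proto_max_bulk_len = 0

[filter]
# drop commands the lower-version target does not understand (placeholder list)
block_command = ["GETDEL", "COPY"]
```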

View File

@ -1,14 +1,15 @@
# rdb_reader
# RDB Reader
## 介绍
## Introduction
可以使用 `rdb_reader` 来从 RDB 文件中读取数据,然后写入目标端。常见于从备份文件中恢复数据。
The `rdb_reader` can be used to read data from an RDB file and then write it to the target destination. This is commonly used for recovering data from backup files.
## 配置
## Configuration
```toml
[rdb_reader]
filepath = "/tmp/dump.rdb"
```
* 应传入绝对路径。
* An absolute path should be passed in.

View File

@ -1,43 +1,63 @@
---
outline: deep
---
# Scan Reader
## 介绍
::: tip
本方案为次选方案,当可以使用 [`sync_reader`](sync_reader.md) 时,请优选 [`sync_reader`](sync_reader.md)。
The performance and data consistency of `scan_reader` are not as good as [`sync_reader`](sync_reader.md). You should choose `sync_reader` whenever possible.
:::
`scan_reader` 通过 `SCAN` 命令遍历源端数据库中的所有 Key并使用 `DUMP``RESTORE` 命令来读取与写入 Key 的内容。
## Principle Introduction
注意:
1. Redis 的 `SCAN` 命令只保证 `SCAN` 的开始与结束之前均存在的 Key 一定会被返回,但是新写入的 Key 有可能会被遗漏,期间删除的 Key 也可能已经被写入目的端。可以通过 `ksn` 配置解决
2. `SCAN` 命令与 `DUMP` 命令会占用源端数据库较多的 CPU 资源。
Scan Reader has two stages: SCAN and KSN. The SCAN stage is for full synchronization, while the KSN stage is for incremental synchronization.
### Full Data
**SCAN stage**: Enabled by default, can be disabled through the `scan` configuration. `scan_reader` uses the `SCAN` command to traverse all Keys in the source database, then uses `DUMP` to get the Value corresponding to the Key, and writes to the destination through the `RESTORE` command, completing full data synchronization.
## 配置
1. Redis's `SCAN` command only guarantees that Keys that <u>exist throughout the SCAN operation</u> will definitely be returned, but newly written Keys may be missed, and Keys deleted during this period may have already been written to the destination.
2. During the SCAN stage, RedisShake calculates the current synchronization progress from the cursor returned by the `SCAN` command. This progress has a large margin of error and is for reference only. For non-Redis databases, the cursor calculation differs from Redis, so you may see an incorrect progress display, which can be ignored.
```toml
[scan_reader]
### Incremental Data
**KSN stage**: Disabled by default, can be enabled through `ksn`, which can solve the problem of missing Keys during the SCAN stage. Incremental data synchronization does not start after the SCAN stage ends, but proceeds simultaneously with it, and continues after the SCAN stage ends until RedisShake exits.
`ksn` uses [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/) capability to subscribe to Key changes. Specifically, RedisShake will use the `psubscribe` command to subscribe to `__keyevent@*__:*`. When a Key changes, RedisShake will receive the modified Key, then use the `DUMP` and `RESTORE` commands to read the content of the Key from the source and write it to the destination.
1. Redis does not enable the `notify-keyspace-events` configuration by default. It needs to be manually enabled, ensuring the value contains `AE`.
2. If the source disconnects during the KSN stage, consider appropriately increasing the value of `client-output-buffer-limit pubsub`. [802](https://github.com/tair-opensource/RedisShake/issues/802)
3. `Redis keyspace notifications` will not detect `FLUSHALL` and `FLUSHDB` commands, so when using the `ksn` parameter, ensure that the source database does not execute these two commands.
### Performance Impact
Both SCAN and KSN stages use the DUMP command to obtain data. The DUMP command is CPU-intensive and will cause high pressure on the source. It needs to be used carefully to avoid affecting the availability of the source instance.
* For the SCAN stage, you can adjust the `count` parameter to reduce the pressure on the source. It's recommended to start from 1 and gradually increase.
* For the KSN stage, there are currently no adjustable parameters. The decision to enable it should be based on an assessment of the write request volume at the source.
Reference data for performance impact: When the source instance's write QPS is about 150,000, the source CPU usage is 47%. After enabling RedisShake, the source CPU usage becomes 91%.
## Configuration
```toml
cluster = false # set to true if source is a redis cluster
address = "127.0.0.1:6379" # when cluster is true, set address to one of the cluster nodes
username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false
dbs = [] # set the dbs you want to scan, such as [1,5,7]; leave empty to scan all
scan = true # set to false if you don't want to scan keys
ksn = false # set to true to enable Redis keyspace notifications (KSN) subscription
dbs = [] # set the dbs you want to scan; leave empty to scan all
count = 1 # number of keys to scan per iteration
```
* `cluster`:源端是否为集群
* `address`:源端地址, 当源端为集群时,`address` 为集群中的任意一个节点即可
* 鉴权:
* 当源端使用 ACL 账号时,配置 `username``password`
* 当源端使用传统账号时,仅配置 `password`
* 当源端无鉴权时,不配置 `username``password`
* `tls`:源端是否开启 TLS/SSL不需要配置证书因为 RedisShake 没有校验服务器证书
* `ksn`:开启 `ksn` 参数后 RedisShake 会在 `SCAN` 之前使用 [Redis keyspace notifications](https://redis.io/docs/manual/keyspace-notifications/)
能力来订阅 Key 的变化。当 Key 发生变化时RedisShake 会使用 `DUMP``RESTORE` 命令来从源端读取 Key 的内容,并写入目标端。
* `dbs`源端为非集群模式时支持指定DB库
::: warning
Redis keyspace notifications 不会感知到 `FLUSHALL``FLUSHDB` 命令,因此在使用 `ksn` 参数时,需要确保源端数据库不会执行这两个命令。
:::
* `cluster`: Whether the source is a cluster
* `address`: Source address. When the source is a cluster, `address` can be any node in the cluster
* Authentication:
* When the source uses ACL accounts, configure `username` and `password`
* When the source uses traditional accounts, only configure `password`
* When the source has no authentication, do not configure `username` and `password`
* `tls`: Whether the source has enabled TLS/SSL. No need to configure a certificate because RedisShake does not verify the server certificate
* `dbs`: When the source is not in cluster mode, synchronization can be restricted to the specified DBs.
* `scan`: Whether to enable the SCAN stage. When set to false, RedisShake will skip the full synchronization stage
* `ksn`: After enabling the `ksn` parameter, RedisShake will subscribe to Key changes at the source to achieve incremental synchronization
* `count`: The number of keys fetched from the source each time during full synchronization. The default is 1. Changing to a larger value can significantly improve synchronization efficiency, but will also increase pressure on the source.
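A minimal sketch of a SCAN-plus-KSN setup, assuming `notify-keyspace-events` on the source already contains `AE` as noted above (the address is illustrative):

```toml
[scan_reader]
cluster = false
address = "127.0.0.1:6379"
scan = true   # full stage: SCAN + DUMP/RESTORE
ksn = true    # incremental stage: keyspace notification subscription
count = 1     # raise gradually if the source can absorb the extra load
```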

View File

@ -1,10 +1,10 @@
# Redis Writer
## 介绍
## Introduction
`redis_writer` 用于将数据写入 Redis-like 数据库。
`redis_writer` is used to write data to Redis-like databases.
## 配置
## Configuration
```toml
[redis_writer]
@ -15,14 +15,17 @@ password = "" # keep empty if no authentication is required
tls = false
```
* `cluster`:是否为集群。
* `address`:连接地址。当目的端为集群时,`address` 填写集群中的任意一个节点即可
* 鉴权:
* 当使用 ACL 账号体系时,配置 `username``password`
* 当使用传统账号体系时,仅配置 `password`
* 当无鉴权时,不配置 `username``password`
* `tls`:是否开启 TLS/SSL不需要配置证书因为 RedisShake 没有校验服务器证书
* `cluster`: Whether it's a cluster or not.
* `address`: Connection address. When the destination is a cluster, `address` can be any node in the cluster.
* Authentication:
* When using the ACL account system, configure both `username` and `password`
* When using the traditional account system, only configure `password`
* When no authentication is required, leave both `username` and `password` empty
* `tls`: Whether to enable TLS/SSL. No need to configure certificates as RedisShake doesn't verify server certificates.
Important notes:
1. When the destination is a cluster, ensure that the commands from the source satisfy the [requirement that keys' hash values belong to the same slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset).
2. It's recommended to ensure that the destination version is greater than or equal to the source version, otherwise unsupported commands may occur. If a lower version is necessary, you can set `target_redis_proto_max_bulk_len` to 0 to avoid using the `restore` command for data recovery.
注意事项:
1. 当目的端为集群时,应保证源端发过来的命令满足 [Key 的哈希值属于同一个 slot](https://redis.io/docs/reference/cluster-spec/#implemented-subset)。
2. 应尽量保证目的端版本大于等于源端版本,否则可能会出现不支持的命令。如确实需要降低版本,可以设置 `target_redis_proto_max_bulk_len` 为 0来避免使用 `restore` 命令恢复数据。

View File

@ -0,0 +1,40 @@
---
outline: deep
---
# Built-in Filter Rules
RedisShake provides various built-in filter rules that users can choose from according to their needs.
## Filtering Keys
RedisShake supports filtering by key name prefixes and suffixes. You can set the following options in the configuration file, for example:
```toml
allow_key_prefix = ["user:", "product:"]
allow_key_suffix = [":active", ":valid"]
block_key_prefix = ["temp:", "cache:"]
block_key_suffix = [":tmp", ":old"]
```
If these options are not set, all keys are allowed by default.
## Filtering Databases
You can specify allowed or blocked database numbers, for example:
```toml
allow_db = [0, 1, 2]
block_db = [3, 4, 5]
```
If these options are not set, all databases are allowed by default.
## Filtering Commands
RedisShake allows you to filter specific Redis commands, for example:
```toml
allow_command = ["GET", "SET"]
block_command = ["DEL", "FLUSHDB"]
```
## Filtering Command Groups
You can also filter by command groups. Available command groups include:
SERVER, STRING, CLUSTER, CONNECTION, BITMAP, LIST, SORTED_SET, GENERIC, TRANSACTIONS, SCRIPTING, TAIRHASH, TAIRSTRING, TAIRZSET, GEO, HASH, HYPERLOGLOG, PUBSUB, SET, SENTINEL, STREAM
For example:
```toml
allow_command_group = ["STRING", "HASH"]
block_command_group = ["SCRIPTING", "PUBSUB"]
```

View File

@ -54,3 +54,96 @@ address = "127.0.0.1:6380"
### Functions
* `shake.call(DB, ARGV)`: returns a Redis command; RedisShake will write that command to the target.
* `shake.log(msg)`: prints a log message.
## Best Practices
### Filtering Keys
```lua
local prefix = "user:"
local prefix_len = #prefix
if string.sub(KEYS[1], 1, prefix_len) ~= prefix then
return
end
shake.call(DB, ARGV)
```
The effect is that only source data whose keys start with `user:` is written to the target. Multi-key commands such as `mset` are not taken into account.
### Filtering DBs
```lua
shake.log(DB)
if DB == 0
then
return
end
shake.call(DB, ARGV)
```
The effect is that data in the source's `db` 0 is discarded, while data from other `db`s is written to the target.
### Filtering by Data Structure Type
The `GROUP` variable can be used to determine the data structure type. Supported types include `STRING`, `LIST`, `SET`, `ZSET`, `HASH`, `SCRIPTING`, and so on.
#### Filtering Hash Data
```lua
if GROUP == "HASH" then
return
end
shake.call(DB, ARGV)
```
The effect is that the source's `hash` data is discarded, while other data is written to the target.
#### Filtering [Lua scripts](https://redis.io/docs/interact/programmability/eval-intro/)
```lua
if GROUP == "SCRIPTING" then
return
end
shake.call(DB, ARGV)
```
The effect is that Lua scripts from the source are discarded, while other data is written to the target. This is common when syncing from a master-replica setup to a cluster that does not support certain Lua scripts.
### Changing a Key Prefix
```lua
local prefix_old = "prefix_old_"
local prefix_new = "prefix_new_"
shake.log("old=" .. table.concat(ARGV, " "))
for i, index in ipairs(KEY_INDEXES) do
local key = ARGV[index]
if string.sub(key, 1, #prefix_old) == prefix_old then
ARGV[index] = prefix_new .. string.sub(key, #prefix_old + 1)
end
end
shake.log("new=" .. table.concat(ARGV, " "))
shake.call(DB, ARGV)
```
The effect is that the source key `prefix_old_key` is written to the target as `prefix_new_key`.
### Swapping DBs
```lua
local db1 = 1
local db2 = 2
if DB == db1 then
DB = db2
elseif DB == db2 then
DB = db1
end
shake.call(DB, ARGV)
```
The effect is that the source's `db 1` is written to the target's `db 2` and the source's `db 2` is written to the target's `db 1`; other `db`s are unchanged.

View File

@ -1,99 +0,0 @@
---
outline: deep
---
# Best Practices
## Filtering
### Filtering Keys
```lua
local prefix = "user:"
local prefix_len = #prefix
if string.sub(KEYS[1], 1, prefix_len) ~= prefix then
return
end
shake.call(DB, ARGV)
```
The effect is that only source data whose keys start with `user:` is written to the target. Multi-key commands such as `mset` are not taken into account.
### Filtering DBs
```lua
shake.log(DB)
if DB == 0
then
return
end
shake.call(DB, ARGV)
```
The effect is that data in the source's `db` 0 is discarded, while data from other `db`s is written to the target.
### Filtering by Data Structure Type
The `GROUP` variable can be used to determine the data structure type. Supported types include `STRING`, `LIST`, `SET`, `ZSET`, `HASH`, `SCRIPTING`, and so on.
#### Filtering Hash Data
```lua
if GROUP == "HASH" then
return
end
shake.call(DB, ARGV)
```
The effect is that the source's `hash` data is discarded, while other data is written to the target.
#### Filtering [Lua scripts](https://redis.io/docs/interact/programmability/eval-intro/)
```lua
if GROUP == "SCRIPTING" then
return
end
shake.call(DB, ARGV)
```
The effect is that Lua scripts from the source are discarded, while other data is written to the target. This is common when syncing from a master-replica setup to a cluster that does not support certain Lua scripts.
## Modifying
### Changing a Key Prefix
```lua
local prefix_old = "prefix_old_"
local prefix_new = "prefix_new_"
shake.log("old=" .. table.concat(ARGV, " "))
for i, index in ipairs(KEY_INDEXES) do
local key = ARGV[index]
if string.sub(key, 1, #prefix_old) == prefix_old then
ARGV[index] = prefix_new .. string.sub(key, #prefix_old + 1)
end
end
shake.log("new=" .. table.concat(ARGV, " "))
shake.call(DB, ARGV)
```
The effect is that the source key `prefix_old_key` is written to the target as `prefix_new_key`.
### Swapping DBs
```lua
local db1 = 1
local db2 = 2
if DB == db1 then
DB = db2
elseif DB == db2 then
DB = db1
end
shake.call(DB, ARGV)
```
The effect is that the source's `db 1` is written to the target's `db 2` and the source's `db 2` is written to the target's `db 1`; other `db`s are unchanged.

View File

@ -9,7 +9,6 @@ RedisShake 使用 [TOML](https://toml.io/cn/) 语言书写,所有的配置参
The configuration file is composed as follows:
```toml
function = "..."
[xxx_reader]
...
@ -17,15 +16,13 @@ function = "..."
[xxx_writer]
...
[filter]
...
[advanced]
...
```
Under normal usage, you only need to write the `xxx_reader` and `xxx_writer` parts. The `filter` and `advanced` parts are for advanced usage, and users can configure them according to their needs.
## function Configuration
Refer to [What is function](../function/introduction.md).
## reader 配置
@ -42,42 +39,10 @@ RedisShake 提供了不同的 Writer 用来对接不同的目标端,配置详
* [Redis Writer](../writer/redis_writer.md)
## filter Configuration
You can set filter rules through the configuration file. Refer to [Filter and Processing](../filter/filter.md) and [function](../filter/function.md).
## advanced Configuration
```toml
[advanced]
dir = "data"
ncpu = 3 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores
pprof_port = 0 # pprof port, 0 means disable
status_port = 0 # status port, 0 means disable
# log
log_file = "shake.log"
log_level = "info" # debug, info or warn
log_interval = 5 # in seconds
# redis-shake gets key and value from rdb file, and uses RESTORE command to
# create the key in target redis. Redis RESTORE will return a "Target key name
# is busy" error when key already exists. You can use this configuration item
# to change the default behavior of restore:
# panic: redis-shake will stop when meet "Target key name is busy" error.
# rewrite: redis-shake will replace the key with new value.
# ignore: redis-shake will skip restore the key when meet "Target key name is busy" error.
rdb_restore_command_behavior = "rewrite" # panic, rewrite or skip
# redis-shake uses pipeline to improve sending performance.
# This item limits the maximum number of commands in a pipeline.
pipeline_count_limit = 1024
# Client query buffers accumulate new commands. They are limited to a fixed
# amount by default. This amount is normally 1gb.
target_redis_client_max_querybuf_len = 1024_000_000
# In the Redis protocol, bulk requests, that are, elements representing single
# strings, are normally limited to 512 mb.
target_redis_proto_max_bulk_len = 512_000_000
# If the source is Elasticache or MemoryDB, you can set this item.
aws_psync = ""
```
Refer to the [shake.toml configuration file](https://github.com/tair-opensource/RedisShake/blob/v4/shake.toml).

go.mod
View File

@ -1,6 +1,6 @@
module RedisShake
go 1.20
go 1.21
require (
github.com/dustin/go-humanize v1.0.1

View File

@ -12,6 +12,20 @@ import (
"github.com/spf13/viper"
)
type FilterOptions struct {
AllowKeyPrefix []string `mapstructure:"allow_key_prefix" default:"[]"`
AllowKeySuffix []string `mapstructure:"allow_key_suffix" default:"[]"`
BlockKeyPrefix []string `mapstructure:"block_key_prefix" default:"[]"`
BlockKeySuffix []string `mapstructure:"block_key_suffix" default:"[]"`
AllowDB []int `mapstructure:"allow_db" default:"[]"`
BlockDB []int `mapstructure:"block_db" default:"[]"`
AllowCommand []string `mapstructure:"allow_command" default:"[]"`
BlockCommand []string `mapstructure:"block_command" default:"[]"`
AllowCommandGroup []string `mapstructure:"allow_command_group" default:"[]"`
BlockCommandGroup []string `mapstructure:"block_command_group" default:"[]"`
Function string `mapstructure:"function" default:""`
}
type AdvancedOptions struct {
Dir string `mapstructure:"dir" default:"data"`
@ -59,7 +73,7 @@ func (opt *AdvancedOptions) GetPSyncCommand(address string) string {
}
type ShakeOptions struct {
Function string `mapstructure:"function" default:""`
Filter FilterOptions
Advanced AdvancedOptions
Module ModuleOptions
}

internal/filter/filter.go
View File

@ -0,0 +1,121 @@
package filter
import (
"RedisShake/internal/config"
"RedisShake/internal/entry"
"log"
"slices"
"strings"
)
// Filter returns:
// - true if the entry should be processed
// - false if it should be filtered out
func Filter(e *entry.Entry) bool {
keyResults := make([]bool, len(e.Keys))
for i := range keyResults {
keyResults[i] = true
}
for inx, key := range e.Keys {
// Check if the key matches any of the allowed patterns
allow := false
for _, prefix := range config.Opt.Filter.AllowKeyPrefix {
if strings.HasPrefix(key, prefix) {
allow = true
}
}
for _, suffix := range config.Opt.Filter.AllowKeySuffix {
if strings.HasSuffix(key, suffix) {
allow = true
}
}
if len(config.Opt.Filter.AllowKeyPrefix) == 0 && len(config.Opt.Filter.AllowKeySuffix) == 0 {
allow = true
}
if !allow {
keyResults[inx] = false
}
// Check if the key matches any of the blocked patterns
block := false
for _, prefix := range config.Opt.Filter.BlockKeyPrefix {
if strings.HasPrefix(key, prefix) {
block = true
}
}
for _, suffix := range config.Opt.Filter.BlockKeySuffix {
if strings.HasSuffix(key, suffix) {
block = true
}
}
if block {
keyResults[inx] = false
}
}
allTrue := true
allFalse := true
var passedKeys, filteredKeys []string
for i, result := range keyResults {
if result {
allFalse = false
passedKeys = append(passedKeys, e.Keys[i])
} else {
allTrue = false
filteredKeys = append(filteredKeys, e.Keys[i])
}
}
if allTrue {
// All keys are allowed, continue checking
} else if allFalse {
return false
} else {
// If we reach here, it means some keys are true and some are false
log.Printf("Error: Inconsistent filter results for entry with %d keys", len(e.Keys))
log.Printf("Passed keys: %v", passedKeys)
log.Printf("Filtered keys: %v", filteredKeys)
return false
}
// Check if the database matches any of the allowed databases
if len(config.Opt.Filter.AllowDB) > 0 {
if !slices.Contains(config.Opt.Filter.AllowDB, e.DbId) {
return false
}
}
// Check if the database matches any of the blocked databases
if len(config.Opt.Filter.BlockDB) > 0 {
if slices.Contains(config.Opt.Filter.BlockDB, e.DbId) {
return false
}
}
// Check if the command matches any of the allowed commands
if len(config.Opt.Filter.AllowCommand) > 0 {
if !slices.Contains(config.Opt.Filter.AllowCommand, e.CmdName) {
return false
}
}
// Check if the command matches any of the blocked commands
if len(config.Opt.Filter.BlockCommand) > 0 {
if slices.Contains(config.Opt.Filter.BlockCommand, e.CmdName) {
return false
}
}
// Check if the command group matches any of the allowed command groups
if len(config.Opt.Filter.AllowCommandGroup) > 0 {
if !slices.Contains(config.Opt.Filter.AllowCommandGroup, e.Group) {
return false
}
}
// Check if the command group matches any of the blocked command groups
if len(config.Opt.Filter.BlockCommandGroup) > 0 {
if slices.Contains(config.Opt.Filter.BlockCommandGroup, e.Group) {
return false
}
}
return true
}

View File

@ -1,4 +1,4 @@
package function
package filter
import (
"strings"
@ -16,7 +16,7 @@ type Runtime struct {
compiledFunction *lua.FunctionProto
}
func New(luaCode string) *Runtime {
func NewFunctionFilter(luaCode string) *Runtime {
if len(luaCode) == 0 {
return nil
}

View File

@ -1,4 +1,4 @@
package function
package filter
import (
"testing"
@ -19,22 +19,24 @@ import (
// BenchmarkRunFunction-16 118228 8482 ns/op 15292 B/op 42 allocs/op
func BenchmarkRunFunction(b *testing.B) {
config.Opt = config.ShakeOptions{
Function: `
local prefix = "mlpSummary:"
local prefix_len = #prefix
if KEYS[1] == nil then
return
end
if KEYS[1] == "" then
return
end
if string.sub(KEYS[1], 1, prefix_len) ~= prefix then
return
end
shake.call(DB, ARGV)
`,
Filter: config.FilterOptions{
Function: `
local prefix = "mlpSummary:"
local prefix_len = #prefix
if KEYS[1] == nil then
return
end
if KEYS[1] == "" then
return
end
if string.sub(KEYS[1], 1, prefix_len) ~= prefix then
return
end
shake.call(DB, ARGV)
`,
},
}
luaRuntime := New(config.Opt.Function)
luaRuntime := NewFunctionFilter(config.Opt.Filter.Function)
e := &entry.Entry{
DbId: 0,
Argv: []string{"set", "mlpSummary:1", "1"},

View File

@ -1,6 +1,3 @@
function = ""
[sync_reader]
cluster = false # set to true if source is a redis cluster
address = "127.0.0.1:6379" # when cluster is true, set address to one of the cluster node
@ -28,7 +25,7 @@ try_diskless = false # set to true if you want to sync by socket and sourc
# [aof_reader]
# filepath = "/tmp/.aof"
# timestamp = 0 # subsecond
# timestamp = 0 # subsecond
[redis_writer]
cluster = false # set to true if target is a redis cluster
@ -40,6 +37,53 @@ password = "" # keep empty if no authentication is required
tls = false
off_reply = false # turn off the server reply
[filter]
# Allow keys with specific prefixes or suffixes
# Examples:
# allow_key_prefix = ["user:", "product:"]
# allow_key_suffix = [":active", ":valid"]
# Leave empty to allow all keys
allow_key_prefix = []
allow_key_suffix = []
# Block keys with specific prefixes or suffixes
# Examples:
# block_key_prefix = ["temp:", "cache:"]
# block_key_suffix = [":tmp", ":old"]
# Leave empty to block nothing
block_key_prefix = []
block_key_suffix = []
# Specify allowed and blocked database numbers (e.g., allow_db = [0, 1, 2], block_db = [3, 4, 5])
# Leave empty to allow all databases
allow_db = []
block_db = []
# Allow or block specific commands
# Examples:
# allow_command = ["GET", "SET"] # Only allow GET and SET commands
# block_command = ["DEL", "FLUSHDB"] # Block DEL and FLUSHDB commands
# Leave empty to allow all commands
allow_command = []
block_command = []
# Allow or block specific command groups
# Available groups:
# SERVER, STRING, CLUSTER, CONNECTION, BITMAP, LIST, SORTED_SET,
# GENERIC, TRANSACTIONS, SCRIPTING, TAIRHASH, TAIRSTRING, TAIRZSET,
# GEO, HASH, HYPERLOGLOG, PUBSUB, SET, SENTINEL, STREAM
# Examples:
# allow_command_group = ["STRING", "HASH"] # Only allow STRING and HASH commands
# block_command_group = ["SCRIPTING", "PUBSUB"] # Block SCRIPTING and PUBSUB commands
# Leave empty to allow all command groups
allow_command_group = []
block_command_group = []
# Function for custom data processing
# For best practices and examples, visit:
# https://tair-opensource.github.io/RedisShake/zh/function/best_practices.html
function = ""
[advanced]
dir = "data"
ncpu = 0 # runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores
@ -61,18 +105,25 @@ log_interval = 5 # in seconds
rdb_restore_command_behavior = "panic" # panic, rewrite or skip
# redis-shake uses pipeline to improve sending performance.
# This item limits the maximum number of commands in a pipeline.
# Adjust this value based on the destination Redis performance:
# - Higher values may improve performance for capable destinations.
# - Lower values are recommended for destinations with poor performance.
# 1024 is a good default value for most cases.
pipeline_count_limit = 1024
# Client query buffers accumulate new commands. They are limited to a fixed
# amount by default. This amount is normally 1gb.
target_redis_client_max_querybuf_len = 1024_000_000
# This setting corresponds to the 'client-query-buffer-limit' in Redis configuration.
# The default value is typically 1GB.
# It's recommended not to modify this value unless absolutely necessary.
target_redis_client_max_querybuf_len = 1073741824 # 1GB in bytes
# In the Redis protocol, bulk requests, that is, elements representing single
# strings, are normally limited to 512 mb.
# This setting corresponds to the 'proto-max-bulk-len' in Redis configuration.
# It defines the maximum size of a single string element in the Redis protocol.
# The value must be 1MB or greater. Default is 512MB.
# It's recommended not to modify this value unless absolutely necessary.
target_redis_proto_max_bulk_len = 512_000_000
# If the source is Elasticache or MemoryDB, you can set this item.
# If the source is ElastiCache, you can set this item. AWS ElastiCache has a custom
# psync command, which can be obtained through a ticket.
aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync"
# destination will delete its entire database before fetching files

View File

@ -9,7 +9,8 @@ def filter_db():
dst = h.Redis()
opts = h.ShakeOpts.create_sync_opts(src, dst)
opts["function"] = """
opts["filter"] = {}
opts["filter"]["function"] = """
shake.log(DB)
if DB == 0
then
@ -39,7 +40,8 @@ def split_mset_to_set():
src = h.Redis()
dst = h.Redis()
opts = h.ShakeOpts.create_sync_opts(src, dst)
opts["function"] = """
opts["filter"] = {}
opts["filter"]["function"] = """
shake.log(KEYS)
if CMD == "MSET"
then
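The tests above set the Lua script through `opts["filter"]["function"]`; in `shake.toml` the same script goes into the `function` key of the `[filter]` section as a multi-line string. A sketch using the DB-filtering script shown earlier:

```toml
[filter]
function = '''
shake.log(DB)
if DB == 0
then
    return
end
shake.call(DB, ARGV)
'''
```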