RedisShake/cmd/commands/cmd.go

382 lines
17 KiB
Go

package commands
import (
"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/reader"
"RedisShake/internal/writer"
"bytes"
"container/heap"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
"strings"
)
// subCommands lists every sub-command name recognised on the command line.
// The position of a name in this slice serves as its command index when
// os.Args is split between sub-commands, and it is also the order in which
// the per-command help is printed.
var subCommands = []string{
	"aof_reader",
	"rdb_reader",
	"scan_reader",
	"sync_reader",
	"sync_reader.sentinel",
	"redis_writer",
	"redis_writer.sentinel",
	"filter",
	"advanced",
	"module",
}
// buildFuncs holds one constructor per sub-command. Each constructor returns
// the viper instance that collects the sub-command's settings together with
// the cobra command that parses its flags. ConvertArgs2Toml indexes the
// results by the command's Use name, so the order of this slice does not
// affect lookups.
var buildFuncs = []func() (*viper.Viper, *cobra.Command){
buildCommandAofReader,
buildCommandRedisWriterSentinel,
buildCommandRedisWriter,
buildCommandSyncReaderSentinel,
buildCommandRdbReader,
buildCommandSyncReader,
buildCommandScanReader,
buildCommandAdvanced,
buildCommandFilter,
buildCommandModule,
}
// main inspects os.Args and, when they request help or mention a
// sub-command, hands control to ConvertArgs2Toml.
//
// A single "-h", "--help" or "help" argument prints the help of every
// command. Otherwise, if the joined arguments contain "reader", "writer",
// "filter", "advanced" or "module", they are converted into a TOML config;
// the presence of "--dry-run" makes ConvertArgs2Toml print the config instead
// of writing it to disk.
//
// Errors returned by ConvertArgs2Toml are reported on stderr rather than being
// silently discarded.
func main() {
	if len(os.Args) == 2 {
		switch os.Args[1] {
		case "-h", "--help", "help":
			if _, err := ConvertArgs2Toml(true, false); err != nil {
				fmt.Fprintln(os.Stderr, err)
			}
			// A lone help argument can never match a sub-command keyword,
			// so there is nothing left to do.
			return
		}
	}

	commandLine := strings.Join(os.Args[1:], ",")
	for _, keyword := range []string{"reader", "writer", "filter", "advanced", "module"} {
		if !strings.Contains(commandLine, keyword) {
			continue
		}
		dryRun := strings.Contains(commandLine, "--dry-run")
		if _, err := ConvertArgs2Toml(false, dryRun); err != nil {
			fmt.Fprintln(os.Stderr, err)
		}
		// One conversion handles the whole command line.
		return
	}
}
// ConvertArgs2Toml turns the sub-commands and flags found in os.Args into a
// TOML configuration.
//
// When showHelp is true it only executes the root command and every
// sub-command so that they print their help, and returns "", nil.
//
// Otherwise os.Args is split at each argument that equals a sub-command name,
// every sub-command parses its own slice of arguments, and the collected
// settings are rendered as TOML. With dryRun the TOML is printed to stdout and
// "" is returned; without it the TOML is written to
// <home>/.redis-shake/cache/<uuid>.toml and that path is returned.
//
// An error is returned when a sub-command fails to execute, when no
// sub-command produced any setting, or when the home directory cannot be
// resolved or the cache directory / config file cannot be created.
func ConvertArgs2Toml(showHelp bool, dryRun bool) (string, error) {
	rootCmd := &cobra.Command{
		Use: "redis-shake",
		Long: `redis-shake [command_reader][flags] [command_writer][flags] [filter][flags] [advanced][flags] [module]
command_reader: aof_reader, rdb_reader, scan_reader, sync_reader, sync_reader.sentinel
command_writer: redis_writer, redis_writer.sentinel`,
	}

	// Build every sub-command once and index it by its Use name.
	viperMap := make(map[string]*viper.Viper, len(buildFuncs))
	commandMap := make(map[string]*cobra.Command, len(buildFuncs))
	for _, buildFunc := range buildFuncs {
		vp, cmd := buildFunc()
		viperMap[cmd.Use] = vp
		commandMap[cmd.Use] = cmd
	}

	if showHelp {
		// Help output is best-effort: an error from one command must not
		// stop the remaining commands from being executed.
		_ = rootCmd.Execute()
		for _, subCmd := range subCommands {
			_ = commandMap[subCmd].Execute()
		}
		return "", nil
	}

	// Record, for every argument that is a sub-command name, the pair
	// (index in subCommands, index in os.Args).
	subCommandArgIndex := &IntPairHeap{}
	for argI, arg := range os.Args {
		for cmdI, name := range subCommands {
			if arg == name {
				heap.Push(subCommandArgIndex, Pair{cmdI, argI})
			}
		}
	}
	// e.g. redis-shake redis_writer -a 127.0.0.1:6379 aof_reader -f appendonly.aof
	//   cmdIndex               5                            0
	//   argIndex     0         1       2       3            4      5       6
	// redis_writer receives os.Args[1:4], aof_reader receives os.Args[4:7].
	//
	// The extra pair marks the end of os.Args so that the last sub-command
	// also gets an upper bound for its argument slice.
	heap.Push(subCommandArgIndex, Pair{len(subCommands), len(os.Args)})

	// Walk the pairs in heap order (presumably ascending argument index;
	// IntPairHeap is defined outside this function) and execute each
	// sub-command with the arguments between it and the next pair.
	prePair := Pair{}
	heapSize := subCommandArgIndex.Len()
	for i := 0; i < heapSize; i++ {
		pair := heap.Pop(subCommandArgIndex).(Pair)
		argIndex := pair.Right
		if argIndex == 0 {
			continue
		}
		if preArgIndex := prePair.Right; preArgIndex != 0 {
			command := commandMap[subCommands[prePair.Left]]
			command.SetArgs(os.Args[preArgIndex:argIndex])
			if err := command.Execute(); err != nil {
				return "", err
			}
		}
		prePair = pair
	}

	description := "#this config file is generated by command: " + strings.Join(os.Args, " ")
	toml, err := viperMap2Toml(viperMap, description)
	if err != nil {
		return "", err
	}
	if dryRun {
		fmt.Println(string(toml))
		return "", nil
	}

	homeDir, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("resolving user home directory: %w", err)
	}
	shakeHomeDir := homeDir + "/.redis-shake/cache/"
	if err := os.MkdirAll(shakeHomeDir, os.ModePerm); err != nil {
		return "", fmt.Errorf("creating config cache directory %q: %w", shakeHomeDir, err)
	}
	filePath := shakeHomeDir + uuid.New().String() + ".toml"
	// NOTE(review): the file is created with mode 0644 and may contain the
	// values of the password flags — confirm these permissions are intended.
	if err := os.WriteFile(filePath, toml, 0644); err != nil {
		return "", fmt.Errorf("writing config file %q: %w", filePath, err)
	}
	// Announce the path only once the file really exists.
	fmt.Println("generate toml config file: " + filePath)
	return filePath, nil
}
// viperMap2Toml renders the settings held by each viper instance as a TOML
// document.
//
// The document starts with description on its own line. Every map entry
// whose viper instance has at least one setting becomes a "[key]" table
// followed by one "name = value" line per setting. Entries without settings
// are skipped. Tables and settings are emitted in map iteration order, so
// their order is not stable between runs.
//
// It returns an error when no entry contributed any setting.
func viperMap2Toml(viperMap map[string]*viper.Viper, description string) ([]byte, error) {
	var buffer bytes.Buffer
	buffer.WriteString(description + "\n")

	wroteTable := false
	for key, vp := range viperMap {
		settings := vp.AllSettings()
		if len(settings) == 0 {
			continue
		}
		wroteTable = true
		buffer.WriteString("[" + key + "]\n")
		for k, v := range settings {
			buffer.WriteString(k + " = " + formatTomlValue(v) + "\n")
		}
		buffer.WriteString("\n")
	}

	if !wroteTable {
		return nil, errors.New("all config empty")
	}
	return buffer.Bytes(), nil
}

// formatTomlValue renders a single setting as a TOML value. Slices are
// written as TOML arrays ("[1, 2]", `["a", "b"]`); formatting them with %#v
// would produce Go literals such as `[]string{"a"}`, which a TOML parser
// rejects. Every other value keeps its %#v representation.
func formatTomlValue(v any) string {
	var items []string
	switch value := v.(type) {
	case []string:
		items = make([]string, 0, len(value))
		for _, item := range value {
			items = append(items, fmt.Sprintf("%q", item))
		}
	case []int:
		items = make([]string, 0, len(value))
		for _, item := range value {
			items = append(items, fmt.Sprintf("%d", item))
		}
	case []any:
		items = make([]string, 0, len(value))
		for _, item := range value {
			items = append(items, formatTomlValue(item))
		}
	default:
		return fmt.Sprintf("%#v", v)
	}
	return "[" + strings.Join(items, ", ") + "]"
}
// buildCommandRedisWriterSentinel builds the "redis_writer.sentinel"
// sub-command by delegating to buildCommandSentinel.
func buildCommandRedisWriterSentinel() (*viper.Viper, *cobra.Command) {
	const commandName = "redis_writer.sentinel"
	settings, command := buildCommandSentinel(commandName)
	return settings, command
}
// buildCommandSyncReaderSentinel builds the "sync_reader.sentinel"
// sub-command by delegating to buildCommandSentinel.
func buildCommandSyncReaderSentinel() (*viper.Viper, *cobra.Command) {
	const commandName = "sync_reader.sentinel"
	settings, command := buildCommandSentinel(commandName)
	return settings, command
}
// buildCommandSentinel builds a sentinel sub-command named commandName and
// returns it together with the viper instance that collects its settings.
//
// The command accepts the flags master_name, address, username, password and
// tls. "address" is required, and username/password must be given together.
//
// The bound keys mirror exactly the flags registered below. "master_name" is
// included so that its value reaches the generated config; keys without a
// registered flag ("cluster", "off_reply") are not listed because the flag
// lookup for them yields nil and nothing can be bound.
func buildCommandSentinel(commandName string) (*viper.Viper, *cobra.Command) {
	vp := viper.New()
	opts := &client.SentinelOptions{}
	bindKeys := []string{"master_name", "address", "username", "password", "tls"}
	command := NewCommand(commandName, bindKeys, vp, opts)

	flags := command.Flags()
	flags.StringVarP(&opts.MasterName, "master_name", "m", "", "")
	flags.StringVarP(&opts.Address, "address", "a", "", "[required]eg: 127.0.0.1:6379")
	flags.StringVarP(&opts.Username, "username", "u", "", "")
	flags.StringVarP(&opts.Password, "password", "p", "", "")
	flags.BoolVarP(&opts.Tls, "tls", "t", false, "")

	command.MarkFlagRequired("address")
	command.MarkFlagsRequiredTogether("username", "password")
	return vp, command
}
// buildCommandAofReader builds the "aof_reader" sub-command and the viper
// instance that collects its settings. The command accepts the flags
// filepath (required) and timestamp.
func buildCommandAofReader() (*viper.Viper, *cobra.Command) {
	var (
		settings = viper.New()
		options  = new(reader.AOFReaderOptions)
	)
	cmd := NewCommand("aof_reader", []string{"filepath", "timestamp"}, settings, options)

	fs := cmd.Flags()
	fs.StringVarP(&options.Filepath, "filepath", "f", "/tmp/.aof", "[required]")
	fs.Int64VarP(&options.AOFTimestamp, "timestamp", "a", 0, "# subsecond")

	cmd.MarkFlagRequired("filepath")
	return settings, cmd
}
// buildCommandRdbReader builds the "rdb_reader" sub-command and the viper
// instance that collects its settings. The command accepts a single,
// required filepath flag.
func buildCommandRdbReader() (*viper.Viper, *cobra.Command) {
	var (
		settings = viper.New()
		options  = new(reader.RdbReaderOptions)
	)
	cmd := NewCommand("rdb_reader", []string{"filepath"}, settings, options)

	cmd.Flags().StringVarP(&options.Filepath, "filepath", "f", "/tmp/dump.rdb", "[required]")

	cmd.MarkFlagRequired("filepath")
	return settings, cmd
}
// buildCommandSyncReader builds the "sync_reader" sub-command and the viper
// instance that collects its settings. "address" is marked as required, and
// username/password must be given together.
func buildCommandSyncReader() (*viper.Viper, *cobra.Command) {
	var (
		settings = viper.New()
		options  = new(reader.SyncReaderOptions)
	)
	boundKeys := []string{
		"cluster", "address", "username", "password", "tls",
		"sync_rdb", "sync_aof", "prefer_replica", "try_diskless",
	}
	cmd := NewCommand("sync_reader", boundKeys, settings, options)

	fs := cmd.Flags()
	fs.BoolVarP(&options.Cluster, "cluster", "c", false, "# Set to true if the source is a Redis cluster")
	fs.StringVarP(&options.Address, "address", "a", "127.0.0.1:6379", "# [required]For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode")
	fs.StringVarP(&options.Username, "username", "u", "", "# Keep empty if ACL is not in use")
	fs.StringVarP(&options.Password, "password", "p", "", "# Keep empty if no authentication is required")
	fs.BoolVarP(&options.Tls, "tls", "t", false, "# Set to true to enable TLS if needed")
	fs.BoolVarP(&options.SyncRdb, "sync_rdb", "d", true, "# Set to false if RDB synchronization is not required")
	fs.BoolVarP(&options.SyncAof, "sync_aof", "o", true, "# Set to false if AOF synchronization is not required")
	fs.BoolVarP(&options.PreferReplica, "prefer_replica", "r", false, "# Set to true to sync from a replica node")
	fs.BoolVarP(&options.TryDiskless, "try_diskless", "l", false, "# Set to true for diskless sync if the source has repl-diskless-sync=yes")

	cmd.MarkFlagRequired("address")
	cmd.MarkFlagsRequiredTogether("username", "password")
	return settings, cmd
}
// buildCommandScanReader builds the "scan_reader" sub-command and the viper
// instance that collects its settings. "address" is marked as required, and
// username/password must be given together.
func buildCommandScanReader() (*viper.Viper, *cobra.Command) {
	var (
		settings = viper.New()
		options  = new(reader.ScanReaderOptions)
	)
	boundKeys := []string{
		"cluster", "address", "username", "password", "tls",
		"scan", "ksn", "dbs", "prefer_replica", "count",
	}
	cmd := NewCommand("scan_reader", boundKeys, settings, options)

	fs := cmd.Flags()
	fs.BoolVarP(&options.Cluster, "cluster", "c", false, "# set to true if source is a redis cluster")
	fs.StringVarP(&options.Address, "address", "a", "127.0.0.1:6379", "# [required] when cluster is true, set address to one of the cluster node")
	fs.StringVarP(&options.Username, "username", "u", "", "# keep empty if not using ACL")
	fs.StringVarP(&options.Password, "password", "p", "", "# keep empty if no authentication is required")
	fs.BoolVarP(&options.Tls, "tls", "t", false, "")
	fs.IntSliceVarP(&options.DBS, "dbs", "d", []int{}, "# set you want to scan dbs such as [1,5,7], if you don't want to scan all")
	fs.BoolVarP(&options.Scan, "scan", "s", true, "# set to false if you don't want to scan keys")
	fs.BoolVarP(&options.KSN, "ksn", "k", false, "# set to true to enabled Redis keyspace notifications (KSN) subscription")
	fs.IntVarP(&options.Count, "count", "n", 1, "# number of keys to scan per iteration")
	fs.BoolVarP(&options.PreferReplica, "prefer_replica", "r", false, "")

	cmd.MarkFlagRequired("address")
	cmd.MarkFlagsRequiredTogether("username", "password")
	return settings, cmd
}
// buildCommandRedisWriter builds the "redis_writer" sub-command and the viper
// instance that collects its settings.
//
// The command accepts the flags cluster, address, username, password, tls and
// off_reply; username and password must be given together. The help text of
// "tls" describes TLS (it previously repeated the text of "off_reply").
func buildCommandRedisWriter() (*viper.Viper, *cobra.Command) {
	opts := &writer.RedisWriterOptions{}
	vp := viper.New()
	bindKeys := []string{"cluster", "address", "username", "password", "tls", "off_reply"}
	command := NewCommand("redis_writer", bindKeys, vp, opts)

	flags := command.Flags()
	flags.BoolVarP(&opts.Cluster, "cluster", "c", false, "# set to true if target is a redis cluster")
	flags.StringVarP(&opts.Address, "address", "a", "127.0.0.1:6380", "# when cluster is true, set address to one of the cluster node")
	flags.StringVarP(&opts.Username, "username", "u", "", "# keep empty if not using ACL")
	flags.StringVarP(&opts.Password, "password", "p", "", "# keep empty if no authentication is required")
	flags.BoolVarP(&opts.Tls, "tls", "t", false, "# set to true to enable TLS if needed")
	flags.BoolVarP(&opts.OffReply, "off_reply", "o", false, "# turn off the server reply")

	command.MarkFlagsRequiredTogether("username", "password")
	return vp, command
}
// buildCommandFilter builds the "filter" sub-command and the viper instance
// that collects its settings. Every flag registered here is also bound to
// viper under the same name. The multi-line help texts are kept in local
// constants so that the flag registrations read as a flat list.
func buildCommandFilter() (*viper.Viper, *cobra.Command) {
	const (
		allowKeysHelp = `# Allow keys with specific prefixes or suffixes
# Examples:
# allow_keys = ["user:1001", "product:2001"]
# allow_key_prefix = ["user:", "product:"]
# allow_key_suffix = [":active", ":valid"]
# allow A collection of keys containing 11-digit mobile phone numbers
# allow_key_regex = [":\\d{11}:"]
# Leave empty to allow all keys`

		blockKeysHelp = `# Block keys with specific prefixes or suffixes
# Examples:
# block_keys = ["temp:1001", "cache:2001"]
# block_key_prefix = ["temp:", "cache:"]
# block_key_suffix = [":tmp", ":old"]
# block test 11-digit mobile phone numbers keys
# block_key_regex = [":test:\\d{11}:"]
# Leave empty to block nothing`

		allowDBHelp = `# Specify allowed and blocked database numbers (e.g., allow_db = [0, 1, 2], block_db = [3, 4, 5])
# Leave empty to allow all databases`

		allowCommandHelp = `# Allow or block specific commands
# Examples:
# allow_command = ["GET", "SET"] # Only allow GET and SET commands
# block_command = ["DEL", "FLUSHDB"] # Block DEL and FLUSHDB commands
# Leave empty to allow all commands`

		allowCommandGroupHelp = `# Allow or block specific command groups
# Available groups:
# SERVER, STRING, CLUSTER, CONNECTION, BITMAP, LIST, SORTED_SET,
# GENERIC, TRANSACTIONS, SCRIPTING, TAIRHASH, TAIRSTRING, TAIRZSET,
# GEO, HASH, HYPERLOGLOG, PUBSUB, SET, SENTINEL, STREAM
# Examples:
# allow_command_group = ["STRING", "HASH"] # Only allow STRING and HASH commands
# block_command_group = ["SCRIPTING", "PUBSUB"] # Block SCRIPTING and PUBSUB commands
# Leave empty to allow all command groups`

		functionHelp = `# Function for custom data processing
# For best practices and examples, visit:
# https://tair-opensource.github.io/RedisShake/zh/filter/function.html`
	)

	var (
		settings = viper.New()
		options  = new(config.FilterOptions)
	)
	boundKeys := []string{
		"allow_keys", "allow_key_prefix", "allow_key_suffix", "block_keys",
		"block_key_prefix", "block_key_suffix", "allow_key_regex", "block_key_regex",
		"allow_db", "block_db", "allow_command", "block_command",
		"allow_command_group", "block_command_group", "function",
	}
	cmd := NewCommand("filter", boundKeys, settings, options)

	fs := cmd.Flags()
	// Key filters.
	fs.StringSliceVarP(&options.AllowKeys, "allow_keys", "a", []string{}, allowKeysHelp)
	fs.StringSliceVarP(&options.AllowKeyPrefix, "allow_key_prefix", "p", []string{}, "")
	fs.StringSliceVarP(&options.AllowKeySuffix, "allow_key_suffix", "s", []string{}, "")
	fs.StringSliceVarP(&options.AllowKeyRegex, "allow_key_regex", "e", []string{}, "")
	fs.StringSliceVarP(&options.BlockKeys, "block_keys", "k", []string{}, blockKeysHelp)
	fs.StringSliceVarP(&options.BlockKeyPrefix, "block_key_prefix", "r", []string{}, "")
	fs.StringSliceVarP(&options.BlockKeySuffix, "block_key_suffix", "i", []string{}, "")
	fs.StringSliceVarP(&options.BlockKeyRegex, "block_key_regex", "x", []string{}, "")
	// Database filters.
	fs.IntSliceVarP(&options.AllowDB, "allow_db", "w", []int{}, allowDBHelp)
	fs.IntSliceVarP(&options.BlockDB, "block_db", "o", []int{}, "")
	// Command and command-group filters.
	fs.StringSliceVarP(&options.AllowCommand, "allow_command", "m", []string{}, allowCommandHelp)
	fs.StringSliceVarP(&options.BlockCommand, "block_command", "l", []string{}, "")
	fs.StringSliceVarP(&options.AllowCommandGroup, "allow_command_group", "g", []string{}, allowCommandGroupHelp)
	fs.StringSliceVarP(&options.BlockCommandGroup, "block_command_group", "u", []string{}, "")
	// Custom processing function.
	fs.StringVarP(&options.Function, "function", "f", "", functionHelp)

	return settings, cmd
}
// buildCommandAdvanced builds the "advanced" sub-command and the viper
// instance that collects its settings. The multi-line help texts are kept in
// local constants so that the flag registrations read as a flat list.
//
// NOTE(review): "pipeline_count_limit" is listed among the bound keys but no
// flag with that name is registered here, so the flag lookup for it yields
// nil — confirm whether a flag is missing.
//
// NOTE(review): the flag "target_redis_client_max_querybuf_len" is stored in
// PipelineCountLimit and the flag "target_redis_proto_max_bulk_len" in
// TargetRedisClientMaxQuerybufLen; the field names do not match the flag
// names — confirm this pairing is intended.
func buildCommandAdvanced() (*viper.Viper, *cobra.Command) {
	const (
		restoreBehaviorHelp = `# panic, rewrite or skip
# redis-shake gets key and value from rdb file, and uses RESTORE command to
# create the key in target redis. Redis RESTORE will return a "Target key name
# is busy" error when key already exists. You can use this configuration item
# to change the default behavior of restore:
# panic: redis-shake will stop when meet "Target key name is busy" error.
# rewrite: redis-shake will replace the key with new value.
# skip: redis-shake will skip restore the key when meet "Target key name is busy" error.`

		maxQuerybufLenHelp = `# This setting corresponds to the 'client-query-buffer-limit' in Redis configuration.
# The default value is typically 1GB.
# It's recommended not to modify this value unless absolutely necessary.`

		protoMaxBulkLenHelp = `# This setting corresponds to the 'proto-max-bulk-len' in Redis configuration.
# It defines the maximum size of a single string element in the Redis protocol.
# The value must be 1MB or greater. Default is 512MB.
# It's recommended not to modify this value unless absolutely necessary.`

		awsPSyncHelp = `# If the source is Elasticache, you can set this item. AWS ElastiCache has custom
# psync command, which can be obtained through a ticket.`

		emptyDBHelp = `# destination will delete itself entire database before fetching files
# from source during full synchronization.
# This option is similar redis replicas RDB diskless load option:
# repl-diskless-load on-empty-db`
	)

	var (
		settings = viper.New()
		options  = new(config.AdvancedOptions)
	)
	boundKeys := []string{
		"dir", "ncpu", "pprof_port", "status_port",
		"log_file", "log_level", "log_interval", "rdb_restore_command_behavior",
		"pipeline_count_limit", "target_redis_client_max_querybuf_len", "target_redis_proto_max_bulk_len", "aws_psync",
		"empty_db_before_sync",
	}
	cmd := NewCommand("advanced", boundKeys, settings, options)

	fs := cmd.Flags()
	fs.StringVarP(&options.Dir, "dir", "d", "data", "")
	fs.IntVarP(&options.Ncpu, "ncpu", "a", 0, "# runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores")
	fs.IntVarP(&options.PprofPort, "pprof_port", "f", 0, "# pprof port, 0 means disable")
	fs.IntVarP(&options.StatusPort, "status_port", "t", 0, "# status port, 0 means disable")
	fs.StringVarP(&options.LogFile, "log_file", "l", "shake.log", "")
	fs.StringVarP(&options.LogLevel, "log_level", "e", "info", "# debug, info or warn")
	fs.IntVarP(&options.LogInterval, "log_interval", "i", 5, "# in seconds")
	fs.StringVarP(&options.RDBRestoreCommandBehavior, "rdb_restore_command_behavior", "b", "panic", restoreBehaviorHelp)
	fs.Uint64VarP(&options.PipelineCountLimit, "target_redis_client_max_querybuf_len", "q", 1073741824, maxQuerybufLenHelp)
	fs.Int64VarP(&options.TargetRedisClientMaxQuerybufLen, "target_redis_proto_max_bulk_len", "x", 512_000_000, protoMaxBulkLenHelp)
	fs.StringVarP(&options.AwsPSync, "aws_psync", "w", "", awsPSyncHelp)
	fs.BoolVarP(&options.EmptyDBBeforeSync, "empty_db_before_sync", "y", false, emptyDBHelp)

	return settings, cmd
}
// buildCommandModule builds the "module" sub-command and the viper instance
// that collects its settings. The command accepts a single
// target_mbbloom_version flag.
func buildCommandModule() (*viper.Viper, *cobra.Command) {
	var (
		settings = viper.New()
		options  = new(config.ModuleOptions)
	)
	cmd := NewCommand("module", []string{"target_mbbloom_version"}, settings, options)

	cmd.Flags().IntVarP(&options.TargetMBbloomVersion, "target_mbbloom_version", "v", 0, "# The data format for BF.LOADCHUNK is not compatible in different versions. v2.6.3 <=> 20603")

	return settings, cmd
}
// NewCommand creates the cobra command for one sub-command.
//
// commandName becomes the command's Use name. Before the command runs, every
// name in bindKeys is bound on vp to the command flag of the same name, and
// vp is then unmarshalled into opts. The command's Run function does nothing:
// executing the command only parses flags and fills vp and opts. Errors from
// binding and unmarshalling are not checked.
func NewCommand(commandName string, bindKeys []string, vp *viper.Viper, opts interface{}) *cobra.Command {
	// bindAndLoad connects the parsed flags to viper and loads them into opts.
	bindAndLoad := func(cmd *cobra.Command, args []string) {
		flagSet := cmd.Flags()
		for i := range bindKeys {
			vp.BindPFlag(bindKeys[i], flagSet.Lookup(bindKeys[i]))
		}
		vp.Unmarshal(opts)
	}
	// A Run function must be present for PreRun to be invoked on Execute.
	noop := func(cmd *cobra.Command, args []string) {}

	return &cobra.Command{
		Use:    commandName,
		Long:   "\n------redis-shake " + commandName + " (description)------",
		PreRun: bindAndLoad,
		Run:    noop,
	}
}