add aof_writer cmd_writer json_writer (#914)
This commit is contained in:
parent
85485fca16
commit
dbe3d7e4d4
|
@ -139,6 +139,14 @@ func main() {
|
|||
// create writer
|
||||
var theWriter writer.Writer
|
||||
switch {
|
||||
case v.IsSet("file_writer"):
|
||||
opts := new(writer.FileWriterOptions)
|
||||
defaults.SetDefaults(opts)
|
||||
err := v.UnmarshalKey("file_writer", opts)
|
||||
if err != nil {
|
||||
log.Panicf("failed to read the FileWriter config entry. err: %v", err)
|
||||
}
|
||||
theWriter = writer.NewFileWriter(ctx, opts)
|
||||
case v.IsSet("redis_writer"):
|
||||
opts := new(writer.RedisWriterOptions)
|
||||
defaults.SetDefaults(opts)
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
# file_writer
|
||||
|
||||
## Introduction
|
||||
|
||||
Can use ` file_writer ` to write data to file with type CMD/JSON/AOF .
|
||||
It is commonly used to extract/migrate/fix data by file.
|
||||
|
||||
## configuration
|
||||
|
||||
```toml
|
||||
[file_writer]
|
||||
filepath = "/tmp/cmd.txt"
|
||||
type = "cmd" #cmd,aof,json (default cmd)
|
||||
```
|
||||
|
||||
* An absolute filepath should be passed in.
|
||||
## application scenarios
|
||||
- share data between two system: one system write aof to disk/s3/oss, another system read file from them.
|
||||
- partial migrate data with business prefix: extract aof with prefix "XXX:" data from A system, B system import the aof with command `redis-cli --pipe XXX.aof` .
|
||||
- fix data by cmd file: export cmd data from one system, fix wrong data, and then import cmd file with command `redis-cli < cmd.txt`.
|
||||
- analysis data with json: export json file, and then import them into mongodb/bi to analysis.
|
||||
|
||||
## example output:
|
||||
### cmd_writer output:
|
||||
```
|
||||
SELECT 0
|
||||
set key1 1
|
||||
set key2 2
|
||||
set key3 3
|
||||
sadd key4 1 2 3 4
|
||||
lpush key5 1 2 3 4 5
|
||||
zadd key6 1 2 3 4 5 6
|
||||
```
|
||||
### json_writer output:
|
||||
```
|
||||
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
|
||||
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
|
||||
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
|
||||
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
|
||||
```
|
||||
### aof_writer output:
|
||||
```
|
||||
*2
|
||||
$6
|
||||
SELECT
|
||||
$1
|
||||
0
|
||||
*3
|
||||
$3
|
||||
set
|
||||
$4
|
||||
key1
|
||||
$1
|
||||
1
|
||||
```
|
|
@ -0,0 +1,57 @@
|
|||
# file_writer
|
||||
|
||||
## 介绍
|
||||
|
||||
可以使用 ` file_writer` 写文件, 可写的格式有 CMD/JSON/AOF, 常用于通过文件介质抽取/迁移/订正数据.
|
||||
## 配置
|
||||
|
||||
```toml
|
||||
[file_writer]
|
||||
filepath = "/tmp/cmd.txt"
|
||||
type = "cmd" #cmd,aof,json (default cmd)
|
||||
```
|
||||
|
||||
* 绝对路径 filepath 是必填的.
|
||||
|
||||
## 应用场景
|
||||
- 俩系统共享数据: 一个系统把文件写到 disk/s3/oss, 另一系统从中读取.
|
||||
- 跨系统局部迁移带指定前缀的数据: 从A系统迁出带前缀"XXX:"的数据, B系统通过命令导入这些数据 `redis-cli --pipe XXX.aof` .
|
||||
- 通过命令文件订正数据: 从一个系统中导出数据成cmd格式, 订正后再导入命令`redis-cli < cmd.txt`.
|
||||
- 通过json格式做数据分析: 导出成json文件, 导入到mongodb/bi做分析.
|
||||
|
||||
## 示例输出
|
||||
### cmd_writer 输出:
|
||||
```
|
||||
SELECT 0
|
||||
set key1 1
|
||||
set key2 2
|
||||
set key3 3
|
||||
sadd key4 1 2 3 4
|
||||
lpush key5 1 2 3 4 5
|
||||
zadd key6 1 2 3 4 5 6
|
||||
```
|
||||
### json_writer 输出:
|
||||
```
|
||||
{"DbId":0,"Argv":["SELECT","0"],"CmdName":"SELECT","Group":"CONNECTION","Keys":null,"KeyIndexes":null,"Slots":[],"SerializedSize":23}
|
||||
{"DbId":0,"Argv":["set","key1","1"],"CmdName":"SET","Group":"STRING","Keys":["key1"],"KeyIndexes":[2],"Slots":[9189],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["set","key2","2"],"CmdName":"SET","Group":"STRING","Keys":["key2"],"KeyIndexes":[2],"Slots":[4998],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["set","key3","3"],"CmdName":"SET","Group":"STRING","Keys":["key3"],"KeyIndexes":[2],"Slots":[935],"SerializedSize":30}
|
||||
{"DbId":0,"Argv":["sadd","key4","1","2","3","4"],"CmdName":"SADD","Group":"SET","Keys":["key4"],"KeyIndexes":[2],"Slots":[13120],"SerializedSize":52}
|
||||
{"DbId":0,"Argv":["lpush","key5","1","2","3","4","5"],"CmdName":"LPUSH","Group":"LIST","Keys":["key5"],"KeyIndexes":[2],"Slots":[9057],"SerializedSize":60}
|
||||
{"DbId":0,"Argv":["zadd","key6","1","2","3","4","5","6"],"CmdName":"ZADD","Group":"SORTED_SET","Keys":["key6"],"KeyIndexes":[2],"Slots":[4866],"SerializedSize":66}
|
||||
```
|
||||
### aof_writer 输出:
|
||||
```
|
||||
*2
|
||||
$6
|
||||
SELECT
|
||||
$1
|
||||
0
|
||||
*3
|
||||
$3
|
||||
set
|
||||
$4
|
||||
key1
|
||||
$1
|
||||
1
|
||||
```
|
|
@ -0,0 +1,129 @@
|
|||
package writer
|
||||
|
||||
import (
|
||||
"RedisShake/internal/entry"
|
||||
"RedisShake/internal/log"
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type FileType string
|
||||
|
||||
const (
|
||||
AOF FileType = "aof"
|
||||
CMD FileType = "cmd"
|
||||
JSON FileType = "json"
|
||||
)
|
||||
|
||||
var FileTypes = []FileType{CMD, AOF, JSON}
|
||||
|
||||
type FileWriterOptions struct {
|
||||
Filepath string `mapstructure:"filepath" default:""`
|
||||
FileType string `mapstructure:"type" default:"cmd"`
|
||||
}
|
||||
|
||||
type fileWriter struct {
|
||||
fileType FileType
|
||||
path string
|
||||
DbId int
|
||||
ch chan *entry.Entry
|
||||
chWg sync.WaitGroup
|
||||
stat struct {
|
||||
EntryCount int `json:"entry_count"`
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fileWriter) Write(e *entry.Entry) {
|
||||
w.ch <- e
|
||||
}
|
||||
|
||||
func (w *fileWriter) Close() {
|
||||
close(w.ch)
|
||||
w.chWg.Wait()
|
||||
}
|
||||
|
||||
func (w *fileWriter) Status() interface{} {
|
||||
return w.stat
|
||||
}
|
||||
|
||||
func (w *fileWriter) StatusString() string {
|
||||
return fmt.Sprintf("exported entry count=%d", w.stat.EntryCount)
|
||||
}
|
||||
|
||||
func (w *fileWriter) StatusConsistent() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func NewFileWriter(ctx context.Context, opts *FileWriterOptions) Writer {
|
||||
absolutePath, err := filepath.Abs(opts.Filepath)
|
||||
if err != nil {
|
||||
log.Panicf("NewFileWriter path=[%s]: filepath.Abs error: %s", opts.Filepath, err.Error())
|
||||
}
|
||||
log.Infof("NewFileWriter absolute path=[%s],type=[%s]", absolutePath, opts.FileType)
|
||||
w := &fileWriter{
|
||||
fileType: FileType(opts.FileType),
|
||||
DbId: 0,
|
||||
path: absolutePath,
|
||||
ch: make(chan *entry.Entry),
|
||||
}
|
||||
w.stat.EntryCount = 0
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *fileWriter) StartWrite(ctx context.Context) (ch chan *entry.Entry) {
|
||||
w.chWg = sync.WaitGroup{}
|
||||
w.chWg.Add(1)
|
||||
go w.processWrite(ctx)
|
||||
return w.ch
|
||||
|
||||
}
|
||||
|
||||
func (w *fileWriter) processWrite(ctx context.Context) {
|
||||
ticker := time.NewTicker(10 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
file, err := os.Create(w.path)
|
||||
if err != nil {
|
||||
log.Panicf("create file failed:", err)
|
||||
return
|
||||
}
|
||||
defer file.Close()
|
||||
writer := bufio.NewWriter(file)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// do nothing until w.ch is closed
|
||||
case <-ticker.C:
|
||||
writer.Flush()
|
||||
case e, ok := <-w.ch:
|
||||
if !ok {
|
||||
w.chWg.Done()
|
||||
writer.Flush()
|
||||
return
|
||||
}
|
||||
w.stat.EntryCount++
|
||||
w.writeEntry(writer, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fileWriter) writeEntry(writer *bufio.Writer, e *entry.Entry) {
|
||||
switch w.fileType {
|
||||
case CMD:
|
||||
writer.WriteString(strings.Join(e.Argv, " ") + "\n")
|
||||
case AOF:
|
||||
writer.Write(e.Serialize())
|
||||
case JSON:
|
||||
// compute SerializeSize for json result
|
||||
e.Serialize()
|
||||
json, _ := json.Marshal(e)
|
||||
writer.Write(json)
|
||||
writer.WriteString("\n")
|
||||
}
|
||||
}
|
|
@ -35,6 +35,10 @@ password = "" # keep empty if no authentication is required
|
|||
tls = false
|
||||
off_reply = false # turn off the server reply
|
||||
|
||||
# [file_writer]
|
||||
# filepath = "/tmp/cmd.txt"
|
||||
# type = "cmd" #cmd,aof,json (default cmd)
|
||||
|
||||
[filter]
|
||||
# Allow keys with specific prefixes or suffixes
|
||||
# Examples:
|
||||
|
|
Loading…
Reference in New Issue