fix: add start position

danfengcao 2016-12-08 15:18:33 +08:00
parent cc946eb62d
commit 84d5a3be0a
3 changed files with 89 additions and 48 deletions


@ -35,26 +35,22 @@ pip install -r requirements.txt
**Parse out standard SQL**
```bash
$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -d dbname -t table1 table2 --start-file='mysql-bin.000002' --start-pos=1240
$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t test3 test4 --start-file='mysql-bin.000002'
Output:
INSERT INTO d(`did`, `updateTime`, `uid`) VALUES (18, '2016-12-07 14:01:14', 4);
INSERT INTO c(`id`, `name`) VALUES (0, 'b');
UPDATE d SET `did`=17, `updateTime`='2016-12-07 14:01:14', `uid`=4 WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1;
DELETE FROM c WHERE `id`=0 AND `name`='b' LIMIT 1;
INSERT INTO `test`.`test3`(`data`, `id`, `data2`) VALUES ('Hello', 1, 'World'); #start 474 end 642
INSERT INTO `test`.`test4`(`data`, `id`, `data2`) VALUES ('Hello', 1, 'World'); #start 669 end 837
UPDATE `test`.`test4` SET `data`='World', `id`=1, `data2`='Hello' WHERE `data`='Hello' AND `id`=1 AND `data2`='World' LIMIT 1; #start 864 end 1052
DELETE FROM `test`.`test4` WHERE `data`='World' AND `id`=1 AND `data2`='Hello' LIMIT 1; #start 1079 end 1247
```
**Parse out rollback SQL**
```bash
python binlog2sql.py --flashback -h127.0.0.1 -P3306 -uadmin -p'admin' -d dbname -t table1 table2 --start-file='mysql-bin.000002' --start-pos=1240
python binlog2sql.py --flashback -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttest4 --start-file='mysql-bin.000002' --start-pos=1079 --end-pos=1247
Output:
INSERT INTO c(`id`, `name`) VALUES (0, 'b');
UPDATE d SET `did`=18, `updateTime`='2016-12-07 14:01:14', `uid`=4 WHERE `did`=17 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1;
DELETE FROM c WHERE `id`=0 AND `name`='b' LIMIT 1;
DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1;
DELETE FROM `test`.`test4` WHERE `data`='World' AND `id`=1 AND `data2`='Hello' LIMIT 1; #start 1079 end 1247
```
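The `#start ... end ...` comment appended to each row statement is what makes it possible to choose `--start-pos` and `--end-pos` for a later run. A minimal sketch of pulling those positions out of an output line follows; the helper name `split_positions` and the regular expression are assumptions made for illustration and are not part of binlog2sql.

```python
import re

# Matches the trailing "#start <pos> end <pos>" comment shown in the sample output.
POS_COMMENT = re.compile(r"#start (\d+) end (\d+)\s*$")


def split_positions(line):
    """Return (sql, start_pos, end_pos), or None if the line has no position comment."""
    match = POS_COMMENT.search(line)
    if match is None:
        return None
    sql = line[:match.start()].rstrip()
    return sql, int(match.group(1)), int(match.group(2))


sample = ("DELETE FROM `test`.`test4` WHERE `data`='World' AND `id`=1 "
          "AND `data2`='Hello' LIMIT 1; #start 1079 end 1247")
sql, start_pos, end_pos = split_positions(sample)
print(start_pos, end_pos)
```

The two integers can then be passed as `--start-pos` and `--end-pos`, as in the rollback command above.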
###Options
**MySQL connection settings**
@ -63,11 +59,11 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4
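**Parsing mode**
--realtime Continuously sync the binlog. Optional. If omitted, parsing stops at the latest binlog position at the time the command is run.
--stop-never Continuously sync the binlog. Optional. If omitted, parsing stops at the latest binlog position at the time the command is run.
--popPk Remove the primary key from INSERT statements. Optional.
-B, --flashback Generate rollback statements. Optional. Cannot be combined with realtime or popPk.
-B, --flashback Generate rollback statements. Optional. Cannot be combined with stop-never or popPk.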
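The rule that `--flashback` cannot be combined with `--stop-never` or `--popPk` can be sketched with `argparse` as below. This is a simplified illustration that mirrors the checks in `command_line_args`; the function names `build_parser` and `check_modes` are assumptions, and the real parser defines many more options.

```python
import argparse


def build_parser():
    # Only the three mode flags are modelled here (hypothetical, reduced parser).
    parser = argparse.ArgumentParser(description="sketch of the parsing-mode flags")
    parser.add_argument("--stop-never", dest="stopnever", action="store_true", default=False)
    parser.add_argument("--popPk", dest="popPk", action="store_true", default=False)
    parser.add_argument("-B", "--flashback", dest="flashback", action="store_true", default=False)
    return parser


def check_modes(args):
    """Reject flag combinations that the option descriptions above rule out."""
    if args.flashback and args.stopnever:
        raise ValueError("only one of flashback or stop-never can be True")
    if args.flashback and args.popPk:
        raise ValueError("only one of flashback or popPk can be True")
    return args


args = check_modes(build_parser().parse_args(["--flashback"]))
print(args.flashback, args.stopnever, args.popPk)
```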
**Parsing range control**
@ -75,9 +71,9 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4
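--start-pos Start position for parsing within start-file. Optional. Defaults to the beginning of start-file.
--end-file Last binlog file to parse. Optional. Defaults to the same file as start-file. Ignored when the parsing mode is realtime.
--end-file Last binlog file to parse. Optional. Defaults to the same file as start-file. Ignored when the parsing mode is stop-never.
--end-pos End position for parsing within end-file. Optional. Defaults to the end of end-file. Ignored when the parsing mode is realtime.
--end-pos End position for parsing within end-file. Optional. Defaults to the end of end-file. Ignored when the parsing mode is stop-never.
**Object filtering**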
@ -87,7 +83,42 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4
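###Use cases
**Repairing data inconsistency after a master-slave switchover**. For a detailed description, see [example/FixOldMasterExtraData.md](./example/FixOldMasterExtraData.md)
#### **Case 1: Emergency rollback after running the wrong SQL**
**Background**: Data in table c of the test database was deleted by mistake and needs an emergency rollback.
**Steps**
1. Locate the position of the erroneous SQL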
```bash
$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002'
Output:
INSERT INTO `test`.`c`(`id`, `name`) VALUES (0, 'b'); #start 310 end 459
DELETE FROM `test`.`c` WHERE `id`=0 AND `name`='b' LIMIT 1; #start 682 end 831
UPDATE `test`.`c` SET `id`=3, `name`='b' WHERE `id`=3 AND `name`='a' LIMIT 1; #start 858 end 1015
```
2. Check that the rollback SQL is correct
```bash
$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002' --start-pos=682 --end-pos=831 -B
Output:
INSERT INTO `test`.`c`(`id`, `name`) VALUES (0, 'b'); #start 682 end 831
```
3. Execute the rollback statements
```bash
$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002' --start-pos=682 --end-pos=831 -B | mysql -h127.0.0.1 -P3306 -uadmin -p'admin'
Rollback succeeded
```
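The rollback statement in step 2 is the inverse of the original row change: a DELETE is undone by an INSERT, an INSERT by a DELETE, and an UPDATE by swapping its before and after values. The sketch below illustrates that inversion on plain dictionaries. The names `rollback_sql`, `quote` and `where_clause` are hypothetical, and `quote` is a deliberately tiny literal formatter; the tool itself lets the database driver escape values.

```python
def quote(value):
    """Minimal SQL literal formatter for this sketch only; not safe for real input."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "''") + "'"


def where_clause(values):
    """Build a WHERE clause that matches every column of the row."""
    parts = []
    for key, value in values.items():
        if value is None:
            parts.append("`%s` IS NULL" % key)
        else:
            parts.append("`%s`=%s" % (key, quote(value)))
    return " AND ".join(parts)


def rollback_sql(kind, schema, table, row):
    """Return the statement that undoes one row change of the given kind."""
    target = "`%s`.`%s`" % (schema, table)
    if kind == "insert":  # undo an INSERT by deleting the inserted row
        return "DELETE FROM %s WHERE %s LIMIT 1;" % (target, where_clause(row["values"]))
    if kind == "delete":  # undo a DELETE by re-inserting the deleted row
        columns = ", ".join("`%s`" % key for key in row["values"])
        literals = ", ".join(quote(value) for value in row["values"].values())
        return "INSERT INTO %s(%s) VALUES (%s);" % (target, columns, literals)
    if kind == "update":  # undo an UPDATE by restoring the before-image
        assignments = ", ".join(
            "`%s`=%s" % (key, quote(value)) for key, value in row["before_values"].items())
        return "UPDATE %s SET %s WHERE %s LIMIT 1;" % (
            target, assignments, where_clause(row["after_values"]))
    raise ValueError("unsupported event kind: %r" % kind)


print(rollback_sql("delete", "test", "c", {"values": {"id": 0, "name": "b"}}))
```

For the deleted row in this case, the sketch yields the same INSERT statement as step 2, without the position comment.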
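**Case 2: Repairing data inconsistency after a master-slave switchover**
For a detailed description, see [example/FixOldMasterExtraData.md](./example/FixOldMasterExtraData.md)
1. Extract the data on the old master that was not replicated, and remove the primary key from its INSERT statements (to prevent primary key conflicts in step 3)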
@ -117,11 +148,13 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4
* MySQL 5.6
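###Advantages (compared with mysqlbinlog)
* Written in pure Python; easy to install and use
* Built-in flashback and popPk parsing modes; no extra patches to install
* Parses into standard SQL, which is easy to read and debug
* The code is easy to modify to support more customized parsing
###Contact me
If you have any questions, please contact me at [danfengcao.info@gmail.com](danfengcao.info@gmail.com)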


@ -9,7 +9,7 @@ from pymysqlreplication.row_event import (
UpdateRowsEvent,
DeleteRowsEvent,
)
from pymysqlreplication.event import QueryEvent
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
def command_line_parser():
"""Returns a command line parser used for binlog2sql"""
@ -33,8 +33,8 @@ def command_line_parser():
help="End binlog file to be parsed. default: '--start-file'", default='')
range.add_argument('--end-pos', dest='endPos', type=int,
help="stop position of end binlog file. default: end position of '--end-file'", default=0)
parser.add_argument('--realtime', dest='realtime', action='store_true',
help='continuously replicate binlog. default: stop replicate when meeting the latest binlog when you run the program', default=False)
parser.add_argument('--stop-never', dest='stopnever', action='store_true',
help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False)
parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False)
@ -57,8 +57,8 @@ def command_line_args():
if args.help:
parser.print_help()
sys.exit(1)
if args.flashback and args.realtime:
raise ValueError('only one of flashback or realtime can be True')
if args.flashback and args.stopnever:
raise ValueError('only one of flashback or stop-never can be True')
if args.flashback and args.popPk:
raise ValueError('only one of flashback or popPk can be True')
return args
@ -75,7 +75,7 @@ def fix_object(value):
else:
return value
def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, popPk=False):
def concat_sql_from_binlogevent(cursor, binlogevent, row=None, eStartPos=None, flashback=False, popPk=False):
if flashback and popPk:
raise ValueError('only one of flashback or popPk can be True')
if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent):
@ -84,21 +84,21 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False,
sql = ''
if flashback is True:
if isinstance(binlogevent, WriteRowsEvent):
template = 'DELETE FROM {0} WHERE {1} LIMIT 1;'.format(
binlogevent.table,
template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format(
binlogevent.schema, binlogevent.table,
' AND '.join(map(compare_items, row['values'].items()))
)
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, DeleteRowsEvent):
template = 'INSERT INTO {0}({1}) VALUES ({2});'.format(
binlogevent.table,
template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format(
binlogevent.schema, binlogevent.table,
', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
', '.join(['%s'] * len(row['values']))
)
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, UpdateRowsEvent):
template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
binlogevent.table,
template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format(
binlogevent.schema, binlogevent.table,
', '.join(['`%s`=%%s'%k for k in row['before_values'].keys()]),
' AND '.join(map(compare_items, row['after_values'].items())))
sql = cursor.mogrify(template, map(fix_object, row['before_values'].values()+row['after_values'].values()))
@ -108,21 +108,21 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False,
tableInfo = (binlogevent.table_map)[binlogevent.table_id]
if tableInfo.primary_key:
row['values'].pop(tableInfo.primary_key)
template = 'INSERT INTO {0}({1}) VALUES ({2});'.format(
binlogevent.table,
template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format(
binlogevent.schema, binlogevent.table,
', '.join(map(lambda k: '`%s`'%k, row['values'].keys())),
', '.join(['%s'] * len(row['values']))
)
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, DeleteRowsEvent):
template ='DELETE FROM {0} WHERE {1} LIMIT 1;'.format(
binlogevent.table,
template ='DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format(
binlogevent.schema, binlogevent.table,
' AND '.join(map(compare_items, row['values'].items()))
)
sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, UpdateRowsEvent):
template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format(
binlogevent.table,
template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format(
binlogevent.schema, binlogevent.table,
', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]),
' AND '.join(map(compare_items, row['before_values'].items()))
)
@ -131,13 +131,15 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False,
sql ='USE {0};\n{1};'.format(
binlogevent.schema, fix_object(binlogevent.query)
)
if type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
sql += ' #start %s end %s' % (eStartPos, binlogevent.packet.log_pos)
return sql
class Binlog2sql(object):
def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None,
endPos=None, only_schemas=None, only_tables=None, popPk=False, flashback=False, realtime=False):
endPos=None, only_schemas=None, only_tables=None, popPk=False, flashback=False, stopnever=False):
'''
connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
'''
@ -154,7 +156,7 @@ class Binlog2sql(object):
self.only_tables = only_tables if only_tables else None
self.popPk = popPk
self.flashback = flashback
self.realtime = realtime
self.stopnever = stopnever
self.binlogList = []
self.connection = pymysql.connect(**self.connectionSettings)
@ -190,31 +192,37 @@ class Binlog2sql(object):
tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile.
ftmp = open(tmpFile ,"w")
flagLastEvent = False
eStartPos = stream.log_pos
lastPos = stream.log_pos
try:
for binlogevent in stream:
if not self.realtime:
if not self.stopnever:
if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
flagLastEvent = True
elif not self.endPos:
if stream.log_file not in self.binlogList:
break
elif (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos):
elif stream.log_file not in self.binlogList:
break
else:
raise ValueError('unknown binlog file or position')
elif (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos):
break
# else:
# raise ValueError('unknown binlog file or position')
if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN':
eStartPos = lastPos
if isinstance(binlogevent, QueryEvent):
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, popPk=self.popPk)
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk)
if sql:
print sql
elif type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
for row in binlogevent.rows:
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk)
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk, eStartPos=eStartPos)
if self.flashback:
ftmp.write(sql + '\n')
else:
print sql
if type(binlogevent) not in (RotateEvent, FormatDescriptionEvent):
lastPos = binlogevent.packet.log_pos
if flagLastEvent:
break
ftmp.close()
@ -239,5 +247,5 @@ if __name__ == '__main__':
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile,
startPos=args.startPos, endFile=args.endFile, endPos=args.endPos,
only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback, realtime=args.realtime)
only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback, stopnever=args.stopnever)
binlog2sql.process_binlog()
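
The start-position bookkeeping that this change adds to `process_binlog` can be sketched without a MySQL server. The example below is a simplified model under stated assumptions: events are plain `(kind, query, end_pos)` tuples with made-up positions, and the function name `annotate_positions` is hypothetical. It follows the same rule as the loop above: a transaction's start is the end position of the event that preceded its `BEGIN`, and rotate and format-description events do not advance the last position.

```python
ROW_EVENTS = {"write_rows", "update_rows", "delete_rows"}
SKIPPED_FOR_POSITION = {"rotate", "format_description"}


def annotate_positions(events, stream_start_pos):
    """Return (kind, start_pos, end_pos) for every row event in the stream."""
    e_start_pos = stream_start_pos  # start of the current transaction
    last_pos = stream_start_pos     # end position of the previous counted event
    annotated = []
    for kind, query, end_pos in events:
        if kind == "query" and query == "BEGIN":
            # A new transaction starts where the previous event ended.
            e_start_pos = last_pos
        if kind in ROW_EVENTS:
            annotated.append((kind, e_start_pos, end_pos))
        if kind not in SKIPPED_FOR_POSITION:
            last_pos = end_pos
    return annotated


# Hypothetical event stream: two transactions, each BEGIN / table map / row event / commit.
events = [
    ("format_description", None, 120),
    ("query", "BEGIN", 83),
    ("table_map", None, 130),
    ("write_rows", None, 172),
    ("xid", None, 203),
    ("query", "BEGIN", 282),
    ("table_map", None, 329),
    ("delete_rows", None, 371),
    ("xid", None, 402),
]
for kind, start, end in annotate_positions(events, stream_start_pos=4):
    print("%s #start %s end %s" % (kind, start, end))
```

Recording the start at `BEGIN` rather than at the row event itself means a rerun with `--start-pos` set to that value begins at the transaction boundary.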


@ -57,7 +57,7 @@ $ mysql -h10.1.1.2 -P3306 -uadmin -p'admin' < oldMaster.sql
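If the table's primary key id carries business meaning, be sure to confirm with the business owners that this is feasible before proceeding.
###References
###Reference materials
[1] 易固武, [MySQL数据库的高可用性分析 (High-availability analysis of MySQL databases)](https://www.qcloud.com/community/article/203)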
[2] 曹单锋, [Parse MySQL binlog to SQL you want](https://github.com/danfengcao/binlog2sql)
[2] danfengcao, [binlog2sql: Parse MySQL binlog to SQL you want](https://github.com/danfengcao/binlog2sql)