From 84d5a3be0a6018af448d2bc78df92b673099cd05 Mon Sep 17 00:00:00 2001 From: danfengcao Date: Thu, 8 Dec 2016 15:18:33 +0800 Subject: [PATCH] fix: add start position --- README.md | 65 ++++++++++++++++++++++-------- binlog2sql/binlog2sql.py | 68 ++++++++++++++++++-------------- example/FixOldMasterExtraData.md | 4 +- 3 files changed, 89 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 4369a44..3fae696 100644 --- a/README.md +++ b/README.md @@ -35,26 +35,22 @@ pip install -r requirements.txt **解析出标准SQL** ```bash -$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -d dbname -t table1 table2 --start-file='mysql-bin.000002' --start-pos=1240 +$ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t test3 test4 --start-file='mysql-bin.000002' 输出: -INSERT INTO d(`did`, `updateTime`, `uid`) VALUES (18, '2016-12-07 14:01:14', 4); -INSERT INTO c(`id`, `name`) VALUES (0, 'b'); -UPDATE d SET `did`=17, `updateTime`='2016-12-07 14:01:14', `uid`=4 WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1; -DELETE FROM c WHERE `id`=0 AND `name`='b' LIMIT 1; - +INSERT INTO `test`.`test3`(`data`, `id`, `data2`) VALUES ('Hello', 1, 'World'); #start 474 end 642 +INSERT INTO `test`.`test4`(`data`, `id`, `data2`) VALUES ('Hello', 1, 'World'); #start 669 end 837 +UPDATE `test`.`test4` SET `data`='World', `id`=1, `data2`='Hello' WHERE `data`='Hello' AND `id`=1 AND `data2`='World' LIMIT 1; #start 864 end 1052 +DELETE FROM `test`.`test4` WHERE `data`='World' AND `id`=1 AND `data2`='Hello' LIMIT 1; #start 1079 end 1247 ``` **解析出回滚SQL** ```bash -python binlog2sql.py --flashback -h127.0.0.1 -P3306 -uadmin -p'admin' -d dbname -t table1 table2 --start-file='mysql-bin.000002' --start-pos=1240 +python binlog2sql.py --flashback -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -ttest4 --start-file='mysql-bin.000002' --start-pos=1079 --end-pos=1247 输出: -INSERT INTO c(`id`, `name`) VALUES (0, 'b'); -UPDATE d SET `did`=18, `updateTime`='2016-12-07 14:01:14', `uid`=4 WHERE `did`=17 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1; -DELETE FROM c WHERE `id`=0 AND `name`='b' LIMIT 1; -DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 LIMIT 1; +DELETE FROM `test`.`test4` WHERE `data`='World' AND `id`=1 AND `data2`='Hello' LIMIT 1; #start 1079 end 1247 ``` ###选项 **mysql连接配置** @@ -63,11 +59,11 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 **解析模式** ---realtime 持续同步binlog。可选。不加则同步至执行命令时最新的binlog位置。 +--stop-never 持续同步binlog。可选。不加则同步至执行命令时最新的binlog位置。 --popPk 对INSERT语句去除主键。可选。 --B, --flashback 生成回滚语句。可选。与realtime或popPk不能同时添加。 +-B, --flashback 生成回滚语句。可选。与stop-never或popPk不能同时添加。 **解析范围控制** @@ -75,9 +71,9 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 --start-pos start-file的起始解析位置。可选。默认为start-file的起始位置; ---end-file 末尾解析文件。可选。默认为start-file同一个文件。若解析模式为realtime,此选项失效。 +--end-file 末尾解析文件。可选。默认为start-file同一个文件。若解析模式为stop-never,此选项失效。 ---end-pos end-file的末尾解析位置。可选。默认为end-file的最末位置;若解析模式为realtime,此选项失效。 +--end-pos end-file的末尾解析位置。可选。默认为end-file的最末位置;若解析模式为stop-never,此选项失效。 **对象过滤** @@ -87,7 +83,42 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 ###应用案例 -**主从切换后数据不一致的修复**,详细描述可参见[example/FixOldMasterExtraData.md](./example/FixOldMasterExtraData.md) +#### **案例一 跑错SQL,需要紧急回滚** + +**背景**:误删test库c表的数据,需要紧急回滚。 + +**步骤**: + +1. 定位误操作的SQL位置 + + ```bash + $ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002' + + 输出: + INSERT INTO `test`.`c`(`id`, `name`) VALUES (0, 'b'); #start 310 end 459 + DELETE FROM `test`.`c` WHERE `id`=0 AND `name`='b' LIMIT 1; #start 682 end 831 + UPDATE `test`.`c` SET `id`=3, `name`='b' WHERE `id`=3 AND `name`='a' LIMIT 1; #start 858 end 1015 + ``` +2. 查看回滚sql是否正确 + + ```bash + $ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002' --start-pos=682 --end-pos=831 -B + + 输出: + INSERT INTO `test`.`c`(`id`, `name`) VALUES (0, 'b'); #start 682 end 831 + ``` +3. 执行回滚语句 + + ```bash + $ python binlog2sql.py -h127.0.0.1 -P3306 -uadmin -p'admin' -dtest -t c --start-file='mysql-bin.000002' --start-pos=682 --end-pos=831 -B | mysql -h127.0.0.1 -P3306 -uadmin -p'admin' + + 回滚成功 + ``` + + +**案例二 主从切换后数据不一致的修复** + +详细描述可参见[example/FixOldMasterExtraData.md](./example/FixOldMasterExtraData.md) 1. 提取old master未同步的数据,并对其中的insert语句去除主键(为了防止步骤3中出现主键冲突) @@ -117,11 +148,13 @@ DELETE FROM d WHERE `did`=18 AND `updateTime`='2016-12-07 14:01:14' AND `uid`=4 * MySQL 5.6 ###优点(对比mysqlbinlog) + * 纯Python开发,安装与使用都很简单 * 自带flashback、popPk解析模式,无需再装补丁 * 解析为标准SQL,方便理解、调试 * 代码容易改造,可以支持更多个性化解析 + ###联系我 有任何问题,请与我联系 [danfengcao.info@gmail.com](danfengcao.info@gmail.com) diff --git a/binlog2sql/binlog2sql.py b/binlog2sql/binlog2sql.py index 0060629..ca8aa1d 100755 --- a/binlog2sql/binlog2sql.py +++ b/binlog2sql/binlog2sql.py @@ -9,7 +9,7 @@ from pymysqlreplication.row_event import ( UpdateRowsEvent, DeleteRowsEvent, ) -from pymysqlreplication.event import QueryEvent +from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent def command_line_parser(): """Returns a command line parser used for binlog2sql""" @@ -33,8 +33,8 @@ def command_line_parser(): help="End binlog file to be parsed. default: '--start-file'", default='') range.add_argument('--end-pos', dest='endPos', type=int, help="stop position of end binlog file. default: end position of '--end-file'", default=0) - parser.add_argument('--realtime', dest='realtime', action='store_true', - help='continuously replicate binlog. default: stop replicate when meeting the latest binlog when you run the program', default=False) + parser.add_argument('--stop-never', dest='stopnever', action='store_true', + help='Wait for more data from the server. default: stop replicate at the last binlog when you start binlog2sql', default=False) parser.add_argument('--help', dest='help', action='store_true', help='help infomation', default=False) @@ -57,8 +57,8 @@ def command_line_args(): if args.help: parser.print_help() sys.exit(1) - if args.flashback and args.realtime: - raise ValueError('only one of flashback or realtime can be True') + if args.flashback and args.stopnever: + raise ValueError('only one of flashback or stop-never can be True') if args.flashback and args.popPk: raise ValueError('only one of flashback or popPk can be True') return args @@ -75,7 +75,7 @@ def fix_object(value): else: return value -def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, popPk=False): +def concat_sql_from_binlogevent(cursor, binlogevent, row=None, eStartPos=None, flashback=False, popPk=False): if flashback and popPk: raise ValueError('only one of flashback or popPk can be True') if type(binlogevent) not in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent): @@ -84,21 +84,21 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, sql = '' if flashback is True: if isinstance(binlogevent, WriteRowsEvent): - template = 'DELETE FROM {0} WHERE {1} LIMIT 1;'.format( - binlogevent.table, + template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( + binlogevent.schema, binlogevent.table, ' AND '.join(map(compare_items, row['values'].items())) ) sql = cursor.mogrify(template, map(fix_object, row['values'].values())) elif isinstance(binlogevent, DeleteRowsEvent): - template = 'INSERT INTO {0}({1}) VALUES ({2});'.format( - binlogevent.table, + template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( + binlogevent.schema, binlogevent.table, ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) sql = cursor.mogrify(template, map(fix_object, row['values'].values())) elif isinstance(binlogevent, UpdateRowsEvent): - template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format( - binlogevent.table, + template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( + binlogevent.schema, binlogevent.table, ', '.join(['`%s`=%%s'%k for k in row['before_values'].keys()]), ' AND '.join(map(compare_items, row['after_values'].items()))) sql = cursor.mogrify(template, map(fix_object, row['before_values'].values()+row['after_values'].values())) @@ -108,21 +108,21 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, tableInfo = (binlogevent.table_map)[binlogevent.table_id] if tableInfo.primary_key: row['values'].pop(tableInfo.primary_key) - template = 'INSERT INTO {0}({1}) VALUES ({2});'.format( - binlogevent.table, + template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format( + binlogevent.schema, binlogevent.table, ', '.join(map(lambda k: '`%s`'%k, row['values'].keys())), ', '.join(['%s'] * len(row['values'])) ) sql = cursor.mogrify(template, map(fix_object, row['values'].values())) elif isinstance(binlogevent, DeleteRowsEvent): - template ='DELETE FROM {0} WHERE {1} LIMIT 1;'.format( - binlogevent.table, + template ='DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format( + binlogevent.schema, binlogevent.table, ' AND '.join(map(compare_items, row['values'].items())) ) sql = cursor.mogrify(template, map(fix_object, row['values'].values())) elif isinstance(binlogevent, UpdateRowsEvent): - template = 'UPDATE {0} SET {1} WHERE {2} LIMIT 1;'.format( - binlogevent.table, + template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format( + binlogevent.schema, binlogevent.table, ', '.join(['`%s`=%%s'%k for k in row['after_values'].keys()]), ' AND '.join(map(compare_items, row['before_values'].items())) ) @@ -131,13 +131,15 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False, sql ='USE {0};\n{1};'.format( binlogevent.schema, fix_object(binlogevent.query) ) + if type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent): + sql += ' #start %s end %s' % (eStartPos, binlogevent.packet.log_pos) return sql class Binlog2sql(object): def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, - endPos=None, only_schemas=None, only_tables=None, popPk=False, flashback=False, realtime=False): + endPos=None, only_schemas=None, only_tables=None, popPk=False, flashback=False, stopnever=False): ''' connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} ''' @@ -154,7 +156,7 @@ class Binlog2sql(object): self.only_tables = only_tables if only_tables else None self.popPk = popPk self.flashback = flashback - self.realtime = realtime + self.stopnever = stopnever self.binlogList = [] self.connection = pymysql.connect(**self.connectionSettings) @@ -190,31 +192,37 @@ class Binlog2sql(object): tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile. ftmp = open(tmpFile ,"w") flagLastEvent = False + eStartPos = stream.log_pos + lastPos = stream.log_pos try: for binlogevent in stream: - if not self.realtime: + if not self.stopnever: if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos): flagLastEvent = True - elif not self.endPos: - if stream.log_file not in self.binlogList: - break - elif (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos): + elif stream.log_file not in self.binlogList: break - else: - raise ValueError('unknown binlog file or position') + elif (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos): + break + # else: + # raise ValueError('unknown binlog file or position') + + if isinstance(binlogevent, QueryEvent) and binlogevent.query == 'BEGIN': + eStartPos = lastPos if isinstance(binlogevent, QueryEvent): - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, popPk=self.popPk) + sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk) if sql: print sql elif type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent): for row in binlogevent.rows: - sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk) + sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, popPk=self.popPk, eStartPos=eStartPos) if self.flashback: ftmp.write(sql + '\n') else: print sql + if type(binlogevent) not in (RotateEvent, FormatDescriptionEvent): + lastPos = binlogevent.packet.log_pos if flagLastEvent: break ftmp.close() @@ -239,5 +247,5 @@ if __name__ == '__main__': connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile, startPos=args.startPos, endFile=args.endFile, endPos=args.endPos, - only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback, realtime=args.realtime) + only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback, stopnever=args.stopnever) binlog2sql.process_binlog() diff --git a/example/FixOldMasterExtraData.md b/example/FixOldMasterExtraData.md index f951ce6..e434ba6 100644 --- a/example/FixOldMasterExtraData.md +++ b/example/FixOldMasterExtraData.md @@ -57,7 +57,7 @@ $ mysql -h10.1.1.2 -P3306 -uadmin -p'admin' < oldMaster.sql 如果表的主键id是有业务含义的,则务必与业务方确认可行后再操作。 -###参考文献 +###参考资料 [1] 易固武, [MySQL数据库的高可用性分析](https://www.qcloud.com/community/article/203) -[2] 曹单锋, [Parse MySQL binlog to SQL you want](https://github.com/danfengcao/binlog2sql) \ No newline at end of file +[2] danfengcao, [binlog2sql: Parse MySQL binlog to SQL you want](https://github.com/danfengcao/binlog2sql) \ No newline at end of file