support python3

This commit is contained in:
dfcao 2017-12-15 10:31:16 +08:00
parent f430ecc8ef
commit 9a14642281
4 changed files with 60 additions and 38 deletions

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
*~
.idea/
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View File

@ -16,7 +16,7 @@ binlog2sql
正常维护。应用于大众点评线上环境。线上环境的操作请在对MySQL**相当熟悉**的同学指导下进行 正常维护。应用于大众点评线上环境。线上环境的操作请在对MySQL**相当熟悉**的同学指导下进行
* 已测试环境 * 已测试环境
* Python 2.6, 2.7 * Python 2.6, 2.7, 3.4
* MySQL 5.6 * MySQL 5.6

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os
import sys import sys
import datetime import datetime
import pymysql import pymysql
@ -12,7 +11,8 @@ from pymysqlreplication.row_event import (
DeleteRowsEvent, DeleteRowsEvent,
) )
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, reversed_lines from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, \
temp_open, print_rollback_sql
class Binlog2sql(object): class Binlog2sql(object):
@ -35,7 +35,7 @@ class Binlog2sql(object):
if start_time: if start_time:
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
else: else:
self.start_time = datetime.datetime.strptime('1970-01-01 00:00:00', "%Y-%m-%d %H:%M:%S") self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
if stop_time: if stop_time:
self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S") self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
else: else:
@ -69,19 +69,21 @@ class Binlog2sql(object):
log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas, log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
only_tables=self.only_tables, resume_stream=True) only_tables=self.only_tables, resume_stream=True)
cursor = self.connection.cursor()
# to simplify code, we do not use flock for tmp_file.
tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
f_tmp = open(tmp_file, "w")
flag_last_event = False flag_last_event = False
e_start_pos, last_pos = stream.log_pos, stream.log_pos e_start_pos, last_pos = stream.log_pos, stream.log_pos
try: # to simplify code, we do not use flock for tmp_file.
tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
for binlog_event in stream: for binlog_event in stream:
if not self.stop_never: if not self.stop_never:
try:
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime.datetime(1980, 1, 1, 0, 0)
if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \ if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos == self.eof_pos): (stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
flag_last_event = True flag_last_event = True
elif datetime.datetime.fromtimestamp(binlog_event.timestamp) < self.start_time: elif event_time < self.start_time:
if not (isinstance(binlog_event, RotateEvent) if not (isinstance(binlog_event, RotateEvent)
or isinstance(binlog_event, FormatDescriptionEvent)): or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos last_pos = binlog_event.packet.log_pos
@ -89,7 +91,7 @@ class Binlog2sql(object):
elif (stream.log_file not in self.binlogList) or \ elif (stream.log_file not in self.binlogList) or \
(self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \ (self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \ (stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
(datetime.datetime.fromtimestamp(binlog_event.timestamp) >= self.stop_time): (event_time >= self.stop_time):
break break
# else: # else:
# raise ValueError('unknown binlog file or position') # raise ValueError('unknown binlog file or position')
@ -116,36 +118,18 @@ class Binlog2sql(object):
last_pos = binlog_event.packet.log_pos last_pos = binlog_event.packet.log_pos
if flag_last_event: if flag_last_event:
break break
stream.close()
f_tmp.close() f_tmp.close()
if self.flashback: if self.flashback:
self.print_rollback_sql(filename=tmp_file) print_rollback_sql(filename=tmp_file)
finally:
os.remove(tmp_file)
cursor.close()
stream.close()
return True return True
@staticmethod
def print_rollback_sql(filename):
"""print rollback sql from tmp_file"""
with open(filename) as f_tmp:
sleep_interval = 1000
i = 0
for line in reversed_lines(f_tmp):
print(line.rstrip())
if i >= sleep_interval:
print('SELECT SLEEP(1);')
i = 0
else:
i += 1
def __del__(self): def __del__(self):
pass pass
if __name__ == '__main__': if __name__ == '__main__':
args = command_line_args(sys.argv[1:]) args = command_line_args(sys.argv[1:])
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'} conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos, binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,

View File

@ -5,6 +5,7 @@ import os
import sys import sys
import argparse import argparse
import datetime import datetime
from contextlib import contextmanager
from pymysqlreplication.row_event import ( from pymysqlreplication.row_event import (
WriteRowsEvent, WriteRowsEvent,
UpdateRowsEvent, UpdateRowsEvent,
@ -12,6 +13,11 @@ from pymysqlreplication.row_event import (
) )
from pymysqlreplication.event import QueryEvent from pymysqlreplication.event import QueryEvent
if sys.version > '3':
PY3PLUS = True
else:
PY3PLUS = False
def is_valid_datetime(string): def is_valid_datetime(string):
try: try:
@ -33,6 +39,16 @@ def create_unique_file(filename):
return result_file return result_file
@contextmanager
def temp_open(filename, mode):
f = open(filename, mode)
try:
yield f
finally:
f.close()
os.remove(filename)
def parse_args(): def parse_args():
"""parse args for binlog2sql""" """parse args for binlog2sql"""
@ -105,8 +121,9 @@ def command_line_args(args):
return args return args
def compare_items((k, v)): def compare_items(items):
#caution: if v is NULL, may need to process # caution: if v is NULL, may need to process
(k, v) = items
if v is None: if v is None:
return '`%s` IS %%s' % k return '`%s` IS %%s' % k
else: else:
@ -115,7 +132,9 @@ def compare_items((k, v)):
def fix_object(value): def fix_object(value):
"""Fixes python objects so that they can be properly inserted into SQL queries""" """Fixes python objects so that they can be properly inserted into SQL queries"""
if isinstance(value, unicode): if PY3PLUS and isinstance(value, bytes):
return value.decode('utf-8')
elif not PY3PLUS and isinstance(value, unicode):
return value.encode('utf-8') return value.encode('utf-8')
else: else:
return value return value
@ -166,7 +185,7 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False):
binlog_event.schema, binlog_event.table, binlog_event.schema, binlog_event.table,
', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]), ', '.join(['`%s`=%%s' % x for x in row['before_values'].keys()]),
' AND '.join(map(compare_items, row['after_values'].items()))) ' AND '.join(map(compare_items, row['after_values'].items())))
values = map(fix_object, row['before_values'].values()+row['after_values'].values()) values = map(fix_object, list(row['before_values'].values())+list(row['after_values'].values()))
else: else:
if isinstance(binlog_event, WriteRowsEvent): if isinstance(binlog_event, WriteRowsEvent):
if no_pk: if no_pk:
@ -193,15 +212,31 @@ def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False):
', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]), ', '.join(['`%s`=%%s' % k for k in row['after_values'].keys()]),
' AND '.join(map(compare_items, row['before_values'].items())) ' AND '.join(map(compare_items, row['before_values'].items()))
) )
values = map(fix_object, row['after_values'].values()+row['before_values'].values()) values = map(fix_object, list(row['after_values'].values())+list(row['before_values'].values()))
return {'template': template, 'values': values} return {'template': template, 'values': list(values)}
def print_rollback_sql(filename):
"""print rollback sql from tmp_file"""
with open(filename, "rb") as f_tmp:
sleep_interval = 1000
i = 0
for line in reversed_lines(f_tmp):
print(line.rstrip())
if i >= sleep_interval:
print('SELECT SLEEP(1);')
i = 0
else:
i += 1
def reversed_lines(fin): def reversed_lines(fin):
"""Generate the lines of file in reverse order.""" """Generate the lines of file in reverse order."""
part = '' part = ''
for block in reversed_blocks(fin): for block in reversed_blocks(fin):
if PY3PLUS:
block = block.decode("utf-8")
for c in reversed(block): for c in reversed(block):
if c == '\n' and part: if c == '\n' and part:
yield part[::-1] yield part[::-1]