This commit is contained in:
dfcao 2019-02-04 15:10:18 +08:00
parent 6dc29a45c1
commit 807e910391
2 changed files with 31 additions and 33 deletions

View File

@ -26,22 +26,23 @@ class Binlog2sql(object):
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
else:
self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
if stop_time:
self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
else:
self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")
self.conn = pymysql.connect(**self.conn_setting)
self.conn = pymysql.connect(**connection_settings)
self.conn.autocommit(True)
self.instance = "{0}:{1}".format(connection_settings['ip'], connection_settings['port'])
self.instance = "{0}:{1}".format(connection_settings['host'], connection_settings['port'])
self.eof_file, self.eof_pos = self.get_master_status()
self.end_file = end_file if end_file else start_file
self.end_pos = end_pos
self.binlog_list = self.get_binlog_list(start_file, self.end_file)
server_id = self.get_server_id()
start_pos = start_pos if start_pos else 4 # use binlog v4
self.stream = BinLogStreamReader(connection_settings=conn_setting, server_id=server_id,
self.stream = BinLogStreamReader(connection_settings=connection_settings, server_id=server_id,
log_file=start_file, log_pos=start_pos, only_schemas=only_schemas,
only_tables=only_tables, resume_stream=True, blocking=True)
@ -76,17 +77,18 @@ class Binlog2sql(object):
return server_id
def position(self, log_file, log_pos, event_time):
    """Classify a binlog event relative to the start-stop section.

    Args:
        log_file: name of the binlog file the event was read from.
        log_pos: position of the event inside ``log_file``.
        event_time: event time, compared against ``self.start_time`` and
            ``self.stop_time``.

    Returns:
        One of the following labels, checked in this order:

        * ``'after'``   - ``log_file`` is not in ``self.binlog_list``, the
          event time is at or past ``self.stop_time``, or ``log_pos`` lies
          beyond ``self.end_pos`` / ``self.eof_pos`` in the matching file.
        * ``'end'``     - the event sits exactly on ``self.end_pos`` of
          ``self.end_file`` or on ``self.eof_pos`` of ``self.eof_file``.
        * ``'before'``  - the event time is earlier than ``self.start_time``.
        * ``'in'``      - the event time is at or after ``self.start_time``.
        * ``'unknown'`` - none of the checks above matched.
    """
    in_end_file = log_file == self.end_file
    in_eof_file = log_file == self.eof_file

    # A falsy end_pos means "no explicit stop position", so the end-file
    # bound is only enforced when one was supplied.
    past_section = (
        log_file not in self.binlog_list
        or event_time >= self.stop_time
        or (self.end_pos and in_end_file and log_pos > self.end_pos)
        or (in_eof_file and log_pos > self.eof_pos)
    )
    if past_section:
        return 'after'

    # Exactly on the stop boundary: reported separately from 'in' so the
    # caller can tell the last event apart from ordinary in-range events.
    if (in_end_file and log_pos == self.end_pos) or \
            (in_eof_file and log_pos == self.eof_pos):
        return 'end'

    if event_time < self.start_time:
        return 'before'
    if event_time >= self.start_time:
        return 'in'
    return 'unknown'
def process_binlog(self):
@ -98,27 +100,26 @@ class Binlog2sql(object):
with temp_open(tmp_file, "w") as f_tmp, self.conn as cursor:
for binlog_event in self.stream:
print(binlog_event)
log_file = self.stream.log_file
log_pos = self.stream.log_pos
if not self.stop_never:
try:
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime.datetime(1980, 1, 1, 0, 0)
try:
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime.datetime(1980, 1, 1, 0, 0)
position = self.position(log_file, log_pos, event_time)
if position == 'after':
break
elif position == 'in':
flag_last_event = True
elif position == 'before':
if not (isinstance(binlog_event, RotateEvent)
or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
continue
# else:
# raise ValueError('unknown binlog file or position')
position = self.position(log_file, log_pos, event_time)
if position == 'after' and not self.stop_never:
break
elif position == 'end' and not self.stop_never:
flag_last_event = True
elif position == 'before':
if not (isinstance(binlog_event, RotateEvent)
or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
continue
# NOTE: there is a problem here: the update event shows up as a QueryEvent
if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
e_start_pos = last_pos
@ -141,7 +142,6 @@ class Binlog2sql(object):
if flag_last_event:
break
f_tmp.close()
if self.flashback:
self.print_rollback_sql(filename=tmp_file)
return True
@ -156,7 +156,7 @@ class Binlog2sql(object):
if i >= batch_size:
i = 0
if self.back_interval:
print('SELECT SLEEP(%s);' % self.back_interval)
print('SELECT SLEEP({0});'.format(self.back_interval))
else:
i += 1
@ -164,6 +164,7 @@ class Binlog2sql(object):
self.stream.close()
self.conn.close()
if __name__ == '__main__':
args = command_line_args(sys.argv[1:])
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}

View File

@ -147,10 +147,9 @@ def fix_object(value):
def is_dml_event(event):
    """Return True when ``event`` is a row-changing (DML) binlog event.

    The decision is delegated to ``event_type`` so both helpers share a
    single classification: an event counts as DML exactly when
    ``event_type(event)`` is ``'INSERT'``, ``'UPDATE'`` or ``'DELETE'``.
    """
    return event_type(event) in ('INSERT', 'UPDATE', 'DELETE')
def event_type(event):
@ -177,13 +176,11 @@ def remove_dropped_col(row):
def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False):
if flashback and no_pk:
raise ValueError('only one of flashback or no_pk can be True')
if not (isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent)
or isinstance(binlog_event, DeleteRowsEvent) or isinstance(binlog_event, QueryEvent)):
raise ValueError('binlog_event must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent')
if not (is_dml_event(binlog_event) or isinstance(binlog_event, QueryEvent)):
raise ValueError('binlog_event must be dml_event or QueryEvent')
sql = ''
if isinstance(binlog_event, WriteRowsEvent) or isinstance(binlog_event, UpdateRowsEvent) \
or isinstance(binlog_event, DeleteRowsEvent):
if is_dml_event(binlog_event):
remove_dropped_col(row)
pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk)
sql = cursor.mogrify(pattern['template'], pattern['values'])