Update binlog2sql.py

Key changes to the code include the use of more readable variable names, such as replacing `conn_setting` with `conn_settings` for better clarity. Helper functions like `_parse_time`, `_validate_binlog`, and `_binlog_num` were introduced to reduce repetitive code and improve the overall structure. In-line comments have been added to explain complex sections, enhancing readability and maintainability. Additionally, the logic was simplified by breaking down complex tasks into manageable parts using helper methods, such as handling timestamps and checking the last event in the binlog stream. This results in cleaner, more organized code.
This commit is contained in:
Imran Imtiaz 2024-09-22 07:56:46 +04:00 committed by GitHub
parent 5a8e65c432
commit 6657e02e07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 127 additions and 104 deletions

View File

@ -1,150 +1,173 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.event import QueryEvent, RotateEvent, FormatDescriptionEvent
from binlog2sql_util import command_line_args, concat_sql_from_binlog_event, create_unique_file, temp_open, \
reversed_lines, is_dml_event, event_type
class Binlog2sql(object):
from binlog2sql_util import (
command_line_args, concat_sql_from_binlog_event, create_unique_file,
temp_open, reversed_lines, is_dml_event, event_type
)
class Binlog2sql:
def __init__(self, connection_settings, start_file=None, start_pos=None, end_file=None, end_pos=None,
start_time=None, stop_time=None, only_schemas=None, only_tables=None, no_pk=False,
flashback=False, stop_never=False, back_interval=1.0, only_dml=True, sql_type=None):
"""
conn_setting: {'host': 127.0.0.1, 'port': 3306, 'user': user, 'passwd': passwd, 'charset': 'utf8'}
Initialize Binlog2sql with connection settings and filtering options.
connection_settings: Dictionary with keys like host, port, user, passwd, charset.
"""
if not start_file:
raise ValueError('Lack of parameter: start_file')
raise ValueError('Missing parameter: start_file is required.')
self.conn_setting = connection_settings
self.conn_settings = connection_settings
self.start_file = start_file
self.start_pos = start_pos if start_pos else 4 # use binlog v4
self.end_file = end_file if end_file else start_file
self.start_pos = start_pos or 4 # default binlog position is 4 for v4 binlog
self.end_file = end_file or start_file
self.end_pos = end_pos
if start_time:
self.start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
else:
self.start_time = datetime.datetime.strptime('1980-01-01 00:00:00', "%Y-%m-%d %H:%M:%S")
if stop_time:
self.stop_time = datetime.datetime.strptime(stop_time, "%Y-%m-%d %H:%M:%S")
else:
self.stop_time = datetime.datetime.strptime('2999-12-31 00:00:00', "%Y-%m-%d %H:%M:%S")
self.start_time = self._parse_time(start_time, default='1980-01-01 00:00:00')
self.stop_time = self._parse_time(stop_time, default='2999-12-31 00:00:00')
self.only_schemas = only_schemas if only_schemas else None
self.only_tables = only_tables if only_tables else None
self.no_pk, self.flashback, self.stop_never, self.back_interval = (no_pk, flashback, stop_never, back_interval)
self.only_schemas = only_schemas
self.only_tables = only_tables
self.no_pk = no_pk
self.flashback = flashback
self.stop_never = stop_never
self.back_interval = back_interval
self.only_dml = only_dml
self.sql_type = [t.upper() for t in sql_type] if sql_type else []
self.sql_type = [t.upper() for t in (sql_type or [])]
self.binlogList = []
self.connection = pymysql.connect(**self.conn_setting)
with self.connection as cursor:
self.binlog_list = []
self.connection = pymysql.connect(**self.conn_settings)
# Validate connection and binlog parameters
self._validate_binlog()
def _parse_time(self, time_str, default):
"""Parse a datetime string or return a default datetime."""
if time_str:
return datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return datetime.datetime.strptime(default, "%Y-%m-%d %H:%M:%S")
def _validate_binlog(self):
"""Fetch binlog and server metadata to validate start_file and positions."""
with self.connection.cursor() as cursor:
cursor.execute("SHOW MASTER STATUS")
self.eof_file, self.eof_pos = cursor.fetchone()[:2]
cursor.execute("SHOW MASTER LOGS")
bin_index = [row[0] for row in cursor.fetchall()]
if self.start_file not in bin_index:
raise ValueError('parameter error: start_file %s not in mysql server' % self.start_file)
binlog2i = lambda x: x.split('.')[1]
for binary in bin_index:
if binlog2i(self.start_file) <= binlog2i(binary) <= binlog2i(self.end_file):
self.binlogList.append(binary)
raise ValueError(f'start_file {self.start_file} is not found in the MySQL server.')
self.binlog_list = [
binlog for binlog in bin_index
if self._binlog_num(self.start_file) <= self._binlog_num(binlog) <= self._binlog_num(self.end_file)
]
cursor.execute("SELECT @@server_id")
self.server_id = cursor.fetchone()[0]
if not self.server_id:
raise ValueError('missing server_id in %s:%s' % (self.conn_setting['host'], self.conn_setting['port']))
raise ValueError(f'Missing server_id for {self.conn_settings["host"]}:{self.conn_settings["port"]}')
@staticmethod
def _binlog_num(binlog_file):
"""Extract binlog number from binlog filename."""
return binlog_file.split('.')[1]
def process_binlog(self):
stream = BinLogStreamReader(connection_settings=self.conn_setting, server_id=self.server_id,
log_file=self.start_file, log_pos=self.start_pos, only_schemas=self.only_schemas,
only_tables=self.only_tables, resume_stream=True, blocking=True)
"""Process the binlog and output SQL statements."""
stream = BinLogStreamReader(
connection_settings=self.conn_settings, server_id=self.server_id,
log_file=self.start_file, log_pos=self.start_pos,
only_schemas=self.only_schemas, only_tables=self.only_tables,
resume_stream=True, blocking=True
)
flag_last_event = False
e_start_pos, last_pos = stream.log_pos, stream.log_pos
# to simplify code, we do not use flock for tmp_file.
tmp_file = create_unique_file('%s.%s' % (self.conn_setting['host'], self.conn_setting['port']))
with temp_open(tmp_file, "w") as f_tmp, self.connection as cursor:
for binlog_event in stream:
last_event_pos = stream.log_pos
tmp_file = create_unique_file(f"{self.conn_settings['host']}.{self.conn_settings['port']}")
with temp_open(tmp_file, "w") as tmp_output, self.connection.cursor() as cursor:
for event in stream:
# Handle the stop condition and time boundaries
if not self.stop_never:
try:
event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
except OSError:
event_time = datetime.datetime(1980, 1, 1, 0, 0)
if (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos == self.eof_pos):
flag_last_event = True
elif event_time < self.start_time:
if not (isinstance(binlog_event, RotateEvent)
or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
continue
elif (stream.log_file not in self.binlogList) or \
(self.end_pos and stream.log_file == self.end_file and stream.log_pos > self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos > self.eof_pos) or \
(event_time >= self.stop_time):
event_time = self._get_event_time(event)
if self._is_last_event(stream, event) or event_time >= self.stop_time:
break
# else:
# raise ValueError('unknown binlog file or position')
if event_time < self.start_time:
if not isinstance(event, (RotateEvent, FormatDescriptionEvent)):
last_event_pos = event.packet.log_pos
continue
if isinstance(binlog_event, QueryEvent) and binlog_event.query == 'BEGIN':
e_start_pos = last_pos
# Capture DML and non-DML events
if isinstance(event, QueryEvent) and event.query == 'BEGIN':
e_start_pos = last_event_pos
if isinstance(binlog_event, QueryEvent) and not self.only_dml:
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event,
flashback=self.flashback, no_pk=self.no_pk)
if sql:
print(sql)
elif is_dml_event(binlog_event) and event_type(binlog_event) in self.sql_type:
for row in binlog_event.rows:
sql = concat_sql_from_binlog_event(cursor=cursor, binlog_event=binlog_event, no_pk=self.no_pk,
row=row, flashback=self.flashback, e_start_pos=e_start_pos)
if self.flashback:
f_tmp.write(sql + '\n')
else:
print(sql)
self._handle_event(event, cursor, tmp_output, e_start_pos, last_event_pos)
if not (isinstance(binlog_event, RotateEvent) or isinstance(binlog_event, FormatDescriptionEvent)):
last_pos = binlog_event.packet.log_pos
if flag_last_event:
break
if not isinstance(event, (RotateEvent, FormatDescriptionEvent)):
last_event_pos = event.packet.log_pos
stream.close()
f_tmp.close()
if self.flashback:
self.print_rollback_sql(filename=tmp_file)
return True
stream.close()
def print_rollback_sql(self, filename):
"""print rollback sql from tmp_file"""
with open(filename, "rb") as f_tmp:
batch_size = 1000
i = 0
for line in reversed_lines(f_tmp):
print(line.rstrip())
if i >= batch_size:
i = 0
if self.back_interval:
print('SELECT SLEEP(%s);' % self.back_interval)
if self.flashback:
self._print_rollback_sql(tmp_file)
def _get_event_time(self, event):
"""Extract the event timestamp."""
try:
return datetime.datetime.fromtimestamp(event.timestamp)
except OSError:
return datetime.datetime(1980, 1, 1)
def _is_last_event(self, stream, event):
"""Check if the current event is the last one based on log file and position."""
return (stream.log_file == self.end_file and stream.log_pos == self.end_pos) or \
(stream.log_file == self.eof_file and stream.log_pos == self.eof_pos)
def _handle_event(self, event, cursor, tmp_output, e_start_pos, last_event_pos):
"""Process and print SQL for relevant events."""
if isinstance(event, QueryEvent) and not self.only_dml:
sql = concat_sql_from_binlog_event(cursor, event, flashback=self.flashback, no_pk=self.no_pk)
if sql:
print(sql)
elif is_dml_event(event) and event_type(event) in self.sql_type:
for row in event.rows:
sql = concat_sql_from_binlog_event(
cursor, event, row=row, flashback=self.flashback,
no_pk=self.no_pk, e_start_pos=e_start_pos
)
if self.flashback:
tmp_output.write(sql + '\n')
else:
i += 1
print(sql)
def _print_rollback_sql(self, filename):
"""Print rollback SQL from a temporary file."""
with open(filename, "rb") as tmp_file:
batch_size = 1000
for i, line in enumerate(reversed_lines(tmp_file)):
print(line.rstrip())
if i % batch_size == 0 and i > 0:
if self.back_interval:
print(f'SELECT SLEEP({self.back_interval});')
def __del__(self):
"""Cleanup resources if necessary."""
pass
if __name__ == '__main__':
args = command_line_args(sys.argv[1:])
conn_setting = {'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, 'charset': 'utf8'}
binlog2sql = Binlog2sql(connection_settings=conn_setting, start_file=args.start_file, start_pos=args.start_pos,
end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never,
back_interval=args.back_interval, only_dml=args.only_dml, sql_type=args.sql_type)
conn_settings = {
'host': args.host, 'port': args.port, 'user': args.user,
'passwd': args.password, 'charset': 'utf8'
}
binlog2sql = Binlog2sql(
connection_settings=conn_settings, start_file=args.start_file, start_pos=args.start_pos,
end_file=args.end_file, end_pos=args.end_pos, start_time=args.start_time,
stop_time=args.stop_time, only_schemas=args.databases, only_tables=args.tables,
no_pk=args.no_pk, flashback=args.flashback, stop_never=args.stop_never,
back_interval=args.back_interval, only_dml=args.only_dml, sql_type=args.sql_type
)
binlog2sql.process_binlog()