This commit is contained in:
Imran Imtiaz 2024-09-22 08:04:53 +04:00 committed by GitHub
commit 44cd5be49a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 149 additions and 170 deletions

View File

@ -1,6 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import argparse
@ -14,35 +11,49 @@ from pymysqlreplication.row_event import (
DeleteRowsEvent,
)
# True when the interpreter is Python 3 or newer.
# The check compares sys.version_info as a tuple rather than the sys.version
# string: string comparison is lexicographic and would misorder a future
# two-digit major version against '3'.
PY3PLUS = sys.version_info >= (3,)
def is_valid_datetime(date_string):
    """Return whether ``date_string`` matches ``%Y-%m-%d %H:%M:%S``.

    :param date_string: Candidate datetime text.
    :return: True if ``datetime.datetime.strptime`` accepts the value with
        that format; False otherwise, including for non-string input such
        as ``None``.
    """
    try:
        datetime.datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # ValueError: the text does not match the format.
        # TypeError: the argument is not a string at all.
        return False
    return True
def create_unique_file(filename):
    """Return a path derived from ``filename`` that does not exist yet.

    ``filename`` itself is returned when nothing exists at that path.
    Otherwise the numeric suffixes ``.0`` through ``.999`` are tried in
    order and the first free candidate is returned. Nothing is created on
    disk; only ``os.path.exists`` is consulted.

    :param filename: The preferred path.
    :return: ``filename`` or ``filename.<n>`` for the first free ``n``.
    :raises OSError: If ``filename`` and all 1000 suffixed candidates exist.
    """
    if not os.path.exists(filename):
        return filename
    # If 1000 suffixes are all taken, something is seriously wrong: give up.
    for version in range(1000):
        candidate = f"{filename}.{version}"
        if not os.path.exists(candidate):
            return candidate
    raise OSError(f'Cannot create unique file {filename}.[0-1000]')
@contextmanager
def temp_open(filename, mode):
"""
Open a file temporarily and ensure it's removed after use.
:param filename: Name of the file to open.
:param mode: Mode to open the file in.
"""
f = open(filename, mode)
try:
yield f
@ -52,216 +63,184 @@ def temp_open(filename, mode):
def parse_args():
    """Build the argument parser for the binlog2sql command line.

    ``add_help=False`` is required because ``-h`` is used for ``--host``;
    help is exposed through an explicit ``--help`` flag instead.

    Options whose attribute name differs from their first long flag carry an
    explicit ``dest``. ``command_line_args`` reads ``start_time``,
    ``stop_time`` and ``no_pk``; without ``dest`` argparse would store them
    as ``start_datetime``, ``stop_datetime`` and ``no_primary_key``.
    ``start_pos``, ``end_file`` and ``end_pos`` keep the attribute names the
    parser exposed before.

    :return: The configured ``argparse.ArgumentParser`` (nothing is parsed).
    """
    parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL', add_help=False)

    connect_setting = parser.add_argument_group('Connect Settings')
    connect_setting.add_argument('-h', '--host', dest='host', default='127.0.0.1', help='MySQL host')
    connect_setting.add_argument('-u', '--user', dest='user', default='root', help='MySQL user')
    # nargs='*' lets "-p" appear without a value; the caller then prompts.
    connect_setting.add_argument('-p', '--password', dest='password', nargs='*', default='',
                                 help='MySQL password')
    connect_setting.add_argument('-P', '--port', dest='port', default=3306, type=int, help='MySQL port')

    interval = parser.add_argument_group('Interval Filter')
    interval.add_argument('--start-file', dest='start_file', help='Start binlog file')
    interval.add_argument('--start-position', '--start-pos', dest='start_pos', default=4, type=int,
                          help='Start position')
    interval.add_argument('--stop-file', '--end-file', dest='end_file', default='',
                          help='End binlog file')
    interval.add_argument('--stop-position', '--end-pos', dest='end_pos', default=0, type=int,
                          help='End position')
    # '%%' is needed because argparse %-formats help strings.
    interval.add_argument('--start-datetime', dest='start_time', default='',
                          help="Start time in '%%Y-%%m-%%d %%H:%%M:%%S' format")
    interval.add_argument('--stop-datetime', dest='stop_time', default='',
                          help="Stop time in '%%Y-%%m-%%d %%H:%%M:%%S' format")
    parser.add_argument('--stop-never', dest='stop_never', action='store_true', default=False,
                        help="Continuously parse binlog")

    schema = parser.add_argument_group('Schema Filter')
    schema.add_argument('-d', '--databases', dest='databases', nargs='*', default='',
                        help='Databases to process')
    schema.add_argument('-t', '--tables', dest='tables', nargs='*', default='',
                        help='Tables to process')

    event = parser.add_argument_group('Event Type Filter')
    event.add_argument('--only-dml', dest='only_dml', action='store_true', default=False,
                       help='Only process DML events (ignore DDL)')
    event.add_argument('--sql-type', dest='sql_type', nargs='*', default=['INSERT', 'UPDATE', 'DELETE'],
                       help='SQL types to process (INSERT, UPDATE, DELETE)')

    parser.add_argument('-K', '--no-primary-key', dest='no_pk', action='store_true', default=False,
                        help='Generate insert SQL without primary key if exists')
    parser.add_argument('-B', '--flashback', dest='flashback', action='store_true', default=False,
                        help='Flashback data to start position of start file')
    parser.add_argument('--back-interval', dest='back_interval', type=float, default=1.0,
                        help="Sleep time between chunks of 1000 rollback SQL")
    parser.add_argument('--help', dest='help', action='store_true', default=False, help='Show help')
    return parser
def command_line_args(args=None):
    """Parse and validate the command line.

    :param args: Argument list to parse. When ``None``, ``sys.argv[1:]`` is
        used.
    :return: The parsed namespace, with ``password`` normalized to a single
        string (prompted for via ``getpass`` when none was supplied).
    :raises ValueError: If ``--start-file`` is missing, if ``--flashback`` is
        combined with ``--stop-never`` or ``--no-primary-key``, or if a
        start/stop datetime does not pass ``is_valid_datetime``.
    :raises SystemExit: With status 1 after printing help, when ``--help`` is
        given or no arguments were supplied at all.
    """
    if args is None:
        args = sys.argv[1:]
    # An empty command line gets usage text rather than a bare error about
    # the missing start file.
    need_print_help = not args

    parser = parse_args()
    parsed = parser.parse_args(args)
    if parsed.help or need_print_help:
        parser.print_help()
        sys.exit(1)

    if not parsed.start_file:
        raise ValueError('Missing required parameter: start_file')
    if parsed.flashback and parsed.stop_never:
        raise ValueError('Only one of flashback or stop-never can be True')
    if parsed.flashback and parsed.no_pk:
        raise ValueError('Only one of flashback or no_pk can be True')

    for moment in (parsed.start_time, parsed.stop_time):
        # Empty/unset bounds are allowed; only supplied values are checked.
        if moment and not is_valid_datetime(moment):
            raise ValueError('Invalid datetime argument')

    if not parsed.password:
        # No password on the command line: prompt without echoing.
        parsed.password = getpass.getpass()
    else:
        # --password is declared with nargs='*', so the value is a list.
        parsed.password = parsed.password[0]
    return parsed
def compare_items(item):
    """Build one WHERE-clause comparison for a column/value pair.

    The result is a template fragment containing a single ``%s``
    placeholder; the value itself is not interpolated here.

    :param item: A ``(column_name, value)`` pair, e.g. from ``dict.items()``.
    :return: ``"`col` IS %s"`` when the value is ``None`` (SQL NULL must be
        matched with ``IS``, not ``=``), otherwise ``"`col`=%s"``.
    """
    column, value = item
    if value is None:
        return f"`{column}` IS %s"
    return f"`{column}`=%s"
def fix_object(value):
    """Normalize a Python value so it can be bound into a SQL statement.

    This module is Python 3 only (it uses f-strings), so there is no
    Python 2 ``unicode`` handling.

    :param value: Any value taken from a binlog row or query.
    :return: For a ``set``, its members joined with ``','`` (in the set's
        iteration order); for ``bytes``, the UTF-8 decoded ``str``;
        otherwise ``value`` unchanged.
    :raises UnicodeDecodeError: If ``value`` is ``bytes`` that are not valid
        UTF-8.
    """
    if isinstance(value, set):
        value = ','.join(value)
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value
def is_dml_event(event):
    """Return whether ``event`` is a row-level DML event.

    :param event: A binlog event object.
    :return: True if ``event`` is a ``WriteRowsEvent``, ``UpdateRowsEvent``
        or ``DeleteRowsEvent``; False otherwise.
    """
    return isinstance(event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent))
def event_type(event):
    """Map a row event to the name of the SQL statement type it represents.

    :param event: A binlog event object.
    :return: ``'INSERT'`` for ``WriteRowsEvent``, ``'UPDATE'`` for
        ``UpdateRowsEvent``, ``'DELETE'`` for ``DeleteRowsEvent``, and
        ``None`` for any other event.
    """
    if isinstance(event, WriteRowsEvent):
        return 'INSERT'
    if isinstance(event, UpdateRowsEvent):
        return 'UPDATE'
    if isinstance(event, DeleteRowsEvent):
        return 'DELETE'
    return None
def concat_sql_from_binlog_event(cursor, binlog_event, row=None, e_start_pos=None, flashback=False, no_pk=False):
    """Render one binlog event (and, for row events, one row) as SQL text.

    :param cursor: Cursor whose ``mogrify(template, values)`` is used to bind
        the row values into the statement template.
    :param binlog_event: A ``WriteRowsEvent``, ``UpdateRowsEvent``,
        ``DeleteRowsEvent`` or ``QueryEvent``.
    :param row: Row data for row events; passed through to
        ``generate_sql_pattern``.
    :param e_start_pos: Start position of the event, echoed in the trailing
        ``#start ... end ... time ...`` comment of row statements.
    :param flashback: Generate the statement that undoes the event. Query
        events produce no output in this mode.
    :param no_pk: Omit the primary key from generated INSERT statements.
    :return: The SQL text, or ``''`` when the event yields nothing (a
        ``BEGIN``/``COMMIT`` query, or any query event in flashback mode).
    :raises ValueError: If both ``flashback`` and ``no_pk`` are set, or the
        event is not one of the supported types.
    """
    if flashback and no_pk:
        raise ValueError('Only one of flashback or no_pk can be True')
    if not isinstance(binlog_event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent)):
        raise ValueError('Invalid binlog_event type')

    sql = ''
    if is_dml_event(binlog_event):
        pattern = generate_sql_pattern(binlog_event, row=row, flashback=flashback, no_pk=no_pk)
        sql = cursor.mogrify(pattern['template'], pattern['values'])
        event_time = datetime.datetime.fromtimestamp(binlog_event.timestamp)
        sql += f' #start {e_start_pos} end {binlog_event.packet.log_pos} time {event_time}'
    elif not flashback and isinstance(binlog_event, QueryEvent) and binlog_event.query not in ('BEGIN', 'COMMIT'):
        if binlog_event.schema:
            # fix_object decodes a bytes schema name; formatting the raw
            # value would render it as "b'name'".
            sql = f'USE {fix_object(binlog_event.schema)};\n'
        sql += f'{fix_object(binlog_event.query)};'
    return sql
def generate_sql_pattern(binlog_event, row=None, flashback=False, no_pk=False):
    """Build the SQL template and bind values for one row of a row event.

    Forward mode reproduces the event; flashback mode produces the inverse:

    =================  ==========================  ==========================
    event              forward                     flashback
    =================  ==========================  ==========================
    WriteRowsEvent     INSERT the row              DELETE the row
    DeleteRowsEvent    DELETE the row              INSERT the row
    UpdateRowsEvent    SET after WHERE before      SET before WHERE after
    =================  ==========================  ==========================

    :param binlog_event: A ``WriteRowsEvent``, ``UpdateRowsEvent`` or
        ``DeleteRowsEvent``; ``schema`` and ``table`` are read from it.
    :param row: Mapping with ``'values'`` (write/delete events) or
        ``'before_values'`` and ``'after_values'`` (update events), each a
        column-name to value dict. It is not modified.
    :param flashback: Generate the inverse statement.
    :param no_pk: For a forward INSERT, leave out the primary-key column(s)
        named by ``binlog_event.primary_key``.
    :return: ``{'template': str, 'values': list}`` where the template holds
        one ``%s`` placeholder per value and values went through
        ``fix_object``.
    :raises ValueError: If ``binlog_event`` is not a row (DML) event.
    """
    if not is_dml_event(binlog_event):
        raise ValueError('Invalid event type for SQL generation')

    schema, table = binlog_event.schema, binlog_event.table

    if isinstance(binlog_event, UpdateRowsEvent):
        if flashback:
            set_values, where_values = row['before_values'], row['after_values']
        else:
            set_values, where_values = row['after_values'], row['before_values']
        template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format(
            schema, table,
            ', '.join('`%s`=%%s' % column for column in set_values),
            ' AND '.join(map(compare_items, where_values.items())),
        )
        # Placeholder order: SET columns first, then WHERE columns.
        raw_values = list(set_values.values()) + list(where_values.values())
    else:
        row_values = row['values']
        is_insert_event = isinstance(binlog_event, WriteRowsEvent)
        # A forward write and a flashback delete both become an INSERT.
        emit_insert = is_insert_event != bool(flashback)
        if emit_insert:
            if no_pk and is_insert_event and binlog_event.primary_key:
                primary_key = binlog_event.primary_key
                # NOTE(review): primary_key is assumed to be a single column
                # name or a sequence of names (composite key) - confirm
                # against the pymysqlreplication version in use.
                if isinstance(primary_key, str):
                    pk_columns = (primary_key,)
                else:
                    pk_columns = tuple(primary_key)
                # Filter into a new dict so the caller's row stays intact.
                row_values = {
                    column: value for column, value in row_values.items()
                    if column not in pk_columns
                }
            template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format(
                schema, table,
                ', '.join('`%s`' % column for column in row_values),
                ', '.join(['%s'] * len(row_values)),
            )
        else:
            template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format(
                schema, table,
                ' AND '.join(map(compare_items, row_values.items())),
            )
        raw_values = list(row_values.values())

    return {'template': template, 'values': [fix_object(value) for value in raw_values]}
def reversed_lines(fin):
    """Yield the lines of a binary file from last to first, as ``str``.

    Each yielded line keeps its trailing ``'\\n'``; only the final line of
    the file may lack one. Splitting is done on raw bytes and each complete
    line is decoded on its own, so a multi-byte UTF-8 character that
    straddles two blocks from ``reversed_blocks`` is never decoded in halves.

    :param fin: Seekable file object opened in binary mode.
    :return: Generator of decoded lines in reverse file order.
    :raises UnicodeDecodeError: If a line is not valid UTF-8.
    """
    # Bytes of the earliest line seen so far; it may continue into the next
    # (earlier) block, so it is held back until that block has been read.
    tail = b''
    for block in reversed_blocks(fin):
        pieces = (block + tail).split(b'\n')
        lines = [piece + b'\n' for piece in pieces[:-1]]
        if pieces[-1]:
            # Text after the last newline: an unterminated final line.
            lines.append(pieces[-1])
        tail = lines[0]
        for line in reversed(lines[1:]):
            yield line.decode('utf-8')
    if tail:
        yield tail.decode('utf-8')
def reversed_blocks(fin, block_size=4096):
    """Yield the contents of a file in blocks, from the end to the start.

    :param fin: Seekable file object.
    :param block_size: Maximum size of each block; the last block yielded
        (the start of the file) may be shorter.
    :return: Generator of whatever ``fin.read`` returns for each block.
    """
    fin.seek(0, os.SEEK_END)
    here = fin.tell()
    while 0 < here:
        delta = min(block_size, here)
        here -= delta
        fin.seek(here, os.SEEK_SET)
        yield fin.read(delta)
if __name__ == '__main__':
    # Manual smoke test: parse the command line and show the result.
    cli_args = command_line_args()
    # Never echo the real password to the terminal or to captured logs.
    shown = dict(vars(cli_args), password='******')
    print(f"Parsed arguments: {argparse.Namespace(**shown)}")