add unit test

This commit is contained in:
danfengcao 2016-12-21 12:19:31 +08:00
parent b3dd1a8bcf
commit a53dd75727
3 changed files with 151 additions and 40 deletions

View File

@ -1,7 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os, datetime
import os, sys, datetime
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
@ -71,9 +71,9 @@ class Binlog2sql(object):
if not self.stopnever:
if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
flagLastEvent = True
elif datetime.datetime.fromtimestamp(binlogevent.__dict__['timestamp']) < self.startTime:
elif datetime.datetime.fromtimestamp(binlogevent.timestamp) < self.startTime:
continue
elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.__dict__['timestamp']) >= self.stopTime):
elif (stream.log_file not in self.binlogList) or (self.endPos and stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos) or (datetime.datetime.fromtimestamp(binlogevent.timestamp) >= self.stopTime):
break
# else:
# raise ValueError('unknown binlog file or position')
@ -85,7 +85,7 @@ class Binlog2sql(object):
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, nopk=self.nopk)
if sql:
print sql
elif type(binlogevent) in (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent):
elif isinstance(binlogevent, WriteRowsEvent) or isinstance(binlogevent, UpdateRowsEvent) or isinstance(binlogevent, DeleteRowsEvent):
for row in binlogevent.rows:
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, row=row , flashback=self.flashback, nopk=self.nopk, eStartPos=eStartPos)
if self.flashback:
@ -93,7 +93,7 @@ class Binlog2sql(object):
else:
print sql
if type(binlogevent) not in (RotateEvent, FormatDescriptionEvent):
# Remember the end position of every event EXCEPT Rotate/FormatDescription ones.
# The isinstance() rewrite of the earlier `type(...) not in (...)` check must keep the negation.
if not isinstance(binlogevent, (RotateEvent, FormatDescriptionEvent)):
    lastPos = binlogevent.packet.log_pos
if flagLastEvent:
break
@ -115,7 +115,7 @@ class Binlog2sql(object):
if __name__ == '__main__':
args = command_line_args()
args = command_line_args(sys.argv[1:])
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile,
startPos=args.startPos, endFile=args.endFile, endPos=args.endPos,

View File

@ -24,15 +24,14 @@ def create_unique_file(filename):
resultFile = filename
# if we have to try more than 1000 times, something is seriously wrong
while os.path.exists(resultFile) and version<1000:
resultFile = filename + '.' + version
resultFile = filename + '.' + str(version)
version += 1
if version >= 1000:
raise OSError('cannot create unique file %s[.0-1000]' % filename)
sys.exit(1)
raise OSError('cannot create unique file %s.[0-1000]' % filename)
return resultFile
def command_line_parser():
"""Returns a command line parser used for binlog2sql"""
def parse_args(args):
"""parse args for binlog2sql"""
parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want', add_help=False)
connect_setting = parser.add_argument_group('connect setting')
@ -68,34 +67,35 @@ def command_line_parser():
schema.add_argument('-t', '--tables', dest='tables', type=str, nargs='*',
help='tables you want to process', default='')
exclusive = parser.add_mutually_exclusive_group()
exclusive.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true',
# exclusive = parser.add_mutually_exclusive_group()
parser.add_argument('-K', '--no-primary-key', dest='nopk', action='store_true',
help='Generate insert sql without primary key if exists', default=False)
exclusive.add_argument('-B', '--flashback', dest='flashback', action='store_true',
parser.add_argument('-B', '--flashback', dest='flashback', action='store_true',
help='Flashback data to start_postition of start_file', default=False)
return parser
return parser.parse_args(args)
# Parse the given argument list and validate it before use.
# Raises ValueError when startFile is missing, when flashback is combined with
# stop-never or nopk, or when a start/stop datetime fails is_valid_datetime().
def command_line_args():
parser = command_line_parser()
args = parser.parse_args()
def command_line_args(args):
args = parse_args(args)
if args.help:
# NOTE(review): in the new version `parser` is no longer bound in this function
# (parse_args() returns parser.parse_args(args), not the parser itself), so unless
# a module-level `parser` exists outside this view, the next line raises NameError
# when help is requested. parse_args() would need to expose the parser to fix it.
parser.print_help()
sys.exit(1)
if not args.startFile:
raise ValueError('Lack of parameter: startFile')
if args.flashback and args.stopnever:
raise ValueError('only one of flashback or stop-never can be True')
raise ValueError('Only one of flashback or stop-never can be True')
if args.flashback and args.nopk:
raise ValueError('only one of flashback or nopk can be True')
raise ValueError('Only one of flashback or nopk can be True')
if (args.startTime and not is_valid_datetime(args.startTime)) or (args.stopTime and not is_valid_datetime(args.stopTime)):
raise ValueError('Incorrect date and time argument')
raise ValueError('Incorrect datetime argument')
return args
def compare_items(item):
    """Return the WHERE-clause fragment that compares one column to a placeholder.

    ``item`` is a ``(column_name, value)`` pair, e.g. one element of
    ``dict.items()``. A ``None`` value produces ``IS %s`` (SQL ``= NULL`` never
    matches a row); any other value produces ``=%s``. The ``%s`` placeholder is
    left in place for the database driver to fill in.
    """
    # Unpack in the body: tuple parameters in a signature are Python 2-only syntax.
    k, v = item
    if v is None:
        return '`%s` IS %%s' % k
    return '`%s`=%%s' % k
def fix_object(value):
"""Fixes python objects so that they can be properly inserted into SQL queries"""
@ -107,59 +107,72 @@ def fix_object(value):
def concat_sql_from_binlogevent(cursor, binlogevent, row=None, eStartPos=None, flashback=False, nopk=False):
    """Render one binlog event (or one row of a row event) as SQL text.

    Row events are turned into a statement via generate_sql_pattern() and
    cursor.mogrify(), followed by a trailing ``#start .. end .. time ..``
    comment. Query events other than BEGIN/COMMIT are emitted verbatim,
    preceded by ``USE <schema>;`` when the event carries a schema.
    BEGIN/COMMIT query events yield an empty string.

    Raises:
        ValueError: if both flashback and nopk are set, or if the event is
            not one of the four supported event types.
    """
    if flashback and nopk:
        raise ValueError('only one of flashback or nopk can be True')

    is_row_event = isinstance(binlogevent, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent))
    if not is_row_event and not isinstance(binlogevent, QueryEvent):
        raise ValueError('binlogevent must be WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent or QueryEvent')

    if is_row_event:
        pattern = generate_sql_pattern(binlogevent, row=row, flashback=flashback, nopk=nopk)
        statement = cursor.mogrify(pattern['template'], pattern['values'])
        event_time = datetime.datetime.fromtimestamp(binlogevent.timestamp)
        return statement + ' #start %s end %s time %s' % (eStartPos, binlogevent.packet.log_pos, event_time)

    # From here on the event is a QueryEvent; transaction markers produce no output.
    if binlogevent.query != 'BEGIN' and binlogevent.query != 'COMMIT':
        prefix = ''
        if binlogevent.schema:
            prefix = 'USE {0};\n'.format(binlogevent.schema)
        return prefix + '{0};'.format(fix_object(binlogevent.query))
    return ''
def generate_sql_pattern(binlogevent, row=None, flashback=False, nopk=False):
    """Build the parameterised SQL template and its values for one row of a row event.

    Args:
        binlogevent: a WriteRowsEvent, DeleteRowsEvent or UpdateRowsEvent.
        row: the row dict for this event; ``row['values']`` is read for
            write/delete events, ``row['before_values']`` and
            ``row['after_values']`` for update events.
        flashback: when exactly ``True``, emit the statement that undoes the
            event (INSERT <-> DELETE, UPDATE with before/after swapped).
        nopk: for a non-flashback INSERT, leave out the primary-key column(s).
            The caller's ``row`` is not modified.

    Returns:
        ``{'template': str, 'values': list}``. For any other event type the
        template is ``''`` and the values list is empty.
    """
    undo = flashback is True  # only the literal True selects flashback mode

    if isinstance(binlogevent, WriteRowsEvent):
        columns = list(row['values'].items())
        if undo:
            return _delete_pattern(binlogevent, columns)
        if nopk and binlogevent.primary_key:
            pk = binlogevent.primary_key
            # NOTE(review): pymysqlreplication appears to report a composite
            # primary key as a tuple of column names -- confirm. Both shapes
            # are handled here instead of passing the tuple to dict.pop().
            excluded = set(pk) if isinstance(pk, (tuple, list)) else set([pk])
            columns = [(k, v) for k, v in columns if k not in excluded]
        return _insert_pattern(binlogevent, columns)

    if isinstance(binlogevent, DeleteRowsEvent):
        columns = list(row['values'].items())
        if undo:
            return _insert_pattern(binlogevent, columns)
        return _delete_pattern(binlogevent, columns)

    if isinstance(binlogevent, UpdateRowsEvent):
        before = list(row['before_values'].items())
        after = list(row['after_values'].items())
        if undo:
            return _update_pattern(binlogevent, before, after)
        return _update_pattern(binlogevent, after, before)

    return {'template': '', 'values': []}


def _insert_pattern(binlogevent, columns):
    """Return the INSERT template/values for a list of (column, value) pairs."""
    template = 'INSERT INTO `{0}`.`{1}`({2}) VALUES ({3});'.format(
        binlogevent.schema, binlogevent.table,
        ', '.join(['`%s`' % k for k, _ in columns]),
        ', '.join(['%s'] * len(columns))
    )
    return {'template': template, 'values': [fix_object(v) for _, v in columns]}


def _delete_pattern(binlogevent, columns):
    """Return the single-row DELETE template/values matching every (column, value) pair."""
    template = 'DELETE FROM `{0}`.`{1}` WHERE {2} LIMIT 1;'.format(
        binlogevent.schema, binlogevent.table,
        ' AND '.join([compare_items(item) for item in columns])
    )
    return {'template': template, 'values': [fix_object(v) for _, v in columns]}


def _update_pattern(binlogevent, set_columns, where_columns):
    """Return the single-row UPDATE template/values: SET set_columns WHERE where_columns."""
    template = 'UPDATE `{0}`.`{1}` SET {2} WHERE {3} LIMIT 1;'.format(
        binlogevent.schema, binlogevent.table,
        ', '.join(['`%s`=%%s' % k for k, _ in set_columns]),
        ' AND '.join([compare_items(item) for item in where_columns])
    )
    # Values follow placeholder order: SET columns first, then WHERE columns.
    values = [fix_object(v) for _, v in set_columns + where_columns]
    return {'template': template, 'values': values}

98
tests/test_binlog2sql_util.py Executable file
View File

@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import unittest
import mock
from pymysql.cursors import Cursor
from pymysql.connections import Connection
from pymysqlreplication.event import BinLogEvent
from pymysqlreplication.row_event import (
WriteRowsEvent,
UpdateRowsEvent,
DeleteRowsEvent,
)
sys.path.append("..")
from binlog2sql.binlog2sql_util import *
class TestBinlog2sqlUtil(unittest.TestCase):
    """Unit tests for the helper functions in binlog2sql_util."""

    def _assert_rejected(self, argv, message):
        """Check that command_line_args(argv) raises ValueError with exactly ``message``."""
        # assertRaises fails the test when nothing is raised; a bare
        # try/except would let a missing exception pass silently.
        with self.assertRaises(ValueError) as ctx:
            command_line_args(argv)
        self.assertEqual(str(ctx.exception), message)

    def test_is_valid_datetime(self):
        self.assertTrue(is_valid_datetime('2015-12-12 12:12:12'))
        self.assertFalse(is_valid_datetime('2015-12-12 12:12'))
        self.assertFalse(is_valid_datetime('2015-12-12'))
        self.assertFalse(is_valid_datetime(None))

    @mock.patch('binlog2sql.binlog2sql_util.os.path')
    def test_create_unique_file(self, mock_path):
        filename = "test.sql"
        mock_path.exists.return_value = False
        self.assertEqual(create_unique_file(filename), filename)

        # Every candidate name "exists", so the function must give up with OSError.
        mock_path.exists.return_value = True
        with self.assertRaises(OSError) as ctx:
            create_unique_file(filename)
        self.assertEqual(str(ctx.exception), "cannot create unique file %s.[0-1000]" % filename)

    def test_command_line_args(self):
        self._assert_rejected([], "Lack of parameter: startFile")
        self._assert_rejected(
            ['--start-file', 'mysql-bin.000058', '--flashback', '--no-primary-key'],
            "Only one of flashback or nopk can be True")
        self._assert_rejected(
            ['--start-file', 'mysql-bin.000058', '--flashback', '--stop-never'],
            "Only one of flashback or stop-never can be True")
        self._assert_rejected(
            ['--start-file', 'mysql-bin.000058', '--start-datetime', '2016-12-12'],
            "Incorrect datetime argument")

    def test_compare_items(self):
        self.assertEqual(compare_items(('data', '12345')), '`data`=%s')
        self.assertEqual(compare_items(('data', None)), '`data` IS %s')

    def test_fix_object(self):
        self.assertEqual(fix_object('ascii'), 'ascii')
        self.assertEqual(fix_object(u'unicode'), u'unicode'.encode('utf-8'))

    def test_generate_sql_pattern(self):
        from collections import OrderedDict

        def columns(data):
            # Ordered mapping, built fresh per call, so the expected column
            # order never depends on dict hashing or on an earlier call
            # having modified the row.
            return OrderedDict([('data', data), ('id', 1)])

        mock_write_event = mock.create_autospec(WriteRowsEvent)
        mock_write_event.schema = 'test'
        mock_write_event.table = 'tbl'
        mock_write_event.primary_key = 'id'
        pattern = generate_sql_pattern(binlogevent=mock_write_event, row={'values': columns('hello')}, flashback=False, nopk=False)
        self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'})
        pattern = generate_sql_pattern(binlogevent=mock_write_event, row={'values': columns('hello')}, flashback=True, nopk=False)
        self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'})
        pattern = generate_sql_pattern(binlogevent=mock_write_event, row={'values': columns('hello')}, flashback=False, nopk=True)
        self.assertEqual(pattern, {'values': ['hello'], 'template': 'INSERT INTO `test`.`tbl`(`data`) VALUES (%s);'})

        mock_delete_event = mock.create_autospec(DeleteRowsEvent)
        mock_delete_event.schema = 'test'
        mock_delete_event.table = 'tbl'
        pattern = generate_sql_pattern(binlogevent=mock_delete_event, row={'values': columns('hello')}, flashback=False, nopk=False)
        self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'DELETE FROM `test`.`tbl` WHERE `data`=%s AND `id`=%s LIMIT 1;'})
        pattern = generate_sql_pattern(binlogevent=mock_delete_event, row={'values': columns('hello')}, flashback=True, nopk=False)
        self.assertEqual(pattern, {'values': ['hello', 1], 'template': 'INSERT INTO `test`.`tbl`(`data`, `id`) VALUES (%s, %s);'})

        row = {'before_values': columns('hello'), 'after_values': columns('binlog2sql')}
        mock_update_event = mock.create_autospec(UpdateRowsEvent)
        mock_update_event.schema = 'test'
        mock_update_event.table = 'tbl'
        pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=False, nopk=False)
        self.assertEqual(pattern, {'values': ['binlog2sql', 1, 'hello', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'})
        pattern = generate_sql_pattern(binlogevent=mock_update_event, row=row, flashback=True, nopk=False)
        self.assertEqual(pattern, {'values': ['hello', 1, 'binlog2sql', 1], 'template': 'UPDATE `test`.`tbl` SET `data`=%s, `id`=%s WHERE `data`=%s AND `id`=%s LIMIT 1;'})
if __name__ == '__main__':
unittest.main()