add realtime option, fix position filter, add fixOldMasterExtraData

This commit is contained in:
danfengcao 2016-12-06 13:00:56 +08:00
parent 0e106ff966
commit 03d97b345a
1 changed file with 31 additions and 24 deletions

View File

@ -14,7 +14,7 @@ from pymysqlreplication.event import QueryEvent
def command_line_parser(): def command_line_parser():
"""Returns a command line parser used for binlog2sql""" """Returns a command line parser used for binlog2sql"""
parser = argparse.ArgumentParser(description='Parse MySQL binlog to standard SQL') parser = argparse.ArgumentParser(description='Parse MySQL binlog to SQL you want')
connect_setting = parser.add_argument_group('connect setting') connect_setting = parser.add_argument_group('connect setting')
connect_setting.add_argument('--host', dest='host', type=str, connect_setting.add_argument('--host', dest='host', type=str,
help='Host the MySQL database server located', default='127.0.0.1') help='Host the MySQL database server located', default='127.0.0.1')
@ -30,9 +30,11 @@ def command_line_parser():
position.add_argument('--start-pos', dest='startPos', type=int, position.add_argument('--start-pos', dest='startPos', type=int,
help='start position of start binlog file', default=4) help='start position of start binlog file', default=4)
position.add_argument('--end-file', dest='endFile', type=str, position.add_argument('--end-file', dest='endFile', type=str,
help='End binlog file to be parsed', default='') help="End binlog file to be parsed. default: '--start-file'", default='')
position.add_argument('--end-pos', dest='endPos', type=int, position.add_argument('--end-pos', dest='endPos', type=int,
help='stop position of end binlog file', default=4) help="stop position of end binlog file. default: end position of '--end-file'", default=0)
parser.add_argument('--realtime', dest='realtime', action='store_true',
help='continuously replicate binlog. default: stop replicate when meeting the latest binlog when you run the program', default=False)
schema = parser.add_argument_group('schema filter') schema = parser.add_argument_group('schema filter')
schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*', schema.add_argument('-d', '--databases', dest='databases', type=str, nargs='*',
@ -71,8 +73,6 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False,
binlogevent.table, binlogevent.table,
' AND '.join(map(compare_items, row['values'].items())) ' AND '.join(map(compare_items, row['values'].items()))
) )
# print template
# print row['values'].values()
sql = cursor.mogrify(template, map(fix_object, row['values'].values())) sql = cursor.mogrify(template, map(fix_object, row['values'].values()))
elif isinstance(binlogevent, DeleteRowsEvent): elif isinstance(binlogevent, DeleteRowsEvent):
template = 'INSERT INTO {0}({1}) VALUES ({2});'.format( template = 'INSERT INTO {0}({1}) VALUES ({2});'.format(
@ -122,22 +122,24 @@ def concat_sql_from_binlogevent(cursor, binlogevent, row=None, flashback=False,
class Binlog2sql(object): class Binlog2sql(object):
def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None, def __init__(self, connectionSettings, startFile=None, startPos=None, endFile=None,
endPos=None, only_schemas=[], only_tables=[], popPk=False, flashback=False): endPos=None, only_schemas=[], only_tables=[], popPk=False, flashback=False, realtime=False):
''' '''
connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave} connectionSettings: {'host': 127.0.0.1, 'port': 3306, 'user': slave, 'passwd': slave}
''' '''
self.connectionSettings = connectionSettings
self.startFile = startFile
self.startPos = startPos
if not startFile: if not startFile:
raise ValueError('lack of parameter,startFile.') raise ValueError('lack of parameter,startFile.')
if not startPos:
self.startFile = 4 self.connectionSettings = connectionSettings
self.startFile = startFile
self.startPos = startPos if startPos else 4
self.endFile = endFile if endFile else startFile
self.endPos = endPos
self.only_schemas = only_schemas if only_schemas else None self.only_schemas = only_schemas if only_schemas else None
self.only_tables = only_tables if only_tables else None self.only_tables = only_tables if only_tables else None
self.popPk = popPk self.popPk = popPk
self.flashback = flashback self.flashback = flashback
self.realtime = realtime
self.binlogList = [] self.binlogList = []
self.connection = pymysql.connect(**self.connectionSettings) self.connection = pymysql.connect(**self.connectionSettings)
@ -145,10 +147,10 @@ class Binlog2sql(object):
cur = self.connection.cursor() cur = self.connection.cursor()
cur.execute("SHOW MASTER STATUS") cur.execute("SHOW MASTER STATUS")
self.eofFile, self.eofPos = cur.fetchone()[:2] self.eofFile, self.eofPos = cur.fetchone()[:2]
if endFile and endPos: # if endFile and endPos:
self.endFile, self.endPos = (endFile, endPos) # self.endFile, self.endPos = (endFile, endPos)
else: # else:
self.endFile, self.endPos = (self.eofFile, self.eofPos) # self.endFile, self.endPos = (self.eofFile, self.eofPos)
cur.execute("SHOW MASTER LOGS") cur.execute("SHOW MASTER LOGS")
binIndex = [row[0] for row in cur.fetchall()] binIndex = [row[0] for row in cur.fetchall()]
@ -172,16 +174,22 @@ class Binlog2sql(object):
cur = self.connection.cursor() cur = self.connection.cursor()
tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile. tmpFile = 'tmp.%s.%s.tmp' % (self.connectionSettings['host'],self.connectionSettings['port']) # to simplify code, we do not use file lock for tmpFile.
ftmp = open(tmpFile ,"w") ftmp = open(tmpFile ,"w")
flagExit = 0 flagLastEvent = False
try: try:
for binlogevent in stream: for binlogevent in stream:
if (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos): if not self.realtime:
break if (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos):
elif (stream.log_file == self.endFile and stream.log_pos == self.endPos) or (stream.log_file == self.eofFile and stream.log_pos == self.eofPos): flagLastEvent = True
flagExit = 1 elif not self.endPos:
if stream.log_file not in self.binlogList:
break
elif (stream.log_file == self.endFile and stream.log_pos > self.endPos) or (stream.log_file == self.eofFile and stream.log_pos > self.eofPos):
break
else:
raise ValueError('unknown binlog file or position')
if isinstance(binlogevent, QueryEvent): if isinstance(binlogevent, QueryEvent):
if binlogevent.query != 'BEGIN': if binlogevent.query != 'BEGIN' and binlogevent.query != 'COMMIT':
sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk) sql = concat_sql_from_binlogevent(cursor=cur, binlogevent=binlogevent, flashback=self.flashback, popPk=self.popPk)
if sql: if sql:
print sql print sql
@ -193,7 +201,7 @@ class Binlog2sql(object):
else: else:
print sql print sql
if flagExit: if flagLastEvent:
break break
ftmp.close() ftmp.close()
if self.flashback: if self.flashback:
@ -203,7 +211,6 @@ class Binlog2sql(object):
print line.rstrip() print line.rstrip()
finally: finally:
os.remove(tmpFile) os.remove(tmpFile)
cur.close() cur.close()
stream.close() stream.close()
return True return True
@ -219,5 +226,5 @@ if __name__ == '__main__':
connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password} connectionSettings = {'host':args.host, 'port':args.port, 'user':args.user, 'passwd':args.password}
binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile, binlog2sql = Binlog2sql(connectionSettings=connectionSettings, startFile=args.startFile,
startPos=args.startPos, endFile=args.endFile, endPos=args.endPos, startPos=args.startPos, endFile=args.endFile, endPos=args.endPos,
only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback) only_schemas=args.databases, only_tables=args.tables, popPk=args.popPk, flashback=args.flashback, realtime=args.realtime)
binlog2sql.process_binlog() binlog2sql.process_binlog()