由于当时生产环境的数据库使用的是 MySQL5.1 而我们的从库使用的是 MySQL5.6,结果出现了两个数据库表的结构不一致的情况 ,但是我们当时没有用立即发现这种差异,导致后来迁移数据失败,主要原因是 MySQL5.1 与 MySQL5.6 的日志结构差异非常大,所以导致后来表结构的差异。

所以我后来写了一个脚本,专门用来检测检测 MySQL 主从的差异。这个脚本主要实现了表的结构的对比,数据行数的对比,以及部分数据的对比。

# 例外的表,不进行比较
out_tables = ('schema_migrations', 'product_category_hierarchies', 'roles_security_param_values', 'users_roles')
class CompareDatabase(object):
def __init__(self, master_info, slave_info):
self.slave_con = mdb.connect(*master_info)
self.master_con = mdb.connect(*slave_info)
self.slave_cur = self.slave_con.cursor()
self.master_cur = self.master_con.cursor()
# 输入参数:无
# 返回结果(tuple(string)):返回当前主从数据库的版本
def show_version(self):
sql = 'SELECT VERSION()'
self.slave_cur.execute(sql)
self.master_cur.execute(sql)
slave_version = self.slave_cur.fetchone()
master_version = self.master_cur.fetchone()
return master_version[0], slave_version[0]
# 输入参数:
# 1、"master" 返回主库的所有表的名称 type: list(string)
# 2、"slave" 返回从库所有的表的名称 type: lsit(string)
# 3、"all" 返回主库和从库所有的表的明称 type: tuple(list)
# 4、默认返回主库所有表的明称,非法参数返回 None
def show_tables(self, scheme='master'):
sql = 'SHOW TABLES'
if scheme == 'master':
self.master_cur.execute(sql)
master_tables = []
rows = self.master_cur.fetchall()
for row in rows:
if row[0] not in out_tables:
master_tables.append(row[0])
return master_tables
elif scheme == 'slave' and row[0]:
self.slave_cur.execute(sql)
rows = self.master_cur.fetchall()
slave_tables = []
for row in rows:
if row[0] not in out_tables:
slave_tables.append(row[0])
return slave_tables
if scheme == 'all':
self.master_cur.execute(sql)
self.slave_cur.execute(sql)
master_tables = []
rows = self.master_cur.fetchall()
for row in rows:
if row[0] not in out_tables:
master_tables.append(row[0])
slave_tables = []
rows = self.slave_cur.fetchall()
for row in rows:
if row[0] not in out_tables:
slave_tables.append(row[0])
return master_tables, slave_tables
# 返回某一个表的行数,默认返回主库
def show_table_row_num(self, table_name, scheme='master'):
sql = 'SELECT COUNT(*) FROM %s' % table_name
if scheme == 'master':
self.master_cur.execute(sql)
data = self.master_cur.fetchone()
return int(data[0])
elif scheme == 'slave':
self.slave_cur.execute(sql)
data = self.slave_cur.fetchone()
return int(data[0])
elif scheme == 'all':
self.master_cur.execute(sql)
self.slave_cur.execute(sql)
master_data = self.master_cur.fetchone()
slave_data = self.slave_cur.fetchone()
return int(master_data[0]), int(slave_data[0])
else:
return None
# 比较两个库之间表的数目和名称是否完全相同
# 相同返回:True  不同返回:False
def compare_db_tables(self):
master_tables, slave_tables = self.show_tables('all')
for table in master_tables:
if table not in slave_tables:
return False
return True
# 显示表的结构 default ('master')
# 输入参数:
# 1、master 返回主库中该表的结构
# 2、slave 返回从库中该表的结构
# 3、all 返回主库和从库中该表的结构
# 4、非法参数返回None
def show_table_structure(self, table_name, scheme='master'):
sql = 'DESC %s' % table_name
if scheme == 'master':
self.master_cur.execute(sql)
master_rows = self.master_cur.fetchall()
return master_rows
elif scheme == 'slave':
self.slave_cur.execute(sql)
slave_rows = self.slave_cur.fetchall()
return slave_rows
elif scheme == 'all':
self.master_cur.execute(sql)
master_rows = self.master_cur.fetchall()
self.slave_cur.execute(sql)
slave_rows = self.slave_cur.fetchall()
return master_rows, slave_rows
else:
return None
# 显示表的字段 default ('master')
# 输入参数:
# 1、master 返回主库中该表的字段
# 2、slave 返回从库中该表的字段
# 3、all 返回主库和从库中该表的字段
# 4、非法参数返回None
def show_table_columns(self, table, scheme='master'):
if scheme == 'master':
master_table_structure = self.show_table_structure(table)
master_column = []
for row in master_table_structure:
master_column.append(row[0])
return master_column
elif scheme == 'slave':
slave_table_structure = self.show_table_structure(table)
slave_column = []
for row in slave_table_structure:
slave_column.append(row[0])
return slave_column
elif scheme == 'all':
master_table_structure = self.show_table_structure(table)
master_column = []
for row in master_table_structure:
master_column.append(row[0])
slave_table_structure = self.show_table_structure(table)
slave_column = []
for row in slave_table_structure:
slave_column.append(row[0])
return master_column, slave_column
else:
return None
# 比较两个表的结构,相同返回True,不同返回False
def compare_table_structure(self, table_name):
sql = 'DESC %s' % table_name
self.slave_cur.execute(sql)
self.master_cur.execute(sql)
master_rows = self.master_cur.fetchall()
slave_rows = self.slave_cur.fetchall()
for row in master_rows:
if row not in slave_rows:
return False
return True
# 比较连个库所有表的结构,相同返回True,不同返回False
def compare_all_tables_structure(self):
if not self.compare_db_tables():
print('Slave and Master DB tables dose not match!')
return False
master_tables, slave_tables = self.show_tables('all')
for table in master_tables:
if not self.compare_table_structure(table):
print('%s table does not match!' % table)
return False
return True
# 返回表的行数
def show_table_rows(self, table_name):
print('show table %s' % table_name)
sql = 'SELECT COUNT(id) FROM %s' % table_name
self.master_cur.execute(sql)
master_result = self.master_cur.fetchone()
num_of_row = int(master_result[0])
return num_of_row
# 随机返回一个表的前500条记录的50个ID
# 如果表的记录不超过50,则全部返回
def show_table_id_random(self, table_name):
num_of_rows = self.show_table_rows(table_name)
sql = 'SELECT id FROM %s ORDER BY id LIMIT 200' % table_name
if num_of_rows >= 50:
self.master_cur.execute(sql)
master_rows = self.master_cur.fetchall()
selected_rows = random.sample(master_rows, 50)
table_id = []
for row in selected_rows:
table_id.append(int(row[0]))
return table_id
else:
sql = 'SELECT id FROM %s' % table_name
self.master_cur.execute(sql)
selected_rows = self.master_cur.fetchall()
table_id = []
for row in selected_rows:
table_id.append(int(row[0]))
return table_id
# 返回一个表的后50条记录
def show_table_id_last(self, table_name):
num_of_rows = self.show_table_rows(table_name)
sql = 'SELECT id FROM %s ORDER BY id desc LIMIT 50' % table_name
if num_of_rows >= 50:
self.slave_cur.execute(sql)
slave_rows = self.slave_cur.fetchall()
table_id = []
for row in slave_rows:
table_id.append(int(row[0]))
return table_id
else:
sql = 'SELECT id FROM %s' % table_id
self.slave_cur.execute(sql)
slave_rows = self.slave_cur.fetchall()
table_id = []
for row in slave_rows:
table_id.append(int(row[0]))
return table_id
return None
# 随机比较两个表的数据
def compare_one_table_data_random(self, table_name):
table_id = self.show_table_id_random(table_name)
print("Process %s" % table_name)
for id in table_id:
sql = 'SELECT * FROM %s WHERE id = %d' % (table_name, id)
self.master_cur.execute(sql)
self.slave_cur.execute(sql)
master_row = self.master_cur.fetchone()
slave_row = self.slave_cur.fetchone()
if master_row != slave_row:
return False
return True
# 随机比较所有表的数据
def compare_all_tables_data_random(self):
all_tables = self.show_tables()
for table in all_tables:
if self.compare_one_table_data_random(table):
print('%s table is OK' % table)
else:
print('%s table can not pass' % table)
def __del__(self):
self.master_con.close()
self.slave_con.close()

还可以添加一些其他的功能,比如输出文档,但是这不是核心功能。我们后来测试对比主从差异非常小,时间在 1s 以内。

发表评论

电子邮件地址不会被公开。 必填项已用*标注