Published on

使用 Python 比较 MySQL 主从的差异

Authors

由于当时生产环境的数据库使用的是 MySQL5.1 而我们的从库使用的是 MySQL5.6,结果出现了两个数据库表的结构不一致的情况 ,但是我们当时没有用立即发现这种差异,导致后来迁移数据失败,主要原因是 MySQL5.1 与 MySQL5.6 的日志结构差异非常大,所以导致后来表结构的差异。

所以我后来写了一个脚本,专门用来检测检测 MySQL 主从的差异。这个脚本主要实现了表的结构的对比,数据行数的对比,以及部分数据的对比。

# 例外的表,不进行比较
out_tables = ('schema_migrations', 'product_category_hierarchies', 'roles_security_param_values', 'users_roles')

class CompareDatabase(object):
    def __init__(self, master_info, slave_info):
        self.slave_con = mdb.connect(*master_info)
        self.master_con = mdb.connect(*slave_info)
        self.slave_cur = self.slave_con.cursor()
        self.master_cur = self.master_con.cursor()

    # 输入参数:无
    # 返回结果(tuple(string)):返回当前主从数据库的版本
    def show_version(self):
        sql = 'SELECT VERSION()'
        self.slave_cur.execute(sql)
        self.master_cur.execute(sql)
        slave_version = self.slave_cur.fetchone()
        master_version = self.master_cur.fetchone()
        return master_version[0], slave_version[0]

    # 输入参数:
    # 1、"master" 返回主库的所有表的名称 type: list(string)
    # 2、"slave" 返回从库所有的表的名称 type: lsit(string)
    # 3、"all" 返回主库和从库所有的表的明称 type: tuple(list)
    # 4、默认返回主库所有表的明称,非法参数返回 None
    def show_tables(self, scheme='master'):
        sql = 'SHOW TABLES'
        if scheme == 'master':
            self.master_cur.execute(sql)
            master_tables = []
            rows = self.master_cur.fetchall()
            for row in rows:
                if row[0] not in out_tables:
                    master_tables.append(row[0])
            return master_tables
        elif scheme == 'slave' and row[0]:
            self.slave_cur.execute(sql)
            rows = self.master_cur.fetchall()
            slave_tables = []
            for row in rows:
                if row[0] not in out_tables:
                    slave_tables.append(row[0])
            return slave_tables
        if scheme == 'all':
            self.master_cur.execute(sql)
            self.slave_cur.execute(sql)
            master_tables = []
            rows = self.master_cur.fetchall()
            for row in rows:
                if row[0] not in out_tables:
                    master_tables.append(row[0])
            slave_tables = []
            rows = self.slave_cur.fetchall()
            for row in rows:
                if row[0] not in out_tables:
                    slave_tables.append(row[0])
            return master_tables, slave_tables

    # 返回某一个表的行数,默认返回主库
    def show_table_row_num(self, table_name, scheme='master'):
        sql = 'SELECT COUNT(*) FROM %s' % table_name
        if scheme == 'master':
            self.master_cur.execute(sql)
            data = self.master_cur.fetchone()
            return int(data[0])
        elif scheme == 'slave':
            self.slave_cur.execute(sql)
            data = self.slave_cur.fetchone()
            return int(data[0])
        elif scheme == 'all':
            self.master_cur.execute(sql)
            self.slave_cur.execute(sql)
            master_data = self.master_cur.fetchone()
            slave_data = self.slave_cur.fetchone()
            return int(master_data[0]), int(slave_data[0])
        else:
            return None

    # 比较两个库之间表的数目和名称是否完全相同
    # 相同返回:True  不同返回:False
    def compare_db_tables(self):
        master_tables, slave_tables = self.show_tables('all')
        for table in master_tables:
            if table not in slave_tables:
                return False
        return True

    # 显示表的结构 default ('master')
    # 输入参数:
    # 1、master 返回主库中该表的结构
    # 2、slave 返回从库中该表的结构
    # 3、all 返回主库和从库中该表的结构
    # 4、非法参数返回None
    def show_table_structure(self, table_name, scheme='master'):
        sql = 'DESC %s' % table_name
        if scheme == 'master':
            self.master_cur.execute(sql)
            master_rows = self.master_cur.fetchall()
            return master_rows
        elif scheme == 'slave':
            self.slave_cur.execute(sql)
            slave_rows = self.slave_cur.fetchall()
            return slave_rows
        elif scheme == 'all':
            self.master_cur.execute(sql)
            master_rows = self.master_cur.fetchall()
            self.slave_cur.execute(sql)
            slave_rows = self.slave_cur.fetchall()
            return master_rows, slave_rows
        else:
            return None

    # 显示表的字段 default ('master')
    # 输入参数:
    # 1、master 返回主库中该表的字段
    # 2、slave 返回从库中该表的字段
    # 3、all 返回主库和从库中该表的字段
    # 4、非法参数返回None
    def show_table_columns(self, table, scheme='master'):
        if scheme == 'master':
            master_table_structure = self.show_table_structure(table)
            master_column = []
            for row in master_table_structure:
                master_column.append(row[0])
            return master_column
        elif scheme == 'slave':
            slave_table_structure = self.show_table_structure(table)
            slave_column = []
            for row in slave_table_structure:
                slave_column.append(row[0])
            return slave_column
        elif scheme == 'all':
            master_table_structure = self.show_table_structure(table)
            master_column = []
            for row in master_table_structure:
                master_column.append(row[0])
            slave_table_structure = self.show_table_structure(table)
            slave_column = []
            for row in slave_table_structure:
                slave_column.append(row[0])
            return master_column, slave_column
        else:
            return None

    # 比较两个表的结构,相同返回True,不同返回False
    def compare_table_structure(self, table_name):
        sql = 'DESC %s' % table_name
        self.slave_cur.execute(sql)
        self.master_cur.execute(sql)
        master_rows = self.master_cur.fetchall()
        slave_rows = self.slave_cur.fetchall()
        for row in master_rows:
            if row not in slave_rows:
                return False
        return True

    # 比较连个库所有表的结构,相同返回True,不同返回False
    def compare_all_tables_structure(self):
        if not self.compare_db_tables():
            print('Slave and Master DB tables dose not match!')
            return False
        master_tables, slave_tables = self.show_tables('all')
        for table in master_tables:
            if not self.compare_table_structure(table):
                print('%s table does not match!' % table)
                return False
        return True

    # 返回表的行数
    def show_table_rows(self, table_name):
        print('show table %s' % table_name)
        sql = 'SELECT COUNT(id) FROM %s' % table_name
        self.master_cur.execute(sql)
        master_result = self.master_cur.fetchone()
        num_of_row = int(master_result[0])
        return num_of_row

    # 随机返回一个表的前500条记录的50个ID
    # 如果表的记录不超过50,则全部返回
    def show_table_id_random(self, table_name):
        num_of_rows = self.show_table_rows(table_name)
        sql = 'SELECT id FROM %s ORDER BY id LIMIT 200' % table_name
        if num_of_rows >= 50:
            self.master_cur.execute(sql)
            master_rows = self.master_cur.fetchall()
            selected_rows = random.sample(master_rows, 50)
            table_id = []
            for row in selected_rows:
                table_id.append(int(row[0]))
            return table_id
        else:
            sql = 'SELECT id FROM %s' % table_name
            self.master_cur.execute(sql)
            selected_rows = self.master_cur.fetchall()
            table_id = []
            for row in selected_rows:
                table_id.append(int(row[0]))
            return table_id

    # 返回一个表的后50条记录
    def show_table_id_last(self, table_name):
        num_of_rows = self.show_table_rows(table_name)
        sql = 'SELECT id FROM %s ORDER BY id desc LIMIT 50' % table_name
        if num_of_rows >= 50:
            self.slave_cur.execute(sql)
            slave_rows = self.slave_cur.fetchall()
            table_id = []
            for row in slave_rows:
                table_id.append(int(row[0]))
            return table_id
        else:
            sql = 'SELECT id FROM %s' % table_id
            self.slave_cur.execute(sql)
            slave_rows = self.slave_cur.fetchall()
            table_id = []
            for row in slave_rows:
                table_id.append(int(row[0]))
            return table_id
        return None

    # 随机比较两个表的数据
    def compare_one_table_data_random(self, table_name):
        table_id = self.show_table_id_random(table_name)
        print("Process %s" % table_name)
        for id in table_id:
            sql = 'SELECT * FROM %s WHERE id = %d' % (table_name, id)
            self.master_cur.execute(sql)
            self.slave_cur.execute(sql)
            master_row = self.master_cur.fetchone()
            slave_row = self.slave_cur.fetchone()
            if master_row != slave_row:
                return False
        return True

    # 随机比较所有表的数据
    def compare_all_tables_data_random(self):
        all_tables = self.show_tables()
        for table in all_tables:
            if self.compare_one_table_data_random(table):
                print('%s table is OK' % table)
            else:
                print('%s table can not pass' % table)

    def __del__(self):
        self.master_con.close()
        self.slave_con.close()

还可以添加一些其他的功能,比如输出文档,但是这不是核心功能。我们后来测试对比主从差异非常小,时间在 1s 以内。