使用 Python 同步两个数据库的表结构


安装环境:

python3 -m venv myenv  # Create a virtual environment
source myenv/bin/activate  # Activate the virtual environment
pip install sqlalchemy pymysql python-Levenshtein  # Install your packages inside the virtual environment

🛠️ 在本地(localhost)上创建:

CREATE USER 'dbsync'@'localhost' IDENTIFIED BY 'strong_sync_password';

GRANT USAGE ON *.* TO 'dbsync'@'localhost';
GRANT SELECT ON `dbdbdb`.* TO 'dbsync'@'localhost';
GRANT CREATE, ALTER, DROP, INDEX ON `dbdbdb`.* TO 'dbsync'@'localhost';

FLUSH PRIVILEGES;

🌐 在远程主机上创建(假设你从任意 IP 访问):

CREATE USER 'dbsync'@'%' IDENTIFIED BY 'strong_sync_password';

GRANT USAGE ON *.* TO 'dbsync'@'%';
GRANT SELECT ON `dbdbdb`.* TO 'dbsync'@'%';
GRANT CREATE, ALTER, DROP, INDEX ON `dbdbdb`.* TO 'dbsync'@'%';

FLUSH PRIVILEGES;

核心脚本 dbsync.py,只需更改本地和远程的连接配置即可(示例中的连接串使用的是 root 账号,实际使用时请换成上面创建的 dbsync 账号和密码):

import traceback
from sqlalchemy import create_engine, MetaData, Table, text
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, FLOAT
import Levenshtein

# === Connection settings ===
# SQLAlchemy URLs of the two databases whose structure is compared.
# NOTE(review): credentials are hard-coded in these URLs.
local_url = "mysql+pymysql://root:123456@localhost/db1"
remote_url = "mysql+pymysql://root:123456@localhost/db2"

# === Sync direction ===
local_to_remote = True  # False means sync in the opposite direction

# Pick which URL is read from (source) and which one is altered (target).
source_url, target_url = (local_url, remote_url) if local_to_remote else (remote_url, local_url)

print(f"[INFO] Sync direction: {'local → remote' if local_to_remote else 'remote → local'}")

# NOTE: from here on the `local_*` names are bound to the sync SOURCE and the
# `remote_*` names to the sync TARGET, whichever URL each one came from.
local_engine = create_engine(source_url, future=True)
remote_engine = create_engine(target_url, future=True)

# Reflect the existing schema of each database into its own MetaData.
# These are module-level statements, so they run as soon as the file is loaded.
local_metadata = MetaData()
remote_metadata = MetaData()
local_metadata.reflect(bind=local_engine)
remote_metadata.reflect(bind=remote_engine)

# === 默认值逻辑 ===
def guess_default(col):
    """Build the ``DEFAULT ...`` fragment of a column definition.

    An explicit default on the column wins. Both the Python-side
    ``col.default`` and the database-side ``col.server_default`` are
    consulted, because columns loaded through reflection carry their
    default in ``server_default`` and leave ``default`` unset.

    Without an explicit default the clause is guessed: ``DEFAULT NULL`` for
    nullable columns, otherwise a zero value for INTEGER / FLOAT / VARCHAR.

    Args:
        col: Column-like object exposing ``default``, ``nullable``, ``type``
            and (optionally) ``server_default``.

    Returns:
        The ``DEFAULT`` clause as a string, or ``""`` when nothing sensible
        can be guessed.
    """
    for default in (col.default, getattr(col, "server_default", None)):
        # Default objects without an ``arg`` cannot be rendered as a literal;
        # fall through to the guesses below.
        arg = getattr(default, "arg", None)
        if arg is not None:
            return f"DEFAULT {arg}"

    if col.nullable:
        return "DEFAULT NULL"
    if isinstance(col.type, INTEGER):
        return "DEFAULT 0"
    if isinstance(col.type, FLOAT):
        return "DEFAULT 0.0"
    if isinstance(col.type, VARCHAR):
        return "DEFAULT ''"
    return ""

# === 上下文字段匹配辅助 ===
def get_neighbors(name_list, name):
    """Return the entries directly before and after ``name`` in ``name_list``.

    A missing neighbour (``name`` is the first or last entry) is reported as
    ``None``. Raises ``ValueError`` when ``name`` is not in the list.
    """
    pos = name_list.index(name)
    last_pos = len(name_list) - 1
    previous_name = None if pos == 0 else name_list[pos - 1]
    following_name = None if pos >= last_pos else name_list[pos + 1]
    return previous_name, following_name

def find_renamed_column(local_col, local_col_names, remote_col_names, remote_columns):
    """Guess which remote column is the old name of ``local_col``.

    Only remote columns whose name no longer exists on the local side are
    considered: a remote column that still has a same-named local
    counterpart is a live column, and treating it as a rename source would
    produce a ``CHANGE COLUMN`` that renames it away.

    Matching runs in two steps:

    1. Positional: the remote column at the same index with the same type.
    2. Fuzzy: remote columns whose name has a Levenshtein ratio of at least
       0.6 and the same type, accepted when the previous or the next
       neighbouring column name agrees (a missing neighbour counts as
       agreeing).

    Args:
        local_col: Column-like object with ``name`` and ``type``.
        local_col_names: Ordered column names of the local table.
        remote_col_names: Ordered column names of the remote table.
        remote_columns: Mapping of remote column name to column object.

    Returns:
        The remote column name believed to be the old name, or ``None``.

    Raises:
        ValueError: If ``local_col.name`` is not in ``local_col_names``.
    """
    local_names = set(local_col_names)
    local_type = str(local_col.type)

    def same_type(remote_name):
        return local_type == str(remote_columns[remote_name].type)

    local_idx = local_col_names.index(local_col.name)

    # === Step 1: quick positional match ===
    if local_idx < len(remote_col_names):
        positional = remote_col_names[local_idx]
        if positional not in local_names and same_type(positional):
            return positional  # same position + same type => treat as rename

    # === Step 2: fuzzy name match ===
    candidates = [c for c in remote_columns.keys() if c not in local_names]
    if not candidates:
        return None

    wanted = local_col.name.lower()
    scored = sorted(
        ((c, Levenshtein.ratio(wanted, c.lower())) for c in candidates),
        key=lambda pair: -pair[1],
    )
    plausible = [c for c, score in scored if score >= 0.6 and same_type(c)]
    if not plausible:
        return None

    # Neighbour check: the local neighbours do not depend on the candidate.
    local_prev, local_next = get_neighbors(local_col_names, local_col.name)
    for remote_name in plausible:
        remote_prev, remote_next = get_neighbors(remote_col_names, remote_name)

        match_prev = (local_prev == remote_prev) if local_prev and remote_prev else True
        match_next = (local_next == remote_next) if local_next and remote_next else True

        if match_prev or match_next:
            return remote_name

    return None  # no plausible rename source found


# === Main sync logic ===
# DDL statements queued by sync_structure() and later run by apply_changes().
alter_statements = []


def _column_definition(col):
    """Render ``<type> <default clause>`` for ``col`` in the target dialect."""
    col_type = col.type.compile(dialect=remote_engine.dialect)
    return f"{col_type} {guess_default(col)}"


def _sync_columns(table_name, local_table, remote_table):
    """Queue MODIFY / CHANGE / ADD COLUMN statements for one table."""
    remote_columns = remote_table.columns
    local_columns = local_table.columns

    remote_col_names = list(remote_columns.keys())
    local_col_names = list(local_columns.keys())
    local_name_set = set(local_col_names)
    # Remote columns already used as a rename source; a column can only be
    # renamed once, so later matches on it fall back to ADD COLUMN.
    claimed = set()

    for local_col in local_columns.values():
        name = local_col.name
        if name in remote_columns:
            remote_col = remote_columns[name]
            if str(local_col.type) != str(remote_col.type):
                alter_statements.append(
                    f"ALTER TABLE `{table_name}` MODIFY COLUMN `{name}` {_column_definition(local_col)};"
                )
            continue

        # Try to recognise the column as a rename of an existing remote one.
        renamed = find_renamed_column(local_col, local_col_names, remote_col_names, remote_columns)
        # Never rename away a remote column that still exists locally.
        if renamed and renamed not in local_name_set and renamed not in claimed:
            claimed.add(renamed)
            alter_statements.append(
                f"ALTER TABLE `{table_name}` CHANGE COLUMN `{renamed}` `{name}` {_column_definition(local_col)};"
            )
        else:
            # Genuinely new column.
            alter_statements.append(
                f"ALTER TABLE `{table_name}` ADD COLUMN `{name}` {_column_definition(local_col)};"
            )


def _sync_primary_key(table_name, local_table, remote_table):
    """Queue a primary-key change when the key column sets differ."""
    if not local_table.primary_key:
        return

    # Keep lists: the column order of a composite key must be preserved.
    local_pk = list(local_table.primary_key.columns.keys())
    remote_pk = list(remote_table.primary_key.columns.keys())
    if set(local_pk) == set(remote_pk):
        return

    pk_cols = ', '.join(f'`{c}`' for c in local_pk)
    if remote_pk:
        # Drop and re-add in ONE statement so the table is never left
        # without a key if the ADD part fails.
        alter_statements.append(
            f"ALTER TABLE `{table_name}` DROP PRIMARY KEY, ADD PRIMARY KEY ({pk_cols});"
        )
    else:
        # Nothing to drop: DROP PRIMARY KEY would fail on a keyless table.
        alter_statements.append(f"ALTER TABLE `{table_name}` ADD PRIMARY KEY ({pk_cols});")


def _sync_indexes(table_name, local_table, remote_table):
    """Queue CREATE INDEX for indexes whose name is missing on the target."""
    local_indexes = {idx.name: idx for idx in local_table.indexes}
    remote_indexes = {idx.name: idx for idx in remote_table.indexes}

    for idx_name, idx in local_indexes.items():
        if idx_name not in remote_indexes:
            cols = ', '.join(f"`{col.name}`" for col in idx.columns)
            unique = "UNIQUE" if idx.unique else ""
            alter_statements.append(f"CREATE {unique} INDEX `{idx_name}` ON `{table_name}` ({cols});")


def _sync_foreign_keys(local_table, remote_table):
    """Queue ADD CONSTRAINT for foreign keys whose name is missing on the target."""
    from sqlalchemy.schema import AddConstraint

    # Work per constraint (not per column) so a composite foreign key is
    # emitted exactly once.
    remote_fk_names = {fkc.name for fkc in remote_table.foreign_key_constraints}
    for fkc in local_table.foreign_key_constraints:
        if fkc.name not in remote_fk_names:
            # AddConstraint renders a full "ALTER TABLE ... ADD CONSTRAINT ..."
            # statement; a bare constraint object is not executable DDL.
            ddl = str(AddConstraint(fkc).compile(dialect=remote_engine.dialect))
            alter_statements.append(ddl + ";")


def sync_structure():
    """Compare source and target schemas and queue the DDL needed to sync.

    For every table reflected into ``local_metadata``:

    * missing in ``remote_metadata`` -> queue its ``CREATE TABLE``;
    * otherwise compare columns, primary key, indexes and foreign keys and
      queue the matching ``ALTER`` / ``CREATE INDEX`` statements.

    Nothing is executed here; statements are appended to the module-level
    ``alter_statements`` list.

    NOTE(review): the queued column definitions contain only type and
    DEFAULT clause; NOT NULL and AUTO_INCREMENT are not emitted, and columns,
    indexes or keys that exist only on the target are never dropped.
    """
    # sorted_tables lists referenced tables before the tables that reference
    # them, so a queued CREATE TABLE never points at a not-yet-created parent.
    for local_table in local_metadata.sorted_tables:
        table_name = local_table.key
        if table_name not in remote_metadata.tables:
            print(f"[CREATE TABLE] Missing table `{table_name}`")
            ddl = str(CreateTable(local_table).compile(remote_engine))
            alter_statements.append(ddl + ";")
            continue

        print(f"[CHECK] Comparing `{table_name}`...")
        remote_table = remote_metadata.tables[table_name]

        _sync_columns(table_name, local_table, remote_table)
        _sync_primary_key(table_name, local_table, remote_table)
        _sync_indexes(table_name, local_table, remote_table)
        _sync_foreign_keys(local_table, remote_table)

def apply_changes():
    """Execute every queued statement through ``remote_engine``.

    Statements are echoed, executed and committed one at a time. A failing
    statement is reported together with its traceback and does not stop the
    remaining statements from running.
    """
    with remote_engine.connect() as connection:
        for statement in alter_statements:
            print(f"[EXEC] {statement}")
            try:
                sql = text(statement)
                connection.execute(sql)
                connection.commit()
                print("[OK]")
            except Exception as exc:
                print("[ERROR]", exc)
                traceback.print_exc()

if __name__ == "__main__":
    # Collect the pending DDL first; nothing is executed yet.
    sync_structure()

    # Show the full plan so it can be reviewed before anything is applied.
    print("\n========== PREVIEW OF ALTER STATEMENTS ==========")
    for pending in alter_statements:
        print(pending)
    print("==================================================")

    # Only the literal answer YES (case-insensitive, surrounding whitespace
    # ignored) applies the changes.
    answer = input("Type YES to apply changes: ").strip().upper()
    if answer != "YES":
        print("Aborted.")
    else:
        apply_changes()

原文链接:https://blog.yongit.com/note/1573151.html