使用 Python 同步两个数据库的表结构
安装环境:
python3 -m venv myenv # Create a virtual environment
source myenv/bin/activate # Activate the virtual environment
pip install sqlalchemy pymysql python-Levenshtein # Install your packages inside the virtual environment
🛠️ 在本地(localhost)数据库上创建同步用户:
CREATE USER 'dbsync'@'localhost' IDENTIFIED BY 'strong_sync_password';
GRANT USAGE ON *.* TO 'dbsync'@'localhost';
GRANT SELECT ON `dbdbdb`.* TO 'dbsync'@'localhost';
GRANT CREATE, ALTER, DROP, INDEX ON `dbdbdb`.* TO 'dbsync'@'localhost';
FLUSH PRIVILEGES;
🌐 在远程主机的数据库上创建同步用户(假设允许从任意 IP 访问):
CREATE USER 'dbsync'@'%' IDENTIFIED BY 'strong_sync_password';
GRANT USAGE ON *.* TO 'dbsync'@'%';
GRANT SELECT ON `dbdbdb`.* TO 'dbsync'@'%';
GRANT CREATE, ALTER, DROP, INDEX ON `dbdbdb`.* TO 'dbsync'@'%';
FLUSH PRIVILEGES;
核心脚本 dbsync.py,只需更改本地和远程的连接配置(local_url 和 remote_url)即可:
import traceback
from sqlalchemy import create_engine, MetaData, Table, text
from sqlalchemy.schema import CreateTable
from sqlalchemy.dialects.mysql import INTEGER, VARCHAR, FLOAT
import Levenshtein
# === Connection settings ===
# SQLAlchemy URLs of the two databases whose structure is compared.
local_url = "mysql+pymysql://root:123456@localhost/db1"
remote_url = "mysql+pymysql://root:123456@localhost/db2"

# === Sync direction ===
# True copies the structure local -> remote; False syncs in reverse.
local_to_remote = True

if local_to_remote:
    source_url, target_url = local_url, remote_url
else:
    source_url, target_url = remote_url, local_url

print(f"[INFO] Sync direction: {'local → remote' if local_to_remote else 'remote → local'}")

# NOTE: from here on `local_*` is bound to the sync SOURCE and `remote_*` to
# the sync TARGET, whichever direction was selected above.
local_engine = create_engine(source_url, future=True)
remote_engine = create_engine(target_url, future=True)

# Load the current schema of both databases.
local_metadata = MetaData()
local_metadata.reflect(bind=local_engine)
remote_metadata = MetaData()
remote_metadata.reflect(bind=remote_engine)
# === Default-value logic ===
def guess_default(col):
    """Return the ``DEFAULT ...`` clause to use when (re)defining ``col``.

    Resolution order:

    1. a Python-side default (``col.default``);
    2. a server-side default (``col.server_default``), which is where table
       reflection stores the default read from the database;
    3. ``DEFAULT NULL`` for nullable columns;
    4. a type-based fallback for non-nullable INTEGER / FLOAT / VARCHAR
       columns.

    Args:
        col: A SQLAlchemy ``Column`` (here: obtained through reflection).

    Returns:
        A SQL fragment such as ``"DEFAULT 0"``, or ``""`` when no default can
        be guessed.
    """
    if col.default is not None:
        return f"DEFAULT {col.default.arg}"

    # Reflection puts database defaults in ``server_default`` rather than
    # ``default``; without this lookup every existing default would be
    # replaced by a guess. Not every server default object is guaranteed to
    # expose ``arg``, hence the second getattr.
    server_default = getattr(col, "server_default", None)
    server_arg = getattr(server_default, "arg", None)
    if server_arg is not None:
        return f"DEFAULT {server_arg}"

    if col.nullable:
        return "DEFAULT NULL"
    if isinstance(col.type, INTEGER):
        return "DEFAULT 0"
    if isinstance(col.type, FLOAT):
        return "DEFAULT 0.0"
    if isinstance(col.type, VARCHAR):
        return "DEFAULT ''"
    return ""
# === Helpers for context-based column matching ===
def get_neighbors(name_list, name):
    """Return the names located directly before and after ``name``.

    Args:
        name_list: Ordered list of column names.
        name: The entry to look up; ``ValueError`` is raised if it is absent.

    Returns:
        A ``(previous, following)`` tuple; either element is ``None`` when
        ``name`` sits at the corresponding end of the list.
    """
    position = name_list.index(name)
    last_position = len(name_list) - 1
    previous_name = None if position == 0 else name_list[position - 1]
    following_name = None if position >= last_position else name_list[position + 1]
    return previous_name, following_name
def find_renamed_column(local_col, local_col_names, remote_col_names, remote_columns):
    """Guess which remote column, if any, ``local_col`` was renamed from.

    Only remote columns whose name no longer exists in the local table are
    considered: a column present on both sides has not been renamed, and
    proposing it would produce a ``CHANGE COLUMN`` on a live column.

    Strategy:

    1. Positional match: the remote column at the same ordinal position,
       provided its type string equals the local one.
    2. Fuzzy match: remaining remote columns ranked by the Levenshtein ratio
       of the lower-cased names. A candidate needs a ratio of at least 0.6,
       an identical type string, and a matching previous or next neighbour.

    Args:
        local_col: The local column that has no same-named remote column.
        local_col_names: Ordered column names of the local table.
        remote_col_names: Ordered column names of the remote table.
        remote_columns: Mapping of remote column name to column object.

    Returns:
        The name of the presumed original remote column, or ``None``.
    """
    local_idx = local_col_names.index(local_col.name)
    local_type = str(local_col.type)
    local_names = set(local_col_names)

    # === Step 1: fast positional match ===
    if local_idx < len(remote_col_names):
        remote_candidate = remote_col_names[local_idx]
        remote_col = remote_columns[remote_candidate]
        if remote_candidate not in local_names and local_type == str(remote_col.type):
            # Same position + same type: treat as a rename.
            return remote_candidate

    # === Step 2: fuzzy name match ===
    candidates = [c for c in remote_columns.keys() if c not in local_names]
    if not candidates:
        return None

    wanted = local_col.name.lower()
    matches = sorted(
        ((c, Levenshtein.ratio(wanted, c.lower())) for c in candidates),
        key=lambda pair: -pair[1],
    )
    local_prev, local_next = get_neighbors(local_col_names, local_col.name)

    for remote_name, score in matches:
        if score < 0.6:
            # Sorted best-first, so every remaining candidate scores lower.
            break
        if local_type != str(remote_columns[remote_name].type):
            continue

        # Neighbour check; a missing neighbour on either side counts as a match.
        remote_prev, remote_next = get_neighbors(remote_col_names, remote_name)
        match_prev = (local_prev == remote_prev) if local_prev and remote_prev else True
        match_next = (local_next == remote_next) if local_next and remote_next else True
        if match_prev or match_next:
            return remote_name

    return None  # no plausible rename source found
# === Main sync logic ===
# DDL statements collected by sync_structure() and executed by apply_changes().
alter_statements = []


def sync_structure():
    """Compare the source and target schemas and queue the DDL to align them.

    For every source table, statements are appended to ``alter_statements``:

    * ``CREATE TABLE`` when the table is missing on the target;
    * ``MODIFY COLUMN`` when a same-named column has a different type string;
    * ``CHANGE COLUMN`` when ``find_renamed_column`` identifies a rename,
      otherwise ``ADD COLUMN``;
    * a primary-key replacement when the ordered key columns differ;
    * ``CREATE INDEX`` for indexes whose name is missing on the target;
    * ``ADD CONSTRAINT`` for foreign keys whose name is missing on the target.

    Columns, indexes and foreign keys that exist only on the target are left
    untouched. Nothing is executed here.
    """
    # Local import: AddConstraint is the DDL construct that renders
    # "ALTER TABLE ... ADD CONSTRAINT ..."; a constraint object itself is not
    # a compilable statement.
    from sqlalchemy.schema import AddConstraint

    dialect = remote_engine.dialect

    for table_name, local_table in local_metadata.tables.items():
        if table_name not in remote_metadata.tables:
            print(f"[CREATE TABLE] Missing table `{table_name}`")
            ddl = str(CreateTable(local_table).compile(remote_engine))
            alter_statements.append(ddl + ";")
            continue

        print(f"[CHECK] Comparing `{table_name}`...")
        remote_table = remote_metadata.tables[table_name]
        remote_columns = remote_table.columns
        local_columns = local_table.columns
        remote_col_names = list(remote_columns.keys())
        local_col_names = list(local_columns.keys())

        # --- Columns ---
        # NOTE(review): the generated column definitions carry no
        # NULL / NOT NULL clause, so MySQL will apply its default (nullable)
        # on MODIFY / CHANGE / ADD -- confirm this is intended.
        for local_col in local_columns.values():
            name = local_col.name
            if name in remote_columns:
                remote_col = remote_columns[name]
                if str(local_col.type) != str(remote_col.type):
                    col_type = local_col.type.compile(dialect=dialect)
                    alter_statements.append(
                        f"ALTER TABLE `{table_name}` MODIFY COLUMN `{name}` {col_type} {guess_default(local_col)};"
                    )
                continue

            # Column is missing on the target: either renamed or new.
            renamed = find_renamed_column(local_col, local_col_names, remote_col_names, remote_columns)
            col_type = local_col.type.compile(dialect=dialect)
            if renamed:
                alter_statements.append(
                    f"ALTER TABLE `{table_name}` CHANGE COLUMN `{renamed}` `{name}` {col_type} {guess_default(local_col)};"
                )
            else:
                alter_statements.append(
                    f"ALTER TABLE `{table_name}` ADD COLUMN `{name}` {col_type} {guess_default(local_col)};"
                )

        # --- Primary key ---
        # Lists (not sets) keep the column order, which is significant for a
        # composite key.
        local_pk = list(local_table.primary_key.columns.keys())
        remote_pk = list(remote_table.primary_key.columns.keys())
        if local_pk and local_pk != remote_pk:
            pk_cols = ', '.join(f'`{c}`' for c in local_pk)
            if remote_pk:
                # Drop and re-add in a single statement so the table is never
                # left without its key between two separate ALTERs.
                alter_statements.append(
                    f"ALTER TABLE `{table_name}` DROP PRIMARY KEY, ADD PRIMARY KEY ({pk_cols});"
                )
            else:
                # Nothing to drop: DROP PRIMARY KEY would fail on the target.
                alter_statements.append(f"ALTER TABLE `{table_name}` ADD PRIMARY KEY ({pk_cols});")

        # --- Indexes (matched by name only) ---
        local_indexes = {idx.name: idx for idx in local_table.indexes}
        remote_indexes = {idx.name: idx for idx in remote_table.indexes}
        for idx_name, idx in local_indexes.items():
            if idx_name not in remote_indexes:
                cols = ', '.join(f"`{col.name}`" for col in idx.columns)
                unique = "UNIQUE" if idx.unique else ""
                alter_statements.append(f"CREATE {unique} INDEX `{idx_name}` ON `{table_name}` ({cols});")

        # --- Foreign keys (matched by constraint name only) ---
        # Iterate constraints rather than per-column ForeignKey objects so a
        # composite foreign key is emitted once.
        remote_fk_names = {fkc.name for fkc in remote_table.foreign_key_constraints}
        local_fk_constraints = sorted(
            local_table.foreign_key_constraints,
            key=lambda fkc: fkc.name or "",
        )
        for fk_constraint in local_fk_constraints:
            if fk_constraint.name not in remote_fk_names:
                fk_ddl = str(AddConstraint(fk_constraint).compile(dialect=dialect))
                alter_statements.append(fk_ddl + ";")
def apply_changes():
    """Execute every queued statement against the target database.

    Each statement in ``alter_statements`` is printed, executed and committed
    on its own. A failing statement is reported together with its traceback
    and the loop then carries on with the next statement.
    """
    with remote_engine.connect() as connection:
        for statement in alter_statements:
            print(f"[EXEC] {statement}")
            try:
                connection.execute(text(statement))
                connection.commit()
                print("[OK]")
            except Exception as exc:
                # Best effort: report the failure and keep going.
                print("[ERROR]", exc)
                traceback.print_exc()
if __name__ == "__main__":
    # Collect the DDL first so it can be reviewed before anything is executed.
    sync_structure()

    print("\n========== PREVIEW OF ALTER STATEMENTS ==========")
    for pending in alter_statements:
        print(pending)
    print("==================================================")

    # Only a (case-insensitive, whitespace-trimmed) YES applies the changes.
    answer = input("Type YES to apply changes: ").strip().upper()
    if answer != "YES":
        print("Aborted.")
    else:
        apply_changes()