这套方案涵盖了数据从 MySQL 搬迁到本地/远程 D1 的全自动处理逻辑。
第一步:更新核心导出脚本 (migrate_articles.py)
使用这个版本,它能完美处理文章中的复杂换行符(如代码块、长文章),确保生成的 SQL 不会断行。
#!/usr/bin/env python3
import pymysql, sys, os
from datetime import datetime
# 数据库配置
MYSQL_CONFIG = {
';host';: ';localhost';, ';port';: 3306, ';user';: ';yys';,
';password';: ';123456';, ';db';: ';yys';, ';charset';: ';utf8mb4';
}
OUTPUT_FILE = ';articles_import.sql';
def esc(val):
if val is None: return ';NULL';
# 核心技巧:将换行符转为 SQLite 函数,防止 SQL 语句在文本中被断开
s = str(val).replace(";';";, ";';';";)
s = s.replace(';\r\n';, ";'; || char(10) || ';";).replace(';\n';, ";'; || char(10) || ';";)
return ";';"; + s + ";';";
def dt(val):
if val is None: return ';NULL';
if isinstance(val, datetime): return f";';{val.strftime(';%Y-%m-%d %H:%M:%S';)}';";
return ";';"; + str(val).split(';.';)[0] + ";';";
def main():
try:
conn = pymysql.connect(**MYSQL_CONFIG)
cur = conn.cursor(pymysql.cursors.DictCursor)
cur.execute(";SELECT id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted FROM tb_article ORDER BY id";)
rows = cur.fetchall()
conn.close()
except Exception as e:
print(f";❌ MySQL连接失败: {e}";); sys.exit(1)
lines = [";PRAGMA journal_mode=WAL;";]
cols = ";id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted";
for r in rows:
vals = f";{r[';id';]}, {esc(r[';avatar';])}, {esc(r[';title';])}, {esc(r[';content';])}, {int(r[';total_views';] or 0)}, {dt(r[';created';])}, {dt(r[';updated';])}, {int(r[';author_id';] or 1)}, {r[';category_id';] if r[';category_id';] else ';NULL';}, {esc(r[';previous_content';])}, {1 if r[';is_encrypted';] else 0}";
lines.append(f";INSERT OR REPLACE INTO tb_article ({cols}) VALUES ({vals});";)
with open(OUTPUT_FILE, ';w';, encoding=';utf-8';, newline=';\n';) as f:
f.write(';\n';.join(lines))
print(f";✅ SQL 已导出: {OUTPUT_FILE} ({len(rows)} 条记录)";)
if __name__ == ';__main__';: main()
第二步:一键导入工作流 (import_all.sh)
将此脚本保存为 import_all.sh,赋予权限 chmod +x import_all.sh。它负责清理旧数据、重置 ID 序列、并执行导入。
#!/bin/bash
# 配置路径
DB_PATH=";.wrangler/state/v3/d1/miniflare-D1DatabaseObject/cbead4c0cbcc98a701e95d7be92a1a77970bb0062aa5d52916341ad4d8399224.sqlite";
SQL_FILE=";articles_import.sql";
echo ";=== 1. 导出 MySQL 数据 ===";
python3 migrate_articles.py
echo ";=== 2. 清理并重置本地 D1 ===";
# 清空数据并重置 SQLite 自增序列,确保 ID 从 1 开始
sqlite3 ";$DB_PATH"; ";DELETE FROM tb_article; DELETE FROM sqlite_sequence WHERE name=';tb_article';;";
echo ";=== 3. 导入 SQL 到本地 ===";
sqlite3 ";$DB_PATH"; <; ";$SQL_FILE";
echo ";=== 4. 验证 ===";
COUNT=$(sqlite3 ";$DB_PATH"; ";SELECT count(*) FROM tb_article;";)
echo ";成功导入 $COUNT 篇文章。";
sqlite3 ";$DB_PATH"; ";SELECT id, title FROM tb_article ORDER BY id ASC LIMIT 5;";
第三步:远程部署同步脚本 (sync_remote.sh)
由于 Cloudflare 远程限制,导入需要切片:
#!/bin/bash
# 切割 SQL 文件,每 50 条一个文件
mkdir -p parts
split -l 50 articles_import.sql parts/part_
echo ";=== 开始远程同步 ===";
for f in parts/part_*; do
echo ";正在上传 $f ...";
npx wrangler d1 execute blog --remote --file=";$f";
done
rm -rf parts
echo ";=== 远程同步完成 ===";
总结操作指南
- 准备环境:在
~/node目录下放入migrate_articles.py,import_all.sh,sync_remote.sh。 - 执行本地:运行
./import_all.sh。这会更新你的本地.sqlite文件。如果此时在npx wrangler dev下看到数据不全,请刷新页面。 - 执行远程:当你确认本地数据完美后,执行
./sync_remote.sh,脚本会自动分块推送到 Cloudflare 线上 D1。
为什么这个流程完整?
- 兼容性:解决了 SQL 包含换行符会导致导入失败的问题 (
char(10)技巧)。 - 一致性:
sqlite_sequence的重置确保了 ID 逻辑与 MySQL 完全一致。 - 稳定性:远程分块 (
split) 绕过了 Cloudflare API 对单次 SQL 请求的大小限制。
现在你可以直接通过运行 ./import_all.sh 来完成全流程。
评论区(0 条)
发表评论⏳ 加载编辑器…