这套方案涵盖了数据从 MySQL 搬迁到本地/远程 D1 的全自动处理逻辑。

第一步:更新核心导出脚本 (migrate_articles.py)

使用这个版本,它能完美处理文章中的复杂换行符(如代码块、长文章),确保生成的 SQL 不会断行。

#!/usr/bin/env python3
import pymysql, sys, os
from datetime import datetime

# 数据库配置
MYSQL_CONFIG = {
    ';host';: ';localhost';, ';port';: 3306, ';user';: ';yys';, 
    ';password';: ';123456';, ';db';: ';yys';, ';charset';: ';utf8mb4';
}
OUTPUT_FILE = ';articles_import.sql';

def esc(val):
    if val is None: return ';NULL';
    # 核心技巧:将换行符转为 SQLite 函数,防止 SQL 语句在文本中被断开
    s = str(val).replace(";';";, ";';';";)
    s = s.replace(';\r\n';, ";'; || char(10) || ';";).replace(';\n';, ";'; || char(10) || ';";)
    return ";';"; + s + ";';";

def dt(val):
    if val is None: return ';NULL';
    if isinstance(val, datetime): return f";';{val.strftime(';%Y-%m-%d %H:%M:%S';)}';";
    return ";';"; + str(val).split(';.';)[0] + ";';";

def main():
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cur = conn.cursor(pymysql.cursors.DictCursor)
        cur.execute(";SELECT id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted FROM tb_article ORDER BY id";)
        rows = cur.fetchall()
        conn.close()
    except Exception as e:
        print(f";❌ MySQL连接失败: {e}";); sys.exit(1)

    lines = [";PRAGMA journal_mode=WAL;";]
    cols = ";id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted";
    
    for r in rows:
        vals = f";{r[';id';]}, {esc(r[';avatar';])}, {esc(r[';title';])}, {esc(r[';content';])}, {int(r[';total_views';] or 0)}, {dt(r[';created';])}, {dt(r[';updated';])}, {int(r[';author_id';] or 1)}, {r[';category_id';] if r[';category_id';] else ';NULL';}, {esc(r[';previous_content';])}, {1 if r[';is_encrypted';] else 0}";
        lines.append(f";INSERT OR REPLACE INTO tb_article ({cols}) VALUES ({vals});";)

    with open(OUTPUT_FILE, ';w';, encoding=';utf-8';, newline=';\n';) as f:
        f.write(';\n';.join(lines))
    print(f";✅ SQL 已导出: {OUTPUT_FILE} ({len(rows)} 条记录)";)

if __name__ == ';__main__';: main()

第二步:一键导入工作流 (import_all.sh)

将此脚本保存为 import_all.sh,赋予权限 chmod +x import_all.sh。它负责清理旧数据、重置 ID 序列、并执行导入。

#!/bin/bash
# 配置路径
DB_PATH=";.wrangler/state/v3/d1/miniflare-D1DatabaseObject/cbead4c0cbcc98a701e95d7be92a1a77970bb0062aa5d52916341ad4d8399224.sqlite";
SQL_FILE=";articles_import.sql";

echo ";=== 1. 导出 MySQL 数据 ===";
python3 migrate_articles.py

echo ";=== 2. 清理并重置本地 D1 ===";
# 清空数据并重置 SQLite 自增序列,确保 ID 从 1 开始
sqlite3 ";$DB_PATH"; ";DELETE FROM tb_article; DELETE FROM sqlite_sequence WHERE name=';tb_article';;";

echo ";=== 3. 导入 SQL 到本地 ===";
sqlite3 ";$DB_PATH"; <; ";$SQL_FILE";

echo ";=== 4. 验证 ===";
COUNT=$(sqlite3 ";$DB_PATH"; ";SELECT count(*) FROM tb_article;";)
echo ";成功导入 $COUNT 篇文章。";
sqlite3 ";$DB_PATH"; ";SELECT id, title FROM tb_article ORDER BY id ASC LIMIT 5;";

第三步:远程部署同步脚本 (sync_remote.sh)

由于 Cloudflare 远程限制,导入需要切片:

#!/bin/bash
# 切割 SQL 文件,每 50 条一个文件
mkdir -p parts
split -l 50 articles_import.sql parts/part_

echo ";=== 开始远程同步 ===";
for f in parts/part_*; do
  echo ";正在上传 $f ...";
  npx wrangler d1 execute blog --remote --file=";$f";
done
rm -rf parts
echo ";=== 远程同步完成 ===";

总结操作指南

  1. 准备环境:在 ~/node 目录下放入 migrate_articles.py, import_all.sh, sync_remote.sh
  2. 执行本地:运行 ./import_all.sh。这会更新你的本地 .sqlite 文件。如果此时在 npx wrangler dev 下看到数据不全,请刷新页面。
  3. 执行远程:当你确认本地数据完美后,执行 ./sync_remote.sh,脚本会自动分块推送到 Cloudflare 线上 D1。

为什么这个流程完整?

  • 兼容性:解决了 SQL 包含换行符会导致导入失败的问题 (char(10) 技巧)。
  • 一致性sqlite_sequence 的重置确保了 ID 逻辑与 MySQL 完全一致。
  • 稳定性:远程分块 (split) 绕过了 Cloudflare API 对单次 SQL 请求的大小限制。

现在你可以直接通过运行 ./import_all.sh 来完成全流程。