这套方案涵盖了数据从 MySQL 搬迁到本地/远程 D1 的全自动处理逻辑。

第一步:更新核心导出脚本 (migrate_articles.py)

使用这个版本,它能完美处理文章中的复杂换行符(如代码块、长文章),确保生成的 SQL 不会断行。

#!/usr/bin/env python3
import pymysql, sys, os
from datetime import datetime

# 数据库配置
MYSQL_CONFIG = {
    'host': 'localhost', 'port': 3306, 'user': 'yys', 
    'password': '123456', 'db': 'yys', 'charset': 'utf8mb4'
}
OUTPUT_FILE = 'articles_import.sql'

def esc(val):
    if val is None: return 'NULL'
    # 核心技巧:将换行符转为 SQLite 函数,防止 SQL 语句在文本中被断开
    s = str(val).replace("'", "''")
    s = s.replace('\r\n', "' || char(10) || '").replace('\n', "' || char(10) || '")
    return "'" + s + "'"

def dt(val):
    if val is None: return 'NULL'
    if isinstance(val, datetime): return f"'{val.strftime('%Y-%m-%d %H:%M:%S')}'"
    return "'" + str(val).split('.')[0] + "'"

def main():
    try:
        conn = pymysql.connect(**MYSQL_CONFIG)
        cur = conn.cursor(pymysql.cursors.DictCursor)
        cur.execute("SELECT id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted FROM tb_article ORDER BY id")
        rows = cur.fetchall()
        conn.close()
    except Exception as e:
        print(f"❌ MySQL连接失败: {e}"); sys.exit(1)

    lines = ["PRAGMA journal_mode=WAL;"]
    cols = "id, avatar, title, content, total_views, created, updated, author_id, category_id, previous_content, is_encrypted"
    
    for r in rows:
        vals = f"{r['id']}, {esc(r['avatar'])}, {esc(r['title'])}, {esc(r['content'])}, {int(r['total_views'] or 0)}, {dt(r['created'])}, {dt(r['updated'])}, {int(r['author_id'] or 1)}, {r['category_id'] if r['category_id'] else 'NULL'}, {esc(r['previous_content'])}, {1 if r['is_encrypted'] else 0}"
        lines.append(f"INSERT OR REPLACE INTO tb_article ({cols}) VALUES ({vals});")

    with open(OUTPUT_FILE, 'w', encoding='utf-8', newline='\n') as f:
        f.write('\n'.join(lines))
    print(f"✅ SQL 已导出: {OUTPUT_FILE} ({len(rows)} 条记录)")

if __name__ == '__main__': main()

第二步:一键导入工作流 (import_all.sh)

将此脚本保存为 import_all.sh,赋予权限 chmod +x import_all.sh。它负责清理旧数据、重置 ID 序列、并执行导入。

#!/bin/bash
# 配置路径
DB_PATH=".wrangler/state/v3/d1/miniflare-D1DatabaseObject/cbead4c0cbcc98a701e95d7be92a1a77970bb0062aa5d52916341ad4d8399224.sqlite"
SQL_FILE="articles_import.sql"

echo "=== 1. 导出 MySQL 数据 ==="
python3 migrate_articles.py

echo "=== 2. 清理并重置本地 D1 ==="
# 清空数据并重置 SQLite 自增序列,确保 ID 从 1 开始
sqlite3 "$DB_PATH" "DELETE FROM tb_article; DELETE FROM sqlite_sequence WHERE name='tb_article';"

echo "=== 3. 导入 SQL 到本地 ==="
sqlite3 "$DB_PATH" < "$SQL_FILE"

echo "=== 4. 验证 ==="
COUNT=$(sqlite3 "$DB_PATH" "SELECT count(*) FROM tb_article;")
echo "成功导入 $COUNT 篇文章。"
sqlite3 "$DB_PATH" "SELECT id, title FROM tb_article ORDER BY id ASC LIMIT 5;"

第三步:远程部署同步脚本 (sync_remote.sh)

由于 Cloudflare 远程限制,导入需要切片:

#!/bin/bash
# 切割 SQL 文件,每 50 条一个文件
mkdir -p parts
split -l 50 articles_import.sql parts/part_

echo "=== 开始远程同步 ==="
for f in parts/part_*; do
  echo "正在上传 $f ..."
  npx wrangler d1 execute blog --remote --file="$f"
done
rm -rf parts
echo "=== 远程同步完成 ==="

总结操作指南

  1. 准备环境:在 ~/node 目录下放入 migrate_articles.py, import_all.sh, sync_remote.sh
  2. 执行本地:运行 ./import_all.sh。这会更新你的本地 .sqlite 文件。如果此时在 npx wrangler dev 下看到数据不全,请刷新页面。
  3. 执行远程:当你确认本地数据完美后,执行 ./sync_remote.sh,脚本会自动分块推送到 Cloudflare 线上 D1。

为什么这个流程完整?

  • 兼容性:解决了 SQL 包含换行符会导致导入失败的问题 (char(10) 技巧)。
  • 一致性sqlite_sequence 的重置确保了 ID 逻辑与 MySQL 完全一致。
  • 稳定性:远程分块 (split) 绕过了 Cloudflare API 对单次 SQL 请求的大小限制。

现在你可以直接通过运行 ./import_all.sh 来完成全流程。