突然想起来一个确权的事情。WordPress 上有现成的区块链确权插件,但启用要 500,因此自己写了一个工具,原理是:
- 每次检测文章数据库表(wp_posts)并获取最新48小时的草稿、文章内容到json和sql文件
- 通过 Python 计算 SHA256 和 MD5 哈希值
- 然后写入文件并附上版权信息(域名、RSA 指纹)
- 最后使用ots stamp filename 获取比特币的区块认证(可审计)
如果担心有实时爬虫抢先抓取(除非网站名气很大;本站半年都没什么人看,完全不担心),可以先保存为草稿,延迟一天再发布。
后期可以添加附件(图片目录和附件目录)的哈希以及写入最终ots上去。
下面是python代码和json文件:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import datetime as dt
import hashlib
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
import pymysql
# Ownership block appended verbatim to the end of every manifest; the domain
# and RSA fingerprint tie the exported hashes to the site owner.
COPYRIGHT_BLOCK = """\
© Xiao. All rights reserved.
RSA Fingerprint: 35023C1E8DEB09435105647FFE38FE1D457E8CB0
Source: coremix.net
"""
def utcnow() -> dt.datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return dt.datetime.now(tz=dt.timezone.utc)
def ensure_dir(p: Path) -> None:
    """Create directory *p* (including parents); a pre-existing dir is fine."""
    p.mkdir(exist_ok=True, parents=True)
def load_json(p: Path) -> Dict[str, Any]:
    """Read the UTF-8 JSON document at *p* and return the parsed object."""
    raw = p.read_text(encoding="utf-8")
    return json.loads(raw)
def save_text(p: Path, s: str) -> None:
    """Write string *s* to *p* as UTF-8, overwriting any existing file."""
    p.write_text(s, encoding="utf-8")
def save_json(p: Path, obj: Any) -> None:
    """Serialize *obj* to *p* as pretty UTF-8 JSON.

    default=str stringifies non-JSON values (e.g. datetimes from MySQL rows).
    """
    rendered = json.dumps(obj, ensure_ascii=False, indent=2, default=str)
    with p.open("w", encoding="utf-8") as fh:
        fh.write(rendered)
def file_hashes(p: Path) -> Tuple[str, str]:
    """Return (md5_hex, sha256_hex) of file *p*, streamed in 1 MiB chunks."""
    digest_md5 = hashlib.md5()
    digest_sha256 = hashlib.sha256()
    with p.open("rb") as fh:
        while chunk := fh.read(1024 * 1024):
            digest_md5.update(chunk)
            digest_sha256.update(chunk)
    return digest_md5.hexdigest(), digest_sha256.hexdigest()
def mysql_connect(mysql_cfg: Dict[str, Any], db: str):
    """Open an autocommitting PyMySQL connection to database *db*.

    Host/port/user/password/charset come from *mysql_cfg*; rows are returned
    as dicts (DictCursor) so exports keep column names.
    """
    params = dict(
        host=mysql_cfg["host"],
        port=int(mysql_cfg.get("port", 3306)),
        user=mysql_cfg["user"],
        password=mysql_cfg["password"],
        database=db,
        charset=mysql_cfg.get("charset", "utf8mb4"),
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )
    return pymysql.connect(**params)
def detect_first_run(state_path: Path) -> bool:
    """True when the state file is absent, i.e. no export has run yet."""
    if state_path.exists():
        return False
    return True
def load_state(state_path: Path) -> Dict[str, Any]:
    """Load persisted run state; an absent state file yields an empty dict."""
    if not state_path.exists():
        return {}
    with state_path.open("r", encoding="utf-8") as fh:
        return json.load(fh)
def save_state(state_path: Path, state: Dict[str, Any]) -> None:
    """Persist run *state* to *state_path* as pretty UTF-8 JSON."""
    with state_path.open("w", encoding="utf-8") as fh:
        json.dump(state, fh, ensure_ascii=False, indent=2, default=str)
def sql_escape(val: Any, conn) -> str:
    """Render *val* as a SQL literal for the generated INSERT statements.

    - None           -> NULL
    - bool           -> 1 / 0
    - int / float    -> bare numeric text
    - datetime/date  -> str() of the value, single-quoted
    - bytes          -> MySQL hex string literal X'...'
    - anything else  -> str()-ified, escaped via the live pymysql
      connection (conn.escape_string), and single-quoted
    """
    if val is None:
        return "NULL"
    if isinstance(val, bool):
        # bool is an int subclass: without this branch True/False would be
        # emitted as the bare words "True"/"False" instead of 1/0.
        return "1" if val else "0"
    if isinstance(val, (int, float)):
        return str(val)
    if isinstance(val, (dt.datetime, dt.date)):
        return "'" + str(val) + "'"
    if isinstance(val, (bytes, bytearray)):
        # Binary column values must not go through str(), which would emit
        # corrupt "b'...'" text; a hex literal round-trips them exactly.
        return "X'" + bytes(val).hex().upper() + "'"
    s = str(val)
    return "'" + conn.escape_string(s) + "'"
def rows_to_insert_sql(conn, table: str, rows: List[Dict[str, Any]]) -> str:
    """Serialize *rows* as INSERT statements for *table*.

    Column order comes from the first row (all rows are assumed to share its
    key set). Produces INSERTs only — no DROP/CREATE. An empty *rows* yields
    a single placeholder comment.
    """
    if not rows:
        return f"-- No rows for {table}\n"
    columns = list(rows[0].keys())
    column_sql = ", ".join(f"`{name}`" for name in columns)
    out = [f"-- Exported from {table}", "SET NAMES utf8mb4;", ""]
    for row in rows:
        rendered = ", ".join(sql_escape(row.get(name), conn) for name in columns)
        out.append(f"INSERT INTO `{table}` ({column_sql}) VALUES ({rendered});")
    out.append("")
    return "\n".join(out)
def fetch_posts_and_meta(conn, prefix: str, full: bool, hours: int) -> Dict[str, Any]:
    """Export rows from {prefix}posts plus the matching {prefix}postmeta.

    With full=True every post/page/revision row is dumped; otherwise only rows
    whose GMT modified or created time falls inside the trailing *hours*
    window. Returns a dict bundling both row lists with export metadata
    (table names, export timestamp, mode tag).
    """
    posts_table = f"{prefix}posts"
    postmeta_table = f"{prefix}postmeta"
    with conn.cursor() as cur:
        if full:
            # Full dump: post/page/revision (attachments could be added here
            # if needed).
            cur.execute(
                f"""
                SELECT *
                FROM `{posts_table}`
                WHERE post_type IN ('post','page','revision')
                """
            )
        else:
            cutoff = utcnow() - dt.timedelta(hours=hours)
            # post_modified_gmt is more reliable across timezones/machines.
            cur.execute(
                f"""
                SELECT *
                FROM `{posts_table}`
                WHERE post_type IN ('post','page','revision')
                AND (
                post_modified_gmt >= %s
                OR post_date_gmt >= %s
                )
                """,
                (cutoff.strftime("%Y-%m-%d %H:%M:%S"), cutoff.strftime("%Y-%m-%d %H:%M:%S")),
            )
        posts = cur.fetchall()
        post_ids = [p["ID"] for p in posts if "ID" in p]
        meta: List[Dict[str, Any]] = []
        if post_ids:
            # Pull postmeta for just these posts/revisions, sliced so the
            # IN (...) clause never grows unboundedly long.
            chunk = 1000
            for i in range(0, len(post_ids), chunk):
                part = post_ids[i : i + chunk]
                placeholders = ",".join(["%s"] * len(part))
                cur.execute(
                    f"""
                    SELECT *
                    FROM `{postmeta_table}`
                    WHERE post_id IN ({placeholders})
                    """,
                    part,
                )
                meta.extend(cur.fetchall())
    return {
        "posts_table": posts_table,
        "postmeta_table": postmeta_table,
        "posts": posts,
        "postmeta": meta,
        "exported_at_utc": utcnow().isoformat(),
        "mode": "full" if full else f"last_{hours}_hours",
    }
def run_ots_stamp(manifest_path: Path) -> Tuple[bool, str]:
    """Run `ots stamp` on *manifest_path* to anchor it in Bitcoin.

    Returns (ok, combined stdout+stderr). The original hardcoded
    /root/.local/bin/ots even though the error message told users to put
    'ots' on PATH; now PATH is consulted first, with the old location kept
    as a fallback for backward compatibility.
    """
    import shutil  # local import: only needed when stamping is enabled

    ots_bin = shutil.which("ots") or "/root/.local/bin/ots"
    try:
        proc = subprocess.run(
            [ots_bin, "stamp", str(manifest_path)],
            check=False,
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        return False, "ots command not found (install OpenTimestamps client and ensure 'ots' in PATH)."
    combined = (proc.stdout or "") + (proc.stderr or "")
    return proc.returncode == 0, combined.strip()
def main():
    """Export every configured WP site, hash the artifacts, and stamp them.

    Flow: load sites.json -> per-site JSON/SQL export (FULL on the first run,
    incremental --hours window afterwards) -> MD5/SHA256 table in a markdown
    manifest ending with the copyright block -> persist state -> optionally
    run `ots stamp` on the manifest. Returns 0.
    """
    ap = argparse.ArgumentParser(description="Export WP posts + revisions + meta for OTS proof, hash, and stamp.")
    ap.add_argument("--config", required=True, help="Path to sites.json")
    ap.add_argument("--hours", type=int, default=48, help="Incremental window in hours (default: 48)")
    ap.add_argument("--state", default=".wp_ots_state.json", help="State file path (default: .wp_ots_state.json)")
    ap.add_argument("--no-ots", action="store_true", help="Do not run 'ots stamp' automatically")
    args = ap.parse_args()
    cfg_path = Path(args.config).expanduser().resolve()
    cfg = load_json(cfg_path)
    out_dir = Path(cfg["export"]["output_dir"]).expanduser()
    ensure_dir(out_dir)
    state_path = Path(args.state).expanduser().resolve()
    state = load_state(state_path)
    # Missing state file OR state never marked initialized => full export.
    first_run = detect_first_run(state_path) or state.get("initialized") is not True
    run_id = utcnow().strftime("%Y%m%dT%H%M%SZ")
    run_dir = out_dir / run_id
    ensure_dir(run_dir)
    manifest_lines: List[str] = []
    manifest_lines.append(f"# WordPress Content Proof Manifest")
    manifest_lines.append(f"- Run UTC: {utcnow().isoformat()}")
    manifest_lines.append(f"- Mode: {'FULL (first run)' if first_run else f'INCREMENTAL last {args.hours} hours'}")
    manifest_lines.append("")
    all_files: List[Path] = []
    mysql_cfg = cfg["mysql"]
    for site in cfg["sites"]:
        name = site["name"]
        db = site["db"]
        prefix = site.get("prefix", "wp_")
        site_url = site.get("site_url", "")
        site_dir = run_dir / name
        ensure_dir(site_dir)
        try:
            conn = mysql_connect(mysql_cfg, db)
        except Exception as e:
            # Record the failure in the manifest and continue with the
            # remaining sites instead of aborting the whole run.
            manifest_lines.append(f"## {name}")
            manifest_lines.append(f"- DB: {db}")
            manifest_lines.append(f"- ERROR: failed to connect: {e}")
            manifest_lines.append("")
            continue
        try:
            data = fetch_posts_and_meta(conn, prefix=prefix, full=first_run, hours=args.hours)
            # JSON export (best fit for proof-of-authorship verification:
            # body, revisions and meta are preserved verbatim)
            json_path = site_dir / f"{name}_{data['mode']}_{run_id}.json"
            payload = {
                "site": {
                    "name": name,
                    "db": db,
                    "prefix": prefix,
                    "site_url": site_url,
                },
                "export": {
                    "exported_at_utc": data["exported_at_utc"],
                    "mode": data["mode"],
                    "hours": None if first_run else args.hours,
                },
                "tables": {
                    data["posts_table"]: data["posts"],
                    data["postmeta_table"]: data["postmeta"],
                },
            }
            save_json(json_path, payload)
            # SQL export (handy for quick restore/audit; note: INSERTs only,
            # no DROP/CREATE statements)
            sql_path = site_dir / f"{name}_{data['mode']}_{run_id}.sql"
            sql_parts = []
            sql_parts.append(f"-- Site: {name} DB: {db} Prefix: {prefix}")
            sql_parts.append(f"-- Exported at UTC: {data['exported_at_utc']}")
            sql_parts.append("")
            sql_parts.append(rows_to_insert_sql(conn, data["posts_table"], data["posts"]))
            sql_parts.append(rows_to_insert_sql(conn, data["postmeta_table"], data["postmeta"]))
            save_text(sql_path, "\n".join(sql_parts))
            all_files.extend([json_path, sql_path])
            # Manifest section
            manifest_lines.append(f"## {name}")
            manifest_lines.append(f"- DB: `{db}` Prefix: `{prefix}` Site: `{site_url}`")
            manifest_lines.append(f"- Posts exported: `{len(data['posts'])}`")
            manifest_lines.append(f"- Postmeta exported: `{len(data['postmeta'])}`")
            manifest_lines.append("")
        finally:
            try:
                conn.close()
            except Exception:
                pass
    # Hash all export files
    manifest_lines.append("# Hashes")
    manifest_lines.append("")
    manifest_lines.append("| File | MD5 | SHA256 |")
    manifest_lines.append("|---|---|---|")
    for p in sorted(all_files):
        md5h, sha = file_hashes(p)
        rel = p.relative_to(run_dir)
        manifest_lines.append(f"| `{rel}` | `{md5h}` | `{sha}` |")
    manifest_lines.append("")
    manifest_lines.append("---")
    manifest_lines.append(COPYRIGHT_BLOCK.rstrip())
    manifest_lines.append("")
    manifest_name = cfg["export"].get("manifest_name", "manifest.md")
    manifest_path = run_dir / manifest_name
    save_text(manifest_path, "\n".join(manifest_lines))
    # update state
    state["initialized"] = True
    state["last_run_utc"] = utcnow().isoformat()
    state["last_run_id"] = run_id
    save_state(state_path, state)
    print(f"[OK] Exports written to: {run_dir}")
    print(f"[OK] Manifest: {manifest_path}")
    if not args.no_ots:
        ok, out = run_ots_stamp(manifest_path)
        if ok:
            print(f"[OK] ots stamp success: {manifest_path}.ots should be created")
        else:
            print(f"[WARN] ots stamp failed: {out}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return code (0 on success) as the process exit status.
    sys.exit(main())
# python3 /data/wp/wp_ots_proof.py --config /data/wp/sites.json --state /data/wp/.wp_ots_state.json --hours 48
# python3 /data/wp/wp_ots_proof.py \
# --config /data/wp/sites.json \
# --state /data/wp/.wp_ots_state.json \
# --hours 48
# crontab -e
# add this line:
# 20 3 * * * /usr/bin/python3 /data/wp/wp_ots_proof.py --config /data/wp/sites.json --state /data/wp/.wp_ots_state.json --hours 48 >> /data/wp/logs/wp_ots_proof.log 2>&1
# verify:
# crontab -l
# check the next day:
# ls -lt /data/wp/exports | head
# tail -n 200 /data/wp/logs/wp_ots_proof.log
json文件:
{
"mysql": {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"password": "xxxxxx",
"charset": "utf8mb4"
},
"sites": [
{
"name": "eait",
"db": "wpdb",
"prefix": "wp_",
"site_url": "https://notes.eait.co"
}
],
"export": {
"output_dir": "/data/wp",
"manifest_name": "manifest.md"
}
}