A Python proof-of-authorship tool (drafts included)

 

I suddenly remembered the problem of proving authorship of my posts, so I wrote a blockchain timestamping tool. (WordPress has a plugin for this, but it costs 500 just to activate, so I rolled my own.) The idea is:

  1. On each run, query the posts table (wp_posts) and export the last 48 hours of drafts and posts to JSON and SQL files.
  2. Compute SHA-256 and MD5 hashes in Python.
  3. Write the hashes to a manifest file together with copyright information (domain, RSA fingerprint).
  4. Run `ots stamp filename` to obtain an auditable attestation anchored in a Bitcoin block.

If you worry about real-time scrapers (only a concern if you are well known; nobody has read this site in half a year, so I am not), you can keep a post as a draft and delay publishing by a day.

Later on, the tool could also hash attachments (the image and upload directories) and feed those hashes into the final ots stamp as well.
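The attachment hashing mentioned above could reuse the same chunked-digest approach over a whole directory tree; a minimal sketch (the `hash_tree` helper and any paths are illustrative, not part of the script below):

```python
import hashlib
from pathlib import Path
from typing import Dict


def hash_tree(root: Path) -> Dict[str, str]:
    """Walk a directory tree and return {relative_path: sha256_hex} for every file."""
    digests: Dict[str, str] = {}
    for p in sorted(root.rglob("*")):
        if not p.is_file():
            continue
        sha = hashlib.sha256()
        with p.open("rb") as f:
            # Read in 1 MiB chunks so large attachments don't blow up memory
            for chunk in iter(lambda: f.read(1024 * 1024), b""):
                sha.update(chunk)
        digests[p.relative_to(root).as_posix()] = sha.hexdigest()
    return digests
```

The returned mapping could then be written into the manifest next to the JSON/SQL hashes, so that a single `ots stamp` run covers attachments too.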

Below are the Python script and the JSON config file:

 
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import datetime as dt
import hashlib
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple

import pymysql


COPYRIGHT_BLOCK = """\
© Xiao. All rights reserved.
RSA Fingerprint: 35023C1E8DEB09435105647FFE38FE1D457E8CB0
Source: coremix.net
"""


def utcnow() -> dt.datetime:
    return dt.datetime.now(dt.timezone.utc)


def ensure_dir(p: Path) -> None:
    p.mkdir(parents=True, exist_ok=True)


def load_json(p: Path) -> Dict[str, Any]:
    with p.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_text(p: Path, s: str) -> None:
    with p.open("w", encoding="utf-8") as f:
        f.write(s)


def save_json(p: Path, obj: Any) -> None:
    with p.open("w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False, indent=2, default=str)


def file_hashes(p: Path) -> Tuple[str, str]:
    md5 = hashlib.md5()
    sha = hashlib.sha256()
    with p.open("rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5.update(chunk)
            sha.update(chunk)
    return md5.hexdigest(), sha.hexdigest()


def mysql_connect(mysql_cfg: Dict[str, Any], db: str):
    return pymysql.connect(
        host=mysql_cfg["host"],
        port=int(mysql_cfg.get("port", 3306)),
        user=mysql_cfg["user"],
        password=mysql_cfg["password"],
        database=db,
        charset=mysql_cfg.get("charset", "utf8mb4"),
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=True,
    )


def detect_first_run(state_path: Path) -> bool:
    return not state_path.exists()


def load_state(state_path: Path) -> Dict[str, Any]:
    if not state_path.exists():
        return {}
    return load_json(state_path)


def save_state(state_path: Path, state: Dict[str, Any]) -> None:
    save_json(state_path, state)


def sql_escape(val: Any, conn) -> str:
    # Use pymysql escape where possible
    if val is None:
        return "NULL"
    if isinstance(val, (int, float)):
        return str(val)
    if isinstance(val, (dt.datetime, dt.date)):
        return "'" + str(val) + "'"
    s = str(val)
    return "'" + conn.escape_string(s) + "'"


def rows_to_insert_sql(conn, table: str, rows: List[Dict[str, Any]]) -> str:
    if not rows:
        return f"-- No rows for {table}\n"
    cols = list(rows[0].keys())
    col_list = ", ".join([f"`{c}`" for c in cols])
    lines = [f"-- Exported from {table}", "SET NAMES utf8mb4;", ""]
    for r in rows:
        values = ", ".join([sql_escape(r.get(c), conn) for c in cols])
        lines.append(f"INSERT INTO `{table}` ({col_list}) VALUES ({values});")
    lines.append("")
    return "\n".join(lines)


def fetch_posts_and_meta(conn, prefix: str, full: bool, hours: int) -> Dict[str, Any]:
    posts_table = f"{prefix}posts"
    postmeta_table = f"{prefix}postmeta"

    with conn.cursor() as cur:
        if full:
            # Full export: post/page/revision (add 'attachment' here if needed)
            cur.execute(
                f"""
                SELECT *
                FROM `{posts_table}`
                WHERE post_type IN ('post','page','revision')
                """
            )
        else:
            cutoff = utcnow() - dt.timedelta(hours=hours)
            # post_modified_gmt is more reliable across time zones/machines
            cur.execute(
                f"""
                SELECT *
                FROM `{posts_table}`
                WHERE post_type IN ('post','page','revision')
                  AND (
                    post_modified_gmt >= %s
                    OR post_date_gmt >= %s
                  )
                """,
                (cutoff.strftime("%Y-%m-%d %H:%M:%S"), cutoff.strftime("%Y-%m-%d %H:%M:%S")),
            )
        posts = cur.fetchall()

        post_ids = [p["ID"] for p in posts if "ID" in p]
        meta: List[Dict[str, Any]] = []
        if post_ids:
            # Fetch postmeta in bulk (only for these posts/revisions)
            # Chunk the IN list to keep each query bounded
            chunk = 1000
            for i in range(0, len(post_ids), chunk):
                part = post_ids[i : i + chunk]
                placeholders = ",".join(["%s"] * len(part))
                cur.execute(
                    f"""
                    SELECT *
                    FROM `{postmeta_table}`
                    WHERE post_id IN ({placeholders})
                    """,
                    part,
                )
                meta.extend(cur.fetchall())

    return {
        "posts_table": posts_table,
        "postmeta_table": postmeta_table,
        "posts": posts,
        "postmeta": meta,
        "exported_at_utc": utcnow().isoformat(),
        "mode": "full" if full else f"last_{hours}_hours",
    }


def run_ots_stamp(manifest_path: Path) -> Tuple[bool, str]:
    try:
        r = subprocess.run(
            ["/root/.local/bin/ots", "stamp", str(manifest_path)],
            check=False,
            capture_output=True,
            text=True,
        )
        ok = (r.returncode == 0)
        out = (r.stdout or "") + (r.stderr or "")
        return ok, out.strip()
    except FileNotFoundError:
        return False, "ots not found at /root/.local/bin/ots (install the OpenTimestamps client and adjust the path if needed)."


def main():
    ap = argparse.ArgumentParser(description="Export WP posts + revisions + meta for OTS proof, hash, and stamp.")
    ap.add_argument("--config", required=True, help="Path to sites.json")
    ap.add_argument("--hours", type=int, default=48, help="Incremental window in hours (default: 48)")
    ap.add_argument("--state", default=".wp_ots_state.json", help="State file path (default: .wp_ots_state.json)")
    ap.add_argument("--no-ots", action="store_true", help="Do not run 'ots stamp' automatically")
    args = ap.parse_args()

    cfg_path = Path(args.config).expanduser().resolve()
    cfg = load_json(cfg_path)

    out_dir = Path(cfg["export"]["output_dir"]).expanduser()
    ensure_dir(out_dir)

    state_path = Path(args.state).expanduser().resolve()
    state = load_state(state_path)
    first_run = detect_first_run(state_path) or state.get("initialized") is not True

    run_id = utcnow().strftime("%Y%m%dT%H%M%SZ")
    run_dir = out_dir / run_id
    ensure_dir(run_dir)

    manifest_lines: List[str] = []
    manifest_lines.append("# WordPress Content Proof Manifest")
    manifest_lines.append(f"- Run UTC: {utcnow().isoformat()}")
    manifest_lines.append(f"- Mode: {'FULL (first run)' if first_run else f'INCREMENTAL last {args.hours} hours'}")
    manifest_lines.append("")

    all_files: List[Path] = []

    mysql_cfg = cfg["mysql"]

    for site in cfg["sites"]:
        name = site["name"]
        db = site["db"]
        prefix = site.get("prefix", "wp_")
        site_url = site.get("site_url", "")

        site_dir = run_dir / name
        ensure_dir(site_dir)

        try:
            conn = mysql_connect(mysql_cfg, db)
        except Exception as e:
            manifest_lines.append(f"## {name}")
            manifest_lines.append(f"- DB: {db}")
            manifest_lines.append(f"- ERROR: failed to connect: {e}")
            manifest_lines.append("")
            continue

        try:
            data = fetch_posts_and_meta(conn, prefix=prefix, full=first_run, hours=args.hours)

            # JSON export (best for proof verification: body, revisions, and meta preserved verbatim)
            json_path = site_dir / f"{name}_{data['mode']}_{run_id}.json"
            payload = {
                "site": {
                    "name": name,
                    "db": db,
                    "prefix": prefix,
                    "site_url": site_url,
                },
                "export": {
                    "exported_at_utc": data["exported_at_utc"],
                    "mode": data["mode"],
                    "hours": None if first_run else args.hours,
                },
                "tables": {
                    data["posts_table"]: data["posts"],
                    data["postmeta_table"]: data["postmeta"],
                },
            }
            save_json(json_path, payload)

            # SQL export (handy for quick restore/audit; note: INSERT only, no DROP/CREATE)
            sql_path = site_dir / f"{name}_{data['mode']}_{run_id}.sql"
            sql_parts = []
            sql_parts.append(f"-- Site: {name}  DB: {db}  Prefix: {prefix}")
            sql_parts.append(f"-- Exported at UTC: {data['exported_at_utc']}")
            sql_parts.append("")
            sql_parts.append(rows_to_insert_sql(conn, data["posts_table"], data["posts"]))
            sql_parts.append(rows_to_insert_sql(conn, data["postmeta_table"], data["postmeta"]))
            save_text(sql_path, "\n".join(sql_parts))

            all_files.extend([json_path, sql_path])

            # Manifest section
            manifest_lines.append(f"## {name}")
            manifest_lines.append(f"- DB: `{db}`  Prefix: `{prefix}`  Site: `{site_url}`")
            manifest_lines.append(f"- Posts exported: `{len(data['posts'])}`")
            manifest_lines.append(f"- Postmeta exported: `{len(data['postmeta'])}`")
            manifest_lines.append("")
        finally:
            try:
                conn.close()
            except Exception:
                pass

    # Hash all export files
    manifest_lines.append("# Hashes")
    manifest_lines.append("")
    manifest_lines.append("| File | MD5 | SHA256 |")
    manifest_lines.append("|---|---|---|")

    for p in sorted(all_files):
        md5h, sha = file_hashes(p)
        rel = p.relative_to(run_dir)
        manifest_lines.append(f"| `{rel}` | `{md5h}` | `{sha}` |")

    manifest_lines.append("")
    manifest_lines.append("---")
    manifest_lines.append(COPYRIGHT_BLOCK.rstrip())
    manifest_lines.append("")

    manifest_name = cfg["export"].get("manifest_name", "manifest.md")
    manifest_path = run_dir / manifest_name
    save_text(manifest_path, "\n".join(manifest_lines))

    # update state
    state["initialized"] = True
    state["last_run_utc"] = utcnow().isoformat()
    state["last_run_id"] = run_id
    save_state(state_path, state)

    print(f"[OK] Exports written to: {run_dir}")
    print(f"[OK] Manifest: {manifest_path}")

    if not args.no_ots:
        ok, out = run_ots_stamp(manifest_path)
        if ok:
            print(f"[OK] ots stamp success: {manifest_path}.ots should be created")
        else:
            print(f"[WARN] ots stamp failed: {out}")

    return 0


if __name__ == "__main__":
    sys.exit(main())

# python3 /data/wp/wp_ots_proof.py \
#   --config /data/wp/sites.json \
#   --state /data/wp/.wp_ots_state.json \
#   --hours 48

# crontab -e
# add this line:
# 20 3 * * * /usr/bin/python3 /data/wp/wp_ots_proof.py --config /data/wp/sites.json --state /data/wp/.wp_ots_state.json --hours 48 >> /data/wp/logs/wp_ots_proof.log 2>&1
# verify:
# crontab -l
# check the next day:
# ls -lt /data/wp/exports | head
# tail -n 200 /data/wp/logs/wp_ots_proof.log
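Since the manifest's hash table follows a fixed `| file | md5 | sha256 |` layout, a past run can be audited offline by recomputing the digests; a minimal verifier sketch (`verify_manifest` is illustrative and not part of the script above):

```python
import hashlib
import re
from pathlib import Path

# Matches the table rows the script writes: | `rel/path` | `md5` | `sha256` |
ROW_RE = re.compile(r"^\| `(?P<rel>[^`]+)` \| `(?P<md5>[0-9a-f]{32})` \| `(?P<sha>[0-9a-f]{64})` \|$")


def verify_manifest(manifest: Path) -> bool:
    """Recompute SHA-256 for every file listed in the manifest's hash table."""
    run_dir = manifest.parent
    ok = True
    for line in manifest.read_text(encoding="utf-8").splitlines():
        m = ROW_RE.match(line)
        if not m:
            continue  # skip headings, separators, copyright block
        target = run_dir / m["rel"]
        sha = hashlib.sha256(target.read_bytes()).hexdigest()
        if sha != m["sha"]:
            print(f"[FAIL] {m['rel']}")
            ok = False
    return ok
```

Run it against any `manifest.md` in an export run directory; together with `ots verify manifest.md.ots` this checks both content integrity and the Bitcoin timestamp.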
 

The JSON config file (sites.json):

 {
  "mysql": {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "xxxxxx",
    "charset": "utf8mb4"
  },
  "sites": [
    {
      "name": "eait",
      "db": "wpdb",
      "prefix": "wp_",
      "site_url": "https://notes.eait.co"
    }
  ],
  "export": {
    "output_dir": "/data/wp",
    "manifest_name": "manifest.md"
  }
}
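Because the cron job fails silently on a malformed sites.json (for example a trailing comma after the last site entry, which plain JSON forbids), a small pre-flight check is cheap insurance; a sketch, with `check_config` being an illustrative helper:

```python
import json
from pathlib import Path
from typing import List


def check_config(path: Path) -> List[str]:
    """Return a list of problems found in sites.json (empty list = looks OK)."""
    problems: List[str] = []
    try:
        cfg = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as e:
        return [f"invalid JSON: {e}"]
    # Top-level sections the script reads unconditionally
    for key in ("mysql", "sites", "export"):
        if key not in cfg:
            problems.append(f"missing top-level key: {key}")
    # Per-site required fields (prefix/site_url have defaults)
    for i, site in enumerate(cfg.get("sites", [])):
        for key in ("name", "db"):
            if key not in site:
                problems.append(f"sites[{i}] missing key: {key}")
    return problems
```

Running it once after editing the config catches typos before the nightly run does.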
