#!/usr/bin/env python3 """ Wiki ↔ Raw 三向同步工具 功能: - 检测 raw/ 下文件变化(新增/修改/删除) - 维护 manifest.json 状态映射 - 检测 orphan entity/concept(仅报告,不删除) 用法: python tools/sync.py --check 预览变化(不执行) python tools/sync.py --sync 执行同步(更新 manifest) python tools/sync.py --pending 显示待处理文件列表 python tools/sync.py --json JSON 行输出(供程序消费) python tools/sync.py --rebuild 从 manifest 重建 wiki/index(兜底) manifest.json 格式: { "version": 1, "updated_at": "ISO timestamp", "files": { "relative/path/to/file.md": { "hash": "sha256", "modified": "ISO timestamp", "slug": "wiki-source-slug", "source_path": "wiki/sources/slug.md", "ingested": true } } } """ import json import hashlib import argparse from pathlib import Path from datetime import datetime, timezone REPO_ROOT = Path(__file__).parent.parent.resolve() WIKI_DIR = REPO_ROOT / "wiki" MANIFEST_FILE = Path(__file__).parent / "manifest.json" # ─── 工具函数 ─────────────────────────────────────────────── def green(text): return f"\033[92m{text}\033[0m" def yellow(text): return f"\033[93m{text}\033[0m" def red(text): return f"\033[91m{text}\033[0m" def dim(text): return f"\033[2m{text}\033[0m" def bold(text): return f"\033[1m{text}\033[0m" def log(msg, style="normal"): prefixes = { "normal": " ", "info": " ℹ ", "success": " ✓ ", "warn": " ⚠ ", "error": " ✗ ", "section": "\n── ", } print(f"{prefixes.get(style, ' ')}{msg}") def sha256_file(path: Path) -> str: h = hashlib.sha256() h.update(path.read_bytes()) return h.hexdigest()[:16] def iso_now(): return datetime.now(timezone.utc).isoformat() def load_manifest() -> dict: if MANIFEST_FILE.exists(): try: return json.loads(MANIFEST_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, IOError): pass return {"version": 1, "updated_at": iso_now(), "files": {}} def save_manifest(manifest: dict): manifest["updated_at"] = iso_now() MANIFEST_FILE.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8") def scan_raw() -> dict[str, dict]: """返回 {relative_path: {hash, modified, size}}""" raw_dir = REPO_ROOT / "raw" result = {} if not raw_dir.exists(): return result for p in raw_dir.rglob("*.md"): if p.is_file() and not p.name.startswith("."): rel = str(p.relative_to(REPO_ROOT)) stat = p.stat() result[rel] = { "hash": sha256_file(p), "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), "size": stat.st_size, "abs_path": str(p), } return result def build_slug_from_path(rel_path: str) -> str: """从相对路径生成 slug(尽量保留中文,kebab-case)""" name = Path(rel_path).stem name = name.replace(" ", "-").replace("/", "-").replace("\\", "-") name = "".join(c if c.isalnum() or c in ("-", "_", "·") else "-" for c in name) name = name.strip("-") return name or "untitled" def find_orphan_entity_concept(manifest: dict) -> tuple[list, list]: """检测未被任何 source page 引用的 entity 和 concept""" import re wikilink_pattern = re.compile(r"\[\[([^\]]+)\]\]") sources_dir = WIKI_DIR / "sources" referenced_entities = set() referenced_concepts = set() if sources_dir.exists(): for src in sources_dir.glob("*.md"): content = src.read_text(encoding="utf-8") for link in wikilink_pattern.findall(content): name = link.strip() if name.startswith("entities/"): referenced_entities.add(Path(name).stem) elif name.startswith("concepts/"): referenced_concepts.add(Path(name).stem) elif "/" not in name: referenced_entities.add(name) referenced_concepts.add(name) orphan_entities = [] entities_dir = WIKI_DIR / "entities" if entities_dir.exists(): for f in entities_dir.glob("*.md"): if f.stem not in referenced_entities: orphan_entities.append(f.name) orphan_concepts = [] concepts_dir = WIKI_DIR / "concepts" if concepts_dir.exists(): for f in concepts_dir.glob("*.md"): if f.stem not in referenced_concepts: orphan_concepts.append(f.name) return orphan_entities, orphan_concepts # ─── 核心同步逻辑 ─────────────────────────────────────────────── def check_changes(manifest: dict, raw_files: dict) -> dict: """对比 manifest 和实际 raw 文件,返回变化""" changes = {"new": [], "updated": [], "deleted": [], "unchanged": []} manifest_files = manifest.get("files", {}) for rel_path, info in raw_files.items(): if rel_path not in manifest_files: changes["new"].append({"rel_path": rel_path, **info}) elif info["hash"] != manifest_files[rel_path]["hash"]: changes["updated"].append({ "rel_path": rel_path, "old_hash": manifest_files[rel_path]["hash"], **info, }) else: changes["unchanged"].append(rel_path) for rel_path in manifest_files: abs_path = REPO_ROOT / rel_path if not abs_path.exists(): changes["deleted"].append({ "rel_path": rel_path, "slug": manifest_files[rel_path].get("slug", build_slug_from_path(rel_path)), "source_path": manifest_files[rel_path].get("source_path"), }) return changes def run_sync(dry_run: bool = False, verbose: bool = False, json_mode: bool = False): """执行同步并尽量保持输出精简。 - 默认(非 verbose、非 json)只会输出一行变化摘要 + manifest 更新成功提示。 - verbose=True 会打印每个新增/更新/删除的文件列表(保留旧行为)。 - json_mode=True 保持原有的机器友好 JSON 流输出。 """ manifest = load_manifest() raw_files = scan_raw() changes = check_changes(manifest, raw_files) new = changes["new"] updated = changes["updated"] deleted = changes["deleted"] total_changes = len(new) + len(updated) + len(deleted) if total_changes == 0: if json_mode: print(json.dumps({"event": "sync_complete", "summary": {"pending": 0, "deleted": 0, "manifest_entries": len(manifest.get("files", {}))}})) else: log("No changes detected — wiki is up to date.", "success") return # 非 JSON:简短摘要(默认)或详细列表(verbose) if not json_mode: log(f"Changes detected: +{len(new)} ~{len(updated)} -{len(deleted)}", "info") if verbose: if new: print("\nNew Files:") for f in new: print(f" {f['rel_path']}") if updated: print("\nUpdated Files:") for f in updated: old = f.get("old_hash") print(f" {f['rel_path']}" + (f" (was {old})" if old else "")) if deleted: print("\nDeleted Files:") for f in deleted: print(f" {f['rel_path']}") if dry_run: log("Dry-run complete. Run with --sync to apply.", "warn") return # Apply changes (保持原有 manifest 更新逻辑,但抑制逐文件日志,除非 json_mode 或 verbose) updated_manifest = manifest.copy() updated_manifest["files"] = manifest.get("files", {}).copy() pending_files = [] for f in new: rel_path = f["rel_path"] slug = build_slug_from_path(rel_path) if json_mode: print(json.dumps({"event": "pending", "rel_path": rel_path, "slug": slug, "action": "new"})) pending_files.append({"rel_path": rel_path, "abs_path": f["abs_path"], "slug": slug, "action": "new"}) updated_manifest["files"][rel_path] = { "hash": f["hash"], "modified": f.get("modified"), "slug": slug, "source_path": f"wiki/sources/{slug}.md", "ingested": False, "ingested_at": None, } for f in updated: rel_path = f["rel_path"] old_entry = manifest["files"].get(rel_path, {}) slug = old_entry.get("slug") or build_slug_from_path(rel_path) if json_mode: print(json.dumps({"event": "pending", "rel_path": rel_path, "slug": slug, "action": "updated"})) pending_files.append({"rel_path": rel_path, "abs_path": f["abs_path"], "slug": slug, "action": "updated"}) updated_manifest["files"][rel_path] = { **old_entry, "hash": f["hash"], "modified": f.get("modified"), "ingested": False, "ingested_at": None, } deleted_files = [] for f in deleted: rel_path = f["rel_path"] source_path = f.get("source_path") if rel_path in updated_manifest["files"]: del updated_manifest["files"][rel_path] deleted_files.append(rel_path) if json_mode and deleted: print(json.dumps({"event": "deleted_detected", "rel_path": rel_path})) save_manifest(updated_manifest) if json_mode: print(json.dumps({ "event": "sync_complete", "summary": { "pending": len(pending_files), "deleted": len(deleted_files), "manifest_entries": len(updated_manifest["files"]), }, "pending_files": pending_files, "deleted_files": deleted_files, })) else: log(f"manifest.json updated ({len(updated_manifest['files'])} entries)", "success") if verbose: log(f"Pending files for ingestion: {len(pending_files)}", "info") # 简短的 orphan 报告(仅在 verbose 模式下列出详情) orphan_entities, orphan_concepts = find_orphan_entity_concept(updated_manifest) if not json_mode: if orphan_entities or orphan_concepts: if verbose: print(f"\n{bold('--- Orphan Report (kept as requested) ---')}") if orphan_entities: print(f"Orphan Entities ({len(orphan_entities)}):") for e in sorted(orphan_entities): print(f" {e}") if orphan_concepts: print(f"Orphan Concepts ({len(orphan_concepts)}):") for c in sorted(orphan_concepts): print(f" {c}") else: log(f"Orphan entities: {len(orphan_entities)}; Orphan concepts: {len(orphan_concepts)}", "info") else: if verbose: log("No orphan entity/concept detected.", "success") if not json_mode: print("\nDone.") def run_check(): """只预览变化,不执行(输出为标准 Markdown)""" manifest = load_manifest() raw_files = scan_raw() changes = check_changes(manifest, raw_files) total = len(changes["new"]) + len(changes["updated"]) + len(changes["deleted"]) # Markdown header and summary print("# Wiki Sync Check\n") print(f"- Raw files: {len(raw_files)}") print(f"- Manifest entries: {len(manifest.get('files', {}))}") print(f"- New: {len(changes['new'])}") print(f"- Updated: {len(changes['updated'])}") print(f"- Deleted: {len(changes['deleted'])}\n") if total > 0: if changes["new"]: print("## New Files") for f in changes["new"]: print(f"- {f['rel_path']}") print() if changes["updated"]: print("## Updated Files") for f in changes["updated"]: print(f"- {f['rel_path']} (was {f['old_hash']}, now {f['hash']})") print() if changes["deleted"]: print("## Deleted Files") for f in changes["deleted"]: print(f"- {f['rel_path']}") print() else: print("No changes — wiki is in sync.\n") def run_rebuild(): """从 manifest 重建 wiki/index.md(兜底方案)。 改进点: - 优先使用 manifest 中记录的 source_path(如果存在且文件真实存在), 其次尝试 wiki/sources/.md;再尝试在 wiki/sources 下做不区分大小写或 归一化后的匹配(减少命名差异导致的断链)。 - 更健壮地解析 YAML frontmatter 中的 title 字段(支持缺失结束符的容错), 并在没有 title 时回退到第一个 Markdown 标题或 slug。 - 在无法找到 source 文件时,保留原 slug 并在 index 中标注 (source missing), 以便人工排查。 """ manifest = load_manifest() print(f"\n{bold('=== Wiki Rebuild from Manifest')}\n") print(f" Manifest entries: {len(manifest.get('files', {}))}") print(f" Rebuilding index.md ...\n") index_lines = [ "# Wiki Index\n", "\n## Overview\n", "- [Overview](overview.md) — living synthesis\n", "\n## Sources\n", ] files = manifest.get("files", {}) sorted_files = sorted(files.items(), key=lambda x: x[1].get("modified", ""), reverse=True) import re sources_dir = WIKI_DIR / "sources" def normalize(s: str) -> str: # 用于不严格匹配文件名:移除非字母数字并小写 return ''.join(ch for ch in s.lower() if ch.isalnum()) def find_source_file(slug: str, info: dict, rel_path: str): # 尝试按 manifest.source_path 优先匹配 sp = info.get('source_path') if sp: p = REPO_ROOT / sp if p.exists(): return p # 如果是相对于 wiki 的路径(如 "sources/foo.md"),尝试 WIKI_DIR 下 p2 = WIKI_DIR / sp if p2.exists(): return p2 # 常规位置:wiki/sources/.md candidate = sources_dir / f"{slug}.md" if candidate.exists(): return candidate # 尝试去除多余后缀(如 manifest 中误带了 ".md") if slug.endswith('.md'): short = slug[:-3] c2 = sources_dir / f"{short}.md" if c2.exists(): return c2 # 不区分大小写或归一化匹配 norm_slug = normalize(slug) if sources_dir.exists(): for p in sources_dir.glob('*.md'): if p.stem.lower() == slug.lower(): return p if normalize(p.stem) == norm_slug: return p # 最后尝试根据 manifest 中的 rel_path(原始 raw 文件)去推测 source 文件名 # 有些仓库会把源文件直接放在 wiki/sources 下并采用不同的 slug 规则 try: # rel_path 示例: 'raw/dir/name.md' -> use name as candidate name = Path(rel_path).stem p3 = sources_dir / f"{name}.md" if p3.exists(): return p3 except Exception: pass return None for rel_path, info in sorted_files: slug = info.get("slug") or build_slug_from_path(rel_path) # 清理误带后缀 if slug.endswith('.md'): slug = slug[:-3] src_file = find_source_file(slug, info, rel_path) # 从 manifest 的 modified 字段提取日期前缀(格式 YYYY-MM-DD) modified_raw = info.get("modified", "") date_prefix = "" if modified_raw: try: date_prefix = f"[{modified_raw[:10]}] " except Exception: date_prefix = "" title = None if src_file and src_file.exists(): content = src_file.read_text(encoding="utf-8") lines = content.splitlines() # 处理 YAML frontmatter(容错:若缺少结束 '---' 则忽略 frontmatter) if lines and lines[0].strip() == '---': end_idx = None for i in range(1, min(len(lines), 500)): if lines[i].strip() == '---': end_idx = i break if end_idx: frontmatter = '\n'.join(lines[1:end_idx]) # 支持 title: "..." 或 title: > 的情况(简单提取首行) m = re.search(r'^\s*title\s*:\s*(?:["\']?(.*?)["\']?|>\s*\n\s*(.*))\s*$', frontmatter, flags=re.MULTILINE) if m: title = (m.group(1) or m.group(2) or '').strip() # 回退:第一个以 # 开头的行 if not title and lines: for line in lines: s = line.strip() if s.startswith('#'): title = s.lstrip('#').strip() break if not title: title = slug index_lines.append(f"- {date_prefix}[{title}](sources/{src_file.name})\n") else: # 如果没有找到 source 文件,但 manifest 里有 source_path 文本,则将其展示出来,便于排查 sp = info.get('source_path') if sp: index_lines.append(f"- {date_prefix}[{slug}](sources/{slug}.md) — (expected: {sp} — source missing)\n") else: index_lines.append(f"- {date_prefix}[{slug}](sources/{slug}.md) — (source missing)\n") # Entities 索引 index_lines.append("\n## Entities\n") entities_dir = WIKI_DIR / "entities" if entities_dir.exists(): entity_files = sorted(entities_dir.glob("*.md"), key=lambda p: p.stem.lower()) for ef in entity_files: index_lines.append(f"- [{ef.stem}](entities/{ef.name})\n") # Concepts 索引 index_lines.append("\n## Concepts\n") concepts_dir = WIKI_DIR / "concepts" if concepts_dir.exists(): concept_files = sorted(concepts_dir.glob("*.md"), key=lambda p: p.stem.lower()) for cf in concept_files: index_lines.append(f"- [{cf.stem}](concepts/{cf.name})\n") index_lines.append("\n## Syntheses\n") index_file = WIKI_DIR / "index.md" index_file.write_text("".join(index_lines), encoding="utf-8") print(f" {green('✓')} index.md rebuilt with {len(sorted_files)} sources") # orphan 检测使用 manifest(重建后也可根据最新 manifest 检测) orphan_entities, orphan_concepts = find_orphan_entity_concept(manifest) if orphan_entities: print(f" {dim('?')} Orphan entities: {len(orphan_entities)}") if orphan_concepts: print(f" {dim('?')} Orphan concepts: {len(orphan_concepts)}") print(f"\nDone.") # ─── 管理接口:mark_ingested(供摄取流程调用) ───────────────────────────────────────── def mark_ingested(rel_path: str, slug: str = None, source_path: str = None, recalc_hash: bool = True, json_mode: bool = False): """标记某个 raw 文件为已摄取(更新 manifest 条目)。 参数: rel_path: 相对于仓库根目录的路径,例如 "raw/dir/name.md" (必填) slug: 可选的 wiki slug(例如 "my-article");如果传入会设置 entry["slug"] source_path: 可选的 wiki 来源路径(例如 "wiki/sources/my-article.md") recalc_hash: 如果为 True,会基于当前文件重新计算 hash/modified json_mode: 如果为 True,输出为单行 JSON,便于脚本消费 """ manifest = load_manifest() files = manifest.setdefault("files", {}) entry = files.get(rel_path, {}) abs_path = REPO_ROOT / rel_path if recalc_hash and abs_path.exists(): entry["hash"] = sha256_file(abs_path) entry["modified"] = datetime.fromtimestamp(abs_path.stat().st_mtime, tz=timezone.utc).isoformat() if slug: entry["slug"] = slug if source_path: entry["source_path"] = source_path else: entry.setdefault("slug", build_slug_from_path(rel_path)) entry.setdefault("source_path", f"wiki/sources/{entry.get('slug')}.md") entry["ingested"] = True entry["ingested_at"] = iso_now() entry.pop("error", None) files[rel_path] = entry save_manifest(manifest) if json_mode: print(json.dumps({ "event": "mark_ingested", "rel_path": rel_path, "slug": entry.get("slug"), "source_path": entry.get("source_path"), "ingested_at": entry.get("ingested_at"), })) else: print(f"Marked ingested: {rel_path} -> {entry.get('source_path')}") # ─── CLI 入口 ─────────────────────────────────────────────── if __name__ == "__main__": parser = argparse.ArgumentParser( description="Wiki ↔ Raw 三向同步工具", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--check", action="store_true", help="预览变化,不执行同步", ) parser.add_argument( "--sync", action="store_true", help="执行完整同步(新增/修改/删除 + orphan 检测)", ) parser.add_argument( "--rebuild", action="store_true", help="从 manifest 重建 wiki/index.md(兜底方案)", ) parser.add_argument( "--reset-failed", action="store_true", help="重置所有 failed 的 ingest 状态(让它们重新待处理)", ) parser.add_argument( "--pending", action="store_true", help="列出所有待摄取的 pending 文件", ) parser.add_argument( "--verbose", "-v", action="store_true", help="详细输出", ) parser.add_argument( "--json", action="store_true", help="JSON 行输出模式(供调用方解析)", ) parser.add_argument( "--mark-ingested", metavar=("REL_PATH"), nargs=1, help="标记单个 raw 文件为已摄取:传入相对路径(例如 'raw/dir/file.md')。", ) parser.add_argument( "--slug", help="与 --mark-ingested 配合:指定生成的 wiki slug(例如 my-article)", ) parser.add_argument( "--source-path", help="与 --mark-ingested 配合:指定 wiki source 路径(例如 wiki/sources/my-article.md)", ) parser.add_argument( "--no-recalc-hash", action="store_true", help="与 --mark-ingested 配合:不要重新计算文件 hash/modified(默认会重新计算)", ) parser.add_argument( "--mark-json", action="store_true", help="与 --mark-ingested 配合:以 JSON 单行输出 mark 结果", ) parser.add_argument( "--limit", type=int, default=None, help="与 --pending --json 配合:限制返回条目数(默认返回全部)", ) args = parser.parse_args() if args.mark_ingested: rel = args.mark_ingested[0] mark_ingested(rel, slug=args.slug, source_path=args.source_path, recalc_hash=not args.no_recalc_hash, json_mode=args.mark_json) elif args.rebuild: run_rebuild() elif args.pending: manifest = load_manifest() pending = [(k, v) for k, v in manifest["files"].items() if not v.get("ingested")] if args.json: total = len(pending) # 未指定 limit -> 返回全部(files 列表) if args.limit is None: payload = { "event": "pending_list", "count": total, "files": [ { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), } for k, v in pending ], } elif args.limit <= 0: payload = {"event": "pending_list", "count": total, "files": []} elif args.limit == 1: first = pending[0] if pending else (None, None) if first[0] is None: payload = {"event": "pending_list", "count": 0, "file": None} else: k, v = first payload = { "event": "pending_list", "count": total, "file": { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), }, } else: # 返回前 N 条 as files array n = min(args.limit, total) payload = { "event": "pending_list", "count": total, "files": [ { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), } for k, v in pending[:n] ], } print(json.dumps(payload)) else: # 控制台输出也支持 --limit total = len(pending) n = total if args.limit is None else max(0, args.limit) print(f"=== Pending Ingest Files ({total}) ===\n") if n == 0: print(" (no items to show)") else: for i, (path, info) in enumerate(pending[:n], 1): print(f"{i:3}. {path}") elif args.reset_failed: manifest = load_manifest() reset_count = 0 for k, v in manifest["files"].items(): if v.get("error"): v["ingested"] = False v.pop("error", None) v.pop("ingested_at", None) reset_count += 1 if reset_count > 0: save_manifest(manifest) print(f"Reset {reset_count} failed entries to pending.") else: print("No failed entries found.") elif args.check: run_check() elif args.sync: run_sync(dry_run=False, verbose=args.verbose, json_mode=args.json) else: parser.print_help() print("\n示例:") print(" python tools/sync.py --check # 预览变化") print(" python tools/sync.py --sync # 执行同步") print(" python tools/sync.py --sync -v # 详细模式") print(" python tools/sync.py --rebuild # 重建 index")