#!/usr/bin/env python3 """ Wiki ↔ Raw 三向同步工具 功能: - 检测 raw/ 下文件变化(新增/修改/删除) - 维护 manifest.json 状态映射 - 检测 orphan entity/concept(仅报告,不删除) 用法: python tools/sync.py --check 预览变化(不执行) python tools/sync.py --sync 执行同步(更新 manifest) python tools/sync.py --pending 显示待处理文件列表 python tools/sync.py --json JSON 行输出(供程序消费) python tools/sync.py --rebuild 从 manifest 重建 wiki/index(兜底) manifest.json 格式: { "version": 1, "updated_at": "ISO timestamp", "files": { "relative/path/to/file.md": { "hash": "sha256", "modified": "ISO timestamp", "slug": "wiki-source-slug", "source_path": "wiki/sources/slug.md", "ingested": true } } } """ import json import hashlib import argparse from pathlib import Path from datetime import datetime, timezone REPO_ROOT = Path(__file__).parent.parent.resolve() WIKI_DIR = REPO_ROOT / "wiki" MANIFEST_FILE = Path(__file__).parent / "manifest.json" # ─── 工具函数 ─────────────────────────────────────────────── def green(text): return f"\033[92m{text}\033[0m" def yellow(text): return f"\033[93m{text}\033[0m" def red(text): return f"\033[91m{text}\033[0m" def dim(text): return f"\033[2m{text}\033[0m" def bold(text): return f"\033[1m{text}\033[0m" def log(msg, style="normal"): prefixes = { "normal": " ", "info": " ℹ ", "success": " ✓ ", "warn": " ⚠ ", "error": " ✗ ", "section": "\n── ", } print(f"{prefixes.get(style, ' ')}{msg}") def sha256_file(path: Path) -> str: h = hashlib.sha256() h.update(path.read_bytes()) return h.hexdigest()[:16] def iso_now(): return datetime.now(timezone.utc).isoformat() def load_manifest() -> dict: if MANIFEST_FILE.exists(): try: return json.loads(MANIFEST_FILE.read_text(encoding="utf-8")) except (json.JSONDecodeError, IOError): pass return {"version": 1, "updated_at": iso_now(), "files": {}} def save_manifest(manifest: dict): manifest["updated_at"] = iso_now() MANIFEST_FILE.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8") def scan_raw() -> dict[str, dict]: """返回 {relative_path: {hash, modified, size}}""" raw_dir = REPO_ROOT / "raw" result = {} if not raw_dir.exists(): return result for p in raw_dir.rglob("*.md"): if p.is_file() and not p.name.startswith("."): rel = str(p.relative_to(REPO_ROOT)) stat = p.stat() result[rel] = { "hash": sha256_file(p), "modified": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc).isoformat(), "size": stat.st_size, "abs_path": str(p), } return result def build_slug_from_path(rel_path: str) -> str: """从相对路径生成 slug(尽量保留中文,kebab-case)""" name = Path(rel_path).stem name = name.replace(" ", "-").replace("/", "-").replace("\\", "-") name = "".join(c if c.isalnum() or c in ("-", "_", "·") else "-" for c in name) name = name.strip("-") return name or "untitled" def find_orphan_entity_concept(manifest: dict) -> tuple[list, list]: """检测未被任何 source page 引用的 entity 和 concept""" import re wikilink_pattern = re.compile(r"\[\[([^\]]+)\]\]") sources_dir = WIKI_DIR / "sources" referenced_entities = set() referenced_concepts = set() if sources_dir.exists(): for src in sources_dir.glob("*.md"): content = src.read_text(encoding="utf-8") for link in wikilink_pattern.findall(content): name = link.strip() if name.startswith("entities/"): referenced_entities.add(Path(name).stem) elif name.startswith("concepts/"): referenced_concepts.add(Path(name).stem) elif "/" not in name: referenced_entities.add(name) referenced_concepts.add(name) orphan_entities = [] entities_dir = WIKI_DIR / "entities" if entities_dir.exists(): for f in entities_dir.glob("*.md"): if f.stem not in referenced_entities: orphan_entities.append(f.name) orphan_concepts = [] concepts_dir = WIKI_DIR / "concepts" if concepts_dir.exists(): for f in concepts_dir.glob("*.md"): if f.stem not in referenced_concepts: orphan_concepts.append(f.name) return orphan_entities, orphan_concepts # ─── 核心同步逻辑 ─────────────────────────────────────────────── def check_changes(manifest: dict, raw_files: dict) -> dict: """对比 manifest 和实际 raw 文件,返回变化""" changes = {"new": [], "updated": [], "deleted": [], "unchanged": []} manifest_files = manifest.get("files", {}) for rel_path, info in raw_files.items(): if rel_path not in manifest_files: changes["new"].append({"rel_path": rel_path, **info}) elif info["hash"] != manifest_files[rel_path]["hash"]: changes["updated"].append({ "rel_path": rel_path, "old_hash": manifest_files[rel_path]["hash"], **info, }) else: changes["unchanged"].append(rel_path) for rel_path in manifest_files: abs_path = REPO_ROOT / rel_path if not abs_path.exists(): changes["deleted"].append({ "rel_path": rel_path, "slug": manifest_files[rel_path].get("slug", build_slug_from_path(rel_path)), "source_path": manifest_files[rel_path].get("source_path"), }) return changes def run_sync(dry_run: bool = False, verbose: bool = False, json_mode: bool = False): """执行同步并尽量保持输出精简。 - 默认(非 verbose、非 json)只会输出一行变化摘要 + manifest 更新成功提示。 - verbose=True 会打印每个新增/更新/删除的文件列表(保留旧行为)。 - json_mode=True 保持原有的机器友好 JSON 流输出。 """ manifest = load_manifest() raw_files = scan_raw() changes = check_changes(manifest, raw_files) new = changes["new"] updated = changes["updated"] deleted = changes["deleted"] total_changes = len(new) + len(updated) + len(deleted) if total_changes == 0: if json_mode: print(json.dumps({"event": "sync_complete", "summary": {"pending": 0, "deleted": 0, "manifest_entries": len(manifest.get("files", {}))}})) else: log("No changes detected — wiki is up to date.", "success") return # 非 JSON:简短摘要(默认)或详细列表(verbose) if not json_mode: log(f"Changes detected: +{len(new)} ~{len(updated)} -{len(deleted)}", "info") if verbose: if new: print("\nNew Files:") for f in new: print(f" {f['rel_path']}") if updated: print("\nUpdated Files:") for f in updated: old = f.get("old_hash") print(f" {f['rel_path']}" + (f" (was {old})" if old else "")) if deleted: print("\nDeleted Files:") for f in deleted: print(f" {f['rel_path']}") if dry_run: log("Dry-run complete. Run with --sync to apply.", "warn") return # Apply changes (保持原有 manifest 更新逻辑,但抑制逐文件日志,除非 json_mode 或 verbose) updated_manifest = manifest.copy() updated_manifest["files"] = manifest.get("files", {}).copy() pending_files = [] for f in new: rel_path = f["rel_path"] slug = build_slug_from_path(rel_path) if json_mode: print(json.dumps({"event": "pending", "rel_path": rel_path, "slug": slug, "action": "new"})) pending_files.append({"rel_path": rel_path, "abs_path": f["abs_path"], "slug": slug, "action": "new"}) updated_manifest["files"][rel_path] = { "hash": f["hash"], "modified": f.get("modified"), "slug": slug, "source_path": f"wiki/sources/{slug}.md", "ingested": False, "ingested_at": None, } for f in updated: rel_path = f["rel_path"] old_entry = manifest["files"].get(rel_path, {}) slug = old_entry.get("slug") or build_slug_from_path(rel_path) if json_mode: print(json.dumps({"event": "pending", "rel_path": rel_path, "slug": slug, "action": "updated"})) pending_files.append({"rel_path": rel_path, "abs_path": f["abs_path"], "slug": slug, "action": "updated"}) updated_manifest["files"][rel_path] = { **old_entry, "hash": f["hash"], "modified": f.get("modified"), "ingested": False, "ingested_at": None, } deleted_files = [] for f in deleted: rel_path = f["rel_path"] source_path = f.get("source_path") if rel_path in updated_manifest["files"]: del updated_manifest["files"][rel_path] deleted_files.append(rel_path) if json_mode and deleted: print(json.dumps({"event": "deleted_detected", "rel_path": rel_path})) save_manifest(updated_manifest) if json_mode: print(json.dumps({ "event": "sync_complete", "summary": { "pending": len(pending_files), "deleted": len(deleted_files), "manifest_entries": len(updated_manifest["files"]), }, "pending_files": pending_files, "deleted_files": deleted_files, })) else: log(f"manifest.json updated ({len(updated_manifest['files'])} entries)", "success") if verbose: log(f"Pending files for ingestion: {len(pending_files)}", "info") # 简短的 orphan 报告(仅在 verbose 模式下列出详情) orphan_entities, orphan_concepts = find_orphan_entity_concept(updated_manifest) if not json_mode: if orphan_entities or orphan_concepts: if verbose: print(f"\n{bold('--- Orphan Report (kept as requested) ---')}") if orphan_entities: print(f"Orphan Entities ({len(orphan_entities)}):") for e in sorted(orphan_entities): print(f" {e}") if orphan_concepts: print(f"Orphan Concepts ({len(orphan_concepts)}):") for c in sorted(orphan_concepts): print(f" {c}") else: log(f"Orphan entities: {len(orphan_entities)}; Orphan concepts: {len(orphan_concepts)}", "info") else: if verbose: log("No orphan entity/concept detected.", "success") if not json_mode: print("\nDone.") def run_check(): """只预览变化,不执行(输出为标准 Markdown)""" manifest = load_manifest() raw_files = scan_raw() changes = check_changes(manifest, raw_files) total = len(changes["new"]) + len(changes["updated"]) + len(changes["deleted"]) # Markdown header and summary print("# Wiki Sync Check\n") print(f"- Raw files: {len(raw_files)}") print(f"- Manifest entries: {len(manifest.get('files', {}))}") print(f"- New: {len(changes['new'])}") print(f"- Updated: {len(changes['updated'])}") print(f"- Deleted: {len(changes['deleted'])}\n") if total > 0: if changes["new"]: print("## New Files") for f in changes["new"]: print(f"- {f['rel_path']}") print() if changes["updated"]: print("## Updated Files") for f in changes["updated"]: print(f"- {f['rel_path']} (was {f['old_hash']}, now {f['hash']})") print() if changes["deleted"]: print("## Deleted Files") for f in changes["deleted"]: print(f"- {f['rel_path']}") print() else: print("No changes — wiki is in sync.\n") def run_rebuild(): """从 manifest 重建 wiki/index.md(兜底方案)。 改进点: - 优先使用 manifest 中记录的 source_path(如果存在且文件真实存在), 其次尝试 wiki/sources/.md;再尝试在 wiki/sources 下做不区分大小写或 归一化后的匹配(减少命名差异导致的断链)。 - 更健壮地解析 YAML frontmatter 中的 title 字段(支持缺失结束符的容错), 并在没有 title 时回退到第一个 Markdown 标题或 slug。 - 在无法找到 source 文件时,保留原 slug 并在 index 中标注 (source missing), 以便人工排查。 """ manifest = load_manifest() print(f"\n{bold('=== Wiki Rebuild from Manifest')}\n") print(f" Manifest entries: {len(manifest.get('files', {}))}") print(f" Rebuilding index.md ...\n") index_lines = [ "# Wiki Index\n", "\n## Overview\n", "- [Overview](overview.md) — living synthesis\n", "\n## Sources\n", ] files = manifest.get("files", {}) sorted_files = sorted(files.items(), key=lambda x: x[1].get("modified", ""), reverse=True) import re sources_dir = WIKI_DIR / "sources" def normalize(s: str) -> str: # 用于不严格匹配文件名:移除非字母数字并小写 return ''.join(ch for ch in s.lower() if ch.isalnum()) def find_source_file(slug: str, info: dict, rel_path: str): # 尝试按 manifest.source_path 优先匹配 sp = info.get('source_path') if sp: p = REPO_ROOT / sp if p.exists(): return p # 如果是相对于 wiki 的路径(如 "sources/foo.md"),尝试 WIKI_DIR 下 p2 = WIKI_DIR / sp if p2.exists(): return p2 # 常规位置:wiki/sources/.md candidate = sources_dir / f"{slug}.md" if candidate.exists(): return candidate # 尝试去除多余后缀(如 manifest 中误带了 ".md") if slug.endswith('.md'): short = slug[:-3] c2 = sources_dir / f"{short}.md" if c2.exists(): return c2 # 不区分大小写或归一化匹配 norm_slug = normalize(slug) if sources_dir.exists(): for p in sources_dir.glob('*.md'): if p.stem.lower() == slug.lower(): return p if normalize(p.stem) == norm_slug: return p # 最后尝试根据 manifest 中的 rel_path(原始 raw 文件)去推测 source 文件名 # 有些仓库会把源文件直接放在 wiki/sources 下并采用不同的 slug 规则 try: # rel_path 示例: 'raw/dir/name.md' -> use name as candidate name = Path(rel_path).stem p3 = sources_dir / f"{name}.md" if p3.exists(): return p3 except Exception: pass return None for rel_path, info in sorted_files: slug = info.get("slug") or build_slug_from_path(rel_path) # 清理误带后缀 if slug.endswith('.md'): slug = slug[:-3] src_file = find_source_file(slug, info, rel_path) # 从 manifest 的 modified 字段提取日期前缀(格式 YYYY-MM-DD) modified_raw = info.get("modified", "") date_prefix = "" if modified_raw: try: date_prefix = f"[{modified_raw[:10]}] " except Exception: date_prefix = "" title = None if src_file and src_file.exists(): content = src_file.read_text(encoding="utf-8") lines = content.splitlines() # 处理 YAML frontmatter(容错:若缺少结束 '---' 则忽略 frontmatter) if lines and lines[0].strip() == '---': end_idx = None for i in range(1, min(len(lines), 500)): if lines[i].strip() == '---': end_idx = i break if end_idx: frontmatter = '\n'.join(lines[1:end_idx]) # 支持 title: "..." 或 title: > 的情况(简单提取首行) m = re.search(r'^\s*title\s*:\s*(?:["\']?(.*?)["\']?|>\s*\n\s*(.*))\s*$', frontmatter, flags=re.MULTILINE) if m: title = (m.group(1) or m.group(2) or '').strip() # 回退:第一个以 # 开头的行 if not title and lines: for line in lines: s = line.strip() if s.startswith('#'): title = s.lstrip('#').strip() break if not title: title = slug index_lines.append(f"- {date_prefix}[{title}](sources/{src_file.name})\n") else: # 如果没有找到 source 文件,但 manifest 里有 source_path 文本,则将其展示出来,便于排查 sp = info.get('source_path') if sp: index_lines.append(f"- {date_prefix}[{slug}](sources/{slug}.md) — (expected: {sp} — source missing)\n") else: index_lines.append(f"- {date_prefix}[{slug}](sources/{slug}.md) — (source missing)\n") # Entities 索引 index_lines.append("\n## Entities\n") entities_dir = WIKI_DIR / "entities" if entities_dir.exists(): entity_files = sorted(entities_dir.glob("*.md"), key=lambda p: p.stem.lower()) for ef in entity_files: index_lines.append(f"- [{ef.stem}](entities/{ef.name})\n") # Concepts 索引 index_lines.append("\n## Concepts\n") concepts_dir = WIKI_DIR / "concepts" if concepts_dir.exists(): concept_files = sorted(concepts_dir.glob("*.md"), key=lambda p: p.stem.lower()) for cf in concept_files: index_lines.append(f"- [{cf.stem}](concepts/{cf.name})\n") index_lines.append("\n## Syntheses\n") index_file = WIKI_DIR / "index.md" index_file.write_text("".join(index_lines), encoding="utf-8") print(f" {green('✓')} index.md rebuilt with {len(sorted_files)} sources") # orphan 检测使用 manifest(重建后也可根据最新 manifest 检测) orphan_entities, orphan_concepts = find_orphan_entity_concept(manifest) if orphan_entities: print(f" {dim('?')} Orphan entities: {len(orphan_entities)}") if orphan_concepts: print(f" {dim('?')} Orphan concepts: {len(orphan_concepts)}") print(f"\nDone.") # ─── 管理接口:reslug(批量规范化 manifest slug) ────────────────────────────────────── def _compute_normalized_slug(rel_path: str) -> str: """根据规则从 raw 文件路径计算规范化 slug。 规则: a. 中文字符直接保留(不转拼音) b. ASCII 大写字母转小写 c. 空格和特殊字符(引号、斜杠、问号、冒号、逗号、句号、感叹号、括号、 全角符号等)替换为 `-` d. 连续多个 `-` 压缩为单个 `-`,并去除首尾 `-` """ import re stem = Path(rel_path).stem # 转小写(仅影响 ASCII 字母,中文不变) result = stem.lower() # 将特殊字符替换为 `-` # 保留:中文字符、ASCII 字母数字、点(在版本号如 0.65.0 中保留)、下划线 result = re.sub( r'[ \t\r\n' r'\'"' # 单双引号 r'//\\\\' # 斜杠(全角/半角/反斜杠) r'??' # 问号 r'::' # 冒号 r',,' # 逗号 r'。\.' # 句号(保留版本号小数点后面会被压缩) r'!!' # 感叹号 r'()()' # 括号 r'【】\[\]' # 方括号 r'《》<>' # 书名号/尖括号 r'、' # 顿号 r'—–\-' # 破折号/连字符(统一重新处理) r'|&@#%\^*+=~`' r';;' # 分号 r']+', '-', result, ) # 压缩连续 `-` 为单个 result = re.sub(r'-{2,}', '-', result) # 去除首尾 `-` result = result.strip('-') return result or 'untitled' def run_reslug(target_rel_path: str = None, dry_run: bool = False): """批量(或单条)规范化 manifest 中的 slug / source_path。 参数: target_rel_path: 指定单个 raw 相对路径;为 None 则处理全部条目。 dry_run: 若为 True,只打印预览,不写入 manifest。 """ manifest = load_manifest() files = manifest.get("files", {}) if target_rel_path: targets = [(target_rel_path, files[target_rel_path])] if target_rel_path in files else [] if not targets: print(red(f" ✗ Not found in manifest: {target_rel_path}")) return else: targets = list(files.items()) changed = [] skipped = 0 for rel_path, info in targets: new_slug = _compute_normalized_slug(rel_path) old_slug = info.get("slug", "") new_source_path = f"wiki/sources/{new_slug}.md" old_source_path = info.get("source_path", "") if new_slug == old_slug and new_source_path == old_source_path: skipped += 1 continue changed.append({ "rel_path": rel_path, "old_slug": old_slug, "new_slug": new_slug, "old_source_path": old_source_path, "new_source_path": new_source_path, }) print(f"\n{bold('=== Reslug Preview' if dry_run else '=== Reslug')}\n") print(f" Total entries scanned : {len(targets)}") print(f" Unchanged (skipped) : {skipped}") print(f" To update : {len(changed)}\n") if not changed: print(f" {green('✓')} All slugs already normalized.\n") return for item in changed: print(f" {dim(item['rel_path'])}") if item['old_slug'] != item['new_slug']: print(f" slug : {yellow(item['old_slug'])} → {green(item['new_slug'])}") if item['old_source_path'] != item['new_source_path']: print(f" src : {yellow(item['old_source_path'])} → {green(item['new_source_path'])}") print() if dry_run: print(f" {yellow('⚠')} Dry-run — manifest NOT updated. Re-run without --dry-run to apply.\n") return # 应用变更 for item in changed: entry = files[item["rel_path"]] entry["slug"] = item["new_slug"] entry["source_path"] = item["new_source_path"] save_manifest(manifest) print(f" {green('✓')} manifest.json updated ({len(changed)} entries changed).\n") # ─── 管理接口:mark_ingested(供摄取流程调用) ───────────────────────────────────────── def mark_ingested(rel_path: str, slug: str, json_mode: bool = False): """标记某个 raw 文件为已摄取(更新 manifest 条目)。 行为: - rel_path 必须已存在于 manifest(即曾被 --sync 扫描过),否则报错退出。 - slug 必须显式传入,否则报错退出。 - source_path 由 slug 自动推断为 wiki/sources/.md。 - modified 强制更新为 raw 文件的实际 mtime(文件不存在时保留旧值并警告)。 - ingested 设为 True,ingested_at 设为当前 UTC 时间戳。 参数: rel_path : 相对于仓库根目录的路径,例如 "raw/dir/name.md" (必填) slug : wiki slug,例如 "my-article" (必填) json_mode : 若为 True,输出单行 JSON,便于脚本消费 """ if not slug or not slug.strip(): msg = f"--slug is required for --mark-ingested" if json_mode: print(json.dumps({"event": "error", "message": msg})) else: print(red(f" ✗ {msg}")) raise SystemExit(1) manifest = load_manifest() files = manifest.get("files", {}) if rel_path not in files: msg = f"rel_path not found in manifest (run --sync first): {rel_path}" if json_mode: print(json.dumps({"event": "error", "message": msg})) else: print(red(f" ✗ {msg}")) raise SystemExit(1) entry = files[rel_path] # 更新 slug 和 source_path entry["slug"] = slug.strip() entry["source_path"] = f"wiki/sources/{slug.strip()}.md" # 强制更新 modified(基于 raw 文件实际 mtime) abs_path = REPO_ROOT / rel_path if abs_path.exists(): entry["hash"] = sha256_file(abs_path) entry["modified"] = datetime.fromtimestamp(abs_path.stat().st_mtime, tz=timezone.utc).isoformat() else: if not json_mode: print(yellow(f" ⚠ Raw file not found, modified timestamp not updated: {rel_path}")) # 标记已摄取 entry["ingested"] = True entry["ingested_at"] = iso_now() entry.pop("error", None) files[rel_path] = entry manifest["files"] = files save_manifest(manifest) if json_mode: print(json.dumps({ "event": "mark_ingested", "rel_path": rel_path, "slug": entry["slug"], "source_path": entry["source_path"], "modified": entry.get("modified"), "ingested_at": entry["ingested_at"], })) else: print(f" {green('✓')} Marked ingested: {rel_path}") print(f" slug : {entry['slug']}") print(f" source_path : {entry['source_path']}") print(f" modified : {entry.get('modified', '(unchanged)')}") print(f" ingested_at : {entry['ingested_at']}") # ─── CLI 入口 ─────────────────────────────────────────────── if __name__ == "__main__": parser = argparse.ArgumentParser( description="Wiki ↔ Raw 三向同步工具", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--check", action="store_true", help="预览变化,不执行同步", ) parser.add_argument( "--sync", action="store_true", help="执行完整同步(新增/修改/删除 + orphan 检测)", ) parser.add_argument( "--rebuild", action="store_true", help="从 manifest 重建 wiki/index.md(兜底方案)", ) parser.add_argument( "--reset-failed", action="store_true", help="重置所有 failed 的 ingest 状态(让它们重新待处理)", ) parser.add_argument( "--pending", action="store_true", help="列出所有待摄取的 pending 文件", ) parser.add_argument( "--verbose", "-v", action="store_true", help="详细输出", ) parser.add_argument( "--json", action="store_true", help="JSON 行输出模式(供调用方解析)", ) parser.add_argument( "--mark-ingested", metavar="REL_PATH", nargs=1, help="标记单个 raw 文件为已摄取:传入相对路径(例如 'raw/dir/file.md')。必须配合 --slug 使用。", ) parser.add_argument( "--slug", help="与 --mark-ingested 配合(必填):指定 wiki slug(例如 my-article)", ) parser.add_argument( "--mark-json", action="store_true", help="与 --mark-ingested 配合:以 JSON 单行输出 mark 结果", ) parser.add_argument( "--limit", type=int, default=None, help="与 --pending --json 配合:限制返回条目数(默认返回全部)", ) parser.add_argument( "--reslug", action="store_true", help="批量规范化 manifest 中的 slug/source_path(中文保留,ASCII 特殊字符转 -,大写转小写,压缩连续 -)", ) parser.add_argument( "--reslug-target", metavar="REL_PATH", help="与 --reslug 配合:只处理指定的 raw 文件(例如 'raw/dir/file.md')", ) parser.add_argument( "--dry-run", action="store_true", help="与 --reslug 配合:只预览变更,不写入 manifest", ) args = parser.parse_args() if args.mark_ingested: rel = args.mark_ingested[0] mark_ingested(rel, slug=args.slug, json_mode=args.mark_json) elif args.reslug: run_reslug(target_rel_path=args.reslug_target, dry_run=args.dry_run) elif args.rebuild: run_rebuild() elif args.pending: manifest = load_manifest() pending = [(k, v) for k, v in manifest["files"].items() if not v.get("ingested")] if args.json: total = len(pending) # 未指定 limit -> 返回全部(files 列表) if args.limit is None: payload = { "event": "pending_list", "count": total, "files": [ { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), } for k, v in pending ], } elif args.limit <= 0: payload = {"event": "pending_list", "count": total, "files": []} elif args.limit == 1: first = pending[0] if pending else (None, None) if first[0] is None: payload = {"event": "pending_list", "count": 0, "file": None} else: k, v = first payload = { "event": "pending_list", "count": total, "file": { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), }, } else: # 返回前 N 条 as files array n = min(args.limit, total) payload = { "event": "pending_list", "count": total, "files": [ { "rel_path": k, "slug": v.get("slug", build_slug_from_path(k)), "source_path": v.get("source_path"), "modified": v.get("modified"), "hash": v.get("hash"), } for k, v in pending[:n] ], } print(json.dumps(payload)) else: # 控制台输出也支持 --limit total = len(pending) n = total if args.limit is None else max(0, args.limit) print(f"=== Pending Ingest Files ({total}) ===\n") if n == 0: print(" (no items to show)") else: for i, (path, info) in enumerate(pending[:n], 1): print(f"{i:3}. {path}") elif args.reset_failed: manifest = load_manifest() reset_count = 0 for k, v in manifest["files"].items(): if v.get("error"): v["ingested"] = False v.pop("error", None) v.pop("ingested_at", None) reset_count += 1 if reset_count > 0: save_manifest(manifest) print(f"Reset {reset_count} failed entries to pending.") else: print("No failed entries found.") elif args.check: run_check() elif args.sync: run_sync(dry_run=False, verbose=args.verbose, json_mode=args.json) else: parser.print_help() print("\n示例:") print(" python tools/sync.py --check # 预览变化") print(" python tools/sync.py --sync # 执行同步") print(" python tools/sync.py --sync -v # 详细模式") print(" python tools/sync.py --rebuild # 重建 index")