diff --git a/crawlers/oshwhub/crawler.py b/crawlers/oshwhub/crawler.py index 9e87a64..9921910 100644 --- a/crawlers/oshwhub/crawler.py +++ b/crawlers/oshwhub/crawler.py @@ -1246,13 +1246,16 @@ def main(argv: list[str] | None = None) -> int: # --backfill-source: standalone path that scans existing project dirs and # only fetches source. No listing/HTML/attachment work. if args.backfill_source: - return _run_backfill_source(args.out, only_uuids=args.uuids) + return _run_backfill_source( + args.out, only_uuids=args.uuids, concurrency=args.concurrency + ) if args.backfill_pro_source: return _run_backfill_pro_source( args.out, only_uuids=args.uuids, cookie_path=args.pro_cookie, max_source_mb=args.max_source_mb, + concurrency=args.concurrency, ) with make_client() as client: @@ -1341,49 +1344,128 @@ def main(argv: list[str] | None = None) -> int: return 0 -def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int: - """Walk existing per-project dirs in out_root and fetch source.json into each. +def _run_backfill_concurrent( + targets: list[Path], + fetch_one, + *, + concurrency: int, + label: str, + size_unit: str = "MB", +) -> None: + """Shared worker dispatch for std/pro backfill. - Updates metadata.json in-place to add source_format / source_documents / editor_version. + `fetch_one(proj_dir, meta) -> (status, src_meta_or_msg)` where status is one of: + "ok" — `src_meta` is the fetch_*_source dict + "oversize" — Pro-only; ProjectOversizeError already recorded by caller + "fail" — `src_meta_or_msg` is the human-readable error string + + Threads share fetch_one's captured client (httpx.Client is sync-thread-safe). + Print + per-target metadata writes are independent (each project owns its + metadata.json), so no lock needed there. Only stdout ordering is serialized. """ + print_lock = threading.Lock() + + def _do_one(idx: int, proj_dir: Path) -> None: + uuid = proj_dir.name + meta_path = proj_dir / "metadata.json" + try: + meta = json.loads(meta_path.read_text(encoding="utf-8")) + except Exception as e: # noqa: BLE001 + with print_lock: + print(f"[{idx}/{len(targets)}] meta read FAIL {uuid}: {e}", file=sys.stderr) + return + title = meta.get("title", "?") + status, payload = fetch_one(proj_dir, meta) + with print_lock: + head = f"[{idx}/{len(targets)}] {uuid}" + if status == "ok": + src_meta = payload + meta["source_format"] = src_meta["source_format"] + meta["source_path"] = src_meta["source_path"] + meta["source_documents"] = src_meta["source_documents"] + if src_meta.get("editor_version"): + meta["editor_version"] = src_meta["editor_version"] + meta_path.write_text( + json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" + ) + docs = src_meta["source_documents"] + total = sum(d["size"] for d in docs) + if size_unit == "KB": + sz = f"{total / 1024:.1f} KB" + else: + sz = f"{total / 1024 / 1024:.1f} MB" + print( + f"{head} OK ({title[:30]}): {len(docs)} docs, {sz}, " + f"editor={src_meta.get('editor_version')}" + ) + elif status == "oversize": + print(f"{head} SKIPPED oversize ({title[:30]}): {payload}", file=sys.stderr) + else: + print(f"{head} FAIL ({title[:30]}): {payload}", file=sys.stderr) + + print(f"Backfill {label} for {len(targets)} projects (concurrency={concurrency})") + if concurrency <= 1: + for i, d in enumerate(targets, 1): + _do_one(i, d) + else: + from concurrent.futures import ThreadPoolExecutor, as_completed + with ThreadPoolExecutor(max_workers=concurrency) as pool: + futs = [pool.submit(_do_one, i, d) for i, d in enumerate(targets, 1)] + for f in as_completed(futs): + f.result() + + +def _discover_backfill_targets( + out_root: Path, + only_uuids: str | None, + *, + require_origin_pro: bool = False, +) -> list[Path]: + """Walk out_root for per-project dirs that have metadata.json and match + the filter. If `only_uuids` is given, that's the authoritative whitelist; + otherwise (Pro only) we fall back to filtering by metadata.raw_fields.origin.""" wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None targets: list[Path] = [] for d in sorted(out_root.iterdir()): - if not d.is_dir(): + if not d.is_dir() or not (d / "metadata.json").exists(): continue - meta_path = d / "metadata.json" - if not meta_path.exists(): - continue - if wanted and d.name not in wanted: - continue - targets.append(d) - - print(f"Backfill source for {len(targets)} projects under {out_root}") - src_client = make_source_client() - try: - for i, proj_dir in enumerate(targets, 1): - uuid = proj_dir.name - meta_path = proj_dir / "metadata.json" - meta = json.loads(meta_path.read_text(encoding="utf-8")) - print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})") - try: - src_meta = fetch_std_source(src_client, uuid, proj_dir) - except Exception as e: # noqa: BLE001 - print(f" FAIL: {e}", file=sys.stderr) + if wanted is not None: + if d.name not in wanted: continue - meta["source_format"] = src_meta["source_format"] - meta["source_path"] = src_meta["source_path"] - meta["source_documents"] = src_meta["source_documents"] - if src_meta.get("editor_version"): - meta["editor_version"] = src_meta["editor_version"] - meta_path.write_text( - json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" - ) - total = sum(d["size"] for d in src_meta["source_documents"]) - print( - f" OK: {len(src_meta['source_documents'])} docs, " - f"{total / 1024:.1f} KB, editor={src_meta.get('editor_version')}" - ) + elif require_origin_pro: + try: + m = json.loads((d / "metadata.json").read_text(encoding="utf-8")) + except Exception: # noqa: BLE001 + continue + if (m.get("raw_fields") or {}).get("origin") != "pro": + continue + targets.append(d) + return targets + + +def _run_backfill_source( + out_root: Path, + only_uuids: str | None = None, + concurrency: int = 1, +) -> int: + """Walk per-project dirs and fetch Std source.json into each. + + Updates metadata.json in-place to add source_format / source_documents / + editor_version. + """ + targets = _discover_backfill_targets(out_root, only_uuids) + src_client = make_source_client() + + def fetch(proj_dir: Path, meta: dict): + try: + return ("ok", fetch_std_source(src_client, proj_dir.name, proj_dir)) + except Exception as e: # noqa: BLE001 + return ("fail", str(e)) + + try: + _run_backfill_concurrent( + targets, fetch, concurrency=concurrency, label="std source", size_unit="KB" + ) finally: src_client.close() return 0 @@ -1394,65 +1476,37 @@ def _run_backfill_pro_source( only_uuids: str | None = None, cookie_path: str = PRO_COOKIE_PATH_DEFAULT, max_source_mb: int | None = None, + concurrency: int = 1, ) -> int: - """Walk per-project dirs in out_root, fetch Pro source for origin=pro projects. - - A project is considered Pro by either: existing metadata.json marks - raw_fields.origin == 'pro', OR --uuids was passed and includes this UUID - (caller is asserting Pro). - """ - wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None - targets: list[Path] = [] - for d in sorted(out_root.iterdir()): - if not d.is_dir(): - continue - meta_path = d / "metadata.json" - if not meta_path.exists(): - continue - if wanted is not None: - if d.name not in wanted: - continue - else: - try: - m = json.loads(meta_path.read_text(encoding="utf-8")) - except Exception: # noqa: BLE001 - continue - if (m.get("raw_fields") or {}).get("origin") != "pro": - continue - targets.append(d) - - print(f"Backfill pro source for {len(targets)} projects under {out_root}") + """Walk per-project dirs and fetch Pro source into each (modern + legacy).""" + targets = _discover_backfill_targets( + out_root, only_uuids, require_origin_pro=(only_uuids is None) + ) pro_client = make_pro_source_client(cookie_path=cookie_path) - try: - for i, proj_dir in enumerate(targets, 1): - uuid = proj_dir.name - meta_path = proj_dir / "metadata.json" - meta = json.loads(meta_path.read_text(encoding="utf-8")) - print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})") - try: - src_meta = fetch_pro_source( + # Multiple threads may trip oversize concurrently; serialize the state-file + # append so lines don't interleave. + oversize_lock = threading.Lock() + + def fetch(proj_dir: Path, meta: dict): + uuid = proj_dir.name + try: + return ( + "ok", + fetch_pro_source( pro_client, uuid, proj_dir, max_source_mb=max_source_mb - ) - except ProjectOversizeError as e: - print(f" SKIPPED (oversize): {e}", file=sys.stderr) + ), + ) + except ProjectOversizeError as e: + with oversize_lock: _record_oversize(out_root, uuid, e) - continue - except Exception as e: # noqa: BLE001 - print(f" FAIL: {e}", file=sys.stderr) - continue - meta["source_format"] = src_meta["source_format"] - meta["source_path"] = src_meta["source_path"] - meta["source_documents"] = src_meta["source_documents"] - if src_meta.get("editor_version"): - meta["editor_version"] = src_meta["editor_version"] - meta_path.write_text( - json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" - ) - total = sum(d["size"] for d in src_meta["source_documents"]) - print( - f" OK: {len(src_meta['source_documents'])} docs, " - f"{total / 1024 / 1024:.1f} MB plain, editor={src_meta.get('editor_version')}" - ) + return ("oversize", str(e)) + except Exception as e: # noqa: BLE001 + return ("fail", str(e)) + + try: + _run_backfill_concurrent( + targets, fetch, concurrency=concurrency, label="pro source" + ) finally: pro_client.close() return 0