crawler: thread-pool concurrency for backfill paths

Both _run_backfill_source and _run_backfill_pro_source now honor
--concurrency N (default 1 keeps current sequential behavior). Shared
dispatch helper _run_backfill_concurrent + _discover_backfill_targets
factored out — the two paths had drifted but were structurally the same.

Thread safety:
  - httpx.Client is sync-thread-safe per docs; one client shared across
    threads is correct
  - Per-project file writes (metadata.json + source/*) don't conflict
    since each thread owns one project dir
  - Oversize state file is shared; serialized via a Lock around
    _record_oversize
  - Print is wrapped in a Lock for readable progress

Expected speedup on dev1 (Guangzhou): batch-200 Pro 100 项 sequential
~14 min -> concurrency 5 ~3-4 min. Std similar 2-3x. Server-side limit
isn't likely to bite at this scale (probe showed Pro QPS=2 sustained
clean; concurrency 5 puts effective rate around 4-5 req/s).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-29 02:32:03 +08:00
parent 7cb35020f4
commit f9d370e950

View File

@@ -1246,13 +1246,16 @@ def main(argv: list[str] | None = None) -> int:
# --backfill-source: standalone path that scans existing project dirs and
# only fetches source. No listing/HTML/attachment work.
if args.backfill_source:
return _run_backfill_source(args.out, only_uuids=args.uuids)
return _run_backfill_source(
args.out, only_uuids=args.uuids, concurrency=args.concurrency
)
if args.backfill_pro_source:
return _run_backfill_pro_source(
args.out,
only_uuids=args.uuids,
cookie_path=args.pro_cookie,
max_source_mb=args.max_source_mb,
concurrency=args.concurrency,
)
with make_client() as client:
@@ -1341,36 +1344,42 @@ def main(argv: list[str] | None = None) -> int:
return 0
def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int:
"""Walk existing per-project dirs in out_root and fetch source.json into each.
def _run_backfill_concurrent(
targets: list[Path],
fetch_one,
*,
concurrency: int,
label: str,
size_unit: str = "MB",
) -> None:
"""Shared worker dispatch for std/pro backfill.
Updates metadata.json in-place to add source_format / source_documents / editor_version.
`fetch_one(proj_dir, meta) -> (status, src_meta_or_msg)` where status is one of:
"ok" — `src_meta` is the fetch_*_source dict
"oversize" — Pro-only; ProjectOversizeError already recorded by caller
"fail" — `src_meta_or_msg` is the human-readable error string
Threads share fetch_one's captured client (httpx.Client is sync-thread-safe).
Print + per-target metadata writes are independent (each project owns its
metadata.json), so no lock needed there. Only stdout ordering is serialized.
"""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
if not meta_path.exists():
continue
if wanted and d.name not in wanted:
continue
targets.append(d)
print_lock = threading.Lock()
print(f"Backfill source for {len(targets)} projects under {out_root}")
src_client = make_source_client()
try:
for i, proj_dir in enumerate(targets, 1):
def _do_one(idx: int, proj_dir: Path) -> None:
uuid = proj_dir.name
meta_path = proj_dir / "metadata.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})")
try:
src_meta = fetch_std_source(src_client, uuid, proj_dir)
meta = json.loads(meta_path.read_text(encoding="utf-8"))
except Exception as e: # noqa: BLE001
print(f" FAIL: {e}", file=sys.stderr)
continue
with print_lock:
print(f"[{idx}/{len(targets)}] meta read FAIL {uuid}: {e}", file=sys.stderr)
return
title = meta.get("title", "?")
status, payload = fetch_one(proj_dir, meta)
with print_lock:
head = f"[{idx}/{len(targets)}] {uuid}"
if status == "ok":
src_meta = payload
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
@@ -1379,10 +1388,83 @@ def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int:
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
total = sum(d["size"] for d in src_meta["source_documents"])
docs = src_meta["source_documents"]
total = sum(d["size"] for d in docs)
if size_unit == "KB":
sz = f"{total / 1024:.1f} KB"
else:
sz = f"{total / 1024 / 1024:.1f} MB"
print(
f" OK: {len(src_meta['source_documents'])} docs, "
f"{total / 1024:.1f} KB, editor={src_meta.get('editor_version')}"
f"{head} OK ({title[:30]}): {len(docs)} docs, {sz}, "
f"editor={src_meta.get('editor_version')}"
)
elif status == "oversize":
print(f"{head} SKIPPED oversize ({title[:30]}): {payload}", file=sys.stderr)
else:
print(f"{head} FAIL ({title[:30]}): {payload}", file=sys.stderr)
print(f"Backfill {label} for {len(targets)} projects (concurrency={concurrency})")
if concurrency <= 1:
for i, d in enumerate(targets, 1):
_do_one(i, d)
else:
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=concurrency) as pool:
futs = [pool.submit(_do_one, i, d) for i, d in enumerate(targets, 1)]
for f in as_completed(futs):
f.result()
def _discover_backfill_targets(
out_root: Path,
only_uuids: str | None,
*,
require_origin_pro: bool = False,
) -> list[Path]:
"""Walk out_root for per-project dirs that have metadata.json and match
the filter. If `only_uuids` is given, that's the authoritative whitelist;
otherwise (Pro only) we fall back to filtering by metadata.raw_fields.origin."""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir() or not (d / "metadata.json").exists():
continue
if wanted is not None:
if d.name not in wanted:
continue
elif require_origin_pro:
try:
m = json.loads((d / "metadata.json").read_text(encoding="utf-8"))
except Exception: # noqa: BLE001
continue
if (m.get("raw_fields") or {}).get("origin") != "pro":
continue
targets.append(d)
return targets
def _run_backfill_source(
out_root: Path,
only_uuids: str | None = None,
concurrency: int = 1,
) -> int:
"""Walk per-project dirs and fetch Std source.json into each.
Updates metadata.json in-place to add source_format / source_documents /
editor_version.
"""
targets = _discover_backfill_targets(out_root, only_uuids)
src_client = make_source_client()
def fetch(proj_dir: Path, meta: dict):
try:
return ("ok", fetch_std_source(src_client, proj_dir.name, proj_dir))
except Exception as e: # noqa: BLE001
return ("fail", str(e))
try:
_run_backfill_concurrent(
targets, fetch, concurrency=concurrency, label="std source", size_unit="KB"
)
finally:
src_client.close()
@@ -1394,64 +1476,36 @@ def _run_backfill_pro_source(
only_uuids: str | None = None,
cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
max_source_mb: int | None = None,
concurrency: int = 1,
) -> int:
"""Walk per-project dirs in out_root, fetch Pro source for origin=pro projects.
A project is considered Pro by either: existing metadata.json marks
raw_fields.origin == 'pro', OR --uuids was passed and includes this UUID
(caller is asserting Pro).
"""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
if not meta_path.exists():
continue
if wanted is not None:
if d.name not in wanted:
continue
else:
try:
m = json.loads(meta_path.read_text(encoding="utf-8"))
except Exception: # noqa: BLE001
continue
if (m.get("raw_fields") or {}).get("origin") != "pro":
continue
targets.append(d)
print(f"Backfill pro source for {len(targets)} projects under {out_root}")
"""Walk per-project dirs and fetch Pro source into each (modern + legacy)."""
targets = _discover_backfill_targets(
out_root, only_uuids, require_origin_pro=(only_uuids is None)
)
pro_client = make_pro_source_client(cookie_path=cookie_path)
try:
for i, proj_dir in enumerate(targets, 1):
# Multiple threads may trip oversize concurrently; serialize the state-file
# append so lines don't interleave.
oversize_lock = threading.Lock()
def fetch(proj_dir: Path, meta: dict):
uuid = proj_dir.name
meta_path = proj_dir / "metadata.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})")
try:
src_meta = fetch_pro_source(
return (
"ok",
fetch_pro_source(
pro_client, uuid, proj_dir, max_source_mb=max_source_mb
),
)
except ProjectOversizeError as e:
print(f" SKIPPED (oversize): {e}", file=sys.stderr)
with oversize_lock:
_record_oversize(out_root, uuid, e)
continue
return ("oversize", str(e))
except Exception as e: # noqa: BLE001
print(f" FAIL: {e}", file=sys.stderr)
continue
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
total = sum(d["size"] for d in src_meta["source_documents"])
print(
f" OK: {len(src_meta['source_documents'])} docs, "
f"{total / 1024 / 1024:.1f} MB plain, editor={src_meta.get('editor_version')}"
return ("fail", str(e))
try:
_run_backfill_concurrent(
targets, fetch, concurrency=concurrency, label="pro source"
)
finally:
pro_client.close()