"""Estimate full-corpus storage by sampling oshwhub detail pages (no downloads). 从列表 API 取 N 个项目,解析每个详情页的 `attachments[]`,把 `size` 字段求和。 不下载任何附件,仅抓 HTML 页,对服务器压力小;可用来快速给 Charles 一个 放量存储估计。 Usage: uv run python scripts/estimate_size.py --pages 5 --sort hot """ from __future__ import annotations import argparse import statistics as st import sys import time from pathlib import Path # Reuse crawler helpers; avoid duplicating HTTP/parse code sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from crawlers.oshwhub.crawler import ( # noqa: E402 make_client, list_projects, parse_detail_html, BASE, ) def fmt_mb(bytes_: float) -> str: return f"{bytes_ / 1024 / 1024:.1f} MB" def fmt_gb(bytes_: float) -> str: return f"{bytes_ / 1024 / 1024 / 1024:.2f} GB" def main(argv: list[str] | None = None) -> int: ap = argparse.ArgumentParser() ap.add_argument("--pages", type=int, default=3) ap.add_argument("--page-size", type=int, default=30) ap.add_argument("--sort", default="hot") ap.add_argument("--sleep", type=float, default=1.0, help="seconds between detail fetches") args = ap.parse_args(argv) sample_sizes: list[int] = [] # per-project total bytes sample_counts: list[int] = [] # per-project attachment count ext_hist: dict[str, int] = {} # bytes by extension lic_hist: dict[str, int] = {} total = None with make_client() as client: for page in range(1, args.pages + 1): res = list_projects(client, page=page, page_size=args.page_size, sort=args.sort) total = res["total"] for it in res["lists"]: path = it["path"] url = f"{BASE}/{path}" try: r = client.get(url) r.raise_for_status() d = parse_detail_html(r.text) except Exception as e: print(f" skip {path}: {e}", file=sys.stderr) continue lic = d.get("license") or "unknown" lic_hist[lic] = lic_hist.get(lic, 0) + 1 proj_size = 0 count = 0 for a in d.get("attachments", []): size = a.get("size") or 0 ext = (a.get("ext") or "?").lower() proj_size += size count += 1 ext_hist[ext] = ext_hist.get(ext, 0) + size sample_sizes.append(proj_size) sample_counts.append(count) print( f" p{page:02d} {path:50.50} files={count:>2} size={fmt_mb(proj_size)}", flush=True, ) time.sleep(args.sleep) if not sample_sizes: print("no samples") return 1 n = len(sample_sizes) total_bytes = sum(sample_sizes) mean = st.mean(sample_sizes) median = st.median(sample_sizes) p90 = sorted(sample_sizes)[int(n * 0.9)] if n >= 10 else max(sample_sizes) max_ = max(sample_sizes) print() print(f"sampled: {n} projects (sort={args.sort})") print(f"attachments/proj: mean={st.mean(sample_counts):.1f} " f"median={st.median(sample_counts):.0f} max={max(sample_counts)}") print(f"size/proj: mean={fmt_mb(mean)} median={fmt_mb(median)} " f"p90={fmt_mb(p90)} max={fmt_mb(max_)}") print(f"sample total: {fmt_mb(total_bytes)}") if total: est_full = mean * total print(f"\ncorpus total (API reports): {total} projects") print(f" × mean → estimate: {fmt_gb(est_full)}") print(f" × median → estimate: {fmt_gb(median * total)}") print(f" × p90 → upper bound: {fmt_gb(p90 * total)}") print("\ntop ext by total bytes:") for ext, b in sorted(ext_hist.items(), key=lambda x: -x[1])[:10]: print(f" .{ext:6} {fmt_mb(b):>12}") print("\nlicense distribution in sample:") for lic, c in sorted(lic_hist.items(), key=lambda x: -x[1])[:10]: pct = 100 * c / n print(f" {lic:30} {c:>3} ({pct:.0f}%)") return 0 if __name__ == "__main__": raise SystemExit(main())