Topic-targeted pull from local listing index (`name OR introduction`
contains 飞控). 79 std hits in oshwhub_listing_full.jsonl, 2 already
crawled, 77 newly fetched.
dev1 (Guangzhou) walltime:
Step 1 detail scrape ~12s, Step 4 std-source backfill ~80s
(concurrency=5)
Source completeness: 73/77 with editor source, 4 are upstream
attachments-only (no editor session ever attached, source_documents=[]
is genuine — no editor_version on the SSR page either).
Crawler hardening (crawlers/oshwhub/crawler.py):
- count.{like,star,fork,views} are now `.get(..., 0)` defensive.
Listing API omits zero-valued fields for some low-activity entries
(3/77 hit this on first pass, hard-failed with KeyError 'like').
Affects rank_score, pick_top, and metadata.json metrics block.
License mix: 65% GPL 3.0, 11% Public Domain, 11% MIT, ~6% CC variants.
Transport: dev1 → SG via tar+scp (33 MB, ~3 min over lossy
cross-region link). Bypassed gitea push from dev1 because the same
6.5%-loss link tanks single-stream throughput.
1519 lines
57 KiB
Python
1519 lines
57 KiB
Python
"""oshwhub.com crawler — MVP.
|
||
|
||
Usage:
|
||
uv run python -m crawlers.oshwhub \
|
||
--out data/raw/oshwhub \
|
||
--top 10 --min-likes 50 --min-grade 4
|
||
|
||
Or with explicit UUID list:
|
||
uv run python -m crawlers.oshwhub \
|
||
--uuids 298873b7fdbe44f8ba0e7351e023bc2c,7b6a398811f14eba9a952b8d2ddd7ace \
|
||
--out data/raw/oshwhub
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import dataclasses as dc
|
||
import hashlib
|
||
import html as _html
|
||
import json
|
||
import re
|
||
import shutil
|
||
import sys
|
||
import threading
|
||
import time
|
||
import urllib.parse
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Iterator
|
||
|
||
import httpx
|
||
|
||
API_LIST = "https://oshwhub.com/api/project"
|
||
API_PROJECT = "https://oshwhub.com/api/project" # /api/project/<uuid> for source flow
|
||
BASE = "https://oshwhub.com"
|
||
IMG_CDN = "https://image.lceda.cn"
|
||
LCEDA_DOC_API = "https://lceda.cn/api/documents"
|
||
PRO_API = "https://pro.lceda.cn/api/v4"
|
||
PRO_EDITOR_VERSION = "3.2.127"
|
||
PRO_COOKIE_PATH_DEFAULT = "/home/ubuntu/.secrets/pro-lceda-cookie-header.txt"
|
||
UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)"
|
||
# Std source endpoints reject FacereDataset UA on oshwhub /api/project; spoof browser UA only there.
|
||
# See docs/sources/easyeda_std_source.md §3.
|
||
BROWSER_UA = (
|
||
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
|
||
"Chrome/147.0.0.0 Safari/537.36"
|
||
)
|
||
# Per-host rate limits — calibrated against ladder probes (scripts/probe_rate_limit.py)
|
||
# on 2026-04-29. See data/state/probe_rate_limit_results.md for the methodology.
|
||
SLEEP_BETWEEN = 1.0 # oshwhub.com detail/listing — ladder probe: 0.5s clean,
|
||
# 1.0s leaves headroom (detail HTML p90 hits 6s at 1.0s,
|
||
# 15s at 0.5s due to server-queue softlimit).
|
||
SLEEP_SOURCE = 0.5 # lceda.cn Std source endpoints — ladder probe 5/2/1/0.5/0.25s
|
||
# all clean (45/45 200/success). Latency is dominated by
|
||
# payload size (Std docs span 4 KB to 4.5 MB) not server
|
||
# backpressure. Same posture as Pro API. 10x speedup.
|
||
SLEEP_PRO = 0.5 # pro.lceda.cn API host — sustained burst probe (25
|
||
# distinct UUIDs at 0.5s) showed 0/25 errors, median
|
||
# latency 410ms. 10x faster than the original 5.0s.
|
||
# Originally set high out of caution because Pro requires
|
||
# logged-in cookie; empirically Pro API tolerates QPS=2
|
||
# cleanly. CDN blob loop uses SLEEP_PRO_CDN below.
|
||
SLEEP_PRO_CDN = 0.2 # modules.lceda.cn — CDN serving AES-encrypted EPRO2
|
||
# history blobs. The editor fires these back-to-back per
|
||
# HAR analysis. Chain replay walltime dominated by this
|
||
# loop on big projects (X86 board: ~1h at 5s/req →
|
||
# ~3 min at 0.2s/req).
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# HTTP
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def make_client(timeout: float = 30.0) -> httpx.Client:
|
||
return httpx.Client(
|
||
http2=True,
|
||
timeout=timeout,
|
||
headers={"User-Agent": UA, "Accept": "text/html,application/json;q=0.9,*/*;q=0.8"},
|
||
follow_redirects=True,
|
||
)
|
||
|
||
|
||
def make_source_client(timeout: float = 60.0) -> httpx.Client:
|
||
"""Client for Std source endpoints (lceda.cn/oshwhub.com /api/...).
|
||
|
||
Uses browser UA + editor Referer to satisfy server-side UA filter.
|
||
"""
|
||
return httpx.Client(
|
||
http2=True,
|
||
timeout=timeout,
|
||
headers={
|
||
"User-Agent": BROWSER_UA,
|
||
"Accept": "application/json, text/plain, */*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||
"Referer": "https://lceda.cn/editor",
|
||
},
|
||
follow_redirects=False,
|
||
)
|
||
|
||
|
||
def make_pro_source_client(
|
||
cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
|
||
timeout: float = 90.0,
|
||
) -> httpx.Client:
|
||
"""Client for Pro source endpoints (pro.lceda.cn /api/v4/...).
|
||
|
||
Requires logged-in cookie header at `cookie_path` (mode 600). The cookie
|
||
file is a single Cookie header value (e.g. `lceda_pro_session=...; XSRF-TOKEN=...`).
|
||
Per-request `path: <project_uuid>` header MUST be added by callers — see
|
||
docs/sources/easyeda_pro_source.md §2.5.
|
||
"""
|
||
cookie = Path(cookie_path).read_text(encoding="utf-8").strip()
|
||
return httpx.Client(
|
||
http2=True,
|
||
timeout=timeout,
|
||
headers={
|
||
"User-Agent": BROWSER_UA,
|
||
"Accept": "application/json, text/plain, */*",
|
||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||
"Editor-Version": PRO_EDITOR_VERSION,
|
||
"Referer": "https://pro.lceda.cn/editor",
|
||
"Cookie": cookie,
|
||
},
|
||
follow_redirects=False,
|
||
)
|
||
|
||
|
||
def polite_sleep() -> None:
|
||
time.sleep(SLEEP_BETWEEN)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Listing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def list_projects(
|
||
client: httpx.Client,
|
||
page: int = 1,
|
||
page_size: int = 30,
|
||
sort: str = "hot",
|
||
origin: str | None = None,
|
||
) -> dict:
|
||
params: dict[str, object] = {"page": page, "pageSize": page_size, "sort": sort}
|
||
if origin:
|
||
params["origin"] = origin # 'std' or 'pro' — server-side filter
|
||
r = client.get(API_LIST, params=params)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
if not data.get("success"):
|
||
raise RuntimeError(f"list API failed: {data}")
|
||
return data["result"]
|
||
|
||
|
||
def rank_score(item: dict) -> float:
|
||
"""Composite quality score: favor projects with broad engagement."""
|
||
# Listing API can omit zero-valued count fields (observed: low-activity
|
||
# projects miss `like`, possibly others). Use .get with 0 default.
|
||
c = item.get("count") or {}
|
||
return (
|
||
c.get("like", 0) * 3
|
||
+ c.get("star", 0) * 1
|
||
+ c.get("fork", 0) * 2
|
||
+ c.get("views", 0) / 100
|
||
+ (item.get("comments_count") or 0) * 2
|
||
+ (item.get("grade") or 0) * 50
|
||
)
|
||
|
||
|
||
def pick_top(
|
||
items: list[dict],
|
||
n: int,
|
||
min_likes: int,
|
||
min_grade: int,
|
||
exclude_copies: bool = True,
|
||
) -> list[dict]:
|
||
filtered = []
|
||
for it in items:
|
||
if exclude_copies and "_copy" in it["path"]:
|
||
continue
|
||
if (it.get("count") or {}).get("like", 0) < min_likes:
|
||
continue
|
||
if (it.get("grade") or 0) < min_grade:
|
||
continue
|
||
filtered.append(it)
|
||
filtered.sort(key=rank_score, reverse=True)
|
||
return filtered[:n]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Detail page parsing
|
||
# ---------------------------------------------------------------------------
|
||
|
||
RE_ATTACH_BLOCK = re.compile(r'\\"attachments\\":\[', re.DOTALL)
|
||
RE_LICENSE = re.compile(r'\\"license\\":\\"([^\\"]+)\\"')
|
||
RE_META_DESC = re.compile(
|
||
r'<meta\s+name="description"\s+content="([^"]*)"', re.IGNORECASE
|
||
)
|
||
RE_TITLE = re.compile(r"<title>([^<]+)</title>", re.IGNORECASE)
|
||
|
||
|
||
def _find_balanced_bracket(s: str, start: int, open_ch: str = "[", close_ch: str = "]") -> int:
|
||
"""Return index after the matching close bracket. start must point at open_ch."""
|
||
assert s[start] == open_ch
|
||
depth = 0
|
||
for i in range(start, len(s)):
|
||
ch = s[i]
|
||
if ch == open_ch:
|
||
depth += 1
|
||
elif ch == close_ch:
|
||
depth -= 1
|
||
if depth == 0:
|
||
return i + 1
|
||
raise ValueError("unbalanced")
|
||
|
||
|
||
def parse_detail_html(h: str) -> dict:
|
||
"""Extract attachments, license, title, description from SSR HTML."""
|
||
out: dict = {
|
||
"title": None,
|
||
"description_meta": None,
|
||
"license": None,
|
||
"attachments": [],
|
||
}
|
||
|
||
m = RE_TITLE.search(h)
|
||
if m:
|
||
# HTML entities + suffix stripping
|
||
title = _html.unescape(m.group(1)).strip()
|
||
for sfx in (
|
||
" - 立创开源硬件平台 - 深圳创电优选科技有限公司",
|
||
" - 立创开源硬件平台",
|
||
):
|
||
if title.endswith(sfx):
|
||
title = title[: -len(sfx)]
|
||
out["title"] = title
|
||
|
||
m = RE_META_DESC.search(h)
|
||
if m:
|
||
out["description_meta"] = _html.unescape(m.group(1))
|
||
|
||
m = RE_LICENSE.search(h)
|
||
if m:
|
||
out["license"] = m.group(1)
|
||
|
||
m = RE_ATTACH_BLOCK.search(h)
|
||
if m:
|
||
arr_start = m.end() - 1 # point at '['
|
||
arr_end = _find_balanced_bracket(h, arr_start)
|
||
block = h[arr_start:arr_end]
|
||
clean = block.replace('\\"', '"').replace("\\\\", "\\")
|
||
try:
|
||
out["attachments"] = json.loads(clean)
|
||
except json.JSONDecodeError as e:
|
||
# Keep raw for debugging; skip attachments silently. Caller can log.
|
||
out["_attachments_parse_error"] = str(e)
|
||
|
||
return out
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Download helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def download_to(
|
||
client: httpx.Client, url: str, dest: Path, *, max_seconds: float = 60.0
|
||
) -> tuple[int, str]:
|
||
"""Stream-download url to dest. Returns (size, sha256).
|
||
|
||
`max_seconds` caps total download walltime — defends against
|
||
pathologically slow CDN connections (observed 10 KB/s on
|
||
image.lceda.cn for one project, would have hung 6+ min on a 3.6 MB
|
||
cover otherwise). httpx's default timeout resets per chunk, so
|
||
relying on it for streaming is unsafe. We track wall time ourselves.
|
||
"""
|
||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||
h = hashlib.sha256()
|
||
size = 0
|
||
t_start = time.monotonic()
|
||
with client.stream("GET", url) as r:
|
||
r.raise_for_status()
|
||
with open(dest, "wb") as f:
|
||
for chunk in r.iter_bytes(1 << 15):
|
||
if time.monotonic() - t_start > max_seconds:
|
||
raise httpx.ReadTimeout(
|
||
f"download exceeded {max_seconds}s wall budget "
|
||
f"(got {size} of {r.headers.get('content-length', '?')} bytes)"
|
||
)
|
||
f.write(chunk)
|
||
h.update(chunk)
|
||
size += len(chunk)
|
||
return size, h.hexdigest()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Std source fetch (login NOT required for public projects — see
|
||
# docs/sources/easyeda_std_source.md)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def fetch_std_source(
|
||
source_client: httpx.Client,
|
||
project_uuid: str,
|
||
proj_dir: Path,
|
||
sleep: float = SLEEP_SOURCE,
|
||
) -> dict:
|
||
"""Fetch EasyEDA Std project source (schematic + PCB dataStr) anonymously.
|
||
|
||
Returns dict with keys:
|
||
- source_format: "easyeda-std"
|
||
- source_path: "source/"
|
||
- source_documents: list of {doc_uuid, docType, master, path, size, sha256}
|
||
- editor_version: from dataStr.head when available
|
||
"""
|
||
src_dir = proj_dir / "source"
|
||
src_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 1. Project meta → version_documents
|
||
r = source_client.get(f"{API_PROJECT}/{project_uuid}")
|
||
r.raise_for_status()
|
||
j = r.json()
|
||
if not j.get("success"):
|
||
raise RuntimeError(f"oshwhub project meta failed: {j}")
|
||
version_documents = j["result"].get("version_documents") or []
|
||
time.sleep(sleep)
|
||
|
||
# 2. Per document → dataStr
|
||
doc_metas: list[dict] = []
|
||
editor_version: str | None = None
|
||
for vd in version_documents:
|
||
doc_uuid = vd["uuid"]
|
||
master = vd.get("master")
|
||
doc_type = vd.get("docType")
|
||
url = f"{LCEDA_DOC_API}/{doc_uuid}"
|
||
r2 = source_client.get(url, params={"uuid": doc_uuid, "path": doc_uuid})
|
||
r2.raise_for_status()
|
||
# Server returns text/html mistakenly; body is JSON regardless.
|
||
try:
|
||
body_json = r2.json()
|
||
except Exception as e: # noqa: BLE001
|
||
raise RuntimeError(f"doc {doc_uuid} non-JSON response: {e}; head={r2.text[:200]!r}")
|
||
if not body_json.get("success"):
|
||
raise RuntimeError(f"doc {doc_uuid} response not success: {body_json}")
|
||
|
||
local_rel = f"source/{doc_uuid}.json"
|
||
local_path = proj_dir / local_rel
|
||
text = json.dumps(body_json, ensure_ascii=False, separators=(",", ":"))
|
||
local_path.write_text(text, encoding="utf-8")
|
||
size = local_path.stat().st_size
|
||
sha = hashlib.sha256(text.encode("utf-8")).hexdigest()
|
||
|
||
# Pull editor version from the dataStr.head if present.
|
||
ev = _extract_editor_version(body_json)
|
||
if ev and not editor_version:
|
||
editor_version = ev
|
||
|
||
doc_metas.append({
|
||
"doc_uuid": doc_uuid,
|
||
"docType": doc_type,
|
||
"master": master,
|
||
"path": local_rel,
|
||
"size": size,
|
||
"sha256": sha,
|
||
})
|
||
time.sleep(sleep)
|
||
|
||
# 3. source/manifest.json — index + raw upstream version_documents for diffing
|
||
manifest = {
|
||
"project_uuid": project_uuid,
|
||
"fetched_at": datetime.now(timezone.utc).isoformat(),
|
||
"editor_version": editor_version,
|
||
"documents": doc_metas,
|
||
"upstream_version_documents": version_documents,
|
||
}
|
||
(src_dir / "manifest.json").write_text(
|
||
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
|
||
return {
|
||
"source_format": "easyeda-std",
|
||
"source_path": "source/",
|
||
"source_documents": doc_metas,
|
||
"editor_version": editor_version,
|
||
}
|
||
|
||
|
||
def _extract_editor_version(body_json: dict) -> str | None:
|
||
"""Best-effort: pull head.editorVersion from dataStr (location varies by docType)."""
|
||
res = body_json.get("result") or {}
|
||
# PCB shape: result.dataStr at top
|
||
ds = res.get("dataStr")
|
||
if isinstance(ds, dict):
|
||
head = ds.get("head") or {}
|
||
if isinstance(head, dict) and head.get("editorVersion"):
|
||
return str(head["editorVersion"])
|
||
# Schematic shape: result.schematics[*].dataStr
|
||
for sch in (res.get("schematics") or []):
|
||
if isinstance(sch, dict):
|
||
ds2 = sch.get("dataStr") or {}
|
||
if isinstance(ds2, dict):
|
||
head = ds2.get("head") or {}
|
||
if isinstance(head, dict) and head.get("editorVersion"):
|
||
return str(head["editorVersion"])
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pro source fetch (pro.lceda.cn — EPRO2 message stream, AES-128-GCM)
|
||
# See docs/sources/easyeda_pro_source.md.
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _pro_get_json(
|
||
client: httpx.Client,
|
||
url: str,
|
||
project_uuid: str,
|
||
*,
|
||
params: dict | None = None,
|
||
) -> dict | list:
|
||
"""GET a pro.lceda.cn /api/v4 endpoint with the per-project `path` header.
|
||
|
||
Raises if the JSON envelope's `success` is False; returns `result`.
|
||
"""
|
||
r = client.get(url, params=params, headers={"path": project_uuid})
|
||
r.raise_for_status()
|
||
j = r.json()
|
||
if not j.get("success"):
|
||
raise RuntimeError(f"Pro API failed (url={url}): {j}")
|
||
return j["result"]
|
||
|
||
|
||
def _order_history_chain(chain: list[dict]) -> list[dict]:
|
||
"""Return the chain ordered root→HEAD by walking parent links.
|
||
|
||
Pro returns the chain HEAD-first as a flat list with `parent` links. We
|
||
walk from the unique root forward.
|
||
"""
|
||
by_uuid = {h["uuid"]: h for h in chain}
|
||
roots = [h for h in chain if h.get("parent") not in by_uuid]
|
||
if len(roots) != 1:
|
||
raise RuntimeError(
|
||
f"history chain has {len(roots)} roots; not strictly linear"
|
||
)
|
||
children: dict[str | None, list[dict]] = {}
|
||
for h in chain:
|
||
children.setdefault(h.get("parent"), []).append(h)
|
||
ordered: list[dict] = []
|
||
cur: dict | None = roots[0]
|
||
while cur is not None:
|
||
ordered.append(cur)
|
||
nexts = children.get(cur["uuid"], [])
|
||
if len(nexts) > 1:
|
||
raise RuntimeError(
|
||
f"history chain not linear at {cur['uuid']!r}: {len(nexts)} children"
|
||
)
|
||
cur = nexts[0] if nexts else None
|
||
if len(ordered) != len(chain):
|
||
raise RuntimeError(
|
||
f"reconstructed {len(ordered)} of {len(chain)} histories; chain has cycles or orphans"
|
||
)
|
||
return ordered
|
||
|
||
|
||
class ProjectOversizeError(Exception):
|
||
"""Raised when a Pro project's chain replay would exceed the configured cap.
|
||
|
||
`cap_mb` is the trip threshold; `bytes_so_far` is the *encrypted blob* total
|
||
accumulated when we tripped (pre-decompression, pre-partition).
|
||
"""
|
||
|
||
def __init__(self, bytes_so_far: int, cap_mb: int) -> None:
|
||
super().__init__(
|
||
f"oversize: blob bytes {bytes_so_far // 1024 // 1024} MB > cap {cap_mb} MB"
|
||
)
|
||
self.bytes_so_far = bytes_so_far
|
||
self.cap_mb = cap_mb
|
||
|
||
|
||
def _record_oversize(out_root: Path, uuid: str, err: ProjectOversizeError) -> None:
|
||
"""Append one row to data/state/oshwhub_pro_oversize.jsonl for later review."""
|
||
state_path = Path("data/state/oshwhub_pro_oversize.jsonl")
|
||
state_path.parent.mkdir(parents=True, exist_ok=True)
|
||
row = {
|
||
"uuid": uuid,
|
||
"out_root": str(out_root),
|
||
"bytes_so_far": err.bytes_so_far,
|
||
"cap_mb": err.cap_mb,
|
||
"ts": datetime.now(timezone.utc).isoformat(),
|
||
}
|
||
with state_path.open("a") as f:
|
||
f.write(json.dumps(row, ensure_ascii=False) + "\n")
|
||
|
||
|
||
def fetch_pro_source(
|
||
pro_client: httpx.Client,
|
||
project_uuid: str,
|
||
proj_dir: Path,
|
||
sleep: float = SLEEP_PRO,
|
||
max_source_mb: int | None = None,
|
||
) -> dict:
|
||
"""Dispatcher: pick modern (3.x branch+EPRO2) vs legacy (2.x v2/documents/lists)
|
||
based on whether project meta contains a non-null branch_uuid.
|
||
|
||
Pro 3.x stores in git-style branch+history with AES-encrypted EPRO2 streams;
|
||
Pro 2.x predates that and uses Std-style per-doc dataStr served from
|
||
/api/v2/documents/lists. See docs/sources/easyeda_pro_source.md §1.1.
|
||
|
||
`max_source_mb` only gates modern-path projects (legacy is always tiny: <2 MB
|
||
in our 2/2 sample) and trips before any blob is written to disk past the cap.
|
||
"""
|
||
proj = _pro_get_json(pro_client, f"{PRO_API}/projects/{project_uuid}", project_uuid)
|
||
time.sleep(sleep)
|
||
if proj.get("branch_uuid"):
|
||
return _fetch_pro_modern(
|
||
pro_client, project_uuid, proj, proj_dir, sleep, max_source_mb=max_source_mb
|
||
)
|
||
return _fetch_pro_legacy(pro_client, project_uuid, proj, proj_dir, sleep)
|
||
|
||
|
||
def _fetch_pro_modern(
|
||
pro_client: httpx.Client,
|
||
project_uuid: str,
|
||
proj: dict,
|
||
proj_dir: Path,
|
||
sleep: float = SLEEP_PRO,
|
||
max_source_mb: int | None = None,
|
||
) -> dict:
|
||
"""Modern Pro 3.x fetcher: full history chain, AES-GCM decrypted, gunzipped,
|
||
and partitioned into per-document EPRO2 streams.
|
||
|
||
Side effects under ``proj_dir``:
|
||
- source/structure.json — project document tree (boards/schematics/sheets/pcbs/...)
|
||
- source/<doc_uuid>.epro2 — one file per document, raw EPRO2 messages (newline-separated)
|
||
- source/manifest.json — per-doc index + chain summary
|
||
"""
|
||
import gzip
|
||
from collections import OrderedDict
|
||
from Crypto.Cipher import AES # local import: cheap if pycryptodome already loaded
|
||
|
||
src_dir = proj_dir / "source"
|
||
src_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
branch_uuid = proj["branch_uuid"]
|
||
project_editor_version = proj.get("editorVersion")
|
||
|
||
# 2. branch meta -> head history_uuid
|
||
branch = _pro_get_json(
|
||
pro_client,
|
||
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}",
|
||
project_uuid,
|
||
)
|
||
head_uuid = branch.get("history_uuid")
|
||
if not head_uuid:
|
||
raise RuntimeError(f"no history_uuid (HEAD) on branch {branch_uuid}")
|
||
time.sleep(sleep)
|
||
|
||
# 3. structure tree
|
||
st = _pro_get_json(
|
||
pro_client,
|
||
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/structures",
|
||
project_uuid,
|
||
)
|
||
raw_structure = st.get("structure")
|
||
structure = json.loads(raw_structure) if isinstance(raw_structure, str) else raw_structure
|
||
(src_dir / "structure.json").write_text(
|
||
json.dumps(structure, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
time.sleep(sleep)
|
||
|
||
# 4. history chain — single endpoint returns full chain (HAR-confirmed 2026-04-28)
|
||
chain = _pro_get_json(
|
||
pro_client,
|
||
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/histories/{head_uuid}",
|
||
project_uuid,
|
||
)
|
||
if not isinstance(chain, list) or not chain:
|
||
raise RuntimeError(f"unexpected histories response: {type(chain).__name__}")
|
||
ordered = _order_history_chain(chain)
|
||
time.sleep(sleep)
|
||
|
||
# 5. download + decrypt + gunzip + partition by DOCHEAD
|
||
docs: OrderedDict[str, dict] = OrderedDict()
|
||
cur_doc: str | None = None
|
||
bytes_blob_total = 0
|
||
bytes_plain_total = 0
|
||
cap_bytes = max_source_mb * 1024 * 1024 if max_source_mb is not None else None
|
||
for h in ordered:
|
||
blob_r = pro_client.get(h["dataStrUrl"], headers={"path": project_uuid})
|
||
blob_r.raise_for_status()
|
||
blob = blob_r.content
|
||
bytes_blob_total += len(blob)
|
||
# Trip cap on the encrypted blob total. Hits *after* the offending
|
||
# download, but before we decrypt/gunzip/partition (those scale with
|
||
# plain bytes which is even larger). Wipe any partial source/ so disk
|
||
# doesn't accumulate junk on multi-project runs.
|
||
if cap_bytes is not None and bytes_blob_total > cap_bytes:
|
||
shutil.rmtree(src_dir, ignore_errors=True)
|
||
raise ProjectOversizeError(bytes_blob_total, max_source_mb)
|
||
if len(blob) < 16:
|
||
raise RuntimeError(f"history {h['uuid']} blob too short ({len(blob)} B)")
|
||
ct, tag = blob[:-16], blob[-16:]
|
||
cipher = AES.new(
|
||
bytes.fromhex(h["key"]),
|
||
AES.MODE_GCM,
|
||
nonce=bytes.fromhex(h["iv"]),
|
||
)
|
||
gz = cipher.decrypt_and_verify(ct, tag)
|
||
plain = gzip.decompress(gz)
|
||
bytes_plain_total += len(plain)
|
||
for ln in plain.split(b"\n"):
|
||
if not ln.strip():
|
||
continue
|
||
# EPRO2 lines use `||` as field separator and terminate with a single
|
||
# `|`. Strip the trailing `|` first so each part parses as bare JSON.
|
||
stripped = ln.rstrip(b"|")
|
||
parts = stripped.split(b"||")
|
||
try:
|
||
head_msg = json.loads(parts[0])
|
||
except Exception: # noqa: BLE001 — malformed head; skip entire line
|
||
continue
|
||
if head_msg.get("type") == "DOCHEAD" and len(parts) >= 2:
|
||
try:
|
||
payload = json.loads(parts[1])
|
||
except Exception: # noqa: BLE001
|
||
payload = {}
|
||
new_doc = payload.get("uuid")
|
||
if new_doc:
|
||
cur_doc = new_doc
|
||
if cur_doc not in docs:
|
||
docs[cur_doc] = {
|
||
"lines": [],
|
||
"doc_head": payload,
|
||
}
|
||
if cur_doc and cur_doc in docs:
|
||
docs[cur_doc]["lines"].append(ln)
|
||
# CDN host, not the rate-sensitive API host — see SLEEP_PRO_CDN comment.
|
||
time.sleep(SLEEP_PRO_CDN)
|
||
|
||
# 6. write per-doc .epro2 + manifest
|
||
doc_metas: list[dict] = []
|
||
editor_version: str | None = None
|
||
for doc_uuid, info in docs.items():
|
||
body = b"\n".join(info["lines"]) + b"\n"
|
||
local_rel = f"source/{doc_uuid}.epro2"
|
||
local_path = proj_dir / local_rel
|
||
local_path.write_bytes(body)
|
||
size = len(body)
|
||
sha = hashlib.sha256(body).hexdigest()
|
||
head = info["doc_head"]
|
||
ev = head.get("editVersion") or head.get("editorVersion")
|
||
if ev and not editor_version:
|
||
editor_version = str(ev)
|
||
doc_metas.append({
|
||
"doc_uuid": doc_uuid,
|
||
"docType": head.get("docType"), # "BOARD" / "PCB" / "SCH" / "SCH_PAGE" / "SYMBOL" / ...
|
||
"path": local_rel,
|
||
"size": size,
|
||
"sha256": sha,
|
||
"message_count": len(info["lines"]),
|
||
})
|
||
|
||
# editor_version fallback: project meta if no DOCHEAD payload had it
|
||
if not editor_version and project_editor_version:
|
||
editor_version = str(project_editor_version)
|
||
|
||
structure_summary: dict[str, int] = {}
|
||
if isinstance(structure, dict):
|
||
for k, v in structure.items():
|
||
if isinstance(v, dict):
|
||
structure_summary[k] = len(v)
|
||
elif isinstance(v, list):
|
||
structure_summary[k] = len(v)
|
||
|
||
manifest = {
|
||
"project_uuid": project_uuid,
|
||
"branch_uuid": branch_uuid,
|
||
"head_uuid": head_uuid,
|
||
"fetched_at": datetime.now(timezone.utc).isoformat(),
|
||
"editor_version": editor_version,
|
||
"chain_length": len(chain),
|
||
"blob_bytes_total": bytes_blob_total,
|
||
"plain_bytes_total": bytes_plain_total,
|
||
"documents": doc_metas,
|
||
"structure_summary": structure_summary,
|
||
}
|
||
(src_dir / "manifest.json").write_text(
|
||
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
|
||
return {
|
||
"source_format": "easyeda-pro",
|
||
"source_path": "source/",
|
||
"source_documents": doc_metas,
|
||
"editor_version": editor_version,
|
||
}
|
||
|
||
|
||
def _pro_post_json(
|
||
client: httpx.Client,
|
||
url: str,
|
||
project_uuid: str,
|
||
body: dict,
|
||
) -> object:
|
||
"""POST a pro.lceda.cn endpoint with `path` header, validate envelope."""
|
||
r = client.post(
|
||
url,
|
||
json=body,
|
||
headers={"path": project_uuid, "Content-Type": "application/json"},
|
||
)
|
||
r.raise_for_status()
|
||
j = r.json()
|
||
if not j.get("success"):
|
||
raise RuntimeError(f"Pro API failed (POST {url}): {j}")
|
||
return j["result"]
|
||
|
||
|
||
def _fetch_pro_legacy(
|
||
pro_client: httpx.Client,
|
||
project_uuid: str,
|
||
proj: dict,
|
||
proj_dir: Path,
|
||
sleep: float = SLEEP_PRO,
|
||
) -> dict:
|
||
"""Legacy Pro 2.x fetcher: project meta has `boards: [{sch, pcb, name}]` and
|
||
no branch model. Documents are fetched via `/api/v2/documents/lists` (Std-style
|
||
plaintext dataStr); resources/coppers/textpath/blobs come from supplementary
|
||
POST endpoints. Reverse-engineered from HAR `tmp/prodownload3.har`
|
||
(2026-04-28); see docs/sources/easyeda_pro_source.md §1.1.
|
||
|
||
Side effects under ``proj_dir``:
|
||
- source/ticket.json — full project manifest (counts of all asset types)
|
||
- source/<sheet_uuid>.json — schematic sheet content (docType=1)
|
||
- source/pcb_<pcb_uuid>.json — PCB content (docType=3)
|
||
- source/coppers.json — copper pour data (if any)
|
||
- source/textpath.json — text path / font data (if any)
|
||
- source/blobs.json — embedded image blobs (if any)
|
||
- source/manifest.json — index across all of the above
|
||
"""
|
||
src_dir = proj_dir / "source"
|
||
src_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
boards = proj.get("boards") or []
|
||
if not boards:
|
||
raise RuntimeError(f"legacy project {project_uuid} has no boards[] in meta")
|
||
project_editor_version = proj.get("editorVersion")
|
||
|
||
# 1. ticket — full manifest (counts of every asset type the project owns)
|
||
ticket = pro_client.get(
|
||
f"https://pro.lceda.cn/api/projects/{project_uuid}/ticket",
|
||
params={"uuid": project_uuid, "g_ticket": "-1"},
|
||
headers={"path": project_uuid},
|
||
)
|
||
ticket.raise_for_status()
|
||
ticket_j = ticket.json()
|
||
if not ticket_j.get("success"):
|
||
raise RuntimeError(f"ticket endpoint failed: {ticket_j}")
|
||
manifest_ticket = ticket_j["result"]
|
||
(src_dir / "ticket.json").write_text(
|
||
json.dumps(manifest_ticket, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
time.sleep(sleep)
|
||
|
||
doc_metas: list[dict] = []
|
||
|
||
# 2. schematic containers -> sheet UUIDs via /api/schematic/lists
|
||
# Filter `boards[].sch` against `ticket.schematics` keys: a board may
|
||
# reference a deprecated/deleted sch (e.g. "主控板V1(废弃)") whose
|
||
# UUID is gone from the schematics dict. Asking schematic/lists for it
|
||
# returns 401 and aborts the whole batch. Skip it.
|
||
valid_sch_uuids = set((manifest_ticket.get("schematics") or {}).keys())
|
||
sch_container_uuids = [
|
||
b["sch"] for b in boards if b.get("sch") and b["sch"] in valid_sch_uuids
|
||
]
|
||
sheet_uuids: list[str] = []
|
||
if sch_container_uuids:
|
||
containers = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/schematic/lists",
|
||
project_uuid,
|
||
{"uuids": sch_container_uuids},
|
||
)
|
||
if isinstance(containers, list):
|
||
for c in containers:
|
||
for s in c.get("sort") or []:
|
||
su = s.get("uuid")
|
||
if su:
|
||
sheet_uuids.append(su)
|
||
time.sleep(sleep)
|
||
|
||
# 3. schematic sheets via documents/lists docType=1 (plaintext dataStr per sheet)
|
||
if sheet_uuids:
|
||
sheets = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/v2/documents/lists",
|
||
project_uuid,
|
||
{"uuids": sheet_uuids, "docType": 1},
|
||
)
|
||
for s in (sheets or []):
|
||
doc_uuid = s["uuid"]
|
||
local_rel = f"source/{doc_uuid}.json"
|
||
text = json.dumps(s, ensure_ascii=False, separators=(",", ":"))
|
||
(proj_dir / local_rel).write_text(text, encoding="utf-8")
|
||
doc_metas.append({
|
||
"doc_uuid": doc_uuid,
|
||
"docType": 1,
|
||
"path": local_rel,
|
||
"size": len(text.encode("utf-8")),
|
||
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
|
||
})
|
||
time.sleep(sleep)
|
||
|
||
# 4. PCB documents via documents/lists docType=3
|
||
# Same deprecated-uuid risk as #2 — filter against ticket.pcbs keys.
|
||
valid_pcb_uuids = set((manifest_ticket.get("pcbs") or {}).keys())
|
||
pcb_uuids = [
|
||
b["pcb"] for b in boards if b.get("pcb") and b["pcb"] in valid_pcb_uuids
|
||
]
|
||
if pcb_uuids:
|
||
pcbs = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/v2/documents/lists",
|
||
project_uuid,
|
||
{"uuids": pcb_uuids, "docType": 3},
|
||
)
|
||
for p in (pcbs or []):
|
||
doc_uuid = p["uuid"]
|
||
local_rel = f"source/pcb_{doc_uuid}.json"
|
||
text = json.dumps(p, ensure_ascii=False, separators=(",", ":"))
|
||
(proj_dir / local_rel).write_text(text, encoding="utf-8")
|
||
doc_metas.append({
|
||
"doc_uuid": doc_uuid,
|
||
"docType": 3,
|
||
"path": local_rel,
|
||
"size": len(text.encode("utf-8")),
|
||
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
|
||
})
|
||
time.sleep(sleep)
|
||
|
||
# 5. supplementary PCB layer assets — coppers / textpath / resources (blobs)
|
||
aux: dict[str, object] = {}
|
||
copper_paths = list((manifest_ticket.get("coppers") or {}).keys())
|
||
if copper_paths:
|
||
coppers = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/coppers/search",
|
||
project_uuid,
|
||
{"paths": copper_paths},
|
||
)
|
||
(src_dir / "coppers.json").write_text(
|
||
json.dumps(coppers, ensure_ascii=False), encoding="utf-8"
|
||
)
|
||
aux["coppers_count"] = len(coppers) if isinstance(coppers, list) else 0
|
||
time.sleep(sleep)
|
||
|
||
textpath_paths = list((manifest_ticket.get("textpath") or {}).keys())
|
||
if textpath_paths:
|
||
textpath = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/textpath/search",
|
||
project_uuid,
|
||
{
|
||
"paths": textpath_paths,
|
||
"project_uuid": project_uuid,
|
||
"path": project_uuid,
|
||
},
|
||
)
|
||
(src_dir / "textpath.json").write_text(
|
||
json.dumps(textpath, ensure_ascii=False), encoding="utf-8"
|
||
)
|
||
aux["textpath_count"] = len(textpath) if isinstance(textpath, list) else 0
|
||
time.sleep(sleep)
|
||
|
||
blob_hashes = list((manifest_ticket.get("blobs") or {}).keys())
|
||
if blob_hashes:
|
||
blobs = _pro_post_json(
|
||
pro_client,
|
||
"https://pro.lceda.cn/api/v2/resources/search",
|
||
project_uuid,
|
||
{"hash": blob_hashes, "project_uuid": project_uuid},
|
||
)
|
||
(src_dir / "blobs.json").write_text(
|
||
json.dumps(blobs, ensure_ascii=False), encoding="utf-8"
|
||
)
|
||
aux["blobs_count"] = len(blobs) if isinstance(blobs, list) else 0
|
||
time.sleep(sleep)
|
||
|
||
# 6. manifest.json — overall index
|
||
structure_summary = {
|
||
"boards": len(boards),
|
||
"schematic_containers": len(sch_container_uuids),
|
||
"schematic_sheets": len(sheet_uuids),
|
||
"pcbs": len(pcb_uuids),
|
||
"symbols": len(manifest_ticket.get("symbols") or {}),
|
||
"footprints": len(manifest_ticket.get("footprints") or {}),
|
||
"devices": len(manifest_ticket.get("devices") or {}),
|
||
"coppers": len(manifest_ticket.get("coppers") or {}),
|
||
"textpath": len(manifest_ticket.get("textpath") or {}),
|
||
"blobs": len(manifest_ticket.get("blobs") or {}),
|
||
}
|
||
manifest = {
|
||
"project_uuid": project_uuid,
|
||
"fetched_at": datetime.now(timezone.utc).isoformat(),
|
||
"editor_version": project_editor_version,
|
||
"boards": boards,
|
||
"documents": doc_metas,
|
||
"structure_summary": structure_summary,
|
||
"aux": aux,
|
||
}
|
||
(src_dir / "manifest.json").write_text(
|
||
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
|
||
return {
|
||
"source_format": "easyeda-pro-legacy",
|
||
"source_path": "source/",
|
||
"source_documents": doc_metas,
|
||
"editor_version": project_editor_version,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Single-project crawl
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dc.dataclass
|
||
class CrawlResult:
|
||
project_id: str
|
||
out_dir: Path
|
||
files_count: int
|
||
bytes_total: int
|
||
skipped_files: list[str]
|
||
|
||
|
||
def crawl_one(
|
||
client: httpx.Client,
|
||
list_item: dict,
|
||
out_root: Path,
|
||
fetch_files: bool = True,
|
||
fetch_cover: bool = True,
|
||
source_client: httpx.Client | None = None,
|
||
pro_source_client: httpx.Client | None = None,
|
||
skip_exts: set[str] | None = None,
|
||
max_source_mb: int | None = None,
|
||
) -> CrawlResult:
|
||
uuid = list_item["uuid"]
|
||
path = list_item["path"]
|
||
proj_dir = out_root / uuid
|
||
proj_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 1. Fetch detail HTML.
|
||
# No polite_sleep after — the next call goes to image.lceda.cn (cover)
|
||
# or to attachment CDN, both different hosts from the detail server.
|
||
# Sleep is used to space hits to the *same* host before the next iteration.
|
||
detail_url = f"{BASE}/{path}"
|
||
r = client.get(detail_url)
|
||
r.raise_for_status()
|
||
detail = parse_detail_html(r.text)
|
||
|
||
# 2. Cover image (skipped via fetch_cover=False — useful for scan-only modes
|
||
# like Step 1 of batch ingest where we only want license/meta).
|
||
thumb_url = list_item["thumb"]
|
||
if thumb_url.startswith("//"):
|
||
thumb_url = "https:" + thumb_url
|
||
cover_rel = None
|
||
if thumb_url and fetch_cover:
|
||
ext = Path(urllib.parse.urlparse(thumb_url).path).suffix or ".jpg"
|
||
cover_rel = f"cover{ext}"
|
||
try:
|
||
# Cover thumbs should be small (~100-300 KB). Cap walltime at 15s
|
||
# so a pathologically slow CDN connection can't hang the loop.
|
||
download_to(client, thumb_url, proj_dir / cover_rel, max_seconds=15.0)
|
||
except httpx.HTTPError as e:
|
||
print(f" cover failed: {e}", file=sys.stderr)
|
||
cover_rel = None
|
||
polite_sleep()
|
||
|
||
# 3. Description markdown (combine meta + introduction)
|
||
desc_md_parts = [f"# {list_item['name']}\n"]
|
||
if detail.get("description_meta"):
|
||
desc_md_parts.append(detail["description_meta"].strip())
|
||
elif list_item.get("introduction"):
|
||
desc_md_parts.append(list_item["introduction"].strip())
|
||
desc_md_parts.append(
|
||
f"\n---\n"
|
||
f"- Source: {detail_url}\n"
|
||
f"- Author: {list_item['owner'].get('nickname')} "
|
||
f"({list_item['owner'].get('username')})\n"
|
||
f"- License: {detail.get('license') or 'unknown'}\n"
|
||
f"- Published: {list_item.get('oshwhub_publish_at')}\n"
|
||
)
|
||
(proj_dir / "description.md").write_text("\n".join(desc_md_parts), encoding="utf-8")
|
||
|
||
# 4. Files
|
||
files_meta: list[dict] = []
|
||
skipped: list[str] = []
|
||
bytes_total = 0
|
||
for a in detail.get("attachments", []):
|
||
src = a.get("src") or ""
|
||
if not src:
|
||
continue
|
||
file_url = IMG_CDN + src if src.startswith("/") else src
|
||
name = a.get("name") or Path(src).name
|
||
safe_name = re.sub(r'[/\\:*?"<>|]', "_", name)
|
||
local_rel = f"files/{safe_name}"
|
||
local_path = proj_dir / local_rel
|
||
|
||
entry: dict = {
|
||
"name": name,
|
||
"url": file_url,
|
||
"original_id": a.get("uuid"),
|
||
"ext": a.get("ext"),
|
||
"mime": a.get("mime"),
|
||
"size": a.get("size"),
|
||
"md5": a.get("md5"),
|
||
}
|
||
# ext gate: declared `ext` first, fall back to filename suffix. Lower-case
|
||
# compare; entry kept in metadata so we can re-fetch later if policy changes.
|
||
ext_token = (a.get("ext") or Path(safe_name).suffix.lstrip(".")).lower()
|
||
if skip_exts and ext_token in skip_exts:
|
||
entry["skipped"] = f"ext:{ext_token}"
|
||
skipped.append(f"{name}: ext:{ext_token}")
|
||
files_meta.append(entry)
|
||
continue
|
||
if fetch_files:
|
||
try:
|
||
size, sha = download_to(client, file_url, local_path)
|
||
entry["path"] = local_rel
|
||
entry["sha256"] = sha
|
||
if entry.get("size") and entry["size"] != size:
|
||
entry["size_actual"] = size
|
||
else:
|
||
entry["size"] = size
|
||
bytes_total += size
|
||
except httpx.HTTPError as e:
|
||
skipped.append(f"{name}: {e}")
|
||
print(f" file skipped {name}: {e}", file=sys.stderr)
|
||
polite_sleep()
|
||
files_meta.append(entry)
|
||
|
||
# 5. URL manifest (for files we couldn't download or for future re-download)
|
||
urls_manifest = {
|
||
"detail_url": detail_url,
|
||
"cover_url": thumb_url,
|
||
"attachments": [
|
||
{"name": f["name"], "url": f["url"], "original_id": f.get("original_id")}
|
||
for f in files_meta
|
||
],
|
||
}
|
||
(proj_dir / "_urls.json").write_text(
|
||
json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
|
||
# 6. Optional: EasyEDA project source — dispatch on origin (std vs pro)
|
||
src_meta: dict = {}
|
||
origin = list_item.get("origin")
|
||
if origin == "pro" and pro_source_client is not None:
|
||
try:
|
||
src_meta = fetch_pro_source(
|
||
pro_source_client, uuid, proj_dir, max_source_mb=max_source_mb
|
||
)
|
||
print(
|
||
f" pro source: {len(src_meta.get('source_documents', []))} docs, "
|
||
f"editor={src_meta.get('editor_version')}"
|
||
)
|
||
except ProjectOversizeError as e:
|
||
print(f" pro source SKIPPED (oversize): {e}", file=sys.stderr)
|
||
skipped.append(f"pro_source: oversize ({e.bytes_so_far // 1024 // 1024} MB > {e.cap_mb} MB)")
|
||
_record_oversize(out_root, uuid, e)
|
||
except Exception as e: # noqa: BLE001
|
||
print(f" pro source FAIL: {e}", file=sys.stderr)
|
||
skipped.append(f"pro_source: {e}")
|
||
elif origin != "pro" and source_client is not None:
|
||
try:
|
||
src_meta = fetch_std_source(source_client, uuid, proj_dir)
|
||
print(
|
||
f" source: {len(src_meta.get('source_documents', []))} docs, "
|
||
f"editor={src_meta.get('editor_version')}"
|
||
)
|
||
except Exception as e: # noqa: BLE001
|
||
print(f" source FAIL: {e}", file=sys.stderr)
|
||
skipped.append(f"source: {e}")
|
||
|
||
# 7. Unified metadata
|
||
meta = {
|
||
"source": "oshwhub",
|
||
"source_url": detail_url,
|
||
"project_id": uuid,
|
||
"title": detail.get("title") or list_item["name"],
|
||
"description_short": list_item.get("introduction") or "",
|
||
"description_path": "description.md",
|
||
"author": {
|
||
"username": list_item["owner"]["username"],
|
||
"display_name": list_item["owner"].get("nickname"),
|
||
"user_id": list_item["owner"].get("uuid"),
|
||
},
|
||
"license": detail.get("license") or "unknown",
|
||
"tags": list_item.get("tags") or [],
|
||
"created_at": list_item.get("created_at"),
|
||
"updated_at": list_item.get("updated_at"),
|
||
"published_at": list_item.get("oshwhub_publish_at"),
|
||
"crawled_at": datetime.now(timezone.utc).isoformat(),
|
||
"metrics": {
|
||
"likes": (list_item.get("count") or {}).get("like", 0),
|
||
"stars": (list_item.get("count") or {}).get("star", 0),
|
||
"forks": (list_item.get("count") or {}).get("fork", 0),
|
||
"views": (list_item.get("count") or {}).get("views", 0),
|
||
"watch": (list_item.get("count") or {}).get("watch", 0),
|
||
"comments": list_item.get("comments_count", 0),
|
||
},
|
||
"cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None,
|
||
"files": files_meta,
|
||
"raw_fields": {
|
||
"path": list_item["path"],
|
||
"grade": list_item.get("grade"),
|
||
"origin": list_item.get("origin"),
|
||
"public": list_item.get("public"),
|
||
"publish": list_item.get("publish"),
|
||
"skipped_files": skipped,
|
||
},
|
||
}
|
||
if src_meta:
|
||
meta["source_format"] = src_meta["source_format"]
|
||
meta["source_path"] = src_meta["source_path"]
|
||
meta["source_documents"] = src_meta["source_documents"]
|
||
if src_meta.get("editor_version"):
|
||
meta["editor_version"] = src_meta["editor_version"]
|
||
(proj_dir / "metadata.json").write_text(
|
||
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
|
||
return CrawlResult(
|
||
project_id=uuid,
|
||
out_dir=proj_dir,
|
||
files_count=len(files_meta),
|
||
bytes_total=bytes_total,
|
||
skipped_files=skipped,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def iter_candidates(
|
||
client: httpx.Client,
|
||
pages: int,
|
||
page_size: int,
|
||
sort: str,
|
||
origin: str | None = None,
|
||
) -> Iterator[dict]:
|
||
for p in range(1, pages + 1):
|
||
res = list_projects(client, page=p, page_size=page_size, sort=sort, origin=origin)
|
||
for it in res["lists"]:
|
||
yield it
|
||
polite_sleep()
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
|
||
ap = argparse.ArgumentParser(description="oshwhub MVP crawler")
|
||
ap.add_argument("--out", type=Path, default=Path("data/raw/oshwhub"))
|
||
ap.add_argument("--top", type=int, default=10, help="number of projects to crawl")
|
||
ap.add_argument("--min-likes", type=int, default=50)
|
||
ap.add_argument("--min-grade", type=int, default=4)
|
||
ap.add_argument("--pages", type=int, default=3, help="list API pages to scan")
|
||
ap.add_argument("--page-size", type=int, default=30)
|
||
ap.add_argument("--sort", default="hot")
|
||
ap.add_argument(
|
||
"--origin",
|
||
choices=["std", "pro"],
|
||
default=None,
|
||
help="filter listing API by origin (server-side); needed to find Pro projects in top-N",
|
||
)
|
||
ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list")
|
||
ap.add_argument("--no-files", action="store_true", help="do not download attachments")
|
||
ap.add_argument("--limit", type=int, default=None, help="override --top, same effect")
|
||
ap.add_argument(
|
||
"--with-source",
|
||
action="store_true",
|
||
help="also fetch EasyEDA Std project source (schematic + PCB dataStr) per project",
|
||
)
|
||
ap.add_argument(
|
||
"--backfill-source",
|
||
action="store_true",
|
||
help="skip listing/HTML/attachments; only fetch source for projects already in --out",
|
||
)
|
||
ap.add_argument(
|
||
"--with-pro-source",
|
||
action="store_true",
|
||
help="also fetch EasyEDA Pro project source (full history chain, EPRO2 streams) per project",
|
||
)
|
||
ap.add_argument(
|
||
"--backfill-pro-source",
|
||
action="store_true",
|
||
help="skip listing; only fetch Pro source for origin=pro projects already in --out",
|
||
)
|
||
ap.add_argument(
|
||
"--pro-cookie",
|
||
type=str,
|
||
default=PRO_COOKIE_PATH_DEFAULT,
|
||
help="path to file with Cookie header for pro.lceda.cn",
|
||
)
|
||
ap.add_argument(
|
||
"--skip-ext",
|
||
type=str,
|
||
default=None,
|
||
help="comma-separated list of attachment extensions to skip (e.g. mp4,qt,mov). "
|
||
"Saves ~30-50%% LFS storage on average. Entry still recorded in metadata.json "
|
||
"with skipped:ext:<token> so we can re-fetch later.",
|
||
)
|
||
ap.add_argument(
|
||
"--max-source-mb",
|
||
type=int,
|
||
default=None,
|
||
help="skip Pro modern projects whose chain blob total exceeds N MB. "
|
||
"Trips inside the chain loop, wipes partial source/, records to "
|
||
"data/state/oshwhub_pro_oversize.jsonl. No effect on Std or Pro 2.x legacy.",
|
||
)
|
||
ap.add_argument(
|
||
"--from-jsonl",
|
||
type=Path,
|
||
default=None,
|
||
help="read pre-selected listing items from a jsonl (one item per line, "
|
||
"shape matches /api/project listing entries). Bypasses the listing "
|
||
"API entirely — useful when candidates were chosen offline from the "
|
||
"full-corpus index, where most aren't in the default top-N pages.",
|
||
)
|
||
ap.add_argument(
|
||
"--no-cover",
|
||
action="store_true",
|
||
help="skip cover image download. For scan-only runs (license / meta scrape) "
|
||
"this drops ~1.3s/project and avoids slow-CDN hangs.",
|
||
)
|
||
ap.add_argument(
|
||
"--concurrency",
|
||
type=int,
|
||
default=1,
|
||
help="number of parallel crawl_one workers (ThreadPoolExecutor). Default 1 "
|
||
"= serial. Anonymous endpoints (oshwhub.com detail/listing) tolerate "
|
||
"concurrency 5+ comfortably; only enable when no auth is involved or "
|
||
"you've probed the host.",
|
||
)
|
||
args = ap.parse_args(argv)
|
||
skip_exts: set[str] | None = (
|
||
{x.strip().lower().lstrip(".") for x in args.skip_ext.split(",") if x.strip()}
|
||
if args.skip_ext else None
|
||
)
|
||
|
||
n_target = args.limit if args.limit is not None else args.top
|
||
args.out.mkdir(parents=True, exist_ok=True)
|
||
|
||
# --backfill-source: standalone path that scans existing project dirs and
|
||
# only fetches source. No listing/HTML/attachment work.
|
||
if args.backfill_source:
|
||
return _run_backfill_source(
|
||
args.out, only_uuids=args.uuids, concurrency=args.concurrency
|
||
)
|
||
if args.backfill_pro_source:
|
||
return _run_backfill_pro_source(
|
||
args.out,
|
||
only_uuids=args.uuids,
|
||
cookie_path=args.pro_cookie,
|
||
max_source_mb=args.max_source_mb,
|
||
concurrency=args.concurrency,
|
||
)
|
||
|
||
with make_client() as client:
|
||
# Build list of items to crawl
|
||
if args.from_jsonl:
|
||
items = [json.loads(ln) for ln in args.from_jsonl.read_text().splitlines() if ln.strip()]
|
||
if args.uuids:
|
||
wanted = set(args.uuids.split(","))
|
||
items = [i for i in items if i.get("uuid") in wanted]
|
||
print(f"loaded {len(items)} items from {args.from_jsonl}")
|
||
elif args.uuids:
|
||
wanted = set(args.uuids.split(","))
|
||
items: list[dict] = []
|
||
for it in iter_candidates(
|
||
client, args.pages, args.page_size, args.sort, origin=args.origin
|
||
):
|
||
if it["uuid"] in wanted:
|
||
items.append(it)
|
||
if len(items) == len(wanted):
|
||
break
|
||
if len(items) < len(wanted):
|
||
missing = wanted - {i["uuid"] for i in items}
|
||
print(f"WARN: missing uuids (not in top pages): {missing}", file=sys.stderr)
|
||
else:
|
||
pool = list(
|
||
iter_candidates(
|
||
client, args.pages, args.page_size, args.sort, origin=args.origin
|
||
)
|
||
)
|
||
items = pick_top(
|
||
pool, n=n_target, min_likes=args.min_likes, min_grade=args.min_grade
|
||
)
|
||
if len(items) < n_target:
|
||
print(
|
||
f"WARN: only {len(items)} items passed filters "
|
||
f"(wanted {n_target})",
|
||
file=sys.stderr,
|
||
)
|
||
|
||
source_client_ctx = make_source_client() if args.with_source else None
|
||
pro_source_client_ctx = (
|
||
make_pro_source_client(args.pro_cookie) if args.with_pro_source else None
|
||
)
|
||
try:
|
||
print(f"Crawling {len(items)} projects -> {args.out} (concurrency={args.concurrency})")
|
||
print_lock = threading.Lock()
|
||
|
||
def _do_one(i: int, it: dict) -> None:
|
||
try:
|
||
r = crawl_one(
|
||
client,
|
||
it,
|
||
args.out,
|
||
fetch_files=not args.no_files,
|
||
fetch_cover=not args.no_cover,
|
||
source_client=source_client_ctx,
|
||
pro_source_client=pro_source_client_ctx,
|
||
skip_exts=skip_exts,
|
||
max_source_mb=args.max_source_mb,
|
||
)
|
||
with print_lock:
|
||
print(
|
||
f"[{i}/{len(items)}] OK {it['path']}: "
|
||
f"{r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
|
||
f"(skipped: {len(r.skipped_files)})"
|
||
)
|
||
except Exception as e: # noqa: BLE001
|
||
with print_lock:
|
||
print(f"[{i}/{len(items)}] FAIL {it['path']}: {e}", file=sys.stderr)
|
||
|
||
if args.concurrency <= 1:
|
||
for i, it in enumerate(items, 1):
|
||
_do_one(i, it)
|
||
else:
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
with ThreadPoolExecutor(max_workers=args.concurrency) as pool:
|
||
futs = [pool.submit(_do_one, i, it) for i, it in enumerate(items, 1)]
|
||
for f in as_completed(futs):
|
||
f.result()
|
||
finally:
|
||
if source_client_ctx is not None:
|
||
source_client_ctx.close()
|
||
if pro_source_client_ctx is not None:
|
||
pro_source_client_ctx.close()
|
||
|
||
return 0
|
||
|
||
|
||
def _run_backfill_concurrent(
|
||
targets: list[Path],
|
||
fetch_one,
|
||
*,
|
||
concurrency: int,
|
||
label: str,
|
||
size_unit: str = "MB",
|
||
) -> None:
|
||
"""Shared worker dispatch for std/pro backfill.
|
||
|
||
`fetch_one(proj_dir, meta) -> (status, src_meta_or_msg)` where status is one of:
|
||
"ok" — `src_meta` is the fetch_*_source dict
|
||
"oversize" — Pro-only; ProjectOversizeError already recorded by caller
|
||
"fail" — `src_meta_or_msg` is the human-readable error string
|
||
|
||
Threads share fetch_one's captured client (httpx.Client is sync-thread-safe).
|
||
Print + per-target metadata writes are independent (each project owns its
|
||
metadata.json), so no lock needed there. Only stdout ordering is serialized.
|
||
"""
|
||
print_lock = threading.Lock()
|
||
|
||
def _do_one(idx: int, proj_dir: Path) -> None:
|
||
uuid = proj_dir.name
|
||
meta_path = proj_dir / "metadata.json"
|
||
try:
|
||
meta = json.loads(meta_path.read_text(encoding="utf-8"))
|
||
except Exception as e: # noqa: BLE001
|
||
with print_lock:
|
||
print(f"[{idx}/{len(targets)}] meta read FAIL {uuid}: {e}", file=sys.stderr)
|
||
return
|
||
title = meta.get("title", "?")
|
||
status, payload = fetch_one(proj_dir, meta)
|
||
with print_lock:
|
||
head = f"[{idx}/{len(targets)}] {uuid}"
|
||
if status == "ok":
|
||
src_meta = payload
|
||
meta["source_format"] = src_meta["source_format"]
|
||
meta["source_path"] = src_meta["source_path"]
|
||
meta["source_documents"] = src_meta["source_documents"]
|
||
if src_meta.get("editor_version"):
|
||
meta["editor_version"] = src_meta["editor_version"]
|
||
meta_path.write_text(
|
||
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
|
||
)
|
||
docs = src_meta["source_documents"]
|
||
total = sum(d["size"] for d in docs)
|
||
if size_unit == "KB":
|
||
sz = f"{total / 1024:.1f} KB"
|
||
else:
|
||
sz = f"{total / 1024 / 1024:.1f} MB"
|
||
print(
|
||
f"{head} OK ({title[:30]}): {len(docs)} docs, {sz}, "
|
||
f"editor={src_meta.get('editor_version')}"
|
||
)
|
||
elif status == "oversize":
|
||
print(f"{head} SKIPPED oversize ({title[:30]}): {payload}", file=sys.stderr)
|
||
else:
|
||
print(f"{head} FAIL ({title[:30]}): {payload}", file=sys.stderr)
|
||
|
||
print(f"Backfill {label} for {len(targets)} projects (concurrency={concurrency})")
|
||
if concurrency <= 1:
|
||
for i, d in enumerate(targets, 1):
|
||
_do_one(i, d)
|
||
else:
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
with ThreadPoolExecutor(max_workers=concurrency) as pool:
|
||
futs = [pool.submit(_do_one, i, d) for i, d in enumerate(targets, 1)]
|
||
for f in as_completed(futs):
|
||
f.result()
|
||
|
||
|
||
def _discover_backfill_targets(
|
||
out_root: Path,
|
||
only_uuids: str | None,
|
||
*,
|
||
require_origin_pro: bool = False,
|
||
) -> list[Path]:
|
||
"""Walk out_root for per-project dirs that have metadata.json and match
|
||
the filter. If `only_uuids` is given, that's the authoritative whitelist;
|
||
otherwise (Pro only) we fall back to filtering by metadata.raw_fields.origin."""
|
||
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
|
||
targets: list[Path] = []
|
||
for d in sorted(out_root.iterdir()):
|
||
if not d.is_dir() or not (d / "metadata.json").exists():
|
||
continue
|
||
if wanted is not None:
|
||
if d.name not in wanted:
|
||
continue
|
||
elif require_origin_pro:
|
||
try:
|
||
m = json.loads((d / "metadata.json").read_text(encoding="utf-8"))
|
||
except Exception: # noqa: BLE001
|
||
continue
|
||
if (m.get("raw_fields") or {}).get("origin") != "pro":
|
||
continue
|
||
targets.append(d)
|
||
return targets
|
||
|
||
|
||
def _run_backfill_source(
|
||
out_root: Path,
|
||
only_uuids: str | None = None,
|
||
concurrency: int = 1,
|
||
) -> int:
|
||
"""Walk per-project dirs and fetch Std source.json into each.
|
||
|
||
Updates metadata.json in-place to add source_format / source_documents /
|
||
editor_version.
|
||
"""
|
||
targets = _discover_backfill_targets(out_root, only_uuids)
|
||
src_client = make_source_client()
|
||
|
||
def fetch(proj_dir: Path, meta: dict):
|
||
try:
|
||
return ("ok", fetch_std_source(src_client, proj_dir.name, proj_dir))
|
||
except Exception as e: # noqa: BLE001
|
||
return ("fail", str(e))
|
||
|
||
try:
|
||
_run_backfill_concurrent(
|
||
targets, fetch, concurrency=concurrency, label="std source", size_unit="KB"
|
||
)
|
||
finally:
|
||
src_client.close()
|
||
return 0
|
||
|
||
|
||
def _run_backfill_pro_source(
|
||
out_root: Path,
|
||
only_uuids: str | None = None,
|
||
cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
|
||
max_source_mb: int | None = None,
|
||
concurrency: int = 1,
|
||
) -> int:
|
||
"""Walk per-project dirs and fetch Pro source into each (modern + legacy)."""
|
||
targets = _discover_backfill_targets(
|
||
out_root, only_uuids, require_origin_pro=(only_uuids is None)
|
||
)
|
||
pro_client = make_pro_source_client(cookie_path=cookie_path)
|
||
# Multiple threads may trip oversize concurrently; serialize the state-file
|
||
# append so lines don't interleave.
|
||
oversize_lock = threading.Lock()
|
||
|
||
def fetch(proj_dir: Path, meta: dict):
|
||
uuid = proj_dir.name
|
||
try:
|
||
return (
|
||
"ok",
|
||
fetch_pro_source(
|
||
pro_client, uuid, proj_dir, max_source_mb=max_source_mb
|
||
),
|
||
)
|
||
except ProjectOversizeError as e:
|
||
with oversize_lock:
|
||
_record_oversize(out_root, uuid, e)
|
||
return ("oversize", str(e))
|
||
except Exception as e: # noqa: BLE001
|
||
return ("fail", str(e))
|
||
|
||
try:
|
||
_run_backfill_concurrent(
|
||
targets, fetch, concurrency=concurrency, label="pro source"
|
||
)
|
||
finally:
|
||
pro_client.close()
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|