"""oshwhub.com crawler — MVP. Usage: uv run python -m crawlers.oshwhub \ --out data/raw/oshwhub \ --top 10 --min-likes 50 --min-grade 4 Or with explicit UUID list: uv run python -m crawlers.oshwhub \ --uuids 298873b7fdbe44f8ba0e7351e023bc2c,7b6a398811f14eba9a952b8d2ddd7ace \ --out data/raw/oshwhub """ from __future__ import annotations import argparse import dataclasses as dc import hashlib import html as _html import json import re import shutil import sys import threading import time import urllib.parse from datetime import datetime, timezone from pathlib import Path from typing import Iterator import httpx API_LIST = "https://oshwhub.com/api/project" API_PROJECT = "https://oshwhub.com/api/project" # /api/project/ for source flow BASE = "https://oshwhub.com" IMG_CDN = "https://image.lceda.cn" LCEDA_DOC_API = "https://lceda.cn/api/documents" PRO_API = "https://pro.lceda.cn/api/v4" PRO_EDITOR_VERSION = "3.2.127" PRO_COOKIE_PATH_DEFAULT = "/home/ubuntu/.secrets/pro-lceda-cookie-header.txt" UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)" # Std source endpoints reject FacereDataset UA on oshwhub /api/project; spoof browser UA only there. # See docs/sources/easyeda_std_source.md §3. BROWSER_UA = ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/147.0.0.0 Safari/537.36" ) # Per-host rate limits — calibrated against ladder probes (scripts/probe_rate_limit.py) # on 2026-04-29. See data/state/probe_rate_limit_results.md for the methodology. SLEEP_BETWEEN = 1.0 # oshwhub.com detail/listing — ladder probe: 0.5s clean, # 1.0s leaves headroom (detail HTML p90 hits 6s at 1.0s, # 15s at 0.5s due to server-queue softlimit). SLEEP_SOURCE = 0.5 # lceda.cn Std source endpoints — ladder probe 5/2/1/0.5/0.25s # all clean (45/45 200/success). Latency is dominated by # payload size (Std docs span 4 KB to 4.5 MB) not server # backpressure. Same posture as Pro API. 10x speedup. 
# pro.lceda.cn API host — sustained burst probe (25 distinct UUIDs at 0.5s)
# showed 0/25 errors, median latency 410ms. 10x faster than the original 5.0s.
# Originally set high out of caution because Pro requires a logged-in cookie;
# empirically the Pro API tolerates QPS=2 cleanly. The CDN blob loop uses
# SLEEP_PRO_CDN below.
SLEEP_PRO = 0.5

# modules.lceda.cn — CDN serving AES-encrypted EPRO2 history blobs. The editor
# fires these back-to-back per HAR analysis. Chain replay walltime is dominated
# by this loop on big projects (X86 board: ~1h at 5s/req → ~3 min at 0.2s/req).
SLEEP_PRO_CDN = 0.2


# ---------------------------------------------------------------------------
# HTTP
# ---------------------------------------------------------------------------


def make_client(timeout: float = 30.0) -> httpx.Client:
    """Build the general-purpose client (crawler UA, redirects followed)."""
    headers = {
        "User-Agent": UA,
        "Accept": "text/html,application/json;q=0.9,*/*;q=0.8",
    }
    return httpx.Client(
        http2=True,
        timeout=timeout,
        headers=headers,
        follow_redirects=True,
    )


def make_source_client(timeout: float = 60.0) -> httpx.Client:
    """Client for Std source endpoints (lceda.cn/oshwhub.com /api/...).

    Uses browser UA + editor Referer to satisfy the server-side UA filter.
    Redirects are not followed.
    """
    headers = {
        "User-Agent": BROWSER_UA,
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Referer": "https://lceda.cn/editor",
    }
    return httpx.Client(
        http2=True,
        timeout=timeout,
        headers=headers,
        follow_redirects=False,
    )


def make_pro_source_client(
    cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
    timeout: float = 90.0,
) -> httpx.Client:
    """Client for Pro source endpoints (pro.lceda.cn /api/v4/...).

    Requires a logged-in cookie header at `cookie_path` (mode 600). The cookie
    file is a single Cookie header value
    (e.g. `lceda_pro_session=...; XSRF-TOKEN=...`).
    The per-request `path: <project_uuid>` header MUST be added by callers —
    see docs/sources/easyeda_pro_source.md §2.5.

    Raises:
        OSError: the cookie file cannot be read.
        RuntimeError: the cookie file is empty or whitespace-only. Without
            this check the client would be built with an empty Cookie header
            and every later Pro call would fail with an opaque API error.
    """
    cookie = Path(cookie_path).read_text(encoding="utf-8").strip()
    if not cookie:
        raise RuntimeError(f"Pro cookie file is empty: {cookie_path}")
    headers = {
        "User-Agent": BROWSER_UA,
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Editor-Version": PRO_EDITOR_VERSION,
        "Referer": "https://pro.lceda.cn/editor",
        "Cookie": cookie,
    }
    return httpx.Client(
        http2=True,
        timeout=timeout,
        headers=headers,
        follow_redirects=False,
    )


def polite_sleep() -> None:
    """Pause for SLEEP_BETWEEN seconds between hits to the same host."""
    time.sleep(SLEEP_BETWEEN)


# ---------------------------------------------------------------------------
# Listing
# ---------------------------------------------------------------------------


def list_projects(
    client: httpx.Client,
    page: int = 1,
    page_size: int = 30,
    sort: str = "hot",
    origin: str | None = None,
) -> dict:
    """Fetch one page of the project listing and return its `result` payload.

    Raises:
        httpx.HTTPStatusError: non-2xx response.
        RuntimeError: the JSON envelope does not report `success`.
    """
    params: dict[str, object] = {"page": page, "pageSize": page_size, "sort": sort}
    if origin:
        params["origin"] = origin  # 'std' or 'pro' — server-side filter
    r = client.get(API_LIST, params=params)
    r.raise_for_status()
    data = r.json()
    if not data.get("success"):
        raise RuntimeError(f"list API failed: {data}")
    return data["result"]
def rank_score(item: dict) -> float:
    """Composite quality score: favor projects with broad engagement.

    The listing API can omit zero-valued count fields (observed: low-activity
    projects miss `like`, possibly others). Missing *and* explicit-null values
    both count as 0 so one sparse item cannot break sorting.
    """
    counts = item.get("count") or {}
    return (
        (counts.get("like") or 0) * 3
        + (counts.get("star") or 0) * 1
        + (counts.get("fork") or 0) * 2
        + (counts.get("views") or 0) / 100
        + (item.get("comments_count") or 0) * 2
        + (item.get("grade") or 0) * 50
    )


def pick_top(
    items: list[dict],
    n: int,
    min_likes: int,
    min_grade: int,
    exclude_copies: bool = True,
) -> list[dict]:
    """Return the `n` best items by `rank_score` after threshold filtering.

    An item is dropped when its path contains "_copy" (if `exclude_copies`),
    when its like count is below `min_likes`, or when its grade is below
    `min_grade`. Missing or null fields are treated as empty/zero rather than
    raising.
    """

    def keep(it: dict) -> bool:
        if exclude_copies and "_copy" in (it.get("path") or ""):
            return False
        if ((it.get("count") or {}).get("like") or 0) < min_likes:
            return False
        return (it.get("grade") or 0) >= min_grade

    ranked = sorted((it for it in items if keep(it)), key=rank_score, reverse=True)
    return ranked[:n]
# ---------------------------------------------------------------------------
# Detail page parsing
# ---------------------------------------------------------------------------

# The SSR payload embeds JSON inside a JS string literal, so quotes appear as \".
RE_ATTACH_BLOCK = re.compile(r'\\"attachments\\":\[', re.DOTALL)
RE_LICENSE = re.compile(r'\\"license\\":\\"([^\\"]+)\\"')
# NOTE(review): the two patterns below were fused into one garbled literal in
# this copy of the file (HTML-tag stripping ate the markup). They are
# reconstructed from how parse_detail_html uses them — confirm against VCS.
RE_META_DESC = re.compile(
    r'<meta\s+name="description"\s+content="([^"]+)"', re.IGNORECASE
)
RE_TITLE = re.compile(r"<title>([^<]+)</title>", re.IGNORECASE)

_TITLE_SUFFIXES = (
    " - 立创开源硬件平台 - 深圳创电优选科技有限公司",
    " - 立创开源硬件平台",
)


def _find_balanced_bracket(s: str, start: int, open_ch: str = "[", close_ch: str = "]") -> int:
    """Return the index just past the bracket matching the one at `start`.

    `start` must point at `open_ch`. The scan is purely character based: it
    does not know about string literals, so brackets inside quoted text are
    counted too.

    Raises:
        ValueError: `start` does not point at `open_ch`, or no matching
            `close_ch` exists.
    """
    # Explicit raise instead of `assert`: asserts vanish under `python -O`.
    if not (0 <= start < len(s)) or s[start] != open_ch:
        raise ValueError(f"start must point at {open_ch!r}")
    depth = 0
    for i in range(start, len(s)):
        ch = s[i]
        if ch == open_ch:
            depth += 1
        elif ch == close_ch:
            depth -= 1
            if depth == 0:
                return i + 1
    raise ValueError("unbalanced")


def parse_detail_html(h: str) -> dict:
    """Extract attachments, license, title, description from SSR HTML.

    Returns a dict with keys `title`, `description_meta`, `license` (each
    `None` when not found) and `attachments` (list, empty when not found).
    If the attachments array is present but cannot be decoded, the key
    `_attachments_parse_error` carries the decoder message and `attachments`
    stays empty; the caller can log it.
    """
    out: dict = {
        "title": None,
        "description_meta": None,
        "license": None,
        "attachments": [],
    }

    m = RE_TITLE.search(h)
    if m:
        # HTML entities + site-name suffix stripping.
        title = _html.unescape(m.group(1)).strip()
        for sfx in _TITLE_SUFFIXES:
            if title.endswith(sfx):
                title = title[: -len(sfx)]
        out["title"] = title

    m = RE_META_DESC.search(h)
    if m:
        out["description_meta"] = _html.unescape(m.group(1))

    m = RE_LICENSE.search(h)
    if m:
        out["license"] = m.group(1)

    m = RE_ATTACH_BLOCK.search(h)
    if m:
        arr_start = m.end() - 1  # point at '['
        # Undo one level of JS-string escaping, then let the JSON decoder find
        # the end of the array itself. Counting brackets by hand miscounts
        # when a file name contains '[' or ']' (e.g. "rev[2].zip").
        tail = h[arr_start:].replace('\\"', '"').replace("\\\\", "\\")
        try:
            attachments, _end = json.JSONDecoder().raw_decode(tail)
        except json.JSONDecodeError as e:
            # Keep the reason for debugging; skip attachments. Caller can log.
            out["_attachments_parse_error"] = str(e)
        else:
            out["attachments"] = attachments
    return out


# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------


def download_to(
    client: httpx.Client, url: str, dest: Path, *, max_seconds: float = 60.0
) -> tuple[int, str]:
    """Stream-download url to dest. Returns (size, sha256).

    `max_seconds` caps total download walltime — defends against
    pathologically slow CDN connections (observed 10 KB/s on image.lceda.cn
    for one project, would have hung 6+ min on a 3.6 MB cover otherwise).
    httpx's default timeout resets per chunk, so relying on it for streaming
    is unsafe. We track wall time ourselves; the budget is checked each time
    a chunk arrives.

    The body is written to a sibling `<name>.part` file and moved over `dest`
    only after the stream completes, so a failed or timed-out download never
    leaves a truncated file at `dest` (and never clobbers a previous good one).

    Raises:
        httpx.HTTPStatusError: non-2xx response.
        httpx.ReadTimeout: the wall-time budget was exceeded.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    part = dest.with_name(dest.name + ".part")
    digest = hashlib.sha256()
    size = 0
    t_start = time.monotonic()
    try:
        with client.stream("GET", url) as r:
            r.raise_for_status()
            with open(part, "wb") as f:
                for chunk in r.iter_bytes(1 << 15):
                    if time.monotonic() - t_start > max_seconds:
                        raise httpx.ReadTimeout(
                            f"download exceeded {max_seconds}s wall budget "
                            f"(got {size} of {r.headers.get('content-length', '?')} bytes)"
                        )
                    f.write(chunk)
                    digest.update(chunk)
                    size += len(chunk)
        part.replace(dest)
    except BaseException:
        # Any failure (HTTP error, timeout, Ctrl-C): drop the partial file.
        part.unlink(missing_ok=True)
        raise
    return size, digest.hexdigest()
# ---------------------------------------------------------------------------
# Std source fetch (login NOT required for public projects — see
# docs/sources/easyeda_std_source.md)
# ---------------------------------------------------------------------------


def fetch_std_source(
    source_client: httpx.Client,
    project_uuid: str,
    proj_dir: Path,
    sleep: float = SLEEP_SOURCE,
) -> dict:
    """Fetch EasyEDA Std project source (schematic + PCB dataStr) anonymously.

    Writes one `source/<doc_uuid>.json` per upstream document plus
    `source/manifest.json` under `proj_dir`, sleeping `sleep` seconds after
    every request.

    Returns dict with keys:
      - source_format: "easyeda-std"
      - source_path: "source/"
      - source_documents: list of {doc_uuid, docType, master, path, size, sha256}
      - editor_version: from dataStr.head when available

    Raises:
        httpx.HTTPStatusError: non-2xx response from either endpoint.
        RuntimeError: an envelope without `success`, or a non-JSON document
            body (the original decode error is chained as `__cause__`).
    """
    src_dir = proj_dir / "source"
    src_dir.mkdir(parents=True, exist_ok=True)

    # 1. Project meta → version_documents
    r = source_client.get(f"{API_PROJECT}/{project_uuid}")
    r.raise_for_status()
    j = r.json()
    if not j.get("success"):
        raise RuntimeError(f"oshwhub project meta failed: {j}")
    version_documents = j["result"].get("version_documents") or []
    time.sleep(sleep)

    # 2. Per document → dataStr
    doc_metas: list[dict] = []
    editor_version: str | None = None
    for vd in version_documents:
        doc_uuid = vd["uuid"]
        r2 = source_client.get(
            f"{LCEDA_DOC_API}/{doc_uuid}",
            params={"uuid": doc_uuid, "path": doc_uuid},
        )
        r2.raise_for_status()
        # Server returns text/html mistakenly; body is JSON regardless.
        try:
            body_json = r2.json()
        except Exception as e:  # noqa: BLE001
            raise RuntimeError(
                f"doc {doc_uuid} non-JSON response: {e}; head={r2.text[:200]!r}"
            ) from e
        if not body_json.get("success"):
            raise RuntimeError(f"doc {doc_uuid} response not success: {body_json}")

        # Encode once so the file on disk, the recorded size and the hash are
        # guaranteed to describe the same bytes.
        local_rel = f"source/{doc_uuid}.json"
        payload = json.dumps(
            body_json, ensure_ascii=False, separators=(",", ":")
        ).encode("utf-8")
        (proj_dir / local_rel).write_bytes(payload)

        # Pull editor version from the dataStr.head if present; first one wins.
        if not editor_version:
            editor_version = _extract_editor_version(body_json) or None

        doc_metas.append({
            "doc_uuid": doc_uuid,
            "docType": vd.get("docType"),
            "master": vd.get("master"),
            "path": local_rel,
            "size": len(payload),
            "sha256": hashlib.sha256(payload).hexdigest(),
        })
        time.sleep(sleep)

    # 3. source/manifest.json — index + raw upstream version_documents for diffing
    manifest = {
        "project_uuid": project_uuid,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "editor_version": editor_version,
        "documents": doc_metas,
        "upstream_version_documents": version_documents,
    }
    (src_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return {
        "source_format": "easyeda-std",
        "source_path": "source/",
        "source_documents": doc_metas,
        "editor_version": editor_version,
    }
def _extract_editor_version(body_json: dict) -> str | None:
    """Best-effort: pull head.editorVersion from dataStr (location varies by docType).

    Looks at `result.dataStr.head` first (PCB shape), then at each
    `result.schematics[*].dataStr.head` (schematic shape). Returns the first
    truthy `editorVersion` as a string, or `None`.
    """

    def version_of(data_str: object) -> str | None:
        if not isinstance(data_str, dict):
            return None
        head = data_str.get("head") or {}
        if isinstance(head, dict) and head.get("editorVersion"):
            return str(head["editorVersion"])
        return None

    res = body_json.get("result") or {}
    # PCB shape: result.dataStr at top
    found = version_of(res.get("dataStr"))
    if found:
        return found
    # Schematic shape: result.schematics[*].dataStr
    for sch in res.get("schematics") or []:
        if isinstance(sch, dict):
            found = version_of(sch.get("dataStr") or {})
            if found:
                return found
    return None


# ---------------------------------------------------------------------------
# Pro source fetch (pro.lceda.cn — EPRO2 message stream, AES-128-GCM)
# See docs/sources/easyeda_pro_source.md.
# ---------------------------------------------------------------------------


def _pro_get_json(
    client: httpx.Client,
    url: str,
    project_uuid: str,
    *,
    params: dict | None = None,
) -> dict | list:
    """GET a pro.lceda.cn /api/v4 endpoint with the per-project `path` header.

    Raises if the JSON envelope's `success` is False; returns `result`.
    """
    r = client.get(url, params=params, headers={"path": project_uuid})
    r.raise_for_status()
    j = r.json()
    if not j.get("success"):
        raise RuntimeError(f"Pro API failed (url={url}): {j}")
    return j["result"]


def _order_history_chain(chain: list[dict]) -> list[dict]:
    """Return the chain ordered root→HEAD by walking parent links.

    Pro returns the chain HEAD-first as a flat list with `parent` links.
    We walk from the unique root forward.

    Raises:
        RuntimeError: the chain has zero or several roots, forks, contains
            orphans/cycles, or repeats a uuid.
    """
    by_uuid = {h["uuid"]: h for h in chain}
    roots = [h for h in chain if h.get("parent") not in by_uuid]
    if len(roots) != 1:
        raise RuntimeError(
            f"history chain has {len(roots)} roots; not strictly linear"
        )

    children: dict[str | None, list[dict]] = {}
    for h in chain:
        children.setdefault(h.get("parent"), []).append(h)

    ordered: list[dict] = []
    cur: dict | None = roots[0]
    while cur is not None:
        # A repeated uuid can link the walk back onto itself; without this
        # bound the loop would never terminate.
        if len(ordered) >= len(chain):
            raise RuntimeError(
                f"history chain walk exceeded {len(chain)} entries; "
                "chain has cycles or duplicate uuids"
            )
        ordered.append(cur)
        nexts = children.get(cur["uuid"], [])
        if len(nexts) > 1:
            raise RuntimeError(
                f"history chain not linear at {cur['uuid']!r}: {len(nexts)} children"
            )
        cur = nexts[0] if nexts else None

    if len(ordered) != len(chain):
        raise RuntimeError(
            f"reconstructed {len(ordered)} of {len(chain)} histories; chain has cycles or orphans"
        )
    return ordered
class ProjectOversizeError(Exception):
    """Raised when a Pro project's chain replay would exceed the configured cap.

    `cap_mb` is the trip threshold; `bytes_so_far` is the *encrypted blob*
    total accumulated when we tripped (pre-decompression, pre-partition).
    """

    def __init__(self, bytes_so_far: int, cap_mb: int) -> None:
        super().__init__(
            f"oversize: blob bytes {bytes_so_far // 1024 // 1024} MB > cap {cap_mb} MB"
        )
        self.bytes_so_far = bytes_so_far
        self.cap_mb = cap_mb


def _record_oversize(out_root: Path, uuid: str, err: ProjectOversizeError) -> None:
    """Append one row to data/state/oshwhub_pro_oversize.jsonl for later review.

    The path is relative to the current working directory. The file is opened
    as UTF-8 explicitly: rows are dumped with `ensure_ascii=False`, so a
    non-ASCII `out_root` would otherwise depend on the locale encoding.
    """
    state_path = Path("data/state/oshwhub_pro_oversize.jsonl")
    state_path.parent.mkdir(parents=True, exist_ok=True)
    row = {
        "uuid": uuid,
        "out_root": str(out_root),
        "bytes_so_far": err.bytes_so_far,
        "cap_mb": err.cap_mb,
        "ts": datetime.now(timezone.utc).isoformat(),
    }
    with state_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(row, ensure_ascii=False) + "\n")


def fetch_pro_source(
    pro_client: httpx.Client,
    project_uuid: str,
    proj_dir: Path,
    sleep: float = SLEEP_PRO,
    max_source_mb: int | None = None,
) -> dict:
    """Dispatcher: pick modern (3.x branch+EPRO2) vs legacy (2.x v2/documents/lists)
    based on whether project meta contains a non-null branch_uuid.

    Pro 3.x stores in git-style branch+history with AES-encrypted EPRO2
    streams; Pro 2.x predates that and uses Std-style per-doc dataStr served
    from /api/v2/documents/lists. See docs/sources/easyeda_pro_source.md §1.1.

    `max_source_mb` only gates modern-path projects (legacy is always tiny:
    <2 MB in our 2/2 sample) and trips before any blob is written to disk
    past the cap.

    Raises:
        RuntimeError: the project meta call fails or does not return an object.
        ProjectOversizeError: propagated from the modern fetcher.
    """
    proj = _pro_get_json(pro_client, f"{PRO_API}/projects/{project_uuid}", project_uuid)
    if not isinstance(proj, dict):
        # _pro_get_json may legitimately return a list for other endpoints;
        # fail with a readable message instead of AttributeError on `.get`.
        raise RuntimeError(
            f"unexpected project meta for {project_uuid}: {type(proj).__name__}"
        )
    time.sleep(sleep)
    if proj.get("branch_uuid"):
        return _fetch_pro_modern(
            pro_client, project_uuid, proj, proj_dir, sleep, max_source_mb=max_source_mb
        )
    return _fetch_pro_legacy(pro_client, project_uuid, proj, proj_dir, sleep)
def _partition_epro2_lines(
    plain: bytes, docs: dict[str, dict], cur_doc: str | None
) -> str | None:
    """Append the EPRO2 lines of one decrypted blob to their documents.

    A line whose head message has type "DOCHEAD" and carries a payload with a
    `uuid` switches the current document (creating its entry in `docs` on
    first sight). Every parseable line is then appended, raw, to the current
    document. Returns the document that is current after the blob, so the next
    blob can continue it.
    """
    for ln in plain.split(b"\n"):
        if not ln.strip():
            continue
        # EPRO2 lines use `||` as field separator and terminate with a single
        # `|`. Strip the trailing `|` first so each part parses as bare JSON.
        parts = ln.rstrip(b"|").split(b"||")
        try:
            head_msg = json.loads(parts[0])
        except (ValueError, RecursionError):
            # Malformed head; skip the entire line.
            continue
        is_doc_head = (
            isinstance(head_msg, dict)
            and head_msg.get("type") == "DOCHEAD"
            and len(parts) >= 2
        )
        if is_doc_head:
            try:
                payload = json.loads(parts[1])
            except (ValueError, RecursionError):
                payload = {}
            if not isinstance(payload, dict):
                # Valid JSON but not an object: treat like an unparseable payload.
                payload = {}
            new_doc = payload.get("uuid")
            if new_doc and isinstance(new_doc, str):
                cur_doc = new_doc
                if cur_doc not in docs:
                    docs[cur_doc] = {"lines": [], "doc_head": payload}
        if cur_doc and cur_doc in docs:
            docs[cur_doc]["lines"].append(ln)
    return cur_doc


def _write_epro2_documents(
    proj_dir: Path, docs: dict[str, dict]
) -> tuple[list[dict], str | None]:
    """Write one `source/<uuid>.epro2` per document.

    Returns the per-document index rows and the first editor version found in
    a DOCHEAD payload (`editVersion` or `editorVersion`), if any.
    """
    doc_metas: list[dict] = []
    editor_version: str | None = None
    for doc_uuid, info in docs.items():
        body = b"\n".join(info["lines"]) + b"\n"
        local_rel = f"source/{doc_uuid}.epro2"
        (proj_dir / local_rel).write_bytes(body)
        head = info["doc_head"]
        ev = head.get("editVersion") or head.get("editorVersion")
        if ev and not editor_version:
            editor_version = str(ev)
        doc_metas.append({
            "doc_uuid": doc_uuid,
            # "BOARD" / "PCB" / "SCH" / "SCH_PAGE" / "SYMBOL" / ...
            "docType": head.get("docType"),
            "path": local_rel,
            "size": len(body),
            "sha256": hashlib.sha256(body).hexdigest(),
            "message_count": len(info["lines"]),
        })
    return doc_metas, editor_version


def _fetch_pro_modern(
    pro_client: httpx.Client,
    project_uuid: str,
    proj: dict,
    proj_dir: Path,
    sleep: float = SLEEP_PRO,
    max_source_mb: int | None = None,
) -> dict:
    """Modern Pro 3.x fetcher: full history chain, AES-GCM decrypted, gunzipped,
    and partitioned into per-document EPRO2 streams.

    Side effects under ``proj_dir``:
      - source/structure.json — project document tree (boards/schematics/sheets/pcbs/...)
      - source/<doc_uuid>.epro2 — one file per document, raw EPRO2 messages (newline-separated)
      - source/manifest.json — per-doc index + chain summary

    Raises:
        RuntimeError: missing HEAD, unexpected histories payload, non-linear
            chain, or a blob shorter than the 16-byte GCM tag.
        ProjectOversizeError: encrypted blob total exceeded `max_source_mb`;
            `source/` is removed before raising.
    """
    # Local imports, done first so a missing dependency fails before any request.
    import gzip

    from Crypto.Cipher import AES  # local import: cheap if pycryptodome already loaded

    src_dir = proj_dir / "source"
    src_dir.mkdir(parents=True, exist_ok=True)

    branch_uuid = proj["branch_uuid"]
    project_editor_version = proj.get("editorVersion")
    branch_url = f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}"

    # 2. branch meta -> head history_uuid
    branch = _pro_get_json(pro_client, branch_url, project_uuid)
    head_uuid = branch.get("history_uuid")
    if not head_uuid:
        raise RuntimeError(f"no history_uuid (HEAD) on branch {branch_uuid}")
    time.sleep(sleep)

    # 3. structure tree (served either as a JSON string or already decoded)
    st = _pro_get_json(pro_client, f"{branch_url}/structures", project_uuid)
    raw_structure = st.get("structure")
    structure = json.loads(raw_structure) if isinstance(raw_structure, str) else raw_structure
    (src_dir / "structure.json").write_text(
        json.dumps(structure, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    time.sleep(sleep)

    # 4. history chain — single endpoint returns full chain (HAR-confirmed 2026-04-28)
    chain = _pro_get_json(pro_client, f"{branch_url}/histories/{head_uuid}", project_uuid)
    if not isinstance(chain, list) or not chain:
        raise RuntimeError(f"unexpected histories response: {type(chain).__name__}")
    ordered = _order_history_chain(chain)
    time.sleep(sleep)

    # 5. download + decrypt + gunzip + partition by DOCHEAD
    docs: dict[str, dict] = {}  # insertion-ordered: first-seen document first
    cur_doc: str | None = None
    bytes_blob_total = 0
    bytes_plain_total = 0
    cap_bytes = max_source_mb * 1024 * 1024 if max_source_mb is not None else None
    for h in ordered:
        blob_r = pro_client.get(h["dataStrUrl"], headers={"path": project_uuid})
        blob_r.raise_for_status()
        blob = blob_r.content
        bytes_blob_total += len(blob)
        # Trip cap on the encrypted blob total. Hits *after* the offending
        # download, but before we decrypt/gunzip/partition (those scale with
        # plain bytes which is even larger). Wipe any partial source/ so disk
        # doesn't accumulate junk on multi-project runs.
        if cap_bytes is not None and bytes_blob_total > cap_bytes:
            shutil.rmtree(src_dir, ignore_errors=True)
            raise ProjectOversizeError(bytes_blob_total, max_source_mb)
        if len(blob) < 16:
            raise RuntimeError(f"history {h['uuid']} blob too short ({len(blob)} B)")
        # Layout: ciphertext followed by the 16-byte GCM tag.
        ct, tag = blob[:-16], blob[-16:]
        cipher = AES.new(
            bytes.fromhex(h["key"]),
            AES.MODE_GCM,
            nonce=bytes.fromhex(h["iv"]),
        )
        plain = gzip.decompress(cipher.decrypt_and_verify(ct, tag))
        bytes_plain_total += len(plain)
        cur_doc = _partition_epro2_lines(plain, docs, cur_doc)
        # CDN host, not the rate-sensitive API host — see SLEEP_PRO_CDN comment.
        time.sleep(SLEEP_PRO_CDN)

    # 6. write per-doc .epro2 + manifest
    doc_metas, editor_version = _write_epro2_documents(proj_dir, docs)
    # editor_version fallback: project meta if no DOCHEAD payload had it
    if not editor_version and project_editor_version:
        editor_version = str(project_editor_version)

    structure_summary: dict[str, int] = {}
    if isinstance(structure, dict):
        structure_summary = {
            k: len(v) for k, v in structure.items() if isinstance(v, (dict, list))
        }

    manifest = {
        "project_uuid": project_uuid,
        "branch_uuid": branch_uuid,
        "head_uuid": head_uuid,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "editor_version": editor_version,
        "chain_length": len(chain),
        "blob_bytes_total": bytes_blob_total,
        "plain_bytes_total": bytes_plain_total,
        "documents": doc_metas,
        "structure_summary": structure_summary,
    }
    (src_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return {
        "source_format": "easyeda-pro",
        "source_path": "source/",
        "source_documents": doc_metas,
        "editor_version": editor_version,
    }


def _pro_post_json(
    client: httpx.Client,
    url: str,
    project_uuid: str,
    body: dict,
) -> object:
    """POST a pro.lceda.cn endpoint with `path` header, validate envelope.

    Returns the envelope's `result`; raises RuntimeError when `success` is
    falsy and httpx.HTTPStatusError on a non-2xx response.
    """
    r = client.post(
        url,
        json=body,
        headers={"path": project_uuid, "Content-Type": "application/json"},
    )
    r.raise_for_status()
    j = r.json()
    if not j.get("success"):
        raise RuntimeError(f"Pro API failed (POST {url}): {j}")
    return j["result"]
def _store_legacy_documents(
    proj_dir: Path, documents: object, doc_type: int, name_prefix: str = ""
) -> list[dict]:
    """Write each document as compact JSON under source/ and return index rows."""
    rows: list[dict] = []
    for doc in (documents or []):
        doc_uuid = doc["uuid"]
        local_rel = f"source/{name_prefix}{doc_uuid}.json"
        text = json.dumps(doc, ensure_ascii=False, separators=(",", ":"))
        (proj_dir / local_rel).write_text(text, encoding="utf-8")
        encoded = text.encode("utf-8")
        rows.append({
            "doc_uuid": doc_uuid,
            "docType": doc_type,
            "path": local_rel,
            "size": len(encoded),
            "sha256": hashlib.sha256(encoded).hexdigest(),
        })
    return rows


def _store_legacy_aux(src_dir: Path, filename: str, payload: object) -> int:
    """Dump one supplementary payload to source/<filename>; return its list length (0 if not a list)."""
    (src_dir / filename).write_text(
        json.dumps(payload, ensure_ascii=False), encoding="utf-8"
    )
    return len(payload) if isinstance(payload, list) else 0


def _fetch_pro_legacy(
    pro_client: httpx.Client,
    project_uuid: str,
    proj: dict,
    proj_dir: Path,
    sleep: float = SLEEP_PRO,
) -> dict:
    """Legacy Pro 2.x fetcher: project meta has `boards: [{sch, pcb, name}]` and
    no branch model. Documents are fetched via `/api/v2/documents/lists`
    (Std-style plaintext dataStr); resources/coppers/textpath/blobs come from
    supplementary POST endpoints.

    Reverse-engineered from HAR `tmp/prodownload3.har` (2026-04-28); see
    docs/sources/easyeda_pro_source.md §1.1.

    Side effects under ``proj_dir``:
      - source/ticket.json — full project manifest (counts of all asset types)
      - source/<sheet_uuid>.json — schematic sheet content (docType=1)
      - source/pcb_<pcb_uuid>.json — PCB content (docType=3)
      - source/coppers.json — copper pour data (if any)
      - source/textpath.json — text path / font data (if any)
      - source/blobs.json — embedded image blobs (if any)
      - source/manifest.json — index across all of the above
    """
    src_dir = proj_dir / "source"
    src_dir.mkdir(parents=True, exist_ok=True)

    boards = proj.get("boards") or []
    if not boards:
        raise RuntimeError(f"legacy project {project_uuid} has no boards[] in meta")
    project_editor_version = proj.get("editorVersion")

    # 1. ticket — full manifest (counts of every asset type the project owns)
    ticket_resp = pro_client.get(
        f"https://pro.lceda.cn/api/projects/{project_uuid}/ticket",
        params={"uuid": project_uuid, "g_ticket": "-1"},
        headers={"path": project_uuid},
    )
    ticket_resp.raise_for_status()
    ticket_j = ticket_resp.json()
    if not ticket_j.get("success"):
        raise RuntimeError(f"ticket endpoint failed: {ticket_j}")
    manifest_ticket = ticket_j["result"]
    (src_dir / "ticket.json").write_text(
        json.dumps(manifest_ticket, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    time.sleep(sleep)

    def ticket_keys(section: str) -> list:
        """Keys of one ticket section, or [] when the section is absent."""
        return list((manifest_ticket.get(section) or {}).keys())

    doc_metas: list[dict] = []

    # 2. schematic containers -> sheet UUIDs via /api/schematic/lists
    #    Filter `boards[].sch` against `ticket.schematics` keys: a board may
    #    reference a deprecated/deleted sch (e.g. "主控板V1(废弃)") whose
    #    UUID is gone from the schematics dict. Asking schematic/lists for it
    #    returns 401 and aborts the whole batch. Skip it.
    valid_sch_uuids = set(ticket_keys("schematics"))
    sch_container_uuids = [
        b["sch"] for b in boards
        if b.get("sch") and b["sch"] in valid_sch_uuids
    ]
    sheet_uuids: list[str] = []
    if sch_container_uuids:
        containers = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/schematic/lists",
            project_uuid,
            {"uuids": sch_container_uuids},
        )
        if isinstance(containers, list):
            for container in containers:
                for sheet in container.get("sort") or []:
                    sheet_uuid = sheet.get("uuid")
                    if sheet_uuid:
                        sheet_uuids.append(sheet_uuid)
        time.sleep(sleep)

    # 3. schematic sheets via documents/lists docType=1 (plaintext dataStr per sheet)
    if sheet_uuids:
        sheets = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/v2/documents/lists",
            project_uuid,
            {"uuids": sheet_uuids, "docType": 1},
        )
        doc_metas.extend(_store_legacy_documents(proj_dir, sheets, 1))
        time.sleep(sleep)

    # 4. PCB documents via documents/lists docType=3
    #    Same deprecated-uuid risk as #2 — filter against ticket.pcbs keys.
    valid_pcb_uuids = set(ticket_keys("pcbs"))
    pcb_uuids = [
        b["pcb"] for b in boards
        if b.get("pcb") and b["pcb"] in valid_pcb_uuids
    ]
    if pcb_uuids:
        pcbs = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/v2/documents/lists",
            project_uuid,
            {"uuids": pcb_uuids, "docType": 3},
        )
        doc_metas.extend(_store_legacy_documents(proj_dir, pcbs, 3, "pcb_"))
        time.sleep(sleep)

    # 5. supplementary PCB layer assets — coppers / textpath / resources (blobs)
    aux: dict[str, object] = {}

    copper_paths = ticket_keys("coppers")
    if copper_paths:
        coppers = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/coppers/search",
            project_uuid,
            {"paths": copper_paths},
        )
        aux["coppers_count"] = _store_legacy_aux(src_dir, "coppers.json", coppers)
        time.sleep(sleep)

    textpath_paths = ticket_keys("textpath")
    if textpath_paths:
        textpath = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/textpath/search",
            project_uuid,
            {
                "paths": textpath_paths,
                "project_uuid": project_uuid,
                "path": project_uuid,
            },
        )
        aux["textpath_count"] = _store_legacy_aux(src_dir, "textpath.json", textpath)
        time.sleep(sleep)

    blob_hashes = ticket_keys("blobs")
    if blob_hashes:
        blobs = _pro_post_json(
            pro_client,
            "https://pro.lceda.cn/api/v2/resources/search",
            project_uuid,
            {"hash": blob_hashes, "project_uuid": project_uuid},
        )
        aux["blobs_count"] = _store_legacy_aux(src_dir, "blobs.json", blobs)
        time.sleep(sleep)

    # 6. manifest.json — overall index
    structure_summary = {
        "boards": len(boards),
        "schematic_containers": len(sch_container_uuids),
        "schematic_sheets": len(sheet_uuids),
        "pcbs": len(pcb_uuids),
    }
    for section in ("symbols", "footprints", "devices", "coppers", "textpath", "blobs"):
        structure_summary[section] = len(manifest_ticket.get(section) or {})
    manifest = {
        "project_uuid": project_uuid,
        "fetched_at": datetime.now(timezone.utc).isoformat(),
        "editor_version": project_editor_version,
        "boards": boards,
        "documents": doc_metas,
        "structure_summary": structure_summary,
        "aux": aux,
    }
    (src_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return {
        "source_format": "easyeda-pro-legacy",
        "source_path": "source/",
        "source_documents": doc_metas,
        "editor_version": project_editor_version,
    }
# ---------------------------------------------------------------------------
# Single-project crawl
# ---------------------------------------------------------------------------


@dc.dataclass
class CrawlResult:
    """Summary of one crawled project, returned by `crawl_one`."""

    project_id: str
    out_dir: Path
    files_count: int          # attachment entries recorded (downloaded or not)
    bytes_total: int          # bytes of attachments actually downloaded
    skipped_files: list[str]  # human-readable "<what>: <why>" notes


def _sanitize_attachment_name(name: str) -> str:
    """Replace path-hostile characters; map empty / "." / ".." to "unnamed"."""
    safe = re.sub(r'[/\\:*?"<>|]', "_", name)
    # "." and ".." survive the substitution but are not usable file names.
    return "unnamed" if safe in ("", ".", "..") else safe


def _reserve_attachment_name(safe_name: str, taken: set[str]) -> str:
    """Return `safe_name`, or `<stem>_<n><suffix>` if already used in this project.

    The chosen name is added to `taken`. Without this, two attachments that
    share a display name would silently overwrite each other on disk.
    """
    candidate = safe_name
    p = Path(safe_name)
    n = 1
    while candidate in taken:
        candidate = f"{p.stem}_{n}{p.suffix}"
        n += 1
    taken.add(candidate)
    return candidate


def crawl_one(
    client: httpx.Client,
    list_item: dict,
    out_root: Path,
    fetch_files: bool = True,
    fetch_cover: bool = True,
    source_client: httpx.Client | None = None,
    pro_source_client: httpx.Client | None = None,
    skip_exts: set[str] | None = None,
    max_source_mb: int | None = None,
) -> CrawlResult:
    """Crawl one listing item into `out_root/<uuid>/`.

    Writes description.md, _urls.json and metadata.json, plus (depending on
    flags) the cover image, files/<attachment>, and source/ via the Std or Pro
    source fetcher. Attachment and source failures are recorded in
    `skipped_files` instead of aborting the project; a failing detail-page
    request does raise.

    `list_item` must carry `uuid`, `path` and `name`; `thumb`, `owner` and
    `count` are optional.
    """
    uuid = list_item["uuid"]
    path = list_item["path"]
    owner = list_item.get("owner") or {}
    counts = list_item.get("count") or {}
    proj_dir = out_root / uuid
    proj_dir.mkdir(parents=True, exist_ok=True)

    # 1. Fetch detail HTML.
    #    No polite_sleep after — the next call goes to image.lceda.cn (cover)
    #    or to attachment CDN, both different hosts from the detail server.
    #    Sleep is used to space hits to the *same* host before the next iteration.
    detail_url = f"{BASE}/{path}"
    r = client.get(detail_url)
    r.raise_for_status()
    detail = parse_detail_html(r.text)

    # 2. Cover image (skipped via fetch_cover=False — useful for scan-only modes
    #    like Step 1 of batch ingest where we only want license/meta).
    #    A missing or null thumb is treated as "no cover".
    thumb_url = list_item.get("thumb") or ""
    if thumb_url.startswith("//"):
        thumb_url = "https:" + thumb_url
    cover_rel = None
    if thumb_url and fetch_cover:
        ext = Path(urllib.parse.urlparse(thumb_url).path).suffix or ".jpg"
        cover_rel = f"cover{ext}"
        try:
            # Cover thumbs should be small (~100-300 KB). Cap walltime at 15s
            # so a pathologically slow CDN connection can't hang the loop.
            download_to(client, thumb_url, proj_dir / cover_rel, max_seconds=15.0)
        except httpx.HTTPError as e:
            print(f"  cover failed: {e}", file=sys.stderr)
            cover_rel = None
        # Sleep only when a request was actually issued.
        polite_sleep()

    # 3. Description markdown (combine meta + introduction)
    desc_md_parts = [f"# {list_item['name']}\n"]
    if detail.get("description_meta"):
        desc_md_parts.append(detail["description_meta"].strip())
    elif list_item.get("introduction"):
        desc_md_parts.append(list_item["introduction"].strip())
    desc_md_parts.append(
        f"\n---\n"
        f"- Source: {detail_url}\n"
        f"- Author: {owner.get('nickname')} "
        f"({owner.get('username')})\n"
        f"- License: {detail.get('license') or 'unknown'}\n"
        f"- Published: {list_item.get('oshwhub_publish_at')}\n"
    )
    (proj_dir / "description.md").write_text("\n".join(desc_md_parts), encoding="utf-8")

    # 4. Files
    files_meta: list[dict] = []
    skipped: list[str] = []
    bytes_total = 0
    taken_names: set[str] = set()
    for a in detail.get("attachments", []):
        src = a.get("src") or ""
        if not src:
            continue
        file_url = IMG_CDN + src if src.startswith("/") else src
        name = a.get("name") or Path(src).name
        safe_name = _sanitize_attachment_name(name)
        entry: dict = {
            "name": name,
            "url": file_url,
            "original_id": a.get("uuid"),
            "ext": a.get("ext"),
            "mime": a.get("mime"),
            "size": a.get("size"),
            "md5": a.get("md5"),
        }
        # ext gate: declared `ext` first, fall back to filename suffix. Lower-case
        # compare; entry kept in metadata so we can re-fetch later if policy changes.
        ext_token = (a.get("ext") or Path(safe_name).suffix.lstrip(".")).lower()
        if skip_exts and ext_token in skip_exts:
            entry["skipped"] = f"ext:{ext_token}"
            skipped.append(f"{name}: ext:{ext_token}")
            files_meta.append(entry)
            continue
        if fetch_files:
            local_rel = f"files/{_reserve_attachment_name(safe_name, taken_names)}"
            try:
                size, sha = download_to(client, file_url, proj_dir / local_rel)
            except httpx.HTTPError as e:
                skipped.append(f"{name}: {e}")
                print(f"  file skipped {name}: {e}", file=sys.stderr)
            else:
                entry["path"] = local_rel
                entry["sha256"] = sha
                # Keep the declared size; record the real one only on mismatch.
                if entry.get("size") and entry["size"] != size:
                    entry["size_actual"] = size
                else:
                    entry["size"] = size
                bytes_total += size
            polite_sleep()
        files_meta.append(entry)

    # 5. URL manifest (for files we couldn't download or for future re-download)
    urls_manifest = {
        "detail_url": detail_url,
        "cover_url": thumb_url,
        "attachments": [
            {"name": f["name"], "url": f["url"], "original_id": f.get("original_id")}
            for f in files_meta
        ],
    }
    (proj_dir / "_urls.json").write_text(
        json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    # 6. Optional: EasyEDA project source — dispatch on origin (std vs pro)
    src_meta: dict = {}
    origin = list_item.get("origin")
    if origin == "pro" and pro_source_client is not None:
        try:
            src_meta = fetch_pro_source(
                pro_source_client, uuid, proj_dir, max_source_mb=max_source_mb
            )
            print(
                f"  pro source: {len(src_meta.get('source_documents', []))} docs, "
                f"editor={src_meta.get('editor_version')}"
            )
        except ProjectOversizeError as e:
            print(f"  pro source SKIPPED (oversize): {e}", file=sys.stderr)
            skipped.append(f"pro_source: oversize ({e.bytes_so_far // 1024 // 1024} MB > {e.cap_mb} MB)")
            _record_oversize(out_root, uuid, e)
        except Exception as e:  # noqa: BLE001 — source is optional; record and go on
            print(f"  pro source FAIL: {e}", file=sys.stderr)
            skipped.append(f"pro_source: {e}")
    elif origin != "pro" and source_client is not None:
        try:
            src_meta = fetch_std_source(source_client, uuid, proj_dir)
            print(
                f"  source: {len(src_meta.get('source_documents', []))} docs, "
                f"editor={src_meta.get('editor_version')}"
            )
        except Exception as e:  # noqa: BLE001 — source is optional; record and go on
            print(f"  source FAIL: {e}", file=sys.stderr)
            skipped.append(f"source: {e}")

    # 7. Unified metadata
    meta = {
        "source": "oshwhub",
        "source_url": detail_url,
        "project_id": uuid,
        "title": detail.get("title") or list_item["name"],
        "description_short": list_item.get("introduction") or "",
        "description_path": "description.md",
        "author": {
            "username": owner.get("username"),
            "display_name": owner.get("nickname"),
            "user_id": owner.get("uuid"),
        },
        "license": detail.get("license") or "unknown",
        "tags": list_item.get("tags") or [],
        "created_at": list_item.get("created_at"),
        "updated_at": list_item.get("updated_at"),
        "published_at": list_item.get("oshwhub_publish_at"),
        "crawled_at": datetime.now(timezone.utc).isoformat(),
        "metrics": {
            "likes": counts.get("like", 0),
            "stars": counts.get("star", 0),
            "forks": counts.get("fork", 0),
            "views": counts.get("views", 0),
            "watch": counts.get("watch", 0),
            "comments": list_item.get("comments_count", 0),
        },
        "cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None,
        "files": files_meta,
        "raw_fields": {
            "path": path,
            "grade": list_item.get("grade"),
            "origin": list_item.get("origin"),
            "public": list_item.get("public"),
            "publish": list_item.get("publish"),
            "skipped_files": skipped,
        },
    }
    if src_meta:
        meta["source_format"] = src_meta["source_format"]
        meta["source_path"] = src_meta["source_path"]
        meta["source_documents"] = src_meta["source_documents"]
        if src_meta.get("editor_version"):
            meta["editor_version"] = src_meta["editor_version"]
    (proj_dir / "metadata.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    return CrawlResult(
        project_id=uuid,
        out_dir=proj_dir,
        files_count=len(files_meta),
        bytes_total=bytes_total,
        skipped_files=skipped,
    )
Unified metadata meta = { "source": "oshwhub", "source_url": detail_url, "project_id": uuid, "title": detail.get("title") or list_item["name"], "description_short": list_item.get("introduction") or "", "description_path": "description.md", "author": { "username": list_item["owner"]["username"], "display_name": list_item["owner"].get("nickname"), "user_id": list_item["owner"].get("uuid"), }, "license": detail.get("license") or "unknown", "tags": list_item.get("tags") or [], "created_at": list_item.get("created_at"), "updated_at": list_item.get("updated_at"), "published_at": list_item.get("oshwhub_publish_at"), "crawled_at": datetime.now(timezone.utc).isoformat(), "metrics": { "likes": (list_item.get("count") or {}).get("like", 0), "stars": (list_item.get("count") or {}).get("star", 0), "forks": (list_item.get("count") or {}).get("fork", 0), "views": (list_item.get("count") or {}).get("views", 0), "watch": (list_item.get("count") or {}).get("watch", 0), "comments": list_item.get("comments_count", 0), }, "cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None, "files": files_meta, "raw_fields": { "path": list_item["path"], "grade": list_item.get("grade"), "origin": list_item.get("origin"), "public": list_item.get("public"), "publish": list_item.get("publish"), "skipped_files": skipped, }, } if src_meta: meta["source_format"] = src_meta["source_format"] meta["source_path"] = src_meta["source_path"] meta["source_documents"] = src_meta["source_documents"] if src_meta.get("editor_version"): meta["editor_version"] = src_meta["editor_version"] (proj_dir / "metadata.json").write_text( json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" ) return CrawlResult( project_id=uuid, out_dir=proj_dir, files_count=len(files_meta), bytes_total=bytes_total, skipped_files=skipped, ) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def iter_candidates( 
def iter_candidates(
    client: httpx.Client,
    pages: int,
    page_size: int,
    sort: str,
    origin: str | None = None,
) -> Iterator[dict]:
    """Yield listing entries from the first `pages` pages of the list API.

    Pages are requested one at a time, starting at page 1, through
    `list_projects`; every entry under the response's "lists" key is yielded
    in order. `polite_sleep()` runs after each page has been fully consumed,
    so a consumer that stops iterating early also stops further requests.
    """
    page_no = 1
    while page_no <= pages:
        listing = list_projects(
            client, page=page_no, page_size=page_size, sort=sort, origin=origin
        )
        yield from listing["lists"]
        polite_sleep()
        page_no += 1
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the oshwhub crawler.

    Modes, checked in this order:

    * ``--backfill-source`` / ``--backfill-pro-source``: only fetch project
      source for directories already present under ``--out``; the return
      value is whatever the backfill helper returns.
    * ``--from-jsonl``: crawl the listing items stored in a JSONL file
      (optionally narrowed by ``--uuids``), bypassing the listing API.
    * ``--uuids``: scan the listing pages for exactly those UUIDs.
    * default: scan the listing pages and keep the top N via `pick_top`.

    Args:
        argv: argument vector without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        0 after the crawl loop finishes. Per-project failures are reported on
        stderr and do not change the exit code.
    """
    ap = argparse.ArgumentParser(description="oshwhub MVP crawler")
    ap.add_argument("--out", type=Path, default=Path("data/raw/oshwhub"))
    ap.add_argument("--top", type=int, default=10, help="number of projects to crawl")
    ap.add_argument("--min-likes", type=int, default=50)
    ap.add_argument("--min-grade", type=int, default=4)
    ap.add_argument("--pages", type=int, default=3, help="list API pages to scan")
    ap.add_argument("--page-size", type=int, default=30)
    ap.add_argument("--sort", default="hot")
    ap.add_argument(
        "--origin",
        choices=["std", "pro"],
        default=None,
        help="filter listing API by origin (server-side); needed to find Pro projects in top-N",
    )
    ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list")
    ap.add_argument("--no-files", action="store_true", help="do not download attachments")
    ap.add_argument("--limit", type=int, default=None, help="override --top, same effect")
    ap.add_argument(
        "--with-source",
        action="store_true",
        help="also fetch EasyEDA Std project source (schematic + PCB dataStr) per project",
    )
    ap.add_argument(
        "--backfill-source",
        action="store_true",
        help="skip listing/HTML/attachments; only fetch source for projects already in --out",
    )
    ap.add_argument(
        "--with-pro-source",
        action="store_true",
        help="also fetch EasyEDA Pro project source (full history chain, EPRO2 streams) per project",
    )
    ap.add_argument(
        "--backfill-pro-source",
        action="store_true",
        help="skip listing; only fetch Pro source for origin=pro projects already in --out",
    )
    ap.add_argument(
        "--pro-cookie",
        type=str,
        default=PRO_COOKIE_PATH_DEFAULT,
        help="path to file with Cookie header for pro.lceda.cn",
    )
    ap.add_argument(
        "--skip-ext",
        type=str,
        default=None,
        help="comma-separated list of attachment extensions to skip (e.g. mp4,qt,mov). "
        "Saves ~30-50%% LFS storage on average. Entry still recorded in metadata.json "
        "with skipped:ext: so we can re-fetch later.",
    )
    ap.add_argument(
        "--max-source-mb",
        type=int,
        default=None,
        help="skip Pro modern projects whose chain blob total exceeds N MB. "
        "Trips inside the chain loop, wipes partial source/, records to "
        "data/state/oshwhub_pro_oversize.jsonl. No effect on Std or Pro 2.x legacy.",
    )
    ap.add_argument(
        "--from-jsonl",
        type=Path,
        default=None,
        help="read pre-selected listing items from a jsonl (one item per line, "
        "shape matches /api/project listing entries). Bypasses the listing "
        "API entirely — useful when candidates were chosen offline from the "
        "full-corpus index, where most aren't in the default top-N pages.",
    )
    ap.add_argument(
        "--no-cover",
        action="store_true",
        help="skip cover image download. For scan-only runs (license / meta scrape) "
        "this drops ~1.3s/project and avoids slow-CDN hangs.",
    )
    ap.add_argument(
        "--concurrency",
        type=int,
        default=1,
        help="number of parallel crawl_one workers (ThreadPoolExecutor). Default 1 "
        "= serial. Anonymous endpoints (oshwhub.com detail/listing) tolerate "
        "concurrency 5+ comfortably; only enable when no auth is involved or "
        "you've probed the host.",
    )
    args = ap.parse_args(argv)

    skip_exts: set[str] | None = (
        {x.strip().lower().lstrip(".") for x in args.skip_ext.split(",") if x.strip()}
        if args.skip_ext
        else None
    )
    # Tolerate "a, b" and trailing commas: strip each token and drop empty
    # ones so a stray space or comma cannot produce a UUID that never matches.
    wanted: set[str] | None = (
        {u.strip() for u in args.uuids.split(",") if u.strip()} if args.uuids else None
    )
    n_target = args.limit if args.limit is not None else args.top
    args.out.mkdir(parents=True, exist_ok=True)

    # --backfill-source / --backfill-pro-source: standalone paths that scan
    # existing project dirs and only fetch source. No listing/HTML/attachment
    # work. The raw --uuids string is forwarded unchanged.
    if args.backfill_source:
        return _run_backfill_source(
            args.out, only_uuids=args.uuids, concurrency=args.concurrency
        )
    if args.backfill_pro_source:
        return _run_backfill_pro_source(
            args.out,
            only_uuids=args.uuids,
            cookie_path=args.pro_cookie,
            max_source_mb=args.max_source_mb,
            concurrency=args.concurrency,
        )

    with make_client() as client:
        # Build list of items to crawl
        items: list[dict]
        if args.from_jsonl:
            # Iterate the file object rather than read_text().splitlines():
            # str.splitlines() also breaks on U+2028/U+2029/U+0085, which may
            # appear unescaped inside JSON strings and would cut a record in
            # two. Decode explicitly as UTF-8 like every other file here.
            with args.from_jsonl.open(encoding="utf-8") as fh:
                items = [json.loads(ln) for ln in fh if ln.strip()]
            if wanted is not None:
                items = [i for i in items if i.get("uuid") in wanted]
            print(f"loaded {len(items)} items from {args.from_jsonl}")
        elif wanted is not None:
            # Keyed by uuid so an entry that shows up on two pages is kept
            # once and cannot make the "found everything" check fire early.
            found: dict[str, dict] = {}
            if wanted:
                for it in iter_candidates(
                    client, args.pages, args.page_size, args.sort, origin=args.origin
                ):
                    uid = it["uuid"]
                    if uid in wanted and uid not in found:
                        found[uid] = it
                        if len(found) == len(wanted):
                            break
            items = list(found.values())
            missing = wanted.difference(found)
            if missing:
                print(f"WARN: missing uuids (not in top pages): {missing}", file=sys.stderr)
        else:
            candidates = list(
                iter_candidates(
                    client, args.pages, args.page_size, args.sort, origin=args.origin
                )
            )
            items = pick_top(
                candidates, n=n_target, min_likes=args.min_likes, min_grade=args.min_grade
            )
            if len(items) < n_target:
                print(
                    f"WARN: only {len(items)} items passed filters "
                    f"(wanted {n_target})",
                    file=sys.stderr,
                )

        # Create the optional clients inside the try so that a failure while
        # building the second one still closes the first.
        source_client_ctx = None
        pro_source_client_ctx = None
        try:
            if args.with_source:
                source_client_ctx = make_source_client()
            if args.with_pro_source:
                pro_source_client_ctx = make_pro_source_client(args.pro_cookie)

            print(f"Crawling {len(items)} projects -> {args.out} (concurrency={args.concurrency})")
            print_lock = threading.Lock()

            def _do_one(i: int, it: dict) -> None:
                # Resolve the label first and without indexing, so reporting a
                # failure can never raise a second error of its own.
                if isinstance(it, dict):
                    label = it.get("path") or it.get("uuid") or "?"
                else:
                    label = repr(it)[:40]
                try:
                    r = crawl_one(
                        client,
                        it,
                        args.out,
                        fetch_files=not args.no_files,
                        fetch_cover=not args.no_cover,
                        source_client=source_client_ctx,
                        pro_source_client=pro_source_client_ctx,
                        skip_exts=skip_exts,
                        max_source_mb=args.max_source_mb,
                    )
                except Exception as e:  # noqa: BLE001
                    with print_lock:
                        print(f"[{i}/{len(items)}] FAIL {label}: {e}", file=sys.stderr)
                    return
                with print_lock:
                    print(
                        f"[{i}/{len(items)}] OK {label}: "
                        f"{r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
                        f"(skipped: {len(r.skipped_files)})"
                    )

            if args.concurrency <= 1:
                for i, it in enumerate(items, 1):
                    _do_one(i, it)
            else:
                from concurrent.futures import ThreadPoolExecutor, as_completed

                with ThreadPoolExecutor(max_workers=args.concurrency) as executor:
                    futs = [executor.submit(_do_one, i, it) for i, it in enumerate(items, 1)]
                    for f in as_completed(futs):
                        f.result()
        finally:
            if source_client_ctx is not None:
                source_client_ctx.close()
            if pro_source_client_ctx is not None:
                pro_source_client_ctx.close()
    return 0
def _run_backfill_concurrent(
    targets: list[Path],
    fetch_one,
    *,
    concurrency: int,
    label: str,
    size_unit: str = "MB",
) -> None:
    """Shared worker dispatch for std/pro source backfill.

    For every directory in `targets`, load its ``metadata.json``, call
    `fetch_one`, and on success merge the source fields back into that
    metadata file.

    Args:
        targets: per-project directories; the directory name is used as the
            project UUID in log lines.
        fetch_one: callable ``fetch_one(proj_dir, meta)`` returning
            ``(status, payload)`` where status is one of
              "ok"       - payload is the fetch_*_source dict
              "oversize" - payload is a message; any bookkeeping is the
                           caller's responsibility
              anything else - treated as failure; payload is the
                           human-readable error string
        concurrency: values of 1 or less run serially in the calling thread;
            larger values use a ThreadPoolExecutor with that many workers.
        label: wording used in the banner line ("std source", "pro source").
        size_unit: "KB" reports the document total in KB; any other value
            reports MB.

    `fetch_one` runs outside the lock, so fetches overlap between workers.
    Result reporting, including the metadata.json rewrite, happens while
    holding one lock so output lines from different workers do not interleave.
    An unreadable or non-object metadata.json is reported and skipped.
    """
    print_lock = threading.Lock()
    n_total = len(targets)

    def _do_one(idx: int, proj_dir: Path) -> None:
        uuid = proj_dir.name
        meta_path = proj_dir / "metadata.json"
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            if not isinstance(meta, dict):
                # Valid JSON but not an object: nothing to merge fields into.
                raise TypeError(f"expected a JSON object, got {type(meta).__name__}")
        except Exception as e:  # noqa: BLE001
            with print_lock:
                print(f"[{idx}/{n_total}] meta read FAIL {uuid}: {e}", file=sys.stderr)
            return

        # "title" may be missing or explicitly null; either way fall back to
        # "?" so slicing for the log line cannot raise.
        short_title = str(meta.get("title") or "?")[:30]
        status, payload = fetch_one(proj_dir, meta)
        with print_lock:
            head = f"[{idx}/{n_total}] {uuid}"
            if status == "ok":
                src_meta = payload
                meta["source_format"] = src_meta["source_format"]
                meta["source_path"] = src_meta["source_path"]
                meta["source_documents"] = src_meta["source_documents"]
                if src_meta.get("editor_version"):
                    meta["editor_version"] = src_meta["editor_version"]
                meta_path.write_text(
                    json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
                )
                docs = src_meta["source_documents"]
                total = sum(d["size"] for d in docs)
                if size_unit == "KB":
                    sz = f"{total / 1024:.1f} KB"
                else:
                    sz = f"{total / 1024 / 1024:.1f} MB"
                print(
                    f"{head} OK ({short_title}): {len(docs)} docs, {sz}, "
                    f"editor={src_meta.get('editor_version')}"
                )
            elif status == "oversize":
                print(f"{head} SKIPPED oversize ({short_title}): {payload}", file=sys.stderr)
            else:
                print(f"{head} FAIL ({short_title}): {payload}", file=sys.stderr)

    print(f"Backfill {label} for {n_total} projects (concurrency={concurrency})")
    if concurrency <= 1:
        for i, d in enumerate(targets, 1):
            _do_one(i, d)
    else:
        from concurrent.futures import ThreadPoolExecutor, as_completed

        with ThreadPoolExecutor(max_workers=concurrency) as pool:
            futs = [pool.submit(_do_one, i, d) for i, d in enumerate(targets, 1)]
            for f in as_completed(futs):
                # Re-raise anything unexpected from a worker instead of
                # silently dropping it.
                f.result()
If `only_uuids` is given, that's the authoritative whitelist; otherwise (Pro only) we fall back to filtering by metadata.raw_fields.origin.""" wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None targets: list[Path] = [] for d in sorted(out_root.iterdir()): if not d.is_dir() or not (d / "metadata.json").exists(): continue if wanted is not None: if d.name not in wanted: continue elif require_origin_pro: try: m = json.loads((d / "metadata.json").read_text(encoding="utf-8")) except Exception: # noqa: BLE001 continue if (m.get("raw_fields") or {}).get("origin") != "pro": continue targets.append(d) return targets def _run_backfill_source( out_root: Path, only_uuids: str | None = None, concurrency: int = 1, ) -> int: """Walk per-project dirs and fetch Std source.json into each. Updates metadata.json in-place to add source_format / source_documents / editor_version. """ targets = _discover_backfill_targets(out_root, only_uuids) src_client = make_source_client() def fetch(proj_dir: Path, meta: dict): try: return ("ok", fetch_std_source(src_client, proj_dir.name, proj_dir)) except Exception as e: # noqa: BLE001 return ("fail", str(e)) try: _run_backfill_concurrent( targets, fetch, concurrency=concurrency, label="std source", size_unit="KB" ) finally: src_client.close() return 0 def _run_backfill_pro_source( out_root: Path, only_uuids: str | None = None, cookie_path: str = PRO_COOKIE_PATH_DEFAULT, max_source_mb: int | None = None, concurrency: int = 1, ) -> int: """Walk per-project dirs and fetch Pro source into each (modern + legacy).""" targets = _discover_backfill_targets( out_root, only_uuids, require_origin_pro=(only_uuids is None) ) pro_client = make_pro_source_client(cookie_path=cookie_path) # Multiple threads may trip oversize concurrently; serialize the state-file # append so lines don't interleave. 
def _run_backfill_pro_source(
    out_root: Path,
    only_uuids: str | None = None,
    cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
    max_source_mb: int | None = None,
    concurrency: int = 1,
) -> int:
    """Walk per-project dirs and fetch Pro source into each.

    Args:
        out_root: crawl output root, one sub-directory per project.
        only_uuids: comma-separated whitelist of project directories. When it
            is ``None`` or empty, targets are instead restricted to projects
            whose stored metadata says origin == "pro".
        cookie_path: file holding the Cookie header used to build the Pro
            client.
        max_source_mb: forwarded to `fetch_pro_source`; a project that raises
            ProjectOversizeError is recorded via `_record_oversize` and
            reported as "oversize" rather than as a failure.
        concurrency: number of worker threads used by the shared dispatcher.

    Returns:
        Always 0; per-project failures are only reported.
    """
    # Use truthiness, not `is None`: the discovery helper treats an empty
    # string as "no whitelist", so `--uuids ""` must still fall back to the
    # origin filter instead of selecting every project, Std ones included.
    targets = _discover_backfill_targets(
        out_root, only_uuids, require_origin_pro=not only_uuids
    )
    pro_client = make_pro_source_client(cookie_path=cookie_path)
    # Multiple threads may trip oversize concurrently; serialize the state-file
    # append so lines don't interleave.
    oversize_lock = threading.Lock()

    def fetch(proj_dir: Path, meta: dict):
        uuid = proj_dir.name
        try:
            return (
                "ok",
                fetch_pro_source(
                    pro_client, uuid, proj_dir, max_source_mb=max_source_mb
                ),
            )
        except ProjectOversizeError as e:
            with oversize_lock:
                _record_oversize(out_root, uuid, e)
            return ("oversize", str(e))
        except Exception as e:  # noqa: BLE001
            return ("fail", str(e))

    try:
        _run_backfill_concurrent(
            targets, fetch, concurrency=concurrency, label="pro source"
        )
    finally:
        pro_client.close()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())