"""oshwhub.com crawler — MVP. Usage: uv run python -m crawlers.oshwhub \ --out data/raw/oshwhub \ --top 10 --min-likes 50 --min-grade 4 Or with explicit UUID list: uv run python -m crawlers.oshwhub \ --uuids 298873b7fdbe44f8ba0e7351e023bc2c,7b6a398811f14eba9a952b8d2ddd7ace \ --out data/raw/oshwhub """ from __future__ import annotations import argparse import dataclasses as dc import hashlib import html as _html import json import re import shutil import sys import time import urllib.parse from datetime import datetime, timezone from pathlib import Path from typing import Iterator import httpx API_LIST = "https://oshwhub.com/api/project" API_PROJECT = "https://oshwhub.com/api/project" # /api/project/ for source flow BASE = "https://oshwhub.com" IMG_CDN = "https://image.lceda.cn" LCEDA_DOC_API = "https://lceda.cn/api/documents" PRO_API = "https://pro.lceda.cn/api/v4" PRO_EDITOR_VERSION = "3.2.127" PRO_COOKIE_PATH_DEFAULT = "/home/ubuntu/.secrets/pro-lceda-cookie-header.txt" UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)" # Std source endpoints reject FacereDataset UA on oshwhub /api/project; spoof browser UA only there. # See docs/sources/easyeda_std_source.md §3. BROWSER_UA = ( "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/147.0.0.0 Safari/537.36" ) SLEEP_BETWEEN = 2.0 # seconds between detail-page / file fetches SLEEP_SOURCE = 5.0 # source fetch is sensitive — QPS ≤ 0.2 per CLAUDE.md登录态 spirit SLEEP_PRO = 5.0 # Pro API host (pro.lceda.cn): rate-sensitive, keep at QPS ≤ 0.2 # CDN host (modules.lceda.cn) only serves AES-encrypted history blobs. # HAR analysis (proexportNew2.har 2026-04-29) shows the editor fires these # blobs back-to-back without throttling — the CDN can clearly take it. # Walltime for chain replay is dominated by this loop on multi-hundred-history # projects (X86 board: chain ≈ 700 → ~1h at 5s/req → ~few min at 0.2s/req). SLEEP_PRO_CDN = 0.2 # --------------------------------------------------------------------------- # HTTP # --------------------------------------------------------------------------- def make_client(timeout: float = 30.0) -> httpx.Client: return httpx.Client( http2=True, timeout=timeout, headers={"User-Agent": UA, "Accept": "text/html,application/json;q=0.9,*/*;q=0.8"}, follow_redirects=True, ) def make_source_client(timeout: float = 60.0) -> httpx.Client: """Client for Std source endpoints (lceda.cn/oshwhub.com /api/...). Uses browser UA + editor Referer to satisfy server-side UA filter. """ return httpx.Client( http2=True, timeout=timeout, headers={ "User-Agent": BROWSER_UA, "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Referer": "https://lceda.cn/editor", }, follow_redirects=False, ) def make_pro_source_client( cookie_path: str = PRO_COOKIE_PATH_DEFAULT, timeout: float = 90.0, ) -> httpx.Client: """Client for Pro source endpoints (pro.lceda.cn /api/v4/...). Requires logged-in cookie header at `cookie_path` (mode 600). The cookie file is a single Cookie header value (e.g. `lceda_pro_session=...; XSRF-TOKEN=...`). Per-request `path: ` header MUST be added by callers — see docs/sources/easyeda_pro_source.md §2.5. """ cookie = Path(cookie_path).read_text(encoding="utf-8").strip() return httpx.Client( http2=True, timeout=timeout, headers={ "User-Agent": BROWSER_UA, "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", "Editor-Version": PRO_EDITOR_VERSION, "Referer": "https://pro.lceda.cn/editor", "Cookie": cookie, }, follow_redirects=False, ) def polite_sleep() -> None: time.sleep(SLEEP_BETWEEN) # --------------------------------------------------------------------------- # Listing # --------------------------------------------------------------------------- def list_projects( client: httpx.Client, page: int = 1, page_size: int = 30, sort: str = "hot", origin: str | None = None, ) -> dict: params: dict[str, object] = {"page": page, "pageSize": page_size, "sort": sort} if origin: params["origin"] = origin # 'std' or 'pro' — server-side filter r = client.get(API_LIST, params=params) r.raise_for_status() data = r.json() if not data.get("success"): raise RuntimeError(f"list API failed: {data}") return data["result"] def rank_score(item: dict) -> float: """Composite quality score: favor projects with broad engagement.""" c = item["count"] return ( c["like"] * 3 + c["star"] * 1 + c["fork"] * 2 + c["views"] / 100 + item["comments_count"] * 2 + (item.get("grade") or 0) * 50 ) def pick_top( items: list[dict], n: int, min_likes: int, min_grade: int, exclude_copies: bool = True, ) -> list[dict]: filtered = [] for it in items: if exclude_copies and "_copy" in it["path"]: continue if it["count"]["like"] < min_likes: continue if (it.get("grade") or 0) < min_grade: continue filtered.append(it) filtered.sort(key=rank_score, reverse=True) return filtered[:n] # --------------------------------------------------------------------------- # Detail page parsing # --------------------------------------------------------------------------- RE_ATTACH_BLOCK = re.compile(r'\\"attachments\\":\[', re.DOTALL) RE_LICENSE = re.compile(r'\\"license\\":\\"([^\\"]+)\\"') RE_META_DESC = re.compile( r'([^<]+)", re.IGNORECASE) def _find_balanced_bracket(s: str, start: int, open_ch: str = "[", close_ch: str = "]") -> int: """Return index after the matching close bracket. start must point at open_ch.""" assert s[start] == open_ch depth = 0 for i in range(start, len(s)): ch = s[i] if ch == open_ch: depth += 1 elif ch == close_ch: depth -= 1 if depth == 0: return i + 1 raise ValueError("unbalanced") def parse_detail_html(h: str) -> dict: """Extract attachments, license, title, description from SSR HTML.""" out: dict = { "title": None, "description_meta": None, "license": None, "attachments": [], } m = RE_TITLE.search(h) if m: # HTML entities + suffix stripping title = _html.unescape(m.group(1)).strip() for sfx in ( " - 立创开源硬件平台 - 深圳创电优选科技有限公司", " - 立创开源硬件平台", ): if title.endswith(sfx): title = title[: -len(sfx)] out["title"] = title m = RE_META_DESC.search(h) if m: out["description_meta"] = _html.unescape(m.group(1)) m = RE_LICENSE.search(h) if m: out["license"] = m.group(1) m = RE_ATTACH_BLOCK.search(h) if m: arr_start = m.end() - 1 # point at '[' arr_end = _find_balanced_bracket(h, arr_start) block = h[arr_start:arr_end] clean = block.replace('\\"', '"').replace("\\\\", "\\") try: out["attachments"] = json.loads(clean) except json.JSONDecodeError as e: # Keep raw for debugging; skip attachments silently. Caller can log. out["_attachments_parse_error"] = str(e) return out # --------------------------------------------------------------------------- # Download helpers # --------------------------------------------------------------------------- def download_to(client: httpx.Client, url: str, dest: Path) -> tuple[int, str]: """Stream-download url to dest. Returns (size, sha256).""" dest.parent.mkdir(parents=True, exist_ok=True) h = hashlib.sha256() size = 0 with client.stream("GET", url) as r: r.raise_for_status() with open(dest, "wb") as f: for chunk in r.iter_bytes(1 << 15): f.write(chunk) h.update(chunk) size += len(chunk) return size, h.hexdigest() # --------------------------------------------------------------------------- # Std source fetch (login NOT required for public projects — see # docs/sources/easyeda_std_source.md) # --------------------------------------------------------------------------- def fetch_std_source( source_client: httpx.Client, project_uuid: str, proj_dir: Path, sleep: float = SLEEP_SOURCE, ) -> dict: """Fetch EasyEDA Std project source (schematic + PCB dataStr) anonymously. Returns dict with keys: - source_format: "easyeda-std" - source_path: "source/" - source_documents: list of {doc_uuid, docType, master, path, size, sha256} - editor_version: from dataStr.head when available """ src_dir = proj_dir / "source" src_dir.mkdir(parents=True, exist_ok=True) # 1. Project meta → version_documents r = source_client.get(f"{API_PROJECT}/{project_uuid}") r.raise_for_status() j = r.json() if not j.get("success"): raise RuntimeError(f"oshwhub project meta failed: {j}") version_documents = j["result"].get("version_documents") or [] time.sleep(sleep) # 2. Per document → dataStr doc_metas: list[dict] = [] editor_version: str | None = None for vd in version_documents: doc_uuid = vd["uuid"] master = vd.get("master") doc_type = vd.get("docType") url = f"{LCEDA_DOC_API}/{doc_uuid}" r2 = source_client.get(url, params={"uuid": doc_uuid, "path": doc_uuid}) r2.raise_for_status() # Server returns text/html mistakenly; body is JSON regardless. try: body_json = r2.json() except Exception as e: # noqa: BLE001 raise RuntimeError(f"doc {doc_uuid} non-JSON response: {e}; head={r2.text[:200]!r}") if not body_json.get("success"): raise RuntimeError(f"doc {doc_uuid} response not success: {body_json}") local_rel = f"source/{doc_uuid}.json" local_path = proj_dir / local_rel text = json.dumps(body_json, ensure_ascii=False, separators=(",", ":")) local_path.write_text(text, encoding="utf-8") size = local_path.stat().st_size sha = hashlib.sha256(text.encode("utf-8")).hexdigest() # Pull editor version from the dataStr.head if present. ev = _extract_editor_version(body_json) if ev and not editor_version: editor_version = ev doc_metas.append({ "doc_uuid": doc_uuid, "docType": doc_type, "master": master, "path": local_rel, "size": size, "sha256": sha, }) time.sleep(sleep) # 3. source/manifest.json — index + raw upstream version_documents for diffing manifest = { "project_uuid": project_uuid, "fetched_at": datetime.now(timezone.utc).isoformat(), "editor_version": editor_version, "documents": doc_metas, "upstream_version_documents": version_documents, } (src_dir / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8" ) return { "source_format": "easyeda-std", "source_path": "source/", "source_documents": doc_metas, "editor_version": editor_version, } def _extract_editor_version(body_json: dict) -> str | None: """Best-effort: pull head.editorVersion from dataStr (location varies by docType).""" res = body_json.get("result") or {} # PCB shape: result.dataStr at top ds = res.get("dataStr") if isinstance(ds, dict): head = ds.get("head") or {} if isinstance(head, dict) and head.get("editorVersion"): return str(head["editorVersion"]) # Schematic shape: result.schematics[*].dataStr for sch in (res.get("schematics") or []): if isinstance(sch, dict): ds2 = sch.get("dataStr") or {} if isinstance(ds2, dict): head = ds2.get("head") or {} if isinstance(head, dict) and head.get("editorVersion"): return str(head["editorVersion"]) return None # --------------------------------------------------------------------------- # Pro source fetch (pro.lceda.cn — EPRO2 message stream, AES-128-GCM) # See docs/sources/easyeda_pro_source.md. # --------------------------------------------------------------------------- def _pro_get_json( client: httpx.Client, url: str, project_uuid: str, *, params: dict | None = None, ) -> dict | list: """GET a pro.lceda.cn /api/v4 endpoint with the per-project `path` header. Raises if the JSON envelope's `success` is False; returns `result`. """ r = client.get(url, params=params, headers={"path": project_uuid}) r.raise_for_status() j = r.json() if not j.get("success"): raise RuntimeError(f"Pro API failed (url={url}): {j}") return j["result"] def _order_history_chain(chain: list[dict]) -> list[dict]: """Return the chain ordered root→HEAD by walking parent links. Pro returns the chain HEAD-first as a flat list with `parent` links. We walk from the unique root forward. """ by_uuid = {h["uuid"]: h for h in chain} roots = [h for h in chain if h.get("parent") not in by_uuid] if len(roots) != 1: raise RuntimeError( f"history chain has {len(roots)} roots; not strictly linear" ) children: dict[str | None, list[dict]] = {} for h in chain: children.setdefault(h.get("parent"), []).append(h) ordered: list[dict] = [] cur: dict | None = roots[0] while cur is not None: ordered.append(cur) nexts = children.get(cur["uuid"], []) if len(nexts) > 1: raise RuntimeError( f"history chain not linear at {cur['uuid']!r}: {len(nexts)} children" ) cur = nexts[0] if nexts else None if len(ordered) != len(chain): raise RuntimeError( f"reconstructed {len(ordered)} of {len(chain)} histories; chain has cycles or orphans" ) return ordered class ProjectOversizeError(Exception): """Raised when a Pro project's chain replay would exceed the configured cap. `cap_mb` is the trip threshold; `bytes_so_far` is the *encrypted blob* total accumulated when we tripped (pre-decompression, pre-partition). """ def __init__(self, bytes_so_far: int, cap_mb: int) -> None: super().__init__( f"oversize: blob bytes {bytes_so_far // 1024 // 1024} MB > cap {cap_mb} MB" ) self.bytes_so_far = bytes_so_far self.cap_mb = cap_mb def _record_oversize(out_root: Path, uuid: str, err: ProjectOversizeError) -> None: """Append one row to data/state/oshwhub_pro_oversize.jsonl for later review.""" state_path = Path("data/state/oshwhub_pro_oversize.jsonl") state_path.parent.mkdir(parents=True, exist_ok=True) row = { "uuid": uuid, "out_root": str(out_root), "bytes_so_far": err.bytes_so_far, "cap_mb": err.cap_mb, "ts": datetime.now(timezone.utc).isoformat(), } with state_path.open("a") as f: f.write(json.dumps(row, ensure_ascii=False) + "\n") def fetch_pro_source( pro_client: httpx.Client, project_uuid: str, proj_dir: Path, sleep: float = SLEEP_PRO, max_source_mb: int | None = None, ) -> dict: """Dispatcher: pick modern (3.x branch+EPRO2) vs legacy (2.x v2/documents/lists) based on whether project meta contains a non-null branch_uuid. Pro 3.x stores in git-style branch+history with AES-encrypted EPRO2 streams; Pro 2.x predates that and uses Std-style per-doc dataStr served from /api/v2/documents/lists. See docs/sources/easyeda_pro_source.md §1.1. `max_source_mb` only gates modern-path projects (legacy is always tiny: <2 MB in our 2/2 sample) and trips before any blob is written to disk past the cap. """ proj = _pro_get_json(pro_client, f"{PRO_API}/projects/{project_uuid}", project_uuid) time.sleep(sleep) if proj.get("branch_uuid"): return _fetch_pro_modern( pro_client, project_uuid, proj, proj_dir, sleep, max_source_mb=max_source_mb ) return _fetch_pro_legacy(pro_client, project_uuid, proj, proj_dir, sleep) def _fetch_pro_modern( pro_client: httpx.Client, project_uuid: str, proj: dict, proj_dir: Path, sleep: float = SLEEP_PRO, max_source_mb: int | None = None, ) -> dict: """Modern Pro 3.x fetcher: full history chain, AES-GCM decrypted, gunzipped, and partitioned into per-document EPRO2 streams. Side effects under ``proj_dir``: - source/structure.json — project document tree (boards/schematics/sheets/pcbs/...) - source/.epro2 — one file per document, raw EPRO2 messages (newline-separated) - source/manifest.json — per-doc index + chain summary """ import gzip from collections import OrderedDict from Crypto.Cipher import AES # local import: cheap if pycryptodome already loaded src_dir = proj_dir / "source" src_dir.mkdir(parents=True, exist_ok=True) branch_uuid = proj["branch_uuid"] project_editor_version = proj.get("editorVersion") # 2. branch meta -> head history_uuid branch = _pro_get_json( pro_client, f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}", project_uuid, ) head_uuid = branch.get("history_uuid") if not head_uuid: raise RuntimeError(f"no history_uuid (HEAD) on branch {branch_uuid}") time.sleep(sleep) # 3. structure tree st = _pro_get_json( pro_client, f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/structures", project_uuid, ) raw_structure = st.get("structure") structure = json.loads(raw_structure) if isinstance(raw_structure, str) else raw_structure (src_dir / "structure.json").write_text( json.dumps(structure, ensure_ascii=False, indent=2), encoding="utf-8" ) time.sleep(sleep) # 4. history chain — single endpoint returns full chain (HAR-confirmed 2026-04-28) chain = _pro_get_json( pro_client, f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/histories/{head_uuid}", project_uuid, ) if not isinstance(chain, list) or not chain: raise RuntimeError(f"unexpected histories response: {type(chain).__name__}") ordered = _order_history_chain(chain) time.sleep(sleep) # 5. download + decrypt + gunzip + partition by DOCHEAD docs: OrderedDict[str, dict] = OrderedDict() cur_doc: str | None = None bytes_blob_total = 0 bytes_plain_total = 0 cap_bytes = max_source_mb * 1024 * 1024 if max_source_mb is not None else None for h in ordered: blob_r = pro_client.get(h["dataStrUrl"], headers={"path": project_uuid}) blob_r.raise_for_status() blob = blob_r.content bytes_blob_total += len(blob) # Trip cap on the encrypted blob total. Hits *after* the offending # download, but before we decrypt/gunzip/partition (those scale with # plain bytes which is even larger). Wipe any partial source/ so disk # doesn't accumulate junk on multi-project runs. if cap_bytes is not None and bytes_blob_total > cap_bytes: shutil.rmtree(src_dir, ignore_errors=True) raise ProjectOversizeError(bytes_blob_total, max_source_mb) if len(blob) < 16: raise RuntimeError(f"history {h['uuid']} blob too short ({len(blob)} B)") ct, tag = blob[:-16], blob[-16:] cipher = AES.new( bytes.fromhex(h["key"]), AES.MODE_GCM, nonce=bytes.fromhex(h["iv"]), ) gz = cipher.decrypt_and_verify(ct, tag) plain = gzip.decompress(gz) bytes_plain_total += len(plain) for ln in plain.split(b"\n"): if not ln.strip(): continue # EPRO2 lines use `||` as field separator and terminate with a single # `|`. Strip the trailing `|` first so each part parses as bare JSON. stripped = ln.rstrip(b"|") parts = stripped.split(b"||") try: head_msg = json.loads(parts[0]) except Exception: # noqa: BLE001 — malformed head; skip entire line continue if head_msg.get("type") == "DOCHEAD" and len(parts) >= 2: try: payload = json.loads(parts[1]) except Exception: # noqa: BLE001 payload = {} new_doc = payload.get("uuid") if new_doc: cur_doc = new_doc if cur_doc not in docs: docs[cur_doc] = { "lines": [], "doc_head": payload, } if cur_doc and cur_doc in docs: docs[cur_doc]["lines"].append(ln) # CDN host, not the rate-sensitive API host — see SLEEP_PRO_CDN comment. time.sleep(SLEEP_PRO_CDN) # 6. write per-doc .epro2 + manifest doc_metas: list[dict] = [] editor_version: str | None = None for doc_uuid, info in docs.items(): body = b"\n".join(info["lines"]) + b"\n" local_rel = f"source/{doc_uuid}.epro2" local_path = proj_dir / local_rel local_path.write_bytes(body) size = len(body) sha = hashlib.sha256(body).hexdigest() head = info["doc_head"] ev = head.get("editVersion") or head.get("editorVersion") if ev and not editor_version: editor_version = str(ev) doc_metas.append({ "doc_uuid": doc_uuid, "docType": head.get("docType"), # "BOARD" / "PCB" / "SCH" / "SCH_PAGE" / "SYMBOL" / ... "path": local_rel, "size": size, "sha256": sha, "message_count": len(info["lines"]), }) # editor_version fallback: project meta if no DOCHEAD payload had it if not editor_version and project_editor_version: editor_version = str(project_editor_version) structure_summary: dict[str, int] = {} if isinstance(structure, dict): for k, v in structure.items(): if isinstance(v, dict): structure_summary[k] = len(v) elif isinstance(v, list): structure_summary[k] = len(v) manifest = { "project_uuid": project_uuid, "branch_uuid": branch_uuid, "head_uuid": head_uuid, "fetched_at": datetime.now(timezone.utc).isoformat(), "editor_version": editor_version, "chain_length": len(chain), "blob_bytes_total": bytes_blob_total, "plain_bytes_total": bytes_plain_total, "documents": doc_metas, "structure_summary": structure_summary, } (src_dir / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8" ) return { "source_format": "easyeda-pro", "source_path": "source/", "source_documents": doc_metas, "editor_version": editor_version, } def _pro_post_json( client: httpx.Client, url: str, project_uuid: str, body: dict, ) -> object: """POST a pro.lceda.cn endpoint with `path` header, validate envelope.""" r = client.post( url, json=body, headers={"path": project_uuid, "Content-Type": "application/json"}, ) r.raise_for_status() j = r.json() if not j.get("success"): raise RuntimeError(f"Pro API failed (POST {url}): {j}") return j["result"] def _fetch_pro_legacy( pro_client: httpx.Client, project_uuid: str, proj: dict, proj_dir: Path, sleep: float = SLEEP_PRO, ) -> dict: """Legacy Pro 2.x fetcher: project meta has `boards: [{sch, pcb, name}]` and no branch model. Documents are fetched via `/api/v2/documents/lists` (Std-style plaintext dataStr); resources/coppers/textpath/blobs come from supplementary POST endpoints. Reverse-engineered from HAR `tmp/prodownload3.har` (2026-04-28); see docs/sources/easyeda_pro_source.md §1.1. Side effects under ``proj_dir``: - source/ticket.json — full project manifest (counts of all asset types) - source/.json — schematic sheet content (docType=1) - source/pcb_.json — PCB content (docType=3) - source/coppers.json — copper pour data (if any) - source/textpath.json — text path / font data (if any) - source/blobs.json — embedded image blobs (if any) - source/manifest.json — index across all of the above """ src_dir = proj_dir / "source" src_dir.mkdir(parents=True, exist_ok=True) boards = proj.get("boards") or [] if not boards: raise RuntimeError(f"legacy project {project_uuid} has no boards[] in meta") project_editor_version = proj.get("editorVersion") # 1. ticket — full manifest (counts of every asset type the project owns) ticket = pro_client.get( f"https://pro.lceda.cn/api/projects/{project_uuid}/ticket", params={"uuid": project_uuid, "g_ticket": "-1"}, headers={"path": project_uuid}, ) ticket.raise_for_status() ticket_j = ticket.json() if not ticket_j.get("success"): raise RuntimeError(f"ticket endpoint failed: {ticket_j}") manifest_ticket = ticket_j["result"] (src_dir / "ticket.json").write_text( json.dumps(manifest_ticket, ensure_ascii=False, indent=2), encoding="utf-8" ) time.sleep(sleep) doc_metas: list[dict] = [] # 2. schematic containers -> sheet UUIDs via /api/schematic/lists sch_container_uuids = [b["sch"] for b in boards if b.get("sch")] sheet_uuids: list[str] = [] if sch_container_uuids: containers = _pro_post_json( pro_client, "https://pro.lceda.cn/api/schematic/lists", project_uuid, {"uuids": sch_container_uuids}, ) if isinstance(containers, list): for c in containers: for s in c.get("sort") or []: su = s.get("uuid") if su: sheet_uuids.append(su) time.sleep(sleep) # 3. schematic sheets via documents/lists docType=1 (plaintext dataStr per sheet) if sheet_uuids: sheets = _pro_post_json( pro_client, "https://pro.lceda.cn/api/v2/documents/lists", project_uuid, {"uuids": sheet_uuids, "docType": 1}, ) for s in (sheets or []): doc_uuid = s["uuid"] local_rel = f"source/{doc_uuid}.json" text = json.dumps(s, ensure_ascii=False, separators=(",", ":")) (proj_dir / local_rel).write_text(text, encoding="utf-8") doc_metas.append({ "doc_uuid": doc_uuid, "docType": 1, "path": local_rel, "size": len(text.encode("utf-8")), "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(), }) time.sleep(sleep) # 4. PCB documents via documents/lists docType=3 pcb_uuids = [b["pcb"] for b in boards if b.get("pcb")] if pcb_uuids: pcbs = _pro_post_json( pro_client, "https://pro.lceda.cn/api/v2/documents/lists", project_uuid, {"uuids": pcb_uuids, "docType": 3}, ) for p in (pcbs or []): doc_uuid = p["uuid"] local_rel = f"source/pcb_{doc_uuid}.json" text = json.dumps(p, ensure_ascii=False, separators=(",", ":")) (proj_dir / local_rel).write_text(text, encoding="utf-8") doc_metas.append({ "doc_uuid": doc_uuid, "docType": 3, "path": local_rel, "size": len(text.encode("utf-8")), "sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(), }) time.sleep(sleep) # 5. supplementary PCB layer assets — coppers / textpath / resources (blobs) aux: dict[str, object] = {} copper_paths = list((manifest_ticket.get("coppers") or {}).keys()) if copper_paths: coppers = _pro_post_json( pro_client, "https://pro.lceda.cn/api/coppers/search", project_uuid, {"paths": copper_paths}, ) (src_dir / "coppers.json").write_text( json.dumps(coppers, ensure_ascii=False), encoding="utf-8" ) aux["coppers_count"] = len(coppers) if isinstance(coppers, list) else 0 time.sleep(sleep) textpath_paths = list((manifest_ticket.get("textpath") or {}).keys()) if textpath_paths: textpath = _pro_post_json( pro_client, "https://pro.lceda.cn/api/textpath/search", project_uuid, { "paths": textpath_paths, "project_uuid": project_uuid, "path": project_uuid, }, ) (src_dir / "textpath.json").write_text( json.dumps(textpath, ensure_ascii=False), encoding="utf-8" ) aux["textpath_count"] = len(textpath) if isinstance(textpath, list) else 0 time.sleep(sleep) blob_hashes = list((manifest_ticket.get("blobs") or {}).keys()) if blob_hashes: blobs = _pro_post_json( pro_client, "https://pro.lceda.cn/api/v2/resources/search", project_uuid, {"hash": blob_hashes, "project_uuid": project_uuid}, ) (src_dir / "blobs.json").write_text( json.dumps(blobs, ensure_ascii=False), encoding="utf-8" ) aux["blobs_count"] = len(blobs) if isinstance(blobs, list) else 0 time.sleep(sleep) # 6. manifest.json — overall index structure_summary = { "boards": len(boards), "schematic_containers": len(sch_container_uuids), "schematic_sheets": len(sheet_uuids), "pcbs": len(pcb_uuids), "symbols": len(manifest_ticket.get("symbols") or {}), "footprints": len(manifest_ticket.get("footprints") or {}), "devices": len(manifest_ticket.get("devices") or {}), "coppers": len(manifest_ticket.get("coppers") or {}), "textpath": len(manifest_ticket.get("textpath") or {}), "blobs": len(manifest_ticket.get("blobs") or {}), } manifest = { "project_uuid": project_uuid, "fetched_at": datetime.now(timezone.utc).isoformat(), "editor_version": project_editor_version, "boards": boards, "documents": doc_metas, "structure_summary": structure_summary, "aux": aux, } (src_dir / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8" ) return { "source_format": "easyeda-pro-legacy", "source_path": "source/", "source_documents": doc_metas, "editor_version": project_editor_version, } # --------------------------------------------------------------------------- # Single-project crawl # --------------------------------------------------------------------------- @dc.dataclass class CrawlResult: project_id: str out_dir: Path files_count: int bytes_total: int skipped_files: list[str] def crawl_one( client: httpx.Client, list_item: dict, out_root: Path, fetch_files: bool = True, source_client: httpx.Client | None = None, pro_source_client: httpx.Client | None = None, skip_exts: set[str] | None = None, max_source_mb: int | None = None, ) -> CrawlResult: uuid = list_item["uuid"] path = list_item["path"] proj_dir = out_root / uuid proj_dir.mkdir(parents=True, exist_ok=True) # 1. Fetch detail HTML detail_url = f"{BASE}/{path}" r = client.get(detail_url) r.raise_for_status() detail = parse_detail_html(r.text) polite_sleep() # 2. Cover image thumb_url = list_item["thumb"] if thumb_url.startswith("//"): thumb_url = "https:" + thumb_url cover_rel = None if thumb_url: ext = Path(urllib.parse.urlparse(thumb_url).path).suffix or ".jpg" cover_rel = f"cover{ext}" try: download_to(client, thumb_url, proj_dir / cover_rel) except httpx.HTTPError as e: print(f" cover failed: {e}", file=sys.stderr) cover_rel = None polite_sleep() # 3. Description markdown (combine meta + introduction) desc_md_parts = [f"# {list_item['name']}\n"] if detail.get("description_meta"): desc_md_parts.append(detail["description_meta"].strip()) elif list_item.get("introduction"): desc_md_parts.append(list_item["introduction"].strip()) desc_md_parts.append( f"\n---\n" f"- Source: {detail_url}\n" f"- Author: {list_item['owner'].get('nickname')} " f"({list_item['owner'].get('username')})\n" f"- License: {detail.get('license') or 'unknown'}\n" f"- Published: {list_item.get('oshwhub_publish_at')}\n" ) (proj_dir / "description.md").write_text("\n".join(desc_md_parts), encoding="utf-8") # 4. Files files_meta: list[dict] = [] skipped: list[str] = [] bytes_total = 0 for a in detail.get("attachments", []): src = a.get("src") or "" if not src: continue file_url = IMG_CDN + src if src.startswith("/") else src name = a.get("name") or Path(src).name safe_name = re.sub(r'[/\\:*?"<>|]', "_", name) local_rel = f"files/{safe_name}" local_path = proj_dir / local_rel entry: dict = { "name": name, "url": file_url, "original_id": a.get("uuid"), "ext": a.get("ext"), "mime": a.get("mime"), "size": a.get("size"), "md5": a.get("md5"), } # ext gate: declared `ext` first, fall back to filename suffix. Lower-case # compare; entry kept in metadata so we can re-fetch later if policy changes. ext_token = (a.get("ext") or Path(safe_name).suffix.lstrip(".")).lower() if skip_exts and ext_token in skip_exts: entry["skipped"] = f"ext:{ext_token}" skipped.append(f"{name}: ext:{ext_token}") files_meta.append(entry) continue if fetch_files: try: size, sha = download_to(client, file_url, local_path) entry["path"] = local_rel entry["sha256"] = sha if entry.get("size") and entry["size"] != size: entry["size_actual"] = size else: entry["size"] = size bytes_total += size except httpx.HTTPError as e: skipped.append(f"{name}: {e}") print(f" file skipped {name}: {e}", file=sys.stderr) polite_sleep() files_meta.append(entry) # 5. URL manifest (for files we couldn't download or for future re-download) urls_manifest = { "detail_url": detail_url, "cover_url": thumb_url, "attachments": [ {"name": f["name"], "url": f["url"], "original_id": f.get("original_id")} for f in files_meta ], } (proj_dir / "_urls.json").write_text( json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8" ) # 6. Optional: EasyEDA project source — dispatch on origin (std vs pro) src_meta: dict = {} origin = list_item.get("origin") if origin == "pro" and pro_source_client is not None: try: src_meta = fetch_pro_source( pro_source_client, uuid, proj_dir, max_source_mb=max_source_mb ) print( f" pro source: {len(src_meta.get('source_documents', []))} docs, " f"editor={src_meta.get('editor_version')}" ) except ProjectOversizeError as e: print(f" pro source SKIPPED (oversize): {e}", file=sys.stderr) skipped.append(f"pro_source: oversize ({e.bytes_so_far // 1024 // 1024} MB > {e.cap_mb} MB)") _record_oversize(out_root, uuid, e) except Exception as e: # noqa: BLE001 print(f" pro source FAIL: {e}", file=sys.stderr) skipped.append(f"pro_source: {e}") elif origin != "pro" and source_client is not None: try: src_meta = fetch_std_source(source_client, uuid, proj_dir) print( f" source: {len(src_meta.get('source_documents', []))} docs, " f"editor={src_meta.get('editor_version')}" ) except Exception as e: # noqa: BLE001 print(f" source FAIL: {e}", file=sys.stderr) skipped.append(f"source: {e}") # 7. Unified metadata meta = { "source": "oshwhub", "source_url": detail_url, "project_id": uuid, "title": detail.get("title") or list_item["name"], "description_short": list_item.get("introduction") or "", "description_path": "description.md", "author": { "username": list_item["owner"]["username"], "display_name": list_item["owner"].get("nickname"), "user_id": list_item["owner"].get("uuid"), }, "license": detail.get("license") or "unknown", "tags": list_item.get("tags") or [], "created_at": list_item.get("created_at"), "updated_at": list_item.get("updated_at"), "published_at": list_item.get("oshwhub_publish_at"), "crawled_at": datetime.now(timezone.utc).isoformat(), "metrics": { "likes": list_item["count"]["like"], "stars": list_item["count"]["star"], "forks": list_item["count"]["fork"], "views": list_item["count"]["views"], "watch": list_item["count"].get("watch", 0), "comments": list_item.get("comments_count", 0), }, "cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None, "files": files_meta, "raw_fields": { "path": list_item["path"], "grade": list_item.get("grade"), "origin": list_item.get("origin"), "public": list_item.get("public"), "publish": list_item.get("publish"), "skipped_files": skipped, }, } if src_meta: meta["source_format"] = src_meta["source_format"] meta["source_path"] = src_meta["source_path"] meta["source_documents"] = src_meta["source_documents"] if src_meta.get("editor_version"): meta["editor_version"] = src_meta["editor_version"] (proj_dir / "metadata.json").write_text( json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" ) return CrawlResult( project_id=uuid, out_dir=proj_dir, files_count=len(files_meta), bytes_total=bytes_total, skipped_files=skipped, ) # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def iter_candidates( client: httpx.Client, pages: int, page_size: int, sort: str, origin: str | None = None, ) -> Iterator[dict]: for p in range(1, pages + 1): res = list_projects(client, page=p, page_size=page_size, sort=sort, origin=origin) for it in res["lists"]: yield it polite_sleep() def main(argv: list[str] | None = None) -> int: ap = argparse.ArgumentParser(description="oshwhub MVP crawler") ap.add_argument("--out", type=Path, default=Path("data/raw/oshwhub")) ap.add_argument("--top", type=int, default=10, help="number of projects to crawl") ap.add_argument("--min-likes", type=int, default=50) ap.add_argument("--min-grade", type=int, default=4) ap.add_argument("--pages", type=int, default=3, help="list API pages to scan") ap.add_argument("--page-size", type=int, default=30) ap.add_argument("--sort", default="hot") ap.add_argument( "--origin", choices=["std", "pro"], default=None, help="filter listing API by origin (server-side); needed to find Pro projects in top-N", ) ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list") ap.add_argument("--no-files", action="store_true", help="do not download attachments") ap.add_argument("--limit", type=int, default=None, help="override --top, same effect") ap.add_argument( "--with-source", action="store_true", help="also fetch EasyEDA Std project source (schematic + PCB dataStr) per project", ) ap.add_argument( "--backfill-source", action="store_true", help="skip listing/HTML/attachments; only fetch source for projects already in --out", ) ap.add_argument( "--with-pro-source", action="store_true", help="also fetch EasyEDA Pro project source (full history chain, EPRO2 streams) per project", ) ap.add_argument( "--backfill-pro-source", action="store_true", help="skip listing; only fetch Pro source for origin=pro projects already in --out", ) ap.add_argument( "--pro-cookie", type=str, default=PRO_COOKIE_PATH_DEFAULT, help="path to file with Cookie header for pro.lceda.cn", ) ap.add_argument( "--skip-ext", type=str, default=None, help="comma-separated list of attachment extensions to skip (e.g. mp4,qt,mov). " "Saves ~30-50%% LFS storage on average. Entry still recorded in metadata.json " "with skipped:ext: so we can re-fetch later.", ) ap.add_argument( "--max-source-mb", type=int, default=None, help="skip Pro modern projects whose chain blob total exceeds N MB. " "Trips inside the chain loop, wipes partial source/, records to " "data/state/oshwhub_pro_oversize.jsonl. No effect on Std or Pro 2.x legacy.", ) args = ap.parse_args(argv) skip_exts: set[str] | None = ( {x.strip().lower().lstrip(".") for x in args.skip_ext.split(",") if x.strip()} if args.skip_ext else None ) n_target = args.limit if args.limit is not None else args.top args.out.mkdir(parents=True, exist_ok=True) # --backfill-source: standalone path that scans existing project dirs and # only fetches source. No listing/HTML/attachment work. if args.backfill_source: return _run_backfill_source(args.out, only_uuids=args.uuids) if args.backfill_pro_source: return _run_backfill_pro_source( args.out, only_uuids=args.uuids, cookie_path=args.pro_cookie, max_source_mb=args.max_source_mb, ) with make_client() as client: # Build list of items to crawl if args.uuids: wanted = set(args.uuids.split(",")) items: list[dict] = [] for it in iter_candidates( client, args.pages, args.page_size, args.sort, origin=args.origin ): if it["uuid"] in wanted: items.append(it) if len(items) == len(wanted): break if len(items) < len(wanted): missing = wanted - {i["uuid"] for i in items} print(f"WARN: missing uuids (not in top pages): {missing}", file=sys.stderr) else: pool = list( iter_candidates( client, args.pages, args.page_size, args.sort, origin=args.origin ) ) items = pick_top( pool, n=n_target, min_likes=args.min_likes, min_grade=args.min_grade ) if len(items) < n_target: print( f"WARN: only {len(items)} items passed filters " f"(wanted {n_target})", file=sys.stderr, ) source_client_ctx = make_source_client() if args.with_source else None pro_source_client_ctx = ( make_pro_source_client(args.pro_cookie) if args.with_pro_source else None ) try: print(f"Crawling {len(items)} projects -> {args.out}") for i, it in enumerate(items, 1): print(f"[{i}/{len(items)}] {it['path']} ({it['name']})") try: r = crawl_one( client, it, args.out, fetch_files=not args.no_files, source_client=source_client_ctx, pro_source_client=pro_source_client_ctx, skip_exts=skip_exts, max_source_mb=args.max_source_mb, ) print( f" OK: {r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB " f"(skipped: {len(r.skipped_files)})" ) except Exception as e: print(f" FAIL: {e}", file=sys.stderr) finally: if source_client_ctx is not None: source_client_ctx.close() if pro_source_client_ctx is not None: pro_source_client_ctx.close() return 0 def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int: """Walk existing per-project dirs in out_root and fetch source.json into each. Updates metadata.json in-place to add source_format / source_documents / editor_version. """ wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None targets: list[Path] = [] for d in sorted(out_root.iterdir()): if not d.is_dir(): continue meta_path = d / "metadata.json" if not meta_path.exists(): continue if wanted and d.name not in wanted: continue targets.append(d) print(f"Backfill source for {len(targets)} projects under {out_root}") src_client = make_source_client() try: for i, proj_dir in enumerate(targets, 1): uuid = proj_dir.name meta_path = proj_dir / "metadata.json" meta = json.loads(meta_path.read_text(encoding="utf-8")) print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})") try: src_meta = fetch_std_source(src_client, uuid, proj_dir) except Exception as e: # noqa: BLE001 print(f" FAIL: {e}", file=sys.stderr) continue meta["source_format"] = src_meta["source_format"] meta["source_path"] = src_meta["source_path"] meta["source_documents"] = src_meta["source_documents"] if src_meta.get("editor_version"): meta["editor_version"] = src_meta["editor_version"] meta_path.write_text( json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" ) total = sum(d["size"] for d in src_meta["source_documents"]) print( f" OK: {len(src_meta['source_documents'])} docs, " f"{total / 1024:.1f} KB, editor={src_meta.get('editor_version')}" ) finally: src_client.close() return 0 def _run_backfill_pro_source( out_root: Path, only_uuids: str | None = None, cookie_path: str = PRO_COOKIE_PATH_DEFAULT, max_source_mb: int | None = None, ) -> int: """Walk per-project dirs in out_root, fetch Pro source for origin=pro projects. A project is considered Pro by either: existing metadata.json marks raw_fields.origin == 'pro', OR --uuids was passed and includes this UUID (caller is asserting Pro). """ wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None targets: list[Path] = [] for d in sorted(out_root.iterdir()): if not d.is_dir(): continue meta_path = d / "metadata.json" if not meta_path.exists(): continue if wanted is not None: if d.name not in wanted: continue else: try: m = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: # noqa: BLE001 continue if (m.get("raw_fields") or {}).get("origin") != "pro": continue targets.append(d) print(f"Backfill pro source for {len(targets)} projects under {out_root}") pro_client = make_pro_source_client(cookie_path=cookie_path) try: for i, proj_dir in enumerate(targets, 1): uuid = proj_dir.name meta_path = proj_dir / "metadata.json" meta = json.loads(meta_path.read_text(encoding="utf-8")) print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})") try: src_meta = fetch_pro_source( pro_client, uuid, proj_dir, max_source_mb=max_source_mb ) except ProjectOversizeError as e: print(f" SKIPPED (oversize): {e}", file=sys.stderr) _record_oversize(out_root, uuid, e) continue except Exception as e: # noqa: BLE001 print(f" FAIL: {e}", file=sys.stderr) continue meta["source_format"] = src_meta["source_format"] meta["source_path"] = src_meta["source_path"] meta["source_documents"] = src_meta["source_documents"] if src_meta.get("editor_version"): meta["editor_version"] = src_meta["editor_version"] meta_path.write_text( json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8" ) total = sum(d["size"] for d in src_meta["source_documents"]) print( f" OK: {len(src_meta['source_documents'])} docs, " f"{total / 1024 / 1024:.1f} MB plain, editor={src_meta.get('editor_version')}" ) finally: pro_client.close() return 0 if __name__ == "__main__": raise SystemExit(main())