Files
FacereDataset/crawlers/oshwhub/crawler.py
Knowit 1e06ba6582 crawler: split sleep policy by host — chain blob fetches drop 5s -> 0.2s
The Pro modern fetch_pro_modern walks a per-history blob loop on
modules.lceda.cn (CDN-flavored host serving AES-encrypted EPRO2 streams).
We were sleeping 5s between every blob — same rate we use for the
rate-sensitive pro.lceda.cn API host. HAR analysis (proexportNew2.har)
shows the editor fires these blobs back-to-back without throttling, so
0.2s is plenty.

Walltime drops linearly with chain length:
  ESP-VoCat (chain=12):    80s sleep -> 22s sleep  (-72%)
  220V power (chain=28):  160s sleep -> 26s sleep  (-84%)
  X86 board  (chain~700, projection):  ~1h -> ~3min

Verified by re-fetching ESP-VoCat + 220V power: byte-identical output
across all per-doc .epro2 files (sha256 match), only fetched_at
timestamp differs in manifest.json. Two manifest files re-stamped as
proof of the validation runs.

API host sleeps (4x 5s in modern fetcher, 7x 5s in legacy fetcher) are
unchanged — those go to pro.lceda.cn /api/ which still wants polite
QPS<=0.2.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 00:09:19 +08:00

1280 lines
46 KiB
Python

"""oshwhub.com crawler — MVP.
Usage:
uv run python -m crawlers.oshwhub \
--out data/raw/oshwhub \
--top 10 --min-likes 50 --min-grade 4
Or with explicit UUID list:
uv run python -m crawlers.oshwhub \
--uuids 298873b7fdbe44f8ba0e7351e023bc2c,7b6a398811f14eba9a952b8d2ddd7ace \
--out data/raw/oshwhub
"""
from __future__ import annotations
import argparse
import dataclasses as dc
import hashlib
import html as _html
import json
import re
import sys
import time
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import httpx
API_LIST = "https://oshwhub.com/api/project"
API_PROJECT = "https://oshwhub.com/api/project" # /api/project/<uuid> for source flow
BASE = "https://oshwhub.com"
IMG_CDN = "https://image.lceda.cn"
LCEDA_DOC_API = "https://lceda.cn/api/documents"
PRO_API = "https://pro.lceda.cn/api/v4"
PRO_EDITOR_VERSION = "3.2.127"
PRO_COOKIE_PATH_DEFAULT = "/home/ubuntu/.secrets/pro-lceda-cookie-header.txt"
UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)"
# Std source endpoints reject FacereDataset UA on oshwhub /api/project; spoof browser UA only there.
# See docs/sources/easyeda_std_source.md §3.
BROWSER_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/147.0.0.0 Safari/537.36"
)
SLEEP_BETWEEN = 2.0 # seconds between detail-page / file fetches
SLEEP_SOURCE = 5.0 # source fetch is sensitive — QPS ≤ 0.2 per CLAUDE.md登录态 spirit
SLEEP_PRO = 5.0 # Pro API host (pro.lceda.cn): rate-sensitive, keep at QPS ≤ 0.2
# CDN host (modules.lceda.cn) only serves AES-encrypted history blobs.
# HAR analysis (proexportNew2.har 2026-04-29) shows the editor fires these
# blobs back-to-back without throttling — the CDN can clearly take it.
# Walltime for chain replay is dominated by this loop on multi-hundred-history
# projects (X86 board: chain ≈ 700 → ~1h at 5s/req → ~few min at 0.2s/req).
SLEEP_PRO_CDN = 0.2
# ---------------------------------------------------------------------------
# HTTP
# ---------------------------------------------------------------------------
def make_client(timeout: float = 30.0) -> httpx.Client:
return httpx.Client(
http2=True,
timeout=timeout,
headers={"User-Agent": UA, "Accept": "text/html,application/json;q=0.9,*/*;q=0.8"},
follow_redirects=True,
)
def make_source_client(timeout: float = 60.0) -> httpx.Client:
"""Client for Std source endpoints (lceda.cn/oshwhub.com /api/...).
Uses browser UA + editor Referer to satisfy server-side UA filter.
"""
return httpx.Client(
http2=True,
timeout=timeout,
headers={
"User-Agent": BROWSER_UA,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Referer": "https://lceda.cn/editor",
},
follow_redirects=False,
)
def make_pro_source_client(
cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
timeout: float = 90.0,
) -> httpx.Client:
"""Client for Pro source endpoints (pro.lceda.cn /api/v4/...).
Requires logged-in cookie header at `cookie_path` (mode 600). The cookie
file is a single Cookie header value (e.g. `lceda_pro_session=...; XSRF-TOKEN=...`).
Per-request `path: <project_uuid>` header MUST be added by callers — see
docs/sources/easyeda_pro_source.md §2.5.
"""
cookie = Path(cookie_path).read_text(encoding="utf-8").strip()
return httpx.Client(
http2=True,
timeout=timeout,
headers={
"User-Agent": BROWSER_UA,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Editor-Version": PRO_EDITOR_VERSION,
"Referer": "https://pro.lceda.cn/editor",
"Cookie": cookie,
},
follow_redirects=False,
)
def polite_sleep() -> None:
time.sleep(SLEEP_BETWEEN)
# ---------------------------------------------------------------------------
# Listing
# ---------------------------------------------------------------------------
def list_projects(
client: httpx.Client,
page: int = 1,
page_size: int = 30,
sort: str = "hot",
origin: str | None = None,
) -> dict:
params: dict[str, object] = {"page": page, "pageSize": page_size, "sort": sort}
if origin:
params["origin"] = origin # 'std' or 'pro' — server-side filter
r = client.get(API_LIST, params=params)
r.raise_for_status()
data = r.json()
if not data.get("success"):
raise RuntimeError(f"list API failed: {data}")
return data["result"]
def rank_score(item: dict) -> float:
"""Composite quality score: favor projects with broad engagement."""
c = item["count"]
return (
c["like"] * 3
+ c["star"] * 1
+ c["fork"] * 2
+ c["views"] / 100
+ item["comments_count"] * 2
+ (item.get("grade") or 0) * 50
)
def pick_top(
items: list[dict],
n: int,
min_likes: int,
min_grade: int,
exclude_copies: bool = True,
) -> list[dict]:
filtered = []
for it in items:
if exclude_copies and "_copy" in it["path"]:
continue
if it["count"]["like"] < min_likes:
continue
if (it.get("grade") or 0) < min_grade:
continue
filtered.append(it)
filtered.sort(key=rank_score, reverse=True)
return filtered[:n]
# ---------------------------------------------------------------------------
# Detail page parsing
# ---------------------------------------------------------------------------
RE_ATTACH_BLOCK = re.compile(r'\\"attachments\\":\[', re.DOTALL)
RE_LICENSE = re.compile(r'\\"license\\":\\"([^\\"]+)\\"')
RE_META_DESC = re.compile(
r'<meta\s+name="description"\s+content="([^"]*)"', re.IGNORECASE
)
RE_TITLE = re.compile(r"<title>([^<]+)</title>", re.IGNORECASE)
def _find_balanced_bracket(s: str, start: int, open_ch: str = "[", close_ch: str = "]") -> int:
"""Return index after the matching close bracket. start must point at open_ch."""
assert s[start] == open_ch
depth = 0
for i in range(start, len(s)):
ch = s[i]
if ch == open_ch:
depth += 1
elif ch == close_ch:
depth -= 1
if depth == 0:
return i + 1
raise ValueError("unbalanced")
def parse_detail_html(h: str) -> dict:
"""Extract attachments, license, title, description from SSR HTML."""
out: dict = {
"title": None,
"description_meta": None,
"license": None,
"attachments": [],
}
m = RE_TITLE.search(h)
if m:
# HTML entities + suffix stripping
title = _html.unescape(m.group(1)).strip()
for sfx in (
" - 立创开源硬件平台 - 深圳创电优选科技有限公司",
" - 立创开源硬件平台",
):
if title.endswith(sfx):
title = title[: -len(sfx)]
out["title"] = title
m = RE_META_DESC.search(h)
if m:
out["description_meta"] = _html.unescape(m.group(1))
m = RE_LICENSE.search(h)
if m:
out["license"] = m.group(1)
m = RE_ATTACH_BLOCK.search(h)
if m:
arr_start = m.end() - 1 # point at '['
arr_end = _find_balanced_bracket(h, arr_start)
block = h[arr_start:arr_end]
clean = block.replace('\\"', '"').replace("\\\\", "\\")
try:
out["attachments"] = json.loads(clean)
except json.JSONDecodeError as e:
# Keep raw for debugging; skip attachments silently. Caller can log.
out["_attachments_parse_error"] = str(e)
return out
# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------
def download_to(client: httpx.Client, url: str, dest: Path) -> tuple[int, str]:
"""Stream-download url to dest. Returns (size, sha256)."""
dest.parent.mkdir(parents=True, exist_ok=True)
h = hashlib.sha256()
size = 0
with client.stream("GET", url) as r:
r.raise_for_status()
with open(dest, "wb") as f:
for chunk in r.iter_bytes(1 << 15):
f.write(chunk)
h.update(chunk)
size += len(chunk)
return size, h.hexdigest()
# ---------------------------------------------------------------------------
# Std source fetch (login NOT required for public projects — see
# docs/sources/easyeda_std_source.md)
# ---------------------------------------------------------------------------
def fetch_std_source(
source_client: httpx.Client,
project_uuid: str,
proj_dir: Path,
sleep: float = SLEEP_SOURCE,
) -> dict:
"""Fetch EasyEDA Std project source (schematic + PCB dataStr) anonymously.
Returns dict with keys:
- source_format: "easyeda-std"
- source_path: "source/"
- source_documents: list of {doc_uuid, docType, master, path, size, sha256}
- editor_version: from dataStr.head when available
"""
src_dir = proj_dir / "source"
src_dir.mkdir(parents=True, exist_ok=True)
# 1. Project meta → version_documents
r = source_client.get(f"{API_PROJECT}/{project_uuid}")
r.raise_for_status()
j = r.json()
if not j.get("success"):
raise RuntimeError(f"oshwhub project meta failed: {j}")
version_documents = j["result"].get("version_documents") or []
time.sleep(sleep)
# 2. Per document → dataStr
doc_metas: list[dict] = []
editor_version: str | None = None
for vd in version_documents:
doc_uuid = vd["uuid"]
master = vd.get("master")
doc_type = vd.get("docType")
url = f"{LCEDA_DOC_API}/{doc_uuid}"
r2 = source_client.get(url, params={"uuid": doc_uuid, "path": doc_uuid})
r2.raise_for_status()
# Server returns text/html mistakenly; body is JSON regardless.
try:
body_json = r2.json()
except Exception as e: # noqa: BLE001
raise RuntimeError(f"doc {doc_uuid} non-JSON response: {e}; head={r2.text[:200]!r}")
if not body_json.get("success"):
raise RuntimeError(f"doc {doc_uuid} response not success: {body_json}")
local_rel = f"source/{doc_uuid}.json"
local_path = proj_dir / local_rel
text = json.dumps(body_json, ensure_ascii=False, separators=(",", ":"))
local_path.write_text(text, encoding="utf-8")
size = local_path.stat().st_size
sha = hashlib.sha256(text.encode("utf-8")).hexdigest()
# Pull editor version from the dataStr.head if present.
ev = _extract_editor_version(body_json)
if ev and not editor_version:
editor_version = ev
doc_metas.append({
"doc_uuid": doc_uuid,
"docType": doc_type,
"master": master,
"path": local_rel,
"size": size,
"sha256": sha,
})
time.sleep(sleep)
# 3. source/manifest.json — index + raw upstream version_documents for diffing
manifest = {
"project_uuid": project_uuid,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"editor_version": editor_version,
"documents": doc_metas,
"upstream_version_documents": version_documents,
}
(src_dir / "manifest.json").write_text(
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
return {
"source_format": "easyeda-std",
"source_path": "source/",
"source_documents": doc_metas,
"editor_version": editor_version,
}
def _extract_editor_version(body_json: dict) -> str | None:
"""Best-effort: pull head.editorVersion from dataStr (location varies by docType)."""
res = body_json.get("result") or {}
# PCB shape: result.dataStr at top
ds = res.get("dataStr")
if isinstance(ds, dict):
head = ds.get("head") or {}
if isinstance(head, dict) and head.get("editorVersion"):
return str(head["editorVersion"])
# Schematic shape: result.schematics[*].dataStr
for sch in (res.get("schematics") or []):
if isinstance(sch, dict):
ds2 = sch.get("dataStr") or {}
if isinstance(ds2, dict):
head = ds2.get("head") or {}
if isinstance(head, dict) and head.get("editorVersion"):
return str(head["editorVersion"])
return None
# ---------------------------------------------------------------------------
# Pro source fetch (pro.lceda.cn — EPRO2 message stream, AES-128-GCM)
# See docs/sources/easyeda_pro_source.md.
# ---------------------------------------------------------------------------
def _pro_get_json(
client: httpx.Client,
url: str,
project_uuid: str,
*,
params: dict | None = None,
) -> dict | list:
"""GET a pro.lceda.cn /api/v4 endpoint with the per-project `path` header.
Raises if the JSON envelope's `success` is False; returns `result`.
"""
r = client.get(url, params=params, headers={"path": project_uuid})
r.raise_for_status()
j = r.json()
if not j.get("success"):
raise RuntimeError(f"Pro API failed (url={url}): {j}")
return j["result"]
def _order_history_chain(chain: list[dict]) -> list[dict]:
"""Return the chain ordered root→HEAD by walking parent links.
Pro returns the chain HEAD-first as a flat list with `parent` links. We
walk from the unique root forward.
"""
by_uuid = {h["uuid"]: h for h in chain}
roots = [h for h in chain if h.get("parent") not in by_uuid]
if len(roots) != 1:
raise RuntimeError(
f"history chain has {len(roots)} roots; not strictly linear"
)
children: dict[str | None, list[dict]] = {}
for h in chain:
children.setdefault(h.get("parent"), []).append(h)
ordered: list[dict] = []
cur: dict | None = roots[0]
while cur is not None:
ordered.append(cur)
nexts = children.get(cur["uuid"], [])
if len(nexts) > 1:
raise RuntimeError(
f"history chain not linear at {cur['uuid']!r}: {len(nexts)} children"
)
cur = nexts[0] if nexts else None
if len(ordered) != len(chain):
raise RuntimeError(
f"reconstructed {len(ordered)} of {len(chain)} histories; chain has cycles or orphans"
)
return ordered
def fetch_pro_source(
pro_client: httpx.Client,
project_uuid: str,
proj_dir: Path,
sleep: float = SLEEP_PRO,
) -> dict:
"""Dispatcher: pick modern (3.x branch+EPRO2) vs legacy (2.x v2/documents/lists)
based on whether project meta contains a non-null branch_uuid.
Pro 3.x stores in git-style branch+history with AES-encrypted EPRO2 streams;
Pro 2.x predates that and uses Std-style per-doc dataStr served from
/api/v2/documents/lists. See docs/sources/easyeda_pro_source.md §1.1.
"""
proj = _pro_get_json(pro_client, f"{PRO_API}/projects/{project_uuid}", project_uuid)
time.sleep(sleep)
if proj.get("branch_uuid"):
return _fetch_pro_modern(pro_client, project_uuid, proj, proj_dir, sleep)
return _fetch_pro_legacy(pro_client, project_uuid, proj, proj_dir, sleep)
def _fetch_pro_modern(
pro_client: httpx.Client,
project_uuid: str,
proj: dict,
proj_dir: Path,
sleep: float = SLEEP_PRO,
) -> dict:
"""Modern Pro 3.x fetcher: full history chain, AES-GCM decrypted, gunzipped,
and partitioned into per-document EPRO2 streams.
Side effects under ``proj_dir``:
- source/structure.json — project document tree (boards/schematics/sheets/pcbs/...)
- source/<doc_uuid>.epro2 — one file per document, raw EPRO2 messages (newline-separated)
- source/manifest.json — per-doc index + chain summary
"""
import gzip
from collections import OrderedDict
from Crypto.Cipher import AES # local import: cheap if pycryptodome already loaded
src_dir = proj_dir / "source"
src_dir.mkdir(parents=True, exist_ok=True)
branch_uuid = proj["branch_uuid"]
project_editor_version = proj.get("editorVersion")
# 2. branch meta -> head history_uuid
branch = _pro_get_json(
pro_client,
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}",
project_uuid,
)
head_uuid = branch.get("history_uuid")
if not head_uuid:
raise RuntimeError(f"no history_uuid (HEAD) on branch {branch_uuid}")
time.sleep(sleep)
# 3. structure tree
st = _pro_get_json(
pro_client,
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/structures",
project_uuid,
)
raw_structure = st.get("structure")
structure = json.loads(raw_structure) if isinstance(raw_structure, str) else raw_structure
(src_dir / "structure.json").write_text(
json.dumps(structure, ensure_ascii=False, indent=2), encoding="utf-8"
)
time.sleep(sleep)
# 4. history chain — single endpoint returns full chain (HAR-confirmed 2026-04-28)
chain = _pro_get_json(
pro_client,
f"{PRO_API}/projects/{project_uuid}/branches/{branch_uuid}/histories/{head_uuid}",
project_uuid,
)
if not isinstance(chain, list) or not chain:
raise RuntimeError(f"unexpected histories response: {type(chain).__name__}")
ordered = _order_history_chain(chain)
time.sleep(sleep)
# 5. download + decrypt + gunzip + partition by DOCHEAD
docs: OrderedDict[str, dict] = OrderedDict()
cur_doc: str | None = None
bytes_blob_total = 0
bytes_plain_total = 0
for h in ordered:
blob_r = pro_client.get(h["dataStrUrl"], headers={"path": project_uuid})
blob_r.raise_for_status()
blob = blob_r.content
bytes_blob_total += len(blob)
if len(blob) < 16:
raise RuntimeError(f"history {h['uuid']} blob too short ({len(blob)} B)")
ct, tag = blob[:-16], blob[-16:]
cipher = AES.new(
bytes.fromhex(h["key"]),
AES.MODE_GCM,
nonce=bytes.fromhex(h["iv"]),
)
gz = cipher.decrypt_and_verify(ct, tag)
plain = gzip.decompress(gz)
bytes_plain_total += len(plain)
for ln in plain.split(b"\n"):
if not ln.strip():
continue
# EPRO2 lines use `||` as field separator and terminate with a single
# `|`. Strip the trailing `|` first so each part parses as bare JSON.
stripped = ln.rstrip(b"|")
parts = stripped.split(b"||")
try:
head_msg = json.loads(parts[0])
except Exception: # noqa: BLE001 — malformed head; skip entire line
continue
if head_msg.get("type") == "DOCHEAD" and len(parts) >= 2:
try:
payload = json.loads(parts[1])
except Exception: # noqa: BLE001
payload = {}
new_doc = payload.get("uuid")
if new_doc:
cur_doc = new_doc
if cur_doc not in docs:
docs[cur_doc] = {
"lines": [],
"doc_head": payload,
}
if cur_doc and cur_doc in docs:
docs[cur_doc]["lines"].append(ln)
# CDN host, not the rate-sensitive API host — see SLEEP_PRO_CDN comment.
time.sleep(SLEEP_PRO_CDN)
# 6. write per-doc .epro2 + manifest
doc_metas: list[dict] = []
editor_version: str | None = None
for doc_uuid, info in docs.items():
body = b"\n".join(info["lines"]) + b"\n"
local_rel = f"source/{doc_uuid}.epro2"
local_path = proj_dir / local_rel
local_path.write_bytes(body)
size = len(body)
sha = hashlib.sha256(body).hexdigest()
head = info["doc_head"]
ev = head.get("editVersion") or head.get("editorVersion")
if ev and not editor_version:
editor_version = str(ev)
doc_metas.append({
"doc_uuid": doc_uuid,
"docType": head.get("docType"), # "BOARD" / "PCB" / "SCH" / "SCH_PAGE" / "SYMBOL" / ...
"path": local_rel,
"size": size,
"sha256": sha,
"message_count": len(info["lines"]),
})
# editor_version fallback: project meta if no DOCHEAD payload had it
if not editor_version and project_editor_version:
editor_version = str(project_editor_version)
structure_summary: dict[str, int] = {}
if isinstance(structure, dict):
for k, v in structure.items():
if isinstance(v, dict):
structure_summary[k] = len(v)
elif isinstance(v, list):
structure_summary[k] = len(v)
manifest = {
"project_uuid": project_uuid,
"branch_uuid": branch_uuid,
"head_uuid": head_uuid,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"editor_version": editor_version,
"chain_length": len(chain),
"blob_bytes_total": bytes_blob_total,
"plain_bytes_total": bytes_plain_total,
"documents": doc_metas,
"structure_summary": structure_summary,
}
(src_dir / "manifest.json").write_text(
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
return {
"source_format": "easyeda-pro",
"source_path": "source/",
"source_documents": doc_metas,
"editor_version": editor_version,
}
def _pro_post_json(
client: httpx.Client,
url: str,
project_uuid: str,
body: dict,
) -> object:
"""POST a pro.lceda.cn endpoint with `path` header, validate envelope."""
r = client.post(
url,
json=body,
headers={"path": project_uuid, "Content-Type": "application/json"},
)
r.raise_for_status()
j = r.json()
if not j.get("success"):
raise RuntimeError(f"Pro API failed (POST {url}): {j}")
return j["result"]
def _fetch_pro_legacy(
pro_client: httpx.Client,
project_uuid: str,
proj: dict,
proj_dir: Path,
sleep: float = SLEEP_PRO,
) -> dict:
"""Legacy Pro 2.x fetcher: project meta has `boards: [{sch, pcb, name}]` and
no branch model. Documents are fetched via `/api/v2/documents/lists` (Std-style
plaintext dataStr); resources/coppers/textpath/blobs come from supplementary
POST endpoints. Reverse-engineered from HAR `tmp/prodownload3.har`
(2026-04-28); see docs/sources/easyeda_pro_source.md §1.1.
Side effects under ``proj_dir``:
- source/ticket.json — full project manifest (counts of all asset types)
- source/<sheet_uuid>.json — schematic sheet content (docType=1)
- source/pcb_<pcb_uuid>.json — PCB content (docType=3)
- source/coppers.json — copper pour data (if any)
- source/textpath.json — text path / font data (if any)
- source/blobs.json — embedded image blobs (if any)
- source/manifest.json — index across all of the above
"""
src_dir = proj_dir / "source"
src_dir.mkdir(parents=True, exist_ok=True)
boards = proj.get("boards") or []
if not boards:
raise RuntimeError(f"legacy project {project_uuid} has no boards[] in meta")
project_editor_version = proj.get("editorVersion")
# 1. ticket — full manifest (counts of every asset type the project owns)
ticket = pro_client.get(
f"https://pro.lceda.cn/api/projects/{project_uuid}/ticket",
params={"uuid": project_uuid, "g_ticket": "-1"},
headers={"path": project_uuid},
)
ticket.raise_for_status()
ticket_j = ticket.json()
if not ticket_j.get("success"):
raise RuntimeError(f"ticket endpoint failed: {ticket_j}")
manifest_ticket = ticket_j["result"]
(src_dir / "ticket.json").write_text(
json.dumps(manifest_ticket, ensure_ascii=False, indent=2), encoding="utf-8"
)
time.sleep(sleep)
doc_metas: list[dict] = []
# 2. schematic containers -> sheet UUIDs via /api/schematic/lists
sch_container_uuids = [b["sch"] for b in boards if b.get("sch")]
sheet_uuids: list[str] = []
if sch_container_uuids:
containers = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/schematic/lists",
project_uuid,
{"uuids": sch_container_uuids},
)
if isinstance(containers, list):
for c in containers:
for s in c.get("sort") or []:
su = s.get("uuid")
if su:
sheet_uuids.append(su)
time.sleep(sleep)
# 3. schematic sheets via documents/lists docType=1 (plaintext dataStr per sheet)
if sheet_uuids:
sheets = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/v2/documents/lists",
project_uuid,
{"uuids": sheet_uuids, "docType": 1},
)
for s in (sheets or []):
doc_uuid = s["uuid"]
local_rel = f"source/{doc_uuid}.json"
text = json.dumps(s, ensure_ascii=False, separators=(",", ":"))
(proj_dir / local_rel).write_text(text, encoding="utf-8")
doc_metas.append({
"doc_uuid": doc_uuid,
"docType": 1,
"path": local_rel,
"size": len(text.encode("utf-8")),
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
})
time.sleep(sleep)
# 4. PCB documents via documents/lists docType=3
pcb_uuids = [b["pcb"] for b in boards if b.get("pcb")]
if pcb_uuids:
pcbs = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/v2/documents/lists",
project_uuid,
{"uuids": pcb_uuids, "docType": 3},
)
for p in (pcbs or []):
doc_uuid = p["uuid"]
local_rel = f"source/pcb_{doc_uuid}.json"
text = json.dumps(p, ensure_ascii=False, separators=(",", ":"))
(proj_dir / local_rel).write_text(text, encoding="utf-8")
doc_metas.append({
"doc_uuid": doc_uuid,
"docType": 3,
"path": local_rel,
"size": len(text.encode("utf-8")),
"sha256": hashlib.sha256(text.encode("utf-8")).hexdigest(),
})
time.sleep(sleep)
# 5. supplementary PCB layer assets — coppers / textpath / resources (blobs)
aux: dict[str, object] = {}
copper_paths = list((manifest_ticket.get("coppers") or {}).keys())
if copper_paths:
coppers = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/coppers/search",
project_uuid,
{"paths": copper_paths},
)
(src_dir / "coppers.json").write_text(
json.dumps(coppers, ensure_ascii=False), encoding="utf-8"
)
aux["coppers_count"] = len(coppers) if isinstance(coppers, list) else 0
time.sleep(sleep)
textpath_paths = list((manifest_ticket.get("textpath") or {}).keys())
if textpath_paths:
textpath = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/textpath/search",
project_uuid,
{
"paths": textpath_paths,
"project_uuid": project_uuid,
"path": project_uuid,
},
)
(src_dir / "textpath.json").write_text(
json.dumps(textpath, ensure_ascii=False), encoding="utf-8"
)
aux["textpath_count"] = len(textpath) if isinstance(textpath, list) else 0
time.sleep(sleep)
blob_hashes = list((manifest_ticket.get("blobs") or {}).keys())
if blob_hashes:
blobs = _pro_post_json(
pro_client,
"https://pro.lceda.cn/api/v2/resources/search",
project_uuid,
{"hash": blob_hashes, "project_uuid": project_uuid},
)
(src_dir / "blobs.json").write_text(
json.dumps(blobs, ensure_ascii=False), encoding="utf-8"
)
aux["blobs_count"] = len(blobs) if isinstance(blobs, list) else 0
time.sleep(sleep)
# 6. manifest.json — overall index
structure_summary = {
"boards": len(boards),
"schematic_containers": len(sch_container_uuids),
"schematic_sheets": len(sheet_uuids),
"pcbs": len(pcb_uuids),
"symbols": len(manifest_ticket.get("symbols") or {}),
"footprints": len(manifest_ticket.get("footprints") or {}),
"devices": len(manifest_ticket.get("devices") or {}),
"coppers": len(manifest_ticket.get("coppers") or {}),
"textpath": len(manifest_ticket.get("textpath") or {}),
"blobs": len(manifest_ticket.get("blobs") or {}),
}
manifest = {
"project_uuid": project_uuid,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"editor_version": project_editor_version,
"boards": boards,
"documents": doc_metas,
"structure_summary": structure_summary,
"aux": aux,
}
(src_dir / "manifest.json").write_text(
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
return {
"source_format": "easyeda-pro-legacy",
"source_path": "source/",
"source_documents": doc_metas,
"editor_version": project_editor_version,
}
# ---------------------------------------------------------------------------
# Single-project crawl
# ---------------------------------------------------------------------------
@dc.dataclass
class CrawlResult:
project_id: str
out_dir: Path
files_count: int
bytes_total: int
skipped_files: list[str]
def crawl_one(
client: httpx.Client,
list_item: dict,
out_root: Path,
fetch_files: bool = True,
source_client: httpx.Client | None = None,
pro_source_client: httpx.Client | None = None,
) -> CrawlResult:
uuid = list_item["uuid"]
path = list_item["path"]
proj_dir = out_root / uuid
proj_dir.mkdir(parents=True, exist_ok=True)
# 1. Fetch detail HTML
detail_url = f"{BASE}/{path}"
r = client.get(detail_url)
r.raise_for_status()
detail = parse_detail_html(r.text)
polite_sleep()
# 2. Cover image
thumb_url = list_item["thumb"]
if thumb_url.startswith("//"):
thumb_url = "https:" + thumb_url
cover_rel = None
if thumb_url:
ext = Path(urllib.parse.urlparse(thumb_url).path).suffix or ".jpg"
cover_rel = f"cover{ext}"
try:
download_to(client, thumb_url, proj_dir / cover_rel)
except httpx.HTTPError as e:
print(f" cover failed: {e}", file=sys.stderr)
cover_rel = None
polite_sleep()
# 3. Description markdown (combine meta + introduction)
desc_md_parts = [f"# {list_item['name']}\n"]
if detail.get("description_meta"):
desc_md_parts.append(detail["description_meta"].strip())
elif list_item.get("introduction"):
desc_md_parts.append(list_item["introduction"].strip())
desc_md_parts.append(
f"\n---\n"
f"- Source: {detail_url}\n"
f"- Author: {list_item['owner'].get('nickname')} "
f"({list_item['owner'].get('username')})\n"
f"- License: {detail.get('license') or 'unknown'}\n"
f"- Published: {list_item.get('oshwhub_publish_at')}\n"
)
(proj_dir / "description.md").write_text("\n".join(desc_md_parts), encoding="utf-8")
# 4. Files
files_meta: list[dict] = []
skipped: list[str] = []
bytes_total = 0
for a in detail.get("attachments", []):
src = a.get("src") or ""
if not src:
continue
file_url = IMG_CDN + src if src.startswith("/") else src
name = a.get("name") or Path(src).name
safe_name = re.sub(r'[/\\:*?"<>|]', "_", name)
local_rel = f"files/{safe_name}"
local_path = proj_dir / local_rel
entry: dict = {
"name": name,
"url": file_url,
"original_id": a.get("uuid"),
"ext": a.get("ext"),
"mime": a.get("mime"),
"size": a.get("size"),
"md5": a.get("md5"),
}
if fetch_files:
try:
size, sha = download_to(client, file_url, local_path)
entry["path"] = local_rel
entry["sha256"] = sha
if entry.get("size") and entry["size"] != size:
entry["size_actual"] = size
else:
entry["size"] = size
bytes_total += size
except httpx.HTTPError as e:
skipped.append(f"{name}: {e}")
print(f" file skipped {name}: {e}", file=sys.stderr)
polite_sleep()
files_meta.append(entry)
# 5. URL manifest (for files we couldn't download or for future re-download)
urls_manifest = {
"detail_url": detail_url,
"cover_url": thumb_url,
"attachments": [
{"name": f["name"], "url": f["url"], "original_id": f.get("original_id")}
for f in files_meta
],
}
(proj_dir / "_urls.json").write_text(
json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
# 6. Optional: EasyEDA project source — dispatch on origin (std vs pro)
src_meta: dict = {}
origin = list_item.get("origin")
if origin == "pro" and pro_source_client is not None:
try:
src_meta = fetch_pro_source(pro_source_client, uuid, proj_dir)
print(
f" pro source: {len(src_meta.get('source_documents', []))} docs, "
f"editor={src_meta.get('editor_version')}"
)
except Exception as e: # noqa: BLE001
print(f" pro source FAIL: {e}", file=sys.stderr)
skipped.append(f"pro_source: {e}")
elif origin != "pro" and source_client is not None:
try:
src_meta = fetch_std_source(source_client, uuid, proj_dir)
print(
f" source: {len(src_meta.get('source_documents', []))} docs, "
f"editor={src_meta.get('editor_version')}"
)
except Exception as e: # noqa: BLE001
print(f" source FAIL: {e}", file=sys.stderr)
skipped.append(f"source: {e}")
# 7. Unified metadata
meta = {
"source": "oshwhub",
"source_url": detail_url,
"project_id": uuid,
"title": detail.get("title") or list_item["name"],
"description_short": list_item.get("introduction") or "",
"description_path": "description.md",
"author": {
"username": list_item["owner"]["username"],
"display_name": list_item["owner"].get("nickname"),
"user_id": list_item["owner"].get("uuid"),
},
"license": detail.get("license") or "unknown",
"tags": list_item.get("tags") or [],
"created_at": list_item.get("created_at"),
"updated_at": list_item.get("updated_at"),
"published_at": list_item.get("oshwhub_publish_at"),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"metrics": {
"likes": list_item["count"]["like"],
"stars": list_item["count"]["star"],
"forks": list_item["count"]["fork"],
"views": list_item["count"]["views"],
"watch": list_item["count"].get("watch", 0),
"comments": list_item.get("comments_count", 0),
},
"cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None,
"files": files_meta,
"raw_fields": {
"path": list_item["path"],
"grade": list_item.get("grade"),
"origin": list_item.get("origin"),
"public": list_item.get("public"),
"publish": list_item.get("publish"),
"skipped_files": skipped,
},
}
if src_meta:
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
(proj_dir / "metadata.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
return CrawlResult(
project_id=uuid,
out_dir=proj_dir,
files_count=len(files_meta),
bytes_total=bytes_total,
skipped_files=skipped,
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def iter_candidates(
client: httpx.Client,
pages: int,
page_size: int,
sort: str,
origin: str | None = None,
) -> Iterator[dict]:
for p in range(1, pages + 1):
res = list_projects(client, page=p, page_size=page_size, sort=sort, origin=origin)
for it in res["lists"]:
yield it
polite_sleep()
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="oshwhub MVP crawler")
ap.add_argument("--out", type=Path, default=Path("data/raw/oshwhub"))
ap.add_argument("--top", type=int, default=10, help="number of projects to crawl")
ap.add_argument("--min-likes", type=int, default=50)
ap.add_argument("--min-grade", type=int, default=4)
ap.add_argument("--pages", type=int, default=3, help="list API pages to scan")
ap.add_argument("--page-size", type=int, default=30)
ap.add_argument("--sort", default="hot")
ap.add_argument(
"--origin",
choices=["std", "pro"],
default=None,
help="filter listing API by origin (server-side); needed to find Pro projects in top-N",
)
ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list")
ap.add_argument("--no-files", action="store_true", help="do not download attachments")
ap.add_argument("--limit", type=int, default=None, help="override --top, same effect")
ap.add_argument(
"--with-source",
action="store_true",
help="also fetch EasyEDA Std project source (schematic + PCB dataStr) per project",
)
ap.add_argument(
"--backfill-source",
action="store_true",
help="skip listing/HTML/attachments; only fetch source for projects already in --out",
)
ap.add_argument(
"--with-pro-source",
action="store_true",
help="also fetch EasyEDA Pro project source (full history chain, EPRO2 streams) per project",
)
ap.add_argument(
"--backfill-pro-source",
action="store_true",
help="skip listing; only fetch Pro source for origin=pro projects already in --out",
)
ap.add_argument(
"--pro-cookie",
type=str,
default=PRO_COOKIE_PATH_DEFAULT,
help="path to file with Cookie header for pro.lceda.cn",
)
args = ap.parse_args(argv)
n_target = args.limit if args.limit is not None else args.top
args.out.mkdir(parents=True, exist_ok=True)
# --backfill-source: standalone path that scans existing project dirs and
# only fetches source. No listing/HTML/attachment work.
if args.backfill_source:
return _run_backfill_source(args.out, only_uuids=args.uuids)
if args.backfill_pro_source:
return _run_backfill_pro_source(
args.out, only_uuids=args.uuids, cookie_path=args.pro_cookie
)
with make_client() as client:
# Build list of items to crawl
if args.uuids:
wanted = set(args.uuids.split(","))
items: list[dict] = []
for it in iter_candidates(
client, args.pages, args.page_size, args.sort, origin=args.origin
):
if it["uuid"] in wanted:
items.append(it)
if len(items) == len(wanted):
break
if len(items) < len(wanted):
missing = wanted - {i["uuid"] for i in items}
print(f"WARN: missing uuids (not in top pages): {missing}", file=sys.stderr)
else:
pool = list(
iter_candidates(
client, args.pages, args.page_size, args.sort, origin=args.origin
)
)
items = pick_top(
pool, n=n_target, min_likes=args.min_likes, min_grade=args.min_grade
)
if len(items) < n_target:
print(
f"WARN: only {len(items)} items passed filters "
f"(wanted {n_target})",
file=sys.stderr,
)
source_client_ctx = make_source_client() if args.with_source else None
pro_source_client_ctx = (
make_pro_source_client(args.pro_cookie) if args.with_pro_source else None
)
try:
print(f"Crawling {len(items)} projects -> {args.out}")
for i, it in enumerate(items, 1):
print(f"[{i}/{len(items)}] {it['path']} ({it['name']})")
try:
r = crawl_one(
client,
it,
args.out,
fetch_files=not args.no_files,
source_client=source_client_ctx,
pro_source_client=pro_source_client_ctx,
)
print(
f" OK: {r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
f"(skipped: {len(r.skipped_files)})"
)
except Exception as e:
print(f" FAIL: {e}", file=sys.stderr)
finally:
if source_client_ctx is not None:
source_client_ctx.close()
if pro_source_client_ctx is not None:
pro_source_client_ctx.close()
return 0
def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int:
"""Walk existing per-project dirs in out_root and fetch source.json into each.
Updates metadata.json in-place to add source_format / source_documents / editor_version.
"""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
if not meta_path.exists():
continue
if wanted and d.name not in wanted:
continue
targets.append(d)
print(f"Backfill source for {len(targets)} projects under {out_root}")
src_client = make_source_client()
try:
for i, proj_dir in enumerate(targets, 1):
uuid = proj_dir.name
meta_path = proj_dir / "metadata.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})")
try:
src_meta = fetch_std_source(src_client, uuid, proj_dir)
except Exception as e: # noqa: BLE001
print(f" FAIL: {e}", file=sys.stderr)
continue
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
total = sum(d["size"] for d in src_meta["source_documents"])
print(
f" OK: {len(src_meta['source_documents'])} docs, "
f"{total / 1024:.1f} KB, editor={src_meta.get('editor_version')}"
)
finally:
src_client.close()
return 0
def _run_backfill_pro_source(
out_root: Path,
only_uuids: str | None = None,
cookie_path: str = PRO_COOKIE_PATH_DEFAULT,
) -> int:
"""Walk per-project dirs in out_root, fetch Pro source for origin=pro projects.
A project is considered Pro by either: existing metadata.json marks
raw_fields.origin == 'pro', OR --uuids was passed and includes this UUID
(caller is asserting Pro).
"""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
if not meta_path.exists():
continue
if wanted is not None:
if d.name not in wanted:
continue
else:
try:
m = json.loads(meta_path.read_text(encoding="utf-8"))
except Exception: # noqa: BLE001
continue
if (m.get("raw_fields") or {}).get("origin") != "pro":
continue
targets.append(d)
print(f"Backfill pro source for {len(targets)} projects under {out_root}")
pro_client = make_pro_source_client(cookie_path=cookie_path)
try:
for i, proj_dir in enumerate(targets, 1):
uuid = proj_dir.name
meta_path = proj_dir / "metadata.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})")
try:
src_meta = fetch_pro_source(pro_client, uuid, proj_dir)
except Exception as e: # noqa: BLE001
print(f" FAIL: {e}", file=sys.stderr)
continue
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
total = sum(d["size"] for d in src_meta["source_documents"])
print(
f" OK: {len(src_meta['source_documents'])} docs, "
f"{total / 1024 / 1024:.1f} MB plain, editor={src_meta.get('editor_version')}"
)
finally:
pro_client.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())