Add EasyEDA Std project source ingestion (10 boards backfilled)

打通 oshwhub origin=std 项目的工程源(schematic + PCB dataStr)抓取链路。原
plan.md §1.6 假设需要登录,实测 lceda.cn/api/documents/<doc>?uuid=<doc>&path=<doc>
对公开项目匿名可访问 —— 无需 cookie,无账号封禁风险。

调研:4 轮探测留痕在 data/state/std_probe[1-5]/(gitignored);翻 Std 编辑器
v6.5.51 的 main.min.js bundle 找到 ajaxDetail 端点;按 docType 区分两种
响应 shape(schematic 项目视图 vs PCB 文档视图)。

Crawler:
  - make_source_client() 用浏览器 UA + lceda.cn/editor Referer,因为
    oshwhub /api/project/<uuid> 端点拒绝 FacereDataset/0.1 UA(CLAUDE.md
    UA 例外条款:目标站主动封自定义 UA + 公开静态资源)
  - fetch_std_source(): 项目元 → version_documents → 逐文档 dataStr → 落
    source/<doc>.json + source/manifest.json
  - --with-source(爬新项目时一并抓源)/ --backfill-source(仅扫已有)
  - QPS ≤ 0.2 (SLEEP_SOURCE = 5s) 自律

Schema: 加 source_format / source_path / source_documents / editor_version
(前 3 进 enum 锁定,便于后续 Pro / KiCad 源对齐)。

回填结果:10/10 成功,45 个文档,33.2 MB;schema validate 全通。
docTypes 主要是 1 (schematic) 与 3 (pcb);USB 电压电流表只有 PCB 文档(4 个:
主板+盖板+底板+面板,作者未上传原理图源)。

完整调研:docs/sources/easyeda_std_source.md。
This commit is contained in:
2026-04-28 20:07:40 +08:00
parent b0d3afd2a9
commit d874278bc5
71 changed files with 4389 additions and 23 deletions

View File

@@ -29,10 +29,19 @@ from typing import Iterator
import httpx
API_LIST = "https://oshwhub.com/api/project"
API_PROJECT = "https://oshwhub.com/api/project" # /api/project/<uuid> for source flow
BASE = "https://oshwhub.com"
IMG_CDN = "https://image.lceda.cn"
LCEDA_DOC_API = "https://lceda.cn/api/documents"
UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)"
# Std source endpoints reject FacereDataset UA on oshwhub /api/project; spoof browser UA only there.
# See docs/sources/easyeda_std_source.md §3.
BROWSER_UA = (
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/147.0.0.0 Safari/537.36"
)
SLEEP_BETWEEN = 2.0 # seconds between detail-page / file fetches
SLEEP_SOURCE = 5.0 # source fetch is sensitive — QPS ≤ 0.2 per CLAUDE.md登录态 spirit
# ---------------------------------------------------------------------------
@@ -48,6 +57,24 @@ def make_client(timeout: float = 30.0) -> httpx.Client:
)
def make_source_client(timeout: float = 60.0) -> httpx.Client:
"""Client for Std source endpoints (lceda.cn/oshwhub.com /api/...).
Uses browser UA + editor Referer to satisfy server-side UA filter.
"""
return httpx.Client(
http2=True,
timeout=timeout,
headers={
"User-Agent": BROWSER_UA,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Referer": "https://lceda.cn/editor",
},
follow_redirects=False,
)
def polite_sleep() -> None:
time.sleep(SLEEP_BETWEEN)
@@ -193,6 +220,117 @@ def download_to(client: httpx.Client, url: str, dest: Path) -> tuple[int, str]:
return size, h.hexdigest()
# ---------------------------------------------------------------------------
# Std source fetch (login NOT required for public projects — see
# docs/sources/easyeda_std_source.md)
# ---------------------------------------------------------------------------
def fetch_std_source(
source_client: httpx.Client,
project_uuid: str,
proj_dir: Path,
sleep: float = SLEEP_SOURCE,
) -> dict:
"""Fetch EasyEDA Std project source (schematic + PCB dataStr) anonymously.
Returns dict with keys:
- source_format: "easyeda-std"
- source_path: "source/"
- source_documents: list of {doc_uuid, docType, master, path, size, sha256}
- editor_version: from dataStr.head when available
"""
src_dir = proj_dir / "source"
src_dir.mkdir(parents=True, exist_ok=True)
# 1. Project meta → version_documents
r = source_client.get(f"{API_PROJECT}/{project_uuid}")
r.raise_for_status()
j = r.json()
if not j.get("success"):
raise RuntimeError(f"oshwhub project meta failed: {j}")
version_documents = j["result"].get("version_documents") or []
time.sleep(sleep)
# 2. Per document → dataStr
doc_metas: list[dict] = []
editor_version: str | None = None
for vd in version_documents:
doc_uuid = vd["uuid"]
master = vd.get("master")
doc_type = vd.get("docType")
url = f"{LCEDA_DOC_API}/{doc_uuid}"
r2 = source_client.get(url, params={"uuid": doc_uuid, "path": doc_uuid})
r2.raise_for_status()
# Server returns text/html mistakenly; body is JSON regardless.
try:
body_json = r2.json()
except Exception as e: # noqa: BLE001
raise RuntimeError(f"doc {doc_uuid} non-JSON response: {e}; head={r2.text[:200]!r}")
if not body_json.get("success"):
raise RuntimeError(f"doc {doc_uuid} response not success: {body_json}")
local_rel = f"source/{doc_uuid}.json"
local_path = proj_dir / local_rel
text = json.dumps(body_json, ensure_ascii=False, separators=(",", ":"))
local_path.write_text(text, encoding="utf-8")
size = local_path.stat().st_size
sha = hashlib.sha256(text.encode("utf-8")).hexdigest()
# Pull editor version from the dataStr.head if present.
ev = _extract_editor_version(body_json)
if ev and not editor_version:
editor_version = ev
doc_metas.append({
"doc_uuid": doc_uuid,
"docType": doc_type,
"master": master,
"path": local_rel,
"size": size,
"sha256": sha,
})
time.sleep(sleep)
# 3. source/manifest.json — index + raw upstream version_documents for diffing
manifest = {
"project_uuid": project_uuid,
"fetched_at": datetime.now(timezone.utc).isoformat(),
"editor_version": editor_version,
"documents": doc_metas,
"upstream_version_documents": version_documents,
}
(src_dir / "manifest.json").write_text(
json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
return {
"source_format": "easyeda-std",
"source_path": "source/",
"source_documents": doc_metas,
"editor_version": editor_version,
}
def _extract_editor_version(body_json: dict) -> str | None:
"""Best-effort: pull head.editorVersion from dataStr (location varies by docType)."""
res = body_json.get("result") or {}
# PCB shape: result.dataStr at top
ds = res.get("dataStr")
if isinstance(ds, dict):
head = ds.get("head") or {}
if isinstance(head, dict) and head.get("editorVersion"):
return str(head["editorVersion"])
# Schematic shape: result.schematics[*].dataStr
for sch in (res.get("schematics") or []):
if isinstance(sch, dict):
ds2 = sch.get("dataStr") or {}
if isinstance(ds2, dict):
head = ds2.get("head") or {}
if isinstance(head, dict) and head.get("editorVersion"):
return str(head["editorVersion"])
return None
# ---------------------------------------------------------------------------
# Single-project crawl
# ---------------------------------------------------------------------------
@@ -211,6 +349,7 @@ def crawl_one(
list_item: dict,
out_root: Path,
fetch_files: bool = True,
source_client: httpx.Client | None = None,
) -> CrawlResult:
uuid = list_item["uuid"]
path = list_item["path"]
@@ -307,7 +446,20 @@ def crawl_one(
json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
# 6. Unified metadata
# 6. Optional: EasyEDA Std project source (schematic + PCB dataStr)
src_meta: dict = {}
if source_client is not None:
try:
src_meta = fetch_std_source(source_client, uuid, proj_dir)
print(
f" source: {len(src_meta.get('source_documents', []))} docs, "
f"editor={src_meta.get('editor_version')}"
)
except Exception as e: # noqa: BLE001
print(f" source FAIL: {e}", file=sys.stderr)
skipped.append(f"source: {e}")
# 7. Unified metadata
meta = {
"source": "oshwhub",
"source_url": detail_url,
@@ -345,6 +497,12 @@ def crawl_one(
"skipped_files": skipped,
},
}
if src_meta:
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
(proj_dir / "metadata.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
@@ -387,11 +545,26 @@ def main(argv: list[str] | None = None) -> int:
ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list")
ap.add_argument("--no-files", action="store_true", help="do not download attachments")
ap.add_argument("--limit", type=int, default=None, help="override --top, same effect")
ap.add_argument(
"--with-source",
action="store_true",
help="also fetch EasyEDA Std project source (schematic + PCB dataStr) per project",
)
ap.add_argument(
"--backfill-source",
action="store_true",
help="skip listing/HTML/attachments; only fetch source for projects already in --out",
)
args = ap.parse_args(argv)
n_target = args.limit if args.limit is not None else args.top
args.out.mkdir(parents=True, exist_ok=True)
# --backfill-source: standalone path that scans existing project dirs and
# only fetches source. No listing/HTML/attachment work.
if args.backfill_source:
return _run_backfill_source(args.out, only_uuids=args.uuids)
with make_client() as client:
# Build list of items to crawl
if args.uuids:
@@ -417,20 +590,79 @@ def main(argv: list[str] | None = None) -> int:
file=sys.stderr,
)
print(f"Crawling {len(items)} projects -> {args.out}")
for i, it in enumerate(items, 1):
print(f"[{i}/{len(items)}] {it['path']} ({it['name']})")
try:
r = crawl_one(client, it, args.out, fetch_files=not args.no_files)
print(
f" OK: {r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
f"(skipped: {len(r.skipped_files)})"
)
except Exception as e:
print(f" FAIL: {e}", file=sys.stderr)
source_client_ctx = make_source_client() if args.with_source else None
try:
print(f"Crawling {len(items)} projects -> {args.out}")
for i, it in enumerate(items, 1):
print(f"[{i}/{len(items)}] {it['path']} ({it['name']})")
try:
r = crawl_one(
client,
it,
args.out,
fetch_files=not args.no_files,
source_client=source_client_ctx,
)
print(
f" OK: {r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
f"(skipped: {len(r.skipped_files)})"
)
except Exception as e:
print(f" FAIL: {e}", file=sys.stderr)
finally:
if source_client_ctx is not None:
source_client_ctx.close()
return 0
def _run_backfill_source(out_root: Path, only_uuids: str | None = None) -> int:
"""Walk existing per-project dirs in out_root and fetch source.json into each.
Updates metadata.json in-place to add source_format / source_documents / editor_version.
"""
wanted: set[str] | None = set(only_uuids.split(",")) if only_uuids else None
targets: list[Path] = []
for d in sorted(out_root.iterdir()):
if not d.is_dir():
continue
meta_path = d / "metadata.json"
if not meta_path.exists():
continue
if wanted and d.name not in wanted:
continue
targets.append(d)
print(f"Backfill source for {len(targets)} projects under {out_root}")
src_client = make_source_client()
try:
for i, proj_dir in enumerate(targets, 1):
uuid = proj_dir.name
meta_path = proj_dir / "metadata.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
print(f"[{i}/{len(targets)}] {uuid} ({meta.get('title', '?')})")
try:
src_meta = fetch_std_source(src_client, uuid, proj_dir)
except Exception as e: # noqa: BLE001
print(f" FAIL: {e}", file=sys.stderr)
continue
meta["source_format"] = src_meta["source_format"]
meta["source_path"] = src_meta["source_path"]
meta["source_documents"] = src_meta["source_documents"]
if src_meta.get("editor_version"):
meta["editor_version"] = src_meta["editor_version"]
meta_path.write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
total = sum(d["size"] for d in src_meta["source_documents"])
print(
f" OK: {len(src_meta['source_documents'])} docs, "
f"{total / 1024:.1f} KB, editor={src_meta.get('editor_version')}"
)
finally:
src_client.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())