Phase 1 MVP: crawl 10 high-quality oshwhub projects into LFS

Why:
- Charles 指定:先爬 10 个高质量项目存 Gitea LFS,一个项目一个文件夹,
  保留原文件和 URL。先以小批量验证 schema + LFS 流水线,放量前再拍板
  存储规模。

What:
- crawlers/oshwhub: 列表 API (`/api/project?sort=hot`) + SSR HTML 解析,
  一次性产出 metadata / description / cover / files / _urls
- schemas/project.schema.json: 跨源统一 schema
- docs/sources/oshwhub.md: API 入口 / 字段映射 / 陷阱调研
- pyproject.toml: httpx[http2] 单依赖
- .gitattributes: data/raw/**/files/** 一律走 LFS(规则写窄,避免误伤 schemas/*.json 等)
- .gitignore: 移除 data/raw/* 排除(改走 LFS 入库)

10 个项目覆盖:调试器 / 加热台 / 盖革计数器 / 数控电源 / 焊台 /
智能手表 / USB 测电流 / ZVS 感应加热 / AI 开发板 / 红外热成像。
共 52 附件 ≈ 524 MB 入 LFS,筛选判据 grade=4 & likes>=100 & 多样性。

Known gaps(见 plan.md § Phase 1.4):
- EasyEDA 源 JSON 需登录 (u.lceda.cn),v0.1 跳过
- fs-web-stream.jlc.com 的工程源下载未测
- scripts/validate.py 自动 schema 校验未实现

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Zhang Jiahao
2026-04-23 19:34:09 +08:00
parent bf2370f83b
commit 5ffa10f256
103 changed files with 2279 additions and 28 deletions

View File

View File

@@ -0,0 +1,4 @@
from .crawler import main
if __name__ == "__main__":
raise SystemExit(main())

436
crawlers/oshwhub/crawler.py Normal file
View File

@@ -0,0 +1,436 @@
"""oshwhub.com crawler — MVP.
Usage:
uv run python -m crawlers.oshwhub \
--out data/raw/oshwhub \
--top 10 --min-likes 50 --min-grade 4
Or with explicit UUID list:
uv run python -m crawlers.oshwhub \
--uuids 298873b7fdbe44f8ba0e7351e023bc2c,7b6a398811f14eba9a952b8d2ddd7ace \
--out data/raw/oshwhub
"""
from __future__ import annotations
import argparse
import dataclasses as dc
import hashlib
import html as _html
import json
import re
import sys
import time
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterator
import httpx
API_LIST = "https://oshwhub.com/api/project"
BASE = "https://oshwhub.com"
IMG_CDN = "https://image.lceda.cn"
UA = "FacereDataset/0.1 (+https://git.deepknow.site/Facere/FacereDataset)"
SLEEP_BETWEEN = 2.0 # seconds between detail-page / file fetches
# ---------------------------------------------------------------------------
# HTTP
# ---------------------------------------------------------------------------
def make_client(timeout: float = 30.0) -> httpx.Client:
return httpx.Client(
http2=True,
timeout=timeout,
headers={"User-Agent": UA, "Accept": "text/html,application/json;q=0.9,*/*;q=0.8"},
follow_redirects=True,
)
def polite_sleep() -> None:
time.sleep(SLEEP_BETWEEN)
# ---------------------------------------------------------------------------
# Listing
# ---------------------------------------------------------------------------
def list_projects(
client: httpx.Client,
page: int = 1,
page_size: int = 30,
sort: str = "hot",
) -> dict:
r = client.get(API_LIST, params={"page": page, "pageSize": page_size, "sort": sort})
r.raise_for_status()
data = r.json()
if not data.get("success"):
raise RuntimeError(f"list API failed: {data}")
return data["result"]
def rank_score(item: dict) -> float:
"""Composite quality score: favor projects with broad engagement."""
c = item["count"]
return (
c["like"] * 3
+ c["star"] * 1
+ c["fork"] * 2
+ c["views"] / 100
+ item["comments_count"] * 2
+ (item.get("grade") or 0) * 50
)
def pick_top(
items: list[dict],
n: int,
min_likes: int,
min_grade: int,
exclude_copies: bool = True,
) -> list[dict]:
filtered = []
for it in items:
if exclude_copies and "_copy" in it["path"]:
continue
if it["count"]["like"] < min_likes:
continue
if (it.get("grade") or 0) < min_grade:
continue
filtered.append(it)
filtered.sort(key=rank_score, reverse=True)
return filtered[:n]
# ---------------------------------------------------------------------------
# Detail page parsing
# ---------------------------------------------------------------------------
RE_ATTACH_BLOCK = re.compile(r'\\"attachments\\":\[', re.DOTALL)
RE_LICENSE = re.compile(r'\\"license\\":\\"([^\\"]+)\\"')
RE_META_DESC = re.compile(
r'<meta\s+name="description"\s+content="([^"]*)"', re.IGNORECASE
)
RE_TITLE = re.compile(r"<title>([^<]+)</title>", re.IGNORECASE)
def _find_balanced_bracket(s: str, start: int, open_ch: str = "[", close_ch: str = "]") -> int:
"""Return index after the matching close bracket. start must point at open_ch."""
assert s[start] == open_ch
depth = 0
for i in range(start, len(s)):
ch = s[i]
if ch == open_ch:
depth += 1
elif ch == close_ch:
depth -= 1
if depth == 0:
return i + 1
raise ValueError("unbalanced")
def parse_detail_html(h: str) -> dict:
"""Extract attachments, license, title, description from SSR HTML."""
out: dict = {
"title": None,
"description_meta": None,
"license": None,
"attachments": [],
}
m = RE_TITLE.search(h)
if m:
# HTML entities + suffix stripping
title = _html.unescape(m.group(1)).strip()
for sfx in (
" - 立创开源硬件平台 - 深圳创电优选科技有限公司",
" - 立创开源硬件平台",
):
if title.endswith(sfx):
title = title[: -len(sfx)]
out["title"] = title
m = RE_META_DESC.search(h)
if m:
out["description_meta"] = _html.unescape(m.group(1))
m = RE_LICENSE.search(h)
if m:
out["license"] = m.group(1)
m = RE_ATTACH_BLOCK.search(h)
if m:
arr_start = m.end() - 1 # point at '['
arr_end = _find_balanced_bracket(h, arr_start)
block = h[arr_start:arr_end]
clean = block.replace('\\"', '"').replace("\\\\", "\\")
try:
out["attachments"] = json.loads(clean)
except json.JSONDecodeError as e:
# Keep raw for debugging; skip attachments silently. Caller can log.
out["_attachments_parse_error"] = str(e)
return out
# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------
def download_to(client: httpx.Client, url: str, dest: Path) -> tuple[int, str]:
"""Stream-download url to dest. Returns (size, sha256)."""
dest.parent.mkdir(parents=True, exist_ok=True)
h = hashlib.sha256()
size = 0
with client.stream("GET", url) as r:
r.raise_for_status()
with open(dest, "wb") as f:
for chunk in r.iter_bytes(1 << 15):
f.write(chunk)
h.update(chunk)
size += len(chunk)
return size, h.hexdigest()
# ---------------------------------------------------------------------------
# Single-project crawl
# ---------------------------------------------------------------------------
@dc.dataclass
class CrawlResult:
project_id: str
out_dir: Path
files_count: int
bytes_total: int
skipped_files: list[str]
def crawl_one(
client: httpx.Client,
list_item: dict,
out_root: Path,
fetch_files: bool = True,
) -> CrawlResult:
uuid = list_item["uuid"]
path = list_item["path"]
proj_dir = out_root / uuid
proj_dir.mkdir(parents=True, exist_ok=True)
# 1. Fetch detail HTML
detail_url = f"{BASE}/{path}"
r = client.get(detail_url)
r.raise_for_status()
detail = parse_detail_html(r.text)
polite_sleep()
# 2. Cover image
thumb_url = list_item["thumb"]
if thumb_url.startswith("//"):
thumb_url = "https:" + thumb_url
cover_rel = None
if thumb_url:
ext = Path(urllib.parse.urlparse(thumb_url).path).suffix or ".jpg"
cover_rel = f"cover{ext}"
try:
download_to(client, thumb_url, proj_dir / cover_rel)
except httpx.HTTPError as e:
print(f" cover failed: {e}", file=sys.stderr)
cover_rel = None
polite_sleep()
# 3. Description markdown (combine meta + introduction)
desc_md_parts = [f"# {list_item['name']}\n"]
if detail.get("description_meta"):
desc_md_parts.append(detail["description_meta"].strip())
elif list_item.get("introduction"):
desc_md_parts.append(list_item["introduction"].strip())
desc_md_parts.append(
f"\n---\n"
f"- Source: {detail_url}\n"
f"- Author: {list_item['owner'].get('nickname')} "
f"({list_item['owner'].get('username')})\n"
f"- License: {detail.get('license') or 'unknown'}\n"
f"- Published: {list_item.get('oshwhub_publish_at')}\n"
)
(proj_dir / "description.md").write_text("\n".join(desc_md_parts), encoding="utf-8")
# 4. Files
files_meta: list[dict] = []
skipped: list[str] = []
bytes_total = 0
for a in detail.get("attachments", []):
src = a.get("src") or ""
if not src:
continue
file_url = IMG_CDN + src if src.startswith("/") else src
name = a.get("name") or Path(src).name
safe_name = re.sub(r'[/\\:*?"<>|]', "_", name)
local_rel = f"files/{safe_name}"
local_path = proj_dir / local_rel
entry: dict = {
"name": name,
"url": file_url,
"original_id": a.get("uuid"),
"ext": a.get("ext"),
"mime": a.get("mime"),
"size": a.get("size"),
"md5": a.get("md5"),
}
if fetch_files:
try:
size, sha = download_to(client, file_url, local_path)
entry["path"] = local_rel
entry["sha256"] = sha
if entry.get("size") and entry["size"] != size:
entry["size_actual"] = size
else:
entry["size"] = size
bytes_total += size
except httpx.HTTPError as e:
skipped.append(f"{name}: {e}")
print(f" file skipped {name}: {e}", file=sys.stderr)
polite_sleep()
files_meta.append(entry)
# 5. URL manifest (for files we couldn't download or for future re-download)
urls_manifest = {
"detail_url": detail_url,
"cover_url": thumb_url,
"attachments": [
{"name": f["name"], "url": f["url"], "original_id": f.get("original_id")}
for f in files_meta
],
}
(proj_dir / "_urls.json").write_text(
json.dumps(urls_manifest, ensure_ascii=False, indent=2), encoding="utf-8"
)
# 6. Unified metadata
meta = {
"source": "oshwhub",
"source_url": detail_url,
"project_id": uuid,
"title": detail.get("title") or list_item["name"],
"description_short": list_item.get("introduction") or "",
"description_path": "description.md",
"author": {
"username": list_item["owner"]["username"],
"display_name": list_item["owner"].get("nickname"),
"user_id": list_item["owner"].get("uuid"),
},
"license": detail.get("license") or "unknown",
"tags": list_item.get("tags") or [],
"created_at": list_item.get("created_at"),
"updated_at": list_item.get("updated_at"),
"published_at": list_item.get("oshwhub_publish_at"),
"crawled_at": datetime.now(timezone.utc).isoformat(),
"metrics": {
"likes": list_item["count"]["like"],
"stars": list_item["count"]["star"],
"forks": list_item["count"]["fork"],
"views": list_item["count"]["views"],
"watch": list_item["count"].get("watch", 0),
"comments": list_item.get("comments_count", 0),
},
"cover": {"url": thumb_url, "path": cover_rel} if thumb_url else None,
"files": files_meta,
"raw_fields": {
"path": list_item["path"],
"grade": list_item.get("grade"),
"origin": list_item.get("origin"),
"public": list_item.get("public"),
"publish": list_item.get("publish"),
"skipped_files": skipped,
},
}
(proj_dir / "metadata.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
)
return CrawlResult(
project_id=uuid,
out_dir=proj_dir,
files_count=len(files_meta),
bytes_total=bytes_total,
skipped_files=skipped,
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def iter_candidates(
client: httpx.Client,
pages: int,
page_size: int,
sort: str,
) -> Iterator[dict]:
for p in range(1, pages + 1):
res = list_projects(client, page=p, page_size=page_size, sort=sort)
for it in res["lists"]:
yield it
polite_sleep()
def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="oshwhub MVP crawler")
ap.add_argument("--out", type=Path, default=Path("data/raw/oshwhub"))
ap.add_argument("--top", type=int, default=10, help="number of projects to crawl")
ap.add_argument("--min-likes", type=int, default=50)
ap.add_argument("--min-grade", type=int, default=4)
ap.add_argument("--pages", type=int, default=3, help="list API pages to scan")
ap.add_argument("--page-size", type=int, default=30)
ap.add_argument("--sort", default="hot")
ap.add_argument("--uuids", type=str, default=None, help="comma-separated explicit UUID list")
ap.add_argument("--no-files", action="store_true", help="do not download attachments")
ap.add_argument("--limit", type=int, default=None, help="override --top, same effect")
args = ap.parse_args(argv)
n_target = args.limit if args.limit is not None else args.top
args.out.mkdir(parents=True, exist_ok=True)
with make_client() as client:
# Build list of items to crawl
if args.uuids:
wanted = set(args.uuids.split(","))
items: list[dict] = []
for it in iter_candidates(client, args.pages, args.page_size, args.sort):
if it["uuid"] in wanted:
items.append(it)
if len(items) == len(wanted):
break
if len(items) < len(wanted):
missing = wanted - {i["uuid"] for i in items}
print(f"WARN: missing uuids (not in top pages): {missing}", file=sys.stderr)
else:
pool = list(iter_candidates(client, args.pages, args.page_size, args.sort))
items = pick_top(
pool, n=n_target, min_likes=args.min_likes, min_grade=args.min_grade
)
if len(items) < n_target:
print(
f"WARN: only {len(items)} items passed filters "
f"(wanted {n_target})",
file=sys.stderr,
)
print(f"Crawling {len(items)} projects -> {args.out}")
for i, it in enumerate(items, 1):
print(f"[{i}/{len(items)}] {it['path']} ({it['name']})")
try:
r = crawl_one(client, it, args.out, fetch_files=not args.no_files)
print(
f" OK: {r.files_count} files, {r.bytes_total / 1024 / 1024:.1f} MB "
f"(skipped: {len(r.skipped_files)})"
)
except Exception as e:
print(f" FAIL: {e}", file=sys.stderr)
return 0
if __name__ == "__main__":
raise SystemExit(main())