Add tools/epro2 — EPRO2 parser + replay prototype
为 Pro 3.x .epro2 工程源数据写解析骨架,下游做 EPRO2→KiCad 转换器
前的基础设施。在 ESP-VoCat (278 docs / 7.5 MB) + 220V 桌面电源
(771 docs / 26 MB) 端到端跑通,0 parse errors。
模块结构:
- tools/epro2/parser.py:单行 → Op(rstrip("|") + split("||") + json.loads)
- tools/epro2/replay.py:state-machine 重放。DOCHEAD 设置文档头;其它 op 按 id 做 upsert(payload=None 视为 delete);EDIT_HEAD/META/CANVAS/PREFERENCE/PANELIZE 作为 doc 级单例存储
- tools/epro2/__main__.py:CLI。传入项目目录,按 manifest.json 重放每个 doc,按 docType 聚合输出;可选 --dump-doc 查看单文档详情
- tools/epro2/tests/:6 个单测,pin 死 trailing-pipe / 三段消息 / id-only-no-payload / 嵌入管道符等坑
ESP-VoCat 输出示例:
Documents: 278 (parse_errors=0)
count docType objects ops deletes untyped_ops
105 SYMBOL 4124 4439 0 0
88 DEVICE 88 264 0 0
55 FOOTPRINT 4641 4855 0 0
9 SCH_PAGE 7982 8167 42 0
6 PCB 8428 8547 38 0
6 BOARD 9 18 0 0
6 SCH 9 26 0 0
1 BLOB 4 8 0 0
1 FONT 16 28 0 0
1 CONFIG 2 3 0 0
Top ops: ATTR 7035 / ELE_PLACEHOLDER 4225 / LINE 3005 / LAYER 2318 ...
PCB 文档单 dump 验证语义正确:META 含 title (PCB-EchoEar-CoreBoard-V1_0)
+ board 引用;CANVAS 含 origin/grid/unit (mm);LAYER 1/2/3 = TOP/BOTTOM/
TOP_SILK 配色齐全。
跑法:
uv run python -m tools.epro2 data/raw/oshwhub/<project_uuid>
uv run python -m tools.epro2 data/raw/oshwhub/<uuid> --dump-doc <doc_uuid>
下一步(不在本 commit):
1. 把对象间关系建起来(COMPONENT.partId → PART;LINE.lineGroup → WIRE;
PAD_NET id → PAD + NET 三方关联)—— 当前 replay 只做扁平 dict
2. EPRO2 → KiCad 序列化层(Forge 投影硬门槛)
3. 在 Pro 3.x 三个项目做整体回归(X86 主板 7374 docs 可作压力测试)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
0
tools/__init__.py
Normal file
0
tools/__init__.py
Normal file
17
tools/epro2/__init__.py
Normal file
17
tools/epro2/__init__.py
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
"""EPRO2 parser + replay engine for EasyEDA Pro 3.x project source.
|
||||||
|
|
||||||
|
See docs/sources/easyeda_pro_source.md §3 for the format spec.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .parser import Op, iter_ops, parse_line
|
||||||
|
from .replay import Document, Project, replay_document, replay_project
|
||||||
|
|
||||||
|
# Public names of the package, re-exported from ``.parser`` and ``.replay``.
__all__ = [
    # line-level parsing (from .parser)
    "Op",
    "iter_ops",
    "parse_line",
    # state replay (from .replay)
    "Document",
    "Project",
    "replay_document",
    "replay_project",
]
|
||||||
93
tools/epro2/__main__.py
Normal file
93
tools/epro2/__main__.py
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
"""CLI: replay every document in a Pro 3.x project and print a summary.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
uv run python -m tools.epro2 data/raw/oshwhub/<project_uuid>
|
||||||
|
uv run python -m tools.epro2 data/raw/oshwhub/<uuid> --dump-doc <doc_uuid>
|
||||||
|
|
||||||
|
Designed for sanity-checking the parser/replay against ESP-VoCat first;
|
||||||
|
later we'll diff replayed state against the editor-rendered ground truth.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from collections import Counter
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .replay import Project, replay_project
|
||||||
|
|
||||||
|
|
||||||
|
def _print_summary(proj: Project) -> None:
|
||||||
|
print(f"Project: {proj.project_uuid}")
|
||||||
|
print(f"Editor version: {proj.editor_version}")
|
||||||
|
print(f"Documents: {len(proj.documents)} (parse_errors={len(proj.parse_errors)})")
|
||||||
|
print()
|
||||||
|
|
||||||
|
by_type = proj.by_doc_type()
|
||||||
|
print(f"{'count':>6} {'docType':<14s} {'objects':>10s} {'ops':>10s} {'deletes':>8s} {'untyped_ops':>11s}")
|
||||||
|
for t in sorted(by_type, key=lambda k: -len(by_type[k])):
|
||||||
|
ds = by_type[t]
|
||||||
|
objs = sum(len(d.objects) for d in ds)
|
||||||
|
ops = sum(d.op_counts.total() for d in ds)
|
||||||
|
dels = sum(d.deletes for d in ds)
|
||||||
|
untyped = sum(d.untyped_ops for d in ds)
|
||||||
|
print(f"{len(ds):>6d} {t:<14s} {objs:>10d} {ops:>10d} {dels:>8d} {untyped:>11d}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print(f"Top 25 op types across project:")
|
||||||
|
for t, n in proj.aggregate_op_counts().most_common(25):
|
||||||
|
print(f" {n:>9d} {t}")
|
||||||
|
|
||||||
|
if proj.parse_errors:
|
||||||
|
print()
|
||||||
|
print(f"Parse errors ({len(proj.parse_errors)}):")
|
||||||
|
for u, e in proj.parse_errors[:10]:
|
||||||
|
print(f" {u[:32]} {e}")
|
||||||
|
|
||||||
|
|
||||||
|
def _dump_doc(proj: Project, doc_uuid: str, n_objects: int = 5) -> None:
|
||||||
|
if doc_uuid not in proj.documents:
|
||||||
|
# try prefix match
|
||||||
|
candidates = [u for u in proj.documents if u.startswith(doc_uuid)]
|
||||||
|
if len(candidates) != 1:
|
||||||
|
print(f" no unique match for {doc_uuid!r} (matches: {candidates[:5]})", file=sys.stderr)
|
||||||
|
return
|
||||||
|
doc_uuid = candidates[0]
|
||||||
|
d = proj.documents[doc_uuid]
|
||||||
|
print()
|
||||||
|
print("=" * 72)
|
||||||
|
print(f"Document: {d.doc_uuid}")
|
||||||
|
print(f"docType: {d.doc_type}")
|
||||||
|
print(f"head: {json.dumps(d.head, ensure_ascii=False)[:200]}")
|
||||||
|
print(f"op_counts (top 15):")
|
||||||
|
for t, n in d.op_counts.most_common(15):
|
||||||
|
print(f" {n:>7d} {t}")
|
||||||
|
print(f"objects: {len(d.objects)} deletes: {d.deletes} untyped_ops: {d.untyped_ops}")
|
||||||
|
if d.objects:
|
||||||
|
print(f"\nFirst {n_objects} objects:")
|
||||||
|
for k, v in list(d.objects.items())[:n_objects]:
|
||||||
|
print(f" {k} → {json.dumps(v, ensure_ascii=False)[:240]}")
|
||||||
|
|
||||||
|
|
||||||
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser used by ``main``."""
    parser = argparse.ArgumentParser(description="Replay an EPRO2 project and summarize.")
    parser.add_argument("project_dir", type=Path, help="data/raw/oshwhub/<project_uuid>/")
    parser.add_argument(
        "--dump-doc",
        action="append",
        default=[],
        help="dump replayed state of one document (uuid or unique prefix); repeatable",
    )
    return parser


def main(argv: list[str] | None = None) -> int:
    """Replay a project, print its summary, then dump any requested documents.

    ``argv`` is handed to ``argparse`` unchanged (``None`` means "use
    ``sys.argv``"). Returns ``0``; argument errors exit through argparse's
    own ``SystemExit``.
    """
    options = _build_arg_parser().parse_args(argv)

    project = replay_project(options.project_dir)
    _print_summary(project)
    for requested in options.dump_doc:
        _dump_doc(project, requested)
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||||
89
tools/epro2/parser.py
Normal file
89
tools/epro2/parser.py
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
"""EPRO2 line parser.
|
||||||
|
|
||||||
|
EPRO2 is EasyEDA Pro 3.x's event-sourced project source format. After AES-GCM
|
||||||
|
decryption + gunzip (handled by the crawler), each newline-separated line has
|
||||||
|
the shape:
|
||||||
|
|
||||||
|
{"type":"X","ticket":N,"id":"..."}||{payload JSON}||{optional extra}|
|
||||||
|
|
||||||
|
Field separator is ``||``; line terminator is a single trailing ``|`` (NOT a
|
||||||
|
field separator — easy to mis-parse, see docs/sources/easyeda_pro_source.md §3.1).
|
||||||
|
|
||||||
|
This module only does line-level parsing (raw → ``Op``). State semantics
|
||||||
|
(create / update / delete) live in ``replay.py``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Iterator
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class Op:
|
||||||
|
"""A single EPRO2 message after raw parsing."""
|
||||||
|
|
||||||
|
type: str
|
||||||
|
ticket: int | None
|
||||||
|
id: str | None
|
||||||
|
payload: dict | None
|
||||||
|
extra: dict | None
|
||||||
|
raw: bytes # original line, for debugging / round-trip
|
||||||
|
|
||||||
|
|
||||||
|
class Epro2ParseError(ValueError):
    """Error signalling that an EPRO2 line could not be parsed."""
|
||||||
|
|
||||||
|
|
||||||
|
def parse_line(ln: bytes) -> Op:
    """Parse one EPRO2 line into an ``Op``.

    The line is stripped of surrounding whitespace and of its trailing ``|``
    terminator, then split on ``||`` into head, optional payload and optional
    extra. Fields beyond the third are ignored. Payload / extra that are
    missing, empty, undecodable or not JSON objects become ``None``.

    Args:
        ln: One raw line from a ``.epro2`` file. It is stored unchanged in
            ``Op.raw``.

    Returns:
        The parsed ``Op``. ``type`` falls back to ``"?"`` when the head has
        no ``type`` key.

    Raises:
        Epro2ParseError: If the line is empty, the head is not valid JSON,
            or the head is valid JSON but not an object.
    """
    stripped = ln.strip().rstrip(b"|")
    if not stripped:
        raise Epro2ParseError("empty line")

    # NOTE(review): this split is not JSON-aware. A literal "||" inside a
    # JSON string would be cut here — confirm against the format spec
    # whether that can occur.
    head_raw, *rest = stripped.split(b"||")
    try:
        head = json.loads(head_raw)
    except json.JSONDecodeError as e:
        raise Epro2ParseError(
            f"bad head JSON at byte {e.pos}: {head_raw[:160]!r}"
        ) from e
    # A head such as ``[1]`` or ``42`` decodes fine but has no ``.get``.
    # Report it as a parse error rather than leaking AttributeError, which
    # ``iter_ops`` does not catch.
    if not isinstance(head, dict):
        raise Epro2ParseError(
            f"head is not a JSON object: {head_raw[:160]!r}"
        )

    payload = _maybe_json(rest[0]) if rest else None
    extra = _maybe_json(rest[1]) if len(rest) >= 2 else None
    return Op(
        type=str(head.get("type", "?")),
        ticket=head.get("ticket"),
        id=head.get("id"),
        payload=payload if isinstance(payload, dict) else None,
        extra=extra if isinstance(extra, dict) else None,
        raw=ln,
    )
|
||||||
|
|
||||||
|
|
||||||
|
def _maybe_json(b: bytes) -> object | None:
|
||||||
|
"""JSON-decode if non-empty; tolerate malformed payloads (return None)."""
|
||||||
|
if not b:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.loads(b)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def iter_ops(path: Path | str) -> Iterator[Op]:
    """Lazily yield one ``Op`` per parseable line of a ``.epro2`` file.

    The file is read in binary mode. Whitespace-only lines are ignored and
    lines rejected by ``parse_line`` with ``Epro2ParseError`` are skipped
    silently. Any other exception — including failure to open the file —
    propagates to the caller.
    """
    with Path(path).open("rb") as stream:
        for raw_line in stream:
            record = raw_line.rstrip(b"\n")
            if record.strip():
                try:
                    yield parse_line(record)
                except Epro2ParseError:
                    # Unparseable line: drop it and keep reading.
                    pass
|
||||||
122
tools/epro2/replay.py
Normal file
122
tools/epro2/replay.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
"""EPRO2 state-machine replay.
|
||||||
|
|
||||||
|
Each ``.epro2`` file is a per-document op stream (already partitioned by
|
||||||
|
DOCHEAD during crawl). Replaying that stream yields the document's final
|
||||||
|
state — a dict keyed by object id with the latest payload.
|
||||||
|
|
||||||
|
This is a *prototype*. Semantics intentionally minimal:
|
||||||
|
- DOCHEAD sets the document head (docType, uuid, editVersion, ...).
|
||||||
|
- Any other op with an ``id`` upserts ``objects[id]`` with its payload.
|
||||||
|
- A ``null`` / missing payload on a normally-payloaded op is treated as
|
||||||
|
a deletion. (Empirically uncommon — flagged for review when seen.)
|
||||||
|
- We do *not* yet model relationships (lineGroup → WIRE, NET ↔ PAD_NET, ...).
|
||||||
|
Those belong in a higher-level translator, not the raw replay.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from collections import Counter
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from .parser import Op, iter_ops
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class Document:
    """Replayed state of a single Pro document.

    ``apply`` folds ops into this object one at a time; after the whole
    stream has been applied, ``objects`` holds the latest payload per key.
    """

    doc_uuid: str
    # ``docType`` from the most recent DOCHEAD with a non-empty payload.
    doc_type: str | None = None
    # Payload of the most recent DOCHEAD with a non-empty payload.
    head: dict = field(default_factory=dict)
    # Latest payload per object id; document-level singletons are stored
    # under their op type instead of an id.
    objects: dict[str, dict] = field(default_factory=dict)
    # Number of ops seen per op type, counted before any dispatch.
    op_counts: Counter[str] = field(default_factory=Counter)
    # Number of stored objects removed by an id-keyed op without payload.
    deletes: int = 0
    # Ops with no `id` that are neither DOCHEAD nor a document-level singleton.
    untyped_ops: int = 0

    def apply(self, op: Op) -> None:
        """Fold one op into the document state.

        Every op is tallied in ``op_counts``; exactly one of the branches
        below then decides what, if anything, changes.
        """
        kind = op.type
        self.op_counts[kind] += 1

        if kind == "DOCHEAD":
            # A DOCHEAD with an empty or missing payload leaves the head as is.
            header = op.payload
            if header:
                self.head = header
                self.doc_type = header.get("docType")
        elif kind in {"EDIT_HEAD", "META", "CANVAS", "PREFERENCE", "PANELIZE"}:
            # Document-level singletons that don't have an `id` field: keyed
            # by op type, so a later op of the same type replaces the earlier.
            if op.payload is not None:
                self.objects[kind] = {"_type": kind, **op.payload}
        elif op.id is None:
            # Op carries no addressable id — keep a tally so we know if our
            # model is missing a category.
            self.untyped_ops += 1
        elif op.payload is None:
            # Empty payload on an id-keyed op — treat as deletion.
            if op.id in self.objects:
                del self.objects[op.id]
                self.deletes += 1
        else:
            # Upsert: the newest payload fully replaces the stored one.
            self.objects[op.id] = {"_type": kind, **op.payload}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class Project:
    """Replayed state across all documents of a project."""

    project_uuid: str
    editor_version: str | None = None
    # Replayed documents keyed by document uuid.
    documents: dict[str, Document] = field(default_factory=dict)
    # (document uuid, error description) for documents that failed to replay.
    parse_errors: list[tuple[str, str]] = field(default_factory=list)

    def by_doc_type(self) -> dict[str, list[Document]]:
        """Group documents by ``doc_type``, in order of first appearance.

        Documents without a doc type are collected under ``"?"``.
        """
        grouped: dict[str, list[Document]] = {}
        for doc in self.documents.values():
            label = doc.doc_type if doc.doc_type else "?"
            if label not in grouped:
                grouped[label] = []
            grouped[label].append(doc)
        return grouped

    def aggregate_op_counts(self) -> Counter[str]:
        """Return the per-op-type counts summed over every document."""
        totals: Counter[str] = Counter()
        for doc in self.documents.values():
            totals += doc.op_counts
        return totals
|
||||||
|
|
||||||
|
|
||||||
|
def replay_document(epro2_path: Path | str) -> Document:
    """Replay one ``.epro2`` file into a ``Document``.

    The document uuid is the file name without its extension. Every op
    yielded by ``iter_ops`` is applied in file order.
    """
    source = Path(epro2_path)
    # crawler writes <doc_uuid>.epro2
    document = Document(doc_uuid=source.stem)
    for op in iter_ops(source):
        document.apply(op)
    return document
|
||||||
|
|
||||||
|
|
||||||
|
def replay_project(project_dir: Path | str) -> Project:
    """Replay every document listed in ``<project_dir>/source/manifest.json``.

    Each manifest entry's ``path`` is resolved relative to ``project_dir``.
    A document whose replay raises is recorded in ``Project.parse_errors``
    under the entry's ``doc_uuid`` and the remaining documents are still
    processed.

    Raises:
        FileNotFoundError: If the manifest file does not exist.
    """
    root = Path(project_dir)
    manifest_file = root / "source" / "manifest.json"
    if not manifest_file.exists():
        raise FileNotFoundError(f"missing {manifest_file}")
    manifest = json.loads(manifest_file.read_text(encoding="utf-8"))

    project = Project(
        project_uuid=manifest["project_uuid"],
        editor_version=manifest.get("editor_version"),
    )
    for entry in manifest.get("documents", []):
        document_file = root / entry["path"]
        try:
            document = replay_document(document_file)
        except Exception as exc:  # noqa: BLE001 — surface as parse_errors
            failure = f"{type(exc).__name__}: {exc}"
            project.parse_errors.append((entry["doc_uuid"], failure))
        else:
            project.documents[document.doc_uuid] = document
    return project
|
||||||
0
tools/epro2/tests/__init__.py
Normal file
0
tools/epro2/tests/__init__.py
Normal file
56
tools/epro2/tests/test_parser.py
Normal file
56
tools/epro2/tests/test_parser.py
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
"""Parser regression tests — pin down the ``rstrip("|")`` invariant that took
|
||||||
|
us two debugging rounds to find (see docs/sources/easyeda_pro_source.md §3.1).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from tools.epro2.parser import parse_line, Epro2ParseError
|
||||||
|
|
||||||
|
|
||||||
|
def test_dochead_with_trailing_pipe():
    """A DOCHEAD line ending in the single ``|`` terminator parses cleanly."""
    line = b'{"type":"DOCHEAD","ticket":2}||{"docType":"BOARD","uuid":"35086b7d90787675","editVersion":"3.2.127"}|'
    parsed = parse_line(line)
    assert parsed.type == "DOCHEAD"
    assert parsed.ticket == 2
    expected_payload = {
        "docType": "BOARD",
        "uuid": "35086b7d90787675",
        "editVersion": "3.2.127",
    }
    for key, value in expected_payload.items():
        assert parsed.payload[key] == value
|
||||||
|
|
||||||
|
|
||||||
|
def test_three_part_message():
    """The optional third field after the second `||` lands in ``extra``."""
    line = b'{"type":"WIRE","ticket":1009,"id":"e3514"}||{"groupId":""}||{"meta":1}|'
    parsed = parse_line(line)
    assert parsed.type == "WIRE"
    assert parsed.id == "e3514"
    assert parsed.payload == {"groupId": ""}
    assert parsed.extra == {"meta": 1}
|
||||||
|
|
||||||
|
|
||||||
|
def test_id_only_no_payload_yields_none_payload():
    """A head-only line produces an op whose payload is ``None``."""
    parsed = parse_line(b'{"type":"COMPONENT","ticket":7,"id":"e1"}|')
    assert parsed.type == "COMPONENT"
    assert parsed.id == "e1"
    # downstream replay treats this as a delete
    assert parsed.payload is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_bad_head_raises():
    """A head that is not JSON must raise ``Epro2ParseError``."""
    try:
        parse_line(b"not json||{}|")
    except Epro2ParseError:
        pass
    else:
        raise AssertionError("expected Epro2ParseError")
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_line_raises():
    """A line holding only the terminator must raise ``Epro2ParseError``."""
    try:
        parse_line(b"|")
    except Epro2ParseError:
        pass
    else:
        raise AssertionError("expected Epro2ParseError")
|
||||||
|
|
||||||
|
|
||||||
|
def test_payload_with_embedded_pipes():
    """A single `|` inside a JSON string value is data, not a separator."""
    line = b'{"type":"ATTR","ticket":3,"id":"e1"}||{"key":"Symbol","value":"a|b|c"}|'
    parsed = parse_line(line)
    assert parsed.payload["value"] == "a|b|c"
|
||||||
Reference in New Issue
Block a user