diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/epro2/__init__.py b/tools/epro2/__init__.py new file mode 100644 index 0000000..1c52f61 --- /dev/null +++ b/tools/epro2/__init__.py @@ -0,0 +1,17 @@ +"""EPRO2 parser + replay engine for EasyEDA Pro 3.x project source. + +See docs/sources/easyeda_pro_source.md §3 for the format spec. +""" + +from .parser import Op, iter_ops, parse_line +from .replay import Document, Project, replay_document, replay_project + +__all__ = [ + "Op", + "iter_ops", + "parse_line", + "Document", + "Project", + "replay_document", + "replay_project", +] diff --git a/tools/epro2/__main__.py b/tools/epro2/__main__.py new file mode 100644 index 0000000..ce71ad8 --- /dev/null +++ b/tools/epro2/__main__.py @@ -0,0 +1,93 @@ +"""CLI: replay every document in a Pro 3.x project and print a summary. + +Usage: + uv run python -m tools.epro2 data/raw/oshwhub/ + uv run python -m tools.epro2 data/raw/oshwhub/ --dump-doc + +Designed for sanity-checking the parser/replay against ESP-VoCat first; +later we'll diff replayed state against the editor-rendered ground truth. 
def _print_summary(proj: Project) -> None:
    """Write a human-readable replay report for *proj* to stdout.

    The report has four parts: a project header, one table row per docType
    (largest group first), the 25 most frequent op types across the whole
    project, and -- only when there are any -- the first ten parse errors.
    """
    error_total = len(proj.parse_errors)
    print(f"Project: {proj.project_uuid}")
    print(f"Editor version: {proj.editor_version}")
    print(f"Documents: {len(proj.documents)} (parse_errors={error_total})")
    print()

    groups = proj.by_doc_type()
    print(f"{'count':>6} {'docType':<14s} {'objects':>10s} {'ops':>10s} {'deletes':>8s} {'untyped_ops':>11s}")
    # Negating the key (rather than reverse=True) keeps equally sized groups
    # in their original relative order.
    ordered_types = sorted(groups, key=lambda name: -len(groups[name]))
    for doc_type in ordered_types:
        docs = groups[doc_type]
        n_objects = n_ops = n_deletes = n_untyped = 0
        for doc in docs:
            n_objects += len(doc.objects)
            n_ops += doc.op_counts.total()
            n_deletes += doc.deletes
            n_untyped += doc.untyped_ops
        print(
            f"{len(docs):>6d} {doc_type:<14s} {n_objects:>10d} "
            f"{n_ops:>10d} {n_deletes:>8d} {n_untyped:>11d}"
        )
    print()

    print("Top 25 op types across project:")
    for op_type, count in proj.aggregate_op_counts().most_common(25):
        print(f" {count:>9d} {op_type}")

    if not proj.parse_errors:
        return
    print()
    print(f"Parse errors ({error_total}):")
    # Cap the listing at ten entries and clip long uuids to 32 characters.
    for doc_uuid, message in proj.parse_errors[:10]:
        print(f" {doc_uuid[:32]} {message}")
list(d.objects.items())[:n_objects]: + print(f" {k} → {json.dumps(v, ensure_ascii=False)[:240]}") + + +def main(argv: list[str] | None = None) -> int: + ap = argparse.ArgumentParser(description="Replay an EPRO2 project and summarize.") + ap.add_argument("project_dir", type=Path, help="data/raw/oshwhub//") + ap.add_argument( + "--dump-doc", + action="append", + default=[], + help="dump replayed state of one document (uuid or unique prefix); repeatable", + ) + args = ap.parse_args(argv) + + proj = replay_project(args.project_dir) + _print_summary(proj) + for doc_id in args.dump_doc: + _dump_doc(proj, doc_id) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/epro2/parser.py b/tools/epro2/parser.py new file mode 100644 index 0000000..233a2d9 --- /dev/null +++ b/tools/epro2/parser.py @@ -0,0 +1,89 @@ +"""EPRO2 line parser. + +EPRO2 is EasyEDA Pro 3.x's event-sourced project source format. After AES-GCM +decryption + gunzip (handled by the crawler), each newline-separated line has +the shape: + + {"type":"X","ticket":N,"id":"..."}||{payload JSON}||{optional extra}| + +Field separator is ``||``; line terminator is a single trailing ``|`` (NOT a +field separator — easy to mis-parse, see docs/sources/easyeda_pro_source.md §3.1). + +This module only does line-level parsing (raw → ``Op``). State semantics +(create / update / delete) live in ``replay.py``. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Iterator + + +@dataclass(slots=True) +class Op: + """A single EPRO2 message after raw parsing.""" + + type: str + ticket: int | None + id: str | None + payload: dict | None + extra: dict | None + raw: bytes # original line, for debugging / round-trip + + +class Epro2ParseError(ValueError): + """Raised when a line cannot be parsed.""" + + +def parse_line(ln: bytes) -> Op: + """Parse one EPRO2 line. 
Raises ``Epro2ParseError`` on a malformed head.""" + stripped = ln.strip().rstrip(b"|") + if not stripped: + raise Epro2ParseError("empty line") + parts = stripped.split(b"||") + try: + head = json.loads(parts[0]) + except json.JSONDecodeError as e: + raise Epro2ParseError( + f"bad head JSON at byte {e.pos}: {parts[0][:160]!r}" + ) from e + payload = _maybe_json(parts[1]) if len(parts) >= 2 else None + extra = _maybe_json(parts[2]) if len(parts) >= 3 else None + return Op( + type=str(head.get("type", "?")), + ticket=head.get("ticket"), + id=head.get("id"), + payload=payload if isinstance(payload, dict) else None, + extra=extra if isinstance(extra, dict) else None, + raw=ln, + ) + + +def _maybe_json(b: bytes) -> object | None: + """JSON-decode if non-empty; tolerate malformed payloads (return None).""" + if not b: + return None + try: + return json.loads(b) + except json.JSONDecodeError: + return None + + +def iter_ops(path: Path | str) -> Iterator[Op]: + """Yield ``Op`` records from a ``.epro2`` file. + + Lines that fail to parse are skipped; structural failures (file not found, + encoding error) propagate. + """ + p = Path(path) + with p.open("rb") as f: + for ln in f: + ln = ln.rstrip(b"\n") + if not ln.strip(): + continue + try: + yield parse_line(ln) + except Epro2ParseError: + continue diff --git a/tools/epro2/replay.py b/tools/epro2/replay.py new file mode 100644 index 0000000..2903d14 --- /dev/null +++ b/tools/epro2/replay.py @@ -0,0 +1,122 @@ +"""EPRO2 state-machine replay. + +Each ``.epro2`` file is a per-document op stream (already partitioned by +DOCHEAD during crawl). Replaying that stream yields the document's final +state — a dict keyed by object id with the latest payload. + +This is a *prototype*. Semantics intentionally minimal: + - DOCHEAD sets the document head (docType, uuid, editVersion, ...). + - Any other op with an ``id`` upserts ``objects[id]`` with its payload. 
+ - A ``null`` / missing payload on a normally-payloaded op is treated as + a deletion. (Empirically uncommon — flagged for review when seen.) + - We do *not* yet model relationships (lineGroup → WIRE, NET ↔ PAD_NET, ...). + Those belong in a higher-level translator, not the raw replay. +""" + +from __future__ import annotations + +import json +from collections import Counter +from dataclasses import dataclass, field +from pathlib import Path + +from .parser import Op, iter_ops + + +@dataclass +class Document: + """Replayed state of a single Pro document.""" + + doc_uuid: str + doc_type: str | None = None + head: dict = field(default_factory=dict) + objects: dict[str, dict] = field(default_factory=dict) + op_counts: Counter[str] = field(default_factory=Counter) + deletes: int = 0 + untyped_ops: int = 0 # ops with no `id` and not DOCHEAD/EDIT_HEAD/META/CANVAS + + def apply(self, op: Op) -> None: + self.op_counts[op.type] += 1 + + if op.type == "DOCHEAD": + if op.payload: + self.head = op.payload + self.doc_type = op.payload.get("docType") + return + + # Document-level singletons that don't have an `id` field. + if op.type in {"EDIT_HEAD", "META", "CANVAS", "PREFERENCE", "PANELIZE"}: + if op.payload is not None: + self.objects[op.type] = {"_type": op.type, **op.payload} + return + + if op.id is None: + # Op carries no addressable id — keep a tally so we know if our + # model is missing a category. + self.untyped_ops += 1 + return + + if op.payload is None: + # Empty payload on an id-keyed op — treat as deletion. 
+ if op.id in self.objects: + del self.objects[op.id] + self.deletes += 1 + return + + self.objects[op.id] = {"_type": op.type, **op.payload} + + +@dataclass +class Project: + """Replayed state across all documents of a project.""" + + project_uuid: str + editor_version: str | None = None + documents: dict[str, Document] = field(default_factory=dict) + parse_errors: list[tuple[str, str]] = field(default_factory=list) + + def by_doc_type(self) -> dict[str, list[Document]]: + out: dict[str, list[Document]] = {} + for d in self.documents.values(): + out.setdefault(d.doc_type or "?", []).append(d) + return out + + def aggregate_op_counts(self) -> Counter[str]: + agg: Counter[str] = Counter() + for d in self.documents.values(): + agg += d.op_counts + return agg + + +def replay_document(epro2_path: Path | str) -> Document: + """Replay a single ``.epro2`` file. Document UUID is taken from filename.""" + p = Path(epro2_path) + doc_uuid = p.stem # crawler writes .epro2 + d = Document(doc_uuid=doc_uuid) + for op in iter_ops(p): + d.apply(op) + return d + + +def replay_project(project_dir: Path | str) -> Project: + """Replay every document under ``/source/`` per its manifest.json.""" + pdir = Path(project_dir) + src = pdir / "source" + manifest_path = src / "manifest.json" + if not manifest_path.exists(): + raise FileNotFoundError(f"missing {manifest_path}") + manifest = json.loads(manifest_path.read_text(encoding="utf-8")) + + proj = Project( + project_uuid=manifest["project_uuid"], + editor_version=manifest.get("editor_version"), + ) + for entry in manifest.get("documents", []): + epro2_path = pdir / entry["path"] + try: + d = replay_document(epro2_path) + except Exception as e: # noqa: BLE001 — surface as parse_errors + proj.parse_errors.append((entry["doc_uuid"], f"{type(e).__name__}: {e}")) + continue + proj.documents[d.doc_uuid] = d + return proj diff --git a/tools/epro2/tests/__init__.py b/tools/epro2/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff 
def test_dochead_with_trailing_pipe():
    """The single trailing ``|`` terminator must not leak into the payload."""
    raw = b'{"type":"DOCHEAD","ticket":2}||{"docType":"BOARD","uuid":"35086b7d90787675","editVersion":"3.2.127"}|'
    parsed = parse_line(raw)
    assert parsed.type == "DOCHEAD"
    assert parsed.ticket == 2
    expected_fields = {
        "docType": "BOARD",
        "uuid": "35086b7d90787675",
        "editVersion": "3.2.127",
    }
    for key, value in expected_fields.items():
        assert parsed.payload[key] == value


def test_three_part_message():
    """Some types carry an `extra` third field after the second `||`."""
    raw = b'{"type":"WIRE","ticket":1009,"id":"e3514"}||{"groupId":""}||{"meta":1}|'
    parsed = parse_line(raw)
    assert parsed.type == "WIRE"
    assert parsed.id == "e3514"
    assert parsed.payload == {"groupId": ""}
    assert parsed.extra == {"meta": 1}


def test_id_only_no_payload_yields_none_payload():
    """A head-only line parses with ``payload`` left as ``None``."""
    parsed = parse_line(b'{"type":"COMPONENT","ticket":7,"id":"e1"}|')
    assert parsed.type == "COMPONENT"
    assert parsed.id == "e1"
    # Downstream replay treats a missing payload as a delete.
    assert parsed.payload is None


def test_bad_head_raises():
    """A head that is not JSON is rejected with ``Epro2ParseError``."""
    try:
        parse_line(b"not json||{}|")
    except Epro2ParseError:
        pass
    else:
        raise AssertionError("expected Epro2ParseError")


def test_empty_line_raises():
    """A line holding only the terminator is rejected with ``Epro2ParseError``."""
    try:
        parse_line(b"|")
    except Epro2ParseError:
        pass
    else:
        raise AssertionError("expected Epro2ParseError")


def test_payload_with_embedded_pipes():
    """An embedded `|` inside a JSON string must NOT be treated as a separator."""
    raw = b'{"type":"ATTR","ticket":3,"id":"e1"}||{"key":"Symbol","value":"a|b|c"}|'
    parsed = parse_line(raw)
    assert parsed.payload["value"] == "a|b|c"