def _print_relations(proj: Project) -> None:
    """Print one table row per docType with Relations stats summed over its docs.

    Builds ``Relations`` for every document in ``proj.documents``, adds each
    document's ``summary()`` counters into a per-docType total (documents with
    a falsy ``doc_type`` are grouped under ``"?"``), and prints the rows
    ordered by number of documents of that type, most first.
    """
    print()
    print("=" * 72)
    print("Relations (per docType, summed)")
    print("-" * 72)

    aggregated: dict[str, Counter[str]] = {}
    # Count documents per docType while aggregating, so the ordering below does
    # not have to re-scan every document for every docType.
    doc_counts: Counter[str] = Counter()
    for d in proj.documents.values():
        doc_type = d.doc_type or "?"
        doc_counts[doc_type] += 1
        # Counter.update() with a mapping adds the counts instead of replacing.
        aggregated.setdefault(doc_type, Counter()).update(Relations.build(d).summary())

    if not aggregated:
        print(" (no documents)")
        return

    # Most common docType first; sorted() is stable, so ties keep first-seen order.
    order = sorted(aggregated, key=lambda t: -doc_counts[t])
    cols = [
        "parts", "components", "pins", "pads", "wires", "nets", "layers", "rules",
        "lines_grouped", "attrs_attached", "pad_nets",
        "unresolved_parents", "unresolved_wires", "unresolved_layers",
        # Diagnostic counter that summary() reports; show it with the others.
        "bad_composite_ids",
    ]
    print(f" {'docType':<12s} " + " ".join(f"{c:>16s}" for c in cols))
    for t in order:
        row = aggregated[t]
        print(
            f" {t:<12s} "
            + " ".join(f"{row.get(c, 0):>16d}" for c in cols)
        )
``'["LAYER",1]'``) are emitted by the editor as JSON +serialized arrays. We parse them lazily — see ``parse_composite_id``. +""" + +from __future__ import annotations + +import json +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Any + +from .replay import Document + + +def parse_composite_id(s: str) -> list | None: + """Best-effort decode an id field that's a serialized JSON array. + + Returns the list if the string looks like JSON array, else None. + """ + if not isinstance(s, str) or not s.startswith("["): + return None + try: + v = json.loads(s) + except json.JSONDecodeError: + return None + return v if isinstance(v, list) else None + + +def _resolve_parent(parent_id: str, doc: Document) -> bool: + """Check whether ``parent_id`` references something we know about. + + Accepts: + - direct hit on ``doc.objects`` (any _type — COMPONENT/WIRE/PART/PAD/PIN/...) + - compound ``-`` where ```` resolves to a doc object + (used for "component+pin" addressing in schematic ATTR ops) + """ + if parent_id in doc.objects: + return True + if "-" in parent_id: + head = parent_id.split("-", 1)[0] + if head in doc.objects: + return True + return False + + +@dataclass +class Relations: + """Indices built from one ``Document``. Cheap to (re)build. + + Lookup conventions: + - "by_id" maps a primitive's id to its payload. + - "by_" maps the value at to a list of object ids referencing it. + - composite-keyed maps use the parsed tuple as key (e.g. layer int). 
+ """ + + doc: Document + + # Primitive collections by type ---------------------------------------- + parts: dict[str, dict] = field(default_factory=dict) # PART.id (dotted) → payload + components: dict[str, dict] = field(default_factory=dict) # COMPONENT.id → payload + pins: dict[str, dict] = field(default_factory=dict) # PIN.id → payload + pads: dict[str, dict] = field(default_factory=dict) # PAD.id → payload + wires: dict[str, dict] = field(default_factory=dict) # WIRE.id → payload + nets: dict[str, dict] = field(default_factory=dict) # NET name → payload + layers: dict[int, dict] = field(default_factory=dict) # LAYER int → payload + rules: dict[tuple, dict] = field(default_factory=dict) # ("RULE", ...) tuple → payload + + # Cross-references ----------------------------------------------------- + obj_ids_by_part: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list)) + """partId (dotted name OR `pid...` prefix) → object ids referencing it.""" + + components_by_part: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list)) + """partId → component ids whose COMPONENT.partId == this.""" + + attrs_by_parent: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list)) + """parentId → ATTR ids attached.""" + + lines_by_wire: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list)) + """WIRE.id → LINE ids whose lineGroup == this.""" + + pad_nets_by_pad: dict[str, list[dict]] = field(default_factory=lambda: defaultdict(list)) + """PAD.id → [{comp, pin, net_name, padNet_payload}, ...].""" + + pad_nets_by_net: dict[str, list[dict]] = field(default_factory=lambda: defaultdict(list)) + """net_name (from PAD_NET.padNet) → [{comp, pin, pad}, ...].""" + + objects_on_layer: dict[int, list[str]] = field(default_factory=lambda: defaultdict(list)) + """layer int → object ids whose payload.layerId == this.""" + + objects_in_net: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list)) + """net name 
(payload.netName) → object ids.""" + + # Diagnostics ---------------------------------------------------------- + unresolved_parents: int = 0 # ATTR.parentId points to nothing in components/parts/pads + unresolved_wires: int = 0 # LINE.lineGroup points to nothing in wires + unresolved_layers: int = 0 # payload.layerId points to nothing in layers (pcb only) + bad_composite_ids: int = 0 + + # ---------------------------------------------------------------------- + + @classmethod + def build(cls, doc: Document) -> "Relations": + rel = cls(doc=doc) + + # First pass: bucket primitives by type, parse composite ids. + for obj_id, payload in doc.objects.items(): + t = payload.get("_type") + + if t == "PART": + # PART payload uses head.id as its key (e.g. "0.96_inch_lcd.1"). + # In our replay, doc.objects[obj_id] has _type=PART; obj_id IS the part id. + rel.parts[obj_id] = payload + elif t == "COMPONENT": + rel.components[obj_id] = payload + if part_ref := payload.get("partId"): + rel.components_by_part[str(part_ref)].append(obj_id) + elif t == "PIN": + rel.pins[obj_id] = payload + elif t == "PAD": + rel.pads[obj_id] = payload + elif t == "WIRE": + rel.wires[obj_id] = payload + elif t == "NET": + # NET id is `["NET", ""]` + comp = parse_composite_id(obj_id) + if comp and len(comp) >= 2 and comp[0] == "NET": + rel.nets[str(comp[1])] = payload + else: + rel.bad_composite_ids += 1 + elif t == "LAYER": + # LAYER id is `["LAYER", ]` + comp = parse_composite_id(obj_id) + if comp and len(comp) >= 2 and comp[0] == "LAYER": + try: + rel.layers[int(comp[1])] = payload + except (TypeError, ValueError): + rel.bad_composite_ids += 1 + else: + rel.bad_composite_ids += 1 + elif t == "RULE": + comp = parse_composite_id(obj_id) + if comp and comp[0] == "RULE": + rel.rules[tuple(comp)] = payload + else: + rel.bad_composite_ids += 1 + elif t == "PAD_NET": + # id is `["PAD_NET", , , ]` + # payload.padNet = "" + comp = parse_composite_id(obj_id) + if comp and len(comp) >= 4 and comp[0] == 
"PAD_NET": + _, c_id, pin_num, pad_id = comp[0], str(comp[1]), str(comp[2]), str(comp[3]) + net_name = payload.get("padNet") + record = { + "comp": c_id, + "pin": pin_num, + "pad": pad_id, + "net_name": net_name, + "payload": payload, + } + rel.pad_nets_by_pad[pad_id].append(record) + if net_name: + rel.pad_nets_by_net[str(net_name)].append(record) + else: + rel.bad_composite_ids += 1 + + # Second pass: cross-references that need full primitive maps available. + for obj_id, payload in doc.objects.items(): + t = payload.get("_type") + + # partId fan-in (not just COMPONENTs — RECT/TEXT/PIN inside SYMBOL/FOOTPRINT + # all carry partId pointing at their containing PART) + if (part_ref := payload.get("partId")) and t != "COMPONENT": + rel.obj_ids_by_part[str(part_ref)].append(obj_id) + + # ATTR → parent. parentId may target any addressable object in the doc + # (COMPONENT / WIRE / PART / PAD / PIN), or a compound `-` form + # where is a component and is its pin/sub-ref. + if t == "ATTR": + if parent := payload.get("parentId"): + parent_str = str(parent) + rel.attrs_by_parent[parent_str].append(obj_id) + if not _resolve_parent(parent_str, doc): + rel.unresolved_parents += 1 + + # LINE → wire + if t == "LINE": + if wire_ref := payload.get("lineGroup"): + rel.lines_by_wire[str(wire_ref)].append(obj_id) + if wire_ref not in rel.wires: + rel.unresolved_wires += 1 + + # any obj on layer + if (lid := payload.get("layerId")) is not None: + try: + lid_int = int(lid) + rel.objects_on_layer[lid_int].append(obj_id) + if lid_int not in rel.layers: + rel.unresolved_layers += 1 + except (TypeError, ValueError): + pass + + # any obj in net + if net_name := payload.get("netName"): + rel.objects_in_net[str(net_name)].append(obj_id) + + return rel + + # Accessor helpers ----------------------------------------------------- + + def part_for_component(self, comp_id: str) -> dict | None: + """Return the PART payload for a COMPONENT, if resolvable. 
+ + In schematic context, COMPONENT.partId is a `pid...` prefix string that + does NOT match PART.id directly — the editor resolves it via library + cache. We try a best-effort match on the raw partId; callers handle None. + """ + comp = self.components.get(comp_id) + if not comp: + return None + return self.parts.get(str(comp.get("partId", ""))) + + def attrs_dict(self, parent_id: str) -> dict[str, Any]: + """Convenience: collapse all ATTR ops with parentId == ``parent_id`` into a + flat ``{key: value}`` dict. Last write wins on duplicate keys. + """ + out: dict[str, Any] = {} + for attr_id in self.attrs_by_parent.get(parent_id, []): + payload = self.doc.objects.get(attr_id) or {} + k = payload.get("key") + if k is not None: + out[str(k)] = payload.get("value") + return out + + def summary(self) -> dict[str, int]: + """Stats for CLI / tests / sanity checks.""" + return { + "parts": len(self.parts), + "components": len(self.components), + "pins": len(self.pins), + "pads": len(self.pads), + "wires": len(self.wires), + "nets": len(self.nets), + "layers": len(self.layers), + "rules": len(self.rules), + "lines_grouped": sum(len(v) for v in self.lines_by_wire.values()), + "attrs_attached": sum(len(v) for v in self.attrs_by_parent.values()), + "pad_nets": sum(len(v) for v in self.pad_nets_by_pad.values()), + "objects_on_layer": sum(len(v) for v in self.objects_on_layer.values()), + "objects_in_net": sum(len(v) for v in self.objects_in_net.values()), + "unresolved_parents": self.unresolved_parents, + "unresolved_wires": self.unresolved_wires, + "unresolved_layers": self.unresolved_layers, + "bad_composite_ids": self.bad_composite_ids, + } diff --git a/tools/epro2/tests/test_relations.py b/tools/epro2/tests/test_relations.py new file mode 100644 index 0000000..b70436c --- /dev/null +++ b/tools/epro2/tests/test_relations.py @@ -0,0 +1,126 @@ +"""Relations builder regression tests. 
def _doc(obj_pairs):
    """Build a PCB-typed Document whose ``objects`` holds the given (id, payload) pairs."""
    d = Document(doc_uuid="test", doc_type="PCB")
    for oid, payload in obj_pairs:
        d.objects[oid] = payload
    return d


def test_parse_composite_id_basic():
    """Composite ids decode to lists; plain or malformed ids decode to None."""
    assert parse_composite_id('["LAYER",1]') == ["LAYER", 1]
    assert parse_composite_id('["NET","+12V"]') == ["NET", "+12V"]
    assert parse_composite_id('["PAD_NET","e0","1","e7"]') == ["PAD_NET", "e0", "1", "e7"]
    assert parse_composite_id("e1") is None     # plain id
    assert parse_composite_id("[bad]") is None  # malformed JSON


def test_layer_and_net_extraction():
    """LAYER objects are keyed by layer int, NET objects by net name."""
    d = _doc([
        ('["LAYER",1]', {"_type": "LAYER", "layerName": "Top Layer"}),
        ('["LAYER",2]', {"_type": "LAYER", "layerName": "Bottom Layer"}),
        ('["NET","GND"]', {"_type": "NET", "netType": None}),
    ])
    rel = Relations.build(d)
    assert rel.layers[1]["layerName"] == "Top Layer"
    assert rel.layers[2]["layerName"] == "Bottom Layer"
    assert "GND" in rel.nets


def test_bad_composite_ids_counted():
    """NET/LAYER objects whose id does not decode are counted, not indexed."""
    d = _doc([
        ("n0", {"_type": "NET"}),                # plain id, not a composite
        ('["LAYER","top"]', {"_type": "LAYER"}),  # layer number is not an int
        ('["NET","GND"]', {"_type": "NET"}),
    ])
    rel = Relations.build(d)
    assert rel.bad_composite_ids == 2
    assert list(rel.nets) == ["GND"]
    assert not rel.layers


def test_lines_grouped_by_wire():
    """LINEs are grouped under their lineGroup; a missing WIRE counts as unresolved."""
    d = _doc([
        ("e637", {"_type": "WIRE", "groupId": "", "zIndex": 53}),
        ("ln1", {"_type": "LINE", "lineGroup": "e637", "startX": 0, "startY": 0, "endX": 10, "endY": 0}),
        ("ln2", {"_type": "LINE", "lineGroup": "e637", "startX": 10, "startY": 0, "endX": 10, "endY": 10}),
        ("ln3", {"_type": "LINE", "lineGroup": "e999", "startX": 0, "startY": 0, "endX": 1, "endY": 1}),  # orphan
    ])
    rel = Relations.build(d)
    assert sorted(rel.lines_by_wire["e637"]) == ["ln1", "ln2"]
    assert rel.lines_by_wire["e999"] == ["ln3"]
    assert rel.unresolved_wires == 1  # ln3's wire 'e999' doesn't exist


def test_attr_parent_resolution_direct_and_compound():
    """Direct and `<head>-<tail>` parentIds resolve; an unknown parent does not."""
    d = _doc([
        ("e1", {"_type": "COMPONENT", "partId": "pidABC", "x": 0, "y": 0}),
        ("a1", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R1"}),
        ("a2", {"_type": "ATTR", "parentId": "e1-pin3", "key": "PinName", "value": "VCC"}),
        ("a3", {"_type": "ATTR", "parentId": "ghost", "key": "X", "value": "Y"}),  # truly orphan
    ])
    rel = Relations.build(d)
    assert sorted(rel.attrs_by_parent["e1"]) == ["a1"]
    assert rel.attrs_by_parent["e1-pin3"] == ["a2"]
    assert rel.unresolved_parents == 1  # only `ghost` is fully unresolved


def test_pad_net_indexing():
    """PAD_NET records are indexed both by pad id and by net name."""
    d = _doc([
        ('["PAD_NET","e0","1","e7"]', {"_type": "PAD_NET", "padNet": "GND"}),
        ('["PAD_NET","e0","2","e8"]', {"_type": "PAD_NET", "padNet": "VCC"}),
        ('["PAD_NET","e1","1","e7"]', {"_type": "PAD_NET", "padNet": "GND"}),  # same pad, diff comp/pin
    ])
    rel = Relations.build(d)
    # pad e7 is referenced by 2 PAD_NETs (different (comp,pin) pairs)
    assert len(rel.pad_nets_by_pad["e7"]) == 2
    # net GND has 2 references; VCC has 1
    assert len(rel.pad_nets_by_net["GND"]) == 2
    assert len(rel.pad_nets_by_net["VCC"]) == 1


def test_attrs_dict_collapse():
    """attrs_dict flattens a parent's ATTRs; the last duplicate key wins."""
    d = _doc([
        ("e1", {"_type": "COMPONENT", "partId": "p"}),
        ("a1", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R1"}),
        ("a2", {"_type": "ATTR", "parentId": "e1", "key": "Value", "value": "10kΩ"}),
        ("a3", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R2"}),  # last write wins
    ])
    rel = Relations.build(d)
    flat = rel.attrs_dict("e1")
    assert flat == {"Designator": "R2", "Value": "10kΩ"}


def test_components_by_part_index():
    """COMPONENT ids are grouped by their partId."""
    d = _doc([
        ("e1", {"_type": "COMPONENT", "partId": "pidA"}),
        ("e2", {"_type": "COMPONENT", "partId": "pidA"}),
        ("e3", {"_type": "COMPONENT", "partId": "pidB"}),
    ])
    rel = Relations.build(d)
    assert sorted(rel.components_by_part["pidA"]) == ["e1", "e2"]
    assert rel.components_by_part["pidB"] == ["e3"]


def test_objects_on_layer_and_in_net():
    """layerId/netName cross-references are indexed; an unknown layer is counted."""
    d = _doc([
        ('["LAYER",1]', {"_type": "LAYER", "layerName": "Top"}),
        ("v1", {"_type": "VIA", "layerId": 1, "netName": "GND"}),
        ("v2", {"_type": "VIA", "layerId": 1, "netName": "VCC"}),
        ("p1", {"_type": "POLY", "layerId": 99, "netName": "GND"}),  # layer 99 doesn't exist → unresolved
    ])
    rel = Relations.build(d)
    assert sorted(rel.objects_on_layer[1]) == ["v1", "v2"]
    assert rel.unresolved_layers == 1
    assert sorted(rel.objects_in_net["GND"]) == ["p1", "v1"]


def test_summary_keys_present():
    """summary() exposes every counter and reports all zeros for an empty doc."""
    d = _doc([])
    rel = Relations.build(d)
    s = rel.summary()
    for key in ("parts", "components", "pins", "pads", "wires", "nets", "layers", "rules",
                "lines_grouped", "attrs_attached", "pad_nets",
                "objects_on_layer", "objects_in_net",
                "unresolved_parents", "unresolved_wires", "unresolved_layers",
                "bad_composite_ids"):
        assert key in s, f"missing summary key: {key}"
    assert all(v == 0 for v in s.values())