tools/epro2: add Relations layer for cross-object navigation

在 replay 的扁平 objects[id] -> payload 之上盖一层 Relations,建索引和
反向引用,把孤立对象拼成可遍历的图,是后续 EPRO2 → KiCad 转换器的
中间表示前置。

Relations.build(doc) 单遍扫所有对象,得到:

主集合(按类型分桶):
  parts / components / pins / pads / wires / nets / layers / rules

复合 ID 解析(关键):
  '["LAYER",1]'                          → layers[1]
  '["NET","GND"]'                        → nets["GND"]
  '["PAD_NET","e0","1","e7"]'            → pad_nets_by_pad/by_net
  '["RULE","SAFE","copperThickness1oz"]' → rules[("RULE","SAFE",...)]

反向引用:
  obj_ids_by_part         partId            → 引用对象 ids(lib 内 RECT/TEXT/PIN 都带 partId)
  components_by_part      partId            → component ids
  attrs_by_parent         parentId          → ATTR ids
  lines_by_wire           WIRE.id           → LINE ids(wire 由若干 LINE 段组成)
  pad_nets_by_pad         PAD.id            → PAD_NET 记录
  pad_nets_by_net         net name          → PAD_NET 记录
  objects_on_layer / objects_in_net  字段反查

便捷 accessor:
  attrs_dict(parent_id)   折叠所有 ATTR ops 到 {key: value} dict(last
                          write wins),KiCad 转换时按 component 拿
                          Designator/Value/Footprint 的常用入口

ATTR.parentId 解析(实测发现的两种坑):
1. 不仅指向 COMPONENT/PART —— 也大量指向 WIRE(schematic 上的网络
   标签 / 网络属性)。原查重函数漏算,636 个 false positive
   unresolved;改为"任意 doc.objects[parentId] 命中即算 resolved"
2. 复合形式 `<comp_id>-<pin_id>` 用于把 ATTR 挂在某 component 的某个
   pin 上(如 PinName)。`_resolve_parent()` 用 split("-",1) 兜底

CLI 加 --relations,按 docType 聚合 stats:
  uv run python -m tools.epro2 data/raw/oshwhub/<uuid> --relations

ESP-VoCat 验证:
  SCH_PAGE 9 docs : 572 components, 563 wires, 934 lines_grouped,
                    4111 attrs_attached, 0 unresolved_parents
  PCB      6 docs : 206 components, 807 pad_nets, 173 nets, 544 layers
  SYMBOL 105 docs : 106 parts, 560 pins, 1680 attrs_attached
  FOOTPRINT 55 docs: 496 pads, 9 nets, 1771 layers, 140 rules

注:PCB 内 pads=6 vs pad_nets=807 不矛盾 —— PAD 实例存在 FOOTPRINT
文档里,PCB stream 用 ["PAD_NET",comp,pin,pad] 复合 id 跨文档引用;
解析"comp 的某 pin 通过哪个 footprint 的哪个 pad"需要 project-级
Relations 聚合(下个 task)。

测试:tools/epro2/tests/test_relations.py 9 个单测覆盖复合 id 解析、
lineGroup 链接、parentId 直/复合解析、partId 反查、attrs 折叠。
parser + relations 共 15/15 通过。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-28 22:17:28 +08:00
parent 3c57e75d51
commit 7f9e2fad73
4 changed files with 448 additions and 0 deletions

View File

@@ -4,6 +4,7 @@ See docs/sources/easyeda_pro_source.md §3 for the format spec.
""" """
from .parser import Op, iter_ops, parse_line from .parser import Op, iter_ops, parse_line
from .relations import Relations, parse_composite_id
from .replay import Document, Project, replay_document, replay_project from .replay import Document, Project, replay_document, replay_project
__all__ = [ __all__ = [
@@ -14,4 +15,6 @@ __all__ = [
"Project", "Project",
"replay_document", "replay_document",
"replay_project", "replay_project",
"Relations",
"parse_composite_id",
] ]

View File

@@ -16,6 +16,7 @@ import sys
from collections import Counter from collections import Counter
from pathlib import Path from pathlib import Path
from .relations import Relations
from .replay import Project, replay_project from .replay import Project, replay_project
@@ -71,6 +72,46 @@ def _dump_doc(proj: Project, doc_uuid: str, n_objects: int = 5) -> None:
print(f" {k}{json.dumps(v, ensure_ascii=False)[:240]}") print(f" {k}{json.dumps(v, ensure_ascii=False)[:240]}")
def _print_relations(proj: Project) -> None:
    """Print a per-docType table of ``Relations.summary()`` stats.

    One ``Relations`` is built for every document in ``proj.documents``; the
    summary counters of documents sharing a ``doc_type`` are summed (documents
    without a doc_type are grouped under ``"?"``). Rows are printed in
    descending order of document count per docType.
    """
    print()
    print("=" * 72)
    print("Relations (per docType, summed)")
    print("-" * 72)

    aggregated: dict[str, Counter[str]] = {}
    # Documents per docType, counted once here so the sort below does not
    # rescan the whole project for every docType.
    doc_counts: Counter[str] = Counter()
    for d in proj.documents.values():
        doc_type = d.doc_type or "?"
        doc_counts[doc_type] += 1
        # Counter.update() with a mapping ADDS the counts (it does not replace).
        aggregated.setdefault(doc_type, Counter()).update(Relations.build(d).summary())

    if not aggregated:
        print("  (no documents)")
        return

    # sorted() is stable, so docTypes with equal counts keep first-seen order.
    order = sorted(aggregated, key=lambda t: -doc_counts[t])
    cols = [
        "parts", "components", "pins", "pads", "wires", "nets", "layers", "rules",
        "lines_grouped", "attrs_attached", "pad_nets",
        "unresolved_parents", "unresolved_wires", "unresolved_layers",
    ]
    print(f"  {'docType':<12s} " + " ".join(f"{c:>16s}" for c in cols))
    for t in order:
        row = aggregated[t]
        print(
            f"  {t:<12s} "
            + " ".join(f"{row.get(c, 0):>16d}" for c in cols)
        )
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
ap = argparse.ArgumentParser(description="Replay an EPRO2 project and summarize.") ap = argparse.ArgumentParser(description="Replay an EPRO2 project and summarize.")
ap.add_argument("project_dir", type=Path, help="data/raw/oshwhub/<project_uuid>/") ap.add_argument("project_dir", type=Path, help="data/raw/oshwhub/<project_uuid>/")
@@ -80,12 +121,19 @@ def main(argv: list[str] | None = None) -> int:
default=[], default=[],
help="dump replayed state of one document (uuid or unique prefix); repeatable", help="dump replayed state of one document (uuid or unique prefix); repeatable",
) )
ap.add_argument(
"--relations",
action="store_true",
help="build cross-object indices and print per-docType summary",
)
args = ap.parse_args(argv) args = ap.parse_args(argv)
proj = replay_project(args.project_dir) proj = replay_project(args.project_dir)
_print_summary(proj) _print_summary(proj)
for doc_id in args.dump_doc: for doc_id in args.dump_doc:
_dump_doc(proj, doc_id) _dump_doc(proj, doc_id)
if args.relations:
_print_relations(proj)
return 0 return 0

271
tools/epro2/relations.py Normal file
View File

@@ -0,0 +1,271 @@
"""Build cross-object relationship indices from a replayed Document.
After ``replay.Document`` flattens the EPRO2 stream into ``objects[id] -> payload``,
this module walks those payloads to build the secondary indices needed for
downstream translation (KiCad export, graph extraction, etc).
Relationships modeled (empirically — see docs/sources/easyeda_pro_source.md §3
+ probe results 2026-04-28 on ESP-VoCat):
PART --(id, dotted name)--> primitives via primitive.partId (lib/parts)
COMPONENT --(.partId)--> PART (sch) or footprint via ATTR (pcb)
ATTR --(.parentId)--> any object in the doc (COMPONENT / PART / WIRE / ...) or a compound "<comp_id>-<pin_id>" (key/value annotations)
LINE --(.lineGroup)--> WIRE (sch wire segments)
PAD_NET[id=["PAD_NET",comp,pin,pad]] --(.padNet)--> NET[id=["NET",name]]
any obj --(.layerId)--> LAYER[id=["LAYER",N]] (pcb)
any obj --(.netName)--> NET (pcb)
Composite IDs (e.g. ``'["LAYER",1]'``) are emitted by the editor as JSON
serialized arrays. We decode them on a best-effort basis — see ``parse_composite_id``.
"""
from __future__ import annotations
import json
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any
from .replay import Document
def parse_composite_id(s: str) -> list | None:
"""Best-effort decode an id field that's a serialized JSON array.
Returns the list if the string looks like JSON array, else None.
"""
if not isinstance(s, str) or not s.startswith("["):
return None
try:
v = json.loads(s)
except json.JSONDecodeError:
return None
return v if isinstance(v, list) else None
def _resolve_parent(parent_id: str, doc: Document) -> bool:
    """Tell whether ``parent_id`` points at an object present in ``doc``.

    A parent id resolves when either:
      - it is itself a key of ``doc.objects`` (whatever the object's _type), or
      - it has the compound form ``<a>-<b>`` and ``<a>`` (the text before the
        first dash) is a key of ``doc.objects`` — the "component+pin"
        addressing used by schematic ATTR ops.
    """
    known = doc.objects
    if parent_id in known:
        return True
    # partition() splits on the first dash only; `dash` is empty when there is none.
    head, dash, _tail = parent_id.partition("-")
    return bool(dash) and head in known
@dataclass
class Relations:
    """Indices built from one ``Document``. Cheap to (re)build.

    Lookup conventions:
    - primitive collections map a primitive's id to its payload.
    - "by_<key>" maps the value at <key> to a list of object ids referencing it.
    - composite-keyed maps use the parsed id as key (e.g. layer int, RULE tuple).

    Use ``Relations.build(doc)`` to construct a populated instance; the plain
    constructor only creates empty indices.
    """

    doc: Document

    # Primitive collections by type ----------------------------------------
    parts: dict[str, dict] = field(default_factory=dict)        # PART.id (dotted) → payload
    components: dict[str, dict] = field(default_factory=dict)   # COMPONENT.id → payload
    pins: dict[str, dict] = field(default_factory=dict)         # PIN.id → payload
    pads: dict[str, dict] = field(default_factory=dict)         # PAD.id → payload
    wires: dict[str, dict] = field(default_factory=dict)        # WIRE.id → payload
    nets: dict[str, dict] = field(default_factory=dict)         # NET name → payload
    layers: dict[int, dict] = field(default_factory=dict)       # LAYER int → payload
    rules: dict[tuple, dict] = field(default_factory=dict)      # ("RULE", ...) tuple → payload

    # Cross-references -----------------------------------------------------
    obj_ids_by_part: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))
    """partId (dotted name OR `pid...` prefix) → ids of non-COMPONENT objects referencing it."""

    components_by_part: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))
    """partId → component ids whose COMPONENT.partId == this."""

    attrs_by_parent: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))
    """parentId → ATTR ids attached."""

    lines_by_wire: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))
    """WIRE.id → LINE ids whose lineGroup == this."""

    pad_nets_by_pad: dict[str, list[dict]] = field(default_factory=lambda: defaultdict(list))
    """PAD.id → [{comp, pin, pad, net_name, payload}, ...]."""

    pad_nets_by_net: dict[str, list[dict]] = field(default_factory=lambda: defaultdict(list))
    """net_name (from PAD_NET.padNet) → [{comp, pin, pad, net_name, payload}, ...]."""

    objects_on_layer: dict[int, list[str]] = field(default_factory=lambda: defaultdict(list))
    """layer int → object ids whose payload.layerId == this."""

    objects_in_net: dict[str, list[str]] = field(default_factory=lambda: defaultdict(list))
    """net name (payload.netName) → object ids."""

    # Diagnostics ----------------------------------------------------------
    # ATTR.parentId that is neither a doc.objects key nor `<known id>-<suffix>`
    unresolved_parents: int = 0
    # LINE.lineGroup that is not the id of any WIRE in this doc
    unresolved_wires: int = 0
    # payload.layerId that is not the number of any LAYER in this doc
    unresolved_layers: int = 0
    # NET / LAYER / RULE / PAD_NET ids that did not decode to the expected composite shape
    bad_composite_ids: int = 0

    # ----------------------------------------------------------------------
    @classmethod
    def build(cls, doc: Document) -> "Relations":
        """Build all indices for ``doc`` in two passes over ``doc.objects``.

        The first pass buckets primitives by ``_type`` and decodes composite
        ids. The second pass records cross-references and counts unresolved
        ones, which needs the complete ``wires`` / ``layers`` maps from the
        first pass.
        """
        rel = cls(doc=doc)
        for obj_id, payload in doc.objects.items():
            rel._index_primitive(obj_id, payload)
        for obj_id, payload in doc.objects.items():
            rel._index_references(obj_id, payload)
        return rel

    def _index_primitive(self, obj_id: str, payload: dict) -> None:
        """First pass: file one object into its per-type collection."""
        t = payload.get("_type")
        if t == "PART":
            # The key under which a PART is stored in doc.objects IS the part id
            # (e.g. "0.96_inch_lcd.1").
            self.parts[obj_id] = payload
        elif t == "COMPONENT":
            self.components[obj_id] = payload
            if part_ref := payload.get("partId"):
                self.components_by_part[str(part_ref)].append(obj_id)
        elif t == "PIN":
            self.pins[obj_id] = payload
        elif t == "PAD":
            self.pads[obj_id] = payload
        elif t == "WIRE":
            self.wires[obj_id] = payload
        elif t == "NET":
            # NET id is `["NET", "<name>"]`
            comp = parse_composite_id(obj_id)
            if comp and len(comp) >= 2 and comp[0] == "NET":
                self.nets[str(comp[1])] = payload
            else:
                self.bad_composite_ids += 1
        elif t == "LAYER":
            # LAYER id is `["LAYER", <int>]`
            comp = parse_composite_id(obj_id)
            if comp and len(comp) >= 2 and comp[0] == "LAYER":
                try:
                    self.layers[int(comp[1])] = payload
                except (TypeError, ValueError, OverflowError):
                    # OverflowError: int() of an infinite float
                    self.bad_composite_ids += 1
            else:
                self.bad_composite_ids += 1
        elif t == "RULE":
            comp = parse_composite_id(obj_id)
            if comp and comp[0] == "RULE":
                try:
                    self.rules[tuple(comp)] = payload
                except TypeError:
                    # A nested list/dict element makes the tuple unhashable;
                    # count it as malformed instead of aborting the build.
                    self.bad_composite_ids += 1
            else:
                self.bad_composite_ids += 1
        elif t == "PAD_NET":
            # id is `["PAD_NET", <comp_id>, <pin_num>, <pad_id>]`
            # payload.padNet = "<net name>"
            comp = parse_composite_id(obj_id)
            if comp and len(comp) >= 4 and comp[0] == "PAD_NET":
                c_id, pin_num, pad_id = str(comp[1]), str(comp[2]), str(comp[3])
                net_name = payload.get("padNet")
                record = {
                    "comp": c_id,
                    "pin": pin_num,
                    "pad": pad_id,
                    "net_name": net_name,
                    "payload": payload,
                }
                # The same record object is shared by both indices.
                self.pad_nets_by_pad[pad_id].append(record)
                if net_name:
                    self.pad_nets_by_net[str(net_name)].append(record)
            else:
                self.bad_composite_ids += 1

    def _index_references(self, obj_id: str, payload: dict) -> None:
        """Second pass: record one object's outgoing references."""
        t = payload.get("_type")

        # partId fan-in (not just COMPONENTs — RECT/TEXT/PIN inside SYMBOL/FOOTPRINT
        # all carry partId pointing at their containing PART). COMPONENTs are
        # excluded here because the first pass already indexed them in
        # components_by_part.
        if (part_ref := payload.get("partId")) and t != "COMPONENT":
            self.obj_ids_by_part[str(part_ref)].append(obj_id)

        # ATTR → parent. parentId may target any addressable object in the doc
        # (COMPONENT / WIRE / PART / PAD / PIN), or a compound `<a>-<b>` form
        # where <a> is a component and <b> is its pin/sub-ref.
        if t == "ATTR" and (parent := payload.get("parentId")):
            parent_str = str(parent)
            self.attrs_by_parent[parent_str].append(obj_id)
            if not _resolve_parent(parent_str, self.doc):
                self.unresolved_parents += 1

        # LINE → wire. Index and membership test use the same str() key, so a
        # non-string lineGroup is not falsely reported as unresolved.
        if t == "LINE" and (wire_ref := payload.get("lineGroup")):
            wire_key = str(wire_ref)
            self.lines_by_wire[wire_key].append(obj_id)
            if wire_key not in self.wires:
                self.unresolved_wires += 1

        # any obj on layer
        if (lid := payload.get("layerId")) is not None:
            try:
                lid_int = int(lid)
            except (TypeError, ValueError, OverflowError):
                # Deliberate best-effort: a layerId that is not int-convertible
                # is left out of the layer index and not counted anywhere.
                pass
            else:
                self.objects_on_layer[lid_int].append(obj_id)
                if lid_int not in self.layers:
                    self.unresolved_layers += 1

        # any obj in net
        if net_name := payload.get("netName"):
            self.objects_in_net[str(net_name)].append(obj_id)

    # Accessor helpers -----------------------------------------------------
    def part_for_component(self, comp_id: str) -> dict | None:
        """Return the PART payload for a COMPONENT, if resolvable.

        In schematic context, COMPONENT.partId is a `pid...` prefix string that
        does NOT match PART.id directly — the editor resolves it via library
        cache. We try a best-effort match on the raw partId; callers handle None.
        """
        comp = self.components.get(comp_id)
        if not comp:
            return None
        return self.parts.get(str(comp.get("partId", "")))

    def attrs_dict(self, parent_id: str) -> dict[str, Any]:
        """Collapse all ATTRs with parentId == ``parent_id`` into ``{key: value}``.

        ATTRs are visited in the order they were indexed; the last one wins on
        duplicate keys. ATTRs without a ``key`` are skipped. Returns an empty
        dict when nothing is attached to ``parent_id``.
        """
        out: dict[str, Any] = {}
        # .get() rather than [] so a lookup does not create an empty entry
        # in the defaultdict.
        for attr_id in self.attrs_by_parent.get(parent_id, []):
            payload = self.doc.objects.get(attr_id) or {}
            k = payload.get("key")
            if k is not None:
                out[str(k)] = payload.get("value")
        return out

    def summary(self) -> dict[str, int]:
        """Stats for CLI / tests / sanity checks."""
        return {
            "parts": len(self.parts),
            "components": len(self.components),
            "pins": len(self.pins),
            "pads": len(self.pads),
            "wires": len(self.wires),
            "nets": len(self.nets),
            "layers": len(self.layers),
            "rules": len(self.rules),
            "lines_grouped": sum(len(v) for v in self.lines_by_wire.values()),
            "attrs_attached": sum(len(v) for v in self.attrs_by_parent.values()),
            "pad_nets": sum(len(v) for v in self.pad_nets_by_pad.values()),
            "objects_on_layer": sum(len(v) for v in self.objects_on_layer.values()),
            "objects_in_net": sum(len(v) for v in self.objects_in_net.values()),
            "unresolved_parents": self.unresolved_parents,
            "unresolved_wires": self.unresolved_wires,
            "unresolved_layers": self.unresolved_layers,
            "bad_composite_ids": self.bad_composite_ids,
        }

View File

@@ -0,0 +1,126 @@
"""Relations builder regression tests.
These run a tiny synthetic Document through Relations.build to exercise:
- composite-id parsing for NET / LAYER / PAD_NET / RULE
- LINE.lineGroup → WIRE indexing
- ATTR.parentId resolution (direct + compound `<a>-<b>` form)
- cross-references on partId / netName / layerId
"""
from tools.epro2.relations import Relations, parse_composite_id
from tools.epro2.replay import Document
def _doc(obj_pairs):
    """Return a PCB-typed test Document holding the given (id, payload) pairs.

    Pairs are stored in order, so a repeated id keeps its last payload.
    """
    document = Document(doc_uuid="test", doc_type="PCB")
    for object_id, body in obj_pairs:
        document.objects[object_id] = body
    return document
def test_parse_composite_id_basic():
    """JSON-array ids decode to lists; plain ids and malformed JSON give None."""
    decodable = [
        ('["LAYER",1]', ["LAYER", 1]),
        ('["NET","+12V"]', ["NET", "+12V"]),
        ('["PAD_NET","e0","1","e7"]', ["PAD_NET", "e0", "1", "e7"]),
    ]
    for raw, expected in decodable:
        assert parse_composite_id(raw) == expected

    # "e1" is a plain id, "[bad]" is malformed JSON
    for raw in ("e1", "[bad]"):
        assert parse_composite_id(raw) is None
def test_layer_and_net_extraction():
    """LAYER ids are keyed by their int, NET ids by their name."""
    objects = [
        ('["LAYER",1]', {"_type": "LAYER", "layerName": "Top Layer"}),
        ('["LAYER",2]', {"_type": "LAYER", "layerName": "Bottom Layer"}),
        ('["NET","GND"]', {"_type": "NET", "netType": None}),
    ]
    relations = Relations.build(_doc(objects))

    layer_names = {number: relations.layers[number]["layerName"] for number in (1, 2)}
    assert layer_names == {1: "Top Layer", 2: "Bottom Layer"}
    assert "GND" in relations.nets
def test_lines_grouped_by_wire():
    """LINEs are grouped under their lineGroup; a missing WIRE counts as unresolved."""
    wire = ("e637", {"_type": "WIRE", "groupId": "", "zIndex": 53})
    segments = [
        ("ln1", {"_type": "LINE", "lineGroup": "e637", "startX": 0, "startY": 0, "endX": 10, "endY": 0}),
        ("ln2", {"_type": "LINE", "lineGroup": "e637", "startX": 10, "startY": 0, "endX": 10, "endY": 10}),
        # orphan: no WIRE with id e999 exists
        ("ln3", {"_type": "LINE", "lineGroup": "e999", "startX": 0, "startY": 0, "endX": 1, "endY": 1}),
    ]
    relations = Relations.build(_doc([wire, *segments]))

    assert sorted(relations.lines_by_wire["e637"]) == ["ln1", "ln2"]
    assert relations.lines_by_wire["e999"] == ["ln3"]
    # only ln3 points at a wire that is absent
    assert relations.unresolved_wires == 1
def test_attr_parent_resolution_direct_and_compound():
    """Direct and ``<comp>-<pin>`` parentIds resolve; an unknown parent does not."""
    component = ("e1", {"_type": "COMPONENT", "partId": "pidABC", "x": 0, "y": 0})
    direct = ("a1", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R1"})
    compound = ("a2", {"_type": "ATTR", "parentId": "e1-pin3", "key": "PinName", "value": "VCC"})
    orphan = ("a3", {"_type": "ATTR", "parentId": "ghost", "key": "X", "value": "Y"})
    relations = Relations.build(_doc([component, direct, compound, orphan]))

    assert sorted(relations.attrs_by_parent["e1"]) == ["a1"]
    assert relations.attrs_by_parent["e1-pin3"] == ["a2"]
    # `ghost` is the only parent that resolves to nothing
    assert relations.unresolved_parents == 1
def test_pad_net_indexing():
    """PAD_NET records are indexed both by pad id and by net name."""
    pad_nets = [
        ('["PAD_NET","e0","1","e7"]', {"_type": "PAD_NET", "padNet": "GND"}),
        ('["PAD_NET","e0","2","e8"]', {"_type": "PAD_NET", "padNet": "VCC"}),
        # same pad e7 again, reached from a different comp/pin
        ('["PAD_NET","e1","1","e7"]', {"_type": "PAD_NET", "padNet": "GND"}),
    ]
    relations = Relations.build(_doc(pad_nets))

    # two PAD_NETs reference pad e7
    assert len(relations.pad_nets_by_pad["e7"]) == 2
    # GND is referenced twice, VCC once
    refs_per_net = {name: len(relations.pad_nets_by_net[name]) for name in ("GND", "VCC")}
    assert refs_per_net == {"GND": 2, "VCC": 1}
def test_attrs_dict_collapse():
    """attrs_dict folds a parent's ATTRs into one dict; a later duplicate key wins."""
    component = ("e1", {"_type": "COMPONENT", "partId": "p"})
    attrs = [
        ("a1", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R1"}),
        ("a2", {"_type": "ATTR", "parentId": "e1", "key": "Value", "value": "10kΩ"}),
        # repeats the Designator key, so this value overrides a1's
        ("a3", {"_type": "ATTR", "parentId": "e1", "key": "Designator", "value": "R2"}),
    ]
    relations = Relations.build(_doc([component] + attrs))

    assert relations.attrs_dict("e1") == {"Designator": "R2", "Value": "10kΩ"}
def test_components_by_part_index():
    """COMPONENTs are grouped by the partId they reference."""
    part_of = {"e1": "pidA", "e2": "pidA", "e3": "pidB"}
    components = [
        (comp_id, {"_type": "COMPONENT", "partId": part_id})
        for comp_id, part_id in part_of.items()
    ]
    relations = Relations.build(_doc(components))

    assert sorted(relations.components_by_part["pidA"]) == ["e1", "e2"]
    assert relations.components_by_part["pidB"] == ["e3"]
def test_objects_on_layer_and_in_net():
    """layerId / netName back-references are indexed; an unknown layer is counted."""
    top_layer = ('["LAYER",1]', {"_type": "LAYER", "layerName": "Top"})
    primitives = [
        ("v1", {"_type": "VIA", "layerId": 1, "netName": "GND"}),
        ("v2", {"_type": "VIA", "layerId": 1, "netName": "VCC"}),
        # layer 99 is never declared, so this reference is unresolved
        ("p1", {"_type": "POLY", "layerId": 99, "netName": "GND"}),
    ]
    relations = Relations.build(_doc([top_layer, *primitives]))

    assert sorted(relations.objects_on_layer[1]) == ["v1", "v2"]
    assert relations.unresolved_layers == 1
    assert sorted(relations.objects_in_net["GND"]) == ["p1", "v1"]
def test_summary_keys_present():
    """summary() exposes every documented stat, even for an empty document.

    The list covers all keys summary() returns, including ``wires``,
    ``objects_on_layer`` and ``objects_in_net``.
    """
    d = _doc([])
    rel = Relations.build(d)
    s = rel.summary()
    for key in ("parts", "components", "pins", "pads", "wires", "nets", "layers", "rules",
                "lines_grouped", "attrs_attached", "pad_nets",
                "objects_on_layer", "objects_in_net",
                "unresolved_parents", "unresolved_wires", "unresolved_layers",
                "bad_composite_ids"):
        assert key in s, f"missing summary key: {key}"