Pro 2.x stores some doc payloads (notably Taishan's PCB) externally at
modules.lceda.cn keyed by dataStrId, AES-256-GCM encrypted with the
iv/key fields stored alongside. Same crypto pattern as Pro 3.x EPRO2:
last 16 bytes are the GCM auth tag, rest is gzip(plaintext-op-stream).
The CDN doesn't require auth.
- pro2_writer.fetch_encrypted_plaintext(): fetch + decrypt + gunzip,
cache result at source/<uuid>.decrypted.txt so re-runs skip the
network round-trip. Heavy imports (httpx, pycryptodome) are
deferred to call-time so the pure-replay path doesn't pay for them.
- pro2_writer.split_plaintext_by_doctype(): walk the multi-doc
plaintext (Pro 2.x bundles N FOOTPRINTs + 1 PCB into one blob), yield
(label, sub_text) per inner doc. Label = HEAD.uuid if present, else
fallback `<kind>_<idx>`.
- __main__._convert_pro2_encrypted(): for each sub-doc, write a
synthetic inline-Pro-2.x JSON next to the original and re-route
through write_pro2_doc — re-uses BBox / layers / objects-extraction
instead of duplicating the logic. Output filename
`<parent_uuid>__<sub_label>.json` makes the parent association
visible.
Smoke (Taishan): converted docs go from 28 (inline SCHs only) to 55 total. The 27 new docs come from two decrypted blobs:
- one PCB blob (3.4 MB plaintext, 20267-object PCB + 25 FOOTPRINT
sub-docs of 130-580 objects each)
- one SCH-typed encrypted doc (1 sub-SCH of 891 objects)
86 unit tests still pass; new fetch/decrypt path is covered manually
via the smoke test rather than mocking httpx + AES.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
362 lines
13 KiB
Python
362 lines
13 KiB
Python
"""Convert one EasyEDA Pro 2.x JSON document → an Option-2 Std-shaped JSON.
|
|
|
|
Pro 2.x stores each document as a JSON file whose ``dataStr`` field is a
|
|
**plaintext op-stream** — one JSON array per line, e.g.::

    ["DOCTYPE","SCH","1.1"]
    ["HEAD",{"originX":0,"originY":0,"version":"2.1.39","maxId":4639}]
    ["COMPONENT","e1","",0,0,0,0,{},0]
    ["ATTR","e18","e1","Symbol","6d31...",0,0,2506,-116,0,"st1",0]
    ...
|
|
|
|
This is **not** the same wire format as Pro 3.x EPRO2 (which is a binary
|
|
op-stream with tilde/pipe delimiters, decrypted from an AES-GCM blob);
|
|
the on-disk structure is closer to lceda Std but the op vocabulary
|
|
includes Pro 2.x extras (FONTSTYLE / LINESTYLE / CONNECT / OBJ /
|
|
REGION / DIMENSION / STRING / TEARDROP) the downstream adapter has to
|
|
dispatch on.
|
|
|
|
We don't translate the ops to a normalised dict like ``replay.Document``
|
|
does for EPRO2 — the field positions per OPTYPE are spec-defined for
|
|
Pro 2.x and the adapter already has to walk them by index. Instead we
|
|
emit the raw op arrays into ``dataStr.objects``, keyed by id (position 1
|
|
for most ops, OPTYPE for singletons), and tag the head with
|
|
``head.epro_format = "pro2"`` so the adapter can branch on it.
|
|
|
|
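Docs whose payload is encrypted-external (``dataStrId`` / ``iv`` /
``key`` instead of inline ``dataStr``) are skipped by ``write_pro2_doc``
itself. ``fetch_encrypted_plaintext`` fetches and AES-GCM-decrypts such a
blob, and ``split_plaintext_by_doctype`` splits the resulting plaintext
into its inner documents, so a caller can re-route each one through
``write_pro2_doc`` as an inline document.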
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import gzip
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
|
|
# Pro 2.x ops that carry no addressable id (one per doc) — keyed by their
|
|
# OPTYPE in our objects dict. The rest of the op list shows up as
|
|
# id-keyed entries (id = position 1 of the op array).
|
|
_SINGLETON_OPTYPES: set[str] = {
|
|
"DOCTYPE", "HEAD", "CANVAS", "ACTIVE_LAYER", "PREFERENCE",
|
|
"SILK_OPTS", "PANELIZE", "PANELIZE_STAMP", "PANELIZE_SIDE",
|
|
"PRIMITIVE",
|
|
}
|
|
|
|
|
|
@dataclass
class WriteStats:
    """Per-document summary that ``write_pro2_doc`` stores on itself as
    ``write_pro2_doc.last_stats``.

    All fields default to zero / ``False`` so a bare ``WriteStats()`` stands
    for "nothing written".
    """

    # Number of entries in the emitted ``dataStr.objects`` dict.
    objects: int = 0

    # Bounding box of the document: origin (x, y) followed by width / height.
    bbox_x: float = 0.0
    bbox_y: float = 0.0
    bbox_w: float = 0.0
    bbox_h: float = 0.0

    # True when the doc was skipped because its payload is encrypted-external.
    skipped_encrypted: bool = False
|
|
|
|
|
|
def _parse_datastr(s: str) -> dict[str, list]:
|
|
"""Parse a Pro 2.x op-stream into ``{id: [OPTYPE, args...]}``.
|
|
|
|
Lines that fail to parse are skipped silently — we mirror the EPRO2
|
|
replay's tolerance for partial / malformed sources, since some
|
|
real-world docs have stray whitespace or trailing commas.
|
|
"""
|
|
objects: dict[str, list] = {}
|
|
auto_id = 0
|
|
for line in s.splitlines():
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
try:
|
|
arr = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if not isinstance(arr, list) or not arr:
|
|
continue
|
|
optype = arr[0]
|
|
if not isinstance(optype, str):
|
|
continue
|
|
|
|
if optype in _SINGLETON_OPTYPES:
|
|
obj_id = optype
|
|
elif len(arr) > 1 and isinstance(arr[1], (str, int)) and arr[1] != "":
|
|
# Most id-keyed ops have a string id ("e<n>") at position 1;
|
|
# a handful use ints (LAYER, RULE_SELECTOR). Convert ints to
|
|
# qualified strings so they don't collide with string ids.
|
|
raw = arr[1]
|
|
obj_id = raw if isinstance(raw, str) else f"{optype}_{raw}"
|
|
else:
|
|
# Defensive fallback — shouldn't fire on healthy docs but
|
|
# keeps unknown shapes addressable instead of dropped.
|
|
auto_id += 1
|
|
obj_id = f"_anon_{optype}_{auto_id}"
|
|
|
|
# Last write wins on duplicate id — matches EPRO2 replay semantics.
|
|
objects[obj_id] = arr
|
|
return objects
|
|
|
|
|
|
def _gather_bbox_from_objects(objects: dict[str, list]) -> tuple[float, float, float, float]:
|
|
"""Best-effort BBox by scanning known coord positions across Pro 2.x op arrays.
|
|
|
|
Different OPTYPEs put coords at different positions; we cover the
|
|
common ones (LINE / WIRE / VIA / RECT / TEXT / COMPONENT) — anything
|
|
we miss just makes the BBox loose, not wrong.
|
|
"""
|
|
xs: list[float] = []
|
|
ys: list[float] = []
|
|
|
|
# OPTYPE → list of (x_idx, y_idx) pairs in the op array.
|
|
coord_positions: dict[str, list[tuple[int, int]]] = {
|
|
# ["LINE", id, layer, x1, y1, x2, y2, ...]
|
|
"LINE": [(3, 4), (5, 6)],
|
|
# ["WIRE", id, x1, y1, x2, y2, ...] (sch wire)
|
|
"WIRE": [(2, 3), (4, 5)],
|
|
# ["VIA", id, layer, x, y, outerD, innerD, ...]
|
|
"VIA": [(3, 4)],
|
|
# ["RECT", id, x, y, w, h, ...]
|
|
"RECT": [(2, 3)],
|
|
# ["TEXT", id, ?, x, y, ...] — best-effort
|
|
"TEXT": [(3, 4)],
|
|
# ["COMPONENT", id, ..., x, y, rot, ...] — position varies; field
|
|
# order has stayed consistent through 2.x but not bullet-proof
|
|
"COMPONENT": [(3, 4)],
|
|
# ["ARC", id, layer, cx, cy, ...]
|
|
"ARC": [(3, 4)],
|
|
}
|
|
|
|
for arr in objects.values():
|
|
if not isinstance(arr, list) or len(arr) < 3:
|
|
continue
|
|
optype = arr[0]
|
|
for xi, yi in coord_positions.get(optype, []):
|
|
if xi >= len(arr) or yi >= len(arr):
|
|
continue
|
|
try:
|
|
xs.append(float(arr[xi]))
|
|
ys.append(float(arr[yi]))
|
|
except (TypeError, ValueError):
|
|
pass
|
|
|
|
if not xs:
|
|
return (0.0, 0.0, 0.0, 0.0)
|
|
return (min(xs), min(ys), max(xs) - min(xs), max(ys) - min(ys))
|
|
|
|
|
|
def _layers_from_objects(objects: dict[str, list]) -> list[str]:
|
|
"""Pro 2.x emits LAYER ops that already match the Std layer-string
|
|
format almost 1:1; we just join the array elements with ``~``."""
|
|
layers: list[str] = []
|
|
for arr in objects.values():
|
|
if not isinstance(arr, list) or len(arr) < 2 or arr[0] != "LAYER":
|
|
continue
|
|
# ["LAYER", id, type, name, attr1, color1, alpha1, color2, alpha2]
|
|
# → "id~name~color~visible~active~locked~" (we coerce to Std style)
|
|
try:
|
|
lid = arr[1]
|
|
ltype = arr[2] if len(arr) > 2 else ""
|
|
lname = arr[3] if len(arr) > 3 else f"Layer{lid}"
|
|
color = arr[5] if len(arr) > 5 else "#000000"
|
|
# use/show/locked aren't in the same positions as Std's; we
|
|
# keep stable defaults that downstream's adapter can override.
|
|
layers.append(f"{lid}~{lname}~{color}~true~false~true~")
|
|
except (TypeError, IndexError):
|
|
continue
|
|
return layers
|
|
|
|
|
|
def fetch_encrypted_plaintext(json_path: Path) -> str | None:
    """Return the plaintext op-stream of an encrypted-external Pro 2.x JSON.

    An encrypted-external document carries ``dataStrId`` / ``iv`` / ``key``
    instead of an inline ``dataStr``. ``dataStrId`` is used as the URL of the
    blob; the blob's last 16 bytes are the AES-GCM tag and the rest is the
    ciphertext of a gzip-compressed op-stream. ``iv`` and ``key`` are hex
    strings.

    The decoded text is cached next to the source file at
    ``json_path.with_suffix(".decrypted.txt")``; when that file exists it is
    returned directly and neither the network nor the crypto code is touched.

    Args:
        json_path: path of the Pro 2.x document JSON.

    Returns:
        The plaintext op-stream, or ``None`` when the document is not
        encrypted-external or when fetching / decrypting / gunzipping fails
        (failures are printed, not raised).

    Raises:
        OSError, json.JSONDecodeError: if *json_path* itself cannot be read
            or parsed.
        ImportError: if a fetch is needed and ``httpx`` / ``Crypto`` are not
            installed.
    """
    raw = json.loads(json_path.read_text(encoding="utf-8"))
    # A non-object root cannot carry the fields we need; an inline ``dataStr``
    # means the document is not encrypted-external.
    if not isinstance(raw, dict) or "dataStr" in raw:
        return None
    url = raw.get("dataStrId")
    iv_hex = raw.get("iv")
    key_hex = raw.get("key")
    if not (url and iv_hex and key_hex):
        return None

    cache = json_path.with_suffix(".decrypted.txt")
    if cache.exists():
        return cache.read_text(encoding="utf-8")

    if not isinstance(url, str):
        print(f"  encrypted-external fetch failed for {json_path.name}: dataStrId is not a string")
        return None

    # Heavy imports only when actually fetching; the pure-replay path
    # shouldn't pay for httpx + pycryptodome import time.
    import httpx
    from Crypto.Cipher import AES

    try:
        with httpx.Client(timeout=60.0) as c:
            r = c.get(url)
            r.raise_for_status()
            blob = r.content
    # InvalidURL is not part of the HTTPError hierarchy, so name it explicitly.
    except (httpx.HTTPError, httpx.InvalidURL) as e:
        print(f"  encrypted-external fetch failed for {json_path.name}: {e}")
        return None

    if len(blob) < 16:
        print(f"  encrypted-external blob too short ({len(blob)} B): {json_path.name}")
        return None

    # Trailing 16 bytes are the GCM auth tag; everything before is ciphertext.
    ct, tag = blob[:-16], blob[-16:]
    try:
        cipher = AES.new(bytes.fromhex(key_hex), AES.MODE_GCM, nonce=bytes.fromhex(iv_hex))
        gz = cipher.decrypt_and_verify(ct, tag)
        plain_bytes = gzip.decompress(gz)
    except Exception as e:  # noqa: BLE001 — any crypto / gzip failure
        print(f"  encrypted-external decrypt failed for {json_path.name}: {e}")
        return None

    plain = plain_bytes.decode("utf-8", errors="replace")

    # Caching is best-effort. Write to a sibling temp file and rename it into
    # place so an interrupted run can never leave a truncated cache behind
    # that later runs would silently trust.
    tmp = cache.with_name(cache.name + ".tmp")
    try:
        tmp.write_text(plain, encoding="utf-8")
        tmp.replace(cache)
    except OSError:
        try:
            tmp.unlink()
        except OSError:
            pass  # nothing to clean up, or we can't — either way keep going
    return plain
|
|
|
|
|
|
def split_plaintext_by_doctype(plain: str) -> Iterator[tuple[str, str]]:
    """Walk a multi-doc plaintext op-stream and yield ``(doc_label, sub_text)``
    per inner document.

    Pro 2.x's encrypted blob bundles N FOOTPRINTs + 1 PCB (or N SYMBOLs +
    1 SCH for schematic blobs). Each inner doc starts with a fresh
    ``["DOCTYPE", "<KIND>", "<version>"]`` line; the stream is split on those.

    The label is the HEAD op's ``uuid`` field when it is a non-empty string,
    else ``<doctype_kind>_<index>`` (lower-cased kind, or ``doc`` when the
    kind is missing or not a string). ``<index>`` is the 0-based position of
    the inner doc in the stream.

    Line handling:

    * blank lines are dropped;
    * lines that are not valid JSON are kept in the current doc verbatim;
    * valid JSON that is not a non-empty array is dropped;
    * any lines seen before the first DOCTYPE form a doc of their own.

    Args:
        plain: the decrypted multi-document op-stream.

    Yields:
        ``(label, text)`` where *text* is the doc's lines joined with ``\\n``.
    """
    cur_lines: list[str] = []
    cur_uuid: str | None = None
    cur_kind: str | None = None
    idx = 0

    def flush() -> tuple[str, str] | None:
        """Close the doc being accumulated and reset the per-doc state."""
        nonlocal cur_lines, cur_uuid, cur_kind, idx
        if not cur_lines:
            return None
        label = cur_uuid or f"{(cur_kind or 'doc').lower()}_{idx}"
        idx += 1
        text = "\n".join(cur_lines)
        cur_lines = []
        cur_uuid = None
        cur_kind = None
        return (label, text)

    for line in plain.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        try:
            arr = json.loads(stripped)
        except json.JSONDecodeError:
            cur_lines.append(line)
            continue
        if not isinstance(arr, list) or not arr:
            continue

        if arr[0] == "DOCTYPE":
            # Boundary: flush previous doc (if any), start new
            prev = flush()
            if prev is not None:
                yield prev
            kind = arr[1] if len(arr) > 1 else None
            # Only a string kind can be lower-cased into a label; anything
            # else falls back to the generic "doc" prefix.
            cur_kind = kind if isinstance(kind, str) else None
            cur_lines.append(line)
            continue

        if arr[0] == "HEAD" and len(arr) > 1 and isinstance(arr[1], dict):
            uuid = arr[1].get("uuid")
            # Labels are promised to be strings; ignore empty / non-string
            # uuids and keep whatever was seen earlier in this doc.
            if isinstance(uuid, str) and uuid:
                cur_uuid = uuid
        cur_lines.append(line)

    last = flush()
    if last is not None:
        yield last
|
|
|
|
|
|
def write_pro2_doc(
    json_path: Path,
    *,
    project_uuid: str = "",
    editor_version_hint: str = "",
) -> dict | None:
    """Read a Pro 2.x JSON file → Option-2 Std envelope.

    Args:
        json_path: path of the Pro 2.x document JSON.
        project_uuid: stored as ``puuid`` in the result.
        editor_version_hint: used in ``head.editorVersion`` when the doc's
            own HEAD op carries no ``version``.

    Returns:
        ``{"success": True, "code": 0, "result": {...}}`` on success, or
        ``None`` for docs that can't be decoded here — encrypted-external
        docs (``dataStrId`` / ``iv`` present instead of inline ``dataStr``),
        a JSON root that is not an object, or a ``dataStr`` that is not a
        string. Callers treat ``None`` as "skip with stats bump".

    Side effects:
        ``write_pro2_doc.last_stats`` is set to the ``WriteStats`` of this
        call. It is reset at entry, so after a ``None`` return it never
        reflects an earlier document; ``skipped_encrypted`` is True only for
        the encrypted-external case.

    Raises:
        OSError, json.JSONDecodeError: if *json_path* cannot be read / parsed.
    """
    # Reset first: every early return below must leave stats that describe
    # *this* call, not whatever document was processed before it.
    write_pro2_doc.last_stats = WriteStats()  # type: ignore[attr-defined]

    raw = json.loads(json_path.read_text(encoding="utf-8"))
    if not isinstance(raw, dict):
        return None

    doc_uuid = raw.get("uuid") or json_path.stem
    title = raw.get("title") or raw.get("display_title") or doc_uuid[:12]
    doc_type_int = raw.get("docType")

    # Encrypted-external: the payload lives at the dataStrId URL and needs the
    # iv+key fields to AES-decrypt. This function doesn't fetch.
    if "dataStr" not in raw and ("dataStrId" in raw or "iv" in raw):
        write_pro2_doc.last_stats = WriteStats(skipped_encrypted=True)  # type: ignore[attr-defined]
        return None

    s = raw.get("dataStr")
    if not isinstance(s, str):
        return None

    objects = _parse_datastr(s)
    bbox_x, bbox_y, bbox_w, bbox_h = _gather_bbox_from_objects(objects)
    layers = _layers_from_objects(objects)

    # Pull the Pro 2.x editor version out of the HEAD op if present —
    # finer-grained than the manifest's top-level editor_version (which
    # is the project's, not the doc's).
    head_op = objects.get("HEAD")
    pro2_version = ""
    if isinstance(head_op, list) and len(head_op) > 1 and isinstance(head_op[1], dict):
        pro2_version = head_op[1].get("version", "")

    head = {
        "docType": str(doc_type_int) if doc_type_int is not None else "",
        "editorVersion": f"facere-pro2/0.1 (lceda {pro2_version or editor_version_hint})",
        "units": "mil",
        # Lets the downstream adapter branch on the raw Pro 2.x op arrays.
        "epro_format": "pro2",
        "pro2_doc_uuid": doc_uuid,
        "pro2_editor_version": pro2_version,
    }
    result = {
        "uuid": doc_uuid,
        "puuid": project_uuid,
        "title": title,
        "description": raw.get("description", ""),
        "docType": doc_type_int,
        "components": {},
        "dataStr": {
            "head": head,
            "BBox": {
                "x": bbox_x,
                "y": bbox_y,
                "width": bbox_w,
                "height": bbox_h,
            },
            "layers": layers,
            "objects": objects,
            "preference": {},
            "netColors": [],
            "DRCRULE": {},
        },
    }

    write_pro2_doc.last_stats = WriteStats(  # type: ignore[attr-defined]
        objects=len(objects),
        bbox_x=bbox_x,
        bbox_y=bbox_y,
        bbox_w=bbox_w,
        bbox_h=bbox_h,
    )
    return {"success": True, "code": 0, "result": result}
|