Files
FacereDataset/tools/epro2/std/pro2_writer.py
Knowit d11ca1d3be tools/epro2/std: fetch + decrypt Pro 2.x encrypted-external blobs
Pro 2.x stores some doc payloads (notably Taishan's PCB) externally at
modules.lceda.cn keyed by dataStrId, AES-256-GCM encrypted with the
iv/key fields stored alongside. Same crypto pattern as Pro 3.x EPRO2:
last 16 bytes are the GCM auth tag, rest is gzip(plaintext-op-stream).
The CDN doesn't require auth.

  - pro2_writer.fetch_encrypted_plaintext(): fetch + decrypt + gunzip,
    cache result at source/<uuid>.decrypted.txt so re-runs skip the
    network round-trip. Heavy imports (httpx, pycryptodome) are
    deferred to call-time so the pure-replay path doesn't pay for them.
  - pro2_writer.split_plaintext_by_doctype(): walk the multi-doc
    plaintext (Pro 2.x bundles N FOOTPRINTs + 1 PCB into one blob), yield
    (label, sub_text) per inner doc. Label = HEAD.uuid if present, else
    fallback `<kind>_<idx>`.
  - __main__._convert_pro2_encrypted(): for each sub-doc, write a
    synthetic inline-Pro-2.x JSON next to the original and re-route
    through write_pro2_doc — re-uses BBox / layers / objects-extraction
    instead of duplicating the logic. Output filename
    `<parent_uuid>__<sub_label>.json` makes the parent association
    visible.

Smoke (Taishan): 28 inline SCHs → 55 total. Decrypts:
  - one PCB blob (3.4 MB plaintext, 20267-object PCB + 25 FOOTPRINT
    sub-docs of 130-580 objects each)
  - one SCH-typed encrypted doc (1 sub-SCH of 891 objects)

86 unit tests still pass; new fetch/decrypt path is covered manually
via the smoke test rather than mocking httpx + AES.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 02:07:40 +08:00

362 lines
13 KiB
Python

"""Convert one EasyEDA Pro 2.x JSON document → an Option-2 Std-shaped JSON.
Pro 2.x stores each document as a JSON file whose ``dataStr`` field is a
**plaintext op-stream** — one JSON array per line, e.g.::
["DOCTYPE","SCH","1.1"]
["HEAD",{"originX":0,"originY":0,"version":"2.1.39","maxId":4639}]
["COMPONENT","e1","",0,0,0,0,{},0]
["ATTR","e18","e1","Symbol","6d31...",0,0,2506,-116,0,"st1",0]
...
This is **not** the same wire format as Pro 3.x EPRO2 (which is a binary
op-stream with tilde/pipe delimiters, decrypted from an AES-GCM blob);
the on-disk structure is closer to lceda Std but the op vocabulary
includes Pro 2.x extras (FONTSTYLE / LINESTYLE / CONNECT / OBJ /
REGION / DIMENSION / STRING / TEARDROP) the downstream adapter has to
dispatch on.
We don't translate the ops to a normalised dict like ``replay.Document``
does for EPRO2 — the field positions per OPTYPE are spec-defined for
Pro 2.x and the adapter already has to walk them by index. Instead we
emit the raw op arrays into ``dataStr.objects``, keyed by id (position 1
for most ops, OPTYPE for singletons), and tag the head with
``head.epro_format = "pro2"`` so the adapter can branch on it.
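PCB docs whose payload is encrypted-external (``dataStrId`` / ``iv`` /
``key`` instead of inline ``dataStr``) are skipped by ``write_pro2_doc``
itself; ``fetch_encrypted_plaintext`` fetches the blob from
modules.lceda.cn and AES-GCM-decrypts it, and
``split_plaintext_by_doctype`` splits the resulting plaintext into
per-document op-streams.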
"""
from __future__ import annotations
import gzip
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
# Pro 2.x ops that carry no addressable id (one per doc) — keyed by their
# OPTYPE in our objects dict. The rest of the op list shows up as
# id-keyed entries (id = position 1 of the op array).
_SINGLETON_OPTYPES: set[str] = {
"DOCTYPE", "HEAD", "CANVAS", "ACTIVE_LAYER", "PREFERENCE",
"SILK_OPTS", "PANELIZE", "PANELIZE_STAMP", "PANELIZE_SIDE",
"PRIMITIVE",
}
@dataclass
class WriteStats:
    """Summary of one ``write_pro2_doc`` call.

    ``write_pro2_doc`` stores an instance on its ``last_stats`` function
    attribute so the caller can read counts without changing the return
    value.
    """

    # Number of entries in the parsed ``objects`` dict.
    objects: int = 0
    # Bounding box as (x, y, width, height), as computed by
    # ``_gather_bbox_from_objects``.
    bbox_x: float = 0.0
    bbox_y: float = 0.0
    bbox_w: float = 0.0
    bbox_h: float = 0.0
    # True when the document was skipped because its payload is
    # encrypted-external rather than inline.
    skipped_encrypted: bool = False
def _parse_datastr(s: str) -> dict[str, list]:
"""Parse a Pro 2.x op-stream into ``{id: [OPTYPE, args...]}``.
Lines that fail to parse are skipped silently — we mirror the EPRO2
replay's tolerance for partial / malformed sources, since some
real-world docs have stray whitespace or trailing commas.
"""
objects: dict[str, list] = {}
auto_id = 0
for line in s.splitlines():
line = line.strip()
if not line:
continue
try:
arr = json.loads(line)
except json.JSONDecodeError:
continue
if not isinstance(arr, list) or not arr:
continue
optype = arr[0]
if not isinstance(optype, str):
continue
if optype in _SINGLETON_OPTYPES:
obj_id = optype
elif len(arr) > 1 and isinstance(arr[1], (str, int)) and arr[1] != "":
# Most id-keyed ops have a string id ("e<n>") at position 1;
# a handful use ints (LAYER, RULE_SELECTOR). Convert ints to
# qualified strings so they don't collide with string ids.
raw = arr[1]
obj_id = raw if isinstance(raw, str) else f"{optype}_{raw}"
else:
# Defensive fallback — shouldn't fire on healthy docs but
# keeps unknown shapes addressable instead of dropped.
auto_id += 1
obj_id = f"_anon_{optype}_{auto_id}"
# Last write wins on duplicate id — matches EPRO2 replay semantics.
objects[obj_id] = arr
return objects
def _gather_bbox_from_objects(objects: dict[str, list]) -> tuple[float, float, float, float]:
"""Best-effort BBox by scanning known coord positions across Pro 2.x op arrays.
Different OPTYPEs put coords at different positions; we cover the
common ones (LINE / WIRE / VIA / RECT / TEXT / COMPONENT) — anything
we miss just makes the BBox loose, not wrong.
"""
xs: list[float] = []
ys: list[float] = []
# OPTYPE → list of (x_idx, y_idx) pairs in the op array.
coord_positions: dict[str, list[tuple[int, int]]] = {
# ["LINE", id, layer, x1, y1, x2, y2, ...]
"LINE": [(3, 4), (5, 6)],
# ["WIRE", id, x1, y1, x2, y2, ...] (sch wire)
"WIRE": [(2, 3), (4, 5)],
# ["VIA", id, layer, x, y, outerD, innerD, ...]
"VIA": [(3, 4)],
# ["RECT", id, x, y, w, h, ...]
"RECT": [(2, 3)],
# ["TEXT", id, ?, x, y, ...] — best-effort
"TEXT": [(3, 4)],
# ["COMPONENT", id, ..., x, y, rot, ...] — position varies; field
# order has stayed consistent through 2.x but not bullet-proof
"COMPONENT": [(3, 4)],
# ["ARC", id, layer, cx, cy, ...]
"ARC": [(3, 4)],
}
for arr in objects.values():
if not isinstance(arr, list) or len(arr) < 3:
continue
optype = arr[0]
for xi, yi in coord_positions.get(optype, []):
if xi >= len(arr) or yi >= len(arr):
continue
try:
xs.append(float(arr[xi]))
ys.append(float(arr[yi]))
except (TypeError, ValueError):
pass
if not xs:
return (0.0, 0.0, 0.0, 0.0)
return (min(xs), min(ys), max(xs) - min(xs), max(ys) - min(ys))
def _layers_from_objects(objects: dict[str, list]) -> list[str]:
"""Pro 2.x emits LAYER ops that already match the Std layer-string
format almost 1:1; we just join the array elements with ``~``."""
layers: list[str] = []
for arr in objects.values():
if not isinstance(arr, list) or len(arr) < 2 or arr[0] != "LAYER":
continue
# ["LAYER", id, type, name, attr1, color1, alpha1, color2, alpha2]
# → "id~name~color~visible~active~locked~" (we coerce to Std style)
try:
lid = arr[1]
ltype = arr[2] if len(arr) > 2 else ""
lname = arr[3] if len(arr) > 3 else f"Layer{lid}"
color = arr[5] if len(arr) > 5 else "#000000"
# use/show/locked aren't in the same positions as Std's; we
# keep stable defaults that downstream's adapter can override.
layers.append(f"{lid}~{lname}~{color}~true~false~true~")
except (TypeError, IndexError):
continue
return layers
def fetch_encrypted_plaintext(json_path: Path) -> str | None:
"""For an encrypted-external Pro 2.x JSON (carries `dataStrId/iv/key`
instead of inline `dataStr`), fetch the AES-GCM blob from modules.lceda.cn,
decrypt + gunzip, and return the plaintext op-stream string.
Caches the result alongside the source file at
``<source>/<uuid>.decrypted.txt`` so subsequent runs skip the network
+ crypto round-trip entirely.
Returns None if the JSON isn't encrypted-external, or if any step
fails (network, AES tag, gunzip — all caught and logged).
"""
raw = json.loads(json_path.read_text(encoding="utf-8"))
if "dataStr" in raw:
return None
url = raw.get("dataStrId")
iv_hex = raw.get("iv")
key_hex = raw.get("key")
if not (url and iv_hex and key_hex):
return None
cache = json_path.with_suffix(".decrypted.txt")
if cache.exists():
return cache.read_text(encoding="utf-8")
# Heavy imports only when actually fetching; the pure-replay path
# shouldn't pay for httpx + pycryptodome import time.
import httpx
from Crypto.Cipher import AES
try:
with httpx.Client(timeout=60.0) as c:
r = c.get(url)
r.raise_for_status()
blob = r.content
except httpx.HTTPError as e:
print(f" encrypted-external fetch failed for {json_path.name}: {e}")
return None
if len(blob) < 16:
print(f" encrypted-external blob too short ({len(blob)} B): {json_path.name}")
return None
ct, tag = blob[:-16], blob[-16:]
try:
cipher = AES.new(bytes.fromhex(key_hex), AES.MODE_GCM, nonce=bytes.fromhex(iv_hex))
gz = cipher.decrypt_and_verify(ct, tag)
plain_bytes = gzip.decompress(gz)
except Exception as e: # noqa: BLE001 — any crypto / gzip failure
print(f" encrypted-external decrypt failed for {json_path.name}: {e}")
return None
plain = plain_bytes.decode("utf-8", errors="replace")
try:
cache.write_text(plain, encoding="utf-8")
except OSError:
pass # caching is best-effort; skip if we can't write
return plain
def split_plaintext_by_doctype(plain: str) -> Iterator[tuple[str, str]]:
    """Split a multi-document op-stream into ``(label, sub_text)`` pairs.

    A new inner document starts at every ``["DOCTYPE", "<KIND>", ...]``
    line.  Any lines seen before the first DOCTYPE line are emitted as a
    document of their own.

    Line handling:

    * blank lines are dropped;
    * lines that are not valid JSON are kept in the current document;
    * lines that parse to something other than a non-empty list are
      dropped.

    The label is the ``uuid`` of the document's HEAD op when that is a
    non-empty string, otherwise ``"<kind>_<n>"`` where ``<kind>`` is the
    lower-cased DOCTYPE kind (``doc`` if absent or not a string) and
    ``<n>`` is the zero-based position of the document in the output.

    Args:
        plain: decrypted plaintext op-stream, one JSON array per line.

    Yields:
        ``(label, sub_text)`` with ``sub_text`` being the document's kept
        lines joined by ``"\\n"``.
    """
    lines: list[str] = []
    uuid: str | None = None
    kind: str | None = None
    emitted = 0

    def current_label() -> str:
        return uuid or f"{(kind or 'doc').lower()}_{emitted}"

    for line in plain.splitlines():
        stripped = line.strip()
        if not stripped:
            continue
        try:
            arr = json.loads(stripped)
        except json.JSONDecodeError:
            lines.append(line)
            continue
        if not isinstance(arr, list) or not arr:
            continue
        op = arr[0]
        if op == "DOCTYPE":
            # Boundary: emit the document collected so far, then start anew.
            if lines:
                yield (current_label(), "\n".join(lines))
                emitted += 1
                lines = []
            uuid = None
            raw_kind = arr[1] if len(arr) > 1 else None
            # Only a string kind is usable in a label; anything else would
            # break ``.lower()`` when the label is built.
            kind = raw_kind if isinstance(raw_kind, str) else None
        elif op == "HEAD" and len(arr) > 1 and isinstance(arr[1], dict):
            candidate = arr[1].get("uuid")
            if isinstance(candidate, str) and candidate:
                uuid = candidate
        lines.append(line)

    if lines:
        yield (current_label(), "\n".join(lines))
def write_pro2_doc(
    json_path: Path,
    *,
    project_uuid: str = "",
    editor_version_hint: str = "",
) -> dict | None:
    """Read a Pro 2.x JSON file and build an Option-2 Std envelope.

    Args:
        json_path: path of the Pro 2.x document JSON.
        project_uuid: stored as ``puuid`` in the result.
        editor_version_hint: used in ``head.editorVersion`` when the
            document's HEAD op carries no ``version``.

    Returns:
        ``{"success": True, "code": 0, "result": {...}}`` on success, or
        ``None`` when the document cannot be decoded: either it is
        encrypted-external (no inline ``dataStr`` but ``dataStrId`` or
        ``iv`` present) or ``dataStr`` is not a string.

    Side effects:
        Sets ``write_pro2_doc.last_stats`` to a ``WriteStats`` describing
        this call.  It is reset on entry, so a ``None`` return never
        leaves the previous document's stats behind.

    Raises:
        OSError, json.JSONDecodeError: the file cannot be read or is not
            valid JSON.
    """
    # Reset first: every exit path must leave stats that belong to THIS call.
    write_pro2_doc.last_stats = WriteStats()  # type: ignore[attr-defined]
    raw = json.loads(json_path.read_text(encoding="utf-8"))
    doc_uuid = raw.get("uuid") or json_path.stem
    # str() guards the slice against a non-string uuid in the source JSON.
    title = raw.get("title") or raw.get("display_title") or str(doc_uuid)[:12]
    doc_type_int = raw.get("docType")
    # Encrypted-external: the payload lives behind dataStrId and needs the
    # iv + key fields to decrypt.  This function does not fetch it.
    if "dataStr" not in raw and ("dataStrId" in raw or "iv" in raw):
        write_pro2_doc.last_stats = WriteStats(skipped_encrypted=True)  # type: ignore[attr-defined]
        return None
    s = raw.get("dataStr")
    if not isinstance(s, str):
        return None
    objects = _parse_datastr(s)
    bbox_x, bbox_y, bbox_w, bbox_h = _gather_bbox_from_objects(objects)
    layers = _layers_from_objects(objects)
    # Pull the Pro 2.x editor version out of the HEAD op if present —
    # finer-grained than the manifest's top-level editor_version (which
    # is the project's, not the doc's).
    head_op = objects.get("HEAD")
    pro2_version = ""
    if isinstance(head_op, list) and len(head_op) > 1 and isinstance(head_op[1], dict):
        pro2_version = head_op[1].get("version", "")
    result = {
        "uuid": doc_uuid,
        "puuid": project_uuid,
        "title": title,
        "description": raw.get("description", ""),
        "docType": doc_type_int,
        "components": {},
        "dataStr": {
            "head": {
                "docType": str(doc_type_int) if doc_type_int is not None else "",
                "editorVersion": (
                    f"facere-pro2/0.1 (lceda {pro2_version or editor_version_hint})"
                ),
                "units": "mil",
                # Marker the downstream adapter can branch on.
                "epro_format": "pro2",
                "pro2_doc_uuid": doc_uuid,
                "pro2_editor_version": pro2_version,
            },
            "BBox": {
                "x": bbox_x,
                "y": bbox_y,
                "width": bbox_w,
                "height": bbox_h,
            },
            "layers": layers,
            # Raw op arrays keyed by id / OPTYPE, not a normalised form.
            "objects": objects,
            "preference": {},
            "netColors": [],
            "DRCRULE": {},
        },
    }
    write_pro2_doc.last_stats = WriteStats(  # type: ignore[attr-defined]
        objects=len(objects),
        bbox_x=bbox_x, bbox_y=bbox_y, bbox_w=bbox_w, bbox_h=bbox_h,
    )
    return {"success": True, "code": 0, "result": result}