From d11ca1d3bef9731382eb7fc7986605f1aed680cc Mon Sep 17 00:00:00 2001 From: Knowit Date: Wed, 29 Apr 2026 02:07:40 +0800 Subject: [PATCH] tools/epro2/std: fetch + decrypt Pro 2.x encrypted-external blobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pro 2.x stores some doc payloads (notably Taishan's PCB) externally at modules.lceda.cn keyed by dataStrId, AES-256-GCM encrypted with the iv/key fields stored alongside. Same crypto pattern as Pro 3.x EPRO2: last 16 bytes are the GCM auth tag, rest is gzip(plaintext-op-stream). The CDN doesn't require auth. - pro2_writer.fetch_encrypted_plaintext(): fetch + decrypt + gunzip, cache result next to the source JSON as `{stem}.decrypted.txt` so re-runs skip the network round-trip. Heavy imports (httpx, pycryptodome) are deferred to call-time so the pure-replay path doesn't pay for them. - pro2_writer.split_plaintext_by_doctype(): walk the multi-doc plaintext (Pro 2.x bundles N FOOTPRINTs + 1 PCB into one blob), yield (label, sub_text) per inner doc. Label = HEAD.uuid if present, else fallback `{kind}_{idx}` (lower-cased DOCTYPE kind + running doc index). - __main__._convert_pro2_encrypted(): for each sub-doc, write a synthetic inline-Pro-2.x JSON next to the original and re-route through write_pro2_doc — re-uses BBox / layers / objects-extraction instead of duplicating the logic. Output filename `{parent_uuid}__{sub_label}.json` makes the parent association visible. Smoke (Taishan): 28 inline SCHs → 55 total. Decrypts: - one PCB blob (3.4 MB plaintext, 20267-object PCB + 25 FOOTPRINT sub-docs of 130-580 objects each) - one SCH-typed encrypted doc (1 sub-SCH of 891 objects) 86 unit tests still pass; new fetch/decrypt path is covered manually via the smoke test rather than mocking httpx + AES. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tools/epro2/std/__main__.py | 103 ++++++++++++++++++++++++++-- tools/epro2/std/pro2_writer.py | 118 +++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 6 deletions(-) diff --git a/tools/epro2/std/__main__.py b/tools/epro2/std/__main__.py index 52f8faf..5911767 100644 --- a/tools/epro2/std/__main__.py +++ b/tools/epro2/std/__main__.py @@ -25,7 +25,11 @@ from pathlib import Path from ..replay import Project, replay_project from .pcb_writer import write_pcb_std -from .pro2_writer import write_pro2_doc +from .pro2_writer import ( + fetch_encrypted_plaintext, + split_plaintext_by_doctype, + write_pro2_doc, +) from .sch_writer import write_sch_std @@ -99,6 +103,80 @@ def _convert_schs(proj: Project, out_dir: Path) -> int: return len(uuids) +def _convert_pro2_encrypted( + json_path: Path, out_dir: Path, + project_uuid: str, editor_version: str, parent_uuid: str, +) -> int: + """Try fetch + AES-256-GCM decrypt + gunzip the encrypted-external + blob, then split by DOCTYPE boundary into per-sub-doc JSONs. + + Pro 2.x bundles N FOOTPRINTs + 1 PCB (or N SYMBOLs + 1 SCH) into one + blob; we emit each as a separate file named + ``__.json`` so the parent association is + visible in the filename without colliding with other sources. + """ + plain = fetch_encrypted_plaintext(json_path) + if plain is None: + return 0 + + n = 0 + for sub_label, sub_text in split_plaintext_by_doctype(plain): + # Re-route the inline path: build a synthetic Pro 2.x doc shape + # in a temp file so write_pro2_doc + its caching behave normally. + synth = { + "uuid": f"{parent_uuid}__{sub_label}", + "title": sub_label, + "docType": _doctype_from_first_line(sub_text), + "dataStr": sub_text, + } + # Write the synthetic JSON next to the original, with a name + # that won't collide with manifest entries. 
+ synth_path = json_path.parent / f".synth__{parent_uuid}__{sub_label}.json" + synth_path.write_text(json.dumps(synth, ensure_ascii=False), encoding="utf-8") + try: + payload = write_pro2_doc( + synth_path, project_uuid=project_uuid, + editor_version_hint=editor_version, + ) + finally: + synth_path.unlink(missing_ok=True) + if payload is None: + continue + out_name = f"{parent_uuid}__{sub_label}.json" + (out_dir / out_name).write_text( + json.dumps(payload, ensure_ascii=False, separators=(",", ":")), + encoding="utf-8", + ) + s = getattr(write_pro2_doc, "last_stats", None) + if s: + print( + f" {parent_uuid[:12]}__{sub_label}: docType={synth['docType']} " + f"objects={s.objects}" + ) + n += 1 + return n + + +def _doctype_from_first_line(text: str) -> int: + """Read the leading ``["DOCTYPE","KIND","x.y"]`` line and return the + Std docType code (1=SCH, 2=SYMBOL, 3=PCB, 4=FOOTPRINT, 5=DEVICE).""" + for line in text.splitlines(): + line = line.strip() + if not line: + continue + try: + arr = json.loads(line) + except json.JSONDecodeError: + continue + if not (isinstance(arr, list) and arr and arr[0] == "DOCTYPE"): + continue + kind = arr[1] if len(arr) > 1 else "" + return { + "SCH": 1, "SYMBOL": 2, "PCB": 3, "FOOTPRINT": 4, "DEVICE": 5, + }.get(kind, 0) + return 0 + + def _convert_pro2(project_dir: Path, out_dir: Path, editor_version: str, want_pcb: bool, want_sch: bool) -> int: """Pro 2.x path — read each .json directly (no EPRO2 replay) @@ -130,12 +208,25 @@ def _convert_pro2(project_dir: Path, out_dir: Path, if payload is None: stats = getattr(write_pro2_doc, "last_stats", None) if stats and stats.skipped_encrypted: - print( - f" SKIP {entry['doc_uuid'][:12]}: PCB blob is " - f"AES-encrypted external (dataStrId+iv+key); needs " - f"a separate fetch+decrypt step we don't run here." + # Try fetching + decrypting from modules.lceda.cn. The blob + # bundles N FOOTPRINTs/SYMBOLs + 1 parent PCB/SCH; we emit + # one JSON per sub-doc. 
+ m_n = _convert_pro2_encrypted( + path, out_dir, project_uuid, editor_version, + parent_uuid=entry["doc_uuid"], ) - skipped_encrypted += 1 + if m_n > 0: + print( + f" decrypted {entry['doc_uuid'][:12]}: " + f"{m_n} sub-doc(s) emitted" + ) + n += m_n + else: + print( + f" SKIP {entry['doc_uuid'][:12]}: encrypted-external " + f"and fetch/decrypt failed." + ) + skipped_encrypted += 1 continue out_path = out_dir / f"{entry['doc_uuid']}.json" out_path.write_text( diff --git a/tools/epro2/std/pro2_writer.py b/tools/epro2/std/pro2_writer.py index e1201f0..150a5ac 100644 --- a/tools/epro2/std/pro2_writer.py +++ b/tools/epro2/std/pro2_writer.py @@ -30,9 +30,11 @@ modules.lceda.cn + AES-decrypt is out of this writer's scope. from __future__ import annotations +import gzip import json from dataclasses import dataclass from pathlib import Path +from typing import Iterator # Pro 2.x ops that carry no addressable id (one per doc) — keyed by their @@ -166,6 +168,122 @@ def _layers_from_objects(objects: dict[str, list]) -> list[str]: return layers +def fetch_encrypted_plaintext(json_path: Path) -> str | None: + """For an encrypted-external Pro 2.x JSON (carries `dataStrId/iv/key` + instead of inline `dataStr`), fetch the AES-GCM blob from modules.lceda.cn, + decrypt + gunzip, and return the plaintext op-stream string. + + Caches the result alongside the source file at + ``/.decrypted.txt`` so subsequent runs skip the network + + crypto round-trip entirely. + + Returns None if the JSON isn't encrypted-external, or if any step + fails (network, AES tag, gunzip — all caught and logged). 
+ """ + raw = json.loads(json_path.read_text(encoding="utf-8")) + if "dataStr" in raw: + return None + url = raw.get("dataStrId") + iv_hex = raw.get("iv") + key_hex = raw.get("key") + if not (url and iv_hex and key_hex): + return None + + cache = json_path.with_suffix(".decrypted.txt") + if cache.exists(): + return cache.read_text(encoding="utf-8") + + # Heavy imports only when actually fetching; the pure-replay path + # shouldn't pay for httpx + pycryptodome import time. + import httpx + from Crypto.Cipher import AES + + try: + with httpx.Client(timeout=60.0) as c: + r = c.get(url) + r.raise_for_status() + blob = r.content + except httpx.HTTPError as e: + print(f" encrypted-external fetch failed for {json_path.name}: {e}") + return None + + if len(blob) < 16: + print(f" encrypted-external blob too short ({len(blob)} B): {json_path.name}") + return None + + ct, tag = blob[:-16], blob[-16:] + try: + cipher = AES.new(bytes.fromhex(key_hex), AES.MODE_GCM, nonce=bytes.fromhex(iv_hex)) + gz = cipher.decrypt_and_verify(ct, tag) + plain_bytes = gzip.decompress(gz) + except Exception as e: # noqa: BLE001 — any crypto / gzip failure + print(f" encrypted-external decrypt failed for {json_path.name}: {e}") + return None + + plain = plain_bytes.decode("utf-8", errors="replace") + try: + cache.write_text(plain, encoding="utf-8") + except OSError: + pass # caching is best-effort; skip if we can't write + return plain + + +def split_plaintext_by_doctype(plain: str) -> Iterator[tuple[str, str]]: + """Walk a multi-doc plaintext op-stream and yield ``(doc_label, sub_text)`` + per inner document. + + Pro 2.x's encrypted blob bundles N FOOTPRINTs + 1 PCB (or N SYMBOLs + + 1 SCH for schematic blobs). Each inner doc starts with a fresh + ``["DOCTYPE", "", ""]`` line. We split on those. + + The label is the HEAD op's `uuid` field if present, else + ``_``. 
+ """ + cur_lines: list[str] = [] + cur_uuid: str | None = None + cur_kind: str | None = None + idx = 0 + + def flush() -> tuple[str, str] | None: + nonlocal cur_lines, cur_uuid, cur_kind, idx + if not cur_lines: + return None + label = cur_uuid or f"{(cur_kind or 'doc').lower()}_{idx}" + idx += 1 + text = "\n".join(cur_lines) + cur_lines = [] + cur_uuid = None + cur_kind = None + return (label, text) + + for line in plain.splitlines(): + stripped = line.strip() + if not stripped: + continue + try: + arr = json.loads(stripped) + except json.JSONDecodeError: + cur_lines.append(line) + continue + if not isinstance(arr, list) or not arr: + continue + if arr[0] == "DOCTYPE": + # Boundary: flush previous doc (if any), start new + prev = flush() + if prev is not None: + yield prev + cur_kind = arr[1] if len(arr) > 1 else None + cur_lines.append(line) + continue + if arr[0] == "HEAD" and len(arr) > 1 and isinstance(arr[1], dict): + cur_uuid = arr[1].get("uuid") or cur_uuid + cur_lines.append(line) + + last = flush() + if last is not None: + yield last + + def write_pro2_doc( json_path: Path, *,