tools/epro2/std: fetch + decrypt Pro 2.x encrypted-external blobs

Pro 2.x stores some doc payloads (notably Taishan's PCB) externally at
modules.lceda.cn keyed by dataStrId, AES-256-GCM encrypted with the
iv/key fields stored alongside. Same crypto pattern as Pro 3.x EPRO2:
last 16 bytes are the GCM auth tag, rest is gzip(plaintext-op-stream).
The CDN doesn't require auth.

  - pro2_writer.fetch_encrypted_plaintext(): fetch + decrypt + gunzip,
    cache result at source/<uuid>.decrypted.txt so re-runs skip the
    network round-trip. Heavy imports (httpx, pycryptodome) are
    deferred to call-time so the pure-replay path doesn't pay for them.
  - pro2_writer.split_plaintext_by_doctype(): walk the multi-doc
    plaintext (Pro 2.x bundles N FOOTPRINTs + 1 PCB into one blob), yield
    (label, sub_text) per inner doc. Label = HEAD.uuid if present, else
    fallback `<kind>_<idx>`.
  - __main__._convert_pro2_encrypted(): for each sub-doc, write a
    synthetic inline-Pro-2.x JSON next to the original and re-route
    through write_pro2_doc — re-uses BBox / layers / objects-extraction
    instead of duplicating the logic. Output filename
    `<parent_uuid>__<sub_label>.json` makes the parent association
    visible.

Smoke (Taishan): 28 inline SCHs → 55 total. Decrypts:
  - one PCB blob (3.4 MB plaintext, 20267-object PCB + 25 FOOTPRINT
    sub-docs of 130-580 objects each)
  - one SCH-typed encrypted doc (1 sub-SCH of 891 objects)

86 unit tests still pass; new fetch/decrypt path is covered manually
via the smoke test rather than mocking httpx + AES.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-29 02:07:40 +08:00
parent 3720cd176a
commit d11ca1d3be
2 changed files with 215 additions and 6 deletions

View File

@@ -25,7 +25,11 @@ from pathlib import Path
from ..replay import Project, replay_project
from .pcb_writer import write_pcb_std
from .pro2_writer import write_pro2_doc
from .pro2_writer import (
fetch_encrypted_plaintext,
split_plaintext_by_doctype,
write_pro2_doc,
)
from .sch_writer import write_sch_std
@@ -99,6 +103,80 @@ def _convert_schs(proj: Project, out_dir: Path) -> int:
return len(uuids)
def _convert_pro2_encrypted(
json_path: Path, out_dir: Path,
project_uuid: str, editor_version: str, parent_uuid: str,
) -> int:
"""Try fetch + AES-256-GCM decrypt + gunzip the encrypted-external
blob, then split by DOCTYPE boundary into per-sub-doc JSONs.
Pro 2.x bundles N FOOTPRINTs + 1 PCB (or N SYMBOLs + 1 SCH) into one
blob; we emit each as a separate file named
``<parent_uuid>__<sub_label>.json`` so the parent association is
visible in the filename without colliding with other sources.
"""
plain = fetch_encrypted_plaintext(json_path)
if plain is None:
return 0
n = 0
for sub_label, sub_text in split_plaintext_by_doctype(plain):
# Re-route the inline path: build a synthetic Pro 2.x doc shape
# in a temp file so write_pro2_doc + its caching behave normally.
synth = {
"uuid": f"{parent_uuid}__{sub_label}",
"title": sub_label,
"docType": _doctype_from_first_line(sub_text),
"dataStr": sub_text,
}
# Write the synthetic JSON next to the original, with a name
# that won't collide with manifest entries.
synth_path = json_path.parent / f".synth__{parent_uuid}__{sub_label}.json"
synth_path.write_text(json.dumps(synth, ensure_ascii=False), encoding="utf-8")
try:
payload = write_pro2_doc(
synth_path, project_uuid=project_uuid,
editor_version_hint=editor_version,
)
finally:
synth_path.unlink(missing_ok=True)
if payload is None:
continue
out_name = f"{parent_uuid}__{sub_label}.json"
(out_dir / out_name).write_text(
json.dumps(payload, ensure_ascii=False, separators=(",", ":")),
encoding="utf-8",
)
s = getattr(write_pro2_doc, "last_stats", None)
if s:
print(
f" {parent_uuid[:12]}__{sub_label}: docType={synth['docType']} "
f"objects={s.objects}"
)
n += 1
return n
def _doctype_from_first_line(text: str) -> int:
"""Read the leading ``["DOCTYPE","KIND","x.y"]`` line and return the
Std docType code (1=SCH, 2=SYMBOL, 3=PCB, 4=FOOTPRINT, 5=DEVICE)."""
for line in text.splitlines():
line = line.strip()
if not line:
continue
try:
arr = json.loads(line)
except json.JSONDecodeError:
continue
if not (isinstance(arr, list) and arr and arr[0] == "DOCTYPE"):
continue
kind = arr[1] if len(arr) > 1 else ""
return {
"SCH": 1, "SYMBOL": 2, "PCB": 3, "FOOTPRINT": 4, "DEVICE": 5,
}.get(kind, 0)
return 0
def _convert_pro2(project_dir: Path, out_dir: Path,
editor_version: str, want_pcb: bool, want_sch: bool) -> int:
"""Pro 2.x path — read each <uuid>.json directly (no EPRO2 replay)
@@ -130,12 +208,25 @@ def _convert_pro2(project_dir: Path, out_dir: Path,
if payload is None:
stats = getattr(write_pro2_doc, "last_stats", None)
if stats and stats.skipped_encrypted:
print(
f" SKIP {entry['doc_uuid'][:12]}: PCB blob is "
f"AES-encrypted external (dataStrId+iv+key); needs "
f"a separate fetch+decrypt step we don't run here."
# Try fetching + decrypting from modules.lceda.cn. The blob
# bundles N FOOTPRINTs/SYMBOLs + 1 parent PCB/SCH; we emit
# one JSON per sub-doc.
m_n = _convert_pro2_encrypted(
path, out_dir, project_uuid, editor_version,
parent_uuid=entry["doc_uuid"],
)
skipped_encrypted += 1
if m_n > 0:
print(
f" decrypted {entry['doc_uuid'][:12]}: "
f"{m_n} sub-doc(s) emitted"
)
n += m_n
else:
print(
f" SKIP {entry['doc_uuid'][:12]}: encrypted-external "
f"and fetch/decrypt failed."
)
skipped_encrypted += 1
continue
out_path = out_dir / f"{entry['doc_uuid']}.json"
out_path.write_text(

View File

@@ -30,9 +30,11 @@ modules.lceda.cn + AES-decrypt is out of this writer's scope.
from __future__ import annotations
import gzip
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
# Pro 2.x ops that carry no addressable id (one per doc) — keyed by their
@@ -166,6 +168,122 @@ def _layers_from_objects(objects: dict[str, list]) -> list[str]:
return layers
def fetch_encrypted_plaintext(json_path: Path) -> str | None:
"""For an encrypted-external Pro 2.x JSON (carries `dataStrId/iv/key`
instead of inline `dataStr`), fetch the AES-GCM blob from modules.lceda.cn,
decrypt + gunzip, and return the plaintext op-stream string.
Caches the result alongside the source file at
``<source>/<uuid>.decrypted.txt`` so subsequent runs skip the network
+ crypto round-trip entirely.
Returns None if the JSON isn't encrypted-external, or if any step
fails (network, AES tag, gunzip — all caught and logged).
"""
raw = json.loads(json_path.read_text(encoding="utf-8"))
if "dataStr" in raw:
return None
url = raw.get("dataStrId")
iv_hex = raw.get("iv")
key_hex = raw.get("key")
if not (url and iv_hex and key_hex):
return None
cache = json_path.with_suffix(".decrypted.txt")
if cache.exists():
return cache.read_text(encoding="utf-8")
# Heavy imports only when actually fetching; the pure-replay path
# shouldn't pay for httpx + pycryptodome import time.
import httpx
from Crypto.Cipher import AES
try:
with httpx.Client(timeout=60.0) as c:
r = c.get(url)
r.raise_for_status()
blob = r.content
except httpx.HTTPError as e:
print(f" encrypted-external fetch failed for {json_path.name}: {e}")
return None
if len(blob) < 16:
print(f" encrypted-external blob too short ({len(blob)} B): {json_path.name}")
return None
ct, tag = blob[:-16], blob[-16:]
try:
cipher = AES.new(bytes.fromhex(key_hex), AES.MODE_GCM, nonce=bytes.fromhex(iv_hex))
gz = cipher.decrypt_and_verify(ct, tag)
plain_bytes = gzip.decompress(gz)
except Exception as e: # noqa: BLE001 — any crypto / gzip failure
print(f" encrypted-external decrypt failed for {json_path.name}: {e}")
return None
plain = plain_bytes.decode("utf-8", errors="replace")
try:
cache.write_text(plain, encoding="utf-8")
except OSError:
pass # caching is best-effort; skip if we can't write
return plain
def split_plaintext_by_doctype(plain: str) -> Iterator[tuple[str, str]]:
"""Walk a multi-doc plaintext op-stream and yield ``(doc_label, sub_text)``
per inner document.
Pro 2.x's encrypted blob bundles N FOOTPRINTs + 1 PCB (or N SYMBOLs +
1 SCH for schematic blobs). Each inner doc starts with a fresh
``["DOCTYPE", "<KIND>", "<version>"]`` line. We split on those.
The label is the HEAD op's `uuid` field if present, else
``<doctype_kind>_<index>``.
"""
cur_lines: list[str] = []
cur_uuid: str | None = None
cur_kind: str | None = None
idx = 0
def flush() -> tuple[str, str] | None:
nonlocal cur_lines, cur_uuid, cur_kind, idx
if not cur_lines:
return None
label = cur_uuid or f"{(cur_kind or 'doc').lower()}_{idx}"
idx += 1
text = "\n".join(cur_lines)
cur_lines = []
cur_uuid = None
cur_kind = None
return (label, text)
for line in plain.splitlines():
stripped = line.strip()
if not stripped:
continue
try:
arr = json.loads(stripped)
except json.JSONDecodeError:
cur_lines.append(line)
continue
if not isinstance(arr, list) or not arr:
continue
if arr[0] == "DOCTYPE":
# Boundary: flush previous doc (if any), start new
prev = flush()
if prev is not None:
yield prev
cur_kind = arr[1] if len(arr) > 1 else None
cur_lines.append(line)
continue
if arr[0] == "HEAD" and len(arr) > 1 and isinstance(arr[1], dict):
cur_uuid = arr[1].get("uuid") or cur_uuid
cur_lines.append(line)
last = flush()
if last is not None:
yield last
def write_pro2_doc(
json_path: Path,
*,