"""EPRO2 line parser. EPRO2 is EasyEDA Pro 3.x's event-sourced project source format. After AES-GCM decryption + gunzip (handled by the crawler), each newline-separated line has the shape: {"type":"X","ticket":N,"id":"..."}||{payload JSON}||{optional extra}| Field separator is ``||``; line terminator is a single trailing ``|`` (NOT a field separator — easy to mis-parse, see docs/sources/easyeda_pro_source.md §3.1). This module only does line-level parsing (raw → ``Op``). State semantics (create / update / delete) live in ``replay.py``. """ from __future__ import annotations import json from dataclasses import dataclass from pathlib import Path from typing import Iterator @dataclass(slots=True) class Op: """A single EPRO2 message after raw parsing.""" type: str ticket: int | None id: str | None payload: dict | None extra: dict | None raw: bytes # original line, for debugging / round-trip class Epro2ParseError(ValueError): """Raised when a line cannot be parsed.""" def parse_line(ln: bytes) -> Op: """Parse one EPRO2 line. Raises ``Epro2ParseError`` on a malformed head.""" stripped = ln.strip().rstrip(b"|") if not stripped: raise Epro2ParseError("empty line") parts = stripped.split(b"||") try: head = json.loads(parts[0]) except json.JSONDecodeError as e: raise Epro2ParseError( f"bad head JSON at byte {e.pos}: {parts[0][:160]!r}" ) from e payload = _maybe_json(parts[1]) if len(parts) >= 2 else None extra = _maybe_json(parts[2]) if len(parts) >= 3 else None return Op( type=str(head.get("type", "?")), ticket=head.get("ticket"), id=head.get("id"), payload=payload if isinstance(payload, dict) else None, extra=extra if isinstance(extra, dict) else None, raw=ln, ) def _maybe_json(b: bytes) -> object | None: """JSON-decode if non-empty; tolerate malformed payloads (return None).""" if not b: return None try: return json.loads(b) except json.JSONDecodeError: return None def iter_ops(path: Path | str) -> Iterator[Op]: """Yield ``Op`` records from a ``.epro2`` file. Lines that fail to parse are skipped; structural failures (file not found, encoding error) propagate. """ p = Path(path) with p.open("rb") as f: for ln in f: ln = ln.rstrip(b"\n") if not ln.strip(): continue try: yield parse_line(ln) except Epro2ParseError: continue