"""Validate all data/raw///metadata.json against project.schema.json. Usage: uv run python scripts/validate.py # 校验全部 uv run python scripts/validate.py data/raw/oshwhub # 指定子目录 uv run python scripts/validate.py --check-files # 也核对本地文件是否存在+sha256 一致 """ from __future__ import annotations import argparse import hashlib import json import sys from pathlib import Path import jsonschema REPO = Path(__file__).resolve().parent.parent SCHEMA_PATH = REPO / "schemas" / "project.schema.json" def iter_metadata(roots: list[Path]): for root in roots: for meta in root.rglob("metadata.json"): yield meta def sha256_of(path: Path, chunk: int = 1 << 15) -> str: h = hashlib.sha256() with open(path, "rb") as f: while True: b = f.read(chunk) if not b: break h.update(b) return h.hexdigest() def check_files(meta_path: Path, meta: dict) -> list[str]: """Return list of errors for the file-presence / hash check.""" errs: list[str] = [] proj_dir = meta_path.parent for f in meta.get("files", []): rel = f.get("path") if not rel: continue # URL-only entry, ok p = proj_dir / rel if not p.exists(): errs.append(f"missing file: {rel}") continue want = f.get("sha256") if want: got = sha256_of(p) if got != want: errs.append(f"sha256 mismatch for {rel}: {got} != {want}") return errs def main(argv: list[str] | None = None) -> int: ap = argparse.ArgumentParser(description="schema + file integrity check") ap.add_argument( "roots", nargs="*", type=Path, default=[REPO / "data" / "raw"], help="目录,递归找 metadata.json(默认 data/raw)", ) ap.add_argument("--check-files", action="store_true", help="验证本地文件存在且 sha256 一致") args = ap.parse_args(argv) schema = json.loads(SCHEMA_PATH.read_text()) validator = jsonschema.Draft202012Validator(schema) ok = 0 bad = 0 for meta_path in iter_metadata(args.roots): meta = json.loads(meta_path.read_text()) rel = meta_path.relative_to(REPO) errors = [f"schema: {e.message} at {'/'.join(map(str, e.path))}" for e in validator.iter_errors(meta)] if args.check_files: errors += [f"file: {m}" for m in check_files(meta_path, meta)] if errors: bad += 1 print(f"FAIL {rel}") for e in errors: print(f" - {e}") else: ok += 1 total = ok + bad print(f"\n{ok}/{total} passed") return 0 if bad == 0 else 1 if __name__ == "__main__": raise SystemExit(main())