Why: - schema 必须能自动校验,否则后续放量无法防腐。现在 scripts/validate.py 对全部 metadata.json 做两层检查(schema + 本地文件 sha256),跑一次 即可对全量数据签收;10/10 项目已通过。 - docs/sources/oshwhub.md 之前把 fs-web-stream.jlc.com 标为"工程源待查", 排查后确认那些 URL 全部是嘉立创服务侧栏/推广图标,与项目无关。 image.lceda.cn/attachments/ 是项目附件的唯一入口,现在调研文档闭合。 What: - scripts/validate.py: jsonschema 校验 + optional --check-files 核 sha256 - pyproject.toml: 加 jsonschema>=4.26 依赖 - docs/sources/oshwhub.md: fs-web-stream 归类为推广资源(已排除),附 context 证据 - log.md: 本次会话记录 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
100 lines
2.8 KiB
Python
100 lines
2.8 KiB
Python
"""Validate all data/raw/<source>/<project>/metadata.json against project.schema.json.
|
||
|
||
Usage:
    uv run python scripts/validate.py                    # validate everything
    uv run python scripts/validate.py data/raw/oshwhub   # only the given subdirectory
    uv run python scripts/validate.py --check-files      # also verify local files exist and their sha256 matches
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import hashlib
|
||
import json
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
import jsonschema
|
||
|
||
# Repository root: the parent of the directory that contains this script.
REPO = Path(__file__).resolve().parent.parent
# JSON Schema that every metadata.json is validated against.
SCHEMA_PATH = REPO.joinpath("schemas", "project.schema.json")
|
||
|
||
|
||
def iter_metadata(roots: list[Path]):
    """Yield every ``metadata.json`` found beneath the given directories.

    Each root is searched recursively, in the order the roots are given.
    Matches are produced lazily, in whatever order ``Path.rglob`` reports
    them; no sorting or de-duplication is performed.
    """
    for search_root in roots:
        yield from search_root.rglob("metadata.json")
|
||
|
||
|
||
def sha256_of(path: Path, chunk: int = 1 << 15) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and consumed in reads of *chunk*
    bytes (32 KiB by default), so the whole content is never held in
    memory at once.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        # Two-argument iter() keeps calling read() until it returns b"" (EOF).
        for piece in iter(lambda: stream.read(chunk), b""):
            digest.update(piece)
    return digest.hexdigest()
|
||
|
||
|
||
def check_files(meta_path: Path, meta: dict) -> list[str]:
    """Return a list of errors for the file-presence / hash check.

    Every entry of ``meta["files"]`` that carries a ``path`` is resolved
    relative to the directory containing *meta_path*. The file must exist
    and be a regular file; when the entry also carries a ``sha256`` value,
    the file's actual digest must equal it.

    This function is also run on metadata that failed schema validation,
    so it never assumes the structure is well-formed: anything it cannot
    interpret is reported as an error string instead of raising.

    Args:
        meta_path: Location of the project's ``metadata.json``.
        meta: Parsed content of that file.

    Returns:
        Human-readable error messages; empty when everything checks out.
    """
    if not isinstance(meta, dict):
        return ["metadata is not a JSON object"]

    entries = meta.get("files")
    if entries is None:
        # Missing (or explicitly null) file list: nothing to verify.
        return []
    if not isinstance(entries, list):
        return ["'files' is not a list"]

    errs: list[str] = []
    proj_dir = meta_path.parent
    for idx, entry in enumerate(entries):
        if not isinstance(entry, dict):
            errs.append(f"files[{idx}] is not an object")
            continue

        rel = entry.get("path")
        if not rel:
            continue  # URL-only entry, ok
        if not isinstance(rel, str):
            errs.append(f"files[{idx}].path is not a string")
            continue

        target = proj_dir / rel
        if not target.exists():
            errs.append(f"missing file: {rel}")
            continue
        if not target.is_file():
            # A directory (or other non-file) cannot be hashed; report it
            # rather than letting open() raise.
            errs.append(f"not a regular file: {rel}")
            continue

        want = entry.get("sha256")
        if want:
            got = sha256_of(target)
            if got != want:
                errs.append(f"sha256 mismatch for {rel}: {got} != {want}")
    return errs
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
    """Validate every metadata.json under the given roots and print a report.

    Each file is checked against the schema at ``SCHEMA_PATH``; with
    ``--check-files`` the local files it lists are verified as well. A file
    that cannot be read or parsed is reported as a failure for that file
    rather than aborting the whole run.

    Args:
        argv: Command-line arguments; ``None`` lets argparse use ``sys.argv``.

    Returns:
        0 when no file failed, 1 otherwise (suitable as an exit status).
    """
    ap = argparse.ArgumentParser(description="schema + file integrity check")
    ap.add_argument(
        "roots",
        nargs="*",
        type=Path,
        default=[REPO / "data" / "raw"],
        help="目录,递归找 metadata.json(默认 data/raw)",
    )
    ap.add_argument("--check-files", action="store_true", help="验证本地文件存在且 sha256 一致")
    args = ap.parse_args(argv)

    # JSON is UTF-8 by specification; do not depend on the locale encoding.
    schema = json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))
    validator = jsonschema.Draft202012Validator(schema)

    ok = 0
    bad = 0
    for meta_path in iter_metadata(args.roots):
        # Roots given on the command line may be relative to the working
        # directory or lie outside the repository; relative_to() raises
        # ValueError in both cases, so resolve first and fall back to the
        # absolute path for display.
        resolved = meta_path.resolve()
        try:
            rel = resolved.relative_to(REPO)
        except ValueError:
            rel = resolved

        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            errors = [f"load: {exc}"]
        else:
            errors = [
                f"schema: {e.message} at {'/'.join(map(str, e.path))}"
                for e in validator.iter_errors(meta)
            ]
            if args.check_files:
                errors += [f"file: {m}" for m in check_files(meta_path, meta)]

        if errors:
            bad += 1
            print(f"FAIL {rel}")
            for e in errors:
                print(f"  - {e}")
        else:
            ok += 1

    total = ok + bad
    print(f"\n{ok}/{total} passed")
    return 0 if bad == 0 else 1
|
||
|
||
|
||
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    sys.exit(main())
|