Files
PastpaperMaster/backend/import_course_manifest.py
Zhao 7a09167261 Initial commit: PastPaper Master full stack
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-21 12:27:47 +07:00

241 lines
7.2 KiB
Python

"""Import a canonical course manifest into Supabase-backed papers."""
from __future__ import annotations
import argparse
import asyncio
import json
from pathlib import Path
from typing import Any
from app.services.paper_processor import process_paper
from app.services.supabase_client import get_supabase
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Import a canonical course paper manifest into Supabase."
)
parser.add_argument(
"--manifest",
type=Path,
required=True,
help="Path to the manifest JSON file.",
)
parser.add_argument(
"--papers-root",
type=Path,
required=True,
help="Root folder that contains the course PDF files referenced by the manifest.",
)
parser.add_argument(
"--user-id",
required=False,
help="Existing auth.users UUID used as the owner of imported course-library rows.",
)
parser.add_argument(
"--course-code",
help="Optional filter to only import entries from one course.",
)
parser.add_argument(
"--exam-key",
action="append",
dest="exam_keys",
default=[],
help="Optional exam_key filter. Repeat the flag to import multiple entries.",
)
parser.add_argument(
"--process",
action="store_true",
help="Run the full paper processing pipeline after the files are uploaded.",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be imported without uploading or writing database rows.",
)
return parser.parse_args()
def load_manifest(path: Path) -> list[dict[str, Any]]:
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("Manifest must be a JSON array.")
return data
def should_import(entry: dict[str, Any], args: argparse.Namespace) -> bool:
if args.course_code and entry.get("course_code") != args.course_code:
return False
if args.exam_keys and entry.get("exam_key") not in set(args.exam_keys):
return False
return bool(entry.get("importable"))
def resolve_file_path(root: Path, filename: str | None) -> Path | None:
if not filename:
return None
direct = root / filename
if direct.exists():
return direct
all_files = [candidate for candidate in root.iterdir() if candidate.is_file()]
def normalize(name: str) -> str:
return name.replace(" (1)", "")
target_name = normalize(filename)
normalized = [candidate for candidate in all_files if normalize(candidate.name) == target_name]
if len(normalized) == 1:
return normalized[0]
path = Path(filename)
normalized_stem = normalize(path.stem)
suffix = path.suffix
stem_matches = [
candidate
for candidate in all_files
if candidate.suffix == suffix and normalize(candidate.stem) == normalized_stem
]
if len(stem_matches) == 1:
return stem_matches[0]
return None
def read_file_bytes(root: Path, filename: str | None) -> bytes | None:
if not filename:
return None
path = resolve_file_path(root, filename)
if path is None or not path.exists():
raise FileNotFoundError(f"Referenced file does not exist under {root}: {filename}")
return path.read_bytes()
def build_storage_path(entry: dict[str, Any], kind: str) -> str:
exam_key = entry["exam_key"]
return f"course-library/{entry['course_code']}/{exam_key}/{kind}.pdf"
def upsert_paper_record(
entry: dict[str, Any],
user_id: str | None,
paper_url: str,
answer_url: str | None,
) -> str:
sb = get_supabase()
payload = {
"user_id": user_id,
"course_code": entry["course_code"],
"year": entry["year"],
"term": entry["term"],
"exam_type": entry["exam_type"],
"part_label": entry.get("part_label"),
"paper_file_url": paper_url,
"answer_file_url": answer_url,
"status": "processing",
"source_kind": "course_library",
"source_exam_key": entry["exam_key"],
"source_question_filename": entry.get("question_pdf"),
"source_answer_filename": entry.get("primary_answer_pdf"),
}
existing = (
sb.table("papers")
.select("id")
.eq("source_kind", "course_library")
.eq("source_exam_key", entry["exam_key"])
.limit(1)
.execute()
.data
)
if existing:
paper_id = existing[0]["id"]
sb.table("papers").update(payload).eq("id", paper_id).execute()
return paper_id
created = sb.table("papers").insert(payload).execute().data
return created[0]["id"]
def reset_existing_processed_data(paper_id: str) -> None:
sb = get_supabase()
sb.table("paper_questions").delete().eq("paper_id", paper_id).execute()
sb.table("papers").update(
{
"status": "processing",
"error_message": None,
"paper_extracted_text": None,
"answer_extracted_text": None,
"total_score": None,
"question_count": None,
"topics_summary": None,
"difficulty_level": None,
}
).eq("id", paper_id).execute()
async def import_entry(
entry: dict[str, Any],
args: argparse.Namespace,
) -> None:
paper_bytes = read_file_bytes(args.papers_root, entry.get("question_pdf"))
answer_bytes = read_file_bytes(args.papers_root, entry.get("primary_answer_pdf"))
if paper_bytes is None:
raise ValueError(f"Importable entry is missing question PDF: {entry['exam_key']}")
if args.dry_run:
print(
f"[dry-run] {entry['exam_key']}: "
f"question={entry.get('question_pdf')} answer={entry.get('primary_answer_pdf')}"
)
return
sb = get_supabase()
paper_path = build_storage_path(entry, "paper")
sb.storage.from_("papers").upload(
paper_path,
paper_bytes,
file_options={"content-type": "application/pdf", "upsert": "true"},
)
paper_url = sb.storage.from_("papers").get_public_url(paper_path)
answer_url = None
if answer_bytes:
answer_path = build_storage_path(entry, "answer")
sb.storage.from_("papers").upload(
answer_path,
answer_bytes,
file_options={"content-type": "application/pdf", "upsert": "true"},
)
answer_url = sb.storage.from_("papers").get_public_url(answer_path)
paper_id = upsert_paper_record(entry, args.user_id, paper_url, answer_url)
print(f"Imported metadata for {entry['exam_key']} -> paper_id={paper_id}")
if args.process:
reset_existing_processed_data(paper_id)
await process_paper(paper_id, paper_bytes, answer_bytes)
print(f"Processed {entry['exam_key']}")
async def main() -> None:
args = parse_args()
manifest = load_manifest(args.manifest)
entries = [entry for entry in manifest if should_import(entry, args)]
if not entries:
print("No manifest entries matched the provided filters.")
return
print(f"Preparing to import {len(entries)} manifest entries.")
for entry in entries:
await import_entry(entry, args)
if __name__ == "__main__":
asyncio.run(main())