Files
PastpaperMaster/backend/batch_import.py
Zhao 9c09944c96 feat: expandable previews, KaTeX rendering, variant speedup, batch import
- Analytics/Similar: expandable question preview with KaTeX rendering
- KaTeXRenderer: auto markdown-to-HTML (code blocks, tables, bold), auto Unicode→LaTeX
- ErrorBook: full question text rendering instead of truncated preview
- Variant: remove hint/solution from generation (faster), async, fix null crash
- Grading: add max_tokens limit
- JSON parser: robust multi-layer repair + JSONDecodeError retry
- Extraction prompt: enforce LaTeX notation for math
- Upload: redirect to home instead of blank paper page
- ProcessingBanner: add ETA time estimate + percentage
- Batch import script + handoff guide for team

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-24 22:41:57 +09:00

324 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
批量导入试卷到 PastPaper Master
================================
用法:
# 导入单份试卷
python batch_import.py /path/to/paper.pdf --course COMP2211 --year 2024 --term spring --exam midterm
# 导入单份试卷 + 答案
python batch_import.py /path/to/paper.pdf --answer /path/to/answer.pdf --course COMP2211 --year 2024 --term spring --exam midterm
# 批量导入整个目录(自动从文件名解析元数据)
python batch_import.py /path/to/papers_dir/ --batch
# 批量导入,限制并发数(默认 1,避免 API 限流)
python batch_import.py /path/to/papers_dir/ --batch --concurrency 2
# 试运行(只打印会导入什么,不实际执行)
python batch_import.py /path/to/papers_dir/ --batch --dry-run
目录结构约定 (--batch 模式):
papers_dir/
├── COMP2211/
│ ├── 2024_spring_midterm.pdf
│ ├── 2024_spring_midterm_answer.pdf (可选,自动匹配)
│ ├── 2024_fall_final.pdf
│ └── 2023_spring_midterm.pdf
├── MATH1014/
│ ├── 2024_spring_midterm.pdf
│ └── ...
└── ...
文件名格式: {year}_{term}_{exam_type}.pdf
答案文件名: {year}_{term}_{exam_type}_answer.pdf (自动匹配)
环境:
需要项目根目录的 .env 文件(包含 Supabase 和 LLM API keys)
在 backend/ 目录下运行: python batch_import.py ...
"""
import argparse
import asyncio
import os
import re
import sys
import time
from pathlib import Path
# Put this script's directory at the front of sys.path before the `app.*`
# imports below are executed.
sys.path.insert(0, os.path.dirname(__file__))
from dotenv import load_dotenv
# Load environment variables from the .env file one directory above this script.
load_dotenv(os.path.join(os.path.dirname(__file__), "..", ".env"))
from app.services.supabase_client import get_supabase
from app.services.paper_processor import process_paper
# ── Service-account user_id: used for batch imports, not tied to a specific user ──
BATCH_USER_ID = "00000000-0000-0000-0000-000000000000"
def parse_filename(filename: str) -> dict | None:
"""
从文件名解析元数据。支持格式:
- 2024_spring_midterm.pdf
- 2024-fall-final.pdf
- 2024s_mid.pdf
- (COMP2211)[2024](s)midterm~xxx.pdf (scraper 格式)
"""
base = Path(filename).stem.lower()
# 去掉 _answer 后缀
if base.endswith("_answer") or base.endswith("_ans") or base.endswith("_solution"):
return None # 这是答案文件,不单独导入
result = {}
# Year: 4位数字
year_match = re.search(r'(20[1-2]\d)', base)
if year_match:
result["year"] = int(year_match.group(1))
# Term
if re.search(r'spring|spr|\(s\)|_s_', base):
result["term"] = "spring"
elif re.search(r'fall|aut|\(f\)|_f_', base):
result["term"] = "fall"
elif re.search(r'summer|sum', base):
result["term"] = "summer"
# Exam type
if re.search(r'mid', base):
result["exam_type"] = "midterm"
elif re.search(r'final|fin', base):
result["exam_type"] = "final"
elif re.search(r'quiz', base):
result["exam_type"] = "quiz"
if "year" in result and "term" in result and "exam_type" in result:
return result
return None
def find_answer_file(paper_path: Path) -> Path | None:
"""查找对应的答案文件"""
stem = paper_path.stem
parent = paper_path.parent
for suffix in ["_answer", "_ans", "_solution"]:
candidate = parent / f"{stem}{suffix}.pdf"
if candidate.exists():
return candidate
return None
def scan_directory(dir_path: Path) -> list[dict]:
    """
    Collect the papers to import from a directory tree.

    Expected layout: dir_path/COURSE_CODE/year_term_examtype.pdf

    Each sub-directory name, upper-cased, is used as the course code.
    Entries that are not directories are ignored, as are PDFs whose names
    parse_filename() rejects. Sub-directories and files are visited in
    sorted order.

    Returns:
        A list of dicts with keys "paper_path", "answer_path" (None when no
        answer file is found), "course_code", plus the parsed metadata keys.
    """
    found = []
    course_dirs = (entry for entry in sorted(dir_path.iterdir()) if entry.is_dir())
    for folder in course_dirs:
        code = folder.name.upper()
        for pdf_file in sorted(folder.glob("*.pdf")):
            parsed = parse_filename(pdf_file.name)
            if parsed is not None:
                found.append({
                    "paper_path": pdf_file,
                    "answer_path": find_answer_file(pdf_file),
                    "course_code": code,
                    **parsed,
                })
    return found
def check_duplicate(sb, course_code: str, year: int, term: str, exam_type: str) -> bool:
    """
    Tell whether an equivalent paper is already stored.

    Queries the "papers" table for rows with the same course code, year,
    term and exam type whose status is "ready" or "processing".

    Returns:
        True when at least one such row is returned, otherwise False.
    """
    query = sb.table("papers").select("id")
    query = query.eq("course_code", course_code)
    query = query.eq("year", year)
    query = query.eq("term", term)
    query = query.eq("exam_type", exam_type)
    query = query.in_("status", ["ready", "processing"])
    rows = query.execute().data
    return len(rows) > 0
async def import_single(
    paper_path: Path,
    answer_path: Path | None,
    course_code: str,
    year: int,
    term: str,
    exam_type: str,
    skip_duplicates: bool = True,
) -> str | None:
    """
    Import a single paper: create its DB row, upload the PDFs, process it.

    Args:
        paper_path: Path of the paper PDF.
        answer_path: Path of the answer PDF, or None when there is none.
        course_code: Course code stored on the record and used in the
            storage path.
        year: Exam year.
        term: Exam term.
        exam_type: Exam type.
        skip_duplicates: When True, do nothing if check_duplicate() reports
            an existing paper with the same course/year/term/exam type.

    Returns:
        The id of the created "papers" row, or None when the paper was
        skipped as a duplicate. The id is returned even when processing
        fails; in that case the row's status is set to "error".
    """
    sb = get_supabase()

    # Duplicate check
    if skip_duplicates and check_duplicate(sb, course_code, year, term, exam_type):
        print(f" SKIP (duplicate): {course_code} {year} {term} {exam_type}")
        return None

    # Read both files before creating the DB row, so an unreadable file does
    # not leave a row stuck in "processing".
    paper_bytes = paper_path.read_bytes()
    answer_bytes = answer_path.read_bytes() if answer_path else None

    # Create the DB record
    record = sb.table("papers").insert({
        "user_id": BATCH_USER_ID,
        "course_code": course_code,
        "year": year,
        "term": term,
        "exam_type": exam_type,
        "paper_file_url": "",
        "answer_file_url": None,
        "status": "processing",
    }).execute()
    paper_id = record.data[0]["id"]

    # Upload to Supabase Storage. This is best-effort: a failure is reported
    # and the paper is still processed from the in-memory bytes.
    storage_path = f"{course_code}/{year}_{term}_{exam_type}"
    pdf_options = {"content-type": "application/pdf", "upsert": "true"}
    try:
        bucket = sb.storage.from_("papers")
        bucket.upload(f"{storage_path}/paper.pdf", paper_bytes, file_options=pdf_options)
        update = {"paper_file_url": bucket.get_public_url(f"{storage_path}/paper.pdf")}
        if answer_bytes:
            bucket.upload(f"{storage_path}/answer.pdf", answer_bytes, file_options=pdf_options)
            update["answer_file_url"] = bucket.get_public_url(f"{storage_path}/answer.pdf")
        sb.table("papers").update(update).eq("id", paper_id).execute()
    except Exception as e:
        print(f" WARNING: Storage upload failed: {e}")

    # Process the paper (Vision extraction + AI trio)
    print(f" Processing {course_code} {year} {term} {exam_type} ...")
    # perf_counter() is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments the way time.time() differences can.
    started = time.perf_counter()
    try:
        await process_paper(paper_id, paper_bytes, answer_bytes)
    except Exception as e:
        elapsed = time.perf_counter() - started
        print(f" ERROR after {elapsed:.0f}s: {e}")
        # Record the failure on the row; the message is capped at 200 chars.
        sb.table("papers").update({"status": "error", "processing_step": str(e)[:200]}).eq("id", paper_id).execute()
    else:
        elapsed = time.perf_counter() - started
        # str() keeps the short-id display working for non-string ids.
        print(f" DONE in {elapsed:.0f}s -> {str(paper_id)[:8]}")
    return paper_id
async def batch_import(dir_path: Path, concurrency: int = 1, dry_run: bool = False):
    """
    Import every paper found under a directory.

    Args:
        dir_path: Root directory, laid out as dir/COURSE_CODE/year_term_examtype.pdf.
        concurrency: Maximum number of papers imported at the same time.
            Must be at least 1.
        dry_run: When True, only list what would be imported and return.

    Raises:
        ValueError: If an actual import is requested with concurrency < 1.
    """
    items = scan_directory(dir_path)
    if not items:
        print(f"No papers found in {dir_path}")
        print("Expected structure: dir/COURSE_CODE/year_term_examtype.pdf")
        return

    print(f"Found {len(items)} papers to import:\n")
    for item in items:
        ans_label = " + answer" if item["answer_path"] else ""
        print(f" {item['course_code']} {item['year']} {item['term']} {item['exam_type']}{ans_label}")
        print(f" <- {item['paper_path']}")

    if dry_run:
        print(f"\n[DRY RUN] Would import {len(items)} papers. Exiting.")
        return

    # Semaphore(0) would make every task wait forever, so reject it up front
    # instead of hanging.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")

    print(f"\nStarting import (concurrency={concurrency})...\n")
    semaphore = asyncio.Semaphore(concurrency)
    results = {"ok": 0, "skip": 0, "error": 0}

    async def process_one(item):
        """Import one item under the semaphore and tally the outcome."""
        async with semaphore:
            try:
                pid = await import_single(
                    paper_path=item["paper_path"],
                    answer_path=item["answer_path"],
                    course_code=item["course_code"],
                    year=item["year"],
                    term=item["term"],
                    exam_type=item["exam_type"],
                )
                # import_single returns None only for skipped duplicates.
                if pid:
                    results["ok"] += 1
                else:
                    results["skip"] += 1
            except Exception as e:
                # One failing paper must not abort the rest of the batch.
                results["error"] += 1
                print(f" FATAL: {item['course_code']} {item['year']} - {e}")

    # Sequential or concurrent processing
    if concurrency == 1:
        for item in items:
            await process_one(item)
    else:
        await asyncio.gather(*[process_one(item) for item in items])

    print(f"\n{'='*50}")
    print(f"Import complete: {results['ok']} success, {results['skip']} skipped, {results['error']} errors")
def main():
    """
    Command-line entry point.

    With --batch, imports every paper under the given directory; otherwise
    imports the single PDF given by the path, which requires --course,
    --year, --term and --exam. Invalid arguments print an error and exit
    with status 1. --dry-run is honoured in both modes.
    """
    parser = argparse.ArgumentParser(description="Batch import papers to PastPaper Master")
    parser.add_argument("path", help="Path to PDF file or directory (with --batch)")
    parser.add_argument("--answer", help="Path to answer PDF (single file mode)")
    parser.add_argument("--course", help="Course code (e.g. COMP2211)")
    parser.add_argument("--year", type=int, help="Year (e.g. 2024)")
    parser.add_argument("--term", choices=["spring", "summer", "fall"], help="Term")
    parser.add_argument("--exam", choices=["midterm", "final", "quiz"], help="Exam type")
    parser.add_argument("--batch", action="store_true", help="Batch import from directory")
    parser.add_argument("--concurrency", type=int, default=1, help="Max concurrent imports (default: 1)")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be imported without doing it")
    args = parser.parse_args()
    path = Path(args.path)

    if args.batch:
        if not path.is_dir():
            print(f"Error: {path} is not a directory")
            sys.exit(1)
        # A concurrency below 1 can never make progress; reject it here.
        if args.concurrency < 1:
            print("Error: --concurrency must be at least 1")
            sys.exit(1)
        asyncio.run(batch_import(path, concurrency=args.concurrency, dry_run=args.dry_run))
        return

    # Single file mode
    if not path.is_file():
        print(f"Error: {path} is not a file")
        sys.exit(1)
    if not all([args.course, args.year, args.term, args.exam]):
        print("Error: --course, --year, --term, --exam are required for single file import")
        sys.exit(1)
    answer_path = Path(args.answer) if args.answer else None
    # Fail with a clear message rather than a traceback on a bad --answer path.
    if answer_path is not None and not answer_path.is_file():
        print(f"Error: {answer_path} is not a file")
        sys.exit(1)
    course_code = args.course.upper()

    # --dry-run must never touch the database or storage, in this mode too.
    if args.dry_run:
        ans_label = " + answer" if answer_path else ""
        print(f"[DRY RUN] Would import {course_code} {args.year} {args.term} {args.exam}{ans_label}")
        print(f" <- {path}")
        return

    result = asyncio.run(import_single(
        paper_path=path,
        answer_path=answer_path,
        course_code=course_code,
        year=args.year,
        term=args.term,
        exam_type=args.exam,
    ))
    if result:
        print(f"\nPaper ID: {result}")


if __name__ == "__main__":
    main()