- Analytics/Similar: expandable question preview with KaTeX rendering - KaTeXRenderer: auto markdown-to-HTML (code blocks, tables, bold), auto Unicode→LaTeX - ErrorBook: full question text rendering instead of truncated preview - Variant: remove hint/solution from generation (faster), async, fix null crash - Grading: add max_tokens limit - JSON parser: robust multi-layer repair + JSONDecodeError retry - Extraction prompt: enforce LaTeX notation for math - Upload: redirect to home instead of blank paper page - ProcessingBanner: add ETA time estimate + percentage - Batch import script + handoff guide for team Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
324 lines
11 KiB
Python
324 lines
11 KiB
Python
"""
|
||
批量导入试卷到 PastPaper Master
|
||
================================
|
||
|
||
用法:
|
||
# 导入单份试卷
|
||
python batch_import.py /path/to/paper.pdf --course COMP2211 --year 2024 --term spring --exam midterm
|
||
|
||
# 导入单份试卷 + 答案
|
||
python batch_import.py /path/to/paper.pdf --answer /path/to/answer.pdf --course COMP2211 --year 2024 --term spring --exam midterm
|
||
|
||
# 批量导入整个目录(自动从文件名解析元数据)
|
||
python batch_import.py /path/to/papers_dir/ --batch
|
||
|
||
# 批量导入,限制并发数(默认 1,避免 API 限流)
|
||
python batch_import.py /path/to/papers_dir/ --batch --concurrency 2
|
||
|
||
# 试运行(只打印会导入什么,不实际执行)
|
||
python batch_import.py /path/to/papers_dir/ --batch --dry-run
|
||
|
||
目录结构约定 (--batch 模式):
|
||
papers_dir/
|
||
├── COMP2211/
|
||
│ ├── 2024_spring_midterm.pdf
|
||
│ ├── 2024_spring_midterm_answer.pdf (可选,自动匹配)
|
||
│ ├── 2024_fall_final.pdf
|
||
│ └── 2023_spring_midterm.pdf
|
||
├── MATH1014/
|
||
│ ├── 2024_spring_midterm.pdf
|
||
│ └── ...
|
||
└── ...
|
||
|
||
文件名格式: {year}_{term}_{exam_type}.pdf
|
||
答案文件名: {year}_{term}_{exam_type}_answer.pdf (自动匹配)
|
||
|
||
环境:
|
||
需要项目根目录的 .env 文件(包含 Supabase 和 LLM API keys)
|
||
在 backend/ 目录下运行: python batch_import.py ...
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, os.path.dirname(__file__))
|
||
|
||
from dotenv import load_dotenv
|
||
load_dotenv(os.path.join(os.path.dirname(__file__), "..", ".env"))
|
||
|
||
from app.services.supabase_client import get_supabase
|
||
from app.services.paper_processor import process_paper
|
||
|
||
# ── Service-account user_id (used for batch imports; not tied to a specific user) ──
BATCH_USER_ID = "00000000-0000-0000-0000-000000000000"
|
||
|
||
|
||
def parse_filename(filename: str) -> dict | None:
|
||
"""
|
||
从文件名解析元数据。支持格式:
|
||
- 2024_spring_midterm.pdf
|
||
- 2024-fall-final.pdf
|
||
- 2024s_mid.pdf
|
||
- (COMP2211)[2024](s)midterm~xxx.pdf (scraper 格式)
|
||
"""
|
||
base = Path(filename).stem.lower()
|
||
|
||
# 去掉 _answer 后缀
|
||
if base.endswith("_answer") or base.endswith("_ans") or base.endswith("_solution"):
|
||
return None # 这是答案文件,不单独导入
|
||
|
||
result = {}
|
||
|
||
# Year: 4位数字
|
||
year_match = re.search(r'(20[1-2]\d)', base)
|
||
if year_match:
|
||
result["year"] = int(year_match.group(1))
|
||
|
||
# Term
|
||
if re.search(r'spring|spr|\(s\)|_s_', base):
|
||
result["term"] = "spring"
|
||
elif re.search(r'fall|aut|\(f\)|_f_', base):
|
||
result["term"] = "fall"
|
||
elif re.search(r'summer|sum', base):
|
||
result["term"] = "summer"
|
||
|
||
# Exam type
|
||
if re.search(r'mid', base):
|
||
result["exam_type"] = "midterm"
|
||
elif re.search(r'final|fin', base):
|
||
result["exam_type"] = "final"
|
||
elif re.search(r'quiz', base):
|
||
result["exam_type"] = "quiz"
|
||
|
||
if "year" in result and "term" in result and "exam_type" in result:
|
||
return result
|
||
return None
|
||
|
||
|
||
def find_answer_file(paper_path: Path) -> Path | None:
|
||
"""查找对应的答案文件"""
|
||
stem = paper_path.stem
|
||
parent = paper_path.parent
|
||
for suffix in ["_answer", "_ans", "_solution"]:
|
||
candidate = parent / f"{stem}{suffix}.pdf"
|
||
if candidate.exists():
|
||
return candidate
|
||
return None
|
||
|
||
|
||
def scan_directory(dir_path: Path) -> list[dict]:
    """Collect the importable papers found one level below *dir_path*.

    Expected layout: dir_path/COURSE_CODE/year_term_examtype.pdf

    Each sub-directory is treated as a course (its upper-cased name becomes
    the course code). PDFs whose name ``parse_filename`` rejects are skipped.

    Returns:
        One dict per paper with ``paper_path``, ``answer_path`` (may be
        ``None``), ``course_code`` and the parsed metadata fields, ordered by
        sorted directory and file name.
    """
    found = []
    course_folders = (entry for entry in sorted(dir_path.iterdir()) if entry.is_dir())
    for folder in course_folders:
        code = folder.name.upper()
        for pdf_file in sorted(folder.glob("*.pdf")):
            parsed = parse_filename(pdf_file.name)
            if parsed is not None:
                record = {
                    "paper_path": pdf_file,
                    "answer_path": find_answer_file(pdf_file),
                    "course_code": code,
                }
                # Parsed fields go last so they win on any key collision.
                record.update(parsed)
                found.append(record)
    return found
|
||
|
||
|
||
def check_duplicate(sb, course_code: str, year: int, term: str, exam_type: str) -> bool:
    """Report whether a matching paper row already exists.

    Queries the ``papers`` table for rows with the same course code, year,
    term and exam type whose status is ``ready`` or ``processing``.

    Args:
        sb: Client object exposing the ``table(...).select(...)`` query chain.

    Returns:
        ``True`` when at least one such row is returned.
    """
    filters = (
        ("course_code", course_code),
        ("year", year),
        ("term", term),
        ("exam_type", exam_type),
    )
    query = sb.table("papers").select("id")
    for column, value in filters:
        query = query.eq(column, value)
    response = query.in_("status", ["ready", "processing"]).execute()
    return len(response.data) > 0
|
||
|
||
|
||
async def import_single(
    paper_path: Path,
    answer_path: Path | None,
    course_code: str,
    year: int,
    term: str,
    exam_type: str,
    skip_duplicates: bool = True,
) -> str | None:
    """Import one paper and return its paper id, or ``None`` when skipped.

    Steps, in order: optional duplicate check, read the PDF bytes, insert a
    ``papers`` row with status ``processing``, upload the PDF(s) to the
    ``papers`` storage bucket and store their public URLs, then hand the
    bytes to ``process_paper``.

    A failed storage upload only prints a warning and processing continues.
    A failure inside ``process_paper`` is printed and written to the row
    (status ``error``), and the paper id is still returned, so callers cannot
    tell that case apart from success by the return value alone.
    """
    sb = get_supabase()

    # Duplicate check
    if skip_duplicates and check_duplicate(sb, course_code, year, term, exam_type):
        print(f" SKIP (duplicate): {course_code} {year} {term} {exam_type}")
        return None

    # Read the files up front so an unreadable file fails before any DB write.
    paper_bytes = paper_path.read_bytes()
    answer_bytes = None if not answer_path else answer_path.read_bytes()

    # Create the DB record
    new_row = {
        "user_id": BATCH_USER_ID,
        "course_code": course_code,
        "year": year,
        "term": term,
        "exam_type": exam_type,
        "paper_file_url": "",
        "answer_file_url": None,
        "status": "processing",
    }
    inserted = sb.table("papers").insert(new_row).execute()
    paper_id = inserted.data[0]["id"]

    # Upload to Supabase Storage
    storage_path = f"{course_code}/{year}_{term}_{exam_type}"

    def store_pdf(name, payload):
        """Upload *payload* under the paper's storage folder; return its public URL."""
        key = f"{storage_path}/{name}"
        sb.storage.from_("papers").upload(
            key, payload,
            file_options={"content-type": "application/pdf", "upsert": "true"},
        )
        return sb.storage.from_("papers").get_public_url(key)

    try:
        url_fields = {"paper_file_url": store_pdf("paper.pdf", paper_bytes)}
        if answer_bytes:
            url_fields["answer_file_url"] = store_pdf("answer.pdf", answer_bytes)
        sb.table("papers").update(url_fields).eq("id", paper_id).execute()
    except Exception as e:
        # Best effort: the paper is still processed without stored file URLs.
        print(f" WARNING: Storage upload failed: {e}")

    # Process the paper (per the original note: Vision extraction + AI trio)
    print(f" Processing {course_code} {year} {term} {exam_type} ...")
    started = time.time()
    try:
        await process_paper(paper_id, paper_bytes, answer_bytes)
        took = time.time() - started
        print(f" DONE in {took:.0f}s -> {paper_id[:8]}")
    except Exception as e:
        took = time.time() - started
        print(f" ERROR after {took:.0f}s: {e}")
        failure = {"status": "error", "processing_step": str(e)[:200]}
        sb.table("papers").update(failure).eq("id", paper_id).execute()

    return paper_id
|
||
|
||
|
||
async def batch_import(dir_path: Path, concurrency: int = 1, dry_run: bool = False):
    """Import every paper found under *dir_path*.

    Args:
        dir_path: Root directory laid out as dir/COURSE_CODE/year_term_examtype.pdf.
        concurrency: Maximum number of papers imported at the same time.
            Values below 1 are replaced by 1: a semaphore created with 0
            would never let any import start, and a negative value is
            rejected by ``asyncio.Semaphore``.
        dry_run: When true, only list what would be imported.

    Prints the list of discovered papers, then a summary line with the
    success / skipped / error counts. A paper counts as an error only when
    ``import_single`` raises; it counts as skipped when ``None`` is returned.
    """
    items = scan_directory(dir_path)

    if not items:
        print(f"No papers found in {dir_path}")
        print("Expected structure: dir/COURSE_CODE/year_term_examtype.pdf")
        return

    print(f"Found {len(items)} papers to import:\n")
    for item in items:
        ans_label = " + answer" if item["answer_path"] else ""
        print(f" {item['course_code']} {item['year']} {item['term']} {item['exam_type']}{ans_label}")
        print(f" <- {item['paper_path']}")

    if dry_run:
        print(f"\n[DRY RUN] Would import {len(items)} papers. Exiting.")
        return

    if concurrency < 1:
        print(f"WARNING: concurrency={concurrency} is invalid; falling back to 1")
        concurrency = 1

    print(f"\nStarting import (concurrency={concurrency})...\n")

    semaphore = asyncio.Semaphore(concurrency)
    results = {"ok": 0, "skip": 0, "error": 0}

    async def process_one(item):
        """Import one scanned item under the semaphore and tally the outcome."""
        async with semaphore:
            try:
                pid = await import_single(
                    paper_path=item["paper_path"],
                    answer_path=item["answer_path"],
                    course_code=item["course_code"],
                    year=item["year"],
                    term=item["term"],
                    exam_type=item["exam_type"],
                )
                if pid:
                    results["ok"] += 1
                else:
                    results["skip"] += 1
            except Exception as e:
                # One failing paper must not abort the rest of the batch.
                results["error"] += 1
                print(f" FATAL: {item['course_code']} {item['year']} - {e}")

    # Run serially or concurrently
    if concurrency == 1:
        for item in items:
            await process_one(item)
    else:
        await asyncio.gather(*[process_one(item) for item in items])

    print(f"\n{'='*50}")
    print(f"Import complete: {results['ok']} success, {results['skip']} skipped, {results['error']} errors")
|
||
|
||
|
||
def main():
    """Command-line entry point: parse arguments and run the requested import.

    With ``--batch`` the path must be a directory and is handed to
    ``batch_import``. Otherwise the path must be a PDF file, and ``--course``,
    ``--year``, ``--term`` and ``--exam`` are all required.

    Invalid input prints an ``Error: ...`` line and exits with status 1.
    ``--dry-run`` is honoured in both modes; in single-file mode it prints
    what would be imported and returns without touching the database.
    """
    parser = argparse.ArgumentParser(description="Batch import papers to PastPaper Master")
    parser.add_argument("path", help="Path to PDF file or directory (with --batch)")
    parser.add_argument("--answer", help="Path to answer PDF (single file mode)")
    parser.add_argument("--course", help="Course code (e.g. COMP2211)")
    parser.add_argument("--year", type=int, help="Year (e.g. 2024)")
    parser.add_argument("--term", choices=["spring", "summer", "fall"], help="Term")
    parser.add_argument("--exam", choices=["midterm", "final", "quiz"], help="Exam type")
    parser.add_argument("--batch", action="store_true", help="Batch import from directory")
    parser.add_argument("--concurrency", type=int, default=1, help="Max concurrent imports (default: 1)")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be imported without doing it")

    args = parser.parse_args()
    path = Path(args.path)

    if args.batch:
        if not path.is_dir():
            print(f"Error: {path} is not a directory")
            sys.exit(1)
        # A limit of 0 would leave every import waiting forever.
        if args.concurrency < 1:
            print("Error: --concurrency must be at least 1")
            sys.exit(1)
        asyncio.run(batch_import(path, concurrency=args.concurrency, dry_run=args.dry_run))
        return

    # Single file mode
    if not path.is_file():
        print(f"Error: {path} is not a file")
        sys.exit(1)
    if not all([args.course, args.year, args.term, args.exam]):
        print("Error: --course, --year, --term, --exam are required for single file import")
        sys.exit(1)

    answer_path = Path(args.answer) if args.answer else None
    # Fail before any import work starts rather than with a traceback later.
    if answer_path is not None and not answer_path.is_file():
        print(f"Error: {answer_path} is not a file")
        sys.exit(1)

    course_code = args.course.upper()

    if args.dry_run:
        ans_label = " + answer" if answer_path else ""
        print(f"[DRY RUN] Would import {course_code} {args.year} {args.term} {args.exam}{ans_label} <- {path}")
        return

    result = asyncio.run(import_single(
        paper_path=path,
        answer_path=answer_path,
        course_code=course_code,
        year=args.year,
        term=args.term,
        exam_type=args.exam,
    ))
    if result:
        print(f"\nPaper ID: {result}")
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|