"""
Batch-import exam papers into PastPaper Master
==============================================

Usage:
    # Import a single paper
    python batch_import.py /path/to/paper.pdf --course COMP2211 --year 2024 --term spring --exam midterm

    # Import a single paper together with its answer key
    python batch_import.py /path/to/paper.pdf --answer /path/to/answer.pdf --course COMP2211 --year 2024 --term spring --exam midterm

    # Import a whole directory (metadata is parsed from the file names)
    python batch_import.py /path/to/papers_dir/ --batch

    # Batch import with a concurrency limit (default 1, to avoid API rate limits)
    python batch_import.py /path/to/papers_dir/ --batch --concurrency 2

    # Dry run (only print what would be imported, do not import anything)
    python batch_import.py /path/to/papers_dir/ --batch --dry-run

Directory layout expected in --batch mode:
    papers_dir/
    ├── COMP2211/
    │   ├── 2024_spring_midterm.pdf
    │   ├── 2024_spring_midterm_answer.pdf   (optional, matched automatically)
    │   ├── 2024_fall_final.pdf
    │   └── 2023_spring_midterm.pdf
    ├── MATH1014/
    │   ├── 2024_spring_midterm.pdf
    │   └── ...
    └── ...

Paper file name:  {year}_{term}_{exam_type}.pdf
Answer file name: {year}_{term}_{exam_type}_answer.pdf (matched automatically)

Environment:
    Requires the .env file in the project root (Supabase and LLM API keys).
    Run from the backend/ directory: python batch_import.py ...
"""

import argparse
import asyncio
import os
import re
import sys
import time
from pathlib import Path

# Make the backend package importable when the script is run directly.
sys.path.insert(0, os.path.dirname(__file__))

from dotenv import load_dotenv

# Load the environment before importing the app modules below.
load_dotenv(os.path.join(os.path.dirname(__file__), "..", ".env"))

from app.services.supabase_client import get_supabase
from app.services.paper_processor import process_paper

# Service-account user_id used for batch imports (not tied to a real user).
BATCH_USER_ID = "00000000-0000-0000-0000-000000000000"

# Suffix words that mark a file as an answer key rather than a paper.
_ANSWER_SUFFIXES = ("answer", "ans", "solution")
# Both "_" and "-" are accepted as separator because both file-name styles
# (2024_spring_midterm / 2024-fall-final) are supported for papers.
_ANSWER_STEM_RE = re.compile(r"[_-](?:answer|ans|solution)$")

_YEAR_RE = re.compile(r"(20[1-2]\d)")

# Checked in this order; the first matching term wins.
_TERM_PATTERNS = (
    # "20XXs" / "20XXf" cover the compact form "2024s_mid"; the lookahead keeps
    # them from firing on "2024spring", "2024summer" or "2024fall".
    ("spring", re.compile(r"spring|spr|\(s\)|[_-]s[_-]|20[1-2]\ds(?![a-z])")),
    ("fall", re.compile(r"fall|aut|\(f\)|[_-]f[_-]|20[1-2]\df(?![a-z])")),
    ("summer", re.compile(r"summer|sum")),
)

# Checked in this order; the first matching exam type wins.
_EXAM_PATTERNS = (
    ("midterm", re.compile(r"mid")),
    ("final", re.compile(r"final|fin")),
    ("quiz", re.compile(r"quiz")),
)


def _extract_year(base: str) -> int | None:
    """Return the exam year found in *base*, or None when there is none.

    A 4-digit candidate that is glued to a preceding letter/digit or to a
    following digit is most likely part of something else (e.g. the course
    code in "(comp2011)[2024]"), so a stand-alone candidate is preferred.
    When no stand-alone candidate exists the first candidate is used.
    """
    candidates = list(_YEAR_RE.finditer(base))
    if not candidates:
        return None
    for match in candidates:
        before = base[match.start() - 1] if match.start() else ""
        after = base[match.end():match.end() + 1]
        if not before.isalnum() and not after.isdigit():
            return int(match.group(1))
    return int(candidates[0].group(1))


def _first_match(patterns, base: str) -> str | None:
    """Return the label of the first pattern in *patterns* found in *base*."""
    for label, pattern in patterns:
        if pattern.search(base):
            return label
    return None


def parse_filename(filename: str) -> dict | None:
    """
    Parse paper metadata from a file name.

    Supported formats:
        - 2024_spring_midterm.pdf
        - 2024-fall-final.pdf
        - 2024s_mid.pdf
        - (COMP2211)[2024](s)midterm~xxx.pdf   (scraper format)

    Returns a dict with the keys "year" (int), "term" and "exam_type", or
    None when the file is an answer key or any of the three fields cannot
    be determined.
    """
    base = Path(filename).stem.lower()

    # Answer keys are attached to their paper, never imported on their own.
    if _ANSWER_STEM_RE.search(base):
        return None

    year = _extract_year(base)
    term = _first_match(_TERM_PATTERNS, base)
    exam_type = _first_match(_EXAM_PATTERNS, base)

    if year is None or term is None or exam_type is None:
        return None
    return {"year": year, "term": term, "exam_type": exam_type}


def find_answer_file(paper_path: Path) -> Path | None:
    """Return the answer PDF that sits next to *paper_path*, or None.

    Looks for "<stem>_answer.pdf", "<stem>_ans.pdf", "<stem>_solution.pdf"
    and then the same names with "-" as separator, in that order.
    """
    stem = paper_path.stem
    parent = paper_path.parent
    for separator in ("_", "-"):
        for suffix in _ANSWER_SUFFIXES:
            candidate = parent / f"{stem}{separator}{suffix}.pdf"
            if candidate.is_file():
                return candidate
    return None


def scan_directory(dir_path: Path) -> list[dict]:
    """
    Scan *dir_path* and return the list of papers to import.

    Expected layout: dir_path/COURSE_CODE/year_term_examtype.pdf

    Each item holds "paper_path", "answer_path" (or None), "course_code"
    (the upper-cased sub-directory name) plus the parsed "year", "term" and
    "exam_type". Files whose name cannot be parsed are skipped silently.
    """
    items = []
    for course_dir in sorted(dir_path.iterdir()):
        if not course_dir.is_dir():
            continue
        course_code = course_dir.name.upper()
        for pdf in sorted(course_dir.glob("*.pdf")):
            meta = parse_filename(pdf.name)
            if meta is None:
                continue
            items.append({
                "paper_path": pdf,
                "answer_path": find_answer_file(pdf),
                "course_code": course_code,
                **meta,
            })
    return items


def check_duplicate(sb, course_code: str, year: int, term: str, exam_type: str) -> bool:
    """Return True when a matching paper is already "ready" or "processing"."""
    existing = (
        sb.table("papers")
        .select("id")
        .eq("course_code", course_code)
        .eq("year", year)
        .eq("term", term)
        .eq("exam_type", exam_type)
        .in_("status", ["ready", "processing"])
        .execute()
        .data
    )
    return bool(existing)


async def _run_import(
    paper_path: Path,
    answer_path: Path | None,
    course_code: str,
    year: int,
    term: str,
    exam_type: str,
    skip_duplicates: bool = True,
) -> tuple[str | None, bool]:
    """Import one paper and report whether processing succeeded.

    Returns (paper_id, succeeded). paper_id is None when the paper was
    skipped as a duplicate; succeeded is False when process_paper raised.
    """
    sb = get_supabase()

    # Duplicate check
    if skip_duplicates and check_duplicate(sb, course_code, year, term, exam_type):
        print(f" SKIP (duplicate): {course_code} {year} {term} {exam_type}")
        return None, True

    # Read the files before touching the database so that an unreadable
    # file does not leave a dangling "processing" record behind.
    paper_bytes = paper_path.read_bytes()
    answer_bytes = answer_path.read_bytes() if answer_path else None

    # Create the DB record
    record = sb.table("papers").insert({
        "user_id": BATCH_USER_ID,
        "course_code": course_code,
        "year": year,
        "term": term,
        "exam_type": exam_type,
        "paper_file_url": "",
        "answer_file_url": None,
        "status": "processing",
    }).execute()
    paper_id = record.data[0]["id"]

    # Upload to Supabase Storage. This is best-effort: a failed upload is
    # reported but does not stop the paper from being processed.
    storage_path = f"{course_code}/{year}_{term}_{exam_type}"
    try:
        sb.storage.from_("papers").upload(
            f"{storage_path}/paper.pdf",
            paper_bytes,
            file_options={"content-type": "application/pdf", "upsert": "true"},
        )
        paper_url = sb.storage.from_("papers").get_public_url(f"{storage_path}/paper.pdf")
        update = {"paper_file_url": paper_url}

        if answer_bytes:
            sb.storage.from_("papers").upload(
                f"{storage_path}/answer.pdf",
                answer_bytes,
                file_options={"content-type": "application/pdf", "upsert": "true"},
            )
            update["answer_file_url"] = sb.storage.from_("papers").get_public_url(f"{storage_path}/answer.pdf")

        sb.table("papers").update(update).eq("id", paper_id).execute()
    except Exception as e:
        print(f" WARNING: Storage upload failed: {e}")

    # Process the paper (Vision extraction + AI trio)
    print(f" Processing {course_code} {year} {term} {exam_type} ...")
    t0 = time.monotonic()
    try:
        await process_paper(paper_id, paper_bytes, answer_bytes)
    except Exception as e:
        elapsed = time.monotonic() - t0
        print(f" ERROR after {elapsed:.0f}s: {e}")
        sb.table("papers").update(
            {"status": "error", "processing_step": str(e)[:200]}
        ).eq("id", paper_id).execute()
        return paper_id, False

    # Reported outside the try block so that a problem while printing can
    # never flip a successfully processed paper to "error". str() keeps the
    # slice valid even if the id column is not a string.
    elapsed = time.monotonic() - t0
    print(f" DONE in {elapsed:.0f}s -> {str(paper_id)[:8]}")
    return paper_id, True


async def import_single(
    paper_path: Path,
    answer_path: Path | None,
    course_code: str,
    year: int,
    term: str,
    exam_type: str,
    skip_duplicates: bool = True,
) -> str | None:
    """Import a single paper; return its paper_id, or None when skipped.

    The paper_id is returned even when processing failed; in that case the
    record's status has been set to "error".
    """
    paper_id, _ = await _run_import(
        paper_path=paper_path,
        answer_path=answer_path,
        course_code=course_code,
        year=year,
        term=term,
        exam_type=exam_type,
        skip_duplicates=skip_duplicates,
    )
    return paper_id


async def batch_import(dir_path: Path, concurrency: int = 1, dry_run: bool = False):
    """Import every paper found under *dir_path*.

    Args:
        dir_path: Root directory laid out as COURSE_CODE/<paper>.pdf.
        concurrency: Maximum number of papers imported at the same time.
        dry_run: Only list what would be imported.

    Raises:
        ValueError: If an import is about to start with concurrency < 1.
    """
    items = scan_directory(dir_path)

    if not items:
        print(f"No papers found in {dir_path}")
        print("Expected structure: dir/COURSE_CODE/year_term_examtype.pdf")
        return

    print(f"Found {len(items)} papers to import:\n")
    for item in items:
        ans_label = " + answer" if item["answer_path"] else ""
        print(f" {item['course_code']} {item['year']} {item['term']} {item['exam_type']}{ans_label}")
        print(f" <- {item['paper_path']}")

    if dry_run:
        print(f"\n[DRY RUN] Would import {len(items)} papers. Exiting.")
        return

    # A semaphore created with 0 would make every task wait forever.
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")

    print(f"\nStarting import (concurrency={concurrency})...\n")

    semaphore = asyncio.Semaphore(concurrency)
    results = {"ok": 0, "skip": 0, "error": 0}

    async def process_one(item):
        async with semaphore:
            try:
                paper_id, succeeded = await _run_import(
                    paper_path=item["paper_path"],
                    answer_path=item["answer_path"],
                    course_code=item["course_code"],
                    year=item["year"],
                    term=item["term"],
                    exam_type=item["exam_type"],
                )
            except Exception as e:
                results["error"] += 1
                print(f" FATAL: {item['course_code']} {item['year']} - {e}")
                return
            if not paper_id:
                results["skip"] += 1
            elif succeeded:
                results["ok"] += 1
            else:
                # Processing failed; the record has been marked "error".
                results["error"] += 1

    # Serial or concurrent processing
    if concurrency == 1:
        for item in items:
            await process_one(item)
    else:
        await asyncio.gather(*[process_one(item) for item in items])

    print(f"\n{'='*50}")
    print(f"Import complete: {results['ok']} success, {results['skip']} skipped, {results['error']} errors")


def main():
    """Parse the command line and run a single-file or batch import."""
    parser = argparse.ArgumentParser(description="Batch import papers to PastPaper Master")
    parser.add_argument("path", help="Path to PDF file or directory (with --batch)")
    parser.add_argument("--answer", help="Path to answer PDF (single file mode)")
    parser.add_argument("--course", help="Course code (e.g. COMP2211)")
    parser.add_argument("--year", type=int, help="Year (e.g. 2024)")
    parser.add_argument("--term", choices=["spring", "summer", "fall"], help="Term")
    parser.add_argument("--exam", choices=["midterm", "final", "quiz"], help="Exam type")
    parser.add_argument("--batch", action="store_true", help="Batch import from directory")
    parser.add_argument("--concurrency", type=int, default=1, help="Max concurrent imports (default: 1)")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be imported without doing it")

    args = parser.parse_args()
    if args.concurrency < 1:
        parser.error("--concurrency must be at least 1")

    path = Path(args.path)

    if args.batch:
        if not path.is_dir():
            print(f"Error: {path} is not a directory")
            sys.exit(1)
        asyncio.run(batch_import(path, concurrency=args.concurrency, dry_run=args.dry_run))
        return

    # Single file mode
    if not path.is_file():
        print(f"Error: {path} is not a file")
        sys.exit(1)
    if not all([args.course, args.year, args.term, args.exam]):
        print("Error: --course, --year, --term, --exam are required for single file import")
        sys.exit(1)

    answer_path = Path(args.answer) if args.answer else None
    if answer_path is not None and not answer_path.is_file():
        print(f"Error: {answer_path} is not a file")
        sys.exit(1)

    course_code = args.course.upper()

    # --dry-run must never touch the database, in single file mode as well.
    if args.dry_run:
        ans_label = " + answer" if answer_path else ""
        print(f"[DRY RUN] Would import {course_code} {args.year} {args.term} {args.exam}{ans_label} <- {path}")
        return

    result = asyncio.run(import_single(
        paper_path=path,
        answer_path=answer_path,
        course_code=course_code,
        year=args.year,
        term=args.term,
        exam_type=args.exam,
    ))
    if result:
        print(f"\nPaper ID: {result}")


if __name__ == "__main__":
    main()