"""用户答题记录 + 拍照批改 + 错题本""" import asyncio from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends from pydantic import BaseModel from app.services.supabase_client import get_supabase from app.services.grader import ocr_photo, grade_answer from app.dependencies.auth import get_current_user_id router = APIRouter() class AttemptCreate(BaseModel): question_id: str attempt_type: str # "select" | "input" | "photo" user_answer: str | None = None is_correct: bool | None = None class AttemptUpdate(BaseModel): in_error_book: bool | None = None mastered: bool | None = None @router.post("/") async def create_attempt(data: AttemptCreate, user_id: str = Depends(get_current_user_id)): """记录一次答题""" sb = get_supabase() record = { "user_id": user_id, "question_id": data.question_id, "attempt_type": data.attempt_type, "user_answer": data.user_answer, "is_correct": data.is_correct, } # Auto add to error book if wrong if data.is_correct is False: record["in_error_book"] = True result = sb.table("user_attempts").insert(record).execute() return result.data[0] @router.post("/photo") async def photo_attempt( question_id: str = Form(...), photo: UploadFile = File(...), user_id: str = Depends(get_current_user_id), ): """拍照上传 → OCR → AI批改""" sb = get_supabase() # 1. Read photo photo_bytes = await photo.read() # 2. Upload to storage storage_path = f"attempts/{user_id}/{question_id}/{photo.filename}" sb.storage.from_("attempt-photos").upload( storage_path, photo_bytes, file_options={"content-type": photo.content_type or "image/jpeg", "upsert": "true"}, ) photo_url = sb.storage.from_("attempt-photos").get_public_url(storage_path) # 3. OCR (run in thread pool to avoid blocking event loop) ocr_text = await asyncio.to_thread(ocr_photo, photo_bytes) # 4. Fetch question for grading context q_result = sb.table("paper_questions").select("*").eq("id", question_id).execute() if not q_result.data: raise HTTPException(status_code=404, detail="Question not found") question = q_result.data[0] # 5. AI grading (run in thread pool) grade_result = await asyncio.to_thread(grade_answer, question, ocr_text) # 6. Save attempt record = { "user_id": user_id, "question_id": question_id, "attempt_type": "photo", "photo_url": photo_url, "photo_ocr_text": ocr_text, "is_correct": grade_result.get("is_correct", False), "feedback": grade_result.get("feedback", ""), "error_at_step": grade_result.get("error_at_step"), "in_error_book": not grade_result.get("is_correct", False), } result = sb.table("user_attempts").insert(record).execute() return { "attempt": result.data[0], "ocr_text": ocr_text, "grade": grade_result, } @router.get("/error-book") async def get_error_book( course_code: str | None = None, user_id: str = Depends(get_current_user_id), ): """获取错题本""" sb = get_supabase() attempts = ( sb.table("user_attempts") .select("*") .eq("user_id", user_id) .eq("in_error_book", True) .eq("mastered", False) .order("created_at", desc=True) .execute() .data ) if not attempts: return [] question_ids = list({attempt["question_id"] for attempt in attempts}) questions = ( sb.table("paper_questions") .select("*") .in_("id", question_ids) .execute() .data ) questions_by_id = {question["id"]: question for question in questions} paper_ids = list({question["paper_id"] for question in questions}) papers = ( sb.table("papers") .select("id, course_code, year, term, exam_type, part_label") .in_("id", paper_ids) .execute() .data ) papers_by_id = {paper["id"]: paper for paper in papers} enriched = [] for attempt in attempts: question = questions_by_id.get(attempt["question_id"]) if not question: continue paper = papers_by_id.get(question["paper_id"]) if course_code and paper and paper.get("course_code") != course_code.upper(): continue enriched.append( { **attempt, "paper_questions": { **question, "paper": paper, }, } ) return enriched @router.get("/by-paper/{paper_id}") async def get_paper_attempts(paper_id: str, user_id: str = Depends(get_current_user_id)): """获取某张试卷所有题目的最新判卷记录""" sb = get_supabase() attempts = ( sb.table("user_attempts") .select("question_id, is_correct, feedback, photo_ocr_text, attempt_type, created_at") .eq("user_id", user_id) .order("created_at", desc=True) .execute() .data ) # 只保留 photo 类型的,且只保留每题最新一条 question_ids = ( sb.table("paper_questions") .select("id") .eq("paper_id", paper_id) .execute() .data ) qid_set = {q["id"] for q in question_ids} seen: set[str] = set() result = [] for a in attempts: if a["question_id"] not in qid_set: continue if a["question_id"] in seen: continue if a["attempt_type"] != "photo": continue seen.add(a["question_id"]) result.append(a) return result @router.patch("/{attempt_id}") async def update_attempt(attempt_id: str, data: AttemptUpdate): """更新错题状态(标记掌握等)""" sb = get_supabase() update = {} if data.in_error_book is not None: update["in_error_book"] = data.in_error_book if data.mastered is not None: update["mastered"] = data.mastered if not update: raise HTTPException(status_code=400, detail="Nothing to update") result = sb.table("user_attempts").update(update).eq("id", attempt_id).execute() if not result.data: raise HTTPException(status_code=404, detail="Attempt not found") return result.data[0]