209 lines
6.2 KiB
Python
209 lines
6.2 KiB
Python
"""用户答题记录 + 拍照批改 + 错题本"""
|
|
|
|
import asyncio
|
|
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends
|
|
from pydantic import BaseModel
|
|
from app.services.supabase_client import get_supabase
|
|
from app.services.grader import ocr_photo, grade_answer
|
|
from app.dependencies.auth import get_current_user_id
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class AttemptCreate(BaseModel):
|
|
question_id: str
|
|
attempt_type: str # "select" | "input" | "photo"
|
|
user_answer: str | None = None
|
|
is_correct: bool | None = None
|
|
|
|
|
|
class AttemptUpdate(BaseModel):
|
|
in_error_book: bool | None = None
|
|
mastered: bool | None = None
|
|
|
|
|
|
@router.post("/")
|
|
async def create_attempt(data: AttemptCreate, user_id: str = Depends(get_current_user_id)):
|
|
"""记录一次答题"""
|
|
sb = get_supabase()
|
|
record = {
|
|
"user_id": user_id,
|
|
"question_id": data.question_id,
|
|
"attempt_type": data.attempt_type,
|
|
"user_answer": data.user_answer,
|
|
"is_correct": data.is_correct,
|
|
}
|
|
# Auto add to error book if wrong
|
|
if data.is_correct is False:
|
|
record["in_error_book"] = True
|
|
|
|
result = sb.table("user_attempts").insert(record).execute()
|
|
return result.data[0]
|
|
|
|
|
|
@router.post("/photo")
|
|
async def photo_attempt(
|
|
question_id: str = Form(...),
|
|
photo: UploadFile = File(...),
|
|
user_id: str = Depends(get_current_user_id),
|
|
):
|
|
"""拍照上传 → OCR → AI批改"""
|
|
sb = get_supabase()
|
|
|
|
# 1. Read photo
|
|
photo_bytes = await photo.read()
|
|
|
|
# 2. Upload to storage
|
|
storage_path = f"attempts/{user_id}/{question_id}/{photo.filename}"
|
|
sb.storage.from_("attempt-photos").upload(
|
|
storage_path, photo_bytes,
|
|
file_options={"content-type": photo.content_type or "image/jpeg", "upsert": "true"},
|
|
)
|
|
photo_url = sb.storage.from_("attempt-photos").get_public_url(storage_path)
|
|
|
|
# 3. OCR (run in thread pool to avoid blocking event loop)
|
|
ocr_text = await asyncio.to_thread(ocr_photo, photo_bytes)
|
|
|
|
# 4. Fetch question for grading context
|
|
q_result = sb.table("paper_questions").select("*").eq("id", question_id).execute()
|
|
if not q_result.data:
|
|
raise HTTPException(status_code=404, detail="Question not found")
|
|
question = q_result.data[0]
|
|
|
|
# 5. AI grading (run in thread pool)
|
|
grade_result = await asyncio.to_thread(grade_answer, question, ocr_text)
|
|
|
|
# 6. Save attempt
|
|
record = {
|
|
"user_id": user_id,
|
|
"question_id": question_id,
|
|
"attempt_type": "photo",
|
|
"photo_url": photo_url,
|
|
"photo_ocr_text": ocr_text,
|
|
"is_correct": grade_result.get("is_correct", False),
|
|
"feedback": grade_result.get("feedback", ""),
|
|
"error_at_step": grade_result.get("error_at_step"),
|
|
"in_error_book": not grade_result.get("is_correct", False),
|
|
}
|
|
result = sb.table("user_attempts").insert(record).execute()
|
|
|
|
return {
|
|
"attempt": result.data[0],
|
|
"ocr_text": ocr_text,
|
|
"grade": grade_result,
|
|
}
|
|
|
|
|
|
@router.get("/error-book")
|
|
async def get_error_book(
|
|
course_code: str | None = None,
|
|
user_id: str = Depends(get_current_user_id),
|
|
):
|
|
"""获取错题本"""
|
|
sb = get_supabase()
|
|
attempts = (
|
|
sb.table("user_attempts")
|
|
.select("*")
|
|
.eq("user_id", user_id)
|
|
.eq("in_error_book", True)
|
|
.eq("mastered", False)
|
|
.order("created_at", desc=True)
|
|
.execute()
|
|
.data
|
|
)
|
|
if not attempts:
|
|
return []
|
|
|
|
question_ids = list({attempt["question_id"] for attempt in attempts})
|
|
questions = (
|
|
sb.table("paper_questions")
|
|
.select("*")
|
|
.in_("id", question_ids)
|
|
.execute()
|
|
.data
|
|
)
|
|
questions_by_id = {question["id"]: question for question in questions}
|
|
|
|
paper_ids = list({question["paper_id"] for question in questions})
|
|
papers = (
|
|
sb.table("papers")
|
|
.select("id, course_code, year, term, exam_type, part_label")
|
|
.in_("id", paper_ids)
|
|
.execute()
|
|
.data
|
|
)
|
|
papers_by_id = {paper["id"]: paper for paper in papers}
|
|
|
|
enriched = []
|
|
for attempt in attempts:
|
|
question = questions_by_id.get(attempt["question_id"])
|
|
if not question:
|
|
continue
|
|
paper = papers_by_id.get(question["paper_id"])
|
|
if course_code and paper and paper.get("course_code") != course_code.upper():
|
|
continue
|
|
|
|
enriched.append(
|
|
{
|
|
**attempt,
|
|
"paper_questions": {
|
|
**question,
|
|
"paper": paper,
|
|
},
|
|
}
|
|
)
|
|
return enriched
|
|
|
|
|
|
@router.get("/by-paper/{paper_id}")
|
|
async def get_paper_attempts(paper_id: str, user_id: str = Depends(get_current_user_id)):
|
|
"""获取某张试卷所有题目的最新判卷记录"""
|
|
sb = get_supabase()
|
|
attempts = (
|
|
sb.table("user_attempts")
|
|
.select("question_id, is_correct, feedback, photo_ocr_text, attempt_type, created_at")
|
|
.eq("user_id", user_id)
|
|
.order("created_at", desc=True)
|
|
.execute()
|
|
.data
|
|
)
|
|
# 只保留 photo 类型的,且只保留每题最新一条
|
|
question_ids = (
|
|
sb.table("paper_questions")
|
|
.select("id")
|
|
.eq("paper_id", paper_id)
|
|
.execute()
|
|
.data
|
|
)
|
|
qid_set = {q["id"] for q in question_ids}
|
|
seen: set[str] = set()
|
|
result = []
|
|
for a in attempts:
|
|
if a["question_id"] not in qid_set:
|
|
continue
|
|
if a["question_id"] in seen:
|
|
continue
|
|
if a["attempt_type"] != "photo":
|
|
continue
|
|
seen.add(a["question_id"])
|
|
result.append(a)
|
|
return result
|
|
|
|
|
|
@router.patch("/{attempt_id}")
|
|
async def update_attempt(attempt_id: str, data: AttemptUpdate):
|
|
"""更新错题状态(标记掌握等)"""
|
|
sb = get_supabase()
|
|
update = {}
|
|
if data.in_error_book is not None:
|
|
update["in_error_book"] = data.in_error_book
|
|
if data.mastered is not None:
|
|
update["mastered"] = data.mastered
|
|
if not update:
|
|
raise HTTPException(status_code=400, detail="Nothing to update")
|
|
|
|
result = sb.table("user_attempts").update(update).eq("id", attempt_id).execute()
|
|
if not result.data:
|
|
raise HTTPException(status_code=404, detail="Attempt not found")
|
|
return result.data[0]
|