234 lines
8.7 KiB
Python
234 lines
8.7 KiB
Python
"""Split COMP2211 Spring 2022 midterm top-level problems into subquestions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from app.services.supabase_client import get_supabase
|
|
|
|
|
|
# Exam identifier; compared against `source_exam_key` values by this script.
EXAM_KEY = "COMP2211-2022-spring-midterm"
# Options attached to every true/false subquestion; label and text are the same word.
TRUE_FALSE_OPTIONS = [{"label": choice, "text": choice} for choice in ("True", "False")]
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ChildSpec:
|
|
question_number: str
|
|
parent_question: str
|
|
top_level_number: str
|
|
path: tuple[str, ...]
|
|
score: float
|
|
question_type: str
|
|
question_format: str | None = None
|
|
page_number: int = 1
|
|
|
|
|
|
def short_answer(
    question_number: str,
    parent_question: str,
    top_level_number: str,
    path: tuple[str, ...],
    score: float,
    *,
    page_number: int,
) -> ChildSpec:
    """Build a ``ChildSpec`` for a short-answer subquestion.

    The type is fixed to ``"long_question"`` and the format to
    ``"short_answer"``; every other field is passed through unchanged.
    ``page_number`` is keyword-only and required.
    """
    # Positional order follows the ChildSpec field order.
    return ChildSpec(
        question_number,
        parent_question,
        top_level_number,
        path,
        score,
        "long_question",
        "short_answer",
        page_number,
    )
|
|
|
|
|
|
# Every subquestion of the paper, in the order used for `display_order`.
CHILDREN: list[ChildSpec] = [
    # Question 1: parts (a)-(j), true/false, 1.5 marks each.
    *(
        ChildSpec(f"1{part}", "1", "1", (part,), 1.5, "true_false", page_number=2)
        for part in "abcdefghij"
    ),
    # Question 2: (a)(i)-(v) blanks, (b) blank, (c) coding.
    *(
        ChildSpec(f"2a_{sub}", "2a", "2", ("a", sub), 1, "fill_blank", page_number=4)
        for sub in ("i", "ii", "iii", "iv", "v")
    ),
    ChildSpec("2b", "2", "2", ("b",), 2, "fill_blank", page_number=4),
    ChildSpec("2c", "2", "2", ("c",), 9, "long_question", question_format="coding", page_number=5),
    # Question 3: (a) blank, (b)(i)-(iv) blanks, (c) short answer.
    ChildSpec("3a", "3", "3", ("a",), 2, "fill_blank", page_number=7),
    *(
        ChildSpec(f"3b_{sub}", "3b", "3", ("b", sub), 1.75, "fill_blank", page_number=7)
        for sub in ("i", "ii", "iii", "iv")
    ),
    short_answer("3c", "3", "3", ("c",), 2, page_number=8),
    # Question 4: (a) long answer, (b)(i)-(ii) short answers, (c)(i)-(ii) long answers.
    ChildSpec("4a", "4", "4", ("a",), 3, "long_question", question_format="long_answer", page_number=9),
    *(
        short_answer(f"4b_{sub}", "4b", "4", ("b", sub), 3, page_number=9)
        for sub in ("i", "ii")
    ),
    ChildSpec("4c_i", "4c", "4", ("c", "i"), 2, "long_question", question_format="long_answer", page_number=10),
    ChildSpec("4c_ii", "4c", "4", ("c", "ii"), 3, "long_question", question_format="long_answer", page_number=10),
    # Question 5.
    ChildSpec("5a", "5", "5", ("a",), 4.5, "long_question", question_format="long_answer", page_number=11),
    ChildSpec("5b", "5", "5", ("b",), 1.5, "fill_blank", page_number=11),
    ChildSpec("5c", "5", "5", ("c",), 4.5, "long_question", question_format="long_answer", page_number=11),
    short_answer("5d", "5", "5", ("d",), 1.5, page_number=11),
    # Question 6.
    ChildSpec("6a", "6", "6", ("a",), 8, "long_question", question_format="long_answer", page_number=12),
    short_answer("6b", "6", "6", ("b",), 2, page_number=13),
    ChildSpec("6c", "6", "6", ("c",), 10, "long_question", question_format="coding", page_number=13),
    # Question 7.
    short_answer("7a", "7", "7", ("a",), 4, page_number=14),
    short_answer("7b", "7", "7", ("b",), 6, page_number=14),
    ChildSpec("7c", "7", "7", ("c",), 2, "fill_blank", page_number=15),
]
|
|
|
|
|
|
# Matches a parenthesised lowercase label such as "(a)" or "(ii)" at the start
# of a line, together with any whitespace that follows it.
MARKER_RE = re.compile(r"(?m)^\(([a-z]+)\)\s*")
# Seed JSON location, resolved relative to the directory above this file's directory.
PROBLEM_SEED_PATH = (
    Path(__file__).resolve().parent.parent
    / "pastpaper-scraper"
    / "reviews"
    / "COMP2211"
    / "problem_seed.json"
)

# Letters a roman-numeral subpart label such as "(iv)" is made of.
_ROMAN_CHARS = frozenset("ivx")


def split_sections(text: str) -> tuple[str, dict[str, str]]:
    """Split ``text`` at every line-start marker, ignoring nesting.

    Returns:
        ``(intro, sections)``. ``intro`` is the stripped text before the first
        marker (the whole stripped text when there is no marker). ``sections``
        maps each marker label to the stripped text from that marker up to the
        next marker; when a label occurs more than once the last one wins.
    """
    matches = list(MARKER_RE.finditer(text))
    if not matches:
        return text.strip(), {}
    intro = text[: matches[0].start()].strip()
    sections: dict[str, str] = {}
    for idx, match in enumerate(matches):
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        sections[match.group(1)] = text[match.start() : end].strip()
    return intro, sections


def _is_subpart_marker(marker: str, parent: str) -> bool:
    """Return True when ``marker`` reads as a roman-numeral subpart of ``parent``."""
    if not set(marker) <= _ROMAN_CHARS:
        return False
    # "(i)" directly after "(h)" (likewise "(v)" after "(u)", "(x)" after "(w)")
    # is read as the next top-level letter rather than as a roman numeral.
    is_next_letter = (
        len(marker) == 1 and len(parent) == 1 and ord(marker) == ord(parent) + 1
    )
    return not is_next_letter


def _extract_subpart(text: str, parent: str, child: str) -> tuple[str, str]:
    """Return ``(parent stem, child section)`` for ``child`` nested under ``parent``.

    The parent section starts at the first ``(parent)`` marker and runs until
    the next marker that is not one of its subparts. The stem is the parent
    text before its first subpart. The child section is ``""`` when the
    parent has no subpart labelled ``child``.
    """
    matches = list(MARKER_RE.finditer(text))
    parent_idx = next(
        (idx for idx, match in enumerate(matches) if match.group(1) == parent),
        None,
    )
    if parent_idx is None:
        return "", ""

    subparts = []
    parent_end = len(text)
    for match in matches[parent_idx + 1 :]:
        if not _is_subpart_marker(match.group(1), parent):
            parent_end = match.start()
            break
        subparts.append(match)

    stem_end = subparts[0].start() if subparts else parent_end
    stem = text[matches[parent_idx].start() : stem_end].strip()
    for idx, match in enumerate(subparts):
        if match.group(1) == child:
            child_end = (
                subparts[idx + 1].start() if idx + 1 < len(subparts) else parent_end
            )
            return stem, text[match.start() : child_end].strip()
    return stem, ""


def extract_segment(text: str, path: tuple[str, ...]) -> str:
    """Return the part of ``text`` that belongs to the subquestion at ``path``.

    Args:
        text: Full text of a top-level problem (question or answer).
        path: Marker labels, outermost first, e.g. ``("b",)`` or ``("a", "ii")``.
            Only the first two elements are used.

    Returns:
        * empty ``path`` or unknown first label: the whole stripped ``text``;
        * one label: the intro plus that section, cut at the next marker of
          any kind;
        * two labels: the intro, the parent's stem and the requested subpart,
          joined by newlines. A missing subpart yields intro plus stem only.

    A flat re-split of the parent section can never see its subparts, because
    the flat split has already cut the section at the first nested marker.
    Two-level paths are therefore resolved hierarchically, so that repeated
    labels such as "(i)" under different parents stay separate.
    """
    if not path:
        return text.strip()
    intro, sections = split_sections(text)
    first = sections.get(path[0], "")
    if not first:
        return text.strip()
    if len(path) == 1:
        return "\n".join(part for part in [intro, first] if part).strip()
    child_intro, second = _extract_subpart(text, path[0], path[1])
    return "\n".join(part for part in [intro, child_intro, second] if part).strip()
|
|
|
|
|
|
def extract_true_false_answers(answer_text: str) -> dict[str, str]:
    """Map each part label ``a``-``j`` to its ``"T"`` or ``"F"`` answer.

    An answer is recognised when a line starts with ``(x)`` and, after
    optional whitespace or a line break, a standalone ``T`` or ``F`` follows.
    When a label appears several times the last occurrence wins.
    """
    answer_re = re.compile(r"(?m)^\(([a-j])\)\s*\n?([TF])\b")
    return {found.group(1): found.group(2) for found in answer_re.finditer(answer_text)}
|
|
|
|
|
|
def derive_correct_answer(answer_text: str) -> str | None:
|
|
if not answer_text:
|
|
return None
|
|
if "Answer:" in answer_text:
|
|
tail = answer_text.split("Answer:", 1)[1]
|
|
else:
|
|
tail = answer_text
|
|
lines = [line.strip() for line in tail.splitlines() if line.strip()]
|
|
if not lines:
|
|
return None
|
|
first = lines[0]
|
|
if first.lower().startswith("marking scheme"):
|
|
return None
|
|
if len(first) <= 240:
|
|
return first
|
|
return None
|
|
|
|
|
|
def load_seed_rows() -> dict[str, dict]:
|
|
data = json.loads(PROBLEM_SEED_PATH.read_text())
|
|
return {
|
|
row["question_number"]: row
|
|
for row in data
|
|
if row["source_exam_key"] == EXAM_KEY
|
|
}
|
|
|
|
|
|
def _fetch_paper_id(sb):
    """Return the ``id`` of the ``papers`` row whose ``source_exam_key`` is EXAM_KEY.

    Raises:
        RuntimeError: if the query returns no row.
    """
    rows = (
        sb.table("papers")
        .select("id")
        .eq("source_exam_key", EXAM_KEY)
        .execute()
        .data
    )
    if not rows:
        raise RuntimeError(f"No paper found with source_exam_key={EXAM_KEY!r}.")
    return rows[0]["id"]


def _build_row(
    paper_id,
    display_order: int,
    child: "ChildSpec",
    parent: dict,
    existing: dict,
    tf_answers: dict[str, str],
) -> dict:
    """Assemble the ``paper_questions`` row for one subquestion.

    Question and answer text are cut out of the parent seed row. Topic and
    difficulty fields come from the existing row with the same question
    number when set, otherwise from the parent seed row.
    """
    question_text = extract_segment(parent["question_text"] or "", child.path)
    raw_answer_text = extract_segment(parent["raw_answer_text"] or "", child.path)

    correct_option = None
    correct_answer = None
    options = None
    if child.question_type == "true_false":
        # NOTE(review): this stores the raw "T"/"F" letter while the labels in
        # TRUE_FALSE_OPTIONS are "True"/"False" — confirm which form the
        # consumers of `correct_option` expect.
        correct_option = tf_answers.get(child.path[0])
        options = TRUE_FALSE_OPTIONS
    elif child.question_type == "fill_blank":
        correct_answer = derive_correct_answer(raw_answer_text)

    def inherited(field: str):
        # Prefer the value already stored for this question number.
        return existing.get(field) or parent.get(field)

    def text_or_empty(field: str) -> str:
        # `.get(field, "")` alone would pass a stored NULL through as None.
        return existing.get(field) or ""

    return {
        "paper_id": paper_id,
        "question_number": child.question_number,
        "parent_question": child.parent_question,
        "display_order": display_order,
        "question_type": child.question_type,
        "question_format": child.question_format,
        "question_text": question_text,
        "score": child.score,
        "page_number": child.page_number,
        "page_y_ratio": existing.get("page_y_ratio"),
        "options": options,
        "correct_option": correct_option,
        "correct_answer": correct_answer,
        "raw_answer_text": raw_answer_text,
        "topics": inherited("topics"),
        "topic_primary": inherited("topic_primary"),
        "analytics_topic": inherited("analytics_topic"),
        "topic_tags": inherited("topic_tags"),
        "skill_tags": inherited("skill_tags"),
        "difficulty": inherited("difficulty"),
        "knowledge_reminder": text_or_empty("knowledge_reminder"),
        "ai_hint": text_or_empty("ai_hint"),
        "solution": text_or_empty("solution"),
    }


def main() -> None:
    """Replace the paper's question rows with one row per entry in CHILDREN.

    All rows are built in memory first, so a missing paper or missing seed
    data raises before anything is deleted.

    Raises:
        RuntimeError: if no paper row matches EXAM_KEY.
        KeyError: if the seed file lacks a required top-level question.
    """
    sb = get_supabase()
    paper_id = _fetch_paper_id(sb)

    current_rows = (
        sb.table("paper_questions")
        .select("*")
        .eq("paper_id", paper_id)
        .order("display_order")
        .execute()
        .data
    )
    existing_by_number = {row["question_number"]: row for row in current_rows or []}
    parent_rows = load_seed_rows()

    # Question "1" is needed for the true/false answer key, the others for
    # the children's text; report every missing one at once.
    required = {"1", *(child.top_level_number for child in CHILDREN)}
    missing = sorted(required.difference(parent_rows))
    if missing:
        raise KeyError(
            f"Seed rows missing for top-level question(s): {', '.join(missing)}"
        )

    tf_answers = extract_true_false_answers(parent_rows["1"]["raw_answer_text"] or "")

    inserts = [
        _build_row(
            paper_id,
            display_order,
            child,
            parent_rows[child.top_level_number],
            existing_by_number.get(child.question_number, {}),
            tf_answers,
        )
        for display_order, child in enumerate(CHILDREN, start=1)
    ]

    # NOTE(review): delete and insert are two separate requests; if the insert
    # fails the paper is left without questions until the script is re-run.
    sb.table("paper_questions").delete().eq("paper_id", paper_id).execute()
    sb.table("paper_questions").insert(inserts).execute()
    sb.table("papers").update({"question_count": len(inserts), "status": "processing"}).eq("id", paper_id).execute()
    print(f"Inserted {len(inserts)} rows for {EXAM_KEY}.")


if __name__ == "__main__":
    main()
|