Files
hejyou_content_creation/app.py
admin 50d5377fb5 Fix: Archived Directus items als gelöscht behandeln
Directus archiviert Items beim Löschen (status=archived) statt hart zu löschen.
- Questions/Words-Query filtert archived heraus (filter[status][_neq]=archived)
- Purge-Endpoints behandeln archived items als nicht-existent → Junction-Zeilen werden entfernt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-26 20:47:48 +02:00

1505 lines
57 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from pathlib import Path
from datetime import datetime
from uuid import uuid4
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from flask import Flask, send_from_directory, request, jsonify
from flask_cors import CORS
from PIL import Image
import ollama
import anthropic as _anthropic_sdk
# API key handed to the Anthropic SDK; empty string when the variable is unset.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Base URL that Directus API paths are appended to.
DIRECTUS_URL = "https://db.hejyou.com"
# Directory containing this file; the folders below are resolved against it.
BASE_DIR = Path(__file__).resolve().parent
# Source pictures.
PICTURES_DIR = BASE_DIR / "pictures"
# Cropped object images together with their JSON metadata (*.txt) files.
OBJECTS_DIR = BASE_DIR / "objects_image"
# Per-object sentence files (<obj_id>.txt).
SENTENCE_DIR = BASE_DIR / "sentence_object"
# Prompt template files.
PROMPTS_DIR = BASE_DIR / "prompts"
# Flask application; templates and static files live next to this file.
app = Flask(
    __name__,
    template_folder=str(BASE_DIR / "templates"),
    static_folder=str(BASE_DIR / "static"),
)
# Enable CORS for the whole app using the flask_cors defaults.
CORS(app)
def read_prompt(filepath: Path, fallback: str) -> str:
    """Return the stripped text of a prompt file, or the stripped fallback.

    Args:
        filepath: Location of the prompt file (read as UTF-8).
        fallback: Text to use when the file cannot be read.

    Returns:
        The file content with surrounding whitespace removed. ``fallback``
        (also stripped) is returned when the file is missing, unreadable or
        not valid UTF-8.
    """
    try:
        # Read directly instead of checking exists() first: one filesystem
        # access and no window in which the file can vanish in between.
        return filepath.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeError):
        # Best effort by design: a missing or broken prompt file must not
        # fail the request, so the built-in fallback is used instead.
        return fallback.strip()
def _directus(method, path, token, body=None):
    """Call the Directus REST API via urllib and return ``(payload, status)``.

    Args:
        method: HTTP method, e.g. "GET", "POST", "PATCH" or "DELETE".
        path: Path plus optional query string, appended to ``DIRECTUS_URL``.
        token: Value sent as the ``Authorization`` header; omitted when falsy.
        body: Optional JSON-serialisable request body.

    Returns:
        A tuple of the decoded JSON response and the HTTP status code. An
        empty response body yields ``{}``. HTTP error responses are returned
        the same way instead of being raised. A body that is not valid JSON
        yields ``{"error": ..., "raw": ...}`` with the original status, and an
        unreachable server yields an error payload with status 502.
    """
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = token
    req = urllib.request.Request(
        f"{DIRECTUS_URL}{path}",
        data=json.dumps(body).encode() if body is not None else None,
        headers=headers,
        method=method,
    )

    def _decode(raw_bytes):
        # Upstream proxies may answer with HTML or plain text; never let that
        # surface as an unhandled JSON error in the calling endpoint.
        text = raw_bytes.decode("utf-8", errors="replace")
        if not text:
            return {}
        try:
            return json.loads(text)
        except ValueError:
            return {"error": "Non-JSON response from Directus", "raw": text[:400]}

    try:
        with urllib.request.urlopen(req) as resp:
            return _decode(resp.read()), resp.status
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before URLError (it is a subclass).
        return _decode(e.read()), e.code
    except urllib.error.URLError as e:
        # DNS failure, refused connection, TLS problem, ...
        return {"error": f"Directus unreachable: {e.reason}"}, 502
@app.route("/api/directus/auth/login", methods=["POST"])
def directus_auth_login():
    """Proxy the Directus login so the browser avoids CORS problems.

    Forwards the request's JSON body to Directus ``/auth/login`` without a
    token and relays the response payload and status code.
    """
    credentials = request.get_json()
    payload, status_code = _directus(
        "POST", "/auth/login", token=None, body=credentials
    )
    return jsonify(payload), status_code
@app.route("/api/directus/pictures", methods=["GET"])
def directus_pictures():
    """Proxy: list Directus pictures filtered by status.

    Query parameters:
        status: Picture status to filter on (default "new").

    The ``Authorization`` request header is forwarded to Directus. Returns
    the Directus payload (fields id, media, status; sorted by date_created)
    together with the Directus status code.
    """
    token = request.headers.get("Authorization", "")
    pic_status = request.args.get("status", "new")
    # The value comes straight from the client: percent-encode it so it can
    # neither smuggle extra query parameters nor produce an invalid URL.
    status_filter = urllib.parse.quote(pic_status, safe="")
    path = (
        f"/items/pictures?filter[status][_eq]={status_filter}"
        "&fields=id,media,status&sort=date_created"
    )
    data, status = _directus("GET", path, token)
    return jsonify(data), status
@app.route("/api/directus/pictures/<pic_id>", methods=["PATCH"])
def directus_picture(pic_id):
    """Proxy: update a Directus picture (e.g. its status).

    Args:
        pic_id: Directus id of the picture, taken from the URL.

    Forwards the JSON request body and the ``Authorization`` header to
    Directus and relays the response payload and status code.
    """
    token = request.headers.get("Authorization", "")
    # Flask hands over the URL-decoded segment; re-encode it so characters
    # such as "?", "#" or spaces cannot alter the upstream request URL.
    item_id = urllib.parse.quote(str(pic_id), safe="")
    data, status = _directus(
        "PATCH", f"/items/pictures/{item_id}", token, body=request.get_json()
    )
    return jsonify(data), status
@app.route("/api/directus/objects", methods=["GET", "POST"])
def directus_objects():
    """Proxy: load objects of a picture (GET) or create an object (POST).

    GET query parameters:
        picture_id: Id of the picture whose objects are requested.

    POST forwards the JSON request body to Directus. In both cases the
    ``Authorization`` header is forwarded and the Directus payload and status
    code are relayed.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        # Client-supplied value: percent-encode before building the query so
        # it cannot inject additional parameters or break the URL.
        picture_id = urllib.parse.quote(request.args.get("picture_id", ""), safe="")
        fields = "id,selections,user_notes,parent,status,picture"
        path = (
            f"/items/objects?filter[picture][_eq]={picture_id}"
            f"&fields={fields}&sort=date_created"
        )
        data, status = _directus("GET", path, token)
    else:
        data, status = _directus(
            "POST", "/items/objects", token, body=request.get_json()
        )
    return jsonify(data), status
@app.route("/api/directus/objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_object(obj_id):
    """Proxy: update (PATCH) or delete (DELETE) a Directus object.

    Args:
        obj_id: Directus id of the object, taken from the URL.

    PATCH forwards the JSON request body. The ``Authorization`` header is
    forwarded in both cases and the Directus payload and status code are
    relayed.
    """
    token = request.headers.get("Authorization", "")
    # Flask hands over the URL-decoded segment; re-encode it so characters
    # such as "?", "#" or spaces cannot alter the upstream request URL.
    item_path = f"/items/objects/{urllib.parse.quote(str(obj_id), safe='')}"
    if request.method == "PATCH":
        data, status = _directus("PATCH", item_path, token, body=request.get_json())
    else:
        data, status = _directus("DELETE", item_path, token)
    return jsonify(data), status
@app.route("/api/images", methods=["GET"])
def list_images():
    """
    List the image files in the pictures folder, oldest modification first.

    Query parameter: ?mode=draw (default) or ?mode=generate
        draw     -> files whose stem does not end in "_saved"
        generate -> files whose stem ends in "_saved"
    """
    mode = request.args.get("mode", "draw")
    PICTURES_DIR.mkdir(parents=True, exist_ok=True)

    # Any mode other than "generate" behaves like "draw".
    want_saved = mode == "generate"
    matching = []
    for entry in PICTURES_DIR.iterdir():
        if not entry.is_file():
            continue
        if entry.stem.endswith("_saved") == want_saved:
            matching.append(entry)
    matching.sort(key=lambda entry: entry.stat().st_mtime)

    return jsonify({"images": [entry.name for entry in matching]})
# Folder holding the built React frontend: index.html plus an assets/ subfolder.
REACT_BUILD_DIR = BASE_DIR / "static" / "react"
def _serve_spa():
    """Return the React SPA's index.html, used for all frontend routes."""
    index_name = "index.html"
    return send_from_directory(REACT_BUILD_DIR, index_name)
@app.route("/")
@app.route("/draw")
@app.route("/generate")
def serve_spa():
    """Serve the single-page app for "/", "/draw" and "/generate"."""
    spa_response = _serve_spa()
    return spa_response
@app.route("/assets/<path:filename>")
def serve_react_assets(filename: str):
    """Serve a JS/CSS asset from the frontend build's assets folder."""
    assets_dir = REACT_BUILD_DIR / "assets"
    return send_from_directory(assets_dir, filename)
@app.route("/pictures/<path:filename>")
def serve_picture(filename: str):
    """Serve an original picture as a static file from the pictures folder."""
    pictures_root = PICTURES_DIR
    return send_from_directory(pictures_root, filename)
@app.route("/objects_image/<path:filename>")
def serve_object_image(filename: str):
    """Serve a stored cropped object image from the objects folder."""
    objects_root = OBJECTS_DIR
    return send_from_directory(objects_root, filename)
@app.route("/api/objects", methods=["GET"])
def list_objects_for_image():
    """
    Return all stored objects (crops) that belong to one source image.

    Query parameter:
        ?filename=testbild.png

    Every ``*.txt`` metadata file in the objects folder is read; files that
    cannot be parsed, or whose JSON is not an object, are skipped. The result
    is sorted by ``created_at`` (oldest first) and each entry gets a running
    1-based ``index``. Responds with 400 when ``filename`` is missing.
    """
    filename = request.args.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename query parameter"}), 400

    # For "*_saved" images also accept objects whose source_filename still
    # points at the original name without "_saved" (compatibility).
    accepted_names = [filename]
    if filename.endswith((".png", ".jpg", ".jpeg", ".webp")):
        stem, suffix = filename.rsplit(".", 1)
        if stem.endswith("_saved"):
            accepted_names.append(f"{stem[:-6]}.{suffix}")

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    objects = []
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except Exception:
            continue
        if not isinstance(meta, dict):
            # Valid JSON but not an object: one stray file must not take the
            # whole listing down with an AttributeError.
            continue
        if meta.get("source_filename") not in accepted_names:
            continue
        objects.append(
            {
                "id": meta.get("id"),
                "image_file": meta.get("image_file"),
                "title_de": meta.get("title_de", ""),
                "position_de": meta.get("position_de", ""),
                "action_de": meta.get("action_de", ""),
                "condition_de": meta.get("condition_de", ""),
                # AI-generated details (optional)
                "label_en": meta.get("label_en"),
                "label_de": meta.get("label_de"),
                "label_se": meta.get("label_se"),
                "color_en": meta.get("color_en"),
                "adjective_en": meta.get("adjective_en"),
                "action_verb_en": meta.get("action_verb_en"),
                "preposition_en": meta.get("preposition_en"),
                "relative_position_en": meta.get("relative_position_en"),
                "season_en": meta.get("season_en"),
                "mode": meta.get("mode"),
                "bbox": meta.get("bbox"),
                "polygon": meta.get("polygon"),
                "hierarchy": meta.get("hierarchy", 1),
                "parent_id": meta.get("parent_id"),
                "created_at": meta.get("created_at"),
            }
        )

    # Oldest first (1 = oldest object); entries without a timestamp sort first.
    objects.sort(key=lambda o: o.get("created_at") or "")
    # Running number per image (1..n) following the order above.
    for idx, obj in enumerate(objects, start=1):
        obj["index"] = idx
    return jsonify({"objects": objects})
@app.route("/api/object/<obj_id>/hierarchy", methods=["POST"])
def update_object_hierarchy(obj_id: str):
    """
    Update the hierarchy level (1, 2 or 3) of a stored object.

    Expects a JSON object with a ``hierarchy`` entry. Responds with
    ``{"id", "hierarchy"}`` on success, 400 for a missing or invalid value,
    404 when the object's metadata file does not exist and 500 when that file
    cannot be read as a JSON object.
    """
    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array/string/number body carries no "hierarchy" entry.
        return jsonify({"error": "Invalid hierarchy"}), 400
    try:
        hierarchy = int(data.get("hierarchy"))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: Python's json module accepts "Infinity" as a float.
        return jsonify({"error": "Invalid hierarchy"}), 400
    if hierarchy not in (1, 2, 3):
        return jsonify({"error": "Hierarchy must be 1, 2 or 3"}), 400

    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500
    if not isinstance(meta, dict):
        # Parsable JSON that is not an object cannot hold the field.
        return jsonify({"error": "Could not read meta file"}), 500

    meta["hierarchy"] = hierarchy
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify({"id": obj_id, "hierarchy": hierarchy})
@app.route("/api/object/<obj_id>", methods=["POST"])
def update_object_meta(obj_id: str):
    """
    Update the text metadata (title_de, position_de, action_de, condition_de)
    of a stored object.

    Only keys present in the JSON body are overwritten. Responds with the
    object's id and the four text fields, 404 when the metadata file does not
    exist and 500 when it cannot be read.
    """
    payload = request.get_json(force=True, silent=True) or {}
    editable = ("title_de", "position_de", "action_de", "condition_de")

    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    for name in editable:
        if name not in payload:
            continue
        meta[name] = payload.get(name, "")

    serialized = json.dumps(meta, ensure_ascii=False, indent=2)
    meta_path.write_text(serialized, encoding="utf-8")

    answer = {}
    for name in ("id",) + editable:
        answer[name] = meta.get(name)
    return jsonify(answer)
@app.route("/api/object/<obj_id>/parent", methods=["POST"])
def update_object_parent(obj_id: str):
    """
    Update the parent relation of an object.

    Expects JSON: { "parent_id": "<uuid>" | null }

    A falsy ``parent_id`` clears the relation. Otherwise the parent must
    exist, belong to the same source image and differ from the object itself.
    Responds with ``{"id", "parent_id"}`` on success, 400 for an invalid
    parent, 404 when the object is unknown and 500 when a metadata file cannot
    be read.
    """
    data = request.get_json(force=True, silent=True) or {}
    parent_id = data.get("parent_id")

    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    if parent_id:
        if parent_id == obj_id:
            # The object itself trivially passes the "exists" and "same
            # image" checks below, which would create a self-referencing cycle.
            return jsonify({"error": "Object cannot be its own parent"}), 400
        parent_meta_path = OBJECTS_DIR / f"{parent_id}.txt"
        if not parent_meta_path.exists():
            return jsonify({"error": "Parent object not found"}), 400
        try:
            parent_meta = json.loads(parent_meta_path.read_text(encoding="utf-8"))
        except Exception:
            return jsonify({"error": "Could not read parent meta file"}), 500
        if parent_meta.get("source_filename") != meta.get("source_filename"):
            return jsonify({"error": "Parent must belong to same source image"}), 400

    meta["parent_id"] = parent_id or None
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify({"id": obj_id, "parent_id": meta.get("parent_id")})
@app.route("/api/object/<obj_id>/generate_details", methods=["POST"])
def generate_object_details(obj_id: str):
    """
    Generate additional metadata for an object with the "llama3.2-vision"
    model, based on the cropped object image and the existing German fields.

    The prompt comes from prompts/create_details.txt (with a built-in
    fallback). Recognised keys of the model's JSON answer are written back to
    the object's metadata file and returned, with missing values as "".
    """
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    image_file = meta.get("image_file")
    if not image_file:
        return jsonify({"error": "Object has no image_file"}), 400
    image_path = OBJECTS_DIR / image_file
    if not image_path.exists():
        return jsonify({"error": "Object image not found"}), 404

    # Load the prompt template.
    prompt_template = read_prompt(
        PROMPTS_DIR / "create_details.txt",
        "Analyze this object and return one JSON object with the requested fields.",
    )
    title_de = meta.get("title_de", "") or ""
    action_de = meta.get("action_de", "") or ""
    condition_de = meta.get("condition_de", "") or ""

    # Fill the placeholders with plain str.format.
    try:
        prompt = prompt_template.format(
            title_de=title_de,
            action_de=action_de,
            condition_de=condition_de,
        )
    except Exception:
        # Formatting failed: use the raw template and append the context.
        prompt = (
            f"{prompt_template}\n\n"
            f"Titel (Deutsch): {title_de}\n"
            f"Status (Deutsch): {action_de}\n"
            f"Zustand (Deutsch): {condition_de}\n"
        )

    # Load the image as bytes.
    try:
        img_bytes = image_path.read_bytes()
    except Exception:
        return jsonify({"error": "Could not read object image"}), 500

    user_message = {"role": "user", "content": prompt, "images": [img_bytes]}
    try:
        reply = ollama.chat(
            model="llama3.2-vision",
            messages=[user_message],
            format="json",
            options={"temperature": 0},
        )
    except Exception as e:
        return jsonify({"error": f"LLM request failed: {e}"}), 500

    raw_text = reply.get("message", {}).get("content")
    if not raw_text:
        return jsonify({"error": "Empty response from LLM"}), 500

    try:
        ai_data = json.loads(raw_text)
    except Exception:
        # Fallback: parse the span from the first "{" to the last "}".
        try:
            first_brace = raw_text.index("{")
            after_last_brace = raw_text.rindex("}") + 1
            ai_data = json.loads(raw_text[first_brace:after_last_brace])
        except Exception:
            return jsonify({"error": "Could not parse JSON from LLM response"}), 500

    # Debug output of the raw AI result.
    try:
        print(f"[generate_details] Raw AI data for {obj_id}: {json.dumps(ai_data, ensure_ascii=False)}")
    except Exception:
        print(f"[generate_details] Raw AI data for {obj_id} (non-serializable): {ai_data!r}")

    # Fields taken over from the model's answer.
    detail_keys = [
        "label_en",
        "label_de",
        "label_se",
        "color_en",
        "adjective_en",
        "action_verb_en",
        "preposition_en",
        "relative_position_en",
        "season_en",
    ]
    for name in detail_keys:
        if name not in ai_data:
            continue
        meta[name] = ai_data.get(name)

    # Write the metadata file back.
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

    # Return only the detail fields, with None converted to an empty string.
    result = {}
    for name in detail_keys:
        result[name] = meta.get(name) or ""
    try:
        print(f"[generate_details] Stored meta for {obj_id}: {json.dumps(result, ensure_ascii=False)}")
    except Exception:
        print(f"[generate_details] Stored meta for {obj_id}: {result!r}")
    return jsonify(result)
def load_sentences_for_object(obj_id: str):
    """
    Load the stored sentences of an object from sentence_object/<obj_id>.txt.

    Returns the file's JSON content when it is a list, a one-element list
    when it is a single object, and an empty list when the file is missing,
    unreadable or holds anything else.
    """
    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_path = SENTENCE_DIR / f"{obj_id}.txt"
    if not sentence_path.exists():
        return []

    try:
        stored = json.loads(sentence_path.read_text(encoding="utf-8"))
    except Exception:
        return []

    if isinstance(stored, list):
        return stored
    # Fallback: wrap a single object in a list.
    return [stored] if isinstance(stored, dict) else []
@app.route("/api/object/<obj_id>/sentences", methods=["GET"])
def get_object_sentences(obj_id: str):
    """Return all AI sentences stored so far for an object."""
    payload = {
        "object_id": obj_id,
        "sentences": load_sentences_for_object(obj_id),
    }
    return jsonify(payload)
@app.route("/api/object/<obj_id>/generate_sentence", methods=["POST"])
def generate_object_sentence(obj_id: str):
    """
    Generate a new English question/answer set for an object with the
    "llama3.1" model, based on the object's details and earlier sentences.

    The model must answer with a JSON object holding the non-empty strings
    question_simple_en, answer_simple_en, question_advanced_en and
    answer_advanced_en. The new entry is appended to
    sentence_object/<obj_id>.txt and returned together with the total count.
    Responds with 404 when the object is unknown and 500 when the metadata
    file, the model call or the model's answer is unusable.
    """
    # Load the object's metadata (including AI details).
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    # Fields handed to the model as context.
    detail_keys = (
        "label_en",
        "label_de",
        "label_se",
        "color_en",
        "adjective_en",
        "action_verb_en",
        "preposition_en",
        "relative_position_en",
        "season_en",
        "title_de",
        "position_de",
        "action_de",
        "condition_de",
    )
    details = {key: meta.get(key) for key in detail_keys}

    previous_sentences = load_sentences_for_object(obj_id)

    # The fallback has to request exactly the keys validated below; the
    # previous text asked for question_en/answer_en, which could never pass.
    prompt_template = read_prompt(
        PROMPTS_DIR / "create_sentence.txt",
        "Create one new English question and answer in a simple and an advanced "
        "variant and return them as JSON with the keys question_simple_en, "
        "answer_simple_en, question_advanced_en and answer_advanced_en, based on "
        "the object details and avoiding previous sentences.",
    )

    # Assemble the full prompt.
    try:
        details_json = json.dumps(details, ensure_ascii=False, indent=2)
        prev_json = json.dumps(previous_sentences, ensure_ascii=False, indent=2)
    except Exception:
        details_json = str(details)
        prev_json = str(previous_sentences)
    prompt = (
        f"{prompt_template}\n\n"
        f"OBJECT_DETAILS_JSON:\n{details_json}\n\n"
        f"PREVIOUS_SENTENCES_JSON:\n{prev_json}\n"
    )

    # LLM call (text only, no image needed).
    try:
        response = ollama.chat(
            model="llama3.1",
            messages=[{"role": "user", "content": prompt}],
            format="json",
            options={"temperature": 0.2},
        )
    except Exception as e:
        return jsonify({"error": f"LLM request failed: {e}"}), 500

    content = response.get("message", {}).get("content")
    if not content:
        return jsonify({"error": "Empty response from LLM"}), 500

    try:
        ai_data = json.loads(content)
    except Exception:
        # Fallback: parse the span from the first "{" to the last "}".
        try:
            start = content.index("{")
            end = content.rindex("}") + 1
            ai_data = json.loads(content[start:end])
        except Exception:
            return jsonify({"error": "Could not parse JSON from LLM response"}), 500
    if not isinstance(ai_data, dict):
        # Valid JSON but not an object (e.g. a list) has no fields to read.
        return jsonify({"error": "Could not parse JSON from LLM response"}), 500

    def _text(key):
        # Non-string values count as missing instead of crashing on .strip().
        value = ai_data.get(key)
        return value.strip() if isinstance(value, str) else ""

    question_simple = _text("question_simple_en")
    answer_simple = _text("answer_simple_en")
    question_advanced = _text("question_advanced_en")
    answer_advanced = _text("answer_advanced_en")
    if not question_simple or not answer_simple or not question_advanced or not answer_advanced:
        return jsonify({"error": "LLM response missing required fields (question_simple_en, answer_simple_en, question_advanced_en, answer_advanced_en)"}), 500

    # New entry, enriched with a timestamp.
    entry = {
        "object_id": obj_id,
        "created_at": datetime.now().isoformat(),
        "question_simple_en": question_simple,
        "answer_simple_en": answer_simple,
        "question_advanced_en": question_advanced,
        "answer_advanced_en": answer_advanced,
    }
    sentences = previous_sentences + [entry]

    # Persist all sentences.
    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_path = SENTENCE_DIR / f"{obj_id}.txt"
    sentence_path.write_text(json.dumps(sentences, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify({"object_id": obj_id, "sentence": entry, "count": len(sentences)})
@app.route("/api/image/save", methods=["POST"])
def save_image():
    """
    Mark an image and all of its objects as "saved".

    The image file gets "_saved" appended to its stem and every object
    metadata file whose ``source_filename`` equals the old name is rewritten
    to the new name.

    Expects JSON: { "filename": "<name>" }. Responds with the old and new
    name, 400 when the filename is missing or already saved, 404 when the
    image does not exist and 409 when the target name is already taken.
    """
    data = request.get_json(force=True, silent=True) or {}
    filename = data.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename"}), 400

    src_path = PICTURES_DIR / filename
    if not src_path.exists():
        return jsonify({"error": "Image not found"}), 404
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Image already saved"}), 400

    new_name = f"{src_path.stem}_saved{src_path.suffix}"
    dst_path = PICTURES_DIR / new_name
    if dst_path.exists():
        # Path.rename would silently replace the existing file on POSIX (and
        # raise on Windows); refuse rather than destroy a saved image.
        return jsonify({"error": "Saved image already exists"}), 409

    # Rename the image.
    src_path.rename(dst_path)

    # Update the objects that belong to it.
    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except Exception:
            continue
        if not isinstance(meta, dict):
            # A stray non-object JSON file must not abort the loop after the
            # image was renamed and leave the remaining objects un-updated.
            continue
        if meta.get("source_filename") != filename:
            continue
        meta["source_filename"] = new_name
        meta_file.write_text(
            json.dumps(meta, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    return jsonify({"old_name": filename, "new_name": new_name})
@app.route("/api/crop", methods=["POST"])
def crop_image():
    """
    Crop a region out of a source picture and store it as a new object.

    Expects JSON:
    {
        "filename": "bild.jpg",
        "selections": [
            {
                "number": 1,
                "mode": "rect" | "polygon",
                "bbox": {"x": 10, "y": 20, "width": 200, "height": 150},  # for rect
                "polygon": [{"x": 10, "y": 20}, ...]                      # for polygon
            },
            ...
        ],
        "title_de": "...",
        "position_de": "...",
        "action_de": "...",
        "condition_de": "..."
    }
    OR (legacy format, kept for compatibility):
    {
        "filename": "bild.jpg",
        "mode": "rect" | "polygon",
        "x": 10, "y": 20, "width": 200, "height": 150,  # for rect
        "polygon": [{"x": 10, "y": 20}, ...]            # for polygon
    }

    New format: invalid selections are skipped, every area is clipped to the
    image, the object image is cut from the first valid selection's bounding
    box and all valid selections are stored in the metadata. Legacy format:
    a single selection whose problems are reported with 400.

    The cropped image is written to the objects folder as <uuid><suffix> and
    the metadata as JSON to <uuid>.txt. Responds with the id and both file
    names, 400 for invalid input, 404 when the source image is missing and
    500 when the image cannot be created (new format).
    """
    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict) or "filename" not in data:
        # A non-object JSON body cannot carry a filename either.
        return jsonify({"error": "Missing filename"}), 400

    src_path = PICTURES_DIR / data["filename"]
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Cannot crop on saved image"}), 400
    if not src_path.exists():
        return jsonify({"error": "Source image not found"}), 404

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)

    # One UUID names both the image and its metadata file.
    obj_id = uuid4().hex
    timestamp = datetime.now().isoformat()
    out_image_name = f"{obj_id}{src_path.suffix}"
    out_image_path = OBJECTS_DIR / out_image_name
    out_meta_path = OBJECTS_DIR / f"{obj_id}.txt"

    selections = data.get("selections")
    if isinstance(selections, list) and selections:
        # New format: several selections.
        processed_selections = []
        with Image.open(src_path) as img:
            img_w, img_h = img.size

        for sel in selections:
            if not isinstance(sel, dict):
                continue
            sel_mode = sel.get("mode")
            if sel_mode == "rect":
                bbox = sel.get("bbox")
                if not bbox:
                    continue
                try:
                    x = int(bbox.get("x", 0))
                    y = int(bbox.get("y", 0))
                    w = int(bbox.get("width", 0))
                    h = int(bbox.get("height", 0))
                except (AttributeError, ValueError, TypeError):
                    # AttributeError: bbox is not a mapping.
                    continue
                if w <= 0 or h <= 0:
                    continue
                # Clip the rectangle to the image area.
                x2 = min(x + w, img_w)
                y2 = min(y + h, img_h)
                x1 = max(0, x)
                y1 = max(0, y)
                if x1 >= x2 or y1 >= y2:
                    continue
                processed_selections.append({
                    "number": sel.get("number", len(processed_selections) + 1),
                    "mode": "rect",
                    "bbox": {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1},
                })
            elif sel_mode == "polygon":
                polygon = sel.get("polygon")
                if not isinstance(polygon, list) or len(polygon) < 3:
                    continue
                try:
                    xs = [int(p.get("x", 0)) for p in polygon]
                    ys = [int(p.get("y", 0)) for p in polygon]
                except (AttributeError, KeyError, TypeError, ValueError):
                    # AttributeError: a point is not a mapping.
                    continue
                # Bounding box of the polygon, clipped to the image area.
                min_x = max(min(xs), 0)
                min_y = max(min(ys), 0)
                max_x = min(max(xs), img_w)
                max_y = min(max(ys), img_h)
                if min_x >= max_x or min_y >= max_y:
                    continue
                processed_selections.append({
                    "number": sel.get("number", len(processed_selections) + 1),
                    "mode": "polygon",
                    "polygon": polygon,
                    "bbox": {"x": min_x, "y": min_y, "width": max_x - min_x, "height": max_y - min_y},
                })

        if not processed_selections:
            return jsonify({"error": "No valid selections provided"}), 400

        # The object image comes from the first selection; polygons are cut
        # by their bounding box, so rect and polygon share one code path.
        first_bbox = processed_selections[0]["bbox"]
        x1 = first_bbox["x"]
        y1 = first_bbox["y"]
        x2 = x1 + first_bbox["width"]
        y2 = y1 + first_bbox["height"]
        try:
            with Image.open(src_path) as img:
                cropped = img.crop((x1, y1, x2, y2))
                # Keep the source format when saving.
                img_format = img.format or "PNG"
                cropped.save(out_image_path, format=img_format)
            print(f"[crop_image] Bild gespeichert: {out_image_path} (Größe: {cropped.size}, Format: {img_format})")
        except Exception as e:
            print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}")
            return jsonify({"error": f"Failed to create image: {e}"}), 500

        # Metadata holding all selections.
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": data["filename"],
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            "title_de": data.get("title_de", ""),
            "position_de": data.get("position_de", ""),
            "action_de": data.get("action_de", ""),
            "condition_de": data.get("condition_de", ""),
            "selections": processed_selections,
        }
    else:
        # Legacy format: a single selection (backwards compatibility).
        mode = data.get("mode", "rect")
        if mode not in {"rect", "polygon"}:
            return jsonify({"error": "Invalid mode"}), 400

        with Image.open(src_path) as img:
            img_w, img_h = img.size
            if mode == "rect":
                try:
                    x = int(data["x"])
                    y = int(data["y"])
                    w = int(data["width"])
                    h = int(data["height"])
                except (ValueError, TypeError, KeyError):
                    return jsonify({"error": "Invalid rectangle coordinates"}), 400
                if w <= 0 or h <= 0:
                    return jsonify({"error": "Width and height must be positive"}), 400
                x2 = min(x + w, img_w)
                y2 = min(y + h, img_h)
                x1 = max(0, x)
                y1 = max(0, y)
                if x1 >= x2 or y1 >= y2:
                    return jsonify({"error": "Crop area outside of image"}), 400
                cropped = img.crop((x1, y1, x2, y2))
                bbox = {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}
            else:  # polygon
                polygon = data.get("polygon") or []
                if not isinstance(polygon, list) or len(polygon) < 3:
                    return jsonify({"error": "Polygon must have at least 3 points"}), 400
                try:
                    xs = [int(p["x"]) for p in polygon]
                    ys = [int(p["y"]) for p in polygon]
                except (KeyError, TypeError, ValueError):
                    return jsonify({"error": "Invalid polygon coordinates"}), 400
                min_x = max(min(xs), 0)
                min_y = max(min(ys), 0)
                max_x = min(max(xs), img_w)
                max_y = min(max(ys), img_h)
                if min_x >= max_x or min_y >= max_y:
                    return jsonify({"error": "Polygon bounding box outside of image"}), 400
                cropped = img.crop((min_x, min_y, max_x, max_y))
                bbox = {
                    "x": min_x,
                    "y": min_y,
                    "width": max_x - min_x,
                    "height": max_y - min_y,
                }
            cropped.save(out_image_path)

        # Metadata for the single selection.
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": data["filename"],
            "mode": mode,
            "bbox": bbox,
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            "title_de": data.get("title_de", ""),
            "position_de": data.get("position_de", ""),
            "action_de": data.get("action_de", ""),
            "condition_de": data.get("condition_de", ""),
        }
        if mode == "polygon":
            meta["polygon"] = data.get("polygon")

    # Metadata is stored as JSON in a .txt file.
    out_meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify(
        {
            "id": obj_id,
            "image_file": out_image_name,
            "meta_file": out_meta_path.name,
        }
    )
# ── Claude / Directus Helpers ─────────────────────────────────────────────────
def _ensure_junction(collection: str, field1: str, field2: str, token: str):
    """Create a simple M2M junction collection in Directus unless it exists.

    The collection counts as existing when GET /collections/<collection>
    answers with status 200. Otherwise a hidden collection is requested with
    an auto-increment integer ``id`` and two hidden uuid fields named
    ``field1`` and ``field2``. The result of that request is not inspected.
    """
    _, lookup_status = _directus("GET", f"/collections/{collection}", token)
    if lookup_status != 200:
        id_field = {
            "field": "id",
            "type": "integer",
            "schema": {"is_primary_key": True, "has_auto_increment": True},
            "meta": {"hidden": True},
        }
        link_fields = [
            {"field": name, "type": "uuid", "schema": {}, "meta": {"hidden": True}}
            for name in (field1, field2)
        ]
        definition = {
            "collection": collection,
            "schema": {},
            "meta": {"hidden": True, "icon": "import_export"},
            "fields": [id_field] + link_fields,
        }
        _directus("POST", "/collections", token, definition)
def _ensure_link(collection: str, match: dict, payload: dict, token: str):
    """Insert a junction row unless a row matching ``match`` already exists.

    ``match`` is turned into ``_eq`` filters (values percent-encoded) for a
    one-row lookup; ``payload`` is posted when that lookup does not return
    status 200 with at least one row.
    """
    filters = []
    for column, value in match.items():
        encoded = urllib.parse.quote(str(value), safe="")
        filters.append(f"filter[{column}][_eq]={encoded}")
    query = "&".join(filters)

    found, lookup_status = _directus(
        "GET", f"/items/{collection}?{query}&limit=1", token
    )
    already_linked = lookup_status == 200 and found.get("data")
    if not already_linked:
        _directus("POST", f"/items/{collection}", token, payload)
def _find_or_create_word(title_de: str, level: int, token: str):
    """Return ``(word_id, is_new)`` for the word with the given German title.

    Looks the word up by exact ``title_de``; when the lookup does not return
    a row, a new word is created with status "draft" and the given level.

    Raises:
        RuntimeError: The create request did not answer with 200 or 201.
    """
    encoded_title = urllib.parse.quote(title_de, safe="")
    lookup_path = f"/items/words?filter[title_de][_eq]={encoded_title}&fields=id&limit=1"
    found, lookup_status = _directus("GET", lookup_path, token)
    if lookup_status == 200 and found.get("data"):
        existing_id = found["data"][0]["id"]
        return existing_id, False

    new_word = {"status": "draft", "title_de": title_de, "level": level}
    created, create_status = _directus("POST", "/items/words", token, new_word)
    if create_status not in (200, 201):
        raise RuntimeError(f"Word creation failed ({create_status}): {created}")
    return created["data"]["id"], True
def _find_or_create_question(question_de: str, answer_de: str, level: int,
                             short_answer_id, short_answer_de: str, obj_id: str, token: str):
    """Return ``(question_id, is_new)`` for the given German question text.

    Looks the question up by exact ``question_de``; when the lookup does not
    return a row, a new question is created with status "draft", linked to
    ``obj_id``. ``short_answer`` and ``short_answer_de`` are only sent when
    the corresponding argument is truthy.

    Raises:
        RuntimeError: The create request did not answer with 200 or 201.
    """
    encoded_question = urllib.parse.quote(question_de, safe="")
    found, lookup_status = _directus(
        "GET",
        f"/items/questions?filter[question_de][_eq]={encoded_question}&fields=id&limit=1",
        token,
    )
    if lookup_status == 200 and found.get("data"):
        return found["data"][0]["id"], False

    new_question = {
        "status": "draft",
        "question_de": question_de,
        "answer_de": answer_de,
        "level": level,
        "object": obj_id,
    }
    optional_values = (
        ("short_answer", short_answer_id),
        ("short_answer_de", short_answer_de),
    )
    for field_name, value in optional_values:
        if value:
            new_question[field_name] = value

    created, create_status = _directus("POST", "/items/questions", token, new_question)
    if create_status not in (200, 201):
        raise RuntimeError(f"Question creation failed ({create_status}): {created}")
    return created["data"]["id"], True
# ── Generate & Publish endpoints ──────────────────────────────────────────────
@app.route("/api/object/<obj_id>/generate_questions", methods=["POST"])
def generate_questions(obj_id: str):
"""
1. Holt Objekt + Elternobjekt aus Directus
2. Füllt Prompt-Platzhalter
3. Ruft Claude Haiku auf
4. Speichert Wörter + Fragen als Entwurf in Directus
"""
token = request.headers.get("Authorization", "")
body = request.get_json(force=True, silent=True) or {}
prompt_template = (body.get("prompt") or "").strip()
if not prompt_template:
return jsonify({"error": "Missing prompt"}), 400
if not ANTHROPIC_API_KEY:
return jsonify({"error": "ANTHROPIC_API_KEY not configured on server"}), 500
# Objekt laden
obj_resp, s = _directus("GET", f"/items/objects/{obj_id}?fields=id,user_notes,parent", token)
if s != 200:
return jsonify({"error": "Object not found"}), 404
obj = obj_resp.get("data") or {}
# Elternobjekt laden
parent_notes = ""
if obj.get("parent"):
p_resp, _ = _directus("GET", f"/items/objects/{obj['parent']}?fields=id,user_notes", token)
parent_notes = ((p_resp.get("data") or {}).get("user_notes") or "")
# Platzhalter ersetzen
prompt = (
prompt_template
.replace("{user-notes_object}", obj.get("user_notes") or "")
.replace("{user-notes_parentobject}", parent_notes)
)
# Claude Haiku aufrufen
try:
client = _anthropic_sdk.Anthropic(api_key=ANTHROPIC_API_KEY)
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}],
)
raw = msg.content[0].text
except Exception as e:
return jsonify({"error": f"Claude API error: {e}"}), 500
# JSON parsen
try:
ai = json.loads(raw)
except Exception:
try:
start = raw.index("{")
end = raw.rindex("}") + 1
ai = json.loads(raw[start:end])
except Exception:
return jsonify({"error": "Invalid JSON from AI", "raw": raw[:400]}), 500
levels = ai.get("levels", [])
if not levels:
return jsonify({"error": "No levels in AI response"}), 500
# Junction-Collections sicherstellen (einmalig)
for col, f1, f2 in [
("words_objects", "words_id", "objects_id"),
("questions_objects", "questions_id", "objects_id"),
("questions_distractor_words", "questions_id", "words_id"),
]:
_ensure_junction(col, f1, f2, token)
stats = {
"words_created": 0,
"words_linked": 0,
"questions_created": 0,
"questions_linked": 0,
}
def _sanitize_word(raw: str) -> list[str]:
"""Zerlegt kommagetrennte Einträge und gibt nur echte Einzelwörter zurück."""
tokens = []
for part in raw.replace(";", ",").split(","):
part = part.strip().strip("\"'")
if not part:
continue
words_in_part = part.split()
if len(words_in_part) == 1:
tokens.append(words_in_part[0])
# Mehrwortige Einträge → überspringen (KI-Fehler)
return tokens
# Alle eindeutigen Wörter aus allen Leveln vorab sammeln und einmalig laden
all_words_by_level: dict[str, int] = {} # title_de → first level seen
for lvl in levels:
level = int(lvl.get("level") or 1)
raw_words = lvl.get("words", []) + lvl.get("distractor_words", []) + [lvl.get("short_answer", "")]
for raw in raw_words:
for w in _sanitize_word(str(raw)):
if w and w not in all_words_by_level:
all_words_by_level[w] = level
# Wörter einmalig anlegen / finden (globaler Cache über alle Level)
global_word_map: dict[str, str] = {} # title_de → id
for w, lvl_num in all_words_by_level.items():
try:
wid, is_new = _find_or_create_word(w, lvl_num, token)
global_word_map[w] = wid
_ensure_link(
"words_objects",
{"words_id": wid, "objects_id": obj_id},
{"words_id": wid, "objects_id": obj_id},
token,
)
stats["words_created" if is_new else "words_linked"] += 1
except Exception as e:
print(f"[generate_questions] word error '{w}': {e}")
for lvl in levels:
level = int(lvl.get("level") or 1)
q_de = (lvl.get("question") or "").strip()
a_de = (lvl.get("answer") or "").strip()
short_text = (lvl.get("short_answer") or "").strip()
words_list = [t for raw in lvl.get("words", []) for t in _sanitize_word(str(raw))]
distractor_list = [t for raw in lvl.get("distractor_words", []) for t in _sanitize_word(str(raw))]
if not q_de:
continue
short_answer_id = global_word_map.get(short_text) if short_text else None
# Frage anlegen / verknüpfen
try:
q_id, q_is_new = _find_or_create_question(q_de, a_de, level, short_answer_id, short_text, obj_id, token)
except Exception as e:
print(f"[generate_questions] question error level {level}: {e}")
continue
stats["questions_created" if q_is_new else "questions_linked"] += 1
# Frage ↔ Objekt
_ensure_link(
"questions_objects",
{"questions_id": q_id, "objects_id": obj_id},
{"questions_id": q_id, "objects_id": obj_id},
token,
)
# related_words
for w in words_list:
if w in global_word_map:
_ensure_link(
"questions_words",
{"questions_id": q_id, "words_id": global_word_map[w]},
{"questions_id": q_id, "words_id": global_word_map[w]},
token,
)
# distractor_words
for w in distractor_list:
if w in global_word_map:
_ensure_link(
"questions_distractor_words",
{"questions_id": q_id, "words_id": global_word_map[w]},
{"questions_id": q_id, "words_id": global_word_map[w]},
token,
)
print(f"[generate_questions] obj={obj_id} stats={stats}")
return jsonify({"ok": True, "object_id": obj_id, "stats": stats})
@app.route("/api/object/<obj_id>/publish_questions", methods=["POST"])
def publish_questions(obj_id: str):
    """Set the status of the questions and words linked to an object to 'published'.

    The linked ids are read from the ``questions_objects`` and
    ``words_objects`` junction collections. Items whose status is
    ``archived`` are skipped: the list and purge endpoints of this module
    treat archived items as deleted, so publishing must not revive them.
    If the status lookup for a batch fails, the items of that batch are not
    published (fail closed).

    Returns:
        JSON with ``ok`` plus ``published_questions`` / ``published_words``,
        the number of PATCH requests that answered with HTTP 200.
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; escape it before it goes into a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")

    def linked_ids(junction: str, fk_field: str, limit: int) -> list:
        """Return the distinct, non-empty foreign keys linked to the object."""
        resp, _ = _directus(
            "GET",
            f"/items/{junction}?filter[objects_id][_eq]={obj_filter}"
            f"&fields={fk_field}&limit={limit}",
            token,
        )
        rows = resp.get("data") or []
        # dict.fromkeys de-duplicates while keeping the junction order.
        return list(dict.fromkeys(
            row[fk_field] for row in rows if row.get(fk_field)
        ))

    def without_archived(collection: str, ids: list) -> list:
        """Keep only ids whose item exists with a status other than 'archived'."""
        alive = set()
        # Small batches keep the request URL short.
        for start in range(0, len(ids), 100):
            batch = ids[start:start + 100]
            ids_param = urllib.parse.quote(",".join(str(i) for i in batch), safe="")
            resp, _ = _directus(
                "GET",
                f"/items/{collection}?filter[id][_in]={ids_param}"
                f"&filter[status][_neq]=archived&fields=id&limit={len(batch)}",
                token,
            )
            alive.update(
                item["id"] for item in (resp.get("data") or []) if item.get("id")
            )
        return [i for i in ids if i in alive]

    def publish(collection: str, ids: list) -> int:
        """PATCH every id to 'published' and return how many requests succeeded."""
        published = 0
        for item_id in ids:
            _, status = _directus(
                "PATCH", f"/items/{collection}/{item_id}", token, {"status": "published"}
            )
            if status == 200:
                published += 1
        return published

    q_ids = without_archived("questions", linked_ids("questions_objects", "questions_id", 200))
    w_ids = without_archived("words", linked_ids("words_objects", "words_id", 2000))

    published_q = publish("questions", q_ids)
    published_w = publish("words", w_ids)
    return jsonify({
        "ok": True,
        "published_questions": published_q,
        "published_words": published_w,
    })
@app.route("/api/object/<obj_id>/questions", methods=["GET"])
def get_object_questions_list(obj_id: str):
    """Return the non-archived questions linked to an object.

    Each returned question carries ``id``, ``question_de``, ``answer_de``,
    ``short_answer_de``, ``level`` and ``status`` plus a ``distractor_words``
    list holding the ``title_de`` of every linked distractor word. Archived
    distractor words are left out, matching how archived questions and words
    are hidden by the other list endpoints. The result is sorted by ``level``
    (a missing level sorts as 0).
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; escape it before it goes into a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")

    def fetch_in(collection: str, key: str, ids: list, query: str) -> list:
        """GET all rows of ``collection`` whose ``key`` is one of ``ids``.

        The ids are sent in small batches so the request URL stays short.
        """
        rows: list = []
        for start in range(0, len(ids), 50):
            batch = ids[start:start + 50]
            ids_param = urllib.parse.quote(",".join(str(i) for i in batch), safe="")
            resp, _ = _directus(
                "GET",
                f"/items/{collection}?filter[{key}][_in]={ids_param}&{query}",
                token,
            )
            rows.extend(resp.get("data") or [])
        return rows

    # Step 1: question ids from the junction collection.
    junc, _ = _directus(
        "GET",
        f"/items/questions_objects?filter[objects_id][_eq]={obj_filter}"
        "&fields=questions_id&limit=200",
        token,
    )
    q_ids = list(dict.fromkeys(
        e["questions_id"] for e in (junc.get("data") or []) if e.get("questions_id")
    ))
    if not q_ids:
        return jsonify({"data": []})

    # Step 2: load the questions (incl. short_answer_de), excluding archived ones.
    questions = fetch_in(
        "questions", "id", q_ids,
        "filter[status][_neq]=archived"
        "&fields=id,question_de,answer_de,short_answer_de,level,status&limit=200",
    )
    items = sorted(questions, key=lambda x: x.get("level") or 0)
    if not items:
        return jsonify({"data": []})

    # Step 3: distractor junction rows, only for the questions that are returned.
    dw_entries = fetch_in(
        "questions_distractor_words", "questions_id",
        [item["id"] for item in items],
        "fields=questions_id,words_id&limit=5000",
    )

    # Step 4: resolve word ids to titles, skipping archived words.
    all_word_ids = list({e["words_id"] for e in dw_entries if e.get("words_id")})
    words = fetch_in(
        "words", "id", all_word_ids,
        "filter[status][_neq]=archived&fields=id,title_de&limit=5000",
    )
    word_title_map: dict[str, str] = {w["id"]: w["title_de"] for w in words}

    # Group the distractor titles by question.
    dw_by_question: dict[str, list[str]] = {}
    for e in dw_entries:
        qid = e.get("questions_id")
        wid = e.get("words_id")
        if qid and wid and wid in word_title_map:
            dw_by_question.setdefault(qid, []).append(word_title_map[wid])

    for item in items:
        item["distractor_words"] = dw_by_question.get(item["id"], [])
    return jsonify({"data": items})
@app.route("/api/object/<obj_id>/words", methods=["GET"])
def get_object_words_list(obj_id: str):
    """Return the non-archived words linked to an object, sorted by ``title_de``.

    Two-step query: the word ids are read from the ``words_objects`` junction
    collection, then the words are loaded by id. The second step runs in
    small batches so that an object with many words does not produce a
    single, very long request URL.
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; escape it before it goes into a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")

    junc, _ = _directus(
        "GET",
        f"/items/words_objects?filter[objects_id][_eq]={obj_filter}"
        "&fields=words_id&limit=2000",
        token,
    )
    # De-duplicate so a word linked twice is not returned by two batches.
    w_ids = list(dict.fromkeys(
        e["words_id"] for e in (junc.get("data") or []) if e.get("words_id")
    ))
    if not w_ids:
        return jsonify({"data": []})

    items: list = []
    for start in range(0, len(w_ids), 100):
        batch = w_ids[start:start + 100]
        ids_param = urllib.parse.quote(",".join(str(i) for i in batch), safe="")
        w_data, _ = _directus(
            "GET",
            f"/items/words?filter[id][_in]={ids_param}&filter[status][_neq]=archived"
            f"&fields=id,title_de,level,status&limit={len(batch)}",
            token,
        )
        items.extend(w_data.get("data") or [])

    items.sort(key=lambda x: x.get("title_de") or "")
    return jsonify({"data": items})
def _delete_junction_rows(collection: str, field: str, value: str, token: str):
    """Delete all rows of a junction collection that reference a given key.

    Args:
        collection: Name of the junction collection.
        field: Foreign-key field to filter on.
        value: Foreign-key value whose rows are removed.
        token: Value forwarded as the Authorization header to Directus.

    At most 5000 rows are looked up per call. The result of the DELETE
    request is not evaluated (best effort), and nothing is returned.
    """
    # The value usually originates from a URL path segment; escape it so
    # characters such as '&' or '#' cannot alter the query string.
    value_param = urllib.parse.quote(str(value), safe="")
    data, status = _directus(
        "GET",
        f"/items/{collection}?filter[{field}][_eq]={value_param}&fields=id&limit=5000",
        token,
    )
    if status != 200:
        # Lookup failed, so there is nothing reliable to delete.
        return
    ids = [e["id"] for e in (data.get("data") or []) if e.get("id")]
    if ids:
        _directus("DELETE", f"/items/{collection}", token, ids)
@app.route("/api/question/<q_id>", methods=["DELETE"])
def delete_question_item(q_id: str):
    """Delete a question from Directus together with its junction rows.

    The junction rows in ``questions_objects``, ``questions_distractor_words``
    and ``questions_words`` are removed first, then the question itself.
    Responds with ``{"ok": true}`` when Directus answers 200 or 204,
    otherwise with an error payload and the Directus status code.
    """
    auth = request.headers.get("Authorization", "")
    for junction in ("questions_objects", "questions_distractor_words", "questions_words"):
        _delete_junction_rows(junction, "questions_id", q_id, auth)

    _response, http_status = _directus("DELETE", f"/items/questions/{q_id}", auth)
    if http_status not in (200, 204):
        return jsonify({"error": "Delete failed"}), http_status
    return jsonify({"ok": True})
@app.route("/api/word/<w_id>", methods=["DELETE"])
def delete_word_item(w_id: str):
    """Delete a word from Directus together with its junction rows.

    The junction rows in ``words_objects``, ``questions_words`` and
    ``questions_distractor_words`` are removed first, then the word itself.
    Responds with ``{"ok": true}`` when Directus answers 200 or 204,
    otherwise with an error payload and the Directus status code.
    """
    auth = request.headers.get("Authorization", "")
    for junction in ("words_objects", "questions_words", "questions_distractor_words"):
        _delete_junction_rows(junction, "words_id", w_id, auth)

    _response, http_status = _directus("DELETE", f"/items/words/{w_id}", auth)
    if http_status not in (200, 204):
        return jsonify({"error": "Delete failed"}), http_status
    return jsonify({"ok": True})
@app.route("/api/object/<obj_id>/purge-orphans", methods=["POST"])
def purge_orphan_junctions(obj_id: str):
    """Remove orphaned junction rows of one object.

    A row in ``questions_objects`` / ``words_objects`` is an orphan when the
    referenced question/word no longer exists or has the status ``archived``.

    A row is only removed when the lookup gives a definite answer. Lookups
    that fail for other reasons (e.g. an expired token or a server error)
    leave the row untouched, so a temporary outage cannot wipe valid links.

    Returns:
        JSON with ``ok`` and ``orphans_removed``, the number of junction rows
        whose DELETE request succeeded.
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; escape it before it goes into a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")
    removed = 0
    for junc_col, fk_field, item_col in [
        ("questions_objects", "questions_id", "questions"),
        ("words_objects", "words_id", "words"),
    ]:
        junc_data, _ = _directus(
            "GET",
            f"/items/{junc_col}?filter[objects_id][_eq]={obj_filter}"
            f"&fields=id,{fk_field}&limit=5000",
            token,
        )
        for row in (junc_data.get("data") or []):
            fk_val = row.get(fk_field)
            if not fk_val:
                continue
            item_data, s = _directus("GET", f"/items/{item_col}/{fk_val}?fields=id,status", token)
            if s == 200:
                item = item_data.get("data") or {}
                is_orphan = not item or item.get("status") == "archived"
            else:
                # Directus answers 403 (not 404) for non-existing items; every
                # other status means "unknown", which must not trigger a delete.
                # NOTE(review): assumes _directus returns the HTTP status code
                # of error responses - confirm against its implementation.
                is_orphan = s in (403, 404)
            if not is_orphan:
                continue
            _, del_status = _directus("DELETE", f"/items/{junc_col}/{row['id']}", token)
            if del_status in (200, 204):
                removed += 1
    return jsonify({"ok": True, "orphans_removed": removed})
@app.route("/api/purge-all-orphans", methods=["POST"])
def purge_all_orphans():
    """Remove orphaned junction rows for ALL objects at once.

    Loads the rows of ``questions_objects`` and ``words_objects`` (up to
    10000 each) and deletes those whose referenced question/word no longer
    exists or is archived.

    Safety: the existence check runs in small batches, and a junction
    collection is skipped entirely when any lookup for it fails. Without
    that guard a single failed lookup would make every row look like an
    orphan and delete all links.

    Returns:
        JSON with ``ok`` (False when a collection had to be skipped or a
        delete failed), ``orphans_removed`` and ``skipped``, the list of
        junction collections that were not fully processed.
    """
    token = request.headers.get("Authorization", "")
    removed = 0
    skipped: list[str] = []
    for junc_col, fk_field, item_col in [
        ("questions_objects", "questions_id", "questions"),
        ("words_objects", "words_id", "words"),
    ]:
        junc_data, junc_status = _directus(
            "GET", f"/items/{junc_col}?fields=id,{fk_field}&limit=10000", token
        )
        if junc_status != 200:
            skipped.append(junc_col)
            continue
        rows = junc_data.get("data") or []
        # Collect all unique foreign-key values.
        fk_ids = list({row[fk_field] for row in rows if row.get(fk_field)})
        if not fk_ids:
            continue

        # Determine which ids still exist AND are not archived.
        existing_ids = set()
        lookup_ok = True
        for start in range(0, len(fk_ids), 100):
            batch = fk_ids[start:start + 100]
            ids_param = urllib.parse.quote(",".join(str(i) for i in batch), safe="")
            existing_data, status = _directus(
                "GET",
                f"/items/{item_col}?filter[id][_in]={ids_param}"
                f"&filter[status][_neq]=archived&fields=id&limit={len(batch)}",
                token,
            )
            if status != 200:
                lookup_ok = False
                break
            existing_ids.update(e["id"] for e in (existing_data.get("data") or []))
        if not lookup_ok:
            # Never delete on incomplete information.
            skipped.append(junc_col)
            continue

        orphan_junc_ids = [
            row["id"] for row in rows
            if row.get(fk_field) and row[fk_field] not in existing_ids
        ]
        if orphan_junc_ids:
            _, del_status = _directus("DELETE", f"/items/{junc_col}", token, orphan_junc_ids)
            if del_status in (200, 204):
                removed += len(orphan_junc_ids)
            else:
                skipped.append(junc_col)
    return jsonify({"ok": not skipped, "orphans_removed": removed, "skipped": skipped})
@app.route("/api/fix-distractor-field", methods=["POST"])
def fix_distractor_field():
    """Set ``special=m2m`` on ``questions.distractor_words`` (one-off repair).

    Responds with ``ok`` (True for HTTP 200/201), the Directus status code
    and the Directus response body.
    """
    # Per the original author's note, Directus needs the full meta payload
    # when ``special`` is patched.
    meta = {
        "special": ["m2m"],
        "interface": "list-m2m",
        "options": {"template": "{{words_id.title_de}}"},
        "hidden": False,
        "note": "Ablenker-Wörter (thematisch passend, aber nicht die Antwort)",
    }
    response, http_status = _directus(
        "PATCH",
        "/fields/questions/distractor_words",
        request.headers.get("Authorization", ""),
        {"meta": meta},
    )
    succeeded = http_status in (200, 201)
    return jsonify({"ok": succeeded, "status": http_status, "data": response})
@app.route("/api/setup-schema", methods=["POST"])
def setup_directus_schema():
    """Configure the M2O/M2M fields and relations in Directus (run once).

    Every step below is sent to Directus in order, regardless of whether an
    earlier step failed. A step counts as successful when Directus answers
    200, 201 or 204; otherwise its error payload is recorded. The original
    author describes the endpoint as idempotent; this code does not check for
    existing fields/relations itself, it only reports what Directus returns.

    Returns:
        JSON with ``ok`` (True when no step failed), ``total``, ``failed``
        and the per-step ``results``.
    """
    token = request.headers.get("Authorization", "")

    def m2m_alias(name, template, note):
        """Payload for creating an M2M alias field."""
        return {
            "field": name,
            "type": "alias",
            "meta": {
                "special": ["m2m"],
                "interface": "list-m2m",
                "options": {"template": template},
                "note": note,
            },
        }

    def relation(collection, field, related, on_delete, **link_meta):
        """Payload for creating a relation; ``link_meta`` precedes the deselect action."""
        return {
            "collection": collection,
            "field": field,
            "related_collection": related,
            "schema": {"on_delete": on_delete},
            "meta": {**link_meta, "one_deselect_action": "nullify"},
        }

    steps = [
        # -- Fix M2O: questions.object -> objects
        ("relation questions.object→objects", "POST", "/relations",
         relation("questions", "object", "objects", "SET NULL")),
        # Update the display template so the object label shows.
        ("update field questions.object template", "PATCH", "/fields/questions/object",
         {"meta": {"options": {"template": "{{user_notes}}"}}}),
        # -- Fix M2O: questions.short_answer -> words
        ("relation questions.short_answer→words", "POST", "/relations",
         relation("questions", "short_answer", "words", "SET NULL")),
        # -- M2M: questions <-> objects via questions_objects
        ("field questions.linked_objects", "POST", "/fields/questions",
         m2m_alias("linked_objects", "{{objects_id.user_notes}}", "Verknüpfte Objekte")),
        ("field objects.linked_questions", "POST", "/fields/objects",
         m2m_alias("linked_questions", "{{questions_id.question_de}}", "Verknüpfte Fragen")),
        ("relation questions_objects.questions_id→questions", "POST", "/relations",
         relation("questions_objects", "questions_id", "questions", "CASCADE",
                  junction_field="objects_id", one_field="linked_objects")),
        ("relation questions_objects.objects_id→objects", "POST", "/relations",
         relation("questions_objects", "objects_id", "objects", "CASCADE",
                  junction_field="questions_id", one_field="linked_questions")),
        # -- M2M: words <-> objects via words_objects
        ("field words.linked_objects", "POST", "/fields/words",
         m2m_alias("linked_objects", "{{objects_id.user_notes}}", "Verknüpfte Objekte")),
        ("field objects.linked_words", "POST", "/fields/objects",
         m2m_alias("linked_words", "{{words_id.title_de}}", "Verknüpfte Wörter")),
        ("relation words_objects.words_id→words", "POST", "/relations",
         relation("words_objects", "words_id", "words", "CASCADE",
                  junction_field="objects_id", one_field="linked_objects")),
        ("relation words_objects.objects_id→objects", "POST", "/relations",
         relation("words_objects", "objects_id", "objects", "CASCADE",
                  junction_field="words_id", one_field="linked_words")),
        # -- M2M: questions <-> words (distractor) via questions_distractor_words
        # The field may already exist without ``special``, hence create + PATCH.
        ("field questions.distractor_words (create)", "POST", "/fields/questions",
         m2m_alias("distractor_words", "{{words_id.title_de}}",
                   "Ablenker-Wörter (nicht in Frage/Antwort)")),
        ("field questions.distractor_words (patch special)", "PATCH",
         "/fields/questions/distractor_words",
         {
             "type": "alias",
             "meta": {
                 "special": ["m2m"],
                 "interface": "list-m2m",
                 "options": {"template": "{{words_id.title_de}}"},
                 "hidden": False,
                 "note": "Ablenker-Wörter (nicht in Frage/Antwort)",
             },
         }),
        ("relation questions_distractor_words.questions_id→questions", "POST", "/relations",
         relation("questions_distractor_words", "questions_id", "questions", "CASCADE",
                  junction_field="words_id", one_field="distractor_words")),
        ("relation questions_distractor_words.words_id→words", "POST", "/relations",
         relation("questions_distractor_words", "words_id", "words", "CASCADE",
                  junction_field="questions_id")),
        # -- Back-reference: words -> questions via the existing questions_words junction
        ("field words.linked_questions", "POST", "/fields/words",
         m2m_alias("linked_questions", "{{questions_id.question_de}}",
                   "Fragen in denen dieses Wort vorkommt")),
        # Patch the existing questions_words relation to add the back-reference on words.
        ("patch relation questions_words.words_id one_field", "PATCH",
         "/relations/questions_words/words_id",
         {"meta": {"one_field": "linked_questions", "one_deselect_action": "nullify"}}),
    ]

    results = []
    for label, method, path, body in steps:
        data, status = _directus(method, path, token, body)
        succeeded = status in (200, 201, 204)
        results.append({
            "label": label,
            "status": status,
            "ok": succeeded,
            "err": None if succeeded else (data.get("errors") or data),
        })

    failed = [entry for entry in results if not entry["ok"]]
    return jsonify({
        "ok": len(failed) == 0,
        "total": len(results),
        "failed": len(failed),
        "results": results,
    })
if __name__ == "__main__":
    # The Werkzeug debugger lets anyone who can reach the server execute
    # arbitrary Python code, so it must not be switched on unconditionally
    # while listening on all interfaces. Opt in for local development by
    # setting FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=8000, debug=debug_enabled)