2117 lines
87 KiB
Python
2117 lines
87 KiB
Python
from pathlib import Path
|
||
from datetime import datetime
|
||
from uuid import uuid4
|
||
import json
|
||
import os
|
||
import urllib.request
|
||
import urllib.error
|
||
import urllib.parse
|
||
|
||
from flask import Flask, send_from_directory, request, jsonify
|
||
from flask_cors import CORS
|
||
from PIL import Image
|
||
import ollama
|
||
import anthropic as _anthropic_sdk
|
||
|
||
# --- Credentials -------------------------------------------------------------
# Anthropic API key; an empty string when the environment variable is unset.
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "")

# --- Directus backend --------------------------------------------------------
DIRECTUS_URL = "https://db.hejyou.com"
# NOTE(review): the fallback below is a literal bearer token committed to the
# source tree. It is reproduced unchanged so existing deployments keep
# working, but it should be rotated and supplied via the environment only.
DIRECTUS_ADMIN_TOKEN = os.getenv(
    "DIRECTUS_ADMIN_TOKEN",
    "Bearer tnBshnvge8KBu0WqykSQvgBperI2j_0b",
)

# --- Filesystem layout (all paths are relative to this file) ------------------
BASE_DIR = Path(__file__).resolve().parent
PICTURES_DIR = BASE_DIR.joinpath("pictures")
OBJECTS_DIR = BASE_DIR.joinpath("objects_image")
SENTENCE_DIR = BASE_DIR.joinpath("sentence_object")
PROMPTS_DIR = BASE_DIR.joinpath("prompts")

# --- Flask application --------------------------------------------------------
_TEMPLATE_DIR = str(BASE_DIR.joinpath("templates"))
_STATIC_DIR = str(BASE_DIR.joinpath("static"))
app = Flask(__name__, template_folder=_TEMPLATE_DIR, static_folder=_STATIC_DIR)
# CORS is enabled with the flask_cors defaults for the whole app.
CORS(app)
|
||
|
||
def read_prompt(filepath: Path, fallback: str) -> str:
    """Return the stripped text of a prompt file, or the stripped fallback.

    Args:
        filepath: Location of the prompt file; it is read as UTF-8.
        fallback: Text used when the file is missing, cannot be read, or is
            not valid UTF-8.

    Returns:
        The file content, or ``fallback``, with surrounding whitespace removed.
    """
    try:
        # EAFP: a missing file raises FileNotFoundError (an OSError), so no
        # separate existence check is needed.
        return filepath.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeError):
        return fallback.strip()
|
||
|
||
def _directus(method, path, token, body=None, timeout=30):
    """Call the Directus REST API via urllib.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"PATCH"``.
        path: Path (including any query string) appended to ``DIRECTUS_URL``.
        token: Value for the ``Authorization`` header; omitted when falsy.
        body: Optional JSON-serialisable payload.
        timeout: Socket timeout in seconds, so a stalled upstream cannot block
            the calling request forever.

    Returns:
        A ``(data, status)`` tuple. ``data`` is the decoded JSON body (``{}``
        when empty). HTTP error responses are returned with their status
        code rather than raised. If Directus cannot be reached at all, an
        ``errors`` payload with status 502 is returned.
    """
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = token
    payload = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(
        f"{DIRECTUS_URL}{path}",
        data=payload,
        headers=headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return _decode_directus_body(resp.read()), resp.status
    except urllib.error.HTTPError as e:
        # HTTPError must be handled before URLError/OSError (it subclasses both).
        return _decode_directus_body(e.read()), e.code
    except OSError as e:
        # Covers URLError (DNS failure, refused connection) and timeouts.
        return {"errors": [{"message": f"Directus request failed: {e}"}]}, 502


def _decode_directus_body(raw):
    """Decode a response body to JSON without raising on malformed content."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", errors="replace")
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except ValueError:
        # E.g. an HTML error page from a reverse proxy in front of Directus.
        return {"errors": [{"message": raw}]}
|
||
|
||
@app.route("/api/directus/auth/login", methods=["POST"])
def directus_auth_login():
    """Proxy the Directus login so the browser avoids CORS problems.

    Forwards the request's JSON body, without an Authorization header, to
    ``/auth/login`` and relays the upstream payload and status code.
    """
    credentials = request.get_json()
    upstream_payload, upstream_status = _directus(
        "POST", "/auth/login", token=None, body=credentials
    )
    return jsonify(upstream_payload), upstream_status
|
||
|
||
@app.route("/api/directus/pictures", methods=["GET"])
def directus_pictures():
    """Proxy: list Directus pictures filtered by status.

    Query parameters:
        status: Picture status to match exactly (defaults to ``"new"``).

    The caller's ``Authorization`` header is forwarded to Directus and the
    upstream payload and status code are relayed.
    """
    token = request.headers.get("Authorization", "")
    # Percent-encode the client-supplied value so it stays a single filter
    # value and cannot append extra query parameters to the upstream request.
    pic_status = urllib.parse.quote(request.args.get("status", "new"), safe="")
    data, status = _directus(
        "GET",
        f"/items/pictures?filter[status][_eq]={pic_status}&fields=id,media,status&sort=date_created",
        token,
    )
    return jsonify(data), status
|
||
|
||
@app.route("/api/directus/pictures/<pic_id>", methods=["PATCH"])
def directus_picture(pic_id):
    """Proxy: update a Directus picture (e.g. its status).

    The request's JSON body is forwarded as the PATCH payload together with
    the caller's ``Authorization`` header; the upstream payload and status
    code are relayed.
    """
    token = request.headers.get("Authorization", "")
    # Flask hands over the URL-decoded id; re-encode it so characters such as
    # "?" or "#" cannot change the upstream path or query.
    safe_id = urllib.parse.quote(str(pic_id), safe="")
    data, status = _directus(
        "PATCH", f"/items/pictures/{safe_id}", token, body=request.get_json()
    )
    return jsonify(data), status
|
||
|
||
@app.route("/api/directus/objects", methods=["GET", "POST"])
def directus_objects():
    """Proxy: load objects of a picture (GET) or create an object (POST).

    GET query parameters:
        picture_id: Id of the picture whose objects are listed.

    POST forwards the request's JSON body to ``/items/objects``. In both
    cases the caller's ``Authorization`` header is forwarded and the upstream
    payload and status code are relayed.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "POST":
        data, status = _directus("POST", "/items/objects", token, body=request.get_json())
        return jsonify(data), status

    # Percent-encode the client-supplied id so it cannot inject further query
    # parameters into the upstream request.
    picture_id = urllib.parse.quote(request.args.get("picture_id", ""), safe="")
    fields = "id,selections,user_notes,parent,status,picture"
    path = f"/items/objects?filter[picture][_eq]={picture_id}&fields={fields}&sort=date_created"
    data, status = _directus("GET", path, token)
    return jsonify(data), status
|
||
|
||
@app.route("/api/directus/objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_object(obj_id):
    """Proxy: update (PATCH) or delete (DELETE) a single Directus object.

    PATCH forwards the request's JSON body. The caller's ``Authorization``
    header is forwarded and the upstream payload and status code are relayed.
    """
    token = request.headers.get("Authorization", "")
    # Flask hands over the URL-decoded id; re-encode it so it cannot alter the
    # upstream path or add a query string.
    item_path = f"/items/objects/{urllib.parse.quote(str(obj_id), safe='')}"
    if request.method == "PATCH":
        data, status = _directus("PATCH", item_path, token, body=request.get_json())
    else:
        data, status = _directus("DELETE", item_path, token)
    return jsonify(data), status
|
||
|
||
def _setup_words_pictures(token: str):
    """Send the Directus calls that define the words <-> pictures M2M relation.

    Issues two field PATCHes (``pictures.linked_words`` and
    ``words.linked_pictures``) followed by two relation POSTs on the
    ``words_pictures`` junction collection. All responses are ignored.
    """
    # (field endpoint, display template, editor note) for each alias field.
    alias_fields = (
        ("/fields/pictures/linked_words", "{{words_id.title_de}}", "Verknüpfte Safe Words"),
        ("/fields/words/linked_pictures", "{{pictures_id.media}}", "Verknüpfte Bilder"),
    )
    for field_path, template, note in alias_fields:
        _directus("PATCH", field_path, token, {
            "type": "alias",
            "meta": {
                "special": ["m2m"],
                "interface": "list-m2m",
                "options": {"template": template},
                "note": note,
            },
        })

    # (junction column, related collection, opposite column, alias on the
    # related collection) for each side of the junction.
    junction_sides = (
        ("words_id", "words", "pictures_id", "linked_pictures"),
        ("pictures_id", "pictures", "words_id", "linked_words"),
    )
    for column, related, opposite_column, alias in junction_sides:
        _directus("POST", "/relations", token, {
            "collection": "words_pictures",
            "field": column,
            "related_collection": related,
            "schema": {"on_delete": "CASCADE"},
            "meta": {
                "junction_field": opposite_column,
                "one_field": alias,
                "one_deselect_action": "nullify",
            },
        })
|
||
|
||
@app.route("/api/directus/pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_picture_words(pic_id):
    """Proxy: load (GET) or store (POST) the safe words linked to a picture.

    Uses the Directus M2M alias ``pictures.linked_words``.

    GET returns ``{"data": [...]}`` with one entry per linked, non-archived
    word; any upstream status other than 200 yields an empty list.

    POST expects ``{"words": [{"title_de": str, "level": int}, ...]}``. Each
    word is looked up or created, the level of an existing word is updated,
    and words not yet linked are attached in one PATCH. It returns
    ``{"ok": True, "saved": n}``; if the linking PATCH is rejected, the
    response carries ``"ok": False`` and the upstream status code.
    """
    token = request.headers.get("Authorization", "")
    # Flask hands over the URL-decoded id; re-encode it so it cannot alter the
    # upstream path or query string.
    pic_path = f"/items/pictures/{urllib.parse.quote(str(pic_id), safe='')}"

    if request.method == "GET":
        # Deep query across the M2M relation.
        data, status = _directus(
            "GET",
            f"{pic_path}"
            "?fields[]=linked_words.id"
            "&fields[]=linked_words.words_id.id"
            "&fields[]=linked_words.words_id.title_de"
            "&fields[]=linked_words.words_id.level"
            "&fields[]=linked_words.words_id.status",
            token,
        )
        if status != 200:
            return jsonify({"data": []})
        record = data.get("data") if isinstance(data, dict) else None
        links = record.get("linked_words") if isinstance(record, dict) else None
        items = []
        for entry in links or []:
            if not isinstance(entry, dict):
                continue
            word = entry.get("words_id") or {}
            if not isinstance(word, dict) or not word.get("id"):
                continue
            if word.get("status") == "archived":
                continue
            items.append({
                "id": entry.get("id", ""),
                "word_id": word["id"],
                "title_de": word.get("title_de", ""),
                "level": word.get("level") or 50,
                "status": word.get("status", ""),
            })
        return jsonify({"data": items})

    # POST
    body = request.get_json(force=True, silent=True) or {}
    words_to_save = body.get("words", []) if isinstance(body, dict) else []
    if not isinstance(words_to_save, list):
        words_to_save = []

    # Make sure the relation exists before writing links.
    _setup_words_pictures(token)

    # Word ids already linked to the picture (duplicate protection).
    existing_data, _ = _directus(
        "GET",
        f"{pic_path}?fields[]=linked_words.words_id",
        token,
    )
    record = existing_data.get("data") if isinstance(existing_data, dict) else None
    links = record.get("linked_words") if isinstance(record, dict) else None
    existing_ids = set()
    for link in links or []:
        # Depending on the query depth the word is either a bare id or a dict.
        linked = link.get("words_id") if isinstance(link, dict) else link
        if isinstance(linked, dict):
            linked = linked.get("id")
        if linked:
            existing_ids.add(linked)

    # Create/find words, update levels, and collect the links still missing.
    new_links = []
    saved = 0
    for entry in words_to_save:
        if not isinstance(entry, dict):
            continue
        title_de = str(entry.get("title_de") or "").strip()
        if not title_de:
            continue
        try:
            level = int(entry.get("level") or 50)
        except (TypeError, ValueError):
            # Same default that is applied to a missing level.
            level = 50
        try:
            wid, is_new = _find_or_create_word(title_de, level, token)
            if not is_new:
                _directus("PATCH", f"/items/words/{wid}", token, {"level": level})
            if wid not in existing_ids:
                new_links.append({"words_id": wid})
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            # Best effort: one failing word must not abort the whole batch.
            print(f"[picture_words] error for '{title_de}': {e}")

    # Attach all new links with a single Directus PATCH.
    if new_links:
        link_data, link_status = _directus("PATCH", pic_path, token, {
            "linked_words": {"create": new_links}
        })
        if not 200 <= link_status < 300:
            print(f"[picture_words] linking failed ({link_status}): {link_data}")
            return jsonify({
                "ok": False,
                "saved": saved,
                "error": "Linking words to picture failed",
            }), link_status

    return jsonify({"ok": True, "saved": saved})
|
||
|
||
@app.route("/api/images", methods=["GET"])
def list_images():
    """List the file names in the pictures folder, oldest modification first.

    Query parameter ``mode``:
        ``draw`` (default, and any other value) -> files whose stem does not
        end in ``_saved``;
        ``generate`` -> files whose stem ends in ``_saved``.
    """
    want_saved = request.args.get("mode", "draw") == "generate"
    PICTURES_DIR.mkdir(parents=True, exist_ok=True)
    matching = [
        entry
        for entry in PICTURES_DIR.iterdir()
        if entry.is_file() and entry.stem.endswith("_saved") == want_saved
    ]
    matching.sort(key=lambda entry: entry.stat().st_mtime)
    return jsonify({"images": [entry.name for entry in matching]})
|
||
|
||
# Folder the React build is served from.
REACT_BUILD_DIR = BASE_DIR.joinpath("static", "react")


def _serve_spa():
    """Send the React single-page app's ``index.html`` from the build folder."""
    spa_entry = "index.html"
    return send_from_directory(REACT_BUILD_DIR, spa_entry)
|
||
|
||
@app.route("/")
@app.route("/draw")
@app.route("/generate")
@app.route("/annotate")
def serve_spa():
    """Answer every frontend route with the React SPA entry page."""
    spa_response = _serve_spa()
    return spa_response
|
||
|
||
@app.route("/assets/<path:filename>")
def serve_react_assets(filename: str):
    """Serve a file from the ``assets`` folder of the React build."""
    assets_dir = REACT_BUILD_DIR.joinpath("assets")
    return send_from_directory(assets_dir, filename)
|
||
|
||
@app.route("/pictures/<path:filename>")
def serve_picture(filename: str):
    """Serve an original picture from the pictures folder."""
    picture_response = send_from_directory(PICTURES_DIR, filename)
    return picture_response
|
||
|
||
@app.route("/objects_image/<path:filename>")
def serve_object_image(filename: str):
    """Serve a stored crop image from the objects folder."""
    crop_response = send_from_directory(OBJECTS_DIR, filename)
    return crop_response
|
||
|
||
@app.route("/api/objects", methods=["GET"])
def list_objects_for_image():
    """Return all stored objects (crops) that belong to one source image.

    Query parameters:
        filename: Name of the source image, e.g. ``testbild.png``.

    Returns:
        ``{"objects": [...]}`` sorted by ``created_at`` (oldest first), each
        entry carrying a 1-based ``index``. Responds with 400 when
        ``filename`` is missing.
    """
    filename = request.args.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename query parameter"}), 400

    # For *_saved images also accept objects whose source_filename still
    # refers to the name without "_saved" (compatibility with older data).
    accepted_names = [filename]
    stem, dot, suffix = filename.rpartition(".")
    is_image_name = bool(dot) and suffix.lower() in ("png", "jpg", "jpeg", "webp")
    if is_image_name and stem.endswith("_saved"):
        accepted_names.append(f"{stem[:-len('_saved')]}.{suffix}")

    text_keys = ("title_de", "position_de", "action_de", "condition_de")
    # Optional AI-generated details plus the selection geometry.
    optional_keys = (
        "label_en", "label_de", "label_se", "color_en", "adjective_en",
        "action_verb_en", "preposition_en", "relative_position_en",
        "season_en", "mode", "bbox", "polygon",
    )

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    objects = []
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        # Any *.txt file lands here; skip JSON that is not an object record.
        if not isinstance(meta, dict):
            continue
        if meta.get("source_filename") not in accepted_names:
            continue

        record = {"id": meta.get("id"), "image_file": meta.get("image_file")}
        record.update((key, meta.get(key, "")) for key in text_keys)
        record.update((key, meta.get(key)) for key in optional_keys)
        record["hierarchy"] = meta.get("hierarchy", 1)
        record["parent_id"] = meta.get("parent_id")
        record["created_at"] = meta.get("created_at")
        objects.append(record)

    # Oldest first; str() keeps the sort working if a record stores a
    # non-string timestamp.
    objects.sort(key=lambda obj: str(obj.get("created_at") or ""))

    # Running number per image (1..n) in the order established above.
    for idx, obj in enumerate(objects, start=1):
        obj["index"] = idx

    return jsonify({"objects": objects})
|
||
|
||
@app.route("/api/object/<obj_id>/hierarchy", methods=["POST"])
def update_object_hierarchy(obj_id: str):
    """Set the hierarchy level (1, 2 or 3) of a stored object.

    Expects JSON ``{"hierarchy": <int>}``. Responds with 400 for an invalid
    level, 404 when the object's meta file is missing and 500 when it cannot
    be read; otherwise rewrites the meta file and returns the new level.
    """
    payload = request.get_json(force=True, silent=True) or {}
    try:
        level = int(payload.get("hierarchy"))
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid hierarchy"}), 400
    if not 1 <= level <= 3:
        return jsonify({"error": "Hierarchy must be 1, 2 or 3"}), 400

    record_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not record_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        record = json.loads(record_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    record["hierarchy"] = level
    serialized = json.dumps(record, ensure_ascii=False, indent=2)
    record_path.write_text(serialized, encoding="utf-8")
    return jsonify({"id": obj_id, "hierarchy": level})
|
||
|
||
@app.route("/api/object/<obj_id>", methods=["POST"])
def update_object_meta(obj_id: str):
    """Update the German text fields of a stored object.

    Only ``title_de``, ``position_de``, ``action_de`` and ``condition_de``
    are taken from the JSON body; keys that are absent stay untouched.
    Responds with 404 when the meta file is missing and 500 when it cannot
    be read; otherwise returns the id and the four text fields.
    """
    payload = request.get_json(force=True, silent=True) or {}
    editable = ("title_de", "position_de", "action_de", "condition_de")

    record_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not record_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        record = json.loads(record_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    for field in editable:
        if field in payload:
            record[field] = payload[field]

    serialized = json.dumps(record, ensure_ascii=False, indent=2)
    record_path.write_text(serialized, encoding="utf-8")

    reported = ("id",) + editable
    return jsonify({field: record.get(field) for field in reported})
|
||
|
||
@app.route("/api/object/<obj_id>/parent", methods=["POST"])
def update_object_parent(obj_id: str):
    """Set or clear the parent relation of a stored object.

    Expects JSON ``{"parent_id": "<uuid>" | null}``. A falsy ``parent_id``
    clears the relation. A parent must exist, must belong to the same source
    image, and must not be the object itself.

    Responds with 404 when the object is unknown, 400 for an invalid parent
    and 500 when a meta file cannot be read.
    """
    data = request.get_json(force=True, silent=True) or {}
    parent_id = data.get("parent_id") if isinstance(data, dict) else None

    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404

    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return jsonify({"error": "Could not read meta file"}), 500
    if not isinstance(meta, dict):
        return jsonify({"error": "Could not read meta file"}), 500

    if parent_id:
        parent_key = str(parent_id)
        # A self-reference would create a cycle in the object hierarchy.
        if parent_key == obj_id:
            return jsonify({"error": "Object cannot be its own parent"}), 400
        # The id comes from the request body; refuse anything carrying path
        # components so the lookup stays inside OBJECTS_DIR.
        if Path(parent_key).name != parent_key:
            return jsonify({"error": "Parent object not found"}), 400

        parent_meta_path = OBJECTS_DIR / f"{parent_key}.txt"
        if not parent_meta_path.exists():
            return jsonify({"error": "Parent object not found"}), 400
        try:
            parent_meta = json.loads(parent_meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return jsonify({"error": "Could not read parent meta file"}), 500
        if not isinstance(parent_meta, dict):
            return jsonify({"error": "Could not read parent meta file"}), 500

        if parent_meta.get("source_filename") != meta.get("source_filename"):
            return jsonify({"error": "Parent must belong to same source image"}), 400

    meta["parent_id"] = parent_id or None
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

    return jsonify({"id": obj_id, "parent_id": meta.get("parent_id")})
|
||
|
||
@app.route("/api/object/<obj_id>/generate_details", methods=["POST"])
def generate_object_details(obj_id: str):
    """Generate extra metadata for an object with the ``llama3.2-vision`` model.

    The object's crop image and its German ``title_de`` / ``action_de`` /
    ``condition_de`` fields are sent to the model through ``ollama.chat``.
    The recognised fields of the JSON answer are stored in the object's meta
    file and returned (``None`` values are reported as empty strings).

    Responds with 404 when the object or its image is missing, 400 when the
    meta file names no image, and 500 when a file cannot be read, the model
    call fails, or the answer is not a JSON object.
    """
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404

    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return jsonify({"error": "Could not read meta file"}), 500
    if not isinstance(meta, dict):
        return jsonify({"error": "Could not read meta file"}), 500

    image_file = meta.get("image_file")
    if not image_file:
        return jsonify({"error": "Object has no image_file"}), 400

    image_path = OBJECTS_DIR / image_file
    if not image_path.exists():
        return jsonify({"error": "Object image not found"}), 404

    prompt_template = read_prompt(
        PROMPTS_DIR / "create_details.txt",
        "Analyze this object and return one JSON object with the requested fields.",
    )

    title_de = meta.get("title_de", "") or ""
    action_de = meta.get("action_de", "") or ""
    condition_de = meta.get("condition_de", "") or ""

    # Fill the placeholders with str.format. A template containing literal
    # braces (e.g. a JSON example) makes format() fail; in that case the raw
    # template is used and the context is appended instead.
    try:
        prompt = prompt_template.format(
            title_de=title_de,
            action_de=action_de,
            condition_de=condition_de,
        )
    except (LookupError, ValueError, AttributeError, TypeError):
        prompt = (
            f"{prompt_template}\n\n"
            f"Titel (Deutsch): {title_de}\n"
            f"Status (Deutsch): {action_de}\n"
            f"Zustand (Deutsch): {condition_de}\n"
        )

    try:
        img_bytes = image_path.read_bytes()
    except OSError:
        return jsonify({"error": "Could not read object image"}), 500

    try:
        response = ollama.chat(
            model="llama3.2-vision",
            messages=[{"role": "user", "content": prompt, "images": [img_bytes]}],
            format="json",
            options={"temperature": 0},
        )
    except Exception as e:
        # Boundary to the external model; any failure becomes a 500.
        return jsonify({"error": f"LLM request failed: {e}"}), 500

    content = response.get("message", {}).get("content")
    if not content:
        return jsonify({"error": "Empty response from LLM"}), 500

    try:
        ai_data = json.loads(content)
    except (TypeError, ValueError):
        # Fallback: parse the span from the first "{" to the last "}".
        try:
            start = content.index("{")
            end = content.rindex("}") + 1
            ai_data = json.loads(content[start:end])
        except (AttributeError, TypeError, ValueError):
            return jsonify({"error": "Could not parse JSON from LLM response"}), 500
    # Valid JSON that is not an object (list, number, string) has no fields.
    if not isinstance(ai_data, dict):
        return jsonify({"error": "Could not parse JSON from LLM response"}), 500

    # Debug output of the raw model answer.
    print(f"[generate_details] Raw AI data for {obj_id}: {json.dumps(ai_data, ensure_ascii=False)}")

    fields = [
        "label_en",
        "label_de",
        "label_se",
        "color_en",
        "adjective_en",
        "action_verb_en",
        "preposition_en",
        "relative_position_en",
        "season_en",
    ]
    # Only keys present in the answer overwrite stored values.
    for key in fields:
        if key in ai_data:
            meta[key] = ai_data[key]

    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

    result = {key: (meta.get(key) or "") for key in fields}
    print(f"[generate_details] Stored meta for {obj_id}: {json.dumps(result, ensure_ascii=False)}")

    return jsonify(result)
|
||
|
||
def load_sentences_for_object(obj_id: str):
    """Load the stored sentences of an object from ``SENTENCE_DIR/<obj_id>.txt``.

    The sentence directory is created if it does not exist.

    Returns:
        The stored list of sentence dicts. A file holding a single JSON
        object is wrapped in a list. A missing, unreadable or malformed file,
        or any other JSON value, yields an empty list.
    """
    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_path = SENTENCE_DIR / f"{obj_id}.txt"
    try:
        # A missing file raises FileNotFoundError (an OSError); broken JSON or
        # invalid UTF-8 raises a ValueError subclass.
        data = json.loads(sentence_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        # Fallback: wrap a single object in a list.
        return [data]
    return []
|
||
|
||
@app.route("/api/object/<obj_id>/sentences", methods=["GET"])
def get_object_sentences(obj_id: str):
    """Return every sentence stored so far for the given object."""
    payload = {
        "object_id": obj_id,
        "sentences": load_sentences_for_object(obj_id),
    }
    return jsonify(payload)
|
||
|
||
@app.route("/api/object/<obj_id>/generate_sentence", methods=["POST"])
def generate_object_sentence(obj_id: str):
    """Generate a new English question/answer set with the ``llama3.1`` model.

    The prompt combines the object's stored details with all previously
    generated sentences. The model must answer with a JSON object holding
    non-empty strings for ``question_simple_en``, ``answer_simple_en``,
    ``question_advanced_en`` and ``answer_advanced_en``. The new entry is
    appended to ``SENTENCE_DIR/<obj_id>.txt``.

    Responds with 404 when the object is unknown and 500 when the meta file
    cannot be read, the model call fails, or the answer is unusable.
    """
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404

    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return jsonify({"error": "Could not read meta file"}), 500
    if not isinstance(meta, dict):
        return jsonify({"error": "Could not read meta file"}), 500

    # Fields handed to the model (AI details first, then the German texts).
    detail_keys = (
        "label_en", "label_de", "label_se", "color_en", "adjective_en",
        "action_verb_en", "preposition_en", "relative_position_en",
        "season_en", "title_de", "position_de", "action_de", "condition_de",
    )
    details = {key: meta.get(key) for key in detail_keys}

    previous_sentences = load_sentences_for_object(obj_id)

    prompt_template = read_prompt(
        PROMPTS_DIR / "create_sentence.txt",
        "Create one new English question_en and answer_en as JSON, based on the object details and avoiding previous sentences.",
    )

    # Both values originate from JSON files, so they serialise cleanly.
    details_json = json.dumps(details, ensure_ascii=False, indent=2)
    prev_json = json.dumps(previous_sentences, ensure_ascii=False, indent=2)

    prompt = (
        f"{prompt_template}\n\n"
        f"OBJECT_DETAILS_JSON:\n{details_json}\n\n"
        f"PREVIOUS_SENTENCES_JSON:\n{prev_json}\n"
    )

    # Text-only model call; no image is sent.
    try:
        response = ollama.chat(
            model="llama3.1",
            messages=[{"role": "user", "content": prompt}],
            format="json",
            options={"temperature": 0.2},
        )
    except Exception as e:
        # Boundary to the external model; any failure becomes a 500.
        return jsonify({"error": f"LLM request failed: {e}"}), 500

    content = response.get("message", {}).get("content")
    if not content:
        return jsonify({"error": "Empty response from LLM"}), 500

    try:
        ai_data = json.loads(content)
    except (TypeError, ValueError):
        # Fallback: parse the span from the first "{" to the last "}".
        try:
            start = content.index("{")
            end = content.rindex("}") + 1
            ai_data = json.loads(content[start:end])
        except (AttributeError, TypeError, ValueError):
            return jsonify({"error": "Could not parse JSON from LLM response"}), 500
    # Valid JSON that is not an object (list, number, string) has no fields.
    if not isinstance(ai_data, dict):
        return jsonify({"error": "Could not parse JSON from LLM response"}), 500

    required = (
        "question_simple_en",
        "answer_simple_en",
        "question_advanced_en",
        "answer_advanced_en",
    )
    qa = {}
    for key in required:
        value = ai_data.get(key)
        # Non-string values (numbers, lists, null) count as missing.
        qa[key] = value.strip() if isinstance(value, str) else ""

    if not all(qa.values()):
        return jsonify({"error": "LLM response missing required fields (question_simple_en, answer_simple_en, question_advanced_en, answer_advanced_en)"}), 500

    # New entry, stamped with the creation time.
    entry = {
        "object_id": obj_id,
        "created_at": datetime.now().isoformat(),
        **qa,
    }

    sentences = previous_sentences + [entry]

    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_path = SENTENCE_DIR / f"{obj_id}.txt"
    sentence_path.write_text(json.dumps(sentences, ensure_ascii=False, indent=2), encoding="utf-8")

    return jsonify({"object_id": obj_id, "sentence": entry, "count": len(sentences)})
|
||
|
||
@app.route("/api/image/save", methods=["POST"])
def save_image():
    """Mark a picture and all of its objects as saved.

    Expects JSON ``{"filename": "<name>"}``. The picture is renamed by
    appending ``_saved`` to its stem, and every object meta file whose
    ``source_filename`` equals the old name is updated to the new name.

    Responds with 400 for a missing or invalid filename or an already saved
    picture, 404 when the picture does not exist, and 409 when the target
    name is already taken.
    """
    data = request.get_json(force=True, silent=True) or {}
    filename = data.get("filename") if isinstance(data, dict) else None
    if not filename:
        return jsonify({"error": "Missing filename"}), 400

    # The name comes straight from the request body: accept a bare file name
    # only, so the rename can never reach outside PICTURES_DIR.
    if (
        not isinstance(filename, str)
        or filename in (".", "..")
        or Path(filename).name != filename
    ):
        return jsonify({"error": "Invalid filename"}), 400

    src_path = PICTURES_DIR / filename
    if not src_path.is_file():
        return jsonify({"error": "Image not found"}), 404

    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Image already saved"}), 400

    new_name = f"{src_path.stem}_saved{src_path.suffix}"
    dst_path = PICTURES_DIR / new_name
    # Path.rename silently replaces an existing file on POSIX systems.
    if dst_path.exists():
        return jsonify({"error": "Saved image already exists"}), 409

    src_path.rename(dst_path)

    # Point the associated objects at the new file name.
    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        if not isinstance(meta, dict) or meta.get("source_filename") != filename:
            continue

        meta["source_filename"] = new_name
        meta_file.write_text(
            json.dumps(meta, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

    return jsonify({"old_name": filename, "new_name": new_name})
|
||
|
||
@app.route("/api/crop", methods=["POST"])
def crop_image():
    """Crop a region out of a picture and store it as a new object.

    Expected JSON (current format)::

        {
          "filename": "bild.jpg",
          "selections": [
            {
              "number": 1,
              "mode": "rect" | "polygon",
              "bbox": {"x": 10, "y": 20, "width": 200, "height": 150},  # rect
              "polygon": [{"x": 10, "y": 20}, ...]                      # polygon
            },
            ...
          ],
          "title_de": "...", "position_de": "...",
          "action_de": "...", "condition_de": "..."
        }

    Legacy format (single selection, kept for compatibility)::

        {
          "filename": "bild.jpg",
          "mode": "rect" | "polygon",
          "x": 10, "y": 20, "width": 200, "height": 150,   # rect
          "polygon": [{"x": 10, "y": 20}, ...]             # polygon
        }

    In the current format invalid selections are skipped and the stored image
    is cut from the bounding box of the first valid selection; all valid
    selections are written to the meta file. Polygons are cropped by their
    bounding box. The image is written to ``OBJECTS_DIR/<id><suffix>`` and
    the metadata, as JSON, to ``OBJECTS_DIR/<id>.txt``.

    Returns:
        ``{"id", "image_file", "meta_file"}``; 400 for invalid input, 404
        when the source picture is missing, 500 when the image cannot be
        opened or written.
    """
    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict) or "filename" not in data:
        return jsonify({"error": "Missing filename"}), 400

    # The name comes straight from the request body: accept a bare file name
    # only, so no file outside PICTURES_DIR can be opened.
    filename = data["filename"]
    if (
        not isinstance(filename, str)
        or filename in ("", ".", "..")
        or Path(filename).name != filename
    ):
        return jsonify({"error": "Invalid filename"}), 400

    src_path = PICTURES_DIR / filename
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Cannot crop on saved image"}), 400
    if not src_path.exists():
        return jsonify({"error": "Source image not found"}), 404

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)

    # One UUID names both the image and its metadata file.
    obj_id = uuid4().hex
    timestamp = datetime.now().isoformat()
    out_image_name = f"{obj_id}{src_path.suffix}"
    out_image_path = OBJECTS_DIR / out_image_name
    out_meta_path = OBJECTS_DIR / f"{obj_id}.txt"

    try:
        with Image.open(src_path) as img:
            img_w, img_h = img.size
    except OSError as e:
        print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}")
        return jsonify({"error": f"Failed to create image: {e}"}), 500

    texts = {
        key: data.get(key, "")
        for key in ("title_de", "position_de", "action_de", "condition_de")
    }
    selections = data.get("selections")

    if selections and isinstance(selections, list):
        # Current format: several selections, image cut from the first one.
        processed_selections = _crop_collect_selections(selections, img_w, img_h)
        if not processed_selections:
            return jsonify({"error": "No valid selections provided"}), 400

        bbox = processed_selections[0]["bbox"]
        keep_source_format = True
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": filename,
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            **texts,
            "selections": processed_selections,
        }
    else:
        # Legacy format: a single selection described by top-level keys.
        mode = data.get("mode", "rect")
        if mode not in ("rect", "polygon"):
            return jsonify({"error": "Invalid mode"}), 400

        if mode == "rect":
            try:
                x = int(data["x"])
                y = int(data["y"])
                w = int(data["width"])
                h = int(data["height"])
            except (ValueError, TypeError, KeyError, OverflowError):
                return jsonify({"error": "Invalid rectangle coordinates"}), 400
            if w <= 0 or h <= 0:
                return jsonify({"error": "Width and height must be positive"}), 400
            bbox = _crop_clip_rect(x, y, w, h, img_w, img_h)
            if bbox is None:
                return jsonify({"error": "Crop area outside of image"}), 400
        else:
            polygon = data.get("polygon") or []
            if not isinstance(polygon, list) or len(polygon) < 3:
                return jsonify({"error": "Polygon must have at least 3 points"}), 400
            try:
                xs = [int(p["x"]) for p in polygon]
                ys = [int(p["y"]) for p in polygon]
            except (KeyError, TypeError, ValueError, OverflowError):
                return jsonify({"error": "Invalid polygon coordinates"}), 400
            bbox = _crop_clip_rect(
                min(xs), min(ys), max(xs) - min(xs), max(ys) - min(ys), img_w, img_h
            )
            if bbox is None:
                return jsonify({"error": "Polygon bounding box outside of image"}), 400

        # Legacy crops let Pillow derive the format from the file extension.
        keep_source_format = False
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": filename,
            "mode": mode,
            "bbox": bbox,
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            **texts,
        }
        if mode == "polygon":
            meta["polygon"] = data.get("polygon")

    try:
        size, img_format = _crop_save_region(
            src_path, bbox, out_image_path, keep_source_format
        )
    except Exception as e:
        # Pillow raises a range of exception types for broken images or
        # unsupported formats; report all of them as a JSON 500.
        print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}")
        return jsonify({"error": f"Failed to create image: {e}"}), 500
    if keep_source_format:
        print(f"[crop_image] Bild gespeichert: {out_image_path} (Größe: {size}, Format: {img_format})")

    # Metadata is stored as JSON inside a .txt file.
    out_meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")

    return jsonify(
        {
            "id": obj_id,
            "image_file": out_image_name,
            "meta_file": out_meta_path.name,
        }
    )


def _crop_clip_rect(x, y, w, h, img_w, img_h):
    """Clamp a rectangle to the image bounds; return a bbox dict, or None if nothing remains."""
    x1 = max(0, x)
    y1 = max(0, y)
    x2 = min(x + w, img_w)
    y2 = min(y + h, img_h)
    if x1 >= x2 or y1 >= y2:
        return None
    return {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}


def _crop_collect_selections(selections, img_w, img_h):
    """Validate the request's selections and return the usable ones, clipped to the image."""
    processed = []
    for sel in selections:
        if not isinstance(sel, dict):
            continue
        sel_mode = sel.get("mode")

        if sel_mode == "rect":
            raw_box = sel.get("bbox")
            if not raw_box or not isinstance(raw_box, dict):
                continue
            try:
                x = int(raw_box.get("x", 0))
                y = int(raw_box.get("y", 0))
                w = int(raw_box.get("width", 0))
                h = int(raw_box.get("height", 0))
            except (ValueError, TypeError, OverflowError):
                continue
            if w <= 0 or h <= 0:
                continue
            clipped = _crop_clip_rect(x, y, w, h, img_w, img_h)
            if clipped is None:
                continue
            processed.append({
                "number": sel.get("number", len(processed) + 1),
                "mode": "rect",
                "bbox": clipped,
            })

        elif sel_mode == "polygon":
            polygon = sel.get("polygon")
            if not isinstance(polygon, list) or len(polygon) < 3:
                continue
            try:
                xs = [int(p.get("x", 0)) for p in polygon]
                ys = [int(p.get("y", 0)) for p in polygon]
            except (AttributeError, KeyError, TypeError, ValueError, OverflowError):
                continue
            clipped = _crop_clip_rect(
                min(xs), min(ys), max(xs) - min(xs), max(ys) - min(ys), img_w, img_h
            )
            if clipped is None:
                continue
            processed.append({
                "number": sel.get("number", len(processed) + 1),
                "mode": "polygon",
                "polygon": polygon,
                "bbox": clipped,
            })
    return processed


def _crop_save_region(src_path, bbox, out_path, keep_source_format):
    """Cut ``bbox`` out of the source image, save it, and return ``(size, format)``."""
    left, top = bbox["x"], bbox["y"]
    box = (left, top, left + bbox["width"], top + bbox["height"])
    with Image.open(src_path) as img:
        region = img.crop(box)
        if keep_source_format:
            img_format = img.format or "PNG"
            region.save(out_path, format=img_format)
        else:
            img_format = None
            region.save(out_path)
        return region.size, img_format
|
||
|
||
# ── Claude / Directus Helpers ─────────────────────────────────────────────────
|
||
|
||
def _ensure_junction(collection: str, field1: str, field2: str, token: str):
    """Create a hidden two-column junction collection unless it already exists.

    A ``GET /collections/<collection>`` that answers 200 counts as "already
    there" and nothing else happens. Otherwise the collection is POSTed with
    an auto-increment integer ``id`` plus two hidden ``uuid`` columns named
    *field1* and *field2*. The outcome of that POST is not inspected.
    """
    _, lookup_status = _directus("GET", f"/collections/{collection}", token)
    if lookup_status != 200:
        primary_key = {
            "field": "id",
            "type": "integer",
            "schema": {"is_primary_key": True, "has_auto_increment": True},
            "meta": {"hidden": True},
        }
        foreign_keys = [
            {"field": name, "type": "uuid", "schema": {}, "meta": {"hidden": True}}
            for name in (field1, field2)
        ]
        _directus(
            "POST",
            "/collections",
            token,
            {
                "collection": collection,
                "schema": {},
                "meta": {"hidden": True, "icon": "import_export"},
                "fields": [primary_key, *foreign_keys],
            },
        )
|
||
|
||
|
||
def _ensure_link(collection: str, match: dict, payload: dict, token: str):
    """POST *payload* as a junction row unless a row matching *match* exists.

    Every ``match`` entry becomes a ``filter[<key>][_eq]=<value>`` query term;
    values are percent-encoded, keys are inserted as given. A 200 answer with
    a non-empty ``data`` list means the link is already present.
    """
    terms = []
    for key, value in match.items():
        encoded = urllib.parse.quote(str(value), safe="")
        terms.append(f"filter[{key}][_eq]={encoded}")
    query = "&".join(terms)

    found, found_status = _directus(
        "GET", f"/items/{collection}?{query}&limit=1", token
    )
    already_linked = found_status == 200 and found.get("data")
    if not already_linked:
        _directus("POST", f"/items/{collection}", token, payload)
|
||
|
||
|
||
def _find_or_create_word(title_de: str, level: int, token: str):
    """Look up a word by exact ``title_de``; create it as a draft if absent.

    Returns:
        ``(word_id, is_new)`` - ``is_new`` is False when an existing row was
        found and True when a new one was POSTed.

    Raises:
        RuntimeError: if the POST answers with anything other than 200/201.
    """
    lookup_path = (
        "/items/words?filter[title_de][_eq]="
        + urllib.parse.quote(title_de, safe="")
        + "&fields=id&limit=1"
    )
    found, found_status = _directus("GET", lookup_path, token)
    if found_status == 200 and found.get("data"):
        return found["data"][0]["id"], False

    created, created_status = _directus(
        "POST",
        "/items/words",
        token,
        {"status": "draft", "title_de": title_de, "level": level},
    )
    if created_status not in (200, 201):
        raise RuntimeError(f"Word creation failed ({created_status}): {created}")
    return created["data"]["id"], True
|
||
|
||
|
||
def _find_or_create_question(question_de: str, answer_de: str, level: int,
                             short_answer_id, short_answer_de: str, obj_id: str, token: str):
    """Look up a question by exact ``question_de``; create a draft if absent.

    The new row carries ``answer_de``, ``level`` and ``object`` (= *obj_id*);
    ``short_answer`` and ``short_answer_de`` are only included when the
    corresponding argument is truthy.

    Returns:
        ``(question_id, is_new)``.

    Raises:
        RuntimeError: if the POST answers with anything other than 200/201.
    """
    lookup_path = (
        "/items/questions?filter[question_de][_eq]="
        + urllib.parse.quote(question_de, safe="")
        + "&fields=id&limit=1"
    )
    found, found_status = _directus("GET", lookup_path, token)
    if found_status == 200 and found.get("data"):
        return found["data"][0]["id"], False

    new_question = {
        "status": "draft",
        "question_de": question_de,
        "answer_de": answer_de,
        "level": level,
        "object": obj_id,
    }
    # Optional columns are left out entirely rather than sent empty.
    optional = (("short_answer", short_answer_id), ("short_answer_de", short_answer_de))
    for column, value in optional:
        if value:
            new_question[column] = value

    created, created_status = _directus("POST", "/items/questions", token, new_question)
    if created_status not in (200, 201):
        raise RuntimeError(f"Question creation failed ({created_status}): {created}")
    return created["data"]["id"], True
|
||
|
||
|
||
# ── Generate & Publish endpoints ──────────────────────────────────────────────
|
||
|
||
@app.route("/api/object/<obj_id>/generate_questions", methods=["POST"])
def generate_questions(obj_id: str):
    """Generate draft words and questions for an object with Claude.

    Steps:
      1. Load the object and, if it has one, its parent from Directus.
      2. Fill the ``{user-notes_object}`` and ``{user-notes_parentobject}``
         placeholders of the posted prompt.
      3. Send the prompt to Claude Haiku and parse the JSON reply.
      4. Store words and questions as drafts in Directus and link them to
         the object.

    The request body must be JSON containing a non-empty ``prompt``. The
    caller's ``Authorization`` header is forwarded to Directus unchanged.

    Returns:
        ``{"ok": True, "object_id": ..., "stats": {...}}`` on success, or
        ``{"error": ...}`` with 400 (missing prompt), 404 (object lookup did
        not answer 200) or 500 (missing API key, Claude error, unusable AI
        reply).
    """
    token = request.headers.get("Authorization", "")
    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        # A JSON array or scalar body has no "prompt" key to read.
        body = {}
    prompt_template = str(body.get("prompt") or "").strip()

    if not prompt_template:
        return jsonify({"error": "Missing prompt"}), 400
    if not ANTHROPIC_API_KEY:
        return jsonify({"error": "ANTHROPIC_API_KEY not configured on server"}), 500

    # obj_id comes straight from the URL path; encode it so characters such
    # as "?" or "&" cannot alter the Directus request.
    obj_path = f"/items/objects/{urllib.parse.quote(obj_id, safe='')}"

    # Load the object.
    obj_resp, obj_status = _directus("GET", f"{obj_path}?fields=id,user_notes,parent", token)
    if obj_status != 200:
        return jsonify({"error": "Object not found"}), 404
    obj = obj_resp.get("data") or {}

    # Load the parent object's notes, if there is a parent.
    parent_notes = ""
    if obj.get("parent"):
        p_resp, _ = _directus("GET", f"/items/objects/{obj['parent']}?fields=id,user_notes", token)
        parent_notes = (p_resp.get("data") or {}).get("user_notes") or ""

    # Fill the placeholders.
    prompt = (
        prompt_template
        .replace("{user-notes_object}", str(obj.get("user_notes") or ""))
        .replace("{user-notes_parentobject}", str(parent_notes))
    )

    # Call Claude Haiku. Any SDK/transport failure is reported to the client.
    try:
        client = _anthropic_sdk.Anthropic(api_key=ANTHROPIC_API_KEY)
        msg = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        reply_text = msg.content[0].text
    except Exception as e:
        return jsonify({"error": f"Claude API error: {e}"}), 500

    # Parse the reply; fall back to the outermost {...} span when the model
    # wrapped the JSON in extra text.
    try:
        ai = json.loads(reply_text)
    except (ValueError, TypeError):
        try:
            start = reply_text.index("{")
            end = reply_text.rindex("}") + 1
            ai = json.loads(reply_text[start:end])
        except (ValueError, TypeError):
            return jsonify({"error": "Invalid JSON from AI", "raw": reply_text[:400]}), 500

    # Only a JSON object with a list of level objects is usable.
    levels = ai.get("levels") if isinstance(ai, dict) else None
    if not isinstance(levels, list):
        levels = []
    levels = [lvl for lvl in levels if isinstance(lvl, dict)]
    if not levels:
        return jsonify({"error": "No levels in AI response"}), 500

    stats = {
        "words_created": 0,
        "words_linked": 0,
        "questions_created": 0,
        "questions_linked": 0,
    }

    def _sanitize_word(text: str) -> list[str]:
        """Split a comma/semicolon separated entry and keep only single words."""
        tokens = []
        for part in text.replace(";", ",").split(","):
            part = part.strip().strip("\"'")
            if not part:
                continue
            words_in_part = part.split()
            if len(words_in_part) == 1:
                tokens.append(words_in_part[0])
            # Multi-word entries are skipped (treated as an AI mistake).
        return tokens

    def _tokens(value) -> list[str]:
        """Sanitize a list (or bare scalar) of AI word entries; None yields []."""
        if value is None:
            return []
        entries = value if isinstance(value, list) else [value]
        # None entries are dropped so they are not stringified to "None".
        return [t for entry in entries if entry is not None for t in _sanitize_word(str(entry))]

    def _level_of(entry: dict) -> int:
        """Read the numeric level of an AI level entry, defaulting to 1."""
        try:
            return int(entry.get("level") or 1)
        except (TypeError, ValueError):
            return 1

    # Pre-load the existing links with a single GET on the object.
    links_resp, _ = _directus(
        "GET",
        f"{obj_path}"
        f"?fields[]=linked_words.words_id"
        f"&fields[]=linked_questions.questions_id",
        token,
    )
    links = links_resp.get("data") or {}
    existing_word_ids: set[str] = {
        lw["words_id"] for lw in (links.get("linked_words") or [])
        if isinstance(lw, dict) and lw.get("words_id")
    }
    existing_question_ids: set[str] = {
        lq["questions_id"] for lq in (links.get("linked_questions") or [])
        if isinstance(lq, dict) and lq.get("questions_id")
    }

    # Collect every distinct word across all levels, remembering the first
    # level it was seen in.
    all_words_by_level: dict[str, int] = {}
    for lvl in levels:
        level = _level_of(lvl)
        candidates = (
            _tokens(lvl.get("words"))
            + _tokens(lvl.get("distractor_words"))
            + _tokens(lvl.get("short_answer"))
        )
        for w in candidates:
            all_words_by_level.setdefault(w, level)

    # Find or create each word once (shared cache for all levels).
    global_word_map: dict[str, str] = {}  # title_de -> id
    new_word_links: list[dict] = []
    for w, lvl_num in all_words_by_level.items():
        try:
            wid, is_new = _find_or_create_word(w, lvl_num, token)
            global_word_map[w] = wid
            if wid not in existing_word_ids:
                new_word_links.append({"words_id": wid})
                existing_word_ids.add(wid)
                stats["words_created" if is_new else "words_linked"] += 1
        except Exception as e:
            # Best effort: one failing word must not abort the whole run.
            print(f"[generate_questions] word error '{w}': {e}")

    # Link all new words in one batch through the object's M2M field.
    if new_word_links:
        _directus("PATCH", obj_path, token,
                  {"linked_words": {"create": new_word_links}})

    new_question_links: list[dict] = []

    for lvl in levels:
        level = _level_of(lvl)
        q_de = str(lvl.get("question") or "").strip()
        a_de = str(lvl.get("answer") or "").strip()
        short_text = str(lvl.get("short_answer") or "").strip()
        words_list = _tokens(lvl.get("words"))
        distractor_list = _tokens(lvl.get("distractor_words"))

        if not q_de:
            continue

        short_answer_id = global_word_map.get(short_text) if short_text else None

        # Find or create the question.
        try:
            q_id, q_is_new = _find_or_create_question(
                q_de, a_de, level, short_answer_id, short_text, obj_id, token
            )
        except Exception as e:
            print(f"[generate_questions] question error level {level}: {e}")
            continue

        stats["questions_created" if q_is_new else "questions_linked"] += 1

        # Question <-> object link (collected for the batch below).
        if q_id not in existing_question_ids:
            new_question_links.append({"questions_id": q_id})
            existing_question_ids.add(q_id)

        # related_words + distractor_words are only written for new questions.
        if q_is_new:
            q_patch: dict = {}
            rw = [{"words_id": global_word_map[w]} for w in words_list if w in global_word_map]
            if rw:
                q_patch["related_words"] = {"create": rw}
            dw = [{"words_id": global_word_map[w]} for w in distractor_list if w in global_word_map]
            if dw:
                q_patch["distractor_words"] = {"create": dw}
            if q_patch:
                _directus("PATCH", f"/items/questions/{q_id}", token, q_patch)

    # Link all new questions in one batch through the object's M2M field.
    if new_question_links:
        _directus("PATCH", obj_path, token,
                  {"linked_questions": {"create": new_question_links}})

    print(f"[generate_questions] obj={obj_id} stats={stats}")
    return jsonify({"ok": True, "object_id": obj_id, "stats": stats})
|
||
|
||
|
||
@app.route("/api/object/<obj_id>/publish_questions", methods=["POST"])
def publish_questions(obj_id: str):
    """Set every question and word linked to the object to status 'published'.

    Linked ids are read from the ``questions_objects`` and ``words_objects``
    junctions (at most 200 questions / 2000 words). Each linked item is
    PATCHed individually, whatever its current status.

    Returns:
        ``{"ok": True, "published_questions": n, "published_words": m}``
        where the counters are the number of PATCH calls that answered 200.
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; encode it before it enters a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")

    def _linked_ids(junction: str, fk_field: str, limit: int) -> list:
        """Return the distinct, non-empty foreign keys linked to the object."""
        resp, _ = _directus(
            "GET",
            f"/items/{junction}?filter[objects_id][_eq]={obj_filter}&fields={fk_field}&limit={limit}",
            token,
        )
        found = [
            row.get(fk_field)
            for row in (resp.get("data") or [])
            if isinstance(row, dict)
        ]
        # Drop null keys (they would PATCH ".../None") and duplicate links
        # (they would be counted twice); dict.fromkeys keeps first-seen order.
        return list(dict.fromkeys(i for i in found if i))

    def _publish(collection: str, item_ids: list) -> int:
        """PATCH each item to 'published'; return how many calls answered 200."""
        published = 0
        for item_id in item_ids:
            _, status = _directus(
                "PATCH", f"/items/{collection}/{item_id}", token, {"status": "published"}
            )
            if status == 200:
                published += 1
        return published

    q_ids = _linked_ids("questions_objects", "questions_id", 200)
    w_ids = _linked_ids("words_objects", "words_id", 2000)

    published_q = _publish("questions", q_ids)
    published_w = _publish("words", w_ids)

    return jsonify({
        "ok": True,
        "published_questions": published_q,
        "published_words": published_w,
    })
|
||
|
||
|
||
@app.route("/api/object/<obj_id>/questions", methods=["GET"])
def get_object_questions_list(obj_id: str):
    """Return the non-archived questions linked to an object.

    Each returned question carries ``short_answer_de`` and an added
    ``distractor_words`` list holding the ``title_de`` of its distractor
    words. Questions are ordered by ``level`` (missing level sorts as 0).
    """
    token = request.headers.get("Authorization", "")

    # Step 1: question ids from the junction collection.
    link_rows, _ = _directus(
        "GET",
        f"/items/questions_objects?filter[objects_id][_eq]={obj_id}&fields=questions_id&limit=200",
        token,
    )
    q_ids = []
    for link in link_rows.get("data") or []:
        if link.get("questions_id"):
            q_ids.append(link["questions_id"])
    if not q_ids:
        return jsonify({"data": []})

    # Step 2: load the questions themselves, excluding archived ones.
    ids_param = urllib.parse.quote(",".join(q_ids), safe="")
    questions_resp, _ = _directus(
        "GET",
        f"/items/questions?filter[id][_in]={ids_param}&filter[status][_neq]=archived&fields=id,question_de,answer_de,short_answer_de,level,status&limit=200",
        token,
    )

    def _by_level(question):
        return question.get("level") or 0

    items = list(questions_resp.get("data") or [])
    items.sort(key=_by_level)

    # Step 3: distractor links for all questions in one request.
    distractor_resp, _ = _directus(
        "GET",
        f"/items/questions_distractor_words?filter[questions_id][_in]={ids_param}&fields=questions_id,words_id&limit=5000",
        token,
    )
    distractor_rows = distractor_resp.get("data") or []

    # Resolve the referenced word ids to their titles.
    all_word_ids = list({row["words_id"] for row in distractor_rows if row.get("words_id")})
    titles_by_word: dict[str, str] = {}
    if all_word_ids:
        wids_param = urllib.parse.quote(",".join(all_word_ids), safe="")
        words_resp, _ = _directus(
            "GET",
            f"/items/words?filter[id][_in]={wids_param}&fields=id,title_de&limit=5000",
            token,
        )
        for word in words_resp.get("data") or []:
            titles_by_word[word["id"]] = word["title_de"]

    # Group the distractor titles by question.
    titles_by_question: dict[str, list[str]] = {}
    for row in distractor_rows:
        question_id, word_id = row.get("questions_id"), row.get("words_id")
        if not (question_id and word_id):
            continue
        if word_id not in titles_by_word:
            continue
        titles_by_question.setdefault(question_id, []).append(titles_by_word[word_id])

    for question in items:
        question["distractor_words"] = titles_by_question.get(question["id"], [])

    return jsonify({"data": items})
|
||
|
||
|
||
@app.route("/api/object/<obj_id>/words", methods=["GET"])
def get_object_words_list(obj_id: str):
    """Return the non-archived words linked to an object, sorted by ``title_de``.

    Two queries: the ``words_objects`` junction yields the word ids, then the
    words are fetched by id with archived ones filtered out.
    """
    token = request.headers.get("Authorization", "")

    link_rows, _ = _directus(
        "GET",
        f"/items/words_objects?filter[objects_id][_eq]={obj_id}&fields=words_id&limit=2000",
        token,
    )
    w_ids = []
    for link in link_rows.get("data") or []:
        if link.get("words_id"):
            w_ids.append(link["words_id"])
    if not w_ids:
        return jsonify({"data": []})

    ids_param = urllib.parse.quote(",".join(w_ids), safe="")
    words_resp, _ = _directus(
        "GET",
        f"/items/words?filter[id][_in]={ids_param}&filter[status][_neq]=archived&fields=id,title_de,level,status&limit=2000",
        token,
    )

    def _by_title(word):
        return word.get("title_de") or ""

    items = list(words_resp.get("data") or [])
    items.sort(key=_by_title)
    return jsonify({"data": items})
|
||
|
||
|
||
def _delete_junction_rows(collection: str, field: str, value: str, token: str):
    """Delete every row of *collection* whose *field* equals *value*.

    Up to 5000 row ids are fetched and then removed with a single batch
    DELETE; nothing is sent when no row matches.
    """
    listing, _ = _directus(
        "GET",
        f"/items/{collection}?filter[{field}][_eq]={value}&fields=id&limit=5000",
        token,
    )
    row_ids = []
    for row in listing.get("data") or []:
        if row.get("id"):
            row_ids.append(row["id"])
    if not row_ids:
        return
    _directus("DELETE", f"/items/{collection}", token, row_ids)
|
||
|
||
|
||
@app.route("/api/question/<q_id>", methods=["DELETE"])
def delete_question_item(q_id: str):
    """Delete a question together with its junction rows in Directus.

    Junction rows are removed first, then the question itself. Answers
    ``{"ok": True}`` when the final DELETE returns 200/204; otherwise an
    error body with the Directus status code.
    """
    token = request.headers.get("Authorization", "")

    junctions = ("questions_objects", "questions_distractor_words", "questions_words")
    for junction in junctions:
        _delete_junction_rows(junction, "questions_id", q_id, token)

    _, status = _directus("DELETE", f"/items/questions/{q_id}", token)
    if status not in (200, 204):
        return jsonify({"error": "Delete failed"}), status
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/word/<w_id>", methods=["DELETE"])
def delete_word_item(w_id: str):
    """Delete a word together with its junction rows in Directus.

    Junction rows are removed first, then the word itself. Answers
    ``{"ok": True}`` when the final DELETE returns 200/204; otherwise an
    error body with the Directus status code.
    """
    token = request.headers.get("Authorization", "")

    junctions = ("words_objects", "questions_words", "questions_distractor_words")
    for junction in junctions:
        _delete_junction_rows(junction, "words_id", w_id, token)

    _, status = _directus("DELETE", f"/items/words/{w_id}", token)
    if status not in (200, 204):
        return jsonify({"error": "Delete failed"}), status
    return jsonify({"ok": True})
|
||
|
||
|
||
@app.route("/api/object/<obj_id>/purge-orphans", methods=["POST"])
def purge_orphan_junctions(obj_id: str):
    """Remove stale ``questions_objects`` / ``words_objects`` rows of one object.

    A junction row is removed when the question/word it points to cannot be
    read (lookup not 200), comes back empty, or has status ``archived``.
    Lookups answering 401 or 5xx are skipped instead: an authentication or
    server failure says nothing about whether the item still exists.

    Returns:
        ``{"ok": True, "orphans_removed": n}`` where ``n`` counts junction
        rows whose DELETE answered 200/204.
    """
    token = request.headers.get("Authorization", "")
    # obj_id comes from the URL path; encode it before it enters a query string.
    obj_filter = urllib.parse.quote(obj_id, safe="")
    removed = 0

    targets = (
        ("questions_objects", "questions_id", "questions"),
        ("words_objects", "words_id", "words"),
    )
    for junc_col, fk_field, item_col in targets:
        junc_data, _ = _directus(
            "GET",
            f"/items/{junc_col}?filter[objects_id][_eq]={obj_filter}&fields=id,{fk_field}&limit=5000",
            token,
        )
        for row in junc_data.get("data") or []:
            fk_val = row.get(fk_field)
            if not fk_val:
                continue

            item_data, item_status = _directus(
                "GET", f"/items/{item_col}/{fk_val}?fields=id,status", token
            )
            if item_status == 401 or item_status >= 500:
                # Cannot tell whether the item exists; never delete on a guess.
                continue

            item = item_data.get("data") or {}
            is_orphan = item_status != 200 or not item or item.get("status") == "archived"
            if not is_orphan:
                continue

            _, del_status = _directus("DELETE", f"/items/{junc_col}/{row['id']}", token)
            if del_status in (200, 204):
                removed += 1

    return jsonify({"ok": True, "orphans_removed": removed})
|
||
|
||
|
||
@app.route("/api/purge-all-orphans", methods=["POST"])
def purge_all_orphans():
    """Remove stale ``questions_objects`` / ``words_objects`` rows for all objects.

    For each junction collection up to 10000 rows are loaded, the referenced
    items are fetched in batches, and every junction row whose item is
    missing from the result or has status ``archived`` / ``deleted`` is
    removed with one batch DELETE.

    If any item lookup does not answer 200, nothing is deleted for that
    collection and the failure is reported in its ``details`` entry; treating
    a failed lookup as "no item exists" would wipe every link.

    Returns:
        ``{"ok": bool, "orphans_removed": n, "details": [...]}``; ``ok`` is
        False when a lookup or a delete failed.
    """
    token = request.headers.get("Authorization", "")
    removed = 0
    details = []
    all_ok = True
    batch_size = 100  # keeps each ``_in`` filter URL short

    targets = (
        ("questions_objects", "questions_id", "questions"),
        ("words_objects", "words_id", "words"),
    )
    for junc_col, fk_field, item_col in targets:
        junc_data, _ = _directus(
            "GET", f"/items/{junc_col}?fields=id,{fk_field}&limit=10000", token
        )
        rows = junc_data.get("data") or []

        fk_ids = list({row[fk_field] for row in rows if row.get(fk_field)})
        if not fk_ids:
            details.append({"collection": junc_col, "junction_rows": 0, "existing": 0, "orphans": 0})
            continue

        # Fetch the referenced items (any status) batch by batch.
        all_items = []
        failed_status = None
        for start in range(0, len(fk_ids), batch_size):
            batch = fk_ids[start:start + batch_size]
            ids_param = urllib.parse.quote(",".join(map(str, batch)), safe="")
            batch_data, batch_status = _directus(
                "GET",
                f"/items/{item_col}?filter[id][_in]={ids_param}&fields=id,status&limit={len(batch)}",
                token,
            )
            if batch_status != 200:
                failed_status = batch_status
                break
            all_items.extend(batch_data.get("data") or [])

        if failed_status is not None:
            all_ok = False
            details.append({
                "collection": junc_col,
                "junction_rows": len(rows),
                "fk_unique": len(fk_ids),
                "error": f"item lookup failed ({failed_status}); nothing deleted",
                "orphans_removed": 0,
            })
            continue

        existing_ids = {
            e["id"] for e in all_items if e.get("status") not in ("archived", "deleted")
        }
        status_summary = {}
        for e in all_items:
            item_status = e.get("status", "unknown")
            status_summary[item_status] = status_summary.get(item_status, 0) + 1

        orphan_junc_ids = [
            row["id"] for row in rows
            if row.get(fk_field) and row[fk_field] not in existing_ids
        ]
        orphans_removed = 0
        if orphan_junc_ids:
            _, del_status = _directus("DELETE", f"/items/{junc_col}", token, orphan_junc_ids)
            if del_status in (200, 204):
                orphans_removed = len(orphan_junc_ids)
                removed += orphans_removed
            else:
                all_ok = False

        details.append({
            "collection": junc_col,
            "junction_rows": len(rows),
            "fk_unique": len(fk_ids),
            "items_found": len(all_items),
            "status_breakdown": status_summary,
            "existing_active": len(existing_ids),
            "orphans_removed": orphans_removed,
        })

    return jsonify({"ok": all_ok, "orphans_removed": removed, "details": details})
|
||
|
||
|
||
@app.route("/api/fix-distractor-field", methods=["POST"])
def fix_distractor_field():
    """PATCH ``questions.distractor_words`` so its meta carries ``special=m2m``.

    Intended as a one-off repair. The complete meta block (interface,
    options, hidden, note) is sent along with ``special``. The response
    echoes the Directus status and body; ``ok`` is True for 200/201.
    """
    token = request.headers.get("Authorization", "")

    field_meta = {
        "special": ["m2m"],
        "interface": "list-m2m",
        "options": {"template": "{{words_id.title_de}}"},
        "hidden": False,
        "note": "Ablenker-Wörter (thematisch passend, aber nicht die Antwort)",
    }
    data, status = _directus(
        "PATCH", "/fields/questions/distractor_words", token, {"meta": field_meta}
    )
    succeeded = status == 200 or status == 201
    return jsonify({"ok": succeeded, "status": status, "data": data})
|
||
|
||
|
||
@app.route("/api/setup-schema", methods=["POST"])
def setup_directus_schema():
    """Send the fixed sequence of Directus schema requests for the M2M setup.

    Every step is attempted regardless of earlier outcomes and recorded as
    ``{"label", "status", "ok", "err"}``; a step is ``ok`` when Directus
    answers 200, 201 or 204. The response lists all steps plus totals, and
    its top-level ``ok`` is True only when no step failed.
    """
    token = request.headers.get("Authorization", "")
    results = []

    def m2o_relation(field, target):
        """Relation body for a many-to-one column on ``questions``."""
        return {
            "collection": "questions", "field": field,
            "related_collection": target,
            "schema": {"on_delete": "SET NULL"},
            "meta": {"one_deselect_action": "nullify"},
        }

    def m2m_alias(field, template, note):
        """Field body for an M2M alias field rendered with *template*."""
        return {
            "field": field, "type": "alias",
            "meta": {"special": ["m2m"], "interface": "list-m2m",
                     "options": {"template": template},
                     "note": note},
        }

    def junction_relation(junction, field, target, junction_field, one_field=None):
        """Relation body for one side of a junction; *one_field* is optional."""
        meta = {"junction_field": junction_field}
        if one_field is not None:
            meta["one_field"] = one_field
        meta["one_deselect_action"] = "nullify"
        return {
            "collection": junction, "field": field,
            "related_collection": target,
            "schema": {"on_delete": "CASCADE"},
            "meta": meta,
        }

    distractor_note = "Ablenker-Wörter (nicht in Frage/Antwort)"

    steps = [
        # M2O: questions.object -> objects, plus its display template.
        ("relation questions.object→objects", "POST", "/relations",
         m2o_relation("object", "objects")),
        ("update field questions.object template", "PATCH", "/fields/questions/object",
         {"meta": {"options": {"template": "{{user_notes}}"}}}),

        # M2O: questions.short_answer -> words.
        ("relation questions.short_answer→words", "POST", "/relations",
         m2o_relation("short_answer", "words")),

        # M2M: questions <-> objects via questions_objects.
        ("field questions.linked_objects", "POST", "/fields/questions",
         m2m_alias("linked_objects", "{{objects_id.user_notes}}", "Verknüpfte Objekte")),
        ("field objects.linked_questions", "POST", "/fields/objects",
         m2m_alias("linked_questions", "{{questions_id.question_de}}", "Verknüpfte Fragen")),
        ("relation questions_objects.questions_id→questions", "POST", "/relations",
         junction_relation("questions_objects", "questions_id", "questions",
                           "objects_id", "linked_objects")),
        ("relation questions_objects.objects_id→objects", "POST", "/relations",
         junction_relation("questions_objects", "objects_id", "objects",
                           "questions_id", "linked_questions")),

        # M2M: words <-> objects via words_objects.
        ("field words.linked_objects", "POST", "/fields/words",
         m2m_alias("linked_objects", "{{objects_id.user_notes}}", "Verknüpfte Objekte")),
        ("field objects.linked_words", "POST", "/fields/objects",
         m2m_alias("linked_words", "{{words_id.title_de}}", "Verknüpfte Wörter")),
        ("relation words_objects.words_id→words", "POST", "/relations",
         junction_relation("words_objects", "words_id", "words",
                           "objects_id", "linked_objects")),
        ("relation words_objects.objects_id→objects", "POST", "/relations",
         junction_relation("words_objects", "objects_id", "objects",
                           "words_id", "linked_words")),

        # M2M: questions <-> words (distractors) via questions_distractor_words.
        # The field may already exist without ``special``, so a PATCH follows
        # the create attempt.
        ("field questions.distractor_words (create)", "POST", "/fields/questions",
         m2m_alias("distractor_words", "{{words_id.title_de}}", distractor_note)),
        ("field questions.distractor_words (patch special)", "PATCH",
         "/fields/questions/distractor_words",
         {
             "type": "alias",
             "meta": {"special": ["m2m"], "interface": "list-m2m",
                      "options": {"template": "{{words_id.title_de}}"},
                      "hidden": False,
                      "note": distractor_note},
         }),
        ("relation questions_distractor_words.questions_id→questions", "POST", "/relations",
         junction_relation("questions_distractor_words", "questions_id", "questions",
                           "words_id", "distractor_words")),
        ("relation questions_distractor_words.words_id→words", "POST", "/relations",
         junction_relation("questions_distractor_words", "words_id", "words",
                           "questions_id")),

        # Back-reference: words -> questions via the existing questions_words
        # junction; the relation is patched to gain the one_field on words.
        ("field words.linked_questions", "POST", "/fields/words",
         m2m_alias("linked_questions", "{{questions_id.question_de}}",
                   "Fragen in denen dieses Wort vorkommt")),
        ("patch relation questions_words.words_id one_field", "PATCH",
         "/relations/questions_words/words_id",
         {"meta": {"one_field": "linked_questions", "one_deselect_action": "nullify"}}),
    ]

    for label, method, path, payload in steps:
        data, status = _directus(method, path, token, payload)
        ok = status in (200, 201, 204)
        results.append({"label": label, "status": status, "ok": ok,
                        "err": None if ok else (data.get("errors") or data)})

    failed = [entry for entry in results if not entry["ok"]]
    return jsonify({"ok": len(failed) == 0, "total": len(results),
                    "failed": len(failed), "results": results})
|
||
|
||
|
||
@app.route("/api/setup-words-pictures", methods=["POST"])
def setup_words_pictures():
    """Configure the words <-> pictures M2M relation via ``words_pictures``.

    The junction collection is ensured first; then two alias fields are
    PATCHed and two relations POSTed. Every step is attempted and recorded
    as ``{"label", "status", "ok", "err"}`` (``ok`` for 200/201/204). The
    response's top-level ``ok`` is True only when no recorded step failed.
    """
    token = request.headers.get("Authorization", "")
    results = []

    # Make sure the junction collection exists before wiring relations to it.
    _ensure_junction("words_pictures", "words_id", "pictures_id", token)

    def alias_patch(template, note):
        """PATCH body that marks an alias field as M2M."""
        return {
            "type": "alias",
            "meta": {"special": ["m2m"], "interface": "list-m2m",
                     "options": {"template": template},
                     "note": note},
        }

    def junction_relation(field, target, junction_field, one_field):
        """Relation body for one side of the ``words_pictures`` junction."""
        return {
            "collection": "words_pictures", "field": field,
            "related_collection": target,
            "schema": {"on_delete": "CASCADE"},
            "meta": {"junction_field": junction_field,
                     "one_field": one_field,
                     "one_deselect_action": "nullify"},
        }

    steps = [
        ("patch pictures.linked_words special", "PATCH", "/fields/pictures/linked_words",
         alias_patch("{{words_id.title_de}}", "Verknüpfte Safe Words")),
        ("patch words.linked_pictures special", "PATCH", "/fields/words/linked_pictures",
         alias_patch("{{pictures_id.media}}", "Verknüpfte Bilder")),
        ("relation words_pictures.words_id→words", "POST", "/relations",
         junction_relation("words_id", "words", "pictures_id", "linked_pictures")),
        ("relation words_pictures.pictures_id→pictures", "POST", "/relations",
         junction_relation("pictures_id", "pictures", "words_id", "linked_words")),
    ]

    for label, method, path, payload in steps:
        data, status = _directus(method, path, token, payload)
        ok = status in (200, 201, 204)
        results.append({"label": label, "status": status, "ok": ok,
                        "err": None if ok else (data.get("errors") or data)})

    failed = [entry for entry in results if not entry["ok"]]
    return jsonify({"ok": len(failed) == 0, "total": len(results),
                    "failed": len(failed), "results": results})
|
||
|
||
|
||
# ── db_* Collection Routes ────────────────────────────────────────────────────
|
||
|
||
def _find_or_create_db_word(titel_de: str, level: int, token: str) -> tuple:
    """Look up a ``db_words`` row by exact ``titel_de``; create a draft if absent.

    Returns:
        ``(word_id, is_new)``.

    Raises:
        RuntimeError: if the POST answers with anything other than 200/201.
    """
    lookup_path = (
        "/items/db_words?filter[titel_de][_eq]="
        + urllib.parse.quote(titel_de, safe="")
        + "&fields=id&limit=1"
    )
    found, found_status = _directus("GET", lookup_path, token)
    if found_status == 200 and found.get("data"):
        return found["data"][0]["id"], False

    created, created_status = _directus(
        "POST",
        "/items/db_words",
        token,
        {"status": "draft", "titel_de": titel_de, "level": level},
    )
    if created_status not in (200, 201):
        raise RuntimeError(f"db_word creation failed ({created_status}): {created}")
    return created["data"]["id"], True
|
||
|
||
|
||
@app.route("/api/directus/db-pictures", methods=["GET"])
def directus_db_pictures():
    """List ``db_pictures`` rows with the requested status, oldest first.

    The ``status`` query argument defaults to ``draft``. The Directus body
    and status code are passed through to the client unchanged.
    """
    token = request.headers.get("Authorization", "")
    # The value is client-controlled; encode it so it cannot inject further
    # query parameters into the Directus request.
    pic_status = urllib.parse.quote(request.args.get("status", "draft"), safe="")
    data, status = _directus(
        "GET",
        f"/items/db_pictures?filter[status][_eq]={pic_status}"
        f"&fields=id,picture,blurhash,status,design&sort=date_created",
        token,
    )
    return jsonify(data), status
|
||
|
||
|
||
@app.route("/api/directus/db-pictures/design-options", methods=["GET"])
def directus_db_pictures_design_options():
    """Return the configured choices of the ``db_pictures.design`` field.

    The field description is read with the static admin token rather than
    the caller's session token, because ``/fields/`` requires admin rights.
    Always answers 200 with ``{"choices": [...]}``; the list is empty when
    the lookup fails or the field has no choices configured.
    """
    data, status = _directus("GET", "/fields/db_pictures/design", DIRECTUS_ADMIN_TOKEN)
    if status != 200:
        print(f"[design-options] Directus /fields/ returned {status}: {data}")
        return jsonify({"choices": []}), 200

    field_data = data.get("data") or data
    # ``meta``, ``options`` and ``choices`` may each be present but null, so
    # ``or`` is used instead of ``.get(key, default)``.
    meta = field_data.get("meta") or {}
    options = meta.get("options") or {}
    choices = options.get("choices") or []
    return jsonify({"choices": choices})
|
||
|
||
|
||
@app.route("/api/directus/db-pictures/<pic_id>", methods=["PATCH", "DELETE"])
def directus_db_picture(pic_id):
    """Update or delete a single ``db_pictures`` row.

    PATCH forwards the request's JSON body to Directus and relays the answer.

    DELETE reads the row's ``picture`` file id, deletes the row, re-reads it
    to confirm it is gone, and finally deletes the file when one was
    referenced. It answers 204 on success, 404 when the row cannot be read,
    the Directus status when the row delete fails, and 500 when the row is
    still readable afterwards or the file delete fails.
    """
    token = request.headers.get("Authorization", "")
    item_path = f"/items/db_pictures/{pic_id}"

    if request.method == "PATCH":
        changes = request.get_json(force=True, silent=True)
        data, status = _directus("PATCH", item_path, token, body=changes)
        return jsonify(data), status

    # DELETE: remember the referenced file before the row disappears.
    lookup, lookup_status = _directus("GET", f"{item_path}?fields=id,picture", token)
    if lookup_status != 200:
        return jsonify({"error": "Bild nicht gefunden"}), 404
    file_uuid = (lookup.get("data") or {}).get("picture")

    # Delete the db_pictures row.
    _, del_status = _directus("DELETE", item_path, token)
    if del_status not in (200, 204):
        return jsonify({"error": f"Fehler beim Löschen des Eintrags (Status {del_status})"}), del_status

    # Confirm the row is really gone.
    _, recheck_status = _directus("GET", f"{item_path}?fields=id", token)
    if recheck_status == 200:
        return jsonify({"error": "Eintrag konnte nicht gelöscht werden"}), 500

    # Delete the file, if the row referenced one.
    if not file_uuid:
        return jsonify({}), 204
    _, file_status = _directus("DELETE", f"/files/{file_uuid}", token)
    if file_status in (200, 204):
        return jsonify({}), 204
    return jsonify({"error": f"Eintrag gelöscht, aber Datei konnte nicht gelöscht werden (Status {file_status})"}), 500
|
||
|
||
|
||
@app.route("/api/directus/db-objects", methods=["GET", "POST"])
def directus_db_objects():
    """List or create ``db_objects`` items through Directus.

    GET returns the objects whose ``picture`` equals the ``picture_id`` query
    argument, sorted by ``date_created``. POST forwards the request's JSON
    body as a new item. In both cases the Directus response and status code
    are relayed unchanged, using the caller's ``Authorization`` header.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        # The value comes straight from the client; percent-encode it so it
        # cannot smuggle extra query parameters into the Directus request.
        picture_id = urllib.parse.quote(request.args.get("picture_id", ""), safe="")
        fields = "id,selections,user_notes,status,picture"
        path = f"/items/db_objects?filter[picture][_eq]={picture_id}&fields={fields}&sort=date_created"
        data, status = _directus("GET", path, token)
        return jsonify(data), status

    data, status = _directus("POST", "/items/db_objects", token, body=request.get_json())
    return jsonify(data), status
|
||
|
||
|
||
@app.route("/api/directus/db-objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_db_object(obj_id):
    """Update or delete a single ``db_objects`` item through Directus.

    PATCH forwards the request's JSON body; DELETE removes the item. The
    Directus response and status code are relayed unchanged, using the
    caller's ``Authorization`` header.
    """
    token = request.headers.get("Authorization", "")
    # Re-encode the URL-decoded path segment so it stays a single segment
    # in the upstream URL.
    item_path = "/items/db_objects/" + urllib.parse.quote(obj_id, safe="")

    if request.method == "PATCH":
        data, status = _directus("PATCH", item_path, token, body=request.get_json())
    else:
        data, status = _directus("DELETE", item_path, token)
    return jsonify(data), status
|
||
|
||
|
||
@app.route("/api/directus/db-pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_db_picture_words(pic_id):
    """List or add the words linked to a picture via ``db_words_db_pictures``.

    GET returns ``{"data": [...]}`` with one entry per linked, non-archived
    word (``junction_id``, ``word_id``, ``titel_de``, ``level``, ``status``);
    a missing level is reported as 50 and any Directus error yields an empty
    list.

    POST expects ``{"words": [{"titel_de": ..., "level": ...}, ...]}``. Each
    word is looked up or created with ``_find_or_create_db_word``, an already
    existing word gets its level patched, and a junction row is created when
    the word is not linked to the picture yet. Failures for a single word are
    logged and skipped. Returns ``{"ok": True, "saved": <count>}`` where the
    count covers every word processed without an exception.
    """
    token = request.headers.get("Authorization", "")
    # Percent-encode the client-supplied id before placing it in the filter.
    junction_query = (
        "/items/db_words_db_pictures?filter[db_pictures_id][_eq]="
        + urllib.parse.quote(pic_id, safe="")
    )

    if request.method == "GET":
        data, s = _directus(
            "GET",
            f"{junction_query}"
            "&fields=id,db_words_id.id,db_words_id.titel_de,db_words_id.level,db_words_id.status&limit=500",
            token,
        )
        if s != 200:
            return jsonify({"data": []})
        items = []
        for entry in (data.get("data") or []):
            word = entry.get("db_words_id") or {}
            if not isinstance(word, dict) or not word.get("id"):
                continue
            if word.get("status") == "archived":
                continue
            items.append({
                "junction_id": entry.get("id"),
                "word_id": word["id"],
                "titel_de": word.get("titel_de", ""),
                "level": word.get("level") or 50,
                "status": word.get("status", ""),
            })
        return jsonify({"data": items})

    body = request.get_json(force=True, silent=True) or {}
    words_to_save = body.get("words") or []

    existing_data, _ = _directus("GET", f"{junction_query}&fields=db_words_id&limit=500", token)
    existing_ids = set()
    for e in (existing_data.get("data") or []):
        wid = e.get("db_words_id")
        # Directus returns either the bare key (string or integer) or an
        # expanded object; only the latter has to be unwrapped.
        if isinstance(wid, dict):
            wid = wid.get("id")
        if wid:
            existing_ids.add(wid)

    saved = 0
    for entry in words_to_save:
        titel_de = (entry.get("titel_de") or "").strip()
        if not titel_de:
            continue
        try:
            # Parsed inside the try so one malformed level only skips this
            # word instead of failing the whole request.
            level = int(entry.get("level") or 50)
            wid, is_new = _find_or_create_db_word(titel_de, level, token)
            if not is_new:
                _directus("PATCH", f"/items/db_words/{wid}", token, {"level": level})
            if wid not in existing_ids:
                _directus("POST", "/items/db_words_db_pictures", token, {"db_words_id": wid, "db_pictures_id": pic_id})
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            print(f"[db_picture_words] error for '{titel_de}': {e}")
    return jsonify({"ok": True, "saved": saved})
|
||
|
||
|
||
@app.route("/api/directus/db-pictures/<pic_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_picture_word_delete(pic_id, junction_id):
    """Remove one ``db_words_db_pictures`` junction row.

    ``pic_id`` is part of the route only; the row is addressed by
    ``junction_id`` alone and is not checked against the picture.
    Returns ``{"ok": <bool>}`` where the flag reflects whether Directus
    answered the delete with 200 or 204.
    """
    token = request.headers.get("Authorization", "")
    # Keep the client-supplied id a single path segment in the upstream URL.
    safe_junction_id = urllib.parse.quote(junction_id, safe="")
    _, status = _directus("DELETE", f"/items/db_words_db_pictures/{safe_junction_id}", token)
    return jsonify({"ok": status in (200, 204)})
|
||
|
||
|
||
def _load_pair_side_items(pair_id, kind, token):
    """Load the ``db_<kind>`` items linked to a pair, each with its words.

    ``kind`` is ``"question"`` or ``"statement"``. Every returned item is the
    Directus record (``id``, ``<kind>_de``, ``level``, ``status``) extended
    with a ``words`` list of ``junction_id`` / ``word_id`` / ``titel_de`` /
    ``level`` dicts taken from ``db_<kind>_db_words``.
    """
    id_key = f"db_{kind}_id"
    junction, _ = _directus(
        "GET",
        f"/items/db_pairs_db_{kind}?filter[db_pairs_id][_eq]={pair_id}&fields={id_key}&limit=10",
        token,
    )
    items = []
    for row in (junction.get("data") or []):
        item_id = row.get(id_key)
        if not item_id:
            continue
        item_resp, _ = _directus("GET", f"/items/db_{kind}/{item_id}?fields=id,{kind}_de,level,status", token)
        item = item_resp.get("data")
        if not item:
            continue
        word_rows, _ = _directus(
            "GET",
            f"/items/db_{kind}_db_words?filter[{id_key}][_eq]={item['id']}"
            "&fields=id,db_words_id.id,db_words_id.titel_de,db_words_id.level&limit=100",
            token,
        )
        item["words"] = [
            {
                "junction_id": e.get("id"),
                "word_id": e["db_words_id"].get("id"),
                "titel_de": e["db_words_id"].get("titel_de"),
                "level": e["db_words_id"].get("level"),
            }
            for e in (word_rows.get("data") or [])
            # Skip rows whose word relation was not expanded to an object.
            if isinstance(e.get("db_words_id"), dict)
        ]
        items.append(item)
    return items


@app.route("/api/directus/db-objects/<obj_id>/pairs", methods=["GET", "POST"])
def directus_db_object_pairs(obj_id):
    """List or create the question/statement pairs attached to an object.

    GET returns ``{"data": [...]}``: every pair linked through
    ``db_objects_db_pairs`` (``id``, ``status``, ``level``) extended with
    ``questions`` and ``statements`` lists, each item carrying its words.

    POST expects ``statement_de`` (required) plus optional ``question_de``,
    ``level`` (default 1) and ``words`` (dicts with ``titel_de``, ``level``
    and ``link_to`` of ``"question"``, ``"statement"`` or ``"both"``). It
    creates the pair, links it to the object, creates the statement and the
    optional question as drafts and links the words. Returns 400 for a
    missing statement or a non-numeric level, 500 when the pair or statement
    cannot be created, otherwise ``{"ok": True, "pair_id", "statement_id",
    "question_id"}``. Failures for a single word are logged and skipped.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        # Percent-encode the client-supplied id before using it in a filter.
        safe_obj_id = urllib.parse.quote(obj_id, safe="")
        junc_data, _ = _directus(
            "GET",
            f"/items/db_objects_db_pairs?filter[db_objects_id][_eq]={safe_obj_id}&fields=id,db_pairs_id&limit=200",
            token,
        )
        pair_ids = [e["db_pairs_id"] for e in (junc_data.get("data") or []) if e.get("db_pairs_id")]
        if not pair_ids:
            return jsonify({"data": []})
        # str() keeps the join working when Directus hands back integer keys.
        ids_param = urllib.parse.quote(",".join(str(p) for p in pair_ids), safe="")
        pairs_data, _ = _directus(
            "GET",
            f"/items/db_pairs?filter[id][_in]={ids_param}&fields=id,status,level&sort=date_created&limit=200",
            token,
        )
        result = [
            {
                **pair,
                "questions": _load_pair_side_items(pair["id"], "question", token),
                "statements": _load_pair_side_items(pair["id"], "statement", token),
            }
            for pair in (pairs_data.get("data") or [])
        ]
        return jsonify({"data": result})

    body = request.get_json(force=True, silent=True) or {}
    question_de = (body.get("question_de") or "").strip()
    statement_de = (body.get("statement_de") or "").strip()
    try:
        level = int(body.get("level") or 1)
    except (TypeError, ValueError):
        return jsonify({"error": "level must be an integer"}), 400
    words = body.get("words") or []
    if not statement_de:
        return jsonify({"error": "statement_de is required"}), 400

    pair_resp, s = _directus("POST", "/items/db_pairs", token, {"status": "draft", "level": level})
    if s not in (200, 201):
        return jsonify({"error": "Failed to create pair"}), 500
    pair_id = pair_resp["data"]["id"]
    _directus("POST", "/items/db_objects_db_pairs", token, {"db_objects_id": obj_id, "db_pairs_id": pair_id})

    stmt_resp, s = _directus("POST", "/items/db_statement", token, {"status": "draft", "statement_de": statement_de, "level": level})
    if s not in (200, 201):
        return jsonify({"error": "Failed to create statement"}), 500
    stmt_id = stmt_resp["data"]["id"]
    _directus("POST", "/items/db_pairs_db_statement", token, {"db_pairs_id": pair_id, "db_statement_id": stmt_id})

    q_id = None
    if question_de:
        q_resp, s = _directus("POST", "/items/db_question", token, {"status": "draft", "question_de": question_de, "level": level})
        if s in (200, 201):
            q_id = q_resp["data"]["id"]
            _directus("POST", "/items/db_pairs_db_question", token, {"db_pairs_id": pair_id, "db_question_id": q_id})

    for we in words:
        titel_de = (we.get("titel_de") or "").strip()
        link_to = we.get("link_to", "both")  # 'question', 'statement', 'both'
        if not titel_de:
            continue
        try:
            # Parsed inside the try so a malformed word level only skips
            # this word instead of failing after the pair was created.
            w_level = int(we.get("level") or level)
            wid, _ = _find_or_create_db_word(titel_de, w_level, token)
            if link_to in ("statement", "both"):
                _directus("POST", "/items/db_statement_db_words", token, {"db_statement_id": stmt_id, "db_words_id": wid})
            if link_to in ("question", "both") and q_id:
                _directus("POST", "/items/db_question_db_words", token, {"db_question_id": q_id, "db_words_id": wid})
        except Exception as e:
            print(f"[db_object_pairs] word error '{titel_de}': {e}")

    return jsonify({"ok": True, "pair_id": pair_id, "statement_id": stmt_id, "question_id": q_id})
|
||
|
||
|
||
@app.route("/api/directus/db-objects/<obj_id>/words", methods=["GET", "POST"])
def directus_db_object_words(obj_id):
    """List or add the words linked to an object via ``db_objects_db_words``.

    GET returns ``{"data": [...]}`` with one entry per linked, non-archived
    word (``junction_id``, ``word_id``, ``titel_de``, ``level``, ``status``);
    a missing level is reported as 50 and any Directus error yields an empty
    list.

    POST expects ``titel_de`` (required) and an optional ``level`` (default
    50). The word is looked up or created with ``_find_or_create_db_word``.
    If it is already linked to the object the existing junction is returned
    with ``already_exists: True``; otherwise a junction row is created.
    Returns 400 for a missing title or a non-numeric level and 502 with
    ``ok: False`` when Directus rejects the new junction row.
    """
    token = request.headers.get("Authorization", "")
    # Percent-encode the client-supplied id before using it in a filter.
    object_filter = (
        "/items/db_objects_db_words?filter[db_objects_id][_eq]="
        + urllib.parse.quote(obj_id, safe="")
    )

    if request.method == "GET":
        data, s = _directus(
            "GET",
            f"{object_filter}"
            "&fields=id,db_words_id.id,db_words_id.titel_de,db_words_id.level,db_words_id.status&limit=500",
            token,
        )
        if s != 200:
            return jsonify({"data": []})
        items = []
        for entry in (data.get("data") or []):
            word = entry.get("db_words_id") or {}
            if not isinstance(word, dict) or not word.get("id"):
                continue
            if word.get("status") == "archived":
                continue
            items.append({
                "junction_id": entry.get("id"),
                "word_id": word["id"],
                "titel_de": word.get("titel_de", ""),
                "level": word.get("level") or 50,
                "status": word.get("status", ""),
            })
        return jsonify({"data": items})

    # POST: add a single word to the object (M2M, allows multiple).
    body = request.get_json(force=True, silent=True) or {}
    titel_de = (body.get("titel_de") or "").strip()
    if not titel_de:
        return jsonify({"error": "titel_de required"}), 400
    try:
        level = int(body.get("level") or 50)
    except (TypeError, ValueError):
        return jsonify({"error": "level must be an integer"}), 400

    wid, _ = _find_or_create_db_word(titel_de, level, token)

    # Check whether the word is already linked, to avoid duplicates.
    existing, _ = _directus(
        "GET",
        f"{object_filter}&filter[db_words_id][_eq]={wid}&fields=id&limit=1",
        token,
    )
    if existing.get("data"):
        return jsonify({"ok": True, "already_exists": True, "word_id": wid, "junction_id": existing["data"][0]["id"]})

    resp, s = _directus(
        "POST",
        "/items/db_objects_db_words",
        token,
        {"db_objects_id": obj_id, "db_words_id": wid},
    )
    if s not in (200, 201, 204):
        # Do not claim success when the link was not created.
        print(f"[db_object_words] linking '{titel_de}' failed with status {s}: {resp}")
        return jsonify({"ok": False, "error": "Failed to link word", "word_id": wid, "junction_id": None}), 502
    # A successful create without a readable body leaves the id unknown.
    junction_id = (resp.get("data") or {}).get("id")
    return jsonify({"ok": True, "word_id": wid, "junction_id": junction_id})
|
||
|
||
|
||
@app.route("/api/directus/db-objects/<obj_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_object_word_delete(obj_id, junction_id):
    """Remove one ``db_objects_db_words`` junction row.

    ``obj_id`` is part of the route only; the row is addressed by
    ``junction_id`` alone and is not checked against the object.
    Returns ``{"ok": <bool>}`` where the flag reflects whether Directus
    answered the delete with 200 or 204.
    """
    token = request.headers.get("Authorization", "")
    # Keep the client-supplied id a single path segment in the upstream URL.
    safe_junction_id = urllib.parse.quote(junction_id, safe="")
    _, status = _directus("DELETE", f"/items/db_objects_db_words/{safe_junction_id}", token)
    return jsonify({"ok": status in (200, 204)})
|
||
|
||
|
||
@app.route("/api/directus/db-pairs/<pair_id>", methods=["PATCH", "DELETE"])
def directus_db_pair(pair_id):
    """Update or delete a question/statement pair.

    PATCH accepts any of ``level``, ``statement_de``, ``question_de``,
    ``words_add`` and ``words_remove``. It updates the pair level, the first
    linked statement and the first linked question inline; an empty
    ``question_de`` removes the existing question and its junction row, a
    non-empty one creates and links a question when none exists. The level
    of a statement or question is only changed when ``level`` is part of the
    request. Always returns ``{"ok": True}``.

    DELETE removes the object junction rows, the question and statement
    junction rows together with the linked questions and statements, and
    finally the pair itself. Returns ``{"ok": <bool>}`` reflecting the status
    of the final pair delete.
    """
    token = request.headers.get("Authorization", "")
    # Re-encode the URL-decoded path segment before reusing it upstream.
    safe_pair_id = urllib.parse.quote(pair_id, safe="")
    stmt_junction = f"/items/db_pairs_db_statement?filter[db_pairs_id][_eq]={safe_pair_id}"
    question_junction = f"/items/db_pairs_db_question?filter[db_pairs_id][_eq]={safe_pair_id}"

    if request.method == "PATCH":
        body = request.get_json(force=True, silent=True) or {}
        # Only touch an item's level when the client actually sent one;
        # otherwise a text-only edit would reset the stored level.
        level_patch = {"level": body["level"]} if "level" in body else {}

        # Update the pair level.
        if "level" in body:
            _directus("PATCH", f"/items/db_pairs/{safe_pair_id}", token, {"level": body["level"]})

        # Update the statement (first linked one only).
        if "statement_de" in body:
            s_junc, _ = _directus("GET", f"{stmt_junction}&fields=db_statement_id&limit=1", token)
            for e in (s_junc.get("data") or []):
                sid = e.get("db_statement_id")
                if sid:
                    _directus(
                        "PATCH",
                        f"/items/db_statement/{sid}",
                        token,
                        {"statement_de": body["statement_de"], **level_patch},
                    )

        # Update, create or remove the question.
        if "question_de" in body:
            question_de = (body["question_de"] or "").strip()
            # The junction's own id is needed below to delete the row.
            q_junc, _ = _directus("GET", f"{question_junction}&fields=id,db_question_id&limit=1", token)
            existing_q = q_junc.get("data") or []
            if existing_q:
                qid = existing_q[0].get("db_question_id")
                if question_de:
                    _directus(
                        "PATCH",
                        f"/items/db_question/{qid}",
                        token,
                        {"question_de": question_de, **level_patch},
                    )
                else:
                    # Question cleared: remove the junction row and the item.
                    junction_ids = [e["id"] for e in existing_q if e.get("id")]
                    if junction_ids:
                        _directus("DELETE", "/items/db_pairs_db_question", token, junction_ids)
                    if qid:
                        _directus("DELETE", f"/items/db_question/{qid}", token)
            elif question_de:
                # Create a new question and link it to the pair.
                q_resp, s = _directus(
                    "POST",
                    "/items/db_question",
                    token,
                    {"status": "draft", "question_de": question_de, "level": body.get("level", 50)},
                )
                if s in (200, 201):
                    qid = q_resp["data"]["id"]
                    _directus("POST", "/items/db_pairs_db_question", token, {"db_pairs_id": pair_id, "db_question_id": qid})

        # Add new words.
        words_add = body.get("words_add") or []
        if words_add:
            # Re-read the statement and question ids, since the question may
            # have been created or removed above.
            s_junc2, _ = _directus("GET", f"{stmt_junction}&fields=db_statement_id&limit=1", token)
            stmt_id_edit = ((s_junc2.get("data") or [{}])[0] or {}).get("db_statement_id")
            q_junc2, _ = _directus("GET", f"{question_junction}&fields=db_question_id&limit=1", token)
            q_id_edit = ((q_junc2.get("data") or [{}])[0] or {}).get("db_question_id")

            for we in words_add:
                titel_de = (we.get("titel_de") or "").strip()
                link_to = we.get("link_to", "both")
                if not titel_de:
                    continue
                try:
                    # Parsed inside the try so a malformed level only skips
                    # this word.
                    w_level = int(we.get("level") or 50)
                    wid, _ = _find_or_create_db_word(titel_de, w_level, token)
                    if link_to in ("statement", "both") and stmt_id_edit:
                        _directus("POST", "/items/db_statement_db_words", token, {"db_statement_id": stmt_id_edit, "db_words_id": wid})
                    if link_to in ("question", "both") and q_id_edit:
                        _directus("POST", "/items/db_question_db_words", token, {"db_question_id": q_id_edit, "db_words_id": wid})
                except Exception as e:
                    print(f"[db_pair_patch] word error '{titel_de}': {e}")

        # Remove words.
        # NOTE(review): with link_to == "both" (the default) the same
        # junction_id is deleted from two different junction collections;
        # confirm callers always send an explicit link_to, otherwise an
        # unrelated row with the same id could be removed.
        for wr in (body.get("words_remove") or []):
            link_to = wr.get("link_to", "both")
            junction_id = wr.get("junction_id")
            if not junction_id:
                continue
            if link_to in ("statement", "both"):
                _directus("DELETE", f"/items/db_statement_db_words/{junction_id}", token)
            if link_to in ("question", "both"):
                _directus("DELETE", f"/items/db_question_db_words/{junction_id}", token)

        return jsonify({"ok": True})

    # DELETE
    # Remove the junction rows that attach the pair to objects.
    obj_junc, _ = _directus(
        "GET",
        f"/items/db_objects_db_pairs?filter[db_pairs_id][_eq]={safe_pair_id}&fields=id&limit=100",
        token,
    )
    obj_junc_ids = [e["id"] for e in (obj_junc.get("data") or []) if e.get("id")]
    if obj_junc_ids:
        _directus("DELETE", "/items/db_objects_db_pairs", token, obj_junc_ids)

    # Delete the questions.
    q_junc, _ = _directus("GET", f"{question_junction}&fields=id,db_question_id&limit=100", token)
    for e in (q_junc.get("data") or []):
        if e.get("id"):
            _directus("DELETE", f"/items/db_pairs_db_question/{e['id']}", token)
        if e.get("db_question_id"):
            _directus("DELETE", f"/items/db_question/{e['db_question_id']}", token)

    # Delete the statements.
    s_junc, _ = _directus("GET", f"{stmt_junction}&fields=id,db_statement_id&limit=100", token)
    for e in (s_junc.get("data") or []):
        if e.get("id"):
            _directus("DELETE", f"/items/db_pairs_db_statement/{e['id']}", token)
        if e.get("db_statement_id"):
            _directus("DELETE", f"/items/db_statement/{e['db_statement_id']}", token)

    # Delete the pair itself.
    _, s = _directus("DELETE", f"/items/db_pairs/{safe_pair_id}", token)
    return jsonify({"ok": s in (200, 204)})
|
||
|
||
|
||
if __name__ == "__main__":
    # Development entry point. Defaults match the previous hard-coded values,
    # but each can be overridden through the environment.
    # WARNING: debug mode enables the interactive Werkzeug debugger, which
    # allows arbitrary code execution; set FLASK_DEBUG=0 whenever the server
    # is reachable from other machines.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in ("0", "false", "no", "off")
    app.run(
        host=os.environ.get("FLASK_RUN_HOST", "0.0.0.0"),
        port=int(os.environ.get("FLASK_RUN_PORT", "8000")),
        debug=debug_enabled,
    )
|
||
|