Files
hejyou_content_creation/app.py
admin 79c6926cec Fix auth login URL and migrate users/me endpoint
- Fix SNAKKIMO_URL.replace('/api','') bug that stripped both /api occurrences
  producing wrong auth URL → use endswith/slice to remove only trailing /api
- Replace directus_users_me with JWT decode (snakkimo has no /users/me endpoint)
  Returns Directus-compatible shape with role.admin_access for UI admin checks

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-21 16:46:38 +02:00

1714 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from pathlib import Path
from datetime import datetime
from uuid import uuid4
import json
import os
import urllib.request
import urllib.error
import urllib.parse
from flask import Flask, send_from_directory, request, jsonify
from flask_cors import CORS
from PIL import Image
import ollama
import anthropic as _anthropic_sdk
# Credentials and endpoints are read from the environment once, at import time.
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Base URL of the snakkimo API; _snakkimo() appends the request path to it.
SNAKKIMO_URL = os.environ.get("SNAKKIMO_URL", "https://hyggecraftery.com/api/snakkimo/api")
# Fallback token used by _snakkimo() when the caller passes no token.
SNAKKIMO_TOKEN = os.environ.get("SNAKKIMO_TOKEN", "")
# All working directories are resolved relative to this file.
BASE_DIR = Path(__file__).resolve().parent
# Source images listed by /api/images and served under /pictures/.
PICTURES_DIR = BASE_DIR / "pictures"
# Cropped object images plus their JSON metadata (stored as <id>.txt).
OBJECTS_DIR = BASE_DIR / "objects_image"
# Per-object sentence files (<id>.txt) read by load_sentences_for_object().
SENTENCE_DIR = BASE_DIR / "sentence_object"
PROMPTS_DIR = BASE_DIR / "prompts"
app = Flask(
    __name__,
    template_folder=str(BASE_DIR / "templates"),
    static_folder=str(BASE_DIR / "static"),
)
# Apply flask-cors to the app with its default settings.
CORS(app)
def read_prompt(filepath: Path, fallback: str) -> str:
    """Return the stripped content of a prompt file, or the stripped fallback.

    Args:
        filepath: Location of the prompt file (a ``str`` path is accepted too).
        fallback: Text used when the file cannot be read.

    Returns:
        The UTF-8 file content without surrounding whitespace, or
        ``fallback.strip()`` when the file is missing, unreadable, or not
        valid UTF-8.
    """
    try:
        return Path(filepath).read_text(encoding="utf-8").strip()
    except (OSError, UnicodeError):
        # Missing, unreadable or undecodable prompt files are expected and
        # fall back silently; any other exception is a bug and should surface.
        return fallback.strip()
def _snakkimo(method, path, token=None, body=None):
    """Call the snakkimo API and return ``(data, status)``.

    Args:
        method: HTTP method, e.g. ``"GET"`` or ``"PATCH"``.
        path: Request path (including any query string) appended to
            ``SNAKKIMO_URL``.
        token: Authorization value; ``SNAKKIMO_TOKEN`` is used when falsy.
            A missing ``"Bearer "`` prefix is added automatically.
        body: JSON-serialisable payload, or ``None`` to send no body.

    Returns:
        A ``(data, status)`` tuple. On success a JSON object or list is wrapped
        as ``{"data": ...}`` (Directus-compatible shape), an empty body yields
        ``{}`` and any other JSON value is returned as-is. On an HTTP error
        ``data`` is always a dict: the decoded JSON object, ``{}`` for an empty
        body, or ``{"error": ...}`` when the body is not a JSON object.

    Raises:
        urllib.error.URLError: When the server cannot be reached.
        ValueError: When a successful response body is not valid JSON.
    """
    auth = token or SNAKKIMO_TOKEN
    if auth and not auth.startswith("Bearer "):
        auth = f"Bearer {auth}"
    headers = {"Content-Type": "application/json"}
    if auth:
        headers["Authorization"] = auth
    req = urllib.request.Request(
        f"{SNAKKIMO_URL}{path}",
        data=json.dumps(body).encode() if body is not None else None,
        headers=headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(req) as resp:
            raw = resp.read().decode("utf-8")
            if not raw:
                return {}, resp.status
            parsed = json.loads(raw)
            # Wrap in {"data": ...} for Directus compatibility.
            if isinstance(parsed, (list, dict)):
                return {"data": parsed}, resp.status
            return parsed, resp.status
    except urllib.error.HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace")
        if not raw:
            return {}, e.code
        try:
            parsed = json.loads(raw)
        except ValueError:
            # Proxies/gateways often answer with HTML or plain text; keep the
            # upstream status instead of crashing on the decode.
            return {"error": raw}, e.code
        # Callers use data.get(...), so never hand them a non-dict error body.
        return (parsed if isinstance(parsed, dict) else {"error": parsed}), e.code
# Legacy alias; it is being removed step by step.
def _directus(method, path, token=None, body=None):
    """Forward the call unchanged to ``_snakkimo`` (old Directus-era name)."""
    return _snakkimo(method, path, token=token, body=body)
@app.route("/api/directus/auth/login", methods=["POST"])
def directus_auth_login():
    """Proxy a login request to the snakkimo auth endpoint.

    Reads ``email`` and ``password`` from the JSON request body and posts them
    to ``<base>/auth/login``, where ``<base>`` is ``SNAKKIMO_URL`` without a
    single trailing ``/api``.

    Returns:
        ``{"data": {"access_token": ..., "user": ...}}`` with status 200 on
        success (Directus-compatible shape). On an upstream HTTP error the
        upstream status code is returned together with the upstream JSON body,
        ``{}`` for an empty body, or ``{"error": <text>}`` for a non-JSON body.
    """
    body = request.get_json(force=True, silent=True) or {}
    if not isinstance(body, dict):
        # A JSON array/scalar carries no credentials; treat it as empty.
        body = {}
    req_body = {"email": body.get("email", ""), "password": body.get("password", "")}
    # Strip only the trailing /api to get the base URL: https://host/api/snakkimo
    snakkimo_base = SNAKKIMO_URL.removesuffix("/api")
    req = urllib.request.Request(
        f"{snakkimo_base}/auth/login",
        data=json.dumps(req_body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req) as resp:
            parsed = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as e:
        raw = e.read().decode("utf-8", errors="replace")
        try:
            err_body = json.loads(raw) if raw else {}
        except ValueError:
            # Upstream answered with something that is not JSON (e.g. HTML).
            err_body = {"error": raw}
        return jsonify(err_body), e.code
    # Directus-compatible format: {"data": {"access_token": ...}}
    token = parsed.get("token", "")
    return jsonify({"data": {"access_token": token, "user": parsed.get("user", {})}}), 200
@app.route("/api/directus/users/me", methods=["GET"])
def directus_users_me():
    """Describe the current user in a Directus-compatible shape.

    The data comes from the payload segment of the bearer JWT. The signature
    is NOT verified here, so the result is only suitable for UI hints such as
    showing admin controls.

    Returns:
        ``{"data": {...}}`` with status 200, or an ``{"error": ...}`` body with
        status 401 when the token is missing or cannot be decoded.
    """
    import base64

    header_value = request.headers.get("Authorization", "")
    bearer = header_value.removeprefix("Bearer ").strip()
    if not bearer:
        return jsonify({"error": "Unauthorized"}), 401
    try:
        segment = bearer.split(".")[1]
        # base64url payloads come without padding; restore it before decoding.
        padded = segment + "=" * (-len(segment) % 4)
        claims = json.loads(base64.urlsafe_b64decode(padded).decode("utf-8"))
        role_name = claims.get("role", "end-user")
        user = {
            "id": claims.get("userId", ""),
            "email": claims.get("email", ""),
            "first_name": "",
            "last_name": "",
            "role": {"id": role_name, "name": role_name, "admin_access": role_name == "admin"},
        }
        return jsonify({"data": user}), 200
    except Exception as e:
        return jsonify({"error": f"Token decode failed: {e}"}), 401
@app.route("/api/directus/pictures", methods=["GET"])
def directus_pictures():
    """Proxy: list pictures filtered by status.

    Query parameters:
        status: Picture status to filter by (default ``"uploaded"``).

    Returns:
        ``{"data": [{"id", "media", "status", "design"}, ...]}`` together with
        the upstream status code. ``media`` carries the upstream
        ``picture_link`` value.
    """
    token = request.headers.get("Authorization", "")
    pic_status = request.args.get("status", "uploaded")
    # Encode the user-supplied value so it cannot inject extra query
    # parameters or produce an invalid upstream URL.
    query = urllib.parse.urlencode({"status": pic_status, "limit": 500})
    data, status = _snakkimo("GET", f"/pictures?{query}", token)
    # Normalize to the Directus-compatible shape: id, picture_link -> media, status
    items = data.get("data") if isinstance(data.get("data"), list) else []
    normalized = [
        {
            "id": p["id"],
            "media": p.get("picture_link", ""),
            "status": p.get("status", ""),
            "design": p.get("design", ""),
        }
        for p in items
    ]
    return jsonify({"data": normalized}), status
@app.route("/api/directus/pictures/<pic_id>", methods=["PATCH"])
def directus_picture(pic_id):
    """Proxy: update a picture by forwarding the JSON body as a PATCH."""
    auth_header = request.headers.get("Authorization", "")
    patch = request.get_json()
    upstream_body, upstream_status = _snakkimo(
        "PATCH", f"/pictures/{pic_id}", auth_header, body=patch
    )
    return jsonify(upstream_body), upstream_status
@app.route("/api/directus/objects", methods=["GET", "POST"])
def directus_objects():
    """Proxy: list objects (GET) or create one (POST).

    GET accepts an optional ``picture_id`` query parameter and returns
    ``{"data": [{"id", "selections", "user_notes", "status"}, ...]}`` where
    ``user_notes`` carries the upstream ``notes`` value.

    POST forwards ``selections`` and the notes (``user_notes`` preferred,
    ``notes`` as fallback) and returns the upstream response.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        picture_id = request.args.get("picture_id", "")
        params = {"picture_id": picture_id, "limit": 500} if picture_id else {"limit": 500}
        # urlencode keeps a user-supplied picture_id from injecting extra
        # query parameters or breaking the upstream URL.
        path = f"/objects?{urllib.parse.urlencode(params)}"
        data, status = _snakkimo("GET", path, token)
        # Normalize: selections, notes -> user_notes
        items = data.get("data") if isinstance(data.get("data"), list) else []
        normalized = [
            {
                "id": o["id"],
                "selections": o.get("selections"),
                "user_notes": o.get("notes", ""),
                "status": o.get("status", ""),
            }
            for o in items
        ]
        return jsonify({"data": normalized}), status

    body = request.get_json(force=True, silent=True) or {}
    # Map user_notes -> notes
    mapped = {
        "selections": body.get("selections"),
        "notes": body.get("user_notes") or body.get("notes"),
    }
    data, status = _snakkimo("POST", "/objects", token, body=mapped)
    return jsonify(data), status
@app.route("/api/directus/objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_object(obj_id):
    """Proxy: update (PATCH) or delete (DELETE) a single object.

    For PATCH only ``user_notes``/``notes``, ``status`` and ``selections`` are
    forwarded; ``user_notes`` is sent as ``notes`` and an explicit ``notes``
    key wins when both are present.
    """
    auth_header = request.headers.get("Authorization", "")
    target = f"/objects/{obj_id}"
    if request.method != "PATCH":
        result, code = _snakkimo("DELETE", target, auth_header)
        return jsonify(result), code

    incoming = request.get_json(force=True, silent=True) or {}
    # (incoming key, upstream key); "notes" comes after "user_notes" so it overrides.
    key_map = (
        ("user_notes", "notes"),
        ("notes", "notes"),
        ("status", "status"),
        ("selections", "selections"),
    )
    mapped = {}
    for source_key, upstream_key in key_map:
        if source_key in incoming:
            mapped[upstream_key] = incoming[source_key]
    result, code = _snakkimo("PATCH", target, auth_header, body=mapped)
    return jsonify(result), code
def _setup_words_pictures(token: str):
"""No-op: snakkimo verwaltet M2M-Relationen intern."""
pass
@app.route("/api/directus/pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_picture_words(pic_id):
    """Proxy: load (GET) or save (POST) the words linked to a picture.

    GET returns ``{"data": [...]}`` with every non-blocked word; an upstream
    status other than 200 yields an empty list.

    POST expects ``{"words": [{"titel_de" | "title_de": str, "level": int}]}``.
    Each word is found or created, its difficulty level is updated when it
    already existed, and it is linked to the picture unless already linked.
    Malformed entries are logged and skipped so they cannot abort the batch.
    Returns ``{"ok": True, "saved": <number of processed words>}``.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        data, status = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
        if status != 200:
            return jsonify({"data": []})
        words = data.get("data") if isinstance(data.get("data"), list) else []
        items = [
            {
                # word_id doubles as junction_id for DELETE compatibility
                "junction_id": w["id"],
                "word_id": w["id"],
                "titel_de": w.get("titel_de", ""),
                "title_de": w.get("titel_de", ""),  # legacy alias
                "level": w.get("difficulty_level") or 50,
                "status": w.get("status", ""),
            }
            for w in words
            if w.get("status") != "blocked"
        ]
        return jsonify({"data": items})

    # POST: find-or-create words and link them to the picture.
    body = request.get_json(force=True, silent=True) or {}
    words_to_save = body.get("words") if isinstance(body, dict) else None
    if not isinstance(words_to_save, list):
        words_to_save = []
    # Load the word ids that are already linked.
    existing_data, _ = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
    existing_ids = {w["id"] for w in (existing_data.get("data") or []) if isinstance(w, dict)}
    saved = 0
    for entry in words_to_save:
        if not isinstance(entry, dict):
            continue
        titel_de = entry.get("titel_de") or entry.get("title_de") or ""
        try:
            # Parsing happens inside the try so one bad entry (non-string
            # title, non-numeric level) is skipped instead of failing all.
            titel_de = titel_de.strip()
            if not titel_de:
                continue
            level = int(entry.get("level") or 50)
            wid, is_new = _find_or_create_word(titel_de, level, token)
            if not is_new:
                _snakkimo("PATCH", f"/words/{wid}", token, {"difficulty_level": level})
            if wid not in existing_ids:
                _snakkimo("POST", f"/words/{wid}/pictures/{pic_id}", token)
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            print(f"[picture_words] error for '{titel_de}': {e}")
    return jsonify({"ok": True, "saved": saved})
@app.route("/api/images", methods=["GET"])
def list_images():
    """List the file names in the pictures folder, oldest modification first.

    Query parameter ``mode``:
        ``draw`` (default) -> files whose stem does not end in ``_saved``
        ``generate``       -> files whose stem ends in ``_saved``
    """
    mode = request.args.get("mode", "draw")
    PICTURES_DIR.mkdir(parents=True, exist_ok=True)
    want_saved = mode == "generate"
    matching = [
        entry
        for entry in PICTURES_DIR.iterdir()
        if entry.is_file() and entry.stem.endswith("_saved") == want_saved
    ]
    matching.sort(key=lambda entry: entry.stat().st_mtime)
    return jsonify({"images": [entry.name for entry in matching]})
# Directory holding the React build that the routes below serve.
REACT_BUILD_DIR = BASE_DIR / "static" / "react"


def _serve_spa():
    """Return the React SPA's ``index.html`` for any frontend route."""
    index_file = "index.html"
    return send_from_directory(REACT_BUILD_DIR, index_file)
@app.route("/")
@app.route("/draw")
@app.route("/generate")
@app.route("/annotate")
def serve_spa():
    """Serve the single-page app for every frontend route registered above."""
    spa_response = _serve_spa()
    return spa_response
@app.route("/assets/<path:filename>")
def serve_react_assets(filename: str):
    """Serve a JS/CSS asset from the ``assets`` folder of the Vite build."""
    assets_dir = REACT_BUILD_DIR / "assets"
    return send_from_directory(assets_dir, filename)
@app.route("/pictures/<path:filename>")
def serve_picture(filename: str):
    """Serve an original image from the pictures directory."""
    picture_response = send_from_directory(PICTURES_DIR, filename)
    return picture_response
@app.route("/objects_image/<path:filename>")
def serve_object_image(filename: str):
    """Serve a stored crop image from the objects directory."""
    crop_response = send_from_directory(OBJECTS_DIR, filename)
    return crop_response
@app.route("/api/objects", methods=["GET"])
def list_objects_for_image():
    """Return all stored objects (crops) that belong to one source image.

    Query parameter:
        ?filename=testbild.png

    The result is sorted by ``created_at`` (oldest first) and every object gets
    a running ``index`` starting at 1. Responds with 400 when ``filename`` is
    missing.
    """
    filename = request.args.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename query parameter"}), 400

    # For *_saved images also accept metadata whose source_filename still
    # points to the original name without "_saved" (compatibility).
    wanted_names = [filename]
    if filename.endswith((".png", ".jpg", ".jpeg", ".webp")):
        stem, suffix = filename.rsplit(".", 1)
        if stem.endswith("_saved"):
            wanted_names.append(f"{stem[:-6]}.{suffix}")

    # (metadata key, default when absent); the label_*/..._en keys are
    # optional AI-generated details.
    field_defaults = (
        ("id", None),
        ("image_file", None),
        ("title_de", ""),
        ("position_de", ""),
        ("action_de", ""),
        ("condition_de", ""),
        ("label_en", None),
        ("label_de", None),
        ("label_se", None),
        ("color_en", None),
        ("adjective_en", None),
        ("action_verb_en", None),
        ("preposition_en", None),
        ("relative_position_en", None),
        ("season_en", None),
        ("mode", None),
        ("bbox", None),
        ("polygon", None),
        ("hierarchy", 1),
        ("parent_id", None),
        ("created_at", None),
    )

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    objects = []
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except Exception:
            continue
        if meta.get("source_filename") not in wanted_names:
            continue
        objects.append({key: meta.get(key, default) for key, default in field_defaults})

    # Oldest first, so index 1 is the oldest object of the image.
    objects.sort(key=lambda o: o.get("created_at") or "")
    for position, obj in enumerate(objects, start=1):
        obj["index"] = position
    return jsonify({"objects": objects})
@app.route("/api/object/<obj_id>/hierarchy", methods=["POST"])
def update_object_hierarchy(obj_id: str):
    """Set the hierarchy level (1, 2 or 3) of a stored object.

    Responds with 400 for an invalid level, 404 when the metadata file does
    not exist and 500 when it cannot be parsed.
    """
    payload = request.get_json(force=True, silent=True) or {}
    try:
        level = int(payload.get("hierarchy"))
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid hierarchy"}), 400
    if not 1 <= level <= 3:
        return jsonify({"error": "Hierarchy must be 1, 2 or 3"}), 400

    meta_file = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_file.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        stored = json.loads(meta_file.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    stored["hierarchy"] = level
    serialized = json.dumps(stored, ensure_ascii=False, indent=2)
    meta_file.write_text(serialized, encoding="utf-8")
    return jsonify({"id": obj_id, "hierarchy": level})
@app.route("/api/object/<obj_id>", methods=["POST"])
def update_object_meta(obj_id: str):
    """Update the text metadata of a stored object.

    Only ``title_de``, ``position_de``, ``action_de`` and ``condition_de`` are
    taken from the request body; keys that are absent stay unchanged. Returns
    the object's ``id`` together with the four text fields.
    """
    editable = ("title_de", "position_de", "action_de", "condition_de")
    payload = request.get_json(force=True, silent=True) or {}

    meta_file = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_file.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        stored = json.loads(meta_file.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500

    for field in editable:
        if field in payload:
            stored[field] = payload.get(field, "")
    serialized = json.dumps(stored, ensure_ascii=False, indent=2)
    meta_file.write_text(serialized, encoding="utf-8")
    return jsonify({field: stored.get(field) for field in ("id",) + editable})
@app.route("/api/object/<obj_id>/parent", methods=["POST"])
def update_object_parent(obj_id: str):
    """Update the parent relation of an object.

    Expects JSON: ``{ "parent_id": "<uuid>" | null }``. A falsy ``parent_id``
    clears the relation.

    Returns:
        ``{"id", "parent_id"}`` on success; 404 when the object does not
        exist; 400 when the parent id is not a plain name, refers to the object
        itself, does not exist, or belongs to another source image; 500 when a
        metadata file cannot be parsed.
    """
    data = request.get_json(force=True, silent=True) or {}
    parent_id = data.get("parent_id")
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return jsonify({"error": "Object not found"}), 404
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return jsonify({"error": "Could not read meta file"}), 500
    if parent_id:
        parent_key = str(parent_id)
        # parent_id comes from the request body: refuse anything containing a
        # path component so it cannot address files outside OBJECTS_DIR.
        if Path(parent_key).name != parent_key:
            return jsonify({"error": "Invalid parent_id"}), 400
        if parent_key == obj_id:
            return jsonify({"error": "Object cannot be its own parent"}), 400
        parent_meta_path = OBJECTS_DIR / f"{parent_key}.txt"
        if not parent_meta_path.exists():
            return jsonify({"error": "Parent object not found"}), 400
        try:
            parent_meta = json.loads(parent_meta_path.read_text(encoding="utf-8"))
        except Exception:
            return jsonify({"error": "Could not read parent meta file"}), 500
        # The parent must belong to the same source image.
        if parent_meta.get("source_filename") != meta.get("source_filename"):
            return jsonify({"error": "Parent must belong to same source image"}), 400
    meta["parent_id"] = parent_id or None
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify({"id": obj_id, "parent_id": meta.get("parent_id")})
@app.route("/api/object/<obj_id>/generate_details", methods=["POST"])
def generate_object_details(obj_id: str):
    """Always answer 501: the local Llama model was removed."""
    message = "Lokale KI (Llama) wurde entfernt. Diese Funktion steht nicht mehr zur Verfügung."
    return jsonify({"error": message}), 501
def load_sentences_for_object(obj_id: str):
    """Load the stored sentences of an object from ``sentence_object/<obj_id>.txt``.

    Returns:
        The stored JSON list; a single stored JSON object wrapped in a list;
        or ``[]`` when the file is missing, unreadable, or holds any other
        JSON value.
    """
    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_file = SENTENCE_DIR / f"{obj_id}.txt"
    if not sentence_file.exists():
        return []
    try:
        stored = json.loads(sentence_file.read_text(encoding="utf-8"))
    except Exception:
        return []
    if isinstance(stored, list):
        return stored
    # Fallback: wrap a single object in a list.
    return [stored] if isinstance(stored, dict) else []
@app.route("/api/object/<obj_id>/sentences", methods=["GET"])
def get_object_sentences(obj_id: str):
    """Return every stored AI sentence of an object."""
    response_body = {
        "object_id": obj_id,
        "sentences": load_sentences_for_object(obj_id),
    }
    return jsonify(response_body)
@app.route("/api/object/<obj_id>/generate_sentence", methods=["POST"])
def generate_object_sentence(obj_id: str):
    """Always answer 501: the local Llama model was removed."""
    message = "Lokale KI (Llama) wurde entfernt. Diese Funktion steht nicht mehr zur Verfügung."
    return jsonify({"error": message}), 501
@app.route("/api/image/save", methods=["POST"])
def save_image():
    """Mark an image and its objects as saved.

    The image file is renamed from ``<stem><suffix>`` to
    ``<stem>_saved<suffix>`` and every object metadata file whose
    ``source_filename`` matches the old name is rewritten to the new name.

    Expects JSON: ``{"filename": "<name of a file in the pictures folder>"}``.

    Returns:
        ``{"old_name", "new_name"}`` on success; 400 for a missing or invalid
        filename or an already saved image; 404 when the image does not exist;
        409 when the target name is already taken.
    """
    data = request.get_json(force=True, silent=True) or {}
    filename = data.get("filename") if isinstance(data, dict) else None
    if not filename:
        return jsonify({"error": "Missing filename"}), 400
    # The name comes from the request body: only a plain file name is allowed,
    # otherwise "../x" or an absolute path could rename files outside
    # PICTURES_DIR.
    if not isinstance(filename, str) or Path(filename).name != filename or filename == "..":
        return jsonify({"error": "Invalid filename"}), 400
    src_path = PICTURES_DIR / filename
    if not src_path.exists():
        return jsonify({"error": "Image not found"}), 404
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Image already saved"}), 400
    new_name = f"{src_path.stem}_saved{src_path.suffix}"
    dst_path = PICTURES_DIR / new_name
    if dst_path.exists():
        # Path.rename would silently replace the existing file on POSIX.
        return jsonify({"error": "Saved image already exists"}), 409
    # Rename the image.
    src_path.rename(dst_path)
    # Update the objects that belong to it.
    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except Exception:
            continue
        # Skip foreign files and JSON that is not an object, so a stray file
        # cannot fail the request after the rename already happened.
        if not isinstance(meta, dict) or meta.get("source_filename") != filename:
            continue
        meta["source_filename"] = new_name
        meta_file.write_text(
            json.dumps(meta, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
    return jsonify({"old_name": filename, "new_name": new_name})
def _crop_clip_box(left, top, right, bottom, img_w, img_h):
    """Clip a pixel box to the image; return a bbox dict or None when empty."""
    x1, y1 = max(0, left), max(0, top)
    x2, y2 = min(right, img_w), min(bottom, img_h)
    if x1 >= x2 or y1 >= y2:
        return None
    return {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}


def _crop_normalize_selection(sel, default_number, img_w, img_h):
    """Validate one ``selections`` entry; return the cleaned entry or None."""
    if not isinstance(sel, dict):
        return None
    sel_mode = sel.get("mode")
    try:
        if sel_mode == "rect":
            bbox = sel.get("bbox")
            if not bbox:
                return None
            x = int(bbox.get("x", 0))
            y = int(bbox.get("y", 0))
            w = int(bbox.get("width", 0))
            h = int(bbox.get("height", 0))
            if w <= 0 or h <= 0:
                return None
            clipped = _crop_clip_box(x, y, x + w, y + h, img_w, img_h)
            shape = {}
        elif sel_mode == "polygon":
            polygon = sel.get("polygon")
            if not isinstance(polygon, list) or len(polygon) < 3:
                return None
            xs = [int(p.get("x", 0)) for p in polygon]
            ys = [int(p.get("y", 0)) for p in polygon]
            clipped = _crop_clip_box(min(xs), min(ys), max(xs), max(ys), img_w, img_h)
            shape = {"polygon": polygon}
        else:
            return None
    except (AttributeError, KeyError, TypeError, ValueError):
        # AttributeError covers bbox/points that are not JSON objects.
        return None
    if clipped is None:
        return None
    return {"number": sel.get("number", default_number), "mode": sel_mode, **shape, "bbox": clipped}


def _crop_legacy_bbox(data, mode, img_w, img_h):
    """Return ``(bbox, None)`` for a legacy payload or ``(None, error message)``."""
    if mode == "rect":
        try:
            x = int(data["x"])
            y = int(data["y"])
            w = int(data["width"])
            h = int(data["height"])
        except (ValueError, TypeError, KeyError):
            return None, "Invalid rectangle coordinates"
        if w <= 0 or h <= 0:
            return None, "Width and height must be positive"
        bbox = _crop_clip_box(x, y, x + w, y + h, img_w, img_h)
        return (bbox, None) if bbox else (None, "Crop area outside of image")
    polygon = data.get("polygon") or []
    if not isinstance(polygon, list) or len(polygon) < 3:
        return None, "Polygon must have at least 3 points"
    try:
        xs = [int(p["x"]) for p in polygon]
        ys = [int(p["y"]) for p in polygon]
    except (KeyError, TypeError, ValueError):
        return None, "Invalid polygon coordinates"
    bbox = _crop_clip_box(min(xs), min(ys), max(xs), max(ys), img_w, img_h)
    return (bbox, None) if bbox else (None, "Polygon bounding box outside of image")


@app.route("/api/crop", methods=["POST"])
def crop_image():
    """Crop a region out of a source image and store it with its metadata.

    Expects JSON:
        {
            "filename": "bild.jpg",
            "selections": [
                {
                    "number": 1,
                    "mode": "rect" | "polygon",
                    "bbox": {"x": 10, "y": 20, "width": 200, "height": 150},  // rect
                    "polygon": [{"x": 10, "y": 20}, ...]                     // polygon
                },
                ...
            ],
            "title_de": "...",
            "position_de": "...",
            "action_de": "...",
            "condition_de": "..."
        }
    OR (legacy format, kept for compatibility):
        {
            "filename": "bild.jpg",
            "mode": "rect" | "polygon",
            "x": 10, "y": 20, "width": 200, "height": 150,  // rect
            "polygon": [{"x": 10, "y": 20}, ...]            // polygon
        }

    All boxes are clipped to the image. With ``selections`` invalid entries are
    skipped and the stored image is the bounding box of the first valid one;
    for polygons the bounding box is cropped in both formats.

    Returns:
        ``{"id", "image_file", "meta_file"}`` on success; 400 for invalid
        input; 404 when the source image does not exist; 500 when the crop
        cannot be written (``selections`` format).
    """
    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict) or "filename" not in data:
        return jsonify({"error": "Missing filename"}), 400
    filename = data["filename"]
    # Only a plain file name is allowed; "../x" or an absolute path would
    # reach files outside PICTURES_DIR.
    if (
        not isinstance(filename, str)
        or not filename
        or Path(filename).name != filename
        or filename == ".."
    ):
        return jsonify({"error": "Invalid filename"}), 400
    src_path = PICTURES_DIR / filename
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Cannot crop on saved image"}), 400
    if not src_path.exists():
        return jsonify({"error": "Source image not found"}), 404
    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)

    # One UUID names both the crop image and its metadata file.
    obj_id = uuid4().hex
    timestamp = datetime.now().isoformat()
    out_image_name = f"{obj_id}{src_path.suffix}"
    out_image_path = OBJECTS_DIR / out_image_name
    out_meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    texts = {
        key: data.get(key, "")
        for key in ("title_de", "position_de", "action_de", "condition_de")
    }

    selections = data.get("selections")
    if isinstance(selections, list) and selections:
        # New format: several selections.
        with Image.open(src_path) as img:
            img_w, img_h = img.size
        processed_selections = []
        for sel in selections:
            cleaned = _crop_normalize_selection(sel, len(processed_selections) + 1, img_w, img_h)
            if cleaned is not None:
                processed_selections.append(cleaned)
        if not processed_selections:
            return jsonify({"error": "No valid selections provided"}), 400
        # The stored image is the (bounding) box of the first selection.
        box = processed_selections[0]["bbox"]
        try:
            with Image.open(src_path) as img:
                cropped = img.crop(
                    (box["x"], box["y"], box["x"] + box["width"], box["y"] + box["height"])
                )
                # Keep the source format.
                img_format = img.format or "PNG"
                cropped.save(out_image_path, format=img_format)
                print(f"[crop_image] Bild gespeichert: {out_image_path} (Größe: {cropped.size}, Format: {img_format})")
        except Exception as e:
            print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}")
            return jsonify({"error": f"Failed to create image: {e}"}), 500
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": filename,
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            **texts,
            "selections": processed_selections,
        }
    else:
        # Legacy format: a single selection.
        mode = data.get("mode", "rect")
        if mode not in {"rect", "polygon"}:
            return jsonify({"error": "Invalid mode"}), 400
        with Image.open(src_path) as img:
            img_w, img_h = img.size
            bbox, error = _crop_legacy_bbox(data, mode, img_w, img_h)
            if error:
                return jsonify({"error": error}), 400
            cropped = img.crop(
                (bbox["x"], bbox["y"], bbox["x"] + bbox["width"], bbox["y"] + bbox["height"])
            )
            cropped.save(out_image_path)
        meta = {
            "id": obj_id,
            "created_at": timestamp,
            "source_filename": filename,
            "mode": mode,
            "bbox": bbox,
            "image_file": out_image_name,
            "hierarchy": 1,
            "parent_id": None,
            **texts,
        }
        if mode == "polygon":
            meta["polygon"] = data.get("polygon")

    # Metadata is stored as JSON inside a .txt file.
    out_meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
    return jsonify(
        {
            "id": obj_id,
            "image_file": out_image_name,
            "meta_file": out_meta_path.name,
        }
    )
# ── Claude / API Helpers ──────────────────────────────────────────────────────
def _ensure_junction(collection: str, field1: str, field2: str, token: str):
"""No-op: snakkimo verwaltet Junctions intern."""
pass
def _ensure_link(collection: str, match: dict, payload: dict, token: str):
"""No-op: snakkimo verwaltet Links intern."""
pass
def _find_or_create_word(titel_de: str, level: int, token: str):
    """Return ``(word_id, is_new)`` for a German word, creating it if needed.

    The word is looked up via the ``titel_de`` filter first; when nothing is
    found it is created with the given difficulty level.

    Raises:
        RuntimeError: When the create request answers with a status other than
            200 or 201.
    """
    lookup_path = f"/words?titel_de={urllib.parse.quote(titel_de, safe='')}&limit=1"
    found, found_status = _snakkimo("GET", lookup_path, token)
    matches = found.get("data") or []
    if found_status == 200 and matches:
        hit = matches[0] if isinstance(matches, list) else matches
        return hit["id"], False

    new_word = {"titel_de": titel_de, "difficulty_level": level}
    created, created_status = _snakkimo("POST", "/words", token, new_word)
    if created_status not in (200, 201):
        raise RuntimeError(f"Word creation failed ({created_status}): {created}")
    record = created.get("data") or created
    return record["id"], True
def _find_or_create_question(question_de: str, answer_de: str, level: int,
                             short_answer_id, short_answer_de: str, obj_id: str, token: str):
    """Return ``(question_id, is_new)`` for a German question, creating it if needed.

    FLAG: ``answer_de``, ``short_answer_de`` and ``level`` are no longer
    stored (snakkimo has no field for them); only ``sentence_de`` is sent.

    Raises:
        RuntimeError: When the create request answers with a status other than
            200 or 201.
    """
    lookup_path = f"/questions?sentence_de={urllib.parse.quote(question_de, safe='')}&limit=1"
    found, found_status = _snakkimo("GET", lookup_path, token)
    matches = found.get("data") or []
    if found_status == 200 and matches:
        hit = matches[0] if isinstance(matches, list) else matches
        return hit["id"], False

    created, created_status = _snakkimo("POST", "/questions", token, {"sentence_de": question_de})
    if created_status not in (200, 201):
        raise RuntimeError(f"Question creation failed ({created_status}): {created}")
    record = created.get("data") or created
    return record["id"], True
# ── Generate & Publish endpoints ──────────────────────────────────────────────
def _gq_single_words(raw: str) -> list[str]:
    """Split a comma/semicolon separated entry and keep only single words."""
    tokens = []
    for part in raw.replace(";", ",").split(","):
        part = part.strip().strip("\"'")
        if not part:
            continue
        pieces = part.split()
        if len(pieces) == 1:
            tokens.append(pieces[0])
        # Multi-word entries are skipped (AI mistake).
    return tokens


def _gq_as_list(value) -> list:
    """Return ``value`` when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _gq_level(lvl: dict) -> int:
    """Return the numeric level of an AI level entry, defaulting to 1."""
    try:
        return int(lvl.get("level") or 1)
    except (TypeError, ValueError):
        return 1


@app.route("/api/object/<obj_id>/generate_questions", methods=["POST"])
def generate_questions(obj_id: str):
    """Generate words and questions for an object with Claude and store them.

    Steps:
        1. Load the object from snakkimo.
        2. Fill the prompt placeholders ``{user-notes_object}`` and
           ``{user-notes_parentobject}`` (the latter is always empty because
           snakkimo objects have no parent field).
        3. Call Claude Haiku and parse its JSON answer.
        4. Find or create every single word and question and link them to the
           object.

    Expects JSON: ``{"prompt": "<template>"}``.

    Returns:
        ``{"ok": True, "object_id", "stats"}`` on success; 400 when the prompt
        is missing; 404 when the object cannot be loaded; 500 when the API key
        is not configured, the AI call fails, or its answer is unusable.
    """
    token = request.headers.get("Authorization", "")
    body = request.get_json(force=True, silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    prompt_template = (body.get("prompt") or "").strip()
    if not prompt_template:
        return jsonify({"error": "Missing prompt"}), 400
    if not ANTHROPIC_API_KEY:
        return jsonify({"error": "ANTHROPIC_API_KEY not configured on server"}), 500

    # Load the object.
    obj_resp, s = _snakkimo("GET", f"/objects/{obj_id}", token)
    if s != 200:
        return jsonify({"error": "Object not found"}), 404
    obj = obj_resp.get("data") or {}
    if isinstance(obj, list):
        obj = obj[0] if obj else {}

    # Parent object: snakkimo objects have no parent field -> skipped.
    parent_notes = ""
    # str() keeps .replace() from failing on non-string notes.
    object_notes = str(obj.get("notes") or obj.get("user_notes") or "")
    prompt = (
        prompt_template
        .replace("{user-notes_object}", object_notes)
        .replace("{user-notes_parentobject}", parent_notes)
    )

    # Call Claude Haiku.
    try:
        client = _anthropic_sdk.Anthropic(api_key=ANTHROPIC_API_KEY)
        msg = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        raw = msg.content[0].text
    except Exception as e:
        return jsonify({"error": f"Claude API error: {e}"}), 500

    # Parse the JSON; fall back to the outermost {...} span when the model
    # wrapped the JSON in extra text.
    try:
        ai = json.loads(raw)
    except Exception:
        try:
            start = raw.index("{")
            end = raw.rindex("}") + 1
            ai = json.loads(raw[start:end])
        except Exception:
            return jsonify({"error": "Invalid JSON from AI", "raw": raw[:400]}), 500

    # The AI output is untrusted: accept only a list of JSON objects.
    levels = ai.get("levels") if isinstance(ai, dict) else None
    levels = [lvl for lvl in _gq_as_list(levels) if isinstance(lvl, dict)]
    if not levels:
        return jsonify({"error": "No levels in AI response"}), 500

    stats = {
        "words_created": 0,
        "words_linked": 0,
        "questions_created": 0,
        "questions_linked": 0,
    }

    # Load the existing links. This is fetched again on purpose: the AI call
    # above can take a while, so the earlier copy may be stale.
    obj_detail, _ = _snakkimo("GET", f"/objects/{obj_id}", token)
    obj_data = obj_detail.get("data") or {}
    if isinstance(obj_data, list):
        obj_data = obj_data[0] if obj_data else {}
    existing_word_ids: set[str] = set(obj_data.get("word_ids") or [])

    # Collect every unique word across all levels up front.
    all_words_by_level: dict[str, int] = {}  # word -> first level seen
    for lvl in levels:
        level = _gq_level(lvl)
        candidates = (
            _gq_as_list(lvl.get("words"))
            + _gq_as_list(lvl.get("distractor_words"))
            + [lvl.get("short_answer")]
        )
        for candidate in candidates:
            if candidate is None:
                # str(None) would otherwise create the word "None".
                continue
            for w in _gq_single_words(str(candidate)):
                all_words_by_level.setdefault(w, level)

    # Find or create each word once (shared cache across all levels).
    global_word_map: dict[str, str] = {}  # word -> id
    for w, lvl_num in all_words_by_level.items():
        try:
            wid, is_new = _find_or_create_word(w, lvl_num, token)
            global_word_map[w] = wid
            if wid not in existing_word_ids:
                _snakkimo("POST", f"/objects/{obj_id}/words/{wid}", token)
                existing_word_ids.add(wid)
            stats["words_created" if is_new else "words_linked"] += 1
        except Exception as e:
            print(f"[generate_questions] word error '{w}': {e}")

    question_ids: set[str] = set()
    for lvl in levels:
        level = _gq_level(lvl)
        q_de = str(lvl.get("question") or "").strip()
        a_de = str(lvl.get("answer") or "").strip()
        short_text = str(lvl.get("short_answer") or "").strip()
        if not q_de:
            continue
        short_answer_id = global_word_map.get(short_text) if short_text else None
        # Find or create the question; it is linked to the object below.
        try:
            q_id, q_is_new = _find_or_create_question(
                q_de, a_de, level, short_answer_id, short_text, obj_id, token
            )
        except Exception as e:
            print(f"[generate_questions] question error level {level}: {e}")
            continue
        stats["questions_created" if q_is_new else "questions_linked"] += 1
        question_ids.add(q_id)
        # FLAG: distractor_words and related_words have no equivalent in
        # snakkimo -> skipped.

    # Link the questions to the object.
    for q_id in question_ids:
        _snakkimo("POST", f"/objects/{obj_id}/pairs/{q_id}", token)  # no-op if exists
    print(f"[generate_questions] obj={obj_id} stats={stats}")
    return jsonify({"ok": True, "object_id": obj_id, "stats": stats})
@app.route("/api/object/<obj_id>/publish_questions", methods=["POST"])
def publish_questions(obj_id: str):
    """Set the status of every word linked to the object to ``published``.

    Questions are not touched; ``published_questions`` is always 0 because
    questions no longer have a publish concept.
    """
    auth_header = request.headers.get("Authorization", "")
    # Load the words of the object.
    linked, _ = _snakkimo("GET", f"/objects/{obj_id}/words", auth_header)
    linked_words = linked.get("data") or []
    if not isinstance(linked_words, list):
        linked_words = []

    published_count = 0
    for word in linked_words:
        patch_status = _snakkimo(
            "PATCH", f"/words/{word['id']}", auth_header, {"status": "published"}
        )[1]
        published_count += 1 if patch_status == 200 else 0

    return jsonify({
        "ok": True,
        "published_questions": 0,  # questions no longer have a publish concept
        "published_words": published_count,
    })
@app.route("/api/object/<obj_id>/questions", methods=["GET"])
def get_object_questions_list(obj_id: str):
    """Return the object's pairs, flattened into the question shape the UI expects.

    Fields that have no snakkimo equivalent (answer, short answer,
    distractor words) are returned as empty placeholders.
    """
    auth_token = request.headers.get("Authorization", "")
    payload, _status = _snakkimo("GET", f"/objects/{obj_id}/pairs", auth_token)

    raw_pairs = payload.get("data") or []
    if not isinstance(raw_pairs, list):
        raw_pairs = []

    def _as_item(pair):
        # Normalize one pair to the expected response shape.
        question = pair.get("question") or {}
        return {
            "id": pair["id"],
            "question_de": question.get("sentence_de", ""),
            "answer_de": "",  # no equivalent in snakkimo
            "short_answer_de": "",
            "level": pair.get("difficulty_level") or 0,
            "status": pair.get("status", "draft"),
            "distractor_words": [],  # no equivalent in snakkimo
        }

    return jsonify({"data": [_as_item(pair) for pair in raw_pairs]})
@app.route("/api/object/<obj_id>/words", methods=["GET"])
def get_object_words_list(obj_id: str):
    """Return all non-blocked words linked to an object, sorted by ``titel_de``."""
    auth_token = request.headers.get("Authorization", "")
    payload, _ = _snakkimo("GET", f"/objects/{obj_id}/words", auth_token)

    linked_words = payload.get("data") or []
    if not isinstance(linked_words, list):
        linked_words = []

    visible = []
    for word in linked_words:
        if word.get("status") == "blocked":
            continue
        title = word.get("titel_de", "")
        visible.append({
            "id": word["id"],
            # The title is exposed under both spellings.
            "title_de": title,
            "titel_de": title,
            "level": word.get("difficulty_level") or 50,
            "status": word.get("status", ""),
        })

    visible.sort(key=lambda item: item.get("titel_de") or "")
    return jsonify({"data": visible})
def _delete_junction_rows(collection: str, field: str, value: str, token: str):
"""No-op: snakkimo löscht Junctions via CASCADE automatisch."""
pass
@app.route("/api/question/<q_id>", methods=["DELETE"])
def delete_question_item(q_id: str):
    """Delete a question through the snakkimo API.

    Returns ``{"ok": True}`` when the upstream answers 200 or 204; otherwise
    an error body is returned with the upstream status code.
    """
    auth_token = request.headers.get("Authorization", "")
    _, upstream_status = _snakkimo("DELETE", f"/questions/{q_id}", auth_token)
    if upstream_status not in (200, 204):
        return jsonify({"error": "Delete failed"}), upstream_status
    return jsonify({"ok": True})
@app.route("/api/word/<w_id>", methods=["DELETE"])
def delete_word_item(w_id: str):
    """Delete a word through the snakkimo API.

    Per the original note, junction rows are removed upstream via CASCADE.
    Returns ``{"ok": True}`` when the upstream answers 200 or 204; otherwise
    an error body is returned with the upstream status code.
    """
    auth_token = request.headers.get("Authorization", "")
    _, upstream_status = _snakkimo("DELETE", f"/words/{w_id}", auth_token)
    if upstream_status not in (200, 204):
        return jsonify({"error": "Delete failed"}), upstream_status
    return jsonify({"ok": True})
@app.route("/api/object/<obj_id>/purge-orphans", methods=["POST"])
def purge_orphan_junctions(obj_id: str):
    """No-op endpoint: always reports zero removed orphans.

    Per the original note, snakkimo uses CASCADE, so orphaned junction rows
    cannot occur. ``obj_id`` is accepted but not used.
    """
    result = dict(ok=True, orphans_removed=0, note="not needed with snakkimo API")
    return jsonify(result)
@app.route("/api/purge-all-orphans", methods=["POST"])
def purge_all_orphans():
    """No-op endpoint: always reports zero removed orphans.

    Per the original note, snakkimo uses CASCADE, so orphaned junction rows
    cannot occur.
    """
    result = dict(ok=True, orphans_removed=0, note="not needed with snakkimo API")
    return jsonify(result)
@app.route("/api/fix-distractor-field", methods=["POST"])
def fix_distractor_field():
    """No-op endpoint: distractor_words does not exist in snakkimo.

    Always answers with ``ok: False`` and an explanatory note.
    """
    result = dict(ok=False, note="not supported in snakkimo API")
    return jsonify(result)
@app.route("/api/setup-schema", methods=["POST"])
def setup_directus_schema():
    """No-op endpoint: per the original note, the schema is managed by snakkimo API migrations."""
    result = dict(ok=True, note="not needed with snakkimo API")
    return jsonify(result)
@app.route("/api/setup-words-pictures", methods=["POST"])
def setup_words_pictures():
    """No-op endpoint: per the original note, M2M relations are managed by the snakkimo API."""
    result = dict(ok=True, note="not needed with snakkimo API")
    return jsonify(result)
# ── db_* Collection Routes (snakkimo API) ────────────────────────────────────
# Alias for _find_or_create_word: both names refer to the same function
# object, so the old db_* name keeps resolving.
_find_or_create_db_word = _find_or_create_word
@app.route("/api/directus/db-pictures", methods=["GET"])
def directus_db_pictures():
    """List pictures with a given status, in the Directus-style field layout.

    The ``status`` query parameter defaults to 'uploaded'. The legacy
    Directus value 'draft' is translated to snakkimo's 'uploaded'.
    """
    auth_token = request.headers.get("Authorization", "")

    requested_status = request.args.get("status", "uploaded")
    # Directus used 'draft' — snakkimo uses 'uploaded'.
    upstream_status = "uploaded" if requested_status == "draft" else requested_status

    query_path = f"/pictures?status={urllib.parse.quote(upstream_status)}&limit=500"
    payload, _status = _snakkimo("GET", query_path, auth_token)

    pictures = payload.get("data") or []
    if not isinstance(pictures, list):
        pictures = []

    normalized = []
    for picture in pictures:
        normalized.append({
            "id": picture["id"],
            "picture": picture.get("picture_link"),
            "blurhash": picture.get("blurhash"),
            "status": picture.get("status"),
            "design": picture.get("design"),
        })
    return jsonify({"data": normalized})
@app.route("/api/directus/db-pictures/design-options", methods=["GET"])
def directus_db_pictures_design_options():
    """Return the distinct, truthy ``design`` values of the pictures as sorted choices."""
    auth_token = request.headers.get("Authorization", "")
    payload, _ = _snakkimo("GET", "/pictures?limit=500", auth_token)

    pictures = payload.get("data") or []
    if not isinstance(pictures, list):
        pictures = []

    # dict.fromkeys de-duplicates while keeping the first occurrence of each value.
    all_designs = (picture.get("design") for picture in pictures)
    distinct_designs = dict.fromkeys(design for design in all_designs if design)

    choices = [{"text": design, "value": design} for design in distinct_designs]
    choices.sort(key=lambda choice: choice["value"])
    return jsonify({"choices": choices})
@app.route("/api/directus/db-pictures/<pic_id>", methods=["PATCH", "DELETE"])
def directus_db_picture(pic_id):
    """Update or delete a single picture via the snakkimo API.

    PATCH forwards the JSON body (renaming 'picture' to 'picture_link') and
    relays the upstream response. DELETE answers 204 on success, otherwise an
    error body with the upstream status.
    """
    auth_token = request.headers.get("Authorization", "")

    if request.method == "DELETE":
        # Per the original note, snakkimo handles S3 cleanup automatically.
        _, del_status = _snakkimo("DELETE", f"/pictures/{pic_id}", auth_token)
        if del_status not in (200, 204):
            return jsonify({"error": f"Delete failed (status {del_status})"}), del_status
        return jsonify({}), 204

    # PATCH
    body = request.get_json(force=True, silent=True) or {}
    needs_rename = "picture" in body and "picture_link" not in body
    if needs_rename:
        # Remap 'picture' → 'picture_link'.
        body["picture_link"] = body.pop("picture")
    payload, upstream_status = _snakkimo("PATCH", f"/pictures/{pic_id}", auth_token, body)
    return jsonify(payload), upstream_status
@app.route("/api/directus/db-objects", methods=["GET", "POST"])
def directus_db_objects():
    """List objects (optionally filtered by picture) or create a new object.

    GET returns objects in the Directus-style layout, exposing ``notes`` as
    ``user_notes`` and the first entry of ``picture_ids`` as ``picture``.
    POST forwards the JSON body (renaming 'user_notes' to 'notes') and relays
    the upstream response.
    """
    auth_token = request.headers.get("Authorization", "")

    if request.method != "GET":
        body = request.get_json(force=True, silent=True) or {}
        if "user_notes" in body and "notes" not in body:
            body["notes"] = body.pop("user_notes")
        payload, upstream_status = _snakkimo("POST", "/objects", auth_token, body)
        return jsonify(payload), upstream_status

    picture_id = request.args.get("picture_id", "")
    if picture_id:
        query_path = f"/objects?picture_id={urllib.parse.quote(picture_id)}&limit=500"
    else:
        query_path = "/objects?limit=500"
    payload, _status = _snakkimo("GET", query_path, auth_token)

    objects = payload.get("data") or []
    if not isinstance(objects, list):
        objects = []

    normalized = []
    for obj in objects:
        picture_ids = obj.get("picture_ids") or []
        first_picture = picture_ids[0] if picture_ids else None
        normalized.append({
            "id": obj["id"],
            "selections": obj.get("selections"),
            "user_notes": obj.get("notes"),
            "status": obj.get("status"),
            "picture": first_picture,
        })
    return jsonify({"data": normalized})
@app.route("/api/directus/db-objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_db_object(obj_id):
    """Update or delete a single object and relay the upstream response.

    For PATCH the JSON body is forwarded with 'user_notes' renamed to 'notes'.
    """
    auth_token = request.headers.get("Authorization", "")
    object_path = f"/objects/{obj_id}"

    if request.method != "PATCH":
        payload, upstream_status = _snakkimo("DELETE", object_path, auth_token)
        return jsonify(payload), upstream_status

    body = request.get_json(force=True, silent=True) or {}
    if "user_notes" in body and "notes" not in body:
        body["notes"] = body.pop("user_notes")
    payload, upstream_status = _snakkimo("PATCH", object_path, auth_token, body)
    return jsonify(payload), upstream_status
@app.route("/api/directus/db-pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_db_picture_words(pic_id):
    """List or save the words linked to a picture.

    GET returns the non-blocked words of the picture; an upstream status
    other than 200 yields an empty list. The word id doubles as
    ``junction_id`` so the delete route can address the link.

    POST expects ``{"words": [{"titel_de": ..., "level": ...}, ...]}``. Each
    entry is processed independently: the word is found or created, the level
    of an existing word is updated, and the word is linked to the picture if
    it is not linked yet. A failing entry is logged and skipped. The response
    reports how many entries were saved.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        data, s = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
        if s != 200:
            return jsonify({"data": []})
        words = data.get("data") or []
        items = []
        for w in (words if isinstance(words, list) else []):
            if w.get("status") == "blocked":
                continue
            items.append({
                "junction_id": w["id"],  # word_id used as junction_id for deletes
                "word_id": w["id"],
                "titel_de": w.get("titel_de", ""),
                "level": w.get("difficulty_level") or 50,
                "status": w.get("status", ""),
            })
        return jsonify({"data": items})

    body = request.get_json(force=True, silent=True) or {}
    # "words": null must behave like an absent key instead of crashing the loop.
    words_to_save = body.get("words") or []
    existing_data, _ = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
    existing_ids = {w["id"] for w in (existing_data.get("data") or []) if isinstance(w, dict)}

    saved = 0
    for entry in words_to_save:
        if not isinstance(entry, dict):
            # Malformed entries are skipped rather than aborting the batch.
            continue
        titel_de = (entry.get("titel_de") or "").strip()
        if not titel_de:
            continue
        try:
            # The level is parsed inside the try so that a non-numeric value
            # only fails this entry, like every other per-entry error.
            level = int(entry.get("level") or 50)
            wid, is_new = _find_or_create_word(titel_de, level, token)
            if not is_new:
                _snakkimo("PATCH", f"/words/{wid}", token, {"difficulty_level": level})
            if wid not in existing_ids:
                _snakkimo("POST", f"/words/{wid}/pictures/{pic_id}", token)
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            print(f"[db_picture_words] error for '{titel_de}': {e}")
    return jsonify({"ok": True, "saved": saved})
@app.route("/api/directus/db-pictures/<pic_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_picture_word_delete(pic_id, junction_id):
    """Remove the link between a picture and a word.

    ``junction_id`` is the word id (the GET route of the same resource
    returns the word id as junction id). The upstream result is not
    inspected; the response is always ``{"ok": True}``.
    """
    auth_token = request.headers.get("Authorization", "")
    link_path = f"/pictures/{pic_id}/words/{junction_id}"
    _snakkimo("DELETE", link_path, auth_token)
    return jsonify({"ok": True})
def _list_object_pairs(obj_id, token):
    """Build the GET response: the object's pairs with nested questions, statements and words."""
    data, _ = _snakkimo("GET", f"/objects/{obj_id}/pairs", token)
    pairs = data.get("data") or []
    pairs = pairs if isinstance(pairs, list) else []

    # Collect every referenced word id first so each word is fetched only once.
    all_word_ids = set()
    for p in pairs:
        for stmt_key in ("positive_statement", "negative_statement"):
            stmt = p.get(stmt_key) or {}
            all_word_ids.update(stmt.get("positive_word_ids") or [])
            all_word_ids.update(stmt.get("negative_word_ids") or [])

    word_map = {}
    for wid in all_word_ids:
        wd, ws = _snakkimo("GET", f"/words/{wid}", token)
        if ws != 200:
            continue
        # The word may arrive wrapped in {"data": ...} or as the bare object.
        w = wd.get("data") or wd
        if isinstance(w, dict) and w.get("id"):
            word_map[wid] = w

    result = []
    for pair in pairs:
        q = pair.get("question") or {}
        pos_stmt = pair.get("positive_statement") or {}
        pair_level = pair.get("difficulty_level") or 0

        questions = []
        if q.get("id"):
            questions.append({
                "id": q["id"],
                "question_de": q.get("sentence_de", ""),
                "level": pair_level,
                "status": q.get("status", ""),
                "words": [],  # FLAG: question words not supported in snakkimo
            })

        statements = []
        if pos_stmt.get("id"):
            stmt_words = []
            for wid in (pos_stmt.get("positive_word_ids") or []):
                # Words that could not be fetched still appear, with empty details.
                w = word_map.get(wid, {})
                stmt_words.append({
                    "junction_id": wid,
                    "word_id": wid,
                    "titel_de": w.get("titel_de", ""),
                    "level": w.get("difficulty_level") or 50,
                })
            statements.append({
                "id": pos_stmt["id"],
                "statement_de": pos_stmt.get("positive_sentence_de", ""),
                "level": pair_level,
                "status": pos_stmt.get("status", ""),
                "words": stmt_words,
            })

        result.append({
            "id": pair["id"],
            "status": pair.get("status", "draft"),
            "level": pair_level,
            "questions": questions,
            "statements": statements,
        })
    return jsonify({"data": result})


def _create_object_pair(obj_id, token, body):
    """Handle POST: create statement, optional question and pair, link the pair and add words."""
    question_de = (body.get("question_de") or "").strip()
    statement_de = (body.get("statement_de") or "").strip()
    if not statement_de:
        return jsonify({"error": "statement_de is required"}), 400
    # Validate the level before anything is created upstream, so bad input is
    # a clean 400 instead of an unhandled exception.
    try:
        level = int(body.get("level") or 1)
    except (TypeError, ValueError):
        return jsonify({"error": "level must be an integer"}), 400
    # "words": null must behave like an absent key.
    words = body.get("words") or []

    # Create statement
    stmt_resp, s = _snakkimo("POST", "/statements", token, {"positive_sentence_de": statement_de})
    if s not in (200, 201):
        return jsonify({"error": "Failed to create statement"}), 500
    stmt_id = (stmt_resp.get("data") or stmt_resp)["id"]

    # Create question if provided; a failed creation leaves the pair without one.
    q_id = None
    if question_de:
        q_resp, s = _snakkimo("POST", "/questions", token, {"sentence_de": question_de})
        if s in (200, 201):
            q_id = (q_resp.get("data") or q_resp)["id"]

    # Create pair linking statement + question
    pair_body = {"answer_type": "yes_no", "difficulty_level": level, "positive_statement_id": stmt_id}
    if q_id:
        pair_body["question_id"] = q_id
    pair_resp, s = _snakkimo("POST", "/pairs", token, pair_body)
    if s not in (200, 201):
        return jsonify({"error": "Failed to create pair"}), 500
    pair_id = (pair_resp.get("data") or pair_resp)["id"]

    # Link pair to object
    _snakkimo("POST", f"/objects/{obj_id}/pairs/{pair_id}", token)

    # Add words to statement (FLAG: linking words to questions not supported)
    for we in words:
        if not isinstance(we, dict):
            continue
        titel_de = (we.get("titel_de") or "").strip()
        if not titel_de:
            continue
        link_to = we.get("link_to", "both")
        try:
            # Parsed inside the try: the pair already exists at this point, so
            # a bad word level must not turn the whole request into a 500.
            w_level = int(we.get("level") or level)
            wid, _ = _find_or_create_word(titel_de, w_level, token)
            if link_to in ("statement", "both"):
                _snakkimo("POST", f"/statements/{stmt_id}/positive-words/{wid}", token)
        except Exception as e:
            print(f"[db_object_pairs] word error '{titel_de}': {e}")

    return jsonify({"ok": True, "pair_id": pair_id, "statement_id": stmt_id, "question_id": q_id})


@app.route("/api/directus/db-objects/<obj_id>/pairs", methods=["GET", "POST"])
def directus_db_object_pairs(obj_id):
    """List the pairs of an object (GET) or create a new pair for it (POST).

    POST body: ``statement_de`` (required), optional ``question_de``,
    ``level`` (default 1) and ``words`` (entries with ``titel_de``, optional
    ``level`` and ``link_to``). Returns 400 for a missing statement or a
    non-integer level, 500 when the statement or pair cannot be created.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        return _list_object_pairs(obj_id, token)
    body = request.get_json(force=True, silent=True) or {}
    return _create_object_pair(obj_id, token, body)
@app.route("/api/directus/db-objects/<obj_id>/words", methods=["GET", "POST"])
def directus_db_object_words(obj_id):
    """List the words linked to an object (GET) or link one word to it (POST).

    GET returns the non-blocked words; an upstream status other than 200
    yields an empty list. The word id doubles as ``junction_id``.

    POST body: ``titel_de`` (required) and ``level`` (default 50). The word is
    found or created and linked to the object unless already linked. Returns
    400 for a missing title or a non-integer level and 502 when the word
    cannot be found or created.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        data, s = _snakkimo("GET", f"/objects/{obj_id}/words", token)
        if s != 200:
            return jsonify({"data": []})
        words = data.get("data") or []
        items = []
        for w in (words if isinstance(words, list) else []):
            if w.get("status") == "blocked":
                continue
            items.append({
                "junction_id": w["id"],  # word_id used as junction_id for deletes
                "word_id": w["id"],
                "titel_de": w.get("titel_de", ""),
                "level": w.get("difficulty_level") or 50,
                "status": w.get("status", ""),
            })
        return jsonify({"data": items})

    body = request.get_json(force=True, silent=True) or {}
    titel_de = (body.get("titel_de") or "").strip()
    if not titel_de:
        return jsonify({"error": "titel_de required"}), 400
    # A non-numeric level is a client error, not a server crash.
    try:
        level = int(body.get("level") or 50)
    except (TypeError, ValueError):
        return jsonify({"error": "level must be an integer"}), 400

    # The other word routes in this file guard _find_or_create_word the same
    # way; here a failure is reported to the caller because there is only one word.
    try:
        wid, _ = _find_or_create_word(titel_de, level, token)
    except Exception as e:
        print(f"[db_object_words] error for '{titel_de}': {e}")
        return jsonify({"error": "Failed to find or create word"}), 502

    existing_data, _ = _snakkimo("GET", f"/objects/{obj_id}/words", token)
    existing_ids = {w["id"] for w in (existing_data.get("data") or []) if isinstance(w, dict)}
    if wid in existing_ids:
        return jsonify({"ok": True, "already_exists": True, "word_id": wid, "junction_id": wid})
    _snakkimo("POST", f"/objects/{obj_id}/words/{wid}", token)
    return jsonify({"ok": True, "word_id": wid, "junction_id": wid})
@app.route("/api/directus/db-objects/<obj_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_object_word_delete(obj_id, junction_id):
    """Remove the link between an object and a word.

    ``junction_id`` is passed on as the word part of the upstream path. The
    upstream result is not inspected; the response is always ``{"ok": True}``.
    """
    auth_token = request.headers.get("Authorization", "")
    link_path = f"/objects/{obj_id}/words/{junction_id}"
    _snakkimo("DELETE", link_path, auth_token)
    return jsonify({"ok": True})
@app.route("/api/directus/db-pairs/<pair_id>", methods=["PATCH", "DELETE"])
def directus_db_pair(pair_id):
    """Update or delete a pair together with its statement and question.

    PATCH accepts any of ``level``, ``statement_de``, ``question_de``,
    ``words_add`` and ``words_remove``. An empty ``question_de`` detaches and
    deletes the existing question; a non-empty one updates it or creates a
    new one. Words are only linked to / removed from the positive statement.
    Returns 404 when the pair cannot be loaded.

    DELETE removes the pair and then its linked statement and question.
    ``ok`` reflects only the result of deleting the pair itself.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "DELETE":
        pair_data, ps = _snakkimo("GET", f"/pairs/{pair_id}", token)
        pair = (pair_data.get("data") or pair_data) if ps == 200 else {}
        stmt_id = pair.get("positive_statement_id")
        q_id = pair.get("question_id")
        # Per the original note, CASCADE removes the object_pairs junction automatically.
        _, s = _snakkimo("DELETE", f"/pairs/{pair_id}", token)
        if stmt_id:
            _snakkimo("DELETE", f"/statements/{stmt_id}", token)
        if q_id:
            _snakkimo("DELETE", f"/questions/{q_id}", token)
        return jsonify({"ok": s in (200, 204)})

    # PATCH
    body = request.get_json(force=True, silent=True) or {}

    # Fetch current pair to get FK IDs
    pair_data, ps = _snakkimo("GET", f"/pairs/{pair_id}", token)
    if ps != 200:
        return jsonify({"error": "Pair not found"}), 404
    pair = pair_data.get("data") or pair_data
    stmt_id = pair.get("positive_statement_id")
    q_id = pair.get("question_id")

    if "level" in body:
        _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"difficulty_level": body["level"]})

    if "statement_de" in body and stmt_id:
        _snakkimo("PATCH", f"/statements/{stmt_id}", token,
                  {"positive_sentence_de": body["statement_de"]})

    if "question_de" in body:
        question_de = (body["question_de"] or "").strip()
        if q_id and question_de:
            _snakkimo("PATCH", f"/questions/{q_id}", token, {"sentence_de": question_de})
        elif q_id and not question_de:
            # Remove question from pair then delete it
            _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"question_id": None})
            _snakkimo("DELETE", f"/questions/{q_id}", token)
            q_id = None
        elif not q_id and question_de:
            q_resp, s = _snakkimo("POST", "/questions", token, {"sentence_de": question_de})
            if s in (200, 201):
                q_data = q_resp.get("data") or q_resp
                q_id = q_data["id"]
                _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"question_id": q_id})

    # If the first fetch carried no statement id, look it up once more.
    if not stmt_id:
        pair_data2, _ = _snakkimo("GET", f"/pairs/{pair_id}", token)
        pair2 = pair_data2.get("data") or pair_data2
        stmt_id = pair2.get("positive_statement_id")

    # `or []` makes an explicit null behave like an absent key.
    for we in (body.get("words_add") or []):
        if not isinstance(we, dict):
            continue
        titel_de = (we.get("titel_de") or "").strip()
        if not titel_de:
            continue
        link_to = we.get("link_to", "both")
        try:
            # Parsed inside the try: earlier PATCHes have already been applied,
            # so a bad word level must only skip this word.
            w_level = int(we.get("level") or 50)
            wid, _ = _find_or_create_word(titel_de, w_level, token)
            if link_to in ("statement", "both") and stmt_id:
                _snakkimo("POST", f"/statements/{stmt_id}/positive-words/{wid}", token)
            # FLAG: linking words to questions not supported in snakkimo
        except Exception as e:
            print(f"[db_pair_patch] word error '{titel_de}': {e}")

    for wr in (body.get("words_remove") or []):
        if not isinstance(wr, dict):
            continue
        junction_id = wr.get("junction_id")
        if not junction_id:
            continue
        link_to = wr.get("link_to", "both")
        if link_to in ("statement", "both") and stmt_id:
            _snakkimo("DELETE", f"/statements/{stmt_id}/positive-words/{junction_id}", token)
        # FLAG: removing words from questions not supported in snakkimo

    return jsonify({"ok": True})
@app.route("/api/directus/db-words/search", methods=["GET"])
def directus_db_words_search():
    """Lightweight search in db_words for autocomplete.

    Query parameters: ``q`` (search text) and ``limit`` (clamped to 1..50,
    falling back to 10 when not an integer). An empty ``q`` returns an empty
    list without calling upstream. The upstream query asks for id, titel_de
    and level, filters ``titel_de`` with ``_icontains`` and sorts by title.
    """
    auth_token = request.headers.get("Authorization", "")
    search_text = (request.args.get("q") or "").strip()

    raw_limit = request.args.get("limit", "10")
    try:
        limit = int(raw_limit)
    except ValueError:
        limit = 10
    else:
        limit = max(1, min(limit, 50))

    if not search_text:
        return jsonify({"data": []}), 200

    query_string = "&".join([
        "fields=id,titel_de,level",
        f"filter[titel_de][_icontains]={urllib.parse.quote(search_text)}",
        "sort=titel_de",
        f"limit={limit}",
    ])
    payload, upstream_status = _directus("GET", f"/items/db_words?{query_string}", auth_token)
    return jsonify({"data": payload.get("data") or []}), upstream_status
# =====================================================
# CONTENT-MANAGEMENT DASHBOARD (admin)
# =====================================================
# Allowlist: only these collections are reachable through the generic
# management endpoints. Extendable.
# Each entry carries: the collection name, a display label, a kind
# ("image" or "text"), a group, the comma-separated field list requested
# for list views, and the optional preview / title field names.
DASHBOARD_COLLECTIONS = [
{"name": "db_pictures", "label": "Bilder", "kind": "image", "group": "neu",
"fields": "id,media,status,date_created,design", "preview": "media", "title_field": None},
{"name": "db_objects", "label": "Objekte", "kind": "image", "group": "neu",
"fields": "id,picture,status,date_created,user_notes", "preview": None, "title_field": "user_notes"},
{"name": "db_words", "label": "Wörter", "kind": "text", "group": "neu",
"fields": "id,titel_de,level,status,date_created", "preview": None, "title_field": "titel_de"},
{"name": "db_question", "label": "Fragen", "kind": "text", "group": "neu",
"fields": "id,question_de,level,status,date_created", "preview": None, "title_field": "question_de"},
{"name": "db_statement", "label": "Statements", "kind": "text", "group": "neu",
"fields": "id,statement_de,level,status,date_created", "preview": None, "title_field": "statement_de"},
{"name": "db_pairs", "label": "Q&A-Paare", "kind": "text", "group": "neu",
"fields": "id,level,status,date_created", "preview": None, "title_field": None},
]
# Lookup table: collection name → its config entry from the allowlist above.
DASHBOARD_BY_NAME = {c["name"]: c for c in DASHBOARD_COLLECTIONS}
def _collection_config_or_404(name):
    """Look up a dashboard collection config by name.

    Returns ``(config, None)`` for an allowlisted collection, otherwise
    ``(None, (response, 404))`` where the second item is ready to be returned
    from a Flask view.
    """
    config = DASHBOARD_BY_NAME.get(name)
    if config:
        return config, None
    not_found = (jsonify({"error": "Unbekannte Collection"}), 404)
    return None, not_found
@app.route("/api/directus/dashboard/summary", methods=["GET"])
def directus_dashboard_summary():
    """Return total and per-status counts for every dashboard collection.

    The caller's Authorization header is forwarded upstream. For each
    collection two aggregate queries are issued: one for the total and one
    grouped by ``status``. A failing total query is reported in the entry's
    ``error`` field; a failing grouped query is ignored.
    """
    auth_token = request.headers.get("Authorization", "")
    summary = []

    for cfg in DASHBOARD_COLLECTIONS:
        collection = cfg["name"]
        entry = {
            "name": collection,
            "label": cfg["label"],
            "kind": cfg["kind"],
            "group": cfg["group"],
            "total": 0,
            "by_status": [],
            "has_status": False,
            "error": None,
        }

        # 1) Total count via aggregate
        total_resp, total_status = _directus(
            "GET", f"/items/{collection}?aggregate[count]=*", auth_token)
        if total_status != 200:
            first_error = (total_resp.get("errors") or [{}])[0]
            entry["error"] = first_error.get("message") or f"HTTP {total_status}"
        else:
            try:
                first_row = (total_resp.get("data") or [{}])[0] or {}
                entry["total"] = int(first_row.get("count", 0) or 0)
            except (TypeError, ValueError):
                entry["total"] = 0

        # 2) Status groups (may fail when the field is missing → ignored)
        group_resp, group_status = _directus(
            "GET",
            f"/items/{collection}?aggregate[count]=*&groupBy[]=status&limit=-1",
            auth_token)
        if group_status == 200:
            grouped = []
            for row in (group_resp.get("data") or []):
                if not isinstance(row, dict):
                    continue
                count_field = row.get("count")
                # The count is read from its "id" key when it is a dict,
                # otherwise it is used directly.
                if isinstance(count_field, dict):
                    raw_count = count_field.get("id", 0)
                else:
                    raw_count = count_field or 0
                grouped.append({"status": row.get("status"), "count": int(raw_count)})
            grouped.sort(key=lambda group: -group["count"])
            entry["by_status"] = grouped
            entry["has_status"] = any(group.get("status") is not None for group in grouped)

        summary.append(entry)

    return jsonify({"data": summary}), 200
@app.route("/api/directus/collection/<name>", methods=["GET"])
def directus_collection_list(name):
    """Generic list of an allowlisted collection with optional status filter and pagination.

    Query parameters: ``status`` (exact match filter), ``limit`` (default 50),
    ``offset`` (default 0) and ``sort`` (default '-date_created'). ``limit``
    and ``offset`` that are missing, empty or not integers fall back to their
    defaults. Returns 404 for a collection that is not on the allowlist;
    otherwise the upstream status of the list query is relayed.
    """
    cfg, err = _collection_config_or_404(name)
    if err:
        return err
    token = request.headers.get("Authorization", "")

    def _int_arg(key, default):
        # Parse an integer query parameter; bad input must not crash the
        # request or reach the upstream query string verbatim.
        raw = (request.args.get(key) or "").strip()
        if not raw:
            return default
        try:
            return int(raw)
        except ValueError:
            return default

    status_filter = request.args.get("status", "").strip()
    limit = _int_arg("limit", 50)
    offset = _int_arg("offset", 0)
    # Escape the sort value so characters like '&' or '=' cannot inject
    # additional upstream parameters; ',' stays literal for multi-field sorts.
    sort = urllib.parse.quote(request.args.get("sort", "-date_created"), safe=",")

    status_clause = (
        f"filter[status][_eq]={urllib.parse.quote(status_filter)}" if status_filter else None
    )

    parts = [f"fields={cfg['fields']}", f"limit={limit}", f"offset={offset}", f"sort={sort}"]
    if status_clause:
        parts.append(status_clause)
    data, status = _directus("GET", f"/items/{name}?{'&'.join(parts)}", token)

    # Total for the current filter
    count_parts = ["aggregate[count]=*"]
    if status_clause:
        count_parts.append(status_clause)
    count_data, _ = _directus("GET", f"/items/{name}?{'&'.join(count_parts)}", token)
    total = 0
    try:
        total = int(((count_data.get("data") or [{}])[0] or {}).get("count", 0) or 0)
    except (TypeError, ValueError):
        pass

    return jsonify({
        "data": data.get("data") or [],
        "meta": {"total": total, "limit": limit, "offset": offset},
        "collection": {
            "name": cfg["name"],
            "label": cfg["label"],
            "kind": cfg["kind"],
            "preview": cfg.get("preview"),
            "title_field": cfg.get("title_field"),
        },
    }), status
@app.route("/api/directus/collection/<name>/<item_id>", methods=["GET", "PATCH"])
def directus_collection_item(name, item_id):
    """Load or update a single item of an allowlisted collection.

    GET wraps the upstream item together with the collection's display
    config. PATCH forwards the JSON body and relays the upstream response.
    Both relay the upstream status code; an unknown collection yields 404.
    """
    cfg, err = _collection_config_or_404(name)
    if err:
        return err
    auth_token = request.headers.get("Authorization", "")
    item_path = f"/items/{name}/{item_id}"

    if request.method != "GET":
        # PATCH
        body = request.get_json(force=True, silent=True) or {}
        payload, upstream_status = _directus("PATCH", item_path, auth_token, body)
        return jsonify(payload), upstream_status

    payload, upstream_status = _directus("GET", item_path, auth_token)
    collection_info = {
        "name": cfg["name"],
        "label": cfg["label"],
        "kind": cfg["kind"],
        "preview": cfg.get("preview"),
        "title_field": cfg.get("title_field"),
    }
    return jsonify({"data": payload.get("data"), "collection": collection_info}), upstream_status
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution from the browser.
    # Because the server binds to all interfaces, debug mode is opt-in:
    # set FLASK_DEBUG=1 (or true/yes/on) for local development.
    _debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=8000, debug=_debug_enabled)