"""Flask backend for the picture-annotation tool.

Serves the React SPA, stores cropped objects and their metadata on disk and
proxies data requests to the snakkimo API in a Directus-compatible shape.
"""

from pathlib import Path
from datetime import datetime
from uuid import uuid4
import json
import os
import urllib.request
import urllib.error
import urllib.parse

from flask import Flask, send_from_directory, request, jsonify
from flask_cors import CORS
from PIL import Image
import ollama
import anthropic as _anthropic_sdk

ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
SNAKKIMO_URL = os.environ.get("SNAKKIMO_URL", "https://hyggecraftery.com/api/snakkimo/api")
SNAKKIMO_TOKEN = os.environ.get("SNAKKIMO_TOKEN", "")
# Upper bound for every upstream HTTP call so a hung API cannot block a worker forever.
UPSTREAM_TIMEOUT_SECONDS = 30

BASE_DIR = Path(__file__).resolve().parent
PICTURES_DIR = BASE_DIR / "pictures"
OBJECTS_DIR = BASE_DIR / "objects_image"
SENTENCE_DIR = BASE_DIR / "sentence_object"
PROMPTS_DIR = BASE_DIR / "prompts"

# Free-text metadata fields that clients may set on a stored object.
_TEXT_FIELDS = ("title_de", "position_de", "action_de", "condition_de")
# Optional AI-generated detail fields stored in an object's metadata file.
_AI_FIELDS = (
    "label_en",
    "label_de",
    "label_se",
    "color_en",
    "adjective_en",
    "action_verb_en",
    "preposition_en",
    "relative_position_en",
    "season_en",
)
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg", ".webp")

app = Flask(
    __name__,
    template_folder=str(BASE_DIR / "templates"),
    static_folder=str(BASE_DIR / "static"),
)
CORS(app)


# ── Generic helpers ──────────────────────────────────────────────────────────

def read_prompt(filepath: Path, fallback: str) -> str:
    """Return the stripped content of *filepath*, or the stripped *fallback*.

    The fallback is used when the file does not exist or cannot be read or
    decoded; this helper is deliberately best-effort and never raises for
    those cases.
    """
    try:
        if filepath.exists():
            return filepath.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        pass
    return fallback.strip()


def _json_body() -> dict:
    """Return the request's JSON body as a dict ({} if missing, invalid or not an object)."""
    payload = request.get_json(force=True, silent=True)
    return payload if isinstance(payload, dict) else {}


def _data_list(resp) -> list:
    """Return ``resp["data"]`` when it is a list, otherwise an empty list."""
    payload = resp.get("data") if isinstance(resp, dict) else None
    return payload if isinstance(payload, list) else []


def _coerce_int(value, default: int) -> int:
    """Convert *value* to int; falsy or unparsable values yield *default*."""
    try:
        return int(value or default)
    except (TypeError, ValueError):
        return default


def _as_list(value) -> list:
    """Normalize *value* to a list: None → [], list → itself, anything else → [value]."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def _is_safe_filename(name) -> bool:
    """Return True if *name* is a plain file name without any directory component.

    Used to keep client-supplied names from escaping the storage directories.
    """
    return (
        isinstance(name, str)
        and bool(name)
        and name not in (".", "..")
        and "\\" not in name
        and Path(name).name == name
    )


def _decode_error_body(err: urllib.error.HTTPError) -> dict:
    """Turn the body of an upstream HTTP error into a dict that is safe to jsonify."""
    raw = err.read().decode("utf-8", errors="replace")
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except ValueError:
        # Upstream (or a proxy in front of it) answered with HTML/plain text.
        return {"error": raw[:400]}
    return parsed if isinstance(parsed, dict) else {"error": parsed}


# ── snakkimo API client ──────────────────────────────────────────────────────

def _snakkimo(method, path, token=None, body=None):
    """Call the snakkimo API and return ``(data, status)``.

    Args:
        method: HTTP method.
        path: Path (including query string) appended to ``SNAKKIMO_URL``.
        token: Authorization value; ``SNAKKIMO_TOKEN`` is used when empty.
            A missing ``"Bearer "`` prefix is added.
        body: JSON-serializable request body, or None for no body.

    Returns:
        On success, list/dict payloads are wrapped as ``{"data": payload}``
        (Directus-compatible); an empty body yields ``{}``. On an HTTP error
        the decoded error body and the upstream status are returned. Network
        failures and non-JSON success bodies yield an error dict with 502.
    """
    auth = token or SNAKKIMO_TOKEN
    if auth and not auth.startswith("Bearer "):
        auth = f"Bearer {auth}"
    headers = {"Content-Type": "application/json"}
    if auth:
        headers["Authorization"] = auth
    req = urllib.request.Request(
        f"{SNAKKIMO_URL}{path}",
        data=json.dumps(body).encode() if body is not None else None,
        headers=headers,
        method=method,
    )
    try:
        with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT_SECONDS) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            status = resp.status
    except urllib.error.HTTPError as e:
        return _decode_error_body(e), e.code
    except OSError as e:
        # URLError, timeouts and connection resets are all OSError subclasses.
        return {"error": f"snakkimo API unreachable: {e}"}, 502
    if not raw:
        return {}, status
    try:
        parsed = json.loads(raw)
    except ValueError:
        return {"error": "Invalid JSON from snakkimo API", "raw": raw[:400]}, 502
    if isinstance(parsed, (list, dict)):
        return {"data": parsed}, status
    return parsed, status


def _directus(method, path, token=None, body=None):
    """Legacy alias for :func:`_snakkimo`; kept while callers are migrated."""
    return _snakkimo(method, path, token, body)


# ── Directus-compatible proxy routes ─────────────────────────────────────────

@app.route("/api/directus/auth/login", methods=["POST"])
def directus_auth_login():
    """Proxy a login to the snakkimo API and answer in Directus format.

    Expects ``{"email": ..., "password": ...}`` and returns
    ``{"data": {"access_token": ..., "user": ...}}``.
    """
    body = _json_body()
    req_body = {"email": body.get("email", ""), "password": body.get("password", "")}
    # NOTE(review): str.replace removes EVERY "/api" occurrence, not only a
    # trailing one (the default URL contains two). Kept as-is because the
    # intended login URL cannot be confirmed from this file.
    req = urllib.request.Request(
        f"{SNAKKIMO_URL.replace('/api', '')}/auth/login",
        data=json.dumps(req_body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=UPSTREAM_TIMEOUT_SECONDS) as resp:
            parsed = json.loads(resp.read().decode("utf-8", errors="replace"))
    except urllib.error.HTTPError as e:
        return jsonify(_decode_error_body(e)), e.code
    except (OSError, ValueError) as e:
        return jsonify({"error": f"Login upstream failed: {e}"}), 502
    if not isinstance(parsed, dict):
        parsed = {}
    token = parsed.get("token", "")
    return jsonify({"data": {"access_token": token, "user": parsed.get("user", {})}}), 200


@app.route("/api/directus/users/me", methods=["GET"])
def directus_users_me():
    """Proxy: current user including role (used for greeting and admin check)."""
    token = request.headers.get("Authorization", "")
    fields = "id,first_name,last_name,email,role.id,role.name,role.admin_access"
    data, status = _directus("GET", f"/users/me?fields={fields}", token)
    return jsonify(data), status


@app.route("/api/directus/pictures", methods=["GET"])
def directus_pictures():
    """Proxy: list pictures filtered by ``?status=`` (default ``uploaded``).

    Each picture is normalized to ``id``, ``media`` (from ``picture_link``),
    ``status`` and ``design``.
    """
    token = request.headers.get("Authorization", "")
    pic_status = request.args.get("status", "uploaded")
    # Quote the client-supplied value so it cannot inject extra query parameters.
    query_status = urllib.parse.quote(pic_status, safe="")
    data, status = _snakkimo("GET", f"/pictures?status={query_status}&limit=500", token)
    normalized = [
        {
            "id": p["id"],
            "media": p.get("picture_link", ""),
            "status": p.get("status", ""),
            "design": p.get("design", ""),
        }
        for p in _data_list(data)
    ]
    return jsonify({"data": normalized}), status


@app.route("/api/directus/pictures/<pic_id>", methods=["PATCH"])
def directus_picture(pic_id):
    """Proxy: update a picture with the JSON body of the request."""
    token = request.headers.get("Authorization", "")
    data, status = _snakkimo("PATCH", f"/pictures/{pic_id}", token, body=_json_body())
    return jsonify(data), status


@app.route("/api/directus/objects", methods=["GET", "POST"])
def directus_objects():
    """Proxy: list objects (GET, optional ``?picture_id=``) or create one (POST).

    The API field ``notes`` is exposed to the client as ``user_notes``.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        picture_id = request.args.get("picture_id", "")
        if picture_id:
            path = f"/objects?picture_id={urllib.parse.quote(picture_id, safe='')}&limit=500"
        else:
            path = "/objects?limit=500"
        data, status = _snakkimo("GET", path, token)
        normalized = [
            {
                "id": o["id"],
                "selections": o.get("selections"),
                "user_notes": o.get("notes", ""),
                "status": o.get("status", ""),
            }
            for o in _data_list(data)
        ]
        return jsonify({"data": normalized}), status

    body = _json_body()
    mapped = {
        "selections": body.get("selections"),
        "notes": body.get("user_notes") or body.get("notes"),
    }
    data, status = _snakkimo("POST", "/objects", token, body=mapped)
    return jsonify(data), status


@app.route("/api/directus/objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_object(obj_id):
    """Proxy: update (PATCH) or delete (DELETE) an object."""
    token = request.headers.get("Authorization", "")
    if request.method == "PATCH":
        body = _json_body()
        mapped = {}
        # "notes" is applied after "user_notes" so it wins when both are sent.
        if "user_notes" in body:
            mapped["notes"] = body["user_notes"]
        if "notes" in body:
            mapped["notes"] = body["notes"]
        if "status" in body:
            mapped["status"] = body["status"]
        if "selections" in body:
            mapped["selections"] = body["selections"]
        data, status = _snakkimo("PATCH", f"/objects/{obj_id}", token, body=mapped)
    else:
        data, status = _snakkimo("DELETE", f"/objects/{obj_id}", token)
    return jsonify(data), status


def _setup_words_pictures(token: str):
    """No-op: snakkimo manages M2M relations internally."""
    pass


@app.route("/api/directus/pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_picture_words(pic_id):
    """Proxy: load (GET) or save (POST) the words linked to a picture.

    POST expects ``{"words": [{"titel_de"|"title_de": ..., "level": ...}, ...]}``;
    each word is found or created and linked to the picture. Failures for a
    single word are logged and do not abort the request.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        data, status = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
        if status != 200:
            return jsonify({"data": []})
        items = [
            {
                # The word id doubles as junction id for DELETE compatibility.
                "junction_id": w["id"],
                "word_id": w["id"],
                "titel_de": w.get("titel_de", ""),
                "title_de": w.get("titel_de", ""),  # legacy alias
                "level": w.get("difficulty_level") or 50,
                "status": w.get("status", ""),
            }
            for w in _data_list(data)
            if w.get("status") != "blocked"
        ]
        return jsonify({"data": items})

    # POST — find-or-create words and link them.
    words_to_save = _json_body().get("words")
    if not isinstance(words_to_save, list):
        words_to_save = []
    # Load the word ids that are already linked so they are not linked twice.
    existing_data, _ = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
    existing_ids = {w["id"] for w in _data_list(existing_data) if isinstance(w, dict)}
    saved = 0
    for entry in words_to_save:
        if not isinstance(entry, dict):
            continue
        titel_de = str(entry.get("titel_de") or entry.get("title_de") or "").strip()
        if not titel_de:
            continue
        level = _coerce_int(entry.get("level"), 50)
        try:
            wid, is_new = _find_or_create_word(titel_de, level, token)
            if not is_new:
                _snakkimo("PATCH", f"/words/{wid}", token, {"difficulty_level": level})
            if wid not in existing_ids:
                _snakkimo("POST", f"/words/{wid}/pictures/{pic_id}", token)
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            # Best-effort per word: log and continue with the remaining entries.
            print(f"[picture_words] error for '{titel_de}': {e}")
    return jsonify({"ok": True, "saved": saved})


# ── Local pictures & SPA ─────────────────────────────────────────────────────

@app.route("/api/images", methods=["GET"])
def list_images():
    """List the files in the pictures folder, oldest modification first.

    Query parameter ``?mode=draw`` (default) returns files without the
    ``_saved`` stem suffix, ``?mode=generate`` returns files with it.
    """
    want_saved = request.args.get("mode", "draw") == "generate"
    PICTURES_DIR.mkdir(parents=True, exist_ok=True)
    image_paths = sorted(
        (
            f
            for f in PICTURES_DIR.iterdir()
            if f.is_file() and f.stem.endswith("_saved") == want_saved
        ),
        key=lambda p: p.stat().st_mtime,
    )
    return jsonify({"images": [p.name for p in image_paths]})


REACT_BUILD_DIR = BASE_DIR / "static" / "react"


def _serve_spa():
    """Return the React SPA's index.html (shared by all frontend routes)."""
    return send_from_directory(REACT_BUILD_DIR, "index.html")


@app.route("/")
@app.route("/draw")
@app.route("/generate")
@app.route("/annotate")
def serve_spa():
    """Serve the SPA entry page for every frontend route."""
    return _serve_spa()


@app.route("/assets/<path:filename>")
def serve_react_assets(filename: str):
    """Serve JS/CSS assets from the Vite build."""
    return send_from_directory(REACT_BUILD_DIR / "assets", filename)


@app.route("/pictures/<path:filename>")
def serve_picture(filename: str):
    """Serve an original picture from the pictures folder."""
    return send_from_directory(PICTURES_DIR, filename)


@app.route("/objects_image/<path:filename>")
def serve_object_image(filename: str):
    """Serve a stored crop image."""
    return send_from_directory(OBJECTS_DIR, filename)


# ── Local object metadata ────────────────────────────────────────────────────

def _write_meta(meta_path: Path, meta: dict) -> None:
    """Persist *meta* as pretty-printed UTF-8 JSON."""
    meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")


def _load_object_meta(obj_id: str):
    """Load the metadata file of a stored object.

    Returns:
        ``(meta_path, meta, None)`` on success, otherwise
        ``(None, None, (response, status))`` with 404 when the file is missing
        and 500 when it cannot be read as a JSON object.
    """
    meta_path = OBJECTS_DIR / f"{obj_id}.txt"
    if not meta_path.exists():
        return None, None, (jsonify({"error": "Object not found"}), 404)
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        meta = None
    if not isinstance(meta, dict):
        return None, None, (jsonify({"error": "Could not read meta file"}), 500)
    return meta_path, meta, None


@app.route("/api/objects", methods=["GET"])
def list_objects_for_image():
    """Return all stored objects (crops) of one source image.

    Query parameter: ``?filename=testbild.png``. Objects are sorted oldest
    first and receive a running ``index`` starting at 1.
    """
    filename = request.args.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename query parameter"}), 400

    # For *_saved images also accept objects whose source_filename still
    # carries the original name without "_saved" (compatibility).
    accepted = (filename,)
    if filename.endswith(_IMAGE_SUFFIXES):
        stem, _, suffix = filename.rpartition(".")
        if stem.endswith("_saved"):
            accepted = (filename, f"{stem[:-6]}.{suffix}")

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    objects = []
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        if not isinstance(meta, dict) or meta.get("source_filename") not in accepted:
            continue
        entry = {"id": meta.get("id"), "image_file": meta.get("image_file")}
        entry.update({key: meta.get(key, "") for key in _TEXT_FIELDS})
        # AI-generated details are optional and default to None.
        entry.update({key: meta.get(key) for key in _AI_FIELDS})
        entry.update(
            {
                "mode": meta.get("mode"),
                "bbox": meta.get("bbox"),
                "polygon": meta.get("polygon"),
                "hierarchy": meta.get("hierarchy", 1),
                "parent_id": meta.get("parent_id"),
                "created_at": meta.get("created_at"),
            }
        )
        objects.append(entry)

    # Oldest first; str() keeps the sort from failing on a non-string timestamp.
    objects.sort(key=lambda o: str(o.get("created_at") or ""))
    for idx, obj in enumerate(objects, start=1):
        obj["index"] = idx
    return jsonify({"objects": objects})


@app.route("/api/object/<obj_id>/hierarchy", methods=["POST"])
def update_object_hierarchy(obj_id: str):
    """Set the hierarchy (1, 2 or 3) of a stored object."""
    data = _json_body()
    try:
        hierarchy = int(data.get("hierarchy"))
    except (TypeError, ValueError):
        return jsonify({"error": "Invalid hierarchy"}), 400
    if hierarchy not in (1, 2, 3):
        return jsonify({"error": "Hierarchy must be 1, 2 or 3"}), 400

    meta_path, meta, error = _load_object_meta(obj_id)
    if error:
        return error
    meta["hierarchy"] = hierarchy
    _write_meta(meta_path, meta)
    return jsonify({"id": obj_id, "hierarchy": hierarchy})


@app.route("/api/object/<obj_id>", methods=["POST"])
def update_object_meta(obj_id: str):
    """Update the text metadata (title_de, position_de, action_de, condition_de) of an object.

    Only keys present in the request body are changed.
    """
    data = _json_body()
    meta_path, meta, error = _load_object_meta(obj_id)
    if error:
        return error
    for key in _TEXT_FIELDS:
        if key in data:
            meta[key] = data.get(key, "")
    _write_meta(meta_path, meta)
    return jsonify({k: meta.get(k) for k in ("id",) + _TEXT_FIELDS})


@app.route("/api/object/<obj_id>/parent", methods=["POST"])
def update_object_parent(obj_id: str):
    """Set or clear the parent relation of an object.

    Expects JSON ``{"parent_id": "<id>" | null}``. A parent must exist and
    belong to the same source image.
    """
    parent_id = _json_body().get("parent_id")
    meta_path, meta, error = _load_object_meta(obj_id)
    if error:
        return error

    if parent_id:
        # parent_id comes from the request body, so reject anything that
        # could address a file outside OBJECTS_DIR.
        if not _is_safe_filename(str(parent_id)):
            return jsonify({"error": "Parent object not found"}), 400
        parent_meta_path = OBJECTS_DIR / f"{parent_id}.txt"
        if not parent_meta_path.exists():
            return jsonify({"error": "Parent object not found"}), 400
        try:
            parent_meta = json.loads(parent_meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            parent_meta = None
        if not isinstance(parent_meta, dict):
            return jsonify({"error": "Could not read parent meta file"}), 500
        if parent_meta.get("source_filename") != meta.get("source_filename"):
            return jsonify({"error": "Parent must belong to same source image"}), 400

    meta["parent_id"] = parent_id or None
    _write_meta(meta_path, meta)
    return jsonify({"id": obj_id, "parent_id": meta.get("parent_id")})


@app.route("/api/object/<obj_id>/generate_details", methods=["POST"])
def generate_object_details(obj_id: str):
    """Llama removed — local AI is no longer available; always answers 501."""
    return jsonify({"error": "Lokale KI (Llama) wurde entfernt. Diese Funktion steht nicht mehr zur Verfügung."}), 501


def load_sentences_for_object(obj_id: str):
    """Load the stored sentences of an object from ``sentence_object/<obj_id>.txt``.

    Returns:
        The stored list; a single stored dict is wrapped in a list. A missing,
        unreadable or otherwise shaped file yields ``[]``.
    """
    SENTENCE_DIR.mkdir(parents=True, exist_ok=True)
    sentence_path = SENTENCE_DIR / f"{obj_id}.txt"
    if not sentence_path.exists():
        return []
    try:
        data = json.loads(sentence_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return []
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        # Fallback: wrap a single object in a list.
        return [data]
    return []


@app.route("/api/object/<obj_id>/sentences", methods=["GET"])
def get_object_sentences(obj_id: str):
    """Return all sentences stored so far for an object."""
    sentences = load_sentences_for_object(obj_id)
    return jsonify({"object_id": obj_id, "sentences": sentences})


@app.route("/api/object/<obj_id>/generate_sentence", methods=["POST"])
def generate_object_sentence(obj_id: str):
    """Llama removed — local AI is no longer available; always answers 501."""
    return jsonify({"error": "Lokale KI (Llama) wurde entfernt. Diese Funktion steht nicht mehr zur Verfügung."}), 501


@app.route("/api/image/save", methods=["POST"])
def save_image():
    """Mark an image and its objects as saved.

    The image file is renamed to ``<stem>_saved<suffix>`` and every object
    metadata file that references the old name is updated to the new one.
    """
    filename = _json_body().get("filename")
    if not filename:
        return jsonify({"error": "Missing filename"}), 400
    # The name is client-supplied: it must not point outside PICTURES_DIR.
    if not _is_safe_filename(filename):
        return jsonify({"error": "Invalid filename"}), 400

    src_path = PICTURES_DIR / filename
    if not src_path.exists():
        return jsonify({"error": "Image not found"}), 404
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Image already saved"}), 400

    new_name = f"{src_path.stem}_saved{src_path.suffix}"
    dst_path = PICTURES_DIR / new_name
    # Path.rename silently replaces an existing target on POSIX.
    if dst_path.exists():
        return jsonify({"error": "Saved image already exists"}), 409
    src_path.rename(dst_path)

    # Point the related objects at the new file name.
    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    for meta_file in OBJECTS_DIR.glob("*.txt"):
        try:
            meta = json.loads(meta_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            continue
        if not isinstance(meta, dict) or meta.get("source_filename") != filename:
            continue
        meta["source_filename"] = new_name
        _write_meta(meta_file, meta)

    return jsonify({"old_name": filename, "new_name": new_name})


# ── Cropping ─────────────────────────────────────────────────────────────────

def _clip_rect(x: int, y: int, w: int, h: int, img_w: int, img_h: int):
    """Clip a rectangle to the image bounds; return a bbox dict or None if nothing remains."""
    x1, y1 = max(0, x), max(0, y)
    x2, y2 = min(x + w, img_w), min(y + h, img_h)
    if x1 >= x2 or y1 >= y2:
        return None
    return {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}


def _polygon_bbox(xs: list, ys: list, img_w: int, img_h: int):
    """Return the image-clipped bounding box of polygon coordinates, or None if empty."""
    return _clip_rect(min(xs), min(ys), max(xs) - min(xs), max(ys) - min(ys), img_w, img_h)


def _normalize_selection(sel, default_number: int, img_w: int, img_h: int):
    """Validate one entry of the ``selections`` format; return it normalized or None.

    Missing coordinates default to 0 in this format. Malformed entries are
    skipped (None) instead of failing the whole request.
    """
    if not isinstance(sel, dict):
        return None
    mode = sel.get("mode")
    try:
        if mode == "rect":
            bbox = sel.get("bbox")
            if not bbox:
                return None
            w = int(bbox.get("width", 0))
            h = int(bbox.get("height", 0))
            if w <= 0 or h <= 0:
                return None
            clipped = _clip_rect(int(bbox.get("x", 0)), int(bbox.get("y", 0)), w, h, img_w, img_h)
            if clipped is None:
                return None
            return {"number": sel.get("number", default_number), "mode": "rect", "bbox": clipped}
        if mode == "polygon":
            polygon = sel.get("polygon")
            if not isinstance(polygon, list) or len(polygon) < 3:
                return None
            xs = [int(p.get("x", 0)) for p in polygon]
            ys = [int(p.get("y", 0)) for p in polygon]
            clipped = _polygon_bbox(xs, ys, img_w, img_h)
            if clipped is None:
                return None
            return {
                "number": sel.get("number", default_number),
                "mode": "polygon",
                "polygon": polygon,
                "bbox": clipped,
            }
    except (AttributeError, KeyError, TypeError, ValueError):
        return None
    return None


def _legacy_bbox(data: dict, mode: str, img_w: int, img_h: int):
    """Compute the crop box for the legacy single-selection format.

    Returns:
        ``(bbox, None)`` on success or ``(None, error_message)``; every error
        is a client error (HTTP 400).
    """
    if mode == "rect":
        try:
            x = int(data["x"])
            y = int(data["y"])
            w = int(data["width"])
            h = int(data["height"])
        except (ValueError, TypeError, KeyError):
            return None, "Invalid rectangle coordinates"
        if w <= 0 or h <= 0:
            return None, "Width and height must be positive"
        bbox = _clip_rect(x, y, w, h, img_w, img_h)
        return (bbox, None) if bbox else (None, "Crop area outside of image")

    polygon = data.get("polygon") or []
    if not isinstance(polygon, list) or len(polygon) < 3:
        return None, "Polygon must have at least 3 points"
    try:
        xs = [int(p["x"]) for p in polygon]
        ys = [int(p["y"]) for p in polygon]
    except (KeyError, TypeError, ValueError):
        return None, "Invalid polygon coordinates"
    bbox = _polygon_bbox(xs, ys, img_w, img_h)
    return (bbox, None) if bbox else (None, "Polygon bounding box outside of image")


@app.route("/api/crop", methods=["POST"])
def crop_image():
    """Crop a region out of a picture and store it with a metadata file.

    Expects JSON::

        {
          "filename": "bild.jpg",
          "selections": [
            {
              "number": 1,
              "mode": "rect" | "polygon",
              "bbox": {"x": 10, "y": 20, "width": 200, "height": 150},  (rect)
              "polygon": [{"x": 10, "y": 20}, ...]                      (polygon)
            },
            ...
          ],
          "title_de": "...", "position_de": "...",
          "action_de": "...", "condition_de": "..."
        }

    or the legacy format kept for compatibility::

        {
          "filename": "bild.jpg",
          "mode": "rect" | "polygon",
          "x": 10, "y": 20, "width": 200, "height": 150,   (rect)
          "polygon": [{"x": 10, "y": 20}, ...]             (polygon)
        }

    With ``selections`` the stored image is the bounding box of the first
    valid selection; all valid selections are kept in the metadata. Polygons
    are always cropped by their bounding box.
    """
    data = _json_body()
    filename = data.get("filename")
    if not filename:
        return jsonify({"error": "Missing filename"}), 400
    # The name is client-supplied: it must not point outside PICTURES_DIR.
    if not _is_safe_filename(filename):
        return jsonify({"error": "Invalid filename"}), 400

    src_path = PICTURES_DIR / filename
    if src_path.stem.endswith("_saved"):
        return jsonify({"error": "Cannot crop on saved image"}), 400
    if not src_path.exists():
        return jsonify({"error": "Source image not found"}), 404

    selections = data.get("selections")
    use_selections = isinstance(selections, list) and len(selections) > 0
    mode = data.get("mode", "rect")
    if not use_selections and mode not in ("rect", "polygon"):
        return jsonify({"error": "Invalid mode"}), 400

    OBJECTS_DIR.mkdir(parents=True, exist_ok=True)
    # One UUID names both the crop image and its metadata file.
    obj_id = uuid4().hex
    timestamp = datetime.now().isoformat()
    out_image_name = f"{obj_id}{src_path.suffix}"
    out_image_path = OBJECTS_DIR / out_image_name
    out_meta_path = OBJECTS_DIR / f"{obj_id}.txt"

    processed_selections = []
    bbox = None
    try:
        with Image.open(src_path) as img:
            img_w, img_h = img.size
            if use_selections:
                for sel in selections:
                    normalized = _normalize_selection(
                        sel, len(processed_selections) + 1, img_w, img_h
                    )
                    if normalized is not None:
                        processed_selections.append(normalized)
                if not processed_selections:
                    return jsonify({"error": "No valid selections provided"}), 400
                bbox = processed_selections[0]["bbox"]
            else:
                bbox, error = _legacy_bbox(data, mode, img_w, img_h)
                if error:
                    return jsonify({"error": error}), 400

            x1, y1 = bbox["x"], bbox["y"]
            cropped = img.crop((x1, y1, x1 + bbox["width"], y1 + bbox["height"]))
            # Keep the source format; relying on the file suffix alone fails
            # for suffixes Pillow does not recognize.
            img_format = img.format or "PNG"
            cropped.save(out_image_path, format=img_format)
            print(f"[crop_image] Bild gespeichert: {out_image_path} (Größe: {cropped.size}, Format: {img_format})")
    except Exception as e:
        # Boundary handler: unreadable/corrupt images must not surface as a bare 500.
        print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}")
        return jsonify({"error": f"Failed to create image: {e}"}), 500

    head = {"id": obj_id, "created_at": timestamp, "source_filename": filename}
    common = {
        "image_file": out_image_name,
        "hierarchy": 1,
        "parent_id": None,
        **{key: data.get(key, "") for key in _TEXT_FIELDS},
    }
    if use_selections:
        meta = {**head, **common, "selections": processed_selections}
    else:
        meta = {**head, "mode": mode, "bbox": bbox, **common}
        if mode == "polygon":
            meta["polygon"] = data.get("polygon")

    _write_meta(out_meta_path, meta)
    return jsonify(
        {
            "id": obj_id,
            "image_file": out_image_name,
            "meta_file": out_meta_path.name,
        }
    )


# ── Claude / API Helpers ──────────────────────────────────────────────────────

def _ensure_junction(collection: str, field1: str, field2: str, token: str):
    """No-op: snakkimo manages junctions internally."""
    pass


def _ensure_link(collection: str, match: dict, payload: dict, token: str):
    """No-op: snakkimo manages links internally."""
    pass


def _find_or_create(collection: str, field: str, value: str, create_body: dict, token: str, label: str):
    """Look up ``/<collection>?<field>=<value>`` and create the item if absent.

    Returns:
        ``(item_id, is_new)``.

    Raises:
        RuntimeError: if the create request does not answer 200/201.
    """
    enc = urllib.parse.quote(value, safe="")
    data, status = _snakkimo("GET", f"/{collection}?{field}={enc}&limit=1", token)
    items = data.get("data") or []
    if status == 200 and items:
        # The API may answer with a list or with a single object.
        first = items[0] if isinstance(items, list) else items
        return first["id"], False
    data, status = _snakkimo("POST", f"/{collection}", token, create_body)
    if status in (200, 201):
        item = data.get("data") or data
        return item["id"], True
    raise RuntimeError(f"{label} creation failed ({status}): {data}")


def _find_or_create_word(titel_de: str, level: int, token: str):
    """Return ``(word_id, is_new)`` for the word *titel_de*, creating it if needed.

    A new word is created with ``titel_de`` and ``difficulty_level``; per the
    original note the API is expected to assign status "requested".
    """
    body = {"titel_de": titel_de, "difficulty_level": level}
    return _find_or_create("words", "titel_de", titel_de, body, token, "Word")


def _find_or_create_question(question_de: str, answer_de: str, level: int,
                             short_answer_id, short_answer_de: str,
                             obj_id: str, token: str):
    """Return ``(question_id, is_new)`` for *question_de*, creating it if needed.

    FLAG: answer_de, short_answer_de and level are no longer stored (no such
    field in snakkimo); the parameters are kept for caller compatibility.
    """
    body = {"sentence_de": question_de}
    return _find_or_create("questions", "sentence_de", question_de, body, token, "Question")


def _sanitize_word(raw: str) -> list[str]:
    """Split comma/semicolon separated entries and keep only real single words.

    Multi-word entries are treated as AI mistakes and dropped.
    """
    tokens = []
    for part in raw.replace(";", ",").split(","):
        part = part.strip().strip("\"'")
        if not part:
            continue
        pieces = part.split()
        if len(pieces) == 1:
            tokens.append(pieces[0])
    return tokens


def _parse_ai_json(raw: str):
    """Parse the model output as JSON, tolerating text around the outermost ``{...}``.

    Returns the parsed value, or None when no valid JSON can be extracted.
    """
    try:
        return json.loads(raw)
    except ValueError:
        pass
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(raw[start:end + 1])
    except ValueError:
        return None


# ── Generate & Publish endpoints ──────────────────────────────────────────────

@app.route("/api/object/<obj_id>/generate_questions", methods=["POST"])
def generate_questions(obj_id: str):
    """Generate words and questions for an object with Claude.

    1. Loads the object from the API.
    2. Fills the prompt placeholders.
    3. Calls Claude Haiku.
    4. Stores words and questions and links them to the object.
    """
    token = request.headers.get("Authorization", "")
    prompt_template = str(_json_body().get("prompt") or "").strip()
    if not prompt_template:
        return jsonify({"error": "Missing prompt"}), 400
    if not ANTHROPIC_API_KEY:
        return jsonify({"error": "ANTHROPIC_API_KEY not configured on server"}), 500

    obj_resp, s = _snakkimo("GET", f"/objects/{obj_id}", token)
    if s != 200:
        return jsonify({"error": "Object not found"}), 404
    obj = obj_resp.get("data") or {}
    if isinstance(obj, list):
        obj = obj[0] if obj else {}

    # snakkimo objects have no parent field, so the parent notes stay empty.
    parent_notes = ""
    prompt = (
        prompt_template
        .replace("{user-notes_object}", obj.get("notes") or obj.get("user_notes") or "")
        .replace("{user-notes_parentobject}", parent_notes)
    )

    try:
        client = _anthropic_sdk.Anthropic(api_key=ANTHROPIC_API_KEY)
        msg = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}],
        )
        raw = msg.content[0].text
    except Exception as e:
        return jsonify({"error": f"Claude API error: {e}"}), 500

    ai = _parse_ai_json(raw)
    if ai is None:
        return jsonify({"error": "Invalid JSON from AI", "raw": raw[:400]}), 500

    levels = ai.get("levels") if isinstance(ai, dict) else None
    levels = [lvl for lvl in levels if isinstance(lvl, dict)] if isinstance(levels, list) else []
    if not levels:
        return jsonify({"error": "No levels in AI response"}), 500

    stats = {
        "words_created": 0,
        "words_linked": 0,
        "questions_created": 0,
        "questions_linked": 0,
    }

    # The object loaded above already carries its linked word ids.
    existing_word_ids: set[str] = set(obj.get("word_ids") or [])

    # Collect every distinct word of all levels once: word → first level seen.
    first_level_by_word: dict[str, int] = {}
    for lvl in levels:
        level = _coerce_int(lvl.get("level"), 1)
        candidates = (
            _as_list(lvl.get("words"))
            + _as_list(lvl.get("distractor_words"))
            + _as_list(lvl.get("short_answer"))
        )
        for candidate in candidates:
            # A JSON null must not become the literal word "None".
            if candidate is None:
                continue
            for word in _sanitize_word(str(candidate)):
                first_level_by_word.setdefault(word, level)

    # Find or create each word once and link it to the object.
    word_id_by_title: dict[str, str] = {}
    for word, lvl_num in first_level_by_word.items():
        try:
            wid, is_new = _find_or_create_word(word, lvl_num, token)
            word_id_by_title[word] = wid
            if wid not in existing_word_ids:
                _snakkimo("POST", f"/objects/{obj_id}/words/{wid}", token)
                existing_word_ids.add(wid)
            stats["words_created" if is_new else "words_linked"] += 1
        except Exception as e:
            # Best-effort per word: log and continue.
            print(f"[generate_questions] word error '{word}': {e}")

    question_ids: set[str] = set()
    for lvl in levels:
        level = _coerce_int(lvl.get("level"), 1)
        q_de = str(lvl.get("question") or "").strip()
        a_de = str(lvl.get("answer") or "").strip()
        short_text = str(lvl.get("short_answer") or "").strip()
        if not q_de:
            continue
        short_answer_id = word_id_by_title.get(short_text) if short_text else None
        try:
            q_id, q_is_new = _find_or_create_question(
                q_de, a_de, level, short_answer_id, short_text, obj_id, token
            )
        except Exception as e:
            print(f"[generate_questions] question error level {level}: {e}")
            continue
        stats["questions_created" if q_is_new else "questions_linked"] += 1
        question_ids.add(q_id)
        # FLAG: distractor_words and related_words have no equivalent in snakkimo → skipped.

    # Link the questions to the object (per the original note a no-op if the link exists).
    for q_id in question_ids:
        _snakkimo("POST", f"/objects/{obj_id}/pairs/{q_id}", token)

    print(f"[generate_questions] obj={obj_id} stats={stats}")
    return jsonify({"ok": True, "object_id": obj_id, "stats": stats})


@app.route("/api/object/<obj_id>/publish_questions", methods=["POST"])
def publish_questions(obj_id: str):
    """Set the status of all words linked to the object to 'published'.

    Questions no longer have a publish concept, so ``published_questions`` is
    always 0.
    """
    token = request.headers.get("Authorization", "")
    w_resp, _ = _snakkimo("GET", f"/objects/{obj_id}/words", token)
    published_w = 0
    for w in _data_list(w_resp):
        _, s = _snakkimo("PATCH", f"/words/{w['id']}", token, {"status": "published"})
        if s == 200:
            published_w += 1
    return jsonify({
        "ok": True,
        "published_questions": 0,
        "published_words": published_w,
    })


@app.route("/api/object/<obj_id>/questions", methods=["GET"])
def get_object_questions_list(obj_id: str):
    """Return the object's pairs, normalized to the legacy question shape.

    Fields without an equivalent in snakkimo (answer, short answer,
    distractor words) are returned empty.
    """
    token = request.headers.get("Authorization", "")
    data, _ = _snakkimo("GET", f"/objects/{obj_id}/pairs", token)
    items = []
    for p in _data_list(data):
        q = p.get("question") or {}
        items.append({
            "id": p["id"],
            "question_de": q.get("sentence_de", ""),
            "answer_de": "",
            "short_answer_de": "",
            "level": p.get("difficulty_level") or 0,
            "status": p.get("status", "draft"),
            "distractor_words": [],
        })
    return jsonify({"data": items})


@app.route("/api/object/<obj_id>/words", methods=["GET"])
def get_object_words_list(obj_id: str):
    """Return all non-blocked words linked to an object, sorted by ``titel_de``."""
    token = request.headers.get("Authorization", "")
    data, _ = _snakkimo("GET", f"/objects/{obj_id}/words", token)
    items = sorted(
        (
            {
                "id": w["id"],
                "title_de": w.get("titel_de", ""),
                "titel_de": w.get("titel_de", ""),
                "level": w.get("difficulty_level") or 50,
                "status": w.get("status", ""),
            }
            for w in _data_list(data)
            if w.get("status") != "blocked"
        ),
        key=lambda x: x.get("titel_de") or "",
    )
    return jsonify({"data": items})


def _delete_junction_rows(collection: str, field: str, value: str, token: str):
    """No-op: per the original note snakkimo removes junction rows via CASCADE."""
    pass


@app.route("/api/question/<q_id>", methods=["DELETE"])
def delete_question_item(q_id: str):
    """Delete a question via the snakkimo API."""
    token = request.headers.get("Authorization", "")
    _, status = _snakkimo("DELETE", f"/questions/{q_id}", token)
    if status in (200, 204):
        return jsonify({"ok": True})
    return jsonify({"error": "Delete failed"}), status


@app.route("/api/word/<w_id>", methods=["DELETE"])
def delete_word_item(w_id: str):
    """Delete a word via the snakkimo API (junction rows are expected to cascade)."""
    token = request.headers.get("Authorization", "")
    _, status = _snakkimo("DELETE", f"/words/{w_id}", token)
    if status in (200, 204):
        return jsonify({"ok": True})
    return jsonify({"error": "Delete failed"}), status


@app.route("/api/object/<obj_id>/purge-orphans", methods=["POST"])
def purge_orphan_junctions(obj_id: str):
    """No-op kept for client compatibility: nothing to purge with the snakkimo API."""
    return jsonify({"ok": True, "orphans_removed": 0, "note": "not needed with snakkimo API"})


@app.route("/api/purge-all-orphans", methods=["POST"])
def purge_all_orphans():
    """No-op kept for client compatibility: nothing to purge with the snakkimo API."""
    return jsonify({"ok": True, "orphans_removed": 0, "note": "not needed with snakkimo API"})
@app.route("/api/fix-distractor-field", methods=["POST"])
def fix_distractor_field():
    """No-op endpoint: reports that distractor words are unsupported (``ok`` is False)."""
    return jsonify({"ok": False, "note": "not supported in snakkimo API"})


@app.route("/api/setup-schema", methods=["POST"])
def setup_directus_schema():
    """No-op endpoint kept for compatibility; performs no schema changes."""
    return jsonify({"ok": True, "note": "not needed with snakkimo API"})


@app.route("/api/setup-words-pictures", methods=["POST"])
def setup_words_pictures():
    """No-op endpoint kept for compatibility; creates no relations."""
    return jsonify({"ok": True, "note": "not needed with snakkimo API"})


# ── db_* collection routes (snakkimo API) ────────────────────────────────────

# _find_or_create_db_word is identical to _find_or_create_word
_find_or_create_db_word = _find_or_create_word


def _db_level(raw, default: int) -> int:
    """Coerce a client-supplied level to int.

    Falls back to *default* when the value is missing, falsy or not numeric,
    so malformed input no longer surfaces as an unhandled ValueError.
    """
    try:
        return int(raw or default)
    except (TypeError, ValueError):
        return default


def _db_word_items(words) -> list:
    """Map snakkimo word records to the junction-style items the frontend expects.

    Words with status ``"blocked"`` are dropped. The word id doubles as
    ``junction_id`` so the delete endpoints can address the link by word id.
    """
    items = []
    for w in (words if isinstance(words, list) else []):
        if w.get("status") == "blocked":
            continue
        items.append({
            "junction_id": w["id"],  # word_id used as junction_id for deletes
            "word_id": w["id"],
            "titel_de": w.get("titel_de", ""),
            "level": w.get("difficulty_level") or 50,
            "status": w.get("status", ""),
        })
    return items


@app.route("/api/directus/db-pictures", methods=["GET"])
def directus_db_pictures():
    """List pictures filtered by the ``status`` query argument (default ``uploaded``).

    The legacy status ``draft`` is translated to ``uploaded``; the upstream
    ``picture_link`` field is exposed as ``picture``.
    """
    token = request.headers.get("Authorization", "")
    pic_status = request.args.get("status", "uploaded")
    # Directus used 'draft' — snakkimo uses 'uploaded'
    status_map = {"draft": "uploaded"}
    snakkimo_status = status_map.get(pic_status, pic_status)
    data, _ = _snakkimo(
        "GET",
        f"/pictures?status={urllib.parse.quote(snakkimo_status)}&limit=500",
        token,
    )
    pics = data.get("data") or []
    normalized = [
        {
            "id": p["id"],
            "picture": p.get("picture_link"),
            "blurhash": p.get("blurhash"),
            "status": p.get("status"),
            "design": p.get("design"),
        }
        for p in (pics if isinstance(pics, list) else [])
    ]
    return jsonify({"data": normalized})


@app.route("/api/directus/db-pictures/design-options", methods=["GET"])
def directus_db_pictures_design_options():
    """Return the unique, non-empty ``design`` values of up to 500 pictures, sorted."""
    token = request.headers.get("Authorization", "")
    data, _ = _snakkimo("GET", "/pictures?limit=500", token)
    pics = data.get("data") or []
    seen = set()
    choices = []
    for p in (pics if isinstance(pics, list) else []):
        d = p.get("design")
        if d and d not in seen:
            seen.add(d)
            choices.append({"text": d, "value": d})
    return jsonify({"choices": sorted(choices, key=lambda x: x["value"])})


@app.route("/api/directus/db-pictures/<pic_id>", methods=["PATCH", "DELETE"])
def directus_db_picture(pic_id):
    """Update or delete a single picture.

    PATCH forwards the JSON body (renaming ``picture`` to ``picture_link``) and
    relays the upstream response. DELETE answers 204 on upstream 200/204,
    otherwise an error object with the upstream status.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "PATCH":
        body = request.get_json(force=True, silent=True) or {}
        # remap 'picture' → 'picture_link'
        if "picture" in body and "picture_link" not in body:
            body["picture_link"] = body.pop("picture")
        data, status = _snakkimo("PATCH", f"/pictures/{pic_id}", token, body)
        return jsonify(data), status

    # DELETE — per the original author's note, snakkimo cleans up the stored
    # file itself (not verifiable from this module).
    _, del_status = _snakkimo("DELETE", f"/pictures/{pic_id}", token)
    if del_status in (200, 204):
        return jsonify({}), 204
    return jsonify({"error": f"Delete failed (status {del_status})"}), del_status


@app.route("/api/directus/db-objects", methods=["GET", "POST"])
def directus_db_objects():
    """List objects (optionally filtered by ``picture_id``) or create one.

    GET exposes upstream ``notes`` as ``user_notes`` and the first entry of
    ``picture_ids`` as ``picture``. POST renames ``user_notes`` to ``notes``
    before forwarding and relays the upstream response.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        picture_id = request.args.get("picture_id", "")
        path = (
            f"/objects?picture_id={urllib.parse.quote(picture_id)}&limit=500"
            if picture_id
            else "/objects?limit=500"
        )
        data, _ = _snakkimo("GET", path, token)
        objs = data.get("data") or []
        normalized = []
        for o in (objs if isinstance(objs, list) else []):
            pic_ids = o.get("picture_ids") or []
            normalized.append({
                "id": o["id"],
                "selections": o.get("selections"),
                "user_notes": o.get("notes"),
                "status": o.get("status"),
                "picture": pic_ids[0] if pic_ids else None,
            })
        return jsonify({"data": normalized})

    body = request.get_json(force=True, silent=True) or {}
    if "user_notes" in body and "notes" not in body:
        body["notes"] = body.pop("user_notes")
    data, status = _snakkimo("POST", "/objects", token, body)
    return jsonify(data), status


@app.route("/api/directus/db-objects/<obj_id>", methods=["PATCH", "DELETE"])
def directus_db_object(obj_id):
    """Update or delete a single object and relay the upstream response.

    PATCH renames ``user_notes`` to ``notes`` before forwarding.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "PATCH":
        body = request.get_json(force=True, silent=True) or {}
        if "user_notes" in body and "notes" not in body:
            body["notes"] = body.pop("user_notes")
        data, status = _snakkimo("PATCH", f"/objects/{obj_id}", token, body)
    else:
        data, status = _snakkimo("DELETE", f"/objects/{obj_id}", token)
    return jsonify(data), status


@app.route("/api/directus/db-pictures/<pic_id>/words", methods=["GET", "POST"])
def directus_db_picture_words(pic_id):
    """List the words linked to a picture, or link a batch of words to it.

    GET returns an empty list when the upstream call does not answer 200.
    POST expects ``{"words": [{"titel_de": ..., "level": ...}, ...]}``; each
    word is found or created, existing words get their level patched, and
    missing links are added. Per-word failures are logged and skipped.
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        data, s = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
        if s != 200:
            return jsonify({"data": []})
        return jsonify({"data": _db_word_items(data.get("data") or [])})

    body = request.get_json(force=True, silent=True) or {}
    words_to_save = body.get("words") or []
    existing_data, _ = _snakkimo("GET", f"/pictures/{pic_id}/words", token)
    existing_ids = {
        w["id"] for w in (existing_data.get("data") or []) if isinstance(w, dict)
    }
    saved = 0
    for entry in words_to_save:
        if not isinstance(entry, dict):
            continue  # ignore malformed entries instead of failing the batch
        titel_de = (entry.get("titel_de") or "").strip()
        level = _db_level(entry.get("level"), 50)
        if not titel_de:
            continue
        try:
            wid, is_new = _find_or_create_word(titel_de, level, token)
            if not is_new:
                _snakkimo("PATCH", f"/words/{wid}", token, {"difficulty_level": level})
            if wid not in existing_ids:
                _snakkimo("POST", f"/words/{wid}/pictures/{pic_id}", token)
                existing_ids.add(wid)
            saved += 1
        except Exception as e:
            # Best effort: one failing word must not abort the whole batch.
            print(f"[db_picture_words] error for '{titel_de}': {e}")
    return jsonify({"ok": True, "saved": saved})


@app.route("/api/directus/db-pictures/<pic_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_picture_word_delete(pic_id, junction_id):
    """Unlink a word from a picture; always answers ``{"ok": True}``."""
    # junction_id == word_id (set equal in the GET handler)
    token = request.headers.get("Authorization", "")
    _snakkimo("DELETE", f"/pictures/{pic_id}/words/{junction_id}", token)
    return jsonify({"ok": True})


@app.route("/api/directus/db-objects/<obj_id>/pairs", methods=["GET", "POST"])
def directus_db_object_pairs(obj_id):
    """List an object's pairs or create a new statement/question pair.

    GET reshapes each pair into ``{"id", "status", "level", "questions",
    "statements"}``; words of the positive statement are resolved with one
    upstream request per distinct word id.

    POST requires ``statement_de`` (400 otherwise); ``question_de``, ``level``
    and ``words`` are optional. It creates the statement, optionally the
    question, the pair, links the pair to the object and attaches words to the
    statement. Answers 500 when creating the statement or the pair fails.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        data, _ = _snakkimo("GET", f"/objects/{obj_id}/pairs", token)
        pairs = data.get("data") or []
        pairs = pairs if isinstance(pairs, list) else []

        # Collect every referenced word id first so each word is fetched once.
        all_word_ids = set()
        for p in pairs:
            for stmt_key in ("positive_statement", "negative_statement"):
                s = p.get(stmt_key) or {}
                all_word_ids.update(s.get("positive_word_ids") or [])
                all_word_ids.update(s.get("negative_word_ids") or [])

        word_map = {}
        for wid in all_word_ids:
            wd, ws = _snakkimo("GET", f"/words/{wid}", token)
            if ws == 200:
                w = wd.get("data") or wd
                if isinstance(w, dict) and w.get("id"):
                    word_map[wid] = w

        result = []
        for pair in pairs:
            pid = pair["id"]
            q = pair.get("question") or {}
            pos_stmt = pair.get("positive_statement") or {}

            questions = []
            if q.get("id"):
                questions.append({
                    "id": q["id"],
                    "question_de": q.get("sentence_de", ""),
                    "level": pair.get("difficulty_level") or 0,
                    "status": q.get("status", ""),
                    "words": [],  # FLAG: question words not supported in snakkimo
                })

            statements = []
            if pos_stmt.get("id"):
                stmt_words = []
                for wid in (pos_stmt.get("positive_word_ids") or []):
                    w = word_map.get(wid, {})
                    stmt_words.append({
                        "junction_id": wid,
                        "word_id": wid,
                        "titel_de": w.get("titel_de", ""),
                        "level": w.get("difficulty_level") or 50,
                    })
                statements.append({
                    "id": pos_stmt["id"],
                    "statement_de": pos_stmt.get("positive_sentence_de", ""),
                    "level": pair.get("difficulty_level") or 0,
                    "status": pos_stmt.get("status", ""),
                    "words": stmt_words,
                })

            result.append({
                "id": pid,
                "status": pair.get("status", "draft"),
                "level": pair.get("difficulty_level") or 0,
                "questions": questions,
                "statements": statements,
            })
        return jsonify({"data": result})

    body = request.get_json(force=True, silent=True) or {}
    question_de = (body.get("question_de") or "").strip()
    statement_de = (body.get("statement_de") or "").strip()
    level = _db_level(body.get("level"), 1)
    words = body.get("words") or []

    if not statement_de:
        return jsonify({"error": "statement_de is required"}), 400

    # Create statement
    stmt_resp, s = _snakkimo("POST", "/statements", token, {"positive_sentence_de": statement_de})
    if s not in (200, 201):
        return jsonify({"error": "Failed to create statement"}), 500
    stmt_data = stmt_resp.get("data") or stmt_resp
    stmt_id = stmt_data["id"]

    # Create question if provided; a failure here leaves the pair without one.
    q_id = None
    if question_de:
        q_resp, s = _snakkimo("POST", "/questions", token, {"sentence_de": question_de})
        if s in (200, 201):
            q_data = q_resp.get("data") or q_resp
            q_id = q_data["id"]

    # Create pair linking statement + question
    pair_body = {
        "answer_type": "yes_no",
        "difficulty_level": level,
        "positive_statement_id": stmt_id,
    }
    if q_id:
        pair_body["question_id"] = q_id
    pair_resp, s = _snakkimo("POST", "/pairs", token, pair_body)
    if s not in (200, 201):
        return jsonify({"error": "Failed to create pair"}), 500
    pair_data = pair_resp.get("data") or pair_resp
    pair_id = pair_data["id"]

    # Link pair to object
    _snakkimo("POST", f"/objects/{obj_id}/pairs/{pair_id}", token)

    # Add words to statement (FLAG: linking words to questions not supported)
    for we in words:
        if not isinstance(we, dict):
            continue
        titel_de = (we.get("titel_de") or "").strip()
        w_level = _db_level(we.get("level"), level)
        link_to = we.get("link_to", "both")
        if not titel_de:
            continue
        try:
            wid, _ = _find_or_create_word(titel_de, w_level, token)
            if link_to in ("statement", "both"):
                _snakkimo("POST", f"/statements/{stmt_id}/positive-words/{wid}", token)
        except Exception as e:
            print(f"[db_object_pairs] word error '{titel_de}': {e}")

    return jsonify({"ok": True, "pair_id": pair_id, "statement_id": stmt_id, "question_id": q_id})


@app.route("/api/directus/db-objects/<obj_id>/words", methods=["GET", "POST"])
def directus_db_object_words(obj_id):
    """List the words linked to an object, or link a single word to it.

    GET returns an empty list when the upstream call does not answer 200.
    POST requires ``titel_de`` (400 otherwise), finds or creates the word and
    links it unless it is already linked (``already_exists`` is then True).
    """
    token = request.headers.get("Authorization", "")
    if request.method == "GET":
        data, s = _snakkimo("GET", f"/objects/{obj_id}/words", token)
        if s != 200:
            return jsonify({"data": []})
        return jsonify({"data": _db_word_items(data.get("data") or [])})

    body = request.get_json(force=True, silent=True) or {}
    titel_de = (body.get("titel_de") or "").strip()
    level = _db_level(body.get("level"), 50)
    if not titel_de:
        return jsonify({"error": "titel_de required"}), 400

    wid, _ = _find_or_create_word(titel_de, level, token)
    existing_data, _ = _snakkimo("GET", f"/objects/{obj_id}/words", token)
    existing_ids = {
        w["id"] for w in (existing_data.get("data") or []) if isinstance(w, dict)
    }
    if wid in existing_ids:
        return jsonify({"ok": True, "already_exists": True, "word_id": wid, "junction_id": wid})
    _snakkimo("POST", f"/objects/{obj_id}/words/{wid}", token)
    return jsonify({"ok": True, "word_id": wid, "junction_id": wid})


@app.route("/api/directus/db-objects/<obj_id>/words/<junction_id>", methods=["DELETE"])
def directus_db_object_word_delete(obj_id, junction_id):
    """Unlink a word from an object; always answers ``{"ok": True}``."""
    token = request.headers.get("Authorization", "")
    _snakkimo("DELETE", f"/objects/{obj_id}/words/{junction_id}", token)
    return jsonify({"ok": True})


@app.route("/api/directus/db-pairs/<pair_id>", methods=["PATCH", "DELETE"])
def directus_db_pair(pair_id):
    """Update or delete a pair together with its statement and question.

    PATCH accepts any of ``level``, ``statement_de``, ``question_de``,
    ``words_add`` and ``words_remove``. An empty ``question_de`` detaches and
    deletes the existing question; a non-empty one updates it or creates and
    attaches a new one. Answers 404 when the pair cannot be loaded.

    DELETE removes the pair, then its positive statement and question when
    their ids could be read beforehand; ``ok`` reflects the pair deletion only.
    """
    token = request.headers.get("Authorization", "")

    if request.method == "PATCH":
        body = request.get_json(force=True, silent=True) or {}

        # Fetch current pair to get FK IDs
        pair_data, ps = _snakkimo("GET", f"/pairs/{pair_id}", token)
        if ps != 200:
            return jsonify({"error": "Pair not found"}), 404
        pair = pair_data.get("data") or pair_data
        stmt_id = pair.get("positive_statement_id")
        q_id = pair.get("question_id")

        if "level" in body:
            _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"difficulty_level": body["level"]})

        if "statement_de" in body and stmt_id:
            _snakkimo(
                "PATCH",
                f"/statements/{stmt_id}",
                token,
                {"positive_sentence_de": body["statement_de"]},
            )

        if "question_de" in body:
            question_de = (body["question_de"] or "").strip()
            if q_id and question_de:
                _snakkimo("PATCH", f"/questions/{q_id}", token, {"sentence_de": question_de})
            elif q_id and not question_de:
                # Remove question from pair then delete it
                _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"question_id": None})
                _snakkimo("DELETE", f"/questions/{q_id}", token)
                q_id = None
            elif not q_id and question_de:
                q_resp, s = _snakkimo("POST", "/questions", token, {"sentence_de": question_de})
                if s in (200, 201):
                    q_data = q_resp.get("data") or q_resp
                    q_id = q_data["id"]
                    _snakkimo("PATCH", f"/pairs/{pair_id}", token, {"question_id": q_id})

        # Reload stmt_id in case it changed
        if not stmt_id:
            pair_data2, _ = _snakkimo("GET", f"/pairs/{pair_id}", token)
            pair2 = pair_data2.get("data") or pair_data2
            stmt_id = pair2.get("positive_statement_id")

        for we in (body.get("words_add") or []):
            if not isinstance(we, dict):
                continue
            titel_de = (we.get("titel_de") or "").strip()
            w_level = _db_level(we.get("level"), 50)
            link_to = we.get("link_to", "both")
            if not titel_de:
                continue
            try:
                wid, _ = _find_or_create_word(titel_de, w_level, token)
                if link_to in ("statement", "both") and stmt_id:
                    _snakkimo("POST", f"/statements/{stmt_id}/positive-words/{wid}", token)
                # FLAG: linking words to questions not supported in snakkimo
            except Exception as e:
                print(f"[db_pair_patch] word error '{titel_de}': {e}")

        for wr in (body.get("words_remove") or []):
            if not isinstance(wr, dict):
                continue
            junction_id = wr.get("junction_id")
            link_to = wr.get("link_to", "both")
            if not junction_id:
                continue
            if link_to in ("statement", "both") and stmt_id:
                _snakkimo(
                    "DELETE",
                    f"/statements/{stmt_id}/positive-words/{junction_id}",
                    token,
                )
            # FLAG: removing words from questions not supported in snakkimo

        return jsonify({"ok": True})

    # DELETE: read the FK ids first, they are gone once the pair is deleted.
    pair_data, ps = _snakkimo("GET", f"/pairs/{pair_id}", token)
    pair = (pair_data.get("data") or pair_data) if ps == 200 else {}
    stmt_id = pair.get("positive_statement_id")
    q_id = pair.get("question_id")

    # Per the original author's note, CASCADE removes the object_pairs junction.
    _, s = _snakkimo("DELETE", f"/pairs/{pair_id}", token)
    if stmt_id:
        _snakkimo("DELETE", f"/statements/{stmt_id}", token)
    if q_id:
        _snakkimo("DELETE", f"/questions/{q_id}", token)
    return jsonify({"ok": s in (200, 204)})


@app.route("/api/directus/db-words/search", methods=["GET"])
def directus_db_words_search():
    """Lightweight word search for autocomplete.

    Reads ``q`` (search text) and ``limit`` (clamped to 1..50, default 10) and
    requests ``id, titel_de, level`` with a case-insensitive "contains" filter,
    sorted by ``titel_de``. An empty ``q`` returns an empty list without an
    upstream call.

    NOTE(review): the query uses Directus-style ``/items/db_words`` parameters
    through the legacy ``_directus`` alias — confirm the snakkimo API accepts
    them.
    """
    token = request.headers.get("Authorization", "")
    q = (request.args.get("q") or "").strip()
    try:
        limit = max(1, min(int(request.args.get("limit", "10")), 50))
    except ValueError:
        limit = 10
    if not q:
        return jsonify({"data": []}), 200

    qs = (
        "fields=id,titel_de,level"
        f"&filter[titel_de][_icontains]={urllib.parse.quote(q)}"
        "&sort=titel_de"
        f"&limit={limit}"
    )
    data, status = _directus("GET", f"/items/db_words?{qs}", token)
    return jsonify({"data": data.get("data") or []}), status


# =====================================================
# CONTENT-MANAGEMENT DASHBOARD (admin)
# =====================================================

# Allowlist: only these collections are reachable through the generic
# management endpoints. Extend as needed.
DASHBOARD_COLLECTIONS = [
    {"name": "db_pictures", "label": "Bilder", "kind": "image", "group": "neu",
     "fields": "id,media,status,date_created,design",
     "preview": "media", "title_field": None},
    {"name": "db_objects", "label": "Objekte", "kind": "image", "group": "neu",
     "fields": "id,picture,status,date_created,user_notes",
     "preview": None, "title_field": "user_notes"},
    {"name": "db_words", "label": "Wörter", "kind": "text", "group": "neu",
     "fields": "id,titel_de,level,status,date_created",
     "preview": None, "title_field": "titel_de"},
    {"name": "db_question", "label": "Fragen", "kind": "text", "group": "neu",
     "fields": "id,question_de,level,status,date_created",
     "preview": None, "title_field": "question_de"},
    {"name": "db_statement", "label": "Statements", "kind": "text", "group": "neu",
     "fields": "id,statement_de,level,status,date_created",
     "preview": None, "title_field": "statement_de"},
    {"name": "db_pairs", "label": "Q&A-Paare", "kind": "text", "group": "neu",
     "fields": "id,level,status,date_created",
     "preview": None, "title_field": None},
]

# Lookup table: collection name -> its dashboard configuration.
DASHBOARD_BY_NAME = {c["name"]: c for c in DASHBOARD_COLLECTIONS}


def _collection_config_or_404(name):
    """Look up an allowlisted collection.

    Returns ``(cfg, None)`` when *name* is known, otherwise ``(None, response)``
    where ``response`` is a ready-to-return 404 JSON error tuple.
    """
    cfg = DASHBOARD_BY_NAME.get(name)
    if not cfg:
        return None, (jsonify({"error": "Unbekannte Collection"}), 404)
    return cfg, None


def _collection_meta(cfg) -> dict:
    """Return the collection descriptor embedded in list and item responses."""
    return {
        "name": cfg["name"],
        "label": cfg["label"],
        "kind": cfg["kind"],
        "preview": cfg.get("preview"),
        "title_field": cfg.get("title_field"),
    }


def _int_or(raw, default: int) -> int:
    """Parse *raw* as int, returning *default* when it is missing or not numeric."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


def _aggregate_count(payload) -> int:
    """Extract ``data[0]["count"]`` from an aggregate response.

    Returns 0 for any unexpected shape (non-dict payload, ``data`` not a
    non-empty list, non-numeric count) instead of raising.
    """
    rows = payload.get("data") if isinstance(payload, dict) else None
    first = rows[0] if isinstance(rows, list) and rows else None
    raw = first.get("count", 0) if isinstance(first, dict) else 0
    try:
        return int(raw or 0)
    except (TypeError, ValueError):
        return 0


def _first_error_message(payload):
    """Return ``errors[0]["message"]`` from an error payload, or None if absent."""
    errors = payload.get("errors") if isinstance(payload, dict) else None
    first = errors[0] if isinstance(errors, list) and errors else None
    return first.get("message") if isinstance(first, dict) else None


@app.route("/api/directus/dashboard/summary", methods=["GET"])
def directus_dashboard_summary():
    """Return total and per-status counts for every dashboard collection.

    The caller's ``Authorization`` header is forwarded upstream. A failing
    total request is reported in the entry's ``error`` field; a failing status
    grouping is ignored and leaves ``by_status`` empty.
    """
    token = request.headers.get("Authorization", "")
    out = []
    for cfg in DASHBOARD_COLLECTIONS:
        entry = {
            "name": cfg["name"],
            "label": cfg["label"],
            "kind": cfg["kind"],
            "group": cfg["group"],
            "total": 0,
            "by_status": [],
            "has_status": False,
            "error": None,
        }

        # 1) Total count via aggregate
        data, status = _directus("GET", f"/items/{cfg['name']}?aggregate[count]=*", token)
        if status == 200:
            entry["total"] = _aggregate_count(data)
        else:
            entry["error"] = _first_error_message(data) or f"HTTP {status}"

        # 2) Status groups (may fail when the field is missing → ignore)
        data2, status2 = _directus(
            "GET",
            f"/items/{cfg['name']}?aggregate[count]=*&groupBy[]=status&limit=-1",
            token,
        )
        if status2 == 200:
            rows = data2.get("data") if isinstance(data2, dict) else None
            grouped = []
            for r in (rows if isinstance(rows, list) else []):
                if not isinstance(r, dict):
                    continue
                raw = r.get("count")
                if isinstance(raw, dict):
                    # Count may arrive nested per field, e.g. {"id": 3}.
                    raw = raw.get("id", 0)
                grouped.append({"status": r.get("status"), "count": _int_or(raw or 0, 0)})
            grouped.sort(key=lambda x: -x["count"])
            entry["by_status"] = grouped
            entry["has_status"] = any(g.get("status") is not None for g in grouped)

        out.append(entry)
    return jsonify({"data": out}), 200


@app.route("/api/directus/collection/<name>", methods=["GET"])
def directus_collection_list(name):
    """Generic list endpoint with optional status filter and pagination.

    Query arguments: ``status`` (exact match), ``limit`` (default 50),
    ``offset`` (default 0) and ``sort`` (default ``-date_created``).
    Non-numeric ``limit``/``offset`` fall back to their defaults. The response
    carries the rows, a ``meta`` block with the filtered total, and the
    collection descriptor; the status code is that of the upstream list call.
    """
    cfg, err = _collection_config_or_404(name)
    if err:
        return err
    token = request.headers.get("Authorization", "")

    status_filter = request.args.get("status", "").strip()
    limit = _int_or(request.args.get("limit"), 50)
    offset = _int_or(request.args.get("offset"), 0)
    sort = request.args.get("sort", "-date_created")

    status_clause = (
        f"filter[status][_eq]={urllib.parse.quote(status_filter)}" if status_filter else None
    )

    parts = [
        f"fields={cfg['fields']}",
        f"limit={limit}",
        f"offset={offset}",
        # Quote user input so it cannot inject extra query parameters.
        f"sort={urllib.parse.quote(sort, safe=',')}",
    ]
    if status_clause:
        parts.append(status_clause)
    data, status = _directus("GET", f"/items/{name}?{'&'.join(parts)}", token)

    # Total for the current filter
    count_parts = ["aggregate[count]=*"]
    if status_clause:
        count_parts.append(status_clause)
    count_data, _ = _directus("GET", f"/items/{name}?{'&'.join(count_parts)}", token)
    total = _aggregate_count(count_data)

    rows = data.get("data") if isinstance(data, dict) else None
    return jsonify({
        "data": rows or [],
        "meta": {"total": total, "limit": limit, "offset": offset},
        "collection": _collection_meta(cfg),
    }), status


@app.route("/api/directus/collection/<name>/<item_id>", methods=["GET", "PATCH"])
def directus_collection_item(name, item_id):
    """Load or update a single item of an allowlisted collection.

    GET wraps the upstream ``data`` together with the collection descriptor.
    PATCH forwards the JSON body unchanged and relays the upstream response.
    """
    cfg, err = _collection_config_or_404(name)
    if err:
        return err
    token = request.headers.get("Authorization", "")

    if request.method == "GET":
        data, status = _directus("GET", f"/items/{name}/{item_id}", token)
        return jsonify({
            "data": data.get("data"),
            "collection": _collection_meta(cfg),
        }), status

    # PATCH
    body = request.get_json(force=True, silent=True) or {}
    data, status = _directus("PATCH", f"/items/{name}/{item_id}", token, body)
    return jsonify(data), status


if __name__ == "__main__":
    # NOTE(review): debug=True combined with host 0.0.0.0 exposes the
    # interactive debugger on every interface — use for local development only.
    app.run(host="0.0.0.0", port=8000, debug=True)