from pathlib import Path from datetime import datetime from uuid import uuid4 import json import os import urllib.request import urllib.error import urllib.parse from flask import Flask, send_from_directory, request, jsonify from flask_cors import CORS from PIL import Image import ollama import anthropic as _anthropic_sdk ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "") DIRECTUS_URL = "https://db.hejyou.com" BASE_DIR = Path(__file__).resolve().parent PICTURES_DIR = BASE_DIR / "pictures" OBJECTS_DIR = BASE_DIR / "objects_image" SENTENCE_DIR = BASE_DIR / "sentence_object" PROMPTS_DIR = BASE_DIR / "prompts" app = Flask( __name__, template_folder=str(BASE_DIR / "templates"), static_folder=str(BASE_DIR / "static"), ) CORS(app) def read_prompt(filepath: Path, fallback: str) -> str: """ Hilfsfunktion, um Prompt-Dateien zu lesen. """ try: if filepath.exists(): return filepath.read_text(encoding="utf-8").strip() except Exception: pass return fallback.strip() def _directus(method, path, token, body=None): """Hilfsfunktion: Directus-API-Aufruf via urllib.""" headers = {"Content-Type": "application/json"} if token: headers["Authorization"] = token req = urllib.request.Request( f"{DIRECTUS_URL}{path}", data=json.dumps(body).encode() if body is not None else None, headers=headers, method=method, ) try: with urllib.request.urlopen(req) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw else {}, resp.status except urllib.error.HTTPError as e: raw = e.read().decode("utf-8") return json.loads(raw) if raw else {}, e.code @app.route("/api/directus/auth/login", methods=["POST"]) def directus_auth_login(): """Proxy: Directus-Login ohne CORS-Probleme.""" data, status = _directus("POST", "/auth/login", token=None, body=request.get_json()) return jsonify(data), status @app.route("/api/directus/pictures", methods=["GET"]) def directus_pictures(): """Proxy: Directus-Bilder nach Status filtern.""" token = request.headers.get("Authorization", "") pic_status = request.args.get("status", "new") data, status = _directus("GET", f"/items/pictures?filter[status][_eq]={pic_status}&fields=id,media,status&sort=date_created", token) return jsonify(data), status @app.route("/api/directus/pictures/", methods=["PATCH"]) def directus_picture(pic_id): """Proxy: Bild-Status aktualisieren.""" token = request.headers.get("Authorization", "") data, status = _directus("PATCH", f"/items/pictures/{pic_id}", token, body=request.get_json()) return jsonify(data), status @app.route("/api/directus/objects", methods=["GET", "POST"]) def directus_objects(): """Proxy: Objekte laden (GET) oder anlegen (POST).""" token = request.headers.get("Authorization", "") if request.method == "GET": picture_id = request.args.get("picture_id", "") fields = "id,selections,user_notes,parent,status,picture" path = f"/items/objects?filter[picture][_eq]={picture_id}&fields={fields}&sort=date_created" data, status = _directus("GET", path, token) return jsonify(data), status else: data, status = _directus("POST", "/items/objects", token, body=request.get_json()) return jsonify(data), status @app.route("/api/directus/objects/", methods=["PATCH", "DELETE"]) def directus_object(obj_id): """Proxy: Objekt aktualisieren (PATCH) oder löschen (DELETE).""" token = request.headers.get("Authorization", "") if request.method == "PATCH": data, status = _directus("PATCH", f"/items/objects/{obj_id}", token, body=request.get_json()) else: data, status = _directus("DELETE", f"/items/objects/{obj_id}", token) return jsonify(data), status @app.route("/api/images", methods=["GET"]) def list_images(): """ Gibt alle Bilder im pictures-Ordner zurück. Query-Parameter: ?mode=draw (Standard) oder ?mode=generate draw → Bilder ohne _saved-Suffix generate → Bilder mit _saved-Suffix """ mode = request.args.get("mode", "draw") PICTURES_DIR.mkdir(parents=True, exist_ok=True) if mode == "generate": image_paths = sorted( [f for f in PICTURES_DIR.iterdir() if f.is_file() and f.stem.endswith("_saved")], key=lambda p: p.stat().st_mtime, ) else: image_paths = sorted( [f for f in PICTURES_DIR.iterdir() if f.is_file() and not f.stem.endswith("_saved")], key=lambda p: p.stat().st_mtime, ) return jsonify({"images": [p.name for p in image_paths]}) REACT_BUILD_DIR = BASE_DIR / "static" / "react" def _serve_spa(): """Liefert die React SPA index.html für alle Frontend-Routen.""" return send_from_directory(REACT_BUILD_DIR, "index.html") @app.route("/") @app.route("/draw") @app.route("/generate") def serve_spa(): return _serve_spa() @app.route("/assets/") def serve_react_assets(filename: str): """Liefert JS/CSS-Assets aus dem Vite-Build.""" return send_from_directory(REACT_BUILD_DIR / "assets", filename) @app.route("/pictures/") def serve_picture(filename: str): # Statisches Ausliefern der Originalbilder return send_from_directory(PICTURES_DIR, filename) @app.route("/objects_image/") def serve_object_image(filename: str): # Ausliefern der gespeicherten Ausschnitt-Bilder return send_from_directory(OBJECTS_DIR, filename) @app.route("/api/objects", methods=["GET"]) def list_objects_for_image(): """ Gibt alle gespeicherten Objekte (Ausschnitte) zu einem Quellbild zurück. Query-Parameter: ?filename=testbild.png """ filename = request.args.get("filename") if not filename: return jsonify({"error": "Missing filename query parameter"}), 400 # Für *_saved-Bilder auch Objekte berücksichtigen, deren source_filename # noch auf den ursprünglichen Namen ohne "_saved" zeigt (Kompatibilität) base_filename = filename legacy_filename = None if base_filename.endswith(".png") or base_filename.endswith(".jpg") or base_filename.endswith(".jpeg") or base_filename.endswith(".webp"): stem = base_filename.rsplit(".", 1)[0] suffix = base_filename.rsplit(".", 1)[1] if stem.endswith("_saved"): legacy_filename = f"{stem[:-6]}.{suffix}" OBJECTS_DIR.mkdir(parents=True, exist_ok=True) objects = [] for meta_file in OBJECTS_DIR.glob("*.txt"): try: meta = json.loads(meta_file.read_text(encoding="utf-8")) except Exception: continue src_name = meta.get("source_filename") if src_name != filename and (legacy_filename is None or src_name != legacy_filename): continue objects.append( { "id": meta.get("id"), "image_file": meta.get("image_file"), "title_de": meta.get("title_de", ""), "position_de": meta.get("position_de", ""), "action_de": meta.get("action_de", ""), "condition_de": meta.get("condition_de", ""), # Von KI erzeugte Details (optional) "label_en": meta.get("label_en"), "label_de": meta.get("label_de"), "label_se": meta.get("label_se"), "color_en": meta.get("color_en"), "adjective_en": meta.get("adjective_en"), "action_verb_en": meta.get("action_verb_en"), "preposition_en": meta.get("preposition_en"), "relative_position_en": meta.get("relative_position_en"), "season_en": meta.get("season_en"), "mode": meta.get("mode"), "bbox": meta.get("bbox"), "polygon": meta.get("polygon"), "hierarchy": meta.get("hierarchy", 1), "parent_id": meta.get("parent_id"), "created_at": meta.get("created_at"), } ) # Älteste zuerst sortieren (1 = ältestes Objekt) objects.sort(key=lambda o: o.get("created_at") or "") # Laufende Nummer je Bild (1..n) nach obiger Sortierung for idx, obj in enumerate(objects, start=1): obj["index"] = idx return jsonify({"objects": objects}) @app.route("/api/object//hierarchy", methods=["POST"]) def update_object_hierarchy(obj_id: str): """ Aktualisiert die Hierarchie (1, 2 oder 3) eines gespeicherten Objekts. """ data = request.get_json(force=True, silent=True) or {} try: hierarchy = int(data.get("hierarchy")) except (TypeError, ValueError): return jsonify({"error": "Invalid hierarchy"}), 400 if hierarchy not in (1, 2, 3): return jsonify({"error": "Hierarchy must be 1, 2 or 3"}), 400 meta_path = OBJECTS_DIR / f"{obj_id}.txt" if not meta_path.exists(): return jsonify({"error": "Object not found"}), 404 try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read meta file"}), 500 meta["hierarchy"] = hierarchy meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") return jsonify({"id": obj_id, "hierarchy": hierarchy}) @app.route("/api/object/", methods=["POST"]) def update_object_meta(obj_id: str): """ Aktualisiert Text-Metadaten (title_de, position_de, action_de, condition_de) eines gespeicherten Objekts. """ data = request.get_json(force=True, silent=True) or {} meta_path = OBJECTS_DIR / f"{obj_id}.txt" if not meta_path.exists(): return jsonify({"error": "Object not found"}), 404 try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read meta file"}), 500 for key in ("title_de", "position_de", "action_de", "condition_de"): if key in data: meta[key] = data.get(key, "") meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") return jsonify({k: meta.get(k) for k in ("id", "title_de", "position_de", "action_de", "condition_de")}) @app.route("/api/object//parent", methods=["POST"]) def update_object_parent(obj_id: str): """ Aktualisiert die Parent-Relation eines Objekts. Erwartet JSON: { "parent_id": "" | null } """ data = request.get_json(force=True, silent=True) or {} parent_id = data.get("parent_id") meta_path = OBJECTS_DIR / f"{obj_id}.txt" if not meta_path.exists(): return jsonify({"error": "Object not found"}), 404 try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read meta file"}), 500 # Optional: prüfen, ob parent_id existiert und zum selben Bild gehört if parent_id: parent_meta_path = OBJECTS_DIR / f"{parent_id}.txt" if not parent_meta_path.exists(): return jsonify({"error": "Parent object not found"}), 400 try: parent_meta = json.loads(parent_meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read parent meta file"}), 500 if parent_meta.get("source_filename") != meta.get("source_filename"): return jsonify({"error": "Parent must belong to same source image"}), 400 meta["parent_id"] = parent_id or None meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") return jsonify({"id": obj_id, "parent_id": meta.get("parent_id")}) @app.route("/api/object//generate_details", methods=["POST"]) def generate_object_details(obj_id: str): """ Erzeugt mit Llama 3.2 Vision zusätzliche englische Metadaten zu einem Objekt auf Basis des ausgeschnittenen Objektbildes + vorhandener deutscher Felder. """ meta_path = OBJECTS_DIR / f"{obj_id}.txt" if not meta_path.exists(): return jsonify({"error": "Object not found"}), 404 try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read meta file"}), 500 image_file = meta.get("image_file") if not image_file: return jsonify({"error": "Object has no image_file"}), 400 image_path = OBJECTS_DIR / image_file if not image_path.exists(): return jsonify({"error": "Object image not found"}), 404 # Prompt laden prompt_template = read_prompt( PROMPTS_DIR / "create_details.txt", "Analyze this object and return one JSON object with the requested fields.", ) title_de = meta.get("title_de", "") or "" action_de = meta.get("action_de", "") or "" condition_de = meta.get("condition_de", "") or "" # Platzhalter im Prompt ersetzen (einfache .format-Verwendung) try: prompt = prompt_template.format( title_de=title_de, action_de=action_de, condition_de=condition_de, ) except Exception: # Falls das Format fehlschlägt, einfach Rohprompt + Kontext anhängen prompt = ( f"{prompt_template}\n\n" f"Titel (Deutsch): {title_de}\n" f"Status (Deutsch): {action_de}\n" f"Zustand (Deutsch): {condition_de}\n" ) # Bild als Bytes laden try: img_bytes = image_path.read_bytes() except Exception: return jsonify({"error": "Could not read object image"}), 500 try: response = ollama.chat( model="llama3.2-vision", messages=[{"role": "user", "content": prompt, "images": [img_bytes]}], format="json", options={"temperature": 0}, ) except Exception as e: return jsonify({"error": f"LLM request failed: {e}"}), 500 content = response.get("message", {}).get("content") if not content: return jsonify({"error": "Empty response from LLM"}), 500 try: ai_data = json.loads(content) except Exception: # Fallback: versuchen, das erste JSON-Objekt zu extrahieren try: start = content.index("{") end = content.rindex("}") + 1 ai_data = json.loads(content[start:end]) except Exception: return jsonify({"error": "Could not parse JSON from LLM response"}), 500 # Debug-Ausgabe des rohen AI-Outputs try: print(f"[generate_details] Raw AI data for {obj_id}: {json.dumps(ai_data, ensure_ascii=False)}") except Exception: print(f"[generate_details] Raw AI data for {obj_id} (non-serializable): {ai_data!r}") # Erwartete Felder übernehmen (gemäß neuem Prompt) fields = [ "label_en", "label_de", "label_se", "color_en", "adjective_en", "action_verb_en", "preposition_en", "relative_position_en", "season_en", ] for key in fields: if key in ai_data: meta[key] = ai_data.get(key) # Metadatei zurückschreiben meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") # Nur die neuen Felder zurückgeben (None als leeren String konvertieren) result = {key: (meta.get(key) or "") for key in fields} try: print(f"[generate_details] Stored meta for {obj_id}: {json.dumps(result, ensure_ascii=False)}") except Exception: print(f"[generate_details] Stored meta for {obj_id}: {result!r}") return jsonify(result) def load_sentences_for_object(obj_id: str): """ Lädt alle gespeicherten Sätze für ein Objekt aus sentence_object/.txt. Rückgabe: Liste von Dicts mit mindestens question_en, answer_en, created_at. """ SENTENCE_DIR.mkdir(parents=True, exist_ok=True) sentence_path = SENTENCE_DIR / f"{obj_id}.txt" if not sentence_path.exists(): return [] try: data = json.loads(sentence_path.read_text(encoding="utf-8")) if isinstance(data, list): return data if isinstance(data, dict): # Fallback: einzelnes Objekt in Liste wrappen return [data] return [] except Exception: return [] @app.route("/api/object//sentences", methods=["GET"]) def get_object_sentences(obj_id: str): """ Gibt alle bisher erzeugten KI-Sätze zu einem Objekt zurück. """ sentences = load_sentences_for_object(obj_id) return jsonify({"object_id": obj_id, "sentences": sentences}) @app.route("/api/object//generate_sentence", methods=["POST"]) def generate_object_sentence(obj_id: str): """ Erzeugt mit Llama 3.1 einen neuen englischen Frage-Antwort-Satz zu einem Objekt basierend auf den KI-Details und bisherigen Sätzen. Speichert alle Sätze in sentence_object/.txt. """ # Objekt-Metadaten laden (inkl. KI-Details) meta_path = OBJECTS_DIR / f"{obj_id}.txt" if not meta_path.exists(): return jsonify({"error": "Object not found"}), 404 try: meta = json.loads(meta_path.read_text(encoding="utf-8")) except Exception: return jsonify({"error": "Could not read meta file"}), 500 # Relevante Felder für den Satz (KI-Details) details = { "label_en": meta.get("label_en"), "label_de": meta.get("label_de"), "label_se": meta.get("label_se"), "color_en": meta.get("color_en"), "adjective_en": meta.get("adjective_en"), "action_verb_en": meta.get("action_verb_en"), "preposition_en": meta.get("preposition_en"), "relative_position_en": meta.get("relative_position_en"), "season_en": meta.get("season_en"), "title_de": meta.get("title_de"), "position_de": meta.get("position_de"), "action_de": meta.get("action_de"), "condition_de": meta.get("condition_de"), } # Bisherige Sätze laden previous_sentences = load_sentences_for_object(obj_id) # Prompt laden prompt_template = read_prompt( PROMPTS_DIR / "create_sentence.txt", "Create one new English question_en and answer_en as JSON, based on the object details and avoiding previous sentences.", ) # Vollständigen Prompt zusammensetzen try: details_json = json.dumps(details, ensure_ascii=False, indent=2) prev_json = json.dumps(previous_sentences, ensure_ascii=False, indent=2) except Exception: details_json = str(details) prev_json = str(previous_sentences) prompt = ( f"{prompt_template}\n\n" f"OBJECT_DETAILS_JSON:\n{details_json}\n\n" f"PREVIOUS_SENTENCES_JSON:\n{prev_json}\n" ) # LLM-Aufruf (nur Text, kein Bild nötig) try: response = ollama.chat( model="llama3.1", messages=[{"role": "user", "content": prompt}], format="json", options={"temperature": 0.2}, ) except Exception as e: return jsonify({"error": f"LLM request failed: {e}"}), 500 content = response.get("message", {}).get("content") if not content: return jsonify({"error": "Empty response from LLM"}), 500 try: ai_data = json.loads(content) except Exception: # Fallback: erstes JSON-Objekt extrahieren try: start = content.index("{") end = content.rindex("}") + 1 ai_data = json.loads(content[start:end]) except Exception: return jsonify({"error": "Could not parse JSON from LLM response"}), 500 question_simple = (ai_data.get("question_simple_en") or "").strip() answer_simple = (ai_data.get("answer_simple_en") or "").strip() question_advanced = (ai_data.get("question_advanced_en") or "").strip() answer_advanced = (ai_data.get("answer_advanced_en") or "").strip() if not question_simple or not answer_simple or not question_advanced or not answer_advanced: return jsonify({"error": "LLM response missing required fields (question_simple_en, answer_simple_en, question_advanced_en, answer_advanced_en)"}), 500 # Neuen Satz mit Timestamp anreichern entry = { "object_id": obj_id, "created_at": datetime.now().isoformat(), "question_simple_en": question_simple, "answer_simple_en": answer_simple, "question_advanced_en": question_advanced, "answer_advanced_en": answer_advanced, } sentences = previous_sentences + [entry] # In Datei speichern SENTENCE_DIR.mkdir(parents=True, exist_ok=True) sentence_path = SENTENCE_DIR / f"{obj_id}.txt" sentence_path.write_text(json.dumps(sentences, ensure_ascii=False, indent=2), encoding="utf-8") return jsonify({"object_id": obj_id, "sentence": entry, "count": len(sentences)}) @app.route("/api/image/save", methods=["POST"]) def save_image(): """ Markiert ein Bild und alle zugehörigen Objekte als 'gespeichert', indem der Dateiname um '_saved' erweitert wird und die Metadaten der Objekte angepasst werden. """ data = request.get_json(force=True, silent=True) or {} filename = data.get("filename") if not filename: return jsonify({"error": "Missing filename"}), 400 src_path = PICTURES_DIR / filename if not src_path.exists(): return jsonify({"error": "Image not found"}), 404 if src_path.stem.endswith("_saved"): return jsonify({"error": "Image already saved"}), 400 new_name = f"{src_path.stem}_saved{src_path.suffix}" dst_path = PICTURES_DIR / new_name # Bild umbenennen src_path.rename(dst_path) # Zugehörige Objekte aktualisieren OBJECTS_DIR.mkdir(parents=True, exist_ok=True) for meta_file in OBJECTS_DIR.glob("*.txt"): try: meta = json.loads(meta_file.read_text(encoding="utf-8")) except Exception: continue if meta.get("source_filename") != filename: continue meta["source_filename"] = new_name meta_file.write_text( json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8", ) return jsonify({"old_name": filename, "new_name": new_name}) @app.route("/api/crop", methods=["POST"]) def crop_image(): """ Erwartet JSON: { "filename": "bild.jpg", "selections": [ { "number": 1, "mode": "rect" | "polygon", "bbox": {"x": 10, "y": 20, "width": 200, "height": 150}, // für rect "polygon": [{"x": 10, "y": 20}, ...] // für polygon }, ... ], "title_de": "...", "position_de": "...", "action_de": "...", "condition_de": "..." } ODER (Legacy-Format für Kompatibilität): { "filename": "bild.jpg", "mode": "rect" | "polygon", "x": 10, "y": 20, "width": 200, "height": 150, // für rect "polygon": [{"x": 10, "y": 20}, ...] // für polygon } """ data = request.get_json(force=True, silent=True) or {} if "filename" not in data: return jsonify({"error": "Missing filename"}), 400 src_path = PICTURES_DIR / data["filename"] if src_path.stem.endswith("_saved"): return jsonify({"error": "Cannot crop on saved image"}), 400 if not src_path.exists(): return jsonify({"error": "Source image not found"}), 404 OBJECTS_DIR.mkdir(parents=True, exist_ok=True) # UUID für Bild + Metadaten obj_id = uuid4().hex timestamp = datetime.now().isoformat() out_image_name = f"{obj_id}{src_path.suffix}" out_image_path = OBJECTS_DIR / out_image_name out_meta_path = OBJECTS_DIR / f"{obj_id}.txt" # Prüfen, ob neues Format (selections) oder Legacy-Format selections = data.get("selections") if selections and isinstance(selections, list) and len(selections) > 0: # Neues Format: mehrere Auswahlen processed_selections = [] with Image.open(src_path) as img: img_w, img_h = img.size for sel in selections: sel_mode = sel.get("mode") if sel_mode == "rect": bbox = sel.get("bbox") if not bbox: continue try: x = int(bbox.get("x", 0)) y = int(bbox.get("y", 0)) w = int(bbox.get("width", 0)) h = int(bbox.get("height", 0)) except (ValueError, TypeError): continue if w <= 0 or h <= 0: continue x2 = min(x + w, img_w) y2 = min(y + h, img_h) x1 = max(0, x) y1 = max(0, y) if x1 >= x2 or y1 >= y2: continue processed_selections.append({ "number": sel.get("number", len(processed_selections) + 1), "mode": "rect", "bbox": {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1}, }) elif sel_mode == "polygon": polygon = sel.get("polygon") if not isinstance(polygon, list) or len(polygon) < 3: continue try: xs = [int(p.get("x", 0)) for p in polygon] ys = [int(p.get("y", 0)) for p in polygon] except (KeyError, TypeError, ValueError): continue min_x = max(min(xs), 0) min_y = max(min(ys), 0) max_x = min(max(xs), img_w) max_y = min(max(ys), img_h) if min_x >= max_x or min_y >= max_y: continue processed_selections.append({ "number": sel.get("number", len(processed_selections) + 1), "mode": "polygon", "polygon": polygon, "bbox": {"x": min_x, "y": min_y, "width": max_x - min_x, "height": max_y - min_y}, }) if not processed_selections: return jsonify({"error": "No valid selections provided"}), 400 # Erstes Bild aus erster Auswahl erstellen first_sel = processed_selections[0] try: with Image.open(src_path) as img: if first_sel["mode"] == "rect": bbox = first_sel["bbox"] x1 = bbox["x"] y1 = bbox["y"] x2 = x1 + bbox["width"] y2 = y1 + bbox["height"] cropped = img.crop((x1, y1, x2, y2)) else: # polygon - verwende Bounding-Box für das Bild bbox = first_sel["bbox"] x1 = bbox["x"] y1 = bbox["y"] x2 = x1 + bbox["width"] y2 = y1 + bbox["height"] cropped = img.crop((x1, y1, x2, y2)) # Bild speichern (Format beibehalten) img_format = img.format or "PNG" cropped.save(out_image_path, format=img_format) print(f"[crop_image] Bild gespeichert: {out_image_path} (Größe: {cropped.size}, Format: {img_format})") except Exception as e: print(f"[crop_image] Fehler beim Erstellen des Bildes: {e}") return jsonify({"error": f"Failed to create image: {e}"}), 500 # Metadaten mit allen Auswahlen speichern meta = { "id": obj_id, "created_at": timestamp, "source_filename": data["filename"], "image_file": out_image_name, "hierarchy": 1, "parent_id": None, "title_de": data.get("title_de", ""), "position_de": data.get("position_de", ""), "action_de": data.get("action_de", ""), "condition_de": data.get("condition_de", ""), "selections": processed_selections, } else: # Legacy-Format: einzelne Auswahl (für Rückwärtskompatibilität) mode = data.get("mode", "rect") if mode not in {"rect", "polygon"}: return jsonify({"error": "Invalid mode"}), 400 with Image.open(src_path) as img: img_w, img_h = img.size if mode == "rect": try: x = int(data["x"]) y = int(data["y"]) w = int(data["width"]) h = int(data["height"]) except (ValueError, TypeError, KeyError): return jsonify({"error": "Invalid rectangle coordinates"}), 400 if w <= 0 or h <= 0: return jsonify({"error": "Width and height must be positive"}), 400 x2 = min(x + w, img_w) y2 = min(y + h, img_h) x1 = max(0, x) y1 = max(0, y) if x1 >= x2 or y1 >= y2: return jsonify({"error": "Crop area outside of image"}), 400 cropped = img.crop((x1, y1, x2, y2)) bbox = {"x": x1, "y": y1, "width": x2 - x1, "height": y2 - y1} else: # polygon polygon = data.get("polygon") or [] if not isinstance(polygon, list) or len(polygon) < 3: return jsonify({"error": "Polygon must have at least 3 points"}), 400 try: xs = [int(p["x"]) for p in polygon] ys = [int(p["y"]) for p in polygon] except (KeyError, TypeError, ValueError): return jsonify({"error": "Invalid polygon coordinates"}), 400 min_x = max(min(xs), 0) min_y = max(min(ys), 0) max_x = min(max(xs), img_w) max_y = min(max(ys), img_h) if min_x >= max_x or min_y >= max_y: return jsonify({"error": "Polygon bounding box outside of image"}), 400 cropped = img.crop((min_x, min_y, max_x, max_y)) bbox = { "x": min_x, "y": min_y, "width": max_x - min_x, "height": max_y - min_y, } cropped.save(out_image_path) # Metadaten als JSON in .txt speichern meta = { "id": obj_id, "created_at": timestamp, "source_filename": data["filename"], "mode": mode, "bbox": bbox, "image_file": out_image_name, "hierarchy": 1, "parent_id": None, "title_de": data.get("title_de", ""), "position_de": data.get("position_de", ""), "action_de": data.get("action_de", ""), "condition_de": data.get("condition_de", ""), } if mode == "polygon": meta["polygon"] = data.get("polygon") out_meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8") return jsonify( { "id": obj_id, "image_file": out_image_name, "meta_file": out_meta_path.name, } ) # ── Claude / Directus Helpers ───────────────────────────────────────────────── def _ensure_junction(collection: str, field1: str, field2: str, token: str): """Create a simple M2M junction collection in Directus if it doesn't exist.""" _, status = _directus("GET", f"/collections/{collection}", token) if status == 200: return body = { "collection": collection, "schema": {}, "meta": {"hidden": True, "icon": "import_export"}, "fields": [ { "field": "id", "type": "integer", "schema": {"is_primary_key": True, "has_auto_increment": True}, "meta": {"hidden": True}, }, {"field": field1, "type": "uuid", "schema": {}, "meta": {"hidden": True}}, {"field": field2, "type": "uuid", "schema": {}, "meta": {"hidden": True}}, ], } _directus("POST", "/collections", token, body) def _ensure_link(collection: str, match: dict, payload: dict, token: str): """Insert a junction row only if it doesn't already exist.""" qs = "&".join( f"filter[{k}][_eq]={urllib.parse.quote(str(v), safe='')}" for k, v in match.items() ) data, status = _directus("GET", f"/items/{collection}?{qs}&limit=1", token) if status == 200 and data.get("data"): return # already linked _directus("POST", f"/items/{collection}", token, payload) def _find_or_create_word(title_de: str, level: int, token: str): """Return (word_id, is_new). Creates word with status=draft if missing.""" enc = urllib.parse.quote(title_de, safe="") data, status = _directus( "GET", f"/items/words?filter[title_de][_eq]={enc}&fields=id&limit=1", token ) if status == 200 and data.get("data"): return data["data"][0]["id"], False body = {"status": "draft", "title_de": title_de, "level": level} data, status = _directus("POST", "/items/words", token, body) if status in (200, 201): return data["data"]["id"], True raise RuntimeError(f"Word creation failed ({status}): {data}") def _find_or_create_question(question_de: str, answer_de: str, level: int, short_answer_id, obj_id: str, token: str): """Return (question_id, is_new). Creates question with status=draft if missing.""" enc = urllib.parse.quote(question_de, safe="") data, status = _directus( "GET", f"/items/questions?filter[question_de][_eq]={enc}&fields=id&limit=1", token, ) if status == 200 and data.get("data"): return data["data"][0]["id"], False body = { "status": "draft", "question_de": question_de, "answer_de": answer_de, "level": level, "object": obj_id, } if short_answer_id: body["short_answer"] = short_answer_id data, status = _directus("POST", "/items/questions", token, body) if status in (200, 201): return data["data"]["id"], True raise RuntimeError(f"Question creation failed ({status}): {data}") # ── Generate & Publish endpoints ────────────────────────────────────────────── @app.route("/api/object//generate_questions", methods=["POST"]) def generate_questions(obj_id: str): """ 1. Holt Objekt + Elternobjekt aus Directus 2. Füllt Prompt-Platzhalter 3. Ruft Claude Haiku auf 4. Speichert Wörter + Fragen als Entwurf in Directus """ token = request.headers.get("Authorization", "") body = request.get_json(force=True, silent=True) or {} prompt_template = (body.get("prompt") or "").strip() if not prompt_template: return jsonify({"error": "Missing prompt"}), 400 if not ANTHROPIC_API_KEY: return jsonify({"error": "ANTHROPIC_API_KEY not configured on server"}), 500 # Objekt laden obj_resp, s = _directus("GET", f"/items/objects/{obj_id}?fields=id,user_notes,parent", token) if s != 200: return jsonify({"error": "Object not found"}), 404 obj = obj_resp.get("data") or {} # Elternobjekt laden parent_notes = "" if obj.get("parent"): p_resp, _ = _directus("GET", f"/items/objects/{obj['parent']}?fields=id,user_notes", token) parent_notes = ((p_resp.get("data") or {}).get("user_notes") or "") # Platzhalter ersetzen prompt = ( prompt_template .replace("{user-notes_object}", obj.get("user_notes") or "") .replace("{user-notes_parentobject}", parent_notes) ) # Claude Haiku aufrufen try: client = _anthropic_sdk.Anthropic(api_key=ANTHROPIC_API_KEY) msg = client.messages.create( model="claude-haiku-4-5-20251001", max_tokens=4096, messages=[{"role": "user", "content": prompt}], ) raw = msg.content[0].text except Exception as e: return jsonify({"error": f"Claude API error: {e}"}), 500 # JSON parsen try: ai = json.loads(raw) except Exception: try: start = raw.index("{") end = raw.rindex("}") + 1 ai = json.loads(raw[start:end]) except Exception: return jsonify({"error": "Invalid JSON from AI", "raw": raw[:400]}), 500 levels = ai.get("levels", []) if not levels: return jsonify({"error": "No levels in AI response"}), 500 # Junction-Collections sicherstellen (einmalig) for col, f1, f2 in [ ("words_objects", "words_id", "objects_id"), ("questions_objects", "questions_id", "objects_id"), ("questions_distractor_words", "questions_id", "words_id"), ]: _ensure_junction(col, f1, f2, token) stats = { "words_created": 0, "words_linked": 0, "questions_created": 0, "questions_linked": 0, } # Alle eindeutigen Wörter aus allen Leveln vorab sammeln und einmalig laden all_words_by_level: dict[str, int] = {} # title_de → first level seen for lvl in levels: level = int(lvl.get("level") or 1) for w in lvl.get("words", []) + lvl.get("distractor_words", []) + [lvl.get("short_answer", "")]: w = str(w).strip() if w and w not in all_words_by_level: all_words_by_level[w] = level # Wörter einmalig anlegen / finden (globaler Cache über alle Level) global_word_map: dict[str, str] = {} # title_de → id for w, lvl_num in all_words_by_level.items(): try: wid, is_new = _find_or_create_word(w, lvl_num, token) global_word_map[w] = wid _ensure_link( "words_objects", {"words_id": wid, "objects_id": obj_id}, {"words_id": wid, "objects_id": obj_id}, token, ) stats["words_created" if is_new else "words_linked"] += 1 except Exception as e: print(f"[generate_questions] word error '{w}': {e}") for lvl in levels: level = int(lvl.get("level") or 1) q_de = (lvl.get("question") or "").strip() a_de = (lvl.get("answer") or "").strip() short_text = (lvl.get("short_answer") or "").strip() words_list = [str(w).strip() for w in lvl.get("words", []) if str(w).strip()] distractor_list = [str(w).strip() for w in lvl.get("distractor_words", []) if str(w).strip()] if not q_de: continue short_answer_id = global_word_map.get(short_text) if short_text else None # Frage anlegen / verknüpfen try: q_id, q_is_new = _find_or_create_question(q_de, a_de, level, short_answer_id, obj_id, token) except Exception as e: print(f"[generate_questions] question error level {level}: {e}") continue stats["questions_created" if q_is_new else "questions_linked"] += 1 # Frage ↔ Objekt _ensure_link( "questions_objects", {"questions_id": q_id, "objects_id": obj_id}, {"questions_id": q_id, "objects_id": obj_id}, token, ) # related_words for w in words_list: if w in global_word_map: _ensure_link( "questions_words", {"questions_id": q_id, "words_id": global_word_map[w]}, {"questions_id": q_id, "words_id": global_word_map[w]}, token, ) # distractor_words for w in distractor_list: if w in global_word_map: _ensure_link( "questions_distractor_words", {"questions_id": q_id, "words_id": global_word_map[w]}, {"questions_id": q_id, "words_id": global_word_map[w]}, token, ) print(f"[generate_questions] obj={obj_id} stats={stats}") return jsonify({"ok": True, "object_id": obj_id, "stats": stats}) @app.route("/api/object//publish_questions", methods=["POST"]) def publish_questions(obj_id: str): """Ändert Status aller verknüpften Entwurfs-Fragen und -Wörter auf 'published'.""" token = request.headers.get("Authorization", "") # Verknüpfte Fragen q_resp, _ = _directus( "GET", f"/items/questions_objects?filter[objects_id][_eq]={obj_id}&fields=questions_id&limit=200", token, ) q_ids = [e["questions_id"] for e in (q_resp.get("data") or [])] # Verknüpfte Wörter w_resp, _ = _directus( "GET", f"/items/words_objects?filter[objects_id][_eq]={obj_id}&fields=words_id&limit=2000", token, ) w_ids = [e["words_id"] for e in (w_resp.get("data") or [])] published_q = 0 published_w = 0 for q_id in q_ids: _, s = _directus("PATCH", f"/items/questions/{q_id}", token, {"status": "published"}) if s == 200: published_q += 1 for w_id in w_ids: _, s = _directus("PATCH", f"/items/words/{w_id}", token, {"status": "published"}) if s == 200: published_w += 1 return jsonify({ "ok": True, "published_questions": published_q, "published_words": published_w, }) @app.route("/api/object//questions", methods=["GET"]) def get_object_questions_list(obj_id: str): """Gibt alle verknüpften Fragen eines Objekts zurück (2-Schritt-Query).""" token = request.headers.get("Authorization", "") junc, _ = _directus("GET", f"/items/questions_objects?filter[objects_id][_eq]={obj_id}&fields=questions_id&limit=200", token) q_ids = [e["questions_id"] for e in (junc.get("data") or []) if e.get("questions_id")] if not q_ids: return jsonify({"data": []}) ids_param = urllib.parse.quote(",".join(q_ids), safe="") q_data, _ = _directus("GET", f"/items/questions?filter[id][_in]={ids_param}&fields=id,question_de,answer_de,level,status&limit=200", token) items = sorted(q_data.get("data") or [], key=lambda x: x.get("level") or 0) return jsonify({"data": items}) @app.route("/api/object//words", methods=["GET"]) def get_object_words_list(obj_id: str): """Gibt alle verknüpften Wörter eines Objekts zurück (2-Schritt-Query).""" token = request.headers.get("Authorization", "") junc, _ = _directus("GET", f"/items/words_objects?filter[objects_id][_eq]={obj_id}&fields=words_id&limit=2000", token) w_ids = [e["words_id"] for e in (junc.get("data") or []) if e.get("words_id")] if not w_ids: return jsonify({"data": []}) ids_param = urllib.parse.quote(",".join(w_ids), safe="") w_data, _ = _directus("GET", f"/items/words?filter[id][_in]={ids_param}&fields=id,title_de,level,status&limit=2000", token) items = sorted(w_data.get("data") or [], key=lambda x: x.get("title_de") or "") return jsonify({"data": items}) @app.route("/api/question/", methods=["DELETE"]) def delete_question_item(q_id: str): """Löscht eine Frage aus Directus.""" token = request.headers.get("Authorization", "") _, status = _directus("DELETE", f"/items/questions/{q_id}", token) if status in (200, 204): return jsonify({"ok": True}) return jsonify({"error": "Delete failed"}), status @app.route("/api/word/", methods=["DELETE"]) def delete_word_item(w_id: str): """Löscht ein Wort aus Directus.""" token = request.headers.get("Authorization", "") _, status = _directus("DELETE", f"/items/words/{w_id}", token) if status in (200, 204): return jsonify({"ok": True}) return jsonify({"error": "Delete failed"}), status @app.route("/api/setup-schema", methods=["POST"]) def setup_directus_schema(): """ Einmalig ausführen: Konfiguriert alle M2M-Relationen in Directus. Idempotent – bereits vorhandene Felder/Relationen werden übersprungen. """ token = request.headers.get("Authorization", "") results = [] def do(label, method, path, body=None): data, status = _directus(method, path, token, body) ok = status in (200, 201, 204) results.append({"label": label, "status": status, "ok": ok, "err": None if ok else (data.get("errors") or data)}) return ok # ── Fix M2O: questions.object → objects ────────────────────────────────── do("relation questions.object→objects", "POST", "/relations", { "collection": "questions", "field": "object", "related_collection": "objects", "schema": {"on_delete": "SET NULL"}, "meta": {"one_deselect_action": "nullify"}, }) # Update display template so the object label shows do("update field questions.object template", "PATCH", "/fields/questions/object", { "meta": {"options": {"template": "{{user_notes}}"}} }) # ── Fix M2O: questions.short_answer → words ─────────────────────────────── do("relation questions.short_answer→words", "POST", "/relations", { "collection": "questions", "field": "short_answer", "related_collection": "words", "schema": {"on_delete": "SET NULL"}, "meta": {"one_deselect_action": "nullify"}, }) # ── M2M: questions ↔ objects via questions_objects ──────────────────────── do("field questions.linked_objects", "POST", "/fields/questions", { "field": "linked_objects", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{objects_id.user_notes}}"}, "note": "Verknüpfte Objekte"}, }) do("field objects.linked_questions", "POST", "/fields/objects", { "field": "linked_questions", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{questions_id.question_de}}"}, "note": "Verknüpfte Fragen"}, }) do("relation questions_objects.questions_id→questions", "POST", "/relations", { "collection": "questions_objects", "field": "questions_id", "related_collection": "questions", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "objects_id", "one_field": "linked_objects", "one_deselect_action": "nullify"}, }) do("relation questions_objects.objects_id→objects", "POST", "/relations", { "collection": "questions_objects", "field": "objects_id", "related_collection": "objects", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "questions_id", "one_field": "linked_questions", "one_deselect_action": "nullify"}, }) # ── M2M: words ↔ objects via words_objects ──────────────────────────────── do("field words.linked_objects", "POST", "/fields/words", { "field": "linked_objects", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{objects_id.user_notes}}"}, "note": "Verknüpfte Objekte"}, }) do("field objects.linked_words", "POST", "/fields/objects", { "field": "linked_words", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{words_id.title_de}}"}, "note": "Verknüpfte Wörter"}, }) do("relation words_objects.words_id→words", "POST", "/relations", { "collection": "words_objects", "field": "words_id", "related_collection": "words", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "objects_id", "one_field": "linked_objects", "one_deselect_action": "nullify"}, }) do("relation words_objects.objects_id→objects", "POST", "/relations", { "collection": "words_objects", "field": "objects_id", "related_collection": "objects", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "words_id", "one_field": "linked_words", "one_deselect_action": "nullify"}, }) # ── M2M: questions ↔ words (distractor) via questions_distractor_words ──── do("field questions.distractor_words", "POST", "/fields/questions", { "field": "distractor_words", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{words_id.title_de}}"}, "note": "Ablenker-Wörter (nicht in Frage/Antwort)"}, }) do("relation questions_distractor_words.questions_id→questions", "POST", "/relations", { "collection": "questions_distractor_words", "field": "questions_id", "related_collection": "questions", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "words_id", "one_field": "distractor_words", "one_deselect_action": "nullify"}, }) do("relation questions_distractor_words.words_id→words", "POST", "/relations", { "collection": "questions_distractor_words", "field": "words_id", "related_collection": "words", "schema": {"on_delete": "CASCADE"}, "meta": {"junction_field": "questions_id", "one_deselect_action": "nullify"}, }) # ── Backref: words → questions via existing questions_words junction ─────── do("field words.linked_questions", "POST", "/fields/words", { "field": "linked_questions", "type": "alias", "meta": {"special": ["m2m"], "interface": "list-m2m", "options": {"template": "{{questions_id.question_de}}"}, "note": "Fragen in denen dieses Wort vorkommt"}, }) # Patch the existing questions_words relation to add the one_field backref on words do("patch relation questions_words.words_id one_field", "PATCH", "/relations/questions_words/words_id", { "meta": {"one_field": "linked_questions", "one_deselect_action": "nullify"} }) failed = [r for r in results if not r["ok"]] return jsonify({"ok": len(failed) == 0, "total": len(results), "failed": len(failed), "results": results}) if __name__ == "__main__": app.run(host="0.0.0.0", port=8000, debug=True)