From 6af2428df5fcf2efc6ba1414166a5f0e1e0be22e Mon Sep 17 00:00:00 2001 From: admin Date: Wed, 10 Jun 2026 20:52:11 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20automatische=20Content-Pipeline=20(rele?= =?UTF-8?q?ase=20=E2=86=92=20pairs=20=E2=86=92=20=C3=BCbersetzen=20?= =?UTF-8?q?=E2=86=92=20audio=20=E2=86=92=20ready)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - pictures.pipeline_* Spalten + app_settings Tabelle (Migration) - lib/placeholders.js: Placeholder-Auflösung; TTS spricht keine UUIDs mehr - lib/pairContent.js: geteilte Pair-Logik (Readiness mit Skip-Optionen) - lib/generatePairs.js: Claude-Generierung (konfigurierbare Anzahl, nur Nomen/Adjektive bei word-Pairs) + serverseitige Persistenz inkl. object_pairs - lib/pipeline.js: In-Process-Runner, idempotente Schritte, Boot-Resume - routes/pipeline.js: release/retry/overview/bundle/settings + Bild-Publish (kaskadiert Fragen/Statements/Pairs/Wörter/Objekte/Bild) Co-Authored-By: Claude Fable 5 --- src/db-migrate.js | 32 ++++ src/index.js | 8 +- src/lib/generatePairs.js | 179 +++++++++++++++++++++ src/lib/pairContent.js | 156 ++++++++++++++++++ src/lib/pipeline.js | 337 +++++++++++++++++++++++++++++++++++++++ src/lib/placeholders.js | 18 +++ src/lib/translate.js | 2 +- src/routes/audios.js | 7 + src/routes/pairs.js | 151 +----------------- src/routes/pipeline.js | 207 ++++++++++++++++++++++++ 10 files changed, 946 insertions(+), 151 deletions(-) create mode 100644 src/lib/generatePairs.js create mode 100644 src/lib/pairContent.js create mode 100644 src/lib/pipeline.js create mode 100644 src/lib/placeholders.js create mode 100644 src/routes/pipeline.js diff --git a/src/db-migrate.js b/src/db-migrate.js index d000ff9..ce73df6 100644 --- a/src/db-migrate.js +++ b/src/db-migrate.js @@ -603,6 +603,38 @@ async function migrate() { ON CONFLICT (language) DO NOTHING `).catch(() => {}); + // ── Content-Pipeline: Job-Tracking direkt auf der Picture-Zeile ────────────── + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_status TEXT NOT NULL DEFAULT 'none'`).catch(() => {}); + await query(`ALTER TABLE pictures DROP CONSTRAINT IF EXISTS pictures_pipeline_status_check`).catch(() => {}); + await query(` + ALTER TABLE pictures ADD CONSTRAINT pictures_pipeline_status_check + CHECK (pipeline_status IN ('none', 'queued', 'running', 'failed', 'ready', 'published')) + `).catch(() => {}); + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_step TEXT`).catch(() => {}); + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_progress JSONB`).catch(() => {}); + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_error TEXT`).catch(() => {}); + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_started_at TIMESTAMPTZ`).catch(() => {}); + await query(`ALTER TABLE pictures ADD COLUMN IF NOT EXISTS pipeline_finished_at TIMESTAMPTZ`).catch(() => {}); + + // app_settings — generischer Key/Value-Store (JSONB) für Konfiguration + await query(` + CREATE TABLE IF NOT EXISTS app_settings ( + key TEXT PRIMARY KEY, + value JSONB NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `); + await query(` + DROP TRIGGER IF EXISTS app_settings_updated_at ON app_settings; + CREATE TRIGGER app_settings_updated_at + BEFORE UPDATE ON app_settings + FOR EACH ROW EXECUTE FUNCTION update_updated_at() + `); + await query(` + INSERT INTO app_settings (key, value) VALUES ('pipeline.pairs_per_object', '5'::jsonb) + ON CONFLICT (key) DO NOTHING + `).catch(() => {}); + // ── Migrate old {{uuid}} placeholders → new {{label.w:uuid}} / {{label.o:uuid}} ── await migratePlaceholders(); diff --git a/src/index.js b/src/index.js index 0e715cb..d499672 100644 --- a/src/index.js +++ b/src/index.js @@ -43,6 +43,7 @@ app.use('/api/users', auth, require('./routes/users')); app.use('/api/audios', auth, require('./routes/audios')); app.use('/api/tts-settings', auth, require('./routes/tts-settings')); app.use('/api/claude', auth, require('./routes/claude')); +app.use('/api/pipeline', auth, require('./routes/pipeline')); // 404 app.use((req, res) => { @@ -56,5 +57,10 @@ app.use((err, req, res, next) => { }); migrate() - .then(() => app.listen(PORT, '0.0.0.0', () => console.log(`snakkimo-API running on port ${PORT}`))) + .then(() => { + app.listen(PORT, '0.0.0.0', () => console.log(`snakkimo-API running on port ${PORT}`)); + // Hängengebliebene Pipeline-Läufe (z.B. nach Redeploy) wieder aufnehmen + require('./lib/pipeline').resumePending() + .catch(err => console.error('Pipeline-Resume fehlgeschlagen:', err)); + }) .catch(err => { console.error('Migration failed:', err); process.exit(1); }); diff --git a/src/lib/generatePairs.js b/src/lib/generatePairs.js new file mode 100644 index 0000000..401485f --- /dev/null +++ b/src/lib/generatePairs.js @@ -0,0 +1,179 @@ +// Pair-Generierung via Claude (Vision) + serverseitige Persistenz. +// Genutzt von lib/pipeline.js (Automatik) und routes/claude.js (manueller Endpoint). +const { query } = require('../db'); + +const ANTHROPIC_API_URL = 'https://api.anthropic.com/v1/messages'; +const GENERATE_MODEL = process.env.GENERATE_MODEL || 'claude-haiku-4-5-20251001'; + +// ── Claude-Call: Pairs für EIN Objekt eines Bildes generieren ──────────────── +// objects: [{ id, words: [{titel_de,titel_en}], selections: [{points:[{x,y}]}] }] +// Liefert ein validiertes Array von Pair-Vorschlägen (persistiert nichts). +async function generatePairsForObject({ imageUrl, objects, selectedObjectId, count = 5 }) { + const apiKey = process.env.ANTHROPIC_API_KEY; + if (!apiKey) { const e = new Error('ANTHROPIC_API_KEY nicht konfiguriert'); e.status = 500; throw e; } + count = Math.min(Math.max(parseInt(count) || 5, 1), 40); + + const objectsDesc = objects.map((obj, i) => { + const words = (obj.words || []).map(w => w.titel_de || w.titel_en).filter(Boolean).join(', '); + const isSelected = obj.id === selectedObjectId; + let posStr = ''; + if (obj.selections?.[0]?.points?.length) { + const pts = obj.selections[0].points; + const cx = (pts.reduce((s, p) => s + p.x, 0) / pts.length * 100).toFixed(0); + const cy = (pts.reduce((s, p) => s + p.y, 0) / pts.length * 100).toFixed(0); + posStr = ` (ca. ${cx}% von links, ${cy}% von oben)`; + } + return `- Objekt ${i + 1}${isSelected ? ' [DIESES OBJEKT]' : ''}: ${words || '(unbenannt)'}${posStr}`; + }).join('\n'); + + const userPrompt = `Analysiere das Bild. Folgende Objekte sind markiert:\n${objectsDesc}\n\n` + + `Erstelle Sprachlernmaterial für das Objekt [DIESES OBJEKT] auf Deutsch — natürliche Sätze wie in einem echten Gespräch.\n\n` + + `Antworte NUR mit gültigem JSON ohne Markdown:\n{"pairs":[...]}\n\n` + + `Jedes Pair braucht: "type" und "difficulty" ("easy" oder "medium"). Feldregeln:\n` + + `- type "text": {"type":"text","difficulty":"...","positive":"Aussage."}\n` + + `- type "yes_no": {"type":"yes_no","difficulty":"...","question":"Frage?","answer":true}\n` + + `- type "question": {"type":"question","difficulty":"...","question":"Frage?","positive":"Positive Aussage.","negative":"Negative Aussage."}\n` + + `- type "word": {"type":"word","difficulty":"...","question":"Frage?","positive_words":[{"w":"Wort1","pos":"noun"}],"negative_words":[{"w":"Wort2","pos":"adjective"},{"w":"Wort3","pos":"noun"},{"w":"Wort4","pos":"noun"}]}\n\n` + + `Erstelle genau ${count} Pairs, möglichst gleichmäßig verteilt über die 4 Typen (text, yes_no, question, word) ` + + `und gemischt aus difficulty "easy" (max 8 Wörter, für Kinder) und "medium" (8–15 Wörter, für Teenager). ` + + `Bei yes_no: mix aus answer:true und answer:false. Bei word: positive_words 1–3 passende Wörter, negative_words genau 3 falsche Wörter.\n\n` + + `Regeln: Alle Sätze und Wörter auf Deutsch. Sätze müssen natürlich klingen. Keine Wiederholungen. ` + + `Wörter beim type "word" sind AUSSCHLIESSLICH Nomen ("pos":"noun") oder Adjektive ("pos":"adjective") — ` + + `KEINE Verben, Pronomen, Artikel, Präpositionen oder Funktionswörter. Gib für jedes Wort das "pos"-Feld an.`; + + const res = await fetch(ANTHROPIC_API_URL, { + method: 'POST', + headers: { 'Content-Type': 'application/json', 'x-api-key': apiKey, 'anthropic-version': '2023-06-01' }, + body: JSON.stringify({ + model: GENERATE_MODEL, + max_tokens: 8000, + system: 'Du bist ein Deutsch-Sprachlernassistent. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown-Codeblöcke, ohne Erklärungen.', + messages: [{ role: 'user', content: [ + { type: 'image', source: { type: 'url', url: imageUrl } }, + { type: 'text', text: userPrompt }, + ]}], + }), + }); + if (!res.ok) { + const err = await res.json().catch(() => ({})); + const e = new Error(err.error?.message || `Claude API Fehler ${res.status}`); e.status = res.status; throw e; + } + const data = await res.json(); + let raw = data.content[0].text.trim(); + const md = raw.match(/```(?:json)?\s*([\s\S]+?)\s*```/); + if (md) raw = md[1]; + const parsed = JSON.parse(raw); + if (!Array.isArray(parsed.pairs)) throw new Error('Ungültiges JSON-Format von Claude (pairs fehlt)'); + return parsed.pairs.map(normalizePair).filter(Boolean); +} + +// Word-Einträge können {"w":"...","pos":"..."} oder plain Strings sein. +// Nur Nomen/Adjektive zulassen; bei plain Strings (kein pos) durchlassen (Regel steht im Prompt). +function cleanWordList(list) { + if (!Array.isArray(list)) return []; + const out = []; + for (const item of list) { + if (typeof item === 'string') { const t = item.trim(); if (t) out.push(t); continue; } + if (item && typeof item === 'object') { + const t = (item.w || item.word || item.text || '').toString().trim(); + const pos = (item.pos || '').toString().toLowerCase(); + if (t && (!pos || pos === 'noun' || pos === 'adjective')) out.push(t); + } + } + return [...new Set(out)]; +} + +function normalizePair(p) { + if (!p || typeof p !== 'object') return null; + const type = p.type; + if (!['text', 'yes_no', 'question', 'word'].includes(type)) return null; + const out = { type, difficulty: p.difficulty === 'medium' ? 'medium' : 'easy' }; + if (type === 'text') { + out.positive = (p.positive || '').toString().trim(); + if (!out.positive) return null; + } else { + out.question = (p.question || '').toString().trim(); + if (!out.question) return null; + } + if (type === 'yes_no') out.answer = p.answer === true; + if (type === 'question') { + out.positive = (p.positive || '').toString().trim(); + out.negative = (p.negative || '').toString().trim(); + if (!out.positive) return null; + } + if (type === 'word') { + out.positive_words = cleanWordList(p.positive_words); + out.negative_words = cleanWordList(p.negative_words); + if (!out.positive_words.length) return null; + } + return out; +} + +// ── Persistenz (Spiegel von savePairsForObject im CMT, serverseitig) ────────── +async function findOrCreateWord(title) { + const t = title.trim(); + const r = await query(`SELECT id FROM words WHERE lower(titel_de) = lower($1) LIMIT 1`, [t]); + if (r.rows.length) return r.rows[0].id; + const ins = await query( + `INSERT INTO words (titel_de, status) VALUES ($1, 'requested') RETURNING id`, [t]); + return ins.rows[0].id; +} + +// Persistiert EIN generiertes Pair für ein Objekt (inkl. object_pairs-Pflichtlink). +async function persistPair(p, objectId) { + const difficultyLevel = p.difficulty === 'easy' ? 1 : p.difficulty === 'medium' ? 2 : null; + + let questionId = null; + if (p.type !== 'text' && p.question) { + const q = await query( + `INSERT INTO questions (sentence_de, status) VALUES ($1, 'draft') RETURNING id`, [p.question]); + questionId = q.rows[0].id; + } + + let posStmtId = null, negStmtId = null; + if (p.type === 'text' && p.positive) { + posStmtId = (await query( + `INSERT INTO statements (positive_sentence_de, status) VALUES ($1, 'draft') RETURNING id`, [p.positive])).rows[0].id; + } else if (p.type === 'yes_no') { + posStmtId = (await query( + `INSERT INTO statements (answer, status) VALUES ($1, 'draft') RETURNING id`, [p.answer ?? null])).rows[0].id; + } else if (p.type === 'question') { + if (p.positive) + posStmtId = (await query( + `INSERT INTO statements (positive_sentence_de, status) VALUES ($1, 'draft') RETURNING id`, [p.positive])).rows[0].id; + if (p.negative) + negStmtId = (await query( + `INSERT INTO statements (negative_sentence_de, status) VALUES ($1, 'draft') RETURNING id`, [p.negative])).rows[0].id; + } else if (p.type === 'word') { + if (p.positive_words?.length) { + posStmtId = (await query(`INSERT INTO statements (status) VALUES ('draft') RETURNING id`)).rows[0].id; + for (const w of p.positive_words) { + const wid = await findOrCreateWord(w); + await query( + `INSERT INTO statement_positive_words (statement_id, word_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, + [posStmtId, wid]); + } + } + if (p.negative_words?.length) { + negStmtId = (await query(`INSERT INTO statements (status) VALUES ('draft') RETURNING id`)).rows[0].id; + for (const w of p.negative_words) { + const wid = await findOrCreateWord(w); + await query( + `INSERT INTO statement_negative_words (statement_id, word_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, + [negStmtId, wid]); + } + } + } + + const pair = (await query( + `INSERT INTO pairs (answer_type, difficulty_level, question_id, positive_statement_id, negative_statement_id, status) + VALUES ($1, $2, $3, $4, $5, 'draft') RETURNING id`, + [p.type, difficultyLevel, questionId, posStmtId, negStmtId])).rows[0]; + + await query( + `INSERT INTO object_pairs (object_id, pair_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`, + [objectId, pair.id]); + return pair.id; +} + +module.exports = { generatePairsForObject, persistPair, findOrCreateWord }; diff --git a/src/lib/pairContent.js b/src/lib/pairContent.js new file mode 100644 index 0000000..7341dfb --- /dev/null +++ b/src/lib/pairContent.js @@ -0,0 +1,156 @@ +// Geteilte Pair-Logik: Kontext laden, Readiness berechnen, 3-Sprachen-Bündel, +// Wortgruppen-Übersetzung. Genutzt von routes/pairs.js und lib/pipeline.js. +const { query } = require('../db'); +const { LANGS, translateWords, maybeAutoTranslated } = require('./translate'); + +// Sammelt für eine Menge Pairs den Kontext (Fragen, Statements, Bilder, Audios) für eine Sprache. +async function loadPairContext(pairs, lang) { + const questionIds = [...new Set(pairs.map(p => p.question_id).filter(Boolean))]; + const statementIds = [...new Set([ + ...pairs.map(p => p.positive_statement_id), + ...pairs.map(p => p.negative_statement_id), + ].filter(Boolean))]; + const pairIds = pairs.map(p => p.id); + + const questionsMap = {}, statementsMap = {}, pictureMap = {}, audioMap = {}; + + if (questionIds.length) { + const r = await query( + `SELECT id, status, sentence_${lang} AS sentence FROM questions WHERE id = ANY($1)`, [questionIds]); + r.rows.forEach(q => { questionsMap[q.id] = q; }); + } + if (statementIds.length) { + const r = await query( + `SELECT id, status, positive_sentence_${lang} AS positive, negative_sentence_${lang} AS negative + FROM statements WHERE id = ANY($1)`, [statementIds]); + r.rows.forEach(s => { statementsMap[s.id] = s; }); + } + if (pairIds.length) { + const r = await query( + `SELECT op.pair_id, + bool_or(true) AS has_picture, + bool_or(pp.status = 'published') AS has_published_picture + FROM object_pairs op + JOIN object_pictures pic ON pic.object_id = op.object_id + JOIN pictures pp ON pp.id = pic.picture_id + WHERE op.pair_id = ANY($1) + GROUP BY op.pair_id`, [pairIds]); + r.rows.forEach(row => { pictureMap[row.pair_id] = row; }); + + const ids = [...questionIds, ...statementIds]; + if (ids.length) { + const a = await query( + `SELECT source_table, source_id, source_field FROM audios + WHERE source_id = ANY($1) AND language = $2 AND status <> 'blocked'`, [ids, lang]); + a.rows.forEach(x => { audioMap[`${x.source_table}|${x.source_id}|${x.source_field}`] = true; }); + } + } + return { questionsMap, statementsMap, pictureMap, audioMap }; +} + +// Berechnet, was einem Pair zur Veröffentlichung (für eine Sprache) noch fehlt. +// opts.skipPicturePublished: Bild-Publish-Check überspringen (Bundle-Publish veröffentlicht +// das Bild im selben Schritt), Bild-Existenz wird weiter geprüft. +// opts.skipStatusChecks: Publish-Status von Frage/Statements ignorieren (werden mitveröffentlicht). +function computeReadiness(p, ctx, lang, opts = {}) { + const missing = []; + const q = p.question_id ? ctx.questionsMap[p.question_id] : null; + const ps = p.positive_statement_id ? ctx.statementsMap[p.positive_statement_id] : null; + const ns = p.negative_statement_id ? ctx.statementsMap[p.negative_statement_id] : null; + const pic = ctx.pictureMap[p.id]; + + // Bild + if (!pic || !pic.has_picture) missing.push('Bild fehlt'); + else if (!opts.skipPicturePublished && !pic.has_published_picture) missing.push('Bild nicht veröffentlicht'); + + // Frage + if (q) { + if (!(q.sentence || '').trim()) missing.push(`Frage-Text (${lang}) fehlt`); + else { + if (!opts.skipStatusChecks && q.status !== 'published') missing.push('Frage nicht freigegeben'); + if (!ctx.audioMap[`questions|${p.question_id}|sentence`]) missing.push('Audio Frage fehlt'); + } + } + // Positiv-Statement + if (ps) { + if (!(ps.positive || '').trim()) missing.push(`Positiv-Satz (${lang}) fehlt`); + else { + if (!opts.skipStatusChecks && ps.status !== 'published') missing.push('Positiv-Satz nicht freigegeben'); + if (!ctx.audioMap[`statements|${p.positive_statement_id}|positive_sentence`]) missing.push('Audio Positiv fehlt'); + } + } + // Negativ-Statement (nur wenn Text vorhanden) + if (ns && (ns.negative || '').trim()) { + if (!opts.skipStatusChecks && ns.status !== 'published') missing.push('Negativ-Satz nicht freigegeben'); + if (!ctx.audioMap[`statements|${p.negative_statement_id}|negative_sentence`]) missing.push('Audio Negativ fehlt'); + } + + return { missing, missingCount: missing.length, ready: missing.length === 0 }; +} + +// Lädt das vollständige 3-sprachige Inhalts-Bündel eines Pairs (Frage, Statements, Wörter). +async function loadPairContent(p) { + const langCols = (prefix) => `${prefix}_de, ${prefix}_en, ${prefix}_sv`; + const content = { answer_type: p.answer_type, question: null, positive: null, negative: null }; + + if (p.question_id) { + content.question = (await query( + `SELECT ${langCols('sentence')} FROM questions WHERE id = $1`, [p.question_id])).rows[0] || null; + } + + async function loadStatement(stmtId, sentencePrefix, linkTable) { + if (!stmtId) return null; + const s = (await query( + `SELECT ${langCols(sentencePrefix)}, answer FROM statements WHERE id = $1`, [stmtId])).rows[0] || {}; + const words = (await query( + `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv + FROM ${linkTable} lw JOIN words w ON w.id = lw.word_id + WHERE lw.statement_id = $1 + ORDER BY w.titel_de`, [stmtId])).rows; + return { sentence: s, words, answer: s.answer ?? null }; + } + + content.positive = await loadStatement(p.positive_statement_id, 'positive_sentence', 'statement_positive_words'); + content.negative = await loadStatement(p.negative_statement_id, 'negative_sentence', 'statement_negative_words'); + return content; +} + +// Übersetzt die einem Statement zugeordneten Wörter gemeinsam (ein Claude-Call je Zielsprache), +// mit der Frage als Kontext zur Disambiguierung. `questionRow` = { sentence_de/en/sv } | null. +// Gibt die Anzahl tatsächlich aktualisierter Wort-Felder zurück. +async function translateWordGroup(statementId, linkTable, questionRow, overwrite) { + const rows = (await query( + `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv + FROM ${linkTable} lw JOIN words w ON w.id = lw.word_id + WHERE lw.statement_id = $1`, [statementId])).rows; + if (!rows.length) return 0; + + // Quellsprache: erste Sprache, in der mind. ein Wort Text hat + let src = null; + for (const l of LANGS) if (rows.some(w => (w[`titel_${l}`] || '').trim())) { src = l; break; } + if (!src) return 0; + + const context = questionRow + ? (questionRow[`sentence_${src}`] || questionRow.sentence_de || questionRow.sentence_en || questionRow.sentence_sv || '') + : ''; + + let count = 0; + for (const to of LANGS) { + if (to === src) continue; + const need = rows + .filter(w => (w[`titel_${src}`] || '').trim() && (overwrite || !(w[`titel_${to}`] || '').trim())) + .map(w => ({ id: w.id, text: (w[`titel_${src}`] || '').trim() })); + if (!need.length) continue; + const map = await translateWords({ words: need, from: src, to, context }); + for (const w of need) { + const t = map[w.id]; + if (!t) continue; + await query(`UPDATE words SET titel_${to} = $1 WHERE id = $2`, [t, w.id]); + await maybeAutoTranslated(w.id); + count++; + } + } + return count; +} + +module.exports = { loadPairContext, computeReadiness, loadPairContent, translateWordGroup }; diff --git a/src/lib/pipeline.js b/src/lib/pipeline.js new file mode 100644 index 0000000..dc6a880 --- /dev/null +++ b/src/lib/pipeline.js @@ -0,0 +1,337 @@ +// Automatische Content-Pipeline pro Bild: Pairs generieren → übersetzen → Audio → ready. +// In-Process-Queue mit einem Worker (rate-limit-freundlich). Jeder Schritt ist idempotent, +// d.h. ein Resume nach Crash/Redeploy überspringt bereits Erledigtes. +const { query } = require('../db'); +const { LANGS, fillMissingRow } = require('./translate'); +const { translateWordGroup } = require('./pairContent'); +const { generatePairsForObject, persistPair } = require('./generatePairs'); +const { generateAndStore } = require('../routes/audios'); + +const queue = []; +let running = false; + +function enqueue(pictureId) { + if (!queue.includes(pictureId)) queue.push(pictureId); + pump(); +} + +async function pump() { + if (running) return; + running = true; + try { + while (queue.length) { + const id = queue.shift(); + try { await runPicture(id); } + catch (err) { + console.error(`Pipeline für Bild ${id} fehlgeschlagen:`, err); + await setFailed(id, null, err).catch(() => {}); + } + } + } finally { running = false; } +} + +// Beim API-Boot: hängengebliebene Läufe (queued/running) wieder aufnehmen. +async function resumePending() { + const r = await query( + `SELECT id FROM pictures WHERE pipeline_status IN ('queued', 'running')`); + for (const row of r.rows) enqueue(row.id); + if (r.rows.length) console.log(`Pipeline: ${r.rows.length} Bild(er) nach Neustart wieder aufgenommen`); +} + +async function getPairsPerObject() { + const r = await query(`SELECT value FROM app_settings WHERE key = 'pipeline.pairs_per_object'`); + const n = parseInt(r.rows[0]?.value); + return Math.min(Math.max(isNaN(n) ? 5 : n, 1), 20); +} + +async function setStep(pictureId, step, progress) { + await query( + `UPDATE pictures SET pipeline_step = $2, pipeline_progress = $3 WHERE id = $1`, + [pictureId, step, JSON.stringify(progress)]); +} + +async function setFailed(pictureId, step, err) { + await query( + `UPDATE pictures SET pipeline_status='failed', pipeline_step=COALESCE($2, pipeline_step), + pipeline_error=$3, pipeline_finished_at=NOW() WHERE id=$1`, + [pictureId, step, (err.message || String(err)).slice(0, 1000)]); +} + +// Objekte eines Bildes inkl. zugewiesener Wörter + Selektionen laden. +async function loadObjects(pictureId) { + const objs = (await query( + `SELECT o.id, o.status, o.selections + FROM object_pictures op JOIN objects o ON o.id = op.object_id + WHERE op.picture_id = $1 AND o.status <> 'blocked' + ORDER BY o.created_at`, [pictureId])).rows; + for (const o of objs) { + o.words = (await query( + `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv + FROM object_words ow JOIN words w ON w.id = ow.word_id + WHERE ow.object_id = $1`, [o.id])).rows; + } + return objs; +} + +// Alle Pairs eines Bildes (über object_pairs), ohne geblockte. +async function loadPairs(pictureId) { + return (await query( + `SELECT DISTINCT p.id, p.answer_type, p.status, p.question_id, + p.positive_statement_id, p.negative_statement_id + FROM object_pairs op + JOIN object_pictures pic ON pic.object_id = op.object_id + JOIN pairs p ON p.id = op.pair_id + WHERE pic.picture_id = $1 AND p.status <> 'blocked' + ORDER BY p.id`, [pictureId])).rows; +} + +async function runPicture(pictureId) { + // Claim — nur Bilder, die in der Pipeline sind + const claim = await query( + `UPDATE pictures + SET pipeline_status='running', pipeline_error=NULL, + pipeline_started_at=COALESCE(pipeline_started_at, NOW()), pipeline_finished_at=NULL + WHERE id=$1 AND pipeline_status IN ('queued','running','failed') + RETURNING id, picture_link`, [pictureId]); + if (!claim.rows.length) return; + const picture = claim.rows[0]; + + const objects = await loadObjects(pictureId); + if (!objects.length) { await setFailed(pictureId, 'pairs', new Error('Bild hat keine Objekte')); return; } + + const progress = { objectsDone: 0, objectsTotal: objects.length, pairsCreated: 0, + translatedPairs: 0, pairsTotal: 0, audiosDone: 0, audiosTotal: 0, incompletePairs: 0 }; + + // ── Step 1: Pairs generieren (pro Objekt, skip wenn schon genug) ──────────── + try { + const targetCount = await getPairsPerObject(); + await setStep(pictureId, 'pairs', progress); + const claudeObjects = objects.map(o => ({ id: o.id, words: o.words, selections: o.selections || [] })); + + for (const obj of objects) { + const have = parseInt((await query( + `SELECT count(*) AS c FROM object_pairs op JOIN pairs p ON p.id = op.pair_id + WHERE op.object_id = $1 AND p.status <> 'blocked'`, [obj.id])).rows[0].c); + if (have < targetCount) { + if (!picture.picture_link) throw new Error('Bild hat keinen picture_link für die KI-Analyse'); + let pairs; + try { + pairs = await generatePairsForObject({ + imageUrl: picture.picture_link, objects: claudeObjects, + selectedObjectId: obj.id, count: targetCount - have, + }); + } catch (err) { + // Ein Retry bei Parse-/API-Fehlern, dann aufgeben + pairs = await generatePairsForObject({ + imageUrl: picture.picture_link, objects: claudeObjects, + selectedObjectId: obj.id, count: targetCount - have, + }); + } + for (const p of pairs) { + await persistPair(p, obj.id); + progress.pairsCreated++; + await setStep(pictureId, 'pairs', progress); + } + } + progress.objectsDone++; + await setStep(pictureId, 'pairs', progress); + } + } catch (err) { await setFailed(pictureId, 'pairs', err); return; } + + // ── Step 2: Übersetzen (pro Pair, füllt nur fehlende Sprachen) ────────────── + const pairs = await loadPairs(pictureId); + progress.pairsTotal = pairs.length; + try { + await setStep(pictureId, 'translate', progress); + for (const p of pairs) { + let questionRow = null; + if (p.question_id) { + questionRow = (await query( + `SELECT sentence_de, sentence_en, sentence_sv FROM questions WHERE id = $1`, + [p.question_id])).rows[0] || null; + await fillMissingRow('questions', p.question_id, ['sentence']); + } + if (p.answer_type === 'word') { + if (p.positive_statement_id) + await translateWordGroup(p.positive_statement_id, 'statement_positive_words', questionRow, false); + if (p.negative_statement_id) + await translateWordGroup(p.negative_statement_id, 'statement_negative_words', questionRow, false); + } else { + if ((p.answer_type === 'text' || p.answer_type === 'question') && p.positive_statement_id) + await fillMissingRow('statements', p.positive_statement_id, ['positive_sentence']); + if (p.answer_type === 'question' && p.negative_statement_id) + await fillMissingRow('statements', p.negative_statement_id, ['negative_sentence']); + } + progress.translatedPairs++; + await setStep(pictureId, 'translate', progress); + } + } catch (err) { await setFailed(pictureId, 'translate', err); return; } + + // ── Step 3: Audio für alle Sätze + Wörter des Bildes in allen Sprachen ────── + try { + const units = await collectAudioUnits(pictureId, pairs); + progress.audiosTotal = units.length; + progress.audiosDone = units.filter(u => u.hasAudio).length; + await setStep(pictureId, 'audio', progress); + + const failures = []; + for (const u of units) { + if (u.hasAudio) continue; + try { + await generateWithBackoff(u); + progress.audiosDone++; + } catch (err) { + failures.push(`${u.source_table}/${u.source_field}/${u.language}: ${err.message}`); + } + await setStep(pictureId, 'audio', progress); + } + if (failures.length && progress.audiosDone === 0) + throw new Error(`Alle Audio-Generierungen fehlgeschlagen: ${failures[0]}`); + progress.audioFailures = failures.length; + } catch (err) { await setFailed(pictureId, 'audio', err); return; } + + // ── Step 4: Abschluss — vollständige Inhalte auf 'reviewed', Bild auf 'ready' + try { + let incomplete = 0; + for (const p of pairs) { + if (await isPairComplete(p)) { + if (p.question_id) + await query(`UPDATE questions SET status='reviewed' WHERE id=$1 AND status='draft'`, [p.question_id]); + const stmtIds = [p.positive_statement_id, p.negative_statement_id].filter(Boolean); + if (stmtIds.length) + await query(`UPDATE statements SET status='reviewed' WHERE id = ANY($1) AND status='draft'`, [stmtIds]); + await query(`UPDATE pairs SET status='reviewed' WHERE id=$1 AND status='draft'`, [p.id]); + } else incomplete++; + } + progress.incompletePairs = incomplete; + await query( + `UPDATE objects SET status='reviewed' + WHERE id IN (SELECT object_id FROM object_pictures WHERE picture_id = $1) AND status='draft'`, [pictureId]); + await query( + `UPDATE pictures SET status = CASE WHEN status='uploaded' THEN 'reviewed' ELSE status END, + pipeline_status='ready', pipeline_step=NULL, pipeline_progress=$2, pipeline_finished_at=NOW() + WHERE id=$1`, [pictureId, JSON.stringify(progress)]); + } catch (err) { await setFailed(pictureId, 'finish', err); return; } +} + +// Alle 3 Sprachen in den genutzten Feldern des Pairs gefüllt? (Spiegel des Review-Checks) +async function isPairComplete(p) { + if (p.question_id) { + const q = (await query( + `SELECT sentence_de, sentence_en, sentence_sv FROM questions WHERE id=$1`, [p.question_id])).rows[0]; + if (!q || LANGS.some(l => !(q[`sentence_${l}`] || '').trim())) return false; + } + if (p.answer_type === 'word') { + for (const [stmtId, link] of [[p.positive_statement_id, 'statement_positive_words'], + [p.negative_statement_id, 'statement_negative_words']]) { + if (!stmtId) continue; + const ws = (await query( + `SELECT w.titel_de, w.titel_en, w.titel_sv FROM ${link} lw JOIN words w ON w.id = lw.word_id + WHERE lw.statement_id = $1`, [stmtId])).rows; + if (ws.some(w => LANGS.some(l => !(w[`titel_${l}`] || '').trim()))) return false; + } + } else if (p.answer_type === 'text' || p.answer_type === 'question') { + if (p.positive_statement_id) { + const s = (await query( + `SELECT positive_sentence_de, positive_sentence_en, positive_sentence_sv FROM statements WHERE id=$1`, + [p.positive_statement_id])).rows[0]; + if (!s || LANGS.some(l => !(s[`positive_sentence_${l}`] || '').trim())) return false; + } + if (p.answer_type === 'question' && p.negative_statement_id) { + const s = (await query( + `SELECT negative_sentence_de, negative_sentence_en, negative_sentence_sv FROM statements WHERE id=$1`, + [p.negative_statement_id])).rows[0]; + const hasAny = s && LANGS.some(l => (s[`negative_sentence_${l}`] || '').trim()); + if (hasAny && LANGS.some(l => !(s[`negative_sentence_${l}`] || '').trim())) return false; + } + } + return true; +} + +// Audio-Einheiten des Bildes: Frage-/Statement-Sätze + verlinkte Wörter × Sprachen. +// Nur Felder, die in ALLEN Sprachen Text haben (Regel wie audios.js computeUnits). +async function collectAudioUnits(pictureId, pairs) { + const units = []; + const questionIds = [...new Set(pairs.map(p => p.question_id).filter(Boolean))]; + const stmtIds = [...new Set(pairs.flatMap(p => [p.positive_statement_id, p.negative_statement_id]).filter(Boolean))]; + + // Wörter: über die Statements der Pairs (word-Typ) + object_words des Bildes + const wordIds = new Set(); + if (stmtIds.length) { + for (const link of ['statement_positive_words', 'statement_negative_words']) { + const r = await query(`SELECT word_id FROM ${link} WHERE statement_id = ANY($1)`, [stmtIds]); + r.rows.forEach(x => wordIds.add(x.word_id)); + } + } + const ow = await query( + `SELECT ow.word_id FROM object_words ow + JOIN object_pictures op ON op.object_id = ow.object_id + WHERE op.picture_id = $1`, [pictureId]); + ow.rows.forEach(x => wordIds.add(x.word_id)); + + const sources = []; + if (questionIds.length) { + const r = await query( + `SELECT id, sentence_de, sentence_en, sentence_sv FROM questions WHERE id = ANY($1)`, [questionIds]); + r.rows.forEach(row => sources.push({ table: 'questions', row, fields: ['sentence'] })); + } + if (stmtIds.length) { + const r = await query( + `SELECT id, positive_sentence_de, positive_sentence_en, positive_sentence_sv, + negative_sentence_de, negative_sentence_en, negative_sentence_sv + FROM statements WHERE id = ANY($1)`, [stmtIds]); + r.rows.forEach(row => sources.push({ table: 'statements', row, fields: ['positive_sentence', 'negative_sentence'] })); + } + if (wordIds.size) { + const r = await query( + `SELECT id, titel_de, titel_en, titel_sv FROM words WHERE id = ANY($1) AND status <> 'blocked'`, + [[...wordIds]]); + r.rows.forEach(row => sources.push({ table: 'words', row, fields: ['titel'] })); + } + + // Vorhandene Audios in einem Rutsch laden + const sourceIds = sources.map(s => s.row.id); + const have = new Set(); + if (sourceIds.length) { + const a = await query( + `SELECT source_table, source_id, source_field, language FROM audios + WHERE source_id = ANY($1) AND status <> 'blocked'`, [sourceIds]); + a.rows.forEach(x => have.add(`${x.source_table}|${x.source_id}|${x.source_field}|${x.language}`)); + } + + for (const { table, row, fields } of sources) { + for (const f of fields) { + const allFilled = LANGS.every(l => (row[`${f}_${l}`] || '').trim()); + if (!allFilled) continue; + for (const l of LANGS) { + units.push({ + source_table: table, source_id: row.id, source_field: f, language: l, + text: (row[`${f}_${l}`] || '').trim(), + hasAudio: have.has(`${table}|${row.id}|${f}|${l}`), + }); + } + } + } + return units; +} + +// ElevenLabs mit Backoff bei Rate-Limit (429). +async function generateWithBackoff(u) { + const delays = [2000, 8000, 30000]; + for (let attempt = 0; ; attempt++) { + try { + return await generateAndStore({ + text: u.text, language: u.language, + source_table: u.source_table, source_id: u.source_id, source_field: u.source_field, + }); + } catch (err) { + if (err.status === 429 && attempt < delays.length) { + await new Promise(r => setTimeout(r, delays[attempt])); + continue; + } + throw err; + } + } +} + +module.exports = { enqueue, resumePending }; diff --git a/src/lib/placeholders.js b/src/lib/placeholders.js new file mode 100644 index 0000000..9ff0e8b --- /dev/null +++ b/src/lib/placeholders.js @@ -0,0 +1,18 @@ +// Placeholder-Format in Satz-/Titel-Feldern: {{label.w:uuid}} (Wort) bzw. {{label.o:uuid}} (Objekt). +// Geteilt zwischen translate.js (Token-Schutz) und TTS (Auflösung in Sprechtext). +const PLACEHOLDER_RE = /\{\{([^.{}]+)\.(w|o):([0-9a-f-]{36})\}\}/g; + +// Legacy-Form ohne Label: {{uuid}} — sollte migriert sein, defensiv trotzdem entfernen. +const LEGACY_PLACEHOLDER_RE = /\{\{\s*[0-9a-f-]{36}\s*\}\}/g; + +// Macht aus "Ist das ein {{Apfel.w:1234-…}}?" → "Ist das ein Apfel?" (für TTS/Anzeige). +function resolvePlaceholdersToLabels(text) { + if (!text) return ''; + return String(text) + .replace(PLACEHOLDER_RE, (_, label) => label) + .replace(LEGACY_PLACEHOLDER_RE, '') + .replace(/\s{2,}/g, ' ') + .trim(); +} + +module.exports = { PLACEHOLDER_RE, resolvePlaceholdersToLabels }; diff --git a/src/lib/translate.js b/src/lib/translate.js index b5c5813..2a34596 100644 --- a/src/lib/translate.js +++ b/src/lib/translate.js @@ -18,7 +18,7 @@ const TRANSLATE_CONFIG = { // ── Placeholder-Schutz ──────────────────────────────────────────────────────── // Format im Quelltext: {{label.w:uuid}} oder {{label.o:uuid}} -const PLACEHOLDER_RE = /\{\{([^.{}]+)\.(w|o):([0-9a-f-]{36})\}\}/g; +const { PLACEHOLDER_RE } = require('./placeholders'); // Sätze für Claude vorbereiten: jedes Placeholder durch ⟦PHn:label⟧-Token ersetzen. // Token-Format ist absichtlich exotisch, damit Claude es nicht versehentlich ändert. diff --git a/src/routes/audios.js b/src/routes/audios.js index b323f7c..c31d8b0 100644 --- a/src/routes/audios.js +++ b/src/routes/audios.js @@ -3,6 +3,7 @@ const { v4: uuidv4 } = require('uuid'); const { query } = require('../db'); const { uploadFile, deleteFile, keyFromUrl } = require('../s3'); const { voiceForLanguage } = require('../voices'); +const { resolvePlaceholdersToLabels } = require('../lib/placeholders'); const ELEVENLABS_BASE = 'https://api.elevenlabs.io/v1'; const ALLOWED_STATUSES = ['generated', 'published', 'blocked']; @@ -40,6 +41,10 @@ async function generateAndStore({ text, voice_id, language, model_id, speed, sta const apiKey = process.env.ELEVENLABS_API_KEY; if (!apiKey) { const e = new Error('ELEVENLABS_API_KEY not configured'); e.status = 500; throw e; } + // Placeholder ({{label.w:uuid}}) in Sprechtext auflösen — UUIDs dürfen nie vertont werden. + text = resolvePlaceholdersToLabels(text); + if (!text) { const e = new Error('text is empty after placeholder resolution'); e.status = 400; throw e; } + const cfg = await getTtsSettings(language); const voice = voice_id || cfg.voice_id; const m = model_id || cfg.model_id; @@ -311,3 +316,5 @@ router.delete('/:id', async (req, res, next) => { }); module.exports = router; +// Für lib/pipeline.js (Audio-Generierung außerhalb des HTTP-Kontexts) +module.exports.generateAndStore = generateAndStore; diff --git a/src/routes/pairs.js b/src/routes/pairs.js index 372f86e..92537fd 100644 --- a/src/routes/pairs.js +++ b/src/routes/pairs.js @@ -1,6 +1,7 @@ const router = require('express').Router(); const { query } = require('../db'); -const { fillMissingRow, translateWords, maybeAutoTranslated } = require('../lib/translate'); +const { fillMissingRow } = require('../lib/translate'); +const { loadPairContext, computeReadiness, loadPairContent, translateWordGroup } = require('../lib/pairContent'); const STATUSES = ['draft', 'reviewed', 'blocked', 'published']; const ANSWER_TYPES = new Set(['yes_no', 'text', 'question', 'word']); @@ -18,88 +19,6 @@ const STATUS_TIMESTAMP = { const LANGS = ['de', 'en', 'sv']; -// Sammelt für eine Menge Pairs den Kontext (Fragen, Statements, Bilder, Audios) für eine Sprache. -async function loadPairContext(pairs, lang) { - const questionIds = [...new Set(pairs.map(p => p.question_id).filter(Boolean))]; - const statementIds = [...new Set([ - ...pairs.map(p => p.positive_statement_id), - ...pairs.map(p => p.negative_statement_id), - ].filter(Boolean))]; - const pairIds = pairs.map(p => p.id); - - const questionsMap = {}, statementsMap = {}, pictureMap = {}, audioMap = {}; - - if (questionIds.length) { - const r = await query( - `SELECT id, status, sentence_${lang} AS sentence FROM questions WHERE id = ANY($1)`, [questionIds]); - r.rows.forEach(q => { questionsMap[q.id] = q; }); - } - if (statementIds.length) { - const r = await query( - `SELECT id, status, positive_sentence_${lang} AS positive, negative_sentence_${lang} AS negative - FROM statements WHERE id = ANY($1)`, [statementIds]); - r.rows.forEach(s => { statementsMap[s.id] = s; }); - } - if (pairIds.length) { - const r = await query( - `SELECT op.pair_id, - bool_or(true) AS has_picture, - bool_or(pp.status = 'published') AS has_published_picture - FROM object_pairs op - JOIN object_pictures pic ON pic.object_id = op.object_id - JOIN pictures pp ON pp.id = pic.picture_id - WHERE op.pair_id = ANY($1) - GROUP BY op.pair_id`, [pairIds]); - r.rows.forEach(row => { pictureMap[row.pair_id] = row; }); - - const ids = [...questionIds, ...statementIds]; - if (ids.length) { - const a = await query( - `SELECT source_table, source_id, source_field FROM audios - WHERE source_id = ANY($1) AND language = $2 AND status <> 'blocked'`, [ids, lang]); - a.rows.forEach(x => { audioMap[`${x.source_table}|${x.source_id}|${x.source_field}`] = true; }); - } - } - return { questionsMap, statementsMap, pictureMap, audioMap }; -} - -// Berechnet, was einem Pair zur Veröffentlichung (für eine Sprache) noch fehlt. -function computeReadiness(p, ctx, lang) { - const missing = []; - const q = p.question_id ? ctx.questionsMap[p.question_id] : null; - const ps = p.positive_statement_id ? ctx.statementsMap[p.positive_statement_id] : null; - const ns = p.negative_statement_id ? ctx.statementsMap[p.negative_statement_id] : null; - const pic = ctx.pictureMap[p.id]; - - // Bild - if (!pic || !pic.has_picture) missing.push('Bild fehlt'); - else if (!pic.has_published_picture) missing.push('Bild nicht veröffentlicht'); - - // Frage - if (q) { - if (!(q.sentence || '').trim()) missing.push(`Frage-Text (${lang}) fehlt`); - else { - if (q.status !== 'published') missing.push('Frage nicht freigegeben'); - if (!ctx.audioMap[`questions|${p.question_id}|sentence`]) missing.push('Audio Frage fehlt'); - } - } - // Positiv-Statement - if (ps) { - if (!(ps.positive || '').trim()) missing.push(`Positiv-Satz (${lang}) fehlt`); - else { - if (ps.status !== 'published') missing.push('Positiv-Satz nicht freigegeben'); - if (!ctx.audioMap[`statements|${p.positive_statement_id}|positive_sentence`]) missing.push('Audio Positiv fehlt'); - } - } - // Negativ-Statement (nur wenn Text vorhanden) - if (ns && (ns.negative || '').trim()) { - if (ns.status !== 'published') missing.push('Negativ-Satz nicht freigegeben'); - if (!ctx.audioMap[`statements|${p.negative_statement_id}|negative_sentence`]) missing.push('Audio Negativ fehlt'); - } - - return { missing, missingCount: missing.length, ready: missing.length === 0 }; -} - // GET /api/pairs/publishability?lang=sv — Pairs mit Readiness, sortierbar nach "am wenigsten fehlt" router.get('/publishability', async (req, res, next) => { try { @@ -215,72 +134,6 @@ router.patch('/:id', async (req, res, next) => { } catch (err) { next(err); } }); -// Lädt das vollständige 3-sprachige Inhalts-Bündel eines Pairs (Frage, Statements, Wörter) -// — für das Review-Modal im Frontend. -async function loadPairContent(p) { - const langCols = (prefix) => `${prefix}_de, ${prefix}_en, ${prefix}_sv`; - const content = { answer_type: p.answer_type, question: null, positive: null, negative: null }; - - if (p.question_id) { - content.question = (await query( - `SELECT ${langCols('sentence')} FROM questions WHERE id = $1`, [p.question_id])).rows[0] || null; - } - - async function loadStatement(stmtId, sentencePrefix, linkTable) { - if (!stmtId) return null; - const s = (await query( - `SELECT ${langCols(sentencePrefix)}, answer FROM statements WHERE id = $1`, [stmtId])).rows[0] || {}; - const words = (await query( - `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv - FROM ${linkTable} lw JOIN words w ON w.id = lw.word_id - WHERE lw.statement_id = $1 - ORDER BY w.titel_de`, [stmtId])).rows; - return { sentence: s, words, answer: s.answer ?? null }; - } - - content.positive = await loadStatement(p.positive_statement_id, 'positive_sentence', 'statement_positive_words'); - content.negative = await loadStatement(p.negative_statement_id, 'negative_sentence', 'statement_negative_words'); - return content; -} - -// Übersetzt die einem Statement zugeordneten Wörter gemeinsam (ein Claude-Call je Zielsprache), -// mit der Frage als Kontext zur Disambiguierung. `questionRow` = { sentence_de/en/sv } | null. -// Gibt die Anzahl tatsächlich aktualisierter Wort-Felder zurück. -async function translateWordGroup(statementId, linkTable, questionRow, overwrite) { - const rows = (await query( - `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv - FROM ${linkTable} lw JOIN words w ON w.id = lw.word_id - WHERE lw.statement_id = $1`, [statementId])).rows; - if (!rows.length) return 0; - - // Quellsprache: erste Sprache, in der mind. ein Wort Text hat - let src = null; - for (const l of LANGS) if (rows.some(w => (w[`titel_${l}`] || '').trim())) { src = l; break; } - if (!src) return 0; - - const context = questionRow - ? (questionRow[`sentence_${src}`] || questionRow.sentence_de || questionRow.sentence_en || questionRow.sentence_sv || '') - : ''; - - let count = 0; - for (const to of LANGS) { - if (to === src) continue; - const need = rows - .filter(w => (w[`titel_${src}`] || '').trim() && (overwrite || !(w[`titel_${to}`] || '').trim())) - .map(w => ({ id: w.id, text: (w[`titel_${src}`] || '').trim() })); - if (!need.length) continue; - const map = await translateWords({ words: need, from: src, to, context }); - for (const w of need) { - const t = map[w.id]; - if (!t) continue; - await query(`UPDATE words SET titel_${to} = $1 WHERE id = $2`, [t, w.id]); - await maybeAutoTranslated(w.id); - count++; - } - } - return count; -} - // POST /api/pairs/:id/translate — übersetzt fehlende Sätze/Wörter dieses Pairs in die // fehlenden Sprachen (de/en/sv). Body `{ overwrite: true }` übersetzt auch bereits gefüllte // Zielsprachen neu (Quellsprache bleibt unangetastet). Liefert das aktualisierte Inhalts-Bündel. diff --git a/src/routes/pipeline.js b/src/routes/pipeline.js new file mode 100644 index 0000000..40d5d11 --- /dev/null +++ b/src/routes/pipeline.js @@ -0,0 +1,207 @@ +// Content-Pipeline: Freigeben (release) → Auto-Verarbeitung → Review-Bundle → Bild-Publish. +const router = require('express').Router(); +const { query } = require('../db'); +const { LANGS } = require('../lib/translate'); +const { loadPairContext, computeReadiness, loadPairContent } = require('../lib/pairContent'); +const { enqueue } = require('../lib/pipeline'); + +// POST /api/pipeline/release/:pictureId — Bild in die Pipeline geben +router.post('/release/:pictureId', async (req, res, next) => { + try { + const pr = await query(`SELECT id, pipeline_status FROM pictures WHERE id = $1`, [req.params.pictureId]); + if (!pr.rows.length) return res.status(404).json({ error: 'Bild nicht gefunden' }); + if (['queued', 'running'].includes(pr.rows[0].pipeline_status)) + return res.status(409).json({ error: 'Pipeline läuft bereits für dieses Bild' }); + + const oc = await query( + `SELECT count(*) AS c FROM object_pictures WHERE picture_id = $1`, [req.params.pictureId]); + if (!parseInt(oc.rows[0].c)) + return res.status(400).json({ error: 'Bild hat keine Objekte — erst Objekte anlegen' }); + + const upd = await query( + `UPDATE pictures SET pipeline_status='queued', pipeline_error=NULL, pipeline_step=NULL, + pipeline_started_at=NULL, pipeline_finished_at=NULL + WHERE id=$1 RETURNING *`, [req.params.pictureId]); + enqueue(req.params.pictureId); + res.status(202).json(upd.rows[0]); + } catch (err) { next(err); } +}); + +// POST /api/pipeline/retry/:pictureId — fehlgeschlagenen Lauf erneut starten +router.post('/retry/:pictureId', async (req, res, next) => { + try { + const upd = await query( + `UPDATE pictures SET pipeline_status='queued', pipeline_error=NULL + WHERE id=$1 AND pipeline_status='failed' RETURNING *`, [req.params.pictureId]); + if (!upd.rows.length) return res.status(409).json({ error: 'Bild ist nicht im Status "failed"' }); + enqueue(req.params.pictureId); + res.status(202).json(upd.rows[0]); + } catch (err) { next(err); } +}); + +// GET /api/pipeline/overview — alle Bilder, die in der Pipeline sind (Polling-Endpoint) +router.get('/overview', async (req, res, next) => { + try { + const r = await query( + `SELECT p.id, p.design, p.picture_link, p.blurhash, p.status, + p.pipeline_status, p.pipeline_step, p.pipeline_progress, p.pipeline_error, + p.pipeline_started_at, p.pipeline_finished_at, + (SELECT count(*) FROM object_pictures op WHERE op.picture_id = p.id) AS object_count + FROM pictures p + WHERE p.pipeline_status NOT IN ('none', 'published') + ORDER BY p.pipeline_started_at DESC NULLS LAST, p.updated_at DESC`); + res.json(r.rows); + } catch (err) { next(err); } +}); + +// GET /api/pipeline/picture/:id/bundle — kompletter Review-Payload für die Veröffentlichen-Seite +router.get('/picture/:id/bundle', async (req, res, next) => { + try { + const pr = await query(`SELECT * FROM pictures WHERE id = $1`, [req.params.id]); + if (!pr.rows.length) return res.status(404).json({ error: 'Bild nicht gefunden' }); + const picture = pr.rows[0]; + + const objects = (await query( + `SELECT o.id, o.status, o.selections + FROM object_pictures op JOIN objects o ON o.id = op.object_id + WHERE op.picture_id = $1 AND o.status <> 'blocked' + ORDER BY o.created_at`, [req.params.id])).rows; + + for (const obj of objects) { + obj.words = (await query( + `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv, w.status + FROM object_words ow JOIN words w ON w.id = ow.word_id + WHERE ow.object_id = $1`, [obj.id])).rows; + + const pairs = (await query( + `SELECT p.id, p.answer_type, p.status, p.difficulty_level, + p.question_id, p.positive_statement_id, p.negative_statement_id + FROM object_pairs op JOIN pairs p ON p.id = op.pair_id + WHERE op.object_id = $1 AND p.status <> 'blocked' + ORDER BY p.created_at`, [obj.id])).rows; + + // Audio-Abdeckung pro Pair × Sprache (für die 🔊-Indikatoren) + for (const p of pairs) { + p.content = await loadPairContent(p); + p.audio = {}; + for (const lang of LANGS) { + const ctx = await loadPairContext([p], lang); + const r = computeReadiness(p, ctx, lang, { skipPicturePublished: true, skipStatusChecks: true }); + p.audio[lang] = { ready: r.ready, missing: r.missing }; + } + delete p.question_id; delete p.positive_statement_id; delete p.negative_statement_id; + } + obj.pairs = pairs; + } + + res.json({ picture, objects }); + } catch (err) { next(err); } +}); + +// GET /api/pipeline/settings +router.get('/settings', async (req, res, next) => { + try { + const r = await query(`SELECT value FROM app_settings WHERE key = 'pipeline.pairs_per_object'`); + const n = parseInt(r.rows[0]?.value); + res.json({ pairs_per_object: isNaN(n) ? 5 : n }); + } catch (err) { next(err); } +}); + +// PUT /api/pipeline/settings — { pairs_per_object } +router.put('/settings', async (req, res, next) => { + try { + const n = Math.min(Math.max(parseInt(req.body.pairs_per_object) || 5, 1), 20); + await query( + `INSERT INTO app_settings (key, value) VALUES ('pipeline.pairs_per_object', $1::jsonb) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, [JSON.stringify(n)]); + res.json({ pairs_per_object: n }); + } catch (err) { next(err); } +}); + +// POST /api/pipeline/picture/:id/publish — Body { excluded_pair_ids: [] } +// Blockt ausgeschlossene Pairs, validiert den Rest (Text + Audio in allen Sprachen) +// und veröffentlicht dann kaskadierend: Fragen, Statements, Pairs, Wörter, Objekte, Bild. +router.post('/picture/:id/publish', async (req, res, next) => { + try { + const pictureId = req.params.id; + const excluded = Array.isArray(req.body?.excluded_pair_ids) ? req.body.excluded_pair_ids : []; + + const pr = await query(`SELECT id, status, pipeline_status FROM pictures WHERE id = $1`, [pictureId]); + if (!pr.rows.length) return res.status(404).json({ error: 'Bild nicht gefunden' }); + + // 1) Ausgeschlossene Pairs blocken + if (excluded.length) { + await query( + `UPDATE pairs SET status='blocked', blocked_at=NOW(), blocked_topic='Im Publish-Review ausgeschlossen' + WHERE id = ANY($1)`, [excluded]); + } + + // 2) Verbleibende Pairs des Bildes laden + const pairs = (await query( + `SELECT DISTINCT p.id, p.answer_type, p.status, p.question_id, + p.positive_statement_id, p.negative_statement_id + FROM object_pairs op + JOIN object_pictures pic ON pic.object_id = op.object_id + JOIN pairs p ON p.id = op.pair_id + WHERE pic.picture_id = $1 AND p.status <> 'blocked'`, [pictureId])).rows; + if (!pairs.length) return res.status(400).json({ error: 'Keine veröffentlichbaren Pairs übrig' }); + + // 3) Readiness pro Pair × Sprache (Bild-/Status-Checks übersprungen — werden hier mitveröffentlicht) + const notReady = []; + for (const lang of LANGS) { + const ctx = await loadPairContext(pairs, lang); + for (const p of pairs) { + const r = computeReadiness(p, ctx, lang, { skipPicturePublished: true, skipStatusChecks: true }); + if (!r.ready) notReady.push({ pair_id: p.id, lang, missing: r.missing }); + } + } + if (notReady.length) + return res.status(409).json({ error: 'Noch nicht veröffentlichbar', notReady }); + + // 4) Kaskadierend veröffentlichen + const now = new Date().toISOString(); + const questionIds = [...new Set(pairs.map(p => p.question_id).filter(Boolean))]; + const stmtIds = [...new Set(pairs.flatMap(p => [p.positive_statement_id, p.negative_statement_id]).filter(Boolean))]; + const pairIds = pairs.map(p => p.id); + + if (questionIds.length) + await query(`UPDATE questions SET status='published', published_at=COALESCE(published_at,$2) + WHERE id = ANY($1)`, [questionIds, now]); + if (stmtIds.length) + await query(`UPDATE statements SET status='published', published_at=COALESCE(published_at,$2) + WHERE id = ANY($1)`, [stmtIds, now]); + await query(`UPDATE pairs SET status='published', published_at=COALESCE(published_at,$2) + WHERE id = ANY($1)`, [pairIds, now]); + + // Verlinkte Wörter: nur 'generated' → 'published' (translated bleibt für die Bild-Generierung + // im ServerMonitor-Flow; published würde diesen Schritt überspringen) + let publishedWords = 0; + if (stmtIds.length) { + const w = await query( + `UPDATE words SET status='published', published_at=COALESCE(published_at,$2) + WHERE status='generated' AND id IN ( + SELECT word_id FROM statement_positive_words WHERE statement_id = ANY($1) + UNION SELECT word_id FROM statement_negative_words WHERE statement_id = ANY($1) + ) RETURNING id`, [stmtIds, now]); + publishedWords = w.rows.length; + } + + await query( + `UPDATE objects SET status='published', published_at=COALESCE(published_at,$2) + WHERE id IN (SELECT object_id FROM object_pictures WHERE picture_id = $1) + AND status <> 'blocked'`, [pictureId, now]); + await query( + `UPDATE pictures SET status='published', published_timestamp=COALESCE(published_timestamp,$2), + pipeline_status='published' + WHERE id=$1`, [pictureId, now]); + + res.json({ + published_pairs: pairIds.length, + blocked_pairs: excluded.length, + published_words: publishedWords, + picture_id: pictureId, + }); + } catch (err) { next(err); } +}); + +module.exports = router;