// Automatic per-picture content pipeline: generate pairs → translate → AI review → audio → ready.
// In-process queue with a single worker (rate-limit friendly). Every step is idempotent,
// i.e. a resume after a crash/redeploy skips work that has already been done.
const { query } = require('../db');
const { LANGS, fillMissingRow } = require('./translate');
const { PLACEHOLDER_RE } = require('./placeholders');
const { translateWordGroup } = require('./pairContent');
const { generatePairsForObject, persistPair } = require('./generatePairs');
const { reviewPicturePairs } = require('./reviewPairs');
const { generateAndStore, describeError } = require('../routes/audios');

// Picture ids waiting to be processed, consumed front to back.
const queue = [];
// True while pump() is draining the queue, so only one drain loop runs at a time.
let running = false;

/**
 * Puts a picture on the queue (unless that id is already waiting) and kicks the worker.
 *
 * @param {*} pictureId - id of the picture to run through the pipeline.
 */
function enqueue(pictureId) {
  if (!queue.includes(pictureId)) queue.push(pictureId);
  // Fire-and-forget: pump() catches per-picture errors itself.
  pump();
}

/**
 * Drains the queue one picture at a time. Returns immediately when a drain loop
 * is already active. A failing picture is logged and marked as failed; the loop
 * then continues with the next id.
 *
 * @returns {Promise<void>}
 */
async function pump() {
  if (running) return;
  running = true;
  try {
    while (queue.length) {
      const id = queue.shift();
      try {
        await runPicture(id);
      } catch (err) {
        console.error(`Pipeline für Bild ${id} fehlgeschlagen:`, err);
        try {
          // step = null keeps whatever pipeline_step is currently stored (COALESCE in setFailed).
          await setFailed(id, null, err);
        } catch (markErr) {
          // Still best-effort, but no longer silent: the picture may be stuck in 'running'.
          console.error(`Pipeline: Status 'failed' für Bild ${id} konnte nicht gespeichert werden:`, markErr);
        }
      }
    }
  } finally {
    running = false;
  }
}

/**
 * On API boot: pick up runs that were left in 'queued' or 'running' again.
 *
 * @returns {Promise<void>}
 */
async function resumePending() {
  const r = await query(
    `SELECT id FROM pictures WHERE pipeline_status IN ('queued', 'running')`);
  for (const row of r.rows) enqueue(row.id);
  if (r.rows.length) console.log(`Pipeline: ${r.rows.length} Bild(er) nach Neustart wieder aufgenommen`);
}

/**
 * Reads the setting 'pipeline.pairs_per_object' from app_settings.
 *
 * @returns {Promise<number>} the configured value clamped to 1..20; 5 when the
 *   setting is missing or not a decimal integer.
 */
async function getPairsPerObject() {
  const r = await query(`SELECT value FROM app_settings WHERE key = 'pipeline.pairs_per_object'`);
  // Explicit radix so the setting is always read as a decimal number.
  const n = Number.parseInt(r.rows[0]?.value, 10);
  return Math.min(Math.max(Number.isNaN(n) ? 5 : n, 1), 20);
}

/**
 * Stores the current pipeline step and the progress object (as JSON) on the picture.
 *
 * @param {*} pictureId - picture id.
 * @param {string} step - step name written to pipeline_step.
 * @param {object} progress - progress counters, serialised with JSON.stringify.
 * @returns {Promise<void>}
 */
async function setStep(pictureId, step, progress) {
  await query(
    `UPDATE pictures SET pipeline_step = $2, pipeline_progress = $3 WHERE id = $1`,
    [pictureId, step, JSON.stringify(progress)]);
}

/**
 * Marks the picture's pipeline run as failed and stores the error text.
 *
 * @param {*} pictureId - picture id.
 * @param {?string} step - step to record; null leaves the stored pipeline_step untouched.
 * @param {*} err - the failure; its message (or string form) is stored, cut to 1000 characters.
 * @returns {Promise<void>}
 */
async function setFailed(pictureId, step, err) {
  // Optional chaining + String(): a null/undefined or non-Error throwable must not crash here.
  const message = String(err?.message || err).slice(0, 1000);
  await query(
    `UPDATE pictures SET pipeline_status='failed', pipeline_step=COALESCE($2, pipeline_step), pipeline_error=$3, pipeline_finished_at=NOW() WHERE id=$1`,
    [pictureId, step, message]);
}

/**
 * Loads the non-blocked objects of a picture including their assigned words and selections.
 *
 * @param {*} pictureId - picture id.
 * @returns {Promise<object[]>} object rows (id, status, selections), each extended
 *   with a `words` array of word rows (id, titel_de, titel_en, titel_sv).
 */
async function loadObjects(pictureId) {
  const objs = (await query(
    `SELECT o.id, o.status, o.selections FROM object_pictures op JOIN objects o ON o.id = op.object_id WHERE op.picture_id = $1 AND o.status <> 'blocked' ORDER BY o.created_at`,
    [pictureId])).rows;
  for (const o of objs) {
    o.words = (await query(
      `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv FROM object_words ow JOIN words w ON w.id = ow.word_id WHERE ow.object_id = $1`,
      [o.id])).rows;
  }
  return objs;
}

/**
 * Loads all pairs of a picture (via object_pairs), excluding blocked ones.
 *
 * @param {*} pictureId - picture id.
 * @returns {Promise<object[]>} distinct pair rows ordered by pair id.
 */
async function loadPairs(pictureId) {
  return (await query(
    `SELECT DISTINCT p.id, p.answer_type, p.status, p.question_id, p.positive_statement_id, p.negative_statement_id FROM object_pairs op JOIN object_pictures pic ON pic.object_id = op.object_id JOIN pairs p ON p.id = op.pair_id WHERE pic.picture_id = $1 AND p.status <> 'blocked' ORDER BY p.id`,
    [pictureId])).rows;
}

// Word ids of all {{label.w:uuid}} placeholders in the sentences of the pairs.
// These words are created during generation (nouns inside the sentence) and are not
// linked via statement_words/object_words — they must be included for translation + audio.
/**
 * Collects the ids of placeholder words found in the question and statement
 * sentences of the given pairs.
 *
 * @param {object[]} pairs - pair rows with question_id / positive_statement_id / negative_statement_id.
 * @returns {Promise<Set>} capture group 3 of every PLACEHOLDER_RE match whose capture group 2 is 'w'.
 */
async function collectPlaceholderWordIds(pairs) {
  const ids = new Set();
  const scan = text => {
    for (const m of String(text || '').matchAll(PLACEHOLDER_RE)) {
      if (m[2] === 'w') ids.add(m[3]);
    }
  };
  const questionIds = [...new Set(pairs.map(p => p.question_id).filter(Boolean))];
  const stmtIds = [...new Set(pairs.flatMap(p => [p.positive_statement_id, p.negative_statement_id]).filter(Boolean))];
  if (questionIds.length) {
    const r = await query(
      `SELECT sentence_de, sentence_en, sentence_sv FROM questions WHERE id = ANY($1)`,
      [questionIds]);
    r.rows.forEach(row => Object.values(row).forEach(scan));
  }
  if (stmtIds.length) {
    const r = await query(
      `SELECT positive_sentence_de, positive_sentence_en, positive_sentence_sv, negative_sentence_de, negative_sentence_en, negative_sentence_sv FROM statements WHERE id = ANY($1)`,
      [stmtIds]);
    r.rows.forEach(row => Object.values(row).forEach(scan));
  }
  return ids;
}

/**
 * Runs the whole pipeline for one picture: generate pairs, translate, AI review,
 * audio, finish. Failures in the pairs, audio and finish steps are recorded via
 * setFailed and end the run; translation and review failures are only counted.
 * Errors outside the guarded steps propagate to the caller.
 *
 * @param {*} pictureId - picture id.
 * @returns {Promise<void>}
 */
async function runPicture(pictureId) {
  // Claim — only pictures that are in the pipeline (queued, running or failed).
  const claim = await query(
    `UPDATE pictures SET pipeline_status='running', pipeline_error=NULL, pipeline_started_at=COALESCE(pipeline_started_at, NOW()), pipeline_finished_at=NULL WHERE id=$1 AND pipeline_status IN ('queued','running','failed') RETURNING id, picture_link`,
    [pictureId]);
  if (!claim.rows.length) return;
  const picture = claim.rows[0];

  const objects = await loadObjects(pictureId);
  if (!objects.length) {
    await setFailed(pictureId, 'pairs', new Error('Bild hat keine Objekte'));
    return;
  }

  const progress = {
    objectsDone: 0,
    objectsTotal: objects.length,
    pairsCreated: 0,
    translatedPairs: 0,
    pairsTotal: 0,
    audiosDone: 0,
    audiosTotal: 0,
    incompletePairs: 0,
  };

  // ── Step 1: generate pairs (per object, skipped when there are already enough) ──
  try {
    const targetCount = await getPairsPerObject();
    await setStep(pictureId, 'pairs', progress);
    const claudeObjects = objects.map(o => ({ id: o.id, words: o.words, selections: o.selections || [] }));
    for (const obj of objects) {
      const countRow = (await query(
        `SELECT count(*) AS c FROM object_pairs op JOIN pairs p ON p.id = op.pair_id WHERE op.object_id = $1 AND p.status <> 'blocked'`,
        [obj.id])).rows[0];
      const have = Number.parseInt(countRow.c, 10);
      if (have < targetCount) {
        if (!picture.picture_link) throw new Error('Bild hat keinen picture_link für die KI-Analyse');
        const generate = () => generatePairsForObject({
          imageUrl: picture.picture_link,
          objects: claudeObjects,
          selectedObjectId: obj.id,
          count: targetCount - have,
        });
        let generated;
        try {
          generated = await generate();
        } catch (err) {
          // One retry on parse/API errors, then give up. The first error is logged
          // so it is not lost when the retry succeeds or fails differently.
          console.error(`Pair-Generierung für Objekt ${obj.id} fehlgeschlagen, ein Retry folgt:`, err?.message || err);
          generated = await generate();
        }
        for (const p of generated) {
          await persistPair(p, obj.id);
          progress.pairsCreated++;
          await setStep(pictureId, 'pairs', progress);
        }
      }
      progress.objectsDone++;
      await setStep(pictureId, 'pairs', progress);
    }
  } catch (err) {
    await setFailed(pictureId, 'pairs', err);
    return;
  }

  // ── Step 2: translate (per pair, only fills missing languages) ──────────────
  // Isolated per pair: if one translation fails (e.g. a transient API error despite
  // a retry), the whole run is not lost — the rest continues, failures are counted
  // and can be filled in later via „Übersetzungen nachholen".
  const pairs = await loadPairs(pictureId);
  progress.pairsTotal = pairs.length;
  progress.translateFailures = 0;
  await setStep(pictureId, 'translate', progress);
  for (const p of pairs) {
    try {
      await translatePair(p);
    } catch (err) {
      progress.translateFailures++;
      console.error(`Translate-Fehler bei Pair ${p.id}:`, err.message);
    }
    // Counts processed pairs, including those whose translation failed.
    progress.translatedPairs++;
    await setStep(pictureId, 'translate', progress);
  }
  // Also translate the noun words referenced by sentence placeholders ({{label.w:id}}).
  try {
    for (const wid of await collectPlaceholderWordIds(pairs)) {
      try {
        await fillMissingRow('words', wid, ['titel']);
      } catch (err) {
        progress.translateFailures++;
        console.error(`Translate-Fehler bei Wort ${wid}:`, err.message);
      }
    }
  } catch (err) {
    console.error(`Placeholder-Wörter sammeln fehlgeschlagen:`, err.message);
  }

  // ── Step 2.5: AI review — all pairs + the picture are sent to Sonnet for proofreading ──
  // (spelling, translation consistency, plausibility against the picture). Corrections
  // land in the DB before audio generation; as with translation, errors are not
  // fatal — audio still runs and the run is not aborted.
  progress.reviewedPairs = 0;
  progress.correctionsApplied = 0;
  progress.reviewFailures = 0;
  await setStep(pictureId, 'review', progress);
  try {
    await reviewPicturePairs({
      pictureId,
      pictureUrl: picture.picture_link,
      pairs,
      progress,
      onProgress: () => setStep(pictureId, 'review', progress),
    });
  } catch (err) {
    progress.reviewFailures++;
    console.error(`Review-Fehler bei Bild ${pictureId}:`, err.message);
  }
  await setStep(pictureId, 'review', progress);

  // ── Step 3: audio for all sentences + words of the picture in all languages ──
  try {
    const units = await collectAudioUnits(pictureId, pairs);
    progress.audiosTotal = units.length;
    // Units that already have audio count as done up front.
    progress.audiosDone = units.filter(u => u.hasAudio).length;
    await setStep(pictureId, 'audio', progress);
    const failures = [];
    for (const u of units) {
      if (u.hasAudio) continue;
      try {
        await generateWithBackoff(u);
        progress.audiosDone++;
      } catch (err) {
        failures.push(`${u.source_table}/${u.source_field}/${u.language}: ${describeError(err)}`);
      }
      await setStep(pictureId, 'audio', progress);
    }
    // Fatal only when there were failures and no unit has audio at all
    // (audiosDone includes audio that existed before this run).
    if (failures.length && progress.audiosDone === 0) {
      throw new Error(`Alle Audio-Generierungen fehlgeschlagen: ${failures[0]}`);
    }
    progress.audioFailures = failures.length;
  } catch (err) {
    await setFailed(pictureId, 'audio', err);
    return;
  }

  // ── Step 4: finish — complete content goes to 'reviewed', the picture to 'ready' ──
  try {
    let incomplete = 0;
    for (const p of pairs) {
      if (await isPairComplete(p)) {
        if (p.question_id) {
          await query(`UPDATE questions SET status='reviewed' WHERE id=$1 AND status='draft'`, [p.question_id]);
        }
        const stmtIds = [p.positive_statement_id, p.negative_statement_id].filter(Boolean);
        if (stmtIds.length) {
          await query(`UPDATE statements SET status='reviewed' WHERE id = ANY($1) AND status='draft'`, [stmtIds]);
        }
        await query(`UPDATE pairs SET status='reviewed' WHERE id=$1 AND status='draft'`, [p.id]);
      } else {
        incomplete++;
      }
    }
    progress.incompletePairs = incomplete;
    await query(
      `UPDATE objects SET status='reviewed' WHERE id IN (SELECT object_id FROM object_pictures WHERE picture_id = $1) AND status='draft'`,
      [pictureId]);
    await query(
      `UPDATE pictures SET status = CASE WHEN status='uploaded' THEN 'reviewed' ELSE status END, pipeline_status='ready', pipeline_step=NULL, pipeline_progress=$2, pipeline_finished_at=NOW() WHERE id=$1`,
      [pictureId, JSON.stringify(progress)]);
  } catch (err) {
    await setFailed(pictureId, 'finish', err);
    return;
  }
}

/**
 * Translates the missing languages of ONE pair (question + sentences, or word groups).
 *
 * @param {object} p - pair row (answer_type, question_id, positive_statement_id, negative_statement_id).
 * @param {boolean} [overwrite=false] - when true, already filled target languages are translated again.
 * @returns {Promise<void>}
 */
async function translatePair(p, overwrite = false) {
  let questionRow = null;
  if (p.question_id) {
    // Read before fillMissingRow runs, so this row holds the question sentences as stored beforehand.
    questionRow = (await query(
      `SELECT sentence_de, sentence_en, sentence_sv FROM questions WHERE id = $1`,
      [p.question_id])).rows[0] || null;
    await fillMissingRow('questions', p.question_id, ['sentence'], { overwrite });
  }
  if (p.answer_type === 'word') {
    if (p.positive_statement_id) {
      await translateWordGroup(p.positive_statement_id, 'statement_positive_words', questionRow, overwrite);
    }
    if (p.negative_statement_id) {
      await translateWordGroup(p.negative_statement_id, 'statement_negative_words', questionRow, overwrite);
    }
  } else {
    if ((p.answer_type === 'text' || p.answer_type === 'question') && p.positive_statement_id) {
      await fillMissingRow('statements', p.positive_statement_id, ['positive_sentence'], { overwrite });
    }
    if (p.answer_type === 'question' && p.negative_statement_id) {
      await fillMissingRow('statements', p.negative_statement_id, ['negative_sentence'], { overwrite });
    }
  }
}

/**
 * Are all languages filled in the fields the pair actually uses?
 * (Mirror of the review check.)
 *
 * @param {object} p - pair row (answer_type, question_id, positive_statement_id, negative_statement_id).
 * @returns {Promise<boolean>} false as soon as a used field lacks text in any of LANGS.
 */
async function isPairComplete(p) {
  if (p.question_id) {
    const q = (await query(
      `SELECT sentence_de, sentence_en, sentence_sv FROM questions WHERE id=$1`,
      [p.question_id])).rows[0];
    if (!q || LANGS.some(l => !(q[`sentence_${l}`] || '').trim())) return false;
  }
  if (p.answer_type === 'word') {
    const groups = [
      [p.positive_statement_id, 'statement_positive_words'],
      [p.negative_statement_id, 'statement_negative_words'],
    ];
    for (const [stmtId, link] of groups) {
      if (!stmtId) continue;
      const ws = (await query(
        `SELECT w.titel_de, w.titel_en, w.titel_sv FROM ${link} lw JOIN words w ON w.id = lw.word_id WHERE lw.statement_id = $1`,
        [stmtId])).rows;
      if (ws.some(w => LANGS.some(l => !(w[`titel_${l}`] || '').trim()))) return false;
    }
  } else if (p.answer_type === 'text' || p.answer_type === 'question') {
    if (p.positive_statement_id) {
      const s = (await query(
        `SELECT positive_sentence_de, positive_sentence_en, positive_sentence_sv FROM statements WHERE id=$1`,
        [p.positive_statement_id])).rows[0];
      if (!s || LANGS.some(l => !(s[`positive_sentence_${l}`] || '').trim())) return false;
    }
    if (p.answer_type === 'question' && p.negative_statement_id) {
      const s = (await query(
        `SELECT negative_sentence_de, negative_sentence_en, negative_sentence_sv FROM statements WHERE id=$1`,
        [p.negative_statement_id])).rows[0];
      // The negative sentence only counts against completeness when it is partially
      // filled: empty in every language (or no row) is accepted.
      const hasAny = s && LANGS.some(l => (s[`negative_sentence_${l}`] || '').trim());
      if (hasAny && LANGS.some(l => !(s[`negative_sentence_${l}`] || '').trim())) return false;
    }
  }
  return true;
}

// Audio units of the picture: question/statement sentences + linked words × languages.
// Only fields that have text in ALL languages (same rule as audios.js computeUnits).
/**
 * Builds the list of audio units for a picture: one entry per source row, field
 * and language. A field contributes units only when it has non-blank text in
 * every language of LANGS.
 *
 * @param {*} pictureId - picture id, used to find the words linked to the picture's objects.
 * @param {object[]} pairs - pair rows with question_id / positive_statement_id / negative_statement_id.
 * @returns {Promise<object[]>} units of the form
 *   { source_table, source_id, source_field, language, text, hasAudio }.
 */
async function collectAudioUnits(pictureId, pairs) {
  const uniqueTruthy = values => [...new Set(values.filter(Boolean))];
  const questionIds = uniqueTruthy(pairs.map(p => p.question_id));
  const stmtIds = uniqueTruthy(pairs.flatMap(p => [p.positive_statement_id, p.negative_statement_id]));

  // Words: via the statements of the pairs + the object_words of the picture.
  const wordIds = new Set();
  if (stmtIds.length > 0) {
    for (const link of ['statement_positive_words', 'statement_negative_words']) {
      const linked = await query(`SELECT word_id FROM ${link} WHERE statement_id = ANY($1)`, [stmtIds]);
      for (const entry of linked.rows) wordIds.add(entry.word_id);
    }
  }
  const objectWords = await query(
    `SELECT ow.word_id FROM object_words ow JOIN object_pictures op ON op.object_id = ow.object_id WHERE op.picture_id = $1`,
    [pictureId]);
  for (const entry of objectWords.rows) wordIds.add(entry.word_id);
  // Plus the noun words referenced by sentence placeholders ({{label.w:id}}).
  const placeholderWordIds = await collectPlaceholderWordIds(pairs);
  placeholderWordIds.forEach(id => wordIds.add(id));

  // Every source row together with the field prefixes that may carry audio.
  const sources = [];
  const addSources = (table, fields, rows) => {
    for (const row of rows) sources.push({ table, row, fields });
  };
  if (questionIds.length > 0) {
    const questions = await query(
      `SELECT id, sentence_de, sentence_en, sentence_sv FROM questions WHERE id = ANY($1)`,
      [questionIds]);
    addSources('questions', ['sentence'], questions.rows);
  }
  if (stmtIds.length > 0) {
    const statements = await query(
      `SELECT id, positive_sentence_de, positive_sentence_en, positive_sentence_sv, negative_sentence_de, negative_sentence_en, negative_sentence_sv FROM statements WHERE id = ANY($1)`,
      [stmtIds]);
    addSources('statements', ['positive_sentence', 'negative_sentence'], statements.rows);
  }
  if (wordIds.size > 0) {
    const words = await query(
      `SELECT id, titel_de, titel_en, titel_sv FROM words WHERE id = ANY($1) AND status <> 'blocked'`,
      [[...wordIds]]);
    addSources('words', ['titel'], words.rows);
  }

  // Load the existing audios in one go and index them by table|id|field|language.
  const have = new Set();
  const sourceIds = sources.map(source => source.row.id);
  if (sourceIds.length > 0) {
    const existing = await query(
      `SELECT source_table, source_id, source_field, language FROM audios WHERE source_id = ANY($1) AND status <> 'blocked'`,
      [sourceIds]);
    for (const x of existing.rows) {
      have.add(`${x.source_table}|${x.source_id}|${x.source_field}|${x.language}`);
    }
  }

  const units = [];
  for (const { table, row, fields } of sources) {
    for (const f of fields) {
      const textFor = l => (row[`${f}_${l}`] || '').trim();
      // Skip the whole field as soon as one language is blank.
      if (LANGS.some(l => !textFor(l))) continue;
      for (const l of LANGS) {
        units.push({
          source_table: table,
          source_id: row.id,
          source_field: f,
          language: l,
          text: textFor(l),
          hasAudio: have.has(`${table}|${row.id}|${f}|${l}`),
        });
      }
    }
  }
  return units;
}

/**
 * Generates and stores the audio for one unit, retrying with growing delays
 * (2 s, 8 s, 30 s) when the error carries status 429 (ElevenLabs rate limit).
 * Any other error, or a 429 after the last delay, is rethrown.
 *
 * @param {object} u - audio unit (text, language, source_table, source_id, source_field).
 * @returns {Promise<*>} whatever generateAndStore resolves to.
 */
async function generateWithBackoff(u) {
  const waits = [2000, 8000, 30000];
  let attempt = 0;
  while (true) {
    try {
      const { text, language, source_table, source_id, source_field } = u;
      return await generateAndStore({ text, language, source_table, source_id, source_field });
    } catch (err) {
      const retryable = err.status === 429 && attempt < waits.length;
      if (!retryable) throw err;
      await new Promise(resolve => setTimeout(resolve, waits[attempt]));
      attempt += 1;
    }
  }
}

module.exports = { enqueue, resumePending, loadPairs, collectAudioUnits, generateWithBackoff, translatePair };