From d66cff3f612f84b5e970a537578c48844ef45af6 Mon Sep 17 00:00:00 2001 From: admin Date: Mon, 15 Jun 2026 14:27:09 +0200 Subject: [PATCH] feat: automatische Wort-Kategorisierung (Batches API + Sofort-Backfill) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Feste ~20er-Taxonomie geseedet (de/en/sv, published; bestehende Kategorien werden wiederverwendet) + Tabelle category_batches. src/lib/classifyWords.js: findet in Pairs verwendete Wörter ohne Kategorie und klassifiziert sie per Haiku gegen die feste Liste. - Stundenjob über die Message Batches API (asynchron, ~50% günstiger): submit/collect-Ticks, in index.js nach Boot + stündlich. - Sofortiger synchroner One-Shot-Backfill (classifyWordsSync) für Live-Test ohne 24h-Verzug. Beides materialisiert pair_categories via derivePairCategories. POST /api/categories/auto-assign (admin): ?sync=true = Sofort-Backfill, sonst ein Batch-Tick. Entkoppelt von generate-words und Publish. Co-Authored-By: Claude Opus 4.8 --- src/db-migrate.js | 50 +++++++ src/index.js | 8 ++ src/lib/classifyWords.js | 295 +++++++++++++++++++++++++++++++++++++++ src/routes/categories.js | 12 ++ 4 files changed, 365 insertions(+) create mode 100644 src/lib/classifyWords.js diff --git a/src/db-migrate.js b/src/db-migrate.js index a7eeafb..908cb75 100644 --- a/src/db-migrate.js +++ b/src/db-migrate.js @@ -131,6 +131,56 @@ async function migrate() { ) `); + // Feste Alltags-Taxonomie seeden (de/en/sv, published). Basis für die automatische + // Wort-Kategorisierung (src/lib/classifyWords.js) und die Kategorie-Punkte im Profil. + // Idempotent: bestehende Kategorie (z. B. "Tiere") wird wiederverwendet, keine Dubletten. 
+ const CATEGORY_TAXONOMY = [ + ['Lebensmittel', 'Food', 'Mat'], + ['Tiere', 'Animals', 'Djur'], + ['Körper', 'Body', 'Kropp'], + ['Kleidung', 'Clothing', 'Kläder'], + ['Familie & Menschen','Family & People', 'Familj & människor'], + ['Beruf & Arbeit', 'Job & Work', 'Jobb & arbete'], + ['Haushalt', 'Household', 'Hushåll'], + ['Wohnen & Möbel', 'Home & Furniture', 'Hem & möbler'], + ['Natur & Pflanzen', 'Nature & Plants', 'Natur & växter'], + ['Wetter', 'Weather', 'Väder'], + ['Verkehr & Reisen', 'Transport & Travel', 'Transport & resor'], + ['Stadt & Gebäude', 'City & Buildings', 'Stad & byggnader'], + ['Schule & Bildung', 'School & Education', 'Skola & utbildning'], + ['Technik & Geräte', 'Technology & Devices','Teknik & apparater'], + ['Sport & Freizeit', 'Sports & Leisure', 'Sport & fritid'], + ['Gefühle', 'Emotions', 'Känslor'], + ['Farben', 'Colors', 'Färger'], + ['Zahlen & Zeit', 'Numbers & Time', 'Tal & tid'], + ['Werkzeuge', 'Tools', 'Verktyg'], + ['Sonstiges', 'Other', 'Övrigt'], + ]; + for (const [de, en, sv] of CATEGORY_TAXONOMY) { + await query( + `INSERT INTO categories (titel_de, titel_en, titel_sv, status, requested_at, published_at) + SELECT $1, $2, $3, 'published', NOW(), NOW() + WHERE NOT EXISTS (SELECT 1 FROM categories WHERE lower(titel_de) = lower($1))`, + [de, en, sv] + ).catch(() => {}); + } + // Bestehende Treffer auf published heben (z. B. 
die alte "Tiere"-Kategorie) + await query( + `UPDATE categories + SET status = 'published', published_at = COALESCE(published_at, NOW()) + WHERE lower(titel_de) = ANY($1) AND status <> 'published'`, + [CATEGORY_TAXONOMY.map(([de]) => de.toLowerCase())] + ).catch(() => {}); + + // Asynchroner Kategorisierungs-Batch (Message Batches API) — Status über Boots/Redeploys merken + await query(` + CREATE TABLE IF NOT EXISTS category_batches ( + batch_id TEXT PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'submitted', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `); + await query(` CREATE TABLE IF NOT EXISTS questions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), diff --git a/src/index.js b/src/index.js index d499672..7cea604 100644 --- a/src/index.js +++ b/src/index.js @@ -62,5 +62,13 @@ migrate() // Hängengebliebene Pipeline-Läufe (z.B. nach Redeploy) wieder aufnehmen require('./lib/pipeline').resumePending() .catch(err => console.error('Pipeline-Resume fehlgeschlagen:', err)); + + // Automatische Wort-Kategorisierung (Message Batches API): kurz nach Boot + stündlich. + // Submit/Collect-Ticks, entkoppelt von generate-words und Publish. + const { runCategorizationTick } = require('./lib/classifyWords'); + const HOUR = 60 * 60 * 1000; + const tick = () => runCategorizationTick().catch(err => console.error('Auto-Kategorisierung:', err.message)); + setTimeout(tick, 30_000); + setInterval(tick, HOUR); }) .catch(err => { console.error('Migration failed:', err); process.exit(1); }); diff --git a/src/lib/classifyWords.js b/src/lib/classifyWords.js new file mode 100644 index 0000000..d57f244 --- /dev/null +++ b/src/lib/classifyWords.js @@ -0,0 +1,295 @@ +// Automatische Wort-Kategorisierung über die Anthropic Message Batches API (asynchron, ~50% günstiger). 
+// Decoupled from the generate-words prompt and the publish flow: an hourly job (src/index.js)
+// finds words that are used in pairs but have no category yet, has Haiku classify them against
+// the fixed taxonomy (src/db-migrate.js) and then materializes pair_categories.
+const { query } = require('../db');
+const { resolvePlaceholdersToLabels } = require('./placeholders');
+const { derivePairCategories } = require('./pairCategories');
+
+const ANTHROPIC_BASE = 'https://api.anthropic.com';
+const MODEL = 'claude-haiku-4-5-20251001';
+const BATCH_LIMIT = 500; // max. words per submit (the Batches API allows up to 100k)
+const MAX_EXAMPLES = 3;
+
+let running = false; // overlap guard between ticks
+
+/**
+ * Builds the HTTP headers sent with every Anthropic API request.
+ * @returns {object} Content-Type, x-api-key and anthropic-version headers.
+ * @throws {Error} When the ANTHROPIC_API_KEY environment variable is not set.
+ */
+function headers() {
+  const { ANTHROPIC_API_KEY: key } = process.env;
+  if (key) {
+    return { 'Content-Type': 'application/json', 'x-api-key': key, 'anthropic-version': '2023-06-01' };
+  }
+  throw new Error('ANTHROPIC_API_KEY nicht konfiguriert');
+}
+
+/**
+ * Loads all published categories.
+ * @returns {Promise<{rows: object[], byName: Map<string, object>}>} The raw rows (used for the
+ *   prompt's name list) plus a lookup keyed by the lower-cased German and English titles.
+ */
+async function loadCategories() {
+  const { rows } = await query(`SELECT id, titel_de, titel_en FROM categories WHERE status = 'published'`);
+  const byName = new Map();
+  rows.forEach((category) => {
+    // German title first, then English, so a later key overwrites an earlier one in row order.
+    [category.titel_de, category.titel_en]
+      .filter(Boolean)
+      .forEach((title) => byName.set(title.toLowerCase(), category));
+  });
+  return { rows, byName };
+}
+
+// Words without a category that are used in pairs (statements or objects).
+/**
+ * Finds words that have no category yet but are referenced by a statement or an object.
+ * @param {number} [limit=BATCH_LIMIT] Maximum number of rows to return (newest first).
+ * @returns {Promise<object[]>} Rows with id, titel_de, titel_en and titel_sv.
+ */
+async function findUncategorizedUsedWords(limit = BATCH_LIMIT) {
+  const r = await query(
+    `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv
+       FROM words w
+      WHERE NOT EXISTS (SELECT 1 FROM word_categories wc WHERE wc.word_id = w.id)
+        AND (
+          EXISTS (SELECT 1 FROM statement_positive_words spw WHERE spw.word_id = w.id)
+          OR EXISTS (SELECT 1 FROM statement_negative_words snw WHERE snw.word_id = w.id)
+          OR EXISTS (SELECT 1 FROM object_words ow WHERE ow.word_id = w.id)
+        )
+        AND COALESCE(w.titel_de, w.titel_en, w.titel_sv) IS NOT NULL
+      ORDER BY w.created_at DESC
+      LIMIT $1`,
+    [limit]
+  );
+  return r.rows;
+}
+
+/**
+ * Collects up to `max` distinct English example sentences that use the word. Each sentence is
+ * passed through resolvePlaceholdersToLabels first (per the original note: tokens → labels, no uuid).
+ * Statement sentences are preferred; question sentences reached via the word's objects fill the rest.
+ * @param {string} wordId Id of the word.
+ * @param {number} [max=MAX_EXAMPLES] Maximum number of sentences.
+ * @returns {Promise<string[]>} Trimmed sentences, de-duplicated case-insensitively.
+ */
+async function examplesForWord(wordId, max = MAX_EXAMPLES) {
+  const out = [];
+  const seen = new Set();
+  const push = (s) => {
+    const t = resolvePlaceholdersToLabels(s || '').trim();
+    if (t && !seen.has(t.toLowerCase())) { seen.add(t.toLowerCase()); out.push(t); }
+  };
+
+  const stmt = await query(
+    `SELECT s.positive_sentence_en AS s
+       FROM statement_positive_words spw JOIN statements s ON s.id = spw.statement_id
+      WHERE spw.word_id = $1 AND s.positive_sentence_en IS NOT NULL
+     UNION
+     SELECT s.negative_sentence_en
+       FROM statement_negative_words snw JOIN statements s ON s.id = snw.statement_id
+      WHERE snw.word_id = $1 AND s.negative_sentence_en IS NOT NULL
+     LIMIT 10`,
+    [wordId]
+  );
+  for (const row of stmt.rows) { push(row.s); if (out.length >= max) return out; }
+
+  const qs = await query(
+    `SELECT DISTINCT q.sentence_en AS s
+       FROM object_words ow
+       JOIN object_pairs op ON op.object_id = ow.object_id
+       JOIN pairs p ON p.id = op.pair_id
+       JOIN questions q ON q.id = p.question_id
+      WHERE ow.word_id = $1 AND q.sentence_en IS NOT NULL
+      LIMIT 10`,
+    [wordId]
+  );
+  for (const row of qs.rows) { push(row.s); if (out.length >= max) break; }
+  return out;
+}
+
+/**
+ * Builds the user prompt that asks the model to put one word into exactly one category.
+ * @param {object} word Row with titel_en / titel_de / titel_sv.
+ * @param {string[]} examples Example sentences (may be empty).
+ * @param {string[]} categoryNamesDe German category names the model may choose from.
+ * @returns {string} The prompt text.
+ */
+function buildPrompt(word, examples, categoryNamesDe) {
+  const title = word.titel_en || word.titel_de || word.titel_sv || '';
+  const titleDe = word.titel_de ? ` (de: "${word.titel_de}")` : '';
+  const ex = examples.length
+    ? `\n\nExample sentences using the word:\n${examples.map(e => `- ${e}`).join('\n')}`
+    : '';
+  return (
+    `Categories (choose exactly one, by its German name):\n${categoryNamesDe.join(', ')}\n\n` +
+    `Classify this single vocabulary word into the best-fitting category. ` +
+    `If none fits, use "Sonstiges".\n\n` +
+    `Word: "${title}"${titleDe}${ex}\n\n` +
+    // The reply template names what goes into the value; an empty "" invites an empty answer.
+    `Reply with JSON only: {"category":"<German category name>"}`
+  );
+}
+
+/**
+ * Submits the words as one message batch (one request per word, custom_id = word id) and records
+ * the batch id in category_batches.
+ * @param {object[]} words Word rows as returned by findUncategorizedUsedWords.
+ * @param {string[]} categoryNamesDe German category names for the prompt.
+ * @returns {Promise<string>} The id of the created batch.
+ * @throws {Error} When no words are given, the API rejects the request, the response carries no
+ *   batch id, or the batch id cannot be stored.
+ */
+async function submitBatch(words, categoryNamesDe) {
+  // An empty batch would only produce an API error; fail before doing any work.
+  if (!Array.isArray(words) || words.length === 0) {
+    throw new Error('Batch-Submit fehlgeschlagen: keine Wörter übergeben');
+  }
+  const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
+  const requests = [];
+  // Sequential on purpose: up to BATCH_LIMIT words, two queries each.
+  for (const w of words) {
+    const examples = await examplesForWord(w.id);
+    requests.push({
+      custom_id: String(w.id),
+      params: {
+        model: MODEL,
+        max_tokens: 64,
+        system,
+        messages: [{ role: 'user', content: buildPrompt(w, examples, categoryNamesDe) }],
+      },
+    });
+  }
+  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches`, {
+    method: 'POST', headers: headers(), body: JSON.stringify({ requests }),
+  });
+  if (!res.ok) {
+    const err = await res.text().catch(() => '');
+    throw new Error(`Batch-Submit fehlgeschlagen (${res.status}): ${err.slice(0, 300)}`);
+  }
+  const data = await res.json();
+  if (!data?.id) throw new Error('Batch-Submit fehlgeschlagen: Antwort ohne Batch-ID');
+  try {
+    await query(`INSERT INTO category_batches (batch_id, status) VALUES ($1, 'submitted') ON CONFLICT DO NOTHING`, [data.id]);
+  } catch (err) {
+    // The batch already exists remotely; put its id into the error so it is not lost.
+    throw new Error(`Batch ${data.id} eingereicht, aber nicht gespeichert: ${err.message}`, { cause: err });
+  }
+  return data.id;
+}
+
+// Re-derive pair_categories for all pairs that reference one of the words.
+/**
+ * Re-derives pair_categories for every pair that references one of the given words, either through
+ * its positive/negative statement or through one of its objects. Best effort: a failing derivation
+ * is logged and does not reject.
+ * @param {string[]} wordIds Ids of the words whose categories changed.
+ * @returns {Promise<void>}
+ */
+async function rederivePairsForWords(wordIds) {
+  if (!wordIds.length) return;
+  const pairs = await query(
+    `SELECT DISTINCT p.id FROM pairs p
+      WHERE p.positive_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
+         OR p.positive_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
+         OR p.negative_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
+         OR p.negative_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
+         OR p.id IN (SELECT op.pair_id FROM object_pairs op
+                       JOIN object_words ow ON ow.object_id = op.object_id
+                      WHERE ow.word_id = ANY($1))`,
+    [wordIds]
+  );
+  if (!pairs.rows.length) return;
+  try {
+    await derivePairCategories(pairs.rows.map(p => p.id));
+  } catch (err) {
+    // Still best effort, but no longer silent: word_categories is already written at this point.
+    console.error('pair_categories-Ableitung fehlgeschlagen:', err.message);
+  }
+}
+
+/**
+ * Parses a model reply as JSON. Strips a Markdown code fence if present; if the remaining text
+ * does not parse, retries with the span from the first "{" to the last "}".
+ * @param {string} text Raw reply text.
+ * @returns {*} The parsed JSON value.
+ * @throws {SyntaxError} When no JSON can be parsed from the text.
+ */
+function parseJsonReply(text) {
+  let raw = String(text ?? '').trim();
+  const md = raw.match(/```(?:json)?\s*([\s\S]+?)\s*```/);
+  if (md) raw = md[1];
+  try {
+    return JSON.parse(raw);
+  } catch (err) {
+    const start = raw.indexOf('{');
+    const end = raw.lastIndexOf('}');
+    if (start === -1 || end <= start) throw err;
+    return JSON.parse(raw.slice(start, end + 1));
+  }
+}
+
+/**
+ * Synchronous Claude call (/v1/messages) — used by the immediate one-shot backfill
+ * (no 24h batch delay).
+ * @param {string} system System prompt.
+ * @param {string} user User prompt.
+ * @param {number} [maxTokens=2000] Value sent as max_tokens.
+ * @returns {Promise<*>} The JSON value parsed from the first content block's text.
+ * @throws {Error} On a non-2xx response; SyntaxError when the reply is not parseable JSON.
+ */
+async function messagesCall(system, user, maxTokens = 2000) {
+  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages`, {
+    method: 'POST', headers: headers(),
+    body: JSON.stringify({ model: MODEL, max_tokens: maxTokens, system, messages: [{ role: 'user', content: user }] }),
+  });
+  if (!res.ok) { const t = await res.text().catch(() => ''); throw new Error(`Claude ${res.status}: ${t.slice(0, 200)}`); }
+  const data = await res.json();
+  return parseJsonReply(data.content?.[0]?.text || '');
+}
+
+/**
+ * Extracts the "category" value from a single-word classification reply.
+ * @param {string} text Raw reply text.
+ * @returns {string|null} The trimmed category name, or null when the text is empty, unparseable,
+ *   or carries no usable category.
+ */
+function parseCategory(text) {
+  if (!text) return null;
+  try { return (parseJsonReply(text)?.category || '').toString().trim() || null; }
+  catch { return null; }
+}
+
+// Collect a batch once it has finished: apply the results (word_categories + pair_categories).
+// Returns { ended, linked }.
+/**
+ * Checks one open batch and, if it has ended, writes its results to word_categories, re-derives
+ * pair_categories for the affected pairs and removes the batch from category_batches.
+ * @param {string} batchId Id of the batch stored in category_batches.
+ * @returns {Promise<{ended: boolean, linked: number}>} `ended` is true once the batch row was
+ *   finalized; `linked` is the number of words whose category row was written without error.
+ */
+async function collectBatch(batchId) {
+  const forget = () => query(`DELETE FROM category_batches WHERE batch_id = $1`, [batchId]);
+
+  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches/${batchId}`, { headers: headers() });
+  if (!res.ok) {
+    // Batch unknown/deleted → drop the row so the next tick can submit again.
+    if (res.status === 404) await forget();
+    else console.error(`Batch ${batchId}: Status-Abfrage fehlgeschlagen (${res.status})`);
+    return { ended: false, linked: 0 };
+  }
+  const batch = await res.json();
+  if (batch.processing_status !== 'ended') return { ended: false, linked: 0 };
+  if (!batch.results_url) {
+    // Ended but nothing to fetch: keeping the row would block every later tick, because the
+    // tick always handles the oldest open batch first. The words stay uncategorized and are
+    // picked up by a later submit.
+    await forget();
+    return { ended: true, linked: 0 };
+  }
+
+  const { byName } = await loadCategories();
+  const fallback = byName.get('sonstiges') || null;
+
+  const r = await fetch(batch.results_url, { headers: headers() });
+  if (!r.ok) {
+    // NOTE(review): 404/410 is assumed to mean the results are gone for good (e.g. expired) —
+    // confirm against the API docs. Other statuses are treated as transient and retried.
+    if (r.status === 404 || r.status === 410) {
+      await forget();
+      return { ended: true, linked: 0 };
+    }
+    console.error(`Batch ${batchId}: Ergebnis-Abruf fehlgeschlagen (${r.status})`);
+    return { ended: false, linked: 0 };
+  }
+  const jsonl = await r.text();
+
+  const linkedWordIds = [];
+  for (const line of jsonl.split('\n')) {
+    const trimmed = line.trim();
+    if (!trimmed) continue;
+    let entry;
+    try { entry = JSON.parse(trimmed); } catch { continue; }
+    // Requests that did not succeed are skipped; those words remain uncategorized.
+    if (entry.result?.type !== 'succeeded') continue;
+    const wordId = entry.custom_id;
+    const text = entry.result.message?.content?.[0]?.text;
+    const name = parseCategory(text);
+    const cat = (name && byName.get(name.toLowerCase())) || fallback;
+    if (!cat) continue;
+    try {
+      await query(
+        `INSERT INTO word_categories (word_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
+        [wordId, cat.id]
+      );
+      // Count the word only when the insert did not fail.
+      linkedWordIds.push(wordId);
+    } catch (err) {
+      console.error(`Batch ${batchId}: word_categories für ${wordId} fehlgeschlagen:`, err.message);
+    }
+  }
+
+  // Re-derive pair_categories for the affected pairs.
+  await rederivePairsForWords(linkedWordIds);
+
+  await forget();
+  return { ended: true, linked: linkedWordIds.length };
+}
+
+// One tick: collect the open batch; otherwise submit a new batch for uncategorized words.
+/**
+ * Runs one scheduler tick. If category_batches holds an open batch, the oldest one is collected;
+ * otherwise a new batch is submitted for the currently uncategorized words.
+ * @returns {Promise<object>} One of `{ skipped: true }` (another run is active),
+ *   `{ collected, linked, batchId }`, `{ remaining: 0 }` or `{ submitted, batchId }`.
+ */
+async function runCategorizationTick() {
+  if (running) return { skipped: true };
+  running = true;
+  try {
+    const open = await query(`SELECT batch_id FROM category_batches ORDER BY created_at ASC LIMIT 1`);
+    if (open.rows.length) {
+      const { ended, linked } = await collectBatch(open.rows[0].batch_id);
+      return { collected: ended, linked, batchId: open.rows[0].batch_id };
+    }
+    const words = await findUncategorizedUsedWords();
+    if (!words.length) return { remaining: 0 };
+    const { rows } = await loadCategories();
+    const names = rows.map(c => c.titel_de).filter(Boolean);
+    const batchId = await submitBatch(words, names);
+    return { submitted: words.length, batchId };
+  } finally {
+    running = false;
+  }
+}
+
+/**
+ * Immediate one-shot backfill (synchronous, no 24h batch delay): classifies existing words that
+ * are used in pairs but have no category, in chunks of up to 40 via /v1/messages, and materializes
+ * pair_categories right away. Meant for live testing; the hourly job covers ongoing supply.
+ *
+ * A failed Claude call (missing API key, HTTP error, network) stops the run and is reported in
+ * `error`; the fallback category is NOT written in that case, so an outage cannot label words
+ * as "Sonstiges". Only an unparseable model reply falls back for the whole chunk, which keeps the
+ * run progressing.
+ * @param {{max?: number}} [options] `max` caps the number of words handled in this run.
+ * @returns {Promise<object>} `{ skipped: true }` when another run is active, otherwise
+ *   `{ processed, linked }`, plus `error` (message) when the run was aborted.
+ */
+async function classifyWordsSync({ max = 2000 } = {}) {
+  if (running) return { skipped: true };
+  running = true;
+  try {
+    const { rows: catRows, byName } = await loadCategories();
+    const names = catRows.map(c => c.titel_de).filter(Boolean);
+    const fallback = byName.get('sonstiges') || null;
+    const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
+    let processed = 0;
+    let linked = 0;
+
+    while (processed < max) {
+      const words = await findUncategorizedUsedWords(Math.min(40, max - processed));
+      if (!words.length) break;
+
+      const list = words.map(w => {
+        const t = w.titel_en || w.titel_de || w.titel_sv || '';
+        const de = w.titel_de && w.titel_de !== t ? ` (de: ${w.titel_de})` : '';
+        return `${w.id}\t${t}${de}`;
+      }).join('\n');
+      const user =
+        `Categories (choose exactly one German name per word):\n${names.join(', ')}\n\n` +
+        `Classify each vocabulary word into the best-fitting category. If none fits, use "Sonstiges".\n` +
+        `Words (one per line, "id<TAB>title"):\n${list}\n\n` +
+        `Reply with JSON only: {"assignments":[{"id":"<id>","category":"<German category name>"}]}`;
+
+      let assignments = [];
+      try {
+        const data = await messagesCall(system, user, 2000);
+        assignments = Array.isArray(data?.assignments) ? data.assignments : [];
+      } catch (err) {
+        if (!(err instanceof SyntaxError)) {
+          // Infrastructure/config failure: abort instead of writing fallback categories.
+          console.error('Auto-Kategorisierung (sync):', err.message);
+          return { processed, linked, error: err.message };
+        }
+        // Unparseable reply → the whole chunk gets the fallback so the run keeps progressing.
+      }
+
+      const byId = new Map(
+        assignments
+          .filter(a => a && a.id != null)
+          .map(a => [String(a.id), a.category])
+      );
+      const linkedIds = [];
+      for (const w of words) {
+        const name = byId.get(String(w.id));
+        const cat = (name && byName.get(String(name).toLowerCase())) || fallback;
+        if (!cat) continue;
+        try {
+          await query(
+            `INSERT INTO word_categories (word_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
+            [w.id, cat.id]
+          );
+          // Count the word only when the insert did not fail.
+          linkedIds.push(w.id);
+        } catch (err) {
+          console.error(`word_categories für ${w.id} fehlgeschlagen:`, err.message);
+        }
+      }
+      await rederivePairsForWords(linkedIds);
+
+      processed += words.length;
+      linked += linkedIds.length;
+      if (!linkedIds.length) break; // guard against an endless loop (e.g. missing fallback category)
+    }
+    return { processed, linked };
+  } finally {
+    running = false;
+  }
+}
+
+module.exports = { runCategorizationTick, classifyWordsSync, findUncategorizedUsedWords, collectBatch, submitBatch };
diff --git a/src/routes/categories.js b/src/routes/categories.js
index e93a409..3d3058b 100644
--- a/src/routes/categories.js
+++ b/src/routes/categories.js
@@ -1,8 +1,20 @@
 const router = require('express').Router();
 const { query } = require('../db');
+const { runCategorizationTick, classifyWordsSync } = require('../lib/classifyWords');
 
 const STATUSES = ['requested', 'blocked', 'published'];
 
+// POST /api/categories/auto-assign — trigger categorization.
+//   ?sync=true → immediate one-shot backfill of existing words (synchronous, no 24h delay)
+//   otherwise  → one asynchronous batch tick (submit/collect via the Message Batches API)
+router.post('/auto-assign', async (req, res, next) => {
+  try {
+    const wantsSync = req.body?.sync === true ? true : req.query.sync === 'true';
+    const payload = await (wantsSync ? classifyWordsSync({}) : runCategorizationTick());
+    res.json(payload);
+  } catch (err) { next(err); }
+});
+
 const STATUS_TIMESTAMP = {
   requested: 'requested_at',
   published: 'published_at',