// Automatic word categorization via the Anthropic Message Batches API (asynchronous, ~50% cheaper).
// Decoupled from the generate-words prompt and from the publish flow: an hourly job (src/index.js)
// finds words that are used in pairs but do not have a category yet, lets Haiku classify them
// against the fixed taxonomy (src/db-migrate.js) and then materializes pair_categories.
const { query } = require('../db');
const { resolvePlaceholdersToLabels } = require('./placeholders');
const { derivePairCategories } = require('./pairCategories');

const ANTHROPIC_BASE = 'https://api.anthropic.com';
const MODEL = 'claude-haiku-4-5-20251001';
const BATCH_LIMIT = 500; // max. words per submit (the Batches API allows up to 100k)
const MAX_EXAMPLES = 3;

let running = false; // overlap guard between ticks

/**
 * Builds the HTTP headers for Anthropic API requests.
 *
 * @returns {Object} Headers carrying the content type, the API key and the API version.
 * @throws {Error} If the ANTHROPIC_API_KEY environment variable is not set.
 */
function headers() {
  const { ANTHROPIC_API_KEY: apiKey } = process.env;
  if (!apiKey) {
    throw new Error('ANTHROPIC_API_KEY nicht konfiguriert');
  }
  return {
    'Content-Type': 'application/json',
    'x-api-key': apiKey,
    'anthropic-version': '2023-06-01',
  };
}

/**
 * Loads all published categories.
 *
 * @returns {Promise<{rows: Object[], byName: Map<string, Object>}>} The raw category rows plus a
 *   lookup map from the lower-cased German or English title to the category row.
 */
async function loadCategories() {
  const result = await query(`SELECT id, titel_de, titel_en FROM categories WHERE status = 'published'`);
  const byName = new Map();
  result.rows.forEach((category) => {
    // German title first, then English, so the registration order per row stays the same.
    [category.titel_de, category.titel_en]
      .filter(Boolean)
      .forEach((title) => byName.set(title.toLowerCase(), category));
  });
  return { rows: result.rows, byName };
}

// Words without a category that are used in pairs (statements or objects).
/**
 * Fetches words that have no word_categories row yet but are referenced by a statement
 * (positive or negative) or by an object, newest first.
 *
 * @param {number} [limit=BATCH_LIMIT] Maximum number of rows to return.
 * @returns {Promise<Object[]>} Rows with id, titel_de, titel_en and titel_sv.
 */
async function findUncategorizedUsedWords(limit = BATCH_LIMIT) {
  const { rows } = await query(
    `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv
       FROM words w
      WHERE NOT EXISTS (SELECT 1 FROM word_categories wc WHERE wc.word_id = w.id)
        AND (
          EXISTS (SELECT 1 FROM statement_positive_words spw WHERE spw.word_id = w.id)
          OR EXISTS (SELECT 1 FROM statement_negative_words snw WHERE snw.word_id = w.id)
          OR EXISTS (SELECT 1 FROM object_words ow WHERE ow.word_id = w.id)
        )
        AND COALESCE(w.titel_de, w.titel_en, w.titel_sv) IS NOT NULL
      ORDER BY w.created_at DESC
      LIMIT $1`,
    [limit]
  );
  return rows;
}

/**
 * Collects up to `max` English example sentences that use the word. Placeholder tokens are
 * resolved to labels, and sentences are de-duplicated case-insensitively. Statement sentences
 * are consulted first; question sentences are only queried when more examples are needed.
 *
 * @param {*} wordId Id of the word to look up.
 * @param {number} [max=MAX_EXAMPLES] Number of examples at which collection stops.
 * @returns {Promise<string[]>} The collected example sentences.
 */
async function examplesForWord(wordId, max = MAX_EXAMPLES) {
  const examples = [];
  const seenKeys = new Set();

  // Adds the sentences of `rows` one by one; reports true as soon as `max` is reached.
  const collect = (rows) => {
    for (const { s } of rows) {
      const sentence = resolvePlaceholdersToLabels(s || '').trim();
      const key = sentence.toLowerCase();
      if (sentence && !seenKeys.has(key)) {
        seenKeys.add(key);
        examples.push(sentence);
      }
      if (examples.length >= max) {
        return true;
      }
    }
    return false;
  };

  const statementResult = await query(
    `SELECT s.positive_sentence_en AS s
       FROM statement_positive_words spw
       JOIN statements s ON s.id = spw.statement_id
      WHERE spw.word_id = $1 AND s.positive_sentence_en IS NOT NULL
      UNION
     SELECT s.negative_sentence_en
       FROM statement_negative_words snw
       JOIN statements s ON s.id = snw.statement_id
      WHERE snw.word_id = $1 AND s.negative_sentence_en IS NOT NULL
      LIMIT 10`,
    [wordId]
  );
  if (collect(statementResult.rows)) {
    return examples;
  }

  const questionResult = await query(
    `SELECT DISTINCT q.sentence_en AS s
       FROM object_words ow
       JOIN object_pairs op ON op.object_id = ow.object_id
       JOIN pairs p ON p.id = op.pair_id
       JOIN questions q ON q.id = p.question_id
      WHERE ow.word_id = $1 AND q.sentence_en IS NOT NULL
      LIMIT 10`,
    [wordId]
  );
  collect(questionResult.rows);
  return examples;
}

// Shared classification rules. Strongly discourages "Sonstiges" and gives part-of-speech hints.
// Sent verbatim to the model as part of every classification prompt.
const CLASSIFY_RULES =
  `Rules:\n` +
  `- Pick the SINGLE best-fitting category by its exact German name.\n` +
  `- Most concrete nouns DO fit a topic: animals→Tiere, food/fruit/vegetables→Lebensmittel, ` +
  `sky/star/fire/water/mountain/plants→Natur & Pflanzen, furniture/window/carpet/cushion→Wohnen & Möbel, ` +
  `street/building/lamp post→Stadt & Gebäude, books/pages→Schule & Bildung.\n` +
  `- Adjectives / properties (warm, fast, sweet, old, fragile, transparent…) → "Eigenschaften".\n` +
  `- Verbs / actions → "Verben & Handlungen".\n` +
  `- Use "Sonstiges" ONLY as a true last resort when nothing else fits at all.`;

/**
 * Builds the user prompt for classifying a single word.
 *
 * @param {Object} word Word row; the title is taken from titel_en, then titel_de, then titel_sv.
 * @param {string[]} examples Example sentences using the word (may be empty).
 * @param {string[]} categoryNamesDe German category names offered to the model.
 * @returns {string} The prompt text.
 */
function buildPrompt(word, examples, categoryNamesDe) {
  const title = word.titel_en || word.titel_de || word.titel_sv || '';
  const titleDe = word.titel_de ? ` (de: "${word.titel_de}")` : '';
  const ex = examples.length
    ? `\n\nExample sentences using the word:\n${examples.map((e) => `- ${e}`).join('\n')}`
    : '';
  return (
    `Categories (German names):\n${categoryNamesDe.join(', ')}\n\n${CLASSIFY_RULES}\n\n` +
    `Classify this single vocabulary word.\n\nWord: "${title}"${titleDe}${ex}\n\n` +
    `Reply with JSON only: {"category":""}`
  );
}

/**
 * Submits the words as one message batch (one request per word, custom_id = word id) and
 * records the batch in category_batches.
 *
 * @param {Object[]} words Word rows to classify.
 * @param {string[]} categoryNamesDe German category names offered to the model.
 * @returns {Promise<string>} The id of the created batch.
 * @throws {Error} If the API responds with a non-OK status.
 */
async function submitBatch(words, categoryNamesDe) {
  const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
  const requests = [];
  for (const w of words) {
    const examples = await examplesForWord(w.id);
    requests.push({
      // The Batches API expects custom_id to be a string; coerce so numeric ids are accepted too.
      custom_id: String(w.id),
      params: {
        model: MODEL,
        max_tokens: 64,
        system,
        messages: [{ role: 'user', content: buildPrompt(w, examples, categoryNamesDe) }],
      },
    });
  }

  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches`, {
    method: 'POST',
    headers: headers(),
    body: JSON.stringify({ requests }),
  });
  if (!res.ok) {
    const err = await res.text().catch(() => '');
    throw new Error(`Batch-Submit fehlgeschlagen (${res.status}): ${err.slice(0, 300)}`);
  }

  const data = await res.json();
  await query(
    `INSERT INTO category_batches (batch_id, status) VALUES ($1, 'submitted') ON CONFLICT DO NOTHING`,
    [data.id]
  );
  return data.id;
}

/**
 * Re-derives pair_categories for every pair that references one of the given words, either
 * through its positive/negative statement or through an attached object.
 * Derivation is best-effort: a failure is logged and does not propagate to the caller.
 *
 * @param {Array} wordIds Ids of the words whose pairs should be refreshed.
 * @returns {Promise<void>}
 */
async function rederivePairsForWords(wordIds) {
  if (!wordIds.length) return;
  const pairs = await query(
    `SELECT DISTINCT p.id
       FROM pairs p
      WHERE p.positive_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
         OR p.positive_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
         OR p.negative_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
         OR p.negative_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
         OR p.id IN (SELECT op.pair_id FROM object_pairs op JOIN object_words ow ON ow.object_id = op.object_id WHERE ow.word_id = ANY($1))`,
    [wordIds]
  );
  if (!pairs.rows.length) return;

  try {
    await derivePairCategories(pairs.rows.map((p) => p.id));
  } catch (err) {
    // Still best-effort, but no longer invisible: the word links are already stored.
    console.warn(`derivePairCategories failed for ${pairs.rows.length} pair(s): ${err?.message ?? err}`);
  }
}

// Synchronous Claude call (/v1/messages) — for the immediate one-shot backfill (no 24h batch delay).
/**
 * Sends one synchronous request to /v1/messages and returns the reply parsed as JSON.
 *
 * @param {string} system System prompt.
 * @param {string} user User prompt.
 * @param {number} [maxTokens=2000] Value sent as max_tokens.
 * @returns {Promise<*>} The parsed JSON value of the first content block's text.
 * @throws {Error} If the response status is not OK or the reply contains no parseable JSON.
 */
async function messagesCall(system, user, maxTokens = 2000) {
  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages`, {
    method: 'POST',
    headers: headers(),
    body: JSON.stringify({
      model: MODEL,
      max_tokens: maxTokens,
      system,
      messages: [{ role: 'user', content: user }],
    }),
  });
  if (!res.ok) {
    const t = await res.text().catch(() => '');
    throw new Error(`Claude ${res.status}: ${t.slice(0, 200)}`);
  }
  const data = await res.json();
  return parseModelJson(data.content?.[0]?.text || '');
}

/**
 * Parses JSON out of a model reply. A Markdown code fence is stripped first; if the remaining
 * text is not valid JSON on its own, the outermost `{ ... }` span is tried so that a reply with
 * prose around the object still parses.
 *
 * @param {*} text Raw reply text.
 * @returns {*} The parsed JSON value.
 * @throws {SyntaxError} If no parseable JSON is found.
 */
function parseModelJson(text) {
  let raw = String(text ?? '').trim();
  const fenced = raw.match(/```(?:json)?\s*([\s\S]+?)\s*```/);
  if (fenced) raw = fenced[1];
  try {
    return JSON.parse(raw);
  } catch (err) {
    const start = raw.indexOf('{');
    const end = raw.lastIndexOf('}');
    if (start === -1 || end <= start) throw err;
    return JSON.parse(raw.slice(start, end + 1));
  }
}

/**
 * Extracts the `category` value from a model reply.
 *
 * @param {*} text Raw reply text.
 * @returns {string|null} The trimmed category name, or null if the reply is empty, is not
 *   parseable or carries no category.
 */
function parseCategory(text) {
  if (!text) return null;
  try {
    const parsed = parseModelJson(text);
    return (parsed.category || '').toString().trim() || null;
  } catch {
    return null;
  }
}

/**
 * Stores a word → category link.
 *
 * @param {*} wordId Id of the word.
 * @param {*} categoryId Id of the category.
 * @returns {Promise<boolean>} true if the INSERT statement ran without error (an already existing
 *   link counts as success because of ON CONFLICT DO NOTHING), false if it failed.
 */
async function linkWordToCategory(wordId, categoryId) {
  try {
    await query(
      `INSERT INTO word_categories (word_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
      [wordId, categoryId]
    );
    return true;
  } catch (err) {
    console.warn(`word_categories insert failed for word ${wordId}: ${err?.message ?? err}`);
    return false;
  }
}

/**
 * Collects a batch if it has finished: applies the results to word_categories, re-derives
 * pair_categories for the affected words and removes the category_batches entry.
 *
 * @param {string} batchId Id of the batch to collect.
 * @returns {Promise<{ended: boolean, linked: number}>} `ended` is true once the results were
 *   applied; `linked` counts the words whose category link was stored successfully.
 */
async function collectBatch(batchId) {
  const notReady = { ended: false, linked: 0 };

  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches/${batchId}`, { headers: headers() });
  if (!res.ok) {
    // Batch unknown/deleted → clean up the entry so the next tick can submit a new one.
    if (res.status === 404) await query(`DELETE FROM category_batches WHERE batch_id = $1`, [batchId]);
    return notReady;
  }
  const batch = await res.json();
  if (batch.processing_status !== 'ended' || !batch.results_url) return notReady;

  const { byName } = await loadCategories();
  const fallback = byName.get('sonstiges') || null;

  const r = await fetch(batch.results_url, { headers: headers() });
  if (!r.ok) return notReady;
  const jsonl = await r.text();

  const linkedWordIds = [];
  for (const line of jsonl.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed) continue;

    let entry;
    try {
      entry = JSON.parse(trimmed);
    } catch {
      continue; // skip lines that are not valid JSON
    }
    if (entry?.result?.type !== 'succeeded') continue;

    const wordId = entry.custom_id;
    const name = parseCategory(entry.result.message?.content?.[0]?.text);
    const cat = (name && byName.get(name.toLowerCase())) || fallback;
    if (!cat) continue;

    // Only words whose link was actually stored are reported and re-derived.
    if (await linkWordToCategory(wordId, cat.id)) linkedWordIds.push(wordId);
  }

  // Re-derive pair_categories for the affected pairs.
  await rederivePairsForWords(linkedWordIds);
  await query(`DELETE FROM category_batches WHERE batch_id = $1`, [batchId]);
  return { ended: true, linked: linkedWordIds.length };
}

/**
 * One tick: collect the oldest open batch; if there is none, submit a new batch for the
 * uncategorized words.
 *
 * @returns {Promise<Object>} One of `{ skipped: true }` (another run is active),
 *   `{ collected, linked, batchId }`, `{ remaining: 0 }` or `{ submitted, batchId }`.
 */
async function runCategorizationTick() {
  if (running) return { skipped: true };
  running = true;
  try {
    const open = await query(`SELECT batch_id FROM category_batches ORDER BY created_at ASC LIMIT 1`);
    if (open.rows.length) {
      const openBatchId = open.rows[0].batch_id;
      const { ended, linked } = await collectBatch(openBatchId);
      return { collected: ended, linked, batchId: openBatchId };
    }

    const words = await findUncategorizedUsedWords();
    if (!words.length) return { remaining: 0 };

    const { rows } = await loadCategories();
    const names = rows.map((c) => c.titel_de).filter(Boolean);
    const batchId = await submitBatch(words, names);
    return { submitted: words.length, batchId };
  } finally {
    running = false;
  }
}

/**
 * Immediate one-shot backfill (synchronous, without the 24h batch delay): classifies existing
 * uncategorized words that are used in pairs in chunks via /v1/messages and materializes
 * pair_categories directly. Intended for the live test; the hourly job remains responsible
 * for the ongoing supply.
 *
 * @param {Object} [options]
 * @param {number} [options.max=2000] Upper bound on the number of words processed.
 * @param {boolean} [options.reset=false] Delete all existing word_categories rows first.
 * @returns {Promise<Object>} `{ skipped: true }` if another run is active, otherwise
 *   `{ processed, linked }` where `linked` counts successfully stored links.
 */
async function classifyWordsSync({ max = 2000, reset = false } = {}) {
  if (running) return { skipped: true };
  running = true;
  try {
    const CHUNK_SIZE = 15;
    const { rows: catRows, byName } = await loadCategories();
    const names = catRows.map((c) => c.titel_de).filter(Boolean);
    const fallback = byName.get('sonstiges') || null;
    const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
    let processed = 0;
    let linked = 0;

    // reset → discard existing assignments and re-classify with the improved logic/taxonomy
    if (reset) {
      try {
        await query(`DELETE FROM word_categories`);
      } catch (err) {
        console.warn(`word_categories reset failed: ${err?.message ?? err}`);
      }
    }

    while (processed < max) {
      const words = await findUncategorizedUsedWords(Math.min(CHUNK_SIZE, max - processed));
      if (!words.length) break;

      const lines = [];
      for (const w of words) {
        const t = w.titel_en || w.titel_de || w.titel_sv || '';
        const de = w.titel_de && w.titel_de !== t ? ` (de: ${w.titel_de})` : '';
        const ex = await examplesForWord(w.id, 2);
        const exStr = ex.length ? ` | e.g.: ${ex.map((e) => `"${e}"`).join('; ')}` : '';
        lines.push(`${w.id}\t${t}${de}${exStr}`);
      }

      // The format hint names the tab separator that the lines above actually use.
      const user =
        `Categories (German names):\n${names.join(', ')}\n\n${CLASSIFY_RULES}\n\n` +
        `Classify each vocabulary word below.\nWords (id<TAB>title | examples):\n${lines.join('\n')}\n\n` +
        `Reply with JSON only: {"assignments":[{"id":"","category":""}]}`;

      let assignments = [];
      try {
        const data = await messagesCall(system, user, 1500);
        assignments = Array.isArray(data?.assignments) ? data.assignments : [];
      } catch (err) {
        // Error → the whole chunk gets the fallback so that the run keeps progressing.
        // NOTE(review): this also covers a missing API key, which would label every word with
        // the fallback category — confirm that this is intended.
        console.warn(`Classification call failed, using fallback for ${words.length} word(s): ${err?.message ?? err}`);
      }

      // Ignore malformed entries (e.g. null) instead of letting them abort the run.
      const byId = new Map();
      for (const a of assignments) {
        if (a && typeof a === 'object') byId.set(String(a.id), a.category);
      }

      const linkedIds = [];
      for (const w of words) {
        const name = byId.get(String(w.id));
        const cat = (name && byName.get(String(name).trim().toLowerCase())) || fallback;
        if (!cat) continue;
        // A failed insert leaves the word uncategorized, so it must not count as progress.
        if (await linkWordToCategory(w.id, cat.id)) linkedIds.push(w.id);
      }

      await rederivePairsForWords(linkedIds);
      processed += words.length;
      linked += linkedIds.length;
      if (!linkedIds.length) break; // guard against an endless loop (e.g. missing fallback category)
    }
    return { processed, linked };
  } finally {
    running = false;
  }
}

module.exports = { runCategorizationTick, classifyWordsSync, findUncategorizedUsedWords, collectBatch, submitBatch };