feat: automatische Wort-Kategorisierung (Batches API + Sofort-Backfill)

Feste ~20er-Taxonomie geseedet (de/en/sv, published; bestehende
Kategorien werden wiederverwendet) + Tabelle category_batches.

src/lib/classifyWords.js: findet in Pairs verwendete Wörter ohne
Kategorie und klassifiziert sie per Haiku gegen die feste Liste.
- Stundenjob über die Message Batches API (asynchron, ~50% günstiger):
  submit/collect-Ticks, in index.js nach Boot + stündlich.
- Sofortiger synchroner One-Shot-Backfill (classifyWordsSync) für
  Live-Test ohne 24h-Verzug.
Beides materialisiert pair_categories via derivePairCategories.

POST /api/categories/auto-assign (admin): ?sync=true = Sofort-Backfill,
sonst ein Batch-Tick. Entkoppelt von generate-words und Publish.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-15 14:27:09 +02:00
parent 9738d3e35a
commit d66cff3f61
4 changed files with 365 additions and 0 deletions

295
src/lib/classifyWords.js Normal file
View File

@@ -0,0 +1,295 @@
// Automatische Wort-Kategorisierung über die Anthropic Message Batches API (asynchron, ~50% günstiger).
// Entkoppelt vom generate-words-Prompt und vom Publish-Flow: ein stündlicher Job (src/index.js)
// findet Wörter, die in Pairs verwendet werden aber noch keine Kategorie haben, lässt sie von Haiku
// gegen die feste Taxonomie (src/db-migrate.js) klassifizieren und materialisiert danach pair_categories.
const { query } = require('../db');
const { resolvePlaceholdersToLabels } = require('./placeholders');
const { derivePairCategories } = require('./pairCategories');
// Base URL that every Anthropic API request in this module is built on.
const ANTHROPIC_BASE = 'https://api.anthropic.com';
// Model id sent with every classification request (batch and synchronous).
const MODEL = 'claude-haiku-4-5-20251001';
const BATCH_LIMIT = 500; // max. words per submit (the Batches API allows up to 100k)
// Default cap on the number of example sentences collected per word.
const MAX_EXAMPLES = 3;
let running = false; // overlap guard: true while a tick or a sync backfill is in progress
/**
 * Builds the HTTP headers used for Anthropic API requests.
 *
 * @returns {{'Content-Type': string, 'x-api-key': string, 'anthropic-version': string}}
 * @throws {Error} when the ANTHROPIC_API_KEY environment variable is unset or empty.
 */
function headers() {
  const { ANTHROPIC_API_KEY: key } = process.env;
  if (key) {
    return {
      'Content-Type': 'application/json',
      'x-api-key': key,
      'anthropic-version': '2023-06-01',
    };
  }
  throw new Error('ANTHROPIC_API_KEY nicht konfiguriert');
}
/**
 * Loads every category whose status is 'published'.
 *
 * @returns {Promise<{rows: object[], byName: Map<string, object>}>} `rows` are the raw result
 *   rows (id, titel_de, titel_en); `byName` maps each lower-cased German and English title to
 *   its row. When two rows share a lower-cased title, the later row wins.
 */
async function loadCategories() {
  const result = await query(`SELECT id, titel_de, titel_en FROM categories WHERE status = 'published'`);
  const byName = new Map();
  result.rows.forEach((category) => {
    // German title first, then English — same registration order as the lookup expects.
    [category.titel_de, category.titel_en]
      .filter(Boolean)
      .forEach((title) => byName.set(title.toLowerCase(), category));
  });
  return { rows: result.rows, byName };
}
/**
 * Finds words that have no row in word_categories but are referenced by
 * statement_positive_words, statement_negative_words or object_words, and that have at least
 * one non-null title. Results are ordered by created_at, newest first.
 *
 * @param {number} [limit=BATCH_LIMIT] - maximum number of rows to return.
 * @returns {Promise<object[]>} rows with id, titel_de, titel_en, titel_sv.
 */
async function findUncategorizedUsedWords(limit = BATCH_LIMIT) {
  const sql =
    `SELECT w.id, w.titel_de, w.titel_en, w.titel_sv
FROM words w
WHERE NOT EXISTS (SELECT 1 FROM word_categories wc WHERE wc.word_id = w.id)
AND (
EXISTS (SELECT 1 FROM statement_positive_words spw WHERE spw.word_id = w.id)
OR EXISTS (SELECT 1 FROM statement_negative_words snw WHERE snw.word_id = w.id)
OR EXISTS (SELECT 1 FROM object_words ow WHERE ow.word_id = w.id)
)
AND COALESCE(w.titel_de, w.titel_en, w.titel_sv) IS NOT NULL
ORDER BY w.created_at DESC
LIMIT $1`;
  const { rows } = await query(sql, [limit]);
  return rows;
}
/**
 * Collects up to `max` English example sentences for a word. Statement sentences (positive and
 * negative) are considered first; question sentences reached through object_words are only
 * queried when the statements did not already yield `max` examples.
 *
 * Each sentence is passed through resolvePlaceholdersToLabels and trimmed; empty results and
 * case-insensitive duplicates are dropped.
 *
 * @param {*} wordId - id of the word, bound as $1 in both queries.
 * @param {number} [max=MAX_EXAMPLES] - stop once this many examples were collected.
 * @returns {Promise<string[]>} the collected sentences, in discovery order.
 */
async function examplesForWord(wordId, max = MAX_EXAMPLES) {
  const collected = [];
  const knownKeys = new Set();

  // Appends the resolved sentence unless it is empty or already present (ignoring case).
  const addSentence = (sentence) => {
    const label = resolvePlaceholdersToLabels(sentence || '').trim();
    if (!label) return;
    const key = label.toLowerCase();
    if (knownKeys.has(key)) return;
    knownKeys.add(key);
    collected.push(label);
  };

  const statementSql =
    `SELECT s.positive_sentence_en AS s
FROM statement_positive_words spw JOIN statements s ON s.id = spw.statement_id
WHERE spw.word_id = $1 AND s.positive_sentence_en IS NOT NULL
UNION
SELECT s.negative_sentence_en
FROM statement_negative_words snw JOIN statements s ON s.id = snw.statement_id
WHERE snw.word_id = $1 AND s.negative_sentence_en IS NOT NULL
LIMIT 10`;
  const statementResult = await query(statementSql, [wordId]);
  for (const row of statementResult.rows) {
    addSentence(row.s);
    // Enough examples from statements alone: skip the question query entirely.
    if (collected.length >= max) return collected;
  }

  const questionSql =
    `SELECT DISTINCT q.sentence_en AS s
FROM object_words ow
JOIN object_pairs op ON op.object_id = ow.object_id
JOIN pairs p ON p.id = op.pair_id
JOIN questions q ON q.id = p.question_id
WHERE ow.word_id = $1 AND q.sentence_en IS NOT NULL
LIMIT 10`;
  const questionResult = await query(questionSql, [wordId]);
  for (const row of questionResult.rows) {
    addSentence(row.s);
    if (collected.length >= max) break;
  }
  return collected;
}
/**
 * Builds the user prompt that asks the model to classify one word into one category.
 *
 * The displayed title prefers the English title, then German, then Swedish. The German title is
 * added as a `(de: "...")` hint only when it differs from the displayed title, so a word that
 * has only a German title is not printed twice.
 *
 * @param {{titel_en?: string, titel_de?: string, titel_sv?: string}} word - the word to classify.
 * @param {string[]} examples - example sentences using the word; may be empty or nullish.
 * @param {string[]} categoryNamesDe - German category names offered to the model.
 * @returns {string} the prompt text.
 */
function buildPrompt(word, examples, categoryNamesDe) {
  const title = word.titel_en || word.titel_de || word.titel_sv || '';
  // Only hint the German title when it adds information beyond the displayed title.
  const titleDe = word.titel_de && word.titel_de !== title ? ` (de: "${word.titel_de}")` : '';
  const sentences = examples ?? [];
  const ex = sentences.length
    ? `\n\nExample sentences using the word:\n${sentences.map((e) => `- ${e}`).join('\n')}`
    : '';
  return (
    `Categories (choose exactly one, by its German name):\n${categoryNamesDe.join(', ')}\n\n` +
    `Classify this single vocabulary word into the best-fitting category. ` +
    `If none fits, use "Sonstiges".\n\n` +
    `Word: "${title}"${titleDe}${ex}\n\n` +
    `Reply with JSON only: {"category":"<exact German category name>"}`
  );
}
/**
 * Submits words to the Message Batches endpoint, one request per word, and records the
 * returned batch id in category_batches with status 'submitted'.
 *
 * @param {object[]} words - rows with at least `id` and the title fields buildPrompt reads.
 * @param {string[]} categoryNamesDe - German category names offered to the model.
 * @returns {Promise<string|null>} the batch id from the API response, or null when `words`
 *   is empty (nothing is sent in that case).
 * @throws {Error} when the API key is missing or the API answers with a non-2xx status.
 */
async function submitBatch(words, categoryNamesDe) {
  // Nothing to classify: avoid a pointless API call with an empty request list.
  if (!Array.isArray(words) || words.length === 0) return null;
  // Resolve the headers first so a missing API key fails before the per-word DB queries run.
  const requestHeaders = headers();
  const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
  const requests = [];
  for (const w of words) {
    const examples = await examplesForWord(w.id);
    requests.push({
      // custom_id is echoed back in the results and must be a string, also for numeric ids.
      custom_id: String(w.id),
      params: {
        model: MODEL,
        max_tokens: 64,
        system,
        messages: [{ role: 'user', content: buildPrompt(w, examples, categoryNamesDe) }],
      },
    });
  }
  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches`, {
    method: 'POST', headers: requestHeaders, body: JSON.stringify({ requests }),
  });
  if (!res.ok) {
    const err = await res.text().catch(() => '');
    throw new Error(`Batch-Submit fehlgeschlagen (${res.status}): ${err.slice(0, 300)}`);
  }
  const data = await res.json();
  await query(`INSERT INTO category_batches (batch_id, status) VALUES ($1, 'submitted') ON CONFLICT DO NOTHING`, [data.id]);
  return data.id;
}
/**
 * Re-derives pair_categories for every pair that references one of the given words, either
 * through the word links of its positive/negative statement or through object_pairs/object_words.
 *
 * Best effort: a rejection from derivePairCategories is swallowed.
 *
 * @param {Array} wordIds - ids of the words whose pairs should be re-derived; no-op when empty.
 * @returns {Promise<void>}
 */
async function rederivePairsForWords(wordIds) {
  const hasWords = Boolean(wordIds.length);
  if (!hasWords) return;

  const affectedPairsSql =
    `SELECT DISTINCT p.id FROM pairs p
WHERE p.positive_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
OR p.positive_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
OR p.negative_statement_id IN (SELECT statement_id FROM statement_positive_words WHERE word_id = ANY($1))
OR p.negative_statement_id IN (SELECT statement_id FROM statement_negative_words WHERE word_id = ANY($1))
OR p.id IN (SELECT op.pair_id FROM object_pairs op
JOIN object_words ow ON ow.object_id = op.object_id
WHERE ow.word_id = ANY($1))`;
  const { rows } = await query(affectedPairsSql, [wordIds]);
  if (!rows.length) return;

  const pairIds = rows.map((row) => row.id);
  // Deliberately ignore derivation failures; the links themselves are already stored.
  await derivePairCategories(pairIds).catch(() => {});
}
/**
 * Sends one synchronous request to /v1/messages and returns the parsed JSON found in the text
 * of the first content item. A surrounding Markdown code fence is stripped before parsing.
 * Used by the immediate one-shot backfill, which does not wait for a batch.
 *
 * @param {string} system - system prompt.
 * @param {string} user - user message content.
 * @param {number} [maxTokens=2000] - value sent as max_tokens.
 * @returns {Promise<*>} the value parsed from the model's reply.
 * @throws {Error} `Claude <status>: <body>` on a non-2xx response; JSON.parse errors propagate
 *   when the reply is not valid JSON.
 */
async function messagesCall(system, user, maxTokens = 2000) {
  const payload = {
    model: MODEL,
    max_tokens: maxTokens,
    system,
    messages: [{ role: 'user', content: user }],
  };
  const response = await fetch(`${ANTHROPIC_BASE}/v1/messages`, {
    method: 'POST',
    headers: headers(),
    body: JSON.stringify(payload),
  });
  if (!response.ok) {
    const detail = await response.text().catch(() => '');
    throw new Error(`Claude ${response.status}: ${detail.slice(0, 200)}`);
  }
  const data = await response.json();
  const replyText = (data.content?.[0]?.text || '').trim();
  const fenced = replyText.match(/```(?:json)?\s*([\s\S]+?)\s*```/);
  return JSON.parse(fenced ? fenced[1] : replyText);
}
/**
 * Extracts the `category` value from a model reply.
 *
 * Accepts plain JSON, JSON inside a Markdown code fence, and — as a fallback when the whole
 * reply is not valid JSON — the span from the first `{` to the last `}` of the reply, so a
 * reply such as `Sure: {"category":"Tiere"}` is still understood.
 *
 * @param {*} text - raw reply text; anything that is not a non-empty string yields null.
 * @returns {string|null} the trimmed category name, or null when none could be extracted.
 */
function parseCategory(text) {
  // Non-string input (undefined, objects, numbers) cannot carry a reply; never throw on it.
  if (typeof text !== 'string') return null;
  let raw = text.trim();
  if (!raw) return null;
  const md = raw.match(/```(?:json)?\s*([\s\S]+?)\s*```/);
  if (md) raw = md[1];

  const candidates = [raw];
  const start = raw.indexOf('{');
  const end = raw.lastIndexOf('}');
  // Second attempt: the embedded object, in case the model added prose around the JSON.
  if (start !== -1 && end > start) candidates.push(raw.slice(start, end + 1));

  for (const candidate of candidates) {
    let parsed;
    try {
      parsed = JSON.parse(candidate);
    } catch {
      continue; // not valid JSON — try the next candidate
    }
    // First candidate that parses decides the result, matching the single-parse behavior.
    return (parsed?.category || '').toString().trim() || null;
  }
  return null;
}
/**
 * Collects a batch if it has finished: reads the results, links each successfully classified
 * word to its category (word_categories) and re-derives pair_categories for the linked words.
 *
 * A word is only counted as linked — and only then included in the re-derivation — when its
 * INSERT did not fail. Failed inserts are skipped without aborting the rest of the batch.
 *
 * @param {string} batchId - id of the batch to collect.
 * @returns {Promise<{ended: boolean, linked: number}>} `ended` is true once the results were
 *   applied and the category_batches row was removed; `linked` is the number of words whose
 *   link was stored (or already existed).
 */
async function collectBatch(batchId) {
  const res = await fetch(`${ANTHROPIC_BASE}/v1/messages/batches/${batchId}`, { headers: headers() });
  if (!res.ok) {
    // Batch unknown/deleted → remove the row so the next tick can submit a new batch.
    if (res.status === 404) await query(`DELETE FROM category_batches WHERE batch_id = $1`, [batchId]);
    return { ended: false, linked: 0 };
  }
  const batch = await res.json();
  if (batch.processing_status !== 'ended' || !batch.results_url) return { ended: false, linked: 0 };
  const { byName } = await loadCategories();
  const fallback = byName.get('sonstiges') || null;
  const r = await fetch(batch.results_url, { headers: headers() });
  if (!r.ok) return { ended: false, linked: 0 };
  const jsonl = await r.text();
  const linkedWordIds = [];
  for (const line of jsonl.split('\n')) {
    const trimmed = line.trim();
    if (!trimmed) continue;
    let entry;
    try { entry = JSON.parse(trimmed); } catch { continue; }
    if (entry.result?.type !== 'succeeded') continue;
    const wordId = entry.custom_id;
    const text = entry.result.message?.content?.[0]?.text;
    const name = parseCategory(text);
    // Unknown or missing category name → fall back to "Sonstiges" when that category exists.
    const cat = (name && byName.get(name.toLowerCase())) || fallback;
    if (!cat) continue;
    const stored = await query(
      `INSERT INTO word_categories (word_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
      [wordId, cat.id]
    ).then(() => true, () => false);
    // Only report words whose link actually exists; a failed insert leaves the word unlinked.
    if (stored) linkedWordIds.push(wordId);
  }
  // Re-derive pair_categories for the affected pairs.
  await rederivePairsForWords(linkedWordIds);
  await query(`DELETE FROM category_batches WHERE batch_id = $1`, [batchId]);
  return { ended: true, linked: linkedWordIds.length };
}
/**
 * Runs one tick: if a batch is recorded in category_batches, try to collect the oldest one;
 * otherwise submit a new batch for the currently uncategorized words.
 *
 * Guarded by the module-level `running` flag, which is always cleared again when the tick ends.
 *
 * @returns {Promise<object>} one of
 *   `{ skipped: true }` (another run is in progress),
 *   `{ collected, linked, batchId }` (an open batch was checked),
 *   `{ remaining: 0 }` (nothing to classify), or
 *   `{ submitted, batchId }` (a new batch was submitted).
 */
async function runCategorizationTick() {
  if (running) return { skipped: true };
  running = true;
  try {
    const { rows: openBatches } = await query(`SELECT batch_id FROM category_batches ORDER BY created_at ASC LIMIT 1`);
    if (openBatches.length) {
      // An open batch blocks new submissions until it has been collected.
      const openId = openBatches[0].batch_id;
      const outcome = await collectBatch(openId);
      return { collected: outcome.ended, linked: outcome.linked, batchId: openId };
    }

    const pending = await findUncategorizedUsedWords();
    if (!pending.length) return { remaining: 0 };

    const categories = await loadCategories();
    const germanNames = categories.rows.map((category) => category.titel_de).filter(Boolean);
    const batchId = await submitBatch(pending, germanNames);
    return { submitted: pending.length, batchId };
  } finally {
    running = false;
  }
}
/**
 * Builds the prompt that asks the model to classify several words in one request.
 * Each word is listed as `<id><TAB><title>`, with a `(de: ...)` hint when the German title
 * differs from the displayed title.
 */
function buildBulkClassificationPrompt(words, names) {
  const list = words.map((w) => {
    const t = w.titel_en || w.titel_de || w.titel_sv || '';
    const de = w.titel_de && w.titel_de !== t ? ` (de: ${w.titel_de})` : '';
    return `${w.id}\t${t}${de}`;
  }).join('\n');
  return (
    `Categories (choose exactly one German name per word):\n${names.join(', ')}\n\n` +
    `Classify each vocabulary word into the best-fitting category. If none fits, use "Sonstiges".\n` +
    `Words (id<TAB>title):\n${list}\n\n` +
    `Reply with JSON only: {"assignments":[{"id":"<id>","category":"<German category name>"}]}`
  );
}

/**
 * Immediate one-shot backfill: classifies uncategorized, in-use words in chunks of up to 40
 * through synchronous /v1/messages calls and re-derives pair_categories after each chunk.
 *
 * Guarded by the module-level `running` flag, which is cleared again when the run ends.
 * A word counts as linked only when its INSERT did not fail, so a failing database stops the
 * loop after the first chunk instead of re-classifying the same words until `max` is reached.
 *
 * @param {{max?: number}} [options] - `max` caps the number of words processed (default 2000).
 * @returns {Promise<{skipped: true}|{processed: number, linked: number}>}
 */
async function classifyWordsSync({ max = 2000 } = {}) {
  if (running) return { skipped: true };
  running = true;
  try {
    const { rows: catRows, byName } = await loadCategories();
    const names = catRows.map((c) => c.titel_de).filter(Boolean);
    const fallback = byName.get('sonstiges') || null;
    const system = 'Du bist ein präziser Klassifizierer. Antworte AUSSCHLIESSLICH mit gültigem JSON, ohne Markdown.';
    let processed = 0;
    let linked = 0;
    while (processed < max) {
      const words = await findUncategorizedUsedWords(Math.min(40, max - processed));
      if (!words.length) break;
      const user = buildBulkClassificationPrompt(words, names);
      let assignments = [];
      try {
        const data = await messagesCall(system, user, 2000);
        assignments = Array.isArray(data.assignments) ? data.assignments : [];
      } catch {
        // Error → the whole chunk gets the fallback category so the run keeps progressing.
        // NOTE(review): this makes the fallback permanent even for transient API errors,
        // because the words then no longer count as uncategorized — confirm this is intended.
      }
      const byId = new Map(assignments.map((a) => [String(a.id), a.category]));
      const linkedIds = [];
      for (const w of words) {
        const name = byId.get(String(w.id));
        const cat = (name && byName.get(String(name).toLowerCase())) || fallback;
        if (!cat) continue;
        const stored = await query(
          `INSERT INTO word_categories (word_id, category_id) VALUES ($1, $2) ON CONFLICT DO NOTHING`,
          [w.id, cat.id]
        ).then(() => true, () => false);
        // A failed insert leaves the word uncategorized; do not report it as linked.
        if (stored) linkedIds.push(w.id);
      }
      await rederivePairsForWords(linkedIds);
      processed += words.length;
      linked += linkedIds.length;
      // Guard against an endless loop (e.g. missing fallback category or failing inserts):
      // without any stored link the next query would return the very same words again.
      if (!linkedIds.length) break;
    }
    return { processed, linked };
  } finally {
    running = false;
  }
}
// Exported API: the tick entry point, the synchronous backfill, and the lower-level
// query/collect/submit helpers.
module.exports = { runCategorizationTick, classifyWordsSync, findUncategorizedUsedWords, collectBatch, submitBatch };