--- name: llm-structured-data-pipeline description: Build multi-phase LLM pipelines that extract structured JSON data, chain phases with DB persistence, and handle truncation/race conditions. Patterns from building AI video production pipelines with sequential LLM analysis steps. version: 1.0.0 tags: [llm, json, pipeline, structured-data, anthropic, openrouter, sqlite] metadata: hermes: tags: [llm, json, pipeline, structured-data, anthropic, openrouter, sqlite] --- # LLM Structured Data Pipeline Patterns for building multi-phase apps where each phase calls an LLM to extract/generate structured JSON, stores results in a database, and feeds into the next phase. ## When to Use - App has sequential LLM analysis phases (e.g., generate → analyze → segment → scene-define → shot-breakdown) - Each phase produces structured JSON that gets stored in a relational DB - Later phases depend on earlier phase outputs (timestamps, IDs, entity names) - Long-running async operations where users may trigger re-runs mid-flight ## Pattern 1: Robust JSON Extraction from LLM LLMs frequently return slightly invalid JSON — markdown fences, truncated output, trailing commas. Always post-process. ```javascript function safeParseJSON(text) { // Step 1: Direct parse try { return JSON.parse(text) } catch {} // Step 2: Strip markdown fences let cleaned = text.trim() if (cleaned.startsWith('```')) { cleaned = cleaned.replace(/^```(?:json)?\n?/, '').replace(/\n?```$/, '') try { return JSON.parse(cleaned) } catch {} } // Step 3: Extract JSON from prose preamble (model wrote "Looking at the scene..." before JSON) const arrayStart = cleaned.indexOf('[') const objStart = cleaned.indexOf('{') const jsonStart = (arrayStart >= 0 && (objStart < 0 || arrayStart < objStart)) ? arrayStart : objStart if (jsonStart > 0) { cleaned = cleaned.substring(jsonStart) try { return JSON.parse(cleaned) } catch {} } // Step 4: Repair truncated JSON const lastBrace = cleaned.lastIndexOf('}') if (lastBrace > 0) { let fixed = cleaned.substring(0, lastBrace + 1) if (!fixed.trim().endsWith(']')) fixed += ']' try { return JSON.parse(fixed) } catch {} } throw new Error(`Invalid JSON from LLM: ${text.substring(0, 100)}...`) } ``` **Critical**: Fallback LLM providers (OpenRouter) often return prose before JSON — "Looking at the scene, here are the shots:" followed by JSON. Step 3 handles this by finding the first `[` or `{`. Also add retry logic: if JSON parse fails, call the LLM again (up to 2 attempts). ### Key Pitfalls 1. **max_tokens too low** — OpenRouter and Anthropic both truncate silently. Structured JSON for scene breakdowns with detailed prompts can easily exceed 4096 tokens. Use 8192+ for any response containing multiple objects with long text fields. 2. **OpenRouter inherits max_tokens** — If your fallback function hardcodes max_tokens, it overrides what the caller requested. Always pass through: `max_tokens: params.max_tokens || 8192` 3. **Variable names in JSON repair** — Don't reuse the variable name `text` if the outer scope already has one. Use `parsed`, `jsonText`, `cleaned` etc. to avoid patch tool matching issues. ## Pattern 2: Computed Fields from Cross-Phase Data LLMs often omit fields they were asked for, especially numeric ones like timestamps. Always compute critical fields from authoritative data rather than trusting the LLM. ```javascript // LLM returns scene with start_segment_index/end_segment_index // but may omit start_time_ms/end_time_ms let startMs = scene.start_time_ms let endMs = scene.end_time_ms // Compute from segment indices (authoritative source) if LLM didn't include them if ((startMs === undefined || startMs === null) && scene.start_segment_index !== undefined) { const startSeg = segments[scene.start_segment_index] const endSeg = segments[Math.min(scene.end_segment_index, segments.length - 1)] if (startSeg) startMs = startSeg.start_time_ms if (endSeg) endMs = endSeg.end_time_ms } ``` **Rule:** Audio timestamps are the source of truth. Scene timestamps derive from segment timestamps. Shot timestamps derive from scene timestamps. Never trust LLM-generated timestamps when you have computed ones available. ## Pattern 3: Race Condition Guards for Async Chains When phase N is running async (LLM calls take 5-30s each), the user may trigger phase N-1 again, which deletes and recreates the data that phase N depends on. This causes foreign key constraint errors. ```javascript for (const scene of scenes) { const shots = await breakdownShots(scene, ...) // Takes 10-30 seconds // Scene may have been deleted + recreated while we were waiting const stillExists = db.prepare('SELECT id FROM scenes WHERE id = ?').get(scene.id) if (!stillExists) { console.log(`Scene ${scene.scene_number} was re-defined during breakdown, skipping`) continue } // Safe to insert shots now for (const shot of shots) { insertShot.run(shotId, scene.id, ...) } } ``` ## Pattern 4: Entity Name Matching Between Phases When the LLM extracts entities (characters, locations, props) in phase 1, later phases must reference them by exact name. Build name→ID maps and use fuzzy matching as fallback: ```javascript // Build maps from DB records const charMap = {} characters.forEach(c => { charMap[c.name] = c.id }) // LLM returns character names — look up IDs for (const name of scene.characters_present || []) { const id = charMap[name] if (id) insertSceneChar.run(sceneId, id) // else: LLM used a slightly different name — log and skip } ``` **Tip:** In the LLM prompt, always list the exact available names: `Available characters: ${charNames}`. This dramatically reduces name mismatches. ## Pattern 5: Cascading Deletes for Re-analysis When a user re-runs an earlier phase, all downstream data must be cleared. Order matters with foreign keys: ```javascript // Clear in reverse dependency order db.prepare('DELETE FROM scene_characters WHERE scene_id IN (SELECT id FROM scenes WHERE project_id = ?)').run(projectId) db.prepare('DELETE FROM scene_props WHERE scene_id IN (SELECT id FROM scenes WHERE project_id = ?)').run(projectId) db.prepare('DELETE FROM scene_segments WHERE scene_id IN (SELECT id FROM scenes WHERE project_id = ?)').run(projectId) db.prepare('DELETE FROM shots WHERE project_id = ?').run(projectId) db.prepare('DELETE FROM scenes WHERE project_id = ?').run(projectId) ``` ## Checklist - [ ] All LLM JSON responses go through safeParseJSON (strip fences + repair truncation) - [ ] max_tokens set to 8192+ for any structured response with multiple objects - [ ] Numeric fields (timestamps, durations) computed from authoritative data, not LLM output - [ ] Async operations check that parent records still exist before inserting children - [ ] Entity names listed explicitly in LLM prompts for cross-phase consistency - [ ] Cascading deletes clear junction tables before parent tables - [ ] Re-run operations are idempotent (clear old data before inserting new)