---
name: third-party-api-data-sync
description: Build a data sync pipeline from a third-party API/system into your own database. Covers schema analysis, field mapping, API client porting, sync modules, and admin endpoints. Use when integrating external data as ground truth.
tags: [data-sync, api-integration, etl, schema-mapping, database]
triggers:
  - sync data from external API
  - pull data from third-party system
  - integrate external data source
  - map external schema to our database
  - build ETL pipeline
  - port API client from another language
---

# Third-Party API Data Sync Pipeline

## When to Use

- Integrating a third-party system (SaaS API, partner system) as ground truth for your data
- Porting an existing sync pipeline from one codebase/language to another
- Building incremental sync with UPSERT logic between two schemas

## Phase 1: Deep Analysis (DO NOT skip)

### 1A. Audit the source system

- Clone/access the reference codebase that already integrates with the API
- Read ALL sync-related files exhaustively: don't summarize; extract exact field names
- Document every API endpoint, auth method, report ID, and field name
- Map: `Source Field Name -> Variable Name -> DB Column -> Transform Logic`
- Note multi-key fallbacks (APIs often have inconsistent field naming across versions)

### 1B. Audit your own schema

- Get EXACT column definitions for every table you'll sync into
- Include: column_name, data_type, is_nullable, column_default, constraints, indexes
- Get sample data (3-5 rows) to understand real-world values
- Note ID generation patterns (sequences vs manual vs UUIDs)

### 1C. Cross-reference mapping (the critical step)

- Create a field-by-field comparison table: Source Field -> Our Column -> Match Quality
- Identify 3 categories:
  - ✅ Direct matches (same concept, compatible types)
  - ⚠️ Similar but divergent (different formats, enums, or naming)
  - ❌ Gaps (fields they have that we don't, and vice versa)
- Document value differences (e.g., they use "JBA", we use "Bishop Arts")

### 1D. Write analysis report

- Save as a markdown doc in the project (e.g., `docs/integration-analysis.md`)
- Do TWO passes: the first discovers, the second audits and corrects the findings
- This report is the contract for everything that follows

## Phase 2: Schema Migration

### Key principles

- Use `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` for safety (idempotent)
- New columns should have sensible defaults (NULL or a DEFAULT value)
- Add `synced_at TIMESTAMPTZ` to every table that will receive sync data
- Add a foreign reference column for the source system's ID (e.g., `mt_reservation_id`)
- Create new aggregate/snapshot tables if the source has analytics the target doesn't
- Create a `sync_log` table for the audit trail
- Save the migration as a numbered SQL file (e.g., `migrations/001_sync_schema.sql`)
- Run it in a transaction

### Pitfall: ID conflicts

- Source system IDs may collide with your auto-increment sequences
- NEVER force source IDs into your primary key column
- Instead: add a separate column (e.g., `external_id`) with a UNIQUE index
- Let your sequences handle `id`; use `external_id` for UPSERT conflict detection

## Phase 3: API Client

### Porting from another language

- Read the ENTIRE source file before writing a single line
- Match behavior exactly: same auth flow, same retry logic, same response parsing
- Key patterns to preserve:
  - Multi-auth fallback (try API key → OAuth password → refresh token)
  - Sync vs async fetch (some APIs cap row counts, need async/pagination)
  - Response format handling (JSON:API, list-of-dicts, headers+rows, S3 download)
  - Case-insensitive field accessor helper (`_get(row, ...keys)`)
  - Currency/number parser helper (`safeFloat(val)`, which strips `$` and commas)

### Structure

```
src/sync/
├── api-client.ts      # API client class (auth, fetch, response parsing)
├── constants.ts       # Mappings, rates, classification functions
├── sync-{entity}.ts   # One file per entity type
└── index.ts           # Orchestrator
```

## Phase 4: Sync Modules

### One module per entity type

Each sync module should:

1. Accept a DB pool + array of raw API rows
2. Transform rows using classification helpers from constants.ts
3. Match related records (e.g., customer by email → customer_id)
4. UPSERT using `ON CONFLICT (external_id) DO UPDATE`
5. Batch in groups of 50-100 for performance
6. Log errors per-row but don't stop the sync
7. Return `{ inserted, updated, errors }` counts

### UPSERT pattern (PostgreSQL)

```sql
-- external_id must be part of the INSERT for the conflict target to apply
INSERT INTO reservations (external_id, col1, col2, ..., synced_at)
VALUES ($1, $2, $3, ..., NOW())
ON CONFLICT (external_id) DO UPDATE SET
  col1 = EXCLUDED.col1,
  col2 = EXCLUDED.col2,
  synced_at = NOW();
```

### Pitfall: Unique constraint violations

- Check `MAX()` for ALL unique columns, not just the primary key
- Example: `memberships` had `id` (PK, max 2644) and `instance_id` (UNIQUE, max 9652)
- Using `MAX(id)+1` for `instance_id` caused duplicate key errors, because `instance_id` values already ran well past `MAX(id)`
- Solution: query MAX separately for each unique column, or let sequences handle it

### Daily metrics / aggregation module

If the source system has aggregate analytics:

- Build an aggregation module that processes raw synced data
- Use the same ID format as the source for compatibility
- UPSERT into dedicated metrics tables

## Phase 5: Admin API Endpoints

### Standard endpoint set

```
POST /api/admin/sync            # Full sync (all entities)
POST /api/admin/sync/{entity}   # Single entity sync
GET  /api/admin/sync/status     # Check if credentials configured
GET  /api/admin/sync/log        # Sync history from sync_log table
GET  /api/admin/metrics         # Query aggregate metrics
```

### Key design decisions

- Use dynamic import for sync modules (they may fail if credentials aren't set)
- Accept `{ days, start_date, end_date }` for date-range syncs
- Default to the last 7 days for incremental sync
- Log every sync attempt to sync_log (started_at, completed_at, rows_synced, errors)

## Phase 6: Credential Setup & Testing

### Don't hardcode credentials

- Use environment variables (MT_API_KEY, etc.)
- Add a `/api/admin/sync/status` endpoint that reports which auth method is available
- Gracefully degrade if credentials aren't set (503, not 500)

### Testing without credentials

- All admin endpoints should work (returning empty results or "not configured")
- Schema migration should be testable independently
- Verify that new columns exist, new tables are created, and indexes are in place

## Pitfalls & Lessons Learned

1. **OAuth user tokens ≠ API integration keys, but test before assuming**: A SaaS platform's OAuth password grant (`/o/token/` with username+password) may return a USER-scoped token that works for end-user endpoints but NOT for admin/reporting APIs. However, don't ASSUME this is the case until you've confirmed it. In Mariana Tek's case, the password grant tokens DO work for report endpoints; early test failures were caused by stale tokens, not insufficient scope. ALWAYS test the actual data-fetching endpoint with a FRESHLY obtained token, in a single atomic operation (get token → immediately use it), before concluding the auth method doesn't work.
2. **Deferred revenue ≠ sale price**: Accounting fields like `deferred_revenue` track revenue recognition, not what the customer paid. Cross-reference with `order_line_items` or transaction records for actual sale prices.
3. **Field name inconsistency**: APIs often return different field names depending on the report version, fetch method (sync vs async), or even the time of day. The `_get(row, ...keys)` pattern with 3-5 fallback keys per field is essential, not optional.
4. **500-row API limit**: Some APIs cap the rows returned by a synchronous request. Beyond the cap you need an async pipeline (start job → poll → download from S3/CDN). Build BOTH sync and async paths from the start.
5. **Classification logic is business logic**: Functions like `classifyBookingSource()` and `classifyClassType()` encode domain knowledge. Extract these into constants.ts rather than inlining them in sync modules.
6. **Two-pass analysis**: The first pass finds the data. The second pass audits the findings and catches misunderstandings about field semantics, pricing calculations, and ID generation patterns.
7. **Ground truth principle**: When the external system is ground truth, your sync should UPDATE existing records, not just INSERT new ones. Use UPSERT everywhere.
8. **Unique constraint violations on non-PK columns**: Check `MAX()` for ALL unique columns, not just the primary key. Example: `memberships` had `id` (PK, max 2644) and `instance_id` (UNIQUE, max 9652). Using `MAX(id)+1` for both caused duplicate key errors. Let sequences handle auto-increment PKs; query MAX separately for other unique columns.
9. **Test auth end-to-end before building**: Don't assume "auth works" because the token endpoint returns 200. Test with the ACTUAL data endpoint you need. Different API scopes, different auth mechanisms, or separate admin keys are common in SaaS platforms (Mariana Tek, Stripe, Shopify all have this pattern).
10. **Token freshness matters**: OAuth tokens can be short-lived. A token that works for one curl call may fail 30 seconds later if you stored it and reused it across multiple test commands. When debugging "auth not provided" errors, always get a FRESH token immediately before each test request. In the Mariana Tek case, repeated tests with stale tokens looked like the API was rejecting the auth method entirely, when it was just token expiry.
11. **Check the SPA/admin app's config for OAuth client credentials**: SaaS platforms with Ember/React admin dashboards often embed OAuth client_id and client_secret in their HTML meta tags or JS config. `curl` the admin page and look for `<meta>` tags with auth config. These credentials are "public" (embedded in client-side code) and are needed to get properly-scoped tokens. A bare password grant without client_id/secret may return a limited-scope token.
12. **CDN/CloudFront is usually NOT the problem**: When seeing 401s on GET endpoints but 200s on POST endpoints behind CloudFront, the instinct is to blame header stripping. Check the CORS preflight first: if `access-control-allow-headers` includes `authorization`, CloudFront IS forwarding it. The real issue is almost always token scope or expiry, not infrastructure.
13. **Reference codebase may have dead code paths**: When an existing codebase supports multiple auth modes (API key, password, refresh token), the production deployment may only use ONE of them. Don't assume all paths work equally. Check the deployment's actual environment variables to know which auth mode is active. The "preferred" method in comments may be the only one that works for certain endpoints.
14. **Existing reference app's cron can invalidate shared credentials**: If a reference app runs a cron job that rotates refresh tokens, the token you got from the user/admin dashboard will stop working after the next cron run. Prefer the password grant (which generates fresh tokens each time) over refresh tokens that can be rotated out from under you.
15. **PM2 env var gotchas**: When restarting PM2 processes with new environment variables, `pm2 restart` does NOT update env vars. You must `pm2 delete` and then start the process again, OR use `--update-env`. The safest pattern: save credentials to a `.env.mt` file, then `env $(cat .env.mt | xargs) pm2 start ...`. Also check that DB connection env vars (like DB_PORT) survive process restarts; a dashboard showing "zero data" is often just a stale PM2 process with missing env vars.
16. **Async report jobs can take 2-5+ minutes**: When a SaaS API's sync endpoint caps at N rows and kicks off an async export job, that job runs on THEIR infrastructure and may take minutes. Don't make this a synchronous HTTP request to your own server; it will time out. Either: (a) return a job ID immediately and poll from the client, (b) use a background worker, or (c) set very long HTTP timeouts. Exxir's approach: 300-second timeout with 3-second polling intervals.
17. **Don't debug auth with multiple cached curl calls**: When testing OAuth-protected endpoints, NEVER reuse a token variable across multiple separate terminal commands. Each test should get its own fresh token in the same command chain. In the Mariana Tek investigation, 15+ minutes were spent debugging "auth not provided" errors that were simply caused by token expiry between separate curl invocations. The fix: `TOKEN=$(curl -s -X POST .../o/token/ ... | jq -r .access_token) && curl -s .../api/endpoint -H "Authorization: Bearer $TOKEN"`
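
The two helpers named in Phase 3 and pitfall 3, `_get(row, ...keys)` and `safeFloat(val)`, could look roughly like this in TypeScript. This is a minimal sketch, not the reference codebase's implementation: the `Row` type, the rule that empty strings count as missing, the `fallback` parameter, and the sample field names are all assumptions made for illustration.

```typescript
// Hypothetical row shape: a flat record as returned by a report API.
type Row = Record<string, unknown>;

/**
 * Case-insensitive accessor with fallback keys.
 * Returns the first non-empty value among the candidate keys, else undefined.
 */
function _get(row: Row, ...keys: string[]): unknown {
  // Build a lowercase lookup once so the casing of response keys does not matter.
  const lowered = new Map<string, unknown>();
  for (const [k, v] of Object.entries(row)) {
    const lk = k.trim().toLowerCase();
    if (!lowered.has(lk)) lowered.set(lk, v);
  }
  // Try the candidates in priority order.
  for (const key of keys) {
    const value = lowered.get(key.trim().toLowerCase());
    // Assumption: null and "" are treated as "missing" so the next key is tried.
    if (value !== undefined && value !== null && value !== "") return value;
  }
  return undefined;
}

/**
 * Currency/number parser: strips "$", commas, and whitespace.
 * Returns `fallback` (assumed default 0) when the input is not a finite number.
 */
function safeFloat(val: unknown, fallback = 0): number {
  if (typeof val === "number") return Number.isFinite(val) ? val : fallback;
  if (typeof val !== "string") return fallback;
  const cleaned = val.replace(/[$,\s]/g, "");
  if (cleaned === "") return fallback;
  const parsed = Number(cleaned);
  return Number.isFinite(parsed) ? parsed : fallback;
}

// Usage: the same field arrives under different names depending on the report.
const row: Row = { "Sale Price": "$1,234.50", customer_email: "a@example.com" };
const price = safeFloat(_get(row, "sale_price", "Sale Price", "price"));
```

Keeping both helpers next to the classification functions in constants.ts would let every sync module share one definition of "missing" and one number format. This sketch does not handle accounting-style negatives such as `(12.00)` or locale-specific decimal commas; if the source API emits those, the parser would need extending.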