Change Data Capture (CDC)

Stream database changes in real time with native CDC connectors.

Table of contents

  1. Overview
    1. Key Features
    2. Supported Databases
    3. Documentation
  2. Quick Start
    1. PostgreSQL
    2. MySQL
  3. Event Format
    1. Operation Types
    2. Key Fields
  4. Snapshots
    1. Snapshot Modes
    2. Configuration
    3. Snapshot Flow
    4. Event Example (Snapshot)
    5. Resumable Snapshots
    6. Best Practices
    7. Snapshot Security
  5. Incremental Snapshots
    1. Signal Table Setup
    2. Configuration
    3. Triggering a Snapshot
    4. How It Works
    5. Watermark Strategy
    6. Event Deduplication
      1. Deduplication Algorithm (DBLog)
      2. Key Deduplication Rules
    7. Parallel Chunk Execution
    8. Programmatic API (Rust)
    9. Advanced: Parallel Chunks with Automatic Deduplication
    10. Advanced: Streaming Event Deduplication
    11. Advanced: Backpressure Control
    12. Best Practices
  6. Event Routing
    1. Configuration
    2. Route Conditions
    3. Routing Flow
  7. Partitioning
    1. Configuration
    2. Partition Strategies
    3. Best Practices
  8. Pipeline Processing
    1. Configuration
    2. Pipeline Stages
    3. Pipeline Flow
  9. Log Compaction
    1. Configuration
    2. Compaction Strategies
    3. Key Strategy
  10. Parallel CDC Processing
    1. Configuration
    2. How It Works
    3. Work Stealing
  11. Transactional Outbox Pattern
    1. Configuration
    2. Outbox Table Schema
    3. How It Works
    4. Best Practices
  12. Transforms
    1. Transform Reference
    2. Example: Flatten and Mask
    3. Example: Content-Based Routing
    4. Example: Conditional Transforms (Predicates)
  13. Sinks
    1. Console (stdout)
    2. S3 / Data Lake
    3. HTTP Webhook
    4. Snowflake
  14. Rate Limiting
  15. Monitoring
    1. Core Metrics
    2. Extended Metrics
    3. Example Prometheus Alerts
  16. Next Steps

Overview

Rivven provides native CDC support for PostgreSQL, MySQL/MariaDB, and SQL Server, enabling real-time change data capture with a lightweight, high-performance Rust implementation.

Key Features

Feature Description
Zero Dependencies Native Rust implementation — no JVM, no external connectors
Standard Event Format Industry-standard CDC envelope format
17 Built-in Transforms Filter, mask, route, and transform events in-flight
Production Ready TLS/mTLS, SCRAM-SHA-256, circuit breakers, rate limiting
Full Observability Comprehensive Prometheus metrics
Health Monitoring Auto-recovery, liveness probes, lag monitoring
Task Lifecycle JoinHandle tracking for all CDC source tasks — panics propagated, graceful shutdown
I/O Timeouts All CDC protocol I/O wrapped in timeouts to prevent hung connections
Notifications Snapshot progress, streaming status, webhook alerts
Automatic Reconnection Exponential backoff with jitter for MySQL binlog reconnection
Binlog Format Validation MySQL source fails fast with a clear error if binlog_format is not ROW — empty-result edge case is handled
Proactive WAL Keepalives PostgreSQL sends periodic standby status updates to prevent slot timeout
Full JSON Type Support MySQL binary JSON columns decoded to structured values (not raw bytes)
Composite PK Snapshots PostgreSQL snapshots support multi-column primary keys

Supported Databases

Database Version Protocol Status
PostgreSQL 10+ Logical replication (pgoutput) ✅ Available
MySQL 5.7+ Binary log with GTID ✅ Available
MariaDB 10.2+ Binary log with MariaDB GTID ✅ Available
SQL Server 2016 SP1+ CDC table polling ✅ Available
Oracle 19c+ LogMiner 📋 Planned

Documentation

Guide Description
PostgreSQL CDC Guide Complete PostgreSQL setup, TLS, signal tables, incremental snapshots
MySQL/MariaDB CDC Guide MySQL and MariaDB binary log replication setup
SQL Server CDC Guide SQL Server CDC setup and configuration
Configuration Reference All CDC configuration options and environment variables
Troubleshooting Guide Diagnose and resolve common issues

Quick Start

PostgreSQL

# rivven-connect.yaml
version: "1.0"

sources:
  orders_db:
    connector: postgres-cdc
    topic: cdc.orders
    config:
      host: localhost
      port: 5432
      database: shop
      user: rivven
      password: ${POSTGRES_PASSWORD}
      slot_name: rivven_slot
      publication_name: rivven_pub

Run the connector:

rivven-connect --config rivven-connect.yaml

See PostgreSQL CDC Guide for complete setup instructions.

MySQL

sources:
  orders_db:
    connector: mysql-cdc
    topic: cdc.orders
    config:
      host: localhost
      port: 3306
      database: shop
      user: rivven
      password: ${MYSQL_PASSWORD}
      server_id: 12345

See MySQL/MariaDB CDC Guide for complete setup instructions.


Event Format

CDC events follow the standard CDC envelope format:

{
  "before": null,
  "after": {
    "id": 1001,
    "customer_id": 42,
    "total": 99.99,
    "created_at": "2026-01-25T10:30:00Z"
  },
  "source": {
    "connector": "postgres-cdc",
    "db": "shop",
    "schema": "public",
    "table": "orders",
    "lsn": "0/16B3748",
    "ts_ms": 1737802200000
  },
  "op": "c",
  "ts_ms": 1737802200123
}

Operation Types

op Meaning
c Create (INSERT)
u Update (UPDATE)
d Delete (DELETE)
r Read (snapshot)

Key Fields

Field Description
before Row state before change (null for INSERT)
after Row state after change (null for DELETE)
source Metadata about the source database
op Operation type
ts_ms Event timestamp (milliseconds)
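A consumer typically switches on op to decide which side of the envelope holds the row. A minimal Rust sketch (the types and function names here are illustrative, not Rivven's API):

```rust
/// Illustrative CDC operation type, mirroring the `op` codes above.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Create,
    Update,
    Delete,
    Read,
}

/// Map the single-character `op` field to an operation.
fn parse_op(op: &str) -> Option<Op> {
    match op {
        "c" => Some(Op::Create),
        "u" => Some(Op::Update),
        "d" => Some(Op::Delete),
        "r" => Some(Op::Read),
        _ => None,
    }
}

/// The row an event describes: `after` for create/update/read, and the
/// last-known state (`before`) for deletes, whose `after` is null.
fn event_row<'a>(op: &Op, before: Option<&'a str>, after: Option<&'a str>) -> Option<&'a str> {
    match op {
        Op::Delete => before,
        _ => after,
    }
}
```

For deletes, before carries the last-known row state, which is why after is null for DELETE in the table above.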

Snapshots

Rivven supports initial snapshots to capture the full state of tables before streaming changes. The snapshot is executed automatically based on the configured mode.

Snapshot Modes

Mode Description
initial Snapshot on first start (when no stored offsets exist)
always Snapshot on every connector start
never Never snapshot — streaming only
when_needed Same as initial
initial_only Snapshot once, then stop (no streaming)
schema_only Capture schema metadata only (no data)
recovery Force re-snapshot for disaster recovery

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      snapshot:
        mode: initial          # Snapshot mode
        batch_size: 10000      # Rows per SELECT batch
        parallel_tables: 4     # Tables to snapshot in parallel
        progress_dir: /var/lib/rivven/snapshot  # Resumable progress
        query_timeout_secs: 60 # SELECT timeout
        throttle_delay_ms: 0   # Delay between batches
        max_retries: 3         # Retry failed batches
        include_tables:        # Only snapshot these tables
          - public.orders
          - public.customers
        exclude_tables:        # Skip these tables
          - public.audit_logs

Snapshot Flow

┌──────────────────────────────────────────────────────────────────────────────┐
│                           Connector Start                                     │
├──────────────────────────────────────────────────────────────────────────────┤
│  1. Check snapshot mode (initial/always/never/etc.)                          │
│  2. Check for stored offsets (prior state)                                   │
│  3. If snapshot needed:                                                      │
│     a. Get watermark (WAL LSN / binlog position)                            │
│     b. SELECT tables in batches (keyset pagination)                         │
│     c. Emit events with op='r' (read/snapshot)                              │
│     d. Save progress for resumability                                        │
│  4. Transition to streaming from watermark position                          │
└──────────────────────────────────────────────────────────────────────────────┘
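Step 3b's keyset pagination can be sketched as a query builder. This is a hypothetical helper for illustration; per the Snapshot Security section, Rivven validates identifiers and binds the key values as parameters rather than interpolating them:

```rust
/// Build one keyset-pagination SELECT for a snapshot batch (PostgreSQL
/// placeholder syntax). `first` is true for the initial batch; afterwards
/// the previous batch's final key tuple is bound to $1..$n. Row-value
/// comparison keeps composite primary keys correctly ordered.
fn keyset_batch_query(table: &str, key_cols: &[&str], batch_size: u32, first: bool) -> String {
    let cols = key_cols.join(", ");
    let placeholders: Vec<String> =
        (1..=key_cols.len()).map(|i| format!("${}", i)).collect();
    let where_clause = if first {
        String::new()
    } else {
        // (k1, k2) > ($1, $2) resumes strictly after the last seen key.
        format!(" WHERE ({}) > ({})", cols, placeholders.join(", "))
    };
    format!(
        "SELECT * FROM {}{} ORDER BY {} LIMIT {}",
        table, where_clause, cols, batch_size
    )
}
```

Because each batch orders by the full key and resumes strictly after the last key seen, a restart can pick up from the persisted checkpoint without re-reading earlier rows.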

Event Example (Snapshot)

{
  "before": null,
  "after": {
    "id": 1001,
    "customer_id": 42,
    "total": 99.99
  },
  "source": {
    "connector": "postgres-cdc",
    "snapshot": "true",
    "db": "shop",
    "schema": "public",
    "table": "orders"
  },
  "op": "r",
  "ts_ms": 1737802200000
}

Resumable Snapshots

When progress_dir is configured, snapshot progress is persisted to disk:

$ ls /var/lib/rivven/snapshot/
public.orders.json        # {"table": "public.orders", "last_key": "5000", "rows": 5000}
public.customers.json     # {"table": "public.customers", "last_key": "1000", "rows": 1000}

If the connector restarts during a snapshot, it resumes from the last checkpoint rather than starting over.

Best Practices

  1. Use initial mode for most use cases — snapshots only when needed
  2. Set progress_dir for large tables to enable resumability
  3. Tune batch_size based on row size (10K-50K typical)
  4. Use parallel_tables for multiple small tables
  5. Use exclude_tables to skip audit/log tables
  6. Monitor snapshot_duration_ms metric for performance

Snapshot Security

All snapshot table specifications are validated at construction time:

  • Identifier validation: TableSpec::new() returns Result and validates schema, table, and key column names via Validator::validate_identifier() — rejects SQL injection characters, enforces ^[a-zA-Z_][a-zA-Z0-9_]{0,254}$
  • Defense-in-depth escaping: Snapshot SELECT queries additionally escape identifiers at the query site (backtick-doubling for MySQL, double-quote-doubling for PostgreSQL, bracket-escaping for SQL Server)
  • Parameterized pagination: Keyset pagination values are always passed as parameterized query parameters, never interpolated into SQL
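The first two defenses can be sketched as follows, with illustrative function names rather than Rivven's actual Validator API:

```rust
/// Identifier validation: first char must be a letter or underscore,
/// the rest alphanumeric or underscore, max 255 chars total
/// (equivalent to ^[a-zA-Z_][a-zA-Z0-9_]{0,254}$).
fn is_valid_identifier(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        Some(c) if c.is_ascii_alphabetic() || c == '_' => {}
        _ => return false,
    }
    name.len() <= 255 && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Defense-in-depth escaping at the query site: quote the identifier
/// and double any embedded quote character for the target dialect.
fn escape_pg(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

fn escape_mysql(ident: &str) -> String {
    format!("`{}`", ident.replace('`', "``"))
}
```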

Incremental Snapshots

Rivven supports incremental (non-blocking) snapshots that run while CDC streaming continues. This approach enables:

  • Adding new tables to capture without stopping the connector
  • Re-syncing tables after schema changes
  • Recovering from data inconsistencies
  • Ad-hoc data refresh on demand

Signal Table Setup

Create a signal table in your database:

CREATE TABLE IF NOT EXISTS rivven_signal (
    id VARCHAR(42) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data VARCHAR(2048) NULL
);

Enable the signal table in your CDC publication:

-- PostgreSQL
ALTER PUBLICATION rivven_pub ADD TABLE rivven_signal;

-- MySQL: no publication step; the signal table only needs to be written to the binary log

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      signal:
        enabled: true
        data_collection: public.rivven_signal
        
      incremental_snapshot:
        chunk_size: 1024          # Rows per chunk
        watermark_strategy: insert_delete  # open/close watermark mode
        max_concurrent_chunks: 1  # Chunks to process in parallel

Triggering a Snapshot

Insert a signal row to trigger an incremental snapshot:

-- Snapshot specific tables
INSERT INTO rivven_signal (id, type, data) VALUES (
    'sig-001',
    'execute-snapshot',
    '{"data-collections": ["public.orders", "public.customers"]}'
);

-- Stop an in-progress snapshot
INSERT INTO rivven_signal (id, type, data) VALUES (
    'sig-002',
    'stop-snapshot',
    NULL
);

-- Pause streaming and snapshot
INSERT INTO rivven_signal (id, type, data) VALUES (
    'sig-003',
    'pause-snapshot',
    NULL
);

-- Resume
INSERT INTO rivven_signal (id, type, data) VALUES (
    'sig-004',
    'resume-snapshot',
    NULL
);

How It Works

┌─────────────────────────────────────────────────────────────────────────────┐
│                   Incremental Snapshot Flow                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│  1. Signal INSERT flows through CDC stream                                   │
│  2. Connector detects signal and starts incremental snapshot                 │
│  3. For each chunk:                                                          │
│     a. Open watermark window (INSERT to signal table)                        │
│     b. Execute SELECT for chunk                                              │
│     c. Buffer results locally                                                │
│     d. Close watermark window (DELETE from signal table)                     │
│     e. Deduplicate streaming events against buffer                           │
│     f. Emit remaining buffer entries as op='r' events                        │
│  4. Continue to next chunk until table complete                              │
│  5. Streaming continues uninterrupted throughout                             │
└─────────────────────────────────────────────────────────────────────────────┘

Watermark Strategy

The watermark strategy determines how chunks are bounded:

Strategy Description
insert_delete INSERT/DELETE pairs bracket each chunk (default)
update UPDATE statement brackets chunks

Event Deduplication

During incremental snapshots, the same row may appear in both:

  • The snapshot chunk (op='r')
  • The streaming CDC events (op='c'/'u'/'d')

Rivven automatically deduplicates these events using the DBLog watermark algorithm, ensuring each row is emitted exactly once.

Deduplication Algorithm (DBLog)

┌─────────────────────────────────────────────────────────────────────────────┐
│                    DBLog Watermark Deduplication                             │
├─────────────────────────────────────────────────────────────────────────────┤
│  1. OPEN WINDOW                                                              │
│     - Record timestamp (watermark_ts)                                        │
│     - Insert open watermark to signal table                                  │
│                                                                              │
│  2. SNAPSHOT QUERY                                                           │
│     - SELECT rows for this chunk                                             │
│     - Buffer rows locally with primary keys                                  │
│                                                                              │
│  3. STREAMING CONTINUES (parallel)                                           │
│     - For each streaming event:                                              │
│       • If event.key exists in buffer AND event.ts >= watermark_ts:          │
│         → REMOVE from buffer (streaming wins)                                │
│       • If DELETE event and key in buffer:                                   │
│         → REMOVE from buffer (deletes always win)                            │
│                                                                              │
│  4. CLOSE WINDOW                                                             │
│     - Insert close watermark to signal table                                 │
│     - Emit remaining buffer entries as op='r'                                │
│     - These are rows NOT modified during the window                          │
└─────────────────────────────────────────────────────────────────────────────┘

Key Deduplication Rules

Scenario Action
Snapshot row + no streaming event Emit snapshot row (op='r')
Snapshot row + INSERT during window Drop snapshot row, streaming INSERT already emitted
Snapshot row + UPDATE during window Drop snapshot row, streaming UPDATE already emitted
Snapshot row + DELETE during window Drop snapshot row, row no longer exists
Streaming event before window opens Ignore (stale), not in buffer
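The rules above condense into a small per-chunk buffer. This is an illustrative sketch of the DBLog window logic, not Rivven's executor (its real API appears under Programmatic API below):

```rust
use std::collections::HashMap;

/// One chunk's deduplication buffer: snapshot rows keyed by primary key,
/// pruned by streaming events while the window is open.
struct ChunkBuffer {
    watermark_ts: u64,
    rows: HashMap<String, String>, // pk -> snapshot row (op='r' candidate)
}

impl ChunkBuffer {
    fn new(watermark_ts: u64) -> Self {
        Self { watermark_ts, rows: HashMap::new() }
    }

    /// Step 2: buffer a snapshot row for this chunk.
    fn buffer_row(&mut self, key: &str, row: &str) {
        self.rows.insert(key.to_string(), row.to_string());
    }

    /// Step 3: apply one streaming event observed while the window is open.
    /// Deletes always win; other ops win only at/after the watermark.
    fn on_streaming_event(&mut self, key: &str, op: char, ts: u64) {
        if op == 'd' || ts >= self.watermark_ts {
            self.rows.remove(key);
        }
    }

    /// Step 4: close the window. Whatever remains was not modified during
    /// the window and is emitted as op='r'.
    fn close(self) -> Vec<(String, String)> {
        let mut out: Vec<_> = self.rows.into_iter().collect();
        out.sort(); // deterministic emit order for the sketch
        out
    }
}
```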

Parallel Chunk Execution

For high throughput, Rivven supports parallel chunk processing via max_concurrent_chunks:

incremental_snapshot:
  chunk_size: 1024
  max_concurrent_chunks: 4   # Process 4 chunks simultaneously

How Parallel Processing Works:

┌─────────────────────────────────────────────────────────────────────────────┐
│                   Parallel Chunk Execution                                   │
├─────────────────────────────────────────────────────────────────────────────┤
│  Time →                                                                      │
│                                                                              │
│  Chunk 1: [OPEN]────────[QUERY]────────[BUFFER]────────[CLOSE]              │
│  Chunk 2:       [OPEN]────────[QUERY]────────[BUFFER]────────[CLOSE]        │
│  Chunk 3:             [OPEN]────────[QUERY]────────[BUFFER]────────[CLOSE]  │
│  Chunk 4:                   [OPEN]────────[QUERY]────────[BUFFER]───[CLOSE] │
│                                                                              │
│  Streaming Events: ─────────────────────────────────────────────────────►   │
│  (checked against ALL open windows)                                          │
└─────────────────────────────────────────────────────────────────────────────┘

Key Points:

  • Each chunk maintains its own deduplication buffer
  • Streaming events are checked against ALL open windows
  • Total memory = max_concurrent_chunks × max_buffer_memory
  • Higher concurrency = better throughput but more memory

Programmatic API (Rust)

For advanced use cases, use the executor API directly:

use rivven_connect::connectors::cdc_snapshot::{
    IncrementalSnapshotExecutor, IncrementalSnapshotExecutorConfig, SnapshotChunk
};

let executor = IncrementalSnapshotExecutor::new(config);
executor.initialize().await?;
executor.request_snapshot(&["public.orders"]).await?;

// High-level: execute_chunk convenience method
let events = executor.execute_chunk(
    &chunk,
    watermark_ts,
    || async { execute_query_and_return_rows(&chunk) }
).await?;

// Or low-level: full control over deduplication window
while let Some(chunk) = executor.next_chunk().await? {
    // Open window with watermark
    executor.open_window(&chunk, watermark_ts).await?;
    
    // Execute query and buffer rows
    for row in query_chunk(&chunk).await? {
        executor.buffer_row(&chunk.chunk_id, event, key).await;
    }
    
    // Close and get deduplicated events
    let events = executor.close_window(&chunk.chunk_id).await?;
    emit_events(events).await?;
}

// Monitor parallel execution
let stats = executor.buffer_stats_aggregate().await;
println!("Open windows: {}, Total rows: {}", 
    stats.open_windows, stats.total_rows);

Advanced: Parallel Chunks with Automatic Deduplication

Execute multiple chunks simultaneously with a single API call:

// Get batch of chunks
let chunks = vec![chunk1, chunk2, chunk3, chunk4];
let watermark_ts = get_db_timestamp().await?;

// Execute ALL chunks in parallel with automatic deduplication
let all_events = executor.execute_chunks_parallel(
    &chunks,
    watermark_ts,
    |chunk| async move {
        let rows = db.query(&build_chunk_query(&chunk)).await?;
        rows.into_iter()
            .map(|r| (r.id.to_string(), CdcEvent::from_row(&r)))
            .collect::<Result<Vec<_>>>()
    }
).await?;
emit_events(all_events).await?;

Advanced: Streaming Event Deduplication

Automatically deduplicate streaming events during incremental snapshots:

// During streaming while incremental snapshot is active
for event in cdc_stream {
    // Deduplicate streaming events against snapshot buffer
    if executor.is_active() && executor.is_window_open().await {
        executor.process_streaming_event(&event, &["id"]).await;
    }
    
    // Always emit streaming events (they always win)
    emit(event);
}

Advanced: Backpressure Control

Implement automatic backpressure based on memory utilization:

while let Some(chunk) = executor.next_chunk().await? {
    // Wait if memory pressure is high (>80% utilization)
    while executor.should_throttle().await {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    
    // Monitor utilization
    let utilization = executor.memory_utilization_percent().await;
    println!("Memory utilization: {:.1}%", utilization);
    
    executor.execute_chunk(&chunk, watermark_ts, query_fn).await?;
}

Best Practices

  1. Keep chunk_size reasonable (1024-4096 rows) to minimize watermark window
  2. Use insert_delete strategy for better compatibility
  3. Monitor chunk processing via metrics
  4. Test signal table setup before production deployment
  5. Avoid very large tables for incremental snapshots — use initial snapshot instead

Event Routing

Route CDC events to different destinations based on content, table, operation, or custom conditions.

Note: There are two kinds of DLQ in Rivven CDC:

  1. Routing DLQ (router.dead_letter_queue) — catches events that don’t match any routing rule (configured below)
  2. Publish DLQ (dead_letter_topic on the source) — catches events that fail all broker publish retries. This is critical for CDC because the replication slot advances past events; without a publish DLQ, a failed publish means permanent data loss. See Connectors — Dead Letter Queue for configuration.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      router:
        enabled: true
        default_destination: default-topic
        dead_letter_queue: dlq-topic      # Unroutable events go here
        drop_unroutable: false            # Or drop silently
        rules:
          - name: high_priority
            priority: 100
            condition:
              type: And
              conditions:
                - type: Table
                  table: public.orders
                - type: FieldValue
                  field: priority
                  value: high
            destinations: [priority-orders]
            continue_matching: false      # Stop on first match
            
          - name: customer_changes
            priority: 50
            condition:
              type: TablePattern
              pattern: "public\\.customer.*"
            destinations: [customer-events]
            
          - name: deletes_audit
            priority: 10
            condition:
              type: Operation
              op: delete
            destinations: [audit-topic, delete-archive]

Route Conditions

Condition Description
Always Always matches
Table Match specific table name
TablePattern Match table via regex
Schema Match database schema
Operation Match op type (insert/update/delete)
FieldExists Check if field exists
FieldValue Match field to specific value
FieldPattern Match field via regex
Header Match event header value
And Combine multiple conditions with AND
Or Combine multiple conditions with OR
Not Negate a condition
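Rule evaluation composes these conditions recursively. A sketch covering a subset of the table, with an enum shape that is illustrative rather than Rivven's actual config model:

```rust
/// A subset of the route conditions above.
enum Cond {
    Always,
    Table(String),
    Operation(char),
    FieldValue { field: String, value: String },
    And(Vec<Cond>),
    Or(Vec<Cond>),
    Not(Box<Cond>),
}

/// Minimal event view for condition evaluation.
struct Event<'a> {
    table: &'a str,
    op: char,
    fields: &'a [(&'a str, &'a str)],
}

/// Evaluate a condition tree against an event.
fn matches(c: &Cond, e: &Event) -> bool {
    match c {
        Cond::Always => true,
        Cond::Table(t) => e.table == t.as_str(),
        Cond::Operation(op) => e.op == *op,
        Cond::FieldValue { field, value } => e
            .fields
            .iter()
            .any(|(f, v)| *f == field.as_str() && *v == value.as_str()),
        Cond::And(cs) => cs.iter().all(|c| matches(c, e)),
        Cond::Or(cs) => cs.iter().any(|c| matches(c, e)),
        Cond::Not(c) => !matches(c, e),
    }
}
```

The high_priority rule from the configuration above corresponds to And([Table("public.orders"), FieldValue(priority=high)]).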

Routing Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                        Event Routing Flow                                    │
├─────────────────────────────────────────────────────────────────────────────┤
│  1. Event arrives from CDC stream                                            │
│  2. Evaluate rules in priority order (highest first)                         │
│  3. For each matching rule:                                                  │
│     • Add destinations to route list                                         │
│     • If continue_matching=false, stop evaluation                            │
│  4. If no rules match:                                                       │
│     • Route to default_destination (if configured)                           │
│     • Or route to dead_letter_queue                                          │
│     • Or drop if drop_unroutable=true                                        │
│  5. Emit event to all collected destinations                                 │
└─────────────────────────────────────────────────────────────────────────────┘

Partitioning

Control how events are distributed across topic partitions for ordering and parallelism.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      partitioner:
        enabled: true
        num_partitions: 16
        strategy:
          type: KeyHash           # Hash primary key
          # Or:
          # type: TableHash       # Hash table name
          # type: FullTableHash   # Hash full table path (schema.table)
          # type: RoundRobin      # Distribute evenly
          # type: Sticky          # Same partition per batch

Partition Strategies

Strategy Description Use Case
RoundRobin Distribute events evenly Maximum parallelism
KeyHash Hash primary key Maintain per-row ordering
TableHash Hash table name Keep table events together
FullTableHash Hash schema.table Multi-schema environments
Sticky Same partition per batch Batch locality
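The hash-based strategies reduce to hashing an input string modulo num_partitions. A sketch using the standard library hasher (illustrative: the real partitioner's hash function may differ, so concrete partition numbers will not match Rivven's):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash an input string into a partition index in [0, num_partitions).
fn hash_partition(input: &str, num_partitions: u32) -> u32 {
    let mut h = DefaultHasher::new();
    input.hash(&mut h);
    (h.finish() % num_partitions as u64) as u32
}

/// KeyHash: same primary key -> same partition (per-row ordering).
fn key_hash(pk: &str, n: u32) -> u32 {
    hash_partition(pk, n)
}

/// FullTableHash: same schema.table -> same partition.
fn full_table_hash(schema: &str, table: &str, n: u32) -> u32 {
    hash_partition(&format!("{}.{}", schema, table), n)
}
```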

Best Practices

  1. Use KeyHash when consumers need per-key ordering
  2. Use TableHash when consumers process entire tables
  3. Set num_partitions to match consumer parallelism
  4. Monitor partition distribution via metrics

Pipeline Processing

Build composable CDC processing pipelines with stages for filtering, transformation, and routing.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      pipeline:
        enabled: true
        name: order-processing
        dead_letter_queue: dlq-topic
        concurrency: 4
        stages:
          - type: Filter
            condition:
              type: Operation
              op: delete
            
          - type: Transform
            transforms:
              - type: ExtractNewRecordState
              - type: MaskField
                config:
                  fields: [credit_card]
                  
          - type: Route
            rules:
              - condition:
                  type: FieldValue
                  field: status
                  value: urgent
                destinations: [urgent-orders]

Pipeline Stages

Stage Description
Filter Drop events matching condition
Transform Apply SMT transforms
Route Content-based routing

Pipeline Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│  Event → [Filter] → [Transform] → [Route] → Destination                      │
│                                                                              │
│  If any stage fails:                                                         │
│    • Event goes to dead_letter_queue (if configured)                         │
│    • Processing continues with next event                                    │
└─────────────────────────────────────────────────────────────────────────────┘

Log Compaction

Reduce storage and replay time by keeping only the latest state per key.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      compaction:
        enabled: true
        key_columns: [id]            # Columns forming the compaction key
        min_cleanable_ratio: 0.5     # Trigger compaction at 50% duplicates
        segment_size: 104857600      # 100MB segments
        delete_retention_ms: 86400000 # Keep tombstones for 24 hours
        min_compaction_lag_ms: 0     # Minimum age before compaction
        max_compaction_lag_ms: 0     # Force compaction after this age
        cleanup_policy: compact      # compact, delete, or compact_delete

Compaction Strategies

Strategy Description
compact Keep latest value per key
delete Delete segments after retention
compact_delete Compact, then delete after retention

Key Strategy

Key Strategy Description
PrimaryKey Use table’s primary key
AllColumns Hash all columns
CustomColumns Specify columns via key_columns
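The compact policy is last-write-wins per key. A minimal replay sketch; note that real compaction retains delete tombstones for delete_retention_ms so that late consumers still observe the delete, which this sketch skips:

```rust
use std::collections::HashMap;

/// Replay a segment of (key, value) events and keep only each key's
/// latest value. `None` is a tombstone (delete).
fn compact(events: &[(&str, Option<&str>)]) -> HashMap<String, String> {
    let mut latest: HashMap<String, String> = HashMap::new();
    for (key, value) in events {
        match value {
            Some(v) => {
                latest.insert(key.to_string(), v.to_string());
            }
            None => {
                // Tombstone: drop the key entirely (retention not modeled).
                latest.remove(*key);
            }
        }
    }
    latest
}
```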

Parallel CDC Processing

Process multiple tables concurrently for maximum throughput.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      parallel:
        enabled: true
        concurrency: 8               # Max concurrent table streams
        per_table_buffer: 1000       # Events to buffer per table
        work_stealing: true          # Rebalance work across threads
        backpressure_threshold: 0.8  # Throttle at 80% buffer utilization
        batch_timeout_ms: 100        # Max time to accumulate batch

How It Works

┌─────────────────────────────────────────────────────────────────────────────┐
│                     Parallel CDC Processing                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐                      │
│  │ Table A │   │ Table B │   │ Table C │   │ Table D │                      │
│  └────┬────┘   └────┬────┘   └────┬────┘   └────┬────┘                      │
│       │             │             │             │                            │
│       ▼             ▼             ▼             ▼                            │
│  ┌─────────────────────────────────────────────────────────────┐            │
│  │                   Worker Pool (concurrency=8)                │            │
│  │  ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐   │            │
│  │  │ W1 │ │ W2 │ │ W3 │ │ W4 │ │ W5 │ │ W6 │ │ W7 │ │ W8 │   │            │
│  │  └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘   │            │
│  └─────────────────────────────────────────────────────────────┘            │
│                              │                                               │
│                              ▼                                               │
│                    ┌──────────────────┐                                     │
│                    │  Merged Output   │                                     │
│                    └──────────────────┘                                     │
└─────────────────────────────────────────────────────────────────────────────┘

Work Stealing

When work_stealing: true, idle workers steal work from busy workers:

Worker 1: [████████████████████] ← heavy table
Worker 2: [████]                 ← light table, steals from Worker 1
Worker 3: [██████]               ← medium table
Worker 4: [██]                   ← light table, steals from Worker 1

Transactional Outbox Pattern

Reliably publish events from your application using the transactional outbox pattern.

Configuration

sources:
  orders_db:
    connector: postgres-cdc
    config:
      # ... connection config ...
      
      outbox:
        enabled: true
        table_name: outbox          # Outbox table name
        poll_interval_ms: 1000      # Polling interval
        batch_size: 100             # Events per batch
        max_retries: 3              # Retry failed events
        retry_delay_ms: 1000        # Delay between retries
        delete_after_publish: true  # Clean up processed events

Outbox Table Schema

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,    -- e.g., "Order", "Customer"
    aggregate_id VARCHAR(255) NOT NULL,      -- Business key
    event_type VARCHAR(255) NOT NULL,        -- e.g., "OrderCreated"
    payload JSONB NOT NULL,                  -- Event data
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    published BOOLEAN DEFAULT FALSE,
    retries INTEGER DEFAULT 0
);

-- Index for efficient polling
CREATE INDEX idx_outbox_unpublished ON outbox (timestamp) WHERE NOT published;

How It Works

┌─────────────────────────────────────────────────────────────────────────────┐
│                    Transactional Outbox Flow                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  Application                      Rivven CDC                                 │
│  ┌─────────────────────┐         ┌─────────────────────┐                    │
│  │  BEGIN TRANSACTION  │         │                     │                    │
│  │  INSERT INTO orders │         │  1. Poll outbox     │                    │
│  │  INSERT INTO outbox │◄────────│  2. Read events     │                    │
│  │  COMMIT             │         │  3. Publish to topic│                    │
│  └─────────────────────┘         │  4. Mark published  │                    │
│                                  │  5. Delete (opt.)   │                    │
│                                  └─────────────────────┘                    │
│                                                                              │
│  Benefits:                                                                   │
│  • Atomic: event inserted in same TX as business data                       │
│  • Reliable: no events lost if app crashes after commit                     │
│  • Ordered: events processed in timestamp order                             │
└─────────────────────────────────────────────────────────────────────────────┘

Best Practices

  1. Use CDC on the outbox table for lowest latency
  2. Set reasonable batch_size to balance throughput and latency
  3. Enable delete_after_publish to prevent table bloat
  4. Add index on unpublished events for efficient polling
  5. Monitor outbox lag to detect publishing issues
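The poll-publish-mark cycle described above can be sketched as follows. This is an illustration of the configuration semantics, not Rivven's actual implementation; `fetch_unpublished`, `publish`, `mark`, and `delete` are hypothetical stand-ins for the database query and topic producer.

```python
# Sketch of one outbox polling cycle (illustrative only).

def drain_outbox(fetch_unpublished, publish, mark, delete,
                 batch_size=100, max_retries=3, delete_after_publish=True):
    """Process one polling cycle; returns the number of events published."""
    published = 0
    for event in fetch_unpublished(limit=batch_size):   # ORDER BY timestamp
        ok = False
        for _ in range(max_retries + 1):                # initial try + retries
            if publish(event):
                ok = True
                break
        if ok:
            if delete_after_publish:
                delete(event["id"])                     # prevent table bloat
            else:
                mark(event["id"])                       # set published = TRUE
            published += 1
    return published
```

Because events are fetched in timestamp order and marked (or deleted) only after a successful publish, a crash between publish and mark can re-deliver an event; consumers should therefore be idempotent.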

Transforms

Rivven provides 17 built-in Single Message Transforms (SMTs) for in-flight data manipulation.

Transform Reference

Transform Description
ExtractNewRecordState Flatten envelope to just after state
MaskField Redact sensitive fields
ReplaceField Rename, include, or exclude fields
InsertField Add static or computed fields
Filter Drop events based on condition
Cast Convert field types
Flatten Flatten nested structures
TimestampConverter Convert timestamp formats
TimezoneConverter Convert between timezones
RegexRouter Route based on regex patterns
ContentRouter Route based on field values
ValueToKey Extract key fields from value
HeaderToValue Move envelope fields into record
Unwrap Extract nested field to top level
ComputeField Compute new fields
SetNull Conditionally nullify fields
ExternalizeBlob Offload large field values to external storage

Any transform supports a predicate: block for conditional application (see Conditional Transforms below).

Example: Flatten and Mask

sources:
  orders:
    connector: postgres-cdc
    topic: cdc.orders
    transforms:
      - type: ExtractNewRecordState
      - type: MaskField
        config:
          fields: [credit_card, ssn]
          mask_char: "*"
      - type: ReplaceField
        config:
          exclude: [internal_notes]
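As a rough illustration of what MaskField does to a record (not the Rust implementation), each configured field's value is replaced by a run of `mask_char` of the same length, leaving other fields untouched:

```python
# Illustrative MaskField semantics (sketch, not the actual transform code).

def mask_fields(record, fields, mask_char="*"):
    out = dict(record)
    for f in fields:
        if f in out and out[f] is not None:
            out[f] = mask_char * len(str(out[f]))  # redact, preserve length
    return out
```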

Example: Content-Based Routing

transforms:
  - type: ContentRouter
    config:
      field: priority
      routes:
        high: priority-orders
        normal: standard-orders
      default: other-orders
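The routing decision itself is a simple lookup; a minimal sketch of ContentRouter's topic selection (illustrative, not the connector's internal code):

```python
# Sketch of ContentRouter: look up the routing field's value in the
# routes map, falling back to the default topic when there is no match.

def route(record, field, routes, default):
    return routes.get(record.get(field), default)
```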

Example: Conditional Transforms (Predicates)

Apply transforms only to events matching specific conditions. This is essential when a single connector captures multiple tables with different processing needs:

sources:
  multi_table_cdc:
    connector: postgres-cdc
    topic: cdc.events
    config:
      tables:
        - public.users
        - public.orders
        - public.documents

    transforms:
      # Mask PII only for the users table
      - type: MaskField
        predicate:
          table: "users"
        config:
          fields: [ssn, credit_card]

      # Convert timestamps only for orders
      - type: TimestampConverter
        predicate:
          table: "orders"
        config:
          fields: [created_at, shipped_at]
          format: iso8601

      # Externalize large blobs only on insert/update, not delete
      - type: ExternalizeBlob
        predicate:
          table: "documents"
          operations: [insert, update]
        config:
          storage_type: s3
          bucket: doc-blobs

      # Flatten nested JSON for all tables EXCEPT orders
      - type: Flatten
        predicate:
          table: "orders"
          negate: true
        config:
          delimiter: "."

      # Extract new record state for everything (no predicate)
      - type: ExtractNewRecordState
        config:
          add_table: true
          add_op: true

Transforms without a predicate apply to all events. Predicates support table, schema, operations, field_exists, field_value, and negate. See CDC Configuration for the full reference.
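Under those rules, predicate evaluation can be sketched roughly as follows. The keys mirror the YAML above, but the matching logic (and in particular the `field_value` shape) is an assumption for illustration, not the documented implementation:

```python
# Illustrative predicate evaluation (sketch; not the Rust implementation).

def predicate_matches(event, predicate):
    if predicate is None:
        return True                          # no predicate: always applies
    match = True
    if "table" in predicate:
        match = match and event.get("table") == predicate["table"]
    if "schema" in predicate:
        match = match and event.get("schema") == predicate["schema"]
    if "operations" in predicate:
        match = match and event.get("op") in predicate["operations"]
    if "field_exists" in predicate:
        match = match and predicate["field_exists"] in event.get("after", {})
    if "field_value" in predicate:           # assumed shape: {field, equals}
        fv = predicate["field_value"]
        match = match and event.get("after", {}).get(fv["field"]) == fv["equals"]
    if predicate.get("negate"):
        match = not match
    return match
```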


Sinks

Route CDC events to various destinations.

Console (stdout)

sinks:
  console:
    connector: stdout
    topics: [cdc.orders]
    consumer_group: console-sink
    config:
      format: json
      pretty: true

S3 / Data Lake

sinks:
  data_lake:
    connector: s3
    topics: [cdc.orders]
    consumer_group: s3-sink
    config:
      bucket: my-data-lake
      prefix: cdc/orders
      region: us-east-1
      format: jsonl
      partition_by: day
      compression: gzip
      batch_size: 1000
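With partition_by: day, batches are grouped under date-based prefixes so downstream query engines can prune by date. A rough sketch of such a key layout (the exact object-naming scheme here is an assumption, not the connector's documented format):

```python
# Illustrative S3 object-key layout for partition_by: day.
from datetime import datetime, timezone

def object_key(prefix, event_ts_ms, seq):
    """Build a date-partitioned object key for a batch of events."""
    day = datetime.fromtimestamp(event_ts_ms / 1000, tz=timezone.utc)
    return f"{prefix}/{day.strftime('%Y-%m-%d')}/batch-{seq:06d}.jsonl.gz"
```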

HTTP Webhook

sinks:
  webhook:
    connector: http-webhook
    topics: [cdc.orders]
    consumer_group: webhook-sink
    config:
      url: https://api.example.com/events
      method: POST
      headers:
        Authorization: "Bearer ${API_TOKEN}"
      batch_size: 100
      timeout_secs: 30

Snowflake

sinks:
  warehouse:
    connector: snowflake
    topics: [cdc.orders]
    consumer_group: snowflake-sink
    config:
      account: myorg-account123
      user: RIVVEN_USER
      private_key_path: /path/to/rsa_key.p8
      database: MY_DATABASE
      schema: MY_SCHEMA
      table: MY_TABLE
      batch_size: 1000

Rate Limiting

Prevent overwhelming downstream systems:

sinks:
  s3:
    connector: s3
    rate_limit:
      events_per_second: 10000
      burst_capacity: 1000
    config:
      # ...
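Rate limiting of this shape is commonly implemented as a token bucket, with events_per_second as the refill rate and burst_capacity as the bucket size. A minimal sketch (illustrative; Rivven's internal limiter may differ):

```python
# Token-bucket sketch matching the events_per_second / burst_capacity
# settings above (illustrative only).

class TokenBucket:
    def __init__(self, rate, burst):
        self.rate = rate              # tokens added per second
        self.capacity = burst         # maximum burst size
        self.tokens = float(burst)    # start full
        self.last = 0.0

    def try_acquire(self, now, n=1):
        """Refill based on elapsed time, then spend n tokens if available."""
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```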

Monitoring

Rivven CDC exposes Prometheus metrics covering snapshots, streaming, incremental snapshots, performance, and dead-lettering.

Core Metrics

Metric Description
rivven_cdc_events_total Total events processed
rivven_cdc_lag_milliseconds Replication lag
rivven_cdc_errors_total Error count
rivven_cdc_connected Connection status (1/0)

Extended Metrics

Category Example Metrics
Snapshot snapshot_running, snapshot_duration_ms, snapshot_rows_scanned
Streaming streaming_lag_ms, create_events, update_events, delete_events
Incremental Snapshot incremental_snapshot_chunks_processed, incremental_snapshot_rows_captured
Performance processing_time_p99_us, average_batch_size, batches_processed
Dead Letter Queue events_dead_lettered (events routed to the DLQ after publish retry exhaustion)

See Troubleshooting Guide for alert rules and health checks.

Example Prometheus Alerts

groups:
- name: rivven-cdc
  rules:
  - alert: CDCLagHigh
    expr: rivven_cdc_lag_milliseconds > 10000
    for: 5m
    labels:
      severity: warning

  - alert: CDCDisconnected
    expr: rivven_cdc_connected == 0
    for: 1m
    labels:
      severity: critical

Next Steps



Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.