CDC Configuration Reference

Complete reference for all CDC configuration options.

Table of contents

  1. Configuration Methods
    1. Rust Builder API
    2. YAML Configuration
    3. Environment Variables
  2. Common Options
    1. Core Settings
    2. Topic Routing
      1. Supported Placeholders
      2. Examples
      3. Topic Name Normalization
        1. Case Conversion Modes
        2. Avro-Compatible Mode
        3. Prefix/Suffix Stripping
        4. Production-Ready Configuration
        5. Disabling Normalization
      4. Topic Routing vs Content-Based Routing
    3. Table Selection
    4. Column Selection
    5. Event Filtering
  3. Snapshot Settings
    1. Configuration
    2. Snapshot Modes
    3. Example
    4. Progress Persistence
  4. Performance Settings
    1. Throughput
    2. Memory
    3. Backpressure
  5. Resilience Settings
    1. Retry Configuration
    2. Guardrails
  6. TLS Settings
    1. TLS Modes
  7. Event Format Settings
    1. Output Format
    2. Format Options
    3. Envelope Styles
    4. Schema Settings
  8. Type Handling
    1. Type Handling Options
  9. Field-Level Encryption
    1. Encryption Configuration
    2. Example
    3. Key Management
    4. Security Features
  10. Deduplication
    1. Deduplication Configuration
    2. Example
    3. How It Works
  11. Tombstone Handling
    1. Tombstone Configuration
    2. Example
    3. Why Tombstones Matter
  12. Heartbeat Configuration
    1. Why Heartbeats Matter
    2. Heartbeat Options
    3. Example
    4. Monitoring Heartbeat Health
  13. Initial and Incremental Snapshots
    1. Snapshot Modes
    2. Snapshot Options
    3. Example Configuration
    4. Progress Persistence
      1. File-Based Progress (Default)
      2. Broker-Based Progress (Distributed)
    5. Architecture
    6. Keyset Pagination
    7. Incremental Snapshots (Ad-hoc)
  14. Single Message Transforms (SMT)
    1. Available Transforms
    2. Conditional Transform Application (Predicates)
      1. Basic Predicate Options
      2. Example: Per-Table Transforms
      3. Example: Multiple Tables with Regex
      4. Example: Operation-Based Predicates
      5. Example: Combined Predicates (AND Logic)
      6. Example: Negated Predicate
      7. Example: Schema-Based Filtering
      8. Example: Field-Based Predicates
      9. Example: Predicates with Different Transform Types
    3. Configuration Reference
      1. extract_new_record_state
      2. mask_field
      3. timestamp_converter
      4. filter
      5. cast
      6. flatten
      7. insert_field
      8. replace_field / rename_field
      9. value_to_key
      10. regex_router
      11. timezone_converter
      12. content_router
      13. header_to_value
      14. unwrap
      15. set_null
      16. compute_field
      17. externalize_blob
    4. Transform Pipeline Example
  15. Monitoring Settings
  16. PostgreSQL-Specific Options
    1. PostgreSQL Signal Table
  17. MySQL-Specific Options
    1. MariaDB Options
  18. Environment Variable Reference
  19. Full Configuration Example
  20. Event Routing
    1. Configuration
    2. Route Rule Options
    3. Condition Types
  21. Partitioning
    1. Configuration
    2. Partition Strategies
  22. Pipeline Processing
    1. Configuration
    2. Stage Types
  23. Log Compaction
    1. Configuration
  24. Parallel CDC Processing
    1. Configuration
  25. Transactional Outbox Pattern
    1. Configuration
    2. Outbox Table Schema
  26. Health Monitoring
    1. Configuration
    2. Health States
    3. Auto-Recovery Flow
    4. Prometheus Metrics
    5. Kubernetes Probes
    6. Custom Health Checks
  27. Notifications
    1. Configuration
    2. Channel Types
    3. Notification Types
    4. Webhook Payload
  28. Next Steps

Configuration Methods

Rust Builder API

use rivven_cdc::{CdcConfig, PostgresCdcConfig, Snapshot};

let config = PostgresCdcConfig::builder()
    .host("localhost")
    .port(5432)
    .user("rivven")
    .password("secret")
    .database("mydb")
    .slot_name("rivven_slot")
    .publication_name("rivven_pub")
    .snapshot(Snapshot::Initial)
    .build()?;

YAML Configuration

version: "1.0"

sources:
  my_source:
    connector: postgres-cdc
    topic: cdc.events
    config:
      host: localhost
      port: 5432
      # ... other options

Environment Variables

All configuration options can be set via environment variables:

export RIVVEN_CDC_HOST=localhost
export RIVVEN_CDC_PORT=5432
export RIVVEN_CDC_PASSWORD=${DB_PASSWORD}

Common Options

These options apply to all CDC connectors.

Core Settings

Option Type Default Description
name string required Unique connector name
topic string required Fallback/default topic for events
enabled bool true Enable/disable connector

Topic Routing

Topic routing enables dynamic topic selection based on CDC event metadata, following industry-standard topic naming conventions.

topic_routing is a CDC-specific option and must be configured inside the connector’s config: section, not at the source level.

Supported Placeholders

Placeholder Description Example Value
{database} Database name mydb
{schema} Schema name public
{table} Table name users

Examples

Basic Pattern:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default         # Fallback topic
    config:
      # ... postgres config
      topic_routing: "cdc.{schema}.{table}"

Events from public.users → topic cdc.public.users
Events from inventory.products → topic cdc.inventory.products

With Database:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      # ... postgres config
      topic_routing: "{database}.{schema}.{table}"

Events from mydb.public.users → topic mydb.public.users

Simple Table-Based:

sources:
  postgres:
    connector: postgres-cdc
    topic: events
    config:
      # ... postgres config
      topic_routing: "events.{table}"

Events from public.orders → topic events.orders

The topic field is required as a fallback for error cases or when CDC metadata is unavailable.

Topic Name Normalization

Database identifiers (schema/table names) often contain characters that are invalid in topic names. Rivven normalizes identifiers into valid topic names and layers on optional case conversion, prefix/suffix stripping, and Avro-compatible sanitization.

Topic Name Restrictions:

  • Valid characters: a-z, A-Z, 0-9, ., _, -
  • Maximum length: 249 characters

Basic Normalization (Default):

Original Identifier Normalized Reason
my schema my_schema Spaces replaced
user@data user_data Special characters replaced
order#items order_items Invalid characters replaced
schema.with.dots schema_with_dots Dots replaced (avoid extra segments)

Hyphens (-) are preserved since they are valid in topic names.
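
The default rule above can be sketched as a per-character replacement. This is illustrative only (the name `normalize_identifier` is not Rivven's API); note that dots, while legal in topic names, are still replaced to avoid creating extra topic segments:

```rust
// Sketch of the default normalization rule: ASCII letters, digits, `_`,
// and `-` pass through; everything else (spaces, `@`, `#`, and also `.`)
// becomes the default replacement character `_`.
fn normalize_identifier(identifier: &str) -> String {
    identifier
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '_' || c == '-' {
                c
            } else {
                '_'
            }
        })
        .collect()
}
```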

Case Conversion Modes

Control how identifier casing is transformed:

Mode Input Output Use Case
none (default) MyTable MyTable Preserve original casing
lower MyTable mytable Simple lowercase conversion
upper MyTable MYTABLE Uppercase conversion
snake_case UserProfiles user_profiles Convert PascalCase/camelCase to snake_case
kebab-case UserProfiles user-profiles Convert PascalCase/camelCase to kebab-case
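
The snake_case mode roughly follows the rule sketched below (an assumption based on the examples in this section, not Rivven's exact implementation): a `_` is inserted only when an uppercase letter follows a lowercase letter or digit, so acronym runs like `DB` stay together:

```rust
// Naive PascalCase/camelCase → snake_case conversion: insert `_` before an
// uppercase letter only when the previous character was lowercase or a digit,
// then lowercase everything. "SalesDB" → "sales_db", not "sales_d_b".
fn to_snake_case(name: &str) -> String {
    let mut out = String::new();
    let mut prev_lower = false;
    for c in name.chars() {
        if c.is_ascii_uppercase() {
            if prev_lower {
                out.push('_');
            }
            out.push(c.to_ascii_lowercase());
            prev_lower = false;
        } else {
            out.push(c);
            prev_lower = c.is_ascii_lowercase() || c.is_ascii_digit();
        }
    }
    out
}
```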

YAML Configuration:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      topic_routing: "cdc.{schema}.{table}"
      normalization:
        case_conversion: snake_case  # Options: none, lower, upper, snake_case, kebab-case

Rust API:

use rivven_connect::{TopicResolver, CaseConversion, NormalizationConfig};

// Simple snake_case conversion
let resolver = TopicResolver::builder("cdc.{schema}.{table}")
    .snake_case()
    .build()?;

// Or use explicit case conversion
let resolver = TopicResolver::builder("cdc.{schema}.{table}")
    .normalization(
        NormalizationConfig::new()
            .with_case_conversion(CaseConversion::SnakeCase)
    )
    .build()?;

// Result: "UserProfiles" → "user_profiles"

Avro-Compatible Mode

When using Avro serialization, identifiers must follow stricter naming rules:

  • Must start with a letter or underscore
  • Only letters, digits, and underscores allowed
  • Hyphens are NOT valid in Avro names

Enable Avro-compatible mode if you’re using Avro serialization with Schema Registry.

YAML Configuration:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      topic_routing: "cdc.{schema}.{table}"
      normalization:
        avro_compatible: true

Rust API:

let resolver = TopicResolver::builder("cdc.{schema}.{table}")
    .avro_compatible()
    .build()?;

// Results:
// "my-table" → "my_table" (hyphens replaced)
// "123users" → "_123users" (leading digit prefixed)

Input Avro Output Reason
my-table my_table Hyphens not allowed in Avro
123users _123users Cannot start with digit
user.data user_data Dots not allowed
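
The two Avro rules (restricted character set, no leading digit) can be sketched as follows. The function name `avro_sanitize` is illustrative, not Rivven's API:

```rust
// Avro name sanitization sketch: replace anything outside [A-Za-z0-9_]
// with `_`, then prefix `_` if the result starts with a digit.
fn avro_sanitize(name: &str) -> String {
    let mut out: String = name
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect();
    if out.chars().next().map_or(false, |c| c.is_ascii_digit()) {
        out.insert(0, '_');
    }
    out
}
```
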
Prefix/Suffix Stripping

Remove common database-specific prefixes or suffixes from identifiers:

Common Use Cases:

  • Strip dbo_ schema prefix (SQL Server)
  • Strip public_ schema prefix (PostgreSQL)
  • Strip _table suffix patterns

YAML Configuration:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      topic_routing: "cdc.{schema}.{table}"
      normalization:
        strip_prefixes:
          - "dbo_"
          - "public_"
        strip_suffixes:
          - "_table"
          - "_tbl"

Rust API:

let resolver = TopicResolver::builder("cdc.{schema}.{table}")
    .strip_prefixes(vec!["dbo_", "public_"])
    .strip_suffixes(vec!["_table"])
    .build()?;

// Results:
// "dbo_users" → "users"
// "public_orders" → "orders"
// "customers_table" → "customers"

Prefix/suffix matching is case-insensitive. Both dbo_users and DBO_USERS will have the prefix stripped.

Production-Ready Configuration

Combine multiple normalization options for production environments:

YAML Configuration:

sources:
  postgres:
    connector: postgres-cdc
    topic: cdc.default
    config:
      topic_routing: "{database}.{schema}.{table}"
      normalization:
        case_conversion: snake_case
        avro_compatible: true
        strip_prefixes:
          - "dbo_"
        replacement_char: "_"
        truncate_long_names: true
        include_hash_suffix: true

Rust API:

let resolver = TopicResolver::builder("{database}.{schema}.{table}")
    .snake_case()
    .avro_compatible()
    .strip_prefixes(vec!["dbo_"])
    .build()?;

// Full pipeline:
// "SalesDB", "dbo_MySchema", "UserProfiles"
// → After strip prefix: "MySchema"
// → After snake_case: "my_schema"
// → After Avro: "my_schema" (already valid)
// → Final: "sales_db.my_schema.user_profiles"

Disabling Normalization

If you need exact topic names without any transformation:

let resolver = TopicResolver::builder("cdc.{schema}.{table}")
    .no_normalization()
    .build()?;

Disabling normalization may produce invalid topic names if your database identifiers contain special characters.

Long Topic Names:

When the resolved topic name exceeds 249 characters, it is automatically truncated with a hash suffix to ensure uniqueness:

very_long_prefix.very_long_schema.very_long_table_name...
→ truncated to: very_long_prefix.very_long_schema.ver..._a1b2c3d4
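
The truncation step can be sketched like this. It is a rough illustration only: the real hash algorithm and suffix format may differ, and it assumes identifiers are ASCII after normalization (so byte slicing is safe):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Truncate-with-hash sketch: names at or under the limit pass through;
// longer names keep a prefix and gain an 8-hex-char hash of the full
// name so distinct long names stay distinct.
fn truncate_with_hash(topic: &str) -> String {
    const MAX_LEN: usize = 249;
    if topic.len() <= MAX_LEN {
        return topic.to_string();
    }
    let mut hasher = DefaultHasher::new();
    topic.hash(&mut hasher);
    let suffix = format!("_{:08x}", hasher.finish() as u32);
    // ASCII-only after normalization, so slicing by bytes is safe here.
    format!("{}{}", &topic[..MAX_LEN - suffix.len()], suffix)
}
```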

Topic Routing vs Content-Based Routing

Rivven provides two complementary routing mechanisms:

Mechanism Use Case Configuration
Topic Routing Route by source (table, schema, database) topic_routing pattern (in connector config)
Event Router Route by content (field values, operation type) Programmatic API

Topic Routing (config-driven) is ideal for per-table topics:

config:
  topic_routing: "cdc.{schema}.{table}"
  # public.users → cdc.public.users
  # public.orders → cdc.public.orders

Event Router (programmatic) is for complex content-based routing:

use rivven_cdc::common::router::{EventRouter, RouteRule, RouteCondition};

let router = EventRouter::builder()
    .route(RouteRule::new(
        "high-value",
        RouteCondition::field_equals("priority", json!("high")),
        "priority-orders"
    ))
    .route(RouteRule::operation_match(CdcOp::Delete, "audit-deletes"))
    .default_destination("default-events")
    .build();

Table Selection

Option Type Default Description
tables list [] Tables to include (empty = all)
exclude_tables list [] Tables to exclude
table_filter regex - Regex pattern for table names
schema_filter regex - Regex pattern for schema names

Example:

config:
  tables:
    - public.orders
    - public.customers
  exclude_tables:
    - "*_backup"
    - "*_archive"
    - "temp_*"

Column Selection

Option Type Default Description
columns map - Columns to include per table
exclude_columns map - Columns to exclude per table
column_masks list [] Columns to mask/redact

Example:

config:
  columns:
    public.users:
      - id
      - email
      - created_at
  exclude_columns:
    public.users:
      - internal_notes
  column_masks:
    - public.users.ssn
    - public.customers.credit_card

Event Filtering

Option Type Default Description
operations list [insert, update, delete] Operations to capture
filter_expression string - SQL-like filter expression

Example:

config:
  operations:
    - insert
    - update
  filter_expression: "amount > 100 AND status != 'draft'"

Snapshot Settings

Initial snapshots capture the full state of tables before streaming begins. This is essential for ensuring downstream systems have complete data.

Configuration

Option Type Default Description
snapshot.mode enum initial Snapshot behavior (see modes below)
snapshot.batch_size u64 10000 Rows per SELECT batch
snapshot.parallel_tables u32 4 Tables to snapshot concurrently
snapshot.progress_dir path - Directory for progress persistence
snapshot.query_timeout_secs u64 300 Query timeout in seconds
snapshot.throttle_delay_ms u64 0 Delay between batches (backpressure)
snapshot.max_retries u32 3 Retries per failed batch
snapshot.include_tables list [] Tables to include (empty = all)
snapshot.exclude_tables list [] Tables to exclude

Snapshot Modes

Mode Description
initial Snapshot on first start, then stream (default)
always Full snapshot on every start
never No snapshot, stream only
when_needed Snapshot only if no stored offsets
initial_only Snapshot only, no streaming after
schema_only Capture schema structure, no data
recovery Force re-snapshot for disaster recovery

Example

connector_type: postgres-cdc
config:
  host: localhost
  database: mydb
  user: replicator
  password: ${DB_PASSWORD}
  slot_name: rivven_slot
  publication_name: rivven_pub
  
  # Snapshot configuration
  snapshot:
    mode: initial
    batch_size: 50000
    parallel_tables: 4
    progress_dir: /var/lib/rivven/snapshot
    query_timeout_secs: 600
    include_tables:
      - public.users
      - public.orders
    exclude_tables:
      - public.audit_log

Progress Persistence

When progress_dir is set, snapshot progress is persisted to JSON files. This enables:

  • Resumable snapshots: If interrupted, resume from last completed batch
  • Incremental re-snapshots: Skip already-captured tables
  • Debugging: Inspect progress files for troubleshooting

# Progress file structure
/var/lib/rivven/snapshot/
  public.users.json    # {"table":"public.users","last_key":"1000","state":"in_progress"}
  public.orders.json   # {"table":"public.orders","last_key":null,"state":"completed"}

Performance Settings

Throughput

Option Type Default Description
batch_size u32 1000 Events per batch
batch_timeout duration 100ms Max wait for batch
max_queue_size u32 10000 Internal queue limit
workers u32 4 Processing workers

Memory

Option Type Default Description
max_memory_mb u32 512 Memory limit
buffer_size u32 8192 Network buffer size

Backpressure

Option Type Default Description
backpressure.enabled bool true Enable backpressure
backpressure.high_watermark u32 8000 Pause threshold
backpressure.low_watermark u32 2000 Resume threshold

Example:

config:
  batch_size: 5000
  batch_timeout: 200ms
  max_queue_size: 20000
  workers: 8
  
  backpressure:
    enabled: true
    high_watermark: 15000
    low_watermark: 5000
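
The two watermarks form a hysteresis band: the connector pauses once the queue reaches the high watermark and resumes only after it drains below the low watermark, avoiding rapid pause/resume flapping in between. A minimal sketch of that state machine (illustrative, not Rivven's internal type):

```rust
// Watermark hysteresis: `on_queue_depth` returns whether intake is paused.
struct Backpressure {
    high: usize,
    low: usize,
    paused: bool,
}

impl Backpressure {
    fn on_queue_depth(&mut self, depth: usize) -> bool {
        if self.paused {
            // Resume only once the queue drains to the low watermark.
            if depth <= self.low {
                self.paused = false;
            }
        } else if depth >= self.high {
            // Pause as soon as the high watermark is hit.
            self.paused = true;
        }
        self.paused
    }
}
```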

Resilience Settings

Retry Configuration

Option Type Default Description
retry.max_attempts u32 10 Max retry attempts
retry.initial_delay duration 100ms First retry delay
retry.max_delay duration 30s Max retry delay
retry.multiplier f64 2.0 Exponential backoff
retry.jitter f64 0.1 Jitter factor (0-1)

Guardrails

Option Type Default Description
guardrails.max_lag duration 5m Max acceptable lag
guardrails.max_queue_lag duration 1m Max queue lag
guardrails.max_batch_size u32 100000 Max events per batch
guardrails.max_event_size_bytes u64 10485760 Max single event (10MB)

Example:

config:
  retry:
    max_attempts: 15
    initial_delay: 50ms
    max_delay: 60s
    multiplier: 2.5
    jitter: 0.2
    
  guardrails:
    max_lag: 10m
    max_queue_lag: 2m
    max_batch_size: 50000
    max_event_size_bytes: 52428800  # 50MB
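
With these parameters the retry schedule follows a capped exponential curve, delay(n) = min(initial × multiplier^n, max_delay), with jitter then applied on top (omitted below for determinism). A sketch of the schedule, using illustrative names:

```rust
// Capped exponential backoff: attempt 0 waits `initial_ms`, each further
// attempt multiplies the delay, and `max_ms` bounds the result.
fn retry_delay_ms(attempt: u32, initial_ms: f64, multiplier: f64, max_ms: f64) -> f64 {
    (initial_ms * multiplier.powi(attempt as i32)).min(max_ms)
}
```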

TLS Settings

Option Type Default Description
tls.mode enum prefer TLS mode
tls.ca_cert_path path - CA certificate file
tls.ca_cert_pem string - CA certificate PEM (inline)
tls.client_cert_path path - Client cert for mTLS
tls.client_key_path path - Client key for mTLS
tls.verify_hostname bool true Verify server hostname
tls.sni_hostname string - SNI hostname override

TLS Modes

Mode Description
disable No TLS
prefer Use TLS if available
require TLS required, no verification
verify-ca Verify CA certificate
verify-identity Verify CA and hostname

Example:

config:
  tls:
    mode: verify-identity
    ca_cert_path: /etc/ssl/certs/ca.pem
    client_cert_path: /etc/ssl/certs/client.pem
    client_key_path: /etc/ssl/private/client.key
    verify_hostname: true

Event Format Settings

Output Format

Option Type Default Description
format enum json Event format
envelope enum standard Message envelope
key_format enum json Key serialization

Format Options

Format Description
json JSON format
avro Apache Avro
protobuf Protocol Buffers

Envelope Styles

Envelope Description
standard Industry-standard CDC format
plain Just the data, no metadata
full Rivven’s extended format

Example:

config:
  format: avro
  envelope: standard
  key_format: json

Schema Settings

Option Type Default Description
schema_registry_url string - Schema registry URL
schema_cache_size u32 1000 Cache entries
include_schema bool false Include schema in events

Type Handling

Option Type Default Description
decimal_handling enum precise Decimal conversion
time_precision enum microseconds Timestamp precision
binary_handling enum base64 Binary data encoding
uuid_handling enum string UUID representation

Type Handling Options

Option Values Description
decimal_handling precise, double, string Decimal representation
time_precision seconds, milliseconds, microseconds, nanoseconds Timestamp precision
binary_handling base64, hex, bytes Binary encoding
uuid_handling string, binary UUID format

Example:

config:
  decimal_handling: string  # Preserve precision
  time_precision: microseconds
  binary_handling: base64
  uuid_handling: string

Field-Level Encryption

Encrypt sensitive fields at the source using AES-256-GCM or ChaCha20-Poly1305 encryption. This provides end-to-end encryption for PII, PCI, and other sensitive data.

Encryption Configuration

Option Type Default Description
cdc_features.encryption.enabled bool false Enable field encryption
cdc_features.encryption.fields list [] Fields to encrypt

Example

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      # ... connection settings ...
      
      cdc_features:
        encryption:
          enabled: true
          fields:
            - credit_card
            - ssn
            - bank_account
            - tax_id

Key Management

Encryption keys are managed via environment variables:

# Set the master encryption key (256-bit for AES-256-GCM / ChaCha20-Poly1305)
export RIVVEN_ENCRYPTION_KEY="your-base64-encoded-32-byte-key"

Security Features

  • AES-256-GCM / ChaCha20-Poly1305: Industry-standard authenticated encryption (algorithm selectable)
  • Per-field encryption: Only specified fields are encrypted
  • Transparent decryption: Consumers with the same key can decrypt
  • Key rotation: Support for versioned keys enables rotation

Deduplication

Prevent duplicate events caused by network retries, replication lag, or connector restarts.

Deduplication Configuration

Option Type Default Description
cdc_features.deduplication.enabled bool false Enable deduplication
cdc_features.deduplication.ttl_seconds u64 3600 How long to remember seen events
cdc_features.deduplication.max_entries u64 100000 Max cache size (LRU eviction)

Example

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      cdc_features:
        deduplication:
          enabled: true
          ttl_seconds: 7200      # 2 hours
          max_entries: 500000    # 500K entries

How It Works

Events are hashed using a combination of:

  • Table name
  • Primary key values
  • Operation type
  • Timestamp

If an event with the same hash is seen within the TTL window, it is dropped.
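
A minimal sketch of the dedup key, assuming the four components above are hashed together (the function name and exact hashing scheme are illustrative, not Rivven's internals):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Combine table name, primary key, operation, and timestamp into one
// dedup key; identical events within the TTL window collide and are dropped.
fn dedup_hash(table: &str, pk: &str, op: &str, ts_ms: u64) -> u64 {
    let mut h = DefaultHasher::new();
    table.hash(&mut h);
    pk.hash(&mut h);
    op.hash(&mut h);
    ts_ms.hash(&mut h);
    h.finish()
}
```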


Tombstone Handling

Properly emit tombstones (null-value records) for DELETE operations, enabling log compaction.

Tombstone Configuration

Option Type Default Description
cdc_features.tombstones.enabled bool false Enable tombstone emission
cdc_features.tombstones.emit_tombstone_after_delete bool true Emit tombstone after delete event

Example

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      cdc_features:
        tombstones:
          enabled: true
          emit_tombstone_after_delete: true

Why Tombstones Matter

In compacted topics, tombstones signal that a key should be deleted during compaction. Without proper tombstones:

  • Deleted records may reappear after compaction
  • Storage grows unbounded
  • Downstream consumers see stale data

Heartbeat Configuration

Heartbeat monitoring maintains replication slot health and provides lag detection for CDC connectors.

Why Heartbeats Matter

  • PostgreSQL: Without heartbeats, inactive replication slots accumulate WAL files, potentially filling disk
  • MySQL: Keeps binlog position fresh during periods of no changes
  • Health Monitoring: Detects stalled connections and replication lag

Heartbeat Options

Option Type Default Description
heartbeat_interval_secs int 10 Heartbeat interval in seconds

The heartbeat records the replication position on every event and calculates lag from it. If the lag exceeds a threshold (default: 30x the interval, i.e. 5 minutes for a 10s interval), the connector reports as unhealthy.

Example

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      heartbeat_interval_secs: 10  # Update position every 10 seconds

Monitoring Heartbeat Health

The heartbeat status is exposed via the connector’s health check:

  • Healthy: Lag is below threshold
  • Unhealthy: Lag exceeds threshold (possible replication stall)
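
The check itself reduces to comparing lag against 30x the configured interval (the default multiplier stated above; the function name is illustrative):

```rust
// Heartbeat health sketch: healthy while lag stays within 30 intervals.
// With the default 10s interval, the threshold is 300s (5 minutes).
fn is_healthy(lag_secs: u64, heartbeat_interval_secs: u64) -> bool {
    lag_secs <= 30 * heartbeat_interval_secs
}
```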

Initial and Incremental Snapshots

Rivven provides comprehensive snapshot support for CDC connectors, including initial table synchronization and incremental (ad-hoc) snapshots for catching up on missed changes.

Snapshot Modes

Mode Description
initial Snapshot on first start, then stream (default)
always Full snapshot on every start
never Skip snapshot, stream only
when_needed Snapshot only if no stored position available
initial_only Snapshot and stop (no streaming)
schema_only Capture schema, skip data
recovery Snapshot for disaster recovery

Snapshot Options

Option Type Default Description
snapshot_mode enum initial Snapshot behavior
snapshot_batch_size int 10000 Rows per SELECT batch
snapshot_parallel_tables int 4 Tables to snapshot concurrently
snapshot_query_timeout_secs int 300 Query timeout
snapshot_progress_dir string - Directory for progress persistence

Example Configuration

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      host: localhost
      port: 5432
      database: mydb
      
      # Snapshot settings
      snapshot_mode: initial
      snapshot_batch_size: 50000
      snapshot_parallel_tables: 4
      snapshot_progress_dir: /var/lib/rivven/progress

Progress Persistence

Rivven stores progress in the destination system (broker or files), never in the source database:

Storage Option Use Case
File-based Single-node deployments
Broker topics Distributed/HA deployments

File-Based Progress (Default)

Rivven persists snapshot progress to disk, enabling resumable snapshots:

/var/lib/rivven/progress/
  public.users.json      # Progress for users table
  public.orders.json     # Progress for orders table

Each progress file contains:

  • Rows processed
  • Last key value (for resumption)
  • Watermark (WAL position at snapshot start)
  • State (pending, in_progress, completed, failed)

Broker-Based Progress (Distributed)

For distributed deployments, configure Rivven to store progress in dedicated topics:

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      # Use broker for offset storage (distributed mode)
      offset_storage: broker
      offset_topic: _rivven_cdc_offsets

Why not store in the source database?

  1. Source databases might be read-only replicas
  2. Avoid polluting source schemas with CDC metadata
  3. Separation of concerns: CDC metadata belongs with CDC infrastructure
  4. The broker is the natural location for consumer offset tracking

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Snapshot Coordinator                         │
├─────────────────────────────────────────────────────────────────┤
│  1. Get WAL position (watermark)                                │
│  2. Parallel SELECT with keyset pagination                      │
│  3. Emit events with op: Snapshot                               │
│  4. Save progress periodically                                  │
│  5. Start streaming from watermark                              │
└─────────────────────────────────────────────────────────────────┘

Keyset Pagination

Rivven uses keyset pagination (cursor-based) instead of OFFSET, so batch fetches stay fast and bounded even deep into large tables:

-- First batch
SELECT * FROM users ORDER BY id LIMIT 10000;

-- Subsequent batches (using last key)
SELECT * FROM users WHERE id > 'last_key' ORDER BY id LIMIT 10000;

This approach:

  • Maintains consistent performance regardless of table size
  • Avoids skipping or duplicating rows
  • Allows resumption from any point
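
The loop driving those queries can be sketched over an in-memory "table" (sorted by key, standing in for `ORDER BY id`); each batch's last key becomes the next cursor, which is exactly what makes resumption from any point possible:

```rust
// One batch: rows with id > last_key, in key order, up to `limit`.
// Mirrors: SELECT * FROM t WHERE id > $last ORDER BY id LIMIT $limit.
fn fetch_batch(rows: &[u64], last_key: Option<u64>, limit: usize) -> Vec<u64> {
    rows.iter()
        .copied()
        .filter(|id| last_key.map_or(true, |k| *id > k))
        .take(limit)
        .collect()
}

// Full snapshot: advance the cursor to each batch's last key until empty.
fn snapshot_all(rows: &[u64], limit: usize) -> Vec<u64> {
    let mut out = Vec::new();
    let mut last_key = None;
    loop {
        let batch = fetch_batch(rows, last_key, limit);
        if batch.is_empty() {
            break;
        }
        last_key = batch.last().copied();
        out.extend(batch);
    }
    out
}
```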

Incremental Snapshots (Ad-hoc)

For catching up on missed data during streaming, Rivven supports DBLog-style incremental snapshots via signal tables:

sources:
  production_postgres:
    connector: postgres-cdc
    config:
      signal_table: "rivven_signals"
      incremental_snapshot:
        enabled: true
        chunk_size: 10000
        watermark_strategy: insert_insert  # DBLog approach

Trigger an incremental snapshot by inserting a signal:

INSERT INTO rivven_signals (id, type, data)
VALUES (
  'snapshot-' || now()::text,
  'execute-snapshot',
  '{"data_collections": ["public.users", "public.orders"]}'::jsonb
);

Single Message Transforms (SMT)

Apply transformations to CDC events before they reach their destination. Transforms are applied in order, allowing powerful event processing pipelines.

Available Transforms

Transform Description
extract_new_record_state Flatten envelope, extract “after” state
value_to_key Extract key fields from value
mask_field Mask sensitive fields (PII, credit cards)
insert_field Add static or computed fields
replace_field / rename_field Rename, include, or exclude fields
regex_router Route events based on regex patterns
timestamp_converter Convert timestamp formats
filter Filter events based on conditions
cast Convert field types
flatten Flatten nested JSON structures
timezone_converter Convert timestamps between timezones
content_router Route events based on field content
header_to_value Copy envelope metadata into record
unwrap Extract nested field to top level
set_null Set fields to null conditionally
compute_field Compute new fields (concat, hash, etc.)
externalize_blob Store large blobs in object storage (S3/GCS/Azure)

Conditional Transform Application (Predicates)

Predicates allow you to apply transforms only to events matching specific conditions. This is essential when a single CDC connector captures multiple tables but you need different transforms for each table.

Predicates are evaluated before the transform runs. If the predicate doesn’t match, the event passes through unchanged (not filtered out).

Basic Predicate Options

Option Type Description
table string Apply only to events from this table (regex pattern)
tables list Apply to events from any of these tables (OR logic)
schema string Apply only to events from this schema (regex pattern)
schemas list Apply to events from any of these schemas (OR logic)
database string Apply only to events from this database (regex pattern)
operations list Apply only to these operations: insert, update, delete, snapshot, truncate
field_exists string Apply only if this field exists in the event
field_value object Apply only if field matches value: {field: "name", value: "val"}
negate bool Invert the predicate (apply when conditions DON’T match)

When multiple conditions are specified, they are combined with AND logic. Use negate: true to invert the entire predicate.
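
The evaluation order can be sketched as follows. This is a simplification covering only two condition types, and it uses substring matching where the real implementation applies regex patterns:

```rust
// Predicate sketch: all specified conditions must hold (AND), then
// `negate` inverts the final result. Unspecified conditions always match.
struct Predicate {
    table: Option<String>,      // regex in the real implementation
    operations: Vec<String>,    // empty = match any operation
    negate: bool,
}

impl Predicate {
    fn matches(&self, table: &str, op: &str) -> bool {
        let mut ok = true;
        if let Some(t) = &self.table {
            ok &= table.contains(t.as_str());
        }
        if !self.operations.is_empty() {
            ok &= self.operations.iter().any(|o| o == op);
        }
        if self.negate { !ok } else { ok }
    }
}
```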

Example: Per-Table Transforms

Apply different transforms to different tables in the same CDC stream:

config:
  # Capture both tables
  tables:
    - public.users
    - public.documents
  
  transforms:
    # Mask SSN only for users table
    - type: mask_field
      name: mask_user_pii
      predicate:
        table: "users"
      config:
        fields:
          - ssn
          - credit_card

    # Externalize blobs only for documents table  
    - type: externalize_blob
      name: externalize_docs
      predicate:
        table: "documents"
      config:
        storage_type: s3
        bucket: my-blobs
        size_threshold: 100000

Example: Multiple Tables with Regex

Apply to tables matching a pattern:

transforms:
  - type: externalize_blob
    predicate:
      table: "^doc.*"  # Matches: documents, doc_archive, docs
    config:
      storage_type: s3
      bucket: doc-blobs

Example: Operation-Based Predicates

Apply transform only on specific operations:

transforms:
  # Externalize only on INSERT and UPDATE (not DELETE)
  - type: externalize_blob
    predicate:
      operations:
        - insert
        - update
    config:
      storage_type: s3
      bucket: my-blobs

Example: Combined Predicates (AND Logic)

Apply only when ALL conditions match:

transforms:
  # Externalize documents table on INSERT only
  - type: externalize_blob
    predicate:
      table: "documents"
      operations:
        - insert
    config:
      storage_type: s3
      bucket: my-blobs

Example: Negated Predicate

Apply to all tables EXCEPT a specific one:

transforms:
  # Externalize all tables EXCEPT audit_log
  - type: externalize_blob
    predicate:
      table: "audit_log"
      negate: true  # Invert: apply when table is NOT audit_log
    config:
      storage_type: s3
      bucket: my-blobs

Example: Schema-Based Filtering

Apply only to specific schemas:

transforms:
  # Mask PII only in production schema
  - type: mask_field
    predicate:
      schema: "production"
    config:
      fields:
        - ssn
        - credit_card

  # Externalize only in archive schema
  - type: externalize_blob
    predicate:
      schemas:
        - archive
        - historical
    config:
      storage_type: s3
      bucket: archive-blobs

Example: Field-Based Predicates

Apply only when specific fields exist or have certain values:

transforms:
  # Externalize only if 'large_content' field exists
  - type: externalize_blob
    predicate:
      field_exists: "large_content"
    config:
      storage_type: s3
      bucket: my-blobs

  # Externalize only if status is 'archived'
  - type: externalize_blob
    predicate:
      field_value:
        field: "status"
        value: "archived"
    config:
      provider: s3
      bucket: archive-blobs

Example: Predicates with Different Transform Types

Predicates work with any transform type, not just mask_field and externalize_blob:

sources:
  ecommerce_cdc:
    connector: postgres-cdc
    config:
      tables:
        - public.users
        - public.orders
        - public.products
        - public.audit_log

    transforms:
      # ── Users table: PII protection ──
      - type: mask_field
        predicate:
          table: "users"
        config:
          fields: [ssn, credit_card, password_hash]

      - type: replace_field
        predicate:
          table: "users"
        config:
          exclude: [internal_notes, admin_flags]
          renames:
            email_addr: email

      # ── Orders table: timestamp normalization + routing ──
      - type: timestamp_converter
        predicate:
          table: "orders"
        config:
          fields: [created_at, updated_at, shipped_at]
          format: iso8601

      - type: timezone_converter
        predicate:
          table: "orders"
        config:
          fields: [created_at]
          from: UTC
          to: America/New_York

      - type: content_router
        predicate:
          table: "orders"
        config:
          default_topic: other-orders
          routes:
            - field: priority
              value: high
              topic: priority-orders
            - field: priority
              value: normal
              topic: standard-orders

      # ── Products table: computed fields + flatten ──
      - type: compute_field
        predicate:
          table: "products"
        config:
          computations:
            - target: display_name
              type: concat
              parts: [brand, " ", name]
            - target: price_hash
              type: hash
              fields: [id, price]

      - type: flatten
        predicate:
          table: "products"
        config:
          delimiter: "_"

      # ── Audit log: nullify empty strings + unwrap ──
      - type: set_null
        predicate:
          table: "audit_log"
        config:
          fields: [old_value, new_value]
          condition: if_empty

      - type: unwrap
        predicate:
          table: "audit_log"
        config:
          field: payload.data

      # ── Snapshots only: add marker field ──
      - type: insert_field
        predicate:
          operations: [snapshot]
        config:
          static_fields:
            _is_snapshot: true
          timestamp_field: _loaded_at

      # ── All tables: extract envelope + add metadata ──
      - type: extract_new_record_state
        config:
          add_table: true
          add_op: true
          add_ts: true

      - type: header_to_value
        config:
          fields:
            database: __source_db
            schema: __source_schema

Transforms without a predicate apply to all events. The pipeline executes top-to-bottom — order matters when transforms depend on each other’s output.

Configuration Reference

extract_new_record_state

Flatten the CDC envelope to just the data, optionally adding metadata.

Option Type Default Description
drop_tombstones bool false Drop delete events
add_table bool false Add __table field
add_schema bool false Add __schema field
add_op bool false Add __op field
add_ts bool false Add __ts field

transforms:
  - type: extract_new_record_state
    config:
      drop_tombstones: false
      add_table: true
      add_op: true

mask_field

Mask sensitive data for compliance (GDPR, PCI-DSS, HIPAA).

Option Type Default Description
fields list required Fields to mask

transforms:
  - type: mask_field
    config:
      fields:
        - ssn
        - credit_card_number
        - password_hash

timestamp_converter

Convert timestamps between formats.

Option Type Default Description
fields list required Fields to convert
format enum iso8601 Target format

Format Options: iso8601, epoch_seconds, epoch_millis, epoch_micros, date_only, time_only

transforms:
  - type: timestamp_converter
    config:
      fields:
        - created_at
        - updated_at
      format: iso8601

filter

Keep or drop events based on conditions.

Option Type Default Description
condition string required Filter condition
drop bool false Drop matching (vs keep)

Supported Operators: =, !=, is_null, is_not_null, matches, in

transforms:
  # Keep only active users
  - type: filter
    config:
      condition: "status = active"
      drop: false
  
  # Drop test records
  - type: filter
    config:
      condition: "email matches .*@test\\.com"
      drop: true

cast

Convert field types.

Option Type Default Description
specs map required Field -> type mapping

Type Options: string, integer, float, boolean, json

transforms:
  - type: cast
    config:
      specs:
        price: float
        quantity: integer
        is_active: boolean
        metadata: json

flatten

Flatten nested JSON structures.

Option Type Default Description
delimiter string . Key separator
max_depth int 0 Max depth (0=unlimited)

transforms:
  - type: flatten
    config:
      delimiter: "_"
      max_depth: 3

Before:

{"user": {"address": {"city": "NYC"}}}

After:

{"user_address_city": "NYC"}

insert_field

Add static or computed fields.

Option Type Default Description
static_fields map - Static values
timestamp_field string - Add current timestamp
date_field string - Add current date

transforms:
  - type: insert_field
    config:
      static_fields:
        source: "postgres-cdc"
        version: "1.0"
      timestamp_field: _ingested_at

replace_field / rename_field

Rename fields or filter to specific fields.

Option Type Default Description
renames map - old_name -> new_name
include list - Only keep these fields
exclude list - Remove these fields

transforms:
  - type: replace_field
    config:
      renames:
        usr_id: user_id
        ts: timestamp
      exclude:
        - internal_notes
        - debug_info

value_to_key

Extract fields from value to use as message key.

Option Type Default Description
fields list required Fields for key

transforms:
  - type: value_to_key
    config:
      fields:
        - id
        - tenant_id

regex_router

Route events to different topics based on patterns.

Option Type Default Description
default_topic string default Fallback topic
routes list - Pattern -> topic rules

transforms:
  - type: regex_router
    config:
      default_topic: events.other
      routes:
        - pattern: "^orders.*"
          topic: events.orders
        - pattern: "^users.*"
          topic: events.users

timezone_converter

Convert timestamp fields between timezones (IANA timezone names).

Option Type Default Description
fields list required Fields to convert
from string UTC Source timezone
to string required Target timezone
date_only bool false Output date only

transforms:
  - type: timezone_converter
    config:
      fields:
        - created_at
        - updated_at
      from: UTC
      to: America/New_York

content_router

Route events based on field values or patterns.

Option Type Default Description
default_topic string default Fallback topic
routes list - Field/value/pattern -> topic rules

transforms:
  - type: content_router
    config:
      default_topic: events.default
      routes:
        # Exact value matching
        - field: region
          value: us-east
          topic: events.us-east
        # Pattern matching
        - field: email
          pattern: ".*@enterprise\\.com"
          topic: events.enterprise

header_to_value

Copy envelope metadata (source, table, operation, etc.) into the record.

Option Type Default Description
fields map - source_field -> target_field mapping
all_headers_prefix string - Add all headers with this prefix
move bool false Move (remove from envelope) vs copy

Source Fields: source_type, database, schema, table, operation, timestamp, transaction_id

transforms:
  - type: header_to_value
    config:
      fields:
        source_type: db_source
        table: source_table
        operation: op_type
      # Or add all with prefix:
      # all_headers_prefix: "__"

unwrap

Extract a nested field to the top level.

Option Type Default Description
field string required Nested field path

transforms:
  - type: unwrap
    config:
      field: payload.data

Before:

{"id": 1, "payload": {"data": {"name": "test", "value": 42}}}

After:

{"id": 1, "name": "test", "value": 42}

set_null

Set specified fields to null, optionally with conditions.

Option Type Default Description
fields list required Fields to nullify
condition string/object always When to nullify

Conditions: always, if_empty, { "equals": value }, { "matches": "pattern" }

transforms:
  # Always nullify
  - type: set_null
    config:
      fields:
        - password
        - api_key
  
  # Conditionally nullify empty strings
  - type: set_null
    config:
      fields:
        - description
      condition: if_empty

compute_field

Compute new fields from existing data.

Option Type Default Description
computations list required List of computation specs

Computation Types:

  • concat: Concatenate field values
  • hash: Hash field values (SHA-256)
  • upper / lower: Case conversion
  • coalesce: First non-null value
  • uuid: Generate UUID
  • timestamp: Current timestamp

transforms:
  - type: compute_field
    config:
      computations:
        # Concatenate fields
        - target: full_name
          type: concat
          parts:
            - first_name
            - " "
            - last_name
        
        # Hash for anonymization
        - target: email_hash
          type: hash
          fields:
            - email
        
        # Case conversion
        - target: email_lower
          type: lower
          source: email
        
        # Default value
        - target: status
          type: coalesce
          fields:
            - status
            - default_status
        
        # Generate ID
        - target: event_id
          type: uuid
        
        # Add processing timestamp
        - target: processed_at
          type: timestamp

externalize_blob

Requires the cloud-storage feature. Enable the s3, gcs, or azure feature for the corresponding cloud provider.

Externalize large blob values to object storage (S3/GCS/Azure) and replace them with reference URLs. This reduces message size and improves throughput for topics with binary data.

Option Type Default Description
provider enum required Storage provider: s3, gcs, azure, local
bucket string required Bucket/container name
size_threshold usize 10240 Minimum bytes to externalize
fields list [] Fields to check (empty = all fields)
prefix string "" Object key prefix
url_scheme string s3:// URL scheme for references

Provider-specific options:

S3:

transforms:
  - type: externalize_blob
    config:
      provider: s3
      bucket: my-cdc-blobs
      region: us-east-1
      size_threshold: 10240  # 10KB
      fields:
        - image_data
        - document_content
      prefix: cdc-blobs/production

GCS:

transforms:
  - type: externalize_blob
    config:
      provider: gcs
      bucket: my-cdc-blobs
      size_threshold: 10240
      prefix: cdc-blobs/

Azure:

transforms:
  - type: externalize_blob
    config:
      provider: azure
      account: mystorageaccount
      container: my-cdc-blobs
      size_threshold: 10240
      prefix: cdc-blobs/

Reference Format:

Externalized fields are replaced with a reference object:

{
  "__externalized": true,
  "url": "s3://my-cdc-blobs/cdc-blobs/users/image_data/1704067200000_abc123.bin",
  "size": 1048576,
  "content_type": "application/octet-stream",
  "sha256": "a1b2c3d4e5f6..."
}

Object Keys:

Objects are stored at: {prefix}/{table}/{field}/{timestamp}_{uuid}.bin
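A minimal sketch of how a key following this layout could be assembled (illustrative helper, not the library's API; ts_millis and id stand in for the timestamp and random suffix the transform would generate):

```rust
/// Illustrative object-key builder matching the layout above.
/// `ts_millis` and `id` are hypothetical inputs standing in for the
/// generated timestamp and random suffix.
fn object_key(prefix: &str, table: &str, field: &str, ts_millis: u64, id: &str) -> String {
    format!(
        "{}/{}/{}/{}_{}.bin",
        prefix.trim_end_matches('/'), // avoid a double slash if prefix ends in '/'
        table,
        field,
        ts_millis,
        id
    )
}
```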

Rust API:

use rivven_cdc::ExternalizeBlob;

// Create with S3
let smt = ExternalizeBlob::s3("my-bucket", "us-east-1")?
    .size_threshold(10 * 1024)  // 10KB
    .fields(["image_data", "document"])
    .prefix("cdc-blobs/");

// Or with pre-configured ObjectStore
let smt = ExternalizeBlob::new(object_store, "my-bucket")
    .size_threshold(10 * 1024)
    .fields(["blob_field"]);

Transform Pipeline Example

Chain multiple transforms for complex processing:

config:
  transforms:
    # 1. Flatten envelope
    - type: extract_new_record_state
      config:
        add_table: true
        add_op: true
    
    # 2. Mask PII
    - type: mask_field
      config:
        fields: [ssn, credit_card]
    
    # 3. Filter out test data
    - type: filter
      config:
        condition: "email matches .*@test\\.com"
        drop: true
    
    # 4. Normalize timestamps
    - type: timestamp_converter
      config:
        fields: [created_at, updated_at]
        format: iso8601
    
    # 5. Route to topics
    - type: regex_router
      config:
        routes:
          - pattern: "^public\\.orders.*"
            topic: cdc.orders
          - pattern: "^public\\.users.*"
            topic: cdc.users

Monitoring Settings

Option Type Default Description
metrics.enabled bool true Enable metrics
metrics.port u16 8080 Metrics HTTP port
metrics.path string /metrics Metrics endpoint
metrics.labels map {} Custom labels

Example:

config:
  metrics:
    enabled: true
    port: 9090
    path: /prometheus/metrics
    labels:
      environment: production
      team: platform

PostgreSQL-Specific Options

Option Type Default Description
slot_name string rivven_slot Replication slot name
publication_name string rivven_pub Publication name
plugin enum pgoutput Output plugin
replication_mode enum logical Replication mode

PostgreSQL Signal Table

Option Type Default Description
signal_table.enabled bool false Enable signal table
signal_table.schema string rivven Signal table schema
signal_table.name string signals Signal table name

Example:

config:
  slot_name: my_app_slot
  publication_name: my_app_pub
  plugin: pgoutput
  
  signal_table:
    enabled: true
    schema: rivven
    name: signals

MySQL-Specific Options

Option Type Default Description
server_id u32 required Unique server ID
gtid_mode bool true Use GTID
gtid_set string - Starting GTID set
binlog_filename string - Starting binlog file
binlog_position u64 - Starting position

MariaDB Options

Option Type Default Description
mariadb_gtid bool false Use MariaDB GTID format
gtid_domain_id u32 - MariaDB domain ID

Example:

config:
  server_id: 12345
  gtid_mode: true
  # MySQL GTID set
  gtid_set: "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5"

Environment Variable Reference

All options can be set via environment variables using this pattern: RIVVEN_CDC_<SECTION>_<OPTION> (uppercase, underscores)

Config Path Environment Variable
host RIVVEN_CDC_HOST
password RIVVEN_CDC_PASSWORD
tls.mode RIVVEN_CDC_TLS_MODE
retry.max_attempts RIVVEN_CDC_RETRY_MAX_ATTEMPTS
guardrails.max_lag RIVVEN_CDC_GUARDRAILS_MAX_LAG

Priority: Environment variables > YAML config > Defaults
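The precedence rule can be sketched as a simple resolution function (a sketch, not the library's implementation; in practice the env value would come from something like std::env::var("RIVVEN_CDC_HOST").ok()):

```rust
// Sketch of the precedence rule: an environment variable wins over the
// YAML value, which wins over the built-in default.
fn resolve(env_value: Option<&str>, yaml_value: Option<&str>, default: &str) -> String {
    env_value.or(yaml_value).unwrap_or(default).to_string()
}
```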


Full Configuration Example

version: "1.0"

sources:
  production_postgres:
    connector: postgres-cdc
    topic: cdc.production
    enabled: true
    
    config:
      # Connection
      host: postgres.internal
      port: 5432
      database: production
      user: rivven_cdc
      password: ${POSTGRES_PASSWORD}
      
      # TLS
      tls:
        mode: verify-identity
        ca_cert_path: /etc/ssl/certs/ca.pem
        client_cert_path: /etc/ssl/certs/client.pem
        client_key_path: /etc/ssl/private/client.key
      
      # PostgreSQL-specific
      slot_name: rivven_prod_slot
      publication_name: rivven_prod_pub
      plugin: pgoutput
      
      signal_table:
        enabled: true
        schema: rivven
        name: signals
      
      # Table selection
      tables:
        - public.orders
        - public.order_items
        - public.customers
      exclude_tables:
        - "*_backup"
        - "*_archive"
      
      column_masks:
        - public.customers.ssn
        - public.customers.credit_card_number
      
      # Snapshot
      snapshot_mode: initial
      snapshot_batch_size: 50000
      snapshot_parallel_tables: 4
      
      # Performance
      batch_size: 5000
      batch_timeout: 200ms
      max_queue_size: 20000
      workers: 8
      
      backpressure:
        enabled: true
        high_watermark: 15000
        low_watermark: 5000
      
      # Resilience
      retry:
        max_attempts: 15
        initial_delay: 100ms
        max_delay: 60s
        multiplier: 2.0
        jitter: 0.1
      
      guardrails:
        max_lag: 5m
        max_queue_lag: 1m
        max_batch_size: 100000
        max_event_size_bytes: 10485760
      
      # Event format
      format: json
      envelope: standard
      decimal_handling: string
      time_precision: microseconds
      
      # Monitoring
      metrics:
        enabled: true
        port: 9090
        labels:
          environment: production
          service: cdc

Event Routing

Route CDC events to different destinations based on content, table, operation, or custom conditions.

Configuration

Option Type Default Description
router.enabled bool false Enable event routing
router.default_destination string - Default destination for unmatched events
router.dead_letter_queue string - Topic for unroutable events
router.drop_unroutable bool false Drop events that don’t match any rule
router.rules list [] Routing rules (evaluated in priority order)

Route Rule Options

Option Type Description
name string Rule name for logging/debugging
priority int Higher priority rules evaluated first (default: 0)
condition object Matching condition
destinations list Target topics for matched events
continue_matching bool Continue to next rule after match (default: false)

Condition Types

Type Description Fields
Always Always matches -
Table Match specific table table
TablePattern Match table via regex pattern
Schema Match database schema schema
Operation Match operation type op (insert/update/delete)
FieldExists Check field existence field
FieldValue Match field value field, value
FieldPattern Match field via regex field, pattern
Header Match event header header, value
And All conditions must match conditions
Or Any condition must match conditions
Not Negate a condition condition

Example:

config:
  router:
    enabled: true
    default_destination: default-events
    dead_letter_queue: dlq-events
    rules:
      - name: high_priority_orders
        priority: 100
        condition:
          type: And
          conditions:
            - type: Table
              table: public.orders
            - type: FieldValue
              field: priority
              value: high
        destinations:
          - priority-orders
        continue_matching: false
      
      - name: customer_changes
        priority: 50
        condition:
          type: TablePattern
          pattern: "public\\.customer.*"
        destinations:
          - customer-events
      
      - name: audit_deletes
        priority: 10
        condition:
          type: Operation
          op: delete
        destinations:
          - audit-topic
          - delete-archive
        continue_matching: true
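The rule semantics above (priority ordering, multiple destinations, continue_matching) can be sketched as follows. This is illustrative only, with a simplified exact-table condition in place of the full condition types:

```rust
// Illustrative router core: rules evaluate in descending priority; a match
// fans out to all of its destinations, and evaluation stops at the first
// match unless continue_matching is set.
struct Rule {
    priority: i32,
    table: &'static str, // simplified condition: exact table match
    destinations: Vec<&'static str>,
    continue_matching: bool,
}

fn route(event_table: &str, rules: &mut [Rule]) -> Vec<&'static str> {
    rules.sort_by(|a, b| b.priority.cmp(&a.priority)); // higher priority first
    let mut matched = Vec::new();
    for rule in rules.iter() {
        if rule.table == event_table {
            matched.extend(rule.destinations.iter().copied());
            if !rule.continue_matching {
                break;
            }
        }
    }
    matched
}
```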

Partitioning

Control how events are distributed across topic partitions for ordering guarantees and parallelism.

Configuration

Option Type Default Description
partitioner.enabled bool false Enable custom partitioning
partitioner.num_partitions u32 1 Number of partitions
partitioner.strategy object - Partitioning strategy

Partition Strategies

Strategy Description Use Case
RoundRobin Distribute evenly Maximum throughput
KeyHash Hash primary key Per-row ordering
TableHash Hash table name Per-table ordering
FullTableHash Hash schema.table Multi-schema environments
Sticky Same partition per batch Batch locality

Example:

config:
  partitioner:
    enabled: true
    num_partitions: 16
    strategy:
      type: KeyHash
      # Or: type: TableHash
      # Or: type: RoundRobin
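KeyHash amounts to hashing the key modulo the partition count, so the same key always lands on the same partition. A sketch (the actual hash function used is an internal detail):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch of KeyHash: hash(key) % num_partitions. Identical keys always
// map to the same partition, which is what gives per-row ordering.
fn key_hash_partition(key: &str, num_partitions: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % u64::from(num_partitions)) as u32
}
```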

Pipeline Processing

Build composable CDC processing pipelines with stages for filtering, transformation, and routing.

Configuration

Option Type Default Description
pipeline.enabled bool false Enable pipeline processing
pipeline.name string - Pipeline name for logging
pipeline.dead_letter_queue string - Topic for failed events
pipeline.concurrency int 1 Parallel processing workers
pipeline.stages list [] Processing stages

Stage Types

Stage Description
Filter Drop events matching condition
Transform Apply SMT transforms
Route Content-based routing

Example:

config:
  pipeline:
    enabled: true
    name: order-processing
    dead_letter_queue: pipeline-dlq
    concurrency: 4
    stages:
      - type: Filter
        condition:
          type: FieldValue
          field: status
          value: draft
      
      - type: Transform
        transforms:
          - type: ExtractNewRecordState
          - type: MaskField
            config:
              fields: [credit_card]
      
      - type: Route
        rules:
          - condition:
              type: FieldValue
              field: priority
              value: urgent
            destinations:
              - urgent-orders

Log Compaction

Keep only the latest state per key to reduce storage and replay time.

Configuration

Option Type Default Description
compaction.enabled bool false Enable log compaction
compaction.key_columns list [] Columns forming the compaction key
compaction.min_cleanable_ratio f64 0.5 Trigger compaction at this duplicate ratio
compaction.segment_size u64 104857600 Segment size in bytes (100MB)
compaction.delete_retention_ms u64 86400000 Keep tombstones for 24 hours
compaction.min_compaction_lag_ms u64 0 Minimum age before compaction
compaction.max_compaction_lag_ms u64 0 Force compaction after this age (0 = disabled)
compaction.cleanup_policy string compact compact, delete, or compact_delete

Example:

config:
  compaction:
    enabled: true
    key_columns:
      - id
    min_cleanable_ratio: 0.5
    segment_size: 104857600
    delete_retention_ms: 86400000
    cleanup_policy: compact
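min_cleanable_ratio compares the fraction of duplicate (superseded) records against the threshold. A sketch of that trigger (hypothetical helper, not the engine's code):

```rust
// Sketch of the min_cleanable_ratio trigger: compaction runs once the
// fraction of duplicate records in a segment reaches the threshold.
fn should_compact(total_records: u64, unique_keys: u64, min_cleanable_ratio: f64) -> bool {
    if total_records == 0 {
        return false;
    }
    let duplicate_ratio = (total_records - unique_keys) as f64 / total_records as f64;
    duplicate_ratio >= min_cleanable_ratio
}
```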

Parallel CDC Processing

Process multiple tables concurrently for maximum throughput.

Configuration

Option Type Default Description
parallel.enabled bool false Enable parallel processing
parallel.concurrency int 4 Max concurrent table streams
parallel.per_table_buffer int 1000 Events to buffer per table
parallel.work_stealing bool true Rebalance work across threads
parallel.backpressure_threshold f64 0.8 Throttle at this buffer utilization
parallel.batch_timeout_ms u64 100 Max time to accumulate batch

Example:

config:
  parallel:
    enabled: true
    concurrency: 8
    per_table_buffer: 1000
    work_stealing: true
    backpressure_threshold: 0.8
    batch_timeout_ms: 100
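backpressure_threshold is a buffer-utilization ratio. The throttle check it implies can be sketched as (illustrative, not the actual implementation):

```rust
// Sketch of the backpressure check: throttle a table's intake once its
// buffer utilization reaches the configured threshold (e.g. 0.8).
fn should_throttle(buffered: usize, buffer_capacity: usize, threshold: f64) -> bool {
    buffer_capacity > 0 && (buffered as f64 / buffer_capacity as f64) >= threshold
}
```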

Transactional Outbox Pattern

Reliably publish events using the transactional outbox pattern.

Configuration

Option Type Default Description
outbox.enabled bool false Enable outbox pattern
outbox.table_name string outbox Outbox table name
outbox.poll_interval_ms u64 1000 Polling interval in ms
outbox.batch_size int 100 Events per batch
outbox.max_retries int 3 Retry failed events
outbox.retry_delay_ms u64 1000 Delay between retries
outbox.delete_after_publish bool true Delete processed events

Outbox Table Schema

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(255) NOT NULL,
    aggregate_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    payload JSONB NOT NULL,
    timestamp TIMESTAMPTZ DEFAULT NOW(),
    published BOOLEAN DEFAULT FALSE,
    retries INTEGER DEFAULT 0
);

CREATE INDEX idx_outbox_unpublished ON outbox (timestamp) WHERE NOT published;

Example:

config:
  outbox:
    enabled: true
    table_name: outbox
    poll_interval_ms: 1000
    batch_size: 100
    max_retries: 3
    delete_after_publish: true

Health Monitoring

Monitor CDC connector health with automatic failure detection and recovery.

Configuration

Option Type Default Description
health.enabled bool false Enable health monitoring
health.check_interval_secs u64 10 Interval between health checks
health.max_lag_ms u64 30000 Maximum allowed replication lag
health.failure_threshold u32 3 Failed checks before unhealthy
health.success_threshold u32 2 Successful checks to recover
health.check_timeout_secs u64 5 Timeout for health checks
health.auto_recovery bool true Enable automatic recovery
health.recovery_delay_secs u64 1 Initial recovery delay
health.max_recovery_delay_secs u64 60 Maximum recovery delay (backoff)

Health States

State Description
Healthy All checks passing
Degraded Some checks passing with warnings
Unhealthy Failure threshold exceeded
Recovering Auto-recovery in progress

Example:

config:
  health:
    enabled: true
    check_interval_secs: 10
    max_lag_ms: 30000
    failure_threshold: 3
    success_threshold: 2
    auto_recovery: true
    recovery_delay_secs: 1
    max_recovery_delay_secs: 60

Auto-Recovery Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                      Health Monitoring Flow                                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  HEALTHY ──[3 failures]──► UNHEALTHY ──[auto_recovery]──► RECOVERING        │
│     ▲                                                          │            │
│     │                                                          │            │
│     └──────────────────[2 successes]───────────────────────────┘            │
│                                                                              │
│  Recovery uses exponential backoff:                                          │
│    1s → 2s → 4s → 8s → ... → max_recovery_delay_secs                        │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
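The backoff sequence in the diagram (doubling per attempt, capped at max_recovery_delay_secs) can be sketched as:

```rust
// Sketch of the recovery backoff: delay doubles with each failed attempt
// and is capped at max_recovery_delay_secs.
fn recovery_delay_secs(attempt: u32, initial_secs: u64, max_secs: u64) -> u64 {
    initial_secs
        .saturating_mul(1u64 << attempt.min(32)) // clamp shift to avoid overflow
        .min(max_secs)
}
```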

Prometheus Metrics

The health monitor emits the following Prometheus metrics:

Metric Type Description
rivven_cdc_health_monitoring_enabled Gauge Whether health monitoring is enabled
rivven_cdc_health_state_healthy Gauge Current health state (1=healthy, 0=unhealthy)
rivven_cdc_health_state_ready Gauge Current readiness state
rivven_cdc_health_checks_passed_total Counter Total health checks that passed
rivven_cdc_health_checks_failed_total Counter Total health checks that failed
rivven_cdc_health_state_transitions_total Counter Health state transitions (by direction)
rivven_cdc_health_recoveries_succeeded_total Counter Successful recovery attempts
rivven_cdc_health_recoveries_failed_total Counter Failed recovery attempts
rivven_cdc_health_unhealthy_time_ms_total Counter Total time spent in unhealthy state

Kubernetes Probes

The health monitor provides K8s-compatible liveness and readiness probes:

// Liveness probe (is the process alive?)
let (status, msg) = processor.liveness_probe().await;
// Returns (200, "OK") or (503, "Service Unavailable")

// Readiness probe (can we serve traffic?)
let (status, msg) = processor.readiness_probe().await;

// JSON health endpoint
let json = processor.health_json().await;
// {"status":"healthy","ready":true,"lag_ms":100,"uptime_secs":3600,...}

Custom Health Checks

Register custom health checks for application-specific monitoring:

// Register a database connectivity check
processor.register_health_check("database", || async {
    match check_database_connection().await {
        Ok(_) => HealthCheckResult::Healthy,
        Err(e) => HealthCheckResult::Unhealthy(e.to_string()),
    }
}).await;

// Unregister when no longer needed
processor.unregister_health_check("database").await;

Notifications

Subscribe to CDC progress and status notifications for operational visibility.

Configuration

Option Type Default Description
notifications.enabled bool false Enable notifications
notifications.channels list [] Notification channels
notifications.snapshot_progress bool true Snapshot progress events
notifications.streaming_status bool true Streaming status events
notifications.error_notifications bool true Error notifications
notifications.min_interval_ms u64 1000 Debounce interval

Channel Types

Type Description
log Log notifications (configurable level)
webhook HTTP webhook notifications
metrics Emit as Prometheus metrics

Notification Types

Type Description
InitialSnapshotStarted Initial snapshot began
InitialSnapshotInProgress Snapshot progress update
InitialSnapshotTableCompleted Single table completed
InitialSnapshotCompleted All tables completed
StreamingStarted Streaming began
StreamingLagUpdate Replication lag changed
StreamingError Streaming error occurred
ConnectorPaused Connector paused via signal
ConnectorResumed Connector resumed
SchemaChangeDetected DDL change detected

Example:

config:
  notifications:
    enabled: true
    snapshot_progress: true
    streaming_status: true
    error_notifications: true
    min_interval_ms: 1000
    channels:
      - type: log
        level: info
      
      - type: webhook
        url: https://api.example.com/cdc-events
        authorization: "Bearer ${CDC_WEBHOOK_TOKEN}"
        timeout_secs: 10
      
      - type: metrics
        prefix: rivven_cdc

Webhook Payload

{
  "type": "INITIAL_SNAPSHOT_IN_PROGRESS",
  "connector_id": "orders-cdc",
  "timestamp": "2026-02-02T10:30:00Z",
  "data": {
    "table": "public.orders",
    "rows_completed": 50000,
    "total_rows": 100000,
    "percent_complete": 50.0
  }
}


Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.