PostgreSQL CDC Guide
Complete guide to Change Data Capture from PostgreSQL databases.
Table of contents
- Overview
- Prerequisites
- Basic Configuration
- Configuration Reference
- TLS Configuration
- Authentication
- Signal Table
- Incremental Snapshots
- Read Replica Support
- Monitoring
- Security Considerations
- Troubleshooting
- Best Practices
- Examples
- Next Steps
Overview
Rivven CDC for PostgreSQL uses logical replication via the pgoutput plugin to stream row-level changes in real time. This guide covers:
- Prerequisites and database setup
- Configuration options
- TLS/mTLS security
- Advanced features (signals, incremental snapshots)
- Troubleshooting
Prerequisites
PostgreSQL Version
PostgreSQL 14+ is recommended. Versions 10-13 have reached end-of-life.
| Version | Support | EOL | Notes |
|---|---|---|---|
| 9.x | ❌ | - | No logical replication |
| 10.x-13.x | ⚠️ | EOL | Not recommended (unsupported) |
| 14.x | ✅ | Nov 2026 | Streaming large transactions |
| 15.x | ✅ | Nov 2027 | Row filters, column lists |
| 16.x | ✅ | Nov 2028 | Parallel apply (recommended) |
| 17.x | ✅ | Nov 2029 | Enhanced logical replication (latest) |
We test against PostgreSQL 14, 15, 16, and 17 in CI. PostgreSQL 16 is our recommended version for production deployments.
Database Configuration
Edit postgresql.conf:
# Required settings
wal_level = logical # Enable logical decoding
max_replication_slots = 4 # At least 1 per CDC connector
max_wal_senders = 4 # At least 1 per CDC connector
# Recommended settings
wal_keep_size = 1GB # Minimum WAL kept in pg_wal as a safety margin (PostgreSQL 13+)
hot_standby_feedback = on # If using read replicas
Restart PostgreSQL after changes.
User Permissions
Create a dedicated replication user:
-- Create user with replication privilege
CREATE ROLE rivven WITH REPLICATION LOGIN PASSWORD 'secure_password';
-- Grant SELECT on tables to capture
GRANT SELECT ON ALL TABLES IN SCHEMA public TO rivven;
GRANT USAGE ON SCHEMA public TO rivven;
-- For auto-provisioning (optional)
GRANT CREATE ON DATABASE mydb TO rivven;
Superuser Alternative (development only):
ALTER ROLE rivven SUPERUSER;
Basic Configuration
Minimal Setup
use rivven_cdc::postgres::{PostgresCdc, PostgresCdcConfig};
use rivven_cdc::CdcSource;
let config = PostgresCdcConfig::builder()
    .connection_string("postgres://rivven:password@localhost:5432/mydb")
    .slot_name("rivven_slot")
    .publication_name("rivven_pub")
    .build()?;
let mut cdc = PostgresCdc::new(config);
// Process events
while let Some(event) = cdc.next().await? {
    println!("Event: {:?}", event);
}
YAML Configuration (rivven-connect)
Single topic (all tables → one topic):
version: "1.0"
sources:
  orders_cdc:
    connector: postgres-cdc
    topic: cdc.orders # Fallback topic
    config:
      host: localhost
      port: 5432
      database: shop
      user: rivven
      password: ${POSTGRES_PASSWORD}
      slot_name: rivven_orders
      publication_name: rivven_orders_pub
      tables:
        - public.orders
        - public.order_items
Dynamic topic routing (per-table topics):
version: "1.0"
sources:
  orders_cdc:
    connector: postgres-cdc
    topic: cdc.default # Fallback topic
    config:
      host: localhost
      port: 5432
      database: shop
      user: rivven
      password: ${POSTGRES_PASSWORD}
      slot_name: rivven_orders
      publication_name: rivven_orders_pub
      tables:
        - public.orders # → cdc.public.orders
        - public.order_items # → cdc.public.order_items
      topic_routing: "cdc.{schema}.{table}" # Dynamic routing
Topic routing supports the placeholders {database}, {schema}, and {table}. See CDC Configuration Reference for details.
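As a rough illustration of how such a template might expand, the sketch below substitutes the three documented placeholders with plain string replacement. The function name render_topic is hypothetical and not part of Rivven's API; the real routing logic may differ, for example in how it treats unknown placeholders.

```rust
/// Expand a topic-routing template by substituting the documented placeholders.
/// `render_topic` is a hypothetical helper for illustration, not a Rivven API.
fn render_topic(template: &str, database: &str, schema: &str, table: &str) -> String {
    template
        .replace("{database}", database)
        .replace("{schema}", schema)
        .replace("{table}", table)
}
```

Under these assumptions, calling render_topic("cdc.{schema}.{table}", "shop", "public", "orders") produces cdc.public.orders, which matches the routing comments in the YAML above.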
Configuration Reference
Connection Settings
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | string | localhost | PostgreSQL host |
| port | u16 | 5432 | PostgreSQL port |
| database | string | required | Database name |
| user | string | required | Username |
| password | string | required | Password |
| connection_string | string | - | Alternative: full connection URI |
| connect_timeout | duration | 30s | Connection timeout |
| socket_timeout | duration | 60s | Socket read/write timeout |
Replication Settings
| Parameter | Type | Default | Description |
|---|---|---|---|
| slot_name | string | required | Replication slot name (unique) |
| publication_name | string | required | Publication name |
| auto_create_slot | bool | true | Auto-create slot if missing |
| auto_create_publication | bool | true | Auto-create publication if missing |
| drop_slot_on_stop | bool | false | Drop slot on shutdown (dev only) |
Table Selection
| Parameter | Type | Default | Description |
|---|---|---|---|
| tables | list | [] | Tables to include (empty = all) |
| exclude_tables | list | [] | Tables to exclude |
| table_pattern | regex | - | Regex pattern for table names |
Snapshot Settings
| Parameter | Type | Default | Description |
|---|---|---|---|
| snapshot_mode | enum | initial | Snapshot behavior (see below) |
| snapshot_batch_size | u64 | 10000 | Rows per snapshot batch |
| snapshot_parallel | u32 | 1 | Parallel snapshot threads |
| snapshot_fetch_size | u32 | 1024 | Server-side cursor fetch size |
Snapshot Modes:
All standard snapshot modes are supported:
| Mode | Description |
|---|---|
| initial | Snapshot on first run, then stream changes (default) |
| always | Full snapshot on every restart |
| never | Skip snapshot, stream changes only |
| when_needed | Snapshot if no valid LSN position exists |
| initial_only | Snapshot and stop (for data migration) |
| no_data | Capture schema only, skip data (alias: schema_only) |
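To make the differences between the modes concrete, here is a minimal sketch that models the table as a Rust enum. The type and method names are hypothetical, and the decision logic is only one reading of the descriptions above, not Rivven's implementation; in particular, it treats initial, when_needed, and initial_only alike when deciding whether to snapshot.

```rust
/// Snapshot modes from the table above. The decision logic below is an
/// illustrative reading of the descriptions, not Rivven's implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SnapshotMode {
    Initial,
    Always,
    Never,
    WhenNeeded,
    InitialOnly,
    NoData,
}

impl SnapshotMode {
    /// Parse the configuration value, accepting `schema_only` as an alias.
    fn parse(value: &str) -> Option<Self> {
        match value {
            "initial" => Some(Self::Initial),
            "always" => Some(Self::Always),
            "never" => Some(Self::Never),
            "when_needed" => Some(Self::WhenNeeded),
            "initial_only" => Some(Self::InitialOnly),
            "no_data" | "schema_only" => Some(Self::NoData),
            _ => None,
        }
    }

    /// Whether table data is snapshotted at startup.
    /// `has_valid_position` means an earlier run stored a usable LSN.
    fn snapshots_data(self, has_valid_position: bool) -> bool {
        match self {
            Self::Always => true,
            Self::Never | Self::NoData => false,
            // Simplification: these three modes are treated alike here.
            Self::Initial | Self::WhenNeeded | Self::InitialOnly => !has_valid_position,
        }
    }

    /// Whether the connector keeps streaming changes after the startup phase.
    fn streams_changes(self) -> bool {
        self != Self::InitialOnly
    }
}
```

In this model, only always snapshots again when a stored position exists, and only initial_only stops instead of streaming.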
TLS Settings
| Parameter | Type | Default | Description |
|---|---|---|---|
| tls.mode | enum | prefer | TLS mode |
| tls.ca_cert_path | path | - | CA certificate file |
| tls.client_cert_path | path | - | Client certificate (mTLS) |
| tls.client_key_path | path | - | Client private key (mTLS) |
| tls.verify_hostname | bool | true | Verify server hostname |
TLS Modes:
| Mode | Description |
|---|---|
| disable | No TLS |
| prefer | TLS if available (default) |
| require | TLS required, no cert validation |
| verify-ca | TLS + CA certificate validation |
| verify-full | TLS + CA + hostname validation |
Advanced Settings
| Parameter | Type | Default | Description |
|---|---|---|---|
| heartbeat_interval | duration | 10s | Heartbeat frequency |
| heartbeat_action_query | string | - | Custom SQL on heartbeat |
| max_batch_size | u64 | 8192 | Max events per batch |
| lsn_commit_timeout | duration | 5s | LSN commit timeout |
| wal_sender_timeout | duration | 60s | Replication timeout |
Composite Primary Key Snapshots
Tables with composite primary keys are fully supported during snapshots. Keyset pagination uses all key columns in the ORDER BY and WHERE clauses, so initial snapshots work correctly on tables with multi-column primary keys without manual configuration.
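The idea behind composite keyset pagination can be shown without a database. The sketch below pages through an already sorted list of two-column keys, resuming strictly after the last key of the previous page. The function name next_page and the fixed (i64, i64) key type are assumptions made for illustration; the actual snapshot code works against SQL queries.

```rust
/// Return the next page of keys strictly greater than `after`.
/// `sorted_keys` must already be ordered by (first, second) key column,
/// which is what an ORDER BY over all key columns provides in SQL.
fn next_page(
    sorted_keys: &[(i64, i64)],
    after: Option<(i64, i64)>,
    page_size: usize,
) -> Vec<(i64, i64)> {
    sorted_keys
        .iter()
        .copied()
        // Rust compares tuples lexicographically, like a SQL row comparison
        // such as (a, b) > ($1, $2).
        .filter(|key| after.map_or(true, |last| *key > last))
        .take(page_size)
        .collect()
}
```

Comparing the whole key tuple matters: resuming on the first column alone after a page that ends at (1, 1) would skip (1, 2).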
Proactive WAL Status Updates
Rivven sends proactive StandbyStatusUpdate messages to PostgreSQL during streaming, including keepalive responses. This prevents the server from terminating idle replication connections due to wal_sender_timeout and keeps replication slot positions advancing even during periods of low change activity.
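For reference, the PostgreSQL streaming replication protocol defines the standby status update as a 34-byte payload carried inside a CopyData message: the byte 'r', three 8-byte LSNs (written, flushed, applied), an 8-byte client clock in microseconds since 2000-01-01, and a 1-byte flag requesting an immediate reply. The sketch below encodes that layout; the function name is hypothetical, and how Rivven builds and schedules these messages is not shown in this guide.

```rust
/// Encode the payload of a standby status update message.
/// All integers are big-endian (network byte order).
fn encode_standby_status_update(
    write_lsn: u64,
    flush_lsn: u64,
    apply_lsn: u64,
    client_time_micros: i64, // microseconds since 2000-01-01 00:00:00 UTC
    reply_requested: bool,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(34);
    buf.push(b'r'); // message type tag
    buf.extend_from_slice(&write_lsn.to_be_bytes());
    buf.extend_from_slice(&flush_lsn.to_be_bytes());
    buf.extend_from_slice(&apply_lsn.to_be_bytes());
    buf.extend_from_slice(&client_time_micros.to_be_bytes());
    buf.push(u8::from(reply_requested)); // 1 = ask the server to reply at once
    buf
}
```

The flushed LSN is the position the server may treat as durably consumed, so reporting it regularly lets the slot advance even when few changes arrive.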
TLS Configuration
Basic TLS (verify-ca)
config:
  tls:
    mode: verify-ca
    ca_cert_path: /etc/ssl/certs/postgresql-ca.crt
Mutual TLS (mTLS)
config:
  tls:
    mode: verify-full
    ca_cert_path: /etc/ssl/certs/ca.crt
    client_cert_path: /etc/ssl/certs/client.crt
    client_key_path: /etc/ssl/private/client.key
AWS RDS
config:
  tls:
    mode: verify-full
    ca_cert_path: /etc/ssl/certs/rds-global-bundle.pem
Download the RDS CA bundle (the rds-ca-2019 root certificate expired in August 2024):
curl -o /etc/ssl/certs/rds-global-bundle.pem \
  https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem
Authentication
SCRAM-SHA-256 (Recommended)
Rivven supports modern PostgreSQL SCRAM authentication:
# PostgreSQL 10+: ensure scram-sha-256 is configured
# In pg_hba.conf:
host all rivven 0.0.0.0/0 scram-sha-256
No additional configuration needed - Rivven auto-negotiates.
MD5 (Legacy)
MD5 authentication is supported but not recommended:
# In pg_hba.conf:
host all rivven 0.0.0.0/0 md5
Signal Table
Control CDC connectors at runtime without restarts.
Setup
-- Create signal table
CREATE TABLE public.rivven_signal (
    id VARCHAR(64) PRIMARY KEY,
    type VARCHAR(32) NOT NULL,
    data JSONB
);
-- Grant permissions
GRANT SELECT, INSERT, DELETE ON public.rivven_signal TO rivven;
Configuration
config:
  signal_config:
    enabled: true
    data_collection: public.rivven_signal
    channels:
      - source # Read from database
      - topic # Read from Rivven topic
      - file # Read from file
Signal Types
| Signal | Description | Example |
|---|---|---|
| execute-snapshot | Trigger incremental snapshot | {"data-collections": ["public.users"]} |
| stop-snapshot | Cancel running snapshot | {} |
| pause | Pause CDC streaming | {} |
| resume | Resume CDC streaming | {} |
Example: Trigger Snapshot
INSERT INTO public.rivven_signal (id, type, data)
VALUES (
    'signal-001',
    'execute-snapshot',
    '{"data-collections": ["public.orders"]}'::jsonb
);
Incremental Snapshots
Re-snapshot specific tables while streaming continues (DBLog algorithm).
Configuration
config:
  incremental_snapshot:
    enabled: true
    chunk_size: 10000 # Rows per chunk
    watermark_strategy: insert_delete # or insert_insert
    signal_data_collection: public.rivven_signal
Trigger via Signal
INSERT INTO public.rivven_signal (id, type, data)
VALUES (
    'incr-snap-001',
    'execute-snapshot',
    '{"type": "incremental", "data-collections": ["public.orders"]}'::jsonb
);
Watermark Strategies
| Strategy | Description | Signal Table Impact |
|---|---|---|
| insert_insert | Insert open marker, insert close marker | Table grows |
| insert_delete | Insert open marker, delete on close | Table stays small |
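The open and close markers bracket each chunk read so that it can be reconciled with concurrent changes. In the DBLog approach, any chunk row whose primary key also appears in a change event between the two watermarks is dropped, because the streamed event carries the newer state. A minimal sketch of that reconciliation step follows; the function name, the i64 key, and the String row payload are assumptions for illustration only.

```rust
use std::collections::HashSet;

/// Reconcile one snapshot chunk against the change stream (DBLog-style).
/// `chunk` holds (primary key, row) pairs read between the open and close
/// watermarks; `keys_changed_in_window` holds the primary keys of change
/// events seen in the log between those same two watermarks.
fn reconcile_chunk(
    chunk: Vec<(i64, String)>,
    keys_changed_in_window: &HashSet<i64>,
) -> Vec<(i64, String)> {
    chunk
        .into_iter()
        // A row changed during the window is dropped: the streamed event
        // is newer than the snapshot read and must win.
        .filter(|(key, _)| !keys_changed_in_window.contains(key))
        .collect()
}
```

The surviving rows are emitted as snapshot reads once the close marker is observed, and streaming then continues as usual.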
Read Replica Support
Capture changes from PostgreSQL standbys (PostgreSQL 13+).
Requirements
- PostgreSQL 13+ on primary
- hot_standby_feedback = on on replica
- Replication slot created on primary
Configuration
config:
  read_only: true
  heartbeat_watermarking: true
  signal_config:
    enabled: true
    channels:
      - topic # Cannot write to source on replica
      - file
Monitoring
Prometheus Metrics
# Core metrics
rivven_cdc_events_total{connector="orders_cdc",op="insert"}
rivven_cdc_events_total{connector="orders_cdc",op="update"}
rivven_cdc_events_total{connector="orders_cdc",op="delete"}
rivven_cdc_lag_milliseconds{connector="orders_cdc"}
rivven_cdc_connected{connector="orders_cdc"}
# Snapshot metrics
rivven_cdc_snapshot_running{connector="orders_cdc"}
rivven_cdc_snapshot_rows_scanned{connector="orders_cdc",table="orders"}
rivven_cdc_snapshot_duration_ms{connector="orders_cdc"}
# Performance metrics
rivven_cdc_processing_time_p99_us{connector="orders_cdc"}
rivven_cdc_batch_size{connector="orders_cdc"}
Grafana Dashboard
Import the Rivven CDC dashboard (ID: TBD) or use these panels:
- Event Rate: rate(rivven_cdc_events_total[5m])
- Lag: rivven_cdc_lag_milliseconds
- Error Rate: rate(rivven_cdc_errors_total[5m])
- Connection Status: rivven_cdc_connected
Security Considerations
Identifier Validation
All connection-level identifiers (user, database, slot_name, publication_name) and snapshot identifiers (table, schema, key column) are validated via Validator::validate_identifier(), enforcing ^[a-zA-Z_][a-zA-Z0-9_]{0,254}$. Invalid identifiers are rejected before any SQL or replication protocol command is constructed.
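The pattern above can be checked without a regex engine. The following sketch is a hand-written equivalent of ^[a-zA-Z_][a-zA-Z0-9_]{0,254}$; the function name is hypothetical and it is not the body of Validator::validate_identifier(), whose source is not shown in this guide.

```rust
/// Hand-written equivalent of ^[a-zA-Z_][a-zA-Z0-9_]{0,254}$.
/// Illustrative only; not the actual Validator::validate_identifier() code.
fn is_valid_identifier(candidate: &str) -> bool {
    let mut chars = candidate.chars();
    // First character: ASCII letter or underscore (this also rejects "").
    let first_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    // Remaining characters: ASCII letters, digits, or underscores.
    // Total length is capped at 1 + 254 = 255 characters.
    first_ok
        && candidate.len() <= 255
        && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}
```

Names such as rivven_slot pass, while 1slot, slot-name, and anything containing quotes or whitespace are rejected.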
SQL Injection Prevention
Snapshot SELECT queries apply defense-in-depth double-quote-doubling (" → "") on all identifiers, preventing injection even if validation is bypassed. Keyset pagination values are always passed as parameterized query parameters ($1).
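Both layers can be sketched together: identifiers are wrapped in double quotes with embedded quotes doubled, while key values stay out of the SQL text as numbered parameters. The helper names and the exact query shape below are assumptions for illustration; the queries Rivven actually generates may differ.

```rust
/// Quote a SQL identifier, doubling any embedded double quote.
fn quote_ident(ident: &str) -> String {
    format!("\"{}\"", ident.replace('"', "\"\""))
}

/// Build a keyset-pagination query for a page after the first one.
/// Key values are never interpolated; they are bound as $1..$n, and the
/// page size is bound as the following parameter.
fn keyset_page_query(schema: &str, table: &str, key_columns: &[&str]) -> String {
    let columns: Vec<String> = key_columns.iter().map(|c| quote_ident(c)).collect();
    let column_list = columns.join(", ");
    let params: Vec<String> = (1..=columns.len()).map(|i| format!("${}", i)).collect();
    format!(
        "SELECT * FROM {}.{} WHERE ({}) > ({}) ORDER BY {} LIMIT ${}",
        quote_ident(schema),
        quote_ident(table),
        column_list,
        params.join(", "),
        column_list,
        columns.len() + 1
    )
}
```

With this quoting, a hostile name such as a"b becomes "a""b", which PostgreSQL reads as a single identifier rather than as the end of one.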
Authentication
SCRAM-SHA-256 is fully supported and recommended. Cleartext password warnings are logged when authentication occurs over unencrypted connections.
Troubleshooting
Common Issues
“replication slot does not exist”
Cause: Slot was dropped or never created.
Solution:
config:
  auto_create_slot: true # Or create manually
Manual creation:
SELECT pg_create_logical_replication_slot('rivven_slot', 'pgoutput');
“publication does not exist”
Cause: Publication was dropped or never created.
Solution:
config:
  auto_create_publication: true # Or create manually
Manual creation:
CREATE PUBLICATION rivven_pub FOR ALL TABLES;
-- Or specific tables:
CREATE PUBLICATION rivven_pub FOR TABLE orders, customers;
“must be superuser or replication role”
Cause: User lacks replication privileges.
Solution:
ALTER ROLE rivven WITH REPLICATION;
“WAL segment has been removed”
Cause: CDC fell too far behind and WAL was recycled.
Solution:
- Increase wal_keep_size in postgresql.conf
- Use replication slots (they prevent WAL recycling)
- Ensure CDC is consuming fast enough
# postgresql.conf
wal_keep_size = 5GB # Increase from default
High Replication Lag
Cause: CDC not keeping up with change rate.
Solution:
- Increase batch size:
  config:
    max_batch_size: 16384
- Check downstream sink performance
- Consider parallel processing
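Besides the millisecond lag metric, lag can be expressed in bytes of WAL by comparing the server's current position (pg_current_wal_lsn()) with the slot's confirmed_flush_lsn from pg_replication_slots. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit position. The sketch below parses that form and computes the distance; the function names are hypothetical and this is not a Rivven API.

```rust
/// Parse a PostgreSQL LSN in its textual form, e.g. "16/B374D848".
fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    // The part before the slash is the upper 32 bits of the position.
    Some((u64::from(high) << 32) | u64::from(low))
}

/// Byte distance from `confirmed` to `current`; zero if the slot is not behind.
fn lag_bytes(current: &str, confirmed: &str) -> Option<u64> {
    Some(parse_lsn(current)?.saturating_sub(parse_lsn(confirmed)?))
}
```

A byte lag that keeps growing while the event rate stays flat usually points at the consumer or the sink rather than at PostgreSQL.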
TLS Connection Failures
Cause: Certificate mismatch or path issues.
Solution:
- Verify certificate paths exist and are readable
- Check CA chain is complete
- Use verify-ca first, then upgrade to verify-full
# Test certificate
openssl s_client -connect localhost:5432 -starttls postgres
Best Practices
Production Checklist
- Dedicated replication user (not superuser)
- TLS enabled (verify-full recommended)
- Replication slot created manually (not auto)
- Publication includes only needed tables
- Monitoring and alerting configured
- WAL retention sized appropriately
- Snapshot mode is initial or never
- drop_slot_on_stop: false in production
Performance Tuning
config:
  # Larger batches for throughput
  max_batch_size: 16384
  snapshot_batch_size: 50000
  # Parallel snapshots
  snapshot_parallel: 4
  # Reduce heartbeat overhead
  heartbeat_interval: 30s
High Availability
- Primary failover: Replication slots don’t survive failover. Use when_needed snapshot mode to recover.
- Read replica CDC: Use PostgreSQL 13+ with logical decoding on standby.
- Multiple connectors: Use unique slot names per connector.
Examples
Full Production Configuration
version: "1.0"
sources:
  production_cdc:
    connector: postgres-cdc
    topic: cdc.production # All events go to this topic
    config:
      # Connection
      host: postgres-primary.internal
      port: 5432
      database: production
      user: rivven_cdc
      password: ${POSTGRES_CDC_PASSWORD}
      connect_timeout: 30s
      socket_timeout: 60s
      # TLS
      tls:
        mode: verify-full
        ca_cert_path: /etc/ssl/certs/db-ca.crt
        client_cert_path: /etc/ssl/certs/client.crt
        client_key_path: /etc/ssl/private/client.key
      # Replication
      slot_name: rivven_prod_slot
      publication_name: rivven_prod_pub
      auto_create_slot: false
      auto_create_publication: false
      # Tables
      tables:
        - public.orders
        - public.order_items
        - public.customers
      exclude_tables:
        - "*_audit"
        - "*_log"
      # Snapshot
      snapshot_mode: initial
      snapshot_batch_size: 50000
      snapshot_parallel: 4
      # Filtering
      column_masks:
        - public.customers.ssn
        - public.customers.credit_card
      # Advanced
      heartbeat_interval: 30s
      max_batch_size: 16384
      # Signal
      signal_config:
        enabled: true
        data_collection: public.cdc_signals
        channels:
          - source
          - topic
Next Steps
- MySQL CDC Guide - MySQL/MariaDB setup
- CDC Troubleshooting - Debug common issues
- CDC Configuration Reference - All parameters