CDC Troubleshooting Guide

Diagnose and resolve common CDC issues.

Table of contents

  1. Diagnostic Commands
    1. Check Connector Status
    2. View Extended Metrics
    3. Export Prometheus Metrics
  2. Common Issues
    1. Connection Problems
      1. “Connection refused”
      2. “Authentication failed”
      3. “TLS/SSL errors”
    2. Replication Problems
      1. “Replication slot does not exist” (PostgreSQL)
      2. “Replication slot is active” (PostgreSQL)
      3. “Server ID conflict” (MySQL)
      4. “WAL/Binlog purged”
    3. Schema Problems
      1. “Column not found in schema”
      2. “Schema evolution not supported”
    4. Performance Problems
      1. High Replication Lag
      2. Memory Growth
      3. Snapshot Taking Too Long
    5. Data Issues
      1. Missing Events
      2. Duplicate Events
      3. Incorrect Data Types
  3. Health Checks
    1. Basic Health Check
    2. Kubernetes Liveness Probe
    3. Alert Rules
  4. Recovery Procedures
    1. Force Reconnection
    2. Reset Offset
    3. Emergency Cleanup (PostgreSQL)
    4. Emergency Cleanup (MySQL)
  5. Debug Logging
    1. Enable Verbose Logging
    2. Log Levels
    3. Event Tracing
  6. Getting Help
    1. Information to Collect
    2. Debug Report
  7. Next Steps

Diagnostic Commands

Check Connector Status

use rivven_cdc::CdcSource;

// Get current state
let state = cdc.state();
println!("Status: {:?}", state.status);
println!("Position: {:?}", state.position);
println!("Lag: {:?}", state.lag);

View Extended Metrics

use rivven_cdc::common::ExtendedCdcMetrics;

let metrics = cdc.extended_metrics();

// Check streaming status
println!("Connected: {}", metrics.is_connected());
println!("Last event: {:?}", metrics.last_event_time());
println!("Events/sec: {}", metrics.events_per_second());

// Check latency
println!("Capture to emit: {:?}", metrics.capture_to_emit_latency_p99());
println!("Transaction latency: {:?}", metrics.committed_transaction_latency_p99());

Export Prometheus Metrics

// Get all metrics in Prometheus format
let prometheus_output = metrics.to_prometheus();
println!("{}", prometheus_output);

Common Issues

Connection Problems

“Connection refused”

Symptoms:

  • Connector fails to start
  • Error: Connection refused (os error 111)

Causes:

  1. Database server not running
  2. Wrong host/port
  3. Firewall blocking connection

Solutions:

# Test connectivity
nc -zv localhost 5432

# Check database is running (PostgreSQL)
pg_isready -h localhost -p 5432

# Check database is running (MySQL)
mysqladmin ping -h localhost
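
The same connectivity test can be run from the CDC host in Rust, which helps when nc is not installed there. The following is a minimal sketch using only the standard library; check_tcp is a hypothetical helper, not part of rivven_cdc.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns Ok(()) if a TCP connection to `addr` (e.g. "localhost:5432")
/// can be opened within `timeout`, otherwise a description of the failure.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub fn check_tcp(addr: &str, timeout: Duration) -> Result<(), String> {
    // Resolve first so that DNS or syntax problems are reported separately.
    let addrs = addr
        .to_socket_addrs()
        .map_err(|e| format!("cannot resolve {addr}: {e}"))?;

    // Try every resolved address: "localhost" may map to both ::1 and 127.0.0.1.
    let mut last_err = format!("{addr} resolved to no addresses");
    for target in addrs {
        match TcpStream::connect_timeout(&target, timeout) {
            Ok(_) => return Ok(()),
            Err(e) => last_err = format!("cannot connect to {target}: {e}"),
        }
    }
    Err(last_err)
}
```

A "connection refused" result means the host is reachable but nothing is listening on that port. A timeout more often points to a firewall silently dropping packets.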

“Authentication failed”

Symptoms:

  • Error: password authentication failed for user
  • Error: Access denied for user

Solutions:

-- PostgreSQL: Check user exists
SELECT usename, passwd IS NOT NULL as has_password 
FROM pg_shadow WHERE usename = 'rivven';

-- MySQL: Check user exists
SELECT User, Host FROM mysql.user WHERE User = 'rivven';

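-- PostgreSQL: Reset password
ALTER USER rivven WITH PASSWORD 'new_password';

-- MySQL: Reset password (use the Host value returned by the query above)
ALTER USER 'rivven'@'%' IDENTIFIED BY 'new_password';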

“TLS/SSL errors”

Symptoms:

  • Error: certificate verify failed
  • Error: unknown CA

Solutions:

# Use correct CA certificate
tls:
  mode: verify-ca
  ca_cert_path: /correct/path/to/ca.pem

# For testing only (not production!)
tls:
  mode: require  # Encrypted but no verification

Replication Problems

“Replication slot does not exist” (PostgreSQL)

Symptoms:

  • Error: replication slot "rivven_slot" does not exist

Solutions:

-- Create slot manually
SELECT pg_create_logical_replication_slot('rivven_slot', 'pgoutput');

-- List existing slots
SELECT slot_name, plugin, database FROM pg_replication_slots;

-- Drop stale slot
SELECT pg_drop_replication_slot('old_slot_name');

“Replication slot is active” (PostgreSQL)

Symptoms:

  • Error: replication slot is already active

Solutions:

-- Find active connection
SELECT * FROM pg_stat_replication;

-- Terminate (if safe)
SELECT pg_terminate_backend(pid) 
FROM pg_stat_activity 
WHERE usename = 'rivven' AND application_name LIKE 'rivven%';

-- Then restart CDC connector

“Server ID conflict” (MySQL)

Symptoms:

  • Error: A slave with the same server_uuid/server_id

Solutions:

Use a unique server_id for each CDC instance:

config:
  server_id: 12346  # Different from other replicas

“WAL/Binlog purged”

Symptoms:

  • PostgreSQL: requested WAL segment has already been removed
  • MySQL: Could not find first log file name in binary log index file

Causes:

  • CDC connector was offline too long
  • WAL/binlog retention too short

Solutions:

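-- PostgreSQL 13+: Increase retention (older versions use wal_keep_segments)
ALTER SYSTEM SET wal_keep_size = '16GB';
SELECT pg_reload_conf();

-- MySQL 5.7: Increase retention (my.cnf)
-- expire_logs_days = 14
-- MySQL 8.0+: expire_logs_days is deprecated; set the same period in seconds
-- binlog_expire_logs_seconds = 1209600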

Recovery:

# Force new snapshot
snapshot_mode: always

Schema Problems

“Column not found in schema”

Symptoms:

  • Error: column "new_column" not found in schema

Causes:

  • Schema changed but CDC has cached old schema

Solutions:

// Force schema refresh
cdc.refresh_schema().await?;

Alternatively, restart the CDC connector so that it starts with a fresh schema cache.

“Schema evolution not supported”

Symptoms:

  • Error after DROP COLUMN or type change

Solutions:

Use schema evolution settings:

config:
  schema_evolution:
    mode: compatible  # or: full, none
    allow_column_drops: true

Performance Problems

High Replication Lag

Symptoms:

  • rivven_cdc_lag_milliseconds consistently high
  • Events arriving late

Diagnosis:

let metrics = cdc.extended_metrics();
println!("Source lag: {:?}", metrics.source_lag());
println!("Emit lag: {:?}", metrics.emit_lag());
println!("Pending events: {}", metrics.pending_events());

Solutions:

  1. Scale workers:
    config:
      workers: 8
    
  2. Increase batch size:
    config:
      batch_size: 10000
    
  3. Check bottlenecks:
    • Network bandwidth
    • Downstream sink throughput
    • CPU usage on CDC host

Memory Growth

Symptoms:

  • Memory usage keeps increasing
  • OOM errors

Diagnosis:

let metrics = cdc.extended_metrics();
println!("Memory usage: {} bytes", metrics.memory_usage_bytes());
println!("Queue size: {}", metrics.queue_size());

Solutions:

  1. Limit queue size:
    config:
      max_queue_size: 10000
      queue_timeout: 30s
    
  2. Enable backpressure:
    config:
      backpressure:
        enabled: true
        high_watermark: 8000
        low_watermark: 2000
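
The two watermarks form a hysteresis band: reading pauses once the queue reaches the high watermark and resumes only after it has drained to the low watermark, so the connector does not flip between states on every event. The sketch below illustrates that general technique under stated assumptions. The Backpressure type is hypothetical, and Rivven's internal implementation may differ.

```rust
/// Watermark-based backpressure with hysteresis.
/// Hypothetical illustration; not the rivven_cdc implementation.
#[derive(Debug)]
pub struct Backpressure {
    high_watermark: usize,
    low_watermark: usize,
    paused: bool,
}

impl Backpressure {
    pub fn new(high_watermark: usize, low_watermark: usize) -> Self {
        assert!(low_watermark < high_watermark, "low must be below high");
        Self { high_watermark, low_watermark, paused: false }
    }

    /// Feed in the current queue length.
    /// Returns true while reads from the source should stay paused.
    pub fn update(&mut self, queue_len: usize) -> bool {
        if !self.paused && queue_len >= self.high_watermark {
            self.paused = true; // queue is filling up: stop pulling from the source
        } else if self.paused && queue_len <= self.low_watermark {
            self.paused = false; // queue has drained: resume reading
        }
        self.paused
    }
}
```

With the values above (8000 and 2000), a queue length of 5000 keeps whatever state was current: still reading on the way up, still paused on the way down.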
    

Snapshot Taking Too Long

Symptoms:

  • Initial snapshot runs for hours
  • Database load spikes during snapshot

Solutions:

  1. Use parallel snapshot:
    config:
      snapshot_parallel_tables: 4
    
  2. Filter tables:
    config:
      tables:
        - app.critical_table
      exclude_tables:
        - "*_archive"
        - "*_backup"
    
  3. Use incremental snapshot (PostgreSQL):
    config:
      snapshot_mode: incremental
      signal_table: rivven.signals
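
To check which tables a pattern such as "*_archive" would exclude before starting a long snapshot, the patterns can be evaluated offline. The sketch below assumes that * matches any run of characters and that every other character matches literally. Rivven's actual matching rules (for example, whether patterns apply to schema-qualified names) are not confirmed here, and both function names are hypothetical.

```rust
/// Matches `name` against `pattern`, where `*` matches any run of characters
/// (including none) and every other character must match exactly.
/// Assumed semantics for illustration; not a rivven_cdc API.
pub fn glob_match(pattern: &str, name: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let n: Vec<char> = name.chars().collect();
    let (mut pi, mut ni) = (0, 0);
    // Position of the last `*` seen and the name index it was tried at.
    let mut star: Option<(usize, usize)> = None;

    while ni < n.len() {
        if pi < p.len() && p[pi] == '*' {
            star = Some((pi, ni));
            pi += 1;
        } else if pi < p.len() && p[pi] == n[ni] {
            pi += 1;
            ni += 1;
        } else if let Some((sp, sn)) = star {
            // Backtrack: let the last `*` absorb one more character.
            pi = sp + 1;
            ni = sn + 1;
            star = Some((sp, sn + 1));
        } else {
            return false;
        }
    }
    // Any `*` left at the end of the pattern can match the empty string.
    p[pi..].iter().all(|&c| c == '*')
}

/// Returns true if `table` matches at least one exclusion pattern.
pub fn is_excluded(table: &str, exclude_patterns: &[&str]) -> bool {
    exclude_patterns.iter().any(|pat| glob_match(pat, table))
}
```

Under these assumptions, is_excluded("users_backup", &["*_archive", "*_backup"]) is true, while "orders_archive_2024" is not excluded because the pattern has no trailing wildcard.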
    

Data Issues

Missing Events

Symptoms:

  • Source has more changes than CDC emitted
  • Gaps in data

Diagnosis:

// Check for skipped events
let metrics = cdc.extended_metrics();
println!("Filtered events: {}", metrics.filtered_events());
println!("Skipped events: {}", metrics.skipped_events());
println!("Error events: {}", metrics.errored_events());

Common causes:

  1. Table not in inclusion list
  2. Filter rules excluding events
  3. Connector crashed and restarted without checkpointing

Solutions:

  1. Check table filters:
    config:
      tables:
        - schema.table_name  # Ensure table is included
    
  2. Disable aggressive filtering:
    config:
      filter:
        include_all_dml: true
    
  3. Force re-snapshot:
    config:
      snapshot_mode: always
    

Duplicate Events

Symptoms:

  • Same event received multiple times
  • Idempotency errors downstream

Causes:

  • At-least-once delivery (expected)
  • Checkpoint failed during processing

Solutions:

  1. Enable exactly-once (if available):
    config:
      delivery: exactly_once
    
  2. Use event IDs for deduplication:
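    // `seen` is a set of already-processed event IDs kept by the consumer,
    // for example a std::collections::HashSet.
    let event_id = event.metadata().event_id();
    if !seen.contains(&event_id) {
        process(event)?;
        seen.insert(event_id);
    }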
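
A set of seen IDs that is never pruned grows for as long as the consumer runs. One common way to bound it is to remember only the most recent IDs, since redelivered events normally arrive shortly after the original. The sketch below assumes event IDs can be represented as strings. RecentIds is a hypothetical helper, not part of rivven_cdc.

```rust
use std::collections::{HashSet, VecDeque};

/// Remembers the most recent `capacity` event IDs so that the set of
/// seen IDs cannot grow without bound.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub struct RecentIds {
    capacity: usize,
    seen: HashSet<String>,
    order: VecDeque<String>, // oldest ID at the front
}

impl RecentIds {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, seen: HashSet::new(), order: VecDeque::new() }
    }

    /// Returns true if `id` is new (and records it), false if it is a duplicate.
    pub fn first_time(&mut self, id: &str) -> bool {
        if self.seen.contains(id) {
            return false;
        }
        if self.order.len() == self.capacity {
            // Evict the oldest ID to make room for the new one.
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.seen.insert(id.to_owned());
        self.order.push_back(id.to_owned());
        true
    }
}
```

The capacity should cover at least the number of events that can be redelivered after a failed checkpoint. A duplicate that arrives after its ID has been evicted is treated as new.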
    

Incorrect Data Types

Symptoms:

  • Numeric precision lost
  • Timestamps incorrectly parsed

Solutions:

Note: PostgreSQL NUMERIC values are now automatically preserved as strings (no intermediate f64 conversion), maintaining full decimal precision.

config:
  # Preserve decimal precision (default for PostgreSQL NUMERIC)
  decimal_handling: string  # or: precise, double
  
  # Timestamp handling
  time_precision: microseconds
  timezone: UTC
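
To see why the string representation matters, the following sketch checks whether a decimal value survives a round trip through f64. It uses only the standard library, and survives_f64_round_trip is a hypothetical helper name.

```rust
/// Returns true if `decimal` comes back unchanged after being parsed as f64
/// and printed again. The comparison is textual, so a change in scale
/// (for example "0.10" becoming "0.1") also counts as a difference.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub fn survives_f64_round_trip(decimal: &str) -> bool {
    match decimal.parse::<f64>() {
        // Rust prints the shortest decimal string that parses back to the same f64.
        Ok(value) => value.to_string() == decimal,
        Err(_) => false,
    }
}
```

Small values such as "1.5" pass, but "9007199254740993" (2^53 + 1) does not, because f64 carries only 53 bits of mantissa. Values of that size are common in NUMERIC columns that hold identifiers or monetary amounts.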

Health Checks

Basic Health Check

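use std::time::Duration;

// `Error` is the application's own error type with the variants used below.
pub async fn health_check(cdc: &impl CdcSource) -> Result<(), Error> {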
    // Check connection
    if !cdc.is_connected() {
        return Err(Error::NotConnected);
    }
    
    // Check lag
    let metrics = cdc.extended_metrics();
    if metrics.lag_milliseconds() > 60_000 {
        return Err(Error::HighLag(metrics.lag_milliseconds()));
    }
    
    // Check recent activity
    if let Some(last_event) = metrics.last_event_time() {
        if last_event.elapsed() > Duration::from_secs(300) {
            return Err(Error::Stale);
        }
    }
    
    Ok(())
}

Kubernetes Liveness Probe

livenessProbe:
  httpGet:
    path: /health/live
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /health/ready
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  failureThreshold: 3

Alert Rules

groups:
- name: rivven-cdc-alerts
  rules:
  - alert: CdcHighLag
    expr: rivven_cdc_lag_milliseconds > 60000
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "CDC lag is high"
      description: "CDC connector has {{ $value }}ms lag"
      
  - alert: CdcDisconnected
    expr: rivven_cdc_connected == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "CDC connector disconnected"
      description: "CDC connector lost connection"
      
  - alert: CdcErrorRate
    expr: rate(rivven_cdc_errors_total[5m]) > 0.1
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "CDC error rate elevated"

Recovery Procedures

Force Reconnection

// Graceful reconnect
cdc.disconnect().await?;
cdc.connect().await?;

// Or force reset
cdc.reset().await?;
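
When the database is restarting, a single reconnect attempt often fails. Retrying with an increasing delay avoids hammering the server. The following is a minimal synchronous sketch of exponential backoff around any fallible operation. retry_with_backoff is a hypothetical helper, and in async code the sleep would be replaced by the runtime's own timer.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `op` until it succeeds or `max_attempts` is reached, doubling the
/// delay after each failure. Returns the last error if every attempt fails.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub fn retry_with_backoff<T, E>(
    max_attempts: u32,
    initial_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "need at least one attempt");
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: hand the final error back to the caller.
            Err(err) if attempt == max_attempts => return Err(err),
            Err(_) => {
                sleep(delay);
                delay = delay.saturating_mul(2);
                attempt += 1;
            }
        }
    }
}
```

For example, five attempts starting at 500 ms wait 0.5 s, 1 s, 2 s and 4 s between tries, for a total of 7.5 s before giving up.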

Reset Offset

// Reset to beginning (re-snapshot)
cdc.reset_offset(Offset::Beginning).await?;

// Reset to specific position (PostgreSQL)
cdc.reset_offset(Offset::Lsn("0/1234567".parse()?)).await?;

// Reset to timestamp
cdc.reset_offset(Offset::Timestamp(some_time)).await?;
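
A PostgreSQL LSN such as "0/1234567" is a 64-bit WAL position written as two hexadecimal halves: the high 32 bits, a slash, then the low 32 bits. The sketch below shows that encoding, which is useful for comparing positions or computing how many bytes a slot is behind. parse_lsn and format_lsn are hypothetical helpers and say nothing about how Offset::Lsn parses its argument.

```rust
/// Parses a PostgreSQL LSN such as "0/1234567" into a 64-bit WAL position.
/// Returns None if the text is not two hexadecimal numbers separated by '/'.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub fn parse_lsn(text: &str) -> Option<u64> {
    let (high, low) = text.split_once('/')?;
    let high = u32::from_str_radix(high, 16).ok()?;
    let low = u32::from_str_radix(low, 16).ok()?;
    Some((u64::from(high) << 32) | u64::from(low))
}

/// Formats a 64-bit WAL position in PostgreSQL's "high/low" hex notation.
pub fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```

Subtracting two parsed values gives the distance between them in bytes of WAL, which makes it easy to judge how far back a reset would rewind.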

Emergency Cleanup (PostgreSQL)

-- List all Rivven slots
SELECT slot_name, active, restart_lsn 
FROM pg_replication_slots 
WHERE slot_name LIKE 'rivven%';

-- Drop inactive slots
SELECT pg_drop_replication_slot(slot_name)
FROM pg_replication_slots
WHERE slot_name LIKE 'rivven%' AND NOT active;

-- Terminate stuck connections
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE usename = 'rivven' AND state = 'idle';

Emergency Cleanup (MySQL)

-- Show replication connections
SHOW PROCESSLIST;

-- Kill stuck connection
KILL <process_id>;

-- Delete ALL binary logs and start a new one (destructive: replicas and CDC
-- connectors lose their position and need a fresh snapshot)
RESET MASTER;

Debug Logging

Enable Verbose Logging

// Rust
use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::new("rivven_cdc=debug"))
    .init();

Or via environment:

export RUST_LOG=rivven_cdc=debug

Log Levels

Level   Use Case
error   Failures only
warn    Recoverable issues
info    Normal operations (default)
debug   Detailed diagnostics
trace   All messages including raw data

Event Tracing

let config = CdcConfig::builder()
    .trace_events(true)
    .build();

// Events will include trace IDs
let event = cdc.next().await?;
println!("Trace ID: {:?}", event.trace_id());

Getting Help

Information to Collect

When reporting issues, include:

  1. Configuration (redact passwords)
  2. Error message and stack trace
  3. Relevant metrics from Prometheus
  4. Database version and settings
  5. Steps to reproduce
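
Redacting by hand is easy to get wrong, so it can help to filter the configuration before pasting it into a report. The sketch below masks the value of any key: value line whose key contains "password", "secret" or "token". The key list and the redact_config name are assumptions. Credentials embedded in connection URLs are not detected and still need manual review.

```rust
/// Replaces the value of every `key: value` line whose key looks sensitive.
/// Lines without a value (such as section headers) are left unchanged.
/// Hypothetical helper for illustration; not a rivven_cdc API.
pub fn redact_config(config: &str) -> String {
    // Assumed list of sensitive key fragments; extend as needed.
    const SENSITIVE: [&str; 3] = ["password", "secret", "token"];
    config
        .lines()
        .map(|line| match line.split_once(':') {
            Some((key, value))
                if !value.trim().is_empty()
                    && SENSITIVE.iter().any(|s| key.to_lowercase().contains(s)) =>
            {
                // `key` keeps its leading indentation, so the layout is preserved.
                format!("{key}: [REDACTED]")
            }
            _ => line.to_owned(),
        })
        .collect::<Vec<_>>()
        .join("\n")
}
```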

Debug Report

// Generate debug report
let report = cdc.debug_report().await?;
println!("{}", report);

The report includes:

  • Connection status
  • Current position
  • Recent errors
  • Performance metrics
  • Configuration (sanitized)

Next Steps



Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.