Getting Started

Get Rivven up and running in minutes.

Table of contents

  1. Installation
    1. From Crates.io
    2. From Source
    3. Docker
  2. Starting the Broker
    1. Basic Startup
    2. Custom Configuration
    3. With Web Dashboard
  3. Rivven Connect
    1. Quick Start
    2. Example Configuration
    3. Validate Configuration
  4. Basic Operations
    1. Topic Management
    2. Publishing Messages
    3. Consuming Messages
    4. Consumer Groups
  5. Rust Client
    1. Add Dependency
    2. Producer Example
    3. High-Performance Producer
    4. Consumer Example
  6. Python Client
    1. Installation
    2. Basic Usage
    3. Async Iterator Pattern
    4. Authentication
    5. TLS Connection
    6. Transactions (Exactly-Once Semantics)
    7. Admin Operations
  7. Schema Registry
    1. Start the Registry
    2. Register and Query Schemas
    3. Programmatic Usage
  8. Next Steps

Installation

From Crates.io

cargo install rivven rivvend rivven-connect rivven-schema

From Source

git clone https://github.com/hupe1980/rivven
cd rivven
cargo build --release

# Binaries are in target/release/
./target/release/rivvend --help
./target/release/rivven --help
./target/release/rivven-connect --help
./target/release/rivven-schema --help

Docker

# Pull all images
docker pull ghcr.io/hupe1980/rivvend:latest
docker pull ghcr.io/hupe1980/rivven-connect:latest
docker pull ghcr.io/hupe1980/rivven-schema:latest

# Start broker
docker run -d -p 9092:9092 -p 9094:9094 ghcr.io/hupe1980/rivvend:latest --dashboard

# Start schema registry (optional)
docker run -d -p 8081:8081 ghcr.io/hupe1980/rivven-schema:latest serve --port 8081

Starting the Broker

Basic Startup

rivvend

The broker starts with sensible defaults:

  • Listen address: 0.0.0.0:9092
  • Data directory: ./data
  • Max message size: 10 MB
  • Authentication: required (use --no-require-auth for local development)

Authentication is enabled by default. For quick local development without credentials, start with rivvend --no-require-auth.

Custom Configuration

rivvend \
  --bind 0.0.0.0:9092 \
  --data-dir /var/lib/rivven \
  --max-message-size 16777216

With Web Dashboard

The dashboard is embedded in the binary when built with the dashboard feature:

# Start server with dashboard enabled
rivvend --data-dir ./data

# Dashboard available at http://localhost:8080/

Note: The dashboard is embedded during the build. See Dashboard for build instructions.


Rivven Connect

Rivven Connect manages data pipelines with sources (data ingestion) and sinks (data export).

Quick Start

# Start broker
rivvend --data-dir ./data

# Run connectors (topics auto-created!)
rivven-connect run --config connect.yaml

Example Configuration

The configuration defines sources (publish to broker) and sinks (consume from broker):

# Architecture: Sources → Broker Topics → Sinks
# The broker is ALWAYS in the middle for durability and replay

version: "1.0"

broker:
  address: localhost:9092

# Sources: read from external systems, publish to broker topics
sources:
  demo:
    connector: datagen
    topic: demo-events
    config:
      pattern: orders
      events_per_second: 3
      cdc_mode: true

# Sinks: consume from broker topics, write to external systems
sinks:
  console:
    connector: stdout
    topics: [demo-events]
    consumer_group: demo-sink
    config:
      format: pretty

Validate Configuration

rivven-connect validate --config connect.yaml

Output:

✓ Configuration valid!

Broker:
  Bootstrap servers:
    - 127.0.0.1:9092

Topic Settings:
  Auto-create: enabled
  Default partitions: 3

Sources (1 enabled):
  ✓ demo (datagen) → topic: demo-events (3 partitions)

Sinks (1 enabled):
  ✓ console (stdout) ← topics: ["demo-events"]

Basic Operations

Topic Management

# Create a topic
rivven topic create events

# Create with partitions
rivven topic create orders --partitions 3

# List topics
rivven topic list

# Delete a topic
rivven topic delete events

Publishing Messages

# Simple message
rivven produce events "Hello, World!"

# From stdin
echo '{"user": "alice", "action": "login"}' | rivven produce events

# Multiple messages
cat events.jsonl | rivven produce events

Consuming Messages

# Consume from beginning
rivven consume events

# Consume from a specific offset
rivven consume events --offset 100

# Consume with consumer group
rivven consume events --group my-app

Consumer Groups

# List consumer groups
rivven group list

# Describe a group
rivven group describe my-app

# Delete a group
rivven group delete my-app

Rust Client

Add Dependency

[dependencies]
rivven-client = "0.0.22"
tokio = { version = "1", features = ["full"] }

Producer Example

use rivven_client::Client;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = Client::connect("localhost:9092").await?;
    
    // Create topic
    client.create_topic("events", Some(3)).await?;
    
    // Publish message
    let offset = client.publish("events", b"Hello, Rivven!").await?;
    println!("Published at offset: {}", offset);
    
    Ok(())
}

High-Performance Producer

use rivven_client::{Producer, ProducerConfig, CompressionType};
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create producer with batching, compression, and auth
    let config = ProducerConfig::builder()
        .bootstrap_servers(vec!["localhost:9092".to_string()])
        .batch_size(16384)           // 16 KB batches
        .linger_ms(5)                // 5ms linger for batching
        .compression_type(CompressionType::Lz4)  // LZ4 batch compression
        .auth("producer-app", "secure-password")  // SCRAM-SHA-256 auth
        .metadata_max_age(std::time::Duration::from_secs(300)) // 5 min metadata cache
        .max_in_flight_requests(5)   // Memory-bounded backpressure
        .build();
    
    // Producer::new() connects with auto-handshake and auto-authentication
    let producer = Arc::new(Producer::new(config).await?);
    
    // Simple send (uses murmur2 partitioning like Kafka)
    let metadata = producer.send("events", b"value").await?;
    println!("Published at partition {} offset {}", metadata.partition, metadata.offset);
    
    // Send with key for consistent partitioning
    let metadata = producer.send_with_key("events", Some("user-123"), b"event").await?;
    
    // Share across tasks for parallel publishing
    for i in 0..100 {
        let producer = Arc::clone(&producer);
        tokio::spawn(async move {
            producer.send("events", format!("msg-{}", i)).await
        });
    }
    
    // Flush ensures all pending records are delivered
    producer.flush().await?;
    
    // Check statistics
    let stats = producer.stats();
    println!("Published {} records, {} delivered", stats.records_sent, stats.records_delivered);
    
    producer.close().await;
    Ok(())
}

Consumer Example

use rivven_client::Client;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut client = Client::connect("localhost:9092").await?;
    
    // Consume messages
    let messages = client.consume("events", 0, 0, 100).await?;
    
    for msg in messages {
        println!("Offset {}: {:?}", msg.offset, msg.value);
    }
    
    Ok(())
}

Python Client

High-performance Python bindings with full async/await support.

Installation

pip install rivven

Basic Usage

import asyncio
import rivven

async def main():
    # Connect to Rivven
    client = await rivven.connect("localhost:9092")
    
    # Create a topic
    await client.create_topic("events", partitions=3)
    
    # Produce messages
    producer = await client.producer("events")
    offset = await producer.send(b"Hello from Python!", key=b"key-1")
    print(f"Published at offset: {offset}")
    await producer.close()
    
    # Consume messages
    consumer = client.consumer("events", group_id="my-group")
    messages = await consumer.fetch(max_messages=10)
    for msg in messages:
        print(f"Offset {msg.offset}: {msg.value_str()}")
    await consumer.commit()

asyncio.run(main())

Async Iterator Pattern

import asyncio
import rivven

async def stream():
    client = await rivven.connect("localhost:9092")
    consumer = client.consumer("events", group_id="my-group")
    
    async for message in consumer:
        print(f"Received: {message.value_str()}")
        await consumer.commit()

asyncio.run(stream())

Authentication

import asyncio
import rivven

async def authenticated():
    client = await rivven.connect("localhost:9092")
    
    # Simple authentication
    await client.authenticate("username", "password")
    
    # Or SCRAM-SHA-256
    await client.authenticate_scram("username", "password")
    
    topics = await client.list_topics()

asyncio.run(authenticated())

TLS Connection

import asyncio
import rivven

async def secure():
    client = await rivven.connect_tls(
        "localhost:9093",
        ca_cert="/path/to/ca.crt",
        client_cert="/path/to/client.crt",  # Optional: mTLS
        client_key="/path/to/client.key",   # Optional: mTLS
    )
    
    topics = await client.list_topics()

asyncio.run(secure())

Transactions (Exactly-Once Semantics)

import asyncio
import rivven

async def transactional():
    client = await rivven.connect("localhost:9092")
    
    # Initialize transactional producer
    producer_id, epoch = await client.init_producer_id("my-txn-id")
    
    try:
        await client.begin_transaction("my-txn-id", producer_id, epoch)
        
        await client.publish_idempotent(
            topic="events",
            value=b"message",
            producer_id=producer_id,
            epoch=epoch,
            sequence=0,
            key=b"key"
        )
        
        await client.commit_transaction("my-txn-id", producer_id, epoch)
    except Exception:
        await client.abort_transaction("my-txn-id", producer_id, epoch)
        raise

asyncio.run(transactional())

Admin Operations

import asyncio
import rivven

async def admin():
    client = await rivven.connect("localhost:9092")
    
    # Topic management
    await client.create_topic("my-topic", partitions=3)
    topics = await client.list_topics()
    
    # Topic configuration
    configs = await client.describe_topic_configs("my-topic")
    await client.alter_topic_config("my-topic", "retention.ms", "86400000")
    
    # Partition management
    await client.create_partitions("my-topic", new_total=6)
    
    # Offset management
    offset = await client.get_offset_for_timestamp("my-topic", 0, 1699900000000)
    await client.delete_records("my-topic", 0, before_offset=100)
    
    # Consumer groups
    groups = await client.list_groups()
    await client.describe_group("my-group")
    await client.commit_offset("my-group", "my-topic", 0, 100)
    committed = await client.get_offset("my-group", "my-topic", 0)

asyncio.run(admin())

For complete API reference, see the Python SDK README.


Schema Registry

Rivven includes a high-performance Schema Registry for schema management.

Start the Registry

# Start schema registry (in-memory storage for development)
rivven-schema serve --port 8081

# With broker-backed storage (production)
rivven-schema serve --port 8081 --broker localhost:9092

Register and Query Schemas

# Register a schema
rivven-schema register --url http://localhost:8081 --subject user-value \
  --schema '{"type":"record","name":"User","fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}'

# List subjects
rivven-schema subjects --url http://localhost:8081

# Get schema by ID
rivven-schema get --url http://localhost:8081 --id 1

Programmatic Usage

use rivven_schema::{SchemaRegistry, RegistryConfig, SchemaType};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let registry = SchemaRegistry::new(RegistryConfig::memory()).await?;
    
    let schema_id = registry.register(
        "user-value",
        SchemaType::Avro,
        r#"{"type":"record","name":"User","fields":[...]}"#
    ).await?;
    
    println!("Registered schema ID: {}", schema_id.0);
    Ok(())
}

For more details, see the Schema Registry guide.


Next Steps


Back to top

Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.