Getting Started with Krafka

This guide will help you get up and running with Krafka in just a few minutes.

Installation

Add Krafka to your Cargo.toml:

[dependencies]
krafka = "0.2"
tokio = { version = "1", features = ["full"] }

Prerequisites

  • Rust 1.85 or later (MSRV 1.85)
  • A running Kafka cluster (or use Docker)

Running Kafka with Docker

# Start Kafka with Docker Compose
docker run -d --name kafka \
  -p 9092:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:latest

Or use our docker-compose.yml:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Your First Producer

use krafka::producer::Producer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Create a producer
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .client_id("my-producer")
        .build()
        .await?;

    // Send a message
    let metadata = producer
        .send("my-topic", Some(b"key"), b"Hello, Kafka!")
        .await?;

    println!(
        "Message sent to partition {} at offset {}",
        metadata.partition, metadata.offset
    );

    // Close the producer
    producer.close().await;

    Ok(())
}

Your First Consumer

use krafka::consumer::Consumer;
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Create a consumer
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-group")
        .client_id("my-consumer")
        .build()
        .await?;

    // Subscribe to topics
    consumer.subscribe(&["my-topic"]).await?;

    // Poll for messages
    loop {
        let records = consumer.poll(Duration::from_secs(1)).await?;
        
        for record in records {
            println!(
                "Received: topic={}, partition={}, offset={}, key={:?}, value={:?}",
                record.topic,
                record.partition,
                record.offset,
                record.key.map(|k| String::from_utf8_lossy(&k).to_string()),
                String::from_utf8_lossy(&record.value)
            );
        }
    }
}

Using the Admin Client

use krafka::admin::{AdminClient, NewTopic};
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let admin = AdminClient::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // Create a topic
    let topic = NewTopic::new("new-topic", 3, 1)
        .with_config("retention.ms", "86400000");

    let results = admin
        .create_topics(vec![topic], Duration::from_secs(30))
        .await?;

    for result in results {
        match result.error {
            None => println!("Created topic: {}", result.name),
            Some(e) => println!("Failed to create {}: {}", result.name, e),
        }
    }

    // List topics
    let topics = admin.list_topics().await?;
    println!("Topics: {:?}", topics);

    Ok(())
}

Configuration Options

See the Configuration Reference for all available options.

Common Producer Options

use krafka::producer::{Producer, Acks};
use krafka::protocol::Compression;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::All)                          // Wait for all replicas
    .compression(Compression::Lz4)             // Use LZ4 compression
    .batch_size(65536)                         // 64KB batches
    .linger(Duration::from_millis(5))          // Wait up to 5ms for batching
    .build()
    .await?;

Common Consumer Options

use krafka::consumer::{Consumer, AutoOffsetReset};
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .auto_offset_reset(AutoOffsetReset::Earliest)  // Start from beginning
    .enable_auto_commit(true)                       // Auto-commit offsets
    .auto_commit_interval(Duration::from_secs(5))   // Commit every 5 seconds
    .build()
    .await?;

Next Steps


Back to top

Licensed under MIT. Copyright © 2026 Krafka Contributors.