Producer Guide
This guide covers advanced producer usage, including batching, partitioning, compression, and error handling.
Overview
The Krafka producer is an async-native, high-performance message producer for Apache Kafka. Key features include:
- Async/await API with Tokio
- Automatic batching for throughput
- Multiple compression codecs (gzip, snappy, lz4, zstd)
- Flexible partitioning strategies
- Automatic metadata refresh
- Interceptor hooks for observability
Basic Usage
use krafka::producer::Producer;
use krafka::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// Simple send
producer.send("topic", None, b"value").await?;
// Send with key (for partitioning)
producer.send("topic", Some(b"key"), b"value").await?;
producer.close().await;
Ok(())
}
Authentication
Connect to secured Kafka clusters using SASL or TLS:
use krafka::producer::Producer;
// SASL/SCRAM-SHA-256
let producer = Producer::builder()
.bootstrap_servers("broker:9093")
.sasl_scram_sha256("username", "password")
.build()
.await?;
// AWS MSK IAM
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let producer = Producer::builder()
.bootstrap_servers("broker:9094")
.auth(auth)
.build()
.await?;
See the Authentication Guide for all supported mechanisms.
Producer Configuration
Acknowledgments
Control durability vs. latency with the acks setting:
use krafka::producer::{Producer, Acks};
// Fire and forget (lowest latency, risk of data loss)
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::None)
.build()
.await?;
// Wait for leader (balanced)
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::Leader)
.build()
.await?;
// Wait for all in-sync replicas (highest durability)
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::All)
.build()
.await?;
Compression
Choose the right compression codec for your workload:
use krafka::producer::Producer;
use krafka::protocol::Compression;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.compression(Compression::Lz4) // Fast compression
.build()
.await?;
| Codec | Speed | Ratio | Use Case |
|---|---|---|---|
| None | N/A | 1:1 | Low CPU, high bandwidth |
| Gzip | Slow | Best | Archival, infrequent writes |
| Snappy | Fast | Good | General purpose |
| LZ4 | Fastest | Good | High-throughput, real-time |
| Zstd | Medium | Best | Best balance of speed/ratio |
Batching
Batching improves throughput by combining multiple messages:
use krafka::producer::Producer;
use std::time::Duration;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.batch_size(65536) // Max bytes per batch (64KB)
.linger(Duration::from_millis(5)) // Wait up to 5ms for more messages
.build()
.await?;
Linger Timer
When linger is set (> 0ms), the producer uses a background accumulator to batch records:
- Records are accumulated per partition
- Batches are sent when either:
  - The batch reaches `batch_size` bytes, or
  - The `linger` timer expires
- This reduces the number of requests, improving throughput
For ultra-low latency (linger = 0), records are sent immediately without batching.
Note: `batch_size` must be at least 1. Setting `batch_size` to 0 will cause the builder to return a configuration error.
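The two flush triggers described above can be expressed as a simple predicate. This is an illustrative sketch of the decision, not Krafka's actual accumulator code:

```rust
use std::time::Duration;

/// Decide whether an accumulated batch should be flushed, per the two
/// triggers above: size threshold reached, or linger timer expired.
/// (Sketch only; the real accumulator also handles per-partition state.)
fn should_flush(
    batch_bytes: usize,
    batch_size: usize,
    elapsed: Duration,
    linger: Duration,
) -> bool {
    batch_bytes >= batch_size || elapsed >= linger
}
```

With `linger = 0`, `elapsed >= linger` is true immediately, which is why every record is sent without waiting.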
// High-throughput configuration
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.batch_size(131072) // 128KB batches
.linger(Duration::from_millis(10)) // Wait up to 10ms
.compression(Compression::Lz4) // Fast compression
.build()
.await?;
// Low-latency configuration
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.linger(Duration::from_millis(0)) // No batching, send immediately
.build()
.await?;
Memory Backpressure
The producer limits memory usage to prevent unbounded growth under high load:
use krafka::producer::Producer;
use std::time::Duration;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.buffer_memory(64 * 1024 * 1024) // 64MB buffer limit
.max_block(Duration::from_secs(30)) // Wait up to 30s when buffer full
.build()
.await?;
| Option | Default | Description |
|---|---|---|
| `buffer_memory` | 32 MB | Maximum total memory for buffering records |
| `max_block` | 60s | Maximum time to block when buffer is full |
When the buffer memory limit is reached, send() blocks for up to max_block waiting for space to free up; if it does not, send() returns an error. This provides backpressure to prevent OOM conditions under sustained high load.
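The accounting behind this limit can be pictured as a byte budget: reserve space before buffering a record, release it once the batch is acknowledged. The following is an assumed model for illustration, not Krafka's internals:

```rust
/// Sketch of buffer-memory accounting: `try_reserve` fails once the
/// configured limit would be exceeded, which is where backpressure begins.
struct BufferBudget {
    used: usize,
    limit: usize,
}

impl BufferBudget {
    fn new(limit: usize) -> Self {
        Self { used: 0, limit }
    }

    /// Returns false when the record does not fit within the budget.
    fn try_reserve(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.limit {
            false
        } else {
            self.used += bytes;
            true
        }
    }

    /// Called after a batch is sent and acknowledged.
    fn release(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}
```

In the real producer, a failed reservation first waits up to `max_block` before surfacing an error to the caller.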
Flushing
When using linger-based batching, call flush() to ensure all pending records are sent:
// Send multiple records
for i in 0..100 {
producer.send("topic", Some(format!("key-{}", i).as_bytes()), b"value").await?;
}
// Ensure all records are sent before closing
producer.flush().await?;
producer.close().await;
Partitioning
Default Partitioner
The default partitioner uses murmur2 hashing (Java-compatible) for keyed messages and round-robin for null keys:
// Messages with the same key go to the same partition
producer.send("topic", Some(b"user-123"), b"event1").await?;
producer.send("topic", Some(b"user-123"), b"event2").await?; // Same partition
// Messages without keys are distributed round-robin
producer.send("topic", None, b"event").await?;
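For reference, the Java-compatible keyed path works roughly like this: hash the key with murmur2, mask to a non-negative value, and take the result modulo the partition count. The constants below come from the Apache Kafka Java client; treat this as an illustrative sketch rather than Krafka's actual source:

```rust
/// Murmur2 hash as used by the Apache Kafka Java client
/// (seed 0x9747b28c, m = 0x5bd1e995, r = 24).
fn murmur2(data: &[u8]) -> i32 {
    let m: i32 = 0x5bd1e995;
    let mut h: i32 = (0x9747b28c_u32 as i32) ^ (data.len() as i32);

    let mut chunks = data.chunks_exact(4);
    for c in &mut chunks {
        let mut k = i32::from_le_bytes([c[0], c[1], c[2], c[3]]);
        k = k.wrapping_mul(m);
        k ^= ((k as u32) >> 24) as i32;
        k = k.wrapping_mul(m);
        h = h.wrapping_mul(m);
        h ^= k;
    }

    // Handle the trailing 1-3 bytes (mirrors the Java switch fallthrough).
    let rem = chunks.remainder();
    if rem.len() == 3 {
        h ^= (rem[2] as i32) << 16;
    }
    if rem.len() >= 2 {
        h ^= (rem[1] as i32) << 8;
    }
    if !rem.is_empty() {
        h ^= rem[0] as i32;
        h = h.wrapping_mul(m);
    }

    h ^= ((h as u32) >> 13) as i32;
    h = h.wrapping_mul(m);
    h ^= ((h as u32) >> 15) as i32;
    h
}

/// Keyed partition selection: mask to non-negative, then modulo.
fn partition_for_key(key: &[u8], partition_count: usize) -> i32 {
    (murmur2(key) & 0x7fff_ffff) % partition_count as i32
}
```

Because the hash is deterministic, the same key always lands on the same partition for a fixed partition count; changing the partition count redistributes keys.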
Custom Partitioners
Krafka provides several built-in partitioners:
use krafka::producer::{
DefaultPartitioner,
RoundRobinPartitioner,
StickyPartitioner,
HashPartitioner,
};
// Round-robin: ignores keys, distributes evenly
let partitioner = RoundRobinPartitioner::new();
// Sticky: sticks to one partition, auto-advances after batch_threshold records (default 100)
let partitioner = StickyPartitioner::new();
// Sticky with custom batch threshold
let partitioner = StickyPartitioner::with_batch_threshold(500);
// Hash: uses Rust's default hasher instead of murmur2
let partitioner = HashPartitioner::new();
Implementing Custom Partitioners
use krafka::producer::Partitioner;
use krafka::PartitionId;
struct RegionPartitioner {
region_to_partition: std::collections::HashMap<String, PartitionId>,
}
impl Partitioner for RegionPartitioner {
fn partition(
&self,
topic: &str,
key: Option<&[u8]>,
partition_count: usize,
) -> PartitionId {
if let Some(key) = key {
if let Ok(region) = std::str::from_utf8(key) {
if let Some(&partition) = self.region_to_partition.get(region) {
return partition % partition_count as i32;
}
}
}
// Fallback to first partition
0
}
}
Error Handling
Built-in Retry
The producer automatically retries transient failures (e.g., NotLeaderForPartition, network timeouts) using the configured retry policy. On each retriable error, the producer refreshes metadata to discover the new partition leader before retrying with exponential backoff.
Configure retries via the builder:
use krafka::producer::Producer;
use std::time::Duration;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.retries(5) // Max retry attempts
.retry_backoff(Duration::from_millis(100)) // Initial backoff
.build()
.await?;
// send() automatically retries on transient failures
producer.send("topic", None, b"value").await?;
Manual Retry
For additional retry control beyond the built-in behavior, handle errors explicitly:
use krafka::producer::Producer;
use krafka::error::{KrafkaError, Result};
async fn send_with_retry(
producer: &Producer,
topic: &str,
key: Option<&[u8]>,
value: &[u8],
max_retries: u32,
) -> Result<()> {
let mut attempts = 0;
loop {
match producer.send(topic, key, value).await {
Ok(metadata) => {
println!("Sent to {}:{}", metadata.partition, metadata.offset);
return Ok(());
}
Err(e) if e.is_retriable() && attempts < max_retries => {
println!("Send failed (attempt {}): {}", attempts + 1, e);
attempts += 1;
tokio::time::sleep(std::time::Duration::from_millis(100 * attempts as u64)).await;
}
Err(e) => return Err(e),
}
}
}
Using RetryPolicy
For more sophisticated retry handling with exponential backoff:
use krafka::producer::{Producer, RetryPolicy, RetryContext};
use krafka::error::Result;
async fn send_with_policy(
producer: &Producer,
topic: &str,
value: &[u8],
) -> Result<()> {
let policy = RetryPolicy::new()
.with_max_retries(5)
.with_initial_backoff(std::time::Duration::from_millis(100))
.with_max_backoff(std::time::Duration::from_secs(10))
.with_backoff_multiplier(2.0)
.with_jitter_factor(0.1); // Add 10% jitter to prevent thundering herd
let mut ctx = RetryContext::new(policy, "send_message");
loop {
match producer.send(topic, None, value).await {
Ok(_metadata) => {
ctx.record_success();
return Ok(());
}
Err(e) => {
if let Some(backoff) = ctx.record_failure(&e) {
ctx.wait(backoff).await;
} else {
return Err(e);
}
}
}
}
}
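The backoff schedule such a policy produces can be computed directly. The sketch below mirrors the parameters used above (initial 100ms, multiplier 2.0, cap 10s) but omits jitter to stay deterministic; it is illustrative, not the RetryPolicy source:

```rust
use std::time::Duration;

/// Exponential backoff with a cap: initial * multiplier^attempt, clamped
/// to `max`. Jitter (as in `with_jitter_factor`) is intentionally omitted.
fn backoff_for_attempt(
    attempt: u32,
    initial: Duration,
    max: Duration,
    multiplier: f64,
) -> Duration {
    let raw = initial.as_millis() as f64 * multiplier.powi(attempt as i32);
    Duration::from_millis(raw.min(max.as_millis() as f64) as u64)
}
```

With these parameters the waits are 100ms, 200ms, 400ms, 800ms, and so on, until the 10s cap takes over. Jitter then spreads retries from many producers so they do not hit the broker in lockstep.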
Performance Tips
High Throughput
For maximum throughput:
use krafka::producer::{Producer, Acks};
use krafka::protocol::Compression;
use std::time::Duration;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::Leader) // Don't wait for all replicas
.compression(Compression::Lz4) // Fast compression
.batch_size(1048576) // 1MB batches
.linger(Duration::from_millis(10)) // Allow batching
.build()
.await?;
Low Latency
For minimum latency:
use krafka::producer::{Producer, Acks};
use std::time::Duration;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::None) // Don't wait for acks
.batch_size(1) // No batching
.linger(Duration::ZERO) // Send immediately
.build()
.await?;
Durability
For maximum durability:
use krafka::producer::{Producer, Acks};
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::All) // Wait for all ISR
.retries(10) // Retry on failure
.build()
.await?;
Note: For idempotent/exactly-once semantics, use `TransactionalProducer` instead. The `enable_idempotence()` method on the regular `Producer` is deprecated since v0.2.0. `TransactionalProducer` handles PID/epoch allocation, sequence numbers, and transactional batch marking automatically via `init_transactions()`.
Concurrency Control
The producer enforces `max_in_flight` to limit concurrent in-flight produce requests.
This is critical for ordering guarantees and is implemented via a semaphore:
use krafka::producer::{Producer, Acks};
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.acks(Acks::All)
.max_in_flight(1) // Strict ordering (at most 1 concurrent send)
.build()
.await?;
Graceful Shutdown
Always close producers properly to flush pending messages. The close() method guarantees that all pending batches in the accumulator are flushed to brokers before connections are torn down:
use krafka::producer::Producer;
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// ... send messages ...
// Flush and close — waits for all in-flight batches to complete
producer.flush().await?;
producer.close().await;
Transactional Producer
For exactly-once semantics across multiple partitions and topics, use the TransactionalProducer.
This is the recommended approach for idempotent and exactly-once production.
The transactional producer:
- Automatically obtains a Producer ID (PID) and epoch from the broker via `InitProducerId`
- Sets `producer_id`, `producer_epoch`, and `base_sequence` on every record batch
- Marks batches as transactional (attribute bit `0x10`)
- Tracks sequence numbers per topic-partition for idempotent delivery
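The sequence bookkeeping in the last bullet can be pictured as a per topic-partition counter: each batch takes the current value as its base_sequence and advances the counter by its record count. A minimal model for illustration (assumed, not Krafka's internals):

```rust
use std::collections::HashMap;

/// Sketch of per topic-partition sequence tracking for idempotent delivery.
struct SequenceTracker {
    next: HashMap<(String, i32), i32>,
}

impl SequenceTracker {
    fn new() -> Self {
        Self { next: HashMap::new() }
    }

    /// Returns the base_sequence for a batch of `record_count` records,
    /// then advances the counter so the next batch continues from there.
    fn next_base_sequence(&mut self, topic: &str, partition: i32, record_count: i32) -> i32 {
        let seq = self.next.entry((topic.to_string(), partition)).or_insert(0);
        let base = *seq;
        *seq += record_count;
        base
    }

    /// On an out-of-order-sequence error, the counter is discarded so the
    /// producer can renegotiate and start over for that partition.
    fn reset(&mut self, topic: &str, partition: i32) {
        self.next.remove(&(topic.to_string(), partition));
    }
}
```

The broker rejects any batch whose base_sequence does not follow the last one it accepted, which is what makes duplicate delivery detectable.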
Basic Usage
use krafka::producer::TransactionalProducer;
use krafka::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
// Create transactional producer with unique ID
let producer = TransactionalProducer::builder()
.bootstrap_servers("localhost:9092")
.transactional_id("my-unique-transaction-id")
.build()
.await?;
// Initialize transactions (once per producer)
producer.init_transactions().await?;
// Start transaction
producer.begin_transaction()?;
// Send messages atomically
producer.send("topic-a", Some(b"key1"), b"value1").await?;
producer.send("topic-b", Some(b"key2"), b"value2").await?;
// Commit transaction (all or nothing)
producer.commit_transaction().await?;
Ok(())
}
Configuration
use krafka::producer::TransactionalProducer;
use krafka::protocol::Compression;
use std::time::Duration;
let producer = TransactionalProducer::builder()
.bootstrap_servers("localhost:9092")
.transactional_id("order-processor-1")
.client_id("my-app")
.transaction_timeout_ms(60000) // 60 second timeout
.request_timeout(Duration::from_secs(30))
.compression(Compression::Lz4)
.build()
.await?;
Authentication
Connect a transactional producer to secured Kafka clusters:
use krafka::producer::TransactionalProducer;
// SASL/SCRAM-SHA-256
let producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9093")
.transactional_id("my-txn-id")
.sasl_scram_sha256("username", "password")
.build()
.await?;
// Or use AuthConfig for advanced auth (e.g., AWS MSK IAM)
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9094")
.transactional_id("my-txn-id")
.auth(auth)
.build()
.await?;
See the Authentication Guide for all supported mechanisms.
Transaction Lifecycle
1. Initialize: Call `init_transactions()` once when the producer starts
2. Begin: Call `begin_transaction()` to start a new transaction
3. Send: Send messages with `send()` or `send_record()`
4. End: Call `commit_transaction()` or `abort_transaction()`
5. Close: Call `close()` when done; this aborts any active transaction and cleans up resources
// Error handling with abort
producer.begin_transaction()?;
match do_work(&producer).await {
Ok(()) => producer.commit_transaction().await?,
Err(e) => {
producer.abort_transaction().await?;
return Err(e);
}
}
// When finished with the producer, always close it
producer.close().await;
Graceful Shutdown (Transactional)
Always close transactional producers properly. The close() method:
- Aborts any active transaction to avoid dangling open transactions on the broker
- Transitions the producer to the `FatalError` state, preventing further use
- Closes the underlying connection pool
// Graceful shutdown
producer.close().await;
// Producer is no longer usable after close()
Built-in Retry Logic
The transactional producer automatically retries sends on transient failures:
- Up to 3 retry attempts per send with exponential backoff (100ms–5s)
- Metadata is refreshed on transient errors before retrying
- `OutOfOrderSequenceNumber` errors trigger a sequence number reset and retry
- Non-retriable errors (auth failures, invalid topics) fail immediately
Timestamps
Both Producer and TransactionalProducer propagate the timestamp field from ProducerRecord to the Kafka record batch. If set, the timestamp is used as the base_timestamp of the record batch:
use krafka::producer::ProducerRecord;
let mut record = ProducerRecord::new("my-topic", b"value".to_vec());
record.timestamp = Some(1700000000000); // epoch millis
producer.send_record(record).await?;
Note: If `timestamp` is not set, the broker defaults apply (typically `LogAppendTime` or `CreateTime`, depending on topic configuration).
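To stamp records with the current wall-clock time, epoch milliseconds can be derived from the standard library. This assumes the timestamp field is i64 epoch millis, as in the example above:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Current wall-clock time as epoch milliseconds, suitable for a
/// record timestamp field that takes epoch millis.
fn now_epoch_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock set before the Unix epoch")
        .as_millis() as i64
}
```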
Consume-Transform-Produce (Exactly-Once)
For read-process-write patterns with exactly-once guarantees:
use krafka::producer::TransactionalProducer;
use std::collections::HashMap;
// Commit consumer offsets atomically with produce
producer.begin_transaction()?;
// Process records and produce output
for record in consumer_records {
let output = transform(&record)?;
producer.send("output-topic", record.key, &output).await?;
}
// Commit offsets as part of transaction
let mut offsets = HashMap::new();
offsets.insert(topic_partition, offset_and_metadata);
producer.send_offsets_to_transaction(&offsets, "consumer-group").await?;
// Atomic commit of messages and offsets
producer.commit_transaction().await?;
Transaction States
The producer maintains a state machine with atomic CAS (compare-and-swap) transitions for thread safety:
| State | Description |
|---|---|
| `Uninitialized` | Producer created, `init_transactions()` not called |
| `Ready` | Ready to begin a new transaction |
| `InTransaction` | Transaction in progress |
| `Committing` | Transaction being committed |
| `Aborting` | Transaction being aborted |
| `FatalError` | Unrecoverable error, producer must be recreated |
Note: State transitions are protected by atomic compare-and-swap operations, preventing race conditions when multiple tasks interact with the transactional producer concurrently.
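The CAS pattern in the note can be sketched with std atomics: a transition succeeds only if the observed state matches the expected one, so two tasks racing to begin a transaction cannot both win. The state encoding below is assumed for illustration:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Assumed numeric encoding for two of the states in the table above.
const READY: u8 = 1;
const IN_TRANSACTION: u8 = 2;

struct TxnState(AtomicU8);

impl TxnState {
    fn new() -> Self {
        Self(AtomicU8::new(READY))
    }

    /// Atomically move Ready -> InTransaction. Fails (returning the state
    /// actually observed) if another task already changed it: the
    /// compare-and-swap lost the race, so this caller must not proceed.
    fn begin(&self) -> Result<(), u8> {
        self.0
            .compare_exchange(READY, IN_TRANSACTION, Ordering::AcqRel, Ordering::Acquire)
            .map(|_| ())
    }
}
```

Exactly one of two concurrent `begin()` calls succeeds; the loser observes `InTransaction` and can surface an "invalid state" error instead of corrupting the transaction.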
Producer Interceptors
Interceptors allow you to observe and modify records before they are sent, and observe the acknowledgement (or error) after a send completes. See the Interceptors Guide for full details.
use krafka::interceptor::ProducerInterceptor;
use krafka::producer::{Producer, ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::Arc;
#[derive(Debug)]
struct AuditInterceptor;
impl ProducerInterceptor for AuditInterceptor {
fn on_send(&self, record: &mut ProducerRecord) {
// Add a tracing header to every record
record.headers.push(("x-trace-id".to_string(), b"abc123".to_vec()));
}
fn on_acknowledgement(&self, metadata: &RecordMetadata, error: Option<&KrafkaError>) {
if let Some(err) = error {
eprintln!("Send failed: {}", err);
} else {
println!("Sent to {}:{}", metadata.topic, metadata.partition);
}
}
}
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.interceptor(Arc::new(AuditInterceptor))
.build()
.await?;
Next Steps
- Interceptors Guide - Producer and consumer interceptor hooks
- Consumer Guide - Learn about consuming messages
- Configuration Reference - All producer options
- Architecture Overview - How the producer works internally