# Producer Guide

This guide covers advanced producer usage, including batching, partitioning, compression, and error handling.
## Overview
The Krafka producer is an async-native, high-performance message producer for Apache Kafka. Key features include:
- Async/await API with Tokio
- Automatic batching for throughput
- Multiple compression codecs (gzip, snappy, lz4, zstd)
- Flexible partitioning strategies
- Automatic metadata refresh
- Interceptor hooks for observability
## Basic Usage

```rust
use krafka::producer::Producer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // Simple send
    producer.send("topic", None, b"value").await?;

    // Send with key (for partitioning)
    producer.send("topic", Some(b"key"), b"value").await?;

    producer.close().await;
    Ok(())
}
```
## Authentication

Connect to secured Kafka clusters using SASL or TLS:

```rust
use krafka::producer::Producer;
use krafka::auth::AuthConfig;

// SASL/SCRAM-SHA-256
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// AWS MSK IAM
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let producer = Producer::builder()
    .bootstrap_servers("broker:9094")
    .auth(auth)
    .build()
    .await?;
```
See the Authentication Guide for all supported mechanisms.
## Producer Configuration

### Acknowledgments

Control durability vs. latency with the `acks` setting:

```rust
use krafka::producer::{Producer, Acks};

// Fire and forget (lowest latency, risk of data loss)
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::None)
    .build()
    .await?;

// Wait for leader (balanced)
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::Leader)
    .build()
    .await?;

// Wait for all in-sync replicas (highest durability)
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::All)
    .build()
    .await?;
```
### Compression

Choose the right compression codec for your workload:

```rust
use krafka::producer::Producer;
use krafka::protocol::Compression;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .compression(Compression::Lz4) // Fast compression
    .build()
    .await?;
```

| Codec | Cargo Feature | Speed | Ratio | Use Case |
|---|---|---|---|---|
| None | — | N/A | 1:1 | Low CPU, high bandwidth |
| Gzip | `gzip` | Slow | Best | Archival, infrequent writes |
| Snappy | `snappy` | Fast | Good | General purpose |
| LZ4 | `lz4` | Fastest | Good | High-throughput, real-time |
| Zstd | `zstd` | Medium | Best | Best balance of speed/ratio |

All codecs are enabled by default via the `compression` convenience feature. To trim binary size or avoid the C toolchain (needed by `zstd`), disable default features and select only the codecs you need:

```toml
krafka = { version = "0.5", default-features = false, features = ["lz4"] }
```
### Batching

Batching improves throughput by combining multiple messages:

```rust
use krafka::producer::Producer;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .batch_size(65536)                 // Max bytes per batch (64KB)
    .linger(Duration::from_millis(5))  // Wait up to 5ms for more messages
    .build()
    .await?;
```
#### Linger Timer

When `linger` is set (> 0ms), the producer uses a background accumulator to batch records:

- Records are accumulated per partition
- Batches are sent when either:
  - The batch reaches `batch_size` bytes, or
  - The `linger` timer expires
- This reduces the number of requests, improving throughput

For ultra-low latency (`linger = 0`), records are sent immediately without batching.
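The size-or-time flush condition above can be sketched as follows. This is an illustrative, self-contained sketch, not Krafka's actual internals; the `BatchState` and `should_flush` names are hypothetical:

```rust
use std::time::{Duration, Instant};

// Hypothetical model of one per-partition batch; Krafka's real
// accumulator is asynchronous and tracks many of these at once.
struct BatchState {
    bytes: usize,       // bytes accumulated so far
    opened_at: Instant, // when the batch received its first record
}

fn should_flush(batch: &BatchState, batch_size: usize, linger: Duration) -> bool {
    // Flush when the batch is full OR the linger window has elapsed.
    batch.bytes >= batch_size || batch.opened_at.elapsed() >= linger
}

fn main() {
    // A 70,000-byte batch exceeds a 65,536-byte threshold, so it flushes
    // immediately even though the linger window has not elapsed.
    let full = BatchState { bytes: 70_000, opened_at: Instant::now() };
    assert!(should_flush(&full, 65_536, Duration::from_millis(5)));

    // A small, freshly opened batch keeps waiting for more records.
    let small = BatchState { bytes: 10, opened_at: Instant::now() };
    assert!(!should_flush(&small, 65_536, Duration::from_secs(60)));
}
```

Either trigger alone is enough to send the batch, which is why raising `linger` only delays sends that have not already filled a batch.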
> **Note:** `batch_size` must be at least 1. Setting `batch_size` to 0 will cause the builder to return a configuration error.
```rust
// High-throughput configuration
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .batch_size(131072)                 // 128KB batches
    .linger(Duration::from_millis(10))  // Wait up to 10ms
    .compression(Compression::Lz4)      // Fast compression
    .build()
    .await?;

// Low-latency configuration
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .linger(Duration::from_millis(0))   // No batching, send immediately
    .build()
    .await?;
```
### Memory Backpressure

The producer limits memory usage to prevent unbounded growth under high load:

```rust
use krafka::producer::Producer;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .buffer_memory(64 * 1024 * 1024)      // 64MB buffer limit
    .max_block(Duration::from_secs(30))   // Wait up to 30s when buffer full
    .build()
    .await?;
```
| Option | Default | Description |
|---|---|---|
| `buffer_memory` | 32 MB | Maximum total memory for buffering records |
| `max_block` | 60s | Maximum time to block when buffer is full |
When batching is enabled (`linger > 0`) and the buffer memory limit is reached, `send()` blocks the caller for up to `max_block`, waiting for in-flight batches to complete and free memory. If memory is still unavailable after the timeout, an error is returned. This provides backpressure matching the Kafka Java client's `max.block.ms` behavior, preventing both OOM conditions and unnecessary record loss under bursty load. In direct-send mode (`linger = 0`), records bypass the accumulator, so `buffer_memory` and `max_block` do not apply.
### Flushing

Call `flush()` whenever you need a durability barrier over records that have already been handed to the producer. This applies to both linger-based batching and direct-send mode (`linger = 0`):

```rust
// Send multiple records
for i in 0..100 {
    producer.send("topic", Some(format!("key-{}", i).as_bytes()), b"value").await?;
}

// Ensure all records are sent before closing
producer.flush().await?;
producer.close().await;
```
## Partitioning

### Default Partitioner

The default partitioner uses murmur2 hashing (Java-compatible) for keyed messages and round-robin for null keys:

```rust
// Messages with the same key go to the same partition
producer.send("topic", Some(b"user-123"), b"event1").await?;
producer.send("topic", Some(b"user-123"), b"event2").await?; // Same partition

// Messages without keys are distributed round-robin
producer.send("topic", None, b"event").await?;
```
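For reference, Java-compatible keyed partitioning can be sketched like this. This is a simplified, self-contained illustration of the murmur2-then-modulo scheme, not Krafka's actual code:

```rust
// Sketch of Kafka's Java-compatible murmur2 hash (seed 0x9747b28c).
fn murmur2(data: &[u8]) -> u32 {
    const SEED: u32 = 0x9747_b28c;
    const M: u32 = 0x5bd1_e995;
    let mut h: u32 = SEED ^ (data.len() as u32);
    let mut chunks = data.chunks_exact(4);
    for c in &mut chunks {
        let mut k = u32::from_le_bytes([c[0], c[1], c[2], c[3]]);
        k = k.wrapping_mul(M);
        k ^= k >> 24;
        k = k.wrapping_mul(M);
        h = h.wrapping_mul(M);
        h ^= k;
    }
    // Handle the trailing 1-3 bytes (mirrors the Java switch fall-through).
    let rem = chunks.remainder();
    if rem.len() >= 3 { h ^= (rem[2] as u32) << 16; }
    if rem.len() >= 2 { h ^= (rem[1] as u32) << 8; }
    if !rem.is_empty() {
        h ^= rem[0] as u32;
        h = h.wrapping_mul(M);
    }
    h ^= h >> 13;
    h = h.wrapping_mul(M);
    h ^= h >> 15;
    h
}

// Mask off the sign bit (like the Java client's toPositive), then modulo.
fn partition_for_key(key: &[u8], partition_count: u32) -> u32 {
    (murmur2(key) & 0x7fff_ffff) % partition_count
}

fn main() {
    // The same key always maps to the same partition.
    let p1 = partition_for_key(b"user-123", 12);
    let p2 = partition_for_key(b"user-123", 12);
    assert_eq!(p1, p2);
    assert!(p1 < 12);
    println!("user-123 -> partition {}", p1);
}
```

Because the hash is deterministic over the key bytes, records for `user-123` land on the same partition across producers and restarts, as long as the partition count does not change.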
### Custom Partitioners

Krafka provides several built-in partitioners:

```rust
use krafka::producer::{
    DefaultPartitioner,
    RoundRobinPartitioner,
    StickyPartitioner,
    HashPartitioner,
};

// Round-robin: ignores keys, distributes evenly
let partitioner = RoundRobinPartitioner::new();

// Sticky: sticks to one partition, auto-advances after batch_threshold records (default 100)
let partitioner = StickyPartitioner::new();

// Sticky with custom batch threshold
let partitioner = StickyPartitioner::with_batch_threshold(500);

// Hash: uses Rust's default hasher instead of murmur2
let partitioner = HashPartitioner::new();
```
### Implementing Custom Partitioners

```rust
use krafka::producer::Partitioner;
use krafka::PartitionId;

struct RegionPartitioner {
    region_to_partition: std::collections::HashMap<String, PartitionId>,
}

impl Partitioner for RegionPartitioner {
    fn partition(
        &self,
        topic: &str,
        key: Option<&[u8]>,
        partition_count: usize,
    ) -> PartitionId {
        if let Some(key) = key {
            if let Ok(region) = std::str::from_utf8(key) {
                if let Some(&partition) = self.region_to_partition.get(region) {
                    return partition % partition_count as i32;
                }
            }
        }
        // Fallback to first partition
        0
    }
}
```
## Error Handling

### Record Validation

Before sending, each `ProducerRecord` is validated against Kafka wire-format limits:

- Topic name: max 32,767 bytes (`i16` limit)
- Key: max 2,147,483,647 bytes (`i32` limit)
- Value: max 2,147,483,647 bytes (`i32` limit)
- Header keys: max 2,147,483,647 bytes (`i32` limit)
- Header values: max 2,147,483,647 bytes (`i32` limit)

Oversized data returns a descriptive `KrafkaError::protocol` error instead of panicking.
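The check itself is a simple bounds test. The sketch below illustrates the topic-name case with the error type simplified to a `String`; Krafka's actual error construction differs:

```rust
// Illustrative sketch of the topic-name wire-format check.
// The real validator returns KrafkaError::protocol, not String.
fn validate_topic_name(topic: &str) -> Result<(), String> {
    const MAX_TOPIC_BYTES: usize = i16::MAX as usize; // 32,767
    if topic.len() > MAX_TOPIC_BYTES {
        return Err(format!(
            "topic name is {} bytes, exceeding the i16 limit of {}",
            topic.len(),
            MAX_TOPIC_BYTES
        ));
    }
    Ok(())
}

fn main() {
    assert!(validate_topic_name("orders").is_ok());
    // A 40,000-byte topic name exceeds the 32,767-byte i16 limit.
    assert!(validate_topic_name(&"t".repeat(40_000)).is_err());
}
```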
### Built-in Retry

The producer automatically retries transient failures (e.g., `NotLeaderForPartition`, network timeouts) using the configured retry policy. On each retriable error, the producer refreshes metadata to discover the new partition leader before retrying with exponential backoff.

Configure retries via the builder:

```rust
use krafka::producer::Producer;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .retries(5)                                 // Max retry attempts
    .retry_backoff(Duration::from_millis(100))  // Initial backoff
    .build()
    .await?;

// send() automatically retries on transient failures
producer.send("topic", None, b"value").await?;
```
### Delivery Timeout

The `delivery_timeout` setting (analogous to the Java client's `delivery.timeout.ms`) caps the total time from when a record enters the producer to when it must be acknowledged. This includes time spent in the accumulator's linger window, backpressure waits, and all retry attempts.

```rust
use krafka::producer::Producer;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .delivery_timeout(Duration::from_secs(120))  // Total delivery budget
    .linger(Duration::from_millis(5))            // Batching window
    .retries(u32::MAX)                           // Retry until timeout
    .build()
    .await?;
```
When `delivery_timeout` is set, backoff durations are clamped to the remaining budget so the producer does not overshoot. If the budget is exhausted, the send fails immediately, regardless of the remaining retry count.
> **Note:** By default `linger` is `0` (no batching delay), so the delivery timeout is nearly equivalent to network time + retry time. With `linger > 0`, add the maximum linger window to your delivery timeout budget.
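The clamping rule itself is simple: the next backoff never exceeds the remaining delivery budget. A minimal sketch (the function name is hypothetical, not Krafka's API):

```rust
use std::time::Duration;

// Sketch: clamp a proposed retry backoff to the remaining delivery budget,
// so the total wait can never overshoot delivery_timeout.
fn clamped_backoff(proposed: Duration, remaining_budget: Duration) -> Duration {
    proposed.min(remaining_budget)
}

fn main() {
    // Only 800ms of budget left: a proposed 1600ms backoff is clamped to 800ms.
    assert_eq!(
        clamped_backoff(Duration::from_millis(1600), Duration::from_millis(800)),
        Duration::from_millis(800)
    );
    // Plenty of budget left: the proposed backoff is used as-is.
    assert_eq!(
        clamped_backoff(Duration::from_millis(100), Duration::from_secs(120)),
        Duration::from_millis(100)
    );
}
```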
### Manual Retry

For additional retry control beyond the built-in behavior, handle errors explicitly:

```rust
use krafka::producer::Producer;
use krafka::error::{KrafkaError, Result};

async fn send_with_retry(
    producer: &Producer,
    topic: &str,
    key: Option<&[u8]>,
    value: &[u8],
    max_retries: u32,
) -> Result<()> {
    let mut attempts = 0;
    loop {
        match producer.send(topic, key, value).await {
            Ok(metadata) => {
                println!("Sent to {}:{}", metadata.partition, metadata.offset);
                return Ok(());
            }
            Err(e) if e.is_retriable() && attempts < max_retries => {
                println!("Send failed (attempt {}): {}", attempts + 1, e);
                attempts += 1;
                tokio::time::sleep(std::time::Duration::from_millis(100 * attempts as u64)).await;
            }
            Err(e) => return Err(e),
        }
    }
}
```
### Using `RetryPolicy`

For more sophisticated retry handling with exponential backoff:

```rust
use krafka::producer::{Producer, RetryPolicy, RetryContext};
use krafka::error::Result;

async fn send_with_policy(
    producer: &Producer,
    topic: &str,
    value: &[u8],
) -> Result<()> {
    let policy = RetryPolicy::new()
        .with_max_retries(5)
        .with_initial_backoff(std::time::Duration::from_millis(100))
        .with_max_backoff(std::time::Duration::from_secs(10))
        .with_backoff_multiplier(2.0)
        .with_jitter_factor(0.1); // Add 10% jitter to prevent thundering herd

    let mut ctx = RetryContext::new(policy, "send_message");
    loop {
        match producer.send(topic, None, value).await {
            Ok(_metadata) => {
                ctx.record_success();
                return Ok(());
            }
            Err(e) => {
                if let Some(backoff) = ctx.record_failure(&e) {
                    ctx.wait(backoff).await;
                } else {
                    return Err(e);
                }
            }
        }
    }
}
```
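The backoff schedule such a policy produces can be computed directly. A sketch of the general formula, with jitter omitted for determinism; this is not Krafka's exact implementation:

```rust
use std::time::Duration;

// Sketch: exponential backoff with a cap (jitter omitted for clarity).
// attempt is 0-based: attempt 0 waits `initial`, attempt 1 waits
// initial * multiplier, and so on, capped at `max`.
fn backoff_for_attempt(
    attempt: u32,
    initial: Duration,
    multiplier: f64,
    max: Duration,
) -> Duration {
    let raw = initial.as_millis() as f64 * multiplier.powi(attempt as i32);
    Duration::from_millis(raw as u64).min(max)
}

fn main() {
    let initial = Duration::from_millis(100);
    let max = Duration::from_secs(10);
    // 100ms, 200ms, 400ms, ... capped at 10s.
    assert_eq!(backoff_for_attempt(0, initial, 2.0, max), Duration::from_millis(100));
    assert_eq!(backoff_for_attempt(1, initial, 2.0, max), Duration::from_millis(200));
    assert_eq!(backoff_for_attempt(10, initial, 2.0, max), Duration::from_secs(10));
}
```

With jitter, each wait is additionally scaled by a random factor (here ±10%), which spreads out retries from many producers that failed at the same moment.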
## Performance Tips

### High Throughput

For maximum throughput:

```rust
use krafka::producer::{Producer, Acks};
use krafka::protocol::Compression;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::Leader)                 // Don't wait for all replicas
    .compression(Compression::Lz4)      // Fast compression
    .batch_size(1048576)                // 1MB batches
    .linger(Duration::from_millis(10))  // Allow batching
    .build()
    .await?;
```
### Low Latency

For minimum latency:

```rust
use krafka::producer::{Producer, Acks};
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::None)       // Don't wait for acks
    .batch_size(1)          // No batching
    .linger(Duration::ZERO) // Send immediately
    .build()
    .await?;
```
### Durability

For maximum durability:

```rust
use krafka::producer::{Producer, Acks};

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::All)  // Wait for all in-sync replicas
    .retries(10)      // Retry on failure
    .build()
    .await?;
```
> **Idempotent by default (KIP-679):** Since Kafka 3.0, idempotent production is the default. The regular `Producer` now obtains a Producer ID via `InitProducerId` at startup, tracks sequence numbers per partition, and de-duplicates retries automatically. `acks = All` and `max_in_flight <= 5` are enforced when idempotence is enabled. The `InitProducerId` call retries on retriable errors (e.g. `CoordinatorLoadInProgress`) with exponential backoff, rotating through available brokers on each attempt.
>
> **Error handling:** `OutOfOrderSequenceNumber` triggers a sequence reset and batch rebuild before retrying. `DuplicateSequenceNumber` is treated as success (the broker already committed the batch; idempotent dedup worked). The returned offset is `-1`, since the broker does not echo the original offset for duplicates. Multi-record batches acknowledge the last sequence (`base + count - 1`), matching the Kafka Java client's `ProducerBatch.lastSequence()` semantics.
>
> For cross-session exactly-once semantics (transactions), use `TransactionalProducer`.
## Concurrency Control

The producer enforces `max_in_flight` to limit concurrent in-flight produce requests. This is critical for ordering guarantees and is implemented via a semaphore:

```rust
use krafka::producer::{Producer, Acks};

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::All)
    .max_in_flight(1) // Strict ordering (at most 1 concurrent send)
    .build()
    .await?;
```
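The semaphore mechanism is worth seeing in isolation. The sketch below uses a minimal blocking semaphore built on `Mutex` + `Condvar` to show how a permit pool caps concurrency; Krafka itself uses an async semaphore, so this is an illustration of the principle, not the actual code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Minimal blocking counting semaphore (illustrative only).
struct Semaphore {
    permits: Mutex<usize>,
    cv: Condvar,
}

impl Semaphore {
    fn new(n: usize) -> Self {
        Self { permits: Mutex::new(n), cv: Condvar::new() }
    }
    fn acquire(&self) {
        let mut p = self.permits.lock().unwrap();
        while *p == 0 {
            p = self.cv.wait(p).unwrap(); // block until a permit is released
        }
        *p -= 1;
    }
    fn release(&self) {
        *self.permits.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

fn main() {
    let sem = Arc::new(Semaphore::new(2)); // like max_in_flight = 2
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..8)
        .map(|_| {
            let (sem, in_flight, peak) = (sem.clone(), in_flight.clone(), peak.clone());
            thread::spawn(move || {
                sem.acquire(); // blocks while 2 "requests" are in flight
                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(10)); // simulated produce request
                in_flight.fetch_sub(1, Ordering::SeqCst);
                sem.release();
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    // Concurrency never exceeded the permit count.
    assert!(peak.load(Ordering::SeqCst) <= 2);
}
```

With `max_in_flight(1)` every request completes before the next begins, which is what gives strict per-partition ordering even across retries.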
## Graceful Shutdown

Always close producers properly to flush pending messages. The `close()` method is a barrier over all started sends, not just batches still resident in the accumulator: it blocks new sends, waits for buffered and already-in-flight work to finish, then tears down connections. Calling `close()` more than once is a no-op:

```rust
use krafka::producer::Producer;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .build()
    .await?;

// ... send messages ...

// Flush and close: waits for all in-flight batches to complete
producer.flush().await?;
producer.close().await;
```
If you need a bounded shutdown window, use `close_with_timeout()` instead. On timeout, Krafka tears down the connection pool and returns a timeout error, causing any remaining in-flight work to fail fast instead of hanging shutdown indefinitely:

```rust
use std::time::Duration;

producer.close_with_timeout(Duration::from_secs(10)).await?;
```
## Transactional Producer

For exactly-once semantics across multiple partitions and topics, use the `TransactionalProducer`. This is the recommended approach for idempotent and exactly-once production.

The transactional producer:

- Automatically obtains a Producer ID (PID) and epoch from the broker via `InitProducerId`
- Sets `producer_id`, `producer_epoch`, and `base_sequence` on every record batch
- Marks batches as transactional (attribute bit `0x10`)
- Tracks sequence numbers per topic-partition for idempotent delivery
### Basic Usage

```rust
use krafka::producer::TransactionalProducer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Create transactional producer with unique ID
    let producer = TransactionalProducer::builder()
        .bootstrap_servers("localhost:9092")
        .transactional_id("my-unique-transaction-id")
        .build()
        .await?;

    // Initialize transactions (once per producer)
    producer.init_transactions().await?;

    // Start transaction
    producer.begin_transaction()?;

    // Send messages atomically
    producer.send("topic-a", Some(b"key1"), b"value1").await?;
    producer.send("topic-b", Some(b"key2"), b"value2").await?;

    // Commit transaction (all or nothing)
    producer.commit_transaction().await?;
    Ok(())
}
```
### Configuration

```rust
use krafka::producer::TransactionalProducer;
use krafka::protocol::Compression;
use std::time::Duration;

let producer = TransactionalProducer::builder()
    .bootstrap_servers("localhost:9092")
    .transactional_id("order-processor-1")
    .client_id("my-app")
    .transaction_timeout_ms(60000)  // 60 second timeout
    .request_timeout(Duration::from_secs(30))
    .compression(Compression::Lz4)
    .build()
    .await?;
```
### Authentication

Connect a transactional producer to secured Kafka clusters:

```rust
use krafka::producer::TransactionalProducer;
use krafka::auth::AuthConfig;

// SASL/SCRAM-SHA-256
let producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9093")
    .transactional_id("my-txn-id")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// Or use AuthConfig for advanced auth (e.g., AWS MSK IAM)
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9094")
    .transactional_id("my-txn-id")
    .auth(auth)
    .build()
    .await?;
```
See the Authentication Guide for all supported mechanisms.
### Transaction Lifecycle

1. **Initialize**: Call `init_transactions()` once when the producer starts
2. **Begin**: Call `begin_transaction()` to start a new transaction
3. **Send**: Send messages with `send()` or `send_record()`
4. **End**: Call `commit_transaction()` or `abort_transaction()`
5. **Close**: Call `close()` when done; this aborts any active transaction and cleans up resources
```rust
// Error handling with abort
producer.begin_transaction()?;
match do_work(&producer).await {
    Ok(()) => producer.commit_transaction().await?,
    Err(e) => {
        producer.abort_transaction().await?;
        return Err(e);
    }
}

// When finished with the producer, always close it
producer.close().await;
```
### Graceful Shutdown (Transactional)

Always close transactional producers properly. The `close()` method:

- Blocks new sends and waits for already-started transactional produce requests to finish
- Aborts any active transaction to avoid dangling open transactions on the broker
- Transitions the producer to the `FatalError` state, preventing further use
- Closes the underlying connection pool
- Is idempotent: calling it more than once is a no-op
```rust
// Graceful shutdown
producer.close().await;
// Producer is no longer usable after close()
```

For bounded shutdown windows, `close_with_timeout()` provides the same semantics with an explicit deadline:

```rust
use std::time::Duration;

producer.close_with_timeout(Duration::from_secs(10)).await?;
```
### Built-in Retry Logic

The transactional producer automatically retries sends on transient failures:

- Uses the shared `RetryPolicy` (default: 3 retries, exponential backoff with jitter)
- Metadata is refreshed on transient errors before retrying
- `OutOfOrderSequenceNumber` errors trigger a sequence number reset and batch rebuild with a fresh sequence before retrying
- Sequence numbers and the batch are allocated once and reused across normal retries to maintain idempotent semantics
- Non-retriable errors (auth failures, invalid topics) fail immediately
### Coordinator Re-discovery

All coordinator RPCs (`InitProducerId`, `AddPartitionsToTxn`, `AddOffsetsToTxn`, `EndTxn`) automatically handle coordinator failover:

- On `NotCoordinator`, `CoordinatorNotAvailable`, or `CoordinatorLoadInProgress`, the cached coordinator is invalidated and a fresh `FindCoordinator` is issued before retrying.
- Network and timeout errors to the coordinator trigger the same invalidation + re-discovery flow.
- The retry uses the producer's `RetryPolicy` for exponential backoff between attempts.
- Fatal errors (`TransactionCoordinatorFenced`, `ProducerFenced`, `InvalidProducerEpoch`, `InvalidTxnState`) are never retried.
- If no coordinator is cached (e.g. after invalidation), `coordinator_connection()` auto-discovers one transparently before returning the connection.
### Timestamps

Both `Producer` and `TransactionalProducer` propagate the `timestamp` field from `ProducerRecord` to the Kafka record batch. If set, the timestamp is used as the `base_timestamp` of the record batch:

```rust
use krafka::producer::ProducerRecord;

let mut record = ProducerRecord::new("my-topic", b"value".to_vec());
record.timestamp = Some(1700000000000); // epoch millis
producer.send_record(record).await?;
```

> **Note:** If `timestamp` is not set, the broker defaults apply (typically `LogAppendTime` or `CreateTime`, depending on topic configuration).
### Consume-Transform-Produce (Exactly-Once)

For read-process-write patterns with exactly-once guarantees:

```rust
use krafka::producer::TransactionalProducer;
use std::collections::HashMap;

// Commit consumer offsets atomically with produce
producer.begin_transaction()?;

// Process records and produce output
for record in consumer_records {
    let output = transform(&record)?;
    producer.send("output-topic", record.key, &output).await?;
}

// Commit offsets as part of transaction
let mut offsets = HashMap::new();
offsets.insert(topic_partition, offset_and_metadata);
producer.send_offsets_to_transaction(&offsets, "consumer-group").await?;

// Atomic commit of messages and offsets
producer.commit_transaction().await?;
```
### Transaction States

The producer maintains a state machine with atomic CAS (compare-and-swap) transitions for thread safety:

| State | Description |
|---|---|
| `Uninitialized` | Producer created, `init_transactions()` not called |
| `Ready` | Ready to begin a new transaction |
| `InTransaction` | Transaction in progress |
| `Committing` | Transaction being committed |
| `Aborting` | Transaction being aborted |
| `FatalError` | Unrecoverable error, producer must be recreated |

> **Note:** State transitions are protected by atomic compare-and-swap operations, preventing race conditions when multiple tasks interact with the transactional producer concurrently.
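A CAS-guarded transition can be sketched like this. The state encoding, function name, and error shape are hypothetical simplifications, not Krafka's actual internals:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Hypothetical state encoding for illustration.
const READY: u8 = 1;
const IN_TRANSACTION: u8 = 2;

// begin_transaction() succeeds only when the state is exactly Ready.
// The compare-and-swap makes the check and the transition one atomic
// step, so two concurrent callers cannot both begin a transaction.
fn try_begin(state: &AtomicU8) -> Result<(), u8> {
    state
        .compare_exchange(READY, IN_TRANSACTION, Ordering::AcqRel, Ordering::Acquire)
        .map(|_| ()) // on failure, Err carries the actual current state
}

fn main() {
    let state = AtomicU8::new(READY);
    assert!(try_begin(&state).is_ok()); // Ready -> InTransaction
    // A second begin fails: the state is already InTransaction.
    assert_eq!(try_begin(&state), Err(IN_TRANSACTION));
}
```

The same pattern generalizes to the other transitions (`InTransaction -> Committing`, `Committing -> Ready`, and so on), with any unexpected observed state routed to an invalid-state error.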
## Producer Interceptors

Interceptors allow you to observe and modify records before they are sent, and observe the acknowledgement (or error) after a send completes. See the Interceptors Guide for full details.

```rust
use krafka::interceptor::{InterceptorResult, ProducerInterceptor};
use krafka::producer::{Producer, ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::Arc;

#[derive(Debug)]
struct AuditInterceptor;

impl ProducerInterceptor for AuditInterceptor {
    fn on_send(&self, record: &mut ProducerRecord) -> InterceptorResult {
        // Add a tracing header to every record
        record.headers.push(("x-trace-id".to_string(), b"abc123".to_vec()));
        Ok(())
    }

    fn on_acknowledgement(&self, metadata: &RecordMetadata, error: Option<&KrafkaError>) -> InterceptorResult {
        if let Some(err) = error {
            eprintln!("Send failed: {}", err);
        } else {
            println!("Sent to {}:{}", metadata.topic, metadata.partition);
        }
        Ok(())
    }
}

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .interceptor(Arc::new(AuditInterceptor))
    .build()
    .await?;
```
## Next Steps
- Interceptors Guide - Producer and consumer interceptor hooks
- Consumer Guide - Learn about consuming messages
- Configuration Reference - All producer options
- Architecture Overview - How the producer works internally