Error Handling Guide

This guide covers error handling in Krafka, including error types, strategies, and best practices.

Error Types

Krafka uses a single error enum for all error conditions:

/// Unified error type: every fallible Krafka API returns this enum, so
/// callers can match on a single type regardless of which subsystem failed.
pub enum KrafkaError {
    /// Protocol encoding/decoding errors
    Protocol { message: String },
    
    /// Kafka broker errors (with error code)
    // `code` carries the broker-reported error code; see the ErrorCode enum below.
    Broker { code: ErrorCode, message: String },
    
    /// Authentication failures
    Auth { message: String },
    
    /// Operation timeouts
    // `operation` names what timed out (e.g. "send", "poll") for diagnostics.
    Timeout { operation: String },
    
    /// Compression/decompression errors
    // NOTE(review): `Box<dyn Error>` without `+ Send + Sync` cannot cross
    // thread/task boundaries — confirm this matches the actual crate definition.
    Compression { codec: String, source: Box<dyn Error> },
    
    /// Configuration errors
    Config { message: String },
    
    /// Invalid state errors
    InvalidState { message: String },
    
    /// Serialization errors
    // `source` holds the underlying cause when one is available.
    Serialization { message: String, source: Option<Box<dyn Error>> },
}

Kafka Error Codes

Krafka defines all Kafka error codes in the ErrorCode enum:

use krafka::error::ErrorCode;

// Common error codes
ErrorCode::None                      // 0: No error
ErrorCode::UnknownServerError        // -1: Unknown error
ErrorCode::OffsetOutOfRange          // 1: Offset out of range
ErrorCode::NotLeaderForPartition     // 6: Not leader for partition
ErrorCode::RequestTimedOut           // 7: Request timed out
ErrorCode::MessageTooLarge           // 10: Message too large
ErrorCode::UnknownTopicOrPartition   // 3: Unknown topic
ErrorCode::LeaderNotAvailable        // 5: Leader not available
ErrorCode::TopicAlreadyExists        // 36: Topic already exists
ErrorCode::InvalidTopic              // 17: Invalid topic
ErrorCode::GroupAuthorizationFailed  // 30: Group auth failed
ErrorCode::SaslAuthenticationFailed  // 58: SASL auth failed
ErrorCode::UnknownProducerId         // 59: Unknown producer ID
ErrorCode::FencedInstanceId          // 82: Fenced instance ID
ErrorCode::UnstableOffsetCommit      // 88: Unstable offset commit

Note: OffsetOutOfRange errors during fetch are automatically handled by the consumer — it applies the configured auto_offset_reset policy to recover the affected partition without returning an error to the application.

Checking Error Codes

use krafka::error::ErrorCode;

// Check if error code indicates success
if error_code.is_ok() {
    println!("Success!");
}

// Convert from raw i16
let code = ErrorCode::from_i16(6);
assert_eq!(code, ErrorCode::NotLeaderForPartition);

Error Handling Patterns

Basic Error Handling

use krafka::error::{KrafkaError, Result};
use krafka::producer::Producer;

/// Sends a single record and reports the delivery outcome on stdout/stderr,
/// propagating any send error to the caller.
async fn send_message(producer: &Producer) -> Result<()> {
    let outcome = producer.send("topic", Some(b"key"), b"value").await;
    match outcome {
        Err(e) => {
            eprintln!("Send failed: {}", e);
            Err(e)
        }
        Ok(metadata) => {
            // On success the broker tells us where the record landed.
            println!("Sent to partition {} offset {}", metadata.partition, metadata.offset);
            Ok(())
        }
    }
}

Pattern Matching on Errors

use krafka::error::KrafkaError;

/// Dispatches on the error variant and prints variant-specific guidance.
fn handle_error(error: KrafkaError) {
    match error {
        KrafkaError::Auth { message: msg } => {
            eprintln!("Authentication failed: {}", msg);
            // Likely not retriable - check credentials
        }
        KrafkaError::Config { message: msg } => {
            eprintln!("Configuration error: {}", msg);
            // Fix configuration and restart
        }
        KrafkaError::Broker { code, message: msg } => {
            eprintln!("Broker error {:?}: {}", code, msg);
            // Check if retriable
        }
        KrafkaError::Timeout { operation: op } => {
            eprintln!("Operation timed out: {}", op);
            // Consider retrying
        }
        // `_` binds nothing, so `error` is still usable here.
        _ => eprintln!("Other error: {}", error),
    }
}

Retry Logic

use krafka::error::{KrafkaError, ErrorCode};
use std::time::Duration;

/// Returns true when the error is transient and the operation may be retried.
///
/// Timeouts and broker errors that the Kafka protocol marks as retriable
/// qualify; everything else (auth, config, serialization, ...) is treated
/// as fatal.
fn is_retriable(error: &KrafkaError) -> bool {
    match error {
        KrafkaError::Timeout { .. } => true,
        KrafkaError::Broker { code, .. } => matches!(
            code,
            // These are all marked "Retriable: True" in the Kafka protocol
            // error table. OUT_OF_ORDER_SEQUENCE_NUMBER and
            // OPERATION_NOT_ATTEMPTED were removed from this list: the
            // protocol marks both non-retriable, so blind retries can only
            // fail again (or mask a producer-state bug).
            ErrorCode::NotLeaderForPartition
            | ErrorCode::LeaderNotAvailable
            | ErrorCode::RequestTimedOut
            | ErrorCode::ReplicaNotAvailable
            | ErrorCode::NetworkException
            | ErrorCode::CorruptMessage
            | ErrorCode::UnknownTopicOrPartition
            | ErrorCode::ConcurrentTransactions
        ),
        _ => false,
    }
}

/// Runs `operation` until it succeeds, hits a non-retriable error, or the
/// retry budget is spent (`max_retries` retries, i.e. up to
/// `max_retries + 1` calls in total).
///
/// Backoff is linear: the n-th retry waits `backoff * n`.
async fn send_with_retry<F, T>(
    mut operation: F,
    max_retries: u32,
    backoff: Duration,
) -> Result<T, KrafkaError>
where
    // The closure builds a fresh boxed future for each attempt.
    F: FnMut() -> futures::future::BoxFuture<'static, Result<T, KrafkaError>>,
{
    let mut attempts = 0;
    
    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            // Retry only transient errors, and only while budget remains.
            Err(e) if is_retriable(&e) && attempts < max_retries => {
                attempts += 1;
                // Linear backoff: delay grows with the attempt number.
                let delay = backoff * attempts;
                eprintln!(
                    "Retriable error (attempt {}/{}): {}. Retrying in {:?}",
                    attempts, max_retries, e, delay
                );
                tokio::time::sleep(delay).await;
            }
            // Non-retriable, or retry budget exhausted: propagate.
            Err(e) => return Err(e),
        }
    }
}

Error Context

Add context to errors for better debugging:

use krafka::error::{KrafkaError, Result};

/// Sends one message to `topic`, logging the topic name alongside any
/// failure before propagating it.
async fn process_topic(producer: &Producer, topic: &str) -> Result<()> {
    match producer.send(topic, None, b"message").await {
        Ok(_) => Ok(()),
        Err(e) => {
            // Include the topic so the log line identifies the failing target.
            eprintln!("Failed to send to topic {}: {}", topic, e);
            Err(e)
        }
    }
}

Consumer Error Handling

AutoOffsetReset::None Error

When auto_offset_reset is set to None and a partition has no committed offset, poll() will return an error:

use krafka::consumer::{Consumer, AutoOffsetReset};

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("strict-group")
    .auto_offset_reset(AutoOffsetReset::None)
    .build()
    .await?;

// This will error if any assigned partition has no committed offset
match consumer.poll(Duration::from_secs(1)).await {
    Err(e) => eprintln!("No committed offset: {}", e),
    Ok(records) => { /* process */ }
}

Handling Poll Errors

use krafka::consumer::Consumer;
use krafka::error::KrafkaError;
use std::time::Duration;

/// Poll loop that distinguishes quiet periods, broker hiccups, and fatal
/// errors; exits only on the latter.
async fn consume_safely(consumer: &Consumer) {
    loop {
        let polled = consumer.poll(Duration::from_secs(1)).await;
        match polled {
            // Normal - no messages available
            Err(KrafkaError::Timeout { .. }) => continue,
            Err(KrafkaError::Broker { code, message }) => {
                eprintln!("Broker error {:?}: {}", code, message);
                // May need to refresh metadata or reconnect
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            // Anything else is unrecoverable here: stop consuming.
            Err(other) => {
                eprintln!("Fatal error: {}", other);
                break;
            }
            Ok(batch) => {
                for record in batch {
                    if let Err(e) = process_record(&record).await {
                        eprintln!("Failed to process record: {}", e);
                        // Decide: skip, retry, or stop
                    }
                }
            }
        }
    }
}

Commit Error Handling

use krafka::error::KrafkaError;

/// Commits offsets, retrying up to `retries` times with a fixed 100ms pause.
///
/// Up to `retries` failures are logged and retried; the final attempt's
/// result (success or error) is returned as-is, exactly `retries + 1`
/// commit calls at most.
async fn commit_with_retry(consumer: &Consumer, retries: u32) -> Result<(), KrafkaError> {
    for attempt in 1..=retries {
        match consumer.commit().await {
            Ok(()) => return Ok(()),
            Err(e) => {
                eprintln!("Commit failed (attempt {}): {}", attempt, e);
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
    // Last chance: whatever this returns is the caller's answer.
    consumer.commit().await
}

Producer Error Handling

Send Error Handling

use krafka::producer::Producer;
use krafka::error::{KrafkaError, ErrorCode};

/// Sends one message, retrying transient failures up to MAX_RETRIES times
/// with linear backoff (100ms, 200ms, ...).
///
/// MessageTooLarge is rewritten into a Config error: the payload itself is
/// the problem, so no retry can help. Other non-retriable errors propagate
/// unchanged. Relies on `is_retriable` (defined earlier in this guide).
async fn send_critical_message(
    producer: &Producer,
    topic: &str,
    key: &[u8],
    value: &[u8],
) -> Result<(), KrafkaError> {
    const MAX_RETRIES: u32 = 3;
    
    for attempt in 1..=MAX_RETRIES {
        match producer.send(topic, Some(key), value).await {
            Ok(metadata) => {
                println!("Message sent to {}:{}", metadata.partition, metadata.offset);
                return Ok(());
            }
            Err(KrafkaError::Broker { code: ErrorCode::MessageTooLarge, .. }) => {
                // Not retriable - message is too large
                return Err(KrafkaError::config("Message exceeds max size"));
            }
            // Guard excludes the final attempt, so the last failure falls
            // through to the arm below and is returned instead of slept on.
            Err(e) if is_retriable(&e) && attempt < MAX_RETRIES => {
                eprintln!("Send failed (attempt {}): {}. Retrying...", attempt, e);
                tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
            }
            Err(e) => {
                return Err(e);
            }
        }
    }
    
    // Unreachable in practice — every arm of the final iteration returns —
    // but required so this path yields a value for the type checker.
    Err(KrafkaError::timeout("send after retries"))
}

Admin Error Handling

Create Topic Errors

use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;

/// Creates the topic if needed, treating "already exists" as success and
/// surfacing any other per-topic failure as a broker error.
async fn ensure_topic_exists(
    admin: &AdminClient,
    name: &str,
    partitions: i32,
    replication_factor: i16,
) -> Result<(), KrafkaError> {
    let spec = NewTopic::new(name, partitions, replication_factor);
    
    // `?` forwards a transport-level failure unchanged.
    let results = admin.create_topics(vec![spec], Duration::from_secs(30)).await?;
    for result in results {
        match &result.error {
            None => println!("Created topic: {}", result.name),
            // Idempotent creation: an existing topic is fine.
            Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
                println!("Topic {} already exists", result.name);
            }
            Some(e) => {
                return Err(KrafkaError::broker(
                    ErrorCode::UnknownServerError,
                    e.clone(),
                ));
            }
        }
    }
    Ok(())
}

Best Practices

1. Always Handle Errors

// ❌ Bad: ignoring errors
let _ = producer.send("topic", None, b"value").await;

// ✅ Good: handling errors
if let Err(e) = producer.send("topic", None, b"value").await {
    log::error!("Send failed: {}", e);
}

2. Use Appropriate Retry Strategies

// ❌ Bad: infinite retries
loop {
    if producer.send(...).await.is_ok() {
        break;
    }
}

// ✅ Good: bounded retries with backoff
let mut attempts = 0;
while attempts < 3 {
    match producer.send(...).await {
        Ok(_) => break,
        Err(e) if is_retriable(&e) => {
            attempts += 1;
            tokio::time::sleep(Duration::from_millis(100 << attempts)).await;
        }
        Err(e) => return Err(e),
    }
}

3. Log Errors with Context

// ❌ Bad: minimal logging
log::error!("Error: {}", e);

// ✅ Good: contextual logging — the `field = %value` syntax shown here is
// provided by the `tracing` crate's macros, not `log::error!`
tracing::error!(
    topic = %topic,
    partition = %partition,
    offset = %offset,
    "Failed to process message: {}",
    e
);

4. Graceful Degradation

/// Tries the primary pipeline first; on failure, warns and degrades to the
/// fallback pipeline, returning its result instead.
async fn process_with_fallback(record: &ConsumerRecord) -> Result<()> {
    if let Err(e) = primary_processing(record).await {
        log::warn!("Primary processing failed: {}. Using fallback.", e);
        return fallback_processing(record).await;
    }
    Ok(())
}

5. Circuit Breaker Pattern

use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::time::{Duration, Instant};

/// Trip-after-N-failures circuit breaker with a timed cool-down window.
struct CircuitBreaker {
    failures: AtomicU32,                          // consecutive failure count
    threshold: u32,                               // failures needed to trip the breaker
    open: AtomicBool,                             // true while the circuit is tripped
    opened_at: std::sync::Mutex<Option<Instant>>, // when the circuit last tripped
    reset_timeout: Duration,                      // cool-down before trial calls are allowed
}

impl CircuitBreaker {
    /// Counts one failure and trips the breaker once the threshold is met.
    fn record_failure(&self) {
        let total = self.failures.fetch_add(1, Ordering::SeqCst) + 1;
        if total >= self.threshold {
            self.open.store(true, Ordering::SeqCst);
            *self.opened_at.lock().unwrap() = Some(Instant::now());
        }
    }

    /// Clears the failure count and closes the circuit again.
    fn record_success(&self) {
        self.failures.store(0, Ordering::SeqCst);
        self.open.store(false, Ordering::SeqCst);
    }

    /// Reports whether calls should currently be rejected.
    ///
    /// Once `reset_timeout` has elapsed since the trip, this returns `false`
    /// so a trial call can probe whether the downstream has recovered.
    fn is_open(&self) -> bool {
        if !self.open.load(Ordering::SeqCst) {
            return false;
        }
        let tripped_at = *self.opened_at.lock().unwrap();
        match tripped_at {
            // Cool-down elapsed: let a request through to probe recovery.
            Some(instant) if instant.elapsed() > self.reset_timeout => false,
            _ => true,
        }
    }
}

Next Steps


Back to top

Licensed under MIT. Copyright © 2026 Krafka Contributors.