Error Handling Guide
This guide covers error handling in Krafka, including error types, strategies, and best practices.
Error Types
Krafka uses a single error enum for all error conditions:
pub enum KrafkaError {
/// Protocol encoding/decoding errors
Protocol { message: String },
/// Kafka broker errors (with error code)
Broker { code: ErrorCode, message: String },
/// Authentication failures
Auth { message: String },
/// Operation timeouts
Timeout { operation: String },
/// Compression/decompression errors
Compression { codec: String, source: Box<dyn Error> },
/// Configuration errors
Config { message: String },
/// Invalid state errors
InvalidState { message: String },
/// Serialization errors
Serialization { message: String, source: Option<Box<dyn Error>> },
}
Kafka Error Codes
Krafka defines all Kafka error codes in the ErrorCode enum:
use krafka::error::ErrorCode;
// Common error codes
ErrorCode::None // 0: No error
ErrorCode::UnknownServerError // -1: Unknown error
ErrorCode::OffsetOutOfRange // 1: Offset out of range
ErrorCode::NotLeaderForPartition // 6: Not leader for partition
ErrorCode::RequestTimedOut // 7: Request timed out
ErrorCode::MessageTooLarge // 10: Message too large
ErrorCode::UnknownTopicOrPartition // 3: Unknown topic
ErrorCode::LeaderNotAvailable // 5: Leader not available
ErrorCode::TopicAlreadyExists // 36: Topic already exists
ErrorCode::InvalidTopic // 17: Invalid topic
ErrorCode::GroupAuthorizationFailed // 30: Group auth failed
ErrorCode::SaslAuthenticationFailed // 58: SASL auth failed
ErrorCode::UnknownProducerId // 59: Unknown producer ID
ErrorCode::FencedInstanceId // 82: Fenced instance ID
ErrorCode::UnstableOffsetCommit // 88: Unstable offset commit
Note:
OffsetOutOfRange errors during fetch are automatically handled by the consumer — it applies the configured auto_offset_reset policy to recover the affected partition without returning an error to the application.
Checking Error Codes
use krafka::error::ErrorCode;
// Check if error code indicates success
if error_code.is_ok() {
println!("Success!");
}
// Convert from raw i16
let code = ErrorCode::from_i16(6);
assert_eq!(code, ErrorCode::NotLeaderForPartition);
Error Handling Patterns
Basic Error Handling
use krafka::error::{KrafkaError, Result};
use krafka::producer::Producer;
async fn send_message(producer: &Producer) -> Result<()> {
match producer.send("topic", Some(b"key"), b"value").await {
Ok(metadata) => {
println!("Sent to partition {} offset {}",
metadata.partition, metadata.offset);
Ok(())
}
Err(e) => {
eprintln!("Send failed: {}", e);
Err(e)
}
}
}
Pattern Matching on Errors
use krafka::error::KrafkaError;
fn handle_error(error: KrafkaError) {
match error {
KrafkaError::Timeout { operation } => {
eprintln!("Operation timed out: {}", operation);
// Consider retrying
}
KrafkaError::Broker { code, message } => {
eprintln!("Broker error {:?}: {}", code, message);
// Check if retriable
}
KrafkaError::Auth { message } => {
eprintln!("Authentication failed: {}", message);
// Likely not retriable - check credentials
}
KrafkaError::Config { message } => {
eprintln!("Configuration error: {}", message);
// Fix configuration and restart
}
_ => {
eprintln!("Other error: {}", error);
}
}
}
Retry Logic
use krafka::error::{KrafkaError, ErrorCode};
use std::time::Duration;
fn is_retriable(error: &KrafkaError) -> bool {
match error {
KrafkaError::Timeout { .. } => true,
KrafkaError::Broker { code, .. } => matches!(
code,
ErrorCode::NotLeaderForPartition
| ErrorCode::LeaderNotAvailable
| ErrorCode::RequestTimedOut
| ErrorCode::ReplicaNotAvailable
| ErrorCode::NetworkException
| ErrorCode::CorruptMessage
| ErrorCode::UnknownTopicOrPartition
| ErrorCode::OutOfOrderSequenceNumber
| ErrorCode::ConcurrentTransactions
| ErrorCode::OperationNotAttempted
),
_ => false,
}
}
async fn send_with_retry<F, T>(
mut operation: F,
max_retries: u32,
backoff: Duration,
) -> Result<T, KrafkaError>
where
F: FnMut() -> futures::future::BoxFuture<'static, Result<T, KrafkaError>>,
{
let mut attempts = 0;
loop {
match operation().await {
Ok(result) => return Ok(result),
Err(e) if is_retriable(&e) && attempts < max_retries => {
attempts += 1;
let delay = backoff * attempts;
eprintln!(
"Retriable error (attempt {}/{}): {}. Retrying in {:?}",
attempts, max_retries, e, delay
);
tokio::time::sleep(delay).await;
}
Err(e) => return Err(e),
}
}
}
Error Context
Add context to errors for better debugging:
use krafka::error::{KrafkaError, Result};
async fn process_topic(producer: &Producer, topic: &str) -> Result<()> {
producer
.send(topic, None, b"message")
.await
.map_err(|e| {
eprintln!("Failed to send to topic {}: {}", topic, e);
e
})?;
Ok(())
}
Consumer Error Handling
AutoOffsetReset::None Error
When auto_offset_reset is set to None and a partition has no committed offset, poll() will return an error:
use krafka::consumer::{Consumer, AutoOffsetReset};
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("strict-group")
.auto_offset_reset(AutoOffsetReset::None)
.build()
.await?;
// This will error if any assigned partition has no committed offset
match consumer.poll(Duration::from_secs(1)).await {
Err(e) => eprintln!("No committed offset: {}", e),
Ok(records) => { /* process */ }
}
Handling Poll Errors
use krafka::consumer::Consumer;
use krafka::error::KrafkaError;
use std::time::Duration;
async fn consume_safely(consumer: &Consumer) {
loop {
match consumer.poll(Duration::from_secs(1)).await {
Ok(records) => {
for record in records {
if let Err(e) = process_record(&record).await {
eprintln!("Failed to process record: {}", e);
// Decide: skip, retry, or stop
}
}
}
Err(KrafkaError::Timeout { .. }) => {
// Normal - no messages available
continue;
}
Err(KrafkaError::Broker { code, message }) => {
eprintln!("Broker error {:?}: {}", code, message);
// May need to refresh metadata or reconnect
tokio::time::sleep(Duration::from_secs(1)).await;
}
Err(e) => {
eprintln!("Fatal error: {}", e);
break;
}
}
}
}
Commit Error Handling
use krafka::error::KrafkaError;
async fn commit_with_retry(consumer: &Consumer, retries: u32) -> Result<(), KrafkaError> {
let mut attempts = 0;
loop {
match consumer.commit().await {
Ok(()) => return Ok(()),
Err(e) if attempts < retries => {
attempts += 1;
eprintln!("Commit failed (attempt {}): {}", attempts, e);
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(e) => return Err(e),
}
}
}
Producer Error Handling
Send Error Handling
use krafka::producer::Producer;
use krafka::error::{KrafkaError, ErrorCode};
async fn send_critical_message(
producer: &Producer,
topic: &str,
key: &[u8],
value: &[u8],
) -> Result<(), KrafkaError> {
const MAX_RETRIES: u32 = 3;
for attempt in 1..=MAX_RETRIES {
match producer.send(topic, Some(key), value).await {
Ok(metadata) => {
println!("Message sent to {}:{}", metadata.partition, metadata.offset);
return Ok(());
}
Err(KrafkaError::Broker { code: ErrorCode::MessageTooLarge, .. }) => {
// Not retriable - message is too large
return Err(KrafkaError::config("Message exceeds max size"));
}
Err(e) if is_retriable(&e) && attempt < MAX_RETRIES => {
eprintln!("Send failed (attempt {}): {}. Retrying...", attempt, e);
tokio::time::sleep(Duration::from_millis(100 * attempt as u64)).await;
}
Err(e) => {
return Err(e);
}
}
}
Err(KrafkaError::timeout("send after retries"))
}
Admin Error Handling
Create Topic Errors
use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;
async fn ensure_topic_exists(
admin: &AdminClient,
name: &str,
partitions: i32,
replication_factor: i16,
) -> Result<(), KrafkaError> {
let topic = NewTopic::new(name, partitions, replication_factor);
match admin.create_topics(vec![topic], Duration::from_secs(30)).await {
Ok(results) => {
for result in results {
match &result.error {
None => println!("Created topic: {}", result.name),
Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
println!("Topic {} already exists", result.name);
}
Some(e) => {
return Err(KrafkaError::broker(
ErrorCode::UnknownServerError,
e.clone(),
));
}
}
}
Ok(())
}
Err(e) => Err(e),
}
}
Best Practices
1. Always Handle Errors
// ❌ Bad: ignoring errors
let _ = producer.send("topic", None, b"value").await;
// ✅ Good: handling errors
if let Err(e) = producer.send("topic", None, b"value").await {
log::error!("Send failed: {}", e);
}
2. Use Appropriate Retry Strategies
// ❌ Bad: infinite retries
loop {
if producer.send(...).await.is_ok() {
break;
}
}
// ✅ Good: bounded retries with backoff
let mut attempts = 0;
while attempts < 3 {
match producer.send(...).await {
Ok(_) => break,
Err(e) if is_retriable(&e) => {
attempts += 1;
tokio::time::sleep(Duration::from_millis(100 << attempts)).await;
}
Err(e) => return Err(e),
}
}
3. Log Errors with Context
// ❌ Bad: minimal logging
log::error!("Error: {}", e);
// ✅ Good: contextual logging (structured fields require the `tracing` macros,
// not `log` — the `log::error!` macro does not accept `key = %value` syntax)
tracing::error!(
topic = %topic,
partition = %partition,
offset = %offset,
"Failed to process message: {}",
e
);
4. Graceful Degradation
async fn process_with_fallback(record: &ConsumerRecord) -> Result<()> {
match primary_processing(record).await {
Ok(()) => Ok(()),
Err(e) => {
log::warn!("Primary processing failed: {}. Using fallback.", e);
fallback_processing(record).await
}
}
}
5. Circuit Breaker Pattern
use std::sync::atomic::{AtomicU32, AtomicBool, Ordering};
use std::time::{Duration, Instant};
struct CircuitBreaker {
failures: AtomicU32,
threshold: u32,
open: AtomicBool,
opened_at: std::sync::Mutex<Option<Instant>>,
reset_timeout: Duration,
}
impl CircuitBreaker {
fn record_failure(&self) {
let failures = self.failures.fetch_add(1, Ordering::SeqCst) + 1;
if failures >= self.threshold {
self.open.store(true, Ordering::SeqCst);
*self.opened_at.lock().unwrap() = Some(Instant::now());
}
}
fn record_success(&self) {
self.failures.store(0, Ordering::SeqCst);
self.open.store(false, Ordering::SeqCst);
}
fn is_open(&self) -> bool {
if !self.open.load(Ordering::SeqCst) {
return false;
}
// Check if we should try again
if let Some(opened_at) = *self.opened_at.lock().unwrap() {
if opened_at.elapsed() > self.reset_timeout {
return false; // Allow retry
}
}
true
}
}
Next Steps
- Configuration Reference - Timeout and retry settings
- Architecture Overview - How errors flow through the system