Interceptors Guide

Interceptors allow you to hook into the producer and consumer pipelines at key points. They are modeled after the Kafka Java client’s ProducerInterceptor and ConsumerInterceptor interfaces.

Overview

Hook Pipeline When
on_send Producer Before a record is partitioned and sent
on_acknowledgement Producer After a record is acknowledged (or fails)
close Producer When the producer is shutting down
on_consume Consumer After records are fetched, before returned to the application
on_commit Consumer After offsets are committed
close Consumer When the consumer is shutting down

Use cases:

  • Observability: Count records, measure latency, log errors
  • Record enrichment: Add tracing headers, inject metadata
  • Auditing: Track what was produced and consumed
  • Metrics collection: Feed data into Prometheus, StatsD, etc.

Producer Interceptor

Trait Definition

pub trait ProducerInterceptor: Send + Sync + fmt::Debug {
    /// Called before a record is sent (before partitioning).
    /// The record can be mutated (e.g., adding headers).
    fn on_send(&self, _record: &mut ProducerRecord) {}

    /// Called after a record is acknowledged or fails.
    /// `error` is `None` on success.
    fn on_acknowledgement(&self, _metadata: &RecordMetadata, _error: Option<&KrafkaError>) {}

    /// Called when the producer is being closed.
    /// Use this to release any resources held by the interceptor.
    fn close(&self) {}
}

All methods have default no-op implementations, so you only need to override the hooks you care about.

Note: on_acknowledgement is called for all send paths — both the direct send path (linger = 0) and the batched accumulator path (linger > 0).

Example: Tracing Headers

use krafka::interceptor::ProducerInterceptor;
use krafka::producer::{Producer, ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Debug)]
struct TracingInterceptor;

impl ProducerInterceptor for TracingInterceptor {
    fn on_send(&self, record: &mut ProducerRecord) {
        let trace_id = Uuid::new_v4().to_string();
        record.headers.push(("x-trace-id".to_string(), trace_id.into_bytes()));
    }

    fn on_acknowledgement(&self, metadata: &RecordMetadata, error: Option<&KrafkaError>) {
        match error {
            None => tracing::info!(
                topic = %metadata.topic,
                partition = metadata.partition,
                offset = metadata.offset,
                "record acknowledged"
            ),
            Some(e) => tracing::error!("send failed: {}", e),
        }
    }
}

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .interceptor(Arc::new(TracingInterceptor))
    .build()
    .await?;

Example: Metrics Counter

use krafka::interceptor::ProducerInterceptor;
use krafka::producer::{ProducerRecord, RecordMetadata};
use krafka::error::KrafkaError;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug)]
struct MetricsInterceptor {
    sent: AtomicU64,
    errors: AtomicU64,
}

impl MetricsInterceptor {
    fn new() -> Self {
        Self {
            sent: AtomicU64::new(0),
            errors: AtomicU64::new(0),
        }
    }
}

impl ProducerInterceptor for MetricsInterceptor {
    fn on_acknowledgement(&self, _metadata: &RecordMetadata, error: Option<&KrafkaError>) {
        if error.is_some() {
            self.errors.fetch_add(1, Ordering::Relaxed);
        } else {
            self.sent.fetch_add(1, Ordering::Relaxed);
        }
    }
}

Consumer Interceptor

Trait Definition

pub trait ConsumerInterceptor: Send + Sync + fmt::Debug {
    /// Called after records are fetched, before returned to the application.
    fn on_consume(&self, _records: &[ConsumerRecord]) {}

    /// Called after offsets are committed.
    /// The map keys are `(topic, partition)` and values are the committed offsets.
    fn on_commit(
        &self,
        _offsets: &HashMap<(String, PartitionId), Offset>,
        _error: Option<&KrafkaError>,
    ) {}

    /// Called when the consumer is being closed.
    /// Use this to release any resources held by the interceptor.
    fn close(&self) {}
}

Example: Consumption Logging

use krafka::interceptor::ConsumerInterceptor;
use krafka::consumer::{Consumer, ConsumerRecord};
use std::sync::Arc;

#[derive(Debug)]
struct LoggingInterceptor;

impl ConsumerInterceptor for LoggingInterceptor {
    fn on_consume(&self, records: &[ConsumerRecord]) {
        for record in records {
            println!(
                "Consumed: topic={}, partition={}, offset={}",
                record.topic, record.partition, record.offset
            );
        }
    }
}

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .interceptor(Arc::new(LoggingInterceptor))
    .build()
    .await?;

Example: Commit Monitoring

use krafka::interceptor::ConsumerInterceptor;
use krafka::error::KrafkaError;
use krafka::{Offset, PartitionId};
use std::collections::HashMap;

#[derive(Debug)]
struct CommitMonitor;

impl ConsumerInterceptor for CommitMonitor {
    fn on_commit(
        &self,
        offsets: &HashMap<(String, PartitionId), Offset>,
        error: Option<&KrafkaError>,
    ) {
        match error {
            None => {
                for ((topic, partition), offset) in offsets {
                    println!("Committed {}:{} at offset {}", topic, partition, offset);
                }
            }
            Some(e) => eprintln!("Commit failed: {}", e),
        }
    }
}

Wiring Interceptors

Producer

use std::sync::Arc;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .interceptor(Arc::new(MyProducerInterceptor))
    .build()
    .await?;

Consumer

use std::sync::Arc;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .interceptor(Arc::new(MyConsumerInterceptor))
    .build()
    .await?;

No Interceptor (Default)

When no interceptor is configured, a no-op implementation is used internally. There is zero overhead — the no-op methods are inlined away by the compiler.

Pipeline Integration Points

Producer Pipeline

  send_record(record)
       │
       ▼
  interceptor.on_send(&mut record)     ← Modify record here
       │
       ▼
  partitioner.partition()
       │
       ▼
  encode + send to broker
       │
       ├─ success ─► interceptor.on_acknowledgement(metadata, None)
       │
       └─ failure ─► interceptor.on_acknowledgement(metadata, Some(error))

Consumer Pipeline

  poll()
    │
    ▼
  fetch from brokers
    │
    ▼
  interceptor.on_consume(&records)     ← Observe records here
    │
    ▼
  return records to application
    │
    ▼
  commit()
    │
    ▼
  interceptor.on_commit(&offsets, error)  ← Only committed offsets (filtered to assigned partitions)

Thread Safety

Interceptors must implement Send + Sync + Debug. Use atomic types or Mutex/RwLock for any mutable state:

use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug)]
struct SafeInterceptor {
    counter: AtomicU64,
}

// AtomicU64 is Send + Sync, so SafeInterceptor is too

Panic Safety

All interceptor method calls are wrapped in catch_unwind. If your interceptor panics, the panic is caught and logged as an error — it will not crash the producer or consumer. This means interceptors can never bring down the application, though a panicking interceptor will silently stop executing for that invocation.

Next Steps


Back to top

Licensed under MIT. Copyright © 2026 Krafka Contributors.