Metrics & Observability Guide

This guide covers metrics collection and export in Krafka.

Overview

Krafka provides built-in metrics collection that is automatically wired into all hot paths:

  • Producer metrics: Records sent, bytes, batches, errors, retries, send latency — recorded in send(), send_to_partition(), and batch accumulator flush
  • Consumer metrics: Records received, polls, fetches, commits, rebalances, assigned partitions, lag, poll latency — recorded in poll(), commit(), and close()
  • Connection metrics: Connections created/closed, errors, establishment latency

All metrics are lock-free using atomic operations for minimal performance impact. Access metrics via producer.metrics_handle() or consumer.metrics().

Basic Usage

Each client type has its own metrics:

use krafka::metrics::{ProducerMetrics, ConsumerMetrics, ConnectionMetrics};

let producer_metrics = ProducerMetrics::new();
let consumer_metrics = ConsumerMetrics::new();
let connection_metrics = ConnectionMetrics::new();

// Record some metrics
producer_metrics.record_send(100);
producer_metrics.send_latency.record(std::time::Duration::from_millis(5));

// Get a snapshot
let snapshot = producer_metrics.snapshot();
println!("Records sent: {}", snapshot.records_sent);
println!("Bytes sent: {}", snapshot.bytes_sent);

Prometheus Export

Krafka provides built-in Prometheus text format export without external dependencies:

use krafka::metrics::{ProducerMetrics, MetricsExport};

let metrics = ProducerMetrics::new();
metrics.record_send(100);
metrics.record_batch(5);

// Export in Prometheus text format
let prometheus_output = metrics.to_prometheus_text("krafka_producer");
println!("{}", prometheus_output);

Example output:

# HELP krafka_producer_records_sent_total Total number of records sent
# TYPE krafka_producer_records_sent_total counter
krafka_producer_records_sent_total 6
# HELP krafka_producer_bytes_sent_total Total bytes sent
# TYPE krafka_producer_bytes_sent_total counter
krafka_producer_bytes_sent_total 100
# HELP krafka_producer_batches_sent_total Total batches sent
# TYPE krafka_producer_batches_sent_total counter
krafka_producer_batches_sent_total 1
# HELP krafka_producer_send_latency_seconds Send latency
# TYPE krafka_producer_send_latency_seconds summary
krafka_producer_send_latency_seconds_count 0
krafka_producer_send_latency_seconds_sum 0.000000000
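
For readers unfamiliar with the text exposition format shown above, the following is a minimal sketch of how such lines can be produced with only the standard library. The functions render_counter and render_summary are hypothetical helpers written for this illustration; they are not part of Krafka, whose exporter may be structured differently.

```rust
use std::fmt::Write;
use std::time::Duration;

/// Renders one counter family. All names here are illustrative, not Krafka's API.
pub fn render_counter(prefix: &str, name: &str, help: &str, value: u64) -> String {
    let mut out = String::new();
    // Each metric family starts with HELP and TYPE comment lines.
    writeln!(out, "# HELP {prefix}_{name} {help}").unwrap();
    writeln!(out, "# TYPE {prefix}_{name} counter").unwrap();
    // The sample line is the full metric name followed by its value.
    writeln!(out, "{prefix}_{name} {value}").unwrap();
    out
}

/// Renders a summary without quantiles: only the `_count` and `_sum` samples.
pub fn render_summary(prefix: &str, name: &str, help: &str, count: u64, sum: Duration) -> String {
    let mut out = String::new();
    writeln!(out, "# HELP {prefix}_{name} {help}").unwrap();
    writeln!(out, "# TYPE {prefix}_{name} summary").unwrap();
    writeln!(out, "{prefix}_{name}_count {count}").unwrap();
    // The sum is reported in seconds with nanosecond precision.
    writeln!(out, "{prefix}_{name}_sum {:.9}", sum.as_secs_f64()).unwrap();
    out
}
```

Calling render_summary("krafka_producer", "send_latency_seconds", "Send latency", 0, Duration::ZERO) yields the same four lines as the last metric family in the example output.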

Aggregated Metrics

Use KrafkaMetrics to collect and export all metrics from multiple components:

use krafka::metrics::KrafkaMetrics;

let metrics = KrafkaMetrics::new();

// Get shared metrics handles for your clients
let producer_metrics = metrics.producer_metrics();
let consumer_metrics = metrics.consumer_metrics();
let connection_metrics = metrics.connection_metrics();

// Record metrics during operations
producer_metrics.record_send(100);
consumer_metrics.record_poll(5);

// Export all metrics in a single call
let all_metrics = metrics.to_prometheus_text();
println!("{}", all_metrics);

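// Reset all metrics (e.g., between test runs). Prometheus expects counters to be
// cumulative, so resetting after every scrape is usually not what you want.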
metrics.reset();

HTTP Metrics Endpoint

For production use, expose metrics via HTTP:

use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

// Create shared metrics registry
let metrics = Arc::new(KrafkaMetrics::new());

// In your HTTP server handler (pseudo-code):
async fn metrics_handler(metrics: Arc<KrafkaMetrics>) -> String {
    metrics.to_prometheus_text()
}

Example with Axum:

use axum::{routing::get, Router, Extension};
use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

async fn metrics_handler(Extension(metrics): Extension<Arc<KrafkaMetrics>>) -> String {
    metrics.to_prometheus_text()
}

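#[tokio::main]
async fn main() {
    let metrics = Arc::new(KrafkaMetrics::new());

    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .layer(Extension(metrics.clone()));

    // Use metrics.producer_metrics() etc. with your Kafka clients

    // Serve the router (axum 0.7+ API; the bind address is only an example)
    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}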
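
If pulling in a web framework is not an option, the same endpoint can be approximated with the standard library alone. The sketch below is an assumption-laden illustration, not something Krafka ships: http_response and serve_once are hypothetical helpers, and the render closure stands in for a call such as metrics.to_prometheus_text(). The Content-Type value is the one Prometheus documents for its text exposition format.

```rust
use std::io::{Read, Write};
use std::net::TcpListener;

/// Wraps a metrics payload in a minimal HTTP/1.1 response.
pub fn http_response(body: &str) -> String {
    format!(
        "HTTP/1.1 200 OK\r\n\
         Content-Type: text/plain; version=0.0.4; charset=utf-8\r\n\
         Content-Length: {}\r\n\
         Connection: close\r\n\r\n{}",
        body.len(), // Content-Length is counted in bytes, which `len()` returns
        body
    )
}

/// Accepts a single connection and answers it with the text from `render`.
/// This sketch ignores the request path and method entirely.
pub fn serve_once(listener: &TcpListener, render: impl Fn() -> String) -> std::io::Result<()> {
    let (mut stream, _addr) = listener.accept()?;
    // Read (and discard) the start of the request before replying.
    let mut buf = [0u8; 1024];
    let _ = stream.read(&mut buf)?;
    stream.write_all(http_response(&render()).as_bytes())?;
    stream.flush()
}
```

A real service would loop over incoming connections and check the request path; an async framework such as the Axum example above remains the more robust choice for production.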

Available Metrics

Producer Metrics

Metric                  Type     Description
records_sent_total      Counter  Total records sent successfully
bytes_sent_total        Counter  Total bytes sent (record values)
batches_sent_total      Counter  Total batches sent
errors_total            Counter  Total send errors
retries_total           Counter  Total retry attempts
connections             Gauge    Current active connections
buffered_records        Gauge    Currently buffered records
send_latency_seconds    Summary  Send latency statistics

Consumer Metrics

Metric                  Type     Description
records_received_total  Counter  Total records received
bytes_received_total    Counter  Total bytes received
fetches_total           Counter  Total fetch requests
polls_total             Counter  Total poll operations
empty_polls_total       Counter  Polls that returned no records
commits_total           Counter  Total offset commits
errors_total            Counter  Total errors
rebalances_total        Counter  Total rebalance operations
lag                     Gauge    Current consumer lag
assigned_partitions     Gauge    Currently assigned partitions
paused_partitions       Gauge    Currently paused partitions
poll_latency_seconds    Summary  Poll latency statistics
fetch_latency_seconds   Summary  Fetch latency statistics

Connection Metrics

Metric                     Type     Description
connections_created_total  Counter  Total connections created
connections_closed_total   Counter  Total connections closed
connection_errors_total    Counter  Connection errors
active_connections         Gauge    Current active connections
connect_latency_seconds    Summary  Connection establishment latency

Latency Tracking

The LatencyTracker provides detailed latency statistics:

use krafka::metrics::LatencyTracker;
use std::time::Duration;

let tracker = LatencyTracker::new();

// Manual recording
tracker.record(Duration::from_millis(50));
tracker.record(Duration::from_millis(100));

// Or use guard for automatic timing
{
    let _guard = tracker.start();
    // ... operation being timed ...
} // Guard records latency when dropped

// Get statistics
println!("Count: {}", tracker.count());
println!("Min: {:?}", tracker.min());
println!("Max: {:?}", tracker.max());
println!("Avg: {:?}", tracker.avg());
println!("Sum: {:?}", tracker.sum());

// Get immutable snapshot
let snapshot = tracker.snapshot();
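
To make the guard pattern above concrete, here is a minimal sketch of how a drop-based timer can be built on atomics. SketchTracker and TimerGuard are hypothetical types written for this illustration; Krafka's LatencyTracker may be implemented differently and tracks more statistics (min, max) than this sketch does.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a latency tracker; not Krafka's implementation.
#[derive(Default)]
pub struct SketchTracker {
    count: AtomicU64,
    sum_nanos: AtomicU64,
}

impl SketchTracker {
    /// Adds one sample to the running count and sum.
    pub fn record(&self, d: Duration) {
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum_nanos.fetch_add(d.as_nanos() as u64, Ordering::Relaxed);
    }

    /// Starts a timer that records its elapsed time when dropped.
    pub fn start(&self) -> TimerGuard<'_> {
        TimerGuard { tracker: self, started: Instant::now() }
    }

    pub fn count(&self) -> u64 {
        self.count.load(Ordering::Relaxed)
    }

    /// Mean latency, or `None` when nothing has been recorded yet.
    pub fn avg(&self) -> Option<Duration> {
        let n = self.count();
        if n == 0 {
            return None;
        }
        Some(Duration::from_nanos(self.sum_nanos.load(Ordering::Relaxed) / n))
    }
}

pub struct TimerGuard<'a> {
    tracker: &'a SketchTracker,
    started: Instant,
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        // Runs when the guard leaves scope, including on early return or panic unwind.
        self.tracker.record(self.started.elapsed());
    }
}
```

One pitfall with any guard of this shape: binding it as `let _ = tracker.start();` drops the guard immediately and records a near-zero latency, whereas a named binding such as `let _guard = ...` keeps it alive until the end of the scope.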

Integration with OpenTelemetry

While Krafka doesn’t have a built-in OpenTelemetry integration, you can use the Prometheus exporter with OpenTelemetry’s Prometheus receiver, or export snapshots directly:

use krafka::metrics::ProducerMetricsSnapshot;

fn export_to_otel(snapshot: &ProducerMetricsSnapshot) {
    // Use your OpenTelemetry SDK to record metrics
    // meter.create_counter("krafka.records_sent").add(snapshot.records_sent);
    // etc.
}
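
One detail worth noting when exporting snapshots this way: additive counter APIs expect increments, while snapshot fields such as records_sent appear to be cumulative totals (the Prometheus export lists them as counters). Under that assumption, passing the snapshot value to add() on every export would overcount. The sketch below shows one way to convert cumulative readings into increments; DeltaCursor is a hypothetical helper, not part of Krafka or OpenTelemetry.

```rust
/// Converts successive cumulative counter readings into per-export increments.
#[derive(Default)]
pub struct DeltaCursor {
    last: u64,
}

impl DeltaCursor {
    /// Returns how much the counter grew since the previous call.
    /// If the value went down (for example after a metrics reset), the new
    /// reading is treated as the whole increment.
    pub fn advance(&mut self, cumulative: u64) -> u64 {
        let delta = if cumulative >= self.last {
            cumulative - self.last
        } else {
            cumulative
        };
        self.last = cumulative;
        delta
    }
}
```

With one cursor per exported field, each export would call something like cursor.advance(snapshot.records_sent) and pass the result to the counter's add method. Alternatively, an observable (asynchronous) counter instrument can report the cumulative value directly.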

Performance Considerations

  • All metrics use atomic operations (lock-free)
  • Counter increments use Ordering::Relaxed for minimal overhead
  • Latency tracking uses compare-and-swap for min/max updates
  • Gauge updates are immediate (no aggregation)
  • Gauge dec() saturates at zero (will not underflow below 0), ensuring correctness for connection and partition counting
  • Prometheus export only happens on request (pull-based)
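
The techniques in the list above can be sketched with the standard library's atomics. The types and the update_max function below are hypothetical illustrations of relaxed counter increments, a saturating gauge decrement, and a compare-and-swap maximum; they are not Krafka's internal types.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic counter using relaxed increments.
pub struct SketchCounter(AtomicU64);

impl SketchCounter {
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }
    pub fn inc_by(&self, n: u64) {
        // Relaxed is enough: only the total matters, not ordering with other memory.
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Gauge whose decrement stops at zero instead of wrapping around.
pub struct SketchGauge(AtomicU64);

impl SketchGauge {
    pub fn new() -> Self {
        Self(AtomicU64::new(0))
    }
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    pub fn dec(&self) {
        // fetch_update retries a compare-and-swap until the closure's result is stored.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| Some(v.saturating_sub(1)));
    }
    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Raises `slot` to `sample` if `sample` is larger, using an explicit CAS loop.
pub fn update_max(slot: &AtomicU64, sample: u64) {
    let mut current = slot.load(Ordering::Relaxed);
    while sample > current {
        match slot.compare_exchange_weak(current, sample, Ordering::Relaxed, Ordering::Relaxed) {
            Ok(_) => break,
            // Another thread changed the value (or the CAS failed spuriously); re-check.
            Err(observed) => current = observed,
        }
    }
}
```

A plain fetch_sub on an unsigned atomic would wrap to a very large number when decremented at zero, which is why a saturating decrement matters for values such as connection or partition counts.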


Licensed under MIT. Copyright © 2026 Krafka Contributors.