Metrics & Observability Guide

This guide covers metrics collection and export in Krafka.

Overview

Krafka provides built-in metrics collection that is automatically wired into all hot paths:

  • Producer metrics: Records sent, bytes, batches, errors, retries, send latency — recorded in send(), send_to_partition(), and batch accumulator flush
  • Consumer metrics: Records received, polls, fetches, commits, rebalances, assigned partitions, lag, poll latency — recorded in poll(), commit(), and close()
  • Connection metrics: Connections created/closed, errors, establishment latency

All metrics are lock-free using atomic operations for minimal performance impact. Access metrics via producer.metrics_handle() or consumer.metrics().
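As a rough illustration of what "lock-free using atomic operations" means here, a counter of the following shape can be incremented from any thread without a mutex. This is a self-contained sketch using only std atomics, not Krafka's actual internals:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sketch of a lock-free counter (illustrative, not Krafka's implementation).
struct Counter(AtomicU64);

impl Counter {
    fn new() -> Self {
        Counter(AtomicU64::new(0))
    }
    /// Relaxed ordering suffices: the counter is monotonic and only read
    /// at export time, so no cross-variable ordering is required.
    fn add(&self, n: u64) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
```

Because `fetch_add` is a single atomic read-modify-write, concurrent increments never lose updates and never block each other.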

Pluggable Export

Krafka uses a trait-based export system. Implement MetricsExporter to add any backend. Built-in exporters:

Exporter            Format                        Dependency
PrometheusExporter  Prometheus text exposition    None
JsonExporter        JSON array of metric objects  None
OtlpExporter        OTLP MetricsData v1 protobuf  telemetry feature

Custom Exporter

use krafka::metrics::{MetricsExporter, LatencySnapshot};

struct StatsDExporter { /* ... */ }

impl MetricsExporter for StatsDExporter {
    fn export_counter(&mut self, name: &str, _help: &str, value: u64) {
        // send_udp(format!("{name}:{value}|c"));
    }
    fn export_gauge(&mut self, name: &str, _help: &str, value: u64) {
        // send_udp(format!("{name}:{value}|g"));
    }
    fn export_latency(&mut self, name: &str, _help: &str, snapshot: &LatencySnapshot) {
        // send_udp(format!("{name}.count:{}|g", snapshot.count));
    }
}

Basic Usage

Each client type has its own metrics:

use krafka::metrics::{ProducerMetrics, ConsumerMetrics, ConnectionMetrics, MetricsVisitable};

let producer_metrics = ProducerMetrics::new();
let consumer_metrics = ConsumerMetrics::new();
let connection_metrics = ConnectionMetrics::new();

// Record some metrics
producer_metrics.record_send(100);
producer_metrics.send_latency.record(std::time::Duration::from_millis(5));

// Get a snapshot
let snapshot = producer_metrics.snapshot();
println!("Records sent: {}", snapshot.records_sent);
println!("Bytes sent: {}", snapshot.bytes_sent);

Prometheus Export

use krafka::metrics::{ProducerMetrics, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);
metrics.record_batch(5);

// Export in Prometheus text format (convenience method)
let prometheus_output = metrics.to_prometheus_text("krafka_producer");
println!("{}", prometheus_output);

Or use the exporter directly:

use krafka::metrics::{ProducerMetrics, PrometheusExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
let mut exporter = PrometheusExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let output = exporter.finish();
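For reference, the Prometheus text exposition format consists of `# HELP` and `# TYPE` comment lines followed by sample lines. The helper below is a self-contained sketch of that format only, not Krafka's PrometheusExporter:

```rust
/// Render one counter in the Prometheus text exposition format
/// (sketch of the wire format, not Krafka's exporter code).
fn render_counter(name: &str, help: &str, value: u64) -> String {
    format!(
        "# HELP {name} {help}\n\
         # TYPE {name} counter\n\
         {name} {value}\n"
    )
}
```

For example, `render_counter("krafka_producer_records_sent", "Total records sent", 1)` yields the three lines a Prometheus scraper expects for that series.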

JSON Export

use krafka::metrics::{ProducerMetrics, JsonExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);

let mut exporter = JsonExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let json = exporter.finish();
// [{"name":"krafka_producer_records_sent","type":"counter","help":"Total records sent","value":1}, ...]

Aggregated Metrics

Use KrafkaMetrics to collect and export all metrics from multiple components:

use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

let metrics = KrafkaMetrics::new();

// Get shared metrics handles for your clients
let producer_metrics = metrics.producer_metrics();
let consumer_metrics = metrics.consumer_metrics();
let connection_metrics = metrics.connection_metrics();

// Record metrics during operations
producer_metrics.record_send(100);
consumer_metrics.record_poll(5);

// Export all metrics in a single call
let all_metrics = metrics.to_prometheus_text();
println!("{}", all_metrics);

// Export as JSON
let json = metrics.to_json();

// Use a custom exporter
use krafka::metrics::PrometheusExporter;
let mut exporter = PrometheusExporter::new();
metrics.export_all(&mut exporter);
let output = exporter.finish();

// Reset all metrics (e.g., after scrape)
metrics.reset();

HTTP Metrics Endpoint

For production use, expose metrics via HTTP:

use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

// Create shared metrics registry
let metrics = Arc::new(KrafkaMetrics::new());

// In your HTTP server handler (pseudo-code):
async fn metrics_handler(metrics: Arc<KrafkaMetrics>) -> String {
    metrics.to_prometheus_text()
}

Example with Axum:

use axum::{routing::get, Router, Extension};
use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

async fn metrics_handler(Extension(metrics): Extension<Arc<KrafkaMetrics>>) -> String {
    metrics.to_prometheus_text()
}

#[tokio::main]
async fn main() {
    let metrics = Arc::new(KrafkaMetrics::new());
    
    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .layer(Extension(metrics.clone()));
    
    // Use metrics.producer_metrics() etc. with your Kafka clients, then
    // serve the router (axum 0.7 style; adjust to your axum version):
    // let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await.unwrap();
    // axum::serve(listener, app).await.unwrap();
}

Available Metrics

Producer Metrics

Metric                Type     Description
records_sent_total    Counter  Total records sent successfully
bytes_sent_total      Counter  Total bytes sent (record values)
batches_sent_total    Counter  Total batches sent
errors_total          Counter  Total send errors
retries_total         Counter  Total retry attempts
connections           Gauge    Current active connections
buffered_records      Gauge    Currently buffered records
send_latency_seconds  Summary  Send latency statistics

Consumer Metrics

Metric                  Type     Description
records_received_total  Counter  Total records received
bytes_received_total    Counter  Total bytes received
fetches_total           Counter  Total fetch requests
polls_total             Counter  Total poll operations
empty_polls_total       Counter  Polls that returned no records
commits_total           Counter  Total offset commits
errors_total            Counter  Total errors
rebalances_total        Counter  Total rebalance operations
lag                     Gauge    Total consumer lag across all assigned partitions
lag_max                 Gauge    Maximum per-partition consumer lag
assigned_partitions     Gauge    Currently assigned partitions
paused_partitions       Gauge    Currently paused partitions
buffered_records        Gauge    Currently buffered records in recv() buffer
poll_latency_seconds    Summary  Poll latency statistics
fetch_latency_seconds   Summary  Fetch latency statistics

Connection Metrics

Metric                     Type     Description
connections_created_total  Counter  Total connections created
connections_closed_total   Counter  Total connections closed
connection_errors_total    Counter  Connection errors
active_connections         Gauge    Current active connections
connect_latency_seconds    Summary  Connection establishment latency

Latency Tracking

The LatencyTracker provides detailed latency statistics:

use krafka::metrics::LatencyTracker;
use std::time::Duration;

let tracker = LatencyTracker::new();

// Manual recording
tracker.record(Duration::from_millis(50));
tracker.record(Duration::from_millis(100));

// Or use guard for automatic timing
{
    let _guard = tracker.start();
    // ... operation being timed ...
} // Guard records latency when dropped

// Get statistics
println!("Count: {}", tracker.count());
println!("Min: {:?}", tracker.min());
println!("Max: {:?}", tracker.max());
println!("Avg: {:?}", tracker.avg());
println!("Sum: {:?}", tracker.sum());

// Get immutable snapshot
let snapshot = tracker.snapshot();

Integration with OpenTelemetry

Built-in OTLP Export (feature telemetry)

Enable the telemetry feature for native OTLP protobuf export and KIP-714 broker telemetry:

[dependencies]
krafka = { version = "...", features = ["telemetry"] }

Export metrics as OTLP protobuf bytes for ingestion by any OTLP-compatible backend:

use krafka::telemetry::otlp::OtlpExporter;
use krafka::metrics::{KrafkaMetrics, MetricsVisitable};

let metrics = KrafkaMetrics::new();
// ... record metrics ...

let mut exporter = OtlpExporter::new(true, 0); // delta temporality
exporter.add_resource_attribute("service.name", "my-service");
metrics.export_all(&mut exporter);
let otlp_bytes: Vec<u8> = exporter.finish();
// Send otlp_bytes to your OTLP receiver via gRPC or HTTP

KIP-714 Automatic Telemetry

The TelemetryReporter implements KIP-714 client telemetry — it subscribes to the broker’s telemetry endpoint and pushes metric snapshots on the broker-specified interval:

use krafka::telemetry::reporter::{TelemetryReporter, TelemetryConfig};

let config = TelemetryConfig {
    enabled: true,
    metrics_prefix: "krafka".into(),
    resource_attributes: vec![
        ("service.name".into(), "my-app".into()),
    ],
};

let reporter = TelemetryReporter::new(connection, krafka_metrics, config, shutdown_rx);
tokio::spawn(reporter.run());

The reporter handles subscription polling, push interval jitter, re-subscription on UNKNOWN_SUBSCRIPTION_ID, and a graceful terminating push on shutdown — all per KIP-714.

Manual Bridge to External OTel SDKs

You can also bridge metrics to an external OpenTelemetry SDK using snapshots or a custom exporter:

use krafka::metrics::{KrafkaMetrics, ProducerMetricsSnapshot};

fn export_to_otel(snapshot: &ProducerMetricsSnapshot) {
    // Use your OpenTelemetry SDK to record metrics
    // meter.create_counter("krafka.records_sent").add(snapshot.records_sent);
}

Performance Considerations

  • All metrics use atomic operations (lock-free)
  • Counter increments use Ordering::Relaxed for minimal overhead
  • Latency tracking uses compare-and-swap for min/max updates
  • Gauge updates are immediate (no aggregation)
  • Gauge dec() saturates at zero rather than underflowing, keeping connection and partition counts correct
  • Prometheus and JSON export only happen on request (pull-based)
  • OTLP protobuf encoding is zero-copy where possible; no external protobuf dependency
  • KIP-714 telemetry push runs on a background task with broker-controlled intervals
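The saturating-decrement behavior described above can be sketched with `fetch_update` and `saturating_sub` (illustrative, not Krafka's exact implementation):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Sketch of a gauge whose dec() saturates at zero
/// (assumed technique, not Krafka's exact code).
struct Gauge(AtomicU64);

impl Gauge {
    fn new() -> Self {
        Gauge(AtomicU64::new(0))
    }
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn dec(&self) {
        // fetch_update retries its CAS until it succeeds; saturating_sub
        // pins the value at zero instead of wrapping around to u64::MAX.
        let _ = self.0.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
            Some(v.saturating_sub(1))
        });
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}
```

A plain `fetch_sub(1)` on an unsigned atomic would wrap to a huge value on a stray extra decrement; the CAS loop trades a little overhead for a gauge that can never go negative.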


Licensed under MIT. Copyright © 2026 Krafka Contributors.