Metrics & Observability Guide

This guide covers metrics collection and export in Krafka.

Overview

Krafka provides built-in metrics collection that is automatically wired into all hot paths:

  • Producer metrics: Records sent, bytes, batches, errors, retries, send latency — recorded in send(), send_to_partition(), and batch accumulator flush
  • Consumer metrics: Records received, polls, fetches, commits, rebalances, seeks, assigned partitions, lag, poll latency — recorded in poll(), commit(), seek(), seek_many(), and close()
  • Connection metrics: Connections created/closed, errors, establishment latency, priority scheduling counters, and broker throttle delays

All metrics are lock-free: they use atomic operations to keep overhead on hot paths low. Access them via producer.metrics_handle(), consumer.metrics(), or the connection metric handles exposed by the producer, consumer, share-consumer, admin, and connection-pool APIs.

Pluggable Export

Krafka uses a trait-based export system: implement MetricsExporter to add any backend. The built-in exporters are:

| Exporter | Format | Dependency |
| --- | --- | --- |
| PrometheusExporter | Prometheus text exposition format | None |
| JsonExporter | JSON array of metric objects | None |
| OtlpExporter | OTLP MetricsData v1 protobuf | telemetry feature |
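The export design follows a visitor pattern. Each metrics struct walks its own fields and makes one exporter callback per metric, so adding a backend only requires implementing those callbacks. The self-contained model below illustrates the idea. The names Exporter, Visitable, SendMetrics, and LineExporter are hypothetical stand-ins rather than Krafka's types, and the "<prefix>_<field>" naming rule is an assumption.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical backend interface: one callback per metric kind.
trait Exporter {
    fn counter(&mut self, name: &str, help: &str, value: u64);
    fn gauge(&mut self, name: &str, help: &str, value: u64);
}

/// Hypothetical metric set that knows how to walk its own fields.
trait Visitable {
    fn visit(&self, prefix: &str, exporter: &mut dyn Exporter);
}

#[derive(Default)]
struct SendMetrics {
    records_sent: AtomicU64,
    in_flight: AtomicU64,
}

impl Visitable for SendMetrics {
    fn visit(&self, prefix: &str, exporter: &mut dyn Exporter) {
        // Assumed naming rule: "<prefix>_<field>".
        exporter.counter(
            &format!("{prefix}_records_sent"),
            "Total records sent",
            self.records_sent.load(Ordering::Relaxed),
        );
        exporter.gauge(
            &format!("{prefix}_in_flight"),
            "Requests currently in flight",
            self.in_flight.load(Ordering::Relaxed),
        );
    }
}

/// A trivial backend that renders "name value" lines.
#[derive(Default)]
struct LineExporter {
    lines: Vec<String>,
}

impl Exporter for LineExporter {
    fn counter(&mut self, name: &str, _help: &str, value: u64) {
        self.lines.push(format!("{name} {value}"));
    }
    fn gauge(&mut self, name: &str, _help: &str, value: u64) {
        self.lines.push(format!("{name} {value}"));
    }
}
```

The metrics struct drives the traversal and never needs to know which backend is listening. This is what lets the same counters come out as Prometheus text, JSON, or OTLP.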

Custom Exporter

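use krafka::metrics::{MetricsExporter, LatencySnapshot};

/// Collects StatsD lines; a real exporter would flush them over UDP.
struct StatsDExporter {
    lines: Vec<String>,
}

impl MetricsExporter for StatsDExporter {
    fn export_counter(&mut self, name: &str, _help: &str, value: u64) {
        // Krafka counters are cumulative totals, so send them as StatsD gauges;
        // a StatsD "|c" metric would treat every export as a fresh increment.
        self.lines.push(format!("{name}:{value}|g"));
    }
    fn export_gauge(&mut self, name: &str, _help: &str, value: u64) {
        self.lines.push(format!("{name}:{value}|g"));
    }
    fn export_latency(&mut self, name: &str, _help: &str, snapshot: &LatencySnapshot) {
        // Export the sample count; add sum/min/max lines the same way if needed.
        self.lines.push(format!("{name}.count:{}|g", snapshot.count));
    }
}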

Basic Usage

Each client type has its own metrics:

use krafka::metrics::{ProducerMetrics, ConsumerMetrics, ConnectionMetrics, MetricsVisitable};

let producer_metrics = ProducerMetrics::new();
let consumer_metrics = ConsumerMetrics::new();
let connection_metrics = ConnectionMetrics::new();

// Record one sent record of 100 bytes and a 5 ms send-latency sample
producer_metrics.record_send(100);
producer_metrics.send_latency.record(std::time::Duration::from_millis(5));

// Get a snapshot
let snapshot = producer_metrics.snapshot();
println!("Records sent: {}", snapshot.records_sent);
println!("Bytes sent: {}", snapshot.bytes_sent);

Prometheus Export

use krafka::metrics::{ProducerMetrics, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);
metrics.record_batch(5);

// Export in Prometheus text format (convenience method)
let prometheus_output = metrics.to_prometheus_text("krafka_producer");
println!("{}", prometheus_output);

Or use the exporter directly:

use krafka::metrics::{ProducerMetrics, PrometheusExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
let mut exporter = PrometheusExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let output = exporter.finish();

JSON Export

use krafka::metrics::{ProducerMetrics, JsonExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);

let mut exporter = JsonExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let json = exporter.finish();
// [{"name":"krafka_producer_records_sent","type":"counter","help":"Total records sent","value":1}, ...]

Aggregated Metrics

Use KrafkaMetrics to collect and export all metrics from multiple components:

use krafka::metrics::KrafkaMetrics;

let metrics = KrafkaMetrics::new();

// Get shared metrics handles for your clients
let producer_metrics = metrics.producer_metrics();
let consumer_metrics = metrics.consumer_metrics();
let connection_metrics = metrics.connection_metrics();

// Record metrics during operations
producer_metrics.record_send(100);
consumer_metrics.record_poll(5);

// Export all metrics in a single call
let all_metrics = metrics.to_prometheus_text();
println!("{}", all_metrics);

// Export as JSON
let json = metrics.to_json();

// Use a custom exporter
use krafka::metrics::PrometheusExporter;
let mut exporter = PrometheusExporter::new();
metrics.export_all(&mut exporter);
let output = exporter.finish();

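// Reset all metrics (e.g., between test runs). Avoid resetting after every
// Prometheus scrape: Prometheus expects counters to be cumulative.
metrics.reset();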

HTTP Metrics Endpoint

For production use, expose metrics via HTTP:

use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

// Create shared metrics registry
let metrics = Arc::new(KrafkaMetrics::new());

// In your HTTP server handler (pseudo-code):
async fn metrics_handler(metrics: Arc<KrafkaMetrics>) -> String {
    metrics.to_prometheus_text()
}

Example with Axum:

use axum::{routing::get, Extension, Router};
use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

async fn metrics_handler(Extension(metrics): Extension<Arc<KrafkaMetrics>>) -> String {
    metrics.to_prometheus_text()
}

#[tokio::main]
async fn main() {
    let metrics = Arc::new(KrafkaMetrics::new());

    // Pass metrics.producer_metrics() etc. to your Kafka clients here.

    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .layer(Extension(metrics.clone()));

    // Serve the endpoint (axum 0.7+ API; the bind address is only an example).
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Available Metrics

Producer Metrics

| Metric | Type | Description |
| --- | --- | --- |
| records_sent_total | Counter | Total records sent successfully |
| bytes_sent_total | Counter | Total bytes sent (record values) |
| batches_sent_total | Counter | Total batches sent |
| errors_total | Counter | Total send errors |
| retries_total | Counter | Total retry attempts |
| compressed_bytes_total | Counter | Total compressed bytes written for compressed batches |
| uncompressed_bytes_total | Counter | Total uncompressed bytes for the same compressed batches |
| connections | Gauge | Current active connections |
| buffered_records | Gauge | Producer records currently admitted under the memory budget |
| send_latency_seconds | Summary | Send latency statistics |
| topic_records_sent_total{topic="<name>"} | Counter | Records sent to a specific topic (per-topic label) |
| topic_bytes_sent_total{topic="<name>"} | Counter | Bytes sent to a specific topic (per-topic label) |
| topic_errors_total{topic="<name>"} | Counter | Send errors for a specific topic (per-topic label) |

Per-topic metrics use Prometheus labels. Example PromQL queries:

# Records sent per topic (rate over 5 min)
rate(krafka_producer_topic_records_sent_total[5m])

# Errors for a specific topic
krafka_producer_topic_errors_total{topic="orders"}
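Per-topic series share one metric name and differ only in their topic label. The standalone sketch below shows how such a labeled counter family is rendered in the Prometheus text exposition format. It includes the escaping that label values require (backslash, double quote, newline). The helper names are hypothetical, and this is not Krafka's exporter.

```rust
use std::collections::BTreeMap;

/// Escapes a label value as required by the Prometheus text exposition format.
fn escape_label_value(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            other => out.push(other),
        }
    }
    out
}

/// Renders one counter family with a `topic` label per series.
/// BTreeMap iteration keeps the output order deterministic.
fn render_topic_counter(name: &str, help: &str, per_topic: &BTreeMap<String, u64>) -> String {
    let mut out = format!("# HELP {name} {help}\n# TYPE {name} counter\n");
    for (topic, value) in per_topic {
        out.push_str(&format!("{name}{{topic=\"{}\"}} {value}\n", escape_label_value(topic)));
    }
    out
}
```

Each topic adds one time series. Labels like this work well for a bounded set of topic names, but high-cardinality label values inflate Prometheus storage.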

compression_ratio_avg is available as a derived field in ProducerMetricsSnapshot (computed as compressed_bytes / uncompressed_bytes). A value of 0.3 means the codec reduced data to 30% of original size. The field is None when no compressed batches have been sent.
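The derived ratio is a single division, guarded against the case where nothing has been compressed yet. Below is a minimal sketch over the two cumulative byte counters listed in the producer table. The function name is hypothetical.

```rust
/// Average compression ratio: compressed bytes / uncompressed bytes.
/// Returns None until at least one compressed batch has been recorded.
fn compression_ratio_avg(compressed_bytes: u64, uncompressed_bytes: u64) -> Option<f64> {
    if uncompressed_bytes == 0 {
        None
    } else {
        Some(compressed_bytes as f64 / uncompressed_bytes as f64)
    }
}
```

Both inputs are lifetime totals, so this gives a lifetime average. For a recent window, divide the rates of the two counters instead. Assuming the krafka_producer prefix used above, that would be rate(krafka_producer_compressed_bytes_total[5m]) / rate(krafka_producer_uncompressed_bytes_total[5m]).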

Consumer Metrics

| Metric | Type | Description |
| --- | --- | --- |
| records_received_total | Counter | Total records received |
| bytes_received_total | Counter | Total bytes received |
| fetches_total | Counter | Total fetch requests |
| polls_total | Counter | Total poll operations |
| empty_polls_total | Counter | Polls that returned no records |
| commits_total | Counter | Total offset commits |
| errors_total | Counter | Total errors |
| rebalances_total | Counter | Total rebalance operations |
| seeks_total | Counter | Total seek operations (one per seek() call, plus one per partition passed to seek_many()) |
| lag | Gauge | Total consumer lag across all assigned partitions |
| lag_max | Gauge | Maximum per-partition consumer lag |
| assigned_partitions | Gauge | Currently assigned partitions |
| paused_partitions | Gauge | Currently paused partitions |
| buffered_records | Gauge | Records currently held in the recv() buffer |
| poll_latency_seconds | Summary | Poll latency statistics |
| fetch_latency_seconds | Summary | Fetch latency statistics |
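Consumer lag is conventionally the distance between a partition's high watermark (log-end offset) and the consumer's current position. The sketch below derives the lag and lag_max gauges from per-partition offsets under that definition. This page does not say whether Krafka measures from the fetch position or the committed offset. The PartitionOffsets type and consumer_lag function are hypothetical.

```rust
/// Hypothetical per-partition offsets as seen by a consumer.
struct PartitionOffsets {
    high_watermark: i64,
    position: i64,
}

/// Returns (total lag, maximum per-partition lag).
/// A position ahead of a stale high watermark is clamped to zero lag.
fn consumer_lag(partitions: &[PartitionOffsets]) -> (u64, u64) {
    let mut total = 0u64;
    let mut max = 0u64;
    for p in partitions {
        let lag = (p.high_watermark - p.position).max(0) as u64;
        total += lag;
        max = max.max(lag);
    }
    (total, max)
}
```

Tracking both values is useful because a single stuck partition can hide inside a modest total. That partition shows up immediately in lag_max.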

Connection Metrics

| Metric | Type | Description |
| --- | --- | --- |
| connections_created_total | Counter | Total connections created |
| connections_closed_total | Counter | Total connections closed |
| connection_errors_total | Counter | Connection errors |
| high_priority_requests_total | Counter | High-priority requests sent |
| normal_priority_requests_total | Counter | Normal-priority requests sent |
| high_priority_bypasses_total | Counter | High-priority requests processed ahead of normal-priority work |
| high_priority_bypass_yields_total | Counter | Forced normal-priority drain steps after exhausting the high-priority bypass budget |
| throttle_delays_total | Counter | Normal-priority requests delayed due to broker throttling |
| throttle_delay_ms_total | Counter | Total broker-throttle delay applied to normal-priority requests, in milliseconds |
| active_connections | Gauge | Current active connections |
| connect_latency_seconds | Summary | Connection establishment latency |
| tls_handshake_latency_seconds | Summary | TLS handshake latency (populated for TLS connections only) |
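The two bypass counters describe a budgeted priority scheme. High-priority requests may jump ahead of queued normal-priority work, but only a bounded number of times in a row. After that, one normal-priority request is forced through, which prevents starvation. The sketch below models that policy with two queues. The PriorityScheduler type, the budget semantics, and the queue representation are assumptions for illustration, not Krafka's scheduler.

```rust
use std::collections::VecDeque;

/// Hypothetical two-level scheduler with a bounded high-priority bypass budget.
struct PriorityScheduler<T> {
    high: VecDeque<T>,
    normal: VecDeque<T>,
    budget: u32,
    consecutive_bypasses: u32,
    /// Analogue of high_priority_bypasses_total.
    bypasses: u64,
    /// Analogue of high_priority_bypass_yields_total.
    yields: u64,
}

impl<T> PriorityScheduler<T> {
    fn new(budget: u32) -> Self {
        Self {
            high: VecDeque::new(),
            normal: VecDeque::new(),
            budget,
            consecutive_bypasses: 0,
            bypasses: 0,
            yields: 0,
        }
    }

    fn next_request(&mut self) -> Option<T> {
        let normal_waiting = !self.normal.is_empty();
        let budget_exhausted = normal_waiting && self.consecutive_bypasses >= self.budget;
        if !self.high.is_empty() && !budget_exhausted {
            if normal_waiting {
                // High-priority work jumps ahead of queued normal-priority work.
                self.consecutive_bypasses += 1;
                self.bypasses += 1;
            }
            return self.high.pop_front();
        }
        if budget_exhausted && !self.high.is_empty() {
            // Budget used up: force one normal-priority step to avoid starvation.
            self.yields += 1;
        }
        self.consecutive_bypasses = 0;
        self.normal.pop_front()
    }
}
```

With a budget of 2, three high-priority requests, and one normal request, the order is h1, h2, n1, h3. That run records two bypasses and one forced yield.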

Latency Tracking

The LatencyTracker provides detailed latency statistics:

Accuracy note (percentile estimates): LatencyTracker uses a 512-bucket histogram with 8 equal sub-buckets per power-of-2 band. A percentile estimate is the midpoint of the matching sub-bucket, so its relative error is at most 6.25 %. For example:

| True p99 range | Sub-bucket width | Max relative error |
| --- | --- | --- |
| 1 ms – 2 ms | 125 µs | 6.25 % |
| 8 ms – 16 ms | 1 ms | 6.25 % |
| 64 ms – 128 ms | 8 ms | 6.25 % |

This is suitable for p99 SLO alerting with a threshold tolerance of ≥ 8 %. For sub-millisecond or tighter requirements, use the OTLP exporter and aggregate into an HDR histogram or T-Digest in your observability backend.
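The error bound follows directly from the bucket layout. A value v in the band [2^b, 2^(b+1)) lands in a sub-bucket of width 2^b / 8. Reporting that sub-bucket's midpoint is off by at most half the width, 2^b / 16, and dividing by v ≥ 2^b gives at most 1/16 = 6.25 %. The sketch below reproduces the layout over nanosecond values: 64 bands × 8 sub-buckets = 512 buckets, with values under 8 ns stored exactly. It is a single-threaded illustration of the technique, not Krafka's lock-free LatencyTracker, and the function and type names are hypothetical.

```rust
/// 2^3 = 8 equal sub-buckets per power-of-two band.
const SUB_BITS: u32 = 3;
const SUB_BUCKETS: u64 = 1 << SUB_BITS;
/// 64 bands x 8 sub-buckets covers the full u64 nanosecond range.
const NUM_BUCKETS: usize = 512;

/// Maps a value in nanoseconds to its bucket index (0..512).
fn bucket_index(nanos: u64) -> usize {
    if nanos < SUB_BUCKETS {
        return nanos as usize; // 0..8 ns: one exact bucket per value
    }
    let band = 63 - nanos.leading_zeros(); // floor(log2(nanos)), at least 3 here
    let sub = (nanos >> (band - SUB_BITS)) & (SUB_BUCKETS - 1);
    (band as u64 * SUB_BUCKETS + sub) as usize
}

/// Returns the midpoint of a bucket's value range, used as the estimate.
fn bucket_midpoint(index: usize) -> u64 {
    let index = index as u64;
    if index < SUB_BUCKETS {
        return index;
    }
    let band = (index / SUB_BUCKETS) as u32;
    assert!(band >= SUB_BITS, "bucket {index} is never produced by bucket_index");
    let width = 1u64 << (band - SUB_BITS);
    let lower = (SUB_BUCKETS + index % SUB_BUCKETS) << (band - SUB_BITS);
    lower + width / 2
}

/// Single-threaded for clarity (the tracker described above is lock-free).
struct Histogram {
    counts: [u64; NUM_BUCKETS],
    total: u64,
}

impl Histogram {
    fn new() -> Self {
        Histogram { counts: [0; NUM_BUCKETS], total: 0 }
    }

    fn record(&mut self, nanos: u64) {
        self.counts[bucket_index(nanos)] += 1;
        self.total += 1;
    }

    /// Estimates the q-quantile (0.0..=1.0) with the nearest-rank method.
    fn percentile(&self, q: f64) -> Option<u64> {
        if self.total == 0 {
            return None;
        }
        let rank = ((q * self.total as f64).ceil() as u64).clamp(1, self.total);
        let mut seen = 0;
        for (i, &c) in self.counts.iter().enumerate() {
            seen += c;
            if seen >= rank {
                return Some(bucket_midpoint(i));
            }
        }
        None
    }
}
```

A fixed array of 512 counters keeps memory use and recording cost constant regardless of sample count. The price is the bounded relative error computed above.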

use krafka::metrics::LatencyTracker;
use std::time::Duration;

let tracker = LatencyTracker::new();

// Manual recording
tracker.record(Duration::from_millis(50));
tracker.record(Duration::from_millis(100));

// Or use guard for automatic timing
{
    let _guard = tracker.start();
    // ... operation being timed ...
} // Guard records latency when dropped

// Get statistics
println!("Count: {}", tracker.count());
println!("Min: {:?}", tracker.min());
println!("Max: {:?}", tracker.max());
println!("Avg: {:?}", tracker.avg());
println!("Sum: {:?}", tracker.sum());

// Get immutable snapshot
let snapshot = tracker.snapshot();
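The guard pattern relies on Rust's Drop. start() captures an Instant, and the guard's destructor records the elapsed time however the scope exits, including early returns and the ? operator. Below is a self-contained sketch of the pattern with lock-free aggregates. MiniTracker and TimingGuard are hypothetical names, and storing nanoseconds in AtomicU64 fields is an assumed representation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical lock-free latency aggregate, in nanoseconds.
struct MiniTracker {
    count: AtomicU64,
    sum_nanos: AtomicU64,
    min_nanos: AtomicU64,
    max_nanos: AtomicU64,
}

impl MiniTracker {
    fn new() -> Self {
        Self {
            count: AtomicU64::new(0),
            sum_nanos: AtomicU64::new(0),
            min_nanos: AtomicU64::new(u64::MAX), // sentinel until the first sample
            max_nanos: AtomicU64::new(0),
        }
    }

    fn record(&self, d: Duration) {
        let n = u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);
        self.count.fetch_add(1, Ordering::Relaxed);
        self.sum_nanos.fetch_add(n, Ordering::Relaxed);
        self.min_nanos.fetch_min(n, Ordering::Relaxed);
        self.max_nanos.fetch_max(n, Ordering::Relaxed);
    }

    /// Starts timing; the returned guard records the elapsed time on drop.
    fn start(&self) -> TimingGuard<'_> {
        TimingGuard { tracker: self, started: Instant::now() }
    }

    fn min(&self) -> Option<Duration> {
        match self.min_nanos.load(Ordering::Relaxed) {
            u64::MAX => None,
            n => Some(Duration::from_nanos(n)),
        }
    }
}

struct TimingGuard<'a> {
    tracker: &'a MiniTracker,
    started: Instant,
}

impl Drop for TimingGuard<'_> {
    fn drop(&mut self) {
        self.tracker.record(self.started.elapsed());
    }
}
```

fetch_min and fetch_max are single atomic read-modify-write calls. On platforms without a native atomic min/max instruction, they typically compile to a compare-and-swap loop.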

Integration with OpenTelemetry

Built-in OTLP Export (feature telemetry)

Enable the telemetry feature for native OTLP protobuf export and KIP-714 broker telemetry:

[dependencies]
krafka = { version = "...", features = ["telemetry"] }

Export metrics as OTLP protobuf bytes for ingestion by any OTLP-compatible backend:

use krafka::telemetry::otlp::OtlpExporter;
use krafka::metrics::{KrafkaMetrics, MetricsVisitable};

let metrics = KrafkaMetrics::new();
// ... record metrics ...

let mut exporter = OtlpExporter::new(true, 0); // delta temporality
exporter.add_resource_attribute("service.name", "my-service");
metrics.export_all(&mut exporter);
let otlp_bytes: Vec<u8> = exporter.finish();
// Send otlp_bytes to your OTLP receiver via gRPC or HTTP
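With delta temporality, each export carries only the change since the previous one, so something must remember a baseline per series. A minimal sketch of that bookkeeping follows, assuming cumulative u64 counters as input. DeltaTracker is a hypothetical name. It treats a value below the baseline as a counter reset, a common convention assumed here rather than documented Krafka behavior.

```rust
use std::collections::HashMap;

/// Hypothetical per-series baseline store for delta temporality.
#[derive(Default)]
struct DeltaTracker {
    baselines: HashMap<String, u64>,
}

impl DeltaTracker {
    /// Delta since the last committed export. A value below the baseline is
    /// treated as a counter reset, so the whole new value is reported.
    fn delta(&self, name: &str, cumulative: u64) -> u64 {
        match self.baselines.get(name) {
            Some(&base) if cumulative >= base => cumulative - base,
            _ => cumulative,
        }
    }

    /// Advances the baseline; call only after the payload was accepted.
    fn commit(&mut self, name: &str, cumulative: u64) {
        self.baselines.insert(name.to_string(), cumulative);
    }
}
```

Computing and committing are separate steps. A failed push therefore leaves the baseline untouched, and its change is folded into the next delta instead of being lost.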

KIP-714 Automatic Telemetry

The TelemetryReporter implements KIP-714 client telemetry — it subscribes to the broker’s telemetry endpoint and pushes metric snapshots on the broker-specified interval:

use krafka::telemetry::reporter::{TelemetryReporter, TelemetryConfig};

let config = TelemetryConfig {
    enabled: true,
    metrics_prefix: "krafka".into(),
    resource_attributes: vec![
        ("service.name".into(), "my-app".into()),
    ],
};

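// connection, krafka_metrics, and shutdown_rx are placeholders for values from your
// application setup: a broker connection, the shared KrafkaMetrics registry, and a
// shutdown signal receiver.
let reporter = TelemetryReporter::new(connection, krafka_metrics, config, shutdown_rx);
tokio::spawn(reporter.run());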

The reporter handles:

  • subscription polling and push-interval jitter
  • local chunking of OTLP payloads so that each push stays under the broker’s TelemetryMaxBytes limit
  • re-subscription on UNKNOWN_SUBSCRIPTION_ID, or when a single metric is too large to split
  • a graceful terminating push on shutdown

When the broker advertises accepted compression codecs, the reporter tries them in the broker’s preference order. A codec that is unavailable locally is skipped after its first failure. The reporter falls back to uncompressed payloads only when the broker explicitly advertises Compression::None; otherwise, if none of the advertised codecs is usable locally, the reporter stops.

If a multi-chunk push is only partially accepted, the reporter commits delta baselines for the accepted chunks and retries exactly the remaining chunks on the next interval.
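Chunking under a byte limit can be pictured as greedy packing. Encoded metric payloads are appended to the current chunk until the next one would push it past the broker's limit. A single payload larger than the limit can never fit, which corresponds to the unsplittable case that triggers re-subscription. The sketch below is a simplified model under those assumptions: it sizes chunks by summing payload lengths and ignores OTLP framing overhead. chunk_payloads and ChunkError are hypothetical names.

```rust
/// A payload that cannot fit in any chunk.
#[derive(Debug, PartialEq)]
struct ChunkError {
    index: usize,
    len: usize,
}

/// Greedily packs payloads, in order, into chunks of at most `max_bytes`.
/// Each chunk is returned as the list of payload indices it contains.
fn chunk_payloads(payloads: &[Vec<u8>], max_bytes: usize) -> Result<Vec<Vec<usize>>, ChunkError> {
    let mut chunks: Vec<Vec<usize>> = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_len = 0;
    for (index, p) in payloads.iter().enumerate() {
        if p.len() > max_bytes {
            return Err(ChunkError { index, len: p.len() });
        }
        if current_len + p.len() > max_bytes && !current.is_empty() {
            // The next payload would overflow this chunk: close it.
            chunks.push(std::mem::take(&mut current));
            current_len = 0;
        }
        current.push(index);
        current_len += p.len();
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    Ok(chunks)
}
```

Returning indices rather than bytes preserves the mapping from each chunk back to its metrics. That mapping is what allows baselines to be committed only for the chunks a broker accepted.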

Manual Bridge to External OTel SDKs

You can also bridge metrics to an external OpenTelemetry SDK using snapshots or a custom exporter:

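use krafka::metrics::ProducerMetricsSnapshot;

fn export_to_otel(snapshot: &ProducerMetricsSnapshot) {
    // Snapshot values are cumulative totals. Report them from an OpenTelemetry
    // observable counter callback, or add only the change since the previous
    // snapshot to a regular counter; adding the full total each time overcounts.
    let _records_sent = snapshot.records_sent;
}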

Performance Considerations

  • All metrics use atomic operations (lock-free)
  • Counter increments use Ordering::Relaxed for minimal overhead
  • Latency tracking uses compare-and-swap for min/max updates
  • Gauge updates are immediate (no aggregation)
  • Gauge dec() saturates at zero instead of wrapping around, which keeps connection and partition counts correct. Each underflow emits a warn! log carrying a cumulative underflow_count field, so mismatched inc()/dec() pairs surface immediately instead of being silently absorbed
  • Prometheus and JSON export only happen on request (pull-based)
  • OTLP protobuf encoding avoids copies where possible and needs no external protobuf dependency
  • KIP-714 telemetry push runs on a background task with broker-controlled intervals
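A saturating decrement can be written as a compare-and-swap loop. At zero, the value is left unchanged and an underflow counter is bumped instead of wrapping to u64::MAX. Below is a self-contained sketch of the counter and gauge behavior described above. Gauge is a hypothetical type. To avoid a logging dependency, it returns the cumulative underflow count rather than emitting a warn! log.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical gauge whose decrement saturates at zero.
#[derive(Default)]
struct Gauge {
    value: AtomicU64,
    underflow_count: AtomicU64,
}

impl Gauge {
    fn inc(&self) {
        // Relaxed is enough: the gauge orders no other memory accesses.
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements unless already zero. Returns Some(cumulative underflows)
    /// when an unmatched decrement was ignored, so the caller can log it.
    fn dec(&self) -> Option<u64> {
        let mut current = self.value.load(Ordering::Relaxed);
        loop {
            if current == 0 {
                let n = self.underflow_count.fetch_add(1, Ordering::Relaxed) + 1;
                return Some(n);
            }
            match self.value.compare_exchange_weak(
                current,
                current - 1,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return None,
                // Lost a race (or failed spuriously): retry with the fresh value.
                Err(actual) => current = actual,
            }
        }
    }

    fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}
```

A plain fetch_sub(1) would wrap an unmatched decrement to u64::MAX, producing a huge bogus gauge value. The loop makes that case explicit and countable instead.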


Licensed under MIT. Copyright © 2026 Krafka Contributors.