Architecture Overview
This document describes the internal architecture of Krafka, a pure Rust Apache Kafka client.
Design Principles
1. Pure Rust
- No C bindings or FFI
- Full control over all code paths
- No FFI overhead or complexity
2. Async-Native
- Built on Tokio from the ground up
- Non-blocking I/O everywhere
- Efficient connection multiplexing
3. Zero Unsafe
- Memory safety guaranteed by Rust’s type system
- No undefined behavior risks
- Security by design
4. Zero-Copy Where Possible
- Uses the bytes crate for buffer management
- Avoids unnecessary copies in hot paths
- Efficient protocol parsing
5. Security Hardened
- Secrets zeroized on drop (SCRAM passwords, AWS credentials)
- Constant-time comparison via the subtle crate (timing-attack resistant)
- PBKDF2 iteration count validated to prevent DoS
- Protocol allocations capped to prevent OOM from malicious brokers
- Decompression bomb protection (128 MiB limit)
- Debug output redacts all credentials
Module Architecture
krafka/
├── protocol/ # Kafka wire protocol
│ ├── primitives.rs # Basic types (strings, arrays, varints)
│ ├── record.rs # Record batches and compression
│ ├── messages.rs # API request/response types (incl. ACL messages)
│ ├── api.rs # API keys and versions
│ ├── header.rs # Request/response headers
│ └── codec.rs # Framing encoder/decoder
├── network/ # Networking layer
│ ├── connection.rs # Async TCP connections
│ ├── secure.rs # TLS/SASL authentication
│ └── pool.rs # Connection pooling
├── metadata.rs # Cluster metadata management
├── producer/ # Producer implementation
│ ├── mod.rs # Producer API
│ ├── config.rs # Producer configuration
│ ├── partitioner.rs # Partitioning strategies
│ ├── batch.rs # Record batching
│ ├── accumulator.rs # Record accumulator with linger timer
│ ├── record.rs # Producer records
│ ├── retry.rs # Retry policy with exponential backoff
│ └── idempotent.rs # Idempotent producer (PID, sequence tracking)
├── consumer/ # Consumer implementation
│ ├── mod.rs # Consumer API
│ ├── config.rs # Consumer configuration
│ ├── group.rs # Consumer group coordination (rebalance listeners, heartbeat)
│ ├── offset.rs # Offset management
│ └── record.rs # Consumer records
├── admin.rs # Admin client (topics, partitions, configs, ACLs)
├── auth/ # Authentication
│ ├── mod.rs # Auth module (SASL mechanisms)
│ ├── scram.rs # SCRAM-SHA-256/512 implementation
│ ├── msk_iam.rs # AWS MSK IAM authentication (Signature v4)
│ └── tls.rs # TLS/SSL connections with rustls
├── error.rs # Error types
├── metrics.rs # Metrics (counters, gauges, latency tracking)
├── tracing_ext.rs # Tracing (OpenTelemetry-compatible spans)
└── util.rs # Utilities (CRC, varints)
Protocol Layer
Wire Protocol
Krafka implements the Kafka binary protocol:
+----------------+----------------+----------------+
| Size (4 bytes) | API Key (2) | API Version (2)|
+----------------+----------------+----------------+
| Correlation ID | Client ID | Request Body |
+----------------+----------------+----------------+
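The fixed-width preamble above is plain big-endian encoding. As an illustrative sketch (not Krafka's actual API), the first twelve bytes of a request frame can be assembled like this:

```rust
/// Illustrative sketch: encode the fixed request preamble shown above.
/// Size, API key, API version, and correlation ID are all big-endian.
fn encode_preamble(size: i32, api_key: i16, api_version: i16, correlation_id: i32) -> Vec<u8> {
    let mut buf = Vec::with_capacity(12);
    buf.extend_from_slice(&size.to_be_bytes());           // 4 bytes
    buf.extend_from_slice(&api_key.to_be_bytes());        // 2 bytes
    buf.extend_from_slice(&api_version.to_be_bytes());    // 2 bytes
    buf.extend_from_slice(&correlation_id.to_be_bytes()); // 4 bytes
    buf
}
```

The nullable client ID and the request body follow this preamble on the wire.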
Record Batch Format (v2)
+----------------+----------------+----------------+
| Base Offset | Batch Length | Partition Leader Epoch |
+----------------+----------------+----------------+
| Magic | CRC | Attributes |
+----------------+----------------+----------------+
| Last Offset | Base Timestamp | Max Timestamp |
+----------------+----------------+----------------+
| Producer ID | Producer Epoch | Base Sequence |
+----------------+----------------+----------------+
| Records Count | Records... |
+----------------+---------------------------------+
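Individual records inside a v2 batch store lengths and offset/timestamp deltas as zigzag-encoded varints. A minimal, self-contained sketch of that encoding (not Krafka's internal helpers):

```rust
/// Zigzag-encode a signed value into a varint, appending to `out`.
/// Zigzag maps small magnitudes (positive or negative) to short encodings.
fn encode_varint(value: i64, out: &mut Vec<u8>) {
    let mut v = ((value << 1) ^ (value >> 63)) as u64;
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // continuation bit
    }
}

/// Decode a zigzag varint; returns the value and bytes consumed.
fn decode_varint(buf: &[u8]) -> Option<(i64, usize)> {
    let mut result: u64 = 0;
    for (i, &b) in buf.iter().enumerate() {
        if i == 10 {
            return None; // more than 10 bytes cannot be a valid 64-bit varint
        }
        result |= ((b & 0x7f) as u64) << (7 * i as u32);
        if b & 0x80 == 0 {
            // Undo the zigzag mapping.
            let v = ((result >> 1) as i64) ^ -((result & 1) as i64);
            return Some((v, i + 1));
        }
    }
    None
}
```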
Compression
All four Kafka compression codecs are supported:
| Codec | Implementation | Characteristics |
|---|---|---|
| Gzip | flate2 | Best ratio, slowest |
| Snappy | snap | Good balance |
| LZ4 | lz4_flex | Fastest |
| Zstd | zstd | Best modern choice |
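Decompression of any codec is subject to the 128 MiB bomb-protection limit mentioned in the security notes. One way to enforce such a cap, sketched here with std's `Read::take` rather than Krafka's actual code:

```rust
use std::io::Read;

/// Sketch: bound how many bytes a decoder may produce. Reading one byte
/// past `cap` proves the payload exceeds the limit, so we can reject it
/// without ever buffering the full expansion.
fn read_bounded<R: Read>(reader: R, cap: u64) -> std::io::Result<Vec<u8>> {
    let mut limited = reader.take(cap + 1);
    let mut out = Vec::new();
    limited.read_to_end(&mut out)?;
    if out.len() as u64 > cap {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "decompressed data exceeds limit",
        ));
    }
    Ok(out)
}
```

In practice the decoder's output stream would be wrapped this way with `cap = 128 * 1024 * 1024`.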
Network Layer
Connection Architecture
┌───────────────────────────────────────────────────────────────┐
│ ConnectionPool │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────────┐ │
│ │ BrokerBundle(1) │ │ BrokerBundle(2) │ │ BrokerBundle │ │
│ │ ┌──────┬──────┐ │ │ ┌──────┬──────┐ │ │ (N...) │ │
│ │ │Conn 1│Conn 2│ │ │ │Conn 1│Conn 2│ │ │ │ │
│ │ └──────┴──────┘ │ │ └──────┴──────┘ │ │ │ │
│ │ Round-Robin │ │ Round-Robin │ │ │ │
│ └──────────────────┘ └──────────────────┘ └──────────────┘ │
└───────────────────────────────────────────────────────────────┘
For extremely high throughput (>100k msg/s per broker), configure multiple connections:
let config = ConnectionConfig::builder()
    .connections_per_broker(4) // 4 parallel connections
    .build();
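Within a bundle, requests are spread across connections round-robin. A toy sketch of that pick logic (type and field names are assumptions, not Krafka's real ones):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical per-broker bundle: an atomic counter picks the next
/// connection index without any locking.
struct BrokerBundle {
    next: AtomicUsize,
    conn_count: usize,
}

impl BrokerBundle {
    fn pick(&self) -> usize {
        // Relaxed is sufficient: we only need a roughly even distribution.
        self.next.fetch_add(1, Ordering::Relaxed) % self.conn_count
    }
}
```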
Priority Channels
Each connection maintains two request channels to prevent consumer group ejection during backpressure:
┌─────────────────────────────────────────────────────┐
│ BrokerConnection │
│ ┌───────────────────┐ ┌─────────────────────────┐ │
│ │ High-Priority Ch │ │ Normal-Priority Ch │ │
│ │ (Heartbeat, Meta) │ │ (Produce, Fetch, etc.) │ │
│ └─────────┬─────────┘ └───────────┬─────────────┘ │
│ │ biased select! │ │
│ └─────────►◄─────────────┘ │
│ │ │
│ ▼ │
│ TCP Stream │
└─────────────────────────────────────────────────────┘
High-priority requests (Heartbeat, Metadata, FindCoordinator, ApiVersions) are always processed first, ensuring consumer group membership is maintained even under heavy produce/fetch load.
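The drain order can be illustrated with plain queues (a toy model, not Krafka's real channel types): the writer always empties the high-priority queue before taking a normal-priority request, which is what Tokio's `biased` select! achieves for async channels.

```rust
use std::collections::VecDeque;

/// Toy model of the biased drain: high-priority requests (Heartbeat,
/// Metadata, ...) always win over normal-priority ones (Produce, Fetch).
fn next_request(
    high: &mut VecDeque<&'static str>,
    normal: &mut VecDeque<&'static str>,
) -> Option<&'static str> {
    high.pop_front().or_else(|| normal.pop_front())
}
```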
Automatic Reconnection
When connections fail, Krafka automatically attempts to reconnect with exponential backoff:
- Max Retries: 3 (configurable)
- Initial Backoff: 100ms
- Max Backoff: 10 seconds
- Backoff Multiplier: 2.0x
Connection Failure
│
▼
┌───────────────┐
│ Wait 100ms │───► Retry 1
└───────────────┘
│ fail
▼
┌───────────────┐
│ Wait 200ms │───► Retry 2
└───────────────┘
│ fail
▼
┌───────────────┐
│ Wait 400ms │───► Retry 3
└───────────────┘
│ fail
▼
Return Error
The reconnection logic checks is_retriable() on errors to avoid retrying non-transient failures
(e.g., authentication errors, configuration errors).
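Under the default settings above, the backoff schedule is min(initial × multiplier^attempt, max_backoff). A minimal sketch (function name is illustrative):

```rust
use std::time::Duration;

/// Sketch of the exponential backoff schedule with the defaults from
/// above: 100ms initial, 2.0x multiplier, capped at 10 seconds.
fn backoff(attempt: u32) -> Duration {
    let initial_ms = 100u64;
    let max_ms = 10_000u64;
    let multiplier = 2.0f64;
    let ms = (initial_ms as f64 * multiplier.powi(attempt as i32)) as u64;
    Duration::from_millis(ms.min(max_ms))
}
```

Real implementations often add jitter on top of this curve to avoid thundering-herd reconnects; whether Krafka does is not specified here.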
Request/Response Flow
- Caller creates request struct
- Request is encoded to bytes
- Correlation ID is assigned
- Request is sent over TCP
- Response is received and framed
- Response is decoded and returned
// Internal flow (simplified)
async fn send_request<R>(&self, request: R) -> Result<Response>
where
    R: Into<Bytes>,
{
    let correlation_id = self.correlation_id_gen.next();
    // api_key and version are derived from the concrete request type
    let header = RequestHeader::new(api_key, version, correlation_id);

    // Encode and send
    let encoded = encode_request(header, request);
    self.writer.write_all(&encoded).await?;

    // Receive and decode
    let response_bytes = self.read_response().await?;
    decode_response(response_bytes)
}
Metadata Management
Metadata Caching
┌─────────────────────────────────────────────────────┐
│ ClusterMetadata │
│ ┌──────────────────────────────────────────────┐ │
│ │ Broker Cache │ │
│ │ { broker_id -> (host, port, rack) } │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Topic Cache │ │
│ │ { topic -> [partition metadata] } │ │
│ └──────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Leader Cache │ │
│ │ { (topic, partition) -> broker_id } │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
Metadata Refresh
- Automatic refresh when cache is stale (configurable TTL)
- Forced refresh on NotLeaderForPartition errors
- Topic-specific refresh when subscribing
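The caches above can be modeled as maps guarded by a TTL check; the following is an illustrative sketch (struct and field names are assumptions, not Krafka's real types):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical metadata cache: leader lookups hit a HashMap, and a
/// TTL check decides when a background refresh is due.
struct ClusterMetadataCache {
    leaders: HashMap<(String, i32), i32>, // (topic, partition) -> broker_id
    refreshed_at: Instant,
    ttl: Duration,
}

impl ClusterMetadataCache {
    fn is_stale(&self) -> bool {
        self.refreshed_at.elapsed() >= self.ttl
    }

    fn leader(&self, topic: &str, partition: i32) -> Option<i32> {
        self.leaders.get(&(topic.to_owned(), partition)).copied()
    }
}
```

A `None` from the leader lookup, or a NotLeaderForPartition error from a broker, would trigger the forced refresh described above.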
Producer Architecture
Send Path
User Code Producer Broker
│ │ │
│ send(topic, key, value) │ │
│ ─────────────────────────> │ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ 1. Partition │ │
│ │ (murmur2 hash) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ 2. Build RecordBatch│ │
│ │ (compression) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ 3. Get Leader Conn │ │
│ └──────────┬──────────┘ │
│ │ │
│ │ ProduceRequest │
│ │ ─────────────────────────> │
│ │ │
│ │ ProduceResponse │
│ │ <───────────────────────── │
│ RecordMetadata │ │
│ <───────────────────────── │ │
Partitioning
// DefaultPartitioner (murmur2, Java-compatible)
fn partition(key: &[u8], partition_count: usize) -> i32 {
    let hash = murmur2(key);
    // Mask the sign bit (Java's toPositive) so results match the Java client
    ((hash & 0x7fff_ffff) as usize % partition_count) as i32
}
Consumer Architecture
Poll Path
User Code Consumer Broker
│ │ │
│ poll(timeout) │ │
│ ─────────────────────────> │ │
│ │ │
│ ┌──────────┴──────────┐ │
│ │ For each assigned │ │
│ │ partition: │ │
│ └──────────┬──────────┘ │
│ │ FetchRequest │
│ │ ─────────────────────────> │
│ │ │
│ │ FetchResponse │
│ │ <───────────────────────── │
│ ┌──────────┴──────────┐ │
│ │ Decompress & │ │
│ │ Decode Records │ │
│ └──────────┬──────────┘ │
│ Vec<ConsumerRecord> │ │
│ <───────────────────────── │ │
Consumer Group Protocol
┌────────────────────────────────────────────────────────────┐
│ Consumer Group Lifecycle │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Unjoined │───>│ Joining │───>│ Awaiting │ │
│ └──────────┘ └──────────┘ │ Sync │ │
│ ▲ └────┬─────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌──────────┐ ┌──────────┐ │
│ └──────────│ Preparing│<───│ Stable │<─ Heartbeat │
│ │ Rebalance│ └──────────┘ │
│ └──────────┘ │
└────────────────────────────────────────────────────────────┘
Performance Optimizations
Hot Path Inlining
#[inline] annotations on critical paths:
- Protocol primitives: varint/varlong encoding and decoding
- Protocol primitives: i8, i16, i32, u32, i64, bool
- Request/response headers: encode_v0/v1/v2, decode_v0/v1
- Record encoding/decoding: Record::encode, Record::decode, RecordHeader encode/decode
- Hash functions: murmur2 for partition assignment
- Accessor methods: Consumer/Producer record getters
- Enum conversions: ApiKey, Compression, TimestampType, RecordBatchAttributes
- Error handling: ErrorCode to/from i16 conversions
- Utilities: CRC32C checksum, correlation ID generation
- Partitioners: All 4 partitioner implementations
- Batch operations: try_add, size checking methods
- Metadata lookups: partition_count, partition, leader lookups
- Predicates: is_empty, is_null, is_closed, is_retriable, is_ok, is_leader, is_alive
- Retry policy: calculate_backoff, should_retry, max_retries_reached
- Heartbeat controller: interval, session_timeout, is_running accessors
Cold Path Optimization
#[cold] annotations on error creation paths:
- Error constructors: protocol, auth, timeout, broker, config, compression, invalid_state, serialization
- Tells the compiler these paths are unlikely, improving branch prediction on hot paths
Zero-Copy Design
- Zero-copy buffers: Bytes for shared ownership without copying
- Pre-allocated buffers: Capacity hints for vectors
- Efficient hashing: murmur2 for partitioning (Java-compatible)
Memory Model
Producer Record Journey
User Data (owned) Producer (borrowed) Wire (owned)
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Vec<u8> │ ─borrow─> │ &[u8] │ ─copy─> │ Bytes │
└─────────┘ └─────────┘ └─────────┘
Lazy Deserialization
LazyRecordBatch defers individual record parsing until access:
use krafka::protocol::LazyRecordBatch;

// Decode batch header but not records
let lazy = LazyRecordBatch::decode(&mut buf)?;

// Iterate and decode on demand
for result in lazy.records() {
    let record = result?;
    if should_process(&record) {
        process(record);
    }
}

// Or convert to eager batch if needed
let batch = lazy.into_record_batch()?;
Benefits:
- Avoids parsing records that will be filtered out
- Reduced memory allocation for streaming consumers
- Useful when filtering by offset before accessing key/value
Pre-allocation
Vec::with_capacity used throughout for known-size collections, capped at 10,000 elements to protect against malicious broker responses:
- Record batch building
- Response decoding
- Header collection
All protocol decoding paths cap Vec::with_capacity(len.min(10_000)) to prevent OOM from broker-supplied lengths.
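The cap itself is a one-line guard; the point is that the up-front reservation trusts a bound we chose, never a length the broker sent. A minimal sketch (names are illustrative):

```rust
/// Sketch of the allocation cap: a broker-declared length only influences
/// the initial reservation up to a fixed bound. The Vec still grows to the
/// real decoded size if the data is legitimate.
const MAX_PREALLOC: usize = 10_000;

fn bounded_capacity(declared_len: usize) -> usize {
    declared_len.min(MAX_PREALLOC)
}

fn alloc_for_decode(declared_len: usize) -> Vec<u8> {
    Vec::with_capacity(bounded_capacity(declared_len))
}
```

A malicious broker declaring a billion-element array therefore costs at most the capped reservation, not gigabytes of memory.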
Error Handling
Error Hierarchy
pub enum KrafkaError {
    Protocol { message: String },        // Wire protocol errors
    Broker { code: ErrorCode, message }, // Kafka error codes
    Auth { message: String },            // Authentication failures
    Timeout { operation: String },       // Operation timeouts
    Compression { codec, source },       // Compression errors
    Config { message: String },          // Configuration errors
    InvalidState { message: String },    // State machine errors
    Serialization { message, source },   // Encoding/decoding errors
}
Retriable Errors
Some errors are automatically retriable:
- NotLeaderForPartition - Triggers metadata refresh
- LeaderNotAvailable - Wait and retry
- Network timeouts - Retry with backoff
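Classification boils down to a predicate over the error kind; a hypothetical sketch (the enum here is a stand-in, not Krafka's real error type):

```rust
/// Stand-in error kinds for illustration.
enum ErrorKind {
    NotLeaderForPartition,
    LeaderNotAvailable,
    NetworkTimeout,
    AuthFailed,
    InvalidConfig,
}

/// Transient failures are worth retrying; auth and config errors are not,
/// since repeating them can never succeed.
fn is_retriable(e: &ErrorKind) -> bool {
    matches!(
        e,
        ErrorKind::NotLeaderForPartition
            | ErrorKind::LeaderNotAvailable
            | ErrorKind::NetworkTimeout
    )
}
```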
Thread Safety
All Krafka types are designed for concurrent use:
- Producer: Send + Sync - can be shared across tasks
- Consumer: Send + Sync - can be shared across tasks
- AdminClient: Send + Sync - can be shared across tasks
Internal state is protected by:
- RwLock<T> for read-heavy data (metadata, offsets)
- AtomicBool for flags (closed state)
- AtomicU8 with compare_exchange for transaction state machine
- Arc<T> for shared ownership (coordinator state shared with heartbeat task)
Connection pool uses a read-lock fast path for hot-path lookups, dropping all locks before network I/O during reconnection.
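The combination can be sketched with std primitives (illustrative names, not Krafka's real fields): a read lock is held only long enough to clone out what's needed, and an atomic flag short-circuits work after close.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

/// Toy model of the sharing pattern: read-heavy data behind RwLock,
/// the closed flag as an atomic, the whole thing behind Arc.
struct Shared {
    metadata: RwLock<Vec<String>>,
    closed: AtomicBool,
}

fn snapshot(shared: &Arc<Shared>) -> Option<Vec<String>> {
    if shared.closed.load(Ordering::Acquire) {
        return None;
    }
    // Read lock taken briefly and dropped before any I/O would happen.
    Some(shared.metadata.read().unwrap().clone())
}
```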
Benchmarks
Krafka includes comprehensive Criterion benchmarks in benches/:
Producer Benchmarks (benches/producer.rs)
- Record batch encoding: 1, 10, 100, 1000 records
- Compression codecs: None, Gzip, Snappy, LZ4, Zstd
- murmur2 hashing: Various key sizes (8, 32, 128, 512 bytes)
- Varint encoding: Signed and unsigned values
- Roundtrip latency: Single record encode/decode
- Partitioners: Default, RoundRobin, Sticky, Hash strategies
Consumer Benchmarks (benches/consumer.rs)
- Record batch decoding: 1, 10, 100, 500 records
- Decompression: All 4 compression codecs
- Record iteration: Iteration overhead for various batch sizes
- Lazy vs eager: Comparison showing 7.5x speedup for streaming
Protocol Benchmarks (benches/protocol.rs)
- Primitive encode/decode: i32, i64, bool operations
- Varint detailed: 1-5 byte encoding/decoding performance
- CRC32C checksum: 64B to 16KB data sizes
- Request headers: v0, v1, v2 encoding
- Error code conversions: from_i16, to_i16, is_retriable
- API key conversions: from_i16, to_i16
Run benchmarks with:
cargo bench
Implemented Features
Krafka includes the following production-ready features:
- ✅ Transactional Producer: Exactly-once semantics with TransactionalProducer
- ✅ TLS/SSL encryption: Secure connections with rustls and mTLS support
- ✅ AWS MSK IAM authentication: Native support with optional SDK integration
- ✅ SASL/SCRAM Authentication: SHA-256 and SHA-512 mechanisms
- ✅ Metrics and Observability: Producer, consumer, and connection metrics
- ✅ ACL Management: Create, describe, and delete ACLs
- ✅ Security Hardening: Secret zeroization, constant-time auth, PBKDF2 validation, decompression limits, allocation caps