# Protocol & Versions Guide
This guide covers Krafka’s Kafka protocol implementation and API version negotiation.
## Overview
Krafka implements the Kafka wire protocol with support for:
- Automatic API version negotiation
- Multiple protocol versions per API
- All standard compression codecs
- Zero-copy message handling
## Version Negotiation
On connection, Krafka automatically fetches the broker’s supported API versions and stores them. This enables dynamic version negotiation for optimal compatibility and feature usage.
### How It Works

- Client connects to the broker
- Client sends an `ApiVersions` request
- Broker responds with its supported API version ranges
- Client stores the version ranges for future requests
- Each subsequent request can negotiate the best version
### Using Version Negotiation

```rust
use krafka::protocol::ApiKey;

// After connection, negotiate the best Fetch version.
// The client supports v4-v12; the broker might support v0-v13.
let version = conn.negotiate_api_version(ApiKey::Fetch, 12, 4).await;

match version {
    Some(v) => println!("Using Fetch v{}", v),
    None => println!("No compatible version found!"),
}

// Convenience method with min = 0
let version = conn.negotiate_api_version_max(ApiKey::Produce, 9).await;
```
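The negotiation rule itself is simple to state: intersect the client's range with the broker's advertised range and take the highest common version. A hypothetical standalone sketch of that rule (not Krafka's actual implementation):

```rust
// Hypothetical sketch of the version-negotiation rule: intersect the
// client's supported range with the broker's advertised range and pick
// the highest version both sides understand.
fn negotiate(client_min: i16, client_max: i16, broker_min: i16, broker_max: i16) -> Option<i16> {
    let lo = client_min.max(broker_min);
    let hi = client_max.min(broker_max);
    // No overlap means no usable version.
    (lo <= hi).then_some(hi)
}
```

With the ranges from the example above (client v4-v12, broker v0-v13), this picks v12; disjoint ranges yield `None`.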
## Client Supported Versions
Krafka supports the following API version ranges (clamped to match actual encode/decode implementations):
| API | Min | Max | Key Features |
|---|---|---|---|
| Produce | 0 | 3 | v3 transactions, headers |
| Fetch | 0 | 4 | v4 leader epoch |
| ListOffsets | 0 | 2 | v2 isolation level |
| Metadata | 0 | 1 | v1 controller info |
| OffsetCommit | 0 | 2 | v2 retention |
| OffsetFetch | 0 | 1 | v1 group coordinator |
| FindCoordinator | 0 | 0 | Group coordinator lookup |
| JoinGroup | 0 | 5 | v5 group instance id |
| Heartbeat | 0 | 0 | Standard heartbeat |
| SyncGroup | 0 | 3 | v3 group instance id |
| LeaveGroup | 0 | 1 | v1 with response |
| CreateTopics | 0 | 3 | Topic creation |
| DeleteTopics | 0 | 1 | Topic deletion |
| DescribeConfigs | 0 | 1 | Config reading |
| AlterConfigs | 0 | 1 | Config updates |
| InitProducerId | 0 | 1 | Idempotent/transactional |
## Version Constants

Client-supported versions are defined in `krafka::protocol::versions`:

```rust
use krafka::protocol::versions;

// Maximum versions the client supports
let max_fetch = versions::FETCH_MAX;       // 4
let max_produce = versions::PRODUCE_MAX;   // 3
let max_metadata = versions::METADATA_MAX; // 1
```
## Record Batches
Krafka uses Kafka’s v2 record batch format with:
- Magic byte 2 (modern format)
- CRC32C checksums (validated on decode)
- Variable-length encoding for efficiency
- Optional compression (gzip, snappy, lz4, zstd)
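The variable-length encoding in the v2 batch format is the standard zigzag-then-varint scheme for signed integers. A minimal sketch of that scheme (`zigzag_varint` is an illustrative name, not a Krafka API):

```rust
// Zigzag-map a signed value to an unsigned one (so small negatives stay
// small), then emit it as a little-endian base-128 varint: 7 payload bits
// per byte, high bit set on every byte except the last.
fn zigzag_varint(value: i64) -> Vec<u8> {
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    let mut out = Vec::new();
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
    out
}
```

This is why fields like record offsets and timestamp deltas, which are usually small, take only one or two bytes on the wire.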
### Creating Records

```rust
use krafka::protocol::{Compression, RecordBatchBuilder};

let batch = RecordBatchBuilder::new()
    .compression(Compression::Snappy)
    .add_record(Some(b"key"), Some(b"value"), vec![])
    .add_record(None, Some(b"value-only"), vec![])
    .build()?;
```
## Compression Support
| Codec | Feature | Notes |
|---|---|---|
| None | Default | No compression |
| Gzip | Always | Good compression, slower |
| Snappy | Always | Fast, moderate compression |
| LZ4 | Always | Very fast, good compression |
| Zstd | Always | Best compression, fast |
Note: Decompression output is capped at 128 MiB to protect against compression bombs. Compressed payloads that expand beyond this limit return a `KrafkaError::Compression` error.
## Protocol Safety

Krafka protects against malicious or corrupted broker responses:

- Allocation caps: All `Vec::with_capacity()` calls in protocol decoding are capped at 10,000 elements, preventing OOM from broker-supplied lengths
- Decompression limits: Decompressed record data is limited to 128 MiB via streaming `.take()` limits and post-decompression size checks
- Record headers: Record headers are preserved during batch building; no silent data loss
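The allocation-cap idea can be shown in isolation: never trust a wire-supplied length when reserving memory. A hedged sketch (the name `capped_vec` and the exact shape of the helper are illustrative, not Krafka's code):

```rust
// Illustrative allocation cap: pre-allocate at most MAX_PREALLOC elements,
// even if the wire claims a larger collection. The Vec still grows as
// elements actually arrive, so a bogus claimed length can't force a huge
// up-front allocation.
const MAX_PREALLOC: usize = 10_000;

fn capped_vec<T>(claimed_len: usize) -> Vec<T> {
    Vec::with_capacity(claimed_len.min(MAX_PREALLOC))
}
```

If the length was honest, decoding pays at most one extra reallocation pass; if it was hostile, the decoder fails on missing bytes instead of exhausting memory.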
## Wire Protocol
### Request/Response Framing

```text
+----------------+----------------+
|   Size (4B)    | Data (N bytes) |
+----------------+----------------+
```

All messages are length-prefixed with a 4-byte big-endian size field.
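The framing can be sketched with std alone; `encode_frame` and `decode_frame` are hypothetical helper names, not Krafka APIs:

```rust
// Prefix a payload with its length as a 4-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + payload.len());
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

// Read the 4-byte size, then return the framed payload if it is all present.
fn decode_frame(buf: &[u8]) -> Option<&[u8]> {
    let size = u32::from_be_bytes(buf.get(..4)?.try_into().ok()?) as usize;
    buf.get(4..4 + size)
}
```

Because the size comes first, a reader knows exactly how many bytes to await before attempting to parse the message body.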
### Request Header

```text
+----------+----------+---------------+-------------+
| API Key  | Version  | Correlation ID|  Client ID  |
| (2 bytes)| (2 bytes)|  (4 bytes)    | (variable)  |
+----------+----------+---------------+-------------+
```
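Laying those fields out is a straightforward big-endian encode. A hedged sketch of the classic (non-flexible) header layout, where the client ID is a string prefixed by a 2-byte length; `encode_request_header` is an illustrative name, not a Krafka API:

```rust
// Encode a Kafka-style request header: two i16 fields, one i32 field,
// then a length-prefixed client-id string, all big-endian.
fn encode_request_header(api_key: i16, version: i16, correlation_id: i32, client_id: &str) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.extend_from_slice(&api_key.to_be_bytes());
    buf.extend_from_slice(&version.to_be_bytes());
    buf.extend_from_slice(&correlation_id.to_be_bytes());
    buf.extend_from_slice(&(client_id.len() as i16).to_be_bytes());
    buf.extend_from_slice(client_id.as_bytes());
    buf
}
```

The correlation ID is what lets the client match each response back to the request that produced it, since responses echo it in their own header.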
### Response Header

```text
+---------------+
| Correlation ID|
|   (4 bytes)   |
+---------------+
```
## Zero-Copy Design

Krafka uses `bytes::Bytes` throughout for zero-copy buffer management:
- Incoming data is parsed without copying
- Record payloads share underlying buffers
- Memory is released when last reference drops
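The ownership model behind `bytes::Bytes` can be mimicked with std's `Arc` to show the idea: every view holds a reference to one shared allocation, and the memory is freed only when the last view drops. A simplified analogy, not Krafka's implementation:

```rust
use std::sync::Arc;

// A "view" into a shared, immutable buffer: slicing clones the Arc
// (a refcount bump), never the bytes themselves.
#[derive(Clone)]
struct SharedSlice {
    data: Arc<[u8]>,
    start: usize,
    end: usize,
}

impl SharedSlice {
    fn new(data: Vec<u8>) -> Self {
        let end = data.len();
        Self { data: data.into(), start: 0, end }
    }

    // Create a sub-view without copying any bytes.
    fn slice(&self, start: usize, end: usize) -> Self {
        assert!(start <= end && self.start + end <= self.end);
        Self { data: Arc::clone(&self.data), start: self.start + start, end: self.start + end }
    }

    fn as_bytes(&self) -> &[u8] {
        &self.data[self.start..self.end]
    }
}
```

In the same way, each record payload decoded from a fetch response is a cheap view into the response buffer, and that buffer is released once the last record referencing it is dropped.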
## Next Steps
- Producer Guide - Sending messages
- Consumer Guide - Receiving messages
- Configuration Reference - All settings