Protocol & Versions Guide
This guide covers Krafka’s Kafka protocol implementation and API version negotiation.
Overview
Krafka implements the Kafka wire protocol with support for:
- Automatic API version negotiation
- Multiple protocol versions per API
- All standard compression codecs
- Zero-copy message handling
Version Negotiation
On connection, Krafka automatically fetches the broker’s supported API versions and stores them. This enables dynamic version negotiation for optimal compatibility and feature usage.
How It Works
- Client connects to broker
- Client sends an `ApiVersions` request
- Broker responds with supported API version ranges
- Client stores version ranges for future requests
- Each request negotiates the best version within the client's `[MIN, MAX]` range
Using Version Negotiation
```rust
use krafka::protocol::ApiKey;

// negotiate_api_version(api_key, max, min) clamps to client MIN..MAX and broker range.
let fetch_version = conn
    .negotiate_api_version(ApiKey::Fetch, 12, 4)
    .await
    .expect("broker does not support any usable Fetch version");
println!("Using Fetch v{}", fetch_version);
```
Minimum Broker Version
Krafka requires Apache Kafka 3.9 or later. The MIN constants for all APIs are set so that pre-3.9 protocol features (e.g., Metadata v0, Produce v0-v2, Fetch v0-v3) are no longer supported. Connecting to an older broker will fail version negotiation for most APIs.
Client Supported Versions
Every API has a MIN and MAX constant in krafka::protocol::versions.
The client only encodes/decodes versions within [MIN, MAX]; versions outside
this range are rejected with a protocol error.
| API | Min | Max | Key Features |
|---|---|---|---|
| Produce | 3 | 11 | v3 transactions, v9 flexible encoding, v11 ZStd compression |
| Fetch | 4 | 12 | v4 isolation level, v7 fetch sessions (KIP-227), v9 leader epoch (KIP-320), v11 closest-replica (KIP-392), v12 flexible |
| ListOffsets | 1 | 8 | v1 timestamp queries, v2 isolation level, v4 leader epoch, v6 flexible, v7 max_timestamp, v8 tiered-storage |
| Metadata | 1 | 13 | v1 controller + rack, v7 leader epoch, v8 authorized-ops, v9 flexible, v10 topic UUIDs, v12 topic-ID lookup, v13 top-level error_code |
| OffsetCommit | 2 | 9 | v2 retention, v5 drops retention_time, v6 leader epoch, v8 flexible, v9 KIP-848 |
| OffsetFetch | 1 | 9 | v1 group coordinator, v2 top-level error, v6 flexible, v8 batched groups, v9 KIP-848 |
| FindCoordinator | 1 | 6 | v1 key_type, v3 flexible, v4 batched keys (KIP-699), v6 share groups (KIP-932) |
| JoinGroup | 4 | 9 | v4 group_instance_id (KIP-345), v6 flexible, v8 reason (KIP-800) |
| Heartbeat | 3 | 4 | v3 group_instance_id (KIP-345), v4 flexible |
| SyncGroup | 3 | 5 | v3 group_instance_id, v4 flexible, v5 protocol_type/name (KIP-559) |
| LeaveGroup | 3 | 5 | v3 batch leave (KIP-345), v4 flexible, v5 reason (KIP-800) |
| CreateTopics | 2 | 7 | v2 topic validation, v5 flexible, v7 topic_id (KIP-464, KIP-525) |
| DeleteTopics | 1 | 6 | v1 baseline, v4 flexible, v6 topic-ID-based deletion |
| CreatePartitions | 0 | 3 | v0 baseline, v2 flexible, v3 KIP-599 |
| DescribeConfigs | 0 | 4 | v1 synonyms, v3 config_type + documentation, v4 flexible |
| IncrementalAlterConfigs | 0 | 1 | v0 non-flexible, v1 flexible encoding |
| DescribeAcls | 1 | 3 | v1 prefixed ACLs, v2 flexible, v3 user resource type |
| CreateAcls | 1 | 3 | v1 prefixed ACLs, v2 flexible, v3 user resource type |
| DeleteAcls | 1 | 3 | v1 prefixed ACLs, v2 flexible, v3 user resource type |
| DescribeGroups | 1 | 6 | v3 authorized_operations, v4 static members, v5 flexible, v6 KIP-1043 |
| ListGroups | 1 | 5 | v3 flexible, v4 state filter (KIP-518), v5 type filter (KIP-848) |
| DeleteRecords | 0 | 2 | v0 baseline, v2 flexible encoding |
| OffsetForLeaderEpoch | 2 | 4 | v2 leader epoch validation, v3 replica_id, v4 flexible |
| InitProducerId | 0 | 4 | v0 idempotent, v2 flexible, v3 epoch recovery, v4 latest stable |
| AddPartitionsToTxn | 0 | 3 | v0 baseline, v3 flexible encoding |
| AddOffsetsToTxn | 0 | 3 | v0 baseline, v3 flexible encoding |
| EndTxn | 0 | 3 | v0 baseline, v3 flexible encoding |
| TxnOffsetCommit | 0 | 3 | v0 baseline, v2 leader epoch, v3 flexible + consumer fields |
| CreateDelegationToken | 1 | 3 | v2 flexible, v3 owner override |
| RenewDelegationToken | 1 | 2 | v2 flexible encoding |
| ExpireDelegationToken | 1 | 2 | v2 flexible encoding |
| DescribeDelegationToken | 1 | 3 | v2 flexible, v3 token requester |
| DescribeClientQuotas | 0 | 1 | v1 flexible encoding |
| AlterClientQuotas | 0 | 1 | v1 flexible encoding |
| DeleteGroups | 0 | 2 | Consumer group deletion |
| DescribeCluster | 0 | 2 | Cluster metadata |
| ApiVersions | 0 | 4 (5¹) | API version negotiation |
| ConsumerGroupHeartbeat | 0 | 1 | KIP-848 consumer group protocol, v1 KIP-1082 regex |
| ConsumerGroupDescribe | 0 | 1 | KIP-848 group description |
| DescribeTopicPartitions | 0 | 0 | Topic partition metadata (KIP-966) |
| UpdateFeatures | 0 | 1 | Cluster feature versioning (KIP-584), v1 UpgradeType + ValidateOnly |
| GetTelemetrySubscriptions² | 0 | 0 | KIP-714 client telemetry subscription discovery |
| PushTelemetry² | 0 | 0 | KIP-714 client telemetry push |
| ShareGroupHeartbeat¹ | 1 | 1 | KIP-932 share group heartbeat |
| ShareGroupDescribe¹ | 1 | 1 | KIP-932 share group description |
| ShareFetch¹ | 1 | 2 | KIP-932 share fetch, v2 acquire mode (KIP-1206) + renew ack (KIP-1222) |
| ShareAcknowledge¹ | 1 | 2 | KIP-932 share acknowledge, v2 renew ack (KIP-1222) |
¹ Requires the `unstable-protocol` feature flag. Max shown in parentheses is the feature-gated max.
² Requires the `telemetry` feature flag.

Note: Encode/decode implementations exist for even higher versions of some APIs (e.g., Produce up to v13, Fetch up to v18), but the negotiated MAX is set conservatively for topic-UUID-based paths until those have been integration-tested against a real broker.
Version Constants
Client-supported versions are defined in krafka::protocol::versions:
```rust
use krafka::protocol::versions;

// Each API has both MIN and MAX constants
let min_fetch = versions::FETCH_MIN;       // 4  (Kafka 3.9+ baseline)
let max_fetch = versions::FETCH_MAX;       // 12 (v12 flexible encoding)
let min_produce = versions::PRODUCE_MIN;   // 3  (v3+ transactions)
let max_produce = versions::PRODUCE_MAX;   // 11 (v11 ZStd compression)
let max_metadata = versions::METADATA_MAX; // 13 (v13 top-level error_code)
```
Record Batches
Krafka uses Kafka’s v2 record batch format with:
- Magic byte 2 (modern format)
- CRC32C checksums (validated on decode)
- Variable-length (varint) encoding for efficiency (see the sketch below)
- Optional compression (gzip, snappy, lz4, zstd)
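The variable-length encoding above is Kafka's standard varint scheme: signed values are ZigZag-mapped and then emitted seven bits per byte. A minimal standalone sketch of that encoding (illustrative only, not Krafka's internal API):

```rust
// Illustrative ZigZag varint encoder, the scheme Kafka's v2 record format uses
// for record lengths and deltas. Not Krafka's internal API.
fn encode_varint(value: i64, out: &mut Vec<u8>) {
    // ZigZag-map the signed value so small negative numbers stay small.
    let mut v = ((value << 1) ^ (value >> 63)) as u64;
    // Emit 7 bits per byte; set the high bit on every byte except the last.
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80);
    }
}

let mut buf = Vec::new();
encode_varint(-1, &mut buf);
assert_eq!(buf, vec![0x01]); // -1 ZigZag-encodes to 1, which fits in one byte
```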
Header Versioning
Every Kafka request/response is prefixed with a header whose format depends on whether the API version uses flexible encoding:
| Header state | Request header | Response header |
|---|---|---|
| Non-flexible | v1 — standard `KafkaString` for `client_id` | v0 — `correlation_id` only |
| Flexible | v2 — compact string for `client_id` + tagged fields | v1 — `correlation_id` + tagged fields |
The transition version varies per API (e.g., Fetch becomes flexible at v12,
Produce at v9). ApiKey::flexible_version() returns the threshold for each API,
and the header is selected automatically by RequestHeader::encode() /
ResponseHeader::decode().
Note: ApiVersions response always uses header v0 regardless of the API
version (needed for protocol bootstrapping).
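As a rough sketch of how that threshold can be used (assuming `flexible_version()` returns the first flexible version as a plain integer, which is an assumption about its exact signature):

```rust
use krafka::protocol::ApiKey;

// Assumption: flexible_version() returns the first flexible version for the
// API as a plain integer.
let first_flexible = ApiKey::Fetch.flexible_version();
let uses_flexible_header = 12 >= first_flexible;
println!("Fetch v12 uses a flexible header: {uses_flexible_header}");
```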
Unified Version Dispatch
Core request/response message types in krafka::protocol implement the VersionedEncode and VersionedDecode traits, which dispatch to the correct encode_vN/decode_vN method based on the protocol version number:
```rust
use krafka::protocol::{VersionedEncode, VersionedDecode, MetadataRequest, MetadataResponse};

let request = MetadataRequest::all_topics();
let mut buf = bytes::BytesMut::new();

// Encode for a specific protocol version — dispatches to the right encoder
request.encode_versioned(1, &mut buf)?;

// In real usage, `response_buf` would be filled with bytes read from the network.
let mut response_buf = buf.freeze();

// Decode response for a specific version
let response = MetadataResponse::decode_versioned(1, &mut response_buf)?;
```
Unsupported version numbers (including negative values) return a descriptive KrafkaError::protocol error.
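Continuing the snippet above, asking for a version below the client's MIN (Metadata's MIN is 1) surfaces that error instead of producing malformed bytes:

```rust
// Sketch: Metadata v0 is below METADATA_MIN (1), so encoding it is rejected.
let mut buf = bytes::BytesMut::new();
let result = MetadataRequest::all_topics().encode_versioned(0, &mut buf);
assert!(result.is_err());
```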
Creating Records
```rust
use krafka::protocol::{RecordBatchBuilder, Compression};

let batch = RecordBatchBuilder::new()
    .compression(Compression::Snappy)
    .add_record(Some(b"key"), Some(b"value"), vec![])
    .add_record(None, Some(b"value-only"), vec![])
    .build()?;
```
Compression Support
| Codec | Feature | Notes |
|---|---|---|
| None | Default | No compression |
| Gzip | Always | Good compression, slower |
| Snappy | Always | Fast, moderate compression |
| LZ4 | Always | Very fast, good compression |
| Zstd | Always | Best compression, fast |
Note: Decompression output is capped at 128 MiB by default to protect against compression bombs. This limit is configurable via `ConsumerConfig::max_decompressed_size()`. Compressed payloads that expand beyond the limit will return a `KrafkaError::compression` error.
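A sketch of tightening that limit, assuming `ConsumerConfig` has a `Default` impl and `max_decompressed_size()` acts as a builder-style setter taking a byte count (both assumptions; check the configuration reference for the exact shape):

```rust
// Assumptions: ConsumerConfig::default() exists and max_decompressed_size()
// is a builder-style setter taking the limit in bytes.
use krafka::consumer::ConsumerConfig; // import path assumed for illustration

let config = ConsumerConfig::default()
    .max_decompressed_size(32 * 1024 * 1024); // cap decompressed batches at 32 MiB
```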
Protocol Safety
Krafka protects against malicious or corrupted broker responses:
- Decode array bounds: Every array-length field decoded from the wire is validated against `MAX_DECODE_ARRAY_LEN` (100,000), typically via `check_decode_array_len()` and, in some specialized decode paths (e.g., `KafkaArray::decode`, record batch counts), via equivalent local checks. These checks reject negative and oversized counts across all 63+ protocol-message decode sites, `KafkaArray` decode paths, and record batch/header counts. The validation runs before any `Vec::with_capacity()` allocation, preventing both OOM and runaway decode loops (see the sketch after this list).
- Decompression limits: Decompressed record data is limited to 128 MiB (configurable) via streaming `.take()` limits and post-decompression size checks.
- Record headers: Record headers are preserved during batch building — no silent data loss.
- Encode validation: The `TryEncode` trait provides fallible encoding for protocol primitives (`KafkaString`, `KafkaBytes`, `KafkaArray<T>` where `T: TryEncode`, `TaggedFields`), returning errors instead of panicking on oversized data. `ProducerRecord::validate()` checks wire-format limits at the API boundary before encoding.
- Fuzz testing: The `fuzz/` directory provides cargo-fuzz targets for `KafkaArray` decode, `RecordBatch` decode, and response message decode across multiple API versions. See `fuzz/README.md` for usage.
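The array-bounds check follows this general pattern (a generic sketch of the idea, not Krafka's exact helper):

```rust
use bytes::Buf;

// Generic sketch of the bound check described above, not Krafka's exact helper:
// validate a decoded array length before any allocation happens.
const MAX_DECODE_ARRAY_LEN: i32 = 100_000;

fn checked_array_len(buf: &mut impl Buf) -> Result<usize, String> {
    let len = buf.get_i32(); // array lengths are signed on the wire
    if len < 0 || len > MAX_DECODE_ARRAY_LEN {
        return Err(format!("array length {len} out of bounds"));
    }
    Ok(len as usize)
}

// Only after this check is it safe to call Vec::with_capacity(len).
```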
Wire Protocol
Request/Response Framing
```
+------------+----------------+
| Size (4B)  | Data (N bytes) |
+------------+----------------+
```
All messages are length-prefixed with a 4-byte big-endian size field.
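A minimal sketch of that framing with the `bytes` crate (illustrative; the connection layer handles this internally):

```rust
use bytes::{Buf, BufMut, BytesMut};

// Write one frame: 4-byte big-endian size prefix followed by the payload.
fn write_frame(out: &mut BytesMut, payload: &[u8]) {
    out.put_i32(payload.len() as i32); // put_i32 writes big-endian
    out.put_slice(payload);
}

// Read one frame back out (assumes the full frame is already buffered).
fn read_frame(buf: &mut BytesMut) -> BytesMut {
    let size = buf.get_i32() as usize;
    buf.split_to(size)
}
```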
Request Header
```
+-----------+-----------+----------------+-------------+
| API Key   | Version   | Correlation ID | Client ID   |
| (2 bytes) | (2 bytes) | (4 bytes)      | (variable)  |
+-----------+-----------+----------------+-------------+
```
Response Header
```
+----------------+
| Correlation ID |
| (4 bytes)      |
+----------------+
```
Zero-Copy Design
Krafka uses bytes::Bytes throughout for zero-copy buffer management:
- Incoming data is parsed without copying
- Record payloads share underlying buffers
- Memory is released when last reference drops
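For example, slicing a `Bytes` buffer hands out a view into the same allocation:

```rust
use bytes::Bytes;

// A slice of a Bytes buffer shares the underlying allocation; no copy is made.
let frame = Bytes::from_static(b"key\0value");
let value = frame.slice(4..); // refers to the same memory as `frame`
assert_eq!(&value[..], b"value");
// The backing memory is freed only when both `frame` and `value` are dropped.
```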
Next Steps
- Producer Guide - Sending messages
- Consumer Guide - Receiving messages
- Configuration Reference - All settings