Share Consumer Guide
This guide covers the share consumer, which provides queue-like consumption semantics via KIP-932 share groups. Share groups are stable as of Apache Kafka 4.0+.
Overview
Share groups differ from traditional consumer groups in several key ways:
| Feature | Consumer Group | Share Group |
|---|---|---|
| Assignment | Client or server-side | Server-side only |
| Offset tracking | Per-partition committed offsets | Per-record acknowledgements |
| Delivery | Exactly-once (with transactions) | At-least-once |
| Record sharing | One consumer per partition | Multiple consumers per partition |
| Redelivery | Seek / reset offsets | Automatic (release/reject) |
Multiple consumers in the same share group receive non-overlapping subsets of records from the same partition — the server handles all assignment and delivery tracking.
Basic Usage
use krafka::share_consumer::{ShareConsumer, AcknowledgementMode};
use std::time::Duration;

let consumer = ShareConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-share-group")
    .build()
    .await?;

consumer.subscribe(&["events"]).await?;

loop {
    let records = consumer.poll(Duration::from_secs(1)).await?;
    for record in &records {
        process(record);
    }
    // In Implicit mode (default), records are auto-accepted on next poll()
}
Acknowledgement Modes
Implicit (Default)
Records fetched by the previous poll() are automatically accepted when the next poll() is called. This is the simplest mode — no application-level acknowledgement logic is needed. Consecutive offsets for the same partition are coalesced into contiguous ranges to reduce wire overhead.
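The range coalescing mentioned above can be pictured with a small standalone sketch. The `coalesce_offsets` helper below is hypothetical and not part of the krafka API; it only illustrates how a set of acknowledged offsets for one partition might collapse into inclusive `(first, last)` ranges before being sent.

```rust
/// Collapse offsets for a single partition into inclusive (first, last) ranges.
/// Hypothetical helper for illustration only; krafka's internal logic may differ.
fn coalesce_offsets(offsets: &[i64]) -> Vec<(i64, i64)> {
    // Sort and de-duplicate so adjacency can be checked in one pass.
    let mut sorted = offsets.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let mut ranges: Vec<(i64, i64)> = Vec::new();
    for offset in sorted {
        match ranges.last_mut() {
            // Extend the current range when this offset directly follows its end.
            Some(last) if last.1 + 1 == offset => last.1 = offset,
            // Otherwise start a new single-offset range.
            _ => ranges.push((offset, offset)),
        }
    }
    ranges
}
```

Under this sketch, six acknowledged offsets such as 3, 4, 5, 9, 10 and 12 would travel as three ranges rather than six entries.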
Explicit
The application acknowledges each record itself. Every record from the previous poll() must be acknowledged before the next poll(); otherwise poll() returns an error. acknowledge() is one-shot per record: acknowledging the same record twice returns an error rather than sending a duplicate acknowledgement to the broker. If a later commit_sync() or commit_async() flush fails, the consumer restores that batch locally, and subsequent poll() calls keep returning an error until the commit is retried successfully or the local share-consumer state is cleared.
use krafka::share_consumer::{ShareConsumer, AcknowledgementMode, AcknowledgeType};
use std::time::Duration;

let consumer = ShareConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-share-group")
    .acknowledgement_mode(AcknowledgementMode::Explicit)
    .build()
    .await?;

consumer.subscribe(&["events"]).await?;

let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
    // Accept on success; release on failure so another consumer can retry.
    match try_process(record) {
        Ok(_) => consumer.acknowledge(record, AcknowledgeType::Accept).await?,
        Err(_) => consumer.acknowledge(record, AcknowledgeType::Release).await?,
    }
}
// Flush the queued acknowledgements to the broker.
consumer.commit_sync().await?;
To acknowledge by topic/partition/offset directly — useful when a record fails to deserialize and you have no ConsumerRecord to pass:
consumer.acknowledge_by_offset("events", partition, offset, AcknowledgeType::Reject).await?;
For a timeout-bounded flush:
consumer.commit_sync_with_timeout(Duration::from_secs(5)).await?;
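To make the explicit-mode rules concrete, here is a minimal standalone model of the bookkeeping they imply: every delivered record needs exactly one acknowledgement, and the next poll is only allowed once none are pending. The `ExplicitBatch`, `Ack`, and `AckError` names are hypothetical and track a single partition for brevity; this is a sketch of the rules, not krafka's implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Ack {
    Accept,
    Release,
    Reject,
}

#[derive(Debug, PartialEq, Eq)]
enum AckError {
    UnknownRecord,
    AlreadyAcknowledged,
}

/// Hypothetical tracker for the records returned by one poll().
struct ExplicitBatch {
    // offset -> acknowledgement chosen so far (None means still pending)
    records: HashMap<i64, Option<Ack>>,
}

impl ExplicitBatch {
    fn delivered(offsets: &[i64]) -> Self {
        Self {
            records: offsets.iter().map(|&offset| (offset, None)).collect(),
        }
    }

    /// One-shot: a second acknowledgement for the same offset is an error.
    fn acknowledge(&mut self, offset: i64, ack: Ack) -> Result<(), AckError> {
        match self.records.get_mut(&offset) {
            None => Err(AckError::UnknownRecord),
            Some(Some(_)) => Err(AckError::AlreadyAcknowledged),
            Some(slot) => {
                *slot = Some(ack);
                Ok(())
            }
        }
    }

    /// The next poll() is only legal once every record has been acknowledged.
    fn ready_for_poll(&self) -> bool {
        self.records.values().all(|slot| slot.is_some())
    }
}
```

A real consumer would key this state by topic and partition as well as offset; the single map keeps the sketch short.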
Acknowledge Types
| Type | Value | Meaning |
|---|---|---|
| Accept | 1 | Record processed successfully |
| Release | 2 | Record released for redelivery to another consumer |
| Reject | 3 | Record rejected (moved to dead-letter after max retries) |
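The numeric values in the table can be mirrored in a small enum. The `AckTypeSketch` type below is a hypothetical stand-in, not krafka's `AcknowledgeType`, and the choice of `i8` as the wire representation is an assumption made for illustration.

```rust
/// Hypothetical mirror of the table above; not the krafka `AcknowledgeType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i8)]
enum AckTypeSketch {
    Accept = 1,
    Release = 2,
    Reject = 3,
}

impl AckTypeSketch {
    /// Numeric value from the table (the i8 width is an assumption).
    fn wire_value(self) -> i8 {
        self as i8
    }

    /// Map a numeric value back to a type; values outside the table yield None.
    fn from_wire(value: i8) -> Option<Self> {
        match value {
            1 => Some(Self::Accept),
            2 => Some(Self::Release),
            3 => Some(Self::Reject),
            _ => None,
        }
    }
}
```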
Delivery Count
Each ConsumerRecord includes a delivery_count field (populated from the server’s acquired-records metadata). This tells you how many times the record has been delivered, which is useful for implementing retry limits:
for record in &records {
    if let Some(count) = record.delivery_count {
        if count > 5 {
            consumer.acknowledge(record, AcknowledgeType::Reject).await?;
            continue;
        }
    }
    process(record);
}
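A variation on the retry limit above is to attempt processing first and only reject once the limit has been passed. The sketch below isolates that decision as a pure function so it can be tested without a broker. The `Decision` enum, the `decide` function, and the `u16` type for the count are all assumptions for illustration, not part of the krafka API.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Decision {
    Accept,
    Release,
    Reject,
}

/// Hypothetical policy: accept on success, release on failure for another
/// attempt, and reject once the delivery count has passed `max_deliveries`.
fn decide(delivery_count: Option<u16>, max_deliveries: u16, processed_ok: bool) -> Decision {
    if processed_ok {
        return Decision::Accept;
    }
    match delivery_count {
        // Too many attempts already: stop redelivering this record.
        Some(count) if count > max_deliveries => Decision::Reject,
        // Unknown or low count: hand the record back for another attempt.
        _ => Decision::Release,
    }
}
```

The result would then be mapped onto the corresponding acknowledge call for the record.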
Async Commit
commit_async() returns a handle that resolves to the final commit outcome. This keeps the send off the caller’s immediate path while still surfacing transport, decode, and broker errors explicitly. If any failure occurs, the batch is restored locally for the next commit cycle rather than silently dropped:
consumer.commit_async().await?;
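The restore-on-failure behaviour described above can be sketched independently of the network layer. In this hypothetical model, `flush_acks` takes the pending acknowledgements (as `(offset, type)` pairs, an assumed shape), hands them to a caller-supplied `send` function, and puts them back if the send fails.

```rust
/// Hypothetical flush step: on failure the batch is restored rather than dropped.
/// Returns the number of acknowledgements sent on success.
fn flush_acks<E>(
    pending: &mut Vec<(i64, i8)>,
    send: impl FnOnce(&[(i64, i8)]) -> Result<(), E>,
) -> Result<usize, E> {
    // Take the batch out of the pending queue before sending.
    let batch = std::mem::take(pending);
    match send(&batch) {
        Ok(()) => Ok(batch.len()),
        Err(err) => {
            // Restore the unsent batch so the next commit cycle retries it.
            *pending = batch;
            Err(err)
        }
    }
}
```

The caller still sees the error, but the acknowledgements survive for the next commit attempt.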
Streaming API
The share consumer also supports a Stream-based API:
use tokio_stream::StreamExt;

let mut stream = consumer.stream();
while let Some(record) = stream.next().await {
    let record = record?;
    process(&record);
}
Configuration
| Option | Default | Description |
|---|---|---|
| bootstrap_servers | (required) | Comma-separated broker addresses |
| group_id | (required) | Share group identifier |
| client_id | "krafka-share-consumer" | Client identifier |
| acknowledgement_mode | Implicit | Implicit or Explicit |
| fetch_min_bytes | 1 | Minimum bytes per fetch |
| fetch_max_bytes | 52_428_800 | Maximum bytes per fetch |
| max_poll_records | 500 | Maximum records per poll |
| max_records | -1 | Server-side max records (-1 = no limit) |
| batch_size | 0 | Server-side batch size hint (0 = default) |
| fetch_max_wait_ms | 500 | Maximum wait time for fetch responses, in milliseconds |
| request_timeout | 30s | Request timeout |
| session_timeout | 45s | Session timeout for group membership |
| heartbeat_interval | 5s | Heartbeat interval (must be < session_timeout) |
| metadata_max_age | 5min | Metadata cache TTL |
| metadata_topic_cache_ttl | Some(5min) | TTL for topic entries in the partial-refresh cache. None disables eviction. Use disable_metadata_topic_cache_ttl() to opt out. |
| client_rack | None | Rack ID for rack-aware fetching |
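The one cross-field constraint in the table, that heartbeat_interval must be shorter than session_timeout, is easy to check before building a consumer. The `ShareConsumerSettings` struct below is a hypothetical stand-in that copies a few defaults from the table; it is not krafka's configuration type, and how krafka enforces the rule is not shown here.

```rust
use std::time::Duration;

/// Hypothetical subset of the options above, with the documented defaults.
#[derive(Debug, Clone)]
struct ShareConsumerSettings {
    fetch_max_wait: Duration,
    request_timeout: Duration,
    session_timeout: Duration,
    heartbeat_interval: Duration,
}

impl Default for ShareConsumerSettings {
    fn default() -> Self {
        Self {
            fetch_max_wait: Duration::from_millis(500),
            request_timeout: Duration::from_secs(30),
            session_timeout: Duration::from_secs(45),
            heartbeat_interval: Duration::from_secs(5),
        }
    }
}

impl ShareConsumerSettings {
    /// Enforce the documented rule: heartbeat_interval < session_timeout.
    fn validate(&self) -> Result<(), String> {
        if self.heartbeat_interval >= self.session_timeout {
            return Err(format!(
                "heartbeat_interval ({:?}) must be less than session_timeout ({:?})",
                self.heartbeat_interval, self.session_timeout
            ));
        }
        Ok(())
    }
}
```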
Metadata Topic Cache TTL
During a partial metadata refresh (where only the subscribed topics are re-fetched rather than the entire cluster), Krafka caches each topic’s metadata between refreshes. By default, a topic entry is evicted from this cache after 5 minutes of not being successfully refreshed — matching Java’s metadata.max.idle.ms — to prevent unbounded growth when topics are deleted or subscriptions change.
use krafka::share_consumer::ShareConsumer;
use std::time::Duration;

// Use a custom TTL (e.g. 10 minutes):
let consumer = ShareConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-share-group")
    .metadata_topic_cache_ttl(Duration::from_secs(600))
    .build()
    .await?;

// Opt out of TTL eviction entirely (topics persist until the cache is flushed):
let consumer = ShareConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-share-group")
    .disable_metadata_topic_cache_ttl()
    .build()
    .await?;
Note: TTL eviction only affects the partial-refresh cache. A full metadata refresh (triggered by metadata_max_age expiry or an explicit refresh) always replaces the cache unconditionally.
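The eviction rule can be modelled with a map from topic name to the time of its last successful refresh. The `TopicCache` type below is a hypothetical illustration, not krafka's cache: entries older than the TTL are dropped, a TTL of `None` disables eviction, and a full refresh replaces everything.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical partial-refresh cache keyed by topic name.
struct TopicCache {
    ttl: Option<Duration>,
    last_refreshed: HashMap<String, Instant>,
}

impl TopicCache {
    fn new(ttl: Option<Duration>) -> Self {
        Self { ttl, last_refreshed: HashMap::new() }
    }

    /// Record a successful refresh of one topic.
    fn record_refresh(&mut self, topic: &str, now: Instant) {
        self.last_refreshed.insert(topic.to_string(), now);
    }

    fn contains(&self, topic: &str) -> bool {
        self.last_refreshed.contains_key(topic)
    }

    /// Drop entries not refreshed within the TTL; returns the evicted names, sorted.
    fn evict_stale(&mut self, now: Instant) -> Vec<String> {
        // With no TTL configured, nothing is ever evicted.
        let Some(ttl) = self.ttl else {
            return Vec::new();
        };
        let mut evicted = Vec::new();
        self.last_refreshed.retain(|topic, seen| {
            let fresh = now.duration_since(*seen) < ttl;
            if !fresh {
                evicted.push(topic.clone());
            }
            fresh
        });
        evicted.sort();
        evicted
    }

    /// A full refresh replaces the cache unconditionally.
    fn replace_all(&mut self, topics: &[&str], now: Instant) {
        self.last_refreshed = topics.iter().map(|t| (t.to_string(), now)).collect();
    }
}
```

Passing `now` explicitly keeps the sketch deterministic; a real cache would likely read the clock itself.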
Session Management
Share sessions (similar to fetch sessions from KIP-227) track per-broker state with epoch-based sequencing:
- Epoch 0: Opens a new session (full fetch)
- Epoch 1..N: Incremental fetches
- Epoch -1: Closes the session
Sessions are managed automatically. They reset on errors or assignment changes.
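The epoch rules above fit in a few lines. The `SessionEpoch` type below is a hypothetical sketch, not krafka's session type. Wrapping from `i32::MAX` back to 1 is an assumption borrowed from the convention used for Kafka fetch sessions; the text above does not state how krafka handles overflow.

```rust
const INITIAL_EPOCH: i32 = 0; // opens a new session (full fetch)
const FINAL_EPOCH: i32 = -1; // closes the session

/// Hypothetical per-broker session epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SessionEpoch(i32);

impl SessionEpoch {
    fn new() -> Self {
        SessionEpoch(INITIAL_EPOCH)
    }

    /// Epoch to use for the request that follows a successful one.
    fn next(self) -> Self {
        match self.0 {
            // A closed session stays closed.
            FINAL_EPOCH => self,
            // Assumption: wrap to 1, never back to 0, which would open a new session.
            i32::MAX => SessionEpoch(1),
            epoch => SessionEpoch(epoch + 1),
        }
    }

    /// Errors and assignment changes start over with a full fetch.
    fn reset(self) -> Self {
        SessionEpoch(INITIAL_EPOCH)
    }

    fn close(self) -> Self {
        SessionEpoch(FINAL_EPOCH)
    }

    fn is_full_fetch(self) -> bool {
        self.0 == INITIAL_EPOCH
    }
}
```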
Concurrent Fetching
Each poll() issues ShareFetch requests to all assigned brokers concurrently by spawning one Tokio task per broker and awaiting the handles directly. Pending acknowledgements are piggybacked on fetch requests to reduce round trips. If a broker fetch fails, records from other brokers are still returned, the session for the failed broker is reset, and the unsent piggyback acknowledgements are restored for the next commit cycle.
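The partial-failure handling described above can be sketched without a broker. This model swaps Tokio tasks for scoped `std::thread` workers so it stays dependency-free; `BrokerRequest`, `PollOutcome`, and `fetch_all` are hypothetical names, and the caller-supplied `fetch` closure stands in for a ShareFetch round trip.

```rust
use std::thread;

/// Hypothetical per-broker request carrying piggybacked acknowledgement offsets.
struct BrokerRequest {
    broker_id: i32,
    piggybacked_acks: Vec<i64>,
}

/// What one poll yields after merging all broker results.
struct PollOutcome {
    records: Vec<String>,
    restored_acks: Vec<i64>,
    reset_sessions: Vec<i32>,
}

fn fetch_all<F>(requests: Vec<BrokerRequest>, fetch: F) -> PollOutcome
where
    F: Fn(&BrokerRequest) -> Result<Vec<String>, String> + Sync,
{
    let fetch = &fetch;
    // One worker per broker; results are collected in request order.
    let results: Vec<Result<Vec<String>, String>> = thread::scope(|scope| {
        let handles: Vec<_> = requests
            .iter()
            .map(|request| scope.spawn(move || fetch(request)))
            .collect();
        handles
            .into_iter()
            .map(|handle| {
                handle
                    .join()
                    .unwrap_or_else(|_| Err("fetch worker panicked".to_string()))
            })
            .collect()
    });

    let mut outcome = PollOutcome {
        records: Vec::new(),
        restored_acks: Vec::new(),
        reset_sessions: Vec::new(),
    };
    for (request, result) in requests.into_iter().zip(results) {
        match result {
            // Successful brokers contribute their records.
            Ok(mut records) => outcome.records.append(&mut records),
            Err(_) => {
                // Failed broker: reset its session and keep its unsent acks.
                outcome.reset_sessions.push(request.broker_id);
                outcome.restored_acks.extend(request.piggybacked_acks);
            }
        }
    }
    outcome
}
```

One failing broker does not discard records fetched from the others, and its acknowledgements are carried over rather than lost.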
Coordinator Handling
The share consumer discovers its group coordinator via FindCoordinator (key type = GROUP). The coordinator is cached and re-discovered automatically when:
- A heartbeat fails
- A NOT_COORDINATOR error is received
- unsubscribe() or close() is called
Lifecycle
// Create
let consumer = ShareConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .build()
    .await?;

// Subscribe
consumer.subscribe(&["topic1", "topic2"]).await?;

// Consume
let records = consumer.poll(Duration::from_secs(1)).await?;

// Unsubscribe (leaves group, generates a new member ID)
consumer.unsubscribe().await;

// Close (idempotent)
consumer.close().await?;
Close Semantics
close() is terminal and returns the first cleanup error after local state and connections have still been closed:
- Implicit mode: all pending accept acks are converted to releases so acquired records return to the pool for redelivery by other consumers.
- Explicit mode: pending acks (accept/release/reject) are flushed as-is.
- Sends and validates a leave-group heartbeat.
- Clears all local state and closes connections.
Use close_with_timeout(duration) to bound each cleanup phase. If a phase exceeds duration / 2, the call returns Err(KrafkaError::Timeout) but still clears local state and closes connections.
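Two details of the close path lend themselves to a short sketch: the mode-dependent rewrite of pending acknowledgements, and the per-phase budget of half the total timeout. The `CloseMode` and `PendingAck` types and both functions below are hypothetical; they model the rules stated above rather than krafka's code.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CloseMode {
    Implicit,
    Explicit,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PendingAck {
    Accept,
    Release,
    Reject,
}

/// Acknowledgements to flush on close, as (offset, type) pairs.
/// Implicit mode turns pending accepts into releases; Explicit mode flushes as-is.
fn acks_to_flush_on_close(mode: CloseMode, pending: &[(i64, PendingAck)]) -> Vec<(i64, PendingAck)> {
    pending
        .iter()
        .map(|&(offset, ack)| match mode {
            CloseMode::Implicit if ack == PendingAck::Accept => (offset, PendingAck::Release),
            _ => (offset, ack),
        })
        .collect()
}

/// Time allowed for each cleanup phase under close_with_timeout(total).
fn phase_budget(total: Duration) -> Duration {
    total / 2
}
```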
Wakeup & Cancellation
Call wakeup() from any thread or task to interrupt an in-progress poll() call:
// In another task:
consumer.wakeup();
// poll() returns Err with "wakeup() was called"
// The consumer remains fully usable for subsequent poll() calls.
wakeup() is safe to call concurrently with any other consumer method.
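One way to picture the wakeup contract is a shared flag that poll consumes exactly once. The sketch below is an assumption about how such a mechanism could work, using an `AtomicBool`; krafka's actual implementation is not described here and may use a different primitive. `WakeupFlag` and `poll_once` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical cloneable wakeup handle shared between tasks or threads.
#[derive(Clone, Default)]
struct WakeupFlag(Arc<AtomicBool>);

impl WakeupFlag {
    /// Safe to call from any thread: it only sets an atomic flag.
    fn wakeup(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Returns true once per wakeup and clears the flag, so later polls proceed.
    fn take(&self) -> bool {
        self.0.swap(false, Ordering::SeqCst)
    }
}

/// Stand-in for poll(): fails once if a wakeup is pending, otherwise returns records.
fn poll_once(flag: &WakeupFlag) -> Result<Vec<String>, String> {
    if flag.take() {
        return Err("wakeup() was called".to_string());
    }
    Ok(Vec::new())
}
```

Because the flag is cleared when it is observed, the consumer stays usable for the next poll, matching the behaviour described above.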
Unsubscribe Semantics
unsubscribe() attempts a best-effort leave-group heartbeat, logs any leave failure internally, clears all partition state (pending acks, sessions, coordinator), and generates a fresh member ID. The consumer can be resubscribed afterwards.
Wire Protocol
The share consumer uses four Kafka APIs (all feature-gated behind unstable-protocol):
| API | Key | Versions | Purpose |
|---|---|---|---|
| ShareGroupHeartbeat | 76 | v1 | Group membership and assignment |
| ShareGroupDescribe | 77 | v1 | Describe share group state |
| ShareFetch | 78 | v1–v2 | Fetch records with acquisition tracking |
| ShareAcknowledge | 79 | v1–v2 | Acknowledge processed records |
See the Protocol Reference for wire format details.