Consumer Groups
Coordinated consumption with automatic partition assignment, offset tracking, and exactly-once delivery via transactions.
Table of contents
- Overview
- Architecture
- Basic Usage
- Assignment Strategies
- Static Membership
- Cooperative Rebalancing
- Offset Management
- Rebalance Listeners
- Group Coordinator
- Failure Detection
- Durability & Crash Recovery
- Monitoring
- Troubleshooting
Overview
Consumer groups enable multiple consumers to share the load of processing messages from a topic:
- Automatic partition assignment — Partitions distributed across group members
- Rebalancing — Partitions reassigned when members join/leave
- Offset tracking — Progress checkpointed so consumers resume where they left off; exactly-once delivery when combined with transactional commits
- Failure detection — Heartbeat-based health monitoring
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ CONSUMER GROUP: "order-processors" │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ Topic: orders (6 partitions) │
│ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │
│ │ P0 │ │ P1 │ │ P2 │ │ P3 │ │ P4 │ │ P5 │ │
│ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ │
│ │ │ │ │ │ │ │
│ └──────┼──────┘ └──────┼──────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │ │
│ │ P0, P1, P2 │ │ P3, P4 │ │ P5 │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Basic Usage
CLI
# Consume with consumer group
rivven consume orders \
--group order-processors \
--from-beginning
# List consumer groups
rivven group list
# Describe group
rivven group describe order-processors
# Reset offsets
rivven group reset-offsets order-processors \
--topic orders \
--to-earliest
Rust Client
use std::time::Duration;

use rivven_client::{Consumer, ConsumerConfig};

let config = ConsumerConfig::builder()
    .bootstrap_servers(vec!["rivven:9092".into()])
    .group_id("order-processors")
    .auto_offset_reset("earliest")
    .build()?;

let consumer = Consumer::new(config).await?;
consumer.subscribe(&["orders"]).await?;

loop {
    let records = consumer.poll(Duration::from_millis(100)).await?;
    for record in records {
        process(&record).await?;
    }
    consumer.commit().await?;
}
Assignment Strategies
Range (Default)
Assigns contiguous partition ranges to each consumer:
Topic: orders (6 partitions), 3 consumers
Consumer 1: P0, P1 (partitions 0-1)
Consumer 2: P2, P3 (partitions 2-3)
Consumer 3: P4, P5 (partitions 4-5)
Best for: Even partition distribution with sorted data
Round-Robin
Distributes partitions one-by-one across consumers:
Topic: orders (6 partitions), 3 consumers
Consumer 1: P0, P3
Consumer 2: P1, P4
Consumer 3: P2, P5
Best for: Uniform load distribution across multiple topics
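The two layouts above can be reproduced with a short sketch. The helper functions `assign_range` and `assign_round_robin` are illustrative, not part of the Rivven client API:

```rust
/// Contiguous ranges: the first `partitions % consumers` members get one extra.
fn assign_range(partitions: usize, consumers: usize) -> Vec<Vec<usize>> {
    let base = partitions / consumers;
    let extra = partitions % consumers;
    let mut out = vec![Vec::new(); consumers];
    let mut p = 0;
    for (i, slot) in out.iter_mut().enumerate() {
        let take = base + if i < extra { 1 } else { 0 };
        for _ in 0..take {
            slot.push(p);
            p += 1;
        }
    }
    out
}

/// Round-robin: deal partitions one at a time across members.
fn assign_round_robin(partitions: usize, consumers: usize) -> Vec<Vec<usize>> {
    let mut out = vec![Vec::new(); consumers];
    for p in 0..partitions {
        out[p % consumers].push(p);
    }
    out
}

fn main() {
    // 6 partitions, 3 consumers — matches the layouts shown above.
    assert_eq!(assign_range(6, 3), vec![vec![0, 1], vec![2, 3], vec![4, 5]]);
    assert_eq!(
        assign_round_robin(6, 3),
        vec![vec![0, 3], vec![1, 4], vec![2, 5]]
    );
}
```

Range keeps co-partitioned topics aligned on the same consumer; round-robin spreads load more evenly when partition counts and consumer counts do not divide cleanly.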
Sticky
Minimizes partition movement during rebalances while guaranteeing KIP-54 fairness (max-difference-of-1):
Before (2 consumers):
Consumer 1: P0, P1, P2
Consumer 2: P3, P4, P5
After adding Consumer 3 (sticky):
Consumer 1: P0, P1 ← kept P0, P1
Consumer 2: P3, P4 ← kept P3, P4
Consumer 3: P2, P5 ← received partitions from both
The algorithm uses a 3-step approach:
- Preserve: Previous assignments that still exist are kept
- Trim excess: Over-provisioned members (more than ⌈total/n⌉ partitions) donate excess to the unassigned pool
- Distribute: Unassigned partitions go to under-provisioned members (fewest-first)
This guarantees every member receives partitions and the max difference between any two members is 1.
Best for: Stateful consumers, minimizing rebalance impact
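The three steps can be sketched as follows. This is a minimal illustration of the preserve / trim / distribute approach described above, not the broker's implementation; member and partition types are simplified:

```rust
use std::collections::BTreeMap;

// Sticky assignment sketch: preserve previous ownership, trim members above
// ceil(total/n) into a pool, then hand the pool to the fewest-loaded members.
fn sticky_assign(
    previous: &BTreeMap<&str, Vec<u32>>,
    members: &[&str],
    all_partitions: &[u32],
) -> BTreeMap<String, Vec<u32>> {
    let cap = (all_partitions.len() + members.len() - 1) / members.len(); // ceil(total/n)
    let mut assignment: BTreeMap<String, Vec<u32>> = BTreeMap::new();

    // Step 1: preserve previous assignments for members that still exist.
    for &m in members {
        assignment.insert(m.to_string(), previous.get(m).cloned().unwrap_or_default());
    }

    // Step 2: trim excess — over-provisioned members donate to the pool.
    let mut pool: Vec<u32> = Vec::new();
    for parts in assignment.values_mut() {
        while parts.len() > cap {
            pool.push(parts.pop().unwrap());
        }
    }
    // Partitions nobody currently holds (e.g. from departed members) join the pool.
    for &p in all_partitions {
        if !assignment.values().any(|v| v.contains(&p)) && !pool.contains(&p) {
            pool.push(p);
        }
    }
    pool.sort();

    // Step 3: distribute the pool to under-provisioned members, fewest-first.
    for p in pool {
        let target = assignment
            .iter()
            .min_by_key(|(_, v)| v.len())
            .map(|(k, _)| k.clone())
            .unwrap();
        assignment.get_mut(&target).unwrap().push(p);
    }
    assignment
}

fn main() {
    let mut previous = BTreeMap::new();
    previous.insert("consumer-1", vec![0, 1, 2]);
    previous.insert("consumer-2", vec![3, 4, 5]);
    let result = sticky_assign(
        &previous,
        &["consumer-1", "consumer-2", "consumer-3"],
        &[0, 1, 2, 3, 4, 5],
    );
    // Matches the worked example: existing members keep two partitions each.
    assert_eq!(result["consumer-1"], vec![0, 1]);
    assert_eq!(result["consumer-2"], vec![3, 4]);
    assert_eq!(result["consumer-3"], vec![2, 5]);
}
```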
Configuration
# Consumer configuration
consumer:
  group_id: order-processors
  partition_assignment_strategy: sticky  # range, round_robin, sticky
Static Membership
Static membership provides stable consumer identity across restarts, essential for Kubernetes deployments.
The Problem
Without static membership, every pod restart triggers a full rebalance:
Pod restart → New member_id → REBALANCE → All consumers stop → Reassign ALL partitions
This causes:
- Service interruption during rebalance
- Duplicate processing (uncommitted work)
- Wasted computation rebuilding state
The Solution
With static membership, pods maintain identity:
Pod restart → Same group_instance_id → REJOIN → Keep previous assignment
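The rejoin fast path can be sketched as a lookup keyed by instance ID. This is a hypothetical structure for illustration, not the coordinator's actual code:

```rust
use std::collections::HashMap;

// If a joining member presents a known group_instance_id, the coordinator
// returns its previous member identity instead of registering a new member
// (which would force a rebalance).
struct StaticMembers {
    by_instance_id: HashMap<String, String>, // group_instance_id -> member_id
}

impl StaticMembers {
    /// Returns (member_id, is_new_member).
    fn join(&mut self, instance_id: &str, fresh_member_id: String) -> (String, bool) {
        match self.by_instance_id.get(instance_id) {
            Some(existing) => (existing.clone(), false), // rejoin: keep identity
            None => {
                self.by_instance_id
                    .insert(instance_id.to_string(), fresh_member_id.clone());
                (fresh_member_id, true) // first join: triggers a rebalance
            }
        }
    }
}

fn main() {
    let mut group = StaticMembers { by_instance_id: HashMap::new() };
    let (id1, new1) = group.join("order-processor-0", "member-abc".into());
    assert!(new1);
    // Pod restarts and rejoins with the same instance id but a fresh candidate id:
    let (id2, new2) = group.join("order-processor-0", "member-xyz".into());
    assert!(!new2);
    assert_eq!(id1, id2); // identity preserved, previous assignment kept
}
```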
Configuration
# Each consumer instance needs a unique, stable ID
consumer:
  group_id: order-processors
  group_instance_id: order-processor-0  # Stable across restarts
  session_timeout_ms: 45000             # Longer timeout for planned restarts
Kubernetes StatefulSet Example
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: order-processor
spec:
  serviceName: order-processor
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: processor
          env:
            - name: RIVVEN_GROUP_INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name  # order-processor-0, order-processor-1, etc.
            - name: RIVVEN_GROUP_ID
              value: order-processors
Behavior
| Event | Without Static Membership | With Static Membership |
|---|---|---|
| Pod restart | Full rebalance | Rejoin, keep assignment |
| Rolling update | N rebalances | 0 rebalances (if within session timeout) |
| Scale down | Rebalance after leave | Rebalance after session timeout |
| Network partition | Quick failover | Wait for session timeout |
Cooperative Rebalancing
Cooperative rebalancing eliminates stop-the-world rebalances by using incremental partition reassignment.
Traditional (Eager) Rebalancing
1. Rebalance triggered
2. ALL consumers STOP and revoke ALL partitions
3. Leader computes new assignment
4. ALL consumers receive new assignment and RESTART
────────────────────────────────────
Time →
[===STOP===][===IDLE===][===START===]
Total downtime
Cooperative Rebalancing
Generation correctness: complete_cooperative_rebalance bumps the generation only on the two-phase revocation path; the no-revocation fast path delegates to complete_rebalance, which does not bump again because transition_to_preparing_rebalance already did. Empty and Dead state transitions move to PreparingRebalance rather than skipping rebalance phases.
1. First rebalance: Identify partitions that need to move
2. ONLY affected consumers revoke ONLY moving partitions
3. Second rebalance: Assign revoked partitions to new owners
4. Non-moving partitions continue processing throughout
────────────────────────────────────
Time →
Consumer 1: [======RUNNING======] (no change)
Consumer 2: [===][REVOKE][======] (brief pause for P3)
Consumer 3: [===][......][======] (waits for P3)
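The key idea in the timeline above is that only partitions changing owner are revoked. A member's revoke set is simply its old assignment minus its new assignment; this sketch uses simplified types, not the client's internal representation:

```rust
use std::collections::BTreeSet;

// In a cooperative rebalance, a member revokes only the partitions it is
// losing: revoke = old − new. Everything else keeps processing.
fn to_revoke(old: &[u32], new: &[u32]) -> BTreeSet<u32> {
    let new: BTreeSet<u32> = new.iter().copied().collect();
    old.iter().copied().filter(|p| !new.contains(p)).collect()
}

fn main() {
    // Suppose Consumer 2's assignment shrinks from [P3, P4] to [P4]:
    // it briefly pauses only for P3, matching the timeline above.
    assert_eq!(to_revoke(&[3, 4], &[4]), BTreeSet::from([3]));

    // Consumer 1 keeps everything, so nothing is revoked and it never pauses.
    assert!(to_revoke(&[0, 1, 2], &[0, 1, 2]).is_empty());
}
```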
Configuration
consumer:
  group_id: order-processors
  partition_assignment_strategy: cooperative_sticky
  # OR
  rebalance_protocol: cooperative
Supported Strategies
| Strategy | Protocol | Behavior |
|---|---|---|
| range | Eager | Full revoke |
| round_robin | Eager | Full revoke |
| sticky | Eager | Minimizes movement, but full revoke |
| cooperative_sticky | Cooperative | Incremental, minimal disruption |
Requirements
- All group members must support cooperative protocol
- If any member uses eager protocol, the entire group falls back to eager
- Protocol negotiated during JoinGroup
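The fallback rule above amounts to an all-or-nothing check over the protocols members advertise during JoinGroup. A minimal sketch, with illustrative types rather than the wire format:

```rust
// The group runs the cooperative protocol only if every member supports it;
// a single eager member downgrades the whole group to eager.
#[derive(Debug, PartialEq, Clone, Copy)]
enum RebalanceProtocol {
    Eager,
    Cooperative,
}

fn negotiate(members: &[RebalanceProtocol]) -> RebalanceProtocol {
    if members.iter().all(|p| *p == RebalanceProtocol::Cooperative) {
        RebalanceProtocol::Cooperative
    } else {
        RebalanceProtocol::Eager
    }
}

fn main() {
    use RebalanceProtocol::*;
    assert_eq!(negotiate(&[Cooperative, Cooperative]), Cooperative);
    // One eager member forces the entire group back to eager.
    assert_eq!(negotiate(&[Cooperative, Eager, Cooperative]), Eager);
}
```

This is why rolling upgrades to cooperative rebalancing are typically done in two passes: first upgrade all members to a version that supports both protocols, then switch the strategy.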
Offset Management
Automatic Commit
consumer:
  enable_auto_commit: true
  auto_commit_interval_ms: 5000
Manual Commit
// Commit after processing each batch
loop {
    let records = consumer.poll(Duration::from_millis(100)).await?;
    for record in records {
        process(&record).await?;
    }
    consumer.commit().await?; // Synchronous commit
}

// Or commit specific offsets
consumer.commit_offsets(&[
    TopicPartitionOffset::new("orders", 0, 12345),
]).await?;
Commit Within Transaction
For exactly-once semantics, commit offsets atomically with output:
let txn = producer.begin_transaction().await?;

// Process and produce
for record in consumer.poll(timeout).await? {
    let output = transform(&record);
    producer.send(&output).await?;
}

// Commit offsets and output atomically
txn.commit_offsets(consumer.position()).await?;
txn.commit().await?;
Rebalance Listeners
React to partition assignments and revocations:
use rivven_client::{Consumer, RebalanceListener, TopicPartition};

struct MyListener;

impl RebalanceListener for MyListener {
    fn on_partitions_assigned(&self, partitions: &[TopicPartition]) {
        println!("Assigned: {:?}", partitions);
        // Initialize state for new partitions
    }

    fn on_partitions_revoked(&self, partitions: &[TopicPartition]) {
        println!("Revoked: {:?}", partitions);
        // Commit offsets, flush state
    }
}

consumer.set_rebalance_listener(MyListener);
Group Coordinator
Each consumer group has a coordinator broker responsible for:
- Managing group membership
- Triggering rebalances
- Storing committed offsets
- Processing JoinGroup/SyncGroup/Heartbeat/LeaveGroup protocol messages
Coordination Protocol
The consumer group coordination protocol uses four request/response pairs:
Consumer GroupCoordinator Consumer (leader)
│ │ │
│── JoinGroup ─────────────────►│◄──── JoinGroup ───────────────│
│ │ (collects all members, │
│ │ selects leader, │
│ │ assigns generation ID) │
│◄─ JoinGroupResult ───────────│──── JoinGroupResult ──────────►│
│ (member_id, generation) │ (member_id, generation, │
│ │ + full member list) │
│ │ │
│── SyncGroup(empty) ──────────►│◄── SyncGroup(assignments) ───│
│ │ (leader sends partition │
│ │ assignments for all members)│
│◄─ SyncGroupResult ───────────│──── SyncGroupResult ──────────►│
│ (my partitions) │ (leader's partitions) │
│ │ │
│── Heartbeat ─────────────────►│ │
│◄─ HeartbeatResult(0=OK) ─────│ │
│ or (27=REBALANCE) │ │
│ │ │
│── LeaveGroup ────────────────►│ │
│◄─ LeaveGroupResult ──────────│ │
JoinGroup: Registers a consumer with the coordinator. The coordinator collects all members, selects a leader, assigns a new generation ID, and transitions the group to CompletingRebalance. Generation correctness: The generation ID is only bumped when transitioning from the Stable state to PreparingRebalance, not from CompletingRebalance. This ensures all members joining during the initial join phase share the same generation, avoiding generation-mismatch errors during rapid rejoin sequences.
SyncGroup: The leader computes partition assignments (range-based by default) and sends them via SyncGroup. The coordinator distributes each member’s assignment in the response. Follower barrier: If a follower calls SyncGroup before the leader has submitted assignments, the coordinator blocks the follower via tokio::sync::Notify until the leader’s assignments are applied (subject to the rebalance timeout). The follower’s Notified future is registered before acquiring the groups lock (barrier.notified() + enable()), eliminating a race where the leader’s notify_waiters() could fire between the lock drop and .await, which previously caused followers to stall for the full 30-second rebalance timeout. This prevents followers from receiving empty assignments due to a race condition.
Heartbeat: Periodic keep-alive. Returns error code 0 (OK) or 27 (REBALANCE_IN_PROGRESS) to signal that the consumer must rejoin.
LeaveGroup: Graceful departure. Removes the member and triggers a rebalance for remaining members. If the group becomes empty, it transitions to Dead and is cleaned up.
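The generation-bump rule described under JoinGroup can be captured in a small state machine. This is a sketch with hypothetical types; the coordinator's real state machine carries membership, timers, and assignments as well:

```rust
// The generation is incremented only on the Stable → PreparingRebalance
// transition, so members joining while the group is already rebalancing
// all share one generation, avoiding generation-mismatch errors.
#[derive(Debug, PartialEq, Clone, Copy)]
enum GroupState {
    Empty,
    PreparingRebalance,
    CompletingRebalance,
    Stable,
}

struct Group {
    state: GroupState,
    generation: u64,
}

impl Group {
    fn prepare_rebalance(&mut self) {
        if self.state == GroupState::Stable {
            self.generation += 1; // bump only when leaving Stable
        }
        self.state = GroupState::PreparingRebalance;
    }
}

fn main() {
    let mut g = Group { state: GroupState::Stable, generation: 5 };
    g.prepare_rebalance();
    assert_eq!(g.generation, 6); // bumped: Stable → PreparingRebalance

    // A second trigger while still rebalancing does not bump again.
    g.state = GroupState::CompletingRebalance;
    g.prepare_rebalance();
    assert_eq!(g.generation, 6);

    // An Empty group moves to PreparingRebalance without a bump.
    let mut e = Group { state: GroupState::Empty, generation: 0 };
    e.prepare_rebalance();
    assert_eq!(e.generation, 0);
}
```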
Finding the Coordinator
rivven group describe order-processors
# Output:
# Group: order-processors
# Coordinator: broker-2 (10.0.0.3:9092)
# State: Stable
# Members: 3
Coordinator Selection
The coordinator is determined by hashing the group ID:
coordinator_partition = hash(group_id) % __consumer_offsets_partitions
coordinator_broker = leader_of(coordinator_partition)
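The formula above only requires that every broker hash the group ID identically, so they all agree on the coordinator without any extra coordination. A sketch — the FNV-1a hash here is illustrative; the document does not specify which hash Rivven actually uses:

```rust
// Deterministic coordinator lookup: hash the group ID, take it modulo the
// number of __consumer_offsets partitions. The leader of that partition
// is the group's coordinator.
fn fnv1a(s: &str) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for b in s.bytes() {
        h ^= b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

fn coordinator_partition(group_id: &str, offsets_partitions: u64) -> u64 {
    fnv1a(group_id) % offsets_partitions
}

fn main() {
    // Every broker computes the same partition for the same group ID.
    let p = coordinator_partition("order-processors", 50);
    assert_eq!(p, coordinator_partition("order-processors", 50));
    assert!(p < 50);
}
```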
Failure Detection
Heartbeats
Consumers send periodic Heartbeat requests to the coordinator to prove liveness.
The coordinator responds with error code 0 (OK) or 27 (REBALANCE_IN_PROGRESS):
consumer:
  heartbeat_interval_ms: 3000  # Send heartbeat every 3s
  session_timeout_ms: 10000    # Coordinator waits 10s before declaring dead
Session Timeout
If no heartbeat is received within session_timeout_ms:
- Coordinator marks member as dead
- Triggers rebalance
- Partitions reassigned to remaining members
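The coordinator-side check behind these steps can be sketched as a scan over last-heartbeat timestamps. Hypothetical structure, not the coordinator's actual code:

```rust
use std::time::{Duration, Instant};

// A member whose last heartbeat is older than its session timeout is
// expired; removing it triggers a rebalance of the remaining members.
struct Member {
    id: String,
    last_heartbeat: Instant,
    session_timeout: Duration,
}

/// Returns the IDs of members that should be removed.
fn expired_members(members: &[Member], now: Instant) -> Vec<&str> {
    members
        .iter()
        .filter(|m| now.duration_since(m.last_heartbeat) > m.session_timeout)
        .map(|m| m.id.as_str())
        .collect()
}

fn main() {
    let start = Instant::now();
    let members = vec![
        Member { id: "c1".into(), last_heartbeat: start, session_timeout: Duration::from_secs(10) },
        Member { id: "c2".into(), last_heartbeat: start, session_timeout: Duration::from_secs(10) },
    ];
    // 11 simulated seconds later with no heartbeat from either member:
    let now = start + Duration::from_secs(11);
    assert_eq!(expired_members(&members, now), vec!["c1", "c2"]);
    // At 9 seconds, both members are still within their session timeout.
    assert!(expired_members(&members, start + Duration::from_secs(9)).is_empty());
}
```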
Max Poll Interval
Protects against stuck consumers:
consumer:
  max_poll_interval_ms: 300000  # 5 minutes max between polls
If poll() is not called within this interval, the consumer is considered failed.
Durability & Crash Recovery
Consumer group state is persisted after every state change to ensure crash recovery:
- Join: Group membership, generation IDs, and partition assignments are persisted whenever a member joins (not just when the group is first created).
- Leave: When a member leaves gracefully, the updated membership is persisted immediately.
- Timeout: When a member is removed due to heartbeat timeout, the updated group state is persisted before the rebalance protocol completes.
- Offsets: Committed offsets are persisted to the backend storage on every CommitOffset call.
On coordinator restart, the last persisted group state is restored. Members that were removed (via leave or timeout) do not reappear, and the most recent generation ID is preserved.
Monitoring
Key Metrics
| Metric | Description |
|---|---|
| rivven_consumer_group_members | Number of active members |
| rivven_consumer_group_lag | Messages behind head |
| rivven_consumer_group_rebalances_total | Rebalance count |
| rivven_consumer_group_rebalance_duration_seconds | Rebalance time |
| rivven_consumer_commit_latency_seconds | Offset commit latency |
Alerting
# Alert on consumer lag
- alert: RivvenConsumerLag
  expr: rivven_consumer_group_lag > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer group is lagging"

# Alert on frequent rebalances
- alert: RivvenFrequentRebalances
  expr: rate(rivven_consumer_group_rebalances_total[1h]) > 10
  labels:
    severity: warning
  annotations:
    summary: "Frequent consumer group rebalances"
CLI Monitoring
# Watch consumer lag
rivven group lag order-processors --watch
# Output:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 12345 12350 5
# orders 1 23456 23460 4
# orders 2 34567 34600 33
Troubleshooting
Frequent Rebalances
Symptoms: Consumer constantly rejoining, processing interrupted
Causes:
- max_poll_interval_ms too short for processing time
- Network instability
- Long GC pauses
Solutions:
consumer:
  max_poll_interval_ms: 600000  # Increase to 10 minutes
  session_timeout_ms: 60000     # Longer session timeout
  heartbeat_interval_ms: 10000  # Less frequent heartbeats
Consumer Lag Growing
Symptoms: Lag increases over time
Causes:
- Processing too slow
- Not enough consumers
- Partition imbalance
Solutions:
- Add more consumers (up to partition count)
- Increase partition count
- Optimize processing logic
- Use cooperative_sticky to reduce rebalance impact
Duplicate Processing
Symptoms: Same messages processed multiple times
Causes:
- Auto-commit with processing failure
- Rebalance before commit
Solutions:
- Use manual commit after processing
- Use transactional consumers for exactly-once
- Make processing idempotent