Consumer Groups

Coordinated consumption with automatic partition assignment, offset tracking, and at-least-once delivery (exactly-once when combined with transactions).

Table of contents

  1. Overview
  2. Architecture
  3. Basic Usage
    1. CLI
    2. Rust Client
  4. Assignment Strategies
    1. Range (Default)
    2. Round-Robin
    3. Sticky
    4. Configuration
  5. Static Membership
    1. The Problem
    2. The Solution
    3. Configuration
    4. Kubernetes StatefulSet Example
    5. Behavior
  6. Cooperative Rebalancing
    1. Traditional (Eager) Rebalancing
    2. Cooperative Rebalancing
    3. Configuration
    4. Supported Strategies
    5. Requirements
  7. Offset Management
    1. Automatic Commit
    2. Manual Commit
    3. Commit Within Transaction
  8. Rebalance Listeners
  9. Group Coordinator
    1. Coordination Protocol
    2. Finding the Coordinator
    3. Coordinator Selection
  10. Failure Detection
    1. Heartbeats
    2. Session Timeout
    3. Max Poll Interval
  11. Durability & Crash Recovery
  12. Monitoring
    1. Key Metrics
    2. Alerting
    3. CLI Monitoring
  13. Troubleshooting
    1. Frequent Rebalances
    2. Consumer Lag Growing
    3. Duplicate Processing

Overview

Consumer groups enable multiple consumers to share the load of processing messages from a topic:

  • Automatic partition assignment — Partitions distributed across group members
  • Rebalancing — Partitions reassigned when members join/leave
  • Offset tracking — Progress checkpointed so consumption resumes where it left off (exactly-once with transactions)
  • Failure detection — Heartbeat-based health monitoring

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                     CONSUMER GROUP: "order-processors"              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Topic: orders (6 partitions)                                      │
│   ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐                        │
│   │ P0 │ │ P1 │ │ P2 │ │ P3 │ │ P4 │ │ P5 │                        │
│   └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘ └──┬─┘                        │
│      │      │      │      │      │      │                           │
│      └──┬───┘      └──┬───┘      └──┬───┘                           │
│         ▼             ▼             ▼                               │
│   ┌────────────┐ ┌────────────┐ ┌────────────┐                      │
│   │ Consumer 1 │ │ Consumer 2 │ │ Consumer 3 │                      │
│   │   P0, P1   │ │   P2, P3   │ │   P4, P5   │                      │
│   └────────────┘ └────────────┘ └────────────┘                      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Basic Usage

CLI

# Consume with consumer group
rivven consume orders \
  --group order-processors \
  --from-beginning

# List consumer groups
rivven group list

# Describe group
rivven group describe order-processors

# Reset offsets
rivven group reset-offsets order-processors \
  --topic orders \
  --to-earliest

Rust Client

use std::time::Duration;
use rivven_client::{Consumer, ConsumerConfig};

let config = ConsumerConfig::builder()
    .bootstrap_servers(vec!["rivven:9092".into()])
    .group_id("order-processors")
    .auto_offset_reset("earliest")
    .build()?;

let consumer = Consumer::new(config).await?;
consumer.subscribe(&["orders"]).await?;

loop {
    let records = consumer.poll(Duration::from_millis(100)).await?;
    for record in records {
        process(&record).await?;
    }
    consumer.commit().await?;
}

Assignment Strategies

Range (Default)

Assigns contiguous partition ranges to each consumer:

Topic: orders (6 partitions), 3 consumers

Consumer 1: P0, P1  (partitions 0-1)
Consumer 2: P2, P3  (partitions 2-3)  
Consumer 3: P4, P5  (partitions 4-5)

Best for: Even partition distribution with sorted data
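As a sketch (illustrative only, not Rivven's actual code; `range_assign` is a hypothetical helper), range assignment over a single topic looks like:

```rust
/// Range assignment sketch: each consumer gets a contiguous chunk of
/// partitions; the first (total % consumers) members get one extra.
fn range_assign(partitions: usize, consumers: usize) -> Vec<Vec<usize>> {
    let base = partitions / consumers;
    let extra = partitions % consumers;
    let mut out = Vec::with_capacity(consumers);
    let mut next = 0;
    for i in 0..consumers {
        let take = base + usize::from(i < extra);
        out.push((next..next + take).collect());
        next += take;
    }
    out
}

fn main() {
    // 6 partitions over 3 consumers: P0-P1, P2-P3, P4-P5
    assert_eq!(range_assign(6, 3), vec![vec![0, 1], vec![2, 3], vec![4, 5]]);
}
```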

Round-Robin

Distributes partitions one-by-one across consumers:

Topic: orders (6 partitions), 3 consumers

Consumer 1: P0, P3
Consumer 2: P1, P4
Consumer 3: P2, P5

Best for: Uniform load distribution across multiple topics
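The same example under round-robin, as a sketch (again a hypothetical helper, not the actual implementation):

```rust
/// Round-robin sketch: deal partitions out one at a time across consumers.
fn round_robin_assign(partitions: usize, consumers: usize) -> Vec<Vec<usize>> {
    let mut out = vec![Vec::new(); consumers];
    for p in 0..partitions {
        out[p % consumers].push(p);
    }
    out
}

fn main() {
    // 6 partitions over 3 consumers: P0/P3, P1/P4, P2/P5
    assert_eq!(round_robin_assign(6, 3), vec![vec![0, 3], vec![1, 4], vec![2, 5]]);
}
```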

Sticky

Minimizes partition movement during rebalances while guaranteeing KIP-54 fairness (max-difference-of-1):

Before (2 consumers):
  Consumer 1: P0, P1, P2
  Consumer 2: P3, P4, P5

After adding Consumer 3 (sticky):
  Consumer 1: P0, P1      ← kept P0, P1
  Consumer 2: P3, P4      ← kept P3, P4
  Consumer 3: P2, P5      ← received partitions from both

The algorithm uses a 3-step approach:

  1. Preserve: Previous assignments that still exist are kept
  2. Trim excess: Over-provisioned members (more than ⌈total/n⌉ partitions) donate excess to the unassigned pool
  3. Distribute: Unassigned partitions go to under-provisioned members (fewest-first)

This guarantees an even spread: the partition counts of any two members differ by at most 1.

Best for: Stateful consumers, minimizing rebalance impact

Configuration

# Consumer configuration
consumer:
  group_id: order-processors
  partition_assignment_strategy: sticky  # range, round_robin, sticky

Static Membership

Static membership provides stable consumer identity across restarts, essential for Kubernetes deployments.

The Problem

Without static membership, every pod restart triggers a full rebalance:

Pod restart → New member_id → REBALANCE → All consumers stop → Reassign ALL partitions

This causes:

  • Service interruption during rebalance
  • Duplicate processing (uncommitted work)
  • Wasted computation rebuilding state

The Solution

With static membership, pods maintain identity:

Pod restart → Same group.instance.id → REJOIN → Keep previous assignment

Configuration

# Each consumer instance needs a unique, stable ID
consumer:
  group_id: order-processors
  group_instance_id: order-processor-0  # Stable across restarts
  session_timeout_ms: 45000             # Longer timeout for planned restarts

Kubernetes StatefulSet Example

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: order-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    spec:
      containers:
        - name: processor
          env:
            - name: RIVVEN_GROUP_INSTANCE_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name  # order-processor-0, order-processor-1, etc.
            - name: RIVVEN_GROUP_ID
              value: order-processors

Behavior

Event               Without Static Membership   With Static Membership
Pod restart         Full rebalance              Rejoin, keep assignment
Rolling update      N rebalances                0 rebalances (if within session timeout)
Scale down          Rebalance after leave       Rebalance after session timeout
Network partition   Quick failover              Wait for session timeout

Cooperative Rebalancing

Cooperative rebalancing eliminates stop-the-world rebalances by using incremental partition reassignment.

Traditional (Eager) Rebalancing

1. Rebalance triggered
2. ALL consumers STOP and revoke ALL partitions
3. Leader computes new assignment
4. ALL consumers receive new assignment and RESTART
   
   ────────────────────────────────────
   Time →
   [===STOP===][===IDLE===][===START===]
            Total downtime

Cooperative Rebalancing

1. First rebalance: Identify partitions that need to move
2. ONLY affected consumers revoke ONLY moving partitions
3. Second rebalance: Assign revoked partitions to new owners
4. Non-moving partitions continue processing throughout

   ────────────────────────────────────
   Time →
   Consumer 1: [======RUNNING======]  (no change)
   Consumer 2: [===][REVOKE][======]  (brief pause for P3)
   Consumer 3: [===][......][======]  (waits for P3)

Generation correctness: complete_cooperative_rebalance bumps the generation only on the two-phase revocation path; the no-revocation fast path delegates to complete_rebalance, which does not bump again because transition_to_preparing_rebalance already did. Transitions out of the Empty and Dead states go through PreparingRebalance rather than skipping rebalance phases.
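Per member, the incremental step amounts to a set difference between the current and target assignments. A sketch (illustrative; `incremental_diff` is a hypothetical helper):

```rust
use std::collections::BTreeSet;

/// For one member, split the target assignment into the partitions it
/// must revoke now and the ones it will pick up in the follow-up
/// rebalance; everything else keeps running untouched.
fn incremental_diff(owned: &[u32], target: &[u32]) -> (Vec<u32>, Vec<u32>) {
    let owned: BTreeSet<u32> = owned.iter().copied().collect();
    let target: BTreeSet<u32> = target.iter().copied().collect();
    let revoke: Vec<u32> = owned.difference(&target).copied().collect();
    let acquire: Vec<u32> = target.difference(&owned).copied().collect();
    (revoke, acquire)
}

fn main() {
    // Consumer 2 keeps P4 and only gives up P3; nothing else moves.
    assert_eq!(incremental_diff(&[3, 4], &[4]), (vec![3], vec![]));
}
```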

Configuration

consumer:
  group_id: order-processors
  partition_assignment_strategy: cooperative_sticky
  # OR
  rebalance_protocol: cooperative

Supported Strategies

Strategy Protocol Behavior
range Eager Full revoke
round_robin Eager Full revoke
sticky Eager Minimizes movement, but full revoke
cooperative_sticky Cooperative Incremental, minimal disruption

Requirements

  • All group members must support cooperative protocol
  • If any member uses eager protocol, the entire group falls back to eager
  • Protocol negotiated during JoinGroup

Offset Management

Automatic Commit

consumer:
  enable_auto_commit: true
  auto_commit_interval_ms: 5000

Manual Commit

// Commit after processing each batch
loop {
    let records = consumer.poll(Duration::from_millis(100)).await?;
    for record in records {
        process(&record).await?;
    }
    consumer.commit().await?;  // Synchronous commit
}

// Or commit specific offsets
consumer.commit_offsets(&[
    TopicPartitionOffset::new("orders", 0, 12345),
]).await?;

Commit Within Transaction

For exactly-once semantics, commit offsets atomically with output:

let txn = producer.begin_transaction().await?;

// Process and produce
for record in consumer.poll(timeout).await? {
    let output = transform(&record);
    producer.send(&output).await?;
}

// Commit offsets and output atomically
txn.commit_offsets(consumer.position()).await?;
txn.commit().await?;

Rebalance Listeners

React to partition assignments and revocations:

use rivven_client::{Consumer, RebalanceListener};

struct MyListener;

impl RebalanceListener for MyListener {
    fn on_partitions_assigned(&self, partitions: &[TopicPartition]) {
        println!("Assigned: {:?}", partitions);
        // Initialize state for new partitions
    }
    
    fn on_partitions_revoked(&self, partitions: &[TopicPartition]) {
        println!("Revoked: {:?}", partitions);
        // Commit offsets, flush state
    }
}

consumer.set_rebalance_listener(MyListener);

Group Coordinator

Each consumer group has a coordinator broker responsible for:

  • Managing group membership
  • Triggering rebalances
  • Storing committed offsets
  • Processing JoinGroup/SyncGroup/Heartbeat/LeaveGroup protocol messages

Coordination Protocol

The consumer group coordination protocol uses four request/response pairs:

Consumer                    GroupCoordinator                 Consumer (leader)
   │                               │                               │
   │── JoinGroup ─────────────────►│◄──── JoinGroup ───────────────│
   │                               │  (collects all members,       │
   │                               │   selects leader,             │
   │                               │   assigns generation ID)      │
   │◄─ JoinGroupResult ───────────│──── JoinGroupResult ──────────►│
   │   (member_id, generation)     │   (member_id, generation,     │
   │                               │    + full member list)         │
   │                               │                               │
   │── SyncGroup(empty) ──────────►│◄── SyncGroup(assignments) ───│
   │                               │  (leader sends partition      │
   │                               │   assignments for all members)│
   │◄─ SyncGroupResult ───────────│──── SyncGroupResult ──────────►│
   │   (my partitions)             │   (leader's partitions)       │
   │                               │                               │
   │── Heartbeat ─────────────────►│                               │
   │◄─ HeartbeatResult(0=OK) ─────│                               │
   │   or (27=REBALANCE)           │                               │
   │                               │                               │
   │── LeaveGroup ────────────────►│                               │
   │◄─ LeaveGroupResult ──────────│                               │

JoinGroup: Registers a consumer with the coordinator. The coordinator collects all members, selects a leader, assigns a new generation ID, and transitions the group to CompletingRebalance. Generation correctness: The generation ID is only bumped when transitioning from the Stable state to PreparingRebalance, not from CompletingRebalance. This ensures all members joining during the initial join phase share the same generation, avoiding generation-mismatch errors during rapid rejoin sequences.

SyncGroup: The leader computes partition assignments (range-based by default) and sends them in its SyncGroup request; the coordinator distributes each member's assignment in the corresponding response. Follower barrier: If a follower calls SyncGroup before the leader has submitted assignments, the coordinator blocks the follower on a tokio::sync::Notify until the leader's assignments are applied (subject to the rebalance timeout). The follower's Notified future is registered before acquiring the groups lock (barrier.notified() + enable()), closing the window in which the leader's notify_waiters() could fire between the lock drop and the .await; without this ordering, a follower could miss the wakeup, stall for the full 30-second rebalance timeout, and receive an empty assignment.

Heartbeat: Periodic keep-alive. Returns error code 0 (OK) or 27 (REBALANCE_IN_PROGRESS) to signal that the consumer must rejoin.

LeaveGroup: Graceful departure. Removes the member and triggers a rebalance for remaining members. If the group becomes empty, it transitions to Dead and is cleaned up.

Finding the Coordinator

rivven group describe order-processors

# Output:
# Group: order-processors
# Coordinator: broker-2 (10.0.0.3:9092)
# State: Stable
# Members: 3

Coordinator Selection

The coordinator is determined by hashing the group ID:

coordinator_partition = hash(group_id) % __consumer_offsets_partitions
coordinator_broker = leader_of(coordinator_partition)
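The lookup in code, as a sketch (the concrete hash function Rivven uses is not specified here, so std's DefaultHasher stands in):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a group ID onto one of the offsets-topic partitions; the broker
/// leading that partition is the group's coordinator.
fn coordinator_partition(group_id: &str, offsets_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    group_id.hash(&mut h);
    h.finish() % offsets_partitions
}

fn main() {
    let p = coordinator_partition("order-processors", 50);
    assert!(p < 50);
    // The same group ID always maps to the same coordinator partition.
    assert_eq!(p, coordinator_partition("order-processors", 50));
}
```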

Failure Detection

Heartbeats

Consumers send periodic Heartbeat requests to the coordinator to prove liveness. The coordinator responds with error code 0 (OK) or 27 (REBALANCE_IN_PROGRESS):

consumer:
  heartbeat_interval_ms: 3000   # Send heartbeat every 3s
  session_timeout_ms: 10000     # Coordinator waits 10s before declaring dead

Session Timeout

If no heartbeat received within session_timeout_ms:

  1. Coordinator marks member as dead
  2. Triggers rebalance
  3. Partitions reassigned to remaining members
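The coordinator-side check behind step 1 is a simple deadline on the last heartbeat. A sketch (illustrative; `Member` and `expired` are hypothetical names):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// A member expires when no heartbeat has arrived within the session
/// timeout; the coordinator then removes it and triggers a rebalance.
struct Member {
    last_heartbeat: Instant,
}

fn expired(m: &Member, session_timeout: Duration, now: Instant) -> bool {
    now.duration_since(m.last_heartbeat) > session_timeout
}

fn main() {
    let m = Member { last_heartbeat: Instant::now() };
    sleep(Duration::from_millis(20));
    let now = Instant::now();
    assert!(expired(&m, Duration::from_millis(5), now)); // missed deadline
    assert!(!expired(&m, Duration::from_secs(10), now)); // still healthy
}
```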

Max Poll Interval

Protects against stuck consumers:

consumer:
  max_poll_interval_ms: 300000  # 5 minutes max between polls

If poll() is not called within this interval, the consumer is considered failed and its partitions are reassigned.


Durability & Crash Recovery

Consumer group state is persisted after every state change to ensure crash recovery:

  • Join: Group membership, generation IDs, and partition assignments are persisted whenever a member joins (not just when the group is first created).
  • Leave: When a member leaves gracefully, the updated membership is persisted immediately.
  • Timeout: When a member is removed due to heartbeat timeout, the updated group state is persisted before the rebalance protocol completes.
  • Offsets: Committed offsets are persisted to the backend storage on every CommitOffset call.

On coordinator restart, the last persisted group state is restored. Members that were removed (via leave or timeout) do not reappear, and the most recent generation ID is preserved.


Monitoring

Key Metrics

Metric                                             Description
rivven_consumer_group_members                      Number of active members
rivven_consumer_group_lag                          Messages behind head
rivven_consumer_group_rebalances_total             Rebalance count
rivven_consumer_group_rebalance_duration_seconds   Rebalance time
rivven_consumer_commit_latency_seconds             Offset commit latency

Alerting

# Alert on consumer lag
- alert: RivvenConsumerLag
  expr: rivven_consumer_group_lag > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "Consumer group {{ $labels.group }} is lagging"

# Alert on frequent rebalances
- alert: RivvenFrequentRebalances
  expr: rate(rivven_consumer_group_rebalances_total[1h]) > 10
  labels:
    severity: warning
  annotations:
    summary: "Frequent rebalances in {{ $labels.group }}"

CLI Monitoring

# Watch consumer lag
rivven group lag order-processors --watch

# Output:
# TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders   0          12345           12350           5
# orders   1          23456           23460           4
# orders   2          34567           34600           33
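The LAG column is simply the distance between the committed offset and the log end:

```rust
/// Per-partition lag: log-end offset minus committed consumer offset.
fn lag(log_end_offset: u64, current_offset: u64) -> u64 {
    log_end_offset.saturating_sub(current_offset)
}

fn main() {
    // Matches the first row of the CLI output above.
    assert_eq!(lag(12350, 12345), 5);
}
```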

Troubleshooting

Frequent Rebalances

Symptoms: Consumer constantly rejoining, processing interrupted

Causes:

  • max_poll_interval_ms too short for processing time
  • Network instability
  • Long GC pauses

Solutions:

consumer:
  max_poll_interval_ms: 600000  # Increase to 10 minutes
  session_timeout_ms: 60000     # Longer session timeout
  heartbeat_interval_ms: 10000  # Less frequent heartbeats

Consumer Lag Growing

Symptoms: Lag increases over time

Causes:

  • Processing too slow
  • Not enough consumers
  • Partition imbalance

Solutions:

  1. Add more consumers (up to partition count)
  2. Increase partition count
  3. Optimize processing logic
  4. Use cooperative_sticky to reduce rebalance impact

Duplicate Processing

Symptoms: Same messages processed multiple times

Causes:

  • Auto-commit with processing failure
  • Rebalance before commit

Solutions:

  1. Use manual commit after processing
  2. Use transactional consumers for exactly-once
  3. Make processing idempotent


Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.