Admin Client Guide

This guide covers administrative operations using the Krafka AdminClient.

Overview

The AdminClient provides cluster administration capabilities:

  • Topic management (create, delete, describe, list)
  • Consumer group management (describe, list, KIP-848 describe)
  • Topic partition details (paginated describe with ELR)
  • Record deletion (delete records before an offset)
  • Leader epoch queries (detect log truncation)
  • Cluster information
  • Partition management
  • ACL management
  • Delegation token management (create, describe, renew, expire)
  • Client quota management (describe, alter)
  • Cluster feature versioning (describe, update — KIP-584)
  • Log directory inspection (describe log dirs with volume capacity)
  • Move replicas between log directories
  • Delete consumer group committed offsets
  • SCRAM credential management (describe, alter — KIP-554)
  • Transaction debugging (describe producers, describe/list transactions — KIP-664)
  • Client metrics resource discovery (KIP-714)

API Version Negotiation

The AdminClient automatically negotiates the best API version for each RPC using the broker’s ApiVersions response. This ensures forward compatibility with newer Kafka releases while gracefully falling back to older protocol versions on legacy brokers. If a broker does not support a required API, the client returns a clear protocol error.
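The negotiation can be pictured as intersecting the client's and the broker's version ranges and taking the highest shared version. The following is a minimal standalone sketch of that idea; `VersionRange` and `negotiate` are hypothetical names used for illustration and are not part of the Krafka API.

```rust
/// Inclusive range of protocol versions for one API key (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct VersionRange {
    pub min: i16,
    pub max: i16,
}

/// Returns the highest version both sides support, or `None` when the
/// ranges do not overlap (the case a client would report as a protocol error).
pub fn negotiate(client: VersionRange, broker: VersionRange) -> Option<i16> {
    let lowest_shared = client.min.max(broker.min);
    let highest_shared = client.max.min(broker.max);
    if lowest_shared <= highest_shared {
        Some(highest_shared)
    } else {
        None
    }
}
```

For example, a client that implements v1 to v4 talking to a broker that advertises v0 to v3 would settle on v3.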

Basic Usage

use krafka::admin::AdminClient;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let admin = AdminClient::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // List all topics
    let topics = admin.list_topics().await?;
    println!("Topics: {:?}", topics);

    Ok(())
}

Authentication

The AdminClient supports the following authentication mechanisms:

SASL/PLAIN

use krafka::admin::AdminClient;

let admin = AdminClient::builder()
    .bootstrap_servers("localhost:9092")
    .sasl_plain("username", "password")
    .build()
    .await?;

SASL/SCRAM-SHA-256

let admin = AdminClient::builder()
    .bootstrap_servers("localhost:9092")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

SASL/SCRAM-SHA-512

let admin = AdminClient::builder()
    .bootstrap_servers("localhost:9092")
    .sasl_scram_sha512("username", "password")
    .build()
    .await?;

Generic AuthConfig

For AWS MSK IAM or advanced configurations:

use krafka::admin::AdminClient;
use krafka::auth::AuthConfig;

let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
    .bootstrap_servers("msk-broker:9098")
    .auth(auth)
    .build()
    .await?;

Topic Management

Creating Topics

use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;

let admin = AdminClient::builder()
    .bootstrap_servers("localhost:9092")
    .build()
    .await?;

// Simple topic creation
let topic = NewTopic::new("my-topic", 6, 3);

let results = admin
    .create_topics(vec![topic], Duration::from_secs(30))
    .await?;

for result in results {
    match result.error {
        None => println!("Created: {}", result.name),
        Some(e) => println!("Failed to create {}: {}", result.name, e),
    }
}

Creating Topics with Configuration

use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;

let topic = NewTopic::new("compacted-topic", 12, 3)
    .with_config("cleanup.policy", "compact")
    .with_config("min.insync.replicas", "2")
    .with_config("retention.ms", "604800000");  // 7 days

admin.create_topics(vec![topic], Duration::from_secs(30)).await?;
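Time-based settings such as retention.ms are passed as millisecond strings, which are easy to mistype. A small helper can derive them from a Duration instead; `millis_config` below is a hypothetical convenience function, not something Krafka provides.

```rust
use std::time::Duration;

/// Formats a duration as the millisecond string used by time-based
/// topic configs such as `retention.ms`.
pub fn millis_config(d: Duration) -> String {
    d.as_millis().to_string()
}
```

Under that assumption, the seven-day value above could be written as `millis_config(Duration::from_secs(7 * 24 * 60 * 60))` and passed to `with_config` by reference.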

Deleting Topics

use std::time::Duration;

let results = admin
    .delete_topics(
        vec!["topic-to-delete".to_string()],
        Duration::from_secs(30),
    )
    .await?;

for result in results {
    match result.error {
        None => println!("Deleted: {}", result.name),
        Some(e) => println!("Failed to delete {}: {}", result.name, e),
    }
}

Listing Topics

let topics = admin.list_topics().await?;
println!("Topics in cluster:");
for topic in topics {
    println!("  - {}", topic);
}

Describing Topics

let descriptions = admin
    .describe_topics(&["topic1".to_string(), "topic2".to_string()])
    .await?;

for topic in descriptions {
    println!("Topic: {}", topic.name);
    println!("  Partitions: {}", topic.partitions.len());
    for partition in &topic.partitions {
        println!(
            "    Partition {}: leader={}, replicas={:?}, isr={:?}",
            partition.partition,
            partition.leader,
            partition.replicas,
            partition.isr
        );
    }
}

Increasing Partition Count

You can increase the number of partitions for an existing topic (but never decrease):

use std::time::Duration;

let result = admin
    .create_partitions("my-topic", 12, Duration::from_secs(30))
    .await?;

match result.error {
    None => println!("Partitions increased to 12"),
    Some(e) => println!("Failed: {}", e),
}
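Because the count can only grow, it can be worth validating the requested total locally before sending the request. The sketch below is one way to do that; `validate_partition_increase` is a hypothetical helper, and the current count is assumed to come from a prior metadata lookup.

```rust
/// Checks a requested total partition count against the current one.
/// Returns the number of partitions that would be added, or an error
/// message when the request would not increase the count.
pub fn validate_partition_increase(current: i32, requested: i32) -> Result<i32, String> {
    if requested > current {
        Ok(requested - current)
    } else {
        Err(format!(
            "requested count {requested} must exceed current count {current}"
        ))
    }
}
```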

Configuration Management

Describing Configuration

use krafka::admin::DescribeConfigsRequest;

let configs = admin.describe_configs(DescribeConfigsRequest::for_topic("my-topic")).await?;

println!("Topic configuration:");
for config in configs {
    let value = config.value.as_deref().unwrap_or("(null)");
    let flags = format!(
        "{}{}{}",
        if config.read_only { "R" } else { "" },
        if config.is_default { "D" } else { "" },
        if config.is_sensitive { "S" } else { "" }
    );
    println!("  {}: {} [{}]", config.name, value, flags);
}

Describing Broker Configuration

let configs = admin.describe_configs(DescribeConfigsRequest::for_broker(0)).await?;

println!("Broker 0 configuration:");
for config in configs.iter().filter(|c| !c.is_default) {
    println!("  {}: {:?}", config.name, config.value);
}

Altering Topic Configuration

use std::collections::HashMap;

let mut configs = HashMap::new();
configs.insert("retention.ms".to_string(), "86400000".to_string());  // 1 day
configs.insert("cleanup.policy".to_string(), "compact".to_string());

let result = admin.alter_topic_config("my-topic", configs).await?;

match result.error {
    None => println!("Configuration updated"),
    Some(e) => println!("Failed: {}", e),
}

Cluster Information

Describing the Cluster

let cluster = admin.describe_cluster().await?;

println!("Cluster info:");
println!("  Cluster ID: {}", cluster.cluster_id);
println!("  Controller: {}", cluster.controller_id);
println!("  Brokers:");
for broker in cluster.brokers {
    println!(
        "    - {} at {}:{} (rack: {:?})",
        broker.broker_id, broker.host, broker.port, broker.rack
    );
}

Getting Partition Count

if let Some(count) = admin.partition_count("my-topic").await? {
    println!("Topic has {} partitions", count);
} else {
    println!("Topic not found");
}

Error Handling

use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;
use std::time::Duration;

async fn create_topic_if_not_exists(
    admin: &AdminClient,
    name: &str,
    partitions: i32,
    replication_factor: i16,
) -> Result<(), KrafkaError> {
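    // Check if topic exists. Another client may still create it between this
    // check and the create call, so the create result is inspected below too.
    let topics = admin.list_topics().await?;
    if topics.iter().any(|t| t == name) {
        println!("Topic {} already exists", name);
        return Ok(());
    }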

    // Create the topic
    let topic = NewTopic::new(name, partitions, replication_factor);
    let results = admin
        .create_topics(vec![topic], Duration::from_secs(30))
        .await?;

    for result in results {
        if let Some(error) = result.error {
            return Err(KrafkaError::broker(
                krafka::error::ErrorCode::UnknownServerError,
                error,
            ));
        }
    }

    println!("Created topic: {}", name);
    Ok(())
}

Common Topic Configurations

| Configuration | Type | Default | Description |
|---|---|---|---|
| cleanup.policy | String | delete | delete or compact |
| compression.type | String | producer | Compression type |
| retention.ms | Long | 604800000 (7 days) | Message retention time (-1 = infinite) |
| retention.bytes | Long | -1 | Max partition size (-1 = infinite) |
| segment.bytes | Int | 1073741824 (1 GiB) | Segment file size |
| min.insync.replicas | Int | 1 | Min ISR for writes with acks=all |
| max.message.bytes | Int | 1048588 (about 1 MiB) | Max message size |
| unclean.leader.election.enable | Bool | false | Allow unclean leader election |
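The min.insync.replicas setting is best read together with the replication factor: with acks=all, writes succeed only while at least that many replicas are in sync. The sketch below works out how many replicas can be unavailable before such writes are rejected; the function name is hypothetical.

```rust
/// Number of replicas that may be out of sync before `acks=all` writes fail.
/// Returns `None` for combinations that can never accept such writes.
pub fn tolerated_replica_failures(
    replication_factor: i16,
    min_insync_replicas: i16,
) -> Option<i16> {
    if min_insync_replicas < 1 || min_insync_replicas > replication_factor {
        return None;
    }
    Some(replication_factor - min_insync_replicas)
}
```

With the values used earlier in this guide (replication factor 3, min.insync.replicas 2), one replica can be lost without blocking producers.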

Best Practices

Always Check Results

let results = admin.create_topics(topics, timeout).await?;

let mut success = true;
for result in results {
    if let Some(error) = &result.error {
        eprintln!("Failed to create {}: {}", result.name, error);
        success = false;
    }
}

if !success {
    return Err(KrafkaError::invalid_state("Some topics failed to create"));
}

Use Appropriate Timeouts

use std::time::Duration;

// Metadata lookups such as list_topics() use the client's default timeout
admin.list_topics().await?;

// Allow more time for operations that the cluster must apply.
// `new_topics` is a Vec<NewTopic>; `topic_names` is a Vec<String>.
admin.create_topics(new_topics, Duration::from_secs(60)).await?;
admin.delete_topics(topic_names, Duration::from_secs(60)).await?;

Handle Topic Already Exists

let results = admin.create_topics(vec![topic], timeout).await?;

for result in results {
    match &result.error {
        None => println!("Created: {}", result.name),
        Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
            println!("Topic {} already exists (OK)", result.name);
        }
        Some(e) => {
            return Err(KrafkaError::broker(
                krafka::error::ErrorCode::UnknownServerError,
                e.clone(),
            ));
        }
    }
}

ACL Management

The AdminClient supports Access Control List (ACL) management for Kafka security.

The AclFilter struct offers a more concise way to build ACL queries than the positional arguments of describe_acls:

use krafka::admin::AclFilter;
use krafka::protocol::AclResourceType;

// Filter that matches all ACLs
let all_acls = admin.describe_acls_with_filter(AclFilter::all()).await?;

// Filter for a specific topic
let topic_acls = admin.describe_acls_with_filter(
    AclFilter::for_resource(AclResourceType::Topic, "my-topic")
).await?;

// Filter for a specific principal
let user_acls = admin.describe_acls_with_filter(
    AclFilter::for_principal("User:alice")
).await?;

// Builder pattern for complex filters
let filter = AclFilter::all()
    .resource_type(AclResourceType::Group)
    .resource_name("my-consumer-group")
    .principal("User:bob");

let result = admin.describe_acls_with_filter(filter).await?;

Describe ACLs

Query existing ACLs matching a filter:

use krafka::protocol::{AclResourceType, AclPatternType, AclOperation, AclPermissionType};

// Find all ACLs for a specific topic
let result = admin.describe_acls(
    AclResourceType::Topic,
    Some("my-topic"),
    AclPatternType::Literal,
    None,  // any principal
    None,  // any host
    AclOperation::Any,
    AclPermissionType::Any,
).await?;

if let Some(error) = result.error {
    println!("Error: {}", error);
} else {
    for binding in result.bindings {
        println!(
            "ACL: {:?} {} {:?} on {}",
            binding.permission_type,
            binding.principal,
            binding.operation,
            binding.resource_name
        );
    }
}

Create ACLs

Create new access control entries:

use krafka::protocol::AclBinding;

// Create a simple read ACL
let read_acl = AclBinding::allow_read_topic("my-topic", "User:alice");

// Create a write ACL
let write_acl = AclBinding::allow_write_topic("my-topic", "User:bob");

// Create ACLs
let result = admin.create_acls(vec![read_acl, write_acl]).await?;

for (i, r) in result.results.iter().enumerate() {
    match &r.error {
        None => println!("ACL {} created successfully", i),
        Some(e) => println!("ACL {} failed: {}", i, e),
    }
}

Delete ACLs

Delete ACLs matching a filter:

use krafka::protocol::{AclBindingFilter, AclResourceType, AclPatternType, AclOperation, AclPermissionType};

// Delete all ACLs for a topic
let filter = AclBindingFilter {
    resource_type: AclResourceType::Topic,
    resource_name: Some("my-topic".to_string()),
    pattern_type: AclPatternType::Literal,
    principal: None,
    host: None,
    operation: AclOperation::Any,
    permission_type: AclPermissionType::Any,
};

let result = admin.delete_acls(vec![filter]).await?;

for (i, fr) in result.filter_results.iter().enumerate() {
    match &fr.error {
        None => println!("Filter {} deleted {} ACLs", i, fr.deleted_count),
        Some(e) => println!("Filter {} failed: {}", i, e),
    }
}

Consumer Group Management

Describing Consumer Groups

Get detailed information about one or more consumer groups. The method automatically detects each group’s type (classic or KIP-848 consumer protocol) and dispatches to the matching API: DescribeGroups (API key 15) for classic groups or ConsumerGroupDescribe (API key 69) for KIP-848 groups. Each request is routed to the group’s coordinator broker, which is located via FindCoordinator:

let descriptions = admin
    .describe_consumer_groups(vec!["my-group".to_string(), "other-group".to_string()])
    .await?;

for group in &descriptions {
    println!("Group: {} (type: {}, state: {})", group.group_id, group.group_type, group.state);
    if let Some(assignor) = &group.assignor {
        println!("  Assignor: {}", assignor);
    }
    if let Some(epoch) = group.group_epoch {
        println!("  Epoch: {}", epoch);
    }
    for member in &group.members {
        println!(
            "    Member: {} (client: {}, host: {}, instance: {:?})",
            member.member_id, member.client_id, member.client_host,
            member.instance_id
        );
    }
    if let Some(error) = &group.error {
        println!("  Error: {}", error);
    }
}

Note: Classic-protocol groups return protocol_type and assignor but no epoch or assignment details. KIP-848 groups return group_epoch, assignment_epoch, per-member subscriptions, and topic-UUID-based current/target assignments.

Listing Consumer Groups

List all consumer groups across the cluster:

let groups = admin.list_consumer_groups().await?;

println!("Consumer groups:");
for group in &groups {
    println!("  {} (type: {:?}, protocol: {})", group.group_id, group.group_type, group.protocol_type);
}

Note: list_consumer_groups() queries all brokers in the cluster and deduplicates results, since consumer groups are managed by their respective group coordinators.
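The merge step described in the note amounts to flattening the per-broker listings and dropping duplicates. A minimal sketch, assuming each broker's response has already been reduced to a list of group IDs (`merge_group_listings` is a hypothetical name, and the real client may merge richer records):

```rust
use std::collections::BTreeSet;

/// Merges per-broker group listings into one sorted, duplicate-free list.
pub fn merge_group_listings(per_broker: Vec<Vec<String>>) -> Vec<String> {
    let unique: BTreeSet<String> = per_broker.into_iter().flatten().collect();
    unique.into_iter().collect()
}
```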

Topic Partition Details

Describing Topic Partitions

Use describe_topic_partitions() for paginated, detailed partition information including ELR (eligible leader replicas) from KIP-966:

let result = admin
    .describe_topic_partitions(vec!["my-topic".to_string()])
    .await?;

for topic in &result.topics {
    println!(
        "Topic: {} (internal: {}, id: {:?})",
        topic.name.as_deref().unwrap_or("?"),
        topic.is_internal,
        topic.topic_id
    );
    for p in &topic.partitions {
        println!(
            "  Partition {}: leader={}, epoch={}, replicas={:?}, isr={:?}",
            p.partition_index, p.leader_id, p.leader_epoch,
            p.replica_nodes, p.isr_nodes
        );
        if let Some(elr) = &p.eligible_leader_replicas {
            println!("    ELR: {:?}", elr);
        }
        if let Some(last_elr) = &p.last_known_elr {
            println!("    Last known ELR: {:?}", last_elr);
        }
        if !p.offline_replicas.is_empty() {
            println!("    Offline: {:?}", p.offline_replicas);
        }
    }
}

Note: The DescribeTopicPartitions API (API key 75) is available on Kafka 4.0+. describe_topic_partitions() handles pagination automatically for topics with many partitions (the default limit is 2000 partitions per page) and collects all pages into a single result.
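Cursor-based pagination of this kind usually follows the same loop: request a page, append its entries, and continue from the returned cursor until none is returned. The sketch below illustrates that loop with simplified, hypothetical types (`Page`, `collect_all_pages`); it is not Krafka's internal implementation, and the real cursor also carries a topic name.

```rust
/// One page of partition indexes plus the cursor for the next page, if any.
pub struct Page {
    pub partitions: Vec<i32>,
    pub next_cursor: Option<i32>,
}

/// Calls `fetch_page` repeatedly, starting without a cursor, and
/// concatenates every page until the server stops returning a cursor.
pub fn collect_all_pages<F>(mut fetch_page: F) -> Vec<i32>
where
    F: FnMut(Option<i32>) -> Page,
{
    let mut all = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(cursor);
        all.extend(page.partitions);
        match page.next_cursor {
            Some(next) => cursor = Some(next),
            None => return all,
        }
    }
}
```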

Record Deletion

Deleting Records

Delete records from topic partitions before a specified offset. Records with offsets less than the specified offset are marked for deletion (this adjusts the log start offset). Requests are automatically routed to each partition’s leader broker:

use std::collections::HashMap;
use std::time::Duration;

let mut offsets = HashMap::new();
offsets.insert(("my-topic".to_string(), 0), 100i64);  // Delete before offset 100
offsets.insert(("my-topic".to_string(), 1), 250i64);  // Delete before offset 250

let results = admin
    .delete_records(offsets, Duration::from_secs(30))
    .await?;

for result in &results {
    match &result.error {
        None => println!(
            "Deleted records from {}:{}, new low watermark: {}",
            result.topic, result.partition, result.low_watermark
        ),
        Some(e) => println!(
            "Failed to delete from {}:{}: {}",
            result.topic, result.partition, e
        ),
    }
}

Note: Deleted records are not immediately removed from disk. The broker adjusts the log start offset, and records before that offset become inaccessible. Physical deletion happens during log segment cleanup.
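The effect on the log start offset can be modeled with a few lines of arithmetic. This is a simplified sketch of the broker-side rule, assuming that requests beyond the high watermark are rejected and that the log start offset never moves backwards; `new_log_start_offset` is a hypothetical name.

```rust
/// Computes the log start offset (low watermark) after a delete-records
/// request for everything before `delete_before`.
pub fn new_log_start_offset(
    current_start: i64,
    high_watermark: i64,
    delete_before: i64,
) -> Result<i64, String> {
    if delete_before > high_watermark {
        return Err(format!(
            "offset {delete_before} is beyond the high watermark {high_watermark}"
        ));
    }
    // Deleting below the current start is a no-op: the offset only advances.
    Ok(current_start.max(delete_before))
}
```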

Leader Epoch Queries

OffsetForLeaderEpoch

Query the end offset for a given leader epoch. This is used to detect log truncation after leader changes. Requests are routed to each partition’s leader broker:

// Each tuple is (topic, partition, leader epoch): query the end offset of
// epoch 5 on partition 0 and of epoch 3 on partition 1 of "my-topic"
let results = admin
    .offset_for_leader_epoch(vec![
        ("my-topic".to_string(), 0, 5),
        ("my-topic".to_string(), 1, 3),
    ])
    .await?;

for result in &results {
    match &result.error {
        None => println!(
            "{}:{} epoch={} end_offset={}",
            result.topic, result.partition,
            result.leader_epoch, result.end_offset
        ),
        Some(e) => println!(
            "{}:{} error: {}",
            result.topic, result.partition, e
        ),
    }
}

This API is useful for:

  • Log truncation detection: After a leader change, check if the log was truncated
  • Consumer offset validation: Ensure a consumer’s saved offset is still valid
  • Replication diagnostics: Verify epoch boundaries across replicas
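For the first two use cases, the check itself is a comparison between a saved position and the end offset returned for the epoch in which that position was recorded. A minimal sketch of that comparison, with a hypothetical function name:

```rust
/// Compares a consumer's saved position with the end offset of the leader
/// epoch it was read in. Returns the offset to rewind to when the log was
/// truncated below the saved position, or `None` when the position is valid.
pub fn truncation_point(saved_position: i64, epoch_end_offset: i64) -> Option<i64> {
    if epoch_end_offset < saved_position {
        Some(epoch_end_offset)
    } else {
        None
    }
}
```

A position of 120 against an epoch end offset of 100 indicates that offsets 100 to 119 were truncated, so the consumer would resume from 100.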

Delegation Tokens

Delegation tokens (KIP-48) allow a principal to delegate authentication to another principal without sharing credentials. The token HMAC can be used for SASL/SCRAM authentication.

Creating a Token

use std::time::Duration;

// Create a token that "alice" can renew, with a 24-hour lifetime
let result = admin
    .create_delegation_token(
        &[("User", "alice")],
        Some(Duration::from_secs(86_400)),
    )
    .await?;

match result.token {
    Some(token) => println!("Created token: {} (HMAC {} bytes)", token.token_id, token.hmac.len()),
    // Avoid unwrap(): print whatever error the broker reported, if any
    None => println!("Error: {:?}", result.error),
}

Pass an empty renewers slice to allow only the token owner to renew. Use None for max_lifetime to accept the server default (typically 7 days).

Describing Tokens

// Describe all tokens visible to the caller
let tokens = admin.describe_delegation_tokens(None).await?;
for token in &tokens {
    println!(
        "Token {} owned by {}:{}, expires at {}, {} renewer(s)",
        token.token_id,
        token.principal_type,
        token.principal_name,
        token.expiry_timestamp_ms,
        token.renewers.len(),
    );
}

// Describe tokens for a specific owner
let tokens = admin
    .describe_delegation_tokens(Some(&[("User", "alice")]))
    .await?;

Renewing a Token

use std::time::Duration;

// Obtain a token (e.g., from a prior create call)
let result = admin
    .create_delegation_token(&[("User", "alice")], Some(Duration::from_secs(86_400)))
    .await?;
let token = result.token.expect("token created");

// Extend the token's lifetime by 1 hour
let result = admin
    .renew_delegation_token(&token.hmac, Duration::from_secs(3_600))
    .await?;

match result.error {
    None => println!("New expiry: {}", result.expiry_timestamp_ms),
    Some(e) => println!("Renew failed: {}", e),
}
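A renewal cannot push the expiry past the maximum lifetime fixed when the token was created, so the returned expiry may be earlier than requested. The arithmetic can be sketched as follows, assuming all values are Unix timestamps or durations in milliseconds; `renewed_expiry_ms` is a hypothetical name.

```rust
/// Expiry after a renewal: now plus the renew period, capped at the
/// token's maximum timestamp.
pub fn renewed_expiry_ms(now_ms: i64, renew_period_ms: i64, max_timestamp_ms: i64) -> i64 {
    now_ms.saturating_add(renew_period_ms).min(max_timestamp_ms)
}
```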

Expiring a Token

use std::time::Duration;

// Obtain a token (e.g., from describe); first() avoids a panic when none exist
let tokens = admin.describe_delegation_tokens(None).await?;
let Some(token) = tokens.first() else {
    println!("No delegation tokens found");
    return Ok(());
};

// Expire the token immediately
let result = admin.expire_delegation_token(&token.hmac, None).await?;

// Alternatively, expire it after a 60-second grace period
let result = admin
    .expire_delegation_token(&token.hmac, Some(Duration::from_secs(60)))
    .await?;

Protocol Versions

| API | Versions | Changes |
|---|---|---|
| CreateDelegationToken | v1–v3 | v1 baseline (v0 removed in Kafka 4.0), v2 flexible encoding, v3 owner principal override |
| RenewDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| ExpireDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| DescribeDelegationToken | v1–v3 | v1 baseline, v2 flexible encoding, v3 token requester fields |

Client Quotas

Client quotas control the resource usage of clients (producer/consumer byte rates, request percentages, etc.). Use describe_client_quotas to query current quotas and alter_client_quotas to change them.

Describing Quotas

// Describe all quotas for user "alice" (match_type 0 = exact match)
let result = admin
    .describe_client_quotas(&[("user", 0, Some("alice"))], false)
    .await?;

for entry in &result.entries {
    let entity: Vec<_> = entry.entity.iter().map(|e| {
        format!("{}={}", e.entity_type, e.entity_name.as_deref().unwrap_or("<default>"))
    }).collect();
    println!("Entity: {}", entity.join(", "));
    for v in &entry.values {
        println!("  {} = {}", v.key, v.value);
    }
}

Filter match types:

  • 0 — exact: match the entity with the given name
  • 1 — default: match the default entity for this type
  • 2 — any specified: match any entity with a name (non-default)

When strict is true, only entities that exactly match all given component types are returned (entities with additional unspecified types are excluded).
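The numeric match types are easier to read when wrapped in an enum. The sketch below is a hypothetical wrapper (`QuotaMatchType` is not a Krafka type) that also spells out the matching rule for a single entity component, where a name of `None` stands for the default entity.

```rust
/// Readable names for the three filter match types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QuotaMatchType {
    Exact = 0,
    Default = 1,
    AnySpecified = 2,
}

impl QuotaMatchType {
    /// Numeric code to pass in the filter tuple.
    pub fn code(self) -> i8 {
        self as i8
    }

    /// Whether an entity component with `entity_name` satisfies a filter
    /// of this type (`filter_name` is only consulted for `Exact`).
    pub fn matches(self, filter_name: Option<&str>, entity_name: Option<&str>) -> bool {
        match self {
            QuotaMatchType::Exact => entity_name.is_some() && entity_name == filter_name,
            QuotaMatchType::Default => entity_name.is_none(),
            QuotaMatchType::AnySpecified => entity_name.is_some(),
        }
    }
}
```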

Altering Quotas

use krafka::admin::QuotaAlteration;

// Set producer byte rate for user "alice"
let results = admin
    .alter_client_quotas(
        &[QuotaAlteration {
            entity: vec![("user", Some("alice"))],
            ops: vec![
                ("producer_byte_rate", Some(1_048_576.0)),  // set to 1 MiB/s
                ("consumer_byte_rate", None),               // remove quota
            ],
        }],
        false,
    )
    .await?;

for result in &results {
    match &result.error {
        None => println!("Quota altered successfully"),
        Some(e) => println!("Error: {}", e),
    }
}

// Dry-run validation (validate_only = true)
let results = admin
    .alter_client_quotas(
        &[QuotaAlteration {
            entity: vec![("user", Some("alice"))],
            ops: vec![("producer_byte_rate", Some(1_048_576.0))],
        }],
        true,
    )
    .await?;

Feature Versioning (KIP-584)

Kafka 2.7+ supports cluster-wide feature flags that control the finalized version range for features like metadata.version. Use describe_features to discover what the cluster supports and update_features to upgrade, downgrade, or delete finalized feature levels.

Describing Features

let features = admin.describe_features().await?;
println!("Epoch: {}", features.finalized_features_epoch);
for f in &features.supported_features {
    println!("supported: {} [{}, {}]", f.name, f.min_version, f.max_version);
}
for f in &features.finalized_features {
    println!("finalized: {} [{}, {}]", f.name, f.min_version_level, f.max_version_level);
}

Updating Features

use krafka::protocol::messages::FeatureUpdateKey;

// Upgrade metadata.version to level 17
let results = admin
    .update_features(
        vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
        false, // validate_only
    )
    .await?;

for r in &results.results {
    match &r.error {
        None => println!("{}: ok", r.feature),
        Some(e) => println!("{}: {}", r.feature, e),
    }
}

// Dry-run validation (validate_only = true, requires v1+)
let results = admin
    .update_features(
        vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
        true,
    )
    .await?;

Upgrade types:

  • FeatureUpdateKey::upgrade(name, level) — raise to a higher level
  • FeatureUpdateKey::safe_downgrade(name, level) — lower the level safely
  • FeatureUpdateKey::unsafe_downgrade(name, level) — forceful downgrade (may lose data)
  • FeatureUpdateKey::delete(name) — remove the finalized feature entirely

When the broker supports UpdateFeatures v1+, the request uses the typed UpgradeType field. On older v0 brokers, the client falls back to the boolean AllowDowngrade flag.
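The fallback can be thought of as collapsing the typed field into a single boolean. A minimal sketch, using the UpgradeType codes from the Kafka protocol (1 = upgrade, 2 = safe downgrade, 3 = unsafe downgrade); the enum and function here are illustrative and do not mirror Krafka's internal types.

```rust
/// Typed upgrade kinds as encoded in UpdateFeatures v1+.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpgradeType {
    Upgrade = 1,
    SafeDowngrade = 2,
    UnsafeDowngrade = 3,
}

/// Value of the v0 `AllowDowngrade` flag for a given upgrade type.
/// Note that v0 cannot distinguish safe from unsafe downgrades.
pub fn allow_downgrade_v0(upgrade_type: UpgradeType) -> bool {
    !matches!(upgrade_type, UpgradeType::Upgrade)
}
```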

Log Directory Inspection

describe_log_dirs() queries every broker and returns per-directory information including partition sizes, offset lag, future-replica status, and (v4+) volume capacity.

Describe All Log Directories

let dirs = admin.describe_log_dirs(None).await?;
for dir in &dirs {
    println!("broker {} — {} (total: {}, usable: {})",
        dir.broker_id, dir.log_dir, dir.total_bytes, dir.usable_bytes);
    if let Some(err) = &dir.error {
        eprintln!("  error: {err}");
    }
    for topic in &dir.topics {
        for p in &topic.partitions {
            println!("  {}-{}: {} bytes, lag {}{}",
                topic.name, p.partition_index, p.partition_size,
                p.offset_lag, if p.is_future_key { " (future)" } else { "" });
        }
    }
}

Describe Specific Topics

use krafka::protocol::DescribableLogDirTopic;

let filter = vec![DescribableLogDirTopic {
    topic: "my-topic".into(),
    partitions: vec![0, 1, 2],
}];
let dirs = admin.describe_log_dirs(Some(filter)).await?;

Result Fields

| Field | Type | Description |
|---|---|---|
| broker_id | i32 | Broker that owns the directory |
| log_dir | String | Absolute path on the broker |
| error | Option<String> | Per-directory error (e.g., KAFKA_STORAGE_ERROR) |
| total_bytes | i64 | Volume total bytes (-1 if unknown, v4+) |
| usable_bytes | i64 | Volume free bytes (-1 if unknown, v4+) |
| topics[].partitions[].partition_size | i64 | Log size in bytes |
| topics[].partitions[].offset_lag | i64 | Lag behind high watermark |
| topics[].partitions[].is_future_key | bool | Future replica (reassignment) |
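Since total_bytes and usable_bytes are -1 when the broker does not report them, any capacity calculation should treat that case explicitly. A small sketch of a usage ratio under that assumption (`used_fraction` is a hypothetical helper):

```rust
/// Fraction of the volume in use (0.0 to 1.0), or `None` when capacity is
/// unknown (-1) or the reported values are inconsistent.
pub fn used_fraction(total_bytes: i64, usable_bytes: i64) -> Option<f64> {
    if total_bytes <= 0 || usable_bytes < 0 || usable_bytes > total_bytes {
        return None;
    }
    Some((total_bytes - usable_bytes) as f64 / total_bytes as f64)
}
```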

Protocol Versions

| Version | Changes |
|---|---|
| v1 | Baseline (v0 removed in Kafka 4.0) |
| v2 | Flexible encoding (compact strings + tagged fields) |
| v3 | Top-level ErrorCode in response |
| v4 | TotalBytes + UsableBytes per log directory |

Leader Election

elect_leaders() triggers a leader election for the specified partitions. Supports preferred election (elect the preferred replica) and unclean election (elect the first live replica even without in-sync replicas).
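The difference between the two election types can be shown with a simplified model of how a leader is picked from a partition's replica list. This is a sketch of the general rule rather than broker code: it assumes the preferred replica is the first one in the assignment, and that an unclean election prefers a live in-sync replica before falling back to any live replica. All names are hypothetical.

```rust
#[derive(Debug, Clone, Copy)]
pub enum ElectionKind {
    Preferred,
    Unclean,
}

/// Picks a leader from `replicas` (in assignment order) given the current
/// in-sync set and the set of live brokers.
pub fn elect(kind: ElectionKind, replicas: &[i32], isr: &[i32], live: &[i32]) -> Option<i32> {
    match kind {
        // Only the first assigned replica qualifies, and only if it is
        // both alive and in sync.
        ElectionKind::Preferred => replicas
            .first()
            .copied()
            .filter(|r| isr.contains(r) && live.contains(r)),
        // Prefer a live in-sync replica; otherwise accept any live replica,
        // which is where data loss becomes possible.
        ElectionKind::Unclean => replicas
            .iter()
            .copied()
            .find(|r| isr.contains(r) && live.contains(r))
            .or_else(|| replicas.iter().copied().find(|r| live.contains(r))),
    }
}
```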

Preferred Election for All Partitions

use krafka::protocol::ElectionType;
use std::time::Duration;

let results = admin
    .elect_leaders(ElectionType::Preferred, None, Duration::from_secs(60))
    .await?;
for topic in &results {
    for p in &topic.partitions {
        if let Some(err) = &p.error {
            eprintln!("{}-{}: {err}", topic.topic, p.partition_id);
        }
    }
}

Unclean Election for Specific Partitions

use krafka::protocol::{ElectionType, ElectLeadersTopicPartitions};
use std::time::Duration;

let results = admin
    .elect_leaders(
        ElectionType::Unclean,
        Some(vec![ElectLeadersTopicPartitions {
            topic: "my-topic".into(),
            partitions: vec![0, 1],
        }]),
        Duration::from_secs(60),
    )
    .await?;

Protocol Versions

| Version | Changes |
|---|---|
| v0 | Baseline (preferred election only) |
| v1 | Adds ElectionType for preferred/unclean (KIP-460); top-level error code |
| v2 | Flexible encoding (compact strings + tagged fields) |

Partition Reassignment

alter_partition_reassignments() initiates or cancels partition reassignments. list_partition_reassignments() lists all ongoing reassignments.

Warning: Reassigning partitions moves data between brokers and can significantly impact cluster load.

Start a Reassignment

use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;

let result = admin.alter_partition_reassignments(
    vec![ReassignableTopic {
        name: "my-topic".into(),
        partitions: vec![ReassignablePartition {
            partition_index: 0,
            replicas: Some(vec![1, 2, 3]),
        }],
    }],
    Duration::from_secs(60),
).await?;

if let Some(err) = &result.error {
    eprintln!("Top-level error: {err}");
}
for topic in &result.topics {
    for p in &topic.partitions {
        if let Some(err) = &p.error {
            eprintln!("{}-{}: {err}", topic.name, p.partition_index);
        }
    }
}

Cancel a Pending Reassignment

use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;

// Set replicas to None to cancel
let result = admin.alter_partition_reassignments(
    vec![ReassignableTopic {
        name: "my-topic".into(),
        partitions: vec![ReassignablePartition {
            partition_index: 0,
            replicas: None,  // cancel pending reassignment
        }],
    }],
    Duration::from_secs(60),
).await?;

List Ongoing Reassignments

use std::time::Duration;

let reassignments = admin
    .list_partition_reassignments(None, Duration::from_secs(60))
    .await?;
for topic in &reassignments {
    for p in &topic.partitions {
        println!("{} p{}: replicas={:?} adding={:?} removing={:?}",
            topic.name, p.partition_index, p.replicas,
            p.adding_replicas, p.removing_replicas);
    }
}
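The adding and removing sets reported for an ongoing reassignment are the differences between the current and the target replica lists. A minimal sketch of that calculation, which can also be used to estimate how much data a planned reassignment will move (`replica_delta` is a hypothetical helper):

```rust
/// Returns `(adding, removing)`: replicas present only in `target`, and
/// replicas present only in `current`, each in their original order.
pub fn replica_delta(current: &[i32], target: &[i32]) -> (Vec<i32>, Vec<i32>) {
    let adding: Vec<i32> = target
        .iter()
        .copied()
        .filter(|r| !current.contains(r))
        .collect();
    let removing: Vec<i32> = current
        .iter()
        .copied()
        .filter(|r| !target.contains(r))
        .collect();
    (adding, removing)
}
```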

AlterPartitionReassignments Protocol Versions

| Version | Changes |
|---|---|
| v0 | Baseline (flexible encoding from the start) |

ListPartitionReassignments Protocol Versions

| Version | Changes |
|---|---|
| v0 | Baseline (flexible encoding from the start) |

SCRAM Credential Management

Manage SASL/SCRAM credentials (KIP-554) for users.

Describe SCRAM Credentials

// Describe all users
let result = admin.describe_user_scram_credentials(None).await?;
for user in &result.users {
    println!("{}: {:?}", user.name, user.credential_infos);
}

// Describe specific users
let result = admin
    .describe_user_scram_credentials(Some(vec!["alice".into(), "bob".into()]))
    .await?;

Alter SCRAM Credentials

use krafka::protocol::{ScramCredentialDeletion, ScramCredentialUpsertion};
use krafka::auth::ScramMechanism;
use zeroize::Zeroizing;

let results = admin.alter_user_scram_credentials(
    vec![ScramCredentialDeletion {
        name: "alice".into(),
        mechanism: ScramMechanism::Sha512,
    }],
    vec![ScramCredentialUpsertion {
        name: "bob".into(),
        mechanism: ScramMechanism::Sha256,
        iterations: 8192,
        salt: Zeroizing::new(vec![1, 2, 3]),
        salted_password: Zeroizing::new(vec![4, 5, 6]),
    }],
).await?;

SCRAM Credential Protocol Versions

| API | Version | Changes |
|---|---|---|
| DescribeUserScramCredentials | v0 | Baseline (KIP-554, flexible from v0) |
| AlterUserScramCredentials | v0 | Baseline (KIP-554, flexible from v0) |

Log Directory Management

Move Replicas Between Log Directories

use krafka::protocol::{AlterReplicaLogDir, AlterReplicaLogDirTopic};

let results = admin.alter_replica_log_dirs(vec![
    AlterReplicaLogDir {
        path: "/data/kafka-logs-2".into(),
        topics: vec![AlterReplicaLogDirTopic {
            name: "my-topic".into(),
            partitions: vec![0, 1],
        }],
    },
]).await?;

AlterReplicaLogDirs Protocol Versions

| Version | Changes |
|---|---|
| v1 | Baseline (non-flexible encoding) |
| v2 | Flexible encoding |

Offset Management

Delete Consumer Group Offsets

let result = admin.delete_offsets(
    "my-group",
    &[("my-topic", &[0, 1, 2])],
).await?;
if let Some(err) = &result.error {
    eprintln!("Top-level error: {err}");
}

OffsetDelete Protocol Versions

| Version | Changes |
|---|---|
| v0 | Baseline (non-flexible encoding) |

Transaction Debugging

Describe Producers

Inspect active producers on partitions (useful for debugging stuck transactions).

let results = admin
    .describe_producers(&[("my-topic", &[0, 1])])
    .await?;
for topic in &results {
    for p in &topic.partitions {
        for pr in &p.active_producers {
            println!("p{}: producer_id={} epoch={} txn_offset={}",
                p.partition_index, pr.producer_id,
                pr.producer_epoch, pr.current_txn_start_offset);
        }
    }
}

Describe Transactions

let results = admin
    .describe_transactions(&["txn-1", "txn-2"])
    .await?;
for txn in &results {
    println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}

List Transactions

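// List all ongoing transactions. The arguments appear to be state filters,
// producer ID filters (empty = all producers), and a duration filter
// (-1 = no duration filtering).
let result = admin.list_transactions(&["Ongoing"], &[], -1).await?;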
for txn in &result.transactions {
    println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}

Transaction Debug Protocol Versions

| API | Version | Changes |
|---|---|---|
| DescribeProducers | v0 | Baseline (KIP-664, flexible from v0) |
| DescribeTransactions | v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions | v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions | v1 | Adds DurationFilter (KIP-994) |

Client Metrics Resources

List Client Metrics Subscriptions

let names = admin.list_client_metrics_resources().await?;
for name in &names {
    println!("subscription: {name}");
}

ListClientMetricsResources Protocol Versions

| Version | Changes |
|---|---|
| v0 | Baseline (KIP-714, flexible from v0) |

Transaction Markers (WriteTxnMarkers)

Write Transaction Markers

Write COMMIT or ABORT markers for transactions. Primarily useful for aborting stuck (hanging) transactions via the abort_transaction convenience method.

// Abort a stuck transaction
admin.abort_transaction("my-transactional-id").await?;

For low-level control, use write_txn_markers directly:

use krafka::protocol::{WritableTxnMarker, WritableTxnMarkerTopic};

let results = admin
    .write_txn_markers(&[WritableTxnMarker {
        producer_id: 42,
        producer_epoch: 5,
        transaction_result: false, // ABORT
        topics: vec![WritableTxnMarkerTopic {
            name: "my-topic".into(),
            partition_indexes: vec![0, 1],
        }],
        coordinator_epoch: 10,
    }])
    .await?;

WriteTxnMarkers Protocol Versions

| API | Version | Changes |
|---|---|---|
| WriteTxnMarkers | v1 | Baseline (flexible encoding, v0 removed in Kafka 4.0) |
| WriteTxnMarkers | v2 | Adds TransactionVersion field (KIP-1228) |

KRaft Quorum (DescribeQuorum)

Describe Quorum

Inspect the KRaft quorum for cluster metadata partitions. Returns voter and observer replicas, leader info, and high watermark.

let result = admin
    .describe_quorum(&[("__cluster_metadata", &[0])])
    .await?;
for topic in &result.topics {
    for partition in &topic.partitions {
        println!(
            "partition {} leader={} epoch={} hw={}",
            partition.partition_index,
            partition.leader_id,
            partition.leader_epoch,
            partition.high_watermark
        );
        for voter in &partition.current_voters {
            println!("  voter {} log_end_offset={}", voter.replica_id, voter.log_end_offset);
        }
    }
}
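A common use of this output is checking how far each voter trails the leader. The sketch below computes that lag from `(replica_id, log_end_offset)` pairs such as the ones printed above; `voter_lags` is a hypothetical helper, and the leader's own log end offset is assumed to be known from its voter entry.

```rust
/// Lag of each voter behind the leader, in log offsets.
/// Voters at or ahead of the given leader offset report a lag of 0.
pub fn voter_lags(leader_log_end_offset: i64, voters: &[(i32, i64)]) -> Vec<(i32, i64)> {
    voters
        .iter()
        .map(|&(replica_id, log_end_offset)| {
            (replica_id, (leader_log_end_offset - log_end_offset).max(0))
        })
        .collect()
}
```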

DescribeQuorum Protocol Versions

| API | Version | Changes |
|---|---|---|
| DescribeQuorum | v0 | Baseline (KIP-595, flexible from v0) |
| DescribeQuorum | v1 | Adds LastFetchTimestamp + LastCaughtUpTimestamp (KIP-836) |
| DescribeQuorum | v2 | Adds Nodes, ErrorMessage, ReplicaDirectoryId (KIP-853) |


Licensed under MIT. Copyright © 2026 Krafka Contributors.