Admin Client Guide
This guide covers administrative operations using the Krafka AdminClient.
Overview
The AdminClient provides cluster administration capabilities:
- Topic management (create, delete, describe, list)
- Consumer group management (describe, list, KIP-848 describe)
- Topic partition details (paginated describe with ELR)
- Record deletion (delete records before an offset)
- Leader epoch queries (detect log truncation)
- Cluster information
- Partition management
- ACL management
- Delegation token management (create, describe, renew, expire)
- Client quota management (describe, alter)
- Cluster feature versioning (describe, update — KIP-584)
- Log directory inspection (describe log dirs with volume capacity)
- Move replicas between log directories
- Delete consumer group committed offsets
- SCRAM credential management (describe, alter — KIP-554)
- Transaction debugging (describe producers, describe/list transactions — KIP-664)
- Client metrics resource discovery (KIP-714)
API Version Negotiation
The AdminClient automatically negotiates the best API version for each RPC
using the broker’s ApiVersions response. This ensures forward compatibility
with newer Kafka releases while gracefully falling back to older protocol
versions on legacy brokers. If a broker does not support a required API, the
client returns a clear protocol error.
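The negotiation rule described above can be sketched as a range intersection. This is a minimal illustration, not Krafka's internal code: VersionRange and negotiate are hypothetical names, and the rule shown (take the highest version inside both ranges) is the usual Kafka client convention, assumed here.

```rust
/// Inclusive range of protocol versions supported for a single API key.
/// (Hypothetical type for illustration; not part of Krafka's public API.)
#[derive(Debug, Clone, Copy)]
struct VersionRange {
    min: i16,
    max: i16,
}

/// Returns the highest version both sides support, or `None` when the
/// ranges do not overlap (the case that would surface as a protocol error).
fn negotiate(client: VersionRange, broker: VersionRange) -> Option<i16> {
    let highest_common = client.max.min(broker.max);
    let lowest_common = client.min.max(broker.min);
    if highest_common >= lowest_common {
        Some(highest_common)
    } else {
        None
    }
}

fn main() {
    let client = VersionRange { min: 1, max: 4 };
    // Newer broker: the result is capped by the client's own maximum.
    println!("{:?}", negotiate(client, VersionRange { min: 0, max: 6 }));
    // Legacy broker: the client falls back to the broker's maximum.
    println!("{:?}", negotiate(client, VersionRange { min: 0, max: 2 }));
    // No overlap: there is no version to use.
    println!("{:?}", negotiate(client, VersionRange { min: 5, max: 6 }));
}
```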
Basic Usage
use krafka::admin::AdminClient;
use krafka::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// List all topics
let topics = admin.list_topics().await?;
println!("Topics: {:?}", topics);
Ok(())
}
Authentication
The AdminClient supports all SASL authentication mechanisms:
SASL/PLAIN
use krafka::admin::AdminClient;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_plain("username", "password")
.build()
.await?;
SASL/SCRAM-SHA-256
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha256("username", "password")
.build()
.await?;
SASL/SCRAM-SHA-512
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha512("username", "password")
.build()
.await?;
Generic AuthConfig
For AWS MSK IAM or advanced configurations:
use krafka::admin::AdminClient;
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
.bootstrap_servers("msk-broker:9098")
.auth(auth)
.build()
.await?;
Topic Management
Creating Topics
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// Simple topic creation
let topic = NewTopic::new("my-topic", 6, 3);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
match result.error {
None => println!("Created: {}", result.name),
Some(e) => println!("Failed to create {}: {}", result.name, e),
}
}
Creating Topics with Configuration
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let topic = NewTopic::new("compacted-topic", 12, 3)
.with_config("cleanup.policy", "compact")
.with_config("min.insync.replicas", "2")
.with_config("retention.ms", "604800000"); // 7 days
admin.create_topics(vec![topic], Duration::from_secs(30)).await?;
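Millisecond-valued settings such as retention.ms are passed as strings, so it can be less error-prone to derive them from a Duration than to type the number by hand. The helper below is a small sketch; millis_config is a hypothetical name, not a Krafka function.

```rust
use std::time::Duration;

/// Formats a duration as the millisecond string used by `*.ms` topic configs.
/// (Hypothetical helper for illustration.)
fn millis_config(duration: Duration) -> String {
    duration.as_millis().to_string()
}

fn main() {
    let seven_days = Duration::from_secs(7 * 24 * 60 * 60);
    // The resulting string can be passed as the value for "retention.ms".
    println!("retention.ms = {}", millis_config(seven_days));
}
```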
Deleting Topics
use std::time::Duration;
let results = admin
.delete_topics(
vec!["topic-to-delete".to_string()],
Duration::from_secs(30),
)
.await?;
for result in results {
match result.error {
None => println!("Deleted: {}", result.name),
Some(e) => println!("Failed to delete {}: {}", result.name, e),
}
}
Listing Topics
let topics = admin.list_topics().await?;
println!("Topics in cluster:");
for topic in topics {
println!(" - {}", topic);
}
Describing Topics
let descriptions = admin
.describe_topics(&["topic1".to_string(), "topic2".to_string()])
.await?;
for topic in descriptions {
println!("Topic: {}", topic.name);
println!(" Partitions: {}", topic.partitions.len());
for partition in &topic.partitions {
println!(
" Partition {}: leader={}, replicas={:?}, isr={:?}",
partition.partition,
partition.leader,
partition.replicas,
partition.isr
);
}
}
Increasing Partition Count
You can increase the number of partitions for an existing topic (but never decrease):
use std::time::Duration;
let result = admin
.create_partitions("my-topic", 12, Duration::from_secs(30))
.await?;
match result.error {
None => println!("Partitions increased to 12"),
Some(e) => println!("Failed: {}", e),
}
Configuration Management
Describing Configuration
use krafka::admin::DescribeConfigsRequest;
let configs = admin.describe_configs(DescribeConfigsRequest::for_topic("my-topic")).await?;
println!("Topic configuration:");
for config in configs {
let value = config.value.as_deref().unwrap_or("(null)");
let flags = format!(
"{}{}{}",
if config.read_only { "R" } else { "" },
if config.is_default { "D" } else { "" },
if config.is_sensitive { "S" } else { "" }
);
println!(" {}: {} [{}]", config.name, value, flags);
}
Describing Broker Configuration
let configs = admin.describe_configs(DescribeConfigsRequest::for_broker(0)).await?;
println!("Broker 0 configuration:");
for config in configs.iter().filter(|c| !c.is_default) {
println!(" {}: {:?}", config.name, config.value);
}
Altering Topic Configuration
use std::collections::HashMap;
let mut configs = HashMap::new();
configs.insert("retention.ms".to_string(), "86400000".to_string()); // 1 day
configs.insert("cleanup.policy".to_string(), "compact".to_string());
let result = admin.alter_topic_config("my-topic", configs).await?;
match result.error {
None => println!("Configuration updated"),
Some(e) => println!("Failed: {}", e),
}
Cluster Information
Describing the Cluster
let cluster = admin.describe_cluster().await?;
println!("Cluster info:");
println!(" Cluster ID: {}", cluster.cluster_id);
println!(" Controller: {}", cluster.controller_id);
println!(" Brokers:");
for broker in cluster.brokers {
println!(
" - {} at {}:{} (rack: {:?})",
broker.broker_id, broker.host, broker.port, broker.rack
);
}
Getting Partition Count
if let Some(count) = admin.partition_count("my-topic").await? {
println!("Topic has {} partitions", count);
} else {
println!("Topic not found");
}
Error Handling
use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;
use std::time::Duration;
async fn create_topic_if_not_exists(
admin: &AdminClient,
name: &str,
partitions: i32,
replication_factor: i16,
) -> Result<(), KrafkaError> {
// Check if topic exists
let topics = admin.list_topics().await?;
if topics.contains(&name.to_string()) {
println!("Topic {} already exists", name);
return Ok(());
}
// Create the topic
let topic = NewTopic::new(name, partitions, replication_factor);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
if let Some(error) = result.error {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
error,
));
}
}
println!("Created topic: {}", name);
Ok(())
}
Common Topic Configurations
| Configuration | Type | Default | Description |
|---|---|---|---|
| cleanup.policy | String | delete | delete or compact |
| compression.type | String | producer | Compression type |
| retention.ms | Long | 604800000 (7 days) | Message retention time (-1 = infinite) |
| retention.bytes | Long | -1 | Max partition size (-1 = infinite) |
| segment.bytes | Int | 1GB | Segment file size |
| min.insync.replicas | Int | 1 | Min ISR for writes with acks=all |
| max.message.bytes | Int | 1MB | Max message size |
| unclean.leader.election.enable | Bool | false | Allow unclean leader election |
Best Practices
Always Check Results
let results = admin.create_topics(topics, timeout).await?;
let mut success = true;
for result in results {
if let Some(error) = &result.error {
eprintln!("Failed to create {}: {}", result.name, error);
success = false;
}
}
if !success {
return Err(KrafkaError::invalid_state("Some topics failed to create"));
}
Use Appropriate Timeouts
use std::time::Duration;
// Simple metadata operations use the client's default timeout
admin.list_topics().await?;
// Allow more time for operations that may take a while to complete
// (new_topics: Vec<NewTopic>, topic_names: Vec<String>)
admin.create_topics(new_topics, Duration::from_secs(60)).await?;
admin.delete_topics(topic_names, Duration::from_secs(60)).await?;
Handle Topic Already Exists
let results = admin.create_topics(vec![topic], timeout).await?;
for result in results {
match &result.error {
None => println!("Created: {}", result.name),
Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
println!("Topic {} already exists (OK)", result.name);
}
Some(e) => {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
e.clone(),
));
}
}
}
ACL Management
The AdminClient supports Access Control List (ACL) management for Kafka security.
Using AclFilter (Recommended)
The AclFilter struct provides a cleaner API for ACL queries:
use krafka::admin::AclFilter;
use krafka::protocol::AclResourceType;
// Filter that matches all ACLs
let all_acls = admin.describe_acls_with_filter(AclFilter::all()).await?;
// Filter for a specific topic
let topic_acls = admin.describe_acls_with_filter(
AclFilter::for_resource(AclResourceType::Topic, "my-topic")
).await?;
// Filter for a specific principal
let user_acls = admin.describe_acls_with_filter(
AclFilter::for_principal("User:alice")
).await?;
// Builder pattern for complex filters
let filter = AclFilter::all()
.resource_type(AclResourceType::Group)
.resource_name("my-consumer-group")
.principal("User:bob");
let result = admin.describe_acls_with_filter(filter).await?;
Describe ACLs
Query existing ACLs matching a filter:
use krafka::protocol::{AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Find all ACLs for a specific topic
let result = admin.describe_acls(
AclResourceType::Topic,
Some("my-topic"),
AclPatternType::Literal,
None, // any principal
None, // any host
AclOperation::Any,
AclPermissionType::Any,
).await?;
if let Some(error) = result.error {
println!("Error: {}", error);
} else {
for binding in result.bindings {
println!("ACL: {:?} {} {:?} on {}",
binding.permission_type,
binding.principal,
binding.operation,
binding.resource_name);
}
}
Create ACLs
Create new access control entries:
use krafka::protocol::AclBinding;
// Create a simple read ACL
let read_acl = AclBinding::allow_read_topic("my-topic", "User:alice");
// Create a write ACL
let write_acl = AclBinding::allow_write_topic("my-topic", "User:bob");
// Create ACLs
let result = admin.create_acls(vec![read_acl, write_acl]).await?;
for (i, r) in result.results.iter().enumerate() {
match &r.error {
None => println!("ACL {} created successfully", i),
Some(e) => println!("ACL {} failed: {}", i, e),
}
}
Delete ACLs
Delete ACLs matching a filter:
use krafka::protocol::{AclBindingFilter, AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Delete all ACLs for a topic
let filter = AclBindingFilter {
resource_type: AclResourceType::Topic,
resource_name: Some("my-topic".to_string()),
pattern_type: AclPatternType::Literal,
principal: None,
host: None,
operation: AclOperation::Any,
permission_type: AclPermissionType::Any,
};
let result = admin.delete_acls(vec![filter]).await?;
for (i, fr) in result.filter_results.iter().enumerate() {
match &fr.error {
None => println!("Filter {} deleted {} ACLs", i, fr.deleted_count),
Some(e) => println!("Filter {} failed: {}", i, e),
}
}
Consumer Group Management
Describing Consumer Groups
Get detailed information about one or more consumer groups. The method detects each group’s type (classic or KIP-848 consumer protocol) and dispatches to the matching API: DescribeGroups (API key 15) for classic groups, ConsumerGroupDescribe (API key 69) for KIP-848 groups. Each request is routed to the group’s coordinator broker, which is located via FindCoordinator:
let descriptions = admin
.describe_consumer_groups(vec!["my-group".to_string(), "other-group".to_string()])
.await?;
for group in &descriptions {
println!("Group: {} (type: {}, state: {})", group.group_id, group.group_type, group.state);
if let Some(assignor) = &group.assignor {
println!(" Assignor: {}", assignor);
}
if let Some(epoch) = group.group_epoch {
println!(" Epoch: {}", epoch);
}
for member in &group.members {
println!(
" Member: {} (client: {}, host: {}, instance: {:?})",
member.member_id, member.client_id, member.client_host,
member.instance_id
);
}
if let Some(error) = &group.error {
println!(" Error: {}", error);
}
}
Note: Classic-protocol groups return protocol_type and assignor but no epoch or assignment details. KIP-848 groups return group_epoch, assignment_epoch, per-member subscriptions, and topic-UUID-based current/target assignments.
Listing Consumer Groups
List all consumer groups across the cluster:
let groups = admin.list_consumer_groups().await?;
println!("Consumer groups:");
for group in &groups {
println!(" {} (type: {:?}, protocol: {})", group.group_id, group.group_type, group.protocol_type);
}
Note: list_consumer_groups() queries all brokers in the cluster and deduplicates results, since consumer groups are managed by their respective group coordinators.
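The merge step described in the note can be sketched as follows. This is an illustration under assumptions, not Krafka's implementation: GroupListing, listing, and merge_listings are hypothetical, and keeping the first entry seen per group ID is just one possible deduplication policy.

```rust
use std::collections::BTreeMap;

/// Illustrative listing entry; not Krafka's actual type.
#[derive(Debug, Clone, PartialEq)]
struct GroupListing {
    group_id: String,
    protocol_type: String,
}

/// Merges per-broker listings, keeping the first entry seen for each group ID.
fn merge_listings(per_broker: Vec<Vec<GroupListing>>) -> Vec<GroupListing> {
    let mut by_id: BTreeMap<String, GroupListing> = BTreeMap::new();
    for listing in per_broker.into_iter().flatten() {
        by_id.entry(listing.group_id.clone()).or_insert(listing);
    }
    // BTreeMap iteration yields the groups sorted by ID.
    by_id.into_values().collect()
}

/// Convenience constructor for the example data.
fn listing(id: &str) -> GroupListing {
    GroupListing {
        group_id: id.to_string(),
        protocol_type: "consumer".to_string(),
    }
}

fn main() {
    let broker_0 = vec![listing("orders"), listing("billing")];
    let broker_1 = vec![listing("orders"), listing("audit")];
    for group in merge_listings(vec![broker_0, broker_1]) {
        println!("{} ({})", group.group_id, group.protocol_type);
    }
}
```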
Topic Partition Details
Describing Topic Partitions
Use describe_topic_partitions() for paginated, detailed partition information
including ELR (eligible leader replicas) from KIP-966:
let result = admin
.describe_topic_partitions(vec!["my-topic".to_string()])
.await?;
for topic in &result.topics {
println!(
"Topic: {} (internal: {}, id: {:?})",
topic.name.as_deref().unwrap_or("?"),
topic.is_internal,
topic.topic_id
);
for p in &topic.partitions {
println!(
" Partition {}: leader={}, epoch={}, replicas={:?}, isr={:?}",
p.partition_index, p.leader_id, p.leader_epoch,
p.replica_nodes, p.isr_nodes
);
if let Some(elr) = &p.eligible_leader_replicas {
println!(" ELR: {:?}", elr);
}
if let Some(last_elr) = &p.last_known_elr {
println!(" Last known ELR: {:?}", last_elr);
}
if !p.offline_replicas.is_empty() {
println!(" Offline: {:?}", p.offline_replicas);
}
}
}
Note: The DescribeTopicPartitions API (Key 75) is available on Kafka 4.0+. The client automatically handles pagination for topics with many partitions (default limit 2000 partitions per page) and collects all pages into a single result.
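The page-collection behaviour in the note can be pictured as a cursor loop. The sketch below is a simplified model and not Krafka's code: Cursor, Page, fetch_page, and collect_all are hypothetical, the cursor is reduced to a single topic's partition index, and fetch_page stands in for a real request.

```rust
/// Position at which the next page should start (simplified to one topic).
#[derive(Debug, Clone, Copy)]
struct Cursor {
    partition_index: i32,
}

/// One page of partition indexes plus an optional cursor for the next page.
struct Page {
    partitions: Vec<i32>,
    next_cursor: Option<Cursor>,
}

/// Stand-in for a single request against a topic with `total` partitions.
fn fetch_page(total: i32, limit: i32, cursor: Option<Cursor>) -> Page {
    let start = cursor.map_or(0, |c| c.partition_index);
    let end = (start + limit).min(total);
    let next_cursor = if end < total {
        Some(Cursor { partition_index: end })
    } else {
        None
    };
    Page { partitions: (start..end).collect(), next_cursor }
}

/// Follows cursors until none is returned, collecting every page.
fn collect_all(total: i32, limit: i32) -> Vec<i32> {
    assert!(limit > 0, "page limit must be positive");
    let mut all = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(total, limit, cursor);
        all.extend(page.partitions);
        match page.next_cursor {
            Some(next) => cursor = Some(next),
            None => break,
        }
    }
    all
}

fn main() {
    // 5000 partitions with a 2000-partition page limit takes three pages.
    let partitions = collect_all(5000, 2000);
    println!("collected {} partitions", partitions.len());
}
```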
Record Deletion
Deleting Records
Delete records from topic partitions before a specified offset. Records with offsets less than the specified offset are marked for deletion (this adjusts the log start offset). Requests are automatically routed to each partition’s leader broker:
use std::collections::HashMap;
use std::time::Duration;
let mut offsets = HashMap::new();
offsets.insert(("my-topic".to_string(), 0), 100i64); // Delete before offset 100
offsets.insert(("my-topic".to_string(), 1), 250i64); // Delete before offset 250
let results = admin
.delete_records(offsets, Duration::from_secs(30))
.await?;
for result in &results {
match &result.error {
None => println!(
"Deleted records from {}:{}, new low watermark: {}",
result.topic, result.partition, result.low_watermark
),
Some(e) => println!(
"Failed to delete from {}:{}: {}",
result.topic, result.partition, e
),
}
}
Note: Deleted records are not immediately removed from disk. The broker adjusts the log start offset, and records before that offset become inaccessible. Physical deletion happens during log segment cleanup.
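The effect on readers can be modelled with two offsets per partition. The sketch below is a toy model rather than broker or Krafka code: PartitionLog and its methods are hypothetical, and it assumes a request beyond the high watermark is rejected.

```rust
/// Minimal model of one partition's visible offset range (illustrative only).
#[derive(Debug)]
struct PartitionLog {
    log_start_offset: i64,
    high_watermark: i64,
}

impl PartitionLog {
    /// Advances the log start offset and returns the new low watermark.
    /// Assumption: offsets beyond the high watermark are rejected.
    fn delete_records_before(&mut self, offset: i64) -> Result<i64, String> {
        if offset > self.high_watermark {
            return Err(format!(
                "offset {} is beyond high watermark {}",
                offset, self.high_watermark
            ));
        }
        // The start offset never moves backwards.
        self.log_start_offset = self.log_start_offset.max(offset);
        Ok(self.log_start_offset)
    }

    /// A record is readable only at or after the log start offset.
    fn is_readable(&self, offset: i64) -> bool {
        offset >= self.log_start_offset && offset < self.high_watermark
    }
}

fn main() {
    let mut log = PartitionLog { log_start_offset: 0, high_watermark: 500 };
    println!("{:?}", log.delete_records_before(100));
    println!("offset 99 readable: {}", log.is_readable(99));
    println!("offset 100 readable: {}", log.is_readable(100));
}
```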
Leader Epoch Queries
OffsetForLeaderEpoch
Query the end offset for a given leader epoch. This is used to detect log truncation after leader changes. Requests are routed to each partition’s leader broker:
// Query the end offset for leader epoch 5 on partition 0 of "my-topic"
let results = admin
.offset_for_leader_epoch(vec![
("my-topic".to_string(), 0, 5),
("my-topic".to_string(), 1, 3),
])
.await?;
for result in &results {
match &result.error {
None => println!(
"{}:{} epoch={} end_offset={}",
result.topic, result.partition,
result.leader_epoch, result.end_offset
),
Some(e) => println!(
"{}:{} error: {}",
result.topic, result.partition, e
),
}
}
This API is useful for:
- Log truncation detection: After a leader change, check if the log was truncated
- Consumer offset validation: Ensure a consumer’s saved offset is still valid
- Replication diagnostics: Verify epoch boundaries across replicas
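As a sketch of the offset validation use case, a saved position can be compared with the end offset returned for the epoch under which it was read. PositionCheck and check_position are hypothetical names, and treating a negative end offset as "epoch unknown to the leader" is an assumption based on the Kafka protocol convention.

```rust
/// Outcome of checking a saved position against the leader's answer.
#[derive(Debug, PartialEq)]
enum PositionCheck {
    /// The saved offset still lies within the epoch's range.
    Valid,
    /// The log was truncated; reading should resume from this offset.
    Truncated { resume_from: i64 },
    /// The leader reported no end offset for the requested epoch.
    UnknownEpoch,
}

/// Compares a saved offset with the end offset reported for its leader epoch.
fn check_position(saved_offset: i64, epoch_end_offset: i64) -> PositionCheck {
    if epoch_end_offset < 0 {
        PositionCheck::UnknownEpoch
    } else if epoch_end_offset < saved_offset {
        PositionCheck::Truncated { resume_from: epoch_end_offset }
    } else {
        PositionCheck::Valid
    }
}

fn main() {
    // Saved offset 120, but the epoch ended at 100: records 100..120 were lost.
    println!("{:?}", check_position(120, 100));
    // Saved offset 80 is still covered by the epoch.
    println!("{:?}", check_position(80, 100));
}
```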
Delegation Tokens
Delegation tokens (KIP-48) allow a principal to delegate authentication to another principal without sharing credentials. The token HMAC can be used for SASL/SCRAM authentication.
Creating a Token
use std::time::Duration;
// Create a token that "alice" can renew, with a 24-hour lifetime
let result = admin
.create_delegation_token(
&[("User", "alice")],
Some(Duration::from_secs(86_400)),
)
.await?;
match result.token {
Some(token) => println!("Created token: {} (HMAC {} bytes)", token.token_id, token.hmac.len()),
None => println!("Error: {}", result.error.unwrap()),
}
Pass an empty renewers slice to allow only the token owner to renew.
Use None for max_lifetime to accept the server default (typically 7 days).
Describing Tokens
// Describe all tokens visible to the caller
let tokens = admin.describe_delegation_tokens(None).await?;
for token in &tokens {
println!(
"Token {} owned by {}:{}, expires at {}, {} renewer(s)",
token.token_id,
token.principal_type,
token.principal_name,
token.expiry_timestamp_ms,
token.renewers.len(),
);
}
// Describe tokens for a specific owner
let tokens = admin
.describe_delegation_tokens(Some(&[("User", "alice")]))
.await?;
Renewing a Token
use std::time::Duration;
// Obtain a token (e.g., from a prior create call)
let result = admin
.create_delegation_token(&[("User", "alice")], Some(Duration::from_secs(86_400)))
.await?;
let token = result.token.expect("token created");
// Extend the token's lifetime by 1 hour
let result = admin
.renew_delegation_token(&token.hmac, Duration::from_secs(3_600))
.await?;
match result.error {
None => println!("New expiry: {}", result.expiry_timestamp_ms),
Some(e) => println!("Renew failed: {}", e),
}
Expiring a Token
use std::time::Duration;
// Obtain a token (e.g., from describe)
let tokens = admin.describe_delegation_tokens(None).await?;
let token = &tokens[0];
// Expire a token immediately
let result = admin.expire_delegation_token(&token.hmac, None).await?;
// Expire a token after a grace period
let result = admin
.expire_delegation_token(&token.hmac, Some(Duration::from_secs(60)))
.await?;
Protocol Versions
| API | Versions | Changes |
|---|---|---|
| CreateDelegationToken | v1–v3 | v1 baseline (v0 removed in Kafka 4.0), v2 flexible encoding, v3 owner principal override |
| RenewDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| ExpireDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| DescribeDelegationToken | v1–v3 | v1 baseline, v2 flexible encoding, v3 token requester fields |
Client Quotas
Client quotas control the resource usage of clients (producer/consumer byte
rates, request percentages, etc.). Use describe_client_quotas to query
current quotas and alter_client_quotas to change them.
Describing Quotas
// Describe all quotas for user "alice" (match_type 0 = exact match)
let result = admin
.describe_client_quotas(&[("user", 0, Some("alice"))], false)
.await?;
for entry in &result.entries {
let entity: Vec<_> = entry.entity.iter().map(|e| {
format!("{}={}", e.entity_type, e.entity_name.as_deref().unwrap_or("<default>"))
}).collect();
println!("Entity: {}", entity.join(", "));
for v in &entry.values {
println!(" {} = {}", v.key, v.value);
}
}
Filter match types:
- 0 (exact): match the entity with the given name
- 1 (default): match the default entity for this type
- 2 (any specified): match any entity with a name (non-default)
When strict is true, only entities that exactly match all given component
types are returned (entities with additional unspecified types are excluded).
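The match types and the strict flag can be illustrated with a small in-memory matcher. This is a sketch of the semantics described above, not how the broker or Krafka evaluates filters: Entity, Match, component_matches, and entity_matches are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Entity as a map from entity type to name; `None` stands for the default entity.
type Entity<'a> = BTreeMap<&'a str, Option<&'a str>>;

/// The three filter match types (wire values 0, 1, and 2).
enum Match<'a> {
    Exact(&'a str),
    Default,
    AnySpecified,
}

/// Checks one filter component against the entity's name for that type.
fn component_matches(filter: &Match, name: Option<&str>) -> bool {
    match (filter, name) {
        (Match::Exact(wanted), Some(actual)) => *wanted == actual,
        (Match::Default, None) => true,
        (Match::AnySpecified, Some(_)) => true,
        _ => false,
    }
}

/// Every filter component must match; in strict mode the entity must not
/// carry any additional entity types.
fn entity_matches(entity: &Entity, filter: &[(&str, Match)], strict: bool) -> bool {
    let components_match = filter.iter().all(|(entity_type, m)| {
        match entity.get(*entity_type) {
            Some(name) => component_matches(m, *name),
            None => false,
        }
    });
    // Assumes each entity type appears at most once in the filter.
    components_match && (!strict || entity.len() == filter.len())
}

fn main() {
    let mut entity: Entity = BTreeMap::new();
    entity.insert("user", Some("alice"));
    entity.insert("client-id", Some("reporting"));

    let by_user = [("user", Match::Exact("alice"))];
    println!("non-strict: {}", entity_matches(&entity, &by_user, false));
    println!("strict: {}", entity_matches(&entity, &by_user, true));

    let default_user = [("user", Match::Default)];
    println!("default user: {}", entity_matches(&entity, &default_user, false));

    let any_client = [("client-id", Match::AnySpecified)];
    println!("any client-id: {}", entity_matches(&entity, &any_client, false));
}
```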
Altering Quotas
use krafka::admin::QuotaAlteration;
// Set producer byte rate for user "alice"
let results = admin
.alter_client_quotas(
&[QuotaAlteration {
entity: vec![("user", Some("alice"))],
ops: vec![
("producer_byte_rate", Some(1_048_576.0)), // set to 1 MiB/s
("consumer_byte_rate", None), // remove quota
],
}],
false,
)
.await?;
for result in &results {
match &result.error {
None => println!("Quota altered successfully"),
Some(e) => println!("Error: {}", e),
}
}
// Dry-run validation (validate_only = true)
let results = admin
.alter_client_quotas(
&[QuotaAlteration {
entity: vec![("user", Some("alice"))],
ops: vec![("producer_byte_rate", Some(1_048_576.0))],
}],
true,
)
.await?;
Feature Versioning (KIP-584)
Kafka 2.7+ supports cluster-wide feature flags that control the finalized
version range for features like metadata.version. Use describe_features to
discover what the cluster supports and update_features to upgrade, downgrade,
or delete finalized feature levels.
Describing Features
let features = admin.describe_features().await?;
println!("Epoch: {}", features.finalized_features_epoch);
for f in &features.supported_features {
println!("supported: {} [{}, {}]", f.name, f.min_version, f.max_version);
}
for f in &features.finalized_features {
println!("finalized: {} [{}, {}]", f.name, f.min_version_level, f.max_version_level);
}
Updating Features
use krafka::protocol::messages::FeatureUpdateKey;
// Upgrade metadata.version to level 17
let results = admin
.update_features(
vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
false, // validate_only
)
.await?;
for r in &results.results {
match &r.error {
None => println!("{}: ok", r.feature),
Some(e) => println!("{}: {}", r.feature, e),
}
}
// Dry-run validation (validate_only = true, requires v1+)
let results = admin
.update_features(
vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
true,
)
.await?;
Upgrade types:
- FeatureUpdateKey::upgrade(name, level): raise to a higher level
- FeatureUpdateKey::safe_downgrade(name, level): lower the level safely
- FeatureUpdateKey::unsafe_downgrade(name, level): forceful downgrade (may lose data)
- FeatureUpdateKey::delete(name): remove the finalized feature entirely
When the broker supports UpdateFeatures v1+, the request uses the typed
UpgradeType field. On older v0 brokers, the client falls back to the boolean
AllowDowngrade flag.
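The v0 fallback can be pictured as collapsing the typed field into a boolean. The sketch below is an assumption-laden illustration, not Krafka's encoder: UpgradeType, Encoded, and encode are hypothetical, the wire values 1 to 3 follow the Kafka protocol definition as far as known here, and mapping both downgrade kinds to AllowDowngrade = true is assumed.

```rust
/// Typed upgrade kinds with their assumed wire values for UpdateFeatures v1+.
#[derive(Debug, Clone, Copy, PartialEq)]
enum UpgradeType {
    Upgrade = 1,
    SafeDowngrade = 2,
    UnsafeDowngrade = 3,
}

/// What ends up in the request, depending on the negotiated version.
#[derive(Debug, PartialEq)]
enum Encoded {
    V0 { allow_downgrade: bool },
    V1 { upgrade_type: i8 },
}

/// Chooses the field to send based on the negotiated API version.
fn encode(upgrade_type: UpgradeType, api_version: i16) -> Encoded {
    if api_version >= 1 {
        Encoded::V1 { upgrade_type: upgrade_type as i8 }
    } else {
        // v0 cannot distinguish safe from unsafe downgrades (assumed mapping).
        Encoded::V0 { allow_downgrade: upgrade_type != UpgradeType::Upgrade }
    }
}

fn main() {
    println!("{:?}", encode(UpgradeType::Upgrade, 0));
    println!("{:?}", encode(UpgradeType::SafeDowngrade, 0));
    println!("{:?}", encode(UpgradeType::UnsafeDowngrade, 1));
}
```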
Log Directory Inspection
describe_log_dirs() queries every broker and returns per-directory information
including partition sizes, offset lag, future-replica status, and (v4+) volume
capacity.
Describe All Log Directories
let dirs = admin.describe_log_dirs(None).await?;
for dir in &dirs {
println!("broker {} — {} (total: {}, usable: {})",
dir.broker_id, dir.log_dir, dir.total_bytes, dir.usable_bytes);
if let Some(err) = &dir.error {
eprintln!(" error: {err}");
}
for topic in &dir.topics {
for p in &topic.partitions {
println!(" {}-{}: {} bytes, lag {}{}",
topic.name, p.partition_index, p.partition_size,
p.offset_lag, if p.is_future_key { " (future)" } else { "" });
}
}
}
Describe Specific Topics
use krafka::protocol::DescribableLogDirTopic;
let filter = vec![DescribableLogDirTopic {
topic: "my-topic".into(),
partitions: vec![0, 1, 2],
}];
let dirs = admin.describe_log_dirs(Some(filter)).await?;
Result Fields
| Field | Type | Description |
|---|---|---|
| broker_id | i32 | Broker that owns the directory |
| log_dir | String | Absolute path on the broker |
| error | Option<String> | Per-directory error (e.g., KAFKA_STORAGE_ERROR) |
| total_bytes | i64 | Volume total bytes (-1 if unknown, v4+) |
| usable_bytes | i64 | Volume free bytes (-1 if unknown, v4+) |
| topics[].partitions[].partition_size | i64 | Log size in bytes |
| topics[].partitions[].offset_lag | i64 | Lag behind high watermark |
| topics[].partitions[].is_future_key | bool | Future replica (reassignment) |
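Because total_bytes and usable_bytes are -1 when the broker does not report them, any disk-usage calculation should treat that case explicitly. The helper below is a sketch; used_fraction is a hypothetical name and takes the two fields as plain integers.

```rust
/// Returns the used fraction of a volume (0.0 to 1.0), or `None` when the
/// capacity is unknown (-1) or the two values are inconsistent.
fn used_fraction(total_bytes: i64, usable_bytes: i64) -> Option<f64> {
    if total_bytes <= 0 || usable_bytes < 0 || usable_bytes > total_bytes {
        return None;
    }
    Some((total_bytes - usable_bytes) as f64 / total_bytes as f64)
}

fn main() {
    match used_fraction(1_000, 250) {
        Some(fraction) => println!("volume is {:.0}% full", fraction * 100.0),
        None => println!("capacity unknown"),
    }
    // Brokers older than v4 report -1 for both fields.
    println!("{:?}", used_fraction(-1, -1));
}
```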
Protocol Versions
| Version | Changes |
|---|---|
| v1 | Baseline (v0 removed in Kafka 4.0) |
| v2 | Flexible encoding (compact strings + tagged fields) |
| v3 | Top-level ErrorCode in response |
| v4 | TotalBytes + UsableBytes per log directory |
Leader Election
elect_leaders() triggers a leader election for the specified partitions.
Supports preferred election (elect the preferred replica) and unclean election
(elect the first live replica even without in-sync replicas).
Preferred Election for All Partitions
use krafka::protocol::ElectionType;
use std::time::Duration;
let results = admin
.elect_leaders(ElectionType::Preferred, None, Duration::from_secs(60))
.await?;
for topic in &results {
for p in &topic.partitions {
if let Some(err) = &p.error {
eprintln!("{}-{}: {err}", topic.topic, p.partition_id);
}
}
}
Unclean Election for Specific Partitions
use krafka::protocol::{ElectionType, ElectLeadersTopicPartitions};
use std::time::Duration;
let results = admin
.elect_leaders(
ElectionType::Unclean,
Some(vec![ElectLeadersTopicPartitions {
topic: "my-topic".into(),
partitions: vec![0, 1],
}]),
Duration::from_secs(60),
)
.await?;
Protocol Versions
| Version | Changes |
|---|---|
| v0 | Baseline (preferred election only) |
| v1 | Adds ElectionType for preferred/unclean (KIP-460); top-level error code |
| v2 | Flexible encoding (compact strings + tagged fields) |
Partition Reassignment
alter_partition_reassignments() initiates or cancels partition reassignments.
list_partition_reassignments() lists all ongoing reassignments.
Warning: Reassigning partitions moves data between brokers and can significantly impact cluster load.
Start a Reassignment
use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;
let result = admin.alter_partition_reassignments(
vec![ReassignableTopic {
name: "my-topic".into(),
partitions: vec![ReassignablePartition {
partition_index: 0,
replicas: Some(vec![1, 2, 3]),
}],
}],
Duration::from_secs(60),
).await?;
if let Some(err) = &result.error {
eprintln!("Top-level error: {err}");
}
for topic in &result.topics {
for p in &topic.partitions {
if let Some(err) = &p.error {
eprintln!("{}-{}: {err}", topic.name, p.partition_index);
}
}
}
Cancel a Pending Reassignment
use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;
// Set replicas to None to cancel
let result = admin.alter_partition_reassignments(
vec![ReassignableTopic {
name: "my-topic".into(),
partitions: vec![ReassignablePartition {
partition_index: 0,
replicas: None, // cancel pending reassignment
}],
}],
Duration::from_secs(60),
).await?;
List Ongoing Reassignments
use std::time::Duration;
let reassignments = admin
.list_partition_reassignments(None, Duration::from_secs(60))
.await?;
for topic in &reassignments {
for p in &topic.partitions {
println!("{} p{}: replicas={:?} adding={:?} removing={:?}",
topic.name, p.partition_index, p.replicas,
p.adding_replicas, p.removing_replicas);
}
}
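The adding and removing sets shown in the listing can be understood as the difference between the current and target replica lists. The sketch below assumes those fields carry Kafka's usual meaning; reassignment_delta is a hypothetical helper, not a Krafka function.

```rust
/// Computes which replicas a reassignment adds and which it removes.
/// Returns `(adding, removing)`, each in the order of its source list.
fn reassignment_delta(current: &[i32], target: &[i32]) -> (Vec<i32>, Vec<i32>) {
    let adding = target
        .iter()
        .copied()
        .filter(|replica| !current.contains(replica))
        .collect();
    let removing = current
        .iter()
        .copied()
        .filter(|replica| !target.contains(replica))
        .collect();
    (adding, removing)
}

fn main() {
    // Moving a partition from brokers [1, 2, 3] to brokers [2, 3, 4].
    let (adding, removing) = reassignment_delta(&[1, 2, 3], &[2, 3, 4]);
    println!("adding={:?} removing={:?}", adding, removing);
}
```

Only the brokers in the adding set need to copy data, which gives a rough sense of how much load a reassignment will create.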
AlterPartitionReassignments Protocol Versions
| Version | Changes |
|---|---|
| v0 | Baseline (flexible encoding from the start) |
ListPartitionReassignments Protocol Versions
| Version | Changes |
|---|---|
| v0 | Baseline (flexible encoding from the start) |
SCRAM Credential Management
Manage SASL/SCRAM credentials (KIP-554) for users.
Describe SCRAM Credentials
// Describe all users
let result = admin.describe_user_scram_credentials(None).await?;
for user in &result.users {
println!("{}: {:?}", user.name, user.credential_infos);
}
// Describe specific users
let result = admin
.describe_user_scram_credentials(Some(vec!["alice".into(), "bob".into()]))
.await?;
Alter SCRAM Credentials
use krafka::protocol::{ScramCredentialDeletion, ScramCredentialUpsertion};
use krafka::auth::ScramMechanism;
use zeroize::Zeroizing;
let results = admin.alter_user_scram_credentials(
vec![ScramCredentialDeletion {
name: "alice".into(),
mechanism: ScramMechanism::Sha512,
}],
vec![ScramCredentialUpsertion {
name: "bob".into(),
mechanism: ScramMechanism::Sha256,
iterations: 8192,
salt: Zeroizing::new(vec![1, 2, 3]),
salted_password: Zeroizing::new(vec![4, 5, 6]),
}],
).await?;
SCRAM Credential Protocol Versions
| Version | Changes |
|---|---|
| DescribeUserScramCredentials v0 | Baseline (KIP-554, flexible from v0) |
| AlterUserScramCredentials v0 | Baseline (KIP-554, flexible from v0) |
Log Directory Management
Move Replicas Between Log Directories
use krafka::protocol::{AlterReplicaLogDir, AlterReplicaLogDirTopic};
let results = admin.alter_replica_log_dirs(vec![
AlterReplicaLogDir {
path: "/data/kafka-logs-2".into(),
topics: vec![AlterReplicaLogDirTopic {
name: "my-topic".into(),
partitions: vec![0, 1],
}],
},
]).await?;
AlterReplicaLogDirs Protocol Versions
| Version | Changes |
|---|---|
| v1 | Baseline (non-flexible encoding) |
| v2 | Flexible encoding |
Offset Management
Delete Consumer Group Offsets
let result = admin.delete_offsets(
"my-group",
&[("my-topic", &[0, 1, 2])],
).await?;
if let Some(err) = &result.error {
eprintln!("Top-level error: {err}");
}
OffsetDelete Protocol Versions
| Version | Changes |
|---|---|
| v0 | Baseline (non-flexible encoding) |
Transaction Debugging
Describe Producers
Inspect active producers on partitions (useful for debugging stuck transactions).
let results = admin
.describe_producers(&[("my-topic", &[0, 1])])
.await?;
for topic in &results {
for p in &topic.partitions {
for pr in &p.active_producers {
println!("p{}: producer_id={} epoch={} txn_offset={}",
p.partition_index, pr.producer_id,
pr.producer_epoch, pr.current_txn_start_offset);
}
}
}
Describe Transactions
let results = admin
.describe_transactions(&["txn-1", "txn-2"])
.await?;
for txn in &results {
println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}
List Transactions
// List all ongoing transactions
let result = admin.list_transactions(&["Ongoing"], &[], -1).await?;
for txn in &result.transactions {
println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}
Transaction Debug Protocol Versions
| Version | Changes |
|---|---|
| DescribeProducers v0 | Baseline (KIP-664, flexible from v0) |
| DescribeTransactions v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions v1 | Adds DurationFilter (KIP-994) |
Client Metrics Resources
List Client Metrics Subscriptions
let names = admin.list_client_metrics_resources().await?;
for name in &names {
println!("subscription: {name}");
}
ListClientMetricsResources Protocol Versions
| Version | Changes |
|---|---|
| v0 | Baseline (KIP-714, flexible from v0) |
Transaction Markers (WriteTxnMarkers)
Write Transaction Markers
Write COMMIT or ABORT markers for transactions. Primarily useful for aborting
stuck (hanging) transactions via the abort_transaction convenience method.
// Abort a stuck transaction
admin.abort_transaction("my-transactional-id").await?;
For low-level control, use write_txn_markers directly:
use krafka::protocol::{WritableTxnMarker, WritableTxnMarkerTopic};
let results = admin
.write_txn_markers(&[WritableTxnMarker {
producer_id: 42,
producer_epoch: 5,
transaction_result: false, // ABORT
topics: vec![WritableTxnMarkerTopic {
name: "my-topic".into(),
partition_indexes: vec![0, 1],
}],
coordinator_epoch: 10,
}])
.await?;
WriteTxnMarkers Protocol Versions
| Version | Changes |
|---|---|
| WriteTxnMarkers v1 | Baseline (flexible encoding, v0 removed in Kafka 4.0) |
| WriteTxnMarkers v2 | Adds TransactionVersion field (KIP-1228) |
KRaft Quorum (DescribeQuorum)
Describe Quorum
Inspect the KRaft quorum for cluster metadata partitions. Returns voter and observer replicas, leader info, and high watermark.
let result = admin
.describe_quorum(&[("__cluster_metadata", &[0])])
.await?;
for topic in &result.topics {
for partition in &topic.partitions {
println!(
"partition {} leader={} epoch={} hw={}",
partition.partition_index,
partition.leader_id,
partition.leader_epoch,
partition.high_watermark
);
for voter in &partition.current_voters {
println!(" voter {} log_end_offset={}", voter.replica_id, voter.log_end_offset);
}
}
}
DescribeQuorum Protocol Versions
| Version | Changes |
|---|---|
| DescribeQuorum v0 | Baseline (KIP-595, flexible from v0) |
| DescribeQuorum v1 | Adds LastFetchTimestamp + LastCaughtUpTimestamp (KIP-836) |
| DescribeQuorum v2 | Adds Nodes, ErrorMessage, ReplicaDirectoryId (KIP-853) |
Next Steps
- Interceptors Guide - Producer and consumer interceptor hooks
- Configuration Reference - All admin client options
- Architecture Overview - How admin client works internally