Admin Client Guide
This guide covers administrative operations using the Krafka AdminClient.
Overview
The AdminClient provides cluster administration capabilities:
- Topic management (create, delete, describe, list)
- Consumer group management (describe, list)
- Record deletion (delete records before an offset)
- Leader epoch queries (detect log truncation)
- Cluster information
- Partition management
- ACL management
Basic Usage
use krafka::admin::AdminClient;
use krafka::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// List all topics
let topics = admin.list_topics().await?;
println!("Topics: {:?}", topics);
Ok(())
}
Authentication
The AdminClient supports all SASL authentication mechanisms:
SASL/PLAIN
use krafka::admin::AdminClient;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_plain("username", "password")
.build()
.await?;
SASL/SCRAM-SHA-256
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha256("username", "password")
.build()
.await?;
SASL/SCRAM-SHA-512
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha512("username", "password")
.build()
.await?;
Generic AuthConfig
For AWS MSK IAM or advanced configurations:
use krafka::admin::AdminClient;
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
.bootstrap_servers("msk-broker:9098")
.auth(auth)
.build()
.await?;
Topic Management
Creating Topics
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// Simple topic creation
let topic = NewTopic::new("my-topic", 6, 3);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
match result.error {
None => println!("Created: {}", result.name),
Some(e) => println!("Failed to create {}: {}", result.name, e),
}
}
Creating Topics with Configuration
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let topic = NewTopic::new("compacted-topic", 12, 3)
.with_config("cleanup.policy", "compact")
.with_config("min.insync.replicas", "2")
.with_config("retention.ms", "604800000"); // 7 days
admin.create_topics(vec![topic], Duration::from_secs(30)).await?;
Deleting Topics
use std::time::Duration;
let results = admin
.delete_topics(
vec!["topic-to-delete".to_string()],
Duration::from_secs(30),
)
.await?;
for result in results {
match result.error {
None => println!("Deleted: {}", result.name),
Some(e) => println!("Failed to delete {}: {}", result.name, e),
}
}
Listing Topics
let topics = admin.list_topics().await?;
println!("Topics in cluster:");
for topic in topics {
println!(" - {}", topic);
}
Describing Topics
let descriptions = admin
.describe_topics(&["topic1".to_string(), "topic2".to_string()])
.await?;
for topic in descriptions {
println!("Topic: {}", topic.name);
println!(" Partitions: {}", topic.partitions.len());
for partition in &topic.partitions {
println!(
" Partition {}: leader={}, replicas={:?}, isr={:?}",
partition.partition,
partition.leader,
partition.replicas,
partition.isr
);
}
}
Increasing Partition Count
You can increase the number of partitions of an existing topic, but Kafka does not allow decreasing it. The count you pass is the new total, not the number of partitions to add:
use std::time::Duration;
let result = admin
.create_partitions("my-topic", 12, Duration::from_secs(30))
.await?;
match result.error {
None => println!("Partitions increased to 12"),
Some(e) => println!("Failed: {}", e),
}
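Because the partition count can only grow, it can be useful to validate the requested total locally before sending the request. The following is a minimal sketch and not part of Krafka: the function name validate_partition_increase and its error messages are hypothetical, and it assumes the count passed to create_partitions is the new total.

```rust
/// Hypothetical helper (not part of Krafka): checks that `requested_total`
/// is a valid new partition count for a topic that currently has `current`
/// partitions, and returns how many partitions would be added.
fn validate_partition_increase(current: i32, requested_total: i32) -> Result<i32, String> {
    if current < 1 {
        return Err(format!(
            "current partition count must be positive, got {}",
            current
        ));
    }
    if requested_total <= current {
        // Kafka cannot shrink a topic, and an unchanged count adds nothing.
        return Err(format!(
            "requested total {} must be greater than current count {}",
            requested_total, current
        ));
    }
    Ok(requested_total - current)
}
```

The current count could come from a call such as partition_count, so that an invalid request is rejected before it reaches the broker.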
Configuration Management
Describing Topic Configuration
let configs = admin.describe_topic_config("my-topic").await?;
println!("Topic configuration:");
for config in configs {
let value = config.value.as_deref().unwrap_or("(null)");
let flags = format!(
"{}{}{}",
if config.read_only { "R" } else { "" },
if config.is_default { "D" } else { "" },
if config.is_sensitive { "S" } else { "" }
);
println!(" {}: {} [{}]", config.name, value, flags);
}
Describing Broker Configuration
let configs = admin.describe_broker_config(0).await?;
println!("Broker 0 configuration:");
for config in configs.iter().filter(|c| !c.is_default) {
println!(" {}: {:?}", config.name, config.value);
}
Altering Topic Configuration
use std::collections::HashMap;
let mut configs = HashMap::new();
configs.insert("retention.ms".to_string(), "86400000".to_string()); // 1 day
configs.insert("cleanup.policy".to_string(), "compact".to_string());
let result = admin.alter_topic_config("my-topic", configs).await?;
match result.error {
None => println!("Configuration updated"),
Some(e) => println!("Failed: {}", e),
}
Cluster Information
Describing the Cluster
let cluster = admin.describe_cluster().await?;
println!("Cluster info:");
if let Some(controller) = cluster.controller_id {
println!(" Controller: {}", controller);
}
println!(" Brokers:");
for broker in cluster.brokers {
println!(
" - {} at {}:{} (rack: {:?})",
broker.id, broker.host, broker.port, broker.rack
);
}
Getting Partition Count
if let Some(count) = admin.partition_count("my-topic").await? {
println!("Topic has {} partitions", count);
} else {
println!("Topic not found");
}
Error Handling
use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;
use std::time::Duration;
async fn create_topic_if_not_exists(
admin: &AdminClient,
name: &str,
partitions: i32,
replication_factor: i16,
) -> Result<(), KrafkaError> {
// Check if topic exists
let topics = admin.list_topics().await?;
if topics.iter().any(|t| t == name) {
println!("Topic {} already exists", name);
return Ok(());
}
// Create the topic
let topic = NewTopic::new(name, partitions, replication_factor);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
if let Some(error) = result.error {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
error,
));
}
}
println!("Created topic: {}", name);
Ok(())
}
Common Topic Configurations
| Configuration | Type | Default | Description |
|---|---|---|---|
| cleanup.policy | String | delete | delete or compact |
| compression.type | String | producer | Compression type |
| retention.ms | Long | 604800000 | Message retention time in ms (7 days by default; -1 = infinite) |
| retention.bytes | Long | -1 | Max partition size (-1 = infinite) |
| segment.bytes | Int | 1GB | Segment file size |
| min.insync.replicas | Int | 1 | Min ISR for writes with acks=all |
| max.message.bytes | Int | 1MB | Max message size |
| unclean.leader.election.enable | Bool | false | Allow unclean leader election |
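Several of these settings take raw millisecond or byte counts as strings, which are easy to mistype. The sketch below shows one way to derive them from readable units. Both helper names are hypothetical and not part of Krafka; the only assumption about the client is that config values are passed as strings, as in the with_config examples earlier.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of Krafka): renders a retention period as
/// the millisecond string expected by `retention.ms`. `None` maps to "-1",
/// which Kafka interprets as unlimited retention.
fn retention_ms_value(retention: Option<Duration>) -> String {
    match retention {
        Some(d) => d.as_millis().to_string(),
        None => "-1".to_string(),
    }
}

/// Hypothetical helper: a size in mebibytes rendered as a byte-count
/// string, for settings such as `segment.bytes` or `retention.bytes`.
fn mebibytes_value(mib: u64) -> String {
    (mib * 1024 * 1024).to_string()
}
```

For example, a seven-day Duration yields "604800000", the same value used for retention.ms in the topic creation example.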
Best Practices
Always Check Results
let results = admin.create_topics(topics, timeout).await?;
let mut success = true;
for result in results {
if let Some(error) = &result.error {
eprintln!("Failed to create {}: {}", result.name, error);
success = false;
}
}
if !success {
return Err(KrafkaError::invalid_state("Some topics failed to create"));
}
Use Appropriate Timeouts
use std::time::Duration;
// Metadata lookups such as list_topics() take no timeout argument
admin.list_topics().await?; // Uses default timeout
// Longer timeout for operations that may take time
// (new_topics: Vec<NewTopic>, topic_names: Vec<String>)
admin.create_topics(new_topics, Duration::from_secs(60)).await?;
admin.delete_topics(topic_names, Duration::from_secs(60)).await?;
Handle Topic Already Exists
let results = admin.create_topics(vec![topic], timeout).await?;
for result in results {
match &result.error {
None => println!("Created: {}", result.name),
Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
println!("Topic {} already exists (OK)", result.name);
}
Some(e) => {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
e.clone(),
));
}
}
}
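Matching on an error substring is fragile, so it may help to isolate that check in one place and test it separately. The sketch below is illustrative only: CreateOutcome, classify_create, and is_acceptable are hypothetical names, and it assumes the per-topic error is available as a string, as the example above suggests.

```rust
/// Hypothetical classification of one topic-creation result.
#[derive(Debug, PartialEq)]
enum CreateOutcome {
    Created,
    AlreadyExisted,
    Failed(String),
}

/// Maps the optional error text of a result to an outcome. The substring
/// check mirrors the one used in the guide's example.
fn classify_create(error: Option<&str>) -> CreateOutcome {
    match error {
        None => CreateOutcome::Created,
        Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => CreateOutcome::AlreadyExisted,
        Some(e) => CreateOutcome::Failed(e.to_string()),
    }
}

/// True when the topic exists after the call, whether or not this call
/// was the one that created it.
fn is_acceptable(outcome: &CreateOutcome) -> bool {
    matches!(outcome, CreateOutcome::Created | CreateOutcome::AlreadyExisted)
}
```

Keeping the string match in a single function means only that function needs to change if the client later exposes a typed error code.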
ACL Management
The AdminClient supports Access Control List (ACL) management for Kafka security.
Using AclFilter (Recommended)
The AclFilter struct provides a more readable alternative to passing every filter field positionally to describe_acls:
use krafka::admin::AclFilter;
use krafka::protocol::AclResourceType;
// Filter that matches all ACLs
let all_acls = admin.describe_acls_with_filter(AclFilter::all()).await?;
// Filter for a specific topic
let topic_acls = admin.describe_acls_with_filter(
AclFilter::for_resource(AclResourceType::Topic, "my-topic")
).await?;
// Filter for a specific principal
let user_acls = admin.describe_acls_with_filter(
AclFilter::for_principal("User:alice")
).await?;
// Builder pattern for complex filters
let filter = AclFilter::all()
.resource_type(AclResourceType::Group)
.resource_name("my-consumer-group")
.principal("User:bob");
let result = admin.describe_acls_with_filter(filter).await?;
Describe ACLs
Query existing ACLs matching a filter:
use krafka::protocol::{AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Find all ACLs for a specific topic
let result = admin.describe_acls(
AclResourceType::Topic,
Some("my-topic"),
AclPatternType::Literal,
None, // any principal
None, // any host
AclOperation::Any,
AclPermissionType::Any,
).await?;
if let Some(error) = result.error {
println!("Error: {}", error);
} else {
for binding in result.bindings {
println!("ACL: {:?} {} {:?} on {}",
binding.permission_type,
binding.principal,
binding.operation,
binding.resource_name);
}
}
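In the call above, None stands for "any principal" and "any host". The sketch below illustrates that wildcard convention with simplified stand-in types. SimpleBinding and SimpleFilter are hypothetical and are not Krafka's AclBinding or AclBindingFilter. Resource type, pattern type, operation, and permission type are left out to keep the example short.

```rust
/// Simplified stand-in for an ACL entry; not Krafka's AclBinding.
#[derive(Debug, Clone)]
struct SimpleBinding {
    resource_name: String,
    principal: String,
    host: String,
}

/// Simplified stand-in for a filter: `None` means "match any value".
#[derive(Debug, Clone, Default)]
struct SimpleFilter {
    resource_name: Option<String>,
    principal: Option<String>,
    host: Option<String>,
}

/// A field matches when the filter leaves it unset or names the same value.
fn field_matches(wanted: &Option<String>, actual: &str) -> bool {
    wanted.as_deref().map_or(true, |w| w == actual)
}

/// A binding matches only if every field of the filter matches.
fn filter_matches(filter: &SimpleFilter, binding: &SimpleBinding) -> bool {
    field_matches(&filter.resource_name, &binding.resource_name)
        && field_matches(&filter.principal, &binding.principal)
        && field_matches(&filter.host, &binding.host)
}
```

A filter with every field unset matches all bindings, which is why an unrestricted filter should be used with care when deleting ACLs.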
Create ACLs
Create new access control entries:
use krafka::protocol::AclBinding;
// Create a simple read ACL
let read_acl = AclBinding::allow_read_topic("my-topic", "User:alice");
// Create a write ACL
let write_acl = AclBinding::allow_write_topic("my-topic", "User:bob");
// Create ACLs
let result = admin.create_acls(vec![read_acl, write_acl]).await?;
for (i, r) in result.results.iter().enumerate() {
match &r.error {
None => println!("ACL {} created successfully", i),
Some(e) => println!("ACL {} failed: {}", i, e),
}
}
Delete ACLs
Delete ACLs matching a filter:
use krafka::protocol::{AclBindingFilter, AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Delete all ACLs for a topic
let filter = AclBindingFilter {
resource_type: AclResourceType::Topic,
resource_name: Some("my-topic".to_string()),
pattern_type: AclPatternType::Literal,
principal: None,
host: None,
operation: AclOperation::Any,
permission_type: AclPermissionType::Any,
};
let result = admin.delete_acls(vec![filter]).await?;
for (i, fr) in result.filter_results.iter().enumerate() {
match &fr.error {
None => println!("Filter {} deleted {} ACLs", i, fr.deleted_count),
Some(e) => println!("Filter {} failed: {}", i, e),
}
}
Consumer Group Management
Describing Consumer Groups
Get detailed information about one or more consumer groups, including their state, members, and assignment protocol. The request is automatically routed to each group’s coordinator broker via FindCoordinator:
let descriptions = admin
.describe_groups(vec!["my-group".to_string(), "other-group".to_string()])
.await?;
for group in &descriptions {
println!("Group: {} (state: {})", group.group_id, group.state);
println!(" Protocol: {} / {}", group.protocol_type, group.protocol);
for member in &group.members {
println!(
" Member: {} (client: {}, host: {}, instance: {:?})",
member.member_id, member.client_id, member.client_host,
member.group_instance_id
);
}
if let Some(error) = &group.error {
println!(" Error: {}", error);
}
}
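Routing by coordinator means that groups sharing a coordinator can be described in a single request. The sketch below shows that batching step in isolation. It does not reflect Krafka's internals: batch_by_coordinator is a hypothetical name, and the coordinator_of closure stands in for a FindCoordinator round trip.

```rust
use std::collections::BTreeMap;

/// Groups `group_ids` by the broker that coordinates them, so one request
/// can be sent per coordinator. Groups whose coordinator cannot be
/// resolved are returned separately in the second tuple element.
fn batch_by_coordinator<F>(
    group_ids: &[&str],
    coordinator_of: F,
) -> (BTreeMap<i32, Vec<String>>, Vec<String>)
where
    F: Fn(&str) -> Option<i32>,
{
    let mut batches: BTreeMap<i32, Vec<String>> = BTreeMap::new();
    let mut unresolved = Vec::new();
    for &id in group_ids {
        match coordinator_of(id) {
            // Append to the batch for this broker, creating it if needed.
            Some(broker) => batches.entry(broker).or_default().push(id.to_string()),
            None => unresolved.push(id.to_string()),
        }
    }
    (batches, unresolved)
}
```

The same grouping idea applies to operations that are routed to partition leaders, with the partition leader taking the place of the coordinator.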
Listing Consumer Groups
List all consumer groups across the cluster:
let groups = admin.list_consumer_groups().await?;
println!("Consumer groups:");
for group in &groups {
println!(" {} (protocol: {})", group.group_id, group.protocol_type);
}
Note: list_consumer_groups() queries all brokers in the cluster and deduplicates the results, since consumer groups are managed by their respective group coordinators.
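The deduplication step can be pictured as a merge of per-broker listings keyed by group ID. This is a minimal sketch under stated assumptions, not Krafka's implementation: GroupListing models only the two fields printed above, and merge_group_listings is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Stand-in for a group listing entry; models only the two fields the
/// guide prints. The real Krafka type may carry more.
#[derive(Debug, Clone, PartialEq)]
struct GroupListing {
    group_id: String,
    protocol_type: String,
}

/// Merges per-broker listings, keeping the first entry seen for each
/// group_id. A BTreeMap gives a stable output order sorted by group_id.
fn merge_group_listings(per_broker: Vec<Vec<GroupListing>>) -> Vec<GroupListing> {
    let mut merged: BTreeMap<String, GroupListing> = BTreeMap::new();
    for listing in per_broker.into_iter().flatten() {
        merged.entry(listing.group_id.clone()).or_insert(listing);
    }
    merged.into_values().collect()
}
```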
Record Deletion
Deleting Records
Delete records from topic partitions before a specified offset. Records with offsets less than the specified offset are marked for deletion (this adjusts the log start offset). Requests are automatically routed to each partition’s leader broker:
use std::collections::HashMap;
use std::time::Duration;
let mut offsets = HashMap::new();
offsets.insert(("my-topic".to_string(), 0), 100i64); // Delete before offset 100
offsets.insert(("my-topic".to_string(), 1), 250i64); // Delete before offset 250
let results = admin
.delete_records(offsets, Duration::from_secs(30))
.await?;
for result in &results {
match &result.error {
None => println!(
"Deleted records from {}:{}, new low watermark: {}",
result.topic, result.partition, result.low_watermark
),
Some(e) => println!(
"Failed to delete from {}:{}: {}",
result.topic, result.partition, e
),
}
}
Note: Deleted records are not immediately removed from disk. The broker adjusts the log start offset, and records before that offset become inaccessible. Physical deletion happens during log segment cleanup.
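The effect on offsets can be illustrated with a toy model of a single partition. This is a sketch of the semantics described above, not broker or Krafka code: PartitionLog and its methods are hypothetical, and rejecting an offset past the log end is a simplifying assumption.

```rust
/// Toy model of one partition's offsets; not Krafka or broker code.
#[derive(Debug)]
struct PartitionLog {
    log_start_offset: i64, // first readable offset (the low watermark)
    log_end_offset: i64,   // offset the next appended record will get
}

impl PartitionLog {
    /// Advances the log start offset to `before_offset` and returns the
    /// new low watermark, or an error if the offset is past the log end.
    fn delete_records_before(&mut self, before_offset: i64) -> Result<i64, String> {
        if before_offset > self.log_end_offset {
            return Err(format!(
                "offset {} is beyond log end {}",
                before_offset, self.log_end_offset
            ));
        }
        // Deletion is one-way: the start offset never moves backwards.
        self.log_start_offset = self.log_start_offset.max(before_offset);
        Ok(self.log_start_offset)
    }

    /// A record is readable only between the start and end offsets.
    fn is_readable(&self, offset: i64) -> bool {
        offset >= self.log_start_offset && offset < self.log_end_offset
    }
}
```

In this model, deleting before offset 100 leaves offset 100 readable and offset 99 inaccessible, and a later request with a smaller offset leaves the low watermark unchanged.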
Leader Epoch Queries
OffsetForLeaderEpoch
Query the end offset for a given leader epoch. This is used to detect log truncation after leader changes. Requests are routed to each partition’s leader broker:
// Each tuple is (topic, partition, leader_epoch): this queries the end offset
// for epoch 5 on partition 0 and for epoch 3 on partition 1 of "my-topic"
let results = admin
.offset_for_leader_epoch(vec![
("my-topic".to_string(), 0, 5),
("my-topic".to_string(), 1, 3),
])
.await?;
for result in &results {
match &result.error {
None => println!(
"{}:{} epoch={} end_offset={}",
result.topic, result.partition,
result.leader_epoch, result.end_offset
),
Some(e) => println!(
"{}:{} error: {}",
result.topic, result.partition, e
),
}
}
This API is useful for:
- Log truncation detection: After a leader change, check if the log was truncated
- Consumer offset validation: Ensure a consumer’s saved offset is still valid
- Replication diagnostics: Verify epoch boundaries across replicas
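The first two use cases come down to one comparison between a saved position and the end offset reported for the epoch in which that position was recorded. The sketch below assumes end_offset is the exclusive end offset for the queried epoch, as in Kafka's OffsetForLeaderEpoch response. PositionCheck and check_position are hypothetical names, not part of Krafka.

```rust
/// Outcome of validating a saved consumer position.
#[derive(Debug, PartialEq)]
enum PositionCheck {
    Valid,
    /// The log for that epoch ends before the saved position; holds the
    /// highest offset that is still safe to resume from.
    Truncated { resume_from: i64 },
}

/// Hypothetical helper: compares a saved position (the next offset to
/// fetch) against the end offset the broker reports for the epoch in
/// which that position was recorded.
fn check_position(saved_offset: i64, epoch_end_offset: i64) -> PositionCheck {
    if epoch_end_offset < saved_offset {
        PositionCheck::Truncated { resume_from: epoch_end_offset }
    } else {
        PositionCheck::Valid
    }
}
```

A saved position equal to the end offset is still valid, because it points at the next record to be written rather than at a record that was lost.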
Next Steps
- Interceptors Guide - Producer and consumer interceptor hooks
- Configuration Reference - All admin client options
- Architecture Overview - How admin client works internally