Authentication Guide
This guide covers authentication and encryption options for connecting to Kafka clusters.
Overview
Krafka supports multiple security protocols:
| Protocol | Encryption | Authentication |
|---|---|---|
| PLAINTEXT | No | No |
| SSL | Yes (TLS) | Optional (mTLS) |
| SASL_PLAINTEXT | No | Yes (SASL) |
| SASL_SSL | Yes (TLS) | Yes (SASL) |
Supported SASL Mechanisms
| Mechanism | Description |
|---|---|
| PLAIN | Simple username/password |
| SCRAM-SHA-256 | Challenge-response with SHA-256 |
| SCRAM-SHA-512 | Challenge-response with SHA-512 |
| OAUTHBEARER | OAuth 2.0 bearer tokens (RFC 7628 / KIP-255) |
| AWS_MSK_IAM | AWS IAM authentication for MSK |
Security Protocol Selection
use krafka::auth::{AuthConfig, SecurityProtocol};
// Check what's configured
let config = AuthConfig::sasl_scram_sha256("user", "pass");
println!("Protocol: {}", config.security_protocol);
println!("Requires TLS: {}", config.requires_tls());
println!("Requires SASL: {}", config.requires_sasl());
SASL Authentication
SASL/PLAIN
Simple username/password authentication. Always use with TLS in production!
use krafka::auth::AuthConfig;
// Without TLS (development only!)
let config = AuthConfig::sasl_plain("username", "password");
// With TLS (recommended for production)
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_plain_ssl("username", "password", TlsConfig::new());
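For context, the PLAIN mechanism's initial response is defined by RFC 4616 as three NUL-separated fields (authzid, authcid, password). A minimal self-contained sketch of that framing, not Krafka's internal code:

```rust
// RFC 4616 SASL/PLAIN initial response: authzid NUL authcid NUL passwd.
// An empty authzid (leading NUL byte) is typical for Kafka clients.
fn plain_initial_response(username: &str, password: &str) -> Vec<u8> {
    let mut out = Vec::with_capacity(username.len() + password.len() + 2);
    out.push(0); // empty authzid
    out.extend_from_slice(username.as_bytes());
    out.push(0); // separator between authcid and password
    out.extend_from_slice(password.as_bytes());
    out
}

fn main() {
    let resp = plain_initial_response("alice", "secret");
    assert_eq!(resp, b"\0alice\0secret");
    println!("PLAIN response is {} bytes", resp.len());
}
```

Because the password travels verbatim in this frame, PLAIN offers no protection on its own, which is why TLS is non-negotiable here.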
SASL/SCRAM-SHA-256
Challenge-response authentication with SHA-256 hashing. More secure than PLAIN.
use krafka::auth::AuthConfig;
let config = AuthConfig::sasl_scram_sha256("username", "password");
SASL/SCRAM-SHA-512
SCRAM authentication with SHA-512 hashing, the strongest SCRAM variant supported.
use krafka::auth::AuthConfig;
let config = AuthConfig::sasl_scram_sha512("username", "password");
SCRAM Protocol Details
The SCRAM client implements RFC 5802 with:
- Salted Challenge-Response mechanism
- PBKDF2 key derivation with iteration count validation (4,096–1,000,000 range)
- HMAC signature verification
- Constant-time comparison via the `subtle` crate (timing-attack resistant)
- Automatic secret zeroization on drop (`password`, `salted_password`, `server_signature`)
- Debug output redacts the password as `[REDACTED]`
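The constant-time comparison mentioned above can be sketched in plain Rust. This is the idea only; production code should rely on the audited `subtle` crate:

```rust
// Constant-time equality: XOR-accumulate all byte differences so the
// running time does not depend on where the first mismatch occurs.
fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false; // length is not secret in this context
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    assert!(ct_eq(b"server-signature", b"server-signature"));
    assert!(!ct_eq(b"server-signature", b"server-signaturX"));
    println!("constant-time comparison ok");
}
```

A naive `==` comparison returns early at the first mismatch, letting an attacker measure how many leading bytes of a forged signature were correct.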
use krafka::auth::{ScramClient, ScramMechanism, ScramState};
// Create SCRAM client
let mut scram = ScramClient::new("alice", "secret", ScramMechanism::Sha256);
assert_eq!(scram.state(), ScramState::Initial);
// Generate client-first message
let client_first = scram.client_first_message();
// -> "n,,n=alice,r=<nonce>"
// Process server-first message
// scram.process_server_first(server_response)?;
// Generate client-final message
// let client_final = scram.client_final_message()?;
// Verify server-final
// scram.process_server_final(server_response)?;
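To make the handshake concrete, here is a self-contained sketch (not Krafka's internal code) of parsing an RFC 5802 server-first message, including the iteration-count range check described above:

```rust
// Parse "r=<nonce>,s=<base64 salt>,i=<iterations>" into its fields,
// rejecting iteration counts outside the 4,096..=1,000,000 range.
fn parse_server_first(msg: &str) -> Option<(String, String, u32)> {
    let mut nonce = None;
    let mut salt = None;
    let mut iterations = None;
    for part in msg.split(',') {
        match part.split_once('=') {
            Some(("r", v)) => nonce = Some(v.to_string()),
            Some(("s", v)) => salt = Some(v.to_string()),
            Some(("i", v)) => iterations = v.parse::<u32>().ok(),
            _ => {} // ignore optional extensions
        }
    }
    let i = iterations?;
    if !(4096..=1_000_000).contains(&i) {
        return None; // suspiciously weak (or DoS-level) iteration count
    }
    Some((nonce?, salt?, i))
}

fn main() {
    let msg = "r=clientnonceservernonce,s=QSXCR+Q6sek8bf92,i=4096";
    let (r, s, i) = parse_server_first(msg).expect("valid server-first");
    println!("nonce={r} salt={s} iterations={i}");
}
```

Validating the iteration count matters: a malicious broker could otherwise request a tiny count to weaken the derived key, or an enormous one to burn client CPU.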
SASL/OAUTHBEARER
OAuth 2.0 bearer token authentication per RFC 7628 and KIP-255.
use krafka::auth::{AuthConfig, OAuthBearerToken};
// Basic token authentication
let config = AuthConfig::sasl_oauthbearer("your-jwt-token-here");
// With TLS (recommended for production)
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_oauthbearer_ssl("your-jwt-token-here", TlsConfig::new());
With SASL Extensions
For providers like Confluent Cloud that require additional SASL extensions:
use krafka::auth::{AuthConfig, OAuthBearerToken};
// Create token with extensions
let token = OAuthBearerToken::new("your-jwt-token")
.with_extension("logicalCluster", "lkc-abc123")
.with_extension("identityPoolId", "pool-xyz789");
let config = AuthConfig::sasl_oauthbearer_token(token);
// Or with TLS
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_oauthbearer_token_ssl(
OAuthBearerToken::new("your-jwt-token")
.with_extension("logicalCluster", "lkc-abc123"),
TlsConfig::new(),
);
Builder Convenience Methods
All client builders support a shorthand .sasl_oauthbearer(token) method:
use krafka::producer::Producer;
use krafka::consumer::Consumer;
let producer = Producer::builder()
.bootstrap_servers("broker:9093")
.sasl_oauthbearer("your-jwt-token")
.build()
.await?;
let consumer = Consumer::builder()
.bootstrap_servers("broker:9093")
.group_id("my-group")
.sasl_oauthbearer("your-jwt-token")
.build()
.await?;
OAUTHBEARER Protocol Details
The implementation follows RFC 7628 GS2 framing:
- Initial response: `n,,\x01auth=Bearer <token>[\x01key=value]*\x01\x01`
- Server success: Empty response (0 bytes)
- Server error: JSON or text error message
- Security: Token zeroized on drop via the `zeroize` crate
- Debug safety: Token redacted as `[REDACTED]` in Debug output
- Extensions: Arbitrary key-value pairs appended to the GS2 frame
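The initial-response layout above can be sketched in a few lines. This is illustrative only, not Krafka's actual framing code:

```rust
// Build an OAUTHBEARER initial response per RFC 7628 GS2 framing:
// "n,," header, \x01-separated key=value pairs, closed by \x01\x01.
fn oauthbearer_initial_response(token: &str, extensions: &[(&str, &str)]) -> Vec<u8> {
    let mut frame = String::from("n,,\x01auth=Bearer ");
    frame.push_str(token);
    for (key, value) in extensions {
        frame.push('\x01');
        frame.push_str(key);
        frame.push('=');
        frame.push_str(value);
    }
    frame.push_str("\x01\x01");
    frame.into_bytes()
}

fn main() {
    let frame = oauthbearer_initial_response("tok", &[("logicalCluster", "lkc-abc123")]);
    assert_eq!(frame, b"n,,\x01auth=Bearer tok\x01logicalCluster=lkc-abc123\x01\x01");
    println!("frame is {} bytes", frame.len());
}
```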
Note: GSSAPI/Kerberos is not supported. It requires system Kerberos libraries via FFI, which is incompatible with Krafka’s `#![deny(unsafe_code)]` policy. Use OAUTHBEARER or SCRAM as alternatives.
TLS/SSL Encryption
Basic TLS
Use Mozilla’s root certificates for server verification:
use krafka::auth::{AuthConfig, TlsConfig};
let config = AuthConfig::ssl(TlsConfig::new());
Custom CA Certificate
For self-signed or private CA certificates:
use krafka::auth::TlsConfig;
let tls_config = TlsConfig::new()
.with_ca_cert("/path/to/ca.pem");
Mutual TLS (mTLS)
Client certificate authentication:
use krafka::auth::TlsConfig;
let tls_config = TlsConfig::new()
.with_ca_cert("/path/to/ca.pem")
.with_client_cert("/path/to/client.pem", "/path/to/client-key.pem");
SNI Hostname
For servers behind load balancers or proxies:
use krafka::auth::TlsConfig;
let mut tls_config = TlsConfig::new();
tls_config.sni_hostname = Some("kafka.example.com".to_string());
Skip Verification (Development Only)
Never use in production!
use krafka::auth::TlsConfig;
let tls_config = TlsConfig::insecure();
AWS MSK IAM Authentication
For AWS Managed Streaming for Apache Kafka using IAM authentication:
Binary Size Note: The `aws-msk` feature adds the AWS SDK, which increases binary size by approximately 2-3 MB (release build). If binary size is critical, use `AwsMskIamCredentials::from_env()`, which works without the `aws-msk` feature.
From Environment Variables (Recommended)
The simplest approach is to load credentials from environment variables:
use krafka::auth::{AuthConfig, AwsMskIamCredentials};
// Load from AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION
let creds = AwsMskIamCredentials::from_env()?;
let config = AuthConfig::aws_msk_iam_with_credentials(creds);
Environment variables used:
- `AWS_ACCESS_KEY_ID` - Required
- `AWS_SECRET_ACCESS_KEY` - Required
- `AWS_SESSION_TOKEN` - Optional (for temporary credentials)
- `AWS_REGION` or `AWS_DEFAULT_REGION` - Required
From AWS SDK Default Chain (Recommended for Production)
For production deployments on EC2, ECS, Lambda, or EKS, use the AWS SDK default chain:
use krafka::auth::{AuthConfig, AwsMskIamCredentials};
// Requires the `aws-msk` feature in Cargo.toml:
// krafka = { version = "0.2", features = ["aws-msk"] }
// Loads from (in order):
// 1. Environment variables
// 2. Shared credentials file (~/.aws/credentials)
// 3. IAM role for EC2/ECS/Lambda
// 4. Web identity token (for EKS)
let creds = AwsMskIamCredentials::from_default_chain("us-east-1").await?;
let config = AuthConfig::aws_msk_iam_with_credentials(creds);
With Explicit Credentials (Development Only)
For development or testing, you can provide credentials directly:
use krafka::auth::AuthConfig;
// With permanent credentials (avoid in production!)
let config = AuthConfig::aws_msk_iam(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"us-east-1",
);
// With temporary credentials (session token)
use krafka::auth::AwsMskIamCredentials;
let creds = AwsMskIamCredentials::with_session_token(
"AKIAIOSFODNN7EXAMPLE",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"session-token-here",
"us-east-1",
);
Using SecureConnectionConfig with MSK IAM
use krafka::network::SecureConnectionConfig;
let config = SecureConnectionConfig::builder()
.client_id("msk-client")
.aws_msk_iam("AKID", "secret", "us-east-1")
.build();
Direct MskIamAuthenticator Usage
For low-level control over the authentication process:
use krafka::auth::{AwsMskIamCredentials, MskIamAuthenticator};
let creds = AwsMskIamCredentials::new("AKID", "secret", "us-east-1");
let authenticator = MskIamAuthenticator::new(&creds, "broker.kafka.us-east-1.amazonaws.com");
// Generate signed authentication payload
let payload = authenticator.create_auth_payload();
// -> JSON with AWS Signature v4 signed request
MSK IAM Protocol Details
The implementation uses AWS Signature v4 signing:
- Service Name: `kafka-cluster`
- Action: `kafka-cluster:Connect`
- Payload Format: JSON with signed headers
- TLS Required: Always uses SASL_SSL (TLS is mandatory)
- Region-Aware: Credentials are scoped to the AWS region
Configuration Options
TlsConfig
| Option | Type | Description |
|---|---|---|
| `ca_cert_path` | `Option<String>` | Path to CA certificate PEM file |
| `client_cert_path` | `Option<String>` | Path to client certificate PEM file |
| `client_key_path` | `Option<String>` | Path to client private key PEM file |
| `verify_server_cert` | `bool` | Whether to verify server certificates (default: true) |
| `sni_hostname` | `Option<String>` | SNI hostname for TLS handshake |
AuthConfig
| Method | Protocol | Mechanism |
|---|---|---|
| `plaintext()` | PLAINTEXT | None |
| `ssl(TlsConfig)` | SSL | None (TLS-only) |
| `sasl_plain(user, pass)` | SASL_PLAINTEXT | PLAIN |
| `sasl_plain_ssl(user, pass, tls)` | SASL_SSL | PLAIN |
| `sasl_scram_sha256(user, pass)` | SASL_PLAINTEXT | SCRAM-SHA-256 |
| `sasl_scram_sha512(user, pass)` | SASL_PLAINTEXT | SCRAM-SHA-512 |
| `sasl_oauthbearer(token)` | SASL_PLAINTEXT | OAUTHBEARER |
| `sasl_oauthbearer_ssl(token, tls)` | SASL_SSL | OAUTHBEARER |
| `sasl_oauthbearer_token(OAuthBearerToken)` | SASL_PLAINTEXT | OAUTHBEARER |
| `sasl_oauthbearer_token_ssl(OAuthBearerToken, tls)` | SASL_SSL | OAUTHBEARER |
| `aws_msk_iam(key, secret, region)` | SASL_SSL | AWS_MSK_IAM |
Client Authentication
All Krafka clients (AdminClient, Producer, TransactionalProducer, and Consumer) support the same authentication methods through dedicated builder methods. Authentication is wired end to end: the TLS upgrade and SASL handshake happen automatically during connection establishment.
Admin Client
use krafka::AdminClient;
// SASL/PLAIN
let admin = AdminClient::builder()
.client_id("admin-client")
.bootstrap_servers("broker:9092")
.sasl_plain("username", "password")
.build();
// SASL/SCRAM-SHA-256
let admin = AdminClient::builder()
.bootstrap_servers("broker:9092")
.sasl_scram_sha256("username", "password")
.build();
// SASL/SCRAM-SHA-512
let admin = AdminClient::builder()
.bootstrap_servers("broker:9092")
.sasl_scram_sha512("username", "password")
.build();
Producer
use krafka::producer::Producer;
// SASL/PLAIN
let producer = Producer::builder()
.bootstrap_servers("broker:9092")
.sasl_plain("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-256
let producer = Producer::builder()
.bootstrap_servers("broker:9092")
.sasl_scram_sha256("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-512
let producer = Producer::builder()
.bootstrap_servers("broker:9092")
.sasl_scram_sha512("username", "password")
.build()
.await?;
Consumer
use krafka::consumer::Consumer;
// SASL/PLAIN
let consumer = Consumer::builder()
.bootstrap_servers("broker:9092")
.group_id("my-group")
.sasl_plain("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-256
let consumer = Consumer::builder()
.bootstrap_servers("broker:9092")
.group_id("my-group")
.sasl_scram_sha256("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-512
let consumer = Consumer::builder()
.bootstrap_servers("broker:9092")
.group_id("my-group")
.sasl_scram_sha512("username", "password")
.build()
.await?;
Transactional Producer
use krafka::producer::TransactionalProducer;
// SASL/PLAIN
let producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9092")
.transactional_id("my-txn-id")
.sasl_plain("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-256
let producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9092")
.transactional_id("my-txn-id")
.sasl_scram_sha256("username", "password")
.build()
.await?;
// SASL/SCRAM-SHA-512
let producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9092")
.transactional_id("my-txn-id")
.sasl_scram_sha512("username", "password")
.build()
.await?;
Generic AuthConfig
For advanced configurations or AWS MSK IAM, use .auth() on any builder:
use krafka::AdminClient;
use krafka::producer::{Producer, TransactionalProducer};
use krafka::consumer::Consumer;
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
// Works on all client types
let admin = AdminClient::builder()
.bootstrap_servers("broker:9092")
.auth(auth.clone())
.build();
let producer = Producer::builder()
.bootstrap_servers("broker:9092")
.auth(auth.clone())
.build()
.await?;
let txn_producer = TransactionalProducer::builder()
.bootstrap_servers("broker:9092")
.transactional_id("my-txn-id")
.auth(auth.clone())
.build()
.await?;
let consumer = Consumer::builder()
.bootstrap_servers("broker:9092")
.group_id("my-group")
.auth(auth)
.build()
.await?;
Security Best Practices
- Always use TLS in production - Use `SASL_SSL` instead of `SASL_PLAINTEXT`
- Prefer SCRAM over PLAIN - SCRAM provides challenge-response security
- Use mTLS for the strongest authentication - Client certificates are harder to steal
- Store credentials securely - Use environment variables or a secrets manager
- Rotate credentials regularly - Especially for long-running applications
- Verify certificates in production - Never use `TlsConfig::insecure()` in production
- Automatic secret zeroization - All credential types (`ScramClient`, `MskIamAuthenticator`, `PlainCredentials`, `ScramCredentials`, `OAuthBearerToken`) zeroize secrets on drop so they do not linger in memory. SASL PLAIN auth bytes are wrapped in `Zeroizing<Vec<u8>>` and automatically zeroized after being sent on the wire.
- Debug safety - All credential types redact secrets in `Debug` output, so `tracing::debug!("{:?}", auth)` is safe to use
- Cleartext warning - Using `SASL_PLAINTEXT` with `PLAIN` emits a `tracing::warn!` alerting that credentials will be sent in cleartext
Secure Connection Configuration
For integrated TLS and SASL configuration, use SecureConnectionConfig:
use krafka::network::SecureConnectionConfig;
use krafka::auth::TlsConfig;
use std::time::Duration;
let config = SecureConnectionConfig::builder()
.client_id("my-app")
.connect_timeout(Duration::from_secs(10))
.sasl_scram_sha256("username", "password")
.tls(TlsConfig::new())
.build();
SaslAuthenticator
For handling SASL handshakes, use SaslAuthenticator:
use krafka::network::SaslAuthenticator;
use krafka::auth::AuthConfig;
let auth = AuthConfig::sasl_scram_sha256("user", "pass");
let mut authenticator = SaslAuthenticator::new(&auth).unwrap();
// Get mechanism name for SASL handshake
let mechanism = authenticator.mechanism_name(); // "SCRAM-SHA-256"
// Get initial authentication bytes
let initial = authenticator.initial_response();
// Process server challenges
// let response = authenticator.process_challenge(&server_bytes)?;
// Check completion
if authenticator.is_complete() {
println!("Authentication successful!");
}
Example: Production Configuration
use krafka::auth::{AuthConfig, TlsConfig};
use krafka::producer::Producer;
use krafka::consumer::Consumer;
use std::env;
fn production_auth_config() -> AuthConfig {
let username = env::var("KAFKA_USER").expect("KAFKA_USER required");
let password = env::var("KAFKA_PASSWORD").expect("KAFKA_PASSWORD required");
let tls_config = TlsConfig::new()
.with_ca_cert("/etc/ssl/certs/kafka-ca.pem");
// SCRAM-SHA-512 over TLS
AuthConfig {
security_protocol: krafka::auth::SecurityProtocol::SaslSsl,
sasl_mechanism: Some(krafka::auth::SaslMechanism::ScramSha512),
scram_credentials: Some(krafka::auth::ScramCredentials::new(username, password)),
tls_config: Some(tls_config),
..Default::default()
}
}
// Use with any client
async fn create_clients() {
let auth = production_auth_config();
let producer = Producer::builder()
.bootstrap_servers("kafka.prod.example.com:9093")
.auth(auth.clone())
.build()
.await
.unwrap();
let consumer = Consumer::builder()
.bootstrap_servers("kafka.prod.example.com:9093")
.group_id("prod-group")
.auth(auth)
.build()
.await
.unwrap();
}
Next Steps
- Producer Guide - Configure authenticated producers
- Consumer Guide - Configure authenticated consumers
- Configuration Reference - All connection options