Authentication Guide

This guide covers authentication and encryption options for connecting to Kafka clusters.

Overview

Krafka supports multiple security protocols:

Protocol Encryption Authentication
PLAINTEXT No No
SSL Yes (TLS) Optional (mTLS)
SASL_PLAINTEXT No Yes (SASL)
SASL_SSL Yes (TLS) Yes (SASL)

Supported SASL Mechanisms

Mechanism Description
PLAIN Simple username/password
SCRAM-SHA-256 Challenge-response with SHA-256
SCRAM-SHA-512 Challenge-response with SHA-512
OAUTHBEARER OAuth 2.0 bearer tokens (RFC 7628 / KIP-255)
AWS_MSK_IAM AWS IAM authentication for MSK

Security Protocol Selection

use krafka::auth::{AuthConfig, SecurityProtocol};

// Check what's configured
let config = AuthConfig::sasl_scram_sha256("user", "pass");
println!("Protocol: {}", config.security_protocol);
println!("Requires TLS: {}", config.requires_tls());
println!("Requires SASL: {}", config.requires_sasl());

SASL Authentication

SASL/PLAIN

Simple username/password authentication. Always use with TLS in production!

use krafka::auth::AuthConfig;

// Without TLS (development only!)
let config = AuthConfig::sasl_plain("username", "password");

// With TLS (recommended for production)
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_plain_ssl("username", "password", TlsConfig::new());

SASL/SCRAM-SHA-256

Challenge-response authentication with SHA-256 hashing. More secure than PLAIN.

use krafka::auth::AuthConfig;

let config = AuthConfig::sasl_scram_sha256("username", "password");

SASL/SCRAM-SHA-512

Maximum security SCRAM authentication with SHA-512 hashing.

use krafka::auth::AuthConfig;

let config = AuthConfig::sasl_scram_sha512("username", "password");

SCRAM Protocol Details

The SCRAM client implements RFC 5802 with:

  • Salted Challenge-Response mechanism
  • PBKDF2 key derivation with iteration count validation (4,096–1,000,000 range)
  • HMAC signature verification
  • Constant-time comparison via the subtle crate (timing-attack resistant)
  • Automatic secret zeroization on drop (password, salted_password, server_signature)
  • Debug output redacts the password as [REDACTED]
use krafka::auth::{ScramClient, ScramMechanism, ScramState};

// Create SCRAM client
let mut scram = ScramClient::new("alice", "secret", ScramMechanism::Sha256);
assert_eq!(scram.state(), ScramState::Initial);

// Generate client-first message
let client_first = scram.client_first_message();
// -> "n,,n=alice,r=<nonce>"

// Process server-first message
// scram.process_server_first(server_response)?;

// Generate client-final message
// let client_final = scram.client_final_message()?;

// Verify server-final
// scram.process_server_final(server_response)?;

SASL/OAUTHBEARER

OAuth 2.0 bearer token authentication per RFC 7628 and KIP-255.

use krafka::auth::{AuthConfig, OAuthBearerToken};

// Basic token authentication
let config = AuthConfig::sasl_oauthbearer("your-jwt-token-here");

// With TLS (recommended for production)
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_oauthbearer_ssl("your-jwt-token-here", TlsConfig::new());

With SASL Extensions

For providers like Confluent Cloud that require additional SASL extensions:

use krafka::auth::{AuthConfig, OAuthBearerToken};

// Create token with extensions
let token = OAuthBearerToken::new("your-jwt-token")
    .with_extension("logicalCluster", "lkc-abc123")
    .with_extension("identityPoolId", "pool-xyz789");

let config = AuthConfig::sasl_oauthbearer_token(token);

// Or with TLS
use krafka::auth::TlsConfig;
let config = AuthConfig::sasl_oauthbearer_token_ssl(
    OAuthBearerToken::new("your-jwt-token")
        .with_extension("logicalCluster", "lkc-abc123"),
    TlsConfig::new(),
);

Builder Convenience Methods

All client builders support a shorthand .sasl_oauthbearer(token) method:

use krafka::producer::Producer;
use krafka::consumer::Consumer;

let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_oauthbearer("your-jwt-token")
    .build()
    .await?;

let consumer = Consumer::builder()
    .bootstrap_servers("broker:9093")
    .group_id("my-group")
    .sasl_oauthbearer("your-jwt-token")
    .build()
    .await?;

OAUTHBEARER Protocol Details

The implementation follows RFC 7628 GS2 framing:

  • Initial response: n,,\x01auth=Bearer <token>[\x01key=value]*\x01\x01
  • Server success: Empty response (0 bytes)
  • Server error: JSON or text error message
  • Security: Token zeroized on drop via zeroize crate
  • Debug safety: Token redacted as [REDACTED] in Debug output
  • Extensions: Arbitrary key-value pairs appended to the GS2 frame

Note: GSSAPI/Kerberos is not supported. It requires system Kerberos libraries via FFI, which is incompatible with Krafka’s #![deny(unsafe_code)] policy. Use OAUTHBEARER or SCRAM as alternatives.

TLS/SSL Encryption

Basic TLS

Use Mozilla’s root certificates for server verification:

use krafka::auth::{AuthConfig, TlsConfig};

let config = AuthConfig::ssl(TlsConfig::new());

Custom CA Certificate

For self-signed or private CA certificates:

use krafka::auth::TlsConfig;

let tls_config = TlsConfig::new()
    .with_ca_cert("/path/to/ca.pem");

Mutual TLS (mTLS)

Client certificate authentication:

use krafka::auth::TlsConfig;

let tls_config = TlsConfig::new()
    .with_ca_cert("/path/to/ca.pem")
    .with_client_cert("/path/to/client.pem", "/path/to/client-key.pem");

SNI Hostname

For servers behind load balancers or proxies:

use krafka::auth::TlsConfig;

let mut tls_config = TlsConfig::new();
tls_config.sni_hostname = Some("kafka.example.com".to_string());

Skip Verification (Development Only)

Never use in production!

use krafka::auth::TlsConfig;

let tls_config = TlsConfig::insecure();

AWS MSK IAM Authentication

For AWS Managed Streaming for Apache Kafka using IAM authentication:

Binary Size Note: The aws-msk feature adds the AWS SDK, which increases binary size by approximately 2-3 MB (release build). If binary size is critical, use AwsMskIamCredentials::from_env() which works without the aws-msk feature.

The simplest approach is to load credentials from environment variables:

use krafka::auth::{AuthConfig, AwsMskIamCredentials};

// Load from AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION
let creds = AwsMskIamCredentials::from_env()?;
let config = AuthConfig::aws_msk_iam_with_credentials(creds);

Environment variables used:

  • AWS_ACCESS_KEY_ID - Required
  • AWS_SECRET_ACCESS_KEY - Required
  • AWS_SESSION_TOKEN - Optional (for temporary credentials)
  • AWS_REGION or AWS_DEFAULT_REGION - Required

For production deployments on EC2, ECS, Lambda, or EKS, use the AWS SDK default chain:

use krafka::auth::{AuthConfig, AwsMskIamCredentials};

// Requires the `aws-msk` feature in Cargo.toml:
// krafka = { version = "0.2", features = ["aws-msk"] }

// Loads from (in order):
// 1. Environment variables
// 2. Shared credentials file (~/.aws/credentials)
// 3. IAM role for EC2/ECS/Lambda
// 4. Web identity token (for EKS)
let creds = AwsMskIamCredentials::from_default_chain("us-east-1").await?;
let config = AuthConfig::aws_msk_iam_with_credentials(creds);

With Explicit Credentials (Development Only)

For development or testing, you can provide credentials directly:

use krafka::auth::AuthConfig;

// With permanent credentials (avoid in production!)
let config = AuthConfig::aws_msk_iam(
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "us-east-1",
);

// With temporary credentials (session token)
use krafka::auth::AwsMskIamCredentials;

let creds = AwsMskIamCredentials::with_session_token(
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "session-token-here",
    "us-east-1",
);

Using SecureConnectionConfig with MSK IAM

use krafka::network::SecureConnectionConfig;

let config = SecureConnectionConfig::builder()
    .client_id("msk-client")
    .aws_msk_iam("AKID", "secret", "us-east-1")
    .build();

Direct MskIamAuthenticator Usage

For low-level control over the authentication process:

use krafka::auth::{AwsMskIamCredentials, MskIamAuthenticator};

let creds = AwsMskIamCredentials::new("AKID", "secret", "us-east-1");
let authenticator = MskIamAuthenticator::new(&creds, "broker.kafka.us-east-1.amazonaws.com");

// Generate signed authentication payload
let payload = authenticator.create_auth_payload();
// -> JSON with AWS Signature v4 signed request

MSK IAM Protocol Details

The implementation uses AWS Signature v4 signing:

  • Service Name: kafka-cluster
  • Action: kafka-cluster:Connect
  • Payload Format: JSON with signed headers
  • TLS Required: Always uses SASL_SSL (TLS is mandatory)
  • Region-Aware: Credentials are scoped to AWS region

Configuration Options

TlsConfig

Option Type Description
ca_cert_path Option<String> Path to CA certificate PEM file
client_cert_path Option<String> Path to client certificate PEM file
client_key_path Option<String> Path to client private key PEM file
verify_server_cert bool Whether to verify server certificates (default: true)
sni_hostname Option<String> SNI hostname for TLS handshake

AuthConfig

Method Protocol Mechanism
plaintext() PLAINTEXT None
ssl(TlsConfig) SSL None (TLS-only)
sasl_plain(user, pass) SASL_PLAINTEXT PLAIN
sasl_plain_ssl(user, pass, tls) SASL_SSL PLAIN
sasl_scram_sha256(user, pass) SASL_PLAINTEXT SCRAM-SHA-256
sasl_scram_sha512(user, pass) SASL_PLAINTEXT SCRAM-SHA-512
sasl_oauthbearer(token) SASL_PLAINTEXT OAUTHBEARER
sasl_oauthbearer_ssl(token, tls) SASL_SSL OAUTHBEARER
sasl_oauthbearer_token(OAuthBearerToken) SASL_PLAINTEXT OAUTHBEARER
sasl_oauthbearer_token_ssl(OAuthBearerToken, tls) SASL_SSL OAUTHBEARER
aws_msk_iam(key, secret, region) SASL_SSL AWS_MSK_IAM

Client Authentication

All Krafka clients — AdminClient, Producer, TransactionalProducer, and Consumer — support the same authentication methods through dedicated builder methods. Authentication is wired end-to-end: TLS upgrade and SASL handshake happen automatically during connection establishment.

Admin Client

use krafka::AdminClient;

// SASL/PLAIN
let admin = AdminClient::builder()
    .client_id("admin-client")
    .bootstrap_servers("broker:9092")
    .sasl_plain("username", "password")
    .build();

// SASL/SCRAM-SHA-256
let admin = AdminClient::builder()
    .bootstrap_servers("broker:9092")
    .sasl_scram_sha256("username", "password")
    .build();

// SASL/SCRAM-SHA-512
let admin = AdminClient::builder()
    .bootstrap_servers("broker:9092")
    .sasl_scram_sha512("username", "password")
    .build();

Producer

use krafka::producer::Producer;

// SASL/PLAIN
let producer = Producer::builder()
    .bootstrap_servers("broker:9092")
    .sasl_plain("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-256
let producer = Producer::builder()
    .bootstrap_servers("broker:9092")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-512
let producer = Producer::builder()
    .bootstrap_servers("broker:9092")
    .sasl_scram_sha512("username", "password")
    .build()
    .await?;

Consumer

use krafka::consumer::Consumer;

// SASL/PLAIN
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("my-group")
    .sasl_plain("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-256
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("my-group")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-512
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("my-group")
    .sasl_scram_sha512("username", "password")
    .build()
    .await?;

Transactional Producer

use krafka::producer::TransactionalProducer;

// SASL/PLAIN
let producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9092")
    .transactional_id("my-txn-id")
    .sasl_plain("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-256
let producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9092")
    .transactional_id("my-txn-id")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// SASL/SCRAM-SHA-512
let producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9092")
    .transactional_id("my-txn-id")
    .sasl_scram_sha512("username", "password")
    .build()
    .await?;

Generic AuthConfig

For advanced configurations or AWS MSK IAM, use .auth() on any builder:

use krafka::AdminClient;
use krafka::producer::{Producer, TransactionalProducer};
use krafka::consumer::Consumer;
use krafka::auth::AuthConfig;

let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");

// Works on all client types
let admin = AdminClient::builder()
    .bootstrap_servers("broker:9092")
    .auth(auth.clone())
    .build();

let producer = Producer::builder()
    .bootstrap_servers("broker:9092")
    .auth(auth.clone())
    .build()
    .await?;

let txn_producer = TransactionalProducer::builder()
    .bootstrap_servers("broker:9092")
    .transactional_id("my-txn-id")
    .auth(auth.clone())
    .build()
    .await?;

let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("my-group")
    .auth(auth)
    .build()
    .await?;

Security Best Practices

  1. Always use TLS in production - Use SASL_SSL instead of SASL_PLAINTEXT
  2. Prefer SCRAM over PLAIN - SCRAM provides challenge-response security
  3. Use mTLS for strongest authentication - Client certificates are harder to steal
  4. Store credentials securely - Use environment variables or secrets managers
  5. Rotate credentials regularly - Especially for long-running applications
  6. Verify certificates in production - Never use TlsConfig::insecure() in production
  7. Automatic secret zeroization - All credential types (ScramClient, MskIamAuthenticator, PlainCredentials, ScramCredentials, OAuthBearerToken) zeroize secrets on drop to prevent memory leaks. SASL PLAIN auth bytes are wrapped in Zeroizing<Vec<u8>> and automatically zeroized after being sent on the wire.
  8. Debug safety - All credential types redact secrets in Debug output, so tracing::debug!("{:?}", auth) is safe to use
  9. Cleartext warning - Using SASL_PLAINTEXT with PLAIN emits a tracing::warn! alerting that credentials will be sent in cleartext

Secure Connection Configuration

For integrated TLS and SASL configuration, use SecureConnectionConfig:

use krafka::network::SecureConnectionConfig;
use krafka::auth::TlsConfig;
use std::time::Duration;

let config = SecureConnectionConfig::builder()
    .client_id("my-app")
    .connect_timeout(Duration::from_secs(10))
    .sasl_scram_sha256("username", "password")
    .tls(TlsConfig::new())
    .build();

SaslAuthenticator

For handling SASL handshakes, use SaslAuthenticator:

use krafka::network::SaslAuthenticator;
use krafka::auth::AuthConfig;

let auth = AuthConfig::sasl_scram_sha256("user", "pass");
let mut authenticator = SaslAuthenticator::new(&auth).unwrap();

// Get mechanism name for SASL handshake
let mechanism = authenticator.mechanism_name(); // "SCRAM-SHA-256"

// Get initial authentication bytes
let initial = authenticator.initial_response();

// Process server challenges
// let response = authenticator.process_challenge(&server_bytes)?;

// Check completion
if authenticator.is_complete() {
    println!("Authentication successful!");
}

Example: Production Configuration

use krafka::auth::{AuthConfig, TlsConfig};
use krafka::producer::Producer;
use krafka::consumer::Consumer;
use std::env;

fn production_auth_config() -> AuthConfig {
    let username = env::var("KAFKA_USER").expect("KAFKA_USER required");
    let password = env::var("KAFKA_PASSWORD").expect("KAFKA_PASSWORD required");
    
    let tls_config = TlsConfig::new()
        .with_ca_cert("/etc/ssl/certs/kafka-ca.pem");
    
    // SCRAM-SHA-512 over TLS
    AuthConfig {
        security_protocol: krafka::auth::SecurityProtocol::SaslSsl,
        sasl_mechanism: Some(krafka::auth::SaslMechanism::ScramSha512),
        scram_credentials: Some(krafka::auth::ScramCredentials::new(username, password)),
        tls_config: Some(tls_config),
        ..Default::default()
    }
}

// Use with any client
async fn create_clients() {
    let auth = production_auth_config();

    let producer = Producer::builder()
        .bootstrap_servers("kafka.prod.example.com:9093")
        .auth(auth.clone())
        .build()
        .await
        .unwrap();

    let consumer = Consumer::builder()
        .bootstrap_servers("kafka.prod.example.com:9093")
        .group_id("prod-group")
        .auth(auth)
        .build()
        .await
        .unwrap();
}

Next Steps


Back to top

Licensed under MIT. Copyright © 2026 Krafka Contributors.