Schema Registry Guide

This guide covers Krafka’s schema registry integration, including the Confluent wire format, subject naming strategies, caching, and the built-in HTTP client.

Overview

Krafka provides schema registry support at two levels:

  • Always available (no extra dependencies): Wire format encode/decode, subject name strategies, the SchemaRegistryClient trait, CachedSchemaRegistry, the Glue wire format, the GlueSchemaRegistryClient trait, and CachedGlueSchemaRegistry.
  • Feature-gated (schema-registry): ConfluentSchemaRegistry HTTP client for the Confluent Schema Registry.
  • Feature-gated (aws-glue-schema-registry): AwsGlueSchemaRegistry SDK client for the AWS Glue Schema Registry.

Krafka handles the wire format framing and registry communication. Actual serialization (Avro, Protobuf, JSON Schema) is left to your preferred library — this keeps the dependency tree lean and gives you full control over serde.

Wire Format

The Confluent wire format prepends a 5-byte header to every serialized payload:

┌──────────┬────────────────────┬──────────────────┐
│ 0x00 (1B)│ Schema ID (4B, BE) │ Payload (N bytes)│
└──────────┴────────────────────┴──────────────────┘

Use encode_wire_format() and decode_wire_format():

use krafka::schema_registry::{encode_wire_format, decode_wire_format};

// Encoding: prepend wire format header to serialized data
let avro_bytes: Vec<u8> = serialize_with_avro(&my_record);
let framed = encode_wire_format(schema_id, &avro_bytes);
// framed is ready to use as a Kafka record value

// Decoding: strip the header to get schema ID + raw payload
let (schema_id, payload) = decode_wire_format(&record.value.unwrap())?;
// Use schema_id to look up the schema, then deserialize payload
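
For intuition, the 5-byte header shown above can be reproduced with nothing but the standard library. The following is a minimal sketch of the framing layout, not Krafka's implementation; the helper names frame_confluent and unframe_confluent are hypothetical and exist only for this illustration.

```rust
/// Magic byte that opens every Confluent-framed payload.
const CONFLUENT_MAGIC: u8 = 0x00;
/// 1 magic byte + 4-byte big-endian schema ID.
const HEADER_LEN: usize = 5;

/// Hypothetical helper: prepend the 5-byte header to a serialized payload.
fn frame_confluent(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(CONFLUENT_MAGIC);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Hypothetical helper: split a framed buffer into (schema ID, payload).
/// Returns None when the buffer is too short or the magic byte is wrong.
/// The payload is a borrowed sub-slice, so nothing is copied.
fn unframe_confluent(data: &[u8]) -> Option<(u32, &[u8])> {
    if data.len() < HEADER_LEN || data[0] != CONFLUENT_MAGIC {
        return None;
    }
    let id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
    Some((id, &data[HEADER_LEN..]))
}

fn main() {
    let framed = frame_confluent(42, b"avro-bytes");
    // Header is the magic byte followed by 42 as a big-endian u32.
    assert_eq!(&framed[..5], &[0x00, 0x00, 0x00, 0x00, 0x2A]);

    let (id, payload) = unframe_confluent(&framed).expect("valid frame");
    println!("schema id = {id}, payload = {} bytes", payload.len());
}
```

Returning a borrowed slice mirrors the idea behind zero-copy decoding: the payload view points into the original buffer rather than a fresh allocation.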

You can also detect wire format before dispatching to Confluent/Glue-specific decoders:

use krafka::schema_registry::{detect_wire_format, DetectedWireFormat};

match detect_wire_format(data) {
    DetectedWireFormat::Confluent { schema_id, payload_offset } => {
        // route to Confluent registry
    }
    DetectedWireFormat::Glue { version_id, payload_offset } => {
        // route to Glue registry
    }
    DetectedWireFormat::InvalidConfluent | DetectedWireFormat::InvalidGlue => {
        // reject malformed framed data
    }
    _ => {
        // Unknown (or any variant added later): passthrough or custom handling
    }
}
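
To make the dispatch concrete, here is a rough, standard-library-only sketch of how first-byte detection could work. The Detected enum and detect function are hypothetical stand-ins, and the exact rules for what counts as "invalid" (a short header, an unknown Glue compression byte) are assumptions made for illustration; Krafka's own detect_wire_format() may apply different checks.

```rust
/// Hypothetical classification result, loosely mirroring the variants above.
#[derive(Debug, PartialEq)]
enum Detected {
    Confluent { schema_id: u32, payload_offset: usize },
    Glue { version_id: [u8; 16], payload_offset: usize },
    InvalidConfluent,
    InvalidGlue,
    Unknown,
}

/// Classify a buffer by its first byte, then validate the rest of the header.
fn detect(data: &[u8]) -> Detected {
    match data.first().copied() {
        Some(0x00) => {
            // Assumption: a Confluent frame needs the full 5-byte header.
            if data.len() < 5 {
                return Detected::InvalidConfluent;
            }
            let schema_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
            Detected::Confluent { schema_id, payload_offset: 5 }
        }
        Some(0x03) => {
            // Assumption: a Glue frame needs 18 bytes and a known compression byte.
            if data.len() < 18 || !matches!(data[1], 0x00 | 0x05) {
                return Detected::InvalidGlue;
            }
            let mut version_id = [0u8; 16];
            version_id.copy_from_slice(&data[2..18]);
            Detected::Glue { version_id, payload_offset: 18 }
        }
        // Any other first byte (or an empty buffer) is not schema-framed.
        _ => Detected::Unknown,
    }
}

fn main() {
    println!("{:?}", detect(&[0x00, 0, 0, 0, 7, 0xAA]));
    println!("{:?}", detect(&[0x00, 0, 0]));
    println!("{:?}", detect(b"plain text"));
}
```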

Zero-Copy Decoding with Bytes

When working with Bytes values (e.g., from CompactedTable), use decode_wire_format_bytes() for zero-copy slicing — the returned payload shares the same backing allocation:

use krafka::schema_registry::decode_wire_format_bytes;

// value is &Bytes from CompactedTable::get()
let (schema_id, payload) = decode_wire_format_bytes(value)?;
// payload is a Bytes slice — no copy, no allocation

Subject Name Strategies

A subject determines where a schema is registered and looked up in the registry. Krafka supports three strategies matching the Confluent conventions:

| Strategy | Subject format | Best for |
|---|---|---|
| TopicName (default) | {topic}-key / {topic}-value | One schema per topic |
| RecordName | {record_name} | Same type across multiple topics |
| TopicRecordName | {topic}-{record_name} | Per-topic evolution of shared types |

use krafka::schema_registry::SubjectNameStrategy;

let strategy = SubjectNameStrategy::TopicName;
let subject = strategy.subject_name("orders", None, false)?;
assert_eq!(subject, "orders-value");

let strategy = SubjectNameStrategy::RecordName;
let subject = strategy.subject_name("orders", Some("com.example.Order"), false)?;
assert_eq!(subject, "com.example.Order");
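
The three subject formats in the table can be written out as a small standalone function. This is only a sketch of the naming rules; the Strategy enum and subject_name function below are hypothetical, and returning an error when a record name is missing is an assumption about how such a case would reasonably be handled.

```rust
/// Hypothetical mirror of the three strategies described above.
#[derive(Debug, Clone, Copy)]
enum Strategy {
    TopicName,
    RecordName,
    TopicRecordName,
}

/// Build the registry subject for a topic and optional record name.
fn subject_name(
    strategy: Strategy,
    topic: &str,
    record_name: Option<&str>,
    is_key: bool,
) -> Result<String, String> {
    match strategy {
        Strategy::TopicName => {
            // {topic}-key or {topic}-value
            let suffix = if is_key { "key" } else { "value" };
            Ok(format!("{topic}-{suffix}"))
        }
        Strategy::RecordName => record_name
            .map(str::to_owned)
            .ok_or_else(|| "RecordName requires a record name".to_string()),
        Strategy::TopicRecordName => record_name
            .map(|name| format!("{topic}-{name}"))
            .ok_or_else(|| "TopicRecordName requires a record name".to_string()),
    }
}

fn main() {
    let record = Some("com.example.Order");
    for strategy in [Strategy::TopicName, Strategy::RecordName, Strategy::TopicRecordName] {
        println!("{:?} -> {:?}", strategy, subject_name(strategy, "orders", record, false));
    }
}
```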

Compatible Registries

The ConfluentSchemaRegistry HTTP client uses the standard Confluent REST API and works with any registry that implements it:

| Registry | Notes |
|---|---|
| Confluent Schema Registry | The reference implementation |
| Karapace (Aiven, Apache 2.0) | Drop-in replacement; compatible with Confluent SR API level 6.1.1 |
| Apicurio Registry (Red Hat, Apache 2.0) | Enable its Confluent-compatible API mode |

No code changes are needed — just point ConfluentSchemaRegistry at the compatible URL.

For AWS environments, the AwsGlueSchemaRegistry SDK client communicates with the AWS Glue Schema Registry via the AWS SDK.

Schema Registry Client Trait

The SchemaRegistryClient trait allows pluggable registry backends:

use krafka::schema_registry::{SchemaRegistryClient, Schema, SchemaId, SchemaType, SchemaVersion, SchemaReference};
use krafka::error::Result;

struct MyRegistry { /* ... */ }

impl SchemaRegistryClient for MyRegistry {
    async fn get_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
        // Fetch from your registry backend
        Ok(Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#))
    }

    async fn get_latest_schema(&self, subject: &str) -> Result<Schema> {
        // Fetch the schema for the latest version of `subject`
        todo!()
    }

    async fn get_schema_by_version(&self, subject: &str, version: SchemaVersion) -> Result<Schema> {
        // Fetch the schema for a specific version of `subject`
        todo!()
    }

    async fn register_schema(
        &self,
        subject: &str,
        schema: &str,
        schema_type: SchemaType,
        references: &[SchemaReference],
    ) -> Result<SchemaId> {
        // Register the schema and return its assigned ID
        todo!()
    }
}

Caching

CachedSchemaRegistry wraps any SchemaRegistryClient with an in-memory ID-to-schema cache. Schema IDs are immutable in the registry, so cached entries never expire unless you opt into bounded eviction with with_max_entries(). Concurrent cold misses for the same schema ID are also coalesced, so only one upstream request runs per ID at a time:

use krafka::schema_registry::CachedSchemaRegistry;

let cached = CachedSchemaRegistry::new(my_registry);

// First call fetches from the registry
let schema = cached.get_schema_by_id(1).await?;

// Second call is served from cache (no network request)
let same = cached.get_schema_by_id(1).await?;

// get_latest_schema always forwards but populates the ID cache
let latest = cached.get_latest_schema("orders-value").await?;
let by_id = cached.get_schema_by_id(latest.id).await?; // cache hit

// Inspect or clear the cache
println!("Cached schemas: {}", cached.cache_len());
cached.clear_cache();

// Invalidate one entry or all entries
cached.invalidate(1);
cached.invalidate_all();

// Optional: pre-warm immutable IDs at startup
cached.warm_cache(&[1, 2, 3]).await?;

// Optional: bound cache growth by evicting the oldest inserted IDs
let bounded = CachedSchemaRegistry::with_max_entries(other_registry, 1024);
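
The caching rules described above (immutable IDs, optional bound, oldest-inserted eviction) can be illustrated with a small synchronous model. This is a sketch under simplifying assumptions, not how CachedSchemaRegistry is actually built: BoundedSchemaCache is a hypothetical type, schemas are plain strings, the upstream fetch is a closure, and concurrent-miss coalescing is not modeled.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical bounded ID-to-schema cache with oldest-inserted eviction.
struct BoundedSchemaCache {
    max_entries: usize,
    schemas: HashMap<u32, String>,
    insertion_order: VecDeque<u32>,
}

impl BoundedSchemaCache {
    fn new(max_entries: usize) -> Self {
        Self {
            max_entries,
            schemas: HashMap::new(),
            insertion_order: VecDeque::new(),
        }
    }

    /// Return the cached schema, or call `fetch` once and remember the result.
    fn get_or_fetch<F>(&mut self, id: u32, fetch: F) -> String
    where
        F: FnOnce(u32) -> String,
    {
        if let Some(schema) = self.schemas.get(&id) {
            return schema.clone(); // cache hit: no upstream call
        }
        let schema = fetch(id);
        if self.max_entries > 0 {
            // Evict the oldest inserted ID once the bound is reached.
            if self.schemas.len() >= self.max_entries {
                if let Some(oldest) = self.insertion_order.pop_front() {
                    self.schemas.remove(&oldest);
                }
            }
            self.schemas.insert(id, schema.clone());
            self.insertion_order.push_back(id);
        }
        schema
    }

    fn len(&self) -> usize {
        self.schemas.len()
    }
}

fn main() {
    let mut cache = BoundedSchemaCache::new(2);
    let mut upstream_calls = 0;
    for id in [1, 1, 2, 3, 1] {
        cache.get_or_fetch(id, |i| {
            upstream_calls += 1;
            format!("schema-{i}")
        });
    }
    println!("upstream calls: {upstream_calls}, cached: {}", cache.len());
}
```

With a bound of 2, the second lookup of ID 1 is a hit, but ID 1 is evicted when ID 3 arrives and has to be fetched again afterwards. That trade-off is why an unbounded cache is a sensible default when the set of schema IDs is small.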

CachedSchemaRegistry and CachedGlueSchemaRegistry also provide inherent async methods with the same names as the methods of their client traits, for ergonomic calls on the concrete cached types.

CachedGlueSchemaRegistry follows the same rules for AWS Glue schema version IDs: immutable-ID caching, concurrent miss coalescing, and optional bounded eviction via with_max_entries().

For provider-agnostic cache lifecycle operations, both wrappers implement AnySchemaCache:

use krafka::schema_registry::{AnySchemaCache, CachedSchemaRegistry, SchemaId};

async fn reset_and_prewarm(cache: &dyn AnySchemaCache<Id = SchemaId>) -> krafka::Result<()> {
    cache.invalidate_all();
    cache.warm_cache(&[1, 2, 3]).await
}

Confluent Schema Registry HTTP Client

Enable the schema-registry feature to use the built-in HTTP client:

[dependencies]
krafka = { version = "0.12.0", features = ["schema-registry"] }

Basic Usage

use std::time::Duration;
use krafka::schema_registry::{
    ConfluentSchemaRegistry, CachedSchemaRegistry, SchemaType,
    encode_wire_format, decode_wire_format,
};

// Create and cache the client
let client = ConfluentSchemaRegistry::new("http://localhost:8081");
let registry = CachedSchemaRegistry::new(client);

// Register a schema
let schema_id = registry.register_schema(
    "orders-value",
    r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#,
    SchemaType::Avro,
    &[],
).await?;

// Encode with wire format
let avro_bytes = serialize_order(&order);
let wire_bytes = encode_wire_format(schema_id, &avro_bytes);
producer.send("orders", Some(b"key"), &wire_bytes).await?;

// Decode from wire format
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
    if let Some(value) = &record.value {
        let (id, payload) = decode_wire_format(value)?;
        let schema = registry.get_schema_by_id(id).await?;
        let order = deserialize_order(payload, &schema.schema);
    }
}

Authentication

use std::time::Duration;
use krafka::schema_registry::ConfluentSchemaRegistry;

// Basic auth
let client = ConfluentSchemaRegistry::builder()
    .url("https://registry.example.com")
    .basic_auth("user", "password")
    .build()?;

// Bearer token
let client = ConfluentSchemaRegistry::builder()
    .url("https://registry.example.com")
    .bearer_token("my-jwt-token")
    .build()?;

// Custom timeout
let client = ConfluentSchemaRegistry::builder()
    .url("http://localhost:8081")
    .request_timeout(Duration::from_secs(10))
    .build()?;

Additional Operations

The HTTP client provides extra methods beyond the trait:

// Check schema compatibility (supports references)
let compatible = client.check_compatibility(
    "orders-value",
    &new_schema,
    SchemaType::Avro,
    &[],  // pass SchemaReference values if the schema has dependencies
).await?;

// List all subjects
let subjects = client.get_subjects().await?;

// List all versions of a subject
let versions = client.get_versions("orders-value").await?;

// Delete a subject (soft-delete)
let deleted = client.delete_subject("orders-value", false).await?;

// Delete a subject (permanent hard-delete)
let deleted = client.delete_subject("orders-value", true).await?;

Schema References

For schemas with dependencies (e.g., Protobuf imports, Avro references), pass SchemaReference values when registering:

use krafka::schema_registry::{SchemaReference, SchemaType};

let refs = vec![
    SchemaReference::new("com.example.Address", "address-value", 1),
];

let id = registry.register_schema(
    "order-value",
    &order_schema,
    SchemaType::Avro,
    &refs,
).await?;

Using with CompactedTable

CompactedTable stores key-value pairs as Bytes. When the values are Confluent wire-format encoded, use decode_wire_format_bytes() for zero-copy decoding:

use krafka::consumer::CompactedTopicConsumer;
use krafka::schema_registry::{
    decode_wire_format_bytes, CachedSchemaRegistry, ConfluentSchemaRegistry,
};

// Set up the schema registry client with caching
let registry = CachedSchemaRegistry::new(
    ConfluentSchemaRegistry::new("http://localhost:8081"),
);

// Build and start the compacted topic consumer
let ctc = CompactedTopicConsumer::builder()
    .bootstrap_servers("localhost:9092")
    .topic("user-profiles")
    .build()
    .await?;

// Look up a single key
if let Some(value) = ctc.table().get(b"user-42") {
    let (schema_id, payload) = decode_wire_format_bytes(value)?;
    let schema = registry.get_schema_by_id(schema_id).await?;
    let user = deserialize_avro(&payload, &schema.schema);
}

// Iterate all entries
for (key, value) in ctc.table() {
    let (schema_id, payload) = decode_wire_format_bytes(value)?;
    let schema = registry.get_schema_by_id(schema_id).await?;
    // schema_id lookups are cached after the first fetch
}

Since schema IDs are immutable, CachedSchemaRegistry ensures you only make one HTTP round-trip per schema ID, even when iterating thousands of table entries.

AWS Glue Schema Registry

For AWS MSK users, Krafka provides first-class AWS Glue Schema Registry support. Glue uses a completely different wire format and UUID-based schema version IDs.

Glue Wire Format

The Glue wire format uses an 18-byte header (vs Confluent’s 5-byte header):

┌──────────┬─────────────┬──────────────────────┬──────────────────┐
│ 0x03 (1B)│ Compr. (1B) │ Schema Version UUID  │ Payload (N bytes)│
│          │             │      (16B, BE)       │                  │
└──────────┴─────────────┴──────────────────────┴──────────────────┘

  • Byte 0: Header version byte (0x03)
  • Byte 1: Compression indicator (0x00 = none, 0x05 = ZLIB)
  • Bytes 2–17: Schema version ID as a 128-bit UUID (big-endian)
  • Bytes 18+: Payload (ZLIB-compressed if byte 1 is 0x05)
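
The byte layout listed above can be sketched with the standard library alone. The helpers frame_glue_uncompressed and split_glue_frame are hypothetical names used only here; the sketch covers the uncompressed case and leaves ZLIB handling out, since that needs a compression library.

```rust
const GLUE_HEADER_VERSION: u8 = 0x03;
const COMPRESSION_NONE: u8 = 0x00;
const COMPRESSION_ZLIB: u8 = 0x05;
/// 1 version byte + 1 compression byte + 16 UUID bytes.
const GLUE_HEADER_LEN: usize = 18;

/// Hypothetical helper: build an uncompressed Glue frame.
fn frame_glue_uncompressed(version_id: [u8; 16], payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(GLUE_HEADER_LEN + payload.len());
    out.push(GLUE_HEADER_VERSION);
    out.push(COMPRESSION_NONE);
    out.extend_from_slice(&version_id);
    out.extend_from_slice(payload);
    out
}

/// Hypothetical helper: split a frame into (version ID, compression byte, stored payload).
/// The payload is returned as stored; a ZLIB-flagged payload would still need inflating.
fn split_glue_frame(data: &[u8]) -> Option<([u8; 16], u8, &[u8])> {
    if data.len() < GLUE_HEADER_LEN || data[0] != GLUE_HEADER_VERSION {
        return None;
    }
    let compression = data[1];
    if compression != COMPRESSION_NONE && compression != COMPRESSION_ZLIB {
        return None;
    }
    let mut version_id = [0u8; 16];
    version_id.copy_from_slice(&data[2..GLUE_HEADER_LEN]);
    Some((version_id, compression, &data[GLUE_HEADER_LEN..]))
}

fn main() {
    // UUID 550e8400-e29b-41d4-a716-446655440000 as 16 big-endian bytes.
    let version_id = 0x550e8400_e29b_41d4_a716_446655440000u128.to_be_bytes();
    let framed = frame_glue_uncompressed(version_id, b"avro-bytes");
    let (id, compression, payload) = split_glue_frame(&framed).expect("valid frame");
    println!("id = {id:02x?}, compression = {compression:#04x}, payload = {} bytes", payload.len());
}
```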

Encode and decode with the Glue-specific functions:

use krafka::schema_registry::glue::{
    encode_glue_wire_format, decode_glue_wire_format,
    GlueSchemaVersionId, GlueCompression,
};

// Encoding
let uuid: GlueSchemaVersionId = "550e8400-e29b-41d4-a716-446655440000".parse()?;
let framed = encode_glue_wire_format(uuid, &avro_bytes, GlueCompression::None)?;
producer.send("my-topic", Some(b"key"), &framed).await?;

// Decoding
let (version_id, payload) = decode_glue_wire_format(&record_bytes)?;
let payload = payload.as_ref();

ZLIB compression is supported out of the box:

// Encode with ZLIB compression
let framed = encode_glue_wire_format(uuid, &payload, GlueCompression::Zlib)?;

// Decode automatically decompresses
let (version_id, original) = decode_glue_wire_format(&framed)?;
let original = original.as_ref();

Note: ZLIB decompression output is capped at 128 MiB to protect against decompression bombs, matching the limit used by record-batch decompression.
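
One common way to enforce such a cap is to wrap the decompressing reader so that it can never yield more than the limit. The sketch below shows that general technique with std::io::Read::take; it is not taken from Krafka's source, read_capped is a hypothetical helper, and io::repeat stands in for a decompressor that keeps producing output.

```rust
use std::io::{self, Read};

/// Hypothetical helper: read everything from `reader`, failing if more than
/// `limit` bytes come out. Any streaming decompressor that implements Read
/// could be passed in.
fn read_capped<R: Read>(reader: R, limit: u64) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    // Allow one byte past the limit so an oversized stream is detectable.
    reader.take(limit.saturating_add(1)).read_to_end(&mut out)?;
    if out.len() as u64 > limit {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "decompressed output exceeds limit",
        ));
    }
    Ok(out)
}

fn main() {
    // An endless stream of zeros behaves like a decompression bomb.
    let bomb = io::repeat(0u8);
    assert!(read_capped(bomb, 1024).is_err());

    // A small, well-behaved stream passes through unchanged.
    let small: &[u8] = b"hello";
    assert_eq!(read_capped(small, 1024).unwrap(), b"hello".to_vec());
    println!("cap enforced");
}
```

Because the reader is cut off at limit + 1 bytes, memory use stays bounded even when the input claims to expand without end.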

For Bytes values (e.g., from CompactedTable), use decode_glue_wire_format_bytes() for zero-copy slicing on uncompressed payloads.

Glue Client Trait

The GlueSchemaRegistryClient trait allows pluggable backends (always available, no feature required):

use krafka::schema_registry::glue::{
    GlueSchemaRegistryClient, GlueSchema, GlueSchemaVersionId, GlueDataFormat,
};

AWS SDK Client

Enable the aws-glue-schema-registry feature to use the built-in SDK client:

[dependencies]
krafka = { version = "0.12.0", features = ["aws-glue-schema-registry"] }

use krafka::schema_registry::glue::{
    AwsGlueSchemaRegistry, CachedGlueSchemaRegistry,
    decode_glue_wire_format,
};

// Create from AWS config
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
    .load()
    .await;
let glue_client = aws_sdk_glue::Client::new(&config);

let registry = CachedGlueSchemaRegistry::new(
    AwsGlueSchemaRegistry::new(glue_client, "my-registry"),
);

// Decode and look up schema
let (version_id, payload) = decode_glue_wire_format(&record_bytes)?;
let payload = payload.as_ref();
let schema = registry.get_schema_by_version_id(version_id).await?;
// Deserialize payload using schema.schema_definition

Unified Decoder Dispatch

Use WireFormatDecoder to centralize Confluent/Glue dispatch and schema lookups without writing magic-byte detection in application code. It accepts Bytes and returns a DecodedMessage with a zero-copy payload:

use bytes::Bytes;
use krafka::schema_registry::{WireFormatDecoder, SchemaFormat};

let decoder = WireFormatDecoder::new()
    .with_confluent(&confluent_registry)
    .with_glue(&glue_registry);

let decoded = decoder.decode(record_bytes).await?;
match decoded.schema_format {
    SchemaFormat::Unknown => {
        // pass through non-schema-framed payload
    }
    _ => {
        // decoded.payload (Bytes, zero-copy) + decoded.schema_metadata available
    }
}

WireFormatDecoder::decode() biases toward safe passthrough for ambiguous payloads: those whose first byte collides with a Confluent (0x00) or Glue (0x03) framing prefix but that do not carry a complete, valid header. If you need strict rejection of malformed headers, call detect_wire_format() and the low-level decode_wire_format() / decode_glue_wire_format() helpers directly.

Advanced configuration of the AwsGlueSchemaRegistry SDK client is available via its builder:

let registry = AwsGlueSchemaRegistry::builder(glue_client)
    .registry_name("my-custom-registry")
    .auto_register(true)  // auto-create schemas on first register
    .poll_max_attempts(15)
    .poll_interval(Duration::from_secs(2))
    .build();

Confluent vs Glue: Quick Comparison

| Aspect | Confluent | AWS Glue |
|---|---|---|
| Wire format header | 5 bytes | 18 bytes |
| Schema identifier | u32 (integer ID) | UUID (128-bit) |
| Compression | Not in wire format | ZLIB in header |
| API | HTTP REST | AWS SDK |
| Feature flag | schema-registry | aws-glue-schema-registry |
| Trait | SchemaRegistryClient | GlueSchemaRegistryClient |
| Caching wrapper | CachedSchemaRegistry | CachedGlueSchemaRegistry |

Producer-Level Schema Encoding (ConfluentSchemaEncoder)

For the common case of encoding all records sent to a producer with the same schema, use ConfluentSchemaEncoder to attach encoding directly to the producer. This is the Rust equivalent of key.serializer / value.serializer in the Java KafkaProducer: encoding is automatic on every send_record() call — no per-record boilerplate required.

[dependencies]
krafka = { version = "0.12.0", features = ["schema-registry"] }

use std::sync::Arc;
use krafka::schema_registry::{
    ConfluentSchemaEncoder, CachedSchemaRegistry, ConfluentSchemaRegistry, SchemaType,
};
use krafka::producer::{Producer, ProducerRecord};

let registry = CachedSchemaRegistry::new(
    ConfluentSchemaRegistry::new("http://localhost:8081"),
);

// Build once — schema ID is cached after the first send
let encoder = Arc::new(
    ConfluentSchemaEncoder::builder()
        .registry(registry)
        .schema(
            r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#,
            SchemaType::Avro,
        )
        .build()?,
);

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .value_encoder(encoder)   // encoding is now automatic
    .build()
    .await?;

// Send raw (pre-serialized) bytes — wire framing is transparent
producer.send_record(ProducerRecord::new("orders", avro_bytes)).await?;

Key + Value Encoding

Attach separate encoders for key and value:

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .key_encoder(key_encoder)
    .value_encoder(value_encoder)
    .build()
    .await?;

Per-Record Subject Name Override

For RecordName or TopicRecordName strategies, set the record name on individual records:

use krafka::schema_registry::SubjectNameStrategy;

let encoder = Arc::new(
    ConfluentSchemaEncoder::builder()
        .registry(registry)
        .schema(my_schema, SchemaType::Avro)
        .strategy(SubjectNameStrategy::TopicRecordName)
        .build()?,
);

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .value_encoder(encoder)
    .build()
    .await?;

producer.send_record(
    ProducerRecord::new("orders", avro_bytes)
        .with_record_name("com.example.Order"),
).await?;

Custom Encoder

Implement SchemaEncoder for custom framing logic (e.g., non-Confluent registries, multi-schema routing):

use std::pin::Pin;
use std::future::Future;
use bytes::Bytes;
use krafka::schema_registry::SchemaEncoder;
use krafka::error::Result;

struct MyEncoder;

impl SchemaEncoder for MyEncoder {
    fn encode(
        &self,
        payload: Bytes,
        _topic: &str,
        _record_name: Option<&str>,
        _is_key: bool,
    ) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>> {
        // `payload` is already owned, so it moves into the future without a clone
        Box::pin(async move {
            // Custom framing logic here
            Ok(payload)
        })
    }
}

Consumer-Level Schema Decoding (SchemaDecoder / ConfluentSchemaDecoder)

Symmetric to the producer-level SchemaEncoder, a consumer can be configured with key_decoder and/or value_decoder. After each poll() or recv(), and after the consumer interceptor, every record’s key/value bytes are automatically passed through the configured decoder before being returned to the caller.

This eliminates manual wire-format stripping in application code — equivalent to key.deserializer / value.deserializer in the Java KafkaConsumer.

Basic Usage

use std::sync::Arc;
use std::time::Duration;
use krafka::consumer::Consumer;
use krafka::schema_registry::ConfluentSchemaDecoder;

// Strip Confluent wire-format header from all values automatically.
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .value_decoder(Arc::new(ConfluentSchemaDecoder::new()))
    .build()
    .await?;

consumer.subscribe(&["avro-topic"]).await?;

loop {
    let records = consumer.poll(Duration::from_secs(1)).await?;
    for record in &records {
        // record.value already has the wire-format header stripped — it is
        // the raw Avro/Protobuf/JSON bytes, as a zero-copy Bytes slice.
        if let Some(payload) = &record.value {
            let order = deserialize_order(payload)?;
        }
    }
}

Key + Value Decoding

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .key_decoder(Arc::new(ConfluentSchemaDecoder::new()))
    .value_decoder(Arc::new(ConfluentSchemaDecoder::new()))
    .build()
    .await?;

Custom Decoder

Implement the SchemaDecoder trait to handle any custom framing or encryption layer:

use std::pin::Pin;
use std::future::Future;
use bytes::Bytes;
use krafka::schema_registry::SchemaDecoder;
use krafka::error::Result;

struct AesDecryptingDecoder { /* key material */ }

impl SchemaDecoder for AesDecryptingDecoder {
    fn decode(
        &self,
        payload: Bytes,
        _topic: &str,
        _is_key: bool,
    ) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>> {
        Box::pin(async move {
            // Decrypt and return
            let plaintext = self.decrypt(&payload)?;
            Ok(Bytes::from(plaintext))
        })
    }
}

Design note: value_decoder runs after the consumer interceptor. If your interceptor modifies the raw (framed) bytes, the decoder will see the modified value. If you need decoding to happen before the interceptor, implement it inside the interceptor itself.


Licensed under MIT. Copyright © 2026 Krafka Contributors.