Schema Registry Guide
This guide covers Krafka’s schema registry integration, including the Confluent wire format, subject naming strategies, caching, the built-in Confluent HTTP client, and AWS Glue Schema Registry support.
Overview
Krafka provides schema registry support at two levels:
- Always available (no extra dependencies): wire format encode/decode, subject name strategies, the `SchemaRegistryClient` trait, `CachedSchemaRegistry`, the Glue wire format, the `GlueSchemaRegistryClient` trait, and `CachedGlueSchemaRegistry`.
- Feature-gated (`schema-registry`): the `ConfluentSchemaRegistry` HTTP client for the Confluent Schema Registry.
- Feature-gated (`aws-glue-schema-registry`): the `AwsGlueSchemaRegistry` SDK client for the AWS Glue Schema Registry.
Krafka handles the wire format framing and registry communication. Actual serialization (Avro, Protobuf, JSON Schema) is left to your preferred library — this keeps the dependency tree lean and gives you full control over serde.
Wire Format
The Confluent wire format prepends a 5-byte header to every serialized payload:
┌──────────┬────────────────────┬──────────────────┐
│ 0x00 (1B)│ Schema ID (4B, BE) │ Payload (N bytes)│
└──────────┴────────────────────┴──────────────────┘
Use encode_wire_format() and decode_wire_format():
use krafka::schema_registry::{encode_wire_format, decode_wire_format};
// Encoding: prepend wire format header to serialized data
let avro_bytes: Vec<u8> = serialize_with_avro(&my_record);
let framed = encode_wire_format(schema_id, &avro_bytes);
// framed is ready to use as a Kafka record value
// Decoding: strip the header to get schema ID + raw payload
if let Some(value) = &record.value {
let (schema_id, payload) = decode_wire_format(value)?;
// Use schema_id to look up the schema, then deserialize payload
}
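The following standard-library-only sketch makes the 5-byte layout concrete. `frame_confluent` and `unframe_confluent` are hypothetical names used for illustration and are not Krafka's API. They assume the header described above: magic byte `0x00`, then a big-endian `u32` schema ID.

```rust
/// Magic byte that opens every Confluent-framed payload.
const MAGIC_BYTE: u8 = 0x00;
/// Magic byte (1) + big-endian schema ID (4).
const HEADER_LEN: usize = 5;

/// Prepends the 5-byte Confluent header to an already-serialized payload.
fn frame_confluent(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
    out.push(MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Splits a framed buffer into (schema ID, payload) without copying the payload.
fn unframe_confluent(data: &[u8]) -> Result<(u32, &[u8]), String> {
    if data.len() < HEADER_LEN {
        return Err(format!("need at least {HEADER_LEN} bytes, got {}", data.len()));
    }
    if data[0] != MAGIC_BYTE {
        return Err(format!("unexpected magic byte 0x{:02x}", data[0]));
    }
    let schema_id = u32::from_be_bytes([data[1], data[2], data[3], data[4]]);
    Ok((schema_id, &data[HEADER_LEN..]))
}
```

A buffer shorter than five bytes, or one that does not start with `0x00`, cannot carry a valid header. That is why decoding returns a `Result`.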
You can also detect wire format before dispatching to Confluent/Glue-specific decoders:
use krafka::schema_registry::{detect_wire_format, DetectedWireFormat};
match detect_wire_format(data) {
DetectedWireFormat::Confluent { schema_id, payload_offset } => {
// route to Confluent registry
}
DetectedWireFormat::Glue { version_id, payload_offset } => {
// route to Glue registry
}
DetectedWireFormat::InvalidConfluent | DetectedWireFormat::InvalidGlue => {
// reject malformed framed data
}
DetectedWireFormat::Unknown | _ => {
// passthrough or custom handling
}
}
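The kind of classification `detect_wire_format()` performs can be approximated as shown below. This is a hypothetical sketch: the `Detected` enum and the `detect` function are illustrative. It assumes two rules:

- A leading `0x00` means Confluent and needs at least 5 bytes.
- A leading `0x03` means Glue and needs at least 18 bytes plus a known compression byte (`0x00` or `0x05`).

Krafka's exact validation rules may differ.

```rust
/// Hypothetical classification result, mirroring the shape of `DetectedWireFormat`.
#[derive(Debug, PartialEq)]
enum Detected {
    Confluent { schema_id: u32, payload_offset: usize },
    Glue { version_id: [u8; 16], payload_offset: usize },
    InvalidConfluent,
    InvalidGlue,
    Unknown,
}

/// Classifies a buffer by its first byte, then checks that the header is complete.
fn detect(data: &[u8]) -> Detected {
    match data.first().copied() {
        // Confluent: 0x00 + 4-byte big-endian schema ID.
        Some(0x00) if data.len() >= 5 => Detected::Confluent {
            schema_id: u32::from_be_bytes([data[1], data[2], data[3], data[4]]),
            payload_offset: 5,
        },
        Some(0x00) => Detected::InvalidConfluent,
        // Glue: 0x03 + compression byte (0x00 or 0x05) + 16-byte UUID.
        Some(0x03) if data.len() >= 18 && matches!(data[1], 0x00 | 0x05) => {
            let mut version_id = [0u8; 16];
            version_id.copy_from_slice(&data[2..18]);
            Detected::Glue { version_id, payload_offset: 18 }
        }
        Some(0x03) => Detected::InvalidGlue,
        // Anything else (JSON, plain text, empty) carries no registry framing.
        _ => Detected::Unknown,
    }
}
```

Returning a header offset rather than a copied payload mirrors the `payload_offset` fields above, so the caller can slice the original buffer without allocating. The sketch is strict: it reports truncated `0x00`/`0x03` prefixes as invalid instead of passing them through.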
Zero-Copy Decoding with Bytes
When working with Bytes values (e.g., from CompactedTable), use decode_wire_format_bytes() for zero-copy slicing — the returned payload shares the same backing allocation:
use krafka::schema_registry::decode_wire_format_bytes;
// value is &Bytes from CompactedTable::get()
let (schema_id, payload) = decode_wire_format_bytes(value)?;
// payload is a Bytes slice — no copy, no allocation
Subject Name Strategies
A subject determines where a schema is registered and looked up in the registry. Krafka supports three strategies matching the Confluent conventions:
| Strategy | Subject format | Best for |
|---|---|---|
| `TopicName` (default) | `{topic}-key` / `{topic}-value` | One schema per topic |
| `RecordName` | `{record_name}` | Same type across multiple topics |
| `TopicRecordName` | `{topic}-{record_name}` | Per-topic evolution of shared types |
use krafka::schema_registry::SubjectNameStrategy;
let strategy = SubjectNameStrategy::TopicName;
let subject = strategy.subject_name("orders", None, false)?;
assert_eq!(subject, "orders-value");
let strategy = SubjectNameStrategy::RecordName;
let subject = strategy.subject_name("orders", Some("com.example.Order"), false)?;
assert_eq!(subject, "com.example.Order");
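The three naming rules in the table reduce to a few string formats. The sketch below restates them in standard Rust. `Strategy` and `subject_for` are hypothetical names. The sketch assumes the record-name strategies fail when no record name is supplied; this illustrates why `subject_name()` returns a `Result` and is not a statement of Krafka's exact error behavior.

```rust
/// Hypothetical mirror of the three Confluent subject naming strategies.
#[derive(Clone, Copy, Debug)]
enum Strategy {
    TopicName,
    RecordName,
    TopicRecordName,
}

/// Builds the registry subject for a record under the given strategy.
fn subject_for(
    strategy: Strategy,
    topic: &str,
    record_name: Option<&str>,
    is_key: bool,
) -> Result<String, String> {
    match strategy {
        // One subject per topic, split by key vs. value.
        Strategy::TopicName => Ok(format!("{topic}-{}", if is_key { "key" } else { "value" })),
        // The fully qualified record name alone; key/value does not matter.
        Strategy::RecordName => record_name
            .map(str::to_owned)
            .ok_or_else(|| "RecordName requires a record name".to_string()),
        // Topic and record name combined.
        Strategy::TopicRecordName => record_name
            .map(|name| format!("{topic}-{name}"))
            .ok_or_else(|| "TopicRecordName requires a record name".to_string()),
    }
}
```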
Compatible Registries
The ConfluentSchemaRegistry HTTP client uses the standard Confluent REST API and works with any registry that implements it:
| Registry | Notes |
|---|---|
| Confluent Schema Registry | The reference implementation |
| Karapace (Aiven, Apache 2.0) | Drop-in replacement; compatible with Confluent SR API level 6.1.1 |
| Apicurio Registry (Red Hat, Apache 2.0) | Enable its Confluent-compatible API mode |
No code changes are needed: just point `ConfluentSchemaRegistry` at the compatible registry's URL.
For AWS environments, the AwsGlueSchemaRegistry SDK client communicates with the AWS Glue Schema Registry via the AWS SDK.
Schema Registry Client Trait
The SchemaRegistryClient trait allows pluggable registry backends:
use krafka::schema_registry::{SchemaRegistryClient, Schema, SchemaId, SchemaType, SchemaVersion, SchemaReference};
use krafka::error::Result;
struct MyRegistry { /* ... */ }
impl SchemaRegistryClient for MyRegistry {
async fn get_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
// Fetch from your registry backend
Ok(Schema::new(id, SchemaType::Avro, r#"{"type":"string"}"#))
}
async fn get_latest_schema(&self, subject: &str) -> Result<Schema> {
// Fetch the schema for the latest version of `subject`
todo!()
}
async fn get_schema_by_version(&self, subject: &str, version: SchemaVersion) -> Result<Schema> {
// Fetch the schema for a specific version of `subject`
todo!()
}
async fn register_schema(
&self,
subject: &str,
schema: &str,
schema_type: SchemaType,
references: &[SchemaReference],
) -> Result<SchemaId> {
// Register the schema and return its assigned ID
todo!()
}
}
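The following synchronous, in-memory sketch shows what each trait method is expected to return. It deliberately does not implement Krafka's `SchemaRegistryClient` (no async, no Krafka types), and `InMemoryRegistry` is hypothetical. It assumes Confluent-style semantics:

- Schema IDs are global.
- Registering identical schema text returns the existing ID.
- Versions within a subject start at 1.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory registry illustrating the bookkeeping behind the trait.
#[derive(Default)]
struct InMemoryRegistry {
    schemas_by_id: HashMap<u32, String>, // global ID -> schema text
    ids_by_text: HashMap<String, u32>,   // identical text reuses its ID
    versions: HashMap<String, Vec<u32>>, // subject -> IDs; index 0 is version 1
    next_id: u32,
}

impl InMemoryRegistry {
    /// Returns the existing ID for known schema text, otherwise assigns a new one.
    fn register_schema(&mut self, subject: &str, schema: &str) -> u32 {
        let id = match self.ids_by_text.get(schema).copied() {
            Some(id) => id,
            None => {
                self.next_id += 1;
                self.ids_by_text.insert(schema.to_string(), self.next_id);
                self.schemas_by_id.insert(self.next_id, schema.to_string());
                self.next_id
            }
        };
        let versions = self.versions.entry(subject.to_string()).or_default();
        if !versions.contains(&id) {
            versions.push(id); // new version only for schema text new to this subject
        }
        id
    }

    fn get_schema_by_id(&self, id: u32) -> Option<&str> {
        self.schemas_by_id.get(&id).map(String::as_str)
    }

    fn get_latest_schema(&self, subject: &str) -> Option<(u32, &str)> {
        let id = *self.versions.get(subject)?.last()?;
        Some((id, self.get_schema_by_id(id)?))
    }

    fn get_schema_by_version(&self, subject: &str, version: u32) -> Option<(u32, &str)> {
        let index = usize::try_from(version).ok()?.checked_sub(1)?; // versions are 1-based
        let id = *self.versions.get(subject)?.get(index)?;
        Some((id, self.get_schema_by_id(id)?))
    }
}
```

A real backend would return the Krafka `Schema`/`SchemaId` types instead and expose each method as an `async fn`.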
Caching
CachedSchemaRegistry wraps any SchemaRegistryClient with an in-memory ID-to-schema cache. Schema IDs are immutable in the registry, so cached entries never expire unless you opt into bounded eviction with with_max_entries(). Concurrent cold misses for the same schema ID are also coalesced, so only one upstream request runs per ID at a time:
use krafka::schema_registry::CachedSchemaRegistry;
let cached = CachedSchemaRegistry::new(my_registry);
// First call fetches from the registry
let schema = cached.get_schema_by_id(1).await?;
// Second call is served from cache (no network request)
let same = cached.get_schema_by_id(1).await?;
// get_latest_schema always forwards but populates the ID cache
let latest = cached.get_latest_schema("orders-value").await?;
let by_id = cached.get_schema_by_id(latest.id).await?; // cache hit
// Inspect or clear the cache
println!("Cached schemas: {}", cached.cache_len());
cached.clear_cache();
// Invalidate one entry or all entries
cached.invalidate(1);
cached.invalidate_all();
// Optional: pre-warm immutable IDs at startup
cached.warm_cache(&[1, 2, 3]).await?;
// Optional: bound cache growth by evicting the oldest inserted IDs
let bounded = CachedSchemaRegistry::with_max_entries(other_registry, 1024);
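The coalescing and bounded-eviction behavior described above can be sketched with the standard library alone. This is not Krafka's implementation; it is a synchronous, thread-based illustration. It uses one `std::sync::OnceLock` per ID so that concurrent misses trigger a single upstream fetch. It also assumes FIFO eviction of the oldest inserted ID, as the `with_max_entries()` comment states. `CountingUpstream` and `IdCache` are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use std::thread;
use std::time::Duration;

/// Hypothetical upstream registry that counts how often it is actually called.
struct CountingUpstream {
    fetches: AtomicUsize,
}

impl CountingUpstream {
    fn fetch(&self, id: u32) -> String {
        self.fetches.fetch_add(1, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(20)); // simulate network latency
        format!("schema-{id}")
    }
}

/// Immutable-ID cache: one OnceLock per ID coalesces concurrent misses, and a
/// FIFO queue evicts the oldest inserted ID once `max_entries` is exceeded.
struct IdCache {
    upstream: CountingUpstream,
    max_entries: Option<usize>,
    state: Mutex<(HashMap<u32, Arc<OnceLock<String>>>, VecDeque<u32>)>,
}

impl IdCache {
    fn new(max_entries: Option<usize>) -> Self {
        Self {
            upstream: CountingUpstream { fetches: AtomicUsize::new(0) },
            max_entries,
            state: Mutex::new((HashMap::new(), VecDeque::new())),
        }
    }

    fn get(&self, id: u32) -> String {
        // Find or create the per-ID slot while holding the lock only briefly.
        let slot = {
            let mut guard = self.state.lock().unwrap();
            let (slots, order) = &mut *guard;
            if let Some(existing) = slots.get(&id).cloned() {
                existing
            } else {
                let fresh = Arc::new(OnceLock::new());
                slots.insert(id, Arc::clone(&fresh));
                order.push_back(id);
                // Bounded mode: drop the oldest inserted IDs until the cache fits.
                if let Some(max) = self.max_entries {
                    while slots.len() > max {
                        match order.pop_front() {
                            Some(oldest) => {
                                slots.remove(&oldest);
                            }
                            None => break,
                        }
                    }
                }
                fresh
            }
        };
        // Outside the lock: one caller per slot runs the fetch; the others wait for it.
        slot.get_or_init(|| self.upstream.fetch(id)).clone()
    }

    fn len(&self) -> usize {
        self.state.lock().unwrap().0.len()
    }

    fn fetches(&self) -> usize {
        self.upstream.fetches.load(Ordering::SeqCst)
    }
}
```

Because schema IDs never change, a filled slot never needs refreshing. Only eviction or explicit invalidation can cause a second fetch for the same ID.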
`CachedSchemaRegistry` and `CachedGlueSchemaRegistry` also provide inherent async methods with the same names as their client-trait methods, so you can call them on the concrete cached types without importing the traits.
CachedGlueSchemaRegistry follows the same rules for AWS Glue schema version IDs: immutable-ID caching, concurrent miss coalescing, and optional bounded eviction via with_max_entries().
For provider-agnostic cache lifecycle operations, both wrappers implement AnySchemaCache:
use krafka::schema_registry::{AnySchemaCache, CachedSchemaRegistry, SchemaId};
async fn reset_and_prewarm(cache: &dyn AnySchemaCache<Id = SchemaId>) -> krafka::Result<()> {
cache.invalidate_all();
cache.warm_cache(&[1, 2, 3]).await
}
Confluent Schema Registry HTTP Client
Enable the schema-registry feature to use the built-in HTTP client:
[dependencies]
krafka = { version = "0.12.0", features = ["schema-registry"] }
Basic Usage
use std::time::Duration;
use krafka::schema_registry::{
ConfluentSchemaRegistry, CachedSchemaRegistry, SchemaType,
encode_wire_format, decode_wire_format,
};
// Create and cache the client
let client = ConfluentSchemaRegistry::new("http://localhost:8081");
let registry = CachedSchemaRegistry::new(client);
// Register a schema
let schema_id = registry.register_schema(
"orders-value",
r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#,
SchemaType::Avro,
&[],
).await?;
// Encode with wire format
let avro_bytes = serialize_order(&order);
let wire_bytes = encode_wire_format(schema_id, &avro_bytes);
producer.send("orders", Some(b"key"), &wire_bytes).await?;
// Decode from wire format
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
if let Some(value) = &record.value {
let (id, payload) = decode_wire_format(value)?;
let schema = registry.get_schema_by_id(id).await?;
let order = deserialize_order(payload, &schema.schema);
}
}
Authentication
use std::time::Duration;
use krafka::schema_registry::ConfluentSchemaRegistry;
// Basic auth
let client = ConfluentSchemaRegistry::builder()
.url("https://registry.example.com")
.basic_auth("user", "password")
.build()?;
// Bearer token
let client = ConfluentSchemaRegistry::builder()
.url("https://registry.example.com")
.bearer_token("my-jwt-token")
.build()?;
// Custom timeout
let client = ConfluentSchemaRegistry::builder()
.url("http://localhost:8081")
.request_timeout(Duration::from_secs(10))
.build()?;
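For reference, this is what the two credential styles typically look like on the wire. The sketch assumes the client sends standard HTTP Basic credentials (RFC 7617) and Bearer tokens (RFC 6750) in the `Authorization` header. The helper functions are hypothetical. The minimal Base64 encoder is included only to keep the sketch dependency-free; real code should use a maintained crate.

```rust
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Standard (padded) Base64, enough to show how Basic credentials are encoded.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
        let b1 = chunk.get(1).copied().unwrap_or(0);
        let b2 = chunk.get(2).copied().unwrap_or(0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        // Emit '=' padding for the positions a short final chunk does not fill.
        out.push(if chunk.len() > 1 { ALPHABET[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Value of the Authorization header for HTTP Basic auth ("user:password", Base64-encoded).
fn basic_auth_header(user: &str, password: &str) -> String {
    format!("Basic {}", base64_encode(format!("{user}:{password}").as_bytes()))
}

/// Value of the Authorization header for a bearer token (sent as-is).
fn bearer_auth_header(token: &str) -> String {
    format!("Bearer {token}")
}
```

Base64 is an encoding, not encryption. Basic credentials are readable by anyone who can see the request, which is why the examples above use `https://` URLs.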
Additional Operations
The HTTP client provides extra methods beyond the trait:
// Check schema compatibility (supports references)
let compatible = client.check_compatibility(
"orders-value",
&new_schema,
SchemaType::Avro,
&[], // pass SchemaReference values if the schema has dependencies
).await?;
// List all subjects
let subjects = client.get_subjects().await?;
// List all versions of a subject
let versions = client.get_versions("orders-value").await?;
// Delete a subject (soft-delete)
let deleted = client.delete_subject("orders-value", false).await?;
// Delete a subject (permanent hard-delete); the Confluent Schema Registry
// only permits this after the subject has been soft-deleted
let deleted = client.delete_subject("orders-value", true).await?;
Schema References
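For schemas with dependencies (e.g., Protobuf imports, Avro references), pass `SchemaReference` values when registering. In the Confluent model, each reference carries a name (here the Avro type name), the subject the dependency is registered under, and its version. The referenced schema must already be registered: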
use krafka::schema_registry::{SchemaReference, SchemaType};
let refs = vec![
SchemaReference::new("com.example.Address", "address-value", 1),
];
let id = registry.register_schema(
"order-value",
&order_schema,
SchemaType::Avro,
&refs,
).await?;
Using with CompactedTable
CompactedTable stores key-value pairs as Bytes. When the values are Confluent wire-format encoded, use decode_wire_format_bytes() for zero-copy decoding:
use krafka::consumer::CompactedTopicConsumer;
use krafka::schema_registry::{
decode_wire_format_bytes, CachedSchemaRegistry, ConfluentSchemaRegistry,
};
// Set up the schema registry client with caching
let registry = CachedSchemaRegistry::new(
ConfluentSchemaRegistry::new("http://localhost:8081"),
);
// Build and start the compacted topic consumer
let ctc = CompactedTopicConsumer::builder()
.bootstrap_servers("localhost:9092")
.topic("user-profiles")
.build()
.await?;
// Look up a single key
if let Some(value) = ctc.table().get(b"user-42") {
let (schema_id, payload) = decode_wire_format_bytes(value)?;
let schema = registry.get_schema_by_id(schema_id).await?;
let user = deserialize_avro(&payload, &schema.schema);
}
// Iterate all entries
for (key, value) in ctc.table() {
let (schema_id, payload) = decode_wire_format_bytes(value)?;
let schema = registry.get_schema_by_id(schema_id).await?;
// schema_id lookups are cached after the first fetch
}
Since schema IDs are immutable, CachedSchemaRegistry ensures you only make one HTTP round-trip per schema ID, even when iterating thousands of table entries.
AWS Glue Schema Registry
For AWS MSK users, Krafka provides first-class AWS Glue Schema Registry support. Glue uses a completely different wire format and UUID-based schema version IDs.
Glue Wire Format
The Glue wire format uses an 18-byte header (vs Confluent’s 5-byte header):
┌──────────┬─────────────┬──────────────────────┬──────────────────┐
│ 0x03 (1B)│ Compr. (1B) │ Schema Version UUID │ Payload (N bytes)│
│ │ │ (16B, BE) │ │
└──────────┴─────────────┴──────────────────────┴──────────────────┘
- Byte 0: header version byte (`0x03`)
- Byte 1: compression indicator (`0x00` = none, `0x05` = ZLIB)
- Bytes 2–17: schema version ID as a 128-bit UUID (big-endian)
- Bytes 18+: payload (ZLIB-compressed if byte 1 is `0x05`)
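The same layout can be sketched in plain Rust. `parse_uuid`, `frame_glue_uncompressed`, and `split_glue_frame` are hypothetical helpers, not Krafka's API. They cover only the uncompressed path and return a ZLIB payload still compressed, since inflating it requires a compression crate.

```rust
const GLUE_HEADER_VERSION: u8 = 0x03;
const GLUE_COMPRESSION_NONE: u8 = 0x00;
const GLUE_COMPRESSION_ZLIB: u8 = 0x05;
const GLUE_HEADER_LEN: usize = 18;

/// Parses a canonical UUID string into its 16 bytes, most significant byte first.
/// (Lenient: it ignores where the hyphens are.)
fn parse_uuid(text: &str) -> Option<[u8; 16]> {
    let hex: Vec<u8> = text.bytes().filter(|b| *b != b'-').collect();
    if hex.len() != 32 || !hex.iter().all(u8::is_ascii_hexdigit) {
        return None;
    }
    let mut out = [0u8; 16];
    for (i, byte) in out.iter_mut().enumerate() {
        let pair = std::str::from_utf8(&hex[2 * i..2 * i + 2]).ok()?;
        *byte = u8::from_str_radix(pair, 16).ok()?;
    }
    Some(out)
}

/// Builds an uncompressed Glue frame: version byte, compression byte, UUID, payload.
fn frame_glue_uncompressed(version_id: [u8; 16], payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(GLUE_HEADER_LEN + payload.len());
    out.push(GLUE_HEADER_VERSION);
    out.push(GLUE_COMPRESSION_NONE);
    out.extend_from_slice(&version_id);
    out.extend_from_slice(payload);
    out
}

/// Splits a Glue frame into (version ID, compression byte, payload).
/// A ZLIB payload (compression byte 0x05) is returned still compressed.
fn split_glue_frame(data: &[u8]) -> Result<([u8; 16], u8, &[u8]), String> {
    if data.len() < GLUE_HEADER_LEN {
        return Err(format!("need at least {GLUE_HEADER_LEN} bytes, got {}", data.len()));
    }
    if data[0] != GLUE_HEADER_VERSION {
        return Err(format!("unexpected header version 0x{:02x}", data[0]));
    }
    let compression = data[1];
    if compression != GLUE_COMPRESSION_NONE && compression != GLUE_COMPRESSION_ZLIB {
        return Err(format!("unknown compression byte 0x{compression:02x}"));
    }
    let mut version_id = [0u8; 16];
    version_id.copy_from_slice(&data[2..GLUE_HEADER_LEN]);
    Ok((version_id, compression, &data[GLUE_HEADER_LEN..]))
}
```

The UUID's canonical text form maps onto bytes 2 to 17 in reading order; that is what "big-endian" means for the version ID.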
Encode and decode with the Glue-specific functions:
use krafka::schema_registry::glue::{
encode_glue_wire_format, decode_glue_wire_format,
GlueSchemaVersionId, GlueCompression,
};
// Encoding
let uuid: GlueSchemaVersionId = "550e8400-e29b-41d4-a716-446655440000".parse()?;
let framed = encode_glue_wire_format(uuid, &avro_bytes, GlueCompression::None)?;
producer.send("my-topic", Some(b"key"), &framed).await?;
// Decoding
let (version_id, payload) = decode_glue_wire_format(&record_bytes)?;
let payload = payload.as_ref();
ZLIB compression is supported out of the box:
// Encode with ZLIB compression
let framed = encode_glue_wire_format(uuid, &payload, GlueCompression::Zlib)?;
// Decode automatically decompresses
let (version_id, original) = decode_glue_wire_format(&framed)?;
let original = original.as_ref();
Note: ZLIB decompression output is capped at 128 MiB to protect against decompression bombs, matching the limit used by record-batch decompression.
For Bytes values (e.g., from CompactedTable), use decode_glue_wire_format_bytes() for zero-copy slicing on uncompressed payloads.
Glue Client Trait
The GlueSchemaRegistryClient trait allows pluggable backends (always available, no feature required):
use krafka::schema_registry::glue::{
GlueSchemaRegistryClient, GlueSchema, GlueSchemaVersionId, GlueDataFormat,
};
AWS SDK Client
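Enable the `aws-glue-schema-registry` feature to use the built-in SDK client. The example below also calls the `aws_config` and `aws_sdk_glue` crates directly, so add them to your own dependencies too, at versions compatible with the ones Krafka uses: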
[dependencies]
krafka = { version = "0.12.0", features = ["aws-glue-schema-registry"] }
use krafka::schema_registry::glue::{
AwsGlueSchemaRegistry, CachedGlueSchemaRegistry,
decode_glue_wire_format,
};
// Create from AWS config
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
.load()
.await;
let glue_client = aws_sdk_glue::Client::new(&config);
let registry = CachedGlueSchemaRegistry::new(
AwsGlueSchemaRegistry::new(glue_client, "my-registry"),
);
// Decode and look up schema
let (version_id, payload) = decode_glue_wire_format(&record_bytes)?;
let payload = payload.as_ref();
let schema = registry.get_schema_by_version_id(version_id).await?;
// Deserialize payload using schema.schema_definition
Unified Decoder Dispatch
Use WireFormatDecoder to centralize Confluent/Glue dispatch and schema lookups
without writing magic-byte detection in application code.
It accepts `Bytes` and returns a `DecodedMessage` with a zero-copy payload:
use bytes::Bytes;
use krafka::schema_registry::{WireFormatDecoder, SchemaFormat};
let decoder = WireFormatDecoder::new()
.with_confluent(&confluent_registry)
.with_glue(&glue_registry);
let decoded = decoder.decode(record_bytes).await?;
match decoded.schema_format {
SchemaFormat::Unknown => {
// pass through non-schema-framed payload
}
_ => {
// decoded.payload (Bytes, zero-copy) + decoded.schema_metadata available
}
}
`WireFormatDecoder::decode()` biases toward safe passthrough on ambiguous payloads
whose first byte collides with a Confluent (`0x00`) or Glue (`0x03`) framing
prefix but do not carry a complete valid header. If you need strict malformed
header rejection, call `detect_wire_format()` and the low-level
`decode_wire_format()` / `decode_glue_wire_format()` helpers directly.
Advanced configuration of the AWS Glue SDK client (`AwsGlueSchemaRegistry`) via its builder:
let registry = AwsGlueSchemaRegistry::builder(glue_client)
.registry_name("my-custom-registry")
.auto_register(true) // auto-create schemas on first register
.poll_max_attempts(15)
.poll_interval(Duration::from_secs(2))
.build();
Confluent vs Glue: Quick Comparison
| Aspect | Confluent | AWS Glue |
|---|---|---|
| Wire format header | 5 bytes | 18 bytes |
| Schema identifier | `u32` (integer ID) | UUID (128-bit) |
| Compression | Not in wire format | ZLIB indicator in header |
| API | HTTP REST | AWS SDK |
| Feature flag | `schema-registry` | `aws-glue-schema-registry` |
| Trait | `SchemaRegistryClient` | `GlueSchemaRegistryClient` |
| Caching wrapper | `CachedSchemaRegistry` | `CachedGlueSchemaRegistry` |
Producer-Level Schema Encoding (ConfluentSchemaEncoder)
For the common case of encoding all records sent to a producer with the same schema,
use ConfluentSchemaEncoder to attach encoding directly to the producer.
This is the Rust equivalent of key.serializer / value.serializer in the Java KafkaProducer:
encoding is automatic on every send_record() call — no per-record boilerplate required.
[dependencies]
krafka = { version = "0.12.0", features = ["schema-registry"] }
use std::sync::Arc;
use krafka::schema_registry::{
ConfluentSchemaEncoder, CachedSchemaRegistry, ConfluentSchemaRegistry, SchemaType,
};
use krafka::producer::{Producer, ProducerRecord};
let registry = CachedSchemaRegistry::new(
ConfluentSchemaRegistry::new("http://localhost:8081"),
);
// Build once — schema ID is cached after the first send
let encoder = Arc::new(
ConfluentSchemaEncoder::builder()
.registry(registry)
.schema(
r#"{"type":"record","name":"Order","fields":[{"name":"id","type":"string"}]}"#,
SchemaType::Avro,
)
.build()?,
);
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.value_encoder(encoder) // encoding is now automatic
.build()
.await?;
// Send raw (pre-serialized) bytes — wire framing is transparent
producer.send_record(ProducerRecord::new("orders", avro_bytes)).await?;
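Conceptually, a producer-level encoder does three things for each record: it resolves the subject, obtains a schema ID once per subject, and prepends the Confluent header. The sketch below illustrates that flow with the standard library only. It assumes the `TopicName` strategy and models registration as a caller-supplied closure. `SketchEncoder` is hypothetical and does not describe how `ConfluentSchemaEncoder` is implemented.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical producer-side encoder: one registry call per subject, then framing.
struct SketchEncoder<F: Fn(&str) -> u32> {
    /// Stand-in for "register or look up the schema under this subject".
    register: F,
    /// Subject -> schema ID, filled on first use.
    ids: Mutex<HashMap<String, u32>>,
}

impl<F: Fn(&str) -> u32> SketchEncoder<F> {
    fn new(register: F) -> Self {
        Self { register, ids: Mutex::new(HashMap::new()) }
    }

    fn encode(&self, topic: &str, is_key: bool, payload: &[u8]) -> Vec<u8> {
        // TopicName strategy: "{topic}-key" or "{topic}-value".
        let subject = format!("{topic}-{}", if is_key { "key" } else { "value" });
        // Only the first encode for a subject calls `register`; later calls hit the map.
        let schema_id = *self
            .ids
            .lock()
            .unwrap()
            .entry(subject)
            .or_insert_with_key(|s| (self.register)(s.as_str()));
        // Confluent frame: magic byte, big-endian schema ID, payload.
        let mut out = Vec::with_capacity(5 + payload.len());
        out.push(0x00);
        out.extend_from_slice(&schema_id.to_be_bytes());
        out.extend_from_slice(payload);
        out
    }
}
```

Holding the mutex while `register` runs serializes first-time registrations. This keeps the sketch simple, at the cost of briefly blocking other subjects during that call.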
Key + Value Encoding
Attach separate encoders for key and value:
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.key_encoder(key_encoder)
.value_encoder(value_encoder)
.build()
.await?;
Per-Record Subject Name Override
For RecordName or TopicRecordName strategies, set the record name on individual records:
use krafka::schema_registry::SubjectNameStrategy;
let encoder = Arc::new(
ConfluentSchemaEncoder::builder()
.registry(registry)
.schema(my_schema, SchemaType::Avro)
.strategy(SubjectNameStrategy::TopicRecordName)
.build()?,
);
let producer = Producer::builder()
.bootstrap_servers("localhost:9092")
.value_encoder(encoder)
.build()
.await?;
producer.send_record(
ProducerRecord::new("orders", avro_bytes)
.with_record_name("com.example.Order"),
).await?;
Custom Encoder
Implement SchemaEncoder for custom framing logic (e.g., non-Confluent registries,
multi-schema routing):
use std::pin::Pin;
use std::future::Future;
use bytes::Bytes;
use krafka::schema_registry::SchemaEncoder;
use krafka::error::Result;
struct MyEncoder;
impl SchemaEncoder for MyEncoder {
fn encode(
&self,
payload: Bytes,
_topic: &str,
_record_name: Option<&str>,
_is_key: bool,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>> {
// `payload` is already owned, so it can move straight into the future
Box::pin(async move {
// Custom framing logic here
Ok(payload)
})
}
}
Consumer-Level Schema Decoding (SchemaDecoder / ConfluentSchemaDecoder)
Symmetric to the producer-level SchemaEncoder, a consumer can be configured with
key_decoder and/or value_decoder. After each poll() or recv(), and after
the consumer interceptor, every record’s key/value bytes are automatically passed
through the configured decoder before being returned to the caller.
This eliminates manual wire-format stripping in application code — equivalent to
key.deserializer / value.deserializer in the Java KafkaConsumer.
Basic Usage
use std::sync::Arc;
use std::time::Duration;
use krafka::consumer::Consumer;
use krafka::schema_registry::ConfluentSchemaDecoder;
// Strip Confluent wire-format header from all values automatically.
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.value_decoder(Arc::new(ConfluentSchemaDecoder::new()))
.build()
.await?;
consumer.subscribe(&["avro-topic"]).await?;
loop {
let records = consumer.poll(Duration::from_secs(1)).await?;
for record in &records {
// record.value already has the wire-format header stripped — it is
// the raw Avro/Protobuf/JSON bytes, as a zero-copy Bytes slice.
if let Some(payload) = &record.value {
let order = deserialize_order(payload)?;
}
}
}
Key + Value Decoding
let consumer = Consumer::builder()
.bootstrap_servers("localhost:9092")
.group_id("my-group")
.key_decoder(Arc::new(ConfluentSchemaDecoder::new()))
.value_decoder(Arc::new(ConfluentSchemaDecoder::new()))
.build()
.await?;
Custom Decoder
Implement the SchemaDecoder trait to handle any custom framing or encryption layer:
use std::pin::Pin;
use std::future::Future;
use bytes::Bytes;
use krafka::schema_registry::SchemaDecoder;
use krafka::error::Result;
struct AesDecryptingDecoder { /* key material */ }
impl SchemaDecoder for AesDecryptingDecoder {
fn decode(
&self,
payload: Bytes,
_topic: &str,
_is_key: bool,
) -> Pin<Box<dyn Future<Output = Result<Bytes>> + Send + '_>> {
Box::pin(async move {
// Decrypt and return
let plaintext = self.decrypt(&payload)?;
Ok(Bytes::from(plaintext))
})
}
}
Design note: `value_decoder` (like `key_decoder`) runs after the consumer interceptor. If your interceptor modifies the raw (framed) bytes, the decoder sees the modified value. If you need decoding to happen before the interceptor, implement it inside the interceptor itself.
Next Steps
- Producer Guide — sending schema-encoded records
- Consumer Guide — consuming and decoding records
- Authentication Guide — securing connections