Schema Registry

Schema management with JSON Schema, Avro, and Protobuf support.

Table of contents

  1. Overview
  2. Architecture
  3. Crate Architecture
  4. Deployment Modes
  5. Standalone Registry (rivven-schema)
    1. Quick Start
    2. Standalone Server
    3. CLI Commands
  6. Authentication & Security
    1. Authentication Methods
    2. Enable JWT/OIDC
    3. JWT/OIDC Configuration
    4. API Key Configuration
    5. RBAC Integration
    6. Cedar Policy-Based Authorization
  7. Kubernetes Deployment
    1. Install the CRD
    2. Using the Operator
    3. Manual Deployment
  8. Disabled Mode (No Schema Registry)
  9. External Mode
    1. Authentication Options
  10. AWS Glue Mode
  11. Supported Schema Formats
  12. Apache Avro
    1. Parsing Schemas
    2. Encoding and Decoding
    3. Standard Wire Format
    4. Schema Fingerprinting
    5. Compatibility Checking
      1. Avro Compatibility Rules
  13. Protocol Buffers
    1. Parsing Schemas
    2. Encoding and Decoding
    3. Standard Wire Format
    4. Compatibility Checking
      1. Protobuf Compatibility Rules
      2. JSON Schema Compatibility Rules
  14. Schema Evolution Strategies
    1. Compatibility Levels
    2. Best Practices
  15. Schema Inference
  16. Schema-Aware Sinks
    1. SchemaAwareSource
    2. Subject Naming Strategies
    3. Stdout Sink with Avro Output
  17. Integration Examples
    1. CDC with Avro Schemas
    2. External Queue Bridge with Protobuf
    3. AWS Glue with CDC
  18. Advanced Features
    1. Cache Configuration
    2. Schema Contexts (Multi-Tenancy)
    3. Version States (Schema Lifecycle)
    4. Content Validation Rules
    5. Prometheus Metrics
  19. REST API Reference
    1. Core Endpoints
    2. Version State Endpoints
    3. Content Validation Endpoints
    4. Schema Context Endpoints
    5. Statistics Endpoint
  20. API Reference
    1. AvroSchema
    2. AvroCodec
    3. ProtobufSchema
    4. ProtobufCodec

Overview

Rivven follows a schema-agnostic broker architecture: the broker handles only raw bytes, and schema handling is split across two crates:

  1. rivven-schema — Standalone Schema Registry (lib + binary) with industry-standard REST API
  2. rivven-connect — Schema integration for connectors (serialization/deserialization)

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                 Schema Registry Architecture                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   Producer                                                       │
│      │                                                           │
│      ├──► Schema Registry (rivven-schema) ──► Register Schema   │
│      │          │                                                │
│      │          ▼                                                │
│      │    Get Schema ID                                          │
│      │          │                                                │
│      ├──► Serialize (Avro/Protobuf/JSON)                        │
│      │          │                                                │
│      ▼          ▼                                                │
│   ┌─────────────────────────────────────────────────────┐       │
│   │              Broker (rivvend)                        │       │
│   │        Raw bytes only - schema agnostic             │       │
│   │     Topics, Partitions, Messages, Offsets           │       │
│   └─────────────────────────────────────────────────────┘       │
│      │                                                           │
│      ▼                                                           │
│   Consumer                                                       │
│      │                                                           │
│      ├──► Extract Schema ID from 5-byte prefix                  │
│      ├──► Schema Registry (rivven-schema) ──► Get Schema        │
│      └──► Deserialize (Avro/Protobuf/JSON)                      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Key Design Decisions:

  • Broker is schema-agnostic — only handles raw bytes, no schema processing
  • rivven-schema is the single source of truth for schema storage, versioning, and compatibility
  • rivven-connect handles serialization/deserialization with schema registry integration
  • Standard wire format — Compatible with common ecosystem tools

Crate Architecture

┌─────────────────────────────────────────────────────────────────┐
│                   rivven-schema (Schema Registry)                │
│  ├── Types: SchemaId, SchemaType, Subject, SchemaVersion        │
│  ├── Compatibility: CompatibilityLevel, CompatibilityChecker    │
│  ├── Fingerprinting: SHA-256 (dedup), MD5 (Confluent compat)    │
│  ├── Storage: MemoryStorage                                     │
│  └── Server: Industry-standard REST API                        │
├─────────────────────────────────────────────────────────────────┤
│                      rivven-connect (Connectors)                 │
│  ├── Re-exports from rivven-schema: SchemaId, SchemaType, etc.  │
│  ├── Codecs: AvroCodec, ProtobufCodec, JsonCodec (wire format)  │
│  ├── Config: ExternalConfig, GlueConfig, Disabled               │
│  └── Clients: ExternalRegistry, GlueRegistry                    │
├─────────────────────────────────────────────────────────────────┤
│                        rivvend (Broker)                          │
│  └── Schema-agnostic: only handles raw bytes                    │
└─────────────────────────────────────────────────────────────────┘

Deployment Modes

Rivven supports 3 schema modes for maximum flexibility:

Mode                 Component       Description                                                       Use Case
External             rivven-connect  Connect to an external Schema Registry (including rivven-schema)  Production, multi-cluster
External (AWS Glue)  rivven-connect  Connect to AWS Glue Schema Registry                               AWS-native deployments
Disabled             rivven-connect  JSON events without schemas                                       Development, simple use cases

The schema registry provides:

  • Schema Evolution — Track schema versions with compatibility checking
  • Multiple Formats — Native support for JSON Schema, Avro, and Protobuf
  • Industry-Standard API — Wire format and REST API compatibility
  • Schema Fingerprinting — SHA-256 (primary) and MD5 fingerprints for caching and deduplication

Standalone Registry (rivven-schema)

The rivven-schema crate provides a standalone Schema Registry for schema storage, versioning, and compatibility checking.

Note: The registry stores and validates schemas. It does not encode/decode data. Use rivven-connect for Avro/Protobuf/JSON codecs.

Quick Start

use rivven_schema::{SchemaRegistry, RegistryConfig, SchemaType};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create an in-memory registry
    let config = RegistryConfig::memory();
    let registry = SchemaRegistry::new(config).await?;

    // Register an Avro schema
    let avro_schema = r#"{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"}
        ]
    }"#;
    
    let schema_id = registry.register("user-value", SchemaType::Avro, avro_schema).await?;
    println!("Registered schema with ID: {}", schema_id.0);

    // Retrieve the schema
    let schema = registry.get_by_id(schema_id).await?;
    println!("Schema: {}", schema.schema);

    // Check compatibility for schema evolution
    let new_schema = r#"{
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"},
            {"name": "email", "type": ["null", "string"], "default": null}
        ]
    }"#;
    let result = registry.check_compatibility("user-value", SchemaType::Avro, new_schema, None).await?;
    println!("Compatible: {}", result.is_compatible);

    Ok(())
}

Standalone Server

# Start with in-memory storage
rivven-schema serve --port 8081

CLI Commands

# Check server health
rivven-schema health --url http://localhost:8081

# Register a schema
rivven-schema register --url http://localhost:8081 --subject user-value --schema schema.avsc

# Get a schema by ID
rivven-schema get --url http://localhost:8081 --id 1

# List all subjects
rivven-schema subjects --url http://localhost:8081

# Check compatibility
rivven-schema compat --url http://localhost:8081 --subject user-value --schema new.avsc

Authentication & Security

The standalone Schema Registry (rivven-schema) supports four authentication methods: HTTP Basic, bearer session tokens, JWT/OIDC, and API keys.

When authentication is enabled, all 15 mutating handlers enforce subject-level permission checks. These include registering schemas, deleting subjects (soft and permanent), updating the global or per-subject compatibility level, undeleting, deprecating, disabling, and enabling versions, creating and deleting contexts, adding and deleting validation rules, and setting version states. Read-only endpoints allow anonymous access only when explicitly configured. Enforcement happens in two steps: the middleware authenticates each request with the configured method, and each handler then authorizes the specific operation (Create, Delete, or Alter) on the target subject.

Architecture note: The global compatibility level lives only in the registry (behind a RwLock<CompatibilityLevel>), which makes the registry the single source of truth. The REST API layer writes compatibility changes directly to the registry, so there is no separate copy that can drift.
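To illustrate that design, here is a minimal sketch of the pattern using std's RwLock. Registry, RestApi, and the four-variant CompatibilityLevel are hypothetical simplifications, not rivven-schema's actual types: the REST layer holds a shared handle to the registry and writes through it instead of keeping its own copy of the level.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical, simplified stand-in for rivven-schema's CompatibilityLevel.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum CompatibilityLevel {
    Backward,
    Forward,
    Full,
    None,
}

/// Hypothetical registry: owns the only copy of the global level.
struct Registry {
    global_compat: RwLock<CompatibilityLevel>,
}

impl Registry {
    fn new(level: CompatibilityLevel) -> Self {
        Registry { global_compat: RwLock::new(level) }
    }

    fn global_compatibility(&self) -> CompatibilityLevel {
        *self.global_compat.read().expect("lock poisoned")
    }

    fn set_global_compatibility(&self, level: CompatibilityLevel) {
        *self.global_compat.write().expect("lock poisoned") = level;
    }
}

/// Hypothetical REST layer: holds a handle to the registry, never a copy of the level.
struct RestApi {
    registry: Arc<Registry>,
}

impl RestApi {
    /// Body of a "PUT /config" handler: writes straight through to the registry.
    fn put_config(&self, level: CompatibilityLevel) -> CompatibilityLevel {
        self.registry.set_global_compatibility(level);
        self.registry.global_compatibility()
    }
}

fn main() {
    let registry = Arc::new(Registry::new(CompatibilityLevel::Backward));
    let api = RestApi { registry: Arc::clone(&registry) };

    api.put_config(CompatibilityLevel::Full);
    // Every other holder of the registry sees the change immediately.
    println!("{:?}", registry.global_compatibility());
}
```

Because the level is read through the lock on every call, there is no cached value in the REST layer to invalidate.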

Authentication Methods

Method           Header                                  Feature Flag  Use Case
HTTP Basic Auth  Authorization: Basic base64(user:pass)  built-in      Simple deployments
Bearer Token     Authorization: Bearer <session-id>      built-in      Session-based auth
JWT/OIDC         Authorization: Bearer <jwt>             jwt           Enterprise SSO
API Keys         X-API-Key: <key>                        built-in      Service-to-service auth

Enable JWT/OIDC

Basic auth, bearer tokens, and API keys are always available. For JWT/OIDC support:

cargo build -p rivven-schema --features jwt

JWT/OIDC Configuration

auth:
  enabled: true
  jwt:
    # Key source (choose one)
    # secret: "${JWT_SECRET}"                          # shared secret (HS256)
    # rsa_public_key_path: /etc/rivven/jwt-public.pem  # static RSA public key (RS256)
    jwks_url: "https://auth.example.com/.well-known/jwks.json"  # JWKS endpoint, supports key rotation

    # Validation options
    issuer: "https://auth.example.com"
    audience: "rivven-schema"
    algorithms: ["RS256", "ES256"]  # must match the keys provided by the key source

API Key Configuration

auth:
  enabled: true
  api_keys:
    - key: "sk_prod_abc123..."
      principal: "service-account-1"
      description: "Production connector"
    - key: "sk_prod_xyz789..."
      principal: "service-account-2"
      description: "Analytics pipeline"

RBAC Integration

Authentication integrates with rivven-core’s RBAC system:

auth:
  enabled: true
  allow_anonymous_read: false  # Require auth for all operations
  
  # Per-subject permissions
  acls:
    - principal: "team-a"
      subjects: ["team-a-*"]
      permissions: [read, write, delete]
    - principal: "readonly-service"
      subjects: ["*"]
      permissions: [read]
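A minimal sketch of how the acls entries above could be evaluated, assuming deny-by-default semantics, exact principal matching, and a trailing * as the only wildcard. AclEntry, Permission, and is_allowed are hypothetical names; rivven-core's actual matching rules may differ.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Permission {
    Read,
    Write,
    Delete,
}

/// Hypothetical mirror of one `acls` entry from the YAML above.
struct AclEntry {
    principal: &'static str,
    subjects: Vec<&'static str>,
    permissions: Vec<Permission>,
}

/// Matches a subject against a pattern with an optional trailing `*` (assumption).
fn subject_matches(pattern: &str, subject: &str) -> bool {
    match pattern.strip_suffix('*') {
        Some(prefix) => subject.starts_with(prefix),
        None => pattern == subject,
    }
}

/// Deny by default: allowed only if some entry grants `perm` on `subject` to `principal`.
fn is_allowed(acls: &[AclEntry], principal: &str, subject: &str, perm: Permission) -> bool {
    acls.iter().any(|entry| {
        entry.principal == principal
            && entry.permissions.contains(&perm)
            && entry.subjects.iter().any(|p| subject_matches(p, subject))
    })
}

fn main() {
    let acls = vec![
        AclEntry {
            principal: "team-a",
            subjects: vec!["team-a-*"],
            permissions: vec![Permission::Read, Permission::Write, Permission::Delete],
        },
        AclEntry {
            principal: "readonly-service",
            subjects: vec!["*"],
            permissions: vec![Permission::Read],
        },
    ];
    println!("{}", is_allowed(&acls, "team-a", "team-a-orders-value", Permission::Write));
    println!("{}", is_allowed(&acls, "readonly-service", "team-a-orders-value", Permission::Write));
}
```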

Cedar Policy-Based Authorization

For fine-grained, policy-as-code authorization, build with the cedar feature. Every request is then evaluated at runtime through cedar_policy::Authorizer::is_authorized(). See the Cedar documentation for the policy language.

cargo build -p rivven-schema --features cedar

Example policies:

// Allow schema admins full access
permit(
  principal in Rivven::Group::"schema-admins",
  action,
  resource is Rivven::Schema
);

// Allow team-a to create and alter schemas whose names start with "team-a-".
// Cedar has no string concatenation, so the prefix is a literal `like` pattern.
permit(
  principal in Rivven::Group::"team-a",
  action in [Rivven::Action::"create", Rivven::Action::"alter"],
  resource is Rivven::Schema
) when {
  resource.name like "team-a-*"
};

Kubernetes Deployment

Deploy the Schema Registry on Kubernetes using the RivvenSchemaRegistry CRD:

Install the CRD

The CRD definition is available in the repository at crates/rivven-operator/deploy/crds/rivvenschemaregistry-crd.yaml:

kubectl apply -f crates/rivven-operator/deploy/crds/rivvenschemaregistry-crd.yaml

Or generate it from the operator binary:

rivven-operator --print-crd | kubectl apply -f -

The CRD provides the short name rsr for quick lookups:

kubectl get rsr
# NAME              CLUSTER      REPLICAS   READY   SCHEMAS   PHASE     AGE
# schema-registry   production   2          2       42        Running   3d

Using the Operator

apiVersion: rivven.hupe1980.github.io/v1alpha1
kind: RivvenSchemaRegistry
metadata:
  name: schema-registry
  namespace: default
spec:
  clusterRef:
    name: production
  
  replicas: 2
  version: "0.0.22"
  
  # Server configuration
  server:
    port: 8081
    requestTimeoutMs: 30000
    corsEnabled: true
  
  # Store schemas in broker topic
  storage:
    mode: broker
    topic: _schemas
    replicationFactor: 3
  
  # Compatibility settings
  compatibility:
    defaultLevel: BACKWARD
    perSubject:
      "order-events-value": FULL
  
  # Enable all schema formats
  formats:
    avro: true
    jsonSchema: true
    protobuf: true
  
  # JWT authentication
  auth:
    enabled: true
    method: jwt
    jwt:
      issuerUrl: "https://auth.example.com"
      audience: "schema-registry"
  
  # TLS encryption
  tls:
    enabled: true
    certSecretName: schema-registry-tls
  
  # Prometheus metrics
  metrics:
    enabled: true
    serviceMonitorEnabled: true

Save the manifest as schema-registry.yaml and apply it:

kubectl apply -f schema-registry.yaml

# Check status
kubectl get rivvenschemaregistries
kubectl describe rivvenschemaregistry schema-registry

Manual Deployment

For manual deployments without the operator:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rivven-schema-registry
spec:
  replicas: 2
  selector:
    matchLabels:
      app: rivven-schema-registry
  template:
    metadata:
      labels:
        app: rivven-schema-registry
    spec:
      containers:
        - name: schema-registry
          image: ghcr.io/hupe1980/rivven-schema:0.0.22
          ports:
            - containerPort: 8081
          env:
            - name: RIVVEN_SCHEMA_PORT
              value: "8081"
            - name: RIVVEN_SCHEMA_STORAGE_MODE
              value: "broker"
            - name: RIVVEN_BROKERS
              value: "rivven-headless:9092"
          livenessProbe:
            httpGet:
              path: /health
              port: 8081
          readinessProbe:
            httpGet:
              path: /health
              port: 8081
---
apiVersion: v1
kind: Service
metadata:
  name: rivven-schema-registry
spec:
  selector:
    app: rivven-schema-registry
  ports:
    - port: 8081
      targetPort: 8081

See the Kubernetes Deployment Guide for more details.


Disabled Mode (No Schema Registry)

For development or simple use cases, you can disable the schema registry entirely:

# rivven-connect.yaml
schema_registry:
  mode: disabled

Events are sent as plain JSON without schema validation or the 5-byte schema ID header.

External Mode

Connect to an external schema registry (including rivven-schema) for multi-cluster environments or when integrating with existing infrastructure.

Authentication Options

rivven-connect supports multiple authentication methods when connecting to external registries:

schema_registry:
  mode: external
  external:
    url: "https://schema-registry.example.com:8081"
    
    # Option 1: Basic Auth (username/password)
    auth:
      type: basic
      username: "${SCHEMA_REGISTRY_USER}"
      password: "${SCHEMA_REGISTRY_PASSWORD}"
    
    # Option 2: OAuth2/OIDC
    # auth:
    #   type: oauth2
    #   client_id: "${OAUTH_CLIENT_ID}"
    #   client_secret: "${OAUTH_CLIENT_SECRET}"
    #   token_url: "https://auth.example.com/oauth/token"
    #   scope: "registry:read registry:write"
    
    # Option 3: mTLS (Enterprise)
    # auth:
    #   type: mtls
    #   client_cert_path: /etc/rivven/client.crt
    #   client_key_path: /etc/rivven/client.key
    #   ca_cert_path: /etc/rivven/ca.crt
    
    # Option 4: API Key
    # auth:
    #   type: api_key
    #   key: "${API_KEY}"
    #   header_name: "X-API-Key"
    
    # Option 5: Bearer Token
    # auth:
    #   type: bearer
    #   token: "${BEARER_TOKEN}"
    
    timeout_secs: 30
    max_retries: 3
    retry_backoff_ms: 100

A minimal client setup in Rust, using only the registry URL:

use rivven_connect::schema::{SchemaRegistryClient, SchemaRegistryConfig};

let config = SchemaRegistryConfig::external("http://schema-registry:8081");
let registry = SchemaRegistryClient::from_config_async(&config, None).await?;

AWS Glue Mode

Connect to AWS Glue Schema Registry for AWS-native deployments. Supports all schema types (Avro, JSON Schema, Protobuf) and integrates with AWS IAM for authentication.

schema_registry:
  mode: glue
  glue:
    region: us-east-1
    registry_name: my-registry  # optional, defaults to "default-registry"
    cache_ttl_secs: 300         # schema cache TTL

Features:

  • Full AWS Glue Schema Registry API support
  • IAM authentication (uses default AWS credential chain)
  • Schema versioning and compatibility checking
  • Automatic UUID-to-integer ID mapping for standard wire format compatibility
  • Schema caching for performance
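AWS Glue identifies schema versions by UUID, while the standard wire format only has room for a 4-byte integer. A minimal sketch of a bidirectional mapping that hands out sequential u32 IDs per UUID; IdMapper is a hypothetical type, and how rivven-connect actually assigns and persists these IDs is not specified here. Note that an in-memory mapping like this is only consistent within one process: producers and consumers running elsewhere would need a shared, persisted mapping.

```rust
use std::collections::HashMap;

/// Hypothetical UUID <-> u32 mapper for wire-format compatibility.
#[derive(Default)]
struct IdMapper {
    to_int: HashMap<String, u32>,
    to_uuid: Vec<String>, // index = id - 1
}

impl IdMapper {
    /// Returns the existing ID for `uuid`, or assigns the next sequential one (starting at 1).
    fn id_for(&mut self, uuid: &str) -> u32 {
        if let Some(&id) = self.to_int.get(uuid) {
            return id;
        }
        self.to_uuid.push(uuid.to_string());
        let id = self.to_uuid.len() as u32;
        self.to_int.insert(uuid.to_string(), id);
        id
    }

    /// Reverse lookup used when decoding a wire-format message.
    fn uuid_for(&self, id: u32) -> Option<&str> {
        let idx = (id as usize).checked_sub(1)?;
        self.to_uuid.get(idx).map(String::as_str)
    }
}

fn main() {
    let mut mapper = IdMapper::default();
    // Sample UUID for illustration only.
    let id = mapper.id_for("b7b4a7f0-0000-4000-8000-000000000001");
    println!("{} -> {:?}", id, mapper.uuid_for(id));
}
```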

Environment Variables:

# AWS credentials (if not using IAM roles)
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-east-1

Example (Rust):

use rivven_connect::schema::{SchemaRegistryClient, SchemaRegistryConfig, SchemaType, Subject};

// Create Glue registry client
let config = SchemaRegistryConfig::glue("us-east-1");
let registry = SchemaRegistryClient::from_config_async(&config, None).await?;

// Register a schema
let schema_id = registry.register(
    &Subject::new("users-value"),
    SchemaType::Avro,
    r#"{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}"#
).await?;

// Get schema by ID
let schema = registry.get_by_id(schema_id).await?;

Supported Schema Formats

Format            Status   Features
JSON Schema       ✅ Full  Draft-07 support, validation, inference
Apache Avro       ✅ Full  Encoding/decoding, fingerprinting, standard wire format
Protocol Buffers  ✅ Full  Dynamic parsing, encoding/decoding, standard wire format

Apache Avro

Parsing Schemas

use rivven_connect::schema::{AvroSchema, AvroCodec};

let schema = AvroSchema::parse(r#"{
    "type": "record",
    "name": "User",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": null},
        {"name": "tags", "type": {"type": "array", "items": "string"}},
        {"name": "metadata", "type": {"type": "map", "values": "string"}}
    ]
}"#)?;

// Access schema metadata
println!("Name: {}", schema.name().unwrap());           // "User"
println!("Namespace: {}", schema.namespace().unwrap()); // "com.example"
println!("Fullname: {}", schema.fullname().unwrap());   // "com.example.User"

// List fields
for field in schema.fields().unwrap() {
    println!("Field: {} ({})", field.name, field.schema_type);
}

Encoding and Decoding

use rivven_connect::schema::AvroCodec;

let codec = AvroCodec::new(schema);

// Encode JSON to Avro binary
let json = serde_json::json!({
    "id": 12345,
    "name": "Alice",
    "email": "alice@example.com",
    "tags": ["admin", "developer"],
    "metadata": {"department": "engineering"}
});

let avro_bytes = codec.encode(&json)?;

// Decode Avro binary to JSON
let decoded = codec.decode(&avro_bytes)?;
assert_eq!(decoded["name"], "Alice");

Standard Wire Format

The standard wire format prefixes Avro data with a magic byte and 4-byte schema ID:

+----------+------------+------------------+
| Magic(1) | SchemaID(4)| Avro Data (N)    |
+----------+------------+------------------+
|   0x00   | Big-endian |  Avro binary     |
+----------+------------+------------------+
// Encode with schema ID
let schema_id: u32 = 12345;
let wire_bytes = codec.encode_wire_format(&json, schema_id)?;

// Decode and retrieve schema ID
let (schema_id, decoded) = codec.decode_wire_format(&wire_bytes)?;
println!("Schema ID: {}", schema_id);
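To make the byte layout concrete, here is a std-only sketch that hand-encodes a two-field record {id: long, name: string} with Avro's binary rules (long as zig-zag varint, string as a length-prefixed UTF-8 byte sequence) and then adds the 5-byte prefix. The function names are illustrative, not rivven-connect's API; in practice AvroCodec does this for you.

```rust
/// Avro `long`: zig-zag encode, then write as a base-128 varint (low 7 bits first).
fn write_long(n: i64, out: &mut Vec<u8>) {
    let mut v = ((n << 1) ^ (n >> 63)) as u64;
    while v >= 0x80 {
        out.push(((v & 0x7f) as u8) | 0x80);
        v >>= 7;
    }
    out.push(v as u8);
}

/// Avro `string`: byte length as a `long`, followed by the UTF-8 bytes.
fn write_string(s: &str, out: &mut Vec<u8>) {
    write_long(s.len() as i64, out);
    out.extend_from_slice(s.as_bytes());
}

/// Record fields are written back to back in schema order, with no field tags.
fn encode_user(id: i64, name: &str) -> Vec<u8> {
    let mut out = Vec::new();
    write_long(id, &mut out);
    write_string(name, &mut out);
    out
}

/// Standard wire format: magic byte 0x00, big-endian u32 schema ID, then the Avro body.
fn to_wire_format(schema_id: u32, avro_body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + avro_body.len());
    out.push(0x00);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(avro_body);
    out
}

/// Splits a wire-format message into (schema ID, Avro body); None if malformed.
fn split_wire_format(msg: &[u8]) -> Option<(u32, &[u8])> {
    if msg.len() < 5 || msg[0] != 0x00 {
        return None;
    }
    let id = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]);
    Some((id, &msg[5..]))
}

fn main() {
    let body = encode_user(1, "Al");
    let wire = to_wire_format(7, &body);
    println!("{:02x?}", wire); // [00, 00, 00, 00, 07, 02, 04, 41, 6c]
}
```

Note that the Avro body carries no field names or types; a consumer needs the schema fetched by ID to interpret those bytes.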

Schema Fingerprinting

Schema fingerprints are useful for caching and identifying schema versions:

let schema = AvroSchema::parse("...")?;

// SHA-256 fingerprint (32 bytes) — used as primary for dedup lookup
let sha256 = schema.fingerprint_sha256();
println!("SHA-256: {:02x?}", sha256);

// MD5 fingerprint (16 bytes) — available for Confluent compatibility
let md5 = schema.fingerprint_md5();
println!("MD5: {:02x?}", md5);
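Dedup by fingerprint means hashing a canonical form of the schema text and reusing the existing ID when the hash matches. A std-only sketch of that lookup, with two loud substitutions: whitespace stripping stands in for real canonicalization (Avro's Parsing Canonical Form also drops non-essential attributes and normalizes attribute order), and std's 64-bit DefaultHasher stands in for SHA-256. Because a 64-bit hash can collide, the store compares canonical text before reusing an ID. DedupStore is a hypothetical name.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Naive canonicalization (assumption): drop whitespace outside JSON string literals.
fn canonicalize(schema: &str) -> String {
    let mut out = String::with_capacity(schema.len());
    let mut in_string = false;
    let mut escaped = false;
    for c in schema.chars() {
        if in_string {
            out.push(c);
            if escaped {
                escaped = false;
            } else if c == '\\' {
                escaped = true;
            } else if c == '"' {
                in_string = false;
            }
        } else if c == '"' {
            in_string = true;
            out.push(c);
        } else if !c.is_whitespace() {
            out.push(c);
        }
    }
    out
}

/// Stand-in fingerprint: std's DefaultHasher, NOT SHA-256.
/// Its algorithm is unspecified and may change between Rust releases, so never persist it.
fn fingerprint(canonical: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    canonical.hash(&mut hasher);
    hasher.finish()
}

#[derive(Default)]
struct DedupStore {
    by_fingerprint: HashMap<u64, Vec<u32>>, // fingerprint -> candidate IDs
    schemas: Vec<String>,                   // canonical text, index = id - 1
}

impl DedupStore {
    /// Returns the existing ID for an equivalent schema, or assigns a new one.
    fn register(&mut self, schema: &str) -> u32 {
        let canonical = canonicalize(schema);
        let fp = fingerprint(&canonical);
        if let Some(ids) = self.by_fingerprint.get(&fp) {
            for &id in ids {
                // Confirm the match to rule out a hash collision.
                if self.schemas[(id - 1) as usize] == canonical {
                    return id;
                }
            }
        }
        self.schemas.push(canonical);
        let id = self.schemas.len() as u32;
        self.by_fingerprint.entry(fp).or_default().push(id);
        id
    }
}

fn main() {
    let mut store = DedupStore::default();
    let a = store.register(r#"{"type": "string"}"#);
    let b = store.register(r#"{ "type" : "string" }"#);
    println!("{} {}", a, b); // same ID: the schemas differ only in whitespace
}
```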

Compatibility Checking

use rivven_connect::schema::{AvroCompatibility, AvroSchema};

let old_schema = AvroSchema::parse(r#"{"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"}
]}"#)?;

let new_schema = AvroSchema::parse(r#"{"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "long"},
    {"name": "name", "type": "string", "default": ""}
]}"#)?;

// Backward compatibility: new readers can read old data
let backward = AvroCompatibility::check_backward(&new_schema, &old_schema)?;
assert!(backward); // ✅ Adding field with default is backward compatible

// Forward compatibility: old readers can read new data
let forward = AvroCompatibility::check_forward(&new_schema, &old_schema)?;

// Full compatibility: both directions
let full = AvroCompatibility::check_full(&new_schema, &old_schema)?;

Avro Compatibility Rules

Change                        Backward  Forward  Full
Add field with default        ✅        ✅       ✅
Add field without default     ❌        ✅       ❌
Remove field with default     ✅        ✅       ✅
Remove field without default  ✅        ❌       ❌
Rename field                  ❌        ❌       ❌
Promote int → long            ✅        ❌       ❌
Promote float → double        ✅        ❌       ❌
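Avro's schema-resolution rules drive these outcomes: a reader can decode writer data if every reader field either exists in the writer with the same or a promotable type, or has a default, while writer-only fields are skipped. A simplified sketch for flat records of primitive types (no unions, nested records, or aliases); Prim, Field, and the check functions are hypothetical and much smaller than AvroCompatibility.

```rust
/// Avro primitive types covered by this sketch.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum Prim {
    Int,
    Long,
    Float,
    Double,
    Str,
    Bytes,
}

/// Avro type promotion: can a value written as `writer` be read as `reader`?
fn promotable(writer: Prim, reader: Prim) -> bool {
    writer == reader
        || matches!(
            (writer, reader),
            (Prim::Int, Prim::Long)
                | (Prim::Int, Prim::Float)
                | (Prim::Int, Prim::Double)
                | (Prim::Long, Prim::Float)
                | (Prim::Long, Prim::Double)
                | (Prim::Float, Prim::Double)
                | (Prim::Str, Prim::Bytes)
                | (Prim::Bytes, Prim::Str)
        )
}

struct Field {
    name: &'static str,
    ty: Prim,
    has_default: bool,
}

/// Every reader field must be readable from the writer or have a default.
fn reader_can_read(reader: &[Field], writer: &[Field]) -> bool {
    reader.iter().all(|rf| match writer.iter().find(|wf| wf.name == rf.name) {
        Some(wf) => promotable(wf.ty, rf.ty),
        None => rf.has_default,
    })
}

/// Backward: the new schema (reader) can read data written with the old schema.
fn check_backward(new: &[Field], old: &[Field]) -> bool {
    reader_can_read(new, old)
}

/// Forward: the old schema (reader) can read data written with the new schema.
fn check_forward(new: &[Field], old: &[Field]) -> bool {
    reader_can_read(old, new)
}

fn check_full(new: &[Field], old: &[Field]) -> bool {
    check_backward(new, old) && check_forward(new, old)
}

fn main() {
    let old = [Field { name: "id", ty: Prim::Long, has_default: false }];
    let new = [
        Field { name: "id", ty: Prim::Long, has_default: false },
        Field { name: "name", ty: Prim::Str, has_default: true },
    ];
    // backward=true forward=true full=true
    println!(
        "backward={} forward={} full={}",
        check_backward(&new, &old),
        check_forward(&new, &old),
        check_full(&new, &old)
    );
}
```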

Protocol Buffers

Parsing Schemas

use rivven_connect::schema::{ProtobufSchema, ProtobufCodec};

let schema = ProtobufSchema::parse(r#"
    syntax = "proto3";
    package com.example;
    
    enum Status {
        UNKNOWN = 0;
        ACTIVE = 1;
        INACTIVE = 2;
    }
    
    message User {
        int64 id = 1;
        string name = 2;
        optional string email = 3;
        repeated string tags = 4;
        Status status = 5;
    }
"#)?;

// Access schema metadata
println!("Message: {}", schema.message_name());   // "User"
println!("Package: {:?}", schema.package());      // Some("com.example")
println!("Fullname: {}", schema.fullname());      // "com.example.User"

// List fields
for field in schema.fields()? {
    println!("Field: {} (#{}) - {:?}", 
        field.name, field.number, field.kind);
}

Encoding and Decoding

let codec = ProtobufCodec::new(schema);

// Encode JSON to Protobuf binary
let json = serde_json::json!({
    "id": 12345,
    "name": "Alice",
    "email": "alice@example.com",
    "tags": ["admin", "developer"],
    "status": "ACTIVE"
});

let proto_bytes = codec.encode(&json)?;

// Decode Protobuf binary to JSON
let decoded = codec.decode(&proto_bytes)?;
assert_eq!(decoded["name"], "Alice");
assert_eq!(decoded["status"], "ACTIVE");

Standard Wire Format

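The standard wire format for Protobuf inserts a message index between the schema ID and the payload. The index identifies which message type in the schema the payload uses; for the first top-level message it is the single byte 0x00: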

+----------+------------+--------------+------------------+
| Magic(1) | SchemaID(4)| MsgIndex(N)  | Protobuf Data(N) |
+----------+------------+--------------+------------------+
|   0x00   | Big-endian | Varint(0)    | Protobuf binary  |
+----------+------------+--------------+------------------+
// Encode with schema ID
let schema_id: u32 = 12345;
let wire_bytes = codec.encode_wire_format(&json, schema_id)?;

// Decode and retrieve schema ID
let (schema_id, decoded) = codec.decode_wire_format(&wire_bytes)?;
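A std-only sketch of those prefix bytes, assuming the common registry convention: the message-index array is written as a zig-zag varint count followed by zig-zag varint indexes, and the array [0] (first top-level message) is shortened to a single 0x00 byte. The function names are hypothetical; ProtobufCodec handles this internally.

```rust
/// Unsigned base-128 varint, low 7 bits first.
fn write_varint(mut v: u64, out: &mut Vec<u8>) {
    while v >= 0x80 {
        out.push(((v & 0x7f) as u8) | 0x80);
        v >>= 7;
    }
    out.push(v as u8);
}

fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut result = 0u64;
    let mut shift = 0u32;
    loop {
        let byte = *buf.get(*pos)?;
        *pos += 1;
        result |= u64::from(byte & 0x7f) << shift;
        if byte & 0x80 == 0 {
            return Some(result);
        }
        shift += 7;
        if shift >= 64 {
            return None; // malformed: varint too long
        }
    }
}

fn zigzag(n: i64) -> u64 {
    ((n << 1) ^ (n >> 63)) as u64
}

fn unzigzag(v: u64) -> i64 {
    ((v >> 1) as i64) ^ -((v & 1) as i64)
}

/// Magic byte, big-endian schema ID, then the (non-empty) message-index array.
fn encode_prefix(schema_id: u32, message_indexes: &[i64]) -> Vec<u8> {
    assert!(!message_indexes.is_empty(), "message index path must not be empty");
    let mut out = vec![0x00];
    out.extend_from_slice(&schema_id.to_be_bytes());
    if message_indexes.len() == 1 && message_indexes[0] == 0 {
        out.push(0x00); // shorthand for the first top-level message
    } else {
        write_varint(zigzag(message_indexes.len() as i64), &mut out);
        for &i in message_indexes {
            write_varint(zigzag(i), &mut out);
        }
    }
    out
}

/// Returns (schema ID, message indexes, offset where the Protobuf payload starts).
fn decode_prefix(buf: &[u8]) -> Option<(u32, Vec<i64>, usize)> {
    if buf.len() < 6 || buf[0] != 0x00 {
        return None;
    }
    let schema_id = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
    let mut pos = 5;
    let count = unzigzag(read_varint(buf, &mut pos)?);
    if count < 0 {
        return None;
    }
    let indexes = if count == 0 {
        vec![0] // a zero count means "first message"
    } else {
        let mut v = Vec::with_capacity(count as usize);
        for _ in 0..count {
            v.push(unzigzag(read_varint(buf, &mut pos)?));
        }
        v
    };
    Some((schema_id, indexes, pos))
}

fn main() {
    let prefix = encode_prefix(12345, &[0]);
    println!("{:02x?}", prefix); // [00, 00, 00, 30, 39, 00]
    println!("{:?}", decode_prefix(&prefix));
}
```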

Compatibility Checking

use rivven_connect::schema::{ProtobufCompatibility, ProtobufSchema};

let old_schema = ProtobufSchema::parse(r#"
    syntax = "proto3";
    message User { int64 id = 1; }
"#)?;

let new_schema = ProtobufSchema::parse(r#"
    syntax = "proto3";
    message User { int64 id = 1; string name = 2; }
"#)?;

// Adding new fields is compatible in proto3
let compatible = ProtobufCompatibility::check_full(&new_schema, &old_schema)?;
assert!(compatible); // ✅ New optional field is fully compatible

Protobuf Compatibility Rules

Change Backward Forward Full
Add optional field
Add repeated field
Remove optional field
Reuse field number
Change field type
Wire-compatible promotion*

*Wire-compatible type groups:

  • Varint: int32, int64, uint32, uint64, bool
  • 32-bit: fixed32, sfixed32, float
  • 64-bit: fixed64, sfixed64, double

JSON Schema Compatibility Rules

JSON Schema compatibility checking recursively validates nested object structures, not just top-level properties.

Change Backward Forward Full
Add optional property
Add required property
Remove optional property
Remove required property
Change property type
Widen type (e.g. int → number)
Add enum value
Remove enum value
Allow additionalProperties
Disallow additionalProperties
Change nested object property Recurse Recurse Recurse
Change array items schema Recurse Recurse Recurse

Nested properties are reported with dotted paths (e.g., address.street) for clear error messages.
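A recursive check is what lets nested changes surface with dotted paths such as address.street. The sketch below uses a tiny hypothetical schema model (primitive leaves and objects with required flags) and flags only three backward-incompatible cases: a changed type, a new required property, and an optional property that became required. The real checker covers much more of the table above.

```rust
use std::collections::BTreeMap;

/// Hypothetical minimal JSON Schema model.
enum Schema {
    /// A primitive type such as "string" or "integer".
    Type(&'static str),
    /// Property name -> (property schema, required?).
    Object(BTreeMap<&'static str, (Schema, bool)>),
}

fn type_name(schema: &Schema) -> &'static str {
    match schema {
        Schema::Type(t) => *t,
        Schema::Object(_) => "object",
    }
}

/// Collects backward-incompatible changes (new schema reading old data) with dotted paths.
fn check_backward(new: &Schema, old: &Schema, path: &str, errors: &mut Vec<String>) {
    match (new, old) {
        (Schema::Object(new_props), Schema::Object(old_props)) => {
            for (name, (new_schema, new_required)) in new_props {
                let child = if path.is_empty() {
                    name.to_string()
                } else {
                    format!("{}.{}", path, name)
                };
                match old_props.get(name) {
                    Some((old_schema, old_required)) => {
                        if *new_required && !*old_required {
                            errors.push(format!("{}: became required", child));
                        }
                        // Recurse so nested objects report their own dotted paths.
                        check_backward(new_schema, old_schema, &child, errors);
                    }
                    None if *new_required => {
                        errors.push(format!("{}: new required property", child));
                    }
                    None => {} // new optional property: old data simply lacks it
                }
            }
        }
        _ if type_name(new) != type_name(old) => {
            errors.push(format!(
                "{}: type changed from {} to {}",
                path,
                type_name(old),
                type_name(new)
            ));
        }
        _ => {}
    }
}

fn main() {
    let old = Schema::Object(BTreeMap::from([(
        "address",
        (Schema::Object(BTreeMap::from([("street", (Schema::Type("string"), false))])), false),
    )]));
    let new = Schema::Object(BTreeMap::from([(
        "address",
        (Schema::Object(BTreeMap::from([("street", (Schema::Type("integer"), false))])), false),
    )]));
    let mut errors = Vec::new();
    check_backward(&new, &old, "", &mut errors);
    println!("{:?}", errors); // ["address.street: type changed from string to integer"]
}
```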

Schema Evolution Strategies

Compatibility Levels

Level     Description                    Use Case
backward  New readers can read old data  Rolling upgrades (upgrade consumers first)
forward   Old readers can read new data  Rolling upgrades (upgrade producers first)
full      Both backward and forward      Upgrade producers and consumers in any order
none      No compatibility checking      Development only

Best Practices

  1. Always use defaults for new fields in Avro
  2. When removing a Protobuf field, mark its number and name as reserved
  3. Never reuse field numbers or names
  4. Test compatibility before deploying schema changes
  5. Use full compatibility when possible for maximum safety

Schema Inference

Automatically infer schemas from sample data:

use rivven_connect::schema::{SchemaInference, infer_schema_from_samples};

let samples = vec![
    serde_json::json!({"id": 1, "name": "Alice", "active": true}),
    serde_json::json!({"id": 2, "name": "Bob", "age": 30}),
    serde_json::json!({"id": 3, "name": "Carol", "active": false, "age": 25}),
];

let schema = infer_schema_from_samples(&samples)?;
// Produces JSON Schema with merged properties from all samples
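The exact merge rules of infer_schema_from_samples are not spelled out here. One plausible approach, sketched as an assumption: collect every property seen across samples, record the set of JSON types observed for it, and mark it required only if it appears in every sample. Samples are modeled as (property, JSON type) pairs to keep the sketch dependency-free; infer and PropInfo are hypothetical names.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Inferred property: observed JSON types and how many samples contained it.
#[derive(Debug, Default)]
struct PropInfo {
    types: BTreeSet<&'static str>,
    seen: usize,
}

/// Merges flat samples of (property, json_type) pairs.
/// Returns all properties plus the names present in every sample ("required").
fn infer(
    samples: &[Vec<(&'static str, &'static str)>],
) -> (BTreeMap<&'static str, PropInfo>, Vec<&'static str>) {
    let mut props: BTreeMap<&'static str, PropInfo> = BTreeMap::new();
    for sample in samples {
        for &(name, ty) in sample {
            let info = props.entry(name).or_default();
            info.types.insert(ty);
            info.seen += 1;
        }
    }
    let required = props
        .iter()
        .filter(|(_, info)| info.seen == samples.len())
        .map(|(name, _)| *name)
        .collect();
    (props, required)
}

fn main() {
    // The three samples from the example above, reduced to (property, type) pairs.
    let samples = vec![
        vec![("id", "integer"), ("name", "string"), ("active", "boolean")],
        vec![("id", "integer"), ("name", "string"), ("age", "integer")],
        vec![("id", "integer"), ("name", "string"), ("active", "boolean"), ("age", "integer")],
    ];
    let (props, required) = infer(&samples);
    for (name, info) in &props {
        println!("{}: types={:?} seen={}", name, info.types, info.seen);
    }
    println!("required: {:?}", required); // ["id", "name"]
}
```

Under this rule, active and age become optional properties because each is missing from one sample.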

Schema-Aware Sinks

Wrap any sink with automatic schema registration and Avro serialization:

use std::sync::Arc;

use rivven_connect::connectors::stdout::StdoutSink;
use rivven_connect::schema::{
    SchemaAwareConfig, SchemaAwareSink, SchemaRegistryClient, SchemaType, SubjectStrategy,
};

// Create a schema-aware wrapper around any sink
let registry = SchemaRegistryClient::from_config_async(&config, None).await?;
let schema_sink = SchemaAwareSink::with_config(
    StdoutSink::new(),
    Arc::new(registry),
    SchemaAwareConfig {
        subject_strategy: SubjectStrategy::TopicName,  // "{stream}-value"
        auto_register: true,                           // auto-register inferred schemas
        wire_format: true,                             // 5-byte header
        schema_type: SchemaType::Avro,
    }
);

// Events will have schema_id and schema_subject added to metadata
// Data is serialized with Avro before writing

SchemaAwareSource

Wrap any source with automatic Avro serialization and schema registration:

use std::sync::Arc;

use rivven_connect::connectors::postgres::PostgresCdcSource;
use rivven_connect::schema::{
    SchemaAwareConfig, SchemaAwareSource, SchemaRegistryClient, SchemaType, SubjectStrategy,
};

// Create a schema-aware wrapper around any source
let registry = SchemaRegistryClient::from_config_async(&config, None).await?;
let schema_source = SchemaAwareSource::with_config(
    PostgresCdcSource::new(),
    Arc::new(registry),
    SchemaAwareConfig {
        subject_strategy: SubjectStrategy::TopicName,  // "{stream}-value"
        auto_register: true,                           // auto-register inferred schemas
        wire_format: true,                             // 5-byte header
        schema_type: SchemaType::Avro,
    }
);

// Events from the source will have:
// - Data serialized as Avro bytes (standard wire format)
// - Schema registered in the schema registry
// - schema_id and schema_subject in event metadata
let stream = schema_source.read(&config, &catalog, state).await?;

Subject Naming Strategies

Strategy         Pattern           Example
TopicName        {stream}-value    users-value
RecordName       {type}            User (from __type field)
TopicRecordName  {stream}-{type}   users-Created
Custom           User-defined      fn(&str, &Value) -> String
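The strategies in the table map a stream name and an optional record type to a subject. A sketch of that mapping: SubjectStrategy is mirrored locally, the Custom variant takes the record type instead of a serde_json Value to stay dependency-free, and returning None when RecordName or TopicRecordName has no __type is an assumption.

```rust
/// Hypothetical local mirror of SubjectStrategy.
enum SubjectStrategy {
    TopicName,
    RecordName,
    TopicRecordName,
    Custom(fn(&str, Option<&str>) -> String),
}

/// `record_type` stands in for the event's `__type` field, if present.
fn subject_for(strategy: &SubjectStrategy, stream: &str, record_type: Option<&str>) -> Option<String> {
    match strategy {
        SubjectStrategy::TopicName => Some(format!("{}-value", stream)),
        SubjectStrategy::RecordName => record_type.map(str::to_string),
        SubjectStrategy::TopicRecordName => record_type.map(|t| format!("{}-{}", stream, t)),
        SubjectStrategy::Custom(f) => Some(f(stream, record_type)),
    }
}

fn main() {
    let strategies = [
        SubjectStrategy::TopicName,
        SubjectStrategy::RecordName,
        SubjectStrategy::TopicRecordName,
    ];
    for s in &strategies {
        println!("{:?}", subject_for(s, "users", Some("Created")));
    }
}
```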

Stdout Sink with Avro Output

The stdout sink supports Avro output formats for debugging:

sink:
  type: stdout
  config:
    format: avrobinary  # avrojson, avrobinary, or avrohex
    avro_schema: |
      {
        "type": "record",
        "name": "User",
        "fields": [
          {"name": "id", "type": "long"},
          {"name": "name", "type": "string"}
        ]
      }
    wire_format: true  # include 5-byte header

Integration Examples

CDC with Avro Schemas

connectors:
  - name: postgres-users
    type: postgres-cdc
    config:
      connection_string: "postgres://..."
      table: "users"
      schema_registry:
        mode: external
        external:
          url: "http://schema-registry:8081"
        format: avro
        compatibility: backward

External Queue Bridge with Protobuf

connectors:
  - name: external-orders
    type: external-source
    config:
      brokers: ["broker:9092"]
      topic: "orders"
      schema_registry:
        mode: external
        external:
          url: "http://schema-registry:8081"
        format: protobuf

AWS Glue with CDC

connectors:
  - name: mysql-orders
    type: mysql-cdc
    config:
      host: "mysql.example.com"
      database: "orders"
      schema_registry:
        mode: glue
        glue:
          region: us-east-1
          registry_name: orders-registry
        format: avro

Advanced Features

Cache Configuration

The schema registry maintains in-memory caches for fast schema lookups. Caches are bounded to prevent unbounded memory growth:

Setting         Default  Description
max_cache_size  10,000   Maximum number of schemas cached in memory

When the cache exceeds the configured limit, entries are evicted to make room for new schemas.

schema_registry:
  max_cache_size: 10000    # Maximum cached schemas (default: 10,000)

The validation regex cache is independently bounded to 1,000 compiled patterns.
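Which entries are evicted first is not specified above. Least-recently-used is one common policy; here is a small std-only sketch under that assumption. BoundedCache is a hypothetical name, and the O(n) recency update is chosen for clarity rather than speed.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical bounded cache keyed by schema ID, evicting the least recently used entry.
struct BoundedCache<V> {
    max_entries: usize,
    map: HashMap<u32, V>,
    order: VecDeque<u32>, // front = least recently used
}

impl<V: Clone> BoundedCache<V> {
    fn new(max_entries: usize) -> Self {
        assert!(max_entries > 0, "cache must hold at least one entry");
        BoundedCache { max_entries, map: HashMap::new(), order: VecDeque::new() }
    }

    /// Moves `id` to the most-recently-used position.
    fn touch(&mut self, id: u32) {
        if let Some(pos) = self.order.iter().position(|&k| k == id) {
            self.order.remove(pos);
        }
        self.order.push_back(id);
    }

    fn get(&mut self, id: u32) -> Option<V> {
        let value = self.map.get(&id).cloned()?;
        self.touch(id);
        Some(value)
    }

    fn insert(&mut self, id: u32, value: V) {
        // Evict only when adding a new key to a full cache.
        if !self.map.contains_key(&id) && self.map.len() == self.max_entries {
            if let Some(oldest) = self.order.pop_front() {
                self.map.remove(&oldest);
            }
        }
        self.map.insert(id, value);
        self.touch(id);
    }

    fn len(&self) -> usize {
        self.map.len()
    }
}

fn main() {
    let mut cache = BoundedCache::new(2);
    cache.insert(1, "schema-1".to_string());
    cache.insert(2, "schema-2".to_string());
    cache.get(1); // 1 becomes most recently used
    cache.insert(3, "schema-3".to_string()); // evicts 2
    println!("len={} has_2={}", cache.len(), cache.get(2).is_some()); // len=2 has_2=false
}
```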

Schema Contexts (Multi-Tenancy)

Schema contexts provide namespace isolation for multi-tenant deployments. Each context acts as a separate namespace for subjects, enabling:

  • Tenant isolation — Different teams/applications can use the same subject names
  • Environment separation — dev/staging/prod contexts without name conflicts
  • Access control — Per-context permissions

use rivven_schema::{RegistryConfig, SchemaContext, SchemaRegistry, SchemaType};

// Create a registry
let registry = SchemaRegistry::new(RegistryConfig::memory()).await?;

// Create a tenant context
let tenant_ctx = SchemaContext::new("tenant-acme")
    .with_description("ACME Corp schemas");
registry.create_context(tenant_ctx)?;

// Register schema in context using qualified subject name
// Format: :.context:subject
let schema_id = registry.register(
    ":.tenant-acme:user-value",
    SchemaType::Avro,
    r#"{"type": "record", "name": "User", "fields": [...]}"#
).await?;

// List subjects in context
let subjects = registry.list_subjects_in_context("tenant-acme");

// List all contexts
let contexts = registry.list_contexts();

REST API:

# Create context
POST /contexts {"name": "tenant-acme", "description": "ACME Corp"}

# List contexts
GET /contexts

# List subjects in context
GET /contexts/tenant-acme/subjects

# Delete context (must be empty)
DELETE /contexts/tenant-acme
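Qualified subject names follow the :.context:subject format shown above. A minimal parser sketch, assuming that names without the :. prefix belong to the default context (represented here as an empty string, which is itself an assumption) and that the subject part may contain further colons. parse_qualified_subject is a hypothetical helper.

```rust
/// Splits ":.context:subject" into (context, subject).
/// Unqualified names map to the default context, shown here as "" (assumption).
fn parse_qualified_subject(name: &str) -> Result<(&str, &str), String> {
    match name.strip_prefix(":.") {
        None => Ok(("", name)),
        Some(rest) => {
            // Split at the first ':' so the subject part may itself contain colons.
            let (context, subject) = rest
                .split_once(':')
                .ok_or_else(|| format!("missing ':' after context in {:?}", name))?;
            if context.is_empty() || subject.is_empty() {
                return Err(format!("empty context or subject in {:?}", name));
            }
            Ok((context, subject))
        }
    }
}

fn main() {
    println!("{:?}", parse_qualified_subject(":.tenant-acme:user-value"));
    println!("{:?}", parse_qualified_subject("user-value"));
}
```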

Version States (Schema Lifecycle)

Version states enable schema lifecycle management:

State       Description             Behavior
Enabled     Active, fully usable    Default state for new schemas
Deprecated  Discouraged but usable  Returns warning in API response
Disabled    Blocked from use        Returns 403 Forbidden

use rivven_schema::{SchemaVersion, VersionState};

// Deprecate a version (warns clients to migrate)
registry.deprecate_version("user-value", SchemaVersion::new(1)).await?;

// Disable a version (blocks usage entirely)
registry.disable_version("user-value", SchemaVersion::new(1)).await?;

// Re-enable a version
registry.enable_version("user-value", SchemaVersion::new(1)).await?;

// Check version state
let state = registry.get_version_state("user-value", SchemaVersion::new(1)).await?;
match state {
    VersionState::Enabled => println!("Active"),
    VersionState::Deprecated => println!("Deprecated - migrate soon"),
    VersionState::Disabled => println!("Blocked"),
}

REST API:

# Get version state
GET /subjects/user-value/versions/1/state

# Set version state
PUT /subjects/user-value/versions/1/state
Content-Type: application/json
{"state": "DEPRECATED"}

# Convenience endpoints
POST /subjects/user-value/versions/1/deprecate
POST /subjects/user-value/versions/1/disable
POST /subjects/user-value/versions/1/enable
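You can model the Behavior column of the version state table as a small state check. The enum below mirrors the three states. The `check_access` function, the `Access` struct, the 200 status for usable versions, and the "ENABLED"/"DISABLED" spellings are all hypothetical; only "DEPRECATED" appears above. Treat this as a sketch of the semantics, not the registry's code.

```rust
/// Illustrative mirror of the three version states (not rivven_schema's types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum VersionState {
    Enabled,
    Deprecated,
    Disabled,
}

/// Hypothetical response shape: HTTP status plus an optional warning.
#[derive(Debug, PartialEq, Eq)]
struct Access {
    status: u16,
    warning: Option<&'static str>,
}

/// Apply the "Behavior" column of the version state table.
fn check_access(state: VersionState) -> Access {
    match state {
        VersionState::Enabled => Access { status: 200, warning: None },
        // Still served, but clients are told to migrate
        VersionState::Deprecated => Access {
            status: 200,
            warning: Some("schema version is deprecated"),
        },
        // Blocked from use
        VersionState::Disabled => Access { status: 403, warning: None },
    }
}

/// Parse the "state" value sent to PUT .../versions/:version/state.
/// Only "DEPRECATED" appears in the docs; the other spellings are assumed.
fn parse_state(s: &str) -> Option<VersionState> {
    match s {
        "ENABLED" => Some(VersionState::Enabled),
        "DEPRECATED" => Some(VersionState::Deprecated),
        "DISABLED" => Some(VersionState::Disabled),
        _ => None,
    }
}

fn main() {
    let state = parse_state("DEPRECATED").expect("known state");
    let access = check_access(state);
    println!("{state:?} -> status {}, warning {:?}", access.status, access.warning);
}
```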

Content Validation Rules

Beyond compatibility checking, the validation engine enforces content rules on schemas:

Rule Type Description Example Config
MaxSize Maximum schema size in bytes {"max_bytes": 102400}
NamingConvention Name pattern matching {"pattern": "^[A-Z][a-zA-Z0-9]*$"}
FieldRequired Required fields in records {"fields": ["id", "timestamp"]}
FieldType Field type constraints {"field": "id", "types": ["long", "string"]}
Regex Custom regex validation {"pattern": ".*Event$", "target": "name"}
JsonSchema Validate against JSON Schema {"schema": {...}}

use rivven_schema::{SchemaType, ValidationRule, ValidationRuleType, ValidationLevel};

// Create a validation rule
let rule = ValidationRule::new(
    "max-schema-size",
    ValidationRuleType::MaxSize,
    r#"{"max_bytes": 102400}"#
)
.with_level(ValidationLevel::Error)
.with_description("Schemas must be under 100KB")
.for_subjects(vec!["user-*".to_string()]);  // Only apply to user-* subjects

// Add to registry
registry.add_validation_rule(rule);

// Validate a schema without registering it (`large_schema` holds the schema text)
let report = registry.validate_schema(
    SchemaType::Avro,
    "user-value",
    large_schema
)?;

if !report.is_valid() {
    for error in report.error_messages() {
        println!("Validation error: {}", error);
    }
}
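The call `for_subjects(vec!["user-*".to_string()])` limits a rule to matching subjects. This std-only sketch makes two assumptions: `*` is a wildcard for any run of characters, and a rule with no subject filters applies to every subject. The function names are hypothetical and not rivven_schema APIs.

```rust
/// Hypothetical glob matcher: `*` matches any (possibly empty) run of
/// characters; every other character must match literally.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let parts: Vec<&str> = pattern.split('*').collect();
    if parts.len() == 1 {
        return pattern == subject; // no wildcard: exact match
    }
    let first = parts[0];
    let last = parts[parts.len() - 1];
    if !subject.starts_with(first) || !subject.ends_with(last) {
        return false;
    }
    if subject.len() < first.len() + last.len() {
        return false; // prefix and suffix would overlap
    }
    // Find the middle literal segments left to right in the remaining window
    let mut rest = &subject[first.len()..subject.len() - last.len()];
    for mid in &parts[1..parts.len() - 1] {
        match rest.find(mid) {
            Some(i) => rest = &rest[i + mid.len()..],
            None => return false,
        }
    }
    true
}

/// Assumption: a rule without subject filters applies to every subject.
fn rule_applies(filters: &[&str], subject: &str) -> bool {
    filters.is_empty() || filters.iter().any(|p| subject_matches(p, subject))
}

fn main() {
    let filters = ["user-*"];
    for subject in ["user-value", "user-key", "order-value"] {
        println!("{subject}: {}", rule_applies(&filters, subject));
    }
}
```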

Built-in Rule Presets:

use rivven_schema::validation::presets;

// Production-ready ruleset
let engine = presets::production_ruleset();

// Rules included:
// - max_bytes: 100KB limit
// - naming: PascalCase for record names
// - required fields: id, timestamp
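The three preset rules can be approximated without any dependencies. This sketch is hypothetical and makes several simplifications. It takes the record name and field names already extracted from the schema, rather than parsing the schema itself. It hand-codes the `^[A-Z][a-zA-Z0-9]*$` naming pattern instead of using a regex engine. It treats the 100KB limit as 102,400 bytes, matching the MaxSize example.

```rust
/// Hypothetical, std-only approximation of the rules listed for
/// `presets::production_ruleset()`; not the real validation engine.
const MAX_BYTES: usize = 102_400; // 100KB, as in the MaxSize example

/// Equivalent to the NamingConvention pattern ^[A-Z][a-zA-Z0-9]*$
fn is_pascal_case(name: &str) -> bool {
    let mut chars = name.chars();
    match chars.next() {
        Some(c) if c.is_ascii_uppercase() => chars.all(|c| c.is_ascii_alphanumeric()),
        _ => false,
    }
}

/// Returns one message per violated rule (empty means valid).
fn check_production_rules(schema_text: &str, record_name: &str, fields: &[&str]) -> Vec<String> {
    let mut errors = Vec::new();
    if schema_text.len() > MAX_BYTES {
        errors.push(format!("schema is {} bytes, limit is {}", schema_text.len(), MAX_BYTES));
    }
    if !is_pascal_case(record_name) {
        errors.push(format!("record name '{}' is not PascalCase", record_name));
    }
    for required in ["id", "timestamp"] {
        if !fields.contains(&required) {
            errors.push(format!("missing required field '{}'", required));
        }
    }
    errors
}

fn main() {
    let errors = check_production_rules(r#"{"type":"record"}"#, "userEvent", &["id"]);
    for e in &errors {
        println!("Validation error: {e}");
    }
}
```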

REST API:

# List validation rules
GET /config/validation/rules

# Add a validation rule
POST /config/validation/rules
Content-Type: application/json
{
  "name": "max-size",
  "rule_type": "MaxSize",
  "config": {"max_bytes": 102400},
  "level": "ERROR",
  "description": "Schemas must be under 100KB"
}

# Validate schema without registering
POST /subjects/user-value/validate
Content-Type: application/json
{"schema": "{...}", "schemaType": "AVRO"}

# Delete a rule
DELETE /config/validation/rules/max-size

Prometheus Metrics

Prometheus metrics are available when rivven-schema is built with the `metrics` feature:

# Build with metrics support
cargo build -p rivven-schema --features metrics

use rivven_schema::{SchemaRegistry, RegistryConfig, MetricsConfig};

// Create registry with metrics
let config = RegistryConfig::memory();
let metrics_config = MetricsConfig::default()
    .with_prefix("rivven_schema")
    .with_buckets(vec![0.001, 0.01, 0.1, 0.5, 1.0]);

let registry = SchemaRegistry::with_metrics(config, metrics_config).await?;

// Access metrics for custom integration
if let Some(_metrics) = registry.metrics() {
    // Metrics are recorded automatically; use this handle only if you
    // need the underlying Prometheus registry
}
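The values passed to `with_buckets` are histogram upper bounds, in seconds for `operation_duration_seconds`. Prometheus histograms are cumulative. Each observation increments every bucket whose `le` bound is at least the observed value, and it also increments an implicit `+Inf` bucket. The std-only sketch below shows that bucketing. The `Histogram` type is illustrative; it is not the type rivven_schema uses internally.

```rust
/// Illustrative cumulative histogram (not rivven_schema's implementation).
struct Histogram {
    bounds: Vec<f64>, // upper bounds ("le"), ascending
    counts: Vec<u64>, // one count per bound, plus a final +Inf bucket
    sum: f64,         // running sum of observed values
}

impl Histogram {
    fn new(bounds: Vec<f64>) -> Self {
        let buckets = bounds.len() + 1;
        Histogram { bounds, counts: vec![0; buckets], sum: 0.0 }
    }

    /// Record one observation, e.g. an operation latency in seconds.
    fn observe(&mut self, value: f64) {
        self.sum += value;
        // Cumulative: every bucket whose bound is >= value is incremented
        for (i, bound) in self.bounds.iter().enumerate() {
            if value <= *bound {
                self.counts[i] += 1;
            }
        }
        // The +Inf bucket counts every observation
        let last = self.counts.len() - 1;
        self.counts[last] += 1;
    }
}

fn main() {
    // Same bounds as the MetricsConfig example
    let mut h = Histogram::new(vec![0.001, 0.01, 0.1, 0.5, 1.0]);
    for latency in [0.005, 0.2, 2.0] {
        h.observe(latency);
    }
    for (bound, count) in h.bounds.iter().zip(&h.counts) {
        println!("le=\"{bound}\" {count}");
    }
    println!("le=\"+Inf\" {}", h.counts[h.bounds.len()]);
    println!("sum={}", h.sum);
}
```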

Available Metrics:

Metric Type Description
schemas_registered_total Counter Total schemas registered
schemas_lookups_total Counter Total schema lookups (by_id/by_version)
compatibility_checks_total Counter Compatibility checks (pass/fail)
validation_checks_total Counter Validation rule checks
errors_total Counter Errors by type
version_state_changes_total Counter State transitions
schemas_count Gauge Current schema count
subjects_count Gauge Current subject count
versions_count Gauge Current version count
operation_duration_seconds Histogram Operation latency
schema_size_bytes Histogram Schema sizes
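The sketch below renders samples in the Prometheus text exposition format, which gives a rough idea of what a scrape returns. It makes two assumptions. First, the configured prefix is joined to each metric name with an underscore (for example `rivven_schema_schemas_registered_total`). Second, pass/fail counts carry a hypothetical `result` label. The real metric and label names may differ.

```rust
/// Escape a label value per the Prometheus text format rules
/// (backslash, double quote, and newline).
fn escape_label(value: &str) -> String {
    value.replace('\\', "\\\\").replace('"', "\\\"").replace('\n', "\\n")
}

/// Render one sample line: name{k="v",...} value
fn render_sample(name: &str, labels: &[(&str, &str)], value: u64) -> String {
    if labels.is_empty() {
        return format!("{name} {value}\n");
    }
    let pairs: Vec<String> = labels
        .iter()
        .map(|(k, v)| format!("{k}=\"{}\"", escape_label(v)))
        .collect();
    format!("{name}{{{}}} {value}\n", pairs.join(","))
}

/// Render a counter family with HELP and TYPE lines.
/// Assumption: prefix and metric name are joined with '_'.
fn render_counter(prefix: &str, metric: &str, help: &str, samples: &[(Vec<(&str, &str)>, u64)]) -> String {
    let name = format!("{prefix}_{metric}");
    let mut out = format!("# HELP {name} {help}\n# TYPE {name} counter\n");
    for (labels, value) in samples {
        out.push_str(&render_sample(&name, labels, *value));
    }
    out
}

fn main() {
    // The "result" label is hypothetical
    let text = render_counter(
        "rivven_schema",
        "compatibility_checks_total",
        "Compatibility checks (pass/fail)",
        &[(vec![("result", "pass")], 12), (vec![("result", "fail")], 3)],
    );
    print!("{text}");
}
```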

Prometheus Endpoint:

# Start server with metrics
rivven-schema serve --port 8081 --metrics-port 9090

# Scrape metrics
curl http://localhost:9090/metrics

Prometheus Scrape Configuration (e.g., as a data source for Grafana dashboards):

# prometheus.yml
scrape_configs:
  - job_name: 'rivven-schema'
    static_configs:
      - targets: ['localhost:9090']

REST API Reference

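The Schema Registry exposes a REST API that follows the conventional schema-registry layout (subjects, versions, schema IDs, configuration, compatibility, and mode endpoints). It also adds endpoints for version states, content validation, schema contexts, and statistics.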

Core Endpoints

Method Endpoint Description
GET / API information
GET /health Health check
GET /ready Readiness check
GET /live Liveness check
GET /subjects List all subjects
GET /subjects/:subject Get subject info
DELETE /subjects/:subject Delete subject (soft delete)
GET /subjects/:subject/versions List versions for subject
POST /subjects/:subject/versions Register new schema
GET /subjects/:subject/versions/:version Get schema by version
DELETE /subjects/:subject/versions/:version Delete specific version
GET /subjects/:subject/versions/:version/schema Get raw schema string
GET /subjects/:subject/versions/:version/referencedby Get referencing schemas
POST /subjects/:subject Check whether a schema is registered under the subject
GET /schemas List all schemas
GET /schemas/ids/:id Get schema by ID
GET /schemas/ids/:id/schema Get raw schema by ID
GET /schemas/ids/:id/subjects Get subjects using schema
GET /schemas/ids/:id/versions Get all versions of schema
GET /schemas/types List supported schema types
GET /config Get global config
PUT /config Set global compatibility
GET /config/:subject Get subject config
PUT /config/:subject Set subject compatibility
DELETE /config/:subject Reset subject config
POST /compatibility/subjects/:subject/versions/:version Check compatibility
POST /compatibility/subjects/:subject/versions/latest Check vs latest
GET /mode Get global mode
PUT /mode Set global mode
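The `:subject`, `:version`, `:id`, and `:context` segments in the endpoint tables are path parameters. The minimal std-only matcher below makes that notation concrete. It is hypothetical and is not the server's router.

```rust
use std::collections::HashMap;

/// Hypothetical matcher for the `:name` path parameters used in the tables.
/// Returns the captured parameters if `path` fits `pattern`, else None.
fn match_route<'a>(pattern: &'a str, path: &'a str) -> Option<HashMap<&'a str, &'a str>> {
    let pat: Vec<&'a str> = pattern.trim_matches('/').split('/').collect();
    let segs: Vec<&'a str> = path.trim_matches('/').split('/').collect();
    if pat.len() != segs.len() {
        return None; // a different number of segments can never match
    }
    let mut params = HashMap::new();
    for (p, s) in pat.into_iter().zip(segs) {
        if let Some(name) = p.strip_prefix(':') {
            params.insert(name, s); // placeholder: capture the segment
        } else if p != s {
            return None; // literal segment mismatch
        }
    }
    Some(params)
}

fn main() {
    let pattern = "/subjects/:subject/versions/:version";
    if let Some(params) = match_route(pattern, "/subjects/user-value/versions/3") {
        println!("subject={} version={}", params["subject"], params["version"]);
    }
}
```

A path such as `/compatibility/subjects/user-value/versions/latest` also fits the `:version` pattern. A real router therefore has to try the literal `latest` route first, or treat `latest` as a special version value.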

Version State Endpoints

Method Endpoint Description
GET /subjects/:subject/versions/:version/state Get version state
PUT /subjects/:subject/versions/:version/state Set version state
POST /subjects/:subject/versions/:version/deprecate Mark as deprecated
POST /subjects/:subject/versions/:version/disable Disable version
POST /subjects/:subject/versions/:version/enable Re-enable version

Content Validation Endpoints

Method Endpoint Description
POST /subjects/:subject/validate Validate schema without registering
GET /config/validation/rules List all validation rules
POST /config/validation/rules Add a validation rule
DELETE /config/validation/rules/:name Delete a validation rule

Schema Context Endpoints

Method Endpoint Description
GET /contexts List all contexts
POST /contexts Create a new context
GET /contexts/:context Get context details
DELETE /contexts/:context Delete context (must be empty)
GET /contexts/:context/subjects List subjects in context

Statistics Endpoint

Method Endpoint Description
GET /stats Get registry statistics

Example Stats Response:

{
  "total_schemas": 42,
  "total_subjects": 15,
  "total_versions": 87,
  "schemas_by_type": {
    "AVRO": 35,
    "JSON": 5,
    "PROTOBUF": 2
  }
}
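The exact definitions behind these counters are not spelled out here, so this sketch assumes one plausible reading:

  • total_schemas counts distinct schema IDs, since one schema can be shared by several subjects.
  • total_versions counts every subject/version pair.
  • schemas_by_type is tallied over distinct schemas.

The `Entry` and `Stats` types are hypothetical.

```rust
use std::collections::{BTreeMap, HashSet};

/// One registered (subject, version) pair; a schema ID may be shared by
/// several subjects (hypothetical in-memory shape).
struct Entry {
    subject: &'static str,
    schema_id: u32,
    schema_type: &'static str,
}

#[derive(Debug)]
struct Stats {
    total_schemas: usize,
    total_subjects: usize,
    total_versions: usize,
    schemas_by_type: BTreeMap<&'static str, usize>,
}

fn compute_stats(entries: &[Entry]) -> Stats {
    let mut ids = HashSet::new();
    let mut subjects = HashSet::new();
    let mut by_type = BTreeMap::new();
    for e in entries {
        subjects.insert(e.subject);
        // Count each distinct schema ID once, under its type
        if ids.insert(e.schema_id) {
            *by_type.entry(e.schema_type).or_insert(0) += 1;
        }
    }
    Stats {
        total_schemas: ids.len(),
        total_subjects: subjects.len(),
        total_versions: entries.len(),
        schemas_by_type: by_type,
    }
}

fn main() {
    let entries = [
        Entry { subject: "user-value", schema_id: 1, schema_type: "AVRO" },
        Entry { subject: "user-value", schema_id: 2, schema_type: "AVRO" },
        Entry { subject: "audit-value", schema_id: 1, schema_type: "AVRO" },
    ];
    println!("{:?}", compute_stats(&entries));
}
```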

API Reference

AvroSchema

Method Description
parse(json: &str) Parse Avro schema from JSON
name() Get record name
namespace() Get namespace
fullname() Get fully qualified name
fields() List record fields
fingerprint_md5() Get MD5 fingerprint
fingerprint_sha256() Get SHA-256 fingerprint

AvroCodec

Method Description
new(schema) Create codec for schema
encode(json) Encode JSON to Avro bytes
decode(bytes) Decode Avro bytes to JSON
encode_wire_format(json, id) Encode with standard wire format
decode_wire_format(bytes) Decode standard wire format
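The methods `encode_wire_format` and `decode_wire_format` wrap the Avro payload in a small header. This sketch assumes the common schema-registry framing: a 0x00 magic byte, then the schema ID as a 4-byte big-endian integer. Under that assumption, the header handling reduces to the std-only code below. The names `frame` and `unframe` are hypothetical; they are not AvroCodec methods.

```rust
/// Assumed layout: [0x00 magic][4-byte big-endian schema ID][Avro payload]
const MAGIC_BYTE: u8 = 0x00;

/// Prepend the assumed header to an already-encoded Avro payload.
fn frame(schema_id: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(5 + payload.len());
    out.push(MAGIC_BYTE);
    out.extend_from_slice(&schema_id.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split framed bytes back into (schema ID, payload), rejecting bad input.
fn unframe(bytes: &[u8]) -> Result<(u32, &[u8]), String> {
    if bytes.len() < 5 {
        return Err(format!("need at least 5 bytes, got {}", bytes.len()));
    }
    if bytes[0] != MAGIC_BYTE {
        return Err(format!("unknown magic byte 0x{:02x}", bytes[0]));
    }
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Ok((id, &bytes[5..]))
}

fn main() {
    // 0x02 0x61 is the Avro binary encoding of the string "a"
    let framed = frame(42, &[0x02, 0x61]);
    println!("{framed:?}");
    let (id, payload) = unframe(&framed).expect("valid frame");
    println!("schema id {id}, payload {payload:?}");
}
```

After `unframe`, a decoder would look up schema 42 in the registry and decode the remaining bytes with that schema.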

ProtobufSchema

Method Description
parse(proto: &str) Parse .proto file content
message_name() Get main message name
package() Get package name
fullname() Get fully qualified name
fields() List message fields
fingerprint_md5() Get MD5 fingerprint

ProtobufCodec

Method Description
new(schema) Create codec for schema
encode(json) Encode JSON to Protobuf bytes
decode(bytes) Decode Protobuf bytes to JSON
encode_wire_format(json, id) Encode with standard wire format
decode_wire_format(bytes) Decode standard wire format
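For Protobuf, Confluent-compatible registries add a message-index path after the schema ID. The path tells a reader which message in the .proto file was encoded. It is written as a zigzag-varint count followed by zigzag-varint indexes, and the common `[0]` case is shortened to a single zero byte. The docs do not say whether ProtobufCodec writes these indexes. Treat this std-only sketch, and its hypothetical function names, as an assumption to verify against the codec.

```rust
/// Zigzag-encode a signed value, then write it as a base-128 varint
/// (the same integer encoding Avro uses for int/long).
fn write_zigzag_varint(out: &mut Vec<u8>, v: i64) {
    let mut z = ((v << 1) ^ (v >> 63)) as u64;
    loop {
        let byte = (z & 0x7f) as u8;
        z >>= 7;
        if z == 0 {
            out.push(byte);
            break;
        }
        out.push(byte | 0x80); // more bytes follow
    }
}

/// Assumed header: magic byte, big-endian schema ID, then the message-index
/// path; the first-message case [0] is written as a single 0 byte.
fn protobuf_header(schema_id: u32, message_indexes: &[i64]) -> Vec<u8> {
    let mut out = vec![0x00u8];
    out.extend_from_slice(&schema_id.to_be_bytes());
    if message_indexes.len() == 1 && message_indexes[0] == 0 {
        out.push(0);
    } else {
        write_zigzag_varint(&mut out, message_indexes.len() as i64);
        for &i in message_indexes {
            write_zigzag_varint(&mut out, i);
        }
    }
    out
}

fn main() {
    // First top-level message in the .proto file
    println!("{:?}", protobuf_header(7, &[0]));
    // Second top-level message, its first nested message
    println!("{:?}", protobuf_header(7, &[1, 0]));
}
```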

For more information, see the Architecture and Connector Development guides.


Copyright © 2026 Rivven Contributors. Licensed under the Apache License 2.0.