
Schema Registry

This guide explains how to use the Schema Registry for managing Avro schemas in TradeX Kafka events.

The Schema Registry provides centralized schema management for Kafka events, ensuring type safety and schema evolution compatibility.

  • URL: http://localhost:8085 (local) or http://schema-registry:8081 (Docker)
  • UI: View-only access via Kafka UI at http://localhost:8080

To register or update a schema:

  1. Edit the schema file in shared/kafka-schema/
  2. Commit to git
  3. Run the push script: ./shared/scripts/kafka/schema/push.sh <file> <subject>
  4. Verify in the UI
Example:
./shared/scripts/kafka/schema/push.sh \
shared/kafka-schema/engine/engine.event.v1.avsc \
engine.event.v1-value

Format: {topic-name}-value

  • engine.event.v1-value → schema for engine.event.v1 topic
  • orders.created-value → schema for orders.created topic

The -value suffix indicates this schema applies to message values (not keys).
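As a small illustration of this convention, a hypothetical helper (not part of the TradeX codebase) that derives the subject name for a topic:

```python
def subject_for(topic: str, part: str = "value") -> str:
    """Derive the Schema Registry subject for a Kafka topic.

    Follows the naming convention above: "<topic>-value" for message
    values and "<topic>-key" for message keys.
    """
    if part not in ("value", "key"):
        raise ValueError("part must be 'value' or 'key'")
    return f"{topic}-{part}"


print(subject_for("engine.event.v1"))  # engine.event.v1-value
print(subject_for("orders.created"))   # orders.created-value
```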

To browse registered schemas in the Kafka UI:

  1. Open http://localhost:8080
  2. Click Schema Registry in the sidebar
  3. Click a subject to view its versions and details

Note: UI is view-only. Do not register schemas via UI.

# List all subjects
curl http://localhost:8085/subjects | jq .
# Get latest schema
curl http://localhost:8085/subjects/engine.event.v1-value/versions/latest | jq .
# Get all versions
curl http://localhost:8085/subjects/engine.event.v1-value/versions | jq .
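One detail worth knowing when consuming these responses in code: the registry returns the Avro schema as a JSON-encoded string inside the JSON envelope, so it has to be parsed twice. A small sketch, using a locally constructed response with illustrative values rather than a live request:

```python
import json

# Sample of the shape returned by GET /subjects/<subject>/versions/latest.
# The "schema" field is the Avro schema serialized as a JSON *string*.
avro_schema = {"type": "record", "name": "EngineEvent", "fields": []}
response_body = json.dumps({
    "subject": "engine.event.v1-value",
    "version": 3,   # illustrative
    "id": 7,        # illustrative
    "schema": json.dumps(avro_schema),
})

envelope = json.loads(response_body)
schema = json.loads(envelope["schema"])  # second parse for the nested JSON
print(envelope["subject"], "version", envelope["version"])
print(schema["name"])  # EngineEvent
```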

TypeScript Producer (kafkajs):

import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { v7 as uuidv7 } from 'uuid';

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const registry = new SchemaRegistry({ host: 'http://localhost:8085' });

await producer.connect();

// Look up the latest registered schema ID for the subject
const schemaId = await registry.getLatestSchemaId('engine.event.v1-value');

// Create a message matching the schema
const message = {
  event_id: uuidv7(),
  event_type: 'engine.event.v1',
  timestamp: new Date().toISOString(),
  data: { /* ... */ },
};

// Encode with the schema and send
const encoded = await registry.encode(schemaId, message);
await producer.send({
  topic: 'engine.event.v1',
  messages: [{ value: encoded }],
});

TypeScript Consumer:

const consumer = kafka.consumer({ groupId: 'my-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'engine.event.v1' });

await consumer.run({
  eachMessage: async ({ message }) => {
    // Decode the Avro-encoded value back into an object
    const decoded = await registry.decode(message.value);
    console.log(decoded);
  },
});

Python Producer (confluent-kafka):

import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Load the Avro schema source (the same file that was registered)
with open('shared/kafka-schema/engine/engine.event.v1.avsc') as f:
    schema_str = f.read()

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8085'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)
producer = Producer({'bootstrap.servers': 'localhost:9092'})

message = {
    "event_id": str(uuid.uuid4()),
    "event_type": "engine.event.v1",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "data": {},  # ...
}

# Serialize and send
ctx = SerializationContext('engine.event.v1', MessageField.VALUE)
serialized = avro_serializer(message, ctx)
producer.produce('engine.event.v1', value=serialized)
producer.flush()

Python Consumer:

from confluent_kafka import Consumer, KafkaException
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['engine.event.v1'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        # Deserialize the Avro-encoded value
        ctx = SerializationContext(msg.topic(), MessageField.VALUE)
        decoded = avro_deserializer(msg.value(), ctx)
        print(decoded)
finally:
    consumer.close()
Mode      Use Case
--------  --------------------------------
BACKWARD  Adding optional fields (default)
FULL      Production (strictest)
NONE      Development only

BACKWARD means consumers using the new schema can still read messages written with older schemas.

Compatible Changes (allowed with BACKWARD mode)

  • Adding optional fields with defaults
  • Removing optional fields
  • Changing documentation

Breaking Changes (NOT allowed with BACKWARD mode)

  • Removing required fields
  • Changing field types
  • Adding required fields
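To make these field-level rules concrete, here is a deliberately simplified, hypothetical checker (the registry itself performs full Avro schema resolution, which covers many more cases than this):

```python
def is_backward_compatible(old_fields, new_fields):
    """Simplified BACKWARD check over Avro record fields.

    A reader on the new schema must be able to read old data, so:
    - every field added in the new schema needs a default
    - no field may change type
    (Removing a field is fine: the new reader simply ignores it.)
    """
    old = {f["name"]: f for f in old_fields}
    for field in new_fields:
        prev = old.get(field["name"])
        if prev is None:
            if "default" not in field:
                return False  # new required field breaks old data
        elif prev["type"] != field["type"]:
            return False      # type change breaks old data
    return True


trade_v1 = [{"name": "id", "type": "string"},
            {"name": "price", "type": "double"}]
trade_v2 = trade_v1 + [{"name": "volume", "type": "double", "default": 0.0}]
bad_v2 = trade_v1 + [{"name": "volume", "type": "double"}]  # no default

print(is_backward_compatible(trade_v1, trade_v2))  # True
print(is_backward_compatible(trade_v1, bad_v2))    # False
```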

Original Schema:

{
  "type": "record",
  "name": "Trade",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "double" }
  ]
}

Updated Schema (BACKWARD compatible):

{
  "type": "record",
  "name": "Trade",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "double" },
    { "name": "volume", "type": "double", "default": 0.0 }
  ]
}

The "default": 0.0 makes the new field optional: a consumer using the updated schema can still read old messages, filling in 0.0 for the missing volume field.
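The mechanics can be sketched in plain Python: a simplified, hypothetical version of Avro's reader-side resolution, where a field missing from the record takes the default declared in the reader schema:

```python
def read_with_reader_schema(record, reader_fields):
    """Resolve a record written with an older schema against a newer
    reader schema: missing fields take their declared default
    (a simplified version of Avro's schema-resolution rule)."""
    resolved = {}
    for field in reader_fields:
        if field["name"] in record:
            resolved[field["name"]] = record[field["name"]]
        elif "default" in field:
            resolved[field["name"]] = field["default"]
        else:
            raise ValueError(f"no value or default for {field['name']}")
    return resolved


# Fields of the updated Trade schema shown above
reader_fields = [
    {"name": "id", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "volume", "type": "double", "default": 0.0},
]

old_message = {"id": "t-1", "price": 101.5}  # written before volume existed
print(read_with_reader_schema(old_message, reader_fields))
# {'id': 't-1', 'price': 101.5, 'volume': 0.0}
```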

  1. Always commit schema files to git before registering
  2. Use descriptive names: engine.event.v1 not event1
  3. Include version in topic: engine.event.v1 → engine.event.v2 for breaking changes
  4. Add doc fields for documentation
  5. Provide defaults for optional fields: "default": null
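Practices 4 and 5 combined might look like this in a schema file (the note field is illustrative); one Avro detail worth noting is that a field with "default": null must declare a union type with "null" listed first:

```json
{
  "type": "record",
  "name": "Trade",
  "doc": "A single executed trade.",
  "fields": [
    { "name": "id", "type": "string", "doc": "Unique trade id." },
    { "name": "price", "type": "double", "doc": "Execution price." },
    {
      "name": "note",
      "type": ["null", "string"],
      "default": null,
      "doc": "Optional free-form note; null when absent."
    }
  ]
}
```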
If schema registration fails:

  • Validate the JSON: jq . your-schema.avsc
  • Check the Avro syntax
  • Check the subject's compatibility mode: curl http://localhost:8085/config/{subject}
  • Temporarily set compatibility to NONE for testing
docker ps | grep schema-registry
docker logs tradex-schema-registry-dev
docker-compose -f infra/docker-compose.yml restart schema-registry
Task             Command
---------------  ------------------------------------------------------
Register schema  ./shared/scripts/kafka/schema/push.sh <file> <subject>
List subjects    curl http://localhost:8085/subjects
View UI          http://localhost:8080 → Schema Registry