# Schema Registry
This guide explains how to use the Schema Registry for managing Avro schemas in TradeX Kafka events.
## Overview

The Schema Registry provides centralized schema management for Kafka events, ensuring type safety and schema-evolution compatibility.
## Access

- URL: `http://localhost:8085` (local) or `http://schema-registry:8081` (Docker)
- UI: view-only access via Kafka UI at `http://localhost:8080`
## Registering Schemas

### Workflow

1. Edit the schema file in `shared/kafka-schema/`
2. Commit to git
3. Run the push script: `./shared/scripts/kafka/schema/push.sh <file> <subject>`
4. Verify in the UI
### Example

```shell
./shared/scripts/kafka/schema/push.sh \
  shared/kafka-schema/engine/engine.event.v1.avsc \
  engine.event.v1-value
```

## Subject Naming

Format: `{topic-name}-value`

- `engine.event.v1-value` → schema for the `engine.event.v1` topic
- `orders.created-value` → schema for the `orders.created` topic

The `-value` suffix indicates that this schema applies to message values (not keys).
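The topic-to-subject mapping can be expressed as a tiny helper. This is an illustrative sketch only; `subject_for` is a hypothetical name, not part of the TradeX codebase:

```python
def subject_for(topic: str, field: str = "value") -> str:
    """Return the Schema Registry subject for a topic's message values or keys."""
    if field not in ("value", "key"):
        raise ValueError(f"field must be 'value' or 'key', got {field!r}")
    return f"{topic}-{field}"

print(subject_for("engine.event.v1"))        # engine.event.v1-value
print(subject_for("orders.created", "key"))  # orders.created-key
```

The same `-key` suffix convention applies if you ever register schemas for message keys.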
## Viewing Schemas

### Kafka UI

1. Open `http://localhost:8080`
2. Click **Schema Registry** in the sidebar
3. Click a subject to view its versions and details

Note: the UI is view-only. Do not register schemas via the UI.
### API Commands

```shell
# List all subjects
curl http://localhost:8085/subjects | jq .

# Get the latest schema
curl http://localhost:8085/subjects/engine.event.v1-value/versions/latest | jq .

# Get all versions
curl http://localhost:8085/subjects/engine.event.v1-value/versions | jq .
```

## Using Schemas in Code
### TypeScript/Node.js

Producer:
```typescript
import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';
import { v7 as uuidv7 } from 'uuid'; // uuid >= 10

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const registry = new SchemaRegistry({ host: 'http://localhost:8085' });

// Look up the latest schema ID for the subject
const schemaId = await registry.getLatestSchemaId('engine.event.v1-value');

// Create a message matching the schema
const message = {
  event_id: uuidv7(),
  event_type: 'engine.event.v1',
  timestamp: new Date().toISOString(),
  data: { /* ... */ },
};

// Encode with the registered schema and send
const encoded = await registry.encode(schemaId, message);
await producer.send({
  topic: 'engine.event.v1',
  messages: [{ value: encoded }],
});
```

Consumer:
```typescript
const consumer = kafka.consumer({ groupId: 'my-group' });
await consumer.subscribe({ topic: 'engine.event.v1' });

await consumer.run({
  eachMessage: async ({ message }) => {
    // Decode the Avro-encoded message via the registry
    const decoded = await registry.decode(message.value);
    console.log(decoded);
  },
});
```

### Python

Producer:
```python
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# schema_str holds the Avro schema JSON (e.g. the contents of the .avsc file)
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8085'})
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = Producer({'bootstrap.servers': 'localhost:9092'})

message = {
    "event_id": str(uuid.uuid4()),
    "event_type": "engine.event.v1",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "data": {},  # ...
}

# Serialize and send
ctx = SerializationContext('engine.event.v1', MessageField.VALUE)
serialized = avro_serializer(message, ctx)
producer.produce('engine.event.v1', value=serialized)
producer.flush()
```

Consumer:
```python
from confluent_kafka import Consumer
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
})
consumer.subscribe(['engine.event.v1'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    # Deserialize the Avro-encoded message
    ctx = SerializationContext(msg.topic(), MessageField.VALUE)
    decoded = avro_deserializer(msg.value(), ctx)
    print(decoded)
```

## Schema Evolution

### Compatibility Modes

| Mode | Use Case |
|---|---|
| `BACKWARD` | Adding optional fields (default) |
| `FULL` | Production (strictest) |
| `NONE` | Development only |
`BACKWARD` means that consumers using the new schema can read messages written with the old schema.
### Safe Changes (BACKWARD compatible)

- Adding optional fields with defaults
- Removing optional fields
- Changing documentation
### Breaking Changes (NOT allowed with BACKWARD mode)

- Removing required fields
- Changing field types
- Adding required fields (fields without defaults)
### Example: Adding a Field

Original schema:

```json
{
  "type": "record",
  "name": "Trade",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "double" }
  ]
}
```

Updated schema (BACKWARD compatible):

```json
{
  "type": "record",
  "name": "Trade",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "double" },
    { "name": "volume", "type": "double", "default": 0.0 }
  ]
}
```

The `"default": 0.0` makes the field optional: consumers using the updated schema can still read old messages, filling in `volume` with the default.
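The rule this example relies on can be sketched without an Avro library: when the reader's schema has a field the written record lacks, the field's default is used; if there is no default, the record cannot be resolved. This `resolve` function is a toy illustration of that one rule (real Avro resolution also handles type promotion, aliases, unions, etc.):

```python
OLD_FIELDS = [{"name": "id", "type": "string"},
              {"name": "price", "type": "double"}]
NEW_FIELDS = OLD_FIELDS + [{"name": "volume", "type": "double", "default": 0.0}]

def resolve(record: dict, reader_fields: list) -> dict:
    """Fill in reader-schema defaults for fields missing from the written record."""
    out = {}
    for field in reader_fields:
        if field["name"] in record:
            out[field["name"]] = record[field["name"]]
        elif "default" in field:
            out[field["name"]] = field["default"]
        else:
            # A required field with no default: old data cannot be read.
            raise ValueError(f"no value or default for field {field['name']!r}")
    return out

old_message = {"id": "t-1", "price": 101.5}  # written with the original schema
print(resolve(old_message, NEW_FIELDS))      # "volume" is filled from the default
```

Adding `volume` *without* a default would hit the `ValueError` branch for every old message, which is exactly why BACKWARD mode rejects required fields without defaults.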
## Best Practices

1. Always commit schema files to git before registering
2. Use descriptive names: `engine.event.v1`, not `event1`
3. Include the version in the topic name: `engine.event.v1` → `engine.event.v2` for breaking changes
4. Add `doc` fields for documentation
5. Provide defaults for optional fields: `"default": null`
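The versioned-topic convention above can be mechanized. A minimal sketch, assuming topics always end in a `.vN` suffix; `bump_topic_version` is a hypothetical helper, not part of the TradeX tooling:

```python
import re

def bump_topic_version(topic: str) -> str:
    """Return the next major version of a versioned topic name,
    e.g. engine.event.v1 -> engine.event.v2."""
    match = re.fullmatch(r"(?P<base>.+)\.v(?P<ver>\d+)", topic)
    if match is None:
        raise ValueError(f"topic {topic!r} does not end in a .vN version suffix")
    return f"{match['base']}.v{int(match['ver']) + 1}"

print(bump_topic_version("engine.event.v1"))  # engine.event.v2
```

Rejecting unversioned names loudly (rather than guessing) keeps the convention enforceable in CI.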
## Troubleshooting

### "Invalid schema"

- Validate the JSON: `jq . your-schema.avsc`
- Check the Avro syntax

### "Schema incompatible"

- Check the compatibility mode: `curl http://localhost:8085/config/{subject}`
- Temporarily set it to `NONE` for testing
### Schema Registry not accessible

```shell
# Is the container running?
docker ps | grep schema-registry

# Check the logs
docker logs tradex-schema-registry-dev

# Restart the service
docker-compose -f infra/docker-compose.yml restart schema-registry
```

## Quick Reference

| Task | Command |
|---|---|
| Register schema | `./shared/scripts/kafka/schema/push.sh <file> <subject>` |
| List subjects | `curl http://localhost:8085/subjects` |
| View UI | `http://localhost:8080` → Schema Registry |