
Kafka Integration

This guide explains how to consume and produce TradeX Kafka events from your own service.

TradeX uses Apache Kafka for event-driven communication between services. All events use Avro schemas registered in the Schema Registry.

You will need:

  • Kafka cluster access
  • Schema Registry access
  • A Kafka client library for your language
  • An Avro deserializer backed by the Schema Registry
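Each topic's value schema is registered in the Schema Registry under the `<topic>-value` subject. As a quick orientation (and connectivity check) you can fetch the registered schema before writing a consumer. This is a minimal sketch using @kafkajs/confluent-schema-registry; the envelope fields listed in the comment simply mirror the examples later in this guide.

```javascript
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

// Look up the schema registered for the topic's value subject.
// The envelope fields used throughout this guide are: event_id, event_type,
// source_service, trace_id, timestamp, version, and an event-specific `data` record.
const schemaId = await registry.getLatestSchemaId('engine.event.v1-value');
const schema = await registry.getSchema(schemaId);

console.log('Latest schema id for engine.event.v1-value:', schemaId);
```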
Point your client at the Kafka brokers and the Schema Registry:

```javascript
const kafkaConfig = {
  brokers: ['kafka-1:29092', 'kafka-2:29092', 'kafka-3:29092'],
  clientId: 'your-service-name',
  schemaRegistry: {
    host: 'http://schema-registry:8081'
  }
};
```
Consuming events with kafkajs (Node.js):

```javascript
import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const kafka = new Kafka({
  brokers: ['kafka-1:29092'],
  clientId: 'my-service'
});
const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });

const consumer = kafka.consumer({ groupId: 'my-consumer-group' });

await consumer.connect();
await consumer.subscribe({
  topic: 'engine.event.v1',
  fromBeginning: false
});

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // Decode the Avro-encoded message value
    const decoded = await registry.decode(message.value);

    // Process the event
    if (decoded.data.type === 'TRADE') {
      console.log('Trade executed:', decoded.data.trade);
    }
  }
});
```
The same consumer with confluent-kafka (Python):

```python
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer({
    'bootstrap.servers': 'kafka-1:29092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['engine.event.v1'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue

    # Deserialize the Avro message value
    decoded = avro_deserializer(
        msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

    # Process the event
    if decoded['data']['type'] == 'TRADE':
        print('Trade executed:', decoded['data']['trade'])
```
A consumer group with sarama (Go):

```go
import (
    "log"

    "github.com/Shopify/sarama"
)

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest

consumer, err := sarama.NewConsumerGroup(
    []string{"kafka-1:29092"},
    "my-consumer-group",
    config,
)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

// Implement sarama.ConsumerGroupHandler (Setup, Cleanup, ConsumeClaim)
// and pass it to consumer.Consume in a loop.
```
Producing events with kafkajs (reusing the `kafka` and `registry` clients from above):

```javascript
import { v7 as uuidv7 } from 'uuid'; // v7 ids are available in recent versions of the uuid package

const producer = kafka.producer();
await producer.connect();

// Look up the schema ID for the topic's value subject
const schemaId = await registry.getLatestSchemaId('engine.event.v1-value');

// Build the event envelope
const message = {
  event_id: uuidv7(),
  event_type: 'engine.event.v1',
  source_service: 'my-service',
  trace_id: getTraceId(), // your tracing helper
  timestamp: new Date().toISOString(),
  version: 'v1',
  data: {
    // event data
  }
};

// Encode with the registered schema and send, keyed by symbol
const encoded = await registry.encode(schemaId, message);
await producer.send({
  topic: 'engine.event.v1',
  messages: [{
    key: message.data.symbol,
    value: encoded
  }]
});
```
The same producer with confluent-kafka (Python):

```python
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# schema_str holds the Avro schema for engine.event.v1 values
avro_serializer = AvroSerializer(schema_registry_client, schema_str)
producer = Producer({'bootstrap.servers': 'kafka-1:29092'})

message = {
    "event_id": str(uuid.uuid4()),
    "event_type": "engine.event.v1",
    "source_service": "my-service",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "version": "v1",
    "data": {
        # event data
    }
}

# Serialize with the registered schema and send, keyed by symbol
serialized = avro_serializer(
    message, SerializationContext('engine.event.v1', MessageField.VALUE))
producer.produce('engine.event.v1', value=serialized, key=message['data']['symbol'])
producer.flush()
```
Consumer group guidelines:

  • Unique Group IDs: Each service should use a unique consumer group
  • Group Coordination: Multiple instances of the same service share a group
  • Offset Management: Commit offsets only after successful processing (see the sketch below)
  • Error Handling: Implement retry logic and dead letter queues
```javascript
const consumer = kafka.consumer({
  groupId: 'order-service', // Unique per service
  sessionTimeout: 30000,
  heartbeatInterval: 3000
});
```
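For offset management, the following is a minimal sketch (assuming the kafkajs consumer above, with auto-commit disabled and a `processMessage` handler like the one in the idempotency example further down) that commits an offset only once the message has been processed successfully:

```javascript
await consumer.run({
  autoCommit: false, // do not acknowledge messages that failed processing
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(await registry.decode(message.value));

    // Commit the next offset to read now that processing has succeeded
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(message.offset) + 1).toString() }
    ]);
  }
});
```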
Retry with exponential backoff, falling back to a dead letter queue:

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function processWithRetry(topic, message, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      await processMessage(message);
      return;
    } catch (error) {
      if (i === maxRetries - 1) {
        // Out of retries: send to the dead letter queue
        await sendToDLQ(topic, message, error);
      } else {
        await sleep(1000 * Math.pow(2, i)); // Exponential backoff
      }
    }
  }
}
```
```javascript
async function sendToDLQ(originalTopic, message, error) {
  await producer.send({
    topic: `${originalTopic}.dlq`,
    messages: [{
      value: JSON.stringify({
        originalMessage: message,
        error: error.message,
        timestamp: new Date().toISOString()
      })
    }]
  });
}
```
Events may be delivered more than once, so handlers should deduplicate by event_id:

```javascript
// Per-process, unbounded cache; in production use a persistent store
// (with an eviction policy) so deduplication survives restarts.
const processedEvents = new Set();

async function processMessage(message) {
  const eventId = message.event_id;

  // Skip events we have already handled
  if (processedEvents.has(eventId)) {
    console.log('Duplicate event, skipping:', eventId);
    return;
  }

  // Process the event
  await handleEvent(message);

  // Mark as processed
  processedEvents.add(eventId);
}
```
Track at least the following for every consumer:

  • Lag: Consumer lag per partition (see the admin-client sketch at the end of this section)
  • Throughput: Messages processed per second
  • Errors: Error rate and types
  • Latency: End-to-end processing latency
Exporting counters with prom-client:

```javascript
import client from 'prom-client';

const messagesProcessed = new client.Counter({
  name: 'kafka_messages_processed_total',
  help: 'Total messages processed'
});
const processingErrors = new client.Counter({
  name: 'kafka_processing_errors_total',
  help: 'Total processing errors'
});
const lastProcessedTime = new client.Gauge({
  name: 'kafka_last_processed_timestamp_seconds',
  help: 'Unix timestamp of the last processed message'
});

// Track metrics inside the message handler
messagesProcessed.inc();
lastProcessedTime.setToCurrentTime();
if (error) processingErrors.inc();
```