Kafka Integration
This guide explains how to consume and produce TradeX Kafka events.
Overview
TradeX uses Apache Kafka for event-driven communication between services. All events use Avro schemas registered in the Schema Registry.
Prerequisites
- Kafka cluster access
- Schema Registry access
- Kafka client library for your language
- Avro schema deserializer
Configuration
Connection Settings
```ts
const kafkaConfig = {
  brokers: ['kafka-1:29092', 'kafka-2:29092', 'kafka-3:29092'],
  clientId: 'your-service-name',
  schemaRegistry: { host: 'http://schema-registry:8081' }
};
```

Consuming Events
TypeScript/Node.js
```ts
import { Kafka } from 'kafkajs';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

const kafka = new Kafka({
  brokers: ['kafka-1:29092'],
  clientId: 'my-service'
});

const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });
const consumer = kafka.consumer({ groupId: 'my-consumer-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'engine.event.v1', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // Decode Avro message
    const decoded = await registry.decode(message.value);

    // Process event
    if (decoded.data.type === 'TRADE') {
      console.log('Trade executed:', decoded.data.trade);
    }
  }
});
```
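The handler above branches on `decoded.data.type`. As the number of event types grows, a small dispatch map keeps the consumer body flat; the sketch below is illustrative and not part of kafkajs or TradeX.

```typescript
// Map event types to handlers; unknown types fall through to a warning.
type Handler = (event: { data: { type: string } }) => Promise<void> | void;

const handlers = new Map<string, Handler>();

function register(type: string, handler: Handler): void {
  handlers.set(type, handler);
}

async function dispatch(decoded: { data: { type: string } }): Promise<void> {
  const handler = handlers.get(decoded.data.type);
  if (!handler) {
    console.warn('No handler for event type:', decoded.data.type);
    return;
  }
  await handler(decoded);
}
```

Inside `eachMessage`, the `if`/`else` chain then collapses to a single `await dispatch(decoded)`.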
Python

```python
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer({
    'bootstrap.servers': 'kafka-1:29092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['engine.event.v1'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue

    # Deserialize Avro message
    decoded = avro_deserializer(msg.value(), None)

    # Process event
    if decoded['data']['type'] == 'TRADE':
        print('Trade executed:', decoded['data']['trade'])
```

Go

```go
import (
    "log"

    "github.com/Shopify/sarama"
)

config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
config.Consumer.Offsets.Initial = sarama.OffsetNewest

consumer, err := sarama.NewConsumerGroup(
    []string{"kafka-1:29092"},
    "my-consumer-group",
    config,
)
if err != nil {
    log.Fatal(err)
}

// Implement sarama.ConsumerGroupHandler to receive and process messages
```

Producing Events
TypeScript/Node.js
```ts
const producer = kafka.producer();

await producer.connect();

// Get schema ID
const schemaId = await registry.getLatestSchemaId('engine.event.v1-value');

// Create message (uuidv7() and getTraceId() are application-provided helpers)
const message = {
  event_id: uuidv7(),
  event_type: 'engine.event.v1',
  source_service: 'my-service',
  trace_id: getTraceId(),
  timestamp: new Date().toISOString(),
  version: 'v1',
  data: {
    // event data
  }
};

// Encode and send
const encoded = await registry.encode(schemaId, message);
await producer.send({
  topic: 'engine.event.v1',
  messages: [{ key: message.data.symbol, value: encoded }]
});
```
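The envelope fields shown above can be captured in a type so that every producer fills them in consistently. This is a sketch based only on the fields used in the examples; the type names and the helper are illustrative, not part of the documented TradeX schema.

```typescript
import { randomUUID } from 'node:crypto';

// Hypothetical envelope type mirroring the fields in the producer example.
interface EventEnvelope<T> {
  event_id: string;
  event_type: string;
  source_service: string;
  trace_id?: string;
  timestamp: string; // ISO-8601
  version: string;
  data: T;
}

// Fill the boilerplate fields in one place so producers only supply the payload.
function makeEnvelope<T>(eventType: string, sourceService: string, data: T): EventEnvelope<T> {
  return {
    event_id: randomUUID(),
    event_type: eventType,
    source_service: sourceService,
    timestamp: new Date().toISOString(),
    version: 'v1',
    data,
  };
}
```

A producer would then build `makeEnvelope('engine.event.v1', 'my-service', { /* event data */ })` and pass the result to `registry.encode`.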
Python

```python
import uuid
from datetime import datetime, timezone

from confluent_kafka import Producer
from confluent_kafka.schema_registry.avro import AvroSerializer

# schema_str is the Avro schema definition (a JSON string) for this event type
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer = Producer({'bootstrap.servers': 'kafka-1:29092'})

message = {
    "event_id": str(uuid.uuid4()),
    "event_type": "engine.event.v1",
    "source_service": "my-service",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "version": "v1",
    "data": {
        # event data
    }
}

# Serialize and send
serialized = avro_serializer(message, None)
producer.produce('engine.event.v1', value=serialized, key=message['data']['symbol'])
producer.flush()
```

Consumer Groups
Best Practices
- Unique Group IDs: Each service should use a unique consumer group
- Group Coordination: Multiple instances of the same service share a group
- Offset Management: Commit offsets after successful processing
- Error Handling: Implement retry logic and dead letter queues
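Committing offsets only after successful processing means tracking, per partition, the highest offset up to which every message has been handled; when messages finish out of order, only the contiguous prefix is safe to commit. A minimal sketch of that bookkeeping (the class name and API are illustrative, not part of kafkajs):

```typescript
// Tracks processed offsets per partition and reports the next safe offset to
// commit: one past the highest contiguously processed offset.
class OffsetTracker {
  private processed = new Map<number, Set<number>>();
  private committed = new Map<number, number>(); // next expected offset per partition

  markProcessed(partition: number, offset: number): void {
    if (!this.processed.has(partition)) this.processed.set(partition, new Set());
    this.processed.get(partition)!.add(offset);
  }

  // Advances past every contiguously processed offset and returns
  // the offset the group should commit next.
  committableOffset(partition: number, startOffset: number): number {
    const done = this.processed.get(partition) ?? new Set<number>();
    let next = this.committed.get(partition) ?? startOffset;
    while (done.has(next)) {
      done.delete(next);
      next++;
    }
    this.committed.set(partition, next);
    return next;
  }
}
```

If offsets 0 and 2 finish but 1 is still in flight, only offset 1 is committable; committing past 2 would risk losing message 1 on a rebalance.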
Example Configuration
```ts
const consumer = kafka.consumer({
  groupId: 'order-service', // Unique per service
  sessionTimeout: 30000,
  heartbeatInterval: 3000
});
```

Error Handling
Retry Logic
```ts
async function processWithRetry(message, maxRetries = 3) {
  for (let i = 0; i < maxRetries; i++) {
    try {
      await processMessage(message);
      return;
    } catch (error) {
      if (i === maxRetries - 1) {
        // Send to dead letter queue after the final attempt fails
        await sendToDLQ(message, error);
      } else {
        await sleep(1000 * Math.pow(2, i)); // Exponential backoff: 1s, 2s, 4s...
      }
    }
  }
}
```

Dead Letter Queue
```ts
// originalTopic is the topic the failed message was consumed from
async function sendToDLQ(message, error) {
  await producer.send({
    topic: `${originalTopic}.dlq`,
    messages: [{
      value: JSON.stringify({
        originalMessage: message,
        error: error.message,
        timestamp: new Date().toISOString()
      })
    }]
  });
}
```

Idempotency
Handling Duplicate Events
Section titled “Handling Duplicate Events”const processedEvents = new Set();
async function processMessage(message) { const eventId = message.event_id;
// Check if already processed if (processedEvents.has(eventId)) { console.log('Duplicate event, skipping:', eventId); return; }
// Process event await handleEvent(message);
// Mark as processed processedEvents.add(eventId);}Monitoring
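A plain Set works for short-lived processes, but its memory use only grows, and every restart forgets what was already handled. A bounded alternative keeps event IDs only for a time window; the class below is an illustrative sketch (not part of TradeX), and a persistent store keyed by `event_id` remains the more robust option.

```typescript
// Remembers event IDs for a limited window, evicting expired entries lazily
// on each check, so memory stays proportional to the window's traffic.
class DedupCache {
  private seen = new Map<string, number>(); // event_id -> expiry timestamp (ms)

  // `now` is injectable for testing; defaults to the wall clock.
  constructor(private ttlMs: number, private now: () => number = Date.now) {}

  // Returns true the first time an ID is seen within the TTL window.
  checkAndRecord(eventId: string): boolean {
    const t = this.now();
    // Map iterates in insertion order, so the oldest entries come first.
    for (const [id, expiry] of this.seen) {
      if (expiry > t) break;
      this.seen.delete(id);
    }
    if (this.seen.has(eventId)) return false;
    this.seen.set(eventId, t + this.ttlMs);
    return true;
  }
}
```

The TTL should be at least as long as the longest plausible redelivery gap (e.g. the consumer's max rebalance plus retry window), otherwise a late duplicate slips through.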
Monitoring

Metrics to Track
Section titled “Metrics to Track”- Lag: Consumer lag per partition
- Throughput: Messages processed per second
- Errors: Error rate and types
- Latency: Processing latency
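Consumer lag per partition is the gap between the partition's latest offset (the high watermark) and the offset the group will read next. The helper below computes it from plain numbers; the input record shape is an assumption for illustration, not a kafkajs type.

```typescript
interface PartitionOffsets {
  partition: number;
  highWatermark: number;   // offset of the next message to be written
  committedOffset: number; // next offset the group will read
}

// Lag per partition plus the total across partitions.
function computeLag(partitions: PartitionOffsets[]): { byPartition: Map<number, number>; total: number } {
  const byPartition = new Map<number, number>();
  let total = 0;
  for (const p of partitions) {
    // Clamp at zero: a just-committed offset can briefly exceed a stale watermark.
    const lag = Math.max(0, p.highWatermark - p.committedOffset);
    byPartition.set(p.partition, lag);
    total += lag;
  }
  return { byPartition, total };
}
```

In practice the offsets come from the Kafka admin API (fetch topic offsets and the group's committed offsets) and the result feeds a gauge per partition.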
Example Monitoring
```ts
import { Counter, Gauge } from 'prom-client';

// Export to Prometheus (prom-client registers these in its default registry,
// which a /metrics endpoint can expose via register.metrics())
const messagesProcessed = new Counter({
  name: 'kafka_messages_processed_total',
  help: 'Total messages processed'
});
const processingErrors = new Counter({
  name: 'kafka_processing_errors_total',
  help: 'Total processing errors'
});
const lastProcessedTime = new Gauge({
  name: 'kafka_last_processed_timestamp_seconds',
  help: 'Unix time of the most recently processed message'
});

// Track metrics inside the message handler
messagesProcessed.inc();
lastProcessedTime.setToCurrentTime();
if (error) processingErrors.inc();
```