The Market Data Service is a Go microservice that ingests raw trade and order book events from the matching engine, maintains canonical market data (order books, candles, tickers, and mark, index, and funding prices), persists it to ClickHouse and PostgreSQL, and distributes real-time feeds to clients over a single multiplexed WebSocket.
Data Ingestion : Consume trade and order book delta events from Kafka.
Order Book Management : Maintain per-symbol Level 2 (L2) order books in memory, with delta compression and periodic snapshots.
OHLCV Aggregation : Roll up trades into candlesticks for multiple timeframes (1m, 5m, 15m, 1h, 4h, 1d).
Pricing Services : Compute index prices (from external exchanges), mark prices (fair value), and funding rates (perpetuals).
Ticker Computation : Calculate 24h rolling ticker statistics (open, high, low, close, volume, change).
Data Persistence : Store trades, raw spot prices, and historical order book snapshots in ClickHouse; candles, latest order book snapshots, and pricing history in PostgreSQL.
WebSocket Distribution : Single multiplexed WebSocket endpoint with subscribe/unsubscribe channels.
REST & gRPC APIs : Snapshot endpoints for clients and internal services.
External Integration : Track spot prices from Binance, Coinbase, Kraken, etc. via ccxt.
graph TB
ME[Matching Engine] -->|engine.event.v1| Kafka[Kafka]
MetaSvc[Metadata Service] -->|instrument events| Kafka
Kafka --> MDS[Market Data Service]
MDS --> Books[In-Memory Order Books]
MDS --> OHLCV[OHLCV Aggregator]
MDS --> Pricing[Pricing Workers]
MDS --> Ticker[Ticker Worker]
ExternalEx[External Exchanges] -->|ccxt| MDS
MDS -->|WebSocket| Clients[WebSocket Clients]
MDS -->|REST| RestClients[REST Clients]
MDS -->|gRPC| InternalSvc[Internal Services]
MDS --> PG[(PostgreSQL)]
MDS --> CH[(ClickHouse)]
MDS --> Redis[(Redis Cache)]

| Field | Type | Description |
|---|---|---|
| trade_id | UUID | Unique trade identifier |
| symbol | STRING | Instrument symbol |
| price | DECIMAL | Trade price |
| quantity | DECIMAL | Trade quantity |
| taker_side | ENUM(buy, sell) | Taker’s side |
| maker_order_id | UUID | Maker’s order ID |
| taker_order_id | UUID | Taker’s order ID |
| timestamp | INT64 | Epoch milliseconds |
| sequence | INT64 | Engine sequence number |

| Field | Type | Description |
|---|---|---|
| price | DECIMAL | Price level |
| quantity | DECIMAL | Total quantity at price (0 = removed) |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Instrument symbol |
| bids | Array of BookLevel | Bid levels (descending) |
| asks | Array of BookLevel | Ask levels (ascending) |
| sequence | INT64 | Snapshot sequence |
| timestamp | INT64 | Snapshot time |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Instrument symbol |
| interval | ENUM | 1m, 5m, 15m, 1h, 4h, 1d |
| open_time | INT64 | Candle open timestamp |
| close_time | INT64 | Candle close timestamp |
| open | DECIMAL | Opening price |
| high | DECIMAL | Highest price |
| low | DECIMAL | Lowest price |
| close | DECIMAL | Closing price |
| volume | DECIMAL | Base volume |
| quote_volume | DECIMAL | Quote volume |
| trade_count | INT | Number of trades |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Instrument symbol |
| last_price | DECIMAL | Last traded price |
| price_change | DECIMAL | 24h price change |
| price_change_pct | DECIMAL | 24h change percentage |
| high_24h | DECIMAL | 24h high |
| low_24h | DECIMAL | 24h low |
| volume_24h | DECIMAL | 24h base volume |
| quote_volume_24h | DECIMAL | 24h quote volume |
| best_bid | DECIMAL | Current best bid |
| best_ask | DECIMAL | Current best ask |
| open_interest | DECIMAL | Open interest (perpetuals) |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Instrument symbol |
| mark_price | DECIMAL | Fair value mark price |
| index_price | DECIMAL | Underlying index price |
| premium | DECIMAL | Basis (mark - index) |
| timestamp | INT64 | Computation time |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Index symbol (e.g., BTCUSDT-INDEX) |
| price | DECIMAL | Weighted average price |
| sources | Array | Individual exchange prices and weights |
| timestamp | INT64 | Computation time |

| Field | Type | Description |
|---|---|---|
| symbol | STRING | Perpetual symbol |
| funding_rate | DECIMAL | Current funding rate |
| next_funding_time | INT64 | Next settlement time |
| countdown_seconds | INT | Seconds until next funding |
| mark_price | DECIMAL | Current mark price |
| index_price | DECIMAL | Current index price |

Single multiplexed WebSocket: ws://host/ws
{"op": "subscribe", "channel": "book", "symbol": "BTCUSDT-PERP"}
{"op": "unsubscribe", "channel": "trades", "symbol": "BTCUSDT-PERP"}
{"op": "config", "prec": 0.5}
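The control frames above can be built from a single Go struct. This is a minimal client-side sketch: the `ControlMessage` type, its field layout, and the `Subscribe` and `Encode` helpers are hypothetical names chosen for illustration, and only the JSON keys (`op`, `channel`, `symbol`, `prec`) come from the messages shown.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ControlMessage mirrors the client-to-server frames shown above.
// The type name and the omitempty choices are assumptions.
type ControlMessage struct {
	Op      string   `json:"op"`
	Channel string   `json:"channel,omitempty"`
	Symbol  string   `json:"symbol,omitempty"`
	Prec    *float64 `json:"prec,omitempty"` // only set on "config" frames
}

// Subscribe builds a subscribe frame for one channel and symbol.
func Subscribe(channel, symbol string) ControlMessage {
	return ControlMessage{Op: "subscribe", Channel: channel, Symbol: symbol}
}

// Encode serializes a frame to JSON text ready to send as a text message.
func Encode(m ControlMessage) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	sub, _ := Encode(Subscribe("book", "BTCUSDT-PERP"))
	fmt.Println(sub)

	prec := 0.5
	cfg, _ := Encode(ControlMessage{Op: "config", Prec: &prec})
	fmt.Println(cfg)
}
```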

| Channel | Data | Update Frequency |
|---|---|---|
| book | Order book deltas (full snapshot on subscribe, then deltas) | On every change |
| trades | Individual trade ticks | On every trade |
| ticker | 24h ticker statistics | Every 1 second |
| kline:{interval} | OHLCV candles (e.g., kline:1m, kline:5m) | On close / in-progress update |
| mark | Mark price updates | Every 1 second |
| index | Index price updates | Every 1 second |
| funding | Funding rate with countdown | Every 1 second |

On subscribe to book: server sends a full snapshot
Subsequent messages are incremental deltas (price levels that changed)
Client reconstructs book by applying deltas to snapshot
Full snapshot re-sent every 30 seconds to correct any drift
Sequence numbers included for gap detection
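The snapshot-plus-delta flow above can be sketched on the client side as follows. This is an illustration under assumptions, not the service's actual wire types: `Level`, `Delta`, `Book`, and `ErrGap` are hypothetical names, prices and quantities are `float64` for brevity (a decimal type would be safer in practice), and deltas are assumed to carry consecutive per-symbol sequence numbers.

```go
package main

import (
	"errors"
	"fmt"
)

// Level is one price level; Quantity 0 means the level was removed.
type Level struct {
	Price    float64
	Quantity float64
}

// Delta carries the changed levels and the sequence of the update.
type Delta struct {
	Sequence int64
	Bids     []Level
	Asks     []Level
}

// Book is the client's local copy of one symbol's L2 book.
type Book struct {
	Sequence int64
	Bids     map[float64]float64
	Asks     map[float64]float64
}

// ErrGap signals a missed delta; the client should wait for, or request,
// a fresh snapshot before trusting the book again.
var ErrGap = errors.New("sequence gap detected")

// NewBook builds a book from a full snapshot.
func NewBook(seq int64, bids, asks []Level) *Book {
	b := &Book{Sequence: seq, Bids: map[float64]float64{}, Asks: map[float64]float64{}}
	applySide(b.Bids, bids)
	applySide(b.Asks, asks)
	return b
}

func applySide(side map[float64]float64, levels []Level) {
	for _, l := range levels {
		if l.Quantity == 0 {
			delete(side, l.Price)
			continue
		}
		side[l.Price] = l.Quantity
	}
}

// Apply merges one delta. Stale deltas are ignored; a skipped sequence
// number (assumed to be consecutive) returns ErrGap and leaves the book as is.
func (b *Book) Apply(d Delta) error {
	if d.Sequence <= b.Sequence {
		return nil
	}
	if d.Sequence != b.Sequence+1 {
		return ErrGap
	}
	applySide(b.Bids, d.Bids)
	applySide(b.Asks, d.Asks)
	b.Sequence = d.Sequence
	return nil
}

func main() {
	book := NewBook(10, []Level{{100, 1}, {99, 2}}, []Level{{101, 3}})
	err := book.Apply(Delta{Sequence: 11, Bids: []Level{{100, 0}}})
	fmt.Println(len(book.Bids), err)
	fmt.Println(book.Apply(Delta{Sequence: 13}))
}
```

When the periodic full snapshot arrives, a client following this sketch would simply replace its local book with `NewBook`.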
100 messages/second per client connection
Configurable via WS_RATE_MAX_MSGS_PER_SEC

| Method | Endpoint | Description |
|---|---|---|
| GET | /v1/symbols | List trading symbols |
| GET | /v1/trades?symbol={symbol}&limit={n} | Get recent trades |
| GET | /v1/depth?symbol={symbol}&depth={n} | Get order book depth |
| GET | /v1/ticker/24h?symbol={symbol} | Get 24h ticker |
| GET | /v1/klines?symbol={symbol}&interval={i}&start={t}&end={t} | Get candlesticks |
| GET | /v1/mark?symbol={symbol} | Get mark price |
| GET | /v1/index?symbol={symbol} | Get index price |
| GET | /v1/funding?symbol={symbol} | Get funding rate |

| RPC | Description | Called By |
|---|---|---|
| GetOrderBook(symbol, depth) | Get current order book snapshot | Risk Service, MM Service |
| GetMarkPrice(symbol) | Get current mark price | Position Service, Funding Service |
| ReplayStream(symbol, channel, start_time, end_time) | Stream historical data | Analytics |

| Topic | Description |
|---|---|
| engine.event.v1 | Matching engine events (trades, book deltas) |
| engine.snapshot.v1 | Full order book snapshots |
| engine.bookhash.v1 | Order book integrity hashes |
| md.instrument.created.v1 | New instrument notifications |
| md.instrument.updated.v1 | Instrument metadata changes |
| md.instrument.halt.v1 | Trading halt events |
| md.instrument.resume.v1 | Trading resume events |

| Topic | Description |
|---|---|
| md.trades.v1.{symbol} | Normalized trade events (per symbol) |
| md.orderbook.delta.v1 | Order book deltas |
| md.orderbook.snap.v1 | Order book snapshots |
| md.ohlcv.v1 | Candlestick updates |
| md.ticker24h.v1 | 24h ticker updates |
| md.mark.v1 | Mark price updates |
| md.index.v1 | Index price updates |
| md.funding.v1 | Funding rate updates |

Runs on a timer per timeframe (1m, 5m, 15m, 1h, 4h, 1d)
On each tick: close completed candle, start new one
Updates are also triggered in real-time as trades arrive
Persists completed candles to PostgreSQL
Computes mark price: mark = index + EMA(premium)
Computes funding rate: rate = clamp(premium_rate + interest_rate, -0.75%, +0.75%)
Publishes to Kafka and broadcasts via WebSocket
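The two formulas above translate directly into code. In this sketch the EMA smoothing factor `alpha`, the seeding of the EMA with the first sample, and the names `MarkPricer` and `FundingRate` are assumptions; the document does not say how the raw premium is sampled, so it is taken as an input. `float64` is used for brevity where the service's data model specifies DECIMAL.

```go
package main

import (
	"fmt"
	"math"
)

// MarkPricer keeps an exponential moving average of premium samples.
// Alpha is a hypothetical smoothing factor in (0, 1].
type MarkPricer struct {
	Alpha  float64
	ema    float64
	seeded bool
}

// Update feeds one premium sample and returns mark = index + EMA(premium).
func (m *MarkPricer) Update(index, premium float64) float64 {
	if !m.seeded {
		m.ema = premium // seed with the first sample (assumption)
		m.seeded = true
	} else {
		m.ema = m.Alpha*premium + (1-m.Alpha)*m.ema
	}
	return index + m.ema
}

// fundingCap is the 0.75% bound from the formula above, as a fraction.
const fundingCap = 0.0075

// FundingRate returns clamp(premiumRate + interestRate, -0.75%, +0.75%).
func FundingRate(premiumRate, interestRate float64) float64 {
	return math.Max(-fundingCap, math.Min(fundingCap, premiumRate+interestRate))
}

func main() {
	p := &MarkPricer{Alpha: 0.5}
	fmt.Println(p.Update(100, 2))
	fmt.Println(p.Update(100, 4))
	fmt.Println(FundingRate(0.01, 0.0001))
}
```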
Recalculates 24h rolling statistics every second
Uses sliding window over trade history
Publishes to WebSocket ticker channel
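A sliding window over trade history can be sketched as a slice that drops trades older than 24 hours before each recomputation. The `Window` and `Ticker` types below are hypothetical, trades are assumed to arrive in timestamp order, and the linear rescan on every snapshot is chosen for clarity rather than efficiency.

```go
package main

import "fmt"

const dayMs = int64(24 * 60 * 60 * 1000)

// Trade holds the fields the ticker needs; Timestamp is epoch milliseconds.
type Trade struct {
	Price     float64
	Quantity  float64
	Timestamp int64
}

// Ticker is a reduced form of the 24h ticker fields listed earlier.
type Ticker struct {
	Open, High, Low, Last float64
	Volume, QuoteVolume   float64
	Change, ChangePct     float64
}

// Window keeps the trades of the trailing 24 hours in arrival order.
type Window struct {
	trades []Trade
}

// Add appends a trade; trades are assumed to arrive in timestamp order.
func (w *Window) Add(t Trade) { w.trades = append(w.trades, t) }

// Snapshot evicts trades older than 24h and recomputes the statistics.
// It returns false when the window holds no trades.
func (w *Window) Snapshot(nowMs int64) (Ticker, bool) {
	cutoff := nowMs - dayMs
	i := 0
	for i < len(w.trades) && w.trades[i].Timestamp <= cutoff {
		i++
	}
	w.trades = w.trades[i:]
	if len(w.trades) == 0 {
		return Ticker{}, false
	}
	first := w.trades[0]
	t := Ticker{Open: first.Price, High: first.Price, Low: first.Price}
	for _, tr := range w.trades {
		if tr.Price > t.High {
			t.High = tr.Price
		}
		if tr.Price < t.Low {
			t.Low = tr.Price
		}
		t.Last = tr.Price
		t.Volume += tr.Quantity
		t.QuoteVolume += tr.Price * tr.Quantity
	}
	t.Change = t.Last - t.Open
	if t.Open != 0 {
		t.ChangePct = t.Change / t.Open * 100
	}
	return t, true
}

func main() {
	w := &Window{}
	w.Add(Trade{Price: 100, Quantity: 1, Timestamp: 1000})
	w.Add(Trade{Price: 110, Quantity: 2, Timestamp: 2000})
	tk, ok := w.Snapshot(5000)
	fmt.Println(ok, tk.Open, tk.High, tk.Last, tk.Volume)
}
```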
Fetches spot prices from external exchanges via ccxt library
Exchanges: Binance, Coinbase, Kraken, OKX (configurable)
Weighted average with configurable weights per source
Stale source detection: excludes sources that haven’t updated within threshold
Persists raw spot prices to ClickHouse for audit trail
Trades : All trade events with high-cardinality indexing
Raw spot prices : External exchange prices for index computation
Order book snapshots : Historical snapshots for analytics
Optimized for batch writes with buffering
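Buffered batch writing generally means accumulating rows in memory and handing them to the database in one insert. The following is a generic sketch of that idea, not the service's ClickHouse writer: `BatchWriter` is a hypothetical type, the flush callback stands in for the actual insert, and a time-based flush (usually paired with the size trigger) is left out.

```go
package main

import "fmt"

// BatchWriter buffers items and passes them to flush in groups of up to max.
// It is not safe for concurrent use.
type BatchWriter[T any] struct {
	max   int
	buf   []T
	flush func([]T) error // stands in for a batched database insert
}

func NewBatchWriter[T any](max int, flush func([]T) error) *BatchWriter[T] {
	return &BatchWriter[T]{max: max, buf: make([]T, 0, max), flush: flush}
}

// Add buffers one item and flushes when the buffer is full.
func (w *BatchWriter[T]) Add(item T) error {
	w.buf = append(w.buf, item)
	if len(w.buf) >= w.max {
		return w.Flush()
	}
	return nil
}

// Flush writes any buffered items. On error the buffer is kept so the
// caller can retry; on success a fresh buffer is allocated so the sink
// may retain the batch it received.
func (w *BatchWriter[T]) Flush() error {
	if len(w.buf) == 0 {
		return nil
	}
	if err := w.flush(w.buf); err != nil {
		return err
	}
	w.buf = make([]T, 0, w.max)
	return nil
}

func main() {
	w := NewBatchWriter(3, func(batch []int) error {
		fmt.Println("flushing", batch)
		return nil
	})
	for i := 1; i <= 7; i++ {
		_ = w.Add(i)
	}
	_ = w.Flush() // flush the remainder on shutdown
}
```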
Candles : OHLCV data for all timeframes
Order book snapshots : Latest snapshots for REST queries
Pricing data : Mark, index, and funding rate history
Latest order book : Per-symbol book state for quick REST queries
Latest ticker : 24h ticker cache
Latest prices : Mark, index, funding cache
md_ingest_lag_ms{topic} — Event processing lag
md_seq_gap_total{symbol} — Sequence gaps detected
md_ws_clients — Active WebSocket connections
md_snapshot_latency_ms{symbol} — Snapshot generation latency
md_kafka_messages_consumed_total{topic} — Kafka messages consumed
md_db_query_duration_ms{repository} — Database query duration
md_ohlcv_candles_closed_total{symbol,interval} — Candles closed
md_index_price_source_staleness_ms{exchange} — External source staleness
Spans: md.process_trade, md.process_book_delta, md.compute_mark, md.compute_index, md.broadcast_ws, md.persist_candle
Structured JSON via zap: symbol, channel, event_type, sequence, latency_us, trace_id.

| Metric | Target |
|---|---|
| Trade-to-WebSocket latency (p99) | < 150 ms |
| Order book update latency (p99) | < 50 ms |
| Candle accuracy | 100% (no missed trades) |
| WebSocket availability | >= 99.9% |
| Index price freshness | < 2 seconds |

| Variable | Description | Default |
|---|---|---|
| KAFKA_BROKERS | Kafka broker addresses | Required |
| SCHEMA_REGISTRY_URL | Avro schema registry | Required |
| REDIS_URL | Redis connection string | Required |
| POSTGRES_URL | PostgreSQL connection string | Required |
| CLICKHOUSE_URL | ClickHouse connection string | Required |
| HTTP_PORT | REST + WebSocket port | 8080 |
| GRPC_PORT | gRPC server port | 50051 |
| WS_RATE_MAX_MSGS_PER_SEC | WebSocket rate limit per client | 100 |

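Loading the variables above with their defaults could look like the sketch below. The `Config` struct and the `Load` and `intOr` helpers are hypothetical names; only the variable names and default values come from the table. The lookup function is injected so the loader can be exercised without touching the process environment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// Config holds the settings listed in the table above.
type Config struct {
	KafkaBrokers        string
	SchemaRegistryURL   string
	RedisURL            string
	PostgresURL         string
	ClickHouseURL       string
	HTTPPort            int
	GRPCPort            int
	WSRateMaxMsgsPerSec int
}

// Load reads configuration through getenv (os.Getenv in production).
// It reports all missing required variables in a single error.
func Load(getenv func(string) string) (Config, error) {
	var missing []string
	required := func(key string) string {
		v := getenv(key)
		if v == "" {
			missing = append(missing, key)
		}
		return v
	}
	cfg := Config{
		KafkaBrokers:      required("KAFKA_BROKERS"),
		SchemaRegistryURL: required("SCHEMA_REGISTRY_URL"),
		RedisURL:          required("REDIS_URL"),
		PostgresURL:       required("POSTGRES_URL"),
		ClickHouseURL:     required("CLICKHOUSE_URL"),
	}
	if len(missing) > 0 {
		return Config{}, fmt.Errorf("missing required variables: %s", strings.Join(missing, ", "))
	}
	var err error
	if cfg.HTTPPort, err = intOr(getenv, "HTTP_PORT", 8080); err != nil {
		return Config{}, err
	}
	if cfg.GRPCPort, err = intOr(getenv, "GRPC_PORT", 50051); err != nil {
		return Config{}, err
	}
	if cfg.WSRateMaxMsgsPerSec, err = intOr(getenv, "WS_RATE_MAX_MSGS_PER_SEC", 100); err != nil {
		return Config{}, err
	}
	return cfg, nil
}

// intOr parses an integer variable, falling back to def when it is unset.
func intOr(getenv func(string) string, key string, def int) (int, error) {
	v := getenv(key)
	if v == "" {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", key, err)
	}
	return n, nil
}

func main() {
	cfg, err := Load(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```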
Language : Go 1.21+
WebSocket : gorilla/websocket
Database : PostgreSQL (SQLC), ClickHouse
Migrations : Atlas
Kafka : confluent-kafka-go with Avro
External Exchanges : ccxt library
Metrics : Prometheus client_golang
Logging : zap structured logging
gRPC : google.golang.org/grpc