Handling Events
This guide covers common patterns for handling WebSocket events from the Trading API. You'll learn how to:
- Route events by type to dedicated handlers
- Process market data (candle and quote events)
- React to order and position updates
- Handle errors gracefully with logging and recovery
- Implement reconnection logic for connection failures
Event Routing Pattern
All events from the Trading API include a type field that identifies the event category. The recommended pattern is to route events to dedicated handlers based on this type.
Python
import asyncio
import json
import logging
import uuid
from datetime import datetime
import websockets
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TradingStrategy:
def __init__(self, ws_url: str):
self.ws_url = ws_url
self.ws = None
self.running = False
async def connect(self):
try:
self.ws = await websockets.connect(self.ws_url)
self.running = True
logger.info("Connected to Trading API")
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
return False
async def handle_candle(self, event: dict):
"""Handle candle/bar events for technical analysis."""
bar = event.get("bar", {})
symbol = bar.get("symbol")
close = bar.get("close")
volume = bar.get("volume")
logger.info(f"Candle: {symbol} close={close} volume={volume}")
async def handle_quote(self, event: dict):
"""Handle quote events for real-time pricing."""
quote = event.get("quote", {})
symbol = quote.get("symbol")
bid = quote.get("bid")
ask = quote.get("ask")
spread = float(ask) - float(bid) if bid and ask else None
logger.info(f"Quote: {symbol} bid={bid} ask={ask} spread={spread}")
async def handle_order(self, event: dict):
"""Handle order state changes."""
order_event = event.get("event")
order = event.get("order", {})
order_id = order.get("id")
status = order.get("status")
logger.info(f"Order {order_event}: {order_id} status={status}")
if order_event == "ORDER_FILLED":
fill_price = order.get("average_fill_price")
logger.info(f"Order {order_id} filled at {fill_price}")
elif order_event == "ORDER_REJECTED":
logger.warning(f"Order {order_id} was rejected")
async def handle_position(self, event: dict):
"""Handle position state changes."""
position_event = event.get("event")
position = event.get("position", {})
symbol = position.get("symbol")
quantity = position.get("quantity")
pnl = position.get("unrealized_pnl")
logger.info(f"Position {position_event}: {symbol} qty={quantity} pnl={pnl}")
async def handle_error(self, event: dict):
"""Handle error events from the server."""
code = event.get("code")
message = event.get("message")
logger.error(f"Server error [{code}]: {message}")
async def handle_connection(self, event: dict):
"""Handle connection state changes."""
conn_event = event.get("event")
if conn_event == "DISCONNECTING":
error = event.get("error")
logger.warning(f"Disconnecting: {error}")
elif conn_event == "RECONNECTING":
logger.info("Attempting to reconnect...")
async def route_event(self, event: dict):
"""Route events to appropriate handlers based on type."""
event_type = event.get("type")
handlers = {
"candle": self.handle_candle,
"quote": self.handle_quote,
"order": self.handle_order,
"position": self.handle_position,
"error": self.handle_error,
"connection": self.handle_connection,
}
handler = handlers.get(event_type)
if handler:
try:
await handler(event)
except Exception as e:
logger.error(f"Error handling {event_type} event: {e}")
else:
logger.debug(f"Unhandled event type: {event_type}")
async def send_ack(self, event_id: str):
"""Send event acknowledgment."""
ack = {
"type": "event_ack",
"correlation_id": str(uuid.uuid4()),
"events_processed": [event_id],
"timestamp": int(datetime.now().timestamp() * 1000)
}
await self.ws.send(json.dumps(ack))
async def run(self):
"""Main event loop with error handling and recovery."""
while self.running:
try:
message = await self.ws.recv()
event = json.loads(message)
# Handle ping/pong
if event.get("type") == "ping":
await self.ws.send(json.dumps({"type": "pong"}))
continue
# Route to handler
await self.route_event(event)
# Acknowledge event
event_id = event.get("id", "unknown")
await self.send_ack(event_id)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse message: {e}")
except websockets.ConnectionClosed:
logger.warning("Connection closed, attempting reconnect...")
await self.reconnect()
except Exception as e:
logger.error(f"Unexpected error: {e}")
async def reconnect(self):
"""Reconnect with exponential backoff."""
delay = 1
max_delay = 60
while self.running:
logger.info(f"Reconnecting in {delay}s...")
await asyncio.sleep(delay)
if await self.connect():
return
delay = min(delay * 2, max_delay)
async def main():
strategy = TradingStrategy("ws://localhost:8082/ws")
if await strategy.connect():
await strategy.run()
if __name__ == "__main__":
asyncio.run(main())
JavaScript
const WebSocket = require('ws');
const { v4: uuidv4 } = require('uuid');
class TradingStrategy {
constructor(wsUrl) {
this.wsUrl = wsUrl;
this.ws = null;
this.running = false;
this.reconnectDelay = 1000;
this.maxReconnectDelay = 60000;
}
connect() {
return new Promise((resolve, reject) => {
try {
this.ws = new WebSocket(this.wsUrl);
this.ws.onopen = () => {
console.log('Connected to Trading API');
this.running = true;
this.reconnectDelay = 1000;
resolve(true);
};
this.ws.onmessage = (msg) => this.onMessage(msg);
this.ws.onerror = (err) => this.onError(err);
this.ws.onclose = () => this.onClose();
} catch (error) {
console.error('Connection failed:', error);
reject(error);
}
});
}
onMessage(msg) {
try {
const event = JSON.parse(msg.data);
// Handle ping/pong
if (event.type === 'ping') {
this.ws.send(JSON.stringify({ type: 'pong' }));
return;
}
// Route to handler
this.routeEvent(event);
// Acknowledge event
this.sendAck(event.id || 'unknown');
} catch (error) {
console.error('Failed to parse message:', error);
}
}
routeEvent(event) {
const handlers = {
candle: (e) => this.handleCandle(e),
quote: (e) => this.handleQuote(e),
order: (e) => this.handleOrder(e),
position: (e) => this.handlePosition(e),
error: (e) => this.handleError(e),
connection: (e) => this.handleConnection(e),
};
const handler = handlers[event.type];
if (handler) {
try {
handler(event);
} catch (error) {
console.error(`Error handling ${event.type} event:`, error);
}
} else {
console.debug(`Unhandled event type: ${event.type}`);
}
}
handleCandle(event) {
const bar = event.bar || {};
console.log(`Candle: ${bar.symbol} close=${bar.close} volume=${bar.volume}`);
}
handleQuote(event) {
const quote = event.quote || {};
const spread = quote.bid && quote.ask
? (parseFloat(quote.ask) - parseFloat(quote.bid)).toFixed(5)
: null;
console.log(`Quote: ${quote.symbol} bid=${quote.bid} ask=${quote.ask} spread=${spread}`);
}
handleOrder(event) {
const order = event.order || {};
console.log(`Order ${event.event}: ${order.id} status=${order.status}`);
switch (event.event) {
case 'ORDER_FILLED':
console.log(`Order ${order.id} filled at ${order.average_fill_price}`);
break;
case 'ORDER_REJECTED':
console.warn(`Order ${order.id} was rejected`);
break;
}
}
handlePosition(event) {
const position = event.position || {};
console.log(`Position ${event.event}: ${position.symbol} qty=${position.quantity} pnl=${position.unrealized_pnl}`);
}
handleError(event) {
console.error(`Server error [${event.code}]: ${event.message}`);
}
handleConnection(event) {
switch (event.event) {
case 'DISCONNECTING':
console.warn(`Disconnecting: ${event.error}`);
break;
case 'RECONNECTING':
console.log('Attempting to reconnect...');
break;
}
}
sendAck(eventId) {
const ack = {
type: 'event_ack',
correlation_id: uuidv4(),
events_processed: [eventId],
timestamp: Date.now(),
};
this.ws.send(JSON.stringify(ack));
}
onError(err) {
console.error('WebSocket error:', err.message);
}
onClose() {
console.warn('Connection closed');
if (this.running) {
this.reconnect();
}
}
async reconnect() {
console.log(`Reconnecting in ${this.reconnectDelay / 1000}s...`);
await new Promise((r) => setTimeout(r, this.reconnectDelay));
try {
await this.connect();
} catch (error) {
this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
this.reconnect();
}
}
stop() {
this.running = false;
if (this.ws) {
this.ws.close();
}
}
}
// Usage
const strategy = new TradingStrategy('ws://localhost:8082/ws');
strategy.connect().catch(console.error);
Java
import javax.websocket.*;
import java.net.URI;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.logging.Level;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@ClientEndpoint
public class TradingStrategy {
private static final Logger logger = Logger.getLogger(TradingStrategy.class.getName());
private static final Gson gson = new Gson();
private Session session;
private final String wsUrl;
private volatile boolean running = true;
private int reconnectDelay = 1000;
private static final int MAX_RECONNECT_DELAY = 60000;
public TradingStrategy(String wsUrl) {
this.wsUrl = wsUrl;
}
@OnOpen
public void onOpen(Session session) {
this.session = session;
this.reconnectDelay = 1000;
logger.info("Connected to Trading API");
}
@OnMessage
public void onMessage(String message) {
try {
JsonObject event = gson.fromJson(message, JsonObject.class);
String type = event.get("type").getAsString();
// Handle ping/pong
if ("ping".equals(type)) {
session.getBasicRemote().sendText("{\"type\":\"pong\"}");
return;
}
// Route to handler
routeEvent(event, type);
// Acknowledge event
String eventId = event.has("id") ? event.get("id").getAsString() : "unknown";
sendAck(eventId);
} catch (Exception e) {
logger.log(Level.SEVERE, "Failed to process message", e);
}
}
private void routeEvent(JsonObject event, String type) {
try {
switch (type) {
case "candle":
handleCandle(event);
break;
case "quote":
handleQuote(event);
break;
case "order":
handleOrder(event);
break;
case "position":
handlePosition(event);
break;
case "error":
handleError(event);
break;
case "connection":
handleConnection(event);
break;
default:
logger.fine("Unhandled event type: " + type);
}
} catch (Exception e) {
logger.log(Level.WARNING, "Error handling " + type + " event", e);
}
}
private void handleCandle(JsonObject event) {
JsonObject bar = event.getAsJsonObject("bar");
String symbol = bar.get("symbol").getAsString();
String close = bar.get("close").getAsString();
String volume = bar.get("volume").getAsString();
logger.info(String.format("Candle: %s close=%s volume=%s", symbol, close, volume));
}
private void handleQuote(JsonObject event) {
JsonObject quote = event.getAsJsonObject("quote");
String symbol = quote.get("symbol").getAsString();
String bid = quote.get("bid").getAsString();
String ask = quote.get("ask").getAsString();
double spread = Double.parseDouble(ask) - Double.parseDouble(bid);
logger.info(String.format("Quote: %s bid=%s ask=%s spread=%.5f", symbol, bid, ask, spread));
}
private void handleOrder(JsonObject event) {
String orderEvent = event.get("event").getAsString();
JsonObject order = event.getAsJsonObject("order");
String orderId = order.get("id").getAsString();
String status = order.get("status").getAsString();
logger.info(String.format("Order %s: %s status=%s", orderEvent, orderId, status));
switch (orderEvent) {
case "ORDER_FILLED":
String fillPrice = order.get("average_fill_price").getAsString();
logger.info(String.format("Order %s filled at %s", orderId, fillPrice));
break;
case "ORDER_REJECTED":
logger.warning(String.format("Order %s was rejected", orderId));
break;
}
}
private void handlePosition(JsonObject event) {
String positionEvent = event.get("event").getAsString();
JsonObject position = event.getAsJsonObject("position");
String symbol = position.get("symbol").getAsString();
String quantity = position.get("quantity").getAsString();
String pnl = position.get("unrealized_pnl").getAsString();
logger.info(String.format("Position %s: %s qty=%s pnl=%s",
positionEvent, symbol, quantity, pnl));
}
private void handleError(JsonObject event) {
String code = event.get("code").getAsString();
String message = event.get("message").getAsString();
logger.severe(String.format("Server error [%s]: %s", code, message));
}
private void handleConnection(JsonObject event) {
String connEvent = event.get("event").getAsString();
switch (connEvent) {
case "DISCONNECTING":
String error = event.has("error") ? event.get("error").getAsString() : "unknown";
logger.warning("Disconnecting: " + error);
break;
case "RECONNECTING":
logger.info("Attempting to reconnect...");
break;
}
}
private void sendAck(String eventId) throws Exception {
JsonObject ack = new JsonObject();
ack.addProperty("type", "event_ack");
ack.addProperty("correlation_id", UUID.randomUUID().toString());
ack.add("events_processed", gson.toJsonTree(new String[]{eventId}));
ack.addProperty("timestamp", System.currentTimeMillis());
session.getBasicRemote().sendText(gson.toJson(ack));
}
@OnClose
public void onClose() {
logger.warning("Connection closed");
if (running) {
reconnect();
}
}
@OnError
public void onError(Throwable error) {
logger.log(Level.SEVERE, "WebSocket error", error);
}
private void reconnect() {
while (running) {
try {
logger.info(String.format("Reconnecting in %ds...", reconnectDelay / 1000));
Thread.sleep(reconnectDelay);
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, new URI(wsUrl));
return;
} catch (Exception e) {
logger.log(Level.WARNING, "Reconnect failed", e);
reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY);
}
}
}
public void stop() {
running = false;
try {
if (session != null) {
session.close();
}
} catch (Exception e) {
logger.log(Level.WARNING, "Error closing session", e);
}
}
public static void main(String[] args) throws Exception {
TradingStrategy strategy = new TradingStrategy("ws://localhost:8082/ws");
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(strategy, new URI(strategy.wsUrl));
Thread.sleep(Long.MAX_VALUE);
}
}
Rust
use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{info, warn, error, debug};
#[derive(Serialize)]
struct EventAck {
#[serde(rename = "type")]
msg_type: String,
correlation_id: String,
events_processed: Vec<String>,
timestamp: u64,
}
#[derive(Serialize)]
struct Pong {
#[serde(rename = "type")]
msg_type: String,
}
struct TradingStrategy {
ws_url: String,
}
impl TradingStrategy {
fn new(ws_url: &str) -> Self {
Self {
ws_url: ws_url.to_string(),
}
}
async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let mut reconnect_delay = Duration::from_secs(1);
let max_delay = Duration::from_secs(60);
loop {
match self.connect_and_run().await {
Ok(_) => {
info!("Connection ended normally");
break;
}
Err(e) => {
error!("Connection error: {}", e);
info!("Reconnecting in {:?}...", reconnect_delay);
tokio::time::sleep(reconnect_delay).await;
reconnect_delay = std::cmp::min(reconnect_delay * 2, max_delay);
}
}
}
Ok(())
}
async fn connect_and_run(&self) -> Result<(), Box<dyn std::error::Error>> {
let (ws_stream, _) = connect_async(&self.ws_url).await?;
info!("Connected to Trading API");
let (mut write, mut read) = ws_stream.split();
while let Some(msg) = read.next().await {
let msg = msg?;
if let Message::Text(text) = msg {
match serde_json::from_str::<Value>(&text) {
Ok(event) => {
let event_type = event["type"].as_str().unwrap_or("unknown");
// Handle ping/pong
if event_type == "ping" {
let pong = Pong { msg_type: "pong".to_string() };
write.send(Message::Text(serde_json::to_string(&pong)?)).await?;
continue;
}
// Route to handler
self.route_event(&event, event_type);
// Acknowledge event
let event_id = event["id"].as_str().unwrap_or("unknown").to_string();
let ack = self.create_ack(event_id);
write.send(Message::Text(serde_json::to_string(&ack)?)).await?;
}
Err(e) => {
error!("Failed to parse message: {}", e);
}
}
}
}
Ok(())
}
fn route_event(&self, event: &Value, event_type: &str) {
let result = match event_type {
"candle" => self.handle_candle(event),
"quote" => self.handle_quote(event),
"order" => self.handle_order(event),
"position" => self.handle_position(event),
"error" => self.handle_error(event),
"connection" => self.handle_connection(event),
_ => {
debug!("Unhandled event type: {}", event_type);
Ok(())
}
};
if let Err(e) = result {
error!("Error handling {} event: {}", event_type, e);
}
}
fn handle_candle(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let bar = event.get("bar").ok_or("missing bar field")?;
let symbol = bar["symbol"].as_str().unwrap_or("");
let close = bar["close"].as_str().unwrap_or("");
let volume = bar["volume"].as_str().unwrap_or("");
info!("Candle: {} close={} volume={}", symbol, close, volume);
Ok(())
}
fn handle_quote(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let quote = event.get("quote").ok_or("missing quote field")?;
let symbol = quote["symbol"].as_str().unwrap_or("");
let bid = quote["bid"].as_str().unwrap_or("0");
let ask = quote["ask"].as_str().unwrap_or("0");
let spread = ask.parse::<f64>()? - bid.parse::<f64>()?;
info!("Quote: {} bid={} ask={} spread={:.5}", symbol, bid, ask, spread);
Ok(())
}
fn handle_order(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let order_event = event["event"].as_str().unwrap_or("");
let order = event.get("order").ok_or("missing order field")?;
let order_id = order["id"].as_str().unwrap_or("");
let status = order["status"].as_str().unwrap_or("");
info!("Order {}: {} status={}", order_event, order_id, status);
match order_event {
"ORDER_FILLED" => {
let fill_price = order["average_fill_price"].as_str().unwrap_or("");
info!("Order {} filled at {}", order_id, fill_price);
}
"ORDER_REJECTED" => {
warn!("Order {} was rejected", order_id);
}
_ => {}
}
Ok(())
}
fn handle_position(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let position_event = event["event"].as_str().unwrap_or("");
let position = event.get("position").ok_or("missing position field")?;
let symbol = position["symbol"].as_str().unwrap_or("");
let quantity = position["quantity"].as_str().unwrap_or("");
let pnl = position["unrealized_pnl"].as_str().unwrap_or("");
info!("Position {}: {} qty={} pnl={}", position_event, symbol, quantity, pnl);
Ok(())
}
fn handle_error(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let code = event["code"].as_str().unwrap_or("UNKNOWN");
let message = event["message"].as_str().unwrap_or("");
error!("Server error [{}]: {}", code, message);
Ok(())
}
fn handle_connection(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
let conn_event = event["event"].as_str().unwrap_or("");
match conn_event {
"DISCONNECTING" => {
let error = event["error"].as_str().unwrap_or("unknown");
warn!("Disconnecting: {}", error);
}
"RECONNECTING" => {
info!("Attempting to reconnect...");
}
_ => {}
}
Ok(())
}
fn create_ack(&self, event_id: String) -> EventAck {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
EventAck {
msg_type: "event_ack".to_string(),
correlation_id: Uuid::new_v4().to_string(),
events_processed: vec![event_id],
timestamp,
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::init();
let strategy = TradingStrategy::new("ws://localhost:8082/ws");
strategy.run().await
}
Go
package main
import (
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
type EventAck struct {
Type string `json:"type"`
CorrelationID string `json:"correlation_id"`
EventsProcessed []string `json:"events_processed"`
Timestamp int64 `json:"timestamp"`
}
type Pong struct {
Type string `json:"type"`
}
type TradingStrategy struct {
wsURL string
conn *websocket.Conn
running bool
reconnectDelay time.Duration
maxDelay time.Duration
}
func NewTradingStrategy(wsURL string) *TradingStrategy {
return &TradingStrategy{
wsURL: wsURL,
running: true,
reconnectDelay: time.Second,
maxDelay: time.Minute,
}
}
func (s *TradingStrategy) Connect() error {
conn, _, err := websocket.DefaultDialer.Dial(s.wsURL, nil)
if err != nil {
return fmt.Errorf("connection failed: %w", err)
}
s.conn = conn
s.reconnectDelay = time.Second
log.Println("Connected to Trading API")
return nil
}
func (s *TradingStrategy) Run() {
for s.running {
if err := s.Connect(); err != nil {
log.Printf("Connection error: %v", err)
s.scheduleReconnect()
continue
}
if err := s.eventLoop(); err != nil {
log.Printf("Event loop error: %v", err)
s.scheduleReconnect()
}
}
}
func (s *TradingStrategy) eventLoop() error {
for {
_, message, err := s.conn.ReadMessage()
if err != nil {
return fmt.Errorf("read error: %w", err)
}
var event map[string]interface{}
if err := json.Unmarshal(message, &event); err != nil {
log.Printf("Failed to parse message: %v", err)
continue
}
eventType, _ := event["type"].(string)
// Handle ping/pong
if eventType == "ping" {
s.conn.WriteJSON(Pong{Type: "pong"})
continue
}
// Route to handler
s.routeEvent(event, eventType)
// Acknowledge event
eventID := "unknown"
if id, ok := event["id"].(string); ok {
eventID = id
}
s.sendAck(eventID)
}
}
func (s *TradingStrategy) routeEvent(event map[string]interface{}, eventType string) {
var err error
switch eventType {
case "candle":
err = s.handleCandle(event)
case "quote":
err = s.handleQuote(event)
case "order":
err = s.handleOrder(event)
case "position":
err = s.handlePosition(event)
case "error":
err = s.handleError(event)
case "connection":
err = s.handleConnection(event)
default:
log.Printf("Unhandled event type: %s", eventType)
return
}
if err != nil {
log.Printf("Error handling %s event: %v", eventType, err)
}
}
func (s *TradingStrategy) handleCandle(event map[string]interface{}) error {
bar, ok := event["bar"].(map[string]interface{})
if !ok {
return fmt.Errorf("missing bar field")
}
symbol := bar["symbol"]
close := bar["close"]
volume := bar["volume"]
log.Printf("Candle: %s close=%s volume=%s", symbol, close, volume)
return nil
}
func (s *TradingStrategy) handleQuote(event map[string]interface{}) error {
quote, ok := event["quote"].(map[string]interface{})
if !ok {
return fmt.Errorf("missing quote field")
}
symbol := quote["symbol"]
bid := quote["bid"]
ask := quote["ask"]
log.Printf("Quote: %s bid=%s ask=%s", symbol, bid, ask)
return nil
}
func (s *TradingStrategy) handleOrder(event map[string]interface{}) error {
orderEvent, _ := event["event"].(string)
order, ok := event["order"].(map[string]interface{})
if !ok {
return fmt.Errorf("missing order field")
}
orderID := order["id"]
status := order["status"]
log.Printf("Order %s: %s status=%s", orderEvent, orderID, status)
switch orderEvent {
case "ORDER_FILLED":
fillPrice := order["average_fill_price"]
log.Printf("Order %s filled at %s", orderID, fillPrice)
case "ORDER_REJECTED":
log.Printf("WARNING: Order %s was rejected", orderID)
}
return nil
}
func (s *TradingStrategy) handlePosition(event map[string]interface{}) error {
positionEvent, _ := event["event"].(string)
position, ok := event["position"].(map[string]interface{})
if !ok {
return fmt.Errorf("missing position field")
}
symbol := position["symbol"]
quantity := position["quantity"]
pnl := position["unrealized_pnl"]
log.Printf("Position %s: %s qty=%s pnl=%s", positionEvent, symbol, quantity, pnl)
return nil
}
func (s *TradingStrategy) handleError(event map[string]interface{}) error {
code, _ := event["code"].(string)
message, _ := event["message"].(string)
log.Printf("ERROR: Server error [%s]: %s", code, message)
return nil
}
func (s *TradingStrategy) handleConnection(event map[string]interface{}) error {
connEvent, _ := event["event"].(string)
switch connEvent {
case "DISCONNECTING":
errMsg, _ := event["error"].(string)
log.Printf("WARNING: Disconnecting: %s", errMsg)
case "RECONNECTING":
log.Println("Attempting to reconnect...")
}
return nil
}
func (s *TradingStrategy) sendAck(eventID string) {
ack := EventAck{
Type: "event_ack",
CorrelationID: uuid.New().String(),
EventsProcessed: []string{eventID},
Timestamp: time.Now().UnixMilli(),
}
s.conn.WriteJSON(ack)
}
func (s *TradingStrategy) scheduleReconnect() {
log.Printf("Reconnecting in %v...", s.reconnectDelay)
time.Sleep(s.reconnectDelay)
s.reconnectDelay *= 2
if s.reconnectDelay > s.maxDelay {
s.reconnectDelay = s.maxDelay
}
}
func (s *TradingStrategy) Stop() {
s.running = false
if s.conn != nil {
s.conn.Close()
}
}
func main() {
strategy := NewTradingStrategy("ws://localhost:8082/ws")
strategy.Run()
}
Error Handling Patterns
Connection Errors
Connection errors require graceful recovery with exponential backoff to avoid overwhelming the server:
- Detect disconnection - Listen for WebSocket close events
- Log the error - Record what happened for debugging
- Wait before reconnecting - Start with 1 second, double on each failure
- Cap the delay - Maximum 60 seconds between attempts
- Reset on success - Reset delay to 1 second after successful connection
Parsing Errors
When a malformed message arrives:
- Log the raw message - Helps debug protocol issues
- Skip the message - Don't crash the entire strategy
- Continue processing - The next message may be valid
Server Error Events
The server sends error events for protocol-level issues:
| Error Code | Action |
|---|---|
INVALID_MESSAGE | Check your message format, log details |
INTERNAL_ERROR | Server issue, may need to reconnect |
Best Practices
Logging
Effective logging helps debug production issues:
- Log event types received - Understand traffic patterns
- Log state changes - Order fills, position updates
- Log errors with context - Include event data that caused the error
- Use log levels - DEBUG for routine events, INFO for state changes, ERROR for failures
Graceful Degradation
When parts of your strategy fail:
- Isolate handlers - Errors in one handler shouldn't affect others
- Fallback behavior - If order placement fails, log and continue monitoring
- State recovery - On reconnect, re-query positions and open orders
Performance
For high-frequency strategies:
- Batch acknowledgments - Process multiple events before sending one ack
- Async handlers - Don't block the event loop with slow operations
- Queue events - Decouple receiving from processing
Next Steps
- Hello World - Minimal connection example
- Placing Orders - Order submission patterns
- Event Types Reference - Full event documentation