Handling Events

This guide covers common patterns for handling WebSocket events from the Trading API. You'll learn how to:

  • Route events by type to dedicated handlers
  • Process market data (candle and quote events)
  • React to order and position updates
  • Handle errors gracefully with logging and recovery
  • Implement reconnection logic for connection failures

Event Routing Pattern

All events from the Trading API include a type field that identifies the event category. The recommended pattern is to route events to dedicated handlers based on this type.

Python

import asyncio
import json
import logging
import uuid
from datetime import datetime
import websockets

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TradingStrategy:
    def __init__(self, ws_url: str):
        self.ws_url = ws_url
        self.ws = None
        self.running = False

    async def connect(self):
        try:
            self.ws = await websockets.connect(self.ws_url)
            self.running = True
            logger.info("Connected to Trading API")
            return True
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            return False

    async def handle_candle(self, event: dict):
        """Handle candle/bar events for technical analysis."""
        bar = event.get("bar", {})
        symbol = bar.get("symbol")
        close = bar.get("close")
        volume = bar.get("volume")
        logger.info(f"Candle: {symbol} close={close} volume={volume}")

    async def handle_quote(self, event: dict):
        """Handle quote events for real-time pricing."""
        quote = event.get("quote", {})
        symbol = quote.get("symbol")
        bid = quote.get("bid")
        ask = quote.get("ask")
        spread = float(ask) - float(bid) if bid and ask else None
        logger.info(f"Quote: {symbol} bid={bid} ask={ask} spread={spread}")

    async def handle_order(self, event: dict):
        """Handle order state changes."""
        order_event = event.get("event")
        order = event.get("order", {})
        order_id = order.get("id")
        status = order.get("status")
        logger.info(f"Order {order_event}: {order_id} status={status}")

        if order_event == "ORDER_FILLED":
            fill_price = order.get("average_fill_price")
            logger.info(f"Order {order_id} filled at {fill_price}")
        elif order_event == "ORDER_REJECTED":
            logger.warning(f"Order {order_id} was rejected")

    async def handle_position(self, event: dict):
        """Handle position state changes."""
        position_event = event.get("event")
        position = event.get("position", {})
        symbol = position.get("symbol")
        quantity = position.get("quantity")
        pnl = position.get("unrealized_pnl")
        logger.info(f"Position {position_event}: {symbol} qty={quantity} pnl={pnl}")

    async def handle_error(self, event: dict):
        """Handle error events from the server."""
        code = event.get("code")
        message = event.get("message")
        logger.error(f"Server error [{code}]: {message}")

    async def handle_connection(self, event: dict):
        """Handle connection state changes."""
        conn_event = event.get("event")
        if conn_event == "DISCONNECTING":
            error = event.get("error")
            logger.warning(f"Disconnecting: {error}")
        elif conn_event == "RECONNECTING":
            logger.info("Attempting to reconnect...")

    async def route_event(self, event: dict):
        """Route events to appropriate handlers based on type."""
        event_type = event.get("type")

        handlers = {
            "candle": self.handle_candle,
            "quote": self.handle_quote,
            "order": self.handle_order,
            "position": self.handle_position,
            "error": self.handle_error,
            "connection": self.handle_connection,
        }

        handler = handlers.get(event_type)
        if handler:
            try:
                await handler(event)
            except Exception as e:
                logger.error(f"Error handling {event_type} event: {e}")
        else:
            logger.debug(f"Unhandled event type: {event_type}")

    async def send_ack(self, event_id: str):
        """Send event acknowledgment."""
        ack = {
            "type": "event_ack",
            "correlation_id": str(uuid.uuid4()),
            "events_processed": [event_id],
            "timestamp": int(datetime.now().timestamp() * 1000)
        }
        await self.ws.send(json.dumps(ack))

    async def run(self):
        """Main event loop with error handling and recovery."""
        while self.running:
            try:
                message = await self.ws.recv()
                event = json.loads(message)

                # Handle ping/pong
                if event.get("type") == "ping":
                    await self.ws.send(json.dumps({"type": "pong"}))
                    continue

                # Route to handler
                await self.route_event(event)

                # Acknowledge event
                event_id = event.get("id", "unknown")
                await self.send_ack(event_id)

            except json.JSONDecodeError as e:
                logger.error(f"Failed to parse message: {e}")
            except websockets.ConnectionClosed:
                logger.warning("Connection closed, attempting reconnect...")
                await self.reconnect()
            except Exception as e:
                logger.error(f"Unexpected error: {e}")

    async def reconnect(self):
        """Reconnect with exponential backoff."""
        delay = 1
        max_delay = 60
        while self.running:
            logger.info(f"Reconnecting in {delay}s...")
            await asyncio.sleep(delay)
            if await self.connect():
                return
            delay = min(delay * 2, max_delay)

async def main():
    strategy = TradingStrategy("ws://localhost:8082/ws")
    if await strategy.connect():
        await strategy.run()

if __name__ == "__main__":
    asyncio.run(main())

JavaScript

const WebSocket = require('ws');
const { v4: uuidv4 } = require('uuid');

class TradingStrategy {
  constructor(wsUrl) {
    this.wsUrl = wsUrl;
    this.ws = null;
    this.running = false;
    this.reconnectDelay = 1000;
    this.maxReconnectDelay = 60000;
  }

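  connect() {
    return new Promise((resolve, reject) => {
      let opened = false;
      try {
        this.ws = new WebSocket(this.wsUrl);
      } catch (error) {
        // The constructor throws synchronously, e.g. for an invalid URL
        console.error('Connection failed:', error);
        reject(error);
        return;
      }

      this.ws.onopen = () => {
        opened = true;
        console.log('Connected to Trading API');
        this.running = true;
        this.reconnectDelay = 1000;
        resolve(true);
      };

      this.ws.onmessage = (msg) => this.onMessage(msg);
      this.ws.onerror = (err) => {
        this.onError(err);
        // A failed handshake emits an error event instead of throwing, so reject here
        if (!opened) reject(err.error || new Error(err.message));
      };
      // Reconnect from the close handler only if this socket had opened;
      // failed attempts are retried by reconnect() when the promise rejects
      this.ws.onclose = () => {
        if (opened) this.onClose();
      };
    });
  }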

  onMessage(msg) {
    try {
      const event = JSON.parse(msg.data);

      // Handle ping/pong
      if (event.type === 'ping') {
        this.ws.send(JSON.stringify({ type: 'pong' }));
        return;
      }

      // Route to handler
      this.routeEvent(event);

      // Acknowledge event
      this.sendAck(event.id || 'unknown');
    } catch (error) {
      console.error('Failed to process message:', error);
    }
  }

  routeEvent(event) {
    const handlers = {
      candle: (e) => this.handleCandle(e),
      quote: (e) => this.handleQuote(e),
      order: (e) => this.handleOrder(e),
      position: (e) => this.handlePosition(e),
      error: (e) => this.handleError(e),
      connection: (e) => this.handleConnection(e),
    };

    const handler = handlers[event.type];
    if (handler) {
      try {
        handler(event);
      } catch (error) {
        console.error(`Error handling ${event.type} event:`, error);
      }
    } else {
      console.debug(`Unhandled event type: ${event.type}`);
    }
  }

  handleCandle(event) {
    const bar = event.bar || {};
    console.log(`Candle: ${bar.symbol} close=${bar.close} volume=${bar.volume}`);
  }

  handleQuote(event) {
    const quote = event.quote || {};
    const spread = quote.bid && quote.ask
      ? (parseFloat(quote.ask) - parseFloat(quote.bid)).toFixed(5)
      : null;
    console.log(`Quote: ${quote.symbol} bid=${quote.bid} ask=${quote.ask} spread=${spread}`);
  }

  handleOrder(event) {
    const order = event.order || {};
    console.log(`Order ${event.event}: ${order.id} status=${order.status}`);

    switch (event.event) {
      case 'ORDER_FILLED':
        console.log(`Order ${order.id} filled at ${order.average_fill_price}`);
        break;
      case 'ORDER_REJECTED':
        console.warn(`Order ${order.id} was rejected`);
        break;
    }
  }

  handlePosition(event) {
    const position = event.position || {};
    console.log(`Position ${event.event}: ${position.symbol} qty=${position.quantity} pnl=${position.unrealized_pnl}`);
  }

  handleError(event) {
    console.error(`Server error [${event.code}]: ${event.message}`);
  }

  handleConnection(event) {
    switch (event.event) {
      case 'DISCONNECTING':
        console.warn(`Disconnecting: ${event.error}`);
        break;
      case 'RECONNECTING':
        console.log('Attempting to reconnect...');
        break;
    }
  }

  sendAck(eventId) {
    const ack = {
      type: 'event_ack',
      correlation_id: uuidv4(),
      events_processed: [eventId],
      timestamp: Date.now(),
    };
    this.ws.send(JSON.stringify(ack));
  }

  onError(err) {
    console.error('WebSocket error:', err.message);
  }

  onClose() {
    console.warn('Connection closed');
    if (this.running) {
      this.reconnect();
    }
  }

  async reconnect() {
    console.log(`Reconnecting in ${this.reconnectDelay / 1000}s...`);
    await new Promise((r) => setTimeout(r, this.reconnectDelay));
    // stop() may have been called while waiting
    if (!this.running) return;

    try {
      await this.connect();
    } catch (error) {
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, this.maxReconnectDelay);
      this.reconnect();
    }
  }

  stop() {
    this.running = false;
    if (this.ws) {
      this.ws.close();
    }
  }
}

// Usage
const strategy = new TradingStrategy('ws://localhost:8082/ws');
strategy.connect().catch(console.error);

Java

import javax.websocket.*;
import java.net.URI;
import java.util.UUID;
import java.util.logging.Logger;
import java.util.logging.Level;
import com.google.gson.Gson;
import com.google.gson.JsonObject;

@ClientEndpoint
public class TradingStrategy {
    private static final Logger logger = Logger.getLogger(TradingStrategy.class.getName());
    private static final Gson gson = new Gson();
    private Session session;
    private final String wsUrl;
    private volatile boolean running = true;
    private int reconnectDelay = 1000;
    private static final int MAX_RECONNECT_DELAY = 60000;

    public TradingStrategy(String wsUrl) {
        this.wsUrl = wsUrl;
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.reconnectDelay = 1000;
        logger.info("Connected to Trading API");
    }

    @OnMessage
    public void onMessage(String message) {
        try {
            JsonObject event = gson.fromJson(message, JsonObject.class);
            String type = event.get("type").getAsString();

            // Handle ping/pong
            if ("ping".equals(type)) {
                session.getBasicRemote().sendText("{\"type\":\"pong\"}");
                return;
            }

            // Route to handler
            routeEvent(event, type);

            // Acknowledge event
            String eventId = event.has("id") ? event.get("id").getAsString() : "unknown";
            sendAck(eventId);

        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to process message", e);
        }
    }

    private void routeEvent(JsonObject event, String type) {
        try {
            switch (type) {
                case "candle":
                    handleCandle(event);
                    break;
                case "quote":
                    handleQuote(event);
                    break;
                case "order":
                    handleOrder(event);
                    break;
                case "position":
                    handlePosition(event);
                    break;
                case "error":
                    handleError(event);
                    break;
                case "connection":
                    handleConnection(event);
                    break;
                default:
                    logger.fine("Unhandled event type: " + type);
            }
        } catch (Exception e) {
            logger.log(Level.WARNING, "Error handling " + type + " event", e);
        }
    }

    private void handleCandle(JsonObject event) {
        JsonObject bar = event.getAsJsonObject("bar");
        String symbol = bar.get("symbol").getAsString();
        String close = bar.get("close").getAsString();
        String volume = bar.get("volume").getAsString();
        logger.info(String.format("Candle: %s close=%s volume=%s", symbol, close, volume));
    }

    private void handleQuote(JsonObject event) {
        JsonObject quote = event.getAsJsonObject("quote");
        String symbol = quote.get("symbol").getAsString();
        String bid = quote.get("bid").getAsString();
        String ask = quote.get("ask").getAsString();
        double spread = Double.parseDouble(ask) - Double.parseDouble(bid);
        logger.info(String.format("Quote: %s bid=%s ask=%s spread=%.5f", symbol, bid, ask, spread));
    }

    private void handleOrder(JsonObject event) {
        String orderEvent = event.get("event").getAsString();
        JsonObject order = event.getAsJsonObject("order");
        String orderId = order.get("id").getAsString();
        String status = order.get("status").getAsString();
        logger.info(String.format("Order %s: %s status=%s", orderEvent, orderId, status));

        switch (orderEvent) {
            case "ORDER_FILLED":
                String fillPrice = order.get("average_fill_price").getAsString();
                logger.info(String.format("Order %s filled at %s", orderId, fillPrice));
                break;
            case "ORDER_REJECTED":
                logger.warning(String.format("Order %s was rejected", orderId));
                break;
        }
    }

    private void handlePosition(JsonObject event) {
        String positionEvent = event.get("event").getAsString();
        JsonObject position = event.getAsJsonObject("position");
        String symbol = position.get("symbol").getAsString();
        String quantity = position.get("quantity").getAsString();
        String pnl = position.get("unrealized_pnl").getAsString();
        logger.info(String.format("Position %s: %s qty=%s pnl=%s",
            positionEvent, symbol, quantity, pnl));
    }

    private void handleError(JsonObject event) {
        String code = event.get("code").getAsString();
        String message = event.get("message").getAsString();
        logger.severe(String.format("Server error [%s]: %s", code, message));
    }

    private void handleConnection(JsonObject event) {
        String connEvent = event.get("event").getAsString();
        switch (connEvent) {
            case "DISCONNECTING":
                String error = event.has("error") ? event.get("error").getAsString() : "unknown";
                logger.warning("Disconnecting: " + error);
                break;
            case "RECONNECTING":
                logger.info("Attempting to reconnect...");
                break;
        }
    }

    private void sendAck(String eventId) throws Exception {
        JsonObject ack = new JsonObject();
        ack.addProperty("type", "event_ack");
        ack.addProperty("correlation_id", UUID.randomUUID().toString());
        ack.add("events_processed", gson.toJsonTree(new String[]{eventId}));
        ack.addProperty("timestamp", System.currentTimeMillis());
        session.getBasicRemote().sendText(gson.toJson(ack));
    }

    @OnClose
    public void onClose() {
        logger.warning("Connection closed");
        if (running) {
            reconnect();
        }
    }

    @OnError
    public void onError(Throwable error) {
        logger.log(Level.SEVERE, "WebSocket error", error);
    }

    private void reconnect() {
        while (running) {
            try {
                logger.info(String.format("Reconnecting in %ds...", reconnectDelay / 1000));
                Thread.sleep(reconnectDelay);

                WebSocketContainer container = ContainerProvider.getWebSocketContainer();
                container.connectToServer(this, new URI(wsUrl));
                return;

            } catch (Exception e) {
                logger.log(Level.WARNING, "Reconnect failed", e);
                reconnectDelay = Math.min(reconnectDelay * 2, MAX_RECONNECT_DELAY);
            }
        }
    }

    public void stop() {
        running = false;
        try {
            if (session != null) {
                session.close();
            }
        } catch (Exception e) {
            logger.log(Level.WARNING, "Error closing session", e);
        }
    }

    public static void main(String[] args) throws Exception {
        TradingStrategy strategy = new TradingStrategy("ws://localhost:8082/ws");
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(strategy, new URI(strategy.wsUrl));
        Thread.sleep(Long.MAX_VALUE);
    }
}

Rust

use tokio_tungstenite::{connect_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
use serde::Serialize;
use serde_json::Value;
use uuid::Uuid;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{info, warn, error, debug};

#[derive(Serialize)]
struct EventAck {
    #[serde(rename = "type")]
    msg_type: String,
    correlation_id: String,
    events_processed: Vec<String>,
    timestamp: u64,
}

#[derive(Serialize)]
struct Pong {
    #[serde(rename = "type")]
    msg_type: String,
}

struct TradingStrategy {
    ws_url: String,
}

impl TradingStrategy {
    fn new(ws_url: &str) -> Self {
        Self {
            ws_url: ws_url.to_string(),
        }
    }

    async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        let mut reconnect_delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(60);

        loop {
            match self.connect_and_run(&mut reconnect_delay).await {
                Ok(_) => {
                    info!("Connection ended normally");
                    break;
                }
                Err(e) => {
                    error!("Connection error: {}", e);
                    info!("Reconnecting in {:?}...", reconnect_delay);
                    tokio::time::sleep(reconnect_delay).await;
                    reconnect_delay = std::cmp::min(reconnect_delay * 2, max_delay);
                }
            }
        }
        Ok(())
    }

    async fn connect_and_run(
        &self,
        reconnect_delay: &mut Duration,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let (ws_stream, _) = connect_async(&self.ws_url).await?;
        // Reset the backoff once a connection succeeds
        *reconnect_delay = Duration::from_secs(1);
        info!("Connected to Trading API");

        let (mut write, mut read) = ws_stream.split();

        while let Some(msg) = read.next().await {
            let msg = msg?;
            if let Message::Text(text) = msg {
                match serde_json::from_str::<Value>(&text) {
                    Ok(event) => {
                        let event_type = event["type"].as_str().unwrap_or("unknown");

                        // Handle ping/pong
                        if event_type == "ping" {
                            let pong = Pong { msg_type: "pong".to_string() };
                            write.send(Message::Text(serde_json::to_string(&pong)?)).await?;
                            continue;
                        }

                        // Route to handler
                        self.route_event(&event, event_type);

                        // Acknowledge event
                        let event_id = event["id"].as_str().unwrap_or("unknown").to_string();
                        let ack = self.create_ack(event_id);
                        write.send(Message::Text(serde_json::to_string(&ack)?)).await?;
                    }
                    Err(e) => {
                        error!("Failed to parse message: {}", e);
                    }
                }
            }
        }
        Ok(())
    }

    fn route_event(&self, event: &Value, event_type: &str) {
        let result = match event_type {
            "candle" => self.handle_candle(event),
            "quote" => self.handle_quote(event),
            "order" => self.handle_order(event),
            "position" => self.handle_position(event),
            "error" => self.handle_error(event),
            "connection" => self.handle_connection(event),
            _ => {
                debug!("Unhandled event type: {}", event_type);
                Ok(())
            }
        };

        if let Err(e) = result {
            error!("Error handling {} event: {}", event_type, e);
        }
    }

    fn handle_candle(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let bar = event.get("bar").ok_or("missing bar field")?;
        let symbol = bar["symbol"].as_str().unwrap_or("");
        let close = bar["close"].as_str().unwrap_or("");
        let volume = bar["volume"].as_str().unwrap_or("");
        info!("Candle: {} close={} volume={}", symbol, close, volume);
        Ok(())
    }

    fn handle_quote(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let quote = event.get("quote").ok_or("missing quote field")?;
        let symbol = quote["symbol"].as_str().unwrap_or("");
        let bid = quote["bid"].as_str().unwrap_or("0");
        let ask = quote["ask"].as_str().unwrap_or("0");
        let spread = ask.parse::<f64>()? - bid.parse::<f64>()?;
        info!("Quote: {} bid={} ask={} spread={:.5}", symbol, bid, ask, spread);
        Ok(())
    }

    fn handle_order(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let order_event = event["event"].as_str().unwrap_or("");
        let order = event.get("order").ok_or("missing order field")?;
        let order_id = order["id"].as_str().unwrap_or("");
        let status = order["status"].as_str().unwrap_or("");
        info!("Order {}: {} status={}", order_event, order_id, status);

        match order_event {
            "ORDER_FILLED" => {
                let fill_price = order["average_fill_price"].as_str().unwrap_or("");
                info!("Order {} filled at {}", order_id, fill_price);
            }
            "ORDER_REJECTED" => {
                warn!("Order {} was rejected", order_id);
            }
            _ => {}
        }
        Ok(())
    }

    fn handle_position(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let position_event = event["event"].as_str().unwrap_or("");
        let position = event.get("position").ok_or("missing position field")?;
        let symbol = position["symbol"].as_str().unwrap_or("");
        let quantity = position["quantity"].as_str().unwrap_or("");
        let pnl = position["unrealized_pnl"].as_str().unwrap_or("");
        info!("Position {}: {} qty={} pnl={}", position_event, symbol, quantity, pnl);
        Ok(())
    }

    fn handle_error(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let code = event["code"].as_str().unwrap_or("UNKNOWN");
        let message = event["message"].as_str().unwrap_or("");
        error!("Server error [{}]: {}", code, message);
        Ok(())
    }

    fn handle_connection(&self, event: &Value) -> Result<(), Box<dyn std::error::Error>> {
        let conn_event = event["event"].as_str().unwrap_or("");
        match conn_event {
            "DISCONNECTING" => {
                let error = event["error"].as_str().unwrap_or("unknown");
                warn!("Disconnecting: {}", error);
            }
            "RECONNECTING" => {
                info!("Attempting to reconnect...");
            }
            _ => {}
        }
        Ok(())
    }

    fn create_ack(&self, event_id: String) -> EventAck {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        EventAck {
            msg_type: "event_ack".to_string(),
            correlation_id: Uuid::new_v4().to_string(),
            events_processed: vec![event_id],
            timestamp,
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    let strategy = TradingStrategy::new("ws://localhost:8082/ws");
    strategy.run().await
}

Go

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	"github.com/gorilla/websocket"
)

type EventAck struct {
	Type            string   `json:"type"`
	CorrelationID   string   `json:"correlation_id"`
	EventsProcessed []string `json:"events_processed"`
	Timestamp       int64    `json:"timestamp"`
}

type Pong struct {
	Type string `json:"type"`
}

type TradingStrategy struct {
	wsURL          string
	conn           *websocket.Conn
	running        bool
	reconnectDelay time.Duration
	maxDelay       time.Duration
}

func NewTradingStrategy(wsURL string) *TradingStrategy {
	return &TradingStrategy{
		wsURL:          wsURL,
		running:        true,
		reconnectDelay: time.Second,
		maxDelay:       time.Minute,
	}
}

func (s *TradingStrategy) Connect() error {
	conn, _, err := websocket.DefaultDialer.Dial(s.wsURL, nil)
	if err != nil {
		return fmt.Errorf("connection failed: %w", err)
	}
	s.conn = conn
	s.reconnectDelay = time.Second
	log.Println("Connected to Trading API")
	return nil
}

func (s *TradingStrategy) Run() {
	for s.running {
		if err := s.Connect(); err != nil {
			log.Printf("Connection error: %v", err)
			s.scheduleReconnect()
			continue
		}

		if err := s.eventLoop(); err != nil {
			log.Printf("Event loop error: %v", err)
			s.scheduleReconnect()
		}
	}
}

func (s *TradingStrategy) eventLoop() error {
	for {
		_, message, err := s.conn.ReadMessage()
		if err != nil {
			return fmt.Errorf("read error: %w", err)
		}

		var event map[string]interface{}
		if err := json.Unmarshal(message, &event); err != nil {
			log.Printf("Failed to parse message: %v", err)
			continue
		}

		eventType, _ := event["type"].(string)

		// Handle ping/pong
		if eventType == "ping" {
			if err := s.conn.WriteJSON(Pong{Type: "pong"}); err != nil {
				return fmt.Errorf("pong failed: %w", err)
			}
			continue
		}

		// Route to handler
		s.routeEvent(event, eventType)

		// Acknowledge event
		eventID := "unknown"
		if id, ok := event["id"].(string); ok {
			eventID = id
		}
		s.sendAck(eventID)
	}
}

func (s *TradingStrategy) routeEvent(event map[string]interface{}, eventType string) {
	var err error

	switch eventType {
	case "candle":
		err = s.handleCandle(event)
	case "quote":
		err = s.handleQuote(event)
	case "order":
		err = s.handleOrder(event)
	case "position":
		err = s.handlePosition(event)
	case "error":
		err = s.handleError(event)
	case "connection":
		err = s.handleConnection(event)
	default:
		log.Printf("Unhandled event type: %s", eventType)
		return
	}

	if err != nil {
		log.Printf("Error handling %s event: %v", eventType, err)
	}
}

func (s *TradingStrategy) handleCandle(event map[string]interface{}) error {
	bar, ok := event["bar"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing bar field")
	}
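	symbol := bar["symbol"]
	closePrice := bar["close"] // avoid shadowing the builtin close
	volume := bar["volume"]
	// %v prints strings and JSON numbers (decoded as float64) alike
	log.Printf("Candle: %v close=%v volume=%v", symbol, closePrice, volume)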
	return nil
}

func (s *TradingStrategy) handleQuote(event map[string]interface{}) error {
	quote, ok := event["quote"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing quote field")
	}
	symbol := quote["symbol"]
	bid := quote["bid"]
	ask := quote["ask"]
	log.Printf("Quote: %v bid=%v ask=%v", symbol, bid, ask)
	return nil
}

func (s *TradingStrategy) handleOrder(event map[string]interface{}) error {
	orderEvent, _ := event["event"].(string)
	order, ok := event["order"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing order field")
	}
	orderID := order["id"]
	status := order["status"]
	log.Printf("Order %s: %v status=%v", orderEvent, orderID, status)

	switch orderEvent {
	case "ORDER_FILLED":
		fillPrice := order["average_fill_price"]
		log.Printf("Order %v filled at %v", orderID, fillPrice)
	case "ORDER_REJECTED":
		log.Printf("WARNING: Order %v was rejected", orderID)
	}
	return nil
}

func (s *TradingStrategy) handlePosition(event map[string]interface{}) error {
	positionEvent, _ := event["event"].(string)
	position, ok := event["position"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("missing position field")
	}
	symbol := position["symbol"]
	quantity := position["quantity"]
	pnl := position["unrealized_pnl"]
	log.Printf("Position %s: %v qty=%v pnl=%v", positionEvent, symbol, quantity, pnl)
	return nil
}

func (s *TradingStrategy) handleError(event map[string]interface{}) error {
	code, _ := event["code"].(string)
	message, _ := event["message"].(string)
	log.Printf("ERROR: Server error [%s]: %s", code, message)
	return nil
}

func (s *TradingStrategy) handleConnection(event map[string]interface{}) error {
	connEvent, _ := event["event"].(string)
	switch connEvent {
	case "DISCONNECTING":
		errMsg, _ := event["error"].(string)
		log.Printf("WARNING: Disconnecting: %s", errMsg)
	case "RECONNECTING":
		log.Println("Attempting to reconnect...")
	}
	return nil
}

func (s *TradingStrategy) sendAck(eventID string) {
	ack := EventAck{
		Type:            "event_ack",
		CorrelationID:   uuid.New().String(),
		EventsProcessed: []string{eventID},
		Timestamp:       time.Now().UnixMilli(),
	}
	if err := s.conn.WriteJSON(ack); err != nil {
		log.Printf("Failed to send ack: %v", err)
	}
}

func (s *TradingStrategy) scheduleReconnect() {
	log.Printf("Reconnecting in %v...", s.reconnectDelay)
	time.Sleep(s.reconnectDelay)
	s.reconnectDelay *= 2
	if s.reconnectDelay > s.maxDelay {
		s.reconnectDelay = s.maxDelay
	}
}

func (s *TradingStrategy) Stop() {
	s.running = false
	if s.conn != nil {
		s.conn.Close()
	}
}

func main() {
	strategy := NewTradingStrategy("ws://localhost:8082/ws")
	strategy.Run()
}

Error Handling Patterns

Connection Errors

Connection errors require graceful recovery with exponential backoff to avoid overwhelming the server:

  1. Detect disconnection - Listen for WebSocket close events
  2. Log the error - Record what happened for debugging
  3. Wait before reconnecting - Start with 1 second, double on each failure
  4. Cap the delay - Maximum 60 seconds between attempts
  5. Reset on success - Reset delay to 1 second after successful connection
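
The steps above can be sketched in Python as a small delay generator plus a retry loop. This is an illustrative sketch, not part of the Trading API: backoff_delays and connect_with_backoff are hypothetical names, connect stands for any coroutine that returns True on success, and sleep is injectable only so the loop can be exercised without real waiting.

```python
import asyncio
from typing import Awaitable, Callable, Iterator


def backoff_delays(initial: float = 1.0, maximum: float = 60.0) -> Iterator[float]:
    """Yield the wait before each attempt: doubling, capped at `maximum`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * 2, maximum)


async def connect_with_backoff(
    connect: Callable[[], Awaitable[bool]],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    """Wait, then try `connect`, until it succeeds; return the attempt count."""
    delays = backoff_delays()  # a fresh generator starts again at 1 second
    attempts = 0
    while True:
        await sleep(next(delays))
        attempts += 1
        if await connect():
            return attempts
```

Creating a new generator for each reconnect cycle is what resets the delay after a successful connection.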

Parsing Errors

When a malformed message arrives:

  1. Log the raw message - Helps debug protocol issues
  2. Skip the message - Don't crash the entire strategy
  3. Continue processing - The next message may be valid
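
One way to apply these steps in Python is to wrap parsing in a helper that logs the raw payload and returns None, so the caller can skip the message and move on. The helper name parse_event and the max_logged truncation limit are assumptions made for this sketch. Rejecting non-object JSON is also an assumption, based on every event in this guide being a JSON object.

```python
import json
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def parse_event(raw: str, max_logged: int = 500) -> Optional[dict]:
    """Return the decoded event, or None if the message is malformed."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Log a truncated copy of the raw payload so large frames don't flood the log
        logger.error("Failed to parse message: %s; raw=%r", exc, raw[:max_logged])
        return None
    if not isinstance(event, dict):
        # Valid JSON, but not the object shape the handlers expect
        logger.error("Expected a JSON object, got %s; raw=%r",
                     type(event).__name__, raw[:max_logged])
        return None
    return event
```

In a receive loop, a None result means "skip this message and read the next one".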

Server Error Events

The server sends error events for protocol-level issues:

Error Code        Action
INVALID_MESSAGE   Check your message format, log details
INTERNAL_ERROR    Server issue, may need to reconnect
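
A handler for the two codes listed above might look like the following Python sketch. The function name and the boolean return value are assumptions for illustration. Treating every INTERNAL_ERROR as a reason to reconnect is a simplification, since the guide only says a reconnect may be needed.

```python
import logging

logger = logging.getLogger(__name__)


def handle_server_error(event: dict) -> bool:
    """Log a server error event; return True if a reconnect may be warranted."""
    code = event.get("code")
    message = event.get("message")
    if code == "INVALID_MESSAGE":
        # Our own message was malformed; reconnecting would not help
        logger.error("Server rejected a message [%s]: %s", code, message)
        return False
    if code == "INTERNAL_ERROR":
        # Server-side problem; the caller decides whether to reconnect
        logger.error("Server issue [%s]: %s", code, message)
        return True
    # Codes not covered by this guide are logged and otherwise ignored
    logger.error("Unrecognized server error [%s]: %s", code, message)
    return False
```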

Best Practices

Logging

Effective logging helps debug production issues:

  • Log event types received - Understand traffic patterns
  • Log state changes - Order fills, position updates
  • Log errors with context - Include event data that caused the error
  • Use log levels - DEBUG for routine events, INFO for state changes, ERROR for failures
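
As a rough Python sketch of these points, the level can be chosen per event type, and failures can be logged together with the event that caused them. The mapping below is an assumption (market data treated as routine, order and position events as state changes), and the function names are hypothetical.

```python
import logging

logger = logging.getLogger("strategy")

# Assumed mapping from event type to log level; adjust to your own traffic
EVENT_LOG_LEVELS = {
    "candle": logging.DEBUG,
    "quote": logging.DEBUG,
    "order": logging.INFO,
    "position": logging.INFO,
    "error": logging.ERROR,
}


def log_event(event: dict) -> int:
    """Log a received event at the level for its type; return that level."""
    level = EVENT_LOG_LEVELS.get(event.get("type"), logging.DEBUG)
    logger.log(level, "event type=%s id=%s", event.get("type"), event.get("id"))
    return level


def log_handler_failure(event: dict, exc: Exception) -> None:
    """Log a handler failure with the event data that triggered it."""
    logger.error("Error handling %s event: %s; event=%r",
                 event.get("type"), exc, event)
```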

Graceful Degradation

When parts of your strategy fail:

  • Isolate handlers - Errors in one handler shouldn't affect others
  • Fallback behavior - If order placement fails, log and continue monitoring
  • State recovery - On reconnect, re-query positions and open orders
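
The isolation idea also applies to recovery after a reconnect. This guide does not describe how positions and open orders are re-queried, so the Python sketch below takes the recovery steps as caller-supplied coroutines (hypothetical hooks such as refresh_positions or refresh_open_orders) and only shows how to run them so that one failing step does not prevent the others.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger(__name__)


async def run_recovery_hooks(
    hooks: List[Callable[[], Awaitable[None]]],
) -> List[str]:
    """Run each recovery step in isolation; return the names of failed steps."""
    failed: List[str] = []
    for hook in hooks:
        try:
            await hook()
        except Exception:
            # Log with traceback and keep going so later steps still run
            logger.exception("Recovery step %s failed", hook.__name__)
            failed.append(hook.__name__)
    return failed
```

The returned list lets the caller decide whether to retry the failed steps or keep monitoring with partial state.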

Performance

For high-frequency strategies:

  • Batch acknowledgments - Process multiple events before sending one ack
  • Async handlers - Don't block the event loop with slow operations
  • Queue events - Decouple receiving from processing
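
A minimal Python sketch of queueing combined with batched acknowledgments follows. It reuses the event_ack fields shown in the examples earlier in this guide. The names build_ack and process_queue, the None sentinel, and the batch_size default are assumptions, and whether the server accepts several IDs in one events_processed list should be confirmed against the protocol reference.

```python
import asyncio
import json
import time
import uuid
from typing import Awaitable, Callable, List


def build_ack(event_ids: List[str]) -> str:
    """Build one event_ack message covering several processed events."""
    return json.dumps({
        "type": "event_ack",
        "correlation_id": str(uuid.uuid4()),
        "events_processed": event_ids,
        "timestamp": int(time.time() * 1000),
    })


async def process_queue(
    queue: asyncio.Queue,
    handle: Callable[[dict], Awaitable[None]],
    send: Callable[[str], Awaitable[None]],
    batch_size: int = 10,
) -> None:
    """Consume events until a None sentinel arrives, acking in batches."""
    pending: List[str] = []
    while True:
        event = await queue.get()
        if event is None:
            break
        await handle(event)
        pending.append(event.get("id", "unknown"))
        if len(pending) >= batch_size:
            await send(build_ack(pending))
            pending = []
    # Flush whatever is left so no processed event goes unacknowledged
    if pending:
        await send(build_ack(pending))
```

The receive loop only needs to call queue.put_nowait(event) for each parsed message, which keeps slow handlers from delaying reads. A production version would probably also flush on a timer so acks are not held back during quiet periods.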

Next Steps