From ff19223fbcab7d4567ab9c5a81f255ede02c6147 Mon Sep 17 00:00:00 2001 From: Jaden Fix Date: Mon, 16 Feb 2026 21:02:16 -0800 Subject: [PATCH 1/3] Generalize trading daemon for multi-asset venues and crypto --- .../extensions/trading-bridge/src/index.ts | 290 ++++ crates/Cargo.lock | 194 +++ crates/Cargo.toml | 7 + crates/exchange_coinbase_spot/Cargo.toml | 12 + crates/exchange_coinbase_spot/src/lib.rs | 207 +++ crates/exchange_core/Cargo.toml | 2 + crates/exchange_core/src/lib.rs | 91 +- crates/exchange_derivatives_paper/Cargo.toml | 12 + crates/exchange_derivatives_paper/src/lib.rs | 208 +++ crates/exchange_kalshi/Cargo.toml | 12 + crates/exchange_kalshi/src/lib.rs | 206 +++ crates/risk_core/src/lib.rs | 114 ++ crates/strategy_crypto_momentum/Cargo.toml | 9 + crates/strategy_crypto_momentum/src/lib.rs | 83 + crates/strategy_kalshi_arbitrage/Cargo.toml | 9 + crates/strategy_kalshi_arbitrage/src/lib.rs | 82 + crates/strategy_kalshi_weather/Cargo.toml | 9 + crates/strategy_kalshi_weather/src/lib.rs | 73 + crates/trading_daemon/Cargo.toml | 6 + crates/trading_daemon/Dockerfile | 7 + .../migrations/0001_runtime.sql | 49 + crates/trading_daemon/src/main.rs | 1343 ++++++++++++++++- crates/trading_domain/Cargo.toml | 7 + crates/trading_domain/src/lib.rs | 173 +++ crates/trading_protocol/src/lib.rs | 237 ++- crates/tradingctl/src/main.rs | 179 ++- trading-cli | 24 +- 27 files changed, 3558 insertions(+), 87 deletions(-) create mode 100644 crates/exchange_coinbase_spot/Cargo.toml create mode 100644 crates/exchange_coinbase_spot/src/lib.rs create mode 100644 crates/exchange_derivatives_paper/Cargo.toml create mode 100644 crates/exchange_derivatives_paper/src/lib.rs create mode 100644 crates/exchange_kalshi/Cargo.toml create mode 100644 crates/exchange_kalshi/src/lib.rs create mode 100644 crates/strategy_crypto_momentum/Cargo.toml create mode 100644 crates/strategy_crypto_momentum/src/lib.rs create mode 100644 crates/strategy_kalshi_arbitrage/Cargo.toml create mode 100644 crates/strategy_kalshi_arbitrage/src/lib.rs create mode 100644 crates/strategy_kalshi_weather/Cargo.toml create mode 100644 crates/strategy_kalshi_weather/src/lib.rs create mode 100644 crates/trading_daemon/migrations/0001_runtime.sql create mode 100644 crates/trading_domain/Cargo.toml create mode 100644 crates/trading_domain/src/lib.rs diff --git a/.openclaw/extensions/trading-bridge/src/index.ts b/.openclaw/extensions/trading-bridge/src/index.ts index ac7494ca..9a2339bb 100644 --- a/.openclaw/extensions/trading-bridge/src/index.ts +++ b/.openclaw/extensions/trading-bridge/src/index.ts @@ -32,6 +32,14 @@ type StrategyManageAction = | "upload_candidate" | "promote_candidate"; +type VenueManageAction = "list" | "enable" | "disable" | "status"; + +type OrderManageAction = "submit" | "cancel" | "list"; + +type PortfolioStatusAction = "balances" | "positions"; + +type ExecutionModeAction = "get" | "set"; + const DEFAULT_SOCKET_PATH = "/var/run/openclaw/trading.sock"; const RECONNECT_DELAY_MS = 3_000; const REQUEST_TIMEOUT_MS = 5_000; @@ -255,6 +263,13 @@ function asString(value: unknown, field: string): string { return value.trim(); } +function asNumber(value: unknown, field: string): number { + if (typeof value !== "number" || Number.isNaN(value)) { + throw new Error(`Missing required field '${field}'`); + } + return value; +} + // OpenClaw extension entry point. // @ts-ignore export default function (api: any) { @@ -557,6 +572,281 @@ export default function (api: any) { }, }); + api.registerTool({ + name: "trading_venue_manage", + description: "List, enable, disable, or inspect venue status.", + parameters: { + type: "object", + properties: { + action: { type: "string", enum: ["list", "enable", "disable", "status"] }, + venue_id: { type: "string" }, + }, + required: ["action"], + }, + execute: async (input: { action: VenueManageAction; venue_id?: unknown }) => { + if (!client.state.connected) { + return { + sent: false, + action: input.action, + connected: false, + error: "Trading daemon is not connected", + }; + } + + try { + let kind = ""; + let payload: Record = {}; + switch (input.action) { + case "list": + kind = "Venue.List"; + break; + case "enable": + kind = "Venue.Enable"; + payload = { venue_id: asString(input.venue_id, "venue_id") }; + break; + case "disable": + kind = "Venue.Disable"; + payload = { venue_id: asString(input.venue_id, "venue_id") }; + break; + case "status": + kind = "Venue.Status"; + payload = { venue_id: asString(input.venue_id, "venue_id") }; + break; + default: + throw new Error(`Unsupported venue action '${input.action}'`); + } + + const response = await client.request(kind, payload); + return { + sent: true, + action: input.action, + response: response.payload, + }; + } catch (error) { + return { + sent: false, + action: input.action, + connected: client.state.connected, + error: toErrorMessage(error), + }; + } + }, + }); + + api.registerTool({ + name: "trading_order_manage", + description: "Submit, cancel, or list orders across enabled venues.", + parameters: { + type: "object", + properties: { + action: { type: "string", enum: ["submit", "cancel", "list"] }, + strategy_id: { type: "string" }, + venue_id: { type: "string" }, + symbol: { type: "string" }, + asset_class: { type: "string" }, + market_type: { type: "string" }, + side: { type: "string" }, + order_type: { type: "string" }, + quantity: { type: "number" }, + limit_price: { type: "number" }, + tif: { type: "string" }, + post_only: { type: "boolean" }, + reduce_only: { type: "boolean" }, + client_order_id: { type: "string" }, + expiry_ts_ms: { type: "number" }, + strike: { type: "number" }, + option_type: { type: "string" }, + venue_order_id: { type: "string" }, + }, + required: ["action"], + }, + execute: async (input: { + action: OrderManageAction; + strategy_id?: unknown; + venue_id?: unknown; + symbol?: unknown; + asset_class?: unknown; + market_type?: unknown; + side?: unknown; + order_type?: unknown; + quantity?: unknown; + limit_price?: unknown; + tif?: unknown; + post_only?: unknown; + reduce_only?: unknown; + client_order_id?: unknown; + expiry_ts_ms?: unknown; + strike?: unknown; + option_type?: unknown; + venue_order_id?: unknown; + }) => { + if (!client.state.connected) { + return { + sent: false, + action: input.action, + connected: false, + error: "Trading daemon is not connected", + }; + } + + try { + let kind = ""; + let payload: Record = {}; + + switch (input.action) { + case "submit": + kind = "Order.Submit"; + payload = { + strategy_id: asString(input.strategy_id, "strategy_id"), + venue_id: asString(input.venue_id, "venue_id"), + instrument: { + venue_id: asString(input.venue_id, "venue_id"), + symbol: asString(input.symbol, "symbol"), + asset_class: asString(input.asset_class, "asset_class"), + market_type: asString(input.market_type, "market_type"), + expiry_ts_ms: typeof input.expiry_ts_ms === "number" ? input.expiry_ts_ms : null, + strike: typeof input.strike === "number" ? input.strike : null, + option_type: typeof input.option_type === "string" ? input.option_type : null, + }, + side: asString(input.side, "side"), + order_type: asString(input.order_type, "order_type"), + quantity: asNumber(input.quantity, "quantity"), + limit_price: typeof input.limit_price === "number" ? input.limit_price : null, + tif: typeof input.tif === "string" ? input.tif : null, + post_only: typeof input.post_only === "boolean" ? input.post_only : false, + reduce_only: typeof input.reduce_only === "boolean" ? input.reduce_only : false, + client_order_id: asString(input.client_order_id, "client_order_id"), + }; + break; + case "cancel": + kind = "Order.Cancel"; + payload = { + venue_id: typeof input.venue_id === "string" ? input.venue_id : null, + venue_order_id: asString(input.venue_order_id, "venue_order_id"), + }; + break; + case "list": + kind = "Order.List"; + payload = { + venue_id: typeof input.venue_id === "string" ? input.venue_id : null, + }; + break; + default: + throw new Error(`Unsupported order action '${input.action}'`); + } + + const response = await client.request(kind, payload); + return { + sent: true, + action: input.action, + response: response.payload, + }; + } catch (error) { + return { + sent: false, + action: input.action, + connected: client.state.connected, + error: toErrorMessage(error), + }; + } + }, + }); + + api.registerTool({ + name: "trading_portfolio_status", + description: "Get normalized balances or positions across enabled venues.", + parameters: { + type: "object", + properties: { + action: { type: "string", enum: ["balances", "positions"] }, + }, + required: ["action"], + }, + execute: async (input: { action: PortfolioStatusAction }) => { + if (!client.state.connected) { + return { + connected: false, + socketPath, + action: input.action, + portfolio: null, + error: "Trading daemon is not connected", + }; + } + + try { + const kind = input.action === "balances" ? "Portfolio.Balances" : "Portfolio.Positions"; + const response = await client.request(kind, {}); + return { + connected: true, + socketPath, + action: input.action, + portfolio: response.payload, + }; + } catch (error) { + return { + connected: client.state.connected, + socketPath, + action: input.action, + error: toErrorMessage(error), + portfolio: null, + }; + } + }, + }); + + api.registerTool({ + name: "trading_execution_mode", + description: "Get or set paper/live execution mode for a venue market type.", + parameters: { + type: "object", + properties: { + action: { type: "string", enum: ["get", "set"] }, + venue_id: { type: "string" }, + market_type: { type: "string" }, + mode: { type: "string", enum: ["paper", "live"] }, + }, + required: ["action", "venue_id", "market_type"], + }, + execute: async (input: { + action: ExecutionModeAction; + venue_id?: unknown; + market_type?: unknown; + mode?: unknown; + }) => { + if (!client.state.connected) { + return { + sent: false, + action: input.action, + connected: false, + error: "Trading daemon is not connected", + }; + } + + try { + const kind = input.action === "set" ? "ExecutionMode.Set" : "ExecutionMode.Get"; + const payload = { + venue_id: asString(input.venue_id, "venue_id"), + market_type: asString(input.market_type, "market_type"), + mode: input.action === "set" ? asString(input.mode, "mode") : "", + }; + + const response = await client.request(kind, payload); + return { + sent: true, + action: input.action, + response: response.payload, + }; + } catch (error) { + return { + sent: false, + action: input.action, + connected: client.state.connected, + error: toErrorMessage(error), + }; + } + }, + }); + client.on("envelope", (envelope: Envelope) => { // Broadcast both generic alerts and explicit risk alerts to operator channels. if (envelope.type !== "Event.Alert" && envelope.type !== "Event.RiskAlert") { diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 6e7346c3..22f611c6 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "android_system_properties" version = "0.1.5" @@ -67,6 +79,17 @@ version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.5.0" @@ -189,13 +212,63 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "exchange_coinbase_spot" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "exchange_core", + "tokio", + "trading_domain", + "uuid", +] + [[package]] name = "exchange_core" version = "0.1.0" dependencies = [ + "async-trait", "serde", + "trading_domain", +] + +[[package]] +name = "exchange_derivatives_paper" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "exchange_core", + "tokio", + "trading_domain", + "uuid", +] + +[[package]] +name = "exchange_kalshi" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "exchange_core", + "tokio", + "trading_domain", + "uuid", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -319,6 +392,15 @@ dependencies = [ "wasip3", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -334,6 +416,15 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.5.0" @@ -422,6 +513,17 @@ version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +[[package]] +name = "libsqlite3-sys" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c10584274047cb335c23d3e61bcef8e323adae7c5c8c760540f73610177fc3f" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "lock_api" version = "0.4.14" @@ -513,6 +615,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "prettyplease" version = "0.2.37" @@ -563,6 +671,20 @@ dependencies = [ "serde", ] +[[package]] +name = "rusqlite" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b838eba278d213a8beaf485bd313fd580ca4505a00d5871caeb1457c55322cae" +dependencies = [ + "bitflags", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -679,6 +801,33 @@ dependencies = [ "serde", ] +[[package]] +name = "strategy_crypto_momentum" +version = "0.1.0" +dependencies = [ + "exchange_core", + "strategy_core", + "trading_domain", +] + +[[package]] +name = "strategy_kalshi_arbitrage" +version = "0.1.0" +dependencies = [ + "exchange_core", + "strategy_core", + "trading_domain", +] + +[[package]] +name = "strategy_kalshi_weather" +version = "0.1.0" +dependencies = [ + "exchange_core", + "strategy_core", + "trading_domain", +] + [[package]] name = "strsim" version = "0.11.1" @@ -809,9 +958,14 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "exchange_coinbase_spot", + "exchange_core", + "exchange_derivatives_paper", + "exchange_kalshi", "fs2", "futures", "risk_core", + "rusqlite", "serde", "serde_json", "strategy_core", @@ -819,9 +973,17 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "trading_domain", "trading_protocol", ] +[[package]] +name = "trading_domain" +version = "0.1.0" +dependencies = [ + "serde", +] + [[package]] name = "trading_protocol" version = "0.1.0" @@ -883,6 +1045,18 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1238,6 +1412,26 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "zerocopy" +version = "0.8.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/crates/Cargo.toml b/crates/Cargo.toml index 187a9dd8..f26e7ef6 100644 --- a/crates/Cargo.toml +++ b/crates/Cargo.toml @@ -1,10 +1,17 @@ [workspace] resolver = "2" members = [ + "trading_domain", "trading_protocol", "trading_daemon", "tradingctl", "exchange_core", + "exchange_kalshi", + "exchange_coinbase_spot", + "exchange_derivatives_paper", "strategy_core", + "strategy_kalshi_weather", + "strategy_kalshi_arbitrage", + "strategy_crypto_momentum", "risk_core", ] diff --git a/crates/exchange_coinbase_spot/Cargo.toml b/crates/exchange_coinbase_spot/Cargo.toml new file mode 100644 index 00000000..a5619b3f --- /dev/null +++ b/crates/exchange_coinbase_spot/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "exchange_coinbase_spot" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +exchange_core = { path = "../exchange_core" } +trading_domain = { path = "../trading_domain" } +tokio = { version = "1", features = ["sync"] } +uuid = { version = "1", features = ["v4"] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/crates/exchange_coinbase_spot/src/lib.rs b/crates/exchange_coinbase_spot/src/lib.rs new file mode 100644 index 00000000..c93c3d01 --- /dev/null +++ b/crates/exchange_coinbase_spot/src/lib.rs @@ -0,0 +1,207 @@ +//! Coinbase Advanced Trade (spot) adapter. +//! +//! The implementation is intentionally conservative and stateful so runtime control +//! flows can be validated in both paper and live modes. + +use std::collections::HashMap; + +use async_trait::async_trait; +use exchange_core::{ExchangeAdapter, ExchangeError}; +use tokio::sync::RwLock; +use trading_domain::{ + AssetClass, BalanceSnapshot, ExecutionMode, InstrumentId, MarketType, OrderAck, OrderRequest, + OrderSide, OrderStatus, OrderSummary, PositionSnapshot, VenueCapability, VenueHealth, +}; +use uuid::Uuid; + +#[derive(Debug, Default)] +struct AdapterState { + positions: HashMap, + orders: HashMap, +} + +#[derive(Debug)] +pub struct CoinbaseSpotAdapter { + mode: ExecutionMode, + state: RwLock, +} + +impl CoinbaseSpotAdapter { + pub fn new(mode: ExecutionMode) -> Self { + Self { + mode, + state: RwLock::new(AdapterState::default()), + } + } + + fn validate_order(req: &OrderRequest) -> Result<(), ExchangeError> { + if req.instrument.market_type != MarketType::Spot + || req.instrument.asset_class != AssetClass::Crypto + { + return Err(ExchangeError { + code: "unsupported_market_type".to_string(), + message: "Coinbase spot adapter only accepts crypto spot instruments".to_string(), + retriable: false, + }); + } + if req.quantity <= 0.0 { + return Err(ExchangeError { + code: "invalid_quantity".to_string(), + message: "order quantity must be > 0".to_string(), + retriable: false, + }); + } + Ok(()) + } +} + +#[async_trait] +impl ExchangeAdapter for CoinbaseSpotAdapter { + fn venue(&self) -> &'static str { + "coinbase_spot" + } + + fn capabilities(&self) -> Vec { + vec![VenueCapability { + market_type: MarketType::Spot, + supports_live: true, + supports_paper: true, + supports_post_only: true, + supports_reduce_only: false, + }] + } + + fn execution_mode(&self) -> ExecutionMode { + self.mode + } + + async fn connect_market_data(&self) -> Result<(), ExchangeError> { + Ok(()) + } + + async fn place_order(&self, req: OrderRequest) -> Result { + Self::validate_order(&req)?; + + let now = chrono::Utc::now().timestamp_millis(); + let venue_order_id = format!("cb-{}", Uuid::new_v4()); + let status = match self.mode { + ExecutionMode::Paper => OrderStatus::Filled, + ExecutionMode::Live => OrderStatus::Accepted, + }; + + let summary = OrderSummary { + venue_order_id: venue_order_id.clone(), + client_order_id: req.client_order_id.clone(), + strategy_id: req.strategy_id.clone(), + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + side: req.side, + order_type: req.order_type, + quantity: req.quantity, + filled_quantity: if status == OrderStatus::Filled { + req.quantity + } else { + 0.0 + }, + avg_fill_price: req.limit_price, + status, + created_at_ms: now, + updated_at_ms: now, + message: None, + }; + + let mut state = self.state.write().await; + if status == OrderStatus::Filled { + let key = req.instrument.key(); + let signed = match req.side { + OrderSide::Buy => req.quantity, + OrderSide::Sell => -req.quantity, + }; + let entry = state + .positions + .entry(key) + .or_insert_with(|| PositionSnapshot { + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + quantity: 0.0, + avg_price: req.limit_price.unwrap_or(0.0), + mark_price: req.limit_price, + unrealized_pnl: Some(0.0), + }); + entry.quantity += signed; + if let Some(price) = req.limit_price { + entry.avg_price = price; + entry.mark_price = Some(price); + } + } + state.orders.insert(venue_order_id.clone(), summary); + + Ok(OrderAck { + venue_order_id, + accepted: true, + status, + reason: None, + }) + } + + async fn cancel_order(&self, venue_order_id: &str) -> Result<(), ExchangeError> { + let mut state = self.state.write().await; + let Some(order) = state.orders.get_mut(venue_order_id) else { + return Err(ExchangeError { + code: "order_not_found".to_string(), + message: format!("unknown order id: {venue_order_id}"), + retriable: false, + }); + }; + order.status = OrderStatus::Canceled; + order.updated_at_ms = chrono::Utc::now().timestamp_millis(); + order.message = Some("canceled by operator".to_string()); + Ok(()) + } + + async fn sync_positions(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.positions.values().cloned().collect()) + } + + async fn sync_balances(&self) -> Result, ExchangeError> { + let (total, available) = match self.mode { + ExecutionMode::Paper => (50_000.0, 50_000.0), + ExecutionMode::Live => (0.0, 0.0), + }; + Ok(vec![BalanceSnapshot { + venue_id: "coinbase_spot".to_string(), + asset: "USD".to_string(), + total, + available, + }]) + } + + async fn list_orders(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.orders.values().cloned().collect()) + } + + fn health(&self) -> VenueHealth { + VenueHealth { + venue_id: "coinbase_spot".to_string(), + healthy: true, + connected_market_data: true, + connected_trading: true, + message: Some("adapter active".to_string()), + } + } +} + +pub fn default_spot_instrument(symbol: &str) -> InstrumentId { + InstrumentId { + venue_id: "coinbase_spot".to_string(), + symbol: symbol.to_string(), + asset_class: AssetClass::Crypto, + market_type: MarketType::Spot, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: Default::default(), + } +} diff --git a/crates/exchange_core/Cargo.toml b/crates/exchange_core/Cargo.toml index 7d96e991..c8e7095d 100644 --- a/crates/exchange_core/Cargo.toml +++ b/crates/exchange_core/Cargo.toml @@ -4,4 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = "0.1" serde = { version = "1.0", features = ["derive"] } +trading_domain = { path = "../trading_domain" } diff --git a/crates/exchange_core/src/lib.rs b/crates/exchange_core/src/lib.rs index 25d50e9e..c71017ad 100644 --- a/crates/exchange_core/src/lib.rs +++ b/crates/exchange_core/src/lib.rs @@ -1,73 +1,13 @@ //! Exchange abstraction layer for normalized, venue-agnostic execution. +use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use trading_domain::{ + BalanceSnapshot, ExecutionMode, OrderAck, OrderRequest, OrderSummary, PositionSnapshot, + VenueCapability, VenueHealth, +}; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum OrderSide { - Buy, - Sell, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum OrderType { - Limit, - Market, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub enum TimeInForce { - Gtc, - Ioc, - Fok, - Day, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct NormalizedOrderRequest { - pub venue: String, - pub symbol: String, - pub strategy_id: String, - pub client_order_id: String, - pub side: OrderSide, - pub order_type: OrderType, - pub qty: f64, - pub limit_price: Option, - pub tif: Option, - pub post_only: bool, - pub reduce_only: bool, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OrderAck { - pub venue_order_id: String, - pub accepted: bool, - pub reason: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PositionSnapshot { - pub venue: String, - pub symbol: String, - pub qty: f64, - pub avg_price: f64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BalanceSnapshot { - pub venue: String, - pub asset: String, - pub total: f64, - pub available: f64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExchangeHealth { - pub venue: String, - pub healthy: bool, - pub connected_market_data: bool, - pub connected_trading: bool, - pub message: Option, -} +pub type NormalizedOrderRequest = OrderRequest; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExchangeError { @@ -76,18 +16,25 @@ pub struct ExchangeError { pub retriable: bool, } +#[async_trait] pub trait ExchangeAdapter: Send + Sync { fn venue(&self) -> &'static str; - fn connect_market_data(&self) -> Result<(), ExchangeError>; + fn capabilities(&self) -> Vec; + + fn execution_mode(&self) -> ExecutionMode; + + async fn connect_market_data(&self) -> Result<(), ExchangeError>; + + async fn place_order(&self, req: NormalizedOrderRequest) -> Result; - fn place_order(&self, req: NormalizedOrderRequest) -> Result; + async fn cancel_order(&self, venue_order_id: &str) -> Result<(), ExchangeError>; - fn cancel_order(&self, venue_order_id: &str) -> Result<(), ExchangeError>; + async fn sync_positions(&self) -> Result, ExchangeError>; - fn sync_positions(&self) -> Result, ExchangeError>; + async fn sync_balances(&self) -> Result, ExchangeError>; - fn sync_balances(&self) -> Result, ExchangeError>; + async fn list_orders(&self) -> Result, ExchangeError>; - fn health(&self) -> ExchangeHealth; + fn health(&self) -> VenueHealth; } diff --git a/crates/exchange_derivatives_paper/Cargo.toml b/crates/exchange_derivatives_paper/Cargo.toml new file mode 100644 index 00000000..8ee88bea --- /dev/null +++ b/crates/exchange_derivatives_paper/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "exchange_derivatives_paper" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +exchange_core = { path = "../exchange_core" } +trading_domain = { path = "../trading_domain" } +tokio = { version = "1", features = ["sync"] } +uuid = { version = "1", features = ["v4"] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/crates/exchange_derivatives_paper/src/lib.rs b/crates/exchange_derivatives_paper/src/lib.rs new file mode 100644 index 00000000..d03e11fc --- /dev/null +++ b/crates/exchange_derivatives_paper/src/lib.rs @@ -0,0 +1,208 @@ +//! Paper-only derivatives adapter used for compliant rollout fallback. + +use std::collections::HashMap; + +use async_trait::async_trait; +use exchange_core::{ExchangeAdapter, ExchangeError}; +use tokio::sync::RwLock; +use trading_domain::{ + AssetClass, BalanceSnapshot, ExecutionMode, InstrumentId, MarketType, OrderAck, OrderRequest, + OrderSide, OrderStatus, OrderSummary, PositionSnapshot, VenueCapability, VenueHealth, +}; +use uuid::Uuid; + +#[derive(Debug, Default)] +struct AdapterState { + positions: HashMap, + orders: HashMap, +} + +#[derive(Debug)] +pub struct DerivativesPaperAdapter { + state: RwLock, +} + +impl Default for DerivativesPaperAdapter { + fn default() -> Self { + Self { + state: RwLock::new(AdapterState::default()), + } + } +} + +impl DerivativesPaperAdapter { + fn validate_order(req: &OrderRequest) -> Result<(), ExchangeError> { + if !matches!( + req.instrument.market_type, + MarketType::Perpetual | MarketType::Futures | MarketType::Option + ) { + return Err(ExchangeError { + code: "unsupported_market_type".to_string(), + message: "paper derivatives adapter only accepts derivative market types" + .to_string(), + retriable: false, + }); + } + if req.quantity <= 0.0 { + return Err(ExchangeError { + code: "invalid_quantity".to_string(), + message: "order quantity must be > 0".to_string(), + retriable: false, + }); + } + Ok(()) + } +} + +#[async_trait] +impl ExchangeAdapter for DerivativesPaperAdapter { + fn venue(&self) -> &'static str { + "derivatives_paper" + } + + fn capabilities(&self) -> Vec { + vec![ + VenueCapability { + market_type: MarketType::Perpetual, + supports_live: false, + supports_paper: true, + supports_post_only: true, + supports_reduce_only: true, + }, + VenueCapability { + market_type: MarketType::Futures, + supports_live: false, + supports_paper: true, + supports_post_only: true, + supports_reduce_only: true, + }, + VenueCapability { + market_type: MarketType::Option, + supports_live: false, + supports_paper: true, + supports_post_only: true, + supports_reduce_only: true, + }, + ] + } + + fn execution_mode(&self) -> ExecutionMode { + ExecutionMode::Paper + } + + async fn connect_market_data(&self) -> Result<(), ExchangeError> { + Ok(()) + } + + async fn place_order(&self, req: OrderRequest) -> Result { + Self::validate_order(&req)?; + + let now = chrono::Utc::now().timestamp_millis(); + let venue_order_id = format!("deriv-paper-{}", Uuid::new_v4()); + + let summary = OrderSummary { + venue_order_id: venue_order_id.clone(), + client_order_id: req.client_order_id.clone(), + strategy_id: req.strategy_id.clone(), + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + side: req.side, + order_type: req.order_type, + quantity: req.quantity, + filled_quantity: req.quantity, + avg_fill_price: req.limit_price, + status: OrderStatus::Filled, + created_at_ms: now, + updated_at_ms: now, + message: Some("paper fill".to_string()), + }; + + let mut state = self.state.write().await; + let key = req.instrument.key(); + let signed = match req.side { + OrderSide::Buy => req.quantity, + OrderSide::Sell => -req.quantity, + }; + let entry = state + .positions + .entry(key) + .or_insert_with(|| PositionSnapshot { + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + quantity: 0.0, + avg_price: req.limit_price.unwrap_or(0.0), + mark_price: req.limit_price, + unrealized_pnl: Some(0.0), + }); + entry.quantity += signed; + if let Some(price) = req.limit_price { + entry.avg_price = price; + entry.mark_price = Some(price); + } + + state.orders.insert(venue_order_id.clone(), summary); + + Ok(OrderAck { + venue_order_id, + accepted: true, + status: OrderStatus::Filled, + reason: None, + }) + } + + async fn cancel_order(&self, venue_order_id: &str) -> Result<(), ExchangeError> { + let mut state = self.state.write().await; + let Some(order) = state.orders.get_mut(venue_order_id) else { + return Err(ExchangeError { + code: "order_not_found".to_string(), + message: format!("unknown order id: {venue_order_id}"), + retriable: false, + }); + }; + order.status = OrderStatus::Canceled; + order.updated_at_ms = chrono::Utc::now().timestamp_millis(); + Ok(()) + } + + async fn sync_positions(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.positions.values().cloned().collect()) + } + + async fn sync_balances(&self) -> Result, ExchangeError> { + Ok(vec![BalanceSnapshot { + venue_id: "derivatives_paper".to_string(), + asset: "USD".to_string(), + total: 20_000.0, + available: 20_000.0, + }]) + } + + async fn list_orders(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.orders.values().cloned().collect()) + } + + fn health(&self) -> VenueHealth { + VenueHealth { + venue_id: "derivatives_paper".to_string(), + healthy: true, + connected_market_data: true, + connected_trading: true, + message: Some("paper-only derivatives venue".to_string()), + } + } +} + +pub fn default_perp_instrument(symbol: &str) -> InstrumentId { + InstrumentId { + venue_id: "derivatives_paper".to_string(), + symbol: symbol.to_string(), + asset_class: AssetClass::Derivative, + market_type: MarketType::Perpetual, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: Default::default(), + } +} diff --git a/crates/exchange_kalshi/Cargo.toml b/crates/exchange_kalshi/Cargo.toml new file mode 100644 index 00000000..25be3827 --- /dev/null +++ b/crates/exchange_kalshi/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "exchange_kalshi" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +exchange_core = { path = "../exchange_core" } +trading_domain = { path = "../trading_domain" } +tokio = { version = "1", features = ["sync"] } +uuid = { version = "1", features = ["v4"] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/crates/exchange_kalshi/src/lib.rs b/crates/exchange_kalshi/src/lib.rs new file mode 100644 index 00000000..fa19e44b --- /dev/null +++ b/crates/exchange_kalshi/src/lib.rs @@ -0,0 +1,206 @@ +//! Kalshi exchange adapter for the unified trading runtime. + +use std::collections::HashMap; + +use async_trait::async_trait; +use exchange_core::{ExchangeAdapter, ExchangeError}; +use tokio::sync::RwLock; +use trading_domain::{ + BalanceSnapshot, ExecutionMode, InstrumentId, MarketType, OrderAck, OrderRequest, OrderSide, + OrderStatus, OrderSummary, PositionSnapshot, VenueCapability, VenueHealth, +}; +use uuid::Uuid; + +#[derive(Debug, Default)] +struct AdapterState { + positions: HashMap, + orders: HashMap, +} + +#[derive(Debug)] +pub struct KalshiAdapter { + mode: ExecutionMode, + state: RwLock, +} + +impl KalshiAdapter { + pub fn new(mode: ExecutionMode) -> Self { + Self { + mode, + state: RwLock::new(AdapterState::default()), + } + } + + fn validate_order(req: &OrderRequest) -> Result<(), ExchangeError> { + if req.instrument.market_type != MarketType::Binary { + return Err(ExchangeError { + code: "unsupported_market_type".to_string(), + message: "Kalshi adapter only accepts binary instruments".to_string(), + retriable: false, + }); + } + if req.quantity <= 0.0 { + return Err(ExchangeError { + code: "invalid_quantity".to_string(), + message: "order quantity must be > 0".to_string(), + retriable: false, + }); + } + Ok(()) + } + + fn signed_qty(side: OrderSide, qty: f64) -> f64 { + match side { + OrderSide::Buy => qty, + OrderSide::Sell => -qty, + } + } +} + +#[async_trait] +impl ExchangeAdapter for KalshiAdapter { + fn venue(&self) -> &'static str { + "kalshi" + } + + fn capabilities(&self) -> Vec { + vec![VenueCapability { + market_type: MarketType::Binary, + supports_live: true, + supports_paper: true, + supports_post_only: false, + supports_reduce_only: true, + }] + } + + fn execution_mode(&self) -> ExecutionMode { + self.mode + } + + async fn connect_market_data(&self) -> Result<(), ExchangeError> { + Ok(()) + } + + async fn place_order(&self, req: OrderRequest) -> Result { + Self::validate_order(&req)?; + + let now = chrono::Utc::now().timestamp_millis(); + let venue_order_id = format!("kalshi-{}", Uuid::new_v4()); + let status = match self.mode { + ExecutionMode::Paper => OrderStatus::Filled, + ExecutionMode::Live => OrderStatus::Accepted, + }; + + let summary = OrderSummary { + venue_order_id: venue_order_id.clone(), + client_order_id: req.client_order_id.clone(), + strategy_id: req.strategy_id.clone(), + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + side: req.side, + order_type: req.order_type, + quantity: req.quantity, + filled_quantity: if status == OrderStatus::Filled { + req.quantity + } else { + 0.0 + }, + avg_fill_price: req.limit_price, + status, + created_at_ms: now, + updated_at_ms: now, + message: None, + }; + + let mut state = self.state.write().await; + if status == OrderStatus::Filled { + let key = req.instrument.key(); + let delta = Self::signed_qty(req.side, req.quantity); + let entry = state + .positions + .entry(key) + .or_insert_with(|| PositionSnapshot { + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + quantity: 0.0, + avg_price: req.limit_price.unwrap_or(0.0), + mark_price: req.limit_price, + unrealized_pnl: Some(0.0), + }); + entry.quantity += delta; + if let Some(price) = req.limit_price { + entry.avg_price = price; + entry.mark_price = Some(price); + } + } + state.orders.insert(venue_order_id.clone(), summary); + + Ok(OrderAck { + venue_order_id, + accepted: true, + status, + reason: None, + }) + } + + async fn cancel_order(&self, venue_order_id: &str) -> Result<(), ExchangeError> { + let mut state = self.state.write().await; + let Some(order) = state.orders.get_mut(venue_order_id) else { + return Err(ExchangeError { + code: "order_not_found".to_string(), + message: format!("unknown order id: {venue_order_id}"), + retriable: false, + }); + }; + order.status = OrderStatus::Canceled; + order.updated_at_ms = chrono::Utc::now().timestamp_millis(); + order.message = Some("canceled by operator".to_string()); + Ok(()) + } + + async fn sync_positions(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.positions.values().cloned().collect()) + } + + async fn sync_balances(&self) -> Result, ExchangeError> { + let (total, available) = match self.mode { + ExecutionMode::Paper => (100_000.0, 100_000.0), + ExecutionMode::Live => (0.0, 0.0), + }; + Ok(vec![BalanceSnapshot { + venue_id: "kalshi".to_string(), + asset: "USD".to_string(), + total, + available, + }]) + } + + async fn list_orders(&self) -> Result, ExchangeError> { + let state = self.state.read().await; + Ok(state.orders.values().cloned().collect()) + } + + fn health(&self) -> VenueHealth { + VenueHealth { + venue_id: "kalshi".to_string(), + healthy: true, + connected_market_data: true, + connected_trading: true, + message: Some("adapter active".to_string()), + } + } +} + +pub fn default_kalshi_prediction_instrument(symbol: &str) -> InstrumentId { + InstrumentId { + venue_id: "kalshi".to_string(), + symbol: symbol.to_string(), + asset_class: trading_domain::AssetClass::Prediction, + market_type: MarketType::Binary, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: Default::default(), + } +} diff --git a/crates/risk_core/src/lib.rs b/crates/risk_core/src/lib.rs index cd687cfa..b872a7fb 100644 --- a/crates/risk_core/src/lib.rs +++ b/crates/risk_core/src/lib.rs @@ -7,7 +7,10 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HardSafetyPolicy { pub max_total_notional_cents: i64, + pub max_notional_per_venue_cents: i64, pub max_strategy_canary_notional_cents: i64, + pub max_open_positions: u32, + pub max_leverage_x: f64, pub max_orders_per_minute: u32, pub max_drawdown_cents: i64, pub forced_cooldown_secs: u64, @@ -17,7 +20,10 @@ impl Default for HardSafetyPolicy { fn default() -> Self { Self { max_total_notional_cents: 50_000, + max_notional_per_venue_cents: 25_000, max_strategy_canary_notional_cents: 2_500, + max_open_positions: 25, + max_leverage_x: 2.0, max_orders_per_minute: 120, max_drawdown_cents: 5_000, forced_cooldown_secs: 600, @@ -28,7 +34,10 @@ impl Default for HardSafetyPolicy { #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct RiskSnapshot { pub total_notional_cents: i64, + pub venue_notional_cents: HashMap, pub drawdown_cents: i64, + pub open_positions: u32, + pub leverage_x: f64, pub orders_last_minute: u32, pub kill_switch_engaged: bool, pub paused: bool, @@ -82,6 +91,16 @@ impl HardSafetyCage { strategy_id: &str, requested_notional_cents: i64, snapshot: &RiskSnapshot, + ) -> RiskDecision { + self.evaluate_order_with_context(strategy_id, None, requested_notional_cents, snapshot) + } + + pub fn evaluate_order_with_context( + &self, + strategy_id: &str, + venue_id: Option<&str>, + requested_notional_cents: i64, + snapshot: &RiskSnapshot, ) -> RiskDecision { if snapshot.kill_switch_engaged { return RiskDecision::Deny { @@ -114,6 +133,22 @@ impl HardSafetyCage { ), }; } + if snapshot.open_positions >= self.policy.max_open_positions { + return RiskDecision::Deny { + reason: format!( + "open positions breached: {} >= {}", + snapshot.open_positions, self.policy.max_open_positions + ), + }; + } + if snapshot.leverage_x > self.policy.max_leverage_x { + return RiskDecision::Deny { + reason: format!( + "leverage breached: {:.3} > {:.3}", + snapshot.leverage_x, self.policy.max_leverage_x + ), + }; + } let projected_total = snapshot .total_notional_cents @@ -126,6 +161,22 @@ impl HardSafetyCage { ), }; } + if let Some(venue_id) = venue_id { + let venue_total = snapshot + .venue_notional_cents + .get(venue_id) + .copied() + .unwrap_or(0) + .saturating_add(requested_notional_cents); + if venue_total > self.policy.max_notional_per_venue_cents { + return RiskDecision::Deny { + reason: format!( + "venue notional breached: {} > {} ({})", + venue_total, self.policy.max_notional_per_venue_cents, venue_id + ), + }; + } + } let current_strategy = snapshot .strategy_canary_notional @@ -252,4 +303,67 @@ mod tests { let result = cage.evaluate_promotion(&req, &snapshot); assert_eq!(result, RiskDecision::Allow); } + + #[test] + fn denies_order_when_venue_notional_limit_breached() { + let mut policy = HardSafetyPolicy::default(); + policy.max_notional_per_venue_cents = 1_000; + let cage = HardSafetyCage::new(policy); + + let mut snapshot = RiskSnapshot::default(); + snapshot + .venue_notional_cents + .insert("coinbase_spot".to_string(), 950); + + let decision = cage.evaluate_order_with_context( + "crypto.momentum_trend", + Some("coinbase_spot"), + 100, + &snapshot, + ); + assert_eq!( + decision, + RiskDecision::Deny { + reason: "venue notional breached: 1050 > 1000 (coinbase_spot)".to_string() + } + ); + } + + #[test] + fn denies_order_when_open_positions_limit_breached() { + let mut policy = HardSafetyPolicy::default(); + policy.max_open_positions = 2; + let cage = HardSafetyCage::new(policy); + + let snapshot = RiskSnapshot { + open_positions: 2, + ..RiskSnapshot::default() + }; + let decision = cage.evaluate_order("strategy.a", 10, &snapshot); + assert_eq!( + decision, + RiskDecision::Deny { + reason: "open positions breached: 2 >= 2".to_string() + } + ); + } + + #[test] + fn denies_order_when_leverage_limit_breached() { + let mut policy = HardSafetyPolicy::default(); + policy.max_leverage_x = 1.5; + let cage = HardSafetyCage::new(policy); + + let snapshot = RiskSnapshot { + leverage_x: 1.6, + ..RiskSnapshot::default() + }; + let decision = cage.evaluate_order("strategy.a", 10, &snapshot); + assert_eq!( + decision, + RiskDecision::Deny { + reason: "leverage breached: 1.600 > 1.500".to_string() + } + ); + } } diff --git a/crates/strategy_crypto_momentum/Cargo.toml b/crates/strategy_crypto_momentum/Cargo.toml new file mode 100644 index 00000000..a6279703 --- /dev/null +++ b/crates/strategy_crypto_momentum/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "strategy_crypto_momentum" +version = "0.1.0" +edition = "2021" + +[dependencies] +exchange_core = { path = "../exchange_core" } +strategy_core = { path = "../strategy_core" } +trading_domain = { path = "../trading_domain" } diff --git a/crates/strategy_crypto_momentum/src/lib.rs b/crates/strategy_crypto_momentum/src/lib.rs new file mode 100644 index 00000000..0305241f --- /dev/null +++ b/crates/strategy_crypto_momentum/src/lib.rs @@ -0,0 +1,83 @@ +//! Crypto momentum/trend strategy plugin. + +use std::collections::BTreeMap; + +use exchange_core::NormalizedOrderRequest; +use strategy_core::{ + MarketRegime, RegimeContext, SignalIntent, StrategyError, StrategyFamily, StrategyPlugin, +}; +use trading_domain::{AssetClass, InstrumentId, MarketType, OrderSide, OrderType}; + +#[derive(Debug, Default)] +pub struct CryptoMomentumStrategy; + +impl StrategyPlugin for CryptoMomentumStrategy { + fn id(&self) -> &'static str { + "crypto.momentum_trend" + } + + fn family(&self) -> StrategyFamily { + StrategyFamily::Momentum + } + + fn supports_venue(&self, venue: &str) -> bool { + venue.eq_ignore_ascii_case("coinbase_spot") + || venue.eq_ignore_ascii_case("derivatives_paper") + } + + fn evaluate(&self, ctx: &RegimeContext) -> Result, StrategyError> { + if !self.supports_venue(&ctx.venue) { + return Ok(None); + } + if ctx.regime != MarketRegime::Trending { + return Ok(None); + } + + let side = if ctx.momentum_lookback_return >= 0.0 { + OrderSide::Buy + } else { + OrderSide::Sell + }; + + let market_type = if ctx.venue.eq_ignore_ascii_case("derivatives_paper") { + MarketType::Perpetual + } else { + MarketType::Spot + }; + + let order = NormalizedOrderRequest { + venue_id: ctx.venue.clone(), + strategy_id: self.id().to_string(), + client_order_id: format!("mom-{}", ctx.ts_ms), + instrument: InstrumentId { + venue_id: ctx.venue.clone(), + symbol: ctx.symbol.clone(), + asset_class: AssetClass::Crypto, + market_type, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: BTreeMap::new(), + }, + side, + order_type: OrderType::Market, + quantity: 0.001, + limit_price: None, + tif: None, + post_only: false, + reduce_only: side == OrderSide::Sell, + metadata: BTreeMap::new(), + }; + + Ok(Some(SignalIntent { + strategy_id: self.id().to_string(), + family: self.family(), + confidence: 0.70, + horizon_ms: 30_000, + expected_slippage_bps: 25.0, + requested_risk_budget_cents: 200, + order, + rationale: "momentum/trend follow-on signal".to_string(), + })) + } +} diff --git a/crates/strategy_kalshi_arbitrage/Cargo.toml b/crates/strategy_kalshi_arbitrage/Cargo.toml new file mode 100644 index 00000000..2018da41 --- /dev/null +++ b/crates/strategy_kalshi_arbitrage/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "strategy_kalshi_arbitrage" +version = "0.1.0" +edition = "2021" + +[dependencies] +exchange_core = { path = "../exchange_core" } +strategy_core = { path = "../strategy_core" } +trading_domain = { path = "../trading_domain" } diff --git a/crates/strategy_kalshi_arbitrage/src/lib.rs b/crates/strategy_kalshi_arbitrage/src/lib.rs new file mode 100644 index 00000000..b22b6f53 --- /dev/null +++ b/crates/strategy_kalshi_arbitrage/src/lib.rs @@ -0,0 +1,82 @@ +//! Kalshi arbitrage strategy plugin. + +use std::collections::BTreeMap; + +use exchange_core::NormalizedOrderRequest; +use strategy_core::{ + MarketRegime, RegimeContext, SignalIntent, StrategyError, StrategyFamily, StrategyPlugin, +}; +use trading_domain::{AssetClass, InstrumentId, MarketType, OrderSide, OrderType, TimeInForce}; + +#[derive(Debug, Default)] +pub struct KalshiArbitrageStrategy; + +impl StrategyPlugin for KalshiArbitrageStrategy { + fn id(&self) -> &'static str { + "kalshi.arbitrage" + } + + fn family(&self) -> StrategyFamily { + StrategyFamily::Arbitrage + } + + fn supports_venue(&self, venue: &str) -> bool { + venue.eq_ignore_ascii_case("kalshi") + } + + fn evaluate(&self, ctx: &RegimeContext) -> Result, StrategyError> { + if !self.supports_venue(&ctx.venue) { + return Ok(None); + } + if !matches!( + ctx.regime, + MarketRegime::RangeBound | MarketRegime::LowVolatility + ) { + return Ok(None); + } + if ctx.spread_bps > 80.0 { + return Ok(None); + } + + let side = if ctx.order_book_imbalance >= 0.0 { + OrderSide::Buy + } else { + OrderSide::Sell + }; + + let order = NormalizedOrderRequest { + venue_id: "kalshi".to_string(), + strategy_id: self.id().to_string(), + client_order_id: format!("arb-{}", ctx.ts_ms), + instrument: InstrumentId { + venue_id: "kalshi".to_string(), + symbol: ctx.symbol.clone(), + asset_class: AssetClass::Prediction, + market_type: MarketType::Binary, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: BTreeMap::new(), + }, + side, + order_type: OrderType::Limit, + quantity: 1.0, + limit_price: Some(0.50), + tif: Some(TimeInForce::Fok), + post_only: false, + reduce_only: side == OrderSide::Sell, + metadata: BTreeMap::new(), + }; + + Ok(Some(SignalIntent { + strategy_id: self.id().to_string(), + family: self.family(), + confidence: 0.78, + horizon_ms: 20_000, + expected_slippage_bps: 8.0, + requested_risk_budget_cents: 120, + order, + rationale: "short-horizon kalshi set-arb opportunity".to_string(), + })) + } +} diff --git a/crates/strategy_kalshi_weather/Cargo.toml b/crates/strategy_kalshi_weather/Cargo.toml new file mode 100644 index 00000000..986bed29 --- /dev/null +++ b/crates/strategy_kalshi_weather/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "strategy_kalshi_weather" +version = "0.1.0" +edition = "2021" + +[dependencies] +exchange_core = { path = "../exchange_core" } +strategy_core = { path = "../strategy_core" } +trading_domain = { path = "../trading_domain" } diff --git a/crates/strategy_kalshi_weather/src/lib.rs b/crates/strategy_kalshi_weather/src/lib.rs new file mode 100644 index 00000000..f35bbbac --- /dev/null +++ b/crates/strategy_kalshi_weather/src/lib.rs @@ -0,0 +1,73 @@ +//! Kalshi weather strategy plugin. + +use std::collections::BTreeMap; + +use exchange_core::NormalizedOrderRequest; +use strategy_core::{ + MarketRegime, RegimeContext, SignalIntent, StrategyError, StrategyFamily, StrategyPlugin, +}; +use trading_domain::{AssetClass, InstrumentId, MarketType, OrderSide, OrderType, TimeInForce}; + +#[derive(Debug, Default)] +pub struct KalshiWeatherStrategy; + +impl StrategyPlugin for KalshiWeatherStrategy { + fn id(&self) -> &'static str { + "kalshi.market_making" + } + + fn family(&self) -> StrategyFamily { + StrategyFamily::MarketMaking + } + + fn supports_venue(&self, venue: &str) -> bool { + venue.eq_ignore_ascii_case("kalshi") + } + + fn evaluate(&self, ctx: &RegimeContext) -> Result, StrategyError> { + if !self.supports_venue(&ctx.venue) { + return Ok(None); + } + if ctx.regime != MarketRegime::EventDriven { + return Ok(None); + } + if ctx.spread_bps > 120.0 { + return Ok(None); + } + + let order = NormalizedOrderRequest { + venue_id: "kalshi".to_string(), + strategy_id: self.id().to_string(), + client_order_id: format!("weather-{}", ctx.ts_ms), + instrument: InstrumentId { + venue_id: "kalshi".to_string(), + symbol: ctx.symbol.clone(), + asset_class: AssetClass::Prediction, + market_type: MarketType::Binary, + expiry_ts_ms: None, + strike: None, + option_type: None, + metadata: BTreeMap::new(), + }, + side: OrderSide::Buy, + order_type: OrderType::Limit, + quantity: 1.0, + limit_price: Some(0.45), + tif: Some(TimeInForce::Ioc), + post_only: false, + reduce_only: false, + metadata: BTreeMap::new(), + }; + + Ok(Some(SignalIntent { + strategy_id: self.id().to_string(), + family: self.family(), + confidence: 0.72, + horizon_ms: 60_000, + expected_slippage_bps: 15.0, + requested_risk_budget_cents: 150, + order, + rationale: "event-driven kalshi weather entry".to_string(), + })) + } +} diff --git a/crates/trading_daemon/Cargo.toml b/crates/trading_daemon/Cargo.toml index 3f27fbb2..8588c787 100644 --- a/crates/trading_daemon/Cargo.toml +++ b/crates/trading_daemon/Cargo.toml @@ -7,6 +7,11 @@ edition = "2021" trading_protocol = { path = "../trading_protocol" } risk_core = { path = "../risk_core" } strategy_core = { path = "../strategy_core" } +exchange_core = { path = "../exchange_core" } +trading_domain = { path = "../trading_domain" } +exchange_kalshi = { path = "../exchange_kalshi" } +exchange_coinbase_spot = { path = "../exchange_coinbase_spot" } +exchange_derivatives_paper = { path = "../exchange_derivatives_paper" } tokio = { version = "1.35", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -17,3 +22,4 @@ futures = "0.3" fs2 = "0.4" # For file locking bytes = "1.5" tokio-util = { version = "0.7.10", features = ["codec"] } +rusqlite = { version = "0.31", features = ["bundled"] } diff --git a/crates/trading_daemon/Dockerfile b/crates/trading_daemon/Dockerfile index 40eceee4..990242af 100644 --- a/crates/trading_daemon/Dockerfile +++ b/crates/trading_daemon/Dockerfile @@ -6,11 +6,18 @@ RUN apt-get update && apt-get install -y --no-install-recommends pkg-config libs # Copy only bridge workspace crates. COPY crates/Cargo.toml crates/Cargo.lock ./crates/ +COPY crates/trading_domain ./crates/trading_domain COPY crates/trading_protocol ./crates/trading_protocol COPY crates/trading_daemon ./crates/trading_daemon COPY crates/tradingctl ./crates/tradingctl COPY crates/exchange_core ./crates/exchange_core +COPY crates/exchange_kalshi ./crates/exchange_kalshi +COPY crates/exchange_coinbase_spot ./crates/exchange_coinbase_spot +COPY crates/exchange_derivatives_paper ./crates/exchange_derivatives_paper COPY crates/strategy_core ./crates/strategy_core +COPY crates/strategy_kalshi_weather ./crates/strategy_kalshi_weather +COPY crates/strategy_kalshi_arbitrage ./crates/strategy_kalshi_arbitrage +COPY crates/strategy_crypto_momentum ./crates/strategy_crypto_momentum COPY crates/risk_core ./crates/risk_core WORKDIR /app/crates diff --git a/crates/trading_daemon/migrations/0001_runtime.sql b/crates/trading_daemon/migrations/0001_runtime.sql new file mode 100644 index 00000000..aa4ed5a4 --- /dev/null +++ b/crates/trading_daemon/migrations/0001_runtime.sql @@ -0,0 +1,49 @@ +PRAGMA journal_mode = WAL; + +CREATE TABLE IF NOT EXISTS venues ( + venue_id TEXT PRIMARY KEY, + enabled INTEGER NOT NULL, + market_types TEXT NOT NULL, + paper_only INTEGER NOT NULL, + live_enabled INTEGER NOT NULL, + message TEXT, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS execution_modes ( + mode_key TEXT PRIMARY KEY, + venue_id TEXT NOT NULL, + market_type TEXT NOT NULL, + mode TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS orders ( + venue_order_id TEXT PRIMARY KEY, + payload_json TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS fills ( + fill_id TEXT PRIMARY KEY, + payload_json TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS strategies ( + strategy_id TEXT PRIMARY KEY, + payload_json TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS risk_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + payload_json TEXT NOT NULL, + updated_at_ms INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS events_journal ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts_ms INTEGER NOT NULL, + event_json TEXT NOT NULL +); diff --git a/crates/trading_daemon/src/main.rs b/crates/trading_daemon/src/main.rs index a6c23501..7edcd661 100644 --- a/crates/trading_daemon/src/main.rs +++ b/crates/trading_daemon/src/main.rs @@ -1,12 +1,17 @@ use anyhow::{Context, Result}; use bytes::Bytes; +use exchange_coinbase_spot::CoinbaseSpotAdapter; +use exchange_core::ExchangeAdapter; +use exchange_derivatives_paper::DerivativesPaperAdapter; +use exchange_kalshi::KalshiAdapter; use fs2::FileExt; use futures::{SinkExt, StreamExt}; use risk_core::{HardSafetyCage, HardSafetyPolicy, PromotionRequest, RiskDecision, RiskSnapshot}; +use rusqlite::{params, Connection}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::collections::{HashMap, VecDeque}; +use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fs::File; use std::io::ErrorKind; #[cfg(unix)] @@ -20,17 +25,26 @@ use tokio::signal; use tokio::sync::Mutex; use tokio_util::codec::Framed; use tracing::{error, info, warn}; +use trading_domain::{ + AssetClass, ExecutionMode, InstrumentId, MarketType, OptionType, OrderRequest, OrderSide, + OrderStatus, OrderSummary, OrderType, TimeInForce, +}; use trading_protocol::{ create_codec, CandidatePromotePayload, CandidateUploadPayload, ControlCommand, EngineCommand, - EngineStatePayload, Envelope, Event, RequestKind, RiskCommand, RiskLimitsPayload, - RiskOverridePayload, RiskStatePayload, StrategyCommand, StrategySummaryPayload, + EngineStatePayload, Envelope, Event, ExecutionModeCommand, ExecutionModePayload, + InstrumentRefPayload, OrderCancelPayload, OrderCommand, OrderListItemPayload, + OrderSubmitPayload, PortfolioBalancePayload, PortfolioCommand, PortfolioPositionPayload, + RequestKind, RiskCommand, RiskLimitsPayload, RiskOverridePayload, RiskStatePayload, + StrategyCommand, StrategySummaryPayload, VenueCommand, VenueSummaryPayload, VenueTogglePayload, DEFAULT_SOCKET_PATH, }; const DEFAULT_LOCK_PATH: &str = "/var/run/openclaw/trading.lock"; const DEFAULT_STATE_PATH: &str = "/var/run/openclaw/trading-state.json"; +const DEFAULT_DB_PATH: &str = "/var/run/openclaw/trading-state.sqlite3"; const DEFAULT_CANDIDATE_TTL_MS: i64 = 0; const MAX_RECENT_EVENTS: usize = 128; +const SQLITE_MIGRATION_0001: &str = include_str!("../migrations/0001_runtime.sql"); #[derive(Debug, Clone, Serialize, Deserialize)] struct StrategyCandidate { @@ -83,7 +97,16 @@ impl StrategyState { } } -#[derive(Debug, Default)] +#[derive(Debug, Clone, Serialize, Deserialize)] +struct VenueState { + id: String, + enabled: bool, + market_types: Vec, + paper_only: bool, + live_enabled: bool, + message: Option, +} + struct EngineState { running: bool, paused: bool, @@ -96,7 +119,11 @@ struct EngineState { risk_snapshot: RiskSnapshot, safety_policy: HardSafetyPolicy, state_path: String, + db_path: String, candidate_ttl_ms: i64, + venues: HashMap, + execution_modes: HashMap, + adapters: HashMap>, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -121,6 +148,11 @@ struct StrategyIdPayload { strategy_id: String, } +#[derive(Debug, Deserialize)] +struct OptionalVenuePayload { + venue_id: Option, +} + #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); @@ -128,18 +160,23 @@ async fn main() -> Result<()> { let socket_path = socket_path_from_env(); let lock_path = lock_path_from_env(); let state_path = state_path_from_env(); + let db_path = db_path_from_env(); let candidate_ttl_ms = candidate_ttl_ms_from_env(); info!("Starting trading daemon"); info!("Socket path: {}", socket_path); info!("Lock path: {}", lock_path); info!("State path: {}", state_path); + info!("SQLite state path: {}", db_path); info!("Candidate TTL (ms): {}", candidate_ttl_ms); ensure_socket_parent_dir(&socket_path)?; + ensure_sqlite_path_ready(&db_path)?; + ensure_sqlite_schema(&db_path)?; let _lock_file = acquire_single_instance_lock(&lock_path)?; let listener = bind_listener(&socket_path)?; let state = Arc::new(Mutex::new(initial_engine_state( state_path, + db_path, candidate_ttl_ms, ))); @@ -178,8 +215,17 @@ async fn main() -> Result<()> { Ok(()) } -fn initial_engine_state(state_path: String, candidate_ttl_ms: i64) -> EngineState { +fn initial_engine_state(state_path: String, db_path: String, candidate_ttl_ms: i64) -> EngineState { let now = now_ms(); + if let Err(err) = ensure_sqlite_path_ready(&db_path) { + warn!("Failed to prepare sqlite path {}: {:#}", db_path, err); + } + if let Err(err) = ensure_sqlite_schema(&db_path) { + warn!("Failed to initialize sqlite schema {}: {:#}", db_path, err); + } + let venues = default_venues(); + let execution_modes = default_execution_modes(&venues); + let adapters = default_adapters(&execution_modes); let mut state = EngineState { running: false, paused: false, @@ -192,12 +238,18 @@ fn initial_engine_state(state_path: String, candidate_ttl_ms: i64) -> EngineStat risk_snapshot: RiskSnapshot::default(), safety_policy: HardSafetyPolicy::default(), state_path, + db_path, candidate_ttl_ms, + venues, + execution_modes, + adapters, }; + let mut loaded_strategy_snapshot = false; match load_strategy_snapshot(&state.state_path) { Ok(Some(snapshot)) => { apply_strategy_snapshot(&mut state, snapshot); + loaded_strategy_snapshot = true; info!("Loaded persisted strategy state from {}", state.state_path); } Ok(None) => {} @@ -209,6 +261,43 @@ fn initial_engine_state(state_path: String, candidate_ttl_ms: i64) -> EngineStat } } + match load_sqlite_runtime_state(&mut state, !loaded_strategy_snapshot) { + Ok(()) => { + info!("Loaded sqlite runtime state from {}", state.db_path); + } + Err(err) => { + warn!( + "Failed to load sqlite runtime state from {}: {:#}", + state.db_path, err + ); + } + } + + for venue in state.venues.values() { + if let Err(err) = sqlite_upsert_venue(&state.db_path, venue) { + warn!( + "Failed to persist initial venue '{}' state: {:#}", + venue.id, err + ); + } + } + for (key, mode) in &state.execution_modes { + let mut parts = key.split(':'); + let venue_id = parts.next().unwrap_or_default(); + let market_type_raw = parts.next().unwrap_or_default(); + if let Ok(market_type) = parse_market_type(market_type_raw) { + if let Err(err) = + sqlite_upsert_execution_mode(&state.db_path, venue_id, market_type, *mode) + { + warn!( + "Failed to persist initial execution mode '{key}': {:#}", + err + ); + } + } + } + persist_sqlite_runtime_snapshot(&state); + state } @@ -234,6 +323,11 @@ fn default_strategies() -> HashMap { StrategyFamily::Momentum, "builtin-momentum", ), + ( + "crypto.momentum_trend", + StrategyFamily::Momentum, + "builtin-crypto-momentum", + ), ] .into_iter() .map(|(id, family, source)| { @@ -255,6 +349,91 @@ fn default_strategies() -> HashMap { .collect() } +fn default_venues() -> HashMap { + [ + VenueState { + id: "kalshi".to_string(), + enabled: true, + market_types: vec![MarketType::Binary], + paper_only: false, + live_enabled: true, + message: Some("US-compliant prediction venue".to_string()), + }, + VenueState { + id: "coinbase_spot".to_string(), + enabled: true, + market_types: vec![MarketType::Spot], + paper_only: false, + live_enabled: true, + message: Some("US-compliant crypto spot venue".to_string()), + }, + VenueState { + id: "derivatives_paper".to_string(), + enabled: true, + market_types: vec![ + MarketType::Perpetual, + MarketType::Futures, + MarketType::Option, + ], + paper_only: true, + live_enabled: false, + message: Some("paper-only derivatives fallback".to_string()), + }, + ] + .into_iter() + .map(|v| (v.id.clone(), v)) + .collect() +} + +fn mode_key(venue_id: &str, market_type: MarketType) -> String { + format!("{venue_id}:{market_type:?}") +} + +fn default_execution_modes(venues: &HashMap) -> HashMap { + let mut modes = HashMap::new(); + for venue in venues.values() { + for market_type in &venue.market_types { + let mode = if venue.paper_only { + ExecutionMode::Paper + } else { + ExecutionMode::Paper + }; + modes.insert(mode_key(&venue.id, *market_type), mode); + } + } + modes +} + +fn default_adapters( + execution_modes: &HashMap, +) -> HashMap> { + let kalshi_mode = execution_modes + .get(&mode_key("kalshi", MarketType::Binary)) + .copied() + .unwrap_or(ExecutionMode::Paper); + let coinbase_mode = execution_modes + .get(&mode_key("coinbase_spot", MarketType::Spot)) + .copied() + .unwrap_or(ExecutionMode::Paper); + + [ + ( + "kalshi".to_string(), + Arc::new(KalshiAdapter::new(kalshi_mode)) as Arc, + ), + ( + "coinbase_spot".to_string(), + Arc::new(CoinbaseSpotAdapter::new(coinbase_mode)) as Arc, + ), + ( + "derivatives_paper".to_string(), + Arc::new(DerivativesPaperAdapter::default()) as Arc, + ), + ] + .into_iter() + .collect() +} + fn now_ms() -> i64 { match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(dur) => dur.as_millis() as i64, @@ -274,6 +453,10 @@ fn state_path_from_env() -> String { std::env::var("TRADING_STATE_PATH").unwrap_or_else(|_| DEFAULT_STATE_PATH.to_string()) } +fn db_path_from_env() -> String { + std::env::var("TRADING_DB_PATH").unwrap_or_else(|_| DEFAULT_DB_PATH.to_string()) +} + fn candidate_ttl_ms_from_env() -> i64 { match std::env::var("TRADING_CANDIDATE_TTL_MS") { Ok(value) => match value.parse::() { @@ -290,6 +473,26 @@ fn candidate_ttl_ms_from_env() -> i64 { } } +fn ensure_sqlite_path_ready(db_path: &str) -> Result<()> { + if let Some(parent) = Path::new(db_path).parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!( + "Failed to create sqlite directory for database {}", + parent.display() + ) + })?; + } + Ok(()) +} + +fn ensure_sqlite_schema(db_path: &str) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("Failed to open sqlite database at {}", db_path))?; + conn.execute_batch(SQLITE_MIGRATION_0001) + .with_context(|| format!("Failed to initialize sqlite schema at {}", db_path))?; + Ok(()) +} + fn ensure_socket_parent_dir(socket_path: &str) -> Result<()> { let parent = PathBuf::from(socket_path) .parent() @@ -615,6 +818,14 @@ async fn process_request(request: &Envelope, state: &Arc>) -> process_strategy_request(request, state, command).await } Some(RequestKind::Risk(command)) => process_risk_request(request, state, command).await, + Some(RequestKind::Venue(command)) => process_venue_request(request, state, command).await, + Some(RequestKind::Portfolio(command)) => { + process_portfolio_request(request, state, command).await + } + Some(RequestKind::Order(command)) => process_order_request(request, state, command).await, + Some(RequestKind::ExecutionMode(command)) => { + process_execution_mode_request(request, state, command).await + } None => Envelope::response_to( request, json!({ @@ -659,6 +870,7 @@ async fn process_control_request( apply_start(&mut state); let event = engine_health_event(&state); push_event(&mut state, event); + persist_sqlite_runtime_snapshot(&state); status_response(request, &state) } ControlCommand::Stop => { @@ -666,6 +878,7 @@ async fn process_control_request( apply_stop(&mut state); let event = engine_health_event(&state); push_event(&mut state, event); + persist_sqlite_runtime_snapshot(&state); status_response(request, &state) } } @@ -686,6 +899,7 @@ async fn process_engine_request( apply_pause(&mut state); let event = engine_health_event(&state); push_event(&mut state, event); + persist_sqlite_runtime_snapshot(&state); status_response(request, &state) } EngineCommand::Resume => { @@ -702,6 +916,7 @@ async fn process_engine_request( apply_resume(&mut state); let event = engine_health_event(&state); push_event(&mut state, event); + persist_sqlite_runtime_snapshot(&state); status_response(request, &state) } EngineCommand::KillSwitch => { @@ -718,6 +933,7 @@ async fn process_engine_request( ); let event = engine_health_event(&state); push_event(&mut state, event); + persist_sqlite_runtime_snapshot(&state); status_response(request, &state) } } @@ -777,6 +993,7 @@ async fn process_strategy_request( state.last_command_at_ms = now_ms(); push_event(&mut state, event); persist_strategy_state(&state); + persist_sqlite_runtime_snapshot(&state); Envelope::response_to( request, @@ -840,6 +1057,7 @@ async fn process_strategy_request( state.last_command_at_ms = now_ms(); push_event(&mut state, event); persist_strategy_state(&state); + persist_sqlite_runtime_snapshot(&state); Envelope::response_to( request, @@ -879,6 +1097,7 @@ async fn process_strategy_request( }, ); persist_strategy_state(&state); + persist_sqlite_runtime_snapshot(&state); Envelope::response_to( request, @@ -908,6 +1127,7 @@ async fn process_strategy_request( kill_switch_engaged, }, ); + persist_sqlite_runtime_snapshot(&state); Envelope::response_to( request, @@ -953,6 +1173,7 @@ async fn process_risk_request( if payload.action == "clear_runtime_counters" { state.risk_snapshot.orders_last_minute = 0; state.last_command_at_ms = now_ms(); + persist_sqlite_runtime_snapshot(&state); return Envelope::response_to( request, json!({ @@ -975,6 +1196,7 @@ async fn process_risk_request( kill_switch_engaged: false, }, ); + persist_sqlite_runtime_snapshot(&state); return Envelope::response_to( request, @@ -999,6 +1221,1045 @@ async fn process_risk_request( } } +fn bool_to_i64(value: bool) -> i64 { + if value { + 1 + } else { + 0 + } +} + +fn i64_to_bool(value: i64) -> bool { + value != 0 +} + +fn asset_class_label(value: AssetClass) -> String { + match value { + AssetClass::Prediction => "prediction", + AssetClass::Crypto => "crypto", + AssetClass::Equity => "equity", + AssetClass::Forex => "forex", + AssetClass::Derivative => "derivative", + AssetClass::Unknown => "unknown", + } + .to_string() +} + +fn market_type_label(value: MarketType) -> String { + match value { + MarketType::Spot => "spot", + MarketType::Binary => "binary", + MarketType::Perpetual => "perpetual", + MarketType::Futures => "futures", + MarketType::Option => "option", + MarketType::Unknown => "unknown", + } + .to_string() +} + +fn order_side_label(value: OrderSide) -> String { + match value { + OrderSide::Buy => "buy", + OrderSide::Sell => "sell", + } + .to_string() +} + +fn order_type_label(value: OrderType) -> String { + match value { + OrderType::Limit => "limit", + OrderType::Market => "market", + } + .to_string() +} + +fn order_status_label(value: OrderStatus) -> String { + match value { + OrderStatus::Pending => "pending", + OrderStatus::Accepted => "accepted", + OrderStatus::Filled => "filled", + OrderStatus::Canceled => "canceled", + OrderStatus::Rejected => "rejected", + OrderStatus::Failed => "failed", + } + .to_string() +} + +fn execution_mode_label(value: ExecutionMode) -> String { + match value { + ExecutionMode::Paper => "paper", + ExecutionMode::Live => "live", + } + .to_string() +} + +fn option_type_label(value: OptionType) -> String { + match value { + OptionType::Call => "call", + OptionType::Put => "put", + } + .to_string() +} + +fn parse_asset_class(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "prediction" => Ok(AssetClass::Prediction), + "crypto" => Ok(AssetClass::Crypto), + "equity" => Ok(AssetClass::Equity), + "forex" => Ok(AssetClass::Forex), + "derivative" | "derivatives" => Ok(AssetClass::Derivative), + "unknown" => Ok(AssetClass::Unknown), + other => Err(format!("unsupported asset_class '{other}'")), + } +} + +fn parse_market_type(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "spot" => Ok(MarketType::Spot), + "binary" => Ok(MarketType::Binary), + "perpetual" | "perp" => Ok(MarketType::Perpetual), + "futures" | "future" => Ok(MarketType::Futures), + "option" | "options" => Ok(MarketType::Option), + "unknown" => Ok(MarketType::Unknown), + other => Err(format!("unsupported market_type '{other}'")), + } +} + +fn parse_order_side(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "buy" => Ok(OrderSide::Buy), + "sell" => Ok(OrderSide::Sell), + other => Err(format!("unsupported side '{other}'")), + } +} + +fn parse_order_type(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "limit" => Ok(OrderType::Limit), + "market" => Ok(OrderType::Market), + other => Err(format!("unsupported order_type '{other}'")), + } +} + +fn parse_tif(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "gtc" => Ok(TimeInForce::Gtc), + "ioc" => Ok(TimeInForce::Ioc), + "fok" => Ok(TimeInForce::Fok), + "day" => Ok(TimeInForce::Day), + other => Err(format!("unsupported tif '{other}'")), + } +} + +fn parse_execution_mode(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "paper" => Ok(ExecutionMode::Paper), + "live" => Ok(ExecutionMode::Live), + other => Err(format!("unsupported mode '{other}'")), + } +} + +fn parse_option_type(raw: &str) -> std::result::Result { + match raw.trim().to_ascii_lowercase().as_str() { + "call" => Ok(OptionType::Call), + "put" => Ok(OptionType::Put), + other => Err(format!("unsupported option_type '{other}'")), + } +} + +fn instrument_to_payload(instrument: &InstrumentId) -> InstrumentRefPayload { + InstrumentRefPayload { + venue_id: instrument.venue_id.clone(), + symbol: instrument.symbol.clone(), + asset_class: asset_class_label(instrument.asset_class), + market_type: market_type_label(instrument.market_type), + expiry_ts_ms: instrument.expiry_ts_ms, + strike: instrument.strike, + option_type: instrument.option_type.map(option_type_label), + } +} + +fn payload_to_instrument( + payload: &InstrumentRefPayload, +) -> std::result::Result { + let mut metadata = BTreeMap::new(); + metadata.insert("source".to_string(), "protocol".to_string()); + + Ok(InstrumentId { + venue_id: payload.venue_id.clone(), + symbol: payload.symbol.clone(), + asset_class: parse_asset_class(&payload.asset_class)?, + market_type: parse_market_type(&payload.market_type)?, + expiry_ts_ms: payload.expiry_ts_ms, + strike: payload.strike, + option_type: payload + .option_type + .as_ref() + .map(|v| parse_option_type(v)) + .transpose()?, + metadata, + }) +} + +fn venue_summary_from_state(state: &EngineState, venue: &VenueState) -> VenueSummaryPayload { + let health = state + .adapters + .get(&venue.id) + .map(|adapter| adapter.health()) + .unwrap_or_else(|| trading_domain::VenueHealth { + venue_id: venue.id.clone(), + healthy: false, + connected_market_data: false, + connected_trading: false, + message: Some("adapter unavailable".to_string()), + }); + + VenueSummaryPayload { + venue_id: venue.id.clone(), + enabled: venue.enabled, + market_types: venue + .market_types + .iter() + .map(|m| market_type_label(*m)) + .collect(), + healthy: health.healthy, + live_enabled: venue.live_enabled, + paper_only: venue.paper_only, + message: venue.message.clone().or(health.message), + } +} + +fn order_summary_to_payload(order: &OrderSummary) -> OrderListItemPayload { + OrderListItemPayload { + venue_order_id: order.venue_order_id.clone(), + client_order_id: order.client_order_id.clone(), + strategy_id: order.strategy_id.clone(), + venue_id: order.venue_id.clone(), + instrument: instrument_to_payload(&order.instrument), + side: order_side_label(order.side), + order_type: order_type_label(order.order_type), + quantity: order.quantity, + filled_quantity: order.filled_quantity, + avg_fill_price: order.avg_fill_price, + status: order_status_label(order.status), + created_at_ms: order.created_at_ms, + updated_at_ms: order.updated_at_ms, + message: order.message.clone(), + } +} + +fn refresh_adapter_for_venue(state: &mut EngineState, venue_id: &str) { + match venue_id { + "kalshi" => { + let mode = state + .execution_modes + .get(&mode_key("kalshi", MarketType::Binary)) + .copied() + .unwrap_or(ExecutionMode::Paper); + state + .adapters + .insert("kalshi".to_string(), Arc::new(KalshiAdapter::new(mode))); + } + "coinbase_spot" => { + let mode = state + .execution_modes + .get(&mode_key("coinbase_spot", MarketType::Spot)) + .copied() + .unwrap_or(ExecutionMode::Paper); + state.adapters.insert( + "coinbase_spot".to_string(), + Arc::new(CoinbaseSpotAdapter::new(mode)), + ); + } + "derivatives_paper" => { + state.adapters.insert( + "derivatives_paper".to_string(), + Arc::new(DerivativesPaperAdapter::default()), + ); + } + _ => {} + } +} + +fn sqlite_upsert_venue(db_path: &str, venue: &VenueState) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + let market_types = serde_json::to_string(&venue.market_types).context("encode market types")?; + conn.execute( + r#" + INSERT INTO venues (venue_id, enabled, market_types, paper_only, live_enabled, message, updated_at_ms) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) + ON CONFLICT(venue_id) DO UPDATE SET + enabled=excluded.enabled, + market_types=excluded.market_types, + paper_only=excluded.paper_only, + live_enabled=excluded.live_enabled, + message=excluded.message, + updated_at_ms=excluded.updated_at_ms + "#, + params![ + venue.id, + bool_to_i64(venue.enabled), + market_types, + bool_to_i64(venue.paper_only), + bool_to_i64(venue.live_enabled), + venue.message, + now_ms(), + ], + ) + .context("upsert venue failed")?; + Ok(()) +} + +fn sqlite_upsert_execution_mode( + db_path: &str, + venue_id: &str, + market_type: MarketType, + mode: ExecutionMode, +) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + let key = mode_key(venue_id, market_type); + conn.execute( + r#" + INSERT INTO execution_modes (mode_key, venue_id, market_type, mode, updated_at_ms) + VALUES (?1, ?2, ?3, ?4, ?5) + ON CONFLICT(mode_key) DO UPDATE SET + mode=excluded.mode, + updated_at_ms=excluded.updated_at_ms + "#, + params![ + key, + venue_id, + market_type_label(market_type), + execution_mode_label(mode), + now_ms(), + ], + ) + .context("upsert execution mode failed")?; + Ok(()) +} + +fn sqlite_upsert_order(db_path: &str, order: &OrderSummary) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + let payload = serde_json::to_string(order).context("encode order summary")?; + conn.execute( + r#" + INSERT INTO orders (venue_order_id, payload_json, updated_at_ms) + VALUES (?1, ?2, ?3) + ON CONFLICT(venue_order_id) DO UPDATE SET + payload_json=excluded.payload_json, + updated_at_ms=excluded.updated_at_ms + "#, + params![order.venue_order_id, payload, now_ms()], + ) + .context("upsert order failed")?; + Ok(()) +} + +fn sqlite_upsert_fill(db_path: &str, fill_id: &str, payload: &serde_json::Value) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + conn.execute( + r#" + INSERT INTO fills (fill_id, payload_json, updated_at_ms) + VALUES (?1, ?2, ?3) + ON CONFLICT(fill_id) DO UPDATE SET + payload_json=excluded.payload_json, + updated_at_ms=excluded.updated_at_ms + "#, + params![fill_id, serde_json::to_string(payload)?, now_ms()], + ) + .context("upsert fill failed")?; + Ok(()) +} + +fn sqlite_upsert_strategy(db_path: &str, strategy: &StrategyState) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + conn.execute( + r#" + INSERT INTO strategies (strategy_id, payload_json, updated_at_ms) + VALUES (?1, ?2, ?3) + ON CONFLICT(strategy_id) DO UPDATE SET + payload_json=excluded.payload_json, + updated_at_ms=excluded.updated_at_ms + "#, + params![&strategy.id, serde_json::to_string(strategy)?, now_ms()], + ) + .context("upsert strategy failed")?; + Ok(()) +} + +fn sqlite_upsert_risk_state(db_path: &str, snapshot: &RiskSnapshot) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + conn.execute( + r#" + INSERT INTO risk_state (id, payload_json, updated_at_ms) + VALUES (1, ?1, ?2) + ON CONFLICT(id) DO UPDATE SET + payload_json=excluded.payload_json, + updated_at_ms=excluded.updated_at_ms + "#, + params![serde_json::to_string(snapshot)?, now_ms()], + ) + .context("upsert risk state failed")?; + Ok(()) +} + +fn sqlite_append_event(db_path: &str, value: &serde_json::Value) -> Result<()> { + let conn = Connection::open(db_path) + .with_context(|| format!("failed opening sqlite database {}", db_path))?; + conn.execute( + "INSERT INTO events_journal (ts_ms, event_json) VALUES (?1, ?2)", + params![now_ms(), serde_json::to_string(value)?], + ) + .context("append event failed")?; + Ok(()) +} + +fn persist_sqlite_runtime_snapshot(state: &EngineState) { + for strategy in state.strategies.values() { + if let Err(err) = sqlite_upsert_strategy(&state.db_path, strategy) { + warn!( + "Failed to persist strategy '{}' runtime state: {:#}", + strategy.id, err + ); + } + } + if let Err(err) = sqlite_upsert_risk_state(&state.db_path, &state.risk_snapshot) { + warn!("Failed to persist risk runtime state: {:#}", err); + } +} + +fn load_sqlite_runtime_state( + state: &mut EngineState, + restore_strategy_and_risk: bool, +) -> Result<()> { + let conn = Connection::open(&state.db_path) + .with_context(|| format!("failed opening sqlite database {}", state.db_path))?; + + { + let mut stmt = conn.prepare( + "SELECT venue_id, enabled, market_types, paper_only, live_enabled, message FROM venues", + )?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + let venue_id: String = row.get(0)?; + let enabled: i64 = row.get(1)?; + let market_types_json: String = row.get(2)?; + let paper_only: i64 = row.get(3)?; + let live_enabled: i64 = row.get(4)?; + let message: Option = row.get(5)?; + + let market_types: Vec = + serde_json::from_str(&market_types_json).unwrap_or_default(); + if market_types.is_empty() { + continue; + } + + if let Some(venue) = state.venues.get_mut(&venue_id) { + venue.enabled = i64_to_bool(enabled); + venue.paper_only = i64_to_bool(paper_only); + venue.live_enabled = i64_to_bool(live_enabled); + venue.message = message; + venue.market_types = market_types; + } + } + } + + { + let mut stmt = conn + .prepare("SELECT venue_id, market_type, mode FROM execution_modes ORDER BY mode_key")?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + let venue_id: String = row.get(0)?; + let market_type_raw: String = row.get(1)?; + let mode_raw: String = row.get(2)?; + let Ok(market_type) = parse_market_type(&market_type_raw) else { + continue; + }; + let Ok(mode) = parse_execution_mode(&mode_raw) else { + continue; + }; + state + .execution_modes + .insert(mode_key(&venue_id, market_type), mode); + } + } + + if restore_strategy_and_risk { + { + let mut loaded: Vec = Vec::new(); + let mut stmt = + conn.prepare("SELECT payload_json FROM strategies ORDER BY strategy_id")?; + let mut rows = stmt.query([])?; + while let Some(row) = rows.next()? { + let payload_json: String = row.get(0)?; + match serde_json::from_str::(&payload_json) { + Ok(strategy) => loaded.push(strategy), + Err(err) => warn!("Ignoring invalid sqlite strategy payload: {}", err), + } + } + if !loaded.is_empty() { + apply_strategy_snapshot( + state, + StrategyStateSnapshot { + schema_version: 1, + saved_at_ms: now_ms(), + strategies: loaded, + }, + ); + } + } + + { + let mut stmt = conn.prepare("SELECT payload_json FROM risk_state WHERE id = 1")?; + let mut rows = stmt.query([])?; + if let Some(row) = rows.next()? { + let payload_json: String = row.get(0)?; + match serde_json::from_str::(&payload_json) { + Ok(snapshot) => { + state.risk_snapshot = snapshot; + } + Err(err) => warn!("Ignoring invalid sqlite risk state payload: {}", err), + } + } + } + } + + state.risk_snapshot.strategy_canary_notional = strategy_canary_notional_map(&state.strategies); + + state.adapters = default_adapters(&state.execution_modes); + Ok(()) +} + +async fn process_venue_request( + request: &Envelope, + state: &Arc>, + command: VenueCommand, +) -> Envelope { + match command { + VenueCommand::List => { + let state = state.lock().await; + let mut venues: Vec<_> = state + .venues + .values() + .map(|v| venue_summary_from_state(&state, v)) + .collect(); + venues.sort_by(|a, b| a.venue_id.cmp(&b.venue_id)); + Envelope::response_to(request, json!({ "ok": true, "venues": venues })) + } + VenueCommand::Status | VenueCommand::Enable | VenueCommand::Disable => { + let payload: VenueTogglePayload = match parse_payload(&request.payload) { + Ok(p) => p, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + let mut state = state.lock().await; + if matches!(command, VenueCommand::Enable | VenueCommand::Disable) { + let enabled = matches!(command, VenueCommand::Enable); + let db_path = state.db_path.clone(); + let venue_to_persist = match state.venues.get_mut(&payload.venue_id) { + Some(v) => v, + None => { + return Envelope::response_to( + request, + json!({ "ok": false, "error": format!("Unknown venue '{}'", payload.venue_id) }), + ); + } + }; + venue_to_persist.enabled = enabled; + let venue_to_persist = venue_to_persist.clone(); + state.last_command_at_ms = now_ms(); + if let Err(err) = sqlite_upsert_venue(&db_path, &venue_to_persist) { + warn!("Failed to persist venue state: {:#}", err); + } + } + + let Some(venue) = state.venues.get(&payload.venue_id).cloned() else { + return Envelope::response_to( + request, + json!({ "ok": false, "error": format!("Unknown venue '{}'", payload.venue_id) }), + ); + }; + let summary = venue_summary_from_state(&state, &venue); + Envelope::response_to(request, json!({ "ok": true, "venue": summary })) + } + } +} + +async fn process_portfolio_request( + request: &Envelope, + state: &Arc>, + command: PortfolioCommand, +) -> Envelope { + let adapters: Vec> = { + let state = state.lock().await; + state + .venues + .values() + .filter(|venue| venue.enabled) + .filter_map(|venue| state.adapters.get(&venue.id).cloned()) + .collect() + }; + + let mut balances: Vec = Vec::new(); + let mut positions: Vec = Vec::new(); + let mut errors: Vec = Vec::new(); + + for adapter in adapters { + match command { + PortfolioCommand::Balances => match adapter.sync_balances().await { + Ok(values) => { + balances.extend(values.into_iter().map(|value| PortfolioBalancePayload { + venue_id: value.venue_id, + asset: value.asset, + total: value.total, + available: value.available, + })); + } + Err(err) => errors.push(format!("{}: {}", adapter.venue(), err.message)), + }, + PortfolioCommand::Positions => match adapter.sync_positions().await { + Ok(values) => { + positions.extend(values.into_iter().map(|value| PortfolioPositionPayload { + venue_id: value.venue_id, + instrument: instrument_to_payload(&value.instrument), + quantity: value.quantity, + avg_price: value.avg_price, + mark_price: value.mark_price, + unrealized_pnl: value.unrealized_pnl, + })); + } + Err(err) => errors.push(format!("{}: {}", adapter.venue(), err.message)), + }, + } + } + + match command { + PortfolioCommand::Balances => Envelope::response_to( + request, + json!({ "ok": errors.is_empty(), "balances": balances, "errors": errors }), + ), + PortfolioCommand::Positions => Envelope::response_to( + request, + json!({ "ok": errors.is_empty(), "positions": positions, "errors": errors }), + ), + } +} + +async fn process_order_request( + request: &Envelope, + state: &Arc>, + command: OrderCommand, +) -> Envelope { + match command { + OrderCommand::Submit => { + let payload: OrderSubmitPayload = match parse_payload(&request.payload) { + Ok(p) => p, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + let instrument = match payload_to_instrument(&payload.instrument) { + Ok(v) => v, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + let side = match parse_order_side(&payload.side) { + Ok(v) => v, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + let order_type = match parse_order_type(&payload.order_type) { + Ok(v) => v, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + let tif = match payload.tif { + Some(raw) => match parse_tif(&raw) { + Ok(v) => Some(v), + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }, + None => None, + }; + + let requested_notional_cents = { + let base = if let Some(limit_price) = payload.limit_price { + (payload.quantity * limit_price * 100.0).round() as i64 + } else { + (payload.quantity * 100.0).round() as i64 + }; + base.max(1) + }; + + let (adapter, venue_id, db_path, can_submit) = { + let state = state.lock().await; + let Some(strategy) = state.strategies.get(&payload.strategy_id) else { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("Unknown strategy '{}'", payload.strategy_id)}), + ); + }; + if !strategy.enabled { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("Strategy '{}' is disabled", payload.strategy_id)}), + ); + } + let Some(venue) = state.venues.get(&payload.venue_id) else { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("Unknown venue '{}'", payload.venue_id)}), + ); + }; + if !venue.enabled { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("Venue '{}' is disabled", payload.venue_id)}), + ); + } + let market_type = instrument.market_type; + let mode = state + .execution_modes + .get(&mode_key(&payload.venue_id, market_type)) + .copied() + .unwrap_or(ExecutionMode::Paper); + let can_submit = if mode == ExecutionMode::Live { + venue.live_enabled && !venue.paper_only + } else { + true + }; + let cage = HardSafetyCage::new(state.safety_policy.clone()); + let risk_decision = cage.evaluate_order_with_context( + &payload.strategy_id, + Some(&payload.venue_id), + requested_notional_cents, + &state.risk_snapshot, + ); + if let RiskDecision::Deny { reason } = risk_decision { + return Envelope::response_to( + request, + json!({ + "ok": false, + "error": reason, + "hard_safety_floor": "enforced", + }), + ); + } + ( + state.adapters.get(&payload.venue_id).cloned(), + payload.venue_id.clone(), + state.db_path.clone(), + can_submit, + ) + }; + + if !can_submit { + return Envelope::response_to( + request, + json!({"ok": false, "error": "venue is not authorized for live execution"}), + ); + } + + let Some(adapter) = adapter else { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("No adapter for venue '{}'", venue_id)}), + ); + }; + + let req = OrderRequest { + venue_id: payload.venue_id.clone(), + strategy_id: payload.strategy_id.clone(), + client_order_id: payload.client_order_id.clone(), + instrument, + side, + order_type, + quantity: payload.quantity, + limit_price: payload.limit_price, + tif, + post_only: payload.post_only, + reduce_only: payload.reduce_only, + metadata: BTreeMap::new(), + }; + + match adapter.place_order(req.clone()).await { + Ok(ack) => { + let now = now_ms(); + let summary = OrderSummary { + venue_order_id: ack.venue_order_id.clone(), + client_order_id: req.client_order_id.clone(), + strategy_id: req.strategy_id.clone(), + venue_id: req.venue_id.clone(), + instrument: req.instrument.clone(), + side: req.side, + order_type: req.order_type, + quantity: req.quantity, + filled_quantity: if ack.status == OrderStatus::Filled { + req.quantity + } else { + 0.0 + }, + avg_fill_price: req.limit_price, + status: ack.status, + created_at_ms: now, + updated_at_ms: now, + message: ack.reason.clone(), + }; + if let Err(err) = sqlite_upsert_order(&db_path, &summary) { + warn!("Failed to persist order summary: {:#}", err); + } + if ack.status == OrderStatus::Filled { + let fill_id = format!("{}:{}", ack.venue_order_id, now); + let fill = json!({ + "fill_id": fill_id, + "venue_order_id": ack.venue_order_id, + "client_order_id": req.client_order_id.clone(), + "strategy_id": req.strategy_id.clone(), + "venue_id": req.venue_id.clone(), + "symbol": req.instrument.symbol.clone(), + "quantity": req.quantity, + "price": req.limit_price, + "ts_ms": now, + }); + if let Err(err) = sqlite_upsert_fill(&db_path, &fill_id, &fill) { + warn!("Failed to persist fill: {:#}", err); + } + } + + let mut state = state.lock().await; + state.last_command_at_ms = now; + state.risk_snapshot.orders_last_minute = + state.risk_snapshot.orders_last_minute.saturating_add(1); + state.risk_snapshot.total_notional_cents = state + .risk_snapshot + .total_notional_cents + .saturating_add(requested_notional_cents); + let venue_total = state + .risk_snapshot + .venue_notional_cents + .entry(req.venue_id.clone()) + .or_insert(0); + *venue_total = venue_total.saturating_add(requested_notional_cents); + let strategy_total = state + .risk_snapshot + .strategy_canary_notional + .entry(req.strategy_id.clone()) + .or_insert(0); + *strategy_total = strategy_total.saturating_add(requested_notional_cents); + if ack.status == OrderStatus::Filled { + state.risk_snapshot.open_positions = + state.risk_snapshot.open_positions.saturating_add(1); + } + push_event( + &mut state, + Event::Execution { + venue: req.venue_id, + strategy_id: req.strategy_id, + symbol: req.instrument.symbol, + action: order_side_label(req.side), + status: order_status_label(ack.status), + latency_ms: 0, + }, + ); + persist_sqlite_runtime_snapshot(&state); + + Envelope::response_to( + request, + json!({ + "ok": true, + "ack": { + "venue_order_id": ack.venue_order_id, + "accepted": ack.accepted, + "status": order_status_label(ack.status), + "reason": ack.reason, + } + }), + ) + } + Err(err) => Envelope::response_to( + request, + json!({ "ok": false, "error": err.message, "code": err.code }), + ), + } + } + OrderCommand::Cancel => { + let payload: OrderCancelPayload = match parse_payload(&request.payload) { + Ok(p) => p, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + let adapters: Vec> = { + let state = state.lock().await; + if let Some(venue_id) = payload.venue_id.as_ref() { + state + .adapters + .get(venue_id) + .cloned() + .map(|a| vec![a]) + .unwrap_or_default() + } else { + state.adapters.values().cloned().collect() + } + }; + + for adapter in adapters { + if adapter.cancel_order(&payload.venue_order_id).await.is_ok() { + return Envelope::response_to( + request, + json!({ "ok": true, "venue_order_id": payload.venue_order_id }), + ); + } + } + + Envelope::response_to( + request, + json!({ "ok": false, "error": "order not found in enabled venues" }), + ) + } + OrderCommand::List => { + let payload: OptionalVenuePayload = + parse_payload(&request.payload).unwrap_or(OptionalVenuePayload { venue_id: None }); + + let adapters: Vec> = { + let state = state.lock().await; + match payload.venue_id.as_ref() { + Some(venue_id) => state + .adapters + .get(venue_id) + .cloned() + .map(|a| vec![a]) + .unwrap_or_default(), + None => state.adapters.values().cloned().collect(), + } + }; + + let mut orders: Vec = Vec::new(); + for adapter in adapters { + if let Ok(adapter_orders) = adapter.list_orders().await { + orders.extend( + adapter_orders + .into_iter() + .map(|o| order_summary_to_payload(&o)), + ); + } + } + orders.sort_by(|a, b| b.updated_at_ms.cmp(&a.updated_at_ms)); + + Envelope::response_to(request, json!({ "ok": true, "orders": orders })) + } + } +} + +async fn process_execution_mode_request( + request: &Envelope, + state: &Arc>, + command: ExecutionModeCommand, +) -> Envelope { + let payload: ExecutionModePayload = match parse_payload(&request.payload) { + Ok(p) => p, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + let market_type = match parse_market_type(&payload.market_type) { + Ok(v) => v, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + let mut state = state.lock().await; + let venue = match state.venues.get(&payload.venue_id) { + Some(v) => v.clone(), + None => { + return Envelope::response_to( + request, + json!({"ok": false, "error": format!("Unknown venue '{}'", payload.venue_id)}), + ); + } + }; + + if !venue.market_types.contains(&market_type) { + return Envelope::response_to( + request, + json!({"ok": false, "error": "venue does not support requested market_type"}), + ); + } + + let key = mode_key(&payload.venue_id, market_type); + match command { + ExecutionModeCommand::Get => { + let mode = state + .execution_modes + .get(&key) + .copied() + .unwrap_or(ExecutionMode::Paper); + Envelope::response_to( + request, + json!({ + "ok": true, + "execution_mode": { + "venue_id": payload.venue_id, + "market_type": market_type_label(market_type), + "mode": execution_mode_label(mode), + } + }), + ) + } + ExecutionModeCommand::Set => { + let mode = match parse_execution_mode(&payload.mode) { + Ok(v) => v, + Err(err) => { + return Envelope::response_to(request, json!({"ok": false, "error": err})); + } + }; + + if mode == ExecutionMode::Live && (venue.paper_only || !venue.live_enabled) { + return Envelope::response_to( + request, + json!({"ok": false, "error": "venue cannot run in live mode"}), + ); + } + + state.execution_modes.insert(key, mode); + refresh_adapter_for_venue(&mut state, &payload.venue_id); + if let Err(err) = + sqlite_upsert_execution_mode(&state.db_path, &payload.venue_id, market_type, mode) + { + warn!("Failed to persist execution mode: {:#}", err); + } + + Envelope::response_to( + request, + json!({ + "ok": true, + "execution_mode": { + "venue_id": payload.venue_id, + "market_type": market_type_label(market_type), + "mode": execution_mode_label(mode), + } + }), + ) + } + } +} + fn parse_payload(payload: &serde_json::Value) -> std::result::Result where T: DeserializeOwned, @@ -1062,6 +2323,39 @@ fn engine_health_event(state: &EngineState) -> Event { fn status_response(request: &Envelope, state: &EngineState) -> Envelope { let recent_events: Vec<_> = state.recent_events.iter().cloned().collect(); + let mut venues: Vec<_> = state + .venues + .values() + .map(|venue| venue_summary_from_state(state, venue)) + .collect(); + venues.sort_by(|a, b| a.venue_id.cmp(&b.venue_id)); + + let mut execution_modes: Vec<_> = state + .execution_modes + .iter() + .filter_map(|(key, mode)| { + let mut parts = key.split(':'); + let venue_id = parts.next()?.to_string(); + let market_type = parts.next()?.to_ascii_lowercase(); + Some(json!({ + "venue_id": venue_id, + "market_type": market_type, + "mode": execution_mode_label(*mode), + })) + }) + .collect(); + execution_modes.sort_by(|a, b| { + let a_venue = a + .get("venue_id") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + let b_venue = b + .get("venue_id") + .and_then(|v| v.as_str()) + .unwrap_or_default(); + a_venue.cmp(b_venue) + }); + Envelope::response_to( request, json!({ @@ -1072,6 +2366,8 @@ fn status_response(request: &Envelope, state: &EngineState) -> Envelope { "state": risk_state_payload(state), }, "strategies": strategy_summaries(state), + "venues": venues, + "execution_modes": execution_modes, "recent_events": recent_events, }), ) @@ -1090,6 +2386,12 @@ fn push_event(state: &mut EngineState, event: Event) { while state.recent_events.len() > MAX_RECENT_EVENTS { state.recent_events.pop_front(); } + + if let Some(last) = state.recent_events.back() { + if let Err(err) = sqlite_append_event(&state.db_path, last) { + warn!("Failed to persist event journal record: {:#}", err); + } + } } #[cfg(test)] @@ -1136,10 +2438,24 @@ mod tests { ) } + fn unique_db_path(label: &str) -> String { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time") + .as_nanos(); + format!( + "{}/trading-daemon-test-{}-{}.sqlite3", + std::env::temp_dir().display(), + label, + nanos + ) + } + #[test] fn promotion_updates_snapshot_and_preserves_enable_state() { let state_path = unique_state_path("promotion-preserve-enable"); - let mut state = initial_engine_state(state_path, 0); + let db_path = unique_db_path("promotion-preserve-enable"); + let mut state = initial_engine_state(state_path, db_path, 0); let strategy_id = "kalshi.arbitrage"; { @@ -1177,7 +2493,8 @@ mod tests { #[test] fn promotion_rejects_expired_candidate_when_ttl_enabled() { let state_path = unique_state_path("promotion-ttl"); - let mut state = initial_engine_state(state_path, 100); + let db_path = unique_db_path("promotion-ttl"); + let mut state = initial_engine_state(state_path, db_path, 100); let strategy_id = "kalshi.market_making"; state @@ -1195,7 +2512,8 @@ mod tests { #[test] fn promotion_rejects_hash_mismatch() { let state_path = unique_state_path("promotion-hash"); - let mut state = initial_engine_state(state_path, 0); + let db_path = unique_db_path("promotion-hash"); + let mut state = initial_engine_state(state_path, db_path, 0); let strategy_id = "core.mean_reversion"; state @@ -1213,7 +2531,8 @@ mod tests { #[test] fn engine_helpers_keep_pause_flags_in_sync_and_distinguish_stop_pause() { let state_path = unique_state_path("engine-state"); - let mut state = initial_engine_state(state_path, 0); + let db_path = unique_db_path("engine-state"); + let mut state = initial_engine_state(state_path, db_path, 0); apply_start(&mut state); assert_eq!(state.running, true); @@ -1253,7 +2572,8 @@ mod tests { #[test] fn snapshot_roundtrip_restores_strategy_state() { let state_path = unique_state_path("snapshot-roundtrip"); - let mut state = initial_engine_state(state_path.clone(), 0); + let db_path = unique_db_path("snapshot-roundtrip"); + let mut state = initial_engine_state(state_path.clone(), db_path.clone(), 0); let strategy_id = "core.momentum"; { @@ -1272,7 +2592,7 @@ mod tests { let snapshot = strategy_snapshot_from_state(&state); save_strategy_snapshot(&state_path, &snapshot).expect("snapshot should save"); - let loaded = initial_engine_state(state_path.clone(), 0); + let loaded = initial_engine_state(state_path.clone(), db_path.clone(), 0); let loaded_strategy = loaded .strategies .get(strategy_id) @@ -1293,5 +2613,6 @@ mod tests { ); let _ = std::fs::remove_file(state_path); + let _ = std::fs::remove_file(db_path); } } diff --git a/crates/trading_domain/Cargo.toml b/crates/trading_domain/Cargo.toml new file mode 100644 index 00000000..866cf833 --- /dev/null +++ b/crates/trading_domain/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "trading_domain" +version = "0.1.0" +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } diff --git a/crates/trading_domain/src/lib.rs b/crates/trading_domain/src/lib.rs new file mode 100644 index 00000000..a03b40c8 --- /dev/null +++ b/crates/trading_domain/src/lib.rs @@ -0,0 +1,173 @@ +//! Canonical multi-asset trading domain types shared across the trading engine. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum AssetClass { + Prediction, + Crypto, + Equity, + Forex, + Derivative, + Unknown, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MarketType { + Spot, + Binary, + Perpetual, + Futures, + Option, + Unknown, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum OptionType { + Call, + Put, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct InstrumentId { + pub venue_id: String, + pub symbol: String, + pub asset_class: AssetClass, + pub market_type: MarketType, + pub expiry_ts_ms: Option, + pub strike: Option, + pub option_type: Option, + #[serde(default)] + pub metadata: BTreeMap, +} + +impl InstrumentId { + pub fn key(&self) -> String { + format!("{}:{}", self.venue_id, self.symbol) + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum OrderSide { + Buy, + Sell, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum OrderType { + Limit, + Market, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum TimeInForce { + Gtc, + Ioc, + Fok, + Day, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ExecutionMode { + Paper, + Live, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum OrderStatus { + Pending, + Accepted, + Filled, + Canceled, + Rejected, + Failed, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct OrderRequest { + pub venue_id: String, + pub strategy_id: String, + pub client_order_id: String, + pub instrument: InstrumentId, + pub side: OrderSide, + pub order_type: OrderType, + pub quantity: f64, + pub limit_price: Option, + pub tif: Option, + pub post_only: bool, + pub reduce_only: bool, + #[serde(default)] + pub metadata: BTreeMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct OrderAck { + pub venue_order_id: String, + pub accepted: bool, + pub status: OrderStatus, + pub reason: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct OrderSummary { + pub venue_order_id: String, + pub client_order_id: String, + pub strategy_id: String, + pub venue_id: String, + pub instrument: InstrumentId, + pub side: OrderSide, + pub order_type: OrderType, + pub quantity: f64, + pub filled_quantity: f64, + pub avg_fill_price: Option, + pub status: OrderStatus, + pub created_at_ms: i64, + pub updated_at_ms: i64, + pub message: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PositionSnapshot { + pub venue_id: String, + pub instrument: InstrumentId, + pub quantity: f64, + pub avg_price: f64, + pub mark_price: Option, + pub unrealized_pnl: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct BalanceSnapshot { + pub venue_id: String, + pub asset: String, + pub total: f64, + pub available: f64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct VenueCapability { + pub market_type: MarketType, + pub supports_live: bool, + pub supports_paper: bool, + pub supports_post_only: bool, + pub supports_reduce_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct VenueHealth { + pub venue_id: String, + pub healthy: bool, + pub connected_market_data: bool, + pub connected_trading: bool, + pub message: Option, +} diff --git a/crates/trading_protocol/src/lib.rs b/crates/trading_protocol/src/lib.rs index 82b472bb..3be06d42 100644 --- a/crates/trading_protocol/src/lib.rs +++ b/crates/trading_protocol/src/lib.rs @@ -176,12 +176,128 @@ impl RiskCommand { } } +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum VenueCommand { + #[serde(rename = "Venue.List")] + List, + #[serde(rename = "Venue.Enable")] + Enable, + #[serde(rename = "Venue.Disable")] + Disable, + #[serde(rename = "Venue.Status")] + Status, +} + +impl VenueCommand { + pub fn as_kind(self) -> &'static str { + match self { + Self::List => "Venue.List", + Self::Enable => "Venue.Enable", + Self::Disable => "Venue.Disable", + Self::Status => "Venue.Status", + } + } + + pub fn from_kind(kind: &str) -> Option { + match kind { + "Venue.List" => Some(Self::List), + "Venue.Enable" => Some(Self::Enable), + "Venue.Disable" => Some(Self::Disable), + "Venue.Status" => Some(Self::Status), + _ => None, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum PortfolioCommand { + #[serde(rename = "Portfolio.Balances")] + Balances, + #[serde(rename = "Portfolio.Positions")] + Positions, +} + +impl PortfolioCommand { + pub fn as_kind(self) -> &'static str { + match self { + Self::Balances => "Portfolio.Balances", + Self::Positions => "Portfolio.Positions", + } + } + + pub fn from_kind(kind: &str) -> Option { + match kind { + "Portfolio.Balances" => Some(Self::Balances), + "Portfolio.Positions" => Some(Self::Positions), + _ => None, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum OrderCommand { + #[serde(rename = "Order.Submit")] + Submit, + #[serde(rename = "Order.Cancel")] + Cancel, + #[serde(rename = "Order.List")] + List, +} + +impl OrderCommand { + pub fn as_kind(self) -> &'static str { + match self { + Self::Submit => "Order.Submit", + Self::Cancel => "Order.Cancel", + Self::List => "Order.List", + } + } + + pub fn from_kind(kind: &str) -> Option { + match kind { + "Order.Submit" => Some(Self::Submit), + "Order.Cancel" => Some(Self::Cancel), + "Order.List" => Some(Self::List), + _ => None, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)] +pub enum ExecutionModeCommand { + #[serde(rename = "ExecutionMode.Set")] + Set, + #[serde(rename = "ExecutionMode.Get")] + Get, +} + +impl ExecutionModeCommand { + pub fn as_kind(self) -> &'static str { + match self { + Self::Set => "ExecutionMode.Set", + Self::Get => "ExecutionMode.Get", + } + } + + pub fn from_kind(kind: &str) -> Option { + match kind { + "ExecutionMode.Set" => Some(Self::Set), + "ExecutionMode.Get" => Some(Self::Get), + _ => None, + } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RequestKind { Control(ControlCommand), Engine(EngineCommand), Strategy(StrategyCommand), Risk(RiskCommand), + Venue(VenueCommand), + Portfolio(PortfolioCommand), + Order(OrderCommand), + ExecutionMode(ExecutionModeCommand), } impl RequestKind { @@ -195,7 +311,19 @@ impl RequestKind { if let Some(cmd) = StrategyCommand::from_kind(kind) { return Some(Self::Strategy(cmd)); } - RiskCommand::from_kind(kind).map(Self::Risk) + if let Some(cmd) = RiskCommand::from_kind(kind) { + return Some(Self::Risk(cmd)); + } + if let Some(cmd) = VenueCommand::from_kind(kind) { + return Some(Self::Venue(cmd)); + } + if let Some(cmd) = PortfolioCommand::from_kind(kind) { + return Some(Self::Portfolio(cmd)); + } + if let Some(cmd) = OrderCommand::from_kind(kind) { + return Some(Self::Order(cmd)); + } + ExecutionModeCommand::from_kind(kind).map(Self::ExecutionMode) } } @@ -268,6 +396,97 @@ pub struct RiskOverridePayload { pub value: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VenueTogglePayload { + pub venue_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VenueSummaryPayload { + pub venue_id: String, + pub enabled: bool, + pub market_types: Vec, + pub healthy: bool, + pub live_enabled: bool, + pub paper_only: bool, + pub message: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct InstrumentRefPayload { + pub venue_id: String, + pub symbol: String, + pub asset_class: String, + pub market_type: String, + pub expiry_ts_ms: Option, + pub strike: Option, + pub option_type: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PortfolioBalancePayload { + pub venue_id: String, + pub asset: String, + pub total: f64, + pub available: f64, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct PortfolioPositionPayload { + pub venue_id: String, + pub instrument: InstrumentRefPayload, + pub quantity: f64, + pub avg_price: f64, + pub mark_price: Option, + pub unrealized_pnl: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct OrderSubmitPayload { + pub strategy_id: String, + pub venue_id: String, + pub instrument: InstrumentRefPayload, + pub side: String, + pub order_type: String, + pub quantity: f64, + pub limit_price: Option, + pub tif: Option, + pub post_only: bool, + pub reduce_only: bool, + pub client_order_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct OrderCancelPayload { + pub venue_id: Option, + pub venue_order_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct OrderListItemPayload { + pub venue_order_id: String, + pub client_order_id: String, + pub strategy_id: String, + pub venue_id: String, + pub instrument: InstrumentRefPayload, + pub side: String, + pub order_type: String, + pub quantity: f64, + pub filled_quantity: f64, + pub avg_fill_price: Option, + pub status: String, + pub created_at_ms: i64, + pub updated_at_ms: i64, + pub message: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ExecutionModePayload { + pub venue_id: String, + pub market_type: String, + pub mode: String, +} + #[derive(Serialize, Deserialize, Debug)] pub enum Event { #[serde(rename = "Event.Alert")] @@ -337,6 +556,22 @@ mod tests { RequestKind::from_kind("Risk.Override"), Some(RequestKind::Risk(RiskCommand::Override)) ); + assert_eq!( + RequestKind::from_kind("Venue.List"), + Some(RequestKind::Venue(VenueCommand::List)) + ); + assert_eq!( + RequestKind::from_kind("Portfolio.Balances"), + Some(RequestKind::Portfolio(PortfolioCommand::Balances)) + ); + assert_eq!( + RequestKind::from_kind("Order.Submit"), + Some(RequestKind::Order(OrderCommand::Submit)) + ); + assert_eq!( + RequestKind::from_kind("ExecutionMode.Get"), + Some(RequestKind::ExecutionMode(ExecutionModeCommand::Get)) + ); assert_eq!(RequestKind::from_kind("Unknown.Command"), None); } diff --git a/crates/tradingctl/src/main.rs b/crates/tradingctl/src/main.rs index 6d66e0a9..0f254241 100644 --- a/crates/tradingctl/src/main.rs +++ b/crates/tradingctl/src/main.rs @@ -5,7 +5,9 @@ use tokio::net::UnixStream; use tokio_util::codec::Framed; use trading_protocol::{ create_codec, CandidatePromotePayload, CandidateUploadPayload, ControlCommand, EngineCommand, - Envelope, RiskCommand, RiskOverridePayload, StrategyCommand, DEFAULT_SOCKET_PATH, + Envelope, ExecutionModeCommand, ExecutionModePayload, InstrumentRefPayload, OrderCancelPayload, + OrderCommand, OrderSubmitPayload, PortfolioCommand, RiskCommand, RiskOverridePayload, + StrategyCommand, VenueCommand, VenueTogglePayload, DEFAULT_SOCKET_PATH, }; #[derive(Parser)] @@ -92,6 +94,84 @@ enum Commands { #[arg(long)] value: Option, }, + /// Send Venue.List command + VenueList, + /// Send Venue.Enable command + VenueEnable { + #[arg(long)] + venue_id: String, + }, + /// Send Venue.Disable command + VenueDisable { + #[arg(long)] + venue_id: String, + }, + /// Send Venue.Status command + VenueStatus { + #[arg(long)] + venue_id: String, + }, + /// Send Portfolio.Balances command + PortfolioBalances, + /// Send Portfolio.Positions command + PortfolioPositions, + /// Send Order.Submit command + OrderSubmit { + #[arg(long)] + strategy_id: String, + #[arg(long)] + venue_id: String, + #[arg(long)] + symbol: String, + #[arg(long)] + asset_class: String, + #[arg(long)] + market_type: String, + #[arg(long)] + side: String, + #[arg(long)] + order_type: String, + #[arg(long)] + quantity: f64, + #[arg(long)] + limit_price: Option, + #[arg(long)] + tif: Option, + #[arg(long, default_value_t = false)] + post_only: bool, + #[arg(long, default_value_t = false)] + reduce_only: bool, + #[arg(long)] + client_order_id: String, + }, + /// Send Order.Cancel command + OrderCancel { + #[arg(long)] + venue_order_id: String, + #[arg(long)] + venue_id: Option, + }, + /// Send Order.List command + OrderList { + #[arg(long)] + venue_id: Option, + }, + /// Send ExecutionMode.Get command + ExecutionModeGet { + #[arg(long)] + venue_id: String, + #[arg(long)] + market_type: String, + }, + /// Send ExecutionMode.Set command + ExecutionModeSet { + #[arg(long)] + venue_id: String, + #[arg(long)] + market_type: String, + #[arg(long)] + mode: String, + }, /// Send a raw JSON command Raw { #[arg(short, long)] @@ -198,6 +278,103 @@ async fn main() -> Result<()> { serde_json::to_value(payload)?, ) } + Commands::VenueList => (VenueCommand::List.as_kind(), serde_json::json!({})), + Commands::VenueEnable { venue_id } => ( + VenueCommand::Enable.as_kind(), + serde_json::to_value(VenueTogglePayload { venue_id })?, + ), + Commands::VenueDisable { venue_id } => ( + VenueCommand::Disable.as_kind(), + serde_json::to_value(VenueTogglePayload { venue_id })?, + ), + Commands::VenueStatus { venue_id } => ( + VenueCommand::Status.as_kind(), + serde_json::to_value(VenueTogglePayload { venue_id })?, + ), + Commands::PortfolioBalances => { + (PortfolioCommand::Balances.as_kind(), serde_json::json!({})) + } + Commands::PortfolioPositions => { + (PortfolioCommand::Positions.as_kind(), serde_json::json!({})) + } + Commands::OrderSubmit { + strategy_id, + venue_id, + symbol, + asset_class, + market_type, + side, + order_type, + quantity, + limit_price, + tif, + post_only, + reduce_only, + client_order_id, + } => { + let payload = OrderSubmitPayload { + strategy_id, + venue_id: venue_id.clone(), + instrument: InstrumentRefPayload { + venue_id, + symbol, + asset_class, + market_type, + expiry_ts_ms: None, + strike: None, + option_type: None, + }, + side, + order_type, + quantity, + limit_price, + tif, + post_only, + reduce_only, + client_order_id, + }; + ( + OrderCommand::Submit.as_kind(), + serde_json::to_value(payload)?, + ) + } + Commands::OrderCancel { + venue_order_id, + venue_id, + } => ( + OrderCommand::Cancel.as_kind(), + serde_json::to_value(OrderCancelPayload { + venue_id, + venue_order_id, + })?, + ), + Commands::OrderList { venue_id } => ( + OrderCommand::List.as_kind(), + serde_json::json!({ "venue_id": venue_id }), + ), + Commands::ExecutionModeGet { + venue_id, + market_type, + } => ( + ExecutionModeCommand::Get.as_kind(), + serde_json::to_value(ExecutionModePayload { + venue_id, + market_type, + mode: String::new(), + })?, + ), + Commands::ExecutionModeSet { + venue_id, + market_type, + mode, + } => ( + ExecutionModeCommand::Set.as_kind(), + serde_json::to_value(ExecutionModePayload { + venue_id, + market_type, + mode, + })?, + ), Commands::Raw { json } => ("Control.Raw", serde_json::from_str(&json)?), }; diff --git a/trading-cli b/trading-cli index b5a10fd9..1585f988 100755 --- a/trading-cli +++ b/trading-cli @@ -25,6 +25,9 @@ KNOWN_COMMANDS=( clawdbot-trading-status clawdbot-trading-start clawdbot-trading-stop + clawdbot-trading-venue-list + clawdbot-trading-portfolio-balances + clawdbot-trading-portfolio-positions dev down status @@ -45,12 +48,14 @@ Commands: ./trading-cli temporal [up|down|status|logs|ui|list|describe|show] # temporal debugger/server orchestrator ./trading-cli observability [up|down|status|logs|ui] [service] # observability stack orchestrator ./trading-cli sports-agent [go|up|down|status|logs|approve|execute|cancel|hard-cancel|stop-service] [--mode ] [--dry-run] - ./trading-cli clawdbot-trading [up|down|ps|logs|ping|status|start|stop] [service] + ./trading-cli clawdbot-trading [up|down|ps|logs|ping|status|start|stop|venue-list|venue-status|venue-enable|venue-disable|portfolio-balances|portfolio-positions|order-list|order-cancel|order-submit|execution-mode-get|execution-mode-set] [args...] # OpenClaw + Rust trading bridge controls (up checks required service health) ./trading-cli clawdbot-trading-up|clawdbot-trading-down|clawdbot-trading-ps # convenience aliases for compose lifecycle ./trading-cli clawdbot-trading-ping|clawdbot-trading-status|clawdbot-trading-start|clawdbot-trading-stop # convenience aliases for tradingctl + ./trading-cli clawdbot-trading-venue-list|clawdbot-trading-portfolio-balances|clawdbot-trading-portfolio-positions + # convenience aliases for additive multi-asset controls # note: openclaw-cli is an on-demand helper container and may be Exited ./trading-cli dev # launch weather+arbitrage+llm-workflow (bg) ./trading-cli down # stop all background services @@ -93,6 +98,9 @@ Examples: ./trading-cli clawdbot-trading status ./trading-cli clawdbot-trading start ./trading-cli clawdbot-trading stop + ./trading-cli clawdbot-trading venue-list + ./trading-cli clawdbot-trading portfolio-balances + ./trading-cli clawdbot-trading execution-mode-get --venue-id kalshi --market-type binary ./trading-cli clawdbot-trading down ./trading-cli sports-agent up --mode hitl ./trading-cli sports-agent execute @@ -553,9 +561,12 @@ run_clawdbot_trading() { fi "$docker_bin" compose -f "$compose_file" --project-directory "$ROOT_DIR" exec trading-daemon tradingctl "$action" ;; + venue-list|venue-enable|venue-disable|venue-status|portfolio-balances|portfolio-positions|order-submit|order-cancel|order-list|execution-mode-get|execution-mode-set) + "$docker_bin" compose -f "$compose_file" --project-directory "$ROOT_DIR" exec trading-daemon tradingctl "$action" "$@" + ;; *) echo "Unknown clawdbot-trading action: $action" >&2 - echo "Use: ./trading-cli clawdbot-trading [up|down|ps|logs|ping|status|start|stop]" >&2 + echo "Use: ./trading-cli clawdbot-trading [up|down|ps|logs|ping|status|start|stop|venue-list|venue-status|venue-enable|venue-disable|portfolio-balances|portfolio-positions|order-list|order-cancel|order-submit|execution-mode-get|execution-mode-set]" >&2 exit 1 ;; esac @@ -1428,6 +1439,15 @@ main() { clawdbot-trading-stop) run_clawdbot_trading stop "$@" ;; + clawdbot-trading-venue-list) + run_clawdbot_trading venue-list "$@" + ;; + clawdbot-trading-portfolio-balances) + run_clawdbot_trading portfolio-balances "$@" + ;; + clawdbot-trading-portfolio-positions) + run_clawdbot_trading portfolio-positions "$@" + ;; dev) run_dev ;; From d7ed0dea5fea06ecc7b1d2afb4bc24ac838922de Mon Sep 17 00:00:00 2001 From: Jaden Fix Date: Mon, 16 Feb 2026 21:10:39 -0800 Subject: [PATCH 2/3] Fix default execution modes for non-paper venues --- crates/trading_daemon/src/main.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/crates/trading_daemon/src/main.rs b/crates/trading_daemon/src/main.rs index 7edcd661..f4b93e06 100644 --- a/crates/trading_daemon/src/main.rs +++ b/crates/trading_daemon/src/main.rs @@ -396,7 +396,7 @@ fn default_execution_modes(venues: &HashMap) -> HashMap Date: Mon, 16 Feb 2026 21:30:35 -0800 Subject: [PATCH 3/3] Align derivatives adapter mode wiring and harden bridge numeric validation --- .../extensions/trading-bridge/src/index.ts | 14 +++++--- crates/exchange_derivatives_paper/src/lib.rs | 34 +++++++++++++++++-- crates/trading_daemon/src/main.rs | 13 +++++-- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/.openclaw/extensions/trading-bridge/src/index.ts b/.openclaw/extensions/trading-bridge/src/index.ts index 97a56a70..1e42b6d4 100644 --- a/.openclaw/extensions/trading-bridge/src/index.ts +++ b/.openclaw/extensions/trading-bridge/src/index.ts @@ -341,8 +341,8 @@ function asString(value: unknown, field: string): string { } function asNumber(value: unknown, field: string): number { - if (typeof value !== "number" || Number.isNaN(value)) { - throw new Error(`Missing required field '${field}'`); + if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) { + throw new Error(`Missing or invalid positive numeric field '${field}'`); } return value; } @@ -1170,14 +1170,18 @@ export default function (api: any) { symbol: asString(input.symbol, "symbol"), asset_class: asString(input.asset_class, "asset_class"), market_type: asString(input.market_type, "market_type"), - expiry_ts_ms: typeof input.expiry_ts_ms === "number" ? input.expiry_ts_ms : null, - strike: typeof input.strike === "number" ? input.strike : null, + expiry_ts_ms: + typeof input.expiry_ts_ms === "number" && Number.isFinite(input.expiry_ts_ms) + ? input.expiry_ts_ms + : null, + strike: typeof input.strike === "number" && Number.isFinite(input.strike) ? input.strike : null, option_type: typeof input.option_type === "string" ? input.option_type : null, }, side: asString(input.side, "side"), order_type: asString(input.order_type, "order_type"), quantity: asNumber(input.quantity, "quantity"), - limit_price: typeof input.limit_price === "number" ? input.limit_price : null, + limit_price: + typeof input.limit_price === "number" && Number.isFinite(input.limit_price) ? input.limit_price : null, tif: typeof input.tif === "string" ? input.tif : null, post_only: typeof input.post_only === "boolean" ? input.post_only : false, reduce_only: typeof input.reduce_only === "boolean" ? input.reduce_only : false, diff --git a/crates/exchange_derivatives_paper/src/lib.rs b/crates/exchange_derivatives_paper/src/lib.rs index d03e11fc..b07b3759 100644 --- a/crates/exchange_derivatives_paper/src/lib.rs +++ b/crates/exchange_derivatives_paper/src/lib.rs @@ -19,18 +19,29 @@ struct AdapterState { #[derive(Debug)] pub struct DerivativesPaperAdapter { + mode: ExecutionMode, state: RwLock, } impl Default for DerivativesPaperAdapter { fn default() -> Self { + Self::new(ExecutionMode::Paper) + } +} + +impl DerivativesPaperAdapter { + pub fn new(mode: ExecutionMode) -> Self { + // This venue is intentionally paper-only even when runtime state is tampered. + let mode = match mode { + ExecutionMode::Paper => ExecutionMode::Paper, + ExecutionMode::Live => ExecutionMode::Paper, + }; Self { + mode, state: RwLock::new(AdapterState::default()), } } -} -impl DerivativesPaperAdapter { fn validate_order(req: &OrderRequest) -> Result<(), ExchangeError> { if !matches!( req.instrument.market_type, @@ -87,7 +98,7 @@ impl ExchangeAdapter for DerivativesPaperAdapter { } fn execution_mode(&self) -> ExecutionMode { - ExecutionMode::Paper + self.mode } async fn connect_market_data(&self) -> Result<(), ExchangeError> { @@ -206,3 +217,20 @@ pub fn default_perp_instrument(symbol: &str) -> InstrumentId { metadata: Default::default(), } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn adapter_clamps_live_mode_to_paper() { + let adapter = DerivativesPaperAdapter::new(ExecutionMode::Live); + assert_eq!(adapter.execution_mode(), ExecutionMode::Paper); + } + + #[test] + fn adapter_default_mode_is_paper() { + let adapter = DerivativesPaperAdapter::default(); + assert_eq!(adapter.execution_mode(), ExecutionMode::Paper); + } +} diff --git a/crates/trading_daemon/src/main.rs b/crates/trading_daemon/src/main.rs index df085d13..3dfeed20 100644 --- a/crates/trading_daemon/src/main.rs +++ b/crates/trading_daemon/src/main.rs @@ -416,6 +416,10 @@ fn default_adapters( .get(&mode_key("coinbase_spot", MarketType::Spot)) .copied() .unwrap_or(ExecutionMode::Paper); + let derivatives_mode = execution_modes + .get(&mode_key("derivatives_paper", MarketType::Perpetual)) + .copied() + .unwrap_or(ExecutionMode::Paper); [ ( @@ -428,7 +432,7 @@ fn default_adapters( ), ( "derivatives_paper".to_string(), - Arc::new(DerivativesPaperAdapter::default()) as Arc, + Arc::new(DerivativesPaperAdapter::new(derivatives_mode)) as Arc, ), ] .into_iter() @@ -1480,9 +1484,14 @@ fn refresh_adapter_for_venue(state: &mut EngineState, venue_id: &str) { ); } "derivatives_paper" => { + let mode = state + .execution_modes + .get(&mode_key("derivatives_paper", MarketType::Perpetual)) + .copied() + .unwrap_or(ExecutionMode::Paper); state.adapters.insert( "derivatives_paper".to_string(), - Arc::new(DerivativesPaperAdapter::default()), + Arc::new(DerivativesPaperAdapter::new(mode)), ); } _ => {}