Codex/multi asset autonomous#22
Conversation
Greptile SummaryThis PR implements multi-asset autonomous trading execution across spot, derivatives, and prediction markets. The architecture shifts from venue-specific adapters to a unified adapter-based routing system with paper and live execution modes. Major changes:
Critical security issue: Confidence Score: 2/5
|
| Filename | Overview |
|---|---|
| crates/coinbase_at_adapter/src/lib.rs | new Coinbase Advanced Trade adapter with critical security issue (credentials exposed via curl args) and error handling concerns |
| crates/paper_exchange_adapter/src/lib.rs | new paper trading simulator with deterministic pricing and balance tracking, fills orders instantly for testing |
| crates/exchange_core/src/lib.rs | multi-asset instrument definitions added (AssetClass, InstrumentType, OptionRight) to support derivatives and various asset classes |
| crates/risk_core/src/lib.rs | scoped kill switches added for venue and asset-class level risk controls, venue/asset class notional limits implemented |
| crates/trading_daemon/src/main.rs | major refactor to adapter-based routing (paper vs live), journal-based persistence, portfolio sync reconciliation loops |
| .openclaw/extensions/trading-bridge/src/index.ts | unified trading_hft tool with confidence gates for high-impact actions, multi-asset execution parameters |
Flowchart
flowchart TD
A[Trading Bridge] -->|trading_hft action| B[Trading Daemon]
B --> C{Engine Mode?}
C -->|paper| D[Paper Adapter]
C -->|hitl_live/auto_live| E{Coinbase Available?}
E -->|Yes| F[Coinbase AT Adapter]
E -->|No| G[Fallback to Paper]
F --> H[curl shell-out]
H --> I[Coinbase API]
D --> J[Simulated Fills]
B --> K[Risk Cage]
K --> L{Check Kill Switches}
L -->|Global| M[Block All]
L -->|Venue Scoped| N[Block Venue]
L -->|Strategy Scoped| O[Block Strategy]
L -->|Pass| P{Check Limits}
P -->|Notional| Q[Check Total/Venue/AssetClass]
P -->|Rate| R[Check Orders/Min]
P -->|Drawdown| S[Check Max DD]
Q --> T{Allow?}
R --> T
S --> T
T -->|Yes| U[Route to Adapter]
T -->|No| V[Reject Order]
B --> W[Portfolio Sync Loop]
W --> X[sync_positions/sync_balances]
X --> Y[Update State]
B --> Z[Journal Writer]
Z --> AA[Append-only Log]
Last reviewed commit: cd0a7f5
| fn run_curl_json( | ||
| &self, | ||
| method: &str, | ||
| path: &str, | ||
| body: Option<&Value>, | ||
| ) -> Result<Value, ExchangeError> { | ||
| let mut command = Command::new("curl"); | ||
| command.arg("-sS").arg("-X").arg(method); | ||
|
|
||
| command.arg("-H").arg("content-type: application/json"); | ||
| command | ||
| .arg("-H") | ||
| .arg(format!("CB-ACCESS-KEY: {}", self.api_key)); | ||
| command | ||
| .arg("-H") | ||
| .arg(format!("CB-ACCESS-SECRET: {}", self.api_secret)); | ||
|
|
||
| if let Some(passphrase) = &self.api_passphrase { | ||
| command | ||
| .arg("-H") | ||
| .arg(format!("CB-ACCESS-PASSPHRASE: {}", passphrase)); | ||
| } | ||
|
|
||
| if let Some(token) = &self.bearer_token { | ||
| command | ||
| .arg("-H") | ||
| .arg(format!("Authorization: Bearer {}", token)); | ||
| } | ||
|
|
||
| if let Some(body) = body { | ||
| let encoded = serde_json::to_string(body).map_err(|e| { | ||
| ExchangeError::new("json_encode", format!("failed to encode body: {e}"), false) | ||
| })?; | ||
| command.arg("--data").arg(encoded); | ||
| } | ||
|
|
||
| command.arg(format!("{}{}", self.api_base, path)); | ||
|
|
||
| let output = command.output().map_err(|e| { |
There was a problem hiding this comment.
passing API credentials via command-line arguments to curl exposes secrets in process list
| fn run_curl_json( | |
| &self, | |
| method: &str, | |
| path: &str, | |
| body: Option<&Value>, | |
| ) -> Result<Value, ExchangeError> { | |
| let mut command = Command::new("curl"); | |
| command.arg("-sS").arg("-X").arg(method); | |
| command.arg("-H").arg("content-type: application/json"); | |
| command | |
| .arg("-H") | |
| .arg(format!("CB-ACCESS-KEY: {}", self.api_key)); | |
| command | |
| .arg("-H") | |
| .arg(format!("CB-ACCESS-SECRET: {}", self.api_secret)); | |
| if let Some(passphrase) = &self.api_passphrase { | |
| command | |
| .arg("-H") | |
| .arg(format!("CB-ACCESS-PASSPHRASE: {}", passphrase)); | |
| } | |
| if let Some(token) = &self.bearer_token { | |
| command | |
| .arg("-H") | |
| .arg(format!("Authorization: Bearer {}", token)); | |
| } | |
| if let Some(body) = body { | |
| let encoded = serde_json::to_string(body).map_err(|e| { | |
| ExchangeError::new("json_encode", format!("failed to encode body: {e}"), false) | |
| })?; | |
| command.arg("--data").arg(encoded); | |
| } | |
| command.arg(format!("{}{}", self.api_base, path)); | |
| let output = command.output().map_err(|e| { | |
| fn run_curl_json( | |
| &self, | |
| method: &str, | |
| path: &str, | |
| body: Option<&Value>, | |
| ) -> Result<Value, ExchangeError> { | |
| // Use a proper HTTP client library (reqwest, hyper) instead of shelling out to curl | |
| // This avoids exposing credentials in process arguments |
| let venue_order_id = Self::extract_order_id(&response) | ||
| .unwrap_or_else(|| format!("cbat-{}", Uuid::new_v4().as_simple())); |
There was a problem hiding this comment.
order ID fallback generates UUID without checking actual response structure, which could mask API errors
|
@greptile |
Greptile SummaryThis PR implements multi-asset autonomous trading execution by generalizing the exchange abstraction layer and adding live Coinbase Advanced Trade integration alongside a sophisticated paper trading adapter. Key Changes
Issues Found
Architecture Improvements
Confidence Score: 3/5
|
| Filename | Overview |
|---|---|
| crates/coinbase_at_adapter/src/lib.rs | new Coinbase Advanced Trade adapter with bearer token auth, proper HTTP request handling, and order ID extraction from multiple response formats |
| crates/paper_exchange_adapter/src/lib.rs | new paper trading adapter with deterministic pricing and simulated fills, but has logic issue in position average price calculation for sells |
| crates/trading_daemon/src/main.rs | major refactor of daemon with multi-asset support, journal recovery, and portfolio sync loops, but risk evaluation missing venue/asset_class context |
| crates/exchange_core/src/lib.rs | complete redesign of exchange abstraction with multi-asset support (crypto, equity, prediction markets) and Pin<Box> trait surface |
| crates/risk_core/src/lib.rs | added scoped kill switches by venue and strategy, plus venue/asset-class notional limits to hard safety cage |
| crates/trading_protocol/src/lib.rs | extended protocol with multi-asset types, engine modes (paper/hitl_live/auto_live), and portfolio/execution commands |
Flowchart
flowchart TD
A[Trading Daemon] --> B{Engine Mode}
B -->|auto_live| C[Route by Asset Class]
B -->|hitl_live| C
B -->|paper| D[Paper Adapter]
C -->|Crypto Spot| E[Coinbase AT Adapter]
C -->|Prediction Markets| D
C -->|Perpetuals/Options| D
E --> F[Live Coinbase API]
D --> G[Simulated Execution]
H[Risk Cage] --> I{Evaluate Order}
I -->|Check Venue Limit| J[Venue Notional]
I -->|Check Asset Limit| K[Asset Class Notional]
I -->|Check Strategy Limit| L[Canary Notional]
I -->|Check Global Limit| M[Total Notional]
I -->|Check Kill Switches| N[Scoped Kills]
J --> O{Allow/Deny}
K --> O
L --> O
M --> O
N --> O
O -->|Allow| C
O -->|Deny| P[Reject Order]
Last reviewed commit: ff73941
| let old_qty = position.qty.max(0.0); | ||
| let new_qty = old_qty + req.qty; | ||
| position.avg_price = if new_qty > 0.0 { | ||
| ((position.avg_price * old_qty) + (fill_price * req.qty)) / new_qty | ||
| } else { | ||
| fill_price | ||
| }; | ||
| position.qty += req.qty; | ||
| } else { | ||
| position.qty -= req.qty; | ||
| } |
There was a problem hiding this comment.
average price calculation only considers buy side
when selling, position.avg_price is not updated, which means the weighted average price only tracks longs. For accurate PnL tracking, should track separate long/short basis or adjust avg_price on both sides
Additional Comments (1)
|
|
@greptile |
Greptile SummaryThis PR implements a multi-asset autonomous trading execution stack with significant architectural changes: Major Changes:
Key Issues:
Positive Changes:
Confidence Score: 3/5
|
| Filename | Overview |
|---|---|
| crates/trading_daemon/src/main.rs | Major refactor from SQLite to journal-based state with multi-asset routing, adds paper/live mode switching, scoped kill switches, and portfolio reconcilers |
| crates/coinbase_at_adapter/src/lib.rs | New Coinbase Advanced Trade adapter with bearer token auth, implements full ExchangeAdapter trait for live spot trading |
| crates/paper_exchange_adapter/src/lib.rs | New paper trading adapter with fixed position basis tracking that correctly handles both long and short sides |
| crates/exchange_core/src/lib.rs | Extended venue abstraction with AssetClass enum, comprehensive instrument metadata, and normalized portfolio snapshots for multi-asset trading |
| crates/risk_core/src/lib.rs | Refactored risk evaluation with venue and asset-class scoped limits, scoped kill switches replacing position/leverage checks |
| crates/trading_protocol/src/lib.rs | Protocol v3: replaces Venue/Order/ExecutionMode commands with unified Execution/Portfolio commands, adds EngineMode and scoped kill switch support |
| .openclaw/extensions/trading-bridge/src/index.ts | Extension updated with new execution actions, portfolio queries, scoped risk overrides, and HITL mode approval token validation |
Flowchart
flowchart TD
A[OpenClaw Extension<br/>trading_hft tool] -->|UDS Socket| B[Trading Daemon<br/>Main Handler]
B -->|Parse Request| C{Request Type?}
C -->|Engine Commands| D[Engine Controller<br/>Start/Stop/Pause/Mode]
C -->|Risk Commands| E[Risk Controller<br/>Kill Switches/Overrides]
C -->|Strategy Commands| F[Strategy Manager<br/>Upload/Promote]
C -->|Execution.Place| G[Execution Handler]
C -->|Portfolio Queries| H[Portfolio Handler]
G -->|Evaluate| I[HardSafetyCage<br/>Risk Evaluation]
I -->|Check Scoped Kills| J{Pass Risk?}
J -->|Deny| K[Reject Order]
J -->|Allow| L{Route Decision}
L -->|mode=paper OR<br/>venue!=coinbase_at| M[PaperExchangeAdapter]
L -->|mode!=paper AND<br/>venue=coinbase_at AND<br/>credentials present| N[CoinbaseATAdapter]
M -->|Deterministic Fill| O[Update State]
N -->|Live HTTP API| O
O -->|Journal Write| P[(Journal Files:<br/>orders.jsonl<br/>fills.jsonl)]
O -->|Snapshot Write| Q[(State File:<br/>engine-state.json)]
R[Background Reconciler] -->|Every 15s| N
R -->|Sync Portfolio| O
S[Startup Recovery] -->|Read Journals| P
S -->|Load State| Q
Last reviewed commit: 0065222
| fn write_journal_entry(data_dir: &str, stream: &str, entry: &serde_json::Value) { | ||
| let path = journal_path(data_dir, stream); | ||
| if let Some(parent) = path.parent() { | ||
| if let Err(err) = std::fs::create_dir_all(parent) { | ||
| warn!("failed to create journal dir {}: {}", parent.display(), err); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| let mut file = match OpenOptions::new().create(true).append(true).open(&path) { | ||
| Ok(file) => file, | ||
| Err(err) => { | ||
| warn!("failed to open journal {}: {}", path.display(), err); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| let line = match serde_json::to_string(entry) { | ||
| Ok(line) => line, | ||
| Err(err) => { | ||
| warn!("failed to serialize journal entry: {}", err); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| if let Err(err) = writeln!(file, "{}", line) { | ||
| warn!("failed writing journal {}: {}", path.display(), err); | ||
| } | ||
| } |
There was a problem hiding this comment.
journal writes are synchronous blocking I/O on every order placement. Consider using a buffered writer or async I/O to avoid blocking the main event loop during high-frequency trading
| fn journal_path(data_dir: &str, stream: &str) -> PathBuf { | ||
| let date_key = chrono::Utc::now().format("%Y-%m-%d").to_string(); | ||
| Path::new(data_dir) | ||
| .join("journal") | ||
| .join(format!("{}-{}.jsonl", stream, date_key)) | ||
| } |
There was a problem hiding this comment.
no journal rotation or cleanup implemented. Daily journal files will accumulate indefinitely, potentially filling disk space in long-running deployments
| async fn recover_from_todays_journals(state: &Arc<Mutex<EngineState>>) { | ||
| let (data_dir, mut orders, mut fills, mut processed) = { | ||
| let state = state.lock().await; | ||
| ( | ||
| state.data_dir.clone(), | ||
| HashMap::<String, OrderSnapshot>::new(), | ||
| Vec::<FillReport>::new(), | ||
| HashSet::<String>::new(), | ||
| ) | ||
| }; | ||
|
|
||
| let order_path = journal_path(&data_dir, "orders"); | ||
| if let Ok(file) = File::open(&order_path) { | ||
| let reader = BufReader::new(file); | ||
| for line in reader.lines().map_while(Result::ok) { | ||
| let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) else { | ||
| continue; | ||
| }; | ||
|
|
||
| if let Some(intent_id) = value.get("intent_id").and_then(serde_json::Value::as_str) { | ||
| processed.insert(intent_id.to_string()); | ||
| } | ||
|
|
||
| if let Some(order_value) = value.get("order") { | ||
| if let Ok(order) = serde_json::from_value::<OrderSnapshot>(order_value.clone()) { | ||
| orders.insert(order.venue_order_id.clone(), order); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let fill_path = journal_path(&data_dir, "fills"); | ||
| if let Ok(file) = File::open(&fill_path) { | ||
| let reader = BufReader::new(file); | ||
| let mut fill_ids = HashSet::new(); | ||
| for line in reader.lines().map_while(Result::ok) { | ||
| let Ok(value) = serde_json::from_str::<serde_json::Value>(&line) else { | ||
| continue; | ||
| }; | ||
|
|
||
| if let Some(fill_value) = value.get("fill") { | ||
| if let Ok(fill) = serde_json::from_value::<FillReport>(fill_value.clone()) { | ||
| if fill_ids.insert(fill.venue_fill_id.clone()) { | ||
| fills.push(fill); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if !orders.is_empty() || !fills.is_empty() || !processed.is_empty() { | ||
| let mut state = state.lock().await; | ||
| for (k, v) in orders { | ||
| state.orders.insert(k, v); | ||
| } | ||
| state.fills.extend(fills); | ||
| state.processed_intents.extend(processed); | ||
| info!( | ||
| "Recovered state from journal: {} orders, {} fills", | ||
| state.orders.len(), | ||
| state.fills.len() | ||
| ); | ||
| } | ||
| } |
There was a problem hiding this comment.
recovery only reads today's journal. Orders/fills from previous days are lost on restart unless persisted in snapshot, but snapshot doesn't include full order/fill history
| state.risk_snapshot.total_notional_cents = state | ||
| .risk_snapshot | ||
| .total_notional_cents | ||
| .saturating_add(requested_notional_cents); | ||
| state.risk_snapshot.orders_last_minute = | ||
| state.risk_snapshot.orders_last_minute.saturating_add(1); | ||
| *state | ||
| .risk_snapshot | ||
| .strategy_canary_notional | ||
| .entry(order.strategy_id.clone()) | ||
| .or_insert(0) += requested_notional_cents; | ||
| *state | ||
| .risk_snapshot | ||
| .venue_notional | ||
| .entry(order.venue.clone()) | ||
| .or_insert(0) += requested_notional_cents; | ||
| *state | ||
| .risk_snapshot | ||
| .asset_class_notional | ||
| .entry(order.instrument.asset_class.clone()) | ||
| .or_insert(0) += requested_notional_cents; |
There was a problem hiding this comment.
risk notional tracking only increments on order placement. Filled or canceled orders never decrement notional, causing risk limits to tighten over time until restart
| state.risk_snapshot.orders_last_minute = | ||
| state.risk_snapshot.orders_last_minute.saturating_add(1); |
There was a problem hiding this comment.
orders_last_minute increments on every order but never auto-resets. Risk policy check will eventually block all orders after 120 placements until manual clear_runtime_counters action
- Paper adapter: sell orders now update avg_price (weighted avg) - Risk counters: decrement notional on order cancellation - Rate limiter: auto-reset orders_last_minute after 60s - Journal: offload writes to spawn_blocking (non-blocking I/O) - Journal: cleanup files older than 7 days on startup - Journal: recover from all journal files, not just today's - Tests: add sell_order_updates_avg_price test
No description provided.