diff --git a/.gitignore b/.gitignore index 302d322..5726310 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ on_chain_bytecode.txt .env tests/hardhat/package-lock.json .dv_config.json +/.claude diff --git a/docs/config.md b/docs/config.md index 7a3fcfe..a40e492 100644 --- a/docs/config.md +++ b/docs/config.md @@ -23,7 +23,7 @@ When running the `dvf` command, the default configuration file is expected at `$ | - - `api_url` | Chain-specific Blockscout API URL | | - - `api_key` | Chain-specific Blockscout API Key | | `max_blocks_per_event_query` | Number of blocks that can be queried at once in `getLogs`, optional, defaults to 9999 | -| `web3_timeout` | Timeout in seconds for web3 RPC queries, optional, defaults to 5000 | +| `web3_timeout` | Timeout in milliseconds for web3 RPC queries, optional, defaults to 5000 | | `signer` | Configuration on how to sign, optional | | - `wallet_address` | Address which is used to sign | | - `wallet_type` | Can have different structure | diff --git a/lib/dvf/discovery.rs b/lib/dvf/discovery.rs index b028fb9..7261b61 100644 --- a/lib/dvf/discovery.rs +++ b/lib/dvf/discovery.rs @@ -15,14 +15,13 @@ use crate::dvf::config::DVFConfig; use crate::dvf::parse::{self, ValidationError}; use crate::dvf::registry; use crate::state::contract_state::ContractState; +use crate::state::contract_state::MappingUsages; use crate::state::forge_inspect; use crate::utils::pretty::PrettyPrinter; use crate::utils::progress::{print_progress, ProgressMode}; use crate::utils::read_write_file::get_project_paths; use crate::web3; use crate::web3::stop_anvil_instance; -use crate::web3::TraceWithAddress; -use alloy_node_bindings::AnvilInstance; pub struct DiscoveryParams<'a> { pub config: &'a DVFConfig, @@ -46,9 +45,8 @@ pub struct DiscoveryParams<'a> { pub progress_mode: &'a ProgressMode, pub use_storage_range: bool, pub tx_hashes: Option>, - // Optional cache (used by inspect-tx): reuse an already computed trace and config - pub cached_traces: Option>, 
- pub cached_anvil_config: Option<&'a DVFConfig>, + // Optional cache (used by inspect-tx): reuse pre-computed mapping usages + pub cached_mapping_usages: Option, } pub struct DiscoveryResult { @@ -211,47 +209,48 @@ pub fn discover_storage_and_events( )?; print_progress("Getting relevant traces.", params.pc, params.progress_mode); - let mut seen_transactions = HashSet::new(); let mut missing_traces = false; - for (index, tx_hash) in tx_hashes.iter().enumerate() { - if seen_transactions.contains(tx_hash) { - continue; - } - seen_transactions.insert(tx_hash); - - info!("Getting trace for {}", tx_hash); - // Use cached trace if provided (inspect-tx), otherwise fetch - let fetched = if let Some(ref cached) = params.cached_traces { - debug!("Using cached trace at index {} of {}", index, cached.len()); - Ok(( - cached[index].clone(), - None::, - None::, - )) - } else { - web3::get_eth_debug_trace_sim(params.config, tx_hash) - }; - match fetched { - Ok((trace, anvil_config, anvil_instance)) => { - let record_traces_config: &DVFConfig = if params.cached_traces.is_some() { - params.cached_anvil_config.unwrap_or(params.config) - } else { - anvil_config.as_ref().unwrap_or(params.config) - }; - if let Err(err) = contract_state.record_traces(record_traces_config, vec![trace]) { + if let Some(cached_usages) = params.cached_mapping_usages { + // Inject pre-computed mapping usages directly (from inspect-tx) + debug!("Using cached mapping usages"); + contract_state.inject_mapping_usages(cached_usages); + } else { + // Stream traces to collect mapping usages + let mut seen_transactions = HashSet::new(); + for tx_hash in tx_hashes.iter() { + if seen_transactions.contains(tx_hash) { + continue; + } + seen_transactions.insert(tx_hash); + + info!("Getting trace for {}", tx_hash); + let trace_address = match web3::get_receipt_address(params.config, tx_hash) { + Ok(addr) => addr, + Err(err) => { missing_traces = true; info!("Warning. The trace for {tx_hash} cannot be obtained. 
Some mapping slots might not be decodable. You can try to increase the timeout in the config. Error: {}", err); + continue; } - if params.cached_traces.is_none() { + }; + let mut processor = crate::state::contract_state::MappingUsageProcessor::new( + *params.address, + trace_address, + params.config, + tx_hash.clone(), + true, + ); + match web3::stream_eth_debug_trace_sim(params.config, tx_hash, &mut processor) { + Ok((_metadata, _address, _anvil_config, anvil_instance)) => { + contract_state.inject_mapping_usages(processor.mapping_usages); if let Some(anvil_instance) = anvil_instance { stop_anvil_instance(anvil_instance); } } - } - Err(err) => { - missing_traces = true; - info!("Warning. The trace for {tx_hash} cannot be obtained. Some mapping slots might not be decodable. You can try to increase the timeout in the config. Error: {}", err); + Err(err) => { + missing_traces = true; + info!("Warning. The trace for {tx_hash} cannot be obtained. Some mapping slots might not be decodable. You can try to increase the timeout in the config. 
Error: {}", err); + } } } } @@ -546,8 +545,7 @@ pub fn create_discovery_params_for_init<'a>( progress_mode, use_storage_range: true, tx_hashes: None, - cached_traces: None, - cached_anvil_config: None, + cached_mapping_usages: None, } } @@ -593,7 +591,6 @@ pub fn create_discovery_params_for_update<'a>( progress_mode, use_storage_range: false, // cannot use storage range here as we are only trying to get a subset of the state tx_hashes: None, - cached_traces: None, - cached_anvil_config: None, + cached_mapping_usages: None, } } diff --git a/lib/state/contract_state.rs b/lib/state/contract_state.rs index 781c57f..723d153 100644 --- a/lib/state/contract_state.rs +++ b/lib/state/contract_state.rs @@ -17,7 +17,9 @@ use crate::state::forge_inspect::{ ForgeInspectIrOptimized, ForgeInspectLayoutStorage, StateVariable, TypeDescription, }; use crate::utils::pretty::PrettyPrinter; -use crate::web3::{get_internal_create_addresses, StorageSnapshot, TraceWithAddress}; +use crate::web3::{ + get_internal_create_addresses, StorageSnapshot, StructLogProcessor, TraceWithAddress, +}; fn hash_u256(u: &U256) -> B256 { keccak256(u.to_be_bytes::<32>()) @@ -388,6 +390,16 @@ impl<'a> ContractState<'a> { Ok(()) } + /// Inject pre-computed mapping usages directly, skipping trace processing. + pub fn inject_mapping_usages(&mut self, usages: HashMap>) { + for (index, entries) in usages { + self.mapping_usages + .entry(index) + .or_default() + .extend(entries); + } + } + fn add_to_table(storage_entry: &parse::DVFStorageEntry, table: &mut Table) { PrettyPrinter::add_formatted_to_table( &storage_entry.var_name, @@ -931,3 +943,225 @@ impl<'a> ContractState<'a> { var_type.starts_with("t_userDefinedValueType") } } + +/// Type alias for mapping usages: storage index -> set of (key, derived_slot). +pub type MappingUsages = HashMap>; + +/// Helper: extract mapping key from memory at a SHA3/KECCAK256 opcode. +/// Returns `Some((key_hex, storage_index))` if the input looks like a mapping hash. 
+fn extract_mapping_key_from_sha3(stack: &[U256], memory: &[String]) -> Option<(String, U256)> { + let length_in_bytes = stack[stack.len() - 2]; + if length_in_bytes < U256::from(32_u64) || length_in_bytes >= U256::from(usize::MAX / 2) { + return None; + } + let mem_str: String = memory.iter().cloned().collect(); + let start_idx = stack[stack.len() - 1].to::() * 2; + let length = length_in_bytes.to::() * 2; + let sha3_input = format!("0x{}", &mem_str[start_idx..(start_idx + length)]); + + let usize_str_length = length_in_bytes.to::() * 2 + 2; + assert!(sha3_input.len() == usize_str_length); + let key = sha3_input[2..usize_str_length - 64].to_string(); + let index = U256::from_str_radix(&sha3_input[usize_str_length - 64..], 16).ok()?; + Some((key, index)) +} + +/// Processor that collects mapping usages for a single address from trace logs. +/// Extracted from `ContractState::record_traces()`. +pub struct MappingUsageProcessor<'a> { + address: Address, + config: &'a DVFConfig, + tx_id: String, + is_first_trace: bool, + depth_to_address: HashMap, + create_addresses: Option>, + key: Option, + index: U256, + pub mapping_usages: MappingUsages, + failed: bool, +} + +impl<'a> MappingUsageProcessor<'a> { + pub fn new( + address: Address, + trace_address: Address, + config: &'a DVFConfig, + tx_id: String, + is_first_trace: bool, + ) -> Self { + let mut depth_to_address: HashMap = HashMap::new(); + depth_to_address.insert(1, trace_address); + MappingUsageProcessor { + address, + config, + tx_id, + is_first_trace, + depth_to_address, + create_addresses: None, + key: None, + index: U256::from(1), + mapping_usages: HashMap::new(), + failed: false, + } + } +} + +impl<'a> StructLogProcessor for MappingUsageProcessor<'a> { + fn process_log( + &mut self, + log: alloy_rpc_types_trace::geth::StructLog, + ) -> Result<(), crate::dvf::parse::ValidationError> { + if log.stack.is_none() { + return Ok(()); + } + let stack = log.stack.unwrap(); + + if log.op == "CREATE" || log.op == 
"CREATE2" { + if self.is_first_trace { + if self.create_addresses.is_none() { + self.create_addresses = + Some(get_internal_create_addresses(self.config, &self.tx_id)?); + } + if let Some(ref mut create_ref) = self.create_addresses { + self.depth_to_address + .insert(log.depth + 1, create_ref.remove(0)); + } + } else { + self.depth_to_address + .insert(log.depth + 1, Address::from([0; 20])); + } + } + + if log.op == "CALL" || log.op == "STATICCALL" { + let address_bytes = stack[stack.len() - 2].to_be_bytes::<32>(); + let a = Address::from_slice(&address_bytes[12..]); + self.depth_to_address.insert(log.depth + 1, a); + } + + if log.op == "DELEGATECALL" || log.op == "CALLCODE" { + self.depth_to_address + .insert(log.depth + 1, self.depth_to_address[&log.depth]); + } + + if self.depth_to_address[&log.depth] == self.address { + if let Some(key_in) = self.key.take() { + let target_slot = &stack[stack.len() - 1]; + self.mapping_usages + .entry(self.index) + .or_default() + .insert((key_in, *target_slot)); + } + + if log.op == "KECCAK256" || log.op == "SHA3" { + if let Some(memory) = &log.memory { + if let Some((key, index)) = extract_mapping_key_from_sha3(&stack, memory) { + debug!("Found key {} for index {}.", key, index); + self.key = Some(key); + self.index = index; + } + } + } + } + + Ok(()) + } + + fn trace_failed(&mut self) { + self.failed = true; + self.mapping_usages.clear(); + } +} + +/// Processor that collects mapping usages for ALL addresses encountered in a trace. +/// Used by `inspect-tx` to process the trace once for all contracts. +pub struct MultiAddressMappingProcessor<'a> { + config: &'a DVFConfig, + tx_id: String, + depth_to_address: HashMap, + create_addresses: Option>, + key: Option, + index: U256, + /// Per-address mapping usages. 
+ pub all_mapping_usages: HashMap, + failed: bool, +} + +impl<'a> MultiAddressMappingProcessor<'a> { + pub fn new(trace_address: Address, config: &'a DVFConfig, tx_id: String) -> Self { + let mut depth_to_address: HashMap = HashMap::new(); + depth_to_address.insert(1, trace_address); + MultiAddressMappingProcessor { + config, + tx_id, + depth_to_address, + create_addresses: None, + key: None, + index: U256::from(1), + all_mapping_usages: HashMap::new(), + failed: false, + } + } +} + +impl<'a> StructLogProcessor for MultiAddressMappingProcessor<'a> { + fn process_log( + &mut self, + log: alloy_rpc_types_trace::geth::StructLog, + ) -> Result<(), crate::dvf::parse::ValidationError> { + if log.stack.is_none() { + return Ok(()); + } + let stack = log.stack.unwrap(); + + if log.op == "CREATE" || log.op == "CREATE2" { + if self.create_addresses.is_none() { + self.create_addresses = + Some(get_internal_create_addresses(self.config, &self.tx_id)?); + } + if let Some(ref mut create_ref) = self.create_addresses { + self.depth_to_address + .insert(log.depth + 1, create_ref.remove(0)); + } + } + + if log.op == "CALL" || log.op == "STATICCALL" { + let address_bytes = stack[stack.len() - 2].to_be_bytes::<32>(); + let a = Address::from_slice(&address_bytes[12..]); + self.depth_to_address.insert(log.depth + 1, a); + } + + if log.op == "DELEGATECALL" || log.op == "CALLCODE" { + self.depth_to_address + .insert(log.depth + 1, self.depth_to_address[&log.depth]); + } + + let current_address = self.depth_to_address[&log.depth]; + + if let Some(key_in) = self.key.take() { + let target_slot = &stack[stack.len() - 1]; + self.all_mapping_usages + .entry(current_address) + .or_default() + .entry(self.index) + .or_default() + .insert((key_in, *target_slot)); + } + + if log.op == "KECCAK256" || log.op == "SHA3" { + if let Some(memory) = &log.memory { + if let Some((key, index)) = extract_mapping_key_from_sha3(&stack, memory) { + debug!("Found key {} for index {}.", key, index); + self.key = 
Some(key); + self.index = index; + } + } + } + + Ok(()) + } + + fn trace_failed(&mut self) { + self.failed = true; + self.all_mapping_usages.clear(); + } +} diff --git a/lib/utils/progress.rs b/lib/utils/progress.rs index d8f81b9..584cd2c 100644 --- a/lib/utils/progress.rs +++ b/lib/utils/progress.rs @@ -23,7 +23,7 @@ pub fn print_progress(s: &str, i: &mut u64, pm: &ProgressMode) { ProgressMode::BytecodeCheck => 3, ProgressMode::GenerateBuildCache => 1, ProgressMode::ListEvents => 1, - ProgressMode::InspectTx => 10, + ProgressMode::InspectTx => 3, ProgressMode::InspectTxSub => 10, ProgressMode::InspectTxSubNoconf => 4, }; diff --git a/lib/web3.rs b/lib/web3.rs index 848c6b6..9d5bd55 100644 --- a/lib/web3.rs +++ b/lib/web3.rs @@ -98,6 +98,138 @@ pub struct TraceWithAddress { pub tx_id: String, } +/// Metadata from a debug trace (everything except structLogs). +#[derive(Debug, Clone)] +pub struct TraceMetadata { + pub failed: bool, + pub gas: u64, + pub return_value: Bytes, +} + +/// Trait for processing StructLog entries one at a time during streaming deserialization. +pub trait StructLogProcessor { + fn process_log(&mut self, log: StructLog) -> Result<(), ValidationError>; + /// Called after all logs have been processed if the trace's `failed` flag is true. + /// Allows the processor to discard results from a failed trace. + fn trace_failed(&mut self); +} + +/// Processor that builds a storage snapshot from trace logs. +/// Extracted from `StorageSnapshot::add_trace()`. 
+pub struct StorageSnapshotProcessor<'a> { + snapshot: &'a mut HashMap, + address: Address, + config: &'a DVFConfig, + tx_id: String, + depth_to_address: HashMap, + last_storage: HashMap>, + last_depth: u64, + create_addresses: Option>, + failed: bool, +} + +impl<'a> StorageSnapshotProcessor<'a> { + pub fn new( + snapshot: &'a mut HashMap, + address: Address, + tx_to_address: Address, + config: &'a DVFConfig, + tx_id: String, + ) -> Self { + let mut depth_to_address: HashMap = HashMap::new(); + depth_to_address.insert(1, tx_to_address); + StorageSnapshotProcessor { + snapshot, + address, + config, + tx_id, + depth_to_address, + last_storage: HashMap::new(), + last_depth: 1, + create_addresses: None, + failed: false, + } + } + + /// Finalize: commit depth-1 storage (we know depth 1 succeeded if trace didn't fail). + pub fn finalize(self) -> Result<(), ValidationError> { + if self.failed { + return Ok(()); + } + commit_storage_to_snapshot(&self.last_storage, 0u64, self.snapshot); + // Check that we used all addresses + if let Some(addrs) = self.create_addresses { + assert_eq!(addrs.len(), 0); + } + Ok(()) + } +} + +impl<'a> StructLogProcessor for StorageSnapshotProcessor<'a> { + fn process_log(&mut self, log: StructLog) -> Result<(), ValidationError> { + if log.stack.is_none() { + return Ok(()); + } + let stack = log.stack.unwrap(); + + if log.op == "CREATE" || log.op == "CREATE2" { + if self.create_addresses.is_none() { + self.create_addresses = + Some(get_internal_create_addresses(self.config, &self.tx_id)?); + } + if let Some(ref mut create_ref) = self.create_addresses { + self.depth_to_address + .insert(log.depth + 1, create_ref.remove(0)); + } + } + + if log.op == "CALL" || log.op == "STATICCALL" { + let address_bytes = stack[stack.len() - 2].to_be_bytes::<32>(); + let a = Address::from_slice(&address_bytes[12..]); + self.depth_to_address.insert(log.depth + 1, a); + } + + if log.op == "DELEGATECALL" || log.op == "CALLCODE" { + self.depth_to_address + 
.insert(log.depth + 1, self.depth_to_address[&log.depth]); + } + + if self.depth_to_address[&log.depth] == self.address && log.op == "SSTORE" { + let last_store = self.last_storage.entry(log.depth).or_default(); + let value = stack[stack.len() - 2]; + let slot = stack[stack.len() - 1]; + last_store.insert(slot, value); + } + + if log.op == "STOP" || log.op == "RETURN" { + // Propagate committed storage to parent depth instead of writing + // directly to the snapshot. This ensures that if a parent depth + // later REVERTs, the child's storage changes are correctly rolled + // back (the parent's storage, including propagated values, is + // discarded on depth-decrease cleanup). It also handles reentrancy + // naturally: child values override parent values for the same slots. + if let Some(committed) = self.last_storage.remove(&log.depth) { + let parent = self.last_storage.entry(log.depth - 1).or_default(); + for (slot, value) in committed { + parent.insert(slot, value); + } + } + } + if log.depth < self.last_depth { + for depth in (log.depth + 1)..=self.last_depth { + self.last_storage.remove(&depth); + } + } + self.last_depth = log.depth; + + Ok(()) + } + + fn trace_failed(&mut self) { + self.failed = true; + } +} + pub fn get_block_traces( config: &DVFConfig, block_num: u64, @@ -296,6 +428,34 @@ fn create_trace_with_address( } } +/// Get the target address for a transaction from its receipt. +/// Extracted from `create_trace_with_address()` for reuse with streaming. 
+pub fn get_receipt_address(config: &DVFConfig, tx_id: &str) -> Result { + let request_body = json!({ + "jsonrpc": "2.0", + "method": "eth_getTransactionReceipt", + "params": [tx_id], + "id": 1 + }); + let result = send_blocking_web3_post(config, &request_body)?; + let receipt: TransactionReceipt = serde_json::from_value(result)?; + + if let Some(address) = receipt.to { + Ok(address) + } else if let Some(address) = receipt.contract_address { + Ok(address) + } else { + println!( + "[DEBUG] get_receipt_address: No address found in receipt: {:?}", + receipt + ); + Err(ValidationError::from(format!( + "Found no address for tx {}", + tx_id + ))) + } +} + pub fn get_eth_debug_trace( config: &DVFConfig, tx_id: &str, @@ -476,6 +636,423 @@ pub fn get_eth_debug_trace_sim( } } +/// Module for streaming deserialization of debug_traceTransaction responses. +/// Instead of materializing the entire `structLogs` array, we deserialize and process +/// one `StructLog` at a time, keeping memory usage bounded. +mod streaming_trace { + use super::*; + use serde::de::{self, DeserializeSeed, MapAccess, SeqAccess, Visitor}; + use std::fmt; + + /// Seed that drives streaming: navigates the JSON-RPC envelope and processes structLogs + /// one element at a time via the provided `StructLogProcessor`. + struct ResultFieldSeed<'a, P> { + processor: &'a mut P, + metadata: &'a mut TraceMetadata, + } + + /// Visitor for the `result` object inside JSON-RPC response. + /// Handles fields: `failed`, `gas`, `returnValue`, `structLogs`. 
+ struct ResultObjectVisitor<'a, P> { + processor: &'a mut P, + metadata: &'a mut TraceMetadata, + } + + impl<'de, P: StructLogProcessor> Visitor<'de> for ResultObjectVisitor<'_, P> { + type Value = (); + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a trace result object with structLogs") + } + + fn visit_map>(self, mut map: A) -> Result<(), A::Error> { + let mut got_struct_logs = false; + while let Some(key) = map.next_key::()? { + match key.as_str() { + "failed" => { + self.metadata.failed = map.next_value()?; + } + "gas" => { + // Handle both integer and hex string (pathological RPC) + let value: serde_json::Value = map.next_value()?; + self.metadata.gas = match value { + serde_json::Value::Number(n) => n + .as_u64() + .ok_or_else(|| de::Error::custom("Invalid gas number"))?, + serde_json::Value::String(s) => { + u64::from_str_radix(s.trim_start_matches("0x"), 16) + .map_err(de::Error::custom)? + } + _ => { + return Err(de::Error::custom( + "Expected gas as hex string or integer", + )) + } + }; + } + "returnValue" => { + self.metadata.return_value = map.next_value()?; + } + "structLogs" => { + // Stream the array: deserialize each StructLog and hand to processor + map.next_value_seed(StructLogArraySeed { + processor: self.processor, + })?; + got_struct_logs = true; + } + _ => { + // Skip unknown fields + let _ = map.next_value::()?; + } + } + } + if !got_struct_logs { + return Err(de::Error::custom( + "missing structLogs field in trace result", + )); + } + // If `failed` was encountered after `structLogs`, notify the processor + if self.metadata.failed { + self.processor.trace_failed(); + } + Ok(()) + } + } + + impl<'de, P: StructLogProcessor> DeserializeSeed<'de> for ResultFieldSeed<'_, P> { + type Value = (); + + fn deserialize>(self, deserializer: D) -> Result<(), D::Error> { + deserializer.deserialize_map(ResultObjectVisitor { + processor: self.processor, + metadata: self.metadata, + }) + } + } + + /// Seed 
for streaming deserialization of the `structLogs` array. + struct StructLogArraySeed<'a, P> { + processor: &'a mut P, + } + + struct StructLogArrayVisitor<'a, P> { + processor: &'a mut P, + } + + impl<'de, P: StructLogProcessor> Visitor<'de> for StructLogArrayVisitor<'_, P> { + type Value = (); + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("an array of StructLog entries") + } + + fn visit_seq>(self, mut seq: A) -> Result<(), A::Error> { + while let Some(log) = seq.next_element::()? { + self.processor.process_log(log).map_err(de::Error::custom)?; + } + Ok(()) + } + } + + impl<'de, P: StructLogProcessor> DeserializeSeed<'de> for StructLogArraySeed<'_, P> { + type Value = (); + + fn deserialize>(self, deserializer: D) -> Result<(), D::Error> { + deserializer.deserialize_seq(StructLogArrayVisitor { + processor: self.processor, + }) + } + } + + /// Visitor for the top-level JSON-RPC response envelope. + struct RpcEnvelopeVisitor<'a, P> { + processor: &'a mut P, + metadata: &'a mut TraceMetadata, + } + + impl<'de, P: StructLogProcessor> Visitor<'de> for RpcEnvelopeVisitor<'_, P> { + type Value = (); + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a JSON-RPC response object") + } + + fn visit_map>(self, mut map: A) -> Result<(), A::Error> { + let mut got_result = false; + while let Some(key) = map.next_key::()? { + match key.as_str() { + "result" => { + map.next_value_seed(ResultFieldSeed { + processor: self.processor, + metadata: self.metadata, + })?; + got_result = true; + } + "error" => { + let error: serde_json::Value = map.next_value()?; + return Err(de::Error::custom(format!("Web3Error: {:?}", error))); + } + _ => { + // Skip jsonrpc, id, etc. 
+ let _ = map.next_value::()?; + } + } + } + if !got_result { + return Err(de::Error::custom("No result field in JSON-RPC response")); + } + Ok(()) + } + } + + struct RpcEnvelopeSeed<'a, P> { + processor: &'a mut P, + metadata: &'a mut TraceMetadata, + } + + impl<'de, P: StructLogProcessor> DeserializeSeed<'de> for RpcEnvelopeSeed<'_, P> { + type Value = (); + + fn deserialize>(self, deserializer: D) -> Result<(), D::Error> { + deserializer.deserialize_map(RpcEnvelopeVisitor { + processor: self.processor, + metadata: self.metadata, + }) + } + } + + /// Stream a debug_traceTransaction response, processing each StructLog via the processor. + /// Returns TraceMetadata (failed, gas, returnValue) without materializing the full array. + pub fn stream_trace_response( + response: reqwest::blocking::Response, + processor: &mut P, + ) -> Result { + let mut metadata = TraceMetadata { + failed: false, + gas: 0, + return_value: Bytes::new(), + }; + + let reader = std::io::BufReader::new(response); + let mut deserializer = serde_json::Deserializer::from_reader(reader); + + RpcEnvelopeSeed { + processor, + metadata: &mut metadata, + } + .deserialize(&mut deserializer) + .map_err(|e| { + ValidationError::from(format!("Streaming trace deserialization error: {}", e)) + })?; + + Ok(metadata) + } +} + +/// Send a JSON-RPC POST and return the raw response (not deserialized). +/// Uses a fixed 120-second per-read timeout (via `.timeout()`) instead of +/// the user-configured `web3_timeout`, so that arbitrarily large streaming +/// responses can be consumed while still detecting server stalls promptly. 
+fn send_blocking_web3_post_raw( + config: &DVFConfig, + request_body: &serde_json::Value, +) -> Result { + let client = Client::builder() + .connect_timeout(Duration::from_millis(config.web3_timeout)) + .timeout(Duration::from_secs(120)) + .build() + .unwrap(); + + let node_url = config.get_rpc_url()?; + + debug!("Web3 request_body (streaming): {:?}", request_body); + let response = client.post(node_url).json(&request_body).send()?; + + Ok(response) +} + +/// Stream a debug_traceTransaction RPC call, processing each StructLog one at a time. +/// Returns (TraceMetadata, target_address). +pub fn stream_debug_trace( + config: &DVFConfig, + tx_id: &str, + processor: &mut P, +) -> Result<(TraceMetadata, Address), ValidationError> { + debug!("Streaming debug trace for {}", tx_id); + let request_body = json!({ + "jsonrpc": "2.0", + "method": "debug_traceTransaction", + "params": [tx_id, {"enableMemory": true, "enableStorage": true, "enableReturnData": false}], + "id": 1 + }); + + let response = send_blocking_web3_post_raw(config, &request_body)?; + let metadata = streaming_trace::stream_trace_response(response, processor)?; + let address = get_receipt_address(config, tx_id)?; + + Ok((metadata, address)) +} + +/// Streaming variant of `get_eth_debug_trace_sim()` with Anvil fallback. +/// Returns (TraceMetadata, target_address, Option, Option). 
+pub fn stream_eth_debug_trace_sim( + config: &DVFConfig, + tx_id: &str, + processor: &mut P, +) -> Result< + ( + TraceMetadata, + Address, + Option, + Option, + ), + ValidationError, +> { + debug!("Streaming debug trace (with sim fallback)."); + + let request_body = json!({ + "jsonrpc": "2.0", + "method": "debug_traceTransaction", + "params": [tx_id, {"enableMemory": true, "enableStorage": true, "enableReturnData": false}], + "id": 1 + }); + + // Try direct streaming first + match send_blocking_web3_post_raw(config, &request_body) { + Ok(response) => match streaming_trace::stream_trace_response(response, processor) { + Ok(metadata) => { + let address = get_receipt_address(config, tx_id)?; + return Ok((metadata, address, None, None)); + } + Err(e) => { + info!( + "Direct streaming trace failed, trying fallback with anvil: {}", + e + ); + } + }, + Err(_) => { + info!("Initial debug_traceTransaction request failed, trying fallback with anvil"); + } + } + + // Fallback: replay via Anvil, then stream from Anvil + let (block_num, tx_index, tx_result) = get_transaction_details(config, tx_id)?; + let rpc_url = config.get_rpc_url()?; + + let (fork_transaction_hash, fork_block_number) = + match get_previous_transaction_in_block(config, block_num, tx_index)? 
{ + Some(prev_tx_hash) => (Some(prev_tx_hash), None), + None => (None, Some(block_num - 1)), + }; + + let anvil_instance = start_anvil( + &rpc_url, + fork_transaction_hash.as_deref(), + fork_block_number, + get_eth_block_timestamp(config, block_num)?, + )?; + + let mut anvil_config = DVFConfig::default(); + anvil_config + .rpc_urls + .insert(config.active_chain_id.unwrap(), anvil_instance.endpoint()); + anvil_config.active_chain_id = config.active_chain_id; + anvil_config.web3_timeout = config.web3_timeout; + + let from = tx_result["from"].as_str().unwrap(); + let to = tx_result["to"].as_str().unwrap_or("null"); + let value = tx_result["value"].as_str().unwrap(); + let data = tx_result["input"].as_str().unwrap(); + let gas = tx_result["gas"].as_str().unwrap(); + let gas_price = tx_result["gasPrice"].as_str().unwrap(); + + let impersonate_body = json!({ + "jsonrpc": "2.0", + "method": "anvil_impersonateAccount", + "params": [from], + "id": 1 + }); + let _ = send_blocking_web3_post(&anvil_config, &impersonate_body); + + let balance_body = json!({ + "jsonrpc": "2.0", + "method": "anvil_setBalance", + "params": [from, "0x56bc75e2d63100000"], + "id": 1 + }); + let _ = send_blocking_web3_post(&anvil_config, &balance_body); + + let send_tx_body = json!({ + "jsonrpc": "2.0", + "method": "eth_sendTransaction", + "params": [ + { + "from": from, + "to": if to == "null" { serde_json::Value::Null } else { json!(to) }, + "value": value, + "input": data, + "gas": gas, + "gasPrice": gas_price + } + ], + "id": 1 + }); + + let anvil_tx_result = match send_blocking_web3_post(&anvil_config, &send_tx_body) { + Ok(result) => result, + Err(_) => { + let send_tx_body_wo_gas = json!({ + "jsonrpc": "2.0", + "method": "eth_sendTransaction", + "params": [ + { + "from": from, + "to": if to == "null" { serde_json::Value::Null } else { json!(to) }, + "value": value, + "input": data, + } + ], + "id": 1 + }); + match send_blocking_web3_post(&anvil_config, &send_tx_body_wo_gas) { + Ok(result) => 
result, + Err(e) => { + stop_anvil_instance(anvil_instance); + return Err(e); + } + } + } + }; + + let ts_body = json!({ + "jsonrpc": "2.0", + "method": "evm_setNextBlockTimestamp", + "params": [get_eth_block_timestamp(config, block_num)?], + "id": 1 + }); + let _ = send_blocking_web3_post(&anvil_config, &ts_body); + + let mine_body = json!({ + "jsonrpc": "2.0", + "method": "evm_mine", + "params": [], + "id": 1 + }); + let _ = send_blocking_web3_post(&anvil_config, &mine_body); + + let anvil_tx_id = anvil_tx_result.as_str().unwrap(); + match stream_debug_trace(&anvil_config, anvil_tx_id, processor) { + Ok((metadata, address)) => { + Ok((metadata, address, Some(anvil_config), Some(anvil_instance))) + } + Err(e) => { + stop_anvil_instance(anvil_instance); + Err(e) + } + } +} + // Returns create addresses of internal calls, if the initial call is a create, then it is not included // Reverting creates are indicated with a Zero address fn extract_create_addresses_from_call_frame( @@ -1032,21 +1609,33 @@ pub fn get_all_txs_for_contract( start_block: u64, end_block: u64, ) -> Result, ValidationError> { - if let Ok(all_txs) = - get_all_txs_for_contract_from_blockscout(config, address, start_block, end_block) - { - return Ok(all_txs); - } else if end_block - start_block <= 100 { + // For manageable ranges, prefer trace-based approaches: they capture ALL + // internal calls to the contract, whereas blockscout may miss some. + // Parity traces: 1 RPC call per block. + if end_block - start_block <= 500 { if let Ok(all_txs) = get_all_txs_for_contract_from_parity_traces(config, address, start_block, end_block) { + debug!("Found {} txs via parity traces", all_txs.len()); return Ok(all_txs); - } else if let Ok(all_txs) = + } + } + // Geth traces: 1 RPC call per transaction (only feasible for small ranges). 
+ if end_block - start_block <= 100 { + if let Ok(all_txs) = get_all_txs_for_contract_from_geth_traces(config, address, start_block, end_block) { + debug!("Found {} txs via geth traces", all_txs.len()); return Ok(all_txs); } } + // Fallback: blockscout API (fast but may miss internal-call-only transactions). + if let Ok(all_txs) = + get_all_txs_for_contract_from_blockscout(config, address, start_block, end_block) + { + debug!("Found {} txs via blockscout", all_txs.len()); + return Ok(all_txs); + } Err(ValidationError::from(format!( "Could not find transactions for {:?} from {} to {}.", address, start_block, end_block @@ -1707,11 +2296,22 @@ impl StorageSnapshot { address: &Address, tx_hashes: &Vec, ) -> Result, ValidationError> { - debug!("Constructing snapshot from TX Ids."); + debug!("Constructing snapshot from TX Ids (streaming)."); let mut snapshot: HashMap = HashMap::new(); for tx_hash in tx_hashes { - let trace_w_a = get_eth_debug_trace(config, tx_hash)?; - Self::add_trace(&mut snapshot, config, address, &trace_w_a)?; + let tx_to_address = get_receipt_address(config, tx_hash)?; + let mut processor = StorageSnapshotProcessor::new( + &mut snapshot, + *address, + tx_to_address, + config, + tx_hash.clone(), + ); + let (metadata, _) = stream_debug_trace(config, tx_hash, &mut processor)?; + if metadata.failed { + processor.trace_failed(); + } + processor.finalize()?; } Ok(snapshot) } @@ -1793,16 +2393,19 @@ impl StorageSnapshot { // ); } - // Save upon successful return + // Propagate committed storage to parent depth on successful return if log.op == "STOP" || log.op == "RETURN" { - commit_storage_to_snapshot(&last_storage, log.depth, snapshot); + if let Some(committed) = last_storage.remove(&log.depth) { + let parent = last_storage.entry(log.depth - 1).or_default(); + for (slot, value) in committed { + parent.insert(slot, value); + } + } } // Clean failed storages if log.depth < last_depth { - for depth in log.depth..last_depth + 1 { - if 
last_storage.contains_key(&depth) { - last_storage.remove(&depth); - } + for depth in (log.depth + 1)..=last_depth { + last_storage.remove(&depth); } } } diff --git a/src/dvf.rs b/src/dvf.rs index 2e021a4..0473ba6 100644 --- a/src/dvf.rs +++ b/src/dvf.rs @@ -1659,21 +1659,34 @@ fn process(matches: ArgMatches) -> Result<(), ValidationError> { for address in &call_addresses { println!("- {}", address); } - println!("The transaction created the following contracts:"); - for address in &create_addresses { - println!("- {}", address); + if !create_addresses.is_empty() { + println!("The transaction created the following contracts:"); + for address in &create_addresses { + println!("- {}", address); + } } let registry = registry::Registry::from_config(&config)?; let pretty_printer = PrettyPrinter::new(&config, Some(&registry)); - // Fetch and cache debug trace once; keep anvil alive until end - let (cached_trace, cached_anvil_config, cached_anvil_instance) = - web3::get_eth_debug_trace_sim(&config, tx_hash)?; + // Stream the trace once, collecting mapping usages for ALL addresses + let trace_address = web3::get_receipt_address(&config, tx_hash)?; + let mut multi_processor = + dvf_libs::state::contract_state::MultiAddressMappingProcessor::new( + trace_address, + &config, + tx_hash.clone(), + ); + let (_metadata, _address, _anvil_config, anvil_instance) = + web3::stream_eth_debug_trace_sim(&config, tx_hash, &mut multi_processor)?; + + // Cache the tiny per-address mapping usages (not the raw trace) + let all_mapping_usages = multi_processor.all_mapping_usages; print_progress("Checking called contracts.", &mut pc, &progress_mode); for address in &call_addresses { println!("Checking contract: {}", address); + let usages_for_address = all_mapping_usages.get(address).cloned(); inspect_called_contract( &config, chain_id, @@ -1681,16 +1694,14 @@ fn process(matches: ArgMatches) -> Result<(), ValidationError> { block_num, tx_hash, &pretty_printer, - &mut pc, - &progress_mode, - 
Some(vec![cached_trace.clone()]), - cached_anvil_config.as_ref(), + usages_for_address, )?; } print_progress("Checking created contracts.", &mut pc, &progress_mode); for address in &create_addresses { println!("Checking contract: {}", address); + let usages_for_address = all_mapping_usages.get(address).cloned(); inspect_called_contract( &config, chain_id, @@ -1698,15 +1709,16 @@ fn process(matches: ArgMatches) -> Result<(), ValidationError> { block_num, tx_hash, &pretty_printer, - &mut pc, - &progress_mode, - Some(vec![cached_trace.clone()]), - cached_anvil_config.as_ref(), + usages_for_address, )?; } + if create_addresses.is_empty() { + println!("No contracts created during transaction - skipping") + } + // After all inspections, stop cached anvil instance if present - if let Some(anvil_instance) = cached_anvil_instance { + if let Some(anvil_instance) = anvil_instance { web3::stop_anvil_instance(anvil_instance); } @@ -1823,10 +1835,7 @@ fn inspect_called_contract( block_num: u64, tx_hash: &String, pretty_printer: &PrettyPrinter, - pc: &mut u64, - progress_mode: &ProgressMode, - cached_traces: Option>, - cached_anvil_config: Option<&DVFConfig>, + cached_mapping_usages: Option, ) -> Result<(), ValidationError> { let project_config = config.get_project_config(address, chain_id); if let Some(project_config) = project_config { @@ -1869,7 +1878,7 @@ fn inspect_called_contract( None => "Compiling local code.", Some(_) => "Loading build cache.", }; - print_progress(compile_output, pc, progress_mode); + print_progress(compile_output, &mut pc_sub, &progress_mode_sub); let mut project_info = ProjectInfo::new( &project_config.contract_name, Path::new(&project_config.project_path), @@ -1950,8 +1959,7 @@ fn inspect_called_contract( progress_mode: &progress_mode_sub, use_storage_range: false, tx_hashes: Some(vec![tx_hash.clone()]), - cached_traces, - cached_anvil_config, + cached_mapping_usages, })?; dumped.critical_storage_variables = critical_storage_variables;