diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 1ccade444..fcda2c83e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,7 +80,11 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | - RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test -- --skip cbf + - name: Test CBF on Rust ${{ matrix.toolchain }} + if: "matrix.platform != 'windows-latest'" + run: | + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test cbf -- --test-threads=1 - name: Test with UniFFI support on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest' && matrix.build-uniffi" run: | diff --git a/Cargo.toml b/Cargo.toml index 539941677..8182c4f8b 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} +bip157 = { version = "0.4.2", default-features = false } bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]} bitreq = { version = "0.3", default-features = false, features = ["async-https", "json-using-serde"] } diff --git a/README.md b/README.md index 0068b6e07..32417242b 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ fn main() { LDK Node currently comes with a decidedly opinionated set of design choices: - On-chain data is handled by the integrated [BDK][bdk] wallet. -- Chain data may currently be sourced from the Bitcoin Core RPC interface, or from an [Electrum][electrum] or [Esplora][esplora] server. +- Chain data may currently be sourced from the Bitcoin Core RPC interface, from an [Electrum][electrum] or [Esplora][esplora] server, or via [compact block filters (BIP 157)][bip157]. - Wallet and channel state may be persisted to an [SQLite][sqlite] database, to file system, or to a custom back-end to be implemented by the user. - Gossip data may be sourced via Lightning's peer-to-peer network or the [Rapid Gossip Sync](https://docs.rs/lightning-rapid-gossip-sync/*/lightning_rapid_gossip_sync/) protocol. - Entropy for the Lightning and on-chain wallets may be sourced from raw bytes or a [BIP39](https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki) mnemonic. In addition, LDK Node offers the means to generate and persist the entropy bytes to disk. @@ -80,6 +80,7 @@ The Minimum Supported Rust Version (MSRV) is currently 1.85.0. [bdk]: https://bitcoindevkit.org/ [electrum]: https://github.com/spesmilo/electrum-protocol [esplora]: https://github.com/Blockstream/esplora +[bip157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki [sqlite]: https://sqlite.org/ [rust]: https://www.rust-lang.org/ [swift]: https://www.swift.org/ diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 014993690..31d3d6a4c 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -9,6 +9,8 @@ typedef dictionary EsploraSyncConfig; typedef dictionary ElectrumSyncConfig; +typedef dictionary CbfSyncConfig; + typedef dictionary TorConfig; typedef interface NodeEntropy; @@ -38,6 +40,7 @@ interface Builder { constructor(Config config); void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); void set_chain_source_electrum(string server_url, ElectrumSyncConfig? config); + void set_chain_source_cbf(sequence peers, CbfSyncConfig? sync_config, FeeSourceConfig? fee_source_config); void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); @@ -354,6 +357,8 @@ enum Currency { typedef enum AsyncPaymentsRole; +typedef enum FeeSourceConfig; + [Custom] typedef string Txid; diff --git a/src/builder.rs b/src/builder.rs index cd8cc184f..5a388cd82 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -42,11 +42,11 @@ use lightning::util::sweep::OutputSweeper; use lightning_persister::fs_store::v1::FilesystemStore; use vss_client::headers::VssHeaderProvider; -use crate::chain::ChainSource; +use crate::chain::{ChainSource, FeeSourceConfig}; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, - BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig, - DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, }; use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; @@ -105,6 +105,11 @@ enum ChainDataSourceConfig { rpc_password: String, rest_client_config: Option, }, + Cbf { + peers: Vec, + sync_config: Option, + fee_source_config: Option, + }, } #[derive(Debug, Clone)] @@ -193,6 +198,8 @@ pub enum BuildError { NetworkMismatch, /// The role of the node in an asynchronous payments context is not compatible with the current configuration. AsyncPaymentsConfigMismatch, + /// We failed to setup the chain source. + ChainSourceSetupFailed, } impl fmt::Display for BuildError { @@ -226,6 +233,7 @@ impl fmt::Display for BuildError { "The async payments role is not compatible with the current configuration." ) }, + Self::ChainSourceSetupFailed => write!(f, "Failed to setup chain source."), } } } @@ -365,6 +373,28 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + /// + /// Note: fee rate estimation with this chain source uses block-level averages (total fees + /// divided by block weight) rather than per-transaction fee rates. This can underestimate + /// next-block inclusion rates during periods of high mempool congestion. Percentile-based + /// target selection partially mitigates this. + pub fn set_chain_source_cbf( + &mut self, peers: Vec, sync_config: Option, + fee_source_config: Option, + ) -> &mut Self { + self.chain_data_source_config = + Some(ChainDataSourceConfig::Cbf { peers, sync_config, fee_source_config }); + self + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -892,6 +922,26 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); } + /// Configures the [`Node`] instance to source its chain data via BIP 157 compact block + /// filters. + /// + /// `peers` is an optional list of peer addresses to connect to for sourcing compact block + /// filters. If empty, the node will discover peers via DNS seeds. + /// + /// If no `sync_config` is given, default values are used. See [`CbfSyncConfig`] for more + /// information. + /// + /// Note: fee rate estimation with this chain source uses block-level averages (total fees + /// divided by block weight) rather than per-transaction fee rates. This can underestimate + /// next-block inclusion rates during periods of high mempool congestion. Percentile-based + /// target selection partially mitigates this. + pub fn set_chain_source_cbf( + &self, peers: Vec, sync_config: Option, + fee_source_config: Option, + ) { + self.inner.write().unwrap().set_chain_source_cbf(peers, sync_config, fee_source_config); + } + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. /// /// This method establishes an RPC connection that enables all essential chain operations including @@ -1364,6 +1414,25 @@ fn build_with_store_internal( }), }, + Some(ChainDataSourceConfig::Cbf { peers, sync_config, fee_source_config }) => { + let sync_config = sync_config.clone().unwrap_or(CbfSyncConfig::default()); + ChainSource::new_cbf( + peers.clone(), + sync_config, + fee_source_config.clone(), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .map_err(|e| { + log_error!(logger, "Failed to initialize CBF chain source: {}", e); + BuildError::ChainSourceSetupFailed + })? + }, + None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); @@ -2079,6 +2148,9 @@ pub(crate) fn sanitize_alias(alias_str: &str) -> Result { #[cfg(test)] mod tests { + #[cfg(feature = "uniffi")] + use crate::config::CbfSyncConfig; + use super::{sanitize_alias, BuildError, NodeAlias}; #[test] @@ -2116,4 +2188,24 @@ mod tests { let node = sanitize_alias(alias); assert_eq!(node.err().unwrap(), BuildError::InvalidNodeAlias); } + + #[cfg(feature = "uniffi")] + #[test] + fn arced_builder_can_set_cbf_chain_source() { + let builder = super::ArcedNodeBuilder::new(); + let sync_config = CbfSyncConfig::default(); + + let peers = vec!["127.0.0.1:8333".to_string()]; + builder.set_chain_source_cbf(peers.clone(), Some(sync_config.clone()), None); + + let guard = builder.inner.read().unwrap(); + assert!(matches!( + guard.chain_data_source_config.as_ref(), + Some(super::ChainDataSourceConfig::Cbf { + peers: p, + sync_config: Some(config), + fee_source_config: None, + }) if config == &sync_config && p == &peers + )); + } } diff --git a/src/chain/cbf.rs b/src/chain/cbf.rs new file mode 100644 index 000000000..494883b09 --- /dev/null +++ b/src/chain/cbf.rs @@ -0,0 +1,1248 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::collections::{BTreeMap, HashMap}; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use bdk_chain::{BlockId, ConfirmationBlockTime, TxUpdate}; +use bdk_wallet::Update; +use bip157::chain::BlockHeaderChanges; +use bip157::{ + BlockHash, Builder, Client, Event, Info, Requester, SyncUpdate, TrustedPeer, Warning, +}; +use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; +use bitcoin::{Amount, FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; +use electrum_client::ElectrumApi; +use lightning::chain::{Confirm, WatchedOutput}; +use lightning::util::ser::Writeable; +use tokio::sync::{mpsc, oneshot}; + +use super::{FeeSourceConfig, WalletSyncStatus}; +use crate::config::{CbfSyncConfig, Config, BDK_CLIENT_STOP_GAP}; +use crate::error::Error; +use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + OnchainFeeEstimator, +}; +use crate::io::utils::write_node_metrics; +use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::NodeMetrics; + +/// Minimum fee rate: 1 sat/vB = 250 sat/kWU. Used as a floor for computed fee rates. +const MIN_FEERATE_SAT_PER_KWU: u64 = 250; + +/// Number of recent blocks to look back for per-target fee rate estimation. +const FEE_RATE_LOOKBACK_BLOCKS: usize = 6; + +/// The fee estimation back-end used by the CBF chain source. +enum FeeSource { + /// Derive fee rates from the coinbase reward of recent blocks. + /// + /// Provides a per-target rate using percentile selection across multiple blocks. + /// Less accurate than a mempool-aware source but requires no extra connectivity. + Cbf, + /// Delegate fee estimation to an Esplora HTTP server. + Esplora { client: esplora_client::AsyncClient }, + /// Delegate fee estimation to an Electrum server. + /// + /// A fresh connection is opened for each estimation cycle because `ElectrumClient` + /// is not `Sync`. + Electrum { server_url: String }, +} + +pub(super) struct CbfChainSource { + /// Peer addresses for sourcing compact block filters via P2P. + peers: Vec, + /// User-provided sync configuration (timeouts, background sync intervals). + pub(super) sync_config: CbfSyncConfig, + /// Fee estimation back-end. + fee_source: FeeSource, + /// Tracks whether the bip157 node is running and holds the command handle. + cbf_runtime_status: Mutex, + /// Latest chain tip hash, updated by the background event processing task. + latest_tip: Arc>>, + /// Scripts to match against compact block filters during a scan. + watched_scripts: Arc>>, + /// Block (height, hash) pairs where filters matched watched scripts. + matched_block_hashes: Arc>>, + /// One-shot channel sender to signal filter scan completion. + sync_completion_tx: Arc>>>, + /// Filters at or below this height are skipped during incremental scans. + filter_skip_height: Arc, + /// Serializes concurrent filter scans (on-chain and lightning). + scan_lock: tokio::sync::Mutex<()>, + /// Scripts registered by LDK's Filter trait for lightning channel monitoring. + registered_scripts: Mutex>, + /// Set when new scripts are registered; forces a full rescan on next lightning sync. + lightning_scripts_dirty: Arc, + /// Last block height reached by on-chain wallet sync, used for incremental scans. + last_onchain_synced_height: Arc>>, + /// Last block height reached by lightning wallet sync, used for incremental scans. + last_lightning_synced_height: Arc>>, + /// Deduplicates concurrent on-chain wallet sync requests. + onchain_wallet_sync_status: Mutex, + /// Deduplicates concurrent lightning wallet sync requests. + lightning_wallet_sync_status: Mutex, + /// Shared fee rate estimator, updated by this chain source. + fee_estimator: Arc, + /// Persistent key-value store for node metrics. + kv_store: Arc, + /// Node configuration (network, storage path, etc.). + config: Arc, + /// Logger instance. + logger: Arc, + /// Shared node metrics (sync timestamps, etc.). + node_metrics: Arc>, +} + +enum CbfRuntimeStatus { + Started { requester: Requester }, + Stopped, +} + +/// Shared state passed to the background event processing task. +struct CbfEventState { + latest_tip: Arc>>, + watched_scripts: Arc>>, + matched_block_hashes: Arc>>, + sync_completion_tx: Arc>>>, + filter_skip_height: Arc, + last_onchain_synced_height: Arc>>, + last_lightning_synced_height: Arc>>, + lightning_scripts_dirty: Arc, +} + +impl CbfChainSource { + pub(crate) fn new( + peers: Vec, sync_config: CbfSyncConfig, fee_source_config: Option, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> Result { + let fee_source = match fee_source_config { + Some(FeeSourceConfig::Esplora(server_url)) => { + let timeout = sync_config.timeouts_config.per_request_timeout_secs; + let mut builder = esplora_client::Builder::new(&server_url); + builder = builder.timeout(timeout as u64); + let client = builder.build_async().map_err(|e| { + log_error!(logger, "Failed to build esplora client: {}", e); + Error::ConnectionFailed + })?; + FeeSource::Esplora { client } + }, + Some(FeeSourceConfig::Electrum(server_url)) => FeeSource::Electrum { server_url }, + None => FeeSource::Cbf, + }; + + let cbf_runtime_status = Mutex::new(CbfRuntimeStatus::Stopped); + let latest_tip = Arc::new(Mutex::new(None)); + let watched_scripts = Arc::new(RwLock::new(Vec::new())); + let matched_block_hashes = Arc::new(Mutex::new(Vec::new())); + let sync_completion_tx = Arc::new(Mutex::new(None)); + let filter_skip_height = Arc::new(AtomicU32::new(0)); + let registered_scripts = Mutex::new(Vec::new()); + let lightning_scripts_dirty = Arc::new(AtomicBool::new(true)); + let scan_lock = tokio::sync::Mutex::new(()); + let last_onchain_synced_height = Arc::new(Mutex::new(None)); + let last_lightning_synced_height = Arc::new(Mutex::new(None)); + let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); + Ok(Self { + peers, + sync_config, + fee_source, + cbf_runtime_status, + latest_tip, + watched_scripts, + matched_block_hashes, + sync_completion_tx, + filter_skip_height, + registered_scripts, + lightning_scripts_dirty, + scan_lock, + last_onchain_synced_height, + last_lightning_synced_height, + onchain_wallet_sync_status, + lightning_wallet_sync_status, + fee_estimator, + kv_store, + config, + logger, + node_metrics, + }) + } + + /// Start the bip157 node and spawn background tasks for event processing. + pub(crate) fn start(&self, runtime: Arc) { + let mut status = self.cbf_runtime_status.lock().unwrap(); + if matches!(*status, CbfRuntimeStatus::Started { .. }) { + debug_assert!(false, "We shouldn't call start if we're already started"); + return; + } + + let network = self.config.network; + + let mut builder = Builder::new(network); + + // Configure data directory under the node's storage path. + let data_dir = std::path::PathBuf::from(&self.config.storage_dir_path).join("bip157_data"); + builder = builder.data_dir(data_dir); + + // Add configured peers. + let peers: Vec = self + .peers + .iter() + .filter_map(|peer_str| { + peer_str.parse::().ok().map(TrustedPeer::from_socket_addr) + }) + .collect(); + if !peers.is_empty() { + builder = builder.add_peers(peers); + } + + // Require multiple peers to agree on filter headers before accepting them, + // as recommended by BIP 157 to mitigate malicious peer attacks. + builder = builder.required_peers(self.sync_config.required_peers); + + // Request witness data so segwit transactions include full witnesses, + // required for Lightning channel operations. + builder = builder.fetch_witness_data(); + + // Set peer response timeout from user configuration (default: 30s). + builder = + builder.response_timeout(Duration::from_secs(self.sync_config.response_timeout_secs)); + + let (node, client) = builder.build(); + + let Client { requester, info_rx, warn_rx, event_rx } = client; + + // Spawn the bip157 node in the background. + let node_logger = Arc::clone(&self.logger); + runtime.spawn_background_task(async move { + if let Err(e) = node.run().await { + log_error!(node_logger, "CBF node exited with error: {:?}", e); + } + }); + + // Spawn a task to log info messages. + let info_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_info_messages(info_rx, info_logger)); + + // Spawn a task to log warning messages. + let warn_logger = Arc::clone(&self.logger); + runtime + .spawn_cancellable_background_task(Self::process_warn_messages(warn_rx, warn_logger)); + + // Spawn a task to process events. + let event_state = CbfEventState { + latest_tip: Arc::clone(&self.latest_tip), + watched_scripts: Arc::clone(&self.watched_scripts), + matched_block_hashes: Arc::clone(&self.matched_block_hashes), + sync_completion_tx: Arc::clone(&self.sync_completion_tx), + filter_skip_height: Arc::clone(&self.filter_skip_height), + last_onchain_synced_height: Arc::clone(&self.last_onchain_synced_height), + last_lightning_synced_height: Arc::clone(&self.last_lightning_synced_height), + lightning_scripts_dirty: Arc::clone(&self.lightning_scripts_dirty), + }; + let event_logger = Arc::clone(&self.logger); + runtime.spawn_cancellable_background_task(Self::process_events( + event_rx, + event_state, + event_logger, + )); + + log_info!(self.logger, "CBF chain source started."); + + *status = CbfRuntimeStatus::Started { requester }; + } + + /// Shut down the bip157 node and stop all background tasks. + pub(crate) fn stop(&self) { + let mut status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => { + let _ = requester.shutdown(); + log_info!(self.logger, "CBF chain source stopped."); + }, + CbfRuntimeStatus::Stopped => {}, + } + *status = CbfRuntimeStatus::Stopped; + } + + async fn process_info_messages(mut info_rx: mpsc::Receiver, logger: Arc) { + while let Some(info) = info_rx.recv().await { + log_debug!(logger, "CBF node info: {}", info); + } + } + + async fn process_warn_messages( + mut warn_rx: mpsc::UnboundedReceiver, logger: Arc, + ) { + while let Some(warning) = warn_rx.recv().await { + log_debug!(logger, "CBF node warning: {}", warning); + } + } + + async fn process_events( + mut event_rx: mpsc::UnboundedReceiver, state: CbfEventState, logger: Arc, + ) { + while let Some(event) = event_rx.recv().await { + match event { + Event::FiltersSynced(sync_update) => { + let tip = sync_update.tip(); + *state.latest_tip.lock().unwrap() = Some(tip.hash); + log_info!( + logger, + "CBF filters synced to tip: height={}, hash={}", + tip.height, + tip.hash, + ); + if let Some(tx) = state.sync_completion_tx.lock().unwrap().take() { + let _ = tx.send(sync_update); + } + }, + Event::Block(_) => {}, + Event::ChainUpdate(header_changes) => match header_changes { + BlockHeaderChanges::Reorganized { accepted, reorganized } => { + log_debug!( + logger, + "CBF chain reorg detected: {} blocks removed, {} blocks accepted.", + reorganized.len(), + accepted.len(), + ); + + // Reset synced heights to just before the earliest reorganized + // block so the next incremental scan covers the affected range. + if let Some(min_reorg_height) = reorganized.iter().map(|h| h.height).min() { + let reset_height = if min_reorg_height > 0 { + Some(min_reorg_height - 1) + } else { + None + }; + *state.last_onchain_synced_height.lock().unwrap() = reset_height; + *state.last_lightning_synced_height.lock().unwrap() = reset_height; + state.lightning_scripts_dirty.store(true, Ordering::Release); + log_debug!( + logger, + "Reset synced heights to {:?} due to reorg at height {}.", + reset_height, + min_reorg_height, + ); + } + }, + BlockHeaderChanges::Connected(header) => { + log_trace!(logger, "CBF block connected at height {}", header.height,); + }, + BlockHeaderChanges::ForkAdded(header) => { + log_trace!(logger, "CBF fork block observed at height {}", header.height,); + }, + }, + Event::IndexedFilter(indexed_filter) => { + let skip_height = state.filter_skip_height.load(Ordering::Acquire); + if skip_height > 0 && indexed_filter.height() <= skip_height { + continue; + } + let scripts = state.watched_scripts.read().unwrap(); + if !scripts.is_empty() && indexed_filter.contains_any(scripts.iter()) { + state + .matched_block_hashes + .lock() + .unwrap() + .push((indexed_filter.height(), indexed_filter.block_hash())); + } + log_trace!(logger, "CBF received filter at height {}", indexed_filter.height(),); + }, + } + } + } + + fn requester(&self) -> Result { + let status = self.cbf_runtime_status.lock().unwrap(); + match &*status { + CbfRuntimeStatus::Started { requester } => Ok(requester.clone()), + CbfRuntimeStatus::Stopped => { + debug_assert!( + false, + "We should have started the chain source before using the requester" + ); + Err(Error::ConnectionFailed) + }, + } + } + + /// Register a transaction script for Lightning channel monitoring. + pub(crate) fn register_tx(&self, _txid: &Txid, script_pubkey: &Script) { + self.registered_scripts.lock().unwrap().push(script_pubkey.to_owned()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + + /// Register a watched output script for Lightning channel monitoring. + pub(crate) fn register_output(&self, output: WatchedOutput) { + self.registered_scripts.lock().unwrap().push(output.script_pubkey.clone()); + self.lightning_scripts_dirty.store(true, Ordering::Release); + } + + /// Run a CBF filter scan: set watched scripts, trigger a rescan, wait for + /// completion, and return the sync update along with matched block hashes. + /// + /// When `skip_before_height` is `Some(h)`, filters at or below height `h` are + /// skipped, making the scan incremental. + async fn run_filter_scan( + &self, scripts: Vec, skip_before_height: Option, + ) -> Result<(SyncUpdate, Vec<(u32, BlockHash)>), Error> { + let requester = self.requester()?; + + let _scan_guard = self.scan_lock.lock().await; + + self.filter_skip_height.store(skip_before_height.unwrap_or(0), Ordering::Release); + self.matched_block_hashes.lock().unwrap().clear(); + *self.watched_scripts.write().unwrap() = scripts; + + let (tx, rx) = oneshot::channel(); + *self.sync_completion_tx.lock().unwrap() = Some(tx); + + requester.rescan().map_err(|e| { + log_error!(self.logger, "Failed to trigger CBF rescan: {:?}", e); + Error::WalletOperationFailed + })?; + + let sync_update = rx.await.map_err(|e| { + log_error!(self.logger, "CBF sync completion channel dropped: {:?}", e); + Error::WalletOperationFailed + })?; + + self.filter_skip_height.store(0, Ordering::Release); + self.watched_scripts.write().unwrap().clear(); + let matched = std::mem::take(&mut *self.matched_block_hashes.lock().unwrap()); + + Ok((sync_update, matched)) + } + + /// Sync the on-chain wallet by scanning compact block filters for relevant transactions. + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "On-chain wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let res = async { + let requester = self.requester()?; + let now = Instant::now(); + + let scripts = onchain_wallet.get_spks_for_cbf_sync(BDK_CLIENT_STOP_GAP); + if scripts.is_empty() { + log_debug!(self.logger, "No wallet scripts to sync via CBF."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.onchain_wallet_sync_timeout_secs, + ), + self.sync_onchain_wallet_op(requester, scripts), + ); + + let (tx_update, sync_update) = match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e); + return Err(Error::WalletOperationTimeout); + }, + }; + + // Build chain checkpoint extending from the wallet's current tip. + let mut cp = onchain_wallet.latest_checkpoint(); + for (height, header) in sync_update.recent_history() { + if *height > cp.height() { + let block_id = BlockId { height: *height, hash: header.block_hash() }; + cp = cp.push(block_id).unwrap_or_else(|old| old); + } + } + let tip = sync_update.tip(); + if tip.height > cp.height() { + let tip_block_id = BlockId { height: tip.height, hash: tip.hash }; + cp = cp.push(tip_block_id).unwrap_or_else(|old| old); + } + + let update = + Update { last_active_indices: BTreeMap::new(), tx_update, chain: Some(cp) }; + + onchain_wallet.apply_update(update)?; + + log_debug!( + self.logger, + "Sync of on-chain wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_onchain_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + .await; + + self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_onchain_wallet_op( + &self, requester: Requester, scripts: Vec, + ) -> Result<(TxUpdate, SyncUpdate), Error> { + // Always do a full scan (skip_height=None) for the on-chain wallet. + // Unlike the Lightning wallet which can rely on reorg_queue events, + // the on-chain wallet needs to see all blocks to correctly detect + // reorgs via checkpoint comparison in the caller. + // + // We include LDK-registered scripts (e.g., channel funding output + // scripts) alongside the wallet scripts. This ensures the on-chain + // wallet scan also fetches blocks containing channel funding + // transactions, whose outputs are needed by BDK's TxGraph to + // calculate fees for subsequent spends such as splice transactions. + // Without these, BDK's `calculate_fee` would fail with + // `MissingTxOut` because the parent transaction's outputs are + // unknown. This mirrors what the Bitcoind chain source does in + // `Wallet::block_connected` by inserting registered tx outputs. + let mut all_scripts = scripts; + // we query all registered scripts, not only BDK-related + all_scripts.extend(self.registered_scripts.lock().unwrap().iter().cloned()); + let (sync_update, matched) = self.run_filter_scan(all_scripts, None).await?; + + log_debug!( + self.logger, + "CBF on-chain filter scan complete: {} matching blocks found.", + matched.len() + ); + + // Fetch matching blocks and include all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we include every transaction + // from matched blocks and let BDK determine relevance. + let mut tx_update = TxUpdate::default(); + for (height, block_hash) in &matched { + let indexed_block = requester.get_block(*block_hash).await.map_err(|e| { + log_error!(self.logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::WalletOperationFailed + })?; + let block = indexed_block.block; + let block_id = BlockId { height: *height, hash: block.header.block_hash() }; + let conf_time = + ConfirmationBlockTime { block_id, confirmation_time: block.header.time as u64 }; + for tx in &block.txdata { + let txid = tx.compute_txid(); + tx_update.txs.push(Arc::new(tx.clone())); + tx_update.anchors.insert((conf_time, txid)); + } + } + + let tip = sync_update.tip(); + *self.last_onchain_synced_height.lock().unwrap() = Some(tip.height); + + Ok((tx_update, sync_update)) + } + + /// Sync the Lightning wallet by confirming channel transactions via compact block filters. + pub(crate) async fn sync_lightning_wallet( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + let receiver_res = { + let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + if let Some(mut sync_receiver) = receiver_res { + log_debug!(self.logger, "Lightning wallet sync already in progress, waiting."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::TxSyncFailed + })?; + } + + let res = async { + let requester = self.requester()?; + let now = Instant::now(); + + let scripts: Vec = self.registered_scripts.lock().unwrap().clone(); + if scripts.is_empty() { + log_debug!(self.logger, "No registered scripts for CBF lightning sync."); + return Ok(()); + } + + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs, + ), + self.sync_lightning_wallet_op( + requester, + channel_manager, + chain_monitor, + output_sweeper, + scripts, + ), + ); + + match timeout_fut.await { + Ok(res) => res?, + Err(e) => { + log_error!(self.logger, "Sync of Lightning wallet timed out: {}", e); + return Err(Error::TxSyncTimeout); + }, + }; + + log_debug!( + self.logger, + "Sync of Lightning wallet via CBF finished in {}ms.", + now.elapsed().as_millis() + ); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_lightning_wallet_sync_timestamp = t; + }, + )?; + + Ok(()) + } + .await; + + self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + + res + } + + async fn sync_lightning_wallet_op( + &self, requester: Requester, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, scripts: Vec, + ) -> Result<(), Error> { + let scripts_dirty = self.lightning_scripts_dirty.load(Ordering::Acquire); + let skip_height = + if scripts_dirty { None } else { *self.last_lightning_synced_height.lock().unwrap() }; + let (sync_update, matched) = self.run_filter_scan(scripts, skip_height).await?; + + log_debug!( + self.logger, + "CBF lightning filter scan complete: {} matching blocks found.", + matched.len() + ); + + let confirmables: Vec<&(dyn Confirm + Sync + Send)> = + vec![&*channel_manager, &*chain_monitor, &*output_sweeper]; + + // Fetch matching blocks and confirm all their transactions. + // The compact block filter already matched our scripts (covering both + // created outputs and spent inputs), so we confirm every transaction + // from matched blocks and let LDK determine relevance. + for (height, block_hash) in &matched { + confirm_block_transactions( + &requester, + *block_hash, + *height, + &confirmables, + &self.logger, + ) + .await?; + } + + // Update the best block tip. + let tip = sync_update.tip(); + if let Some(tip_header) = sync_update.recent_history().get(&tip.height) { + for confirmable in &confirmables { + confirmable.best_block_updated(tip_header, tip.height); + } + } + + *self.last_lightning_synced_height.lock().unwrap() = Some(tip.height); + self.lightning_scripts_dirty.store(false, Ordering::Release); + + Ok(()) + } + + pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { + let new_fee_rate_cache = match &self.fee_source { + FeeSource::Cbf => self.fee_rate_cache_from_cbf().await?, + FeeSource::Esplora { client } => Some(self.fee_rate_cache_from_esplora(client).await?), + FeeSource::Electrum { server_url } => { + Some(self.fee_rate_cache_from_electrum(server_url).await?) + }, + }; + + let Some(new_fee_rate_cache) = new_fee_rate_cache else { + return Ok(()); + }; + + self.fee_estimator.set_fee_rate_cache(new_fee_rate_cache); + + update_node_metrics_timestamp( + &self.node_metrics, + &*self.kv_store, + &*self.logger, + |m, t| { + m.latest_fee_rate_cache_update_timestamp = t; + }, + )?; + + Ok(()) + } + + /// Derive per-target fee rates from recent blocks' coinbase outputs. + /// + /// Returns `Ok(None)` when no chain tip is available yet (first startup before sync). + async fn fee_rate_cache_from_cbf( + &self, + ) -> Result>, Error> { + let requester = self.requester()?; + + let tip_hash = match *self.latest_tip.lock().unwrap() { + Some(hash) => hash, + None => { + log_debug!(self.logger, "No tip available yet for fee rate estimation, skipping."); + return Ok(None); + }, + }; + + let now = Instant::now(); + + // Fetch fee rates from the last N blocks for per-target estimation. + // We compute fee rates ourselves rather than using Requester::average_fee_rate, + // so we can sample multiple blocks and select percentiles per confirmation target. + let mut block_fee_rates: Vec = Vec::with_capacity(FEE_RATE_LOOKBACK_BLOCKS); + let mut current_hash = tip_hash; + + let timeout = Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ); + let fetch_start = Instant::now(); + + for idx in 0..FEE_RATE_LOOKBACK_BLOCKS { + // Check if we've exceeded the overall timeout for fee estimation. + let remaining_timeout = timeout.saturating_sub(fetch_start.elapsed()); + if remaining_timeout.is_zero() { + log_error!(self.logger, "Updating fee rate estimates timed out."); + return Err(Error::FeerateEstimationUpdateTimeout); + } + + // Fetch the block via P2P. On the first iteration, a fetch failure + // likely means the cached tip is stale (initial sync or reorg), so + // we clear the tip and skip gracefully instead of returning an error. + let indexed_block = + match tokio::time::timeout(remaining_timeout, requester.get_block(current_hash)) + .await + { + Ok(Ok(indexed_block)) => indexed_block, + Ok(Err(e)) if idx == 0 => { + log_debug!( + self.logger, + "Cached CBF tip {} was unavailable during fee estimation, \ + likely due to initial sync or a reorg: {:?}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(None); + }, + Ok(Err(e)) => { + log_error!( + self.logger, + "Failed to fetch block for fee estimation: {:?}", + e + ); + return Err(Error::FeerateEstimationUpdateFailed); + }, + Err(e) if idx == 0 => { + log_debug!( + self.logger, + "Timed out fetching cached CBF tip {} during fee estimation, \ + likely due to initial sync or a reorg: {}", + current_hash, + e + ); + *self.latest_tip.lock().unwrap() = None; + return Ok(None); + }, + Err(e) => { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + return Err(Error::FeerateEstimationUpdateTimeout); + }, + }; + + let height = indexed_block.height; + let block = &indexed_block.block; + let weight_kwu = block.weight().to_kwu_floor(); + + // Compute fee rate: (coinbase_output - subsidy) / weight. + // For blocks with zero weight (e.g. coinbase-only in regtest), use the floor rate. + let fee_rate_sat_per_kwu = if weight_kwu == 0 { + MIN_FEERATE_SAT_PER_KWU + } else { + let subsidy = block_subsidy(height); + let revenue = block + .txdata + .first() + .map(|tx| tx.output.iter().map(|o| o.value).sum()) + .unwrap_or(Amount::ZERO); + let block_fees = revenue.checked_sub(subsidy).unwrap_or(Amount::ZERO); + + if block_fees == Amount::ZERO && self.config.network == Network::Bitcoin { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: zero block fees are disallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + (block_fees.to_sat() / weight_kwu).max(MIN_FEERATE_SAT_PER_KWU) + }; + + block_fee_rates.push(fee_rate_sat_per_kwu); + // Walk backwards through the chain via prev_blockhash. + if height == 0 { + break; + } + current_hash = block.header.prev_blockhash; + } + + if block_fee_rates.is_empty() { + log_error!(self.logger, "No blocks available for fee rate estimation."); + return Err(Error::FeerateEstimationUpdateFailed); + } + + block_fee_rates.sort(); + + let confirmation_targets = get_all_conf_targets(); + let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); + + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + let base_fee_rate = select_fee_rate_for_target(&block_fee_rates, num_blocks); + let adjusted_fee_rate = apply_post_estimation_adjustments(target, base_fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + log_debug!( + self.logger, + "CBF fee rate estimation finished in {}ms ({} blocks sampled).", + now.elapsed().as_millis(), + block_fee_rates.len(), + ); + + Ok(Some(new_fee_rate_cache)) + } + + /// Fetch per-target fee rates from an Esplora server. + async fn fee_rate_cache_from_esplora( + &self, client: &esplora_client::AsyncClient, + ) -> Result, Error> { + let timeout = Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ); + let estimates = tokio::time::timeout(timeout, client.get_fee_estimates()) + .await + .map_err(|e| { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { + log_error!(self.logger, "Failed to retrieve fee rate estimates: {}", e); + Error::FeerateEstimationUpdateFailed + })?; + + if estimates.is_empty() && self.config.network == Network::Bitcoin { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: empty estimates are disallowed on Mainnet.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + let confirmation_targets = get_all_conf_targets(); + let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); + for target in confirmation_targets { + let num_blocks = get_num_block_defaults_for_target(target); + let converted_estimate_sat_vb = + esplora_client::convert_fee_rate(num_blocks, estimates.clone()) + .map_or(1.0, |converted| converted.max(1.0)); + let fee_rate = FeeRate::from_sat_per_kwu((converted_estimate_sat_vb * 250.0) as u64); + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + Ok(new_fee_rate_cache) + } + + /// Fetch per-target fee rates from an Electrum server. + /// + /// Opens a fresh connection for each call because `ElectrumClient` is not `Sync`. + async fn fee_rate_cache_from_electrum( + &self, server_url: &str, + ) -> Result, Error> { + let server_url = server_url.to_owned(); + let confirmation_targets = get_all_conf_targets(); + let per_request_timeout = self.sync_config.timeouts_config.per_request_timeout_secs; + + let raw_estimates: Vec = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.fee_rate_cache_update_timeout_secs, + ), + tokio::task::spawn_blocking(move || { + let electrum_config = electrum_client::ConfigBuilder::new() + .retry(3) + .timeout(Some(per_request_timeout)) + .build(); + let client = electrum_client::Client::from_config(&server_url, electrum_config) + .map_err(|_| Error::FeerateEstimationUpdateFailed)?; + let mut batch = electrum_client::Batch::default(); + for target in confirmation_targets { + batch.estimate_fee(get_num_block_defaults_for_target(target)); + } + client.batch_call(&batch).map_err(|_| Error::FeerateEstimationUpdateFailed) + }), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|_| Error::FeerateEstimationUpdateFailed)? // JoinError + ?; // inner Result + + let confirmation_targets = get_all_conf_targets(); + + if raw_estimates.len() != confirmation_targets.len() + && self.config.network == Network::Bitcoin + { + log_error!( + self.logger, + "Failed to retrieve fee rate estimates: Electrum server didn't return all expected results.", + ); + return Err(Error::FeerateEstimationUpdateFailed); + } + + let mut new_fee_rate_cache = HashMap::with_capacity(confirmation_targets.len()); + for (target, raw_rate) in confirmation_targets.into_iter().zip(raw_estimates.into_iter()) { + // Electrum returns BTC/KvB; fall back to 1 sat/vb (= 0.00001 BTC/KvB) on failure. + let fee_rate_btc_per_kvb = + raw_rate.as_f64().map_or(0.00001_f64, |v: f64| v.max(0.00001)); + // Convert BTC/KvB → sat/kwu: multiply by 25_000_000 (= 10^8 / 4). + let fee_rate = + FeeRate::from_sat_per_kwu((fee_rate_btc_per_kvb * 25_000_000.0).round() as u64); + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + self.logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + Ok(new_fee_rate_cache) + } + + /// Broadcast a package of transactions via the P2P network. + pub(crate) async fn process_broadcast_package(&self, package: Vec) { + let Ok(requester) = self.requester() else { return }; + + for tx in package { + let txid = tx.compute_txid(); + let tx_bytes = tx.encode(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs), + requester.broadcast_tx(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(wtxid) => { + log_trace!( + self.logger, + "Successfully broadcast transaction {} (wtxid: {})", + txid, + wtxid + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {:?}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx_bytes) + ); + }, + } + } + } +} + +/// Record the current timestamp in a `NodeMetrics` field and persist the metrics. +fn update_node_metrics_timestamp( + node_metrics: &RwLock, kv_store: &DynStore, logger: &Logger, + setter: impl FnOnce(&mut NodeMetrics, Option), +) -> Result<(), Error> { + let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked = node_metrics.write().unwrap(); + setter(&mut locked, unix_time_secs_opt); + write_node_metrics(&*locked, kv_store, logger)?; + Ok(()) +} + +/// Fetch a block by hash and call `transactions_confirmed` on each confirmable. +async fn confirm_block_transactions( + requester: &Requester, block_hash: BlockHash, height: u32, + confirmables: &[&(dyn Confirm + Sync + Send)], logger: &Logger, +) -> Result<(), Error> { + let indexed_block = requester.get_block(block_hash).await.map_err(|e| { + log_error!(logger, "Failed to fetch block {}: {:?}", block_hash, e); + Error::TxSyncFailed + })?; + let block = &indexed_block.block; + let header = &block.header; + let txdata: Vec<(usize, &Transaction)> = block.txdata.iter().enumerate().collect(); + if !txdata.is_empty() { + for confirmable in confirmables { + confirmable.transactions_confirmed(header, &txdata, height); + } + } + Ok(()) +} + +/// Compute the block subsidy (mining reward before fees) at the given block height. +fn block_subsidy(height: u32) -> Amount { + let halvings = height / SUBSIDY_HALVING_INTERVAL; + if halvings >= 64 { + return Amount::ZERO; + } + let base = Amount::ONE_BTC.to_sat() * 50; + Amount::from_sat(base >> halvings) +} + +/// Select a fee rate from sorted block fee rates based on confirmation urgency. +/// +/// For urgent targets (1 block), uses the highest observed fee rate. +/// For medium targets (2-6 blocks), uses the 75th percentile. +/// For standard targets (7-12 blocks), uses the median. +/// For low-urgency targets (13+ blocks), uses the 25th percentile. +fn select_fee_rate_for_target(sorted_rates: &[u64], num_blocks: usize) -> FeeRate { + if sorted_rates.is_empty() { + return FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU); + } + + let len = sorted_rates.len(); + let idx = if num_blocks <= 1 { + len - 1 + } else if num_blocks <= 6 { + (len * 3) / 4 + } else if num_blocks <= 12 { + len / 2 + } else { + len / 4 + }; + + FeeRate::from_sat_per_kwu(sorted_rates[idx.min(len - 1)]) +} + +#[cfg(test)] +mod tests { + use bitcoin::constants::SUBSIDY_HALVING_INTERVAL; + use bitcoin::{Amount, FeeRate}; + + use super::{block_subsidy, select_fee_rate_for_target, MIN_FEERATE_SAT_PER_KWU}; + use crate::fee_estimator::{ + apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, + }; + + #[test] + fn select_fee_rate_empty_returns_floor() { + let rate = select_fee_rate_for_target(&[], 1); + assert_eq!(rate, FeeRate::from_sat_per_kwu(MIN_FEERATE_SAT_PER_KWU)); + } + + #[test] + fn select_fee_rate_single_element_returns_it_for_all_buckets() { + let rates = [5000u64]; + // Every urgency bucket should return the single available rate. + for num_blocks in [1, 3, 6, 12, 144, 1008] { + let rate = select_fee_rate_for_target(&rates, num_blocks); + assert_eq!( + rate, + FeeRate::from_sat_per_kwu(5000), + "num_blocks={} should return the only available rate", + num_blocks, + ); + } + } + + #[test] + fn select_fee_rate_picks_correct_percentile() { + // 6 sorted rates: indices 0..5 + let rates = [100, 200, 300, 400, 500, 600]; + // 1-block (most urgent): highest → index 5 → 600 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(600)); + // 6-block (medium): 75th percentile → (6*3)/4 = 4 → 500 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(500)); + // 12-block (standard): median → 6/2 = 3 → 400 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(400)); + // 144-block (low): 25th percentile → 6/4 = 1 → 200 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(200)); + } + + #[test] + fn select_fee_rate_monotonic_urgency() { + // More urgent targets should never produce lower rates than less urgent ones. + let rates = [250, 500, 1000, 2000, 4000, 8000]; + let urgent = select_fee_rate_for_target(&rates, 1); + let medium = select_fee_rate_for_target(&rates, 6); + let standard = select_fee_rate_for_target(&rates, 12); + let low = select_fee_rate_for_target(&rates, 144); + + assert!( + urgent >= medium, + "urgent ({}) >= medium ({})", + urgent.to_sat_per_kwu(), + medium.to_sat_per_kwu() + ); + assert!( + medium >= standard, + "medium ({}) >= standard ({})", + medium.to_sat_per_kwu(), + standard.to_sat_per_kwu() + ); + assert!( + standard >= low, + "standard ({}) >= low ({})", + standard.to_sat_per_kwu(), + low.to_sat_per_kwu() + ); + } + + #[test] + fn uniform_rates_match_naive_single_rate() { + // When all blocks have the same fee rate (like the old single-block + // approach), every target should select that same base rate. This + // proves the optimized multi-block approach is backwards-compatible. + + let uniform_rate = 3000u64; + let rates = [uniform_rate; 6]; + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let optimized = select_fee_rate_for_target(&rates, num_blocks); + let naive = FeeRate::from_sat_per_kwu(uniform_rate); + assert_eq!( + optimized, naive, + "For target {:?} (num_blocks={}), optimized rate should match naive single-rate", + target, num_blocks, + ); + + // Also verify the post-estimation adjustments produce the same + // result for both approaches. + let adjusted_optimized = apply_post_estimation_adjustments(target, optimized); + let adjusted_naive = apply_post_estimation_adjustments(target, naive); + assert_eq!(adjusted_optimized, adjusted_naive); + } + } + + #[test] + fn block_subsidy_genesis() { + assert_eq!(block_subsidy(0), Amount::from_sat(50 * 100_000_000)); + } + + #[test] + fn block_subsidy_first_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL), Amount::from_sat(25 * 100_000_000)); + } + + #[test] + fn block_subsidy_second_halving() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 2), Amount::from_sat(1_250_000_000)); + } + + #[test] + fn block_subsidy_exhausted_after_64_halvings() { + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 64), Amount::ZERO); + assert_eq!(block_subsidy(SUBSIDY_HALVING_INTERVAL * 100), Amount::ZERO); + } + + #[test] + fn select_fee_rate_two_elements() { + let rates = [1000, 5000]; + // 1-block: index 1 (highest) → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 1), FeeRate::from_sat_per_kwu(5000)); + // 6-block: (2*3)/4 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 6), FeeRate::from_sat_per_kwu(5000)); + // 12-block: 2/2 = 1 → 5000 + assert_eq!(select_fee_rate_for_target(&rates, 12), FeeRate::from_sat_per_kwu(5000)); + // 144-block: 2/4 = 0 → 1000 + assert_eq!(select_fee_rate_for_target(&rates, 144), FeeRate::from_sat_per_kwu(1000)); + } + + #[test] + fn select_fee_rate_all_targets_use_valid_indices() { + for size in 1..=6 { + let rates: Vec = (1..=size).map(|i| i as u64 * 1000).collect(); + for target in get_all_conf_targets() { + let num_blocks = get_num_block_defaults_for_target(target); + let _ = select_fee_rate_for_target(&rates, num_blocks); + } + } + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..b896ba6fb 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. pub(crate) mod bitcoind; +mod cbf; mod electrum; mod esplora; @@ -17,11 +18,12 @@ use bitcoin::{Script, Txid}; use lightning::chain::{BestBlock, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; +use crate::chain::cbf::CbfChainSource; use crate::chain::electrum::ElectrumChainSource; use crate::chain::esplora::EsploraChainSource; use crate::config::{ - BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, - WALLET_SYNC_INTERVAL_MINIMUM_SECS, + BackgroundSyncConfig, BitcoindRestClientConfig, CbfSyncConfig, Config, ElectrumSyncConfig, + EsploraSyncConfig, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; @@ -82,6 +84,20 @@ impl WalletSyncStatus { } } +/// Optional external fee estimation backend for the CBF chain source. +/// +/// By default CBF derives fee rates from recent blocks' coinbase outputs. +/// Setting an external source provides more accurate, per-target estimates +/// from a mempool-aware server. +#[derive(Debug, Clone)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))] +pub enum FeeSourceConfig { + /// Use an Esplora HTTP server for fee rate estimation. + Esplora(String), + /// Use an Electrum server for fee rate estimation. + Electrum(String), +} + pub(crate) struct ChainSource { kind: ChainSourceKind, registered_txids: Mutex>, @@ -93,6 +109,7 @@ enum ChainSourceKind { Esplora(EsploraChainSource), Electrum(ElectrumChainSource), Bitcoind(BitcoindChainSource), + Cbf(CbfChainSource), } impl ChainSource { @@ -184,11 +201,35 @@ impl ChainSource { (Self { kind, registered_txids, tx_broadcaster, logger }, best_block) } + pub(crate) fn new_cbf( + peers: Vec, sync_config: CbfSyncConfig, fee_source_config: Option, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> Result<(Self, Option), Error> { + let cbf_chain_source = CbfChainSource::new( + peers, + sync_config, + fee_source_config, + fee_estimator, + kv_store, + config, + Arc::clone(&logger), + node_metrics, + )?; + let kind = ChainSourceKind::Cbf(cbf_chain_source); + let registered_txids = Mutex::new(Vec::new()); + Ok((Self { kind, registered_txids, tx_broadcaster, logger }, None)) + } + pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => { electrum_chain_source.start(runtime)? }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.start(runtime); + }, _ => { // Nothing to do for other chain sources. }, @@ -199,6 +240,9 @@ impl ChainSource { pub(crate) fn stop(&self) { match &self.kind { ChainSourceKind::Electrum(electrum_chain_source) => electrum_chain_source.stop(), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.stop(); + }, _ => { // Nothing to do for other chain sources. }, @@ -210,6 +254,7 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { Some(bitcoind_chain_source.as_utxo_source()) }, + ChainSourceKind::Cbf { .. } => None, _ => None, } } @@ -223,6 +268,7 @@ impl ChainSource { ChainSourceKind::Esplora(_) => true, ChainSourceKind::Electrum { .. } => true, ChainSourceKind::Bitcoind { .. } => false, + ChainSourceKind::Cbf { .. } => true, } } @@ -289,6 +335,28 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + if let Some(background_sync_config) = + cbf_chain_source.sync_config.background_sync_config.as_ref() + { + self.start_tx_based_sync_loop( + stop_sync_receiver, + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + background_sync_config, + Arc::clone(&self.logger), + ) + .await + } else { + log_info!( + self.logger, + "Background syncing is disabled. Manual syncing required for onchain wallet, lightning wallet, and fee rate updates.", + ); + return; + } + }, } } @@ -368,6 +436,9 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Onchain wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.sync_onchain_wallet(onchain_wallet).await + }, } } @@ -393,6 +464,11 @@ impl ChainSource { // `ChainPoller`. So nothing to do here. unreachable!("Lightning wallet will be synced via chain polling") }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source + .sync_lightning_wallet(channel_manager, chain_monitor, output_sweeper) + .await + }, } } @@ -421,6 +497,10 @@ impl ChainSource { ) .await }, + ChainSourceKind::Cbf { .. } => { + // In CBF mode we sync wallets via compact block filters. + unreachable!("Listeners will be synced via compact block filter syncing") + }, } } @@ -435,6 +515,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.update_fee_rate_estimates().await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.update_fee_rate_estimates().await + }, } } @@ -463,6 +546,9 @@ impl ChainSource { ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source.process_broadcast_package(next_package).await }, + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.process_broadcast_package(next_package).await + }, } } } @@ -481,6 +567,9 @@ impl Filter for ChainSource { electrum_chain_source.register_tx(txid, script_pubkey) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => { + cbf_chain_source.register_tx(txid, script_pubkey) + }, } } fn register_output(&self, output: lightning::chain::WatchedOutput) { @@ -492,6 +581,7 @@ impl Filter for ChainSource { electrum_chain_source.register_output(output) }, ChainSourceKind::Bitcoind { .. } => (), + ChainSourceKind::Cbf(cbf_chain_source) => cbf_chain_source.register_output(output), } } } diff --git a/src/config.rs b/src/config.rs index 71e4d2314..893c734d9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -482,6 +482,53 @@ impl Default for ElectrumSyncConfig { } } +/// Configuration for syncing via BIP 157 compact block filters. +/// +/// Background syncing is enabled by default, using the default values specified in +/// [`BackgroundSyncConfig`]. +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct CbfSyncConfig { + /// Background sync configuration. + /// + /// If set to `None`, background syncing will be disabled. Users will need to manually + /// sync via [`Node::sync_wallets`] for the wallets and fee rate updates. + /// + /// [`Node::sync_wallets`]: crate::Node::sync_wallets + pub background_sync_config: Option, + /// Sync timeouts configuration. + pub timeouts_config: SyncTimeoutsConfig, + /// Peer response timeout in seconds for the bip157 P2P node. + /// + /// If a peer does not respond within this duration, the connection may be dropped. + /// Higher values are recommended for slow peers or when downloading many blocks. + /// + /// Defaults to 30 seconds. + pub response_timeout_secs: u64, + /// Number of peers that must agree on filter headers before they are accepted. + /// + /// Higher values increase security against malicious peers serving invalid compact block + /// filters, at the cost of slower sync times. Must be between 1 and 15. + /// + /// As recommended by BIP 157, clients should connect to multiple peers to mitigate the risk + /// of downloading incorrect filter headers. Setting this to 1 means filter headers from a + /// single peer are trusted without cross-validation. + /// + /// Defaults to 2. + pub required_peers: u8, +} + +impl Default for CbfSyncConfig { + fn default() -> Self { + Self { + background_sync_config: Some(BackgroundSyncConfig::default()), + timeouts_config: SyncTimeoutsConfig::default(), + response_timeout_secs: 30, + required_peers: 2, + } + } +} + /// Configuration for syncing with Bitcoin Core backend via REST. #[derive(Debug, Clone)] pub struct BitcoindRestClientConfig { diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 5a1420882..57d51f718 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -142,7 +142,9 @@ impl VssClientHeaderProvider for VssHeaderProviderAdapter { } use crate::builder::sanitize_alias; -pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; +pub use crate::config::{ + default_config, CbfSyncConfig, ElectrumSyncConfig, EsploraSyncConfig, TorConfig, +}; pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; diff --git a/src/lib.rs b/src/lib.rs index 2e02e996c..1f6df49fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,7 @@ pub use builder::BuildError; #[cfg(not(feature = "uniffi"))] pub use builder::NodeBuilder as Builder; use chain::ChainSource; +pub use chain::FeeSourceConfig; use config::{ default_user_config, may_announce_channel, AsyncPaymentsRole, ChannelConfig, Config, LNURL_AUTH_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0e80a46db..8d4f22cbb 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -122,6 +122,28 @@ impl Wallet { self.inner.lock().unwrap().start_sync_with_revealed_spks().build() } + pub(crate) fn get_spks_for_cbf_sync(&self, stop_gap: usize) -> Vec { + let wallet = self.inner.lock().unwrap(); + let mut scripts: Vec = + wallet.spk_index().revealed_spks(..).map(|((_, _), spk)| spk).collect(); + + // For first sync when no scripts have been revealed yet, generate + // lookahead scripts up to the stop gap for both keychains. + if scripts.is_empty() { + for keychain in [KeychainKind::External, KeychainKind::Internal] { + for idx in 0..stop_gap as u32 { + scripts.push(wallet.peek_address(keychain, idx).address.script_pubkey()); + } + } + } + + scripts + } + + pub(crate) fn latest_checkpoint(&self) -> bdk_chain::CheckPoint { + self.inner.lock().unwrap().latest_checkpoint() + } + pub(crate) fn get_cached_txs(&self) -> Vec> { self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() } @@ -1155,9 +1177,15 @@ impl Wallet { let kind = PaymentKind::Onchain { txid, status: confirmation_status }; - let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let fee = match locked_wallet.calculate_fee(tx) { + Ok(fee) => Some(fee), + Err(e) => { + log_error!(self.logger, "Failed to calculate fee for tx {}: {:?}", txid, e); + None + }, + }; let (sent, received) = locked_wallet.sent_and_received(tx); - let fee_sat = fee.to_sat(); + let fee_sat = fee.map_or(0, |f| f.to_sat()); let (direction, amount_msat) = if sent > received { ( @@ -1180,7 +1208,7 @@ impl Wallet { payment_id, kind, amount_msat, - Some(fee_sat * 1000), + fee.map(|f| f.to_sat() * 1000), direction, payment_status, ) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4f68f9825..45ac10f1a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -7,6 +7,8 @@ #![cfg(any(test, cln_test, lnd_test, vss_test))] #![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_macros)] pub(crate) mod logging; @@ -27,7 +29,10 @@ use bitcoin::{ use electrsd::corepc_node::{Client as BitcoindClient, Node as BitcoinD}; use electrsd::{corepc_node, ElectrsD}; use electrum_client::ElectrumApi; -use ldk_node::config::{AsyncPaymentsRole, Config, ElectrumSyncConfig, EsploraSyncConfig}; +use ldk_node::config::{ + AsyncPaymentsRole, CbfSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + SyncTimeoutsConfig, +}; use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; @@ -48,6 +53,17 @@ use rand::distr::Alphanumeric; use rand::{rng, Rng}; use serde_json::{json, Value}; +macro_rules! skip_if_cbf { + ($chain_source:expr) => { + if matches!($chain_source, TestChainSource::Cbf(_)) { + println!("Skipping test: not compatible with CBF chain source"); + return; + } + }; +} + +pub(crate) use skip_if_cbf; + macro_rules! expect_event { ($node:expr, $event_type:ident) => {{ match $node.next_event_async().await { @@ -223,6 +239,11 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { let mut bitcoind_conf = corepc_node::Conf::default(); bitcoind_conf.network = "regtest"; bitcoind_conf.args.push("-rest"); + + bitcoind_conf.p2p = corepc_node::P2P::Yes; + bitcoind_conf.args.push("-blockfilterindex=1"); + bitcoind_conf.args.push("-peerblockfilters=1"); + let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); let electrs_exe = env::var("ELECTRS_EXE") @@ -239,7 +260,16 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { pub(crate) fn random_chain_source<'a>( bitcoind: &'a BitcoinD, electrsd: &'a ElectrsD, ) -> TestChainSource<'a> { - let r = rand::random_range(0..3); + // Allow forcing a specific backend via LDK_TEST_CHAIN_SOURCE env var. + // Valid values: "esplora", "electrum", "bitcoind-rpc", "bitcoind-rest", "cbf" + let r = match std::env::var("LDK_TEST_CHAIN_SOURCE").ok().as_deref() { + Some("esplora") => 0, + Some("electrum") => 1, + Some("bitcoind-rpc") => 2, + Some("bitcoind-rest") => 3, + Some("cbf") => 4, + _ => rand::random_range(0..5), + }; match r { 0 => { println!("Randomly setting up Esplora chain syncing..."); @@ -257,6 +287,10 @@ pub(crate) fn random_chain_source<'a>( println!("Randomly setting up Bitcoind REST chain syncing..."); TestChainSource::BitcoindRestSync(bitcoind) }, + 4 => { + println!("Randomly setting up CBF compact block filter syncing..."); + TestChainSource::Cbf(bitcoind) + }, _ => unreachable!(), } } @@ -324,6 +358,7 @@ pub(crate) enum TestChainSource<'a> { Electrum(&'a ElectrsD), BitcoindRpcSync(&'a BitcoinD), BitcoindRestSync(&'a BitcoinD), + Cbf(&'a BitcoinD), } #[derive(Clone, Copy)] @@ -461,6 +496,23 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> rpc_password, ); }, + TestChainSource::Cbf(bitcoind) => { + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let timeouts_config = SyncTimeoutsConfig { + onchain_wallet_sync_timeout_secs: 3, + lightning_wallet_sync_timeout_secs: 3, + fee_rate_cache_update_timeout_secs: 3, + tx_broadcast_timeout_secs: 3, + per_request_timeout_secs: 3, + }; + let sync_config = CbfSyncConfig { + background_sync_config: None, + timeouts_config, + ..Default::default() + }; + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config), None); + }, } match &config.log_writer { @@ -495,7 +547,10 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> node.start().unwrap(); assert!(node.status().is_running); - assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + + if !matches!(chain_source, TestChainSource::Cbf(_)) { + assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); + } node } @@ -569,7 +624,9 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin let tx = electrs.transaction_get(&outpoint.txid).unwrap(); let txout_script = tx.output.get(outpoint.vout as usize).unwrap().clone().script_pubkey; - let is_spent = !electrs.script_get_history(&txout_script).unwrap().is_empty(); + // An output's script will have at least 1 history entry (the tx that created it). + // When the output is spent, there will be at least 2 entries (creating + spending tx). + let is_spent = electrs.script_get_history(&txout_script).unwrap().len() >= 2; if is_spent { return; } @@ -577,12 +634,30 @@ pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoin exponential_backoff_poll(|| { electrs.ping().unwrap(); - let is_spent = !electrs.script_get_history(&txout_script).unwrap().is_empty(); + let is_spent = electrs.script_get_history(&txout_script).unwrap().len() >= 2; is_spent.then_some(()) }) .await; } +pub(crate) async fn wait_for_cbf_sync(node: &TestNode) { + let before = node.status().latest_onchain_wallet_sync_timestamp; + let mut delay = Duration::from_millis(200); + for _ in 0..30 { + if node.sync_wallets().is_ok() { + let after = node.status().latest_onchain_wallet_sync_timestamp; + if after > before { + return; + } + } + tokio::time::sleep(delay).await; + if delay < Duration::from_secs(2) { + delay = delay.mul_f32(1.5); + } + } + panic!("wait_for_cbf_sync: timed out waiting for CBF sync to complete"); +} + pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, @@ -1221,8 +1296,9 @@ pub(crate) async fn do_channel_full_cycle( let splice_out_sat = funding_amount_sat / 2; node_b.splice_out(&user_channel_id_b, node_a.node_id(), &addr_a, splice_out_sat).unwrap(); - expect_splice_pending_event!(node_a, node_b.node_id()); + let splice_out_txo = expect_splice_pending_event!(node_a, node_b.node_id()); expect_splice_pending_event!(node_b, node_a.node_id()); + wait_for_tx(electrsd, splice_out_txo.txid).await; generate_blocks_and_wait(&bitcoind, electrsd, 6).await; node_a.sync_wallets().unwrap(); @@ -1243,8 +1319,9 @@ pub(crate) async fn do_channel_full_cycle( let splice_in_sat = splice_out_sat; node_a.splice_in(&user_channel_id_a, node_b.node_id(), splice_in_sat).unwrap(); - expect_splice_pending_event!(node_a, node_b.node_id()); + let splice_in_txo = expect_splice_pending_event!(node_a, node_b.node_id()); expect_splice_pending_event!(node_b, node_a.node_id()); + wait_for_tx(electrsd, splice_in_txo.txid).await; generate_blocks_and_wait(&bitcoind, electrsd, 6).await; node_a.sync_wallets().unwrap(); @@ -1272,12 +1349,22 @@ pub(crate) async fn do_channel_full_cycle( expect_event!(node_a, ChannelClosed); expect_event!(node_b, ChannelClosed); - wait_for_outpoint_spend(electrsd, funding_txo_b).await; + // After splices, the latest funding outpoint is from the last splice. + // We must wait for the close tx (which spends the latest funding output) + // to propagate before mining. + wait_for_outpoint_spend(electrsd, splice_in_txo).await; generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); + // CBF needs a second sync: the first sync confirms the close tx in the + // Lightning wallet, which may trigger new script registrations. The + // second sync picks up blocks matching those new scripts for the + // on-chain wallet. + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + if force_close { // Check node_b properly sees all balances and sweeps them. assert_eq!(node_b.list_balances().lightning_balances.len(), 1); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 413b2d44a..94805de1a 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,8 +23,9 @@ use common::{ expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, skip_if_cbf, + splice_in_with_all, wait_for_cbf_sync, wait_for_tx, TestChainSource, TestStoreType, + TestSyncStore, }; use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; @@ -74,6 +75,7 @@ async fn channel_full_cycle_force_close_trusted_no_reserve() { async fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = random_chain_source(&bitcoind, &electrsd); + skip_if_cbf!(chain_source); let (node_a, node_b) = setup_two_nodes(&chain_source, true, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, true, true, false) .await; @@ -977,6 +979,7 @@ async fn splice_channel() { let txo = expect_splice_pending_event!(node_a, node_b.node_id()); expect_splice_pending_event!(node_b, node_a.node_id()); + wait_for_tx(&electrsd.client, txo.txid).await; generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; @@ -1020,6 +1023,7 @@ async fn splice_channel() { let txo = expect_splice_pending_event!(node_a, node_b.node_id()); expect_splice_pending_event!(node_b, node_a.node_id()); + wait_for_tx(&electrsd.client, txo.txid).await; generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; @@ -1668,8 +1672,8 @@ async fn unified_send_receive_bip21_uri() { }, }; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -2554,6 +2558,7 @@ async fn persistence_backwards_compatibility() { async fn onchain_fee_bump_rbf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = random_chain_source(&bitcoind, &electrsd); + skip_if_cbf!(chain_source); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); // Fund both nodes @@ -2873,3 +2878,336 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_cbf() { + let (bitcoind, _electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + assert!(node.status().is_running); + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + assert!(node.status().is_running); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn fee_rate_estimation_after_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(100_000), + ) + .await; + + wait_for_cbf_sync(&node).await; + let first_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(first_fee_update.is_some()); + + // Subsequent manual syncs should keep the fee cache populated. + node.sync_wallets().unwrap(); + let second_fee_update = node.status().latest_fee_rate_cache_update_timestamp; + assert!(second_fee_update.is_some()); + assert!(second_fee_update >= first_fee_update); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn repeated_manual_sync_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let node = setup_node(&chain_source, random_config(true)); + + let addr = node.onchain_payment().new_address().unwrap(); + let premine_amount_sat = 100_000; + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Regression: the second manual sync must not block forever. + node.sync_wallets().unwrap(); + assert_eq!(node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_reinit_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let config = random_config(true); + + let p2p_socket = bitcoind.params.p2p_socket.expect("P2P must be enabled for CBF"); + let peer_addr = format!("{}", p2p_socket); + let sync_config = + ldk_node::config::CbfSyncConfig { background_sync_config: None, ..Default::default() }; + + let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); + + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr.clone()], Some(sync_config.clone()), None); + + let node = builder + .build_with_store(config.node_entropy.clone().into(), test_sync_store.clone()) + .unwrap(); + node.start().unwrap(); + + let expected_node_id = node.node_id(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + let funding_address = node.onchain_payment().new_address().unwrap(); + assert_eq!(node.list_balances().total_onchain_balance_sats, 0); + + let expected_amount = Amount::from_sat(100_000); + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![funding_address], + expected_amount, + ) + .await; + + wait_for_cbf_sync(&node).await; + assert_eq!(node.list_balances().spendable_onchain_balance_sats, expected_amount.to_sat()); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + + node.start().unwrap(); + assert_eq!(node.start(), Err(NodeError::AlreadyRunning)); + + node.stop().unwrap(); + assert_eq!(node.stop(), Err(NodeError::NotRunning)); + drop(node); + + // Reinitialize from the same config and store. + setup_builder!(builder, config.node_config); + builder.set_chain_source_cbf(vec![peer_addr], Some(sync_config), None); + + let reinitialized_node = + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + reinitialized_node.start().unwrap(); + assert_eq!(reinitialized_node.node_id(), expected_node_id); + + // Balance should be persisted from the previous run. + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + wait_for_cbf_sync(&reinitialized_node).await; + assert_eq!( + reinitialized_node.list_balances().spendable_onchain_balance_sats, + expected_amount.to_sat() + ); + + reinitialized_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_recovery_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + + let original_config = random_config(true); + let original_node_entropy = original_config.node_entropy.clone(); + let original_node = setup_node(&chain_source, original_config); + + let premine_amount_sat = 100_000; + + let addr_1 = original_node.onchain_payment().new_address().unwrap(); + + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_1], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!(original_node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + let addr_2 = original_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_2, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&original_node).await; + assert_eq!( + original_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + original_node.stop().unwrap(); + drop(original_node); + + // Now we start from scratch, only the seed remains the same. + let mut recovered_config = random_config(true); + recovered_config.node_entropy = original_node_entropy; + recovered_config.recovery_mode = true; + let recovered_node = setup_node(&chain_source, recovered_config); + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 2 + ); + + // Check we sync even when skipping some addresses. + let _addr_3 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_4 = recovered_node.onchain_payment().new_address().unwrap(); + let _addr_5 = recovered_node.onchain_payment().new_address().unwrap(); + let addr_6 = recovered_node.onchain_payment().new_address().unwrap(); + + let txid = bitcoind + .client + .send_to_address(&addr_6, Amount::from_sat(premine_amount_sat)) + .unwrap() + .0 + .parse() + .unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; + + wait_for_cbf_sync(&recovered_node).await; + assert_eq!( + recovered_node.list_balances().spendable_onchain_balance_sats, + premine_amount_sat * 3 + ); + + recovered_node.stop().unwrap(); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_receive_cbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Cbf(&bitcoind); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 1_100_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ) + .await; + + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + assert_eq!(node_b.list_balances().spendable_onchain_balance_sats, premine_amount_sat); + + // Check on-chain payment tracking after premine. + let node_a_payments = node_a.list_payments(); + let node_b_payments = node_b.list_payments(); + for payments in [&node_a_payments, &node_b_payments] { + assert_eq!(payments.len(), 1); + } + for p in [node_a_payments.first().unwrap(), node_b_payments.first().unwrap()] { + assert_eq!(p.amount_msat, Some(premine_amount_sat * 1000)); + assert_eq!(p.direction, PaymentDirection::Inbound); + assert_eq!(p.status, PaymentStatus::Pending); + match p.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + } + + // Send from B to A. + let amount_to_send_sats = 54_321; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + // Mine the transaction so CBF can see it. + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let payment_a = node_a.payment(&payment_id).unwrap(); + match payment_a.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_a.fee_paid_msat > Some(0)); + assert_eq!(payment_a.amount_msat, Some(amount_to_send_sats * 1000)); + + let payment_b = node_b.payment(&payment_id).unwrap(); + match payment_b.kind { + PaymentKind::Onchain { txid: tx, status } => { + assert_eq!(tx, txid); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + assert!(payment_b.fee_paid_msat > Some(0)); + assert_eq!(payment_b.amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(payment_a.fee_paid_msat, payment_b.fee_paid_msat); + + let onchain_fee_buffer_sat = 1000; + let expected_node_a_balance = premine_amount_sat + amount_to_send_sats; + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, expected_node_a_balance); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + > premine_amount_sat - amount_to_send_sats - onchain_fee_buffer_sat + ); + assert!( + node_b.list_balances().spendable_onchain_balance_sats + < premine_amount_sat - amount_to_send_sats + ); + + // Test send_all_to_address. + let addr_b2 = node_b.onchain_payment().new_address().unwrap(); + let txid = node_a.onchain_payment().send_all_to_address(&addr_b2, false, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_cbf_sync(&node_a).await; + node_b.sync_wallets().unwrap(); + + assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, 0); + assert_eq!(node_a.list_balances().total_onchain_balance_sats, 0); + assert!(node_b.list_balances().spendable_onchain_balance_sats > premine_amount_sat); + + node_a.stop().unwrap(); + node_b.stop().unwrap(); +}