From 5a9e8cd4a79593814301a7109ac2e3a48694b324 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 12:15:14 +0100 Subject: [PATCH 1/7] Flush memtables before enabling WAL after initial block download completion --- src/store/db.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/store/db.rs b/src/store/db.rs index eaff9d3..e574054 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -630,7 +630,15 @@ impl Store for DBStore { } fn ibd_finished(&self) { - log::info!("Initial block download finished, setting ibd to false in the store"); + log::info!("Initial block download finished, flushing memtables before enabling WAL..."); + for cf_name in COLUMN_FAMILIES { + if let Some(cf) = self.db.cf_handle(cf_name) { + self.db + .flush_cf(&cf) + .unwrap_or_else(|e| log::error!("failed to flush CF {cf_name}: {e}")); + } + } + log::info!("Memtables flushed, setting ibd to false"); self.ibd.store(false, Ordering::Relaxed); } } From 4e5d61271af7dde26b6032e1d9dadcc8c0aad1e8 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 14:19:27 +0100 Subject: [PATCH 2/7] Reduce HeaderTimeout log severity --- src/server/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 0c15ade..dd12b19 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -452,7 +452,12 @@ pub async fn inner_main( .await; if let Err(err) = result { - log::error!("Error serving connection: {:?}", err); + let msg = format!("{err:?}"); + if msg.contains("HeaderTimeout") { + log::warn!("Header read timeout (possible scanner/slowloris): {msg}"); + } else { + log::error!("Error serving connection: {msg}"); + } } }); }, From ed15b8f1ade9514175679e736941ff07e8bbd34d Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 14:19:44 +0100 Subject: [PATCH 3/7] fmt change --- src/server/mod.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index dd12b19..4ca466d 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -160,7 +160,10 @@ impl std::fmt::Debug for Arguments { .field("enable_db_statistics", &self.enable_db_statistics) .field("cache_control_seconds", &self.cache_control_seconds) .field("request_timeout_seconds", &self.request_timeout_seconds) - .field("header_read_timeout_seconds", &self.header_read_timeout_seconds); + .field( + "header_read_timeout_seconds", + &self.header_read_timeout_seconds, + ); #[cfg(feature = "db")] { From 377ea810d46994b4c4f3cc75282704b25a794ebd Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 14:21:34 +0100 Subject: [PATCH 4/7] clippy fix --- src/be/transaction.rs | 6 +++--- src/be/txid.rs | 2 +- src/fetch.rs | 24 +++++++++++------------- src/server/route.rs | 2 +- src/server/state.rs | 2 +- src/store/db.rs | 5 +++-- src/threads/blocks.rs | 2 +- src/threads/mempool.rs | 2 +- 8 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/be/transaction.rs b/src/be/transaction.rs index a55397a..f029a92 100644 --- a/src/be/transaction.rs +++ b/src/be/transaction.rs @@ -96,7 +96,7 @@ impl Transaction { } } -impl<'a> TransactionRef<'a> { +impl TransactionRef<'_> { pub fn txid(&self) -> crate::be::Txid { match self { TransactionRef::Bitcoin(tx) => tx.compute_txid().into(), @@ -160,7 +160,7 @@ impl<'a> Iterator for InputIterator<'a> { } } -impl<'a> OutputRef<'a> { +impl OutputRef<'_> { pub(crate) fn skip_utxo(&self) -> bool { match self { OutputRef::Bitcoin(output) => output.script_pubkey.is_op_return(), @@ -201,7 +201,7 @@ impl<'a> OutputRef<'a> { } } -impl<'a> InputRef<'a> { +impl InputRef<'_> { pub(crate) fn skip_indexing(&self) -> bool { match self { InputRef::Bitcoin(_) => false, diff --git a/src/be/txid.rs b/src/be/txid.rs index 6a0ccf4..dac3516 100644 --- a/src/be/txid.rs +++ b/src/be/txid.rs @@ -136,7 +136,7 @@ impl<'de> Deserialize<'de> for Txid { if deserializer.is_human_readable() { struct TxidVisitor; - impl<'de> Visitor<'de> for TxidVisitor { + impl Visitor<'_> for TxidVisitor { type Value = Txid; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { diff --git a/src/fetch.rs b/src/fetch.rs index 6bfdff7..862210a 100644 --- a/src/fetch.rs +++ b/src/fetch.rs @@ -164,7 +164,7 @@ impl Client { .with_context(|| format!("failing converting {text} to ChainInfo"))?; Ok(Some(chain_info)) } else { - return Err(Error::UnexpectedStatus(url, status).into()); + Err(Error::UnexpectedStatus(url, status).into()) } } @@ -374,18 +374,16 @@ impl Client { let content: HashSet = serde_json::from_slice(&body_bytes) .with_context(|| format!("failure converting {url} body in HashSet"))?; content + } else if support_verbose { + serde_json::from_slice(&body_bytes) + .with_context(|| format!("failure converting {url} body in HashSet "))? } else { - if support_verbose { - serde_json::from_slice(&body_bytes) - .with_context(|| format!("failure converting {url} body in HashSet "))? - } else { - let content: HashMap = serde_json::from_slice(&body_bytes) - .with_context(|| { - format!("failure converting {url} body in HashMap ") - })?; + let content: HashMap = serde_json::from_slice(&body_bytes) + .with_context(|| { + format!("failure converting {url} body in HashMap ") + })?; - content.into_keys().collect() - } + content.into_keys().collect() }) } @@ -477,11 +475,11 @@ impl Client { if let Some(header) = self.block_header_json(last.hash, family).await? { if let Some(next) = header.nextblockhash { let header = self.block_header(next, family).await?; - return Ok(ChainStatus::NewBlock(BlockMeta::new( + Ok(ChainStatus::NewBlock(BlockMeta::new( last.height + 1, next, header.time(), - ))); + ))) } else { Ok(ChainStatus::Tip) } diff --git a/src/server/route.rs b/src/server/route.rs index 332a527..6a4ecd7 100644 --- a/src/server/route.rs +++ b/src/server/route.rs @@ -671,7 +671,7 @@ async fn handle_waterfalls_req( txs_seen: map, page, tip: tip_hash, - tip_meta: tip_meta, + tip_meta, }; let content = if cbor { "application/cbor" diff --git a/src/server/state.rs b/src/server/state.rs index 0f509bf..1850b65 100644 --- a/src/server/state.rs +++ b/src/server/state.rs @@ -90,7 +90,7 @@ impl State { } pub async fn set_hash_ts(&self, meta: &BlockMeta) { let mut blocks_hash_ts = self.blocks_hash_ts.lock().await; - update_hash_ts(&mut *blocks_hash_ts, meta); + update_hash_ts(&mut blocks_hash_ts, meta); } pub fn address(&self) -> bitcoin::Address { p2pkh(&self.secp, &self.wif_key) diff --git a/src/store/db.rs b/src/store/db.rs index e574054..5fc1ffc 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -454,11 +454,12 @@ impl DBStore { } fn write(&self, batch: rocksdb::WriteBatch) -> Result<()> { - Ok(if self.ibd.load(Ordering::Relaxed) { + if self.ibd.load(Ordering::Relaxed) { self.db.write_without_wal(batch)? } else { self.db.write(batch)? - }) + }; + Ok(()) } } diff --git a/src/threads/blocks.rs b/src/threads/blocks.rs index faf4a17..cc9ef63 100644 --- a/src/threads/blocks.rs +++ b/src/threads/blocks.rs @@ -46,7 +46,7 @@ async fn get_next_block_to_index( ) -> Option { match last_indexed.as_ref() { Some(last) => { - match client.get_next(&last, family).await { + match client.get_next(last, family).await { Ok(ChainStatus::NewBlock(next)) => Some(next), Ok(ChainStatus::Reorg) => { log::warn!("reorg happened! {last:?} removed from the chain"); diff --git a/src/threads/mempool.rs b/src/threads/mempool.rs index 7e376ef..be5f882 100644 --- a/src/threads/mempool.rs +++ b/src/threads/mempool.rs @@ -33,7 +33,7 @@ async fn sync_mempool_once( let db = &state.store; let tip = state.tip_height().await; - let new: Vec<_> = current.difference(&mempool_txids).collect(); + let new: Vec<_> = current.difference(mempool_txids).collect(); let removed: Vec<_> = mempool_txids.difference(¤t).cloned().collect(); if !new.is_empty() { log::debug!("new txs in mempool {:?}, tip: {tip:?}", new); From 2b6a03aa71784bcb6553b79c6bb110a37586dd05 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 15:21:09 +0100 Subject: [PATCH 5/7] ensure logs are printed before panic during reorg handling --- src/store/db.rs | 67 ++++++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/src/store/db.rs b/src/store/db.rs index 5fc1ffc..3c3697a 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -24,6 +24,7 @@ use std::{ }; use crate::{ + error_panic, store::{BlockMeta, Store, TxSeen}, Height, ScriptHash, }; @@ -461,6 +462,42 @@ impl DBStore { }; Ok(()) } + + fn _reorg(&self) -> Result<()> { + let reorg_data = self + .reorg_data + .lock() + .map_err(|e| anyhow::anyhow!("reorg_data lock poisoned: {e}"))?; + + // Estimate batch size for UTXO restoration + let utxo_restore_size = reorg_data.spent.len() * 44; // 44 bytes per UTXO entry + let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size); + + // Restore UTXOs that were spent in the reorged block + self.insert_utxos( + &mut batch, + reorg_data + .spent + .iter() + .map(|(outpoint, script_hash)| (outpoint, script_hash)), + )?; + + self.write(batch)?; + + // Remove UTXOs that were created in the reorged block + if !reorg_data.utxos_created.is_empty() { + let outpoints_to_remove: Vec = + reorg_data.utxos_created.keys().cloned().collect(); + self.remove_utxos(&outpoints_to_remove)?; + } + + // Remove history entries that were added in the reorged block + if !reorg_data.history.is_empty() { + self.remove_history_entries(&reorg_data.history)?; + } + + Ok(()) + } } fn estimate_history_size(add: &BTreeMap>) -> usize { @@ -599,34 +636,8 @@ impl Store for DBStore { } fn reorg(&self) { - let reorg_data = self.reorg_data.lock().unwrap(); - - // Estimate batch size for UTXO restoration - let utxo_restore_size = reorg_data.spent.len() * 44; // 44 bytes per UTXO entry - let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size); - - // Restore UTXOs that were spent in the reorged block - self.insert_utxos( - &mut batch, - reorg_data - .spent - .iter() - .map(|(outpoint, script_hash)| (outpoint, script_hash)), - ) - .unwrap(); // TODO handle unwrap; - - self.write(batch).unwrap(); // TODO handle unwrap; - - // Remove UTXOs that were created in the reorged block - if !reorg_data.utxos_created.is_empty() { - let outpoints_to_remove: Vec = - reorg_data.utxos_created.keys().cloned().collect(); - self.remove_utxos(&outpoints_to_remove).unwrap(); // TODO handle unwrap; - } - - // Remove history entries that were added in the reorged block - if !reorg_data.history.is_empty() { - self.remove_history_entries(&reorg_data.history).unwrap(); // TODO handle unwrap; + if let Err(e) = self._reorg() { + error_panic!("reorg failed: {e}"); } } From 3eae697db8386ac7adb71102f75a3e296f3591a0 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 15:33:56 +0100 Subject: [PATCH 6/7] make reorg an atomic db operation --- src/store/db.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/store/db.rs b/src/store/db.rs index 3c3697a..81e3ba1 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -302,7 +302,11 @@ impl DBStore { Ok(()) } - fn remove_history_entries(&self, to_remove: &BTreeMap>) -> Result<()> { + fn remove_history_entries_batch( + &self, + batch: &mut rocksdb::WriteBatch, + to_remove: &BTreeMap>, + ) -> Result<()> { if to_remove.is_empty() { return Ok(()); } @@ -315,8 +319,6 @@ impl DBStore { // Read current history for these script hashes let current_history = self.get_history(&script_hashes)?; - let estimate_size = estimate_history_size(to_remove); - let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimate_size); let cf = self.history_cf(); for (i, script_hash) in script_hashes.iter().enumerate() { @@ -345,7 +347,6 @@ impl DBStore { ); } } - self.write(batch)?; Ok(()) } @@ -469,9 +470,7 @@ impl DBStore { .lock() .map_err(|e| anyhow::anyhow!("reorg_data lock poisoned: {e}"))?; - // Estimate batch size for UTXO restoration - let utxo_restore_size = reorg_data.spent.len() * 44; // 44 bytes per UTXO entry - let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size); + let mut batch = rocksdb::WriteBatch::default(); // Restore UTXOs that were spent in the reorged block self.insert_utxos( @@ -482,20 +481,20 @@ impl DBStore { .map(|(outpoint, script_hash)| (outpoint, script_hash)), )?; - self.write(batch)?; - // Remove UTXOs that were created in the reorged block if !reorg_data.utxos_created.is_empty() { let outpoints_to_remove: Vec = reorg_data.utxos_created.keys().cloned().collect(); - self.remove_utxos(&outpoints_to_remove)?; + self.delete_utxos_batch(&mut batch, &outpoints_to_remove)?; } // Remove history entries that were added in the reorged block if !reorg_data.history.is_empty() { - self.remove_history_entries(&reorg_data.history)?; + self.remove_history_entries_batch(&mut batch, &reorg_data.history)?; } + self.write(batch)?; + Ok(()) } } From 94252790d6fa64e3b54178dd4438f9154020a323 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Mon, 9 Feb 2026 15:36:48 +0100 Subject: [PATCH 7/7] make remove_utxos available only for tests --- src/store/db.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/store/db.rs b/src/store/db.rs index 81e3ba1..77efcec 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -268,6 +268,7 @@ impl DBStore { /// Remove UTXOs from the database and return their script hashes. /// This writes immediately to the database (non-atomic with other operations). + #[cfg(test)] fn remove_utxos(&self, outpoints: &[OutPoint]) -> Result> { let result = self.get_utxos_for_spending(outpoints)?;