diff --git a/src/server/mod.rs b/src/server/mod.rs
index 0c15ade..8e323e1 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -264,6 +264,7 @@ pub enum Error {
     InvalidBlockHash,
     CannotFindBlockHeader,
     DBOpen(String),
+    DBCorrupted(String),
     CannotLoadEncryptionKey,
     CannotDecrypt,
     CannotEncrypt,
@@ -354,7 +355,8 @@ pub async fn inner_main(
 
     {
         let state = state.clone();
-        headers(state).await.unwrap();
+        let preload_client = Client::new(&args)?;
+        headers(state, Some(&preload_client), args.network.into()).await?;
     }
 
     // Create oneshot channel to signal when initial block download is complete
diff --git a/src/server/preload.rs b/src/server/preload.rs
index 887833c..a1c285c 100644
--- a/src/server/preload.rs
+++ b/src/server/preload.rs
@@ -1,12 +1,88 @@
 use std::sync::Arc;
 
-use crate::{server::Error, server::State, store::Store};
+use crate::{
+    fetch::Client,
+    server::Error,
+    server::State,
+    store::{BlockMeta, Store},
+    Family,
+};
 
-pub async fn headers(state: Arc<State>) -> Result<(), Error> {
+const MAX_HASH_GAP_REPAIR: u32 = 1000;
+
+pub async fn headers(
+    state: Arc<State>,
+    client: Option<&Client>,
+    family: Family,
+) -> Result<(), Error> {
     let mut blocks_hash_ts = state.blocks_hash_ts.lock().await;
     let mut i = 0usize;
-    for meta in state.store.iter_hash_ts() {
-        assert_eq!(i as u32, meta.height());
+    let metas: Vec<BlockMeta> = state.store.iter_hash_ts().collect();
+    for meta in metas {
+        if i as u32 != meta.height() {
+            let gap_start = i as u32;
+            let gap_end = meta.height();
+            let gap_len = gap_end.saturating_sub(gap_start);
+
+            if gap_len == 0 {
+                return Err(Error::DBCorrupted(format!(
+                    "hashes CF out-of-order entry at height {}, reindex required",
+                    meta.height()
+                )));
+            }
+
+            let client = client.ok_or_else(|| {
+                Error::DBCorrupted(format!(
+                    "hashes CF gap detected: expected height {}, found {}. 
\ +DB is inconsistent; reindex required", + i, + meta.height() + )) + })?; + + if gap_len > MAX_HASH_GAP_REPAIR { + return Err(Error::DBCorrupted(format!( + "hashes CF gap too large to repair ({} blocks from {} to {}), reindex required", + gap_len, + gap_start, + gap_end - 1 + ))); + } + + log::warn!( + "hashes CF gap detected ({} blocks from {} to {}), attempting repair", + gap_len, + gap_start, + gap_end - 1 + ); + + for height in gap_start..gap_end { + let hash = client + .block_hash(height) + .await + .map_err(|e| Error::DBCorrupted(format!("failed to fetch block hash: {e}")))? + .ok_or_else(|| { + Error::DBCorrupted(format!( + "missing block hash at height {height} while repairing hashes CF" + )) + })?; + let header = client + .block_header(hash, family) + .await + .map_err(|e| { + Error::DBCorrupted(format!( + "failed to fetch block header for {hash}: {e}" + )) + })?; + let repaired = BlockMeta::new(height, hash, header.time()); + state + .store + .put_hash_ts(&repaired) + .map_err(|e| Error::DBCorrupted(format!("failed to write hash meta: {e}")))?; + blocks_hash_ts.push((repaired.hash(), repaired.timestamp())); + } + i = gap_end as usize; + } blocks_hash_ts.push((meta.hash(), meta.timestamp())); i += 1; } @@ -14,3 +90,44 @@ pub async fn headers(state: Arc) -> Result<(), Error> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::headers; + use crate::server::Error; + use crate::store::{db::DBStore, AnyStore, BlockMeta, Store}; + use age::x25519::Identity; + use bitcoin::NetworkKind; + use elements::{hashes::Hash, BlockHash}; + use std::collections::BTreeMap; + use std::sync::Arc; + + use super::State; + + #[tokio::test] + async fn test_preload_detects_hash_gap() { + let tempdir = tempfile::TempDir::new().unwrap(); + let db = DBStore::open(tempdir.path(), 64, false).unwrap(); + + let block0 = BlockMeta::new(0, BlockHash::all_zeros(), 0); + db.update(&block0, vec![], BTreeMap::new(), BTreeMap::new()) + .unwrap(); + + let block2 = BlockMeta::new(2, 
BlockHash::all_zeros(), 0); + db.update(&block2, vec![], BTreeMap::new(), BTreeMap::new()) + .unwrap(); + + let state = Arc::new(State::new( + AnyStore::Db(db), + Identity::generate(), + bitcoin::PrivateKey::generate(NetworkKind::Test), + 100, + 5, + 1000, + ) + .unwrap()); + + let result = headers(state, None, crate::Family::Elements).await; + assert!(matches!(result, Err(Error::DBCorrupted(_)))); + } +} diff --git a/src/store/db.rs b/src/store/db.rs index eaff9d3..8af3351 100644 --- a/src/store/db.rs +++ b/src/store/db.rs @@ -14,7 +14,7 @@ use crate::V; use prefix_uvarint::PrefixVarInt; use std::{ - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, hash::Hasher, path::Path, sync::{ @@ -24,14 +24,17 @@ use std::{ }; use crate::{ - store::{BlockMeta, Store, TxSeen}, + store::{BlockMeta, SpentUtxo, Store, TxSeen}, Height, ScriptHash, }; +use crate::store::REORG_BUFFER_MAX_DEPTH; -/// Data for handling reorgs up to 1 block. -/// If the process halts right before a reorg, this will be lost and a reindex must happen. +/// Data for handling reorgs using an in-memory undo buffer. +/// If the process halts or a reorg exceeds the buffer depth, a reindex must happen. #[derive(Debug, Default)] struct ReorgData { + /// Height of the block this undo data refers to. + height: Height, /// Input spent in the last block. These are usually deleted from the db when a block is found. /// When there is a reorg we reinsert them in the db. spent: Vec<(OutPoint, ScriptHash)>, @@ -54,7 +57,7 @@ pub struct DBStore { salt: u64, /// Reorg data for handling blockchain reorganizations - reorg_data: Mutex, + reorg_data: Mutex>, /// Whether we are in Initial Block Download mode. /// during initial block download we do something different to speed up initial indexing, like writing disabling the wal. 
@@ -175,7 +178,7 @@ impl DBStore { let store = DBStore { db, salt, - reorg_data: Mutex::new(ReorgData::default()), + reorg_data: Mutex::new(VecDeque::new()), ibd: AtomicBool::new(true), }; Ok(store) @@ -206,6 +209,34 @@ impl DBStore { batch.put_cf(&self.hashes_cf(), meta.height().to_be_bytes(), buffer); } + /// Remove block hash and timestamp for a given height from an existing batch. + fn delete_hash_ts_batch(&self, batch: &mut rocksdb::WriteBatch, height: Height) { + batch.delete_cf(&self.hashes_cf(), height.to_be_bytes()); + } + + fn push_reorg_data(&self, data: ReorgData) { + let mut reorg_data = self.reorg_data.lock().unwrap(); + reorg_data.push_back(data); + if reorg_data.len() > REORG_BUFFER_MAX_DEPTH { + let dropped = reorg_data.pop_front().expect("len > max depth"); + log::warn!( + "reorg buffer depth exceeded ({}); dropping undo data for height {}", + REORG_BUFFER_MAX_DEPTH, + dropped.height + ); + } + } + + fn pop_reorg_data(&self) -> ReorgData { + let mut reorg_data = self.reorg_data.lock().unwrap(); + reorg_data.pop_back().unwrap_or_else(|| { + error_panic!( + "reorg depth exceeded in-memory buffer ({}); reindex required", + REORG_BUFFER_MAX_DEPTH + ) + }) + } + fn insert_utxos<'a, I>(&self, batch: &mut rocksdb::WriteBatch, adds: I) -> Result<()> where I: IntoIterator, @@ -224,25 +255,37 @@ impl DBStore { Ok(()) } - /// Look up UTXOs and return their script hashes, panicking if any UTXO doesn't exist. + /// Look up UTXOs and return their script hashes, erroring if any UTXO doesn't exist. /// This is a read-only operation. fn get_utxos_for_spending( &self, outpoints: &[OutPoint], ) -> Result> { - let result: Vec = self - .get_utxos(outpoints)? 
- .iter() - .enumerate() - .map(|(i, e)| { - e.unwrap_or_else(|| { - error_panic!( - "every utxo must exist when spent, can't find {}", - outpoints[i] - ); - }) - }) - .collect(); + let utxos = self.get_utxos(outpoints)?; + let mut missing = Vec::new(); + let mut result = Vec::with_capacity(outpoints.len()); + + for (i, entry) in utxos.into_iter().enumerate() { + match entry { + Some(script_hash) => result.push(script_hash), + None => missing.push(outpoints[i]), + } + } + + if !missing.is_empty() { + let preview: Vec = missing + .iter() + .take(5) + .map(|o| o.to_string()) + .collect(); + anyhow::bail!( + "every utxo must exist when spent; missing {} outpoints (sample: [{}]). \ +DB may be inconsistent (reorg deeper than buffer or corrupted) and requires reindex", + missing.len(), + preview.join(", ") + ); + } + Ok(Vec::from_iter( outpoints.iter().cloned().zip(result.iter().cloned()), )) @@ -460,6 +503,13 @@ impl DBStore { self.db.write(batch)? }) } + + pub(crate) fn write_hash_ts(&self, meta: &BlockMeta) -> Result<()> { + let mut batch = rocksdb::WriteBatch::with_capacity_bytes(40); + self.set_hash_ts_batch(&mut batch, meta); + self.write(batch)?; + Ok(()) + } } fn estimate_history_size(add: &BTreeMap>) -> usize { @@ -546,21 +596,21 @@ impl Store for DBStore { fn update( &self, block_meta: &BlockMeta, - utxo_spent: Vec<(u32, OutPoint, crate::be::Txid)>, + utxo_spent: Vec, history_map: BTreeMap>, utxo_created: BTreeMap, ) -> Result<()> { let mut history_map = history_map; - // First, read the script hashes for spent UTXOs (read-only operation) - let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.1).collect(); - let outpoint_script_hashes = self.get_utxos_for_spending(&only_outpoints)?; - // Build the history entries for spending transactions - let script_hashes = outpoint_script_hashes.iter().map(|e| e.1); - for (script_hash, (vin, _, txid)) in script_hashes.into_iter().zip(utxo_spent) { - let el = history_map.entry(script_hash).or_default(); - 
el.push(TxSeen::new(txid, block_meta.height(), V::Vin(vin))); + let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.outpoint).collect(); + for spent in utxo_spent.iter() { + let el = history_map.entry(spent.script_hash).or_default(); + el.push(TxSeen::new( + spent.txid, + block_meta.height(), + V::Vin(spent.vin), + )); } // Create a single batch for ALL writes (atomic operation) @@ -587,22 +637,27 @@ impl Store for DBStore { self.write(batch)?; // Store reorg data for potential blockchain reorganization correction - { - let mut reorg_data = self.reorg_data.lock().unwrap(); - reorg_data.spent = outpoint_script_hashes; - reorg_data.history = history_map; - reorg_data.utxos_created = utxo_created; - } + let outpoint_script_hashes: Vec<(OutPoint, ScriptHash)> = utxo_spent + .iter() + .map(|spent| (spent.outpoint, spent.script_hash)) + .collect(); + + self.push_reorg_data(ReorgData { + height: block_meta.height(), + spent: outpoint_script_hashes, + history: history_map, + utxos_created: utxo_created, + }); Ok(()) } fn reorg(&self) { - let reorg_data = self.reorg_data.lock().unwrap(); + let reorg_data = self.pop_reorg_data(); // Estimate batch size for UTXO restoration let utxo_restore_size = reorg_data.spent.len() * 44; // 44 bytes per UTXO entry - let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size); + let mut batch = rocksdb::WriteBatch::with_capacity_bytes(utxo_restore_size + 4); // Restore UTXOs that were spent in the reorged block self.insert_utxos( @@ -614,18 +669,22 @@ impl Store for DBStore { ) .unwrap(); // TODO handle unwrap; + self.delete_hash_ts_batch(&mut batch, reorg_data.height); + self.write(batch).unwrap(); // TODO handle unwrap; // Remove UTXOs that were created in the reorged block if !reorg_data.utxos_created.is_empty() { let outpoints_to_remove: Vec = reorg_data.utxos_created.keys().cloned().collect(); - self.remove_utxos(&outpoints_to_remove).unwrap(); // TODO handle unwrap; + self.remove_utxos(&outpoints_to_remove) + 
.unwrap_or_else(|e| error_panic!("failed to remove reorg-created utxos: {e}")); } // Remove history entries that were added in the reorged block if !reorg_data.history.is_empty() { - self.remove_history_entries(&reorg_data.history).unwrap(); // TODO handle unwrap; + self.remove_history_entries(&reorg_data.history) + .unwrap_or_else(|e| error_panic!("failed to remove reorg history entries: {e}")); } } @@ -633,6 +692,10 @@ impl Store for DBStore { log::info!("Initial block download finished, setting ibd to false in the store"); self.ibd.store(false, Ordering::Relaxed); } + + fn put_hash_ts(&self, meta: &BlockMeta) -> Result<()> { + self.write_hash_ts(meta) + } } fn serialize_outpoint(o: &OutPoint) -> Vec { @@ -721,17 +784,17 @@ mod test { use elements::{hashes::Hash, BlockHash, OutPoint, Txid}; use rocksdb::DB; use std::{ - collections::BTreeMap, + collections::{BTreeMap, VecDeque}, sync::{atomic::AtomicBool, Mutex}, }; - use crate::store::{ - db::{ - estimate_history_size, get_or_init_salt, serialize_outpoint, vec_tx_seen_from_be_bytes, - vec_tx_seen_to_be_bytes, ReorgData, TxSeen, - }, - Store, - }; + use crate::store::{ + db::{ + estimate_history_size, get_or_init_salt, serialize_outpoint, vec_tx_seen_from_be_bytes, + vec_tx_seen_to_be_bytes, TxSeen, + }, + BlockMeta, SpentUtxo, Store, + }; use crate::V; use super::DBStore; @@ -745,7 +808,7 @@ mod test { let db = DBStore { db: DB::open(&opts, tempdir.path()).unwrap(), salt: 0, - reorg_data: Mutex::new(ReorgData::default()), + reorg_data: Mutex::new(VecDeque::new()), ibd: AtomicBool::new(true), }; let hash = db.hash(b"test"); @@ -954,4 +1017,98 @@ mod test { "ScriptHash (u64) natural ordering must match binary encoding ordering" ); } + + #[test] + fn test_reorg_buffer_multiple_blocks() { + let tempdir = tempfile::TempDir::new().unwrap(); + let db = DBStore::open(tempdir.path(), 64, false).unwrap(); + + let script_a = db.hash(b"script-a"); + let script_b = db.hash(b"script-b"); + let script_c = db.hash(b"script-c"); 
+ + let txid0 = crate::be::Txid::from_array([1u8; 32]); + let txid1 = crate::be::Txid::from_array([2u8; 32]); + let txid2 = crate::be::Txid::from_array([3u8; 32]); + + let out_a = OutPoint::new(txid0.elements(), 0); + let out_b = OutPoint::new(txid1.elements(), 0); + let out_c = OutPoint::new(txid2.elements(), 0); + + let block0 = BlockMeta::new(0, BlockHash::all_zeros(), 0); + let mut history0 = BTreeMap::new(); + history0.insert(script_a, vec![TxSeen::new(txid0, 0, V::Vout(0))]); + let mut utxo_created0 = BTreeMap::new(); + utxo_created0.insert(out_a, script_a); + db.update(&block0, vec![], history0, utxo_created0) + .unwrap(); + + let block1 = BlockMeta::new(1, BlockHash::all_zeros(), 0); + let mut history1 = BTreeMap::new(); + history1.insert(script_b, vec![TxSeen::new(txid1, 1, V::Vout(0))]); + let mut utxo_created1 = BTreeMap::new(); + utxo_created1.insert(out_b, script_b); + db.update( + &block1, + vec![SpentUtxo { + vin: 0, + outpoint: out_a, + txid: txid1, + script_hash: script_a, + }], + history1, + utxo_created1, + ) + .unwrap(); + + let block2 = BlockMeta::new(2, BlockHash::all_zeros(), 0); + let mut history2 = BTreeMap::new(); + history2.insert(script_c, vec![TxSeen::new(txid2, 2, V::Vout(0))]); + let mut utxo_created2 = BTreeMap::new(); + utxo_created2.insert(out_c, script_c); + db.update( + &block2, + vec![SpentUtxo { + vin: 0, + outpoint: out_b, + txid: txid2, + script_hash: script_b, + }], + history2, + utxo_created2, + ) + .unwrap(); + + let heights: Vec<_> = db.iter_hash_ts().map(|m| m.height()).collect(); + assert_eq!(heights, vec![0, 1, 2]); + + let utxos = db.get_utxos(&[out_a, out_b, out_c]).unwrap(); + assert_eq!(utxos, vec![None, None, Some(script_c)]); + + // Reorg block 2. 
+ db.reorg(); + let heights: Vec<_> = db.iter_hash_ts().map(|m| m.height()).collect(); + assert_eq!(heights, vec![0, 1]); + + let utxos = db.get_utxos(&[out_a, out_b, out_c]).unwrap(); + assert_eq!(utxos, vec![None, Some(script_b), None]); + + let history_b = db.get_history(&[script_b]).unwrap(); + assert_eq!(history_b[0], vec![TxSeen::new(txid1, 1, V::Vout(0))]); + let history_c = db.get_history(&[script_c]).unwrap(); + assert!(history_c[0].is_empty()); + + // Reorg block 1. + db.reorg(); + let heights: Vec<_> = db.iter_hash_ts().map(|m| m.height()).collect(); + assert_eq!(heights, vec![0]); + + let utxos = db.get_utxos(&[out_a, out_b]).unwrap(); + assert_eq!(utxos, vec![Some(script_a), None]); + + let history_a = db.get_history(&[script_a]).unwrap(); + assert_eq!(history_a[0], vec![TxSeen::new(txid0, 0, V::Vout(0))]); + let history_b = db.get_history(&[script_b]).unwrap(); + assert!(history_b[0].is_empty()); + } } diff --git a/src/store/memory.rs b/src/store/memory.rs index 0307bcb..c873cdb 100644 --- a/src/store/memory.rs +++ b/src/store/memory.rs @@ -1,20 +1,30 @@ -use std::{collections::BTreeMap, hash::Hasher, sync::Mutex}; +use std::{ + collections::{BTreeMap, VecDeque}, + hash::Hasher, + sync::Mutex, +}; use elements::OutPoint; use fxhash::FxHasher; use crate::{error_panic, ScriptHash}; -use super::{BlockMeta, Store, TxSeen}; +use super::{BlockMeta, SpentUtxo, Store, TxSeen, REORG_BUFFER_MAX_DEPTH}; use crate::V; #[derive(Debug)] pub struct MemoryStore { utxos: Mutex>, history: Mutex>>, + reorg_data: Mutex>, +} - // TODO memory store does not fully support reorgs - last_block: Mutex>, +#[derive(Debug, Default)] +struct ReorgData { + height: crate::Height, + spent: Vec<(OutPoint, ScriptHash)>, + history: BTreeMap>, + utxos_created: BTreeMap, } impl Store for MemoryStore { @@ -62,71 +72,131 @@ impl Store for MemoryStore { fn update( &self, block_meta: &BlockMeta, - utxo_spent: Vec<(u32, elements::OutPoint, crate::be::Txid)>, + utxo_spent: Vec, history_map: 
std::collections::BTreeMap>, utxo_created: std::collections::BTreeMap, ) -> anyhow::Result<()> { let mut history_map = history_map; - // // TODO should be a db tx - let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.1).collect(); - let script_hashes = self.remove_utxos(&only_outpoints); - - let last_block = BTreeMap::from_iter( - only_outpoints - .iter() - .cloned() - .zip(script_hashes.iter().cloned()), - ); - *self.last_block.lock().unwrap() = last_block; // TODO handle unwrap; - - for (script_hash, (vin, _, txid)) in script_hashes.into_iter().zip(utxo_spent) { - let el = history_map.entry(script_hash).or_default(); - el.push(TxSeen::new(txid, block_meta.height(), V::Vin(vin))); + let only_outpoints: Vec<_> = utxo_spent.iter().map(|e| e.outpoint).collect(); + self.remove_utxos(&only_outpoints); + + for spent in utxo_spent.iter() { + let el = history_map.entry(spent.script_hash).or_default(); + el.push(TxSeen::new( + spent.txid, + block_meta.height(), + V::Vin(spent.vin), + )); } - self.update_history(history_map); + self.update_history(&history_map); self.insert_utxos(&utxo_created); + self.push_reorg_data(ReorgData { + height: block_meta.height(), + spent: utxo_spent + .iter() + .map(|spent| (spent.outpoint, spent.script_hash)) + .collect(), + history: history_map, + utxos_created: utxo_created, + }); Ok(()) } fn reorg(&self) { - self.insert_utxos(&self.last_block.lock().unwrap()); + let reorg_data = self.pop_reorg_data(); + let spent: BTreeMap<_, _> = reorg_data.spent.into_iter().collect(); + self.insert_utxos(&spent); + self.remove_utxos_created(&reorg_data.utxos_created); + self.remove_history_entries(&reorg_data.history); } fn ibd_finished(&self) {} + + fn put_hash_ts(&self, meta: &BlockMeta) -> anyhow::Result<()> { + self.write_hash_ts(meta) + } } impl MemoryStore { - fn remove_utxos(&self, outpoints: &[OutPoint]) -> Vec { - let mut result = Vec::with_capacity(outpoints.len()); + fn remove_utxos(&self, outpoints: &[OutPoint]) { + let mut utxos = 
self.utxos.lock().unwrap(); for outpoint in outpoints { - result.push( - self.utxos - .lock() - .unwrap() - .remove(outpoint) - .unwrap_or_else(|| { - error_panic!("{outpoint} must be unspent"); - }), - ); + if utxos.remove(outpoint).is_none() { + log::warn!("missing utxo {outpoint} while applying block"); + } } - result } - fn update_history(&self, add: BTreeMap>) { + fn update_history(&self, add: &BTreeMap>) { let mut history = self.history.lock().unwrap(); for (k, v) in add { - history.entry(k).or_default().extend(v); + history.entry(*k).or_default().extend(v.clone()); } } fn insert_utxos(&self, adds: &BTreeMap) { self.utxos.lock().unwrap().extend(adds); } + fn remove_utxos_created(&self, utxos_created: &BTreeMap) { + let mut utxos = self.utxos.lock().unwrap(); + for outpoint in utxos_created.keys() { + utxos.remove(outpoint); + } + } + + fn remove_history_entries(&self, to_remove: &BTreeMap>) { + if to_remove.is_empty() { + return; + } + let mut history = self.history.lock().unwrap(); + for (script_hash, entries_to_remove) in to_remove { + if let Some(current_entries) = history.get_mut(script_hash) { + for entry_to_remove in entries_to_remove { + current_entries.retain(|entry| { + !(entry.txid == entry_to_remove.txid + && entry.height == entry_to_remove.height + && entry.v == entry_to_remove.v) + }); + } + if current_entries.is_empty() { + history.remove(script_hash); + } + } + } + } + + fn push_reorg_data(&self, data: ReorgData) { + let mut reorg_data = self.reorg_data.lock().unwrap(); + reorg_data.push_back(data); + if reorg_data.len() > REORG_BUFFER_MAX_DEPTH { + let dropped = reorg_data.pop_front().expect("len > max depth"); + log::warn!( + "reorg buffer depth exceeded ({}); dropping undo data for height {}", + REORG_BUFFER_MAX_DEPTH, + dropped.height + ); + } + } + + fn pop_reorg_data(&self) -> ReorgData { + let mut reorg_data = self.reorg_data.lock().unwrap(); + reorg_data.pop_back().unwrap_or_else(|| { + error_panic!( + "reorg depth exceeded in-memory 
buffer ({}); reindex required", + REORG_BUFFER_MAX_DEPTH + ) + }) + } + pub(crate) fn new() -> Self { Self { utxos: Mutex::new(BTreeMap::new()), history: Mutex::new(BTreeMap::new()), - last_block: Mutex::new(BTreeMap::new()), + reorg_data: Mutex::new(VecDeque::new()), } } + + fn write_hash_ts(&self, _meta: &BlockMeta) -> anyhow::Result<()> { + Ok(()) + } } diff --git a/src/store/mod.rs b/src/store/mod.rs index ae5f760..d10cb13 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -8,11 +8,23 @@ pub mod db; pub mod memory; +/// Maximum number of recent blocks to keep for in-memory reorg undo data. +/// If a deeper reorg happens, a full reindex is required. +pub(crate) const REORG_BUFFER_MAX_DEPTH: usize = 100; + pub enum AnyStore { #[cfg(feature = "db")] Db(db::DBStore), Mem(memory::MemoryStore), } + +#[derive(Clone, Debug)] +pub struct SpentUtxo { + pub vin: u32, + pub outpoint: OutPoint, + pub txid: crate::be::Txid, + pub script_hash: ScriptHash, +} impl AnyStore { pub(crate) fn stats(&self) -> Option { match self { @@ -43,11 +55,14 @@ pub trait Store { fn update( &self, block_meta: &BlockMeta, - utxo_spent: Vec<(u32, OutPoint, crate::be::Txid)>, + utxo_spent: Vec, history_map: BTreeMap>, // We want this sorted because when inserted in the write batch it's faster (see benches and test guaranteeing encoding order match struct ordering) utxo_created: BTreeMap, // We want this sorted because when inserted in the write batch it's faster (see benches and test guaranteeing encoding order match struct ordering) ) -> Result<()>; + /// Persist a block hash + timestamp entry for header preloading or repairs. 
+    fn put_hash_ts(&self, meta: &BlockMeta) -> Result<()>;
+
     /// Reorg, reinsert the last block unspent utxos
     fn reorg(&self);
 
@@ -91,7 +106,7 @@ impl Store for AnyStore {
     fn update(
         &self,
         block_meta: &BlockMeta,
-        utxo_spent: Vec<(u32, OutPoint, crate::be::Txid)>,
+        utxo_spent: Vec<SpentUtxo>,
         history_map: BTreeMap<ScriptHash, Vec<TxSeen>>,
         utxo_created: BTreeMap<OutPoint, ScriptHash>,
     ) -> Result<()> {
@@ -102,6 +117,14 @@
         }
     }
 
+    fn put_hash_ts(&self, meta: &BlockMeta) -> Result<()> {
+        match self {
+            #[cfg(feature = "db")]
+            AnyStore::Db(d) => d.put_hash_ts(meta),
+            AnyStore::Mem(m) => m.put_hash_ts(meta),
+        }
+    }
+
     fn reorg(&self) {
         match self {
             #[cfg(feature = "db")]
diff --git a/src/threads/blocks.rs b/src/threads/blocks.rs
index faf4a17..7fc34eb 100644
--- a/src/threads/blocks.rs
+++ b/src/threads/blocks.rs
@@ -2,12 +2,12 @@ use crate::{
     be::Family,
     fetch::{ChainStatus, Client},
     server::{Error, State},
-    store::{BlockMeta, Store},
+    store::{BlockMeta, SpentUtxo, Store},
     TxSeen, V,
 };
 use elements::{OutPoint, Txid};
 use std::{
-    collections::{BTreeMap, HashSet},
+    collections::{BTreeMap, HashMap, HashSet},
     future::Future,
     str::FromStr,
     sync::Arc,
@@ -232,6 +232,16 @@ pub async fn index(
             }
         }
         state.set_hash_ts(&block_to_index).await;
+        let utxo_spent = resolve_spent_utxos(
+            &state,
+            &client,
+            family,
+            &block_to_index,
+            utxo_spent,
+        )
+        .await
+        .unwrap_or_else(|e| error_panic!("error resolving spent utxos: {e}"));
+
         db.update(&block_to_index, utxo_spent, history_map, utxo_created)
             .unwrap_or_else(|e| error_panic!("error updating db: {e}"));
 
@@ -254,3 +264,86 @@ fn generate_skip_outpoint() -> HashSet<OutPoint> {
     skip_outpoint
 }
+
+async fn resolve_spent_utxos(
+    state: &Arc<State>,
+    client: &Client,
+    family: Family,
+    block_meta: &BlockMeta,
+    utxo_spent: Vec<(u32, OutPoint, crate::be::Txid)>,
+) -> Result<Vec<SpentUtxo>, Error> {
+    if utxo_spent.is_empty() {
+        return Ok(vec![]);
+    }
+
+    let outpoints: Vec<OutPoint> = utxo_spent.iter().map(|e| e.1).collect();
+    let mut script_hashes = state
+        .store
+        .get_utxos(&outpoints)
+        .map_err(|e| Error::String(format!("failed to get utxos for block {block_meta:?}: {e}")))?;
+
+    let mut missing_by_txid: HashMap<crate::be::Txid, Vec<(usize, u32)>> = HashMap::new();
+    for (idx, (outpoint, script_hash)) in outpoints.iter().zip(script_hashes.iter()).enumerate() {
+        if script_hash.is_none() {
+            missing_by_txid
+                .entry((*outpoint).txid.into())
+                .or_default()
+                .push((idx, outpoint.vout));
+        }
+    }
+
+    if !missing_by_txid.is_empty() {
+        let missing_count: usize = missing_by_txid.values().map(|v| v.len()).sum();
+        log::warn!(
+            "missing {missing_count} spent utxos while indexing height {}; backfilling from node",
+            block_meta.height()
+        );
+        for (txid, entries) in missing_by_txid {
+            let tx = client
+                .tx(txid, family)
+                .await
+                .map_err(|e| Error::String(format!("failed to fetch missing tx {txid}: {e}")))?;
+            for (idx, vout) in entries {
+                let output = tx
+                    .outputs_iter()
+                    .nth(vout as usize)
+                    .ok_or_else(|| {
+                        Error::String(format!(
+                            "missing vout {vout} for tx {txid} while repairing utxo"
+                        ))
+                    })?;
+
+                if output.skip_utxo() {
+                    return Err(Error::String(format!(
+                        "missing utxo {txid}:{vout} is unspendable, refusing to repair"
+                    )));
+                }
+
+                let script_hash = if output.skip_indexing() {
+                    state.store.hash(b"")
+                } else {
+                    state.store.hash(output.script_pubkey_bytes())
+                };
+                script_hashes[idx] = Some(script_hash);
+            }
+        }
+    }
+
+    let mut spent = Vec::with_capacity(utxo_spent.len());
+    for (idx, (vin, outpoint, txid)) in utxo_spent.into_iter().enumerate() {
+        let script_hash = script_hashes[idx].ok_or_else(|| {
+            Error::String(format!(
+                "missing utxo {} could not be repaired, reindex required",
+                outpoint
+            ))
+        })?;
+        spent.push(SpentUtxo {
+            vin,
+            outpoint,
+            txid,
+            script_hash,
+        });
+    }
+
+    Ok(spent)
+}