diff --git a/lwk_wollet/src/update.rs b/lwk_wollet/src/update.rs index e1eb6ab1..953d9b1c 100644 --- a/lwk_wollet/src/update.rs +++ b/lwk_wollet/src/update.rs @@ -355,7 +355,16 @@ impl Wollet { ); let mut last_used_internal = None; let mut last_used_external = None; - for (txid, _) in txid_height_new { + // Also include deleted txids: a tx that was seen and then deleted (e.g. a phantom tx + // applied locally but not confirmed on-chain) should still count as having used its + // addresses so those addresses are not reused. This matters for merged updates where + // phantom txids are removed from txid_height_new but still appear in txid_height_delete. + let txids_for_last_used: Vec = txid_height_new + .into_iter() + .map(|(t, _)| t) + .chain(txid_height_delete.iter().copied()) + .collect(); + for txid in txids_for_last_used { if let Some(tx) = cache.all_txs.get(&txid) { for (vout, output) in tx.output.iter().enumerate() { if !cache @@ -439,9 +448,53 @@ impl Wollet { .map_err(|e| Error::Generic(format!("store error: {e}")))?; *next_index += 1; + *next_index = self.merge_updates(*next_index)?; + Ok(()) } + pub(crate) fn merge_updates(&self, next_index: usize) -> Result { + match self.merge_threshold { + Some(threshold) if threshold < next_index => (), + _ => return Ok(next_index), // Not merging + }; + + // Read and merge all persisted updates + let first_bytes = self + .store + .get(&update_key(0)) + .map_err(|e| Error::Generic(format!("store error: {e}")))? + .ok_or_else(|| Error::Generic("expected update 0 to exist".into()))?; + let mut merged = Update::deserialize(&first_bytes)?; + + for i in 1..next_index { + let bytes = self + .store + .get(&update_key(i)) + .map_err(|e| Error::Generic(format!("store error: {e}")))? + .ok_or_else(|| Error::Generic(format!("expected update {i} to exist")))?; + merged.merge(Update::deserialize(&bytes)?); + } + + // Delete all old updates from last to first to avoid holes on crash + for j in (0..next_index).rev() { + self.store + .remove(&update_key(j)) + .map_err(|e| Error::Generic(format!("failed to remove update {j}: {e}")))?; + } + // A crash here or during the removal loop will leave the cache empty or at an old state, + // which is not the end of the world, the following scan will bring it back. + + // Store the merged update as update 0 + let merged_bytes = merged.serialize()?; + self.store + .put(&update_key(0), &merged_bytes) + .map_err(|e| Error::Generic(format!("failed to store merged update: {e}")))?; + + let next_index = 1; + Ok(next_index) + } + /// Apply a transaction to the wallet state /// /// Wallet transactions are normally obtained using [`crate::clients::blocking::BlockchainBackend::full_scan()`] diff --git a/lwk_wollet/src/wollet.rs b/lwk_wollet/src/wollet.rs index 0ea9a8f9..202872f8 100644 --- a/lwk_wollet/src/wollet.rs +++ b/lwk_wollet/src/wollet.rs @@ -57,6 +57,7 @@ pub struct Wollet { pub(crate) descriptor: WolletDescriptor, /// Counter for the next update key pub(crate) next_update_index: Mutex, + pub(crate) merge_threshold: Option, /// cached value max_weight_to_satisfy: usize, } @@ -81,8 +82,9 @@ impl WolletBuilder { } } - /// Set the threshold for merging updates during build. - /// When the number of updates exceeds this threshold, they will be merged into one. + /// Set a threshold to merge updates + /// + /// When the number of updates exceeds the threshold, they are merged into one. /// Set to None to disable merging (default). pub fn with_merge_threshold(mut self, threshold: Option) -> Self { self.merge_threshold = threshold; @@ -125,24 +127,7 @@ impl WolletBuilder { store: self.store, next_update_index: Mutex::new(0), max_weight_to_satisfy, - }; - - // Check if merging is enabled and needed - let mut merging = if let Some(threshold) = self.merge_threshold { - let merge_key = update_key(threshold); - match wollet.store.get(&merge_key) { - Ok(Some(_)) => { - // There are at least threshold+1 updates, need to merge - let first_key = update_key(0); - match wollet.store.get(&first_key) { - Ok(Some(bytes)) => Some(Update::deserialize(&bytes)?), - _ => None, - } - } - _ => None, - } - } else { - None + merge_threshold: self.merge_threshold, }; // Restore updates from the store using indexed keys @@ -151,49 +136,14 @@ impl WolletBuilder { match wollet.store.get(&key) { Ok(Some(bytes)) => { let update = Update::deserialize(&bytes)?; - wollet.apply_update_no_persist(update.clone())?; - if let Some(ref mut m) = merging { - if i > 0 { - m.merge(update); - } - } + wollet.apply_update_no_persist(update)?; } Ok(None) => { - // Update the next index let mut next_update_index = wollet .next_update_index .lock() .map_err(|_| Error::Generic("next_update_index lock poisoned".into()))?; - *next_update_index = i; - - // If we were merging, persist the merged update and clean up - if let Some(merged) = merging { - // Delete all old updates - // we are starting from the last to avoid having holes in the beginning - for j in (0..i).rev() { - let old_key = update_key(j); - wollet.store.remove(&old_key).map_err(|e| { - Error::Generic(format!("failed to remove update {}: {}", j, e)) - })?; - } - - // A crash here or during the removal loop will leave the cache empty or at - // an old state which is not the end of the world, the following scan will - // bring the cache back to the correct state. - - // Store the merged update as update 0 - let merged_bytes = merged.serialize()?; - wollet - .store - .put(&update_key(0), &merged_bytes) - .map_err(|e| { - Error::Generic(format!("failed to store merged update: {}", e)) - })?; - - // Update next_update_index to 1 - *next_update_index = 1; - } - + *next_update_index = wollet.merge_updates(i)?; break; } Err(e) => return Err(Error::Generic(format!("store error: {e}"))), diff --git a/lwk_wollet/tests/e2e.rs b/lwk_wollet/tests/e2e.rs index 08b61019..ceadadcd 100644 --- a/lwk_wollet/tests/e2e.rs +++ b/lwk_wollet/tests/e2e.rs @@ -4166,136 +4166,215 @@ fn test_zmq_endpoint() { assert!(!msg[1].is_empty()); } +fn cp_dir_rec(src: &std::path::Path, dst: &std::path::Path) { + std::fs::create_dir_all(dst).unwrap(); + for entry in std::fs::read_dir(src).unwrap() { + let entry = entry.unwrap(); + let ty = entry.file_type().unwrap(); + let dst_path = dst.join(entry.file_name()); + if ty.is_dir() { + cp_dir_rec(&entry.path(), &dst_path); + } else { + std::fs::copy(entry.path(), dst_path).unwrap(); + } + } +} + #[test] fn test_merge_updates_e2e() { let env = TestEnvBuilder::from_env().with_electrum().build(); - let signer = SwSigner::new(TEST_MNEMONIC, false).unwrap(); - let signers: [&AnySigner; 1] = [&AnySigner::Software(signer)]; - let slip77_key = "9c8e4f05c7711a98c838be228bcb84924d4570ca53f35fa1c793e58841d47023"; - let desc_str = format!( - "ct(slip77({}),elwpkh({}/*))", - slip77_key, - signers[0].xpub().unwrap() - ); - let client = test_client_electrum(&env.electrum_url()); - let mut wallet = TestWollet::new(client, &desc_str); - - wallet.fund_btc(&env); - wallet.send_btc(&signers, None, None); + let network = ElementsNetwork::default_regtest(); - let expected_balance = wallet.wollet.balance().unwrap(); - let expected_tx_count = wallet.wollet.transactions().unwrap().len(); - let expected_utxo_count = wallet.wollet.utxos().unwrap().len(); - let num_updates_before = wallet.wollet.updates().unwrap().len(); - assert_eq!(num_updates_before, 4); + let signer = generate_signer(); + let view_key = generate_view_key(); + let desc = format!("ct({},elwpkh({}/*))", view_key, signer.xpub()); - let descriptor = wallet.wollet.wollet_descriptor(); - let network = ElementsNetwork::default_regtest(); - let db_root_dir = wallet.db_root_dir(); - - // Reconstruct the encrypted file store - // this is needed to use the same path/encryption created by the TestWollet ( which use with_legacy_fs_store ) - let mut path = db_root_dir.path().to_path_buf(); - path.push(network.as_str()); - path.push("enc_cache"); - path.push( - ::hash(descriptor.to_string().as_bytes()).to_string(), - ); - let file_store = FileStore::new(path.clone()).unwrap(); - let encrypted_store = EncryptedStore::new(file_store, descriptor.encryption_key_bytes()); + let client = test_client_electrum(&env.electrum_url()); + let mut wallet = TestWollet::new(client, &desc); - let wollet = WolletBuilder::new(network, descriptor.clone()) - .with_store(std::sync::Arc::new(encrypted_store)) - .with_merge_threshold(Some(2)) - .build() - .unwrap(); + let desc = wallet.wollet.wollet_descriptor(); + let path = wallet.path(); - assert_eq!(expected_balance, wollet.balance().unwrap()); - assert_eq!(expected_tx_count, wollet.transactions().unwrap().len()); - assert_eq!(expected_utxo_count, wollet.utxos().unwrap().len()); + wallet.fund_btc(&env); + wallet.send_btc(&[&AnySigner::Software(signer.clone())], None, None); + + let balance = wallet.wollet.balance().unwrap(); + let txs = wallet.wollet.transactions().unwrap().len(); + let utxos = wallet.wollet.utxos().unwrap().len(); + assert_eq!(wallet.wollet.updates().unwrap().len(), 4); + + { + // Simulate a restart from the state produced by wallet.wollet + let dir = tempfile::TempDir::new().unwrap(); + cp_dir_rec(&path, dir.path()); + { + // Restart with merge threshold + let wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .with_merge_threshold(Some(2)) + .build() + .unwrap(); + + // Updates are merged + assert_eq!(wollet.updates().unwrap().len(), 1); + + // Internal state is identical to the original wallet + assert_eq!(balance, wollet.balance().unwrap()); + assert_eq!(txs, wollet.transactions().unwrap().len()); + assert_eq!(utxos, wollet.utxos().unwrap().len()); + } - let updates_after = wollet.updates().unwrap(); - assert_eq!(updates_after.len(), 1); + { + // Restart from merged update + let wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .build() + .unwrap(); + + // We get the same state + assert_eq!(wollet.updates().unwrap().len(), 1); + assert_eq!(balance, wollet.balance().unwrap()); + assert_eq!(txs, wollet.transactions().unwrap().len()); + assert_eq!(utxos, wollet.utxos().unwrap().len()); + } + } - // Verify the merged wallet can be reopened and still has correct state - drop(wollet); - let mut wollet = WolletBuilder::new(network, descriptor.clone()) - .with_legacy_fs_store(&db_root_dir) - .unwrap() - .build() - .unwrap(); - assert_eq!(expected_balance, wollet.balance().unwrap()); - assert_eq!(expected_tx_count, wollet.transactions().unwrap().len()); - assert_eq!(expected_utxo_count, wollet.utxos().unwrap().len()); - assert_eq!(wollet.updates().unwrap().len(), 1); - - // Test merge with txid_height_delete: - // Build a tx, apply it without broadcasting, then sync so it gets deleted. - let address = wollet.address(None).unwrap().address().clone(); - let mut pset = wollet + // Now we want to simulate the case where a transaction appears + // in the tx list and then it disappears. So we can test that we + // correctly handle merging updates where we "delete" a transaction. + let address = wallet.address(); + let mut pset = wallet .tx_builder() .add_lbtc_recipient(&address, 10_000) .unwrap() .finish() .unwrap(); - let signer = SwSigner::new(TEST_MNEMONIC, false).unwrap(); signer.sign(&mut pset).unwrap(); - let tx = wollet.finalize(&mut pset).unwrap(); + let tx = wallet.wollet.finalize(&mut pset).unwrap(); let phantom_txid = tx.txid(); - wollet.apply_transaction(tx).unwrap(); - assert!(wollet + wallet.wollet.apply_transaction(tx).unwrap(); + assert!(wallet + .wollet .transactions() .unwrap() .iter() .any(|tx| tx.txid == phantom_txid)); // Sync: electrum doesn't know about the phantom tx, so it will be deleted - let mut client = test_client_electrum(&env.electrum_url()); - let update = client.full_scan(&wollet).unwrap().unwrap(); + let update = wallet.client.full_scan(&wallet.wollet).unwrap().unwrap(); assert!(update.txid_height_delete.contains(&phantom_txid)); - wollet.apply_update(update).unwrap(); - assert!(wollet + wallet.wollet.apply_update(update).unwrap(); + assert!(wallet + .wollet .transactions() .unwrap() .iter() .all(|tx| tx.txid != phantom_txid)); // State should be back to what it was before the phantom tx - assert_eq!(expected_balance, wollet.balance().unwrap()); - assert_eq!(expected_tx_count, wollet.transactions().unwrap().len()); - assert_eq!(expected_utxo_count, wollet.utxos().unwrap().len()); - - let num_updates = wollet.updates().unwrap().len(); - assert_eq!(num_updates, 3); + assert_eq!(balance, wallet.wollet.balance().unwrap()); + assert_eq!(txs, wallet.wollet.transactions().unwrap().len()); + assert_eq!(utxos, wallet.wollet.utxos().unwrap().len()); + + // With 2 extra updates: one that adds the tx, one that removes it + assert_eq!(wallet.wollet.updates().unwrap().len(), 6); + + { + let dir = tempfile::TempDir::new().unwrap(); + cp_dir_rec(&path, dir.path()); + { + // Restart with merge threshold + // Note: update includes txid_height_delete + let wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .with_merge_threshold(Some(2)) + .build() + .unwrap(); + + assert_eq!(wollet.updates().unwrap().len(), 1); + assert_eq!(balance, wollet.balance().unwrap()); + assert_eq!(txs, wollet.transactions().unwrap().len()); + assert_eq!(utxos, wollet.utxos().unwrap().len()); + } - // Reopen with merge threshold to trigger merge including txid_height_delete - drop(wollet); + { + // Restart from merged update + let wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .build() + .unwrap(); + + assert_eq!(wollet.updates().unwrap().len(), 1); + assert_eq!(balance, wollet.balance().unwrap()); + assert_eq!(txs, wollet.transactions().unwrap().len()); + assert_eq!(utxos, wollet.utxos().unwrap().len()); + } + } - let file_store = FileStore::new(path).unwrap(); - let encrypted_store = EncryptedStore::new(file_store, descriptor.encryption_key_bytes()); + // Test merging updates with a running instance + { + let dir = tempfile::TempDir::new().unwrap(); + cp_dir_rec(&path, dir.path()); + { + // Restart with merge threshold + let mut wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .with_merge_threshold(Some(2)) + .build() + .unwrap(); + + // All previous updates are merged + assert_eq!(wollet.updates().unwrap().len(), 1); + // Same state as before + assert_eq!(balance, wollet.balance().unwrap()); + assert_eq!(txs, wollet.transactions().unwrap().len()); + assert_eq!(utxos, wollet.utxos().unwrap().len()); + + fn wait_tx_update(wollet: &mut Wollet, client: &mut ElectrumClient) -> Update { + for _ in 0..10 { + std::thread::sleep(std::time::Duration::from_millis(500)); + if let Some(update) = client.full_scan(wollet).unwrap() { + if !update.new_txs.txs.is_empty() { + return update; + } + } + } + panic!("update did not arrive"); + } - let wollet = WolletBuilder::new(network, descriptor.clone()) - .with_store(std::sync::Arc::new(encrypted_store)) - .with_merge_threshold(Some(2)) - .build() - .unwrap(); + // Apply one update (2): not merged + let address = wollet.address(Some(0)).unwrap().address().clone(); + env.elementsd_sendtoaddress(&address, 10000, None); + let mut client = test_client_electrum(&env.electrum_url()); + let update = wait_tx_update(&mut wollet, &mut client); + wollet.apply_update(update.clone()).unwrap(); + assert_eq!(wollet.updates().unwrap().len(), 2); + + // Apply another update (3): merged + env.elementsd_sendtoaddress(&address, 20000, None); + let update = wait_tx_update(&mut wollet, &mut wallet.client); + wollet.apply_update(update.clone()).unwrap(); + assert_eq!(txs + 2, wollet.transactions().unwrap().len()); + assert_eq!(wollet.updates().unwrap().len(), 1); + } - assert_eq!(expected_balance, wollet.balance().unwrap()); - assert_eq!(expected_tx_count, wollet.transactions().unwrap().len()); - assert_eq!(expected_utxo_count, wollet.utxos().unwrap().len()); - assert_eq!(wollet.updates().unwrap().len(), 1); + { + // Restart from merged update + let wollet = WolletBuilder::new(network, desc.clone()) + .with_legacy_fs_store(&dir) + .unwrap() + .build() + .unwrap(); - // Final reopen to verify persistence of the merged-with-deletes state - drop(wollet); - let wollet = WolletBuilder::new(network, descriptor) - .with_legacy_fs_store(&db_root_dir) - .unwrap() - .build() - .unwrap(); - assert_eq!(expected_balance, wollet.balance().unwrap()); - assert_eq!(expected_tx_count, wollet.transactions().unwrap().len()); - assert_eq!(expected_utxo_count, wollet.utxos().unwrap().len()); - assert_eq!(wollet.updates().unwrap().len(), 1); + assert_eq!(wollet.updates().unwrap().len(), 1); + assert_eq!(txs + 2, wollet.transactions().unwrap().len()); + } + } } #[test] diff --git a/lwk_wollet/tests/test_wollet.rs b/lwk_wollet/tests/test_wollet.rs index 73421b84..ff096d3f 100644 --- a/lwk_wollet/tests/test_wollet.rs +++ b/lwk_wollet/tests/test_wollet.rs @@ -103,8 +103,8 @@ impl TestWollet { self.wollet.tx_builder() } - pub fn db_root_dir(self) -> TempDir { - self.db_root_dir + pub fn path(&self) -> std::path::PathBuf { + self.db_root_dir.path().to_owned() } pub fn policy_asset(&self) -> AssetId { @@ -646,7 +646,7 @@ impl TestWollet { let descriptor = wollet.wollet.descriptor().unwrap().to_string(); let expected_updates = wollet.wollet.updates().unwrap(); let expected = wollet.wollet.balance().unwrap(); - let db_root_dir = wollet.db_root_dir(); + let db_root_dir = wollet.path(); let network = ElementsNetwork::default_regtest(); for _ in 0..2 {