Skip to content
55 changes: 54 additions & 1 deletion lwk_wollet/src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,16 @@ impl Wollet {
);
let mut last_used_internal = None;
let mut last_used_external = None;
for (txid, _) in txid_height_new {
// Also include deleted txids: a tx that was seen and then deleted (e.g. a phantom tx
// applied locally but not confirmed on-chain) should still count as having used its
// addresses so those addresses are not reused. This matters for merged updates where
// phantom txids are removed from txid_height_new but still appear in txid_height_delete.
let txids_for_last_used: Vec<Txid> = txid_height_new
.into_iter()
.map(|(t, _)| t)
.chain(txid_height_delete.iter().copied())
.collect();
for txid in txids_for_last_used {
if let Some(tx) = cache.all_txs.get(&txid) {
for (vout, output) in tx.output.iter().enumerate() {
if !cache
Expand Down Expand Up @@ -439,9 +448,53 @@ impl Wollet {
.map_err(|e| Error::Generic(format!("store error: {e}")))?;
*next_index += 1;

*next_index = self.merge_updates(*next_index)?;

Ok(())
}

pub(crate) fn merge_updates(&self, next_index: usize) -> Result<usize, Error> {
match self.merge_threshold {
Some(threshold) if threshold < next_index => (),
_ => return Ok(next_index), // Not merging
};

// Read and merge all persisted updates
let first_bytes = self
.store
.get(&update_key(0))
.map_err(|e| Error::Generic(format!("store error: {e}")))?
.ok_or_else(|| Error::Generic("expected update 0 to exist".into()))?;
let mut merged = Update::deserialize(&first_bytes)?;

for i in 1..next_index {
let bytes = self
.store
.get(&update_key(i))
.map_err(|e| Error::Generic(format!("store error: {e}")))?
.ok_or_else(|| Error::Generic(format!("expected update {i} to exist")))?;
merged.merge(Update::deserialize(&bytes)?);
}

// Delete all old updates from last to first to avoid holes on crash
for j in (0..next_index).rev() {
self.store
.remove(&update_key(j))
.map_err(|e| Error::Generic(format!("failed to remove update {j}: {e}")))?;
}
// A crash here or during the removal loop will leave the cache empty or at an old state,
// which is not the end of the world, the following scan will bring it back.

// Store the merged update as update 0
let merged_bytes = merged.serialize()?;
self.store
.put(&update_key(0), &merged_bytes)
.map_err(|e| Error::Generic(format!("failed to store merged update: {e}")))?;

let next_index = 1;
Ok(next_index)
}

/// Apply a transaction to the wallet state
///
/// Wallet transactions are normally obtained using [`crate::clients::blocking::BlockchainBackend::full_scan()`]
Expand Down
64 changes: 7 additions & 57 deletions lwk_wollet/src/wollet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct Wollet {
pub(crate) descriptor: WolletDescriptor,
/// Counter for the next update key
pub(crate) next_update_index: Mutex<usize>,
pub(crate) merge_threshold: Option<usize>,
/// cached value
max_weight_to_satisfy: usize,
}
Expand All @@ -81,8 +82,9 @@ impl WolletBuilder {
}
}

/// Set the threshold for merging updates during build.
/// When the number of updates exceeds this threshold, they will be merged into one.
/// Set a threshold to merge updates
///
/// When the number of updates exceeds the threshold, they are merged into one.
/// Set to None to disable merging (default).
pub fn with_merge_threshold(mut self, threshold: Option<usize>) -> Self {
self.merge_threshold = threshold;
Expand Down Expand Up @@ -125,24 +127,7 @@ impl WolletBuilder {
store: self.store,
next_update_index: Mutex::new(0),
max_weight_to_satisfy,
};

// Check if merging is enabled and needed
let mut merging = if let Some(threshold) = self.merge_threshold {
let merge_key = update_key(threshold);
match wollet.store.get(&merge_key) {
Ok(Some(_)) => {
// There are at least threshold+1 updates, need to merge
let first_key = update_key(0);
match wollet.store.get(&first_key) {
Ok(Some(bytes)) => Some(Update::deserialize(&bytes)?),
_ => None,
}
}
_ => None,
}
} else {
None
merge_threshold: self.merge_threshold,
};

// Restore updates from the store using indexed keys
Expand All @@ -151,49 +136,14 @@ impl WolletBuilder {
match wollet.store.get(&key) {
Ok(Some(bytes)) => {
let update = Update::deserialize(&bytes)?;
wollet.apply_update_no_persist(update.clone())?;
if let Some(ref mut m) = merging {
if i > 0 {
m.merge(update);
}
}
wollet.apply_update_no_persist(update)?;
}
Ok(None) => {
// Update the next index
let mut next_update_index = wollet
.next_update_index
.lock()
.map_err(|_| Error::Generic("next_update_index lock poisoned".into()))?;
*next_update_index = i;

// If we were merging, persist the merged update and clean up
if let Some(merged) = merging {
// Delete all old updates
// we are starting from the last to avoid having holes in the beginning
for j in (0..i).rev() {
let old_key = update_key(j);
wollet.store.remove(&old_key).map_err(|e| {
Error::Generic(format!("failed to remove update {}: {}", j, e))
})?;
}

// A crash here or during the removal loop will leave the cache empty or at
// an old state which is not the end of the world, the following scan will
// bring the cache back to the correct state.

// Store the merged update as update 0
let merged_bytes = merged.serialize()?;
wollet
.store
.put(&update_key(0), &merged_bytes)
.map_err(|e| {
Error::Generic(format!("failed to store merged update: {}", e))
})?;

// Update next_update_index to 1
*next_update_index = 1;
}

*next_update_index = wollet.merge_updates(i)?;
break;
}
Err(e) => return Err(Error::Generic(format!("store error: {e}"))),
Expand Down
Loading
Loading