
Commit 6f20a00

joostjager and claude committed

Include ChannelManager in flush() write batch

Extends Persist::flush() to accept channel_manager_bytes, allowing the channel manager to be written in the same write_batch() call as channel monitors. The channel manager is always written first in the batch to ensure proper ordering. This removes the separate channel manager persistence from the background processor and combines it with the monitor flush, reducing round trips.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

1 parent c8248bb

4 files changed: 113 additions & 135 deletions
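
Before the per-file diffs, a toy model of one background-processor iteration under the new API. The Toy* types below are illustrative stand-ins, not LDK types; only the flush(count, channel_manager_bytes) shape and the capture-then-flush ordering are taken from this commit.

use std::io;

// Toy stand-ins for the relevant LDK surfaces (illustrative only).
struct ToyChainMonitor {
    queued_monitor_writes: usize,
}

struct ToyChannelManager {
    needs_persistence: bool,
}

impl ToyChainMonitor {
    fn pending_write_count(&self) -> usize {
        self.queued_monitor_writes
    }

    // One call now covers both artifacts: the serialized manager rides along
    // in the same batch as up to `count` monitor writes, manager entry first.
    fn flush(&self, count: usize, manager_bytes: Vec<u8>) -> Result<(), io::Error> {
        println!("batch: 1 manager entry ({} bytes) + {} monitor writes", manager_bytes.len(), count);
        Ok(())
    }
}

impl ToyChannelManager {
    fn get_and_clear_needs_persistence(&mut self) -> bool {
        std::mem::take(&mut self.needs_persistence)
    }

    fn encode(&self) -> Vec<u8> {
        vec![0u8; 64] // placeholder serialization
    }
}

fn main() -> Result<(), io::Error> {
    let monitor = ToyChainMonitor { queued_monitor_writes: 3 };
    let mut manager = ToyChannelManager { needs_persistence: true };

    // Capture the monitor count first; monitor writes that arrive afterwards
    // stay queued and are left for the next iteration, matching the commit's
    // per-iteration flush logic.
    let pending = monitor.pending_write_count();
    let needs_persist = manager.get_and_clear_needs_persistence();
    if pending > 0 || needs_persist {
        monitor.flush(pending, manager.encode())?;
    }
    Ok(())
}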


lightning-background-processor/src/lib.rs

Lines changed: 48 additions & 98 deletions
@@ -57,11 +57,10 @@ use lightning::sign::{
 use lightning::util::async_poll::MaybeSend;
 use lightning::util::logger::Logger;
 use lightning::util::persist::{
-    KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
-    CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-    NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
-    NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
-    SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+    KVStore, KVStoreSync, KVStoreSyncWrapper, NETWORK_GRAPH_PERSISTENCE_KEY,
+    NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+    SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+    SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
 use lightning::util::wakers::Future;

@@ -1150,44 +1149,13 @@ where
         None => {},
     }

-    let mut futures = Joiner::new();
+    // Type A is unused but needed for inference - we use the same boxed future type as other slots
+    let mut futures: Joiner<lightning::io::Error, core::pin::Pin<Box<dyn core::future::Future<Output = Result<(), lightning::io::Error>> + Send + 'static>>, _, _, _, _> = Joiner::new();

-    // Capture the number of pending monitor writes before persisting the channel manager.
-    // We'll only flush this many writes after the manager is persisted, to avoid flushing
-    // monitor updates that arrived after the manager state was captured.
+    // Capture the number of pending monitor writes and whether manager needs persistence.
+    // We'll flush monitors and manager together in a single batch after other tasks complete.
     let pending_monitor_writes = chain_monitor.pending_write_count();
-
-    if channel_manager.get_cm().get_and_clear_needs_persistence() {
-        log_trace!(logger, "Persisting ChannelManager...");
-
-        let fut = async {
-            kv_store
-                .write(
-                    CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-                    CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-                    CHANNEL_MANAGER_PERSISTENCE_KEY,
-                    channel_manager.get_cm().encode(),
-                )
-                .await
-        };
-        // TODO: Once our MSRV is 1.68 we should be able to drop the Box
-        let mut fut = Box::pin(fut);
-
-        // Because persisting the ChannelManager is important to avoid accidental
-        // force-closures, go ahead and poll the future once before we do slightly more
-        // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
-        // below. This will get it moving but won't block us for too long if the underlying
-        // future is actually async.
-        use core::future::Future;
-        let mut waker = dummy_waker();
-        let mut ctx = task::Context::from_waker(&mut waker);
-        match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
-            task::Poll::Ready(res) => futures.set_a_res(res),
-            task::Poll::Pending => futures.set_a(fut),
-        }
-
-        log_trace!(logger, "Done persisting ChannelManager.");
-    }
+    let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence();

     // Note that we want to archive stale ChannelMonitors and run a network graph prune once
     // not long after startup before falling back to their usual infrequent runs. This avoids

@@ -1354,11 +1322,13 @@ where
         res?;
     }

-    // Flush the monitor writes that were pending before we persisted the channel manager.
-    // Any writes that arrived after are left in the queue for the next iteration.
-    if pending_monitor_writes > 0 {
-        match chain_monitor.flush(pending_monitor_writes) {
-            Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes),
+    // Flush monitors and manager together in a single batch.
+    // Any monitor writes that arrived after are left in the queue for the next iteration.
+    if pending_monitor_writes > 0 || needs_manager_persist {
+        log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes);
+        let manager_bytes = channel_manager.get_cm().encode();
+        match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
+            Ok(()) => log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes),
             Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
         }
     }

@@ -1416,25 +1386,18 @@ where
     }
     log_trace!(logger, "Terminating background processor.");

-    // After we exit, ensure we persist the ChannelManager one final time - this avoids
-    // some races where users quit while channel updates were in-flight, with
-    // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-    kv_store
-        .write(
-            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-            CHANNEL_MANAGER_PERSISTENCE_KEY,
-            channel_manager.get_cm().encode(),
-        )
-        .await?;
-
-    // Flush all pending monitor writes after final channel manager persistence.
+    // After we exit, ensure we persist the ChannelManager one final time along with any
+    // pending monitor writes - this avoids some races where users quit while channel updates
+    // were in-flight, with ChannelMonitor update(s) persisted without a corresponding
+    // ChannelManager update.
     let pending_monitor_writes = chain_monitor.pending_write_count();
-    if pending_monitor_writes > 0 {
-        match chain_monitor.flush(pending_monitor_writes) {
-            Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes),
-            Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
-        }
+    let manager_bytes = channel_manager.get_cm().encode();
+    match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
+        Ok(()) => log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes),
+        Err(e) => {
+            log_error!(logger, "Failed final flush: {}", e);
+            return Err(e);
+        },
     }

     if let Some(ref scorer) = scorer {

@@ -1746,25 +1709,17 @@ impl BackgroundProcessor {
                 channel_manager.get_cm().timer_tick_occurred();
                 last_freshness_call = Instant::now();
             }
-            // Capture the number of pending monitor writes before persisting the channel manager.
+            // Capture the number of pending monitor writes and whether manager needs persistence.
             let pending_monitor_writes = chain_monitor.pending_write_count();
+            let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence();

-            if channel_manager.get_cm().get_and_clear_needs_persistence() {
-                log_trace!(logger, "Persisting ChannelManager...");
-                (kv_store.write(
-                    CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-                    CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-                    CHANNEL_MANAGER_PERSISTENCE_KEY,
-                    channel_manager.get_cm().encode(),
-                ))?;
-                log_trace!(logger, "Done persisting ChannelManager.");
-            }
-
-            // Flush the monitor writes that were pending before we persisted the channel manager.
-            if pending_monitor_writes > 0 {
-                match chain_monitor.flush(pending_monitor_writes) {
+            // Flush monitors and manager together in a single batch.
+            if pending_monitor_writes > 0 || needs_manager_persist {
+                log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes);
+                let manager_bytes = channel_manager.get_cm().encode();
+                match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
                     Ok(()) => {
-                        log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes)
+                        log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes)
                     },
                     Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
                 }

@@ -1881,25 +1836,20 @@ impl BackgroundProcessor {
             }
         }

-        // After we exit, ensure we persist the ChannelManager one final time - this avoids
-        // some races where users quit while channel updates were in-flight, with
-        // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-        kv_store.write(
-            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-            CHANNEL_MANAGER_PERSISTENCE_KEY,
-            channel_manager.get_cm().encode(),
-        )?;
-
-        // Flush all pending monitor writes after final channel manager persistence.
+        // After we exit, ensure we persist the ChannelManager one final time along with any
+        // pending monitor writes - this avoids some races where users quit while channel updates
+        // were in-flight, with ChannelMonitor update(s) persisted without a corresponding
+        // ChannelManager update.
         let pending_monitor_writes = chain_monitor.pending_write_count();
-        if pending_monitor_writes > 0 {
-            match chain_monitor.flush(pending_monitor_writes) {
-                Ok(()) => {
-                    log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes)
-                },
-                Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
-            }
+        let manager_bytes = channel_manager.get_cm().encode();
+        match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
+            Ok(()) => {
+                log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes)
+            },
+            Err(e) => {
+                log_error!(logger, "Failed final flush: {}", e);
+                return Err(e.into());
+            },
         }

         if let Some(ref scorer) = scorer {
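
On the shutdown path above, the combined flush now runs unconditionally and its error propagates instead of only being logged. A sketch of that invariant, reusing the Toy* stand-ins from the example near the top of this page:

// Toy shutdown-path sketch (reuses the Toy* types from the earlier example).
// The final flush runs even when nothing is queued, because the batch always
// carries the ChannelManager entry; a failure here aborts shutdown with an
// error instead of being swallowed.
fn shutdown_flush(
    monitor: &ToyChainMonitor, manager: &ToyChannelManager,
) -> Result<(), std::io::Error> {
    let pending = monitor.pending_write_count();
    // No needs-persistence check on shutdown: persist the manager regardless,
    // closing the race where monitors land on disk without a matching manager.
    monitor.flush(pending, manager.encode())
}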

lightning/src/chain/chainmonitor.rs

Lines changed: 15 additions & 8 deletions
@@ -209,13 +209,17 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {

     /// Flushes pending writes to the underlying storage.
     ///
-    /// The `count` parameter specifies how many pending writes to flush.
+    /// The `count` parameter specifies how many pending monitor writes to flush.
+    /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist.
+    ///
+    /// The channel manager is always written first in the batch, before any monitor writes,
+    /// to ensure proper ordering (manager state should be at least as recent as monitors on disk).
     ///
     /// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
     /// from persist methods), this method should write queued data to storage.
     ///
     /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed.
-    fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error>;
+    fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error>;
 }

 struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {

@@ -347,8 +351,8 @@ where
         self.persister.pending_write_count()
     }

-    fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
-        crate::util::persist::poll_sync_future(self.persister.flush(count))
+    fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+        crate::util::persist::poll_sync_future(self.persister.flush(count, channel_manager_bytes))
     }
 }

@@ -760,14 +764,17 @@ where

     /// Flushes pending writes to the underlying storage.
     ///
-    /// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
-    /// If `count` is `None`, all pending writes are flushed.
+    /// The `count` parameter specifies how many pending monitor writes to flush.
+    /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist.
+    ///
+    /// The channel manager is always written first in the batch, before any monitor writes,
+    /// to ensure proper ordering (manager state should be at least as recent as monitors on disk).
     ///
     /// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
     /// from persist methods), this method writes queued data to storage and signals
     /// completion to the channel manager via [`Self::channel_monitor_updated`].
-    pub fn flush(&self, count: usize) -> Result<(), io::Error> {
-        let completed = self.persister.flush(count)?;
+    pub fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<(), io::Error> {
+        let completed = self.persister.flush(count, channel_manager_bytes)?;
         for (channel_id, update_id) in completed {
             let _ = self.channel_monitor_updated(channel_id, update_id);
         }
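
The public ChainMonitor::flush wrapper above persists first and then reports each flushed monitor update as completed. A toy model of that completion loop follows; ToyPersister and ToyChainMonitor are illustrative stand-ins, not LDK API, and the completion callback is reduced to a print.

use std::io;

type ChannelId = [u8; 32];

struct ToyPersister;

impl ToyPersister {
    // Stand-in for Persist::flush: the manager entry would lead the batch,
    // followed by up to `count` queued monitor writes; it reports which
    // monitor updates hit disk.
    fn flush(
        &self, count: usize, _channel_manager_bytes: Vec<u8>,
    ) -> Result<Vec<(ChannelId, u64)>, io::Error> {
        // Pretend `count` queued updates for one channel were written.
        Ok((0..count as u64).map(|update_id| ([0u8; 32], update_id)).collect())
    }
}

struct ToyChainMonitor {
    persister: ToyPersister,
}

impl ToyChainMonitor {
    // Mirrors the pub fn flush above: persist, then signal completion for
    // each flushed monitor update so the channel manager can act on it.
    fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<(), io::Error> {
        let completed = self.persister.flush(count, channel_manager_bytes)?;
        for (channel_id, update_id) in completed {
            self.channel_monitor_updated(channel_id, update_id);
        }
        Ok(())
    }

    fn channel_monitor_updated(&self, _channel_id: ChannelId, update_id: u64) {
        println!("monitor update {} is now durable", update_id);
    }
}

fn main() -> Result<(), io::Error> {
    let monitor = ToyChainMonitor { persister: ToyPersister };
    monitor.flush(2, vec![0u8; 64])
}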

lightning/src/util/persist.rs

Lines changed: 48 additions & 27 deletions
@@ -563,8 +563,15 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
         );
     }

-    fn flush(&self, _count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
-        // KVStoreSync implementations persist immediately, so there's nothing to flush.
+    fn flush(&self, _count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+        // KVStoreSync implementations persist immediately, so there's nothing to flush
+        // for monitors. However, we still need to persist the channel manager.
+        self.write(
+            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+            CHANNEL_MANAGER_PERSISTENCE_KEY,
+            channel_manager_bytes,
+        )?;
         Ok(Vec::new())
     }
 }

@@ -902,8 +909,8 @@ where
         self.0.pending_write_count()
     }

-    fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
-        poll_sync_future(self.0.flush(count))
+    fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+        poll_sync_future(self.0.flush(count, channel_manager_bytes))
     }
 }

@@ -1116,22 +1123,35 @@ where

     /// Flushes pending writes to the underlying [`KVStore`].
     ///
-    /// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
-    /// If `count` is `None`, all pending writes are flushed.
+    /// The `count` parameter specifies how many pending monitor writes to flush.
+    /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist.
+    ///
+    /// The channel manager is always written first in the batch, before any monitor writes,
+    /// to ensure proper ordering (manager state should be at least as recent as monitors on disk).
     ///
     /// This method should be called after one or more calls that queue persist operations
     /// to actually write the data to storage.
     ///
     /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed.
-    pub async fn flush(&self, count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+    pub async fn flush(
+        &self, count: usize, channel_manager_bytes: Vec<u8>,
+    ) -> Result<Vec<(ChannelId, u64)>, io::Error> {
         let pending = {
             let mut queue = self.0.pending_writes.lock().unwrap();
             let n = count.min(queue.len());
             queue.drain(..n).collect::<Vec<_>>()
         };

         // Phase 1: Collect all batch entries
-        let mut batch_entries = Vec::with_capacity(pending.len());
+        // Channel manager goes FIRST to ensure it's written before monitors
+        let mut batch_entries = Vec::with_capacity(pending.len() + 1);
+        batch_entries.push(BatchWriteEntry::new(
+            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+            CHANNEL_MANAGER_PERSISTENCE_KEY,
+            channel_manager_bytes,
+        ));
+
         let mut stale_cleanups = Vec::new();

         for (i, write) in pending.iter().enumerate() {

@@ -1164,29 +1184,30 @@ where
         }

         // Phase 2: Execute batch write
-        let successful_writes = if !batch_entries.is_empty() {
-            let result = self.0.kv_store.write_batch(batch_entries).await;
-            if let Some(err) = result.error {
-                // Re-queue failed and subsequent writes
-                let failed_writes =
-                    pending.into_iter().skip(result.successful_writes).collect::<Vec<_>>();
-                if !failed_writes.is_empty() {
-                    let mut queue = self.0.pending_writes.lock().unwrap();
-                    // Prepend failed writes back to the front of the queue
-                    for write in failed_writes.into_iter().rev() {
-                        queue.insert(0, write);
-                    }
+        let result = self.0.kv_store.write_batch(batch_entries).await;
+        if let Some(err) = result.error {
+            // The first entry is the channel manager, so successful_writes includes it
+            // Monitor writes start at index 1, so subtract 1 to get monitor success count
+            let successful_monitor_writes =
+                if result.successful_writes > 0 { result.successful_writes - 1 } else { 0 };
+            // Re-queue failed and subsequent monitor writes
+            let failed_writes =
+                pending.into_iter().skip(successful_monitor_writes).collect::<Vec<_>>();
+            if !failed_writes.is_empty() {
+                let mut queue = self.0.pending_writes.lock().unwrap();
+                // Prepend failed writes back to the front of the queue
+                for write in failed_writes.into_iter().rev() {
+                    queue.insert(0, write);
                 }
-                return Err(err);
             }
-            result.successful_writes
-        } else {
-            0
-        };
+            return Err(err);
+        }
+        // Subtract 1 for the channel manager entry to get monitor success count
+        let successful_monitor_writes = result.successful_writes.saturating_sub(1);

         // Phase 3: Cleanup stale updates (only for successfully written monitors)
         for (i, monitor_key, start, end) in stale_cleanups {
-            if i < successful_writes {
+            if i < successful_monitor_writes {
                 for update_id in start..end {
                     let update_name = UpdateName::from(update_id);
                     // Lazy delete - ignore errors as this is just cleanup

@@ -1207,7 +1228,7 @@ where
         // Phase 4: Return completions for successful writes only
         let completions = pending
             .into_iter()
-            .take(successful_writes)
+            .take(successful_monitor_writes)
             .map(|write| match write {
                 PendingWrite::FullMonitor { completion, .. } => completion,
                 PendingWrite::Update { completion, .. } => completion,
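
The index accounting above is easy to check in isolation: the batch holds one channel manager entry at index 0 followed by the queued monitor writes, and write_batch reports how many leading entries succeeded, so the monitor success count is that figure minus the manager slot. A standalone model (names illustrative, not LDK API):

// Standalone model of the flush success accounting (illustrative names).
// Batch layout: index 0 = serialized ChannelManager, indices 1..=n = queued
// monitor writes. write_batch() reports how many leading entries were written
// before any failure, so the monitor count excludes the manager slot.
fn successful_monitor_writes(successful_writes: usize) -> usize {
    successful_writes.saturating_sub(1)
}

fn main() {
    // The manager write itself failed: no monitor writes succeeded either.
    assert_eq!(successful_monitor_writes(0), 0);
    // Only the manager entry landed: zero monitor completions or cleanups.
    assert_eq!(successful_monitor_writes(1), 0);
    // Manager plus the first two monitors landed before the failure.
    assert_eq!(successful_monitor_writes(3), 2);
    println!("accounting model holds");
}

Note that the error path's expression, if successful_writes > 0 { successful_writes - 1 } else { 0 }, computes the same quantity as the success path's saturating_sub(1).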

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
@@ -825,7 +825,7 @@ impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for WatchtowerPers
         );
     }

-    fn flush(&self, _count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+    fn flush(&self, _count: usize, _channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
         Ok(Vec::new())
     }
 }

@@ -892,7 +892,7 @@ impl<Signer: sign::ecdsa::EcdsaChannelSigner> Persist<Signer> for TestPersister
         self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &monitor_name);
     }

-    fn flush(&self, _count: usize) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+    fn flush(&self, _count: usize, _channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
         Ok(Vec::new())
     }
 }
