From 91b883a48464c4ebd5c1d7f5775f06cd5b19c770 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Wed, 4 Mar 2026 15:59:50 -0800 Subject: [PATCH 01/20] perf(db): replace disabled compaction with bounded bulk-load mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, initial sync disabled auto-compaction entirely, causing L0 files to accumulate without bound — observed 1 000+ files in practice. With unbounded L0 accumulation, multi_get() must binary-search the index block of every L0 file per key, making lookup_txos() increasingly slow as sync progresses (~700 s/batch with 1 000+ L0 files). Replace the "all or nothing" approach with a bounded bulk-load mode: auto-compaction is always enabled, but the L0 compaction trigger is raised to 64 files (vs the default 4). This keeps write amplification low — compaction fires infrequently in large batches — while capping L0 file count and therefore lookup cost. Slowdown/stop triggers are set at 4×/8× the compaction trigger so writes are never stalled while background compaction catches up. The pending-compaction-bytes stall is disabled to prevent blocking writes against the large initial backlog. When initial sync is complete, reset L0 triggers and pending-compaction stall thresholds to RocksDB defaults for stable steady-state performance. --- src/config.rs | 18 +++++------------- src/new_index/db.rs | 40 ++++++++++++++++++++++++++++++++++++++-- tests/common.rs | 1 - 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/config.rs b/src/config.rs index d23128e91..766549d25 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,17 +43,14 @@ pub struct Config { pub rpc_logging: RpcLogging, pub zmq_addr: Option, - /// Enable compaction during initial sync - /// - /// By default compaction is off until initial sync is finished for performance reasons, - /// however, this requires much more disk space. 
- pub initial_sync_compaction: bool, - /// RocksDB block cache size in MB (per database) /// Caches decompressed blocks in memory to avoid repeated decompression (CPU intensive) /// Total memory usage = cache_size * 3_databases (txstore, history, cache) - /// Recommendation: Start with 1024MB for production - /// Higher values reduce CPU load from cache misses but use more RAM + /// Recommendation: 1024 MB for steady-state; 4096 MB+ for initial sync (L0 SST + /// files accumulate up to the compaction trigger — their index, filter (Bloom), + /// and data blocks must fit in this cache). With 10 bits/key bloom filters and + /// a 512 MB write buffer, each L0 file's filter block is ~9.75 MB, so 64 L0 + /// files need ~625 MB of filter blocks on top of index blocks. pub db_block_cache_mb: usize, /// RocksDB parallelism level (background compaction and flush threads) @@ -229,10 +226,6 @@ impl Config { .long("anonymize-json-rpc-logging-source-ip") .help("enables ip anonymization in rpc logs") .takes_value(false) - ).arg( - Arg::with_name("initial_sync_compaction") - .long("initial-sync-compaction") - .help("Perform compaction during initial sync (slower but less disk space required)") ).arg( Arg::with_name("db_block_cache_mb") .long("db-block-cache-mb") @@ -487,7 +480,6 @@ impl Config { index_unspendables: m.is_present("index_unspendables"), cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), - initial_sync_compaction: m.is_present("initial_sync_compaction"), db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize), db_parallelism: value_t_or_exit!(m, "db_parallelism", usize), db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize), diff --git a/src/new_index/db.rs b/src/new_index/db.rs index c93ac1e12..b6d32b292 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -95,7 +95,27 @@ impl DB { db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); 
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); db_opts.set_target_file_size_base(1_073_741_824); - db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load + // Bulk-load compaction: allow L0 files to accumulate to a bounded limit + // before compacting. This reduces write amplification compared to the + // default trigger of 4, while keeping the file count — and therefore + // bloom-filter memory and lookup cost — bounded. + // + // With bloom filters at 10 bits/key and a 512 MB write buffer, each L0 + // file has ~7.8 M keys, so its filter block is ~9.75 MB. At 64 files + // that is ~625 MB of pinned filter blocks — well within an 8 GB cache. + // Each lookup checks 64 bloom filters (fast, in-memory) and reads from + // only ~0.64 files on average (1 % false-positive rate × 64 files). + // + // Set slowdown/stop triggers well above the compaction trigger so writes + // are never stalled while background compaction catches up. + // Disable the pending-compaction-bytes stall so the large backlog that + // builds up during the bulk load does not block writes. + const L0_BULK_TRIGGER: i32 = 64; + db_opts.set_level_zero_file_num_compaction_trigger(L0_BULK_TRIGGER); + db_opts.set_level_zero_slowdown_writes_trigger(L0_BULK_TRIGGER * 4); + db_opts.set_level_zero_stop_writes_trigger(L0_BULK_TRIGGER * 8); + db_opts.set_hard_pending_compaction_bytes_limit(0); + db_opts.set_soft_pending_compaction_bytes_limit(0); let parallelism: i32 = config.db_parallelism.try_into() @@ -133,7 +153,23 @@ impl DB { } pub fn enable_auto_compaction(&self) { - let opts = [("disable_auto_compactions", "false")]; + // Reset L0 triggers and pending-compaction stall thresholds to RocksDB + // defaults, so that steady-state operation compacts promptly and avoids + // unbounded compaction backlogs that cause read latency spikes. + // RocksDB defaults (stable since v5.x through v10.4.2). 
Hardcoded because + // set_options() doesn't return previous values and the Rust bindings lack getters. + + let soft_limit = (64u64 << 30).to_string(); // 64 GiB + let hard_limit = (256u64 << 30).to_string(); // 256 GiB + + let opts = [ + ("disable_auto_compactions", "false"), + ("level0_file_num_compaction_trigger", "4"), + ("level0_slowdown_writes_trigger", "20"), + ("level0_stop_writes_trigger", "36"), + ("soft_pending_compaction_bytes_limit", &soft_limit), + ("hard_pending_compaction_bytes_limit", &hard_limit), + ]; self.db.set_options(&opts).unwrap(); } diff --git a/tests/common.rs b/tests/common.rs index 40e542d28..53970705d 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -117,7 +117,6 @@ impl TestRunner { asset_db_path: None, // XXX #[cfg(feature = "liquid")] parent_network: bitcoin::Network::Regtest, - initial_sync_compaction: false, db_block_cache_mb: 8, db_parallelism: 2, db_write_buffer_size_mb: 256, From 1c01e4015685c9dc45fd8ccbfcedb86603b01f2e Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Mon, 2 Mar 2026 13:28:03 -0800 Subject: [PATCH 02/20] perf(db): cache index/filter blocks in block cache to bound memory With up to 64 L0 files accumulating before compaction fires, each open SST file has RocksDB allocating its index and filter blocks on the heap outside the block cache. This memory is not subject to LRU eviction and grows proportionally to the number of open SST files. Setting cache_index_and_filter_blocks(true) routes these blocks through the shared block cache, bounding total memory to --db-block-cache-mb per database. Increase to 4096+ for initial sync to ensure the working set of filter/index blocks stays in cache without thrashing. 
--- src/config.rs | 4 ++-- src/new_index/db.rs | 9 +++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 766549d25..6d291767d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -44,7 +44,7 @@ pub struct Config { pub zmq_addr: Option, /// RocksDB block cache size in MB (per database) - /// Caches decompressed blocks in memory to avoid repeated decompression (CPU intensive) + /// Caches decompressed data blocks, plus index and filter blocks (via cache_index_and_filter_blocks). /// Total memory usage = cache_size * 3_databases (txstore, history, cache) /// Recommendation: 1024 MB for steady-state; 4096 MB+ for initial sync (L0 SST /// files accumulate up to the compaction trigger — their index, filter (Bloom), @@ -229,7 +229,7 @@ impl Config { ).arg( Arg::with_name("db_block_cache_mb") .long("db-block-cache-mb") - .help("RocksDB block cache size in MB per database") + .help("RocksDB block cache size in MB per database. Bounds index/filter block memory; use 4096+ for initial sync to avoid table-reader heap growth.") .takes_value(true) .default_value("8") ).arg( diff --git a/src/new_index/db.rs b/src/new_index/db.rs index b6d32b292..132e193d6 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -134,6 +134,15 @@ impl DB { let mut block_opts = rocksdb::BlockBasedOptions::default(); let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes)); + // Store index and filter blocks inside the block cache so their memory is + // bounded by --db-block-cache-mb. Without this, RocksDB allocates table-reader + // memory (index + filter blocks) on the heap separately for every open SST file. + // During initial sync, L0 files accumulate up to the compaction trigger (64 by + // default) and this unbounded heap allocation can grow to many GB. + // Note: increase --db-block-cache-mb proportionally (e.g. 
4096) so the cache is + // large enough to hold the working set of filter/index blocks without thrashing. + block_opts.set_cache_index_and_filter_blocks(true); + db_opts.set_block_based_table_factory(&block_opts); let db = DB { From ba8f8497f1b893dc8c3073e5942934424d4c8ab0 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Fri, 20 Feb 2026 14:05:31 -0800 Subject: [PATCH 03/20] perf(db): incremental bytes_per_sync for SST writes set_bytes_per_sync(1 MiB): background-syncs SST files incrementally, avoiding large fsync stalls at file close. --- src/new_index/db.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 132e193d6..b90ab7c08 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -127,10 +127,13 @@ impl DB { // Configure write buffer size (not set by increase_parallelism) db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); - // db_opts.set_advise_random_on_open(???); db_opts.set_compaction_readahead_size(1 << 20); - // Configure block cache + // Background-sync SST files to the OS incrementally as they are written, + // rather than doing a large fsync on close. Smooths out I/O latency spikes. + db_opts.set_bytes_per_sync(1 << 20); + + // Configure block cache and bloom filters let mut block_opts = rocksdb::BlockBasedOptions::default(); let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes)); From 5f1eda55c0abf3ee1146a460c9754ff93fd8b9f6 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Mon, 23 Feb 2026 08:03:22 -0800 Subject: [PATCH 04/20] perf(db): parallel subcompactions for initial sync compact_range() (used by full_compaction at end of initial sync) is single-threaded by default regardless of increase_parallelism(). 
Setting max_subcompactions to db_parallelism splits the key range across all background threads, reducing the final compaction wall-clock time proportionally to CPU count. --- src/new_index/db.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index b90ab7c08..038c04e35 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -133,7 +133,13 @@ impl DB { // rather than doing a large fsync on close. Smooths out I/O latency spikes. db_opts.set_bytes_per_sync(1 << 20); - // Configure block cache and bloom filters + // Parallelize sub-ranges within a single compaction job (including the one-time + // full_compaction at the end of initial sync). Without this, compact_range() is + // single-threaded regardless of increase_parallelism(). Setting it equal to the + // parallelism level keeps all background threads busy during the final compaction. + db_opts.set_max_subcompactions(parallelism as u32); + + // Configure block cache and table options let mut block_opts = rocksdb::BlockBasedOptions::default(); let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes)); @@ -158,7 +164,6 @@ impl DB { } pub fn full_compaction(&self) { - // TODO: make sure this doesn't fail silently info!("starting full compaction on {:?}", self.db); self.db.compact_range(None::<&[u8]>, None::<&[u8]>); info!("finished full compaction on {:?}", self.db); From 208d727dc5bc02c346c69d9d18325abddad29c3a Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Thu, 5 Mar 2026 07:41:42 -0800 Subject: [PATCH 05/20] perf(db): forced bottommost compaction Use compact_range_opt with BottommostLevelCompaction::Force so the final full_compaction pushes all data to the lowest level and writes optimally-merged SST files. Also log elapsed time so it is easy to track how long the end-of-sync compaction takes. 
--- src/new_index/db.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 038c04e35..cc82e2a4e 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -165,8 +165,12 @@ impl DB { pub fn full_compaction(&self) { info!("starting full compaction on {:?}", self.db); - self.db.compact_range(None::<&[u8]>, None::<&[u8]>); - info!("finished full compaction on {:?}", self.db); + let start = std::time::Instant::now(); + let mut opts = rocksdb::CompactOptions::default(); + opts.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force); + self.db.compact_range_opt(None::<&[u8]>, None::<&[u8]>, &opts); + let elapsed = start.elapsed(); + info!("finished full compaction on {:?} in elapsed='{:.1?}'", self.db, elapsed); } pub fn enable_auto_compaction(&self) { From f3786385723f7ef463d2a8936c13faf0bf7c4d02 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Mon, 2 Mar 2026 13:28:34 -0800 Subject: [PATCH 06/20] perf(db): switch compression from Snappy to LZ4, Zstd for bottommost MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LZ4 decompresses roughly 2-3x faster than Snappy with comparable compression ratio. Profiling shows ~13% of cycles spent in Snappy decompression during lookup_txos(), making this a meaningful improvement for read-heavy workloads. Use Zstd for the bottommost level (L6), where data is fully compacted and rarely rewritten. Zstd compresses ~30-40% smaller than LZ4 with comparable decompression speed, reducing final DB size significantly. Existing SST files compressed with Snappy remain readable — RocksDB decompresses them transparently. New files will be written as LZ4/Zstd. 
--- src/new_index/db.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index cc82e2a4e..88f32a7b9 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -93,7 +93,8 @@ impl DB { db_opts.create_if_missing(true); db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); + db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + db_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); db_opts.set_target_file_size_base(1_073_741_824); // Bulk-load compaction: allow L0 files to accumulate to a bounded limit // before compacting. This reduces write amplification compared to the From defab7859ebc6f78814c6b3b1f8d95359ee270c7 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Thu, 5 Mar 2026 07:41:42 -0800 Subject: [PATCH 07/20] perf(db): larger compaction readahead Increase set_compaction_readahead_size from 1 MiB to 4 MiB. Better amortises syscall overhead when sequentially reading the many accumulated L0 files during the final compaction. --- src/new_index/db.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 88f32a7b9..e49b9988b 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -128,7 +128,10 @@ impl DB { // Configure write buffer size (not set by increase_parallelism) db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); - db_opts.set_compaction_readahead_size(1 << 20); + // 4 MiB readahead for compaction I/O. Larger than the previous 1 MiB to better + // amortise syscall overhead when reading the many L0 files accumulated during + // initial sync. 
+ db_opts.set_compaction_readahead_size(4 << 20); // Background-sync SST files to the OS incrementally as they are written, // rather than doing a large fsync on close. Smooths out I/O latency spikes. From 88606250e018eee54cb0413643a2e5e80e60f2c2 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Mon, 23 Feb 2026 13:22:22 -0800 Subject: [PATCH 08/20] perf(db): parallel sort in write_rows/delete_rows The sort before WriteBatch construction was single-threaded, pegging one core while all others sat idle during the index pass (~2-4M rows per batch). Switch to rayon par_sort_unstable_by so all available cores are used for the sort. --- src/new_index/db.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index e49b9988b..aadc660a7 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -1,4 +1,5 @@ use prometheus::GaugeVec; +use rayon::prelude::*; use rocksdb; use std::convert::TryInto; @@ -240,7 +241,7 @@ impl DB { self.db, flush ); - rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); for row in rows { batch.put(&row.key, &row.value); @@ -250,7 +251,7 @@ impl DB { pub fn delete_rows(&self, mut rows: Vec, flush: DBFlush) { log::trace!("deleting {} rows from {:?}", rows.len(), self.db,); - rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); for row in rows { batch.delete(&row.key); From c355522839dcc953afb849043660ede44e9771a4 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Wed, 4 Mar 2026 15:58:58 -0800 Subject: [PATCH 09/20] perf(db): add Bloom filters for faster point lookups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With up to 64 L0 files accumulating before compaction fires, each multi_get() must binary-search the index block of every L0 file 
whose key range overlaps the query — all of them for random txids. Before the bounded L0 trigger was introduced this was catastrophic (~700 s/batch with 1 000+ files); with 64 files it is still the dominant lookup cost. Add full-key Bloom filters at 10 bits/key (~1 % false-positive rate). With filters, only ~0.64 files on average need actual index or data I/O per key, reducing per-lookup cost by ~100×. The filter blocks are cached and pinned in the block cache alongside index blocks via the set_cache_index_and_filter_blocks and set_pin_l0_filter_and_index_blocks_in_cache settings. --- src/new_index/db.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index aadc660a7..a5ebb2fb0 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -156,6 +156,21 @@ impl DB { // Note: increase --db-block-cache-mb proportionally (e.g. 4096) so the cache is // large enough to hold the working set of filter/index blocks without thrashing. block_opts.set_cache_index_and_filter_blocks(true); + // Pin L0 index and filter blocks in the cache so they are never evicted. + // Without this, data block churn evicts L0 index/filter blocks, causing + // repeated disk reads for every SST lookup — worse than the old heap approach. + // With this, L0 index/filter blocks behave like the old table-reader heap + // allocation but stay within the bounded block cache. + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + // Full-key Bloom filters allow multi_get() to skip SST files that don't + // contain a key without touching the index or data blocks. Without this, + // every point lookup must binary-search the index of every L0 file whose + // key range overlaps the query (all of them for random txids) — extremely + // expensive with 1000+ L0 files accumulated during initial sync. + // At 10 bits/key the false-positive rate is ~1%, so only ~10 out of 1000 + // L0 files need actual I/O per key. 
The filter blocks are cached and pinned + // alongside the index blocks via the settings above. + block_opts.set_bloom_filter(10.0, false); db_opts.set_block_based_table_factory(&block_opts); From a3db816dd8c4b9dffdc1626b753bcf05fcd428e3 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Tue, 10 Mar 2026 09:16:59 -0700 Subject: [PATCH 10/20] test(db): add iterator tests for short and full-length prefix scans Tests iter_scan, iter_scan_from, iter_scan_reverse, and raw_iterator with both short (1-byte) and full (33-byte) prefixes. Validates correct row counts, prefix boundary enforcement, and no cross-prefix leakage. These tests exercise the exact scan patterns used by electrs: 1-byte prefixes for block headers (b"B") and done markers (b"D"), and 33-byte prefixes for history rows (code + scripthash). --- src/new_index/db.rs | 147 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index a5ebb2fb0..5c1e38d64 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -330,6 +330,20 @@ impl DB { } } + #[cfg(test)] + fn open_test(path: &Path) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_bloom_filter(10.0, false); + db_opts.set_block_based_table_factory(&block_opts); + + DB { + db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open test RocksDB")), + } + } + pub fn start_stats_exporter(&self, db_metrics: Arc, db_name: &str) { let db_arc = Arc::clone(&self.db); let label = db_name.to_string(); @@ -380,3 +394,136 @@ impl DB { }); } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a key mimicking electrs row format: 1-byte code + 32-byte hash + optional suffix. 
+ fn make_key(code: u8, hash_byte: u8, suffix: &[u8]) -> Vec { + let mut key = vec![code]; + key.extend_from_slice(&[hash_byte; 32]); + key.extend_from_slice(suffix); + key + } + + fn write_test_rows(db: &DB) { + let rows = vec![ + // B rows (block headers) — scanned with 1-byte prefix b"B" + DBRow { key: make_key(b'B', 0x01, &[]), value: b"header1".to_vec() }, + DBRow { key: make_key(b'B', 0x02, &[]), value: b"header2".to_vec() }, + // D rows (done markers) — scanned with 1-byte prefix b"D" + DBRow { key: make_key(b'D', 0x01, &[]), value: vec![] }, + DBRow { key: make_key(b'D', 0x02, &[]), value: vec![] }, + // H rows (history) — scanned with 33-byte prefix b"H" + scripthash + DBRow { key: make_key(b'H', 0xAA, &[0, 0, 0, 1]), value: vec![] }, + DBRow { key: make_key(b'H', 0xAA, &[0, 0, 0, 2]), value: vec![] }, + DBRow { key: make_key(b'H', 0xBB, &[0, 0, 0, 1]), value: vec![] }, + // O rows (txouts) — looked up by exact key, but scannable by 33-byte prefix + DBRow { key: make_key(b'O', 0xCC, &[0, 1]), value: b"txout1".to_vec() }, + DBRow { key: make_key(b'O', 0xCC, &[0, 2]), value: b"txout2".to_vec() }, + ]; + db.write_rows(rows, DBFlush::Enable); + } + + #[test] + fn test_iter_scan_short_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // 1-byte prefix scan — must find all B rows + let b_rows: Vec = db.iter_scan(b"B").collect(); + assert_eq!(b_rows.len(), 2, "expected 2 B rows, got {}", b_rows.len()); + assert_eq!(b_rows[0].value, b"header1"); + assert_eq!(b_rows[1].value, b"header2"); + + // 1-byte prefix scan — must find all D rows + let d_rows: Vec = db.iter_scan(b"D").collect(); + assert_eq!(d_rows.len(), 2, "expected 2 D rows, got {}", d_rows.len()); + } + + #[test] + fn test_iter_scan_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // 33-byte prefix scan — must find only H rows for hash 0xAA + let prefix = 
make_key(b'H', 0xAA, &[]); + let h_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(h_rows.len(), 2, "expected 2 H/0xAA rows, got {}", h_rows.len()); + + // 33-byte prefix scan — must find only H rows for hash 0xBB + let prefix = make_key(b'H', 0xBB, &[]); + let h_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(h_rows.len(), 1, "expected 1 H/0xBB row, got {}", h_rows.len()); + + // 33-byte prefix scan — O rows for hash 0xCC + let prefix = make_key(b'O', 0xCC, &[]); + let o_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(o_rows.len(), 2, "expected 2 O/0xCC rows, got {}", o_rows.len()); + } + + #[test] + fn test_iter_scan_from_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Scan H/0xAA starting from height 2 + let prefix = make_key(b'H', 0xAA, &[]); + let start = make_key(b'H', 0xAA, &[0, 0, 0, 2]); + let rows: Vec = db.iter_scan_from(&prefix, &start).collect(); + assert_eq!(rows.len(), 1, "expected 1 H/0xAA row from height 2, got {}", rows.len()); + } + + #[test] + fn test_iter_scan_reverse_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Reverse scan H/0xAA from max + let prefix = make_key(b'H', 0xAA, &[]); + let prefix_max = make_key(b'H', 0xAA, &[0xFF, 0xFF, 0xFF, 0xFF]); + let rows: Vec = db.iter_scan_reverse(&prefix, &prefix_max).collect(); + assert_eq!(rows.len(), 2, "expected 2 H/0xAA rows in reverse, got {}", rows.len()); + // Should be in reverse order + assert!(rows[0].key > rows[1].key, "reverse scan should return descending keys"); + } + + #[test] + fn test_iter_scan_no_cross_prefix_leakage() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Scanning for a non-existent prefix returns nothing + let prefix = make_key(b'H', 0xFF, &[]); + let rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(rows.len(), 0, 
"expected 0 rows for non-existent prefix"); + + // Scanning b"B" must not return D, H, or O rows + let b_rows: Vec = db.iter_scan(b"B").collect(); + for row in &b_rows { + assert_eq!(row.key[0], b'B', "B scan returned non-B row"); + } + } + + #[test] + fn test_raw_iterator_sees_all_rows() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + let mut iter = db.raw_iterator(); + iter.seek_to_first(); + let mut count = 0; + while iter.valid() { + count += 1; + iter.next(); + } + assert_eq!(count, 9, "expected 9 total rows, got {}", count); + } +} From fb5451f36cf5528705bc1d584b21c6daccd22f28 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Tue, 10 Mar 2026 09:18:28 -0700 Subject: [PATCH 11/20] perf(db): add prefix extractor for prefix Bloom filters on range scans Add a 33-byte fixed prefix extractor (code || hash) so RocksDB builds prefix Bloom filters instead of full-key filters. Range scans like iter_scan("H" + scripthash) can now skip L0 SST files whose Bloom filter doesn't match, rather than checking every file. With a prefix extractor, iterator scans whose seek key is shorter than 33 bytes would silently miss results. Use a conditional in iter_scan, iter_scan_from, and iter_scan_reverse: short prefixes (< 33 bytes) use total_order_seek for correctness, while full-length prefixes use the prefix bloom for performance. raw_iterator always uses total_order_seek since callers may seek to arbitrary positions. Requires full re-compaction (delete F marker) to rebuild SST files with prefix bloom metadata. 
--- src/new_index/db.rs | 73 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 15 deletions(-) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 5c1e38d64..88af7fe52 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -162,16 +162,28 @@ impl DB { // With this, L0 index/filter blocks behave like the old table-reader heap // allocation but stay within the bounded block cache. block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - // Full-key Bloom filters allow multi_get() to skip SST files that don't - // contain a key without touching the index or data blocks. Without this, - // every point lookup must binary-search the index of every L0 file whose - // key range overlaps the query (all of them for random txids) — extremely - // expensive with 1000+ L0 files accumulated during initial sync. - // At 10 bits/key the false-positive rate is ~1%, so only ~10 out of 1000 - // L0 files need actual I/O per key. The filter blocks are cached and pinned - // alongside the index blocks via the settings above. + // Bloom filters allow multi_get() to skip SST files that don't contain a key + // without touching the index or data blocks. Without this, every point lookup + // must binary-search the index of every L0 file whose key range overlaps the + // query (all of them for random txids) — extremely expensive with 1000+ L0 + // files accumulated during initial sync. At 10 bits/key the false-positive + // rate is ~1%, so only ~10 out of 1000 L0 files need actual I/O per key. + // Combined with the prefix extractor below, these become prefix Bloom filters + // keyed on `code || hash` (33 bytes), which also allow prefix range scans + // (e.g. history lookups) to skip L0 files entirely. The filter blocks are + // cached and pinned alongside the index blocks via the settings above. block_opts.set_bloom_filter(10.0, false); + // All electrs keys share the structure `code (1 byte) || hash (32 bytes) || ...`. 
+ // A 33-byte fixed prefix extractor enables prefix Bloom filters: range scans + // like iter_scan("H" + scripthash) can skip SST files whose Bloom filter + // doesn't match the prefix, rather than checking every L0 file. + // + // INVARIANT: All iter_scan* and raw_iterator methods must use total_order_seek + // when the seek key may be shorter than 33 bytes. Without it, RocksDB silently + // skips SST files that contain matching keys. See the conditional in iter_scan(). + db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + db_opts.set_block_based_table_factory(&block_opts); let db = DB { @@ -215,22 +227,49 @@ impl DB { } pub fn raw_iterator(&self) -> rocksdb::DBRawIterator<'_> { - self.db.raw_iterator() + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.raw_iterator_opt(opts) } pub fn iter_scan(&self, prefix: &[u8]) -> ScanIterator<'_> { + let iter = if prefix.len() >= 33 { + self.db.prefix_iterator(prefix) + } else { + // Short prefixes (e.g. b"B", b"D") are below the 33-byte prefix extractor + // length. prefix_iterator would silently skip SST files. Use total_order_seek + // to fall back to a full scan; ScanIterator enforces the prefix boundary. + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.iterator_opt( + rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward), + opts, + ) + }; ScanIterator { prefix: prefix.to_vec(), - iter: self.db.prefix_iterator(prefix), + iter, done: false, } } pub fn iter_scan_from(&self, prefix: &[u8], start_at: &[u8]) -> ScanIterator<'_> { - let iter = self.db.iterator(rocksdb::IteratorMode::From( - start_at, - rocksdb::Direction::Forward, - )); + // start_at is always >= prefix length. When >= 33 bytes, the default seek + // uses the prefix extractor for bloom filtering automatically. When < 33 + // bytes, fall back to total_order_seek to avoid silent misses. 
+ let iter = if start_at.len() >= 33 { + self.db.iterator(rocksdb::IteratorMode::From( + start_at, + rocksdb::Direction::Forward, + )) + } else { + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.iterator_opt( + rocksdb::IteratorMode::From(start_at, rocksdb::Direction::Forward), + opts, + ) + }; ScanIterator { prefix: prefix.to_vec(), iter, @@ -239,7 +278,10 @@ impl DB { } pub fn iter_scan_reverse(&self, prefix: &[u8], prefix_max: &[u8]) -> ReverseScanIterator<'_> { - let mut iter = self.db.raw_iterator(); + // total_order_seek is required for correctness: prefix mode (the default when a + // prefix extractor is set) is only guaranteed correct for forward iteration. + // SeekForPrev + Prev() in prefix mode can silently miss entries. + let mut iter = self.raw_iterator(); iter.seek_for_prev(prefix_max); ReverseScanIterator { @@ -334,6 +376,7 @@ impl DB { fn open_test(path: &Path) -> DB { let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); + db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); let mut block_opts = rocksdb::BlockBasedOptions::default(); block_opts.set_bloom_filter(10.0, false); From ad72179c712e8b14d66c51d3a4567293c89342cd Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Thu, 19 Feb 2026 12:00:08 -0800 Subject: [PATCH 12/20] perf(index): merge two-pass add+index into single per-batch pass MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, initial sync processed the full batch in two sequential full sweeps: add() wrote T/O rows for every block, then index() read them back to build history rows. This meant peak memory scaled with the entire batch, and the fetch pipeline could not overlap with index work because both passes had to complete before the next fetch began. 
Merging into a single pass — add then index for each batch in turn — reduces peak working-set size and allows the prefetch pipeline to deliver the next batch's blocks while the current batch is still being indexed. --- src/new_index/schema.rs | 102 ++++++++++++++++++++++++---------------- 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index cccb8bbc3..81690b0f1 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -250,20 +250,13 @@ impl Indexer { self.duration.with_label_values(&[name]).start_timer() } - fn headers_to_add(&self, new_headers: &[HeaderEntry]) -> Vec { - let added_blockhashes = self.store.added_blockhashes.read().unwrap(); + // Headers that need any work: either not yet added to txstore or not yet indexed to history. + fn headers_to_process(&self, new_headers: &[HeaderEntry]) -> Vec { + let added = self.store.added_blockhashes.read().unwrap(); + let indexed = self.store.indexed_blockhashes.read().unwrap(); new_headers .iter() - .filter(|e| !added_blockhashes.contains(e.hash())) - .cloned() - .collect() - } - - fn headers_to_index(&self, new_headers: &[HeaderEntry]) -> Vec { - let indexed_blockhashes = self.store.indexed_blockhashes.read().unwrap(); - new_headers - .iter() - .filter(|e| !indexed_blockhashes.contains(e.hash())) + .filter(|e| !added.contains(e.hash()) || !indexed.contains(e.hash())) .cloned() .collect() } @@ -333,43 +326,72 @@ impl Indexer { .map(|blocks| self.undo_index(&blocks)); } - // Add new blocks to the txstore db - let to_add = self.headers_to_add(&new_headers); + // Single-pass: add to txstore and index to history in the same per-batch loop. + // + // In the old two-pass approach, txstore_db was fully compacted between the add + // and index passes, which pushed all O rows into large SST files deep in the LSM + // tree. With only a small block cache, lookup_txos() during indexing then caused + // nearly every key to require a disk read. 
+ // + // By interleaving add() and index() in the same batch, the O rows written by + // add() are still in the write buffer (or nearby L0 SST files) when index() + // calls lookup_txos() — dramatically increasing cache hit rate. + // + // Crash safety: added_blockhashes / indexed_blockhashes are persisted via the + // "D" done-marker rows. On restart, headers_to_process() re-derives which + // blocks still need work, so partially-processed batches are re-processed safely. + let to_process = self.headers_to_process(&new_headers); debug!( - "adding transactions from {} blocks using {:?}", - to_add.len(), + "processing {} blocks (add + index) using {:?}", + to_process.len(), self.from ); let mut fetcher_count = 0; let mut blocks_fetched = 0; - let to_add_total = to_add.len(); - - start_fetcher(self.from, &daemon, to_add)?.map(|blocks| - { - if fetcher_count % 25 == 0 && to_add_total > 20 { - info!("adding txes from blocks {}/{} ({:.1}%)", - blocks_fetched, - to_add_total, - blocks_fetched as f32 / to_add_total as f32 * 100.0 - ); - } - fetcher_count += 1; - blocks_fetched += blocks.len(); + let to_process_total = to_process.len(); + + start_fetcher(self.from, &daemon, to_process)?.map(|blocks| { + if fetcher_count % 25 == 0 && to_process_total > 20 { + info!( + "processing blocks {}/{} ({:.1}%)", + blocks_fetched, + to_process_total, + blocks_fetched as f32 / to_process_total as f32 * 100.0 + ); + } + fetcher_count += 1; + blocks_fetched += blocks.len(); + + // Add blocks not yet in txstore (idempotent: crash recovery skips already-added blocks) + let to_add: Vec<_> = { + let added = self.store.added_blockhashes.read().unwrap(); + blocks + .iter() + .filter(|b| !added.contains(b.entry.hash())) + .cloned() + .collect() + }; + if !to_add.is_empty() { + self.add(&to_add); + } - self.add(&blocks) - }); + // Index blocks not yet in history (O rows for to_add are now in the write buffer) + let to_index: Vec<_> = { + let indexed = 
self.store.indexed_blockhashes.read().unwrap(); + blocks + .iter() + .filter(|b| !indexed.contains(b.entry.hash())) + .cloned() + .collect() + }; + if !to_index.is_empty() { + self.index(&to_index); + } + }); + // Compact after all add+index work is done, not between passes. self.start_auto_compactions(&self.store.txstore_db); - - // Index new blocks to the history db - let to_index = self.headers_to_index(&new_headers); - debug!( - "indexing history from {} blocks using {:?}", - to_index.len(), - self.from - ); - start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks)); self.start_auto_compactions(&self.store.history_db); self.start_auto_compactions(&self.store.cache_db); From 62d182ac94c2fb201e349387c02121b16ba6cc50 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Mon, 23 Feb 2026 08:01:21 -0800 Subject: [PATCH 13/20] perf(fetch): make initial sync batch size configurable (default 250) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously the bitcoind fetcher hard-coded a 100-block batch size. The single-pass add+index approach relies on O rows from add() still being in the write buffer when index() calls lookup_txos(), so larger batches improve cache hit rate for outputs spent within the same batch window. Add --initial-sync-batch-size (default 250) to Config and thread it through IndexerConfig → start_fetcher → bitcoind_fetcher. The blkfiles path is naturally batched by blk file so is unaffected. 
--- src/config.rs | 13 +++++++++++++ src/new_index/fetch.rs | 13 +++++++------ src/new_index/schema.rs | 7 +++++-- tests/common.rs | 1 + 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/config.rs b/src/config.rs index 6d291767d..a6bcce7e8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -64,6 +64,12 @@ pub struct Config { /// Larger buffers = fewer flushes (less CPU) but more RAM usage pub db_write_buffer_size_mb: usize, + /// Number of blocks per batch during initial sync (bitcoind fetch mode). + /// Larger batches keep more O rows in the write buffer when index() runs lookup_txos(), + /// improving cache hit rate for outputs spent within the same batch window. + /// Must stay within db_write_buffer_size_mb to avoid mid-batch flushes. + pub initial_sync_batch_size: usize, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -244,6 +250,12 @@ impl Config { .help("RocksDB write buffer size in MB per database. RAM usage = size * max_write_buffers(2) * 3_databases") .takes_value(true) .default_value("256") + ).arg( + Arg::with_name("initial_sync_batch_size") + .long("initial-sync-batch-size") + .help("Number of blocks per batch during initial sync. 
Larger values keep more txo rows in the write buffer during indexing, improving lookup_txos cache hit rate for recently-created outputs.") + .takes_value(true) + .default_value("250") ).arg( Arg::with_name("zmq_addr") .long("zmq-addr") @@ -483,6 +495,7 @@ impl Config { db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize), db_parallelism: value_t_or_exit!(m, "db_parallelism", usize), db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize), + initial_sync_batch_size: value_t_or_exit!(m, "initial_sync_batch_size", usize), zmq_addr, #[cfg(feature = "liquid")] diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 7906fb206..e2cdc36be 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -32,12 +32,12 @@ pub fn start_fetcher( from: FetchFrom, daemon: &Daemon, new_headers: Vec, + batch_size: usize, ) -> Result>> { - let fetcher = match from { - FetchFrom::Bitcoind => bitcoind_fetcher, - FetchFrom::BlkFiles => blkfiles_fetcher, - }; - fetcher(daemon, new_headers) + match from { + FetchFrom::Bitcoind => bitcoind_fetcher(daemon, new_headers, batch_size), + FetchFrom::BlkFiles => blkfiles_fetcher(daemon, new_headers), + } } #[derive(Clone)] @@ -74,6 +74,7 @@ impl Fetcher { fn bitcoind_fetcher( daemon: &Daemon, new_headers: Vec, + batch_size: usize, ) -> Result>> { if let Some(tip) = new_headers.last() { debug!("{:?} ({} left to index)", tip, new_headers.len()); @@ -87,7 +88,7 @@ fn bitcoind_fetcher( let mut fetcher_count = 0; let mut blocks_fetched = 0; let total_blocks_fetched = new_headers.len(); - for entries in new_headers.chunks(100) { + for entries in new_headers.chunks(batch_size) { if fetcher_count % 50 == 0 && total_blocks_fetched >= 50 { info!("fetching blocks {}/{} ({:.1}%)", blocks_fetched, diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 81690b0f1..e9bb073c4 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -205,6 +205,7 @@ struct IndexerConfig { 
address_search: bool, index_unspendables: bool, network: Network, + block_batch_size: usize, #[cfg(feature = "liquid")] parent_network: crate::chain::BNetwork, } @@ -216,6 +217,7 @@ impl From<&Config> for IndexerConfig { address_search: config.address_search, index_unspendables: config.index_unspendables, network: config.network_type, + block_batch_size: config.initial_sync_batch_size, #[cfg(feature = "liquid")] parent_network: config.parent_network, } @@ -322,7 +324,7 @@ impl Indexer { // Fetch the reorged blocks, then undo their history index db rows. // The txstore db rows are kept for reorged blocks/transactions. - start_fetcher(self.from, &daemon, reorged_headers)? + start_fetcher(self.from, &daemon, reorged_headers, self.iconfig.block_batch_size)? .map(|blocks| self.undo_index(&blocks)); } @@ -351,7 +353,7 @@ impl Indexer { let mut blocks_fetched = 0; let to_process_total = to_process.len(); - start_fetcher(self.from, &daemon, to_process)?.map(|blocks| { + start_fetcher(self.from, &daemon, to_process, self.iconfig.block_batch_size)?.map(|blocks| { if fetcher_count % 25 == 0 && to_process_total > 20 { info!( "processing blocks {}/{} ({:.1}%)", @@ -1929,6 +1931,7 @@ pub mod bench { address_search: false, index_unspendables: false, network: crate::chain::Network::Regtest, + block_batch_size: 250, }; let height = 702861; let hash = block.block_hash(); diff --git a/tests/common.rs b/tests/common.rs index 53970705d..ea240659a 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -120,6 +120,7 @@ impl TestRunner { db_block_cache_mb: 8, db_parallelism: 2, db_write_buffer_size_mb: 256, + initial_sync_batch_size: 250, //#[cfg(feature = "electrum-discovery")] //electrum_public_hosts: Option, //#[cfg(feature = "electrum-discovery")] From 9fb6c81b65cd0b4a51e93b63cec6ef27eae1010c Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Tue, 24 Feb 2026 11:01:38 -0800 Subject: [PATCH 14/20] perf(fetch): reuse rayon thread pool across blk*.dat files MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously parse_blocks() created a new rayon::ThreadPool on every invocation — once per blk*.dat file (~2,900 for a full chain). Each construction spawns N OS threads. Lift the pool out of the per-blob closure so it is created once and reused for the entire blkfiles_parser run, avoiding ~47k thread spawns at 16 cores. --- src/new_index/fetch.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index e2cdc36be..a890bdf4c 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -225,9 +225,14 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher>, magic: u32) -> Fetcher, magic: u32) -> Result> { +fn parse_blocks(pool: &rayon::ThreadPool, blob: Vec, magic: u32) -> Result> { let mut cursor = Cursor::new(&blob); let mut slices = vec![]; let max_pos = blob.len() as u64; @@ -274,11 +279,6 @@ fn parse_blocks(blob: Vec, magic: u32) -> Result> { cursor.set_position(end as u64); } - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(0) // CPU-bound - .thread_name(|i| format!("parse-blocks-{}", i)) - .build() - .unwrap(); Ok(pool.install(|| { slices .into_par_iter() From b3e8bf381dcf17a4fc0df1de20cbd1289c5c7f5c Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Tue, 24 Feb 2026 11:03:34 -0800 Subject: [PATCH 15/20] perf(fetch,index): pre-compute txids in BlockEntry Both add_blocks() and index_blocks() previously called tx.compute_txid() independently, hashing every transaction twice per batch. Compute txids once at BlockEntry construction time (in the fetcher) and store them alongside the block. add_blocks and index_blocks consume the pre-computed slice, and index_transaction receives the txid as a parameter rather than recomputing it. 
--- src/new_index/fetch.rs | 20 ++++++++++++++------ src/new_index/schema.rs | 18 +++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index a890bdf4c..b6a32b624 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -16,7 +16,7 @@ use std::thread; use electrs_macros::trace; -use crate::chain::{Block, BlockHash}; +use crate::chain::{Block, BlockHash, Txid}; use crate::daemon::Daemon; use crate::errors::*; use crate::util::{spawn_thread, HeaderEntry, SyncChannel}; @@ -45,6 +45,7 @@ pub struct BlockEntry { pub block: Block, pub entry: HeaderEntry, pub size: u32, + pub txids: Vec, } type SizedBlock = (Block, u32); @@ -107,10 +108,14 @@ fn bitcoind_fetcher( let block_entries: Vec = blocks .into_iter() .zip(entries) - .map(|(block, entry)| BlockEntry { - entry: entry.clone(), // TODO: remove this clone() - size: block.total_size() as u32, - block, + .map(|(block, entry)| { + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); + BlockEntry { + entry: entry.clone(), // TODO: remove this clone() + size: block.total_size() as u32, + txids, + block, + } }) .collect(); assert_eq!(block_entries.len(), entries.len()); @@ -157,7 +162,10 @@ fn blkfiles_fetcher( let blockhash = block.block_hash(); entry_map .remove(&blockhash) - .map(|entry| BlockEntry { block, entry, size }) + .map(|entry| { + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); + BlockEntry { block, entry, size, txids } + }) .or_else(|| { trace!("skipping block {}", blockhash); None diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index e9bb073c4..4352e8b66 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -1215,15 +1215,15 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec = b.block.txdata.iter().map(|tx| tx.compute_txid()).collect(); - for (tx, txid) in b.block.txdata.iter().zip(txids.iter()) { + for (tx, txid) in 
b.block.txdata.iter().zip(b.txids.iter()) { add_transaction(*txid, tx, &mut rows, iconfig); } if !iconfig.light_mode { - rows.push(BlockRow::new_txids(blockhash, &txids).into_row()); + rows.push(BlockRow::new_txids(blockhash, &b.txids).into_row()); rows.push(BlockRow::new_meta(blockhash, &BlockMeta::from(b)).into_row()); } @@ -1313,10 +1313,12 @@ fn index_blocks( block_entries .par_iter() // serialization is CPU-intensive .map(|b| { + assert_eq!(b.txids.len(), b.block.txdata.len()); let mut rows = vec![]; - for tx in &b.block.txdata { - let height = b.entry.height() as u32; - index_transaction(tx, height, previous_txos_map, &mut rows, iconfig); + let height = b.entry.height() as u32; + for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) { + let txid = full_hash(&txid[..]); + index_transaction(tx, txid, height, previous_txos_map, &mut rows, iconfig); } rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed" rows @@ -1328,12 +1330,12 @@ fn index_blocks( // TODO: return an iterator? fn index_transaction( tx: &Transaction, + txid: FullHash, confirmed_height: u32, previous_txos_map: &HashMap, rows: &mut Vec, iconfig: &IndexerConfig, ) { - let txid = full_hash(&tx.compute_txid()[..]); // persist tx confirmation row: // C{txid} → "{block_height}" @@ -1936,7 +1938,9 @@ pub mod bench { let height = 702861; let hash = block.block_hash(); let header = block.header.clone(); + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); let block_entry = BlockEntry { + txids, block, entry: HeaderEntry::new(height, hash, header), size: 0u32, // wrong but not needed for benching From 97b2d79e467529a1559a115b6e5f5d32485e17b1 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Thu, 5 Mar 2026 07:41:42 -0800 Subject: [PATCH 16/20] perf(fetch): blkfiles pipeline lookahead Increase SyncChannel capacity from 1 to 2 in all three blkfiles stages (reader, parser, fetcher). 
Each stage can now stay one batch ahead of the next, overlapping sequential disk I/O (reader) with CPU deserialization (parser) and block-entry construction (fetcher). The bitcoind fetcher stays at 1 since its batches can be hundreds of MB. --- src/new_index/fetch.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index b6a32b624..08297d973 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -137,7 +137,9 @@ fn blkfiles_fetcher( let blk_files = daemon.list_blk_files()?; let xor_key = daemon.read_blk_file_xor_key()?; - let chan = SyncChannel::new(1); + // Buffer of 2 lets the parser produce one batch ahead of the consumer, + // overlapping block-entry construction with the indexer. + let chan = SyncChannel::new(2); let sender = chan.sender(); let mut entry_map: HashMap = @@ -189,7 +191,9 @@ fn blkfiles_fetcher( #[trace] fn blkfiles_reader(blk_files: Vec, xor_key: Option<[u8; 8]>) -> Fetcher> { - let chan = SyncChannel::new(1); + // Buffer of 2 lets the reader read ahead by one blk file while the parser + // is working, overlapping sequential disk I/O with CPU deserialization. + let chan = SyncChannel::new(2); let sender = chan.sender(); Fetcher::from( @@ -227,7 +231,8 @@ fn blkfile_apply_xor_key(xor_key: [u8; 8], blob: &mut [u8]) { #[trace] fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher> { - let chan = SyncChannel::new(1); + // Buffer of 2 lets the parser stay one batch ahead of the fetcher stage. + let chan = SyncChannel::new(2); let sender = chan.sender(); Fetcher::from( From 04795afe964a095266e9f397591c8c0146566a30 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Wed, 4 Mar 2026 16:00:01 -0800 Subject: [PATCH 17/20] feat(metrics): add batch_total timer to index_duration histogram Add a batch_total label to the existing index_duration histogram that covers the full wall-clock time of each add+index cycle. 
This makes it easy to see per-batch throughput at a glance without summing the individual step histograms (add_process, add_write, index_lookup, index_process). Only recorded for batches where actual work is done (blocks not yet added or indexed), so no-op iterations do not skew the distribution. --- src/new_index/schema.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 4352e8b66..86d8f65d0 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -374,9 +374,6 @@ impl Indexer { .cloned() .collect() }; - if !to_add.is_empty() { - self.add(&to_add); - } // Index blocks not yet in history (O rows for to_add are now in the write buffer) let to_index: Vec<_> = { @@ -387,8 +384,15 @@ impl Indexer { .cloned() .collect() }; - if !to_index.is_empty() { - self.index(&to_index); + + if !to_add.is_empty() || !to_index.is_empty() { + let _batch_timer = self.start_timer("batch_total"); + if !to_add.is_empty() { + self.add(&to_add); + } + if !to_index.is_empty() { + self.index(&to_index); + } } }); From 2251fb5b8d090e46c674c4de18df67f39ef3de44 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Wed, 4 Mar 2026 22:02:20 -0800 Subject: [PATCH 18/20] feat(metrics): export per-level SST file counts to Prometheus Add rocksdb_num_files_at_level gauge with 'db' and 'level' labels, covering all 7 RocksDB compaction levels (L0-L6). 
--- src/new_index/db.rs | 13 +++++++++++++ src/new_index/db_metrics.rs | 5 +++++ 2 files changed, 18 insertions(+) diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 88af7fe52..e4c51f72d 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -14,6 +14,8 @@ use crate::util::{bincode, spawn_thread, Bytes}; static DB_VERSION: u32 = 3; +const ROCKSDB_NUM_LEVELS: u32 = 7; + #[derive(Debug, Eq, PartialEq)] pub struct DBRow { pub key: Vec, @@ -389,7 +391,9 @@ impl DB { pub fn start_stats_exporter(&self, db_metrics: Arc, db_name: &str) { let db_arc = Arc::clone(&self.db); + let db_arc2 = Arc::clone(&self.db); let label = db_name.to_string(); + let label2 = label.clone(); let update_gauge = move |gauge: &GaugeVec, property: &str| { if let Ok(Some(value)) = db_arc.property_value(property) { @@ -433,6 +437,15 @@ impl DB { update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity"); update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage"); update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); + for level in 0..ROCKSDB_NUM_LEVELS { + let prop = format!("rocksdb.num-files-at-level{}", level); + if let Ok(Some(value)) = db_arc2.property_value(&prop) { + if let Ok(v) = value.parse::() { + let level_str = level.to_string(); + db_metrics.num_files_at_level.with_label_values(&[&label2, &level_str]).set(v); + } + } + } thread::sleep(Duration::from_secs(5)); }); } diff --git a/src/new_index/db_metrics.rs b/src/new_index/db_metrics.rs index e8df0db43..5e19439dc 100644 --- a/src/new_index/db_metrics.rs +++ b/src/new_index/db_metrics.rs @@ -49,6 +49,7 @@ pub struct RocksDbMetrics { // Level metrics pub base_level: GaugeVec, + pub num_files_at_level: GaugeVec, // Write metrics pub actual_delayed_write_rate: GaugeVec, @@ -204,6 +205,10 @@ impl RocksDbMetrics { format!("rocksdb_base_level"), "Base level for compaction." 
), labels), + num_files_at_level: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_files_at_level", + "Number of SST files at each compaction level." + ), &["db", "level"]), // Write metrics actual_delayed_write_rate: metrics.gauge_vec(MetricOpts::new( From 553340010a9684f406771afeafd3fd718fe52c0d Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Fri, 6 Mar 2026 21:25:12 -0800 Subject: [PATCH 19/20] refactor(logging): show block height instead of batch count in progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change fetcher and indexer progress logging from "blocks fetched / total blocks" to "current height / chain tip height". This is more meaningful during initial sync — users can immediately see how far along the chain the sync has progressed rather than counting opaque batch numbers. --- src/new_index/fetch.rs | 13 +++++++------ src/new_index/schema.rs | 14 +++++++------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 08297d973..53135f132 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -33,9 +33,10 @@ pub fn start_fetcher( daemon: &Daemon, new_headers: Vec, batch_size: usize, + chain_tip_height: usize, ) -> Result>> { match from { - FetchFrom::Bitcoind => bitcoind_fetcher(daemon, new_headers, batch_size), + FetchFrom::Bitcoind => bitcoind_fetcher(daemon, new_headers, batch_size, chain_tip_height), FetchFrom::BlkFiles => blkfiles_fetcher(daemon, new_headers), } } @@ -76,6 +77,7 @@ fn bitcoind_fetcher( daemon: &Daemon, new_headers: Vec, batch_size: usize, + chain_tip_height: usize, ) -> Result>> { if let Some(tip) = new_headers.last() { debug!("{:?} ({} left to index)", tip, new_headers.len()); @@ -87,18 +89,17 @@ fn bitcoind_fetcher( chan.into_receiver(), spawn_thread("bitcoind_fetcher", move || { let mut fetcher_count = 0; - let mut blocks_fetched = 0; let total_blocks_fetched = new_headers.len(); for entries in 
new_headers.chunks(batch_size) { if fetcher_count % 50 == 0 && total_blocks_fetched >= 50 { + let batch_height = entries.last().map(|e| e.height()).unwrap_or(0); info!("fetching blocks {}/{} ({:.1}%)", - blocks_fetched, - total_blocks_fetched, - blocks_fetched as f32 / total_blocks_fetched as f32 * 100.0 + batch_height, + chain_tip_height, + batch_height as f32 / chain_tip_height.max(1) as f32 * 100.0 ); } fetcher_count += 1; - blocks_fetched += entries.len(); let blockhashes: Vec = entries.iter().map(|e| *e.hash()).collect(); let blocks = daemon diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 86d8f65d0..f120d7130 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -293,6 +293,7 @@ impl Indexer { let tip = daemon.getbestblockhash()?; let (new_headers, reorged_since) = self.get_new_headers(&daemon, &tip)?; + let chain_tip_height = new_headers.last().map(|h| h.height()).unwrap_or(0); // Handle reorgs by undoing the reorged (stale) blocks first if let Some(reorged_since) = reorged_since { @@ -324,7 +325,7 @@ impl Indexer { // Fetch the reorged blocks, then undo their history index db rows. // The txstore db rows are kept for reorged blocks/transactions. - start_fetcher(self.from, &daemon, reorged_headers, self.iconfig.block_batch_size)? + start_fetcher(self.from, &daemon, reorged_headers, self.iconfig.block_batch_size, chain_tip_height)? 
.map(|blocks| self.undo_index(&blocks)); } @@ -350,20 +351,19 @@ impl Indexer { ); let mut fetcher_count = 0; - let mut blocks_fetched = 0; let to_process_total = to_process.len(); - start_fetcher(self.from, &daemon, to_process, self.iconfig.block_batch_size)?.map(|blocks| { + start_fetcher(self.from, &daemon, to_process, self.iconfig.block_batch_size, chain_tip_height)?.map(|blocks| { if fetcher_count % 25 == 0 && to_process_total > 20 { + let batch_height = blocks.last().map(|b| b.entry.height()).unwrap_or(0); info!( "processing blocks {}/{} ({:.1}%)", - blocks_fetched, - to_process_total, - blocks_fetched as f32 / to_process_total as f32 * 100.0 + batch_height, + chain_tip_height, + batch_height as f32 / chain_tip_height.max(1) as f32 * 100.0 ); } fetcher_count += 1; - blocks_fetched += blocks.len(); // Add blocks not yet in txstore (idempotent: crash recovery skips already-added blocks) let to_add: Vec<_> = { From e9f56c379d54ca705f7574c7dac06b6d00ee8ff9 Mon Sep 17 00:00:00 2001 From: Philippe McLean Date: Fri, 6 Mar 2026 21:25:51 -0800 Subject: [PATCH 20/20] feat(metrics): export initial sync height as Prometheus gauge Add initial_sync_height and initial_sync_progress_pct gauges, updated after every batch so sync progress is visible in Prometheus without waiting for the full update_blocks() call to complete. 
--- src/metrics.rs | 6 ++++++ src/new_index/schema.rs | 17 +++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/src/metrics.rs b/src/metrics.rs index 8a4c188d1..e0c56ac75 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -48,6 +48,12 @@ impl Metrics { g } + pub fn float_gauge(&self, opts: prometheus::Opts) -> prometheus::Gauge { + let g = prometheus::Gauge::with_opts(opts).unwrap(); + self.reg.register(Box::new(g.clone())).unwrap(); + g + } + pub fn gauge_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> GaugeVec { let g = GaugeVec::new(opts, labels).unwrap(); self.reg.register(Box::new(g.clone())).unwrap(); diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index f120d7130..c1ba71980 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -198,6 +198,8 @@ pub struct Indexer { iconfig: IndexerConfig, duration: HistogramVec, tip_metric: Gauge, + sync_height: Gauge, + sync_progress: prometheus::Gauge, } struct IndexerConfig { @@ -245,6 +247,14 @@ impl Indexer { &["step"], ), tip_metric: metrics.gauge(MetricOpts::new("tip_height", "Current chain tip height")), + sync_height: metrics.gauge(MetricOpts::new( + "initial_sync_height", + "Height of the last block batch completed during initial sync", + )), + sync_progress: metrics.float_gauge(MetricOpts::new( + "initial_sync_progress_pct", + "Initial sync progress as a percentage of the best known chain height", + )), } } @@ -394,6 +404,13 @@ impl Indexer { self.index(&to_index); } } + if let Some(last) = blocks.last() { + let h = last.entry.height(); + self.sync_height.set(h as i64); + if chain_tip_height > 0 { + self.sync_progress.set(h as f64 / chain_tip_height as f64 * 100.0); + } + } }); // Compact after all add+index work is done, not between passes.