diff --git a/src/config.rs b/src/config.rs index d23128e91..a6bcce7e8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,17 +43,14 @@ pub struct Config { pub rpc_logging: RpcLogging, pub zmq_addr: Option, - /// Enable compaction during initial sync - /// - /// By default compaction is off until initial sync is finished for performance reasons, - /// however, this requires much more disk space. - pub initial_sync_compaction: bool, - /// RocksDB block cache size in MB (per database) - /// Caches decompressed blocks in memory to avoid repeated decompression (CPU intensive) + /// Caches decompressed data blocks, plus index and filter blocks (via cache_index_and_filter_blocks). /// Total memory usage = cache_size * 3_databases (txstore, history, cache) - /// Recommendation: Start with 1024MB for production - /// Higher values reduce CPU load from cache misses but use more RAM + /// Recommendation: 1024 MB for steady-state; 4096 MB+ for initial sync (L0 SST + /// files accumulate up to the compaction trigger — their index, filter (Bloom), + /// and data blocks must fit in this cache). With 10 bits/key bloom filters and + /// a 512 MB write buffer, each L0 file's filter block is ~9.75 MB, so 64 L0 + /// files need ~625 MB of filter blocks on top of index blocks. pub db_block_cache_mb: usize, /// RocksDB parallelism level (background compaction and flush threads) @@ -67,6 +64,12 @@ pub struct Config { /// Larger buffers = fewer flushes (less CPU) but more RAM usage pub db_write_buffer_size_mb: usize, + /// Number of blocks per batch during initial sync (bitcoind fetch mode). + /// Larger batches keep more O rows in the write buffer when index() runs lookup_txos(), + /// improving cache hit rate for outputs spent within the same batch window. + /// Must stay within db_write_buffer_size_mb to avoid mid-batch flushes. + pub initial_sync_batch_size: usize, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -229,14 +232,10 @@ impl Config { .long("anonymize-json-rpc-logging-source-ip") .help("enables ip anonymization in rpc logs") .takes_value(false) - ).arg( - Arg::with_name("initial_sync_compaction") - .long("initial-sync-compaction") - .help("Perform compaction during initial sync (slower but less disk space required)") ).arg( Arg::with_name("db_block_cache_mb") .long("db-block-cache-mb") - .help("RocksDB block cache size in MB per database") + .help("RocksDB block cache size in MB per database. Bounds index/filter block memory; use 4096+ for initial sync to avoid table-reader heap growth.") .takes_value(true) .default_value("8") ).arg( @@ -251,6 +250,12 @@ impl Config { .help("RocksDB write buffer size in MB per database. RAM usage = size * max_write_buffers(2) * 3_databases") .takes_value(true) .default_value("256") + ).arg( + Arg::with_name("initial_sync_batch_size") + .long("initial-sync-batch-size") + .help("Number of blocks per batch during initial sync. Larger values keep more txo rows in the write buffer during indexing, improving lookup_txos cache hit rate for recently-created outputs.") + .takes_value(true) + .default_value("250") ).arg( Arg::with_name("zmq_addr") .long("zmq-addr") @@ -487,10 +492,10 @@ impl Config { index_unspendables: m.is_present("index_unspendables"), cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), - initial_sync_compaction: m.is_present("initial_sync_compaction"), db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize), db_parallelism: value_t_or_exit!(m, "db_parallelism", usize), db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize), + initial_sync_batch_size: value_t_or_exit!(m, "initial_sync_batch_size", usize), zmq_addr, #[cfg(feature = "liquid")] diff --git a/src/metrics.rs b/src/metrics.rs index 8a4c188d1..e0c56ac75 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -48,6 +48,12 @@ impl Metrics { g } + pub fn float_gauge(&self, opts: prometheus::Opts) -> prometheus::Gauge { + let g = prometheus::Gauge::with_opts(opts).unwrap(); + self.reg.register(Box::new(g.clone())).unwrap(); + g + } + pub fn gauge_vec(&self, opts: prometheus::Opts, labels: &[&str]) -> GaugeVec { let g = GaugeVec::new(opts, labels).unwrap(); self.reg.register(Box::new(g.clone())).unwrap(); diff --git a/src/new_index/db.rs b/src/new_index/db.rs index c93ac1e12..e4c51f72d 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -1,4 +1,5 @@ use prometheus::GaugeVec; +use rayon::prelude::*; use rocksdb; use std::convert::TryInto; @@ -13,6 +14,8 @@ use crate::util::{bincode, spawn_thread, Bytes}; static DB_VERSION: u32 = 3; +const ROCKSDB_NUM_LEVELS: u32 = 7; + #[derive(Debug, Eq, PartialEq)] pub struct DBRow { pub key: Vec, @@ -93,9 +96,30 @@ impl DB { db_opts.create_if_missing(true); db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); + db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + db_opts.set_bottommost_compression_type(rocksdb::DBCompressionType::Zstd); db_opts.set_target_file_size_base(1_073_741_824); - db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load + // Bulk-load compaction: allow L0 files to accumulate to a bounded limit + // before compacting. This reduces write amplification compared to the + // default trigger of 4, while keeping the file count — and therefore + // bloom-filter memory and lookup cost — bounded. + // + // With bloom filters at 10 bits/key and a 512 MB write buffer, each L0 + // file has ~7.8 M keys, so its filter block is ~9.75 MB. At 64 files + // that is ~625 MB of pinned filter blocks — well within an 8 GB cache. + // Each lookup checks 64 bloom filters (fast, in-memory) and reads from + // only ~0.64 files on average (1 % false-positive rate × 64 files). + // + // Set slowdown/stop triggers well above the compaction trigger so writes + // are never stalled while background compaction catches up. + // Disable the pending-compaction-bytes stall so the large backlog that + // builds up during the bulk load does not block writes. + const L0_BULK_TRIGGER: i32 = 64; + db_opts.set_level_zero_file_num_compaction_trigger(L0_BULK_TRIGGER); + db_opts.set_level_zero_slowdown_writes_trigger(L0_BULK_TRIGGER * 4); + db_opts.set_level_zero_stop_writes_trigger(L0_BULK_TRIGGER * 8); + db_opts.set_hard_pending_compaction_bytes_limit(0); + db_opts.set_soft_pending_compaction_bytes_limit(0); let parallelism: i32 = config.db_parallelism.try_into() @@ -107,13 +131,61 @@ impl DB { // Configure write buffer size (not set by increase_parallelism) db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); - // db_opts.set_advise_random_on_open(???); - db_opts.set_compaction_readahead_size(1 << 20); + // 4 MiB readahead for compaction I/O. Larger than the previous 1 MiB to better + // amortise syscall overhead when reading the many L0 files accumulated during + // initial sync. + db_opts.set_compaction_readahead_size(4 << 20); + + // Background-sync SST files to the OS incrementally as they are written, + // rather than doing a large fsync on close. Smooths out I/O latency spikes. + db_opts.set_bytes_per_sync(1 << 20); + + // Parallelize sub-ranges within a single compaction job (including the one-time + // full_compaction at the end of initial sync). Without this, compact_range() is + // single-threaded regardless of increase_parallelism(). Setting it equal to the + // parallelism level keeps all background threads busy during the final compaction. + db_opts.set_max_subcompactions(parallelism as u32); - // Configure block cache + // Configure block cache and table options let mut block_opts = rocksdb::BlockBasedOptions::default(); let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes)); + // Store index and filter blocks inside the block cache so their memory is + // bounded by --db-block-cache-mb. Without this, RocksDB allocates table-reader + // memory (index + filter blocks) on the heap separately for every open SST file. + // During initial sync, L0 files accumulate up to the compaction trigger (64 by + // default) and this unbounded heap allocation can grow to many GB. + // Note: increase --db-block-cache-mb proportionally (e.g. 4096) so the cache is + // large enough to hold the working set of filter/index blocks without thrashing. + block_opts.set_cache_index_and_filter_blocks(true); + // Pin L0 index and filter blocks in the cache so they are never evicted. + // Without this, data block churn evicts L0 index/filter blocks, causing + // repeated disk reads for every SST lookup — worse than the old heap approach. + // With this, L0 index/filter blocks behave like the old table-reader heap + // allocation but stay within the bounded block cache. + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + // Bloom filters allow multi_get() to skip SST files that don't contain a key + // without touching the index or data blocks. Without this, every point lookup + // must binary-search the index of every L0 file whose key range overlaps the + // query (all of them for random txids) — extremely expensive with 1000+ L0 + // files accumulated during initial sync. At 10 bits/key the false-positive + // rate is ~1%, so only ~10 out of 1000 L0 files need actual I/O per key. + // Combined with the prefix extractor below, these become prefix Bloom filters + // keyed on `code || hash` (33 bytes), which also allow prefix range scans + // (e.g. history lookups) to skip L0 files entirely. The filter blocks are + // cached and pinned alongside the index blocks via the settings above. + block_opts.set_bloom_filter(10.0, false); + + // All electrs keys share the structure `code (1 byte) || hash (32 bytes) || ...`. + // A 33-byte fixed prefix extractor enables prefix Bloom filters: range scans + // like iter_scan("H" + scripthash) can skip SST files whose Bloom filter + // doesn't match the prefix, rather than checking every L0 file. + // + // INVARIANT: All iter_scan* and raw_iterator methods must use total_order_seek + // when the seek key may be shorter than 33 bytes. Without it, RocksDB silently + // skips SST files that contain matching keys. See the conditional in iter_scan(). + db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + db_opts.set_block_based_table_factory(&block_opts); let db = DB { @@ -126,34 +198,80 @@ impl DB { } pub fn full_compaction(&self) { - // TODO: make sure this doesn't fail silently info!("starting full compaction on {:?}", self.db); - self.db.compact_range(None::<&[u8]>, None::<&[u8]>); - info!("finished full compaction on {:?}", self.db); + let start = std::time::Instant::now(); + let mut opts = rocksdb::CompactOptions::default(); + opts.set_bottommost_level_compaction(rocksdb::BottommostLevelCompaction::Force); + self.db.compact_range_opt(None::<&[u8]>, None::<&[u8]>, &opts); + let elapsed = start.elapsed(); + info!("finished full compaction on {:?} in elapsed='{:.1?}'", self.db, elapsed); } pub fn enable_auto_compaction(&self) { - let opts = [("disable_auto_compactions", "false")]; + // Reset L0 triggers and pending-compaction stall thresholds to RocksDB + // defaults, so that steady-state operation compacts promptly and avoids + // unbounded compaction backlogs that cause read latency spikes. + // RocksDB defaults (stable since v5.x through v10.4.2). Hardcoded because + // set_options() doesn't return previous values and the Rust bindings lack getters. + + let soft_limit = (64u64 << 30).to_string(); // 64 GiB + let hard_limit = (256u64 << 30).to_string(); // 256 GiB + + let opts = [ + ("disable_auto_compactions", "false"), + ("level0_file_num_compaction_trigger", "4"), + ("level0_slowdown_writes_trigger", "20"), + ("level0_stop_writes_trigger", "36"), + ("soft_pending_compaction_bytes_limit", &soft_limit), + ("hard_pending_compaction_bytes_limit", &hard_limit), + ]; self.db.set_options(&opts).unwrap(); } pub fn raw_iterator(&self) -> rocksdb::DBRawIterator<'_> { - self.db.raw_iterator() + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.raw_iterator_opt(opts) } pub fn iter_scan(&self, prefix: &[u8]) -> ScanIterator<'_> { + let iter = if prefix.len() >= 33 { + self.db.prefix_iterator(prefix) + } else { + // Short prefixes (e.g. b"B", b"D") are below the 33-byte prefix extractor + // length. prefix_iterator would silently skip SST files. Use total_order_seek + // to fall back to a full scan; ScanIterator enforces the prefix boundary. + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.iterator_opt( + rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward), + opts, + ) + }; ScanIterator { prefix: prefix.to_vec(), - iter: self.db.prefix_iterator(prefix), + iter, done: false, } } pub fn iter_scan_from(&self, prefix: &[u8], start_at: &[u8]) -> ScanIterator<'_> { - let iter = self.db.iterator(rocksdb::IteratorMode::From( - start_at, - rocksdb::Direction::Forward, - )); + // start_at is always >= prefix length. When >= 33 bytes, the default seek + // uses the prefix extractor for bloom filtering automatically. When < 33 + // bytes, fall back to total_order_seek to avoid silent misses. + let iter = if start_at.len() >= 33 { + self.db.iterator(rocksdb::IteratorMode::From( + start_at, + rocksdb::Direction::Forward, + )) + } else { + let mut opts = rocksdb::ReadOptions::default(); + opts.set_total_order_seek(true); + self.db.iterator_opt( + rocksdb::IteratorMode::From(start_at, rocksdb::Direction::Forward), + opts, + ) + }; ScanIterator { prefix: prefix.to_vec(), iter, @@ -162,7 +280,10 @@ impl DB { } pub fn iter_scan_reverse(&self, prefix: &[u8], prefix_max: &[u8]) -> ReverseScanIterator<'_> { - let mut iter = self.db.raw_iterator(); + // total_order_seek is required for correctness: prefix mode (the default when a + // prefix extractor is set) is only guaranteed correct for forward iteration. + // SeekForPrev + Prev() in prefix mode can silently miss entries. + let mut iter = self.raw_iterator(); iter.seek_for_prev(prefix_max); ReverseScanIterator { @@ -179,7 +300,7 @@ impl DB { self.db, flush ); - rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); for row in rows { batch.put(&row.key, &row.value); @@ -189,7 +310,7 @@ impl DB { pub fn delete_rows(&self, mut rows: Vec, flush: DBFlush) { log::trace!("deleting {} rows from {:?}", rows.len(), self.db,); - rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + rows.par_sort_unstable_by(|a, b| a.key.cmp(&b.key)); let mut batch = rocksdb::WriteBatch::default(); for row in rows { batch.delete(&row.key); @@ -253,9 +374,26 @@ impl DB { } } + #[cfg(test)] + fn open_test(path: &Path) -> DB { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(33)); + + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_bloom_filter(10.0, false); + db_opts.set_block_based_table_factory(&block_opts); + + DB { + db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open test RocksDB")), + } + } + pub fn start_stats_exporter(&self, db_metrics: Arc, db_name: &str) { let db_arc = Arc::clone(&self.db); + let db_arc2 = Arc::clone(&self.db); let label = db_name.to_string(); + let label2 = label.clone(); let update_gauge = move |gauge: &GaugeVec, property: &str| { if let Ok(Some(value)) = db_arc.property_value(property) { @@ -299,7 +437,149 @@ impl DB { update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity"); update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage"); update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); + for level in 0..ROCKSDB_NUM_LEVELS { + let prop = format!("rocksdb.num-files-at-level{}", level); + if let Ok(Some(value)) = db_arc2.property_value(&prop) { + if let Ok(v) = value.parse::() { + let level_str = level.to_string(); + db_metrics.num_files_at_level.with_label_values(&[&label2, &level_str]).set(v); + } + } + } thread::sleep(Duration::from_secs(5)); }); } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a key mimicking electrs row format: 1-byte code + 32-byte hash + optional suffix. + fn make_key(code: u8, hash_byte: u8, suffix: &[u8]) -> Vec { + let mut key = vec![code]; + key.extend_from_slice(&[hash_byte; 32]); + key.extend_from_slice(suffix); + key + } + + fn write_test_rows(db: &DB) { + let rows = vec![ + // B rows (block headers) — scanned with 1-byte prefix b"B" + DBRow { key: make_key(b'B', 0x01, &[]), value: b"header1".to_vec() }, + DBRow { key: make_key(b'B', 0x02, &[]), value: b"header2".to_vec() }, + // D rows (done markers) — scanned with 1-byte prefix b"D" + DBRow { key: make_key(b'D', 0x01, &[]), value: vec![] }, + DBRow { key: make_key(b'D', 0x02, &[]), value: vec![] }, + // H rows (history) — scanned with 33-byte prefix b"H" + scripthash + DBRow { key: make_key(b'H', 0xAA, &[0, 0, 0, 1]), value: vec![] }, + DBRow { key: make_key(b'H', 0xAA, &[0, 0, 0, 2]), value: vec![] }, + DBRow { key: make_key(b'H', 0xBB, &[0, 0, 0, 1]), value: vec![] }, + // O rows (txouts) — looked up by exact key, but scannable by 33-byte prefix + DBRow { key: make_key(b'O', 0xCC, &[0, 1]), value: b"txout1".to_vec() }, + DBRow { key: make_key(b'O', 0xCC, &[0, 2]), value: b"txout2".to_vec() }, + ]; + db.write_rows(rows, DBFlush::Enable); + } + + #[test] + fn test_iter_scan_short_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // 1-byte prefix scan — must find all B rows + let b_rows: Vec = db.iter_scan(b"B").collect(); + assert_eq!(b_rows.len(), 2, "expected 2 B rows, got {}", b_rows.len()); + assert_eq!(b_rows[0].value, b"header1"); + assert_eq!(b_rows[1].value, b"header2"); + + // 1-byte prefix scan — must find all D rows + let d_rows: Vec = db.iter_scan(b"D").collect(); + assert_eq!(d_rows.len(), 2, "expected 2 D rows, got {}", d_rows.len()); + } + + #[test] + fn test_iter_scan_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // 33-byte prefix scan — must find only H rows for hash 0xAA + let prefix = make_key(b'H', 0xAA, &[]); + let h_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(h_rows.len(), 2, "expected 2 H/0xAA rows, got {}", h_rows.len()); + + // 33-byte prefix scan — must find only H rows for hash 0xBB + let prefix = make_key(b'H', 0xBB, &[]); + let h_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(h_rows.len(), 1, "expected 1 H/0xBB row, got {}", h_rows.len()); + + // 33-byte prefix scan — O rows for hash 0xCC + let prefix = make_key(b'O', 0xCC, &[]); + let o_rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(o_rows.len(), 2, "expected 2 O/0xCC rows, got {}", o_rows.len()); + } + + #[test] + fn test_iter_scan_from_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Scan H/0xAA starting from height 2 + let prefix = make_key(b'H', 0xAA, &[]); + let start = make_key(b'H', 0xAA, &[0, 0, 0, 2]); + let rows: Vec = db.iter_scan_from(&prefix, &start).collect(); + assert_eq!(rows.len(), 1, "expected 1 H/0xAA row from height 2, got {}", rows.len()); + } + + #[test] + fn test_iter_scan_reverse_full_prefix() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Reverse scan H/0xAA from max + let prefix = make_key(b'H', 0xAA, &[]); + let prefix_max = make_key(b'H', 0xAA, &[0xFF, 0xFF, 0xFF, 0xFF]); + let rows: Vec = db.iter_scan_reverse(&prefix, &prefix_max).collect(); + assert_eq!(rows.len(), 2, "expected 2 H/0xAA rows in reverse, got {}", rows.len()); + // Should be in reverse order + assert!(rows[0].key > rows[1].key, "reverse scan should return descending keys"); + } + + #[test] + fn test_iter_scan_no_cross_prefix_leakage() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + // Scanning for a non-existent prefix returns nothing + let prefix = make_key(b'H', 0xFF, &[]); + let rows: Vec = db.iter_scan(&prefix).collect(); + assert_eq!(rows.len(), 0, "expected 0 rows for non-existent prefix"); + + // Scanning b"B" must not return D, H, or O rows + let b_rows: Vec = db.iter_scan(b"B").collect(); + for row in &b_rows { + assert_eq!(row.key[0], b'B', "B scan returned non-B row"); + } + } + + #[test] + fn test_raw_iterator_sees_all_rows() { + let dir = tempfile::tempdir().unwrap(); + let db = DB::open_test(dir.path()); + write_test_rows(&db); + + let mut iter = db.raw_iterator(); + iter.seek_to_first(); + let mut count = 0; + while iter.valid() { + count += 1; + iter.next(); + } + assert_eq!(count, 9, "expected 9 total rows, got {}", count); + } +} diff --git a/src/new_index/db_metrics.rs b/src/new_index/db_metrics.rs index e8df0db43..5e19439dc 100644 --- a/src/new_index/db_metrics.rs +++ b/src/new_index/db_metrics.rs @@ -49,6 +49,7 @@ pub struct RocksDbMetrics { // Level metrics pub base_level: GaugeVec, + pub num_files_at_level: GaugeVec, // Write metrics pub actual_delayed_write_rate: GaugeVec, @@ -204,6 +205,10 @@ impl RocksDbMetrics { format!("rocksdb_base_level"), "Base level for compaction." ), labels), + num_files_at_level: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_files_at_level", + "Number of SST files at each compaction level." + ), &["db", "level"]), // Write metrics actual_delayed_write_rate: metrics.gauge_vec(MetricOpts::new( diff --git a/src/new_index/fetch.rs b/src/new_index/fetch.rs index 7906fb206..53135f132 100644 --- a/src/new_index/fetch.rs +++ b/src/new_index/fetch.rs @@ -16,7 +16,7 @@ use std::thread; use electrs_macros::trace; -use crate::chain::{Block, BlockHash}; +use crate::chain::{Block, BlockHash, Txid}; use crate::daemon::Daemon; use crate::errors::*; use crate::util::{spawn_thread, HeaderEntry, SyncChannel}; @@ -32,12 +32,13 @@ pub fn start_fetcher( from: FetchFrom, daemon: &Daemon, new_headers: Vec, + batch_size: usize, + chain_tip_height: usize, ) -> Result>> { - let fetcher = match from { - FetchFrom::Bitcoind => bitcoind_fetcher, - FetchFrom::BlkFiles => blkfiles_fetcher, - }; - fetcher(daemon, new_headers) + match from { + FetchFrom::Bitcoind => bitcoind_fetcher(daemon, new_headers, batch_size, chain_tip_height), + FetchFrom::BlkFiles => blkfiles_fetcher(daemon, new_headers), + } } #[derive(Clone)] @@ -45,6 +46,7 @@ pub struct BlockEntry { pub block: Block, pub entry: HeaderEntry, pub size: u32, + pub txids: Vec, } type SizedBlock = (Block, u32); @@ -74,6 +76,8 @@ impl Fetcher { fn bitcoind_fetcher( daemon: &Daemon, new_headers: Vec, + batch_size: usize, + chain_tip_height: usize, ) -> Result>> { if let Some(tip) = new_headers.last() { debug!("{:?} ({} left to index)", tip, new_headers.len()); @@ -85,18 +89,17 @@ fn bitcoind_fetcher( chan.into_receiver(), spawn_thread("bitcoind_fetcher", move || { let mut fetcher_count = 0; - let mut blocks_fetched = 0; let total_blocks_fetched = new_headers.len(); - for entries in new_headers.chunks(100) { + for entries in new_headers.chunks(batch_size) { if fetcher_count % 50 == 0 && total_blocks_fetched >= 50 { + let batch_height = entries.last().map(|e| e.height()).unwrap_or(0); info!("fetching blocks {}/{} ({:.1}%)", - blocks_fetched, - total_blocks_fetched, - blocks_fetched as f32 / total_blocks_fetched as f32 * 100.0 + batch_height, + chain_tip_height, + batch_height as f32 / chain_tip_height.max(1) as f32 * 100.0 ); } fetcher_count += 1; - blocks_fetched += entries.len(); let blockhashes: Vec = entries.iter().map(|e| *e.hash()).collect(); let blocks = daemon @@ -106,10 +109,14 @@ fn bitcoind_fetcher( let block_entries: Vec = blocks .into_iter() .zip(entries) - .map(|(block, entry)| BlockEntry { - entry: entry.clone(), // TODO: remove this clone() - size: block.total_size() as u32, - block, + .map(|(block, entry)| { + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); + BlockEntry { + entry: entry.clone(), // TODO: remove this clone() + size: block.total_size() as u32, + txids, + block, + } }) .collect(); assert_eq!(block_entries.len(), entries.len()); @@ -131,7 +138,9 @@ fn blkfiles_fetcher( let blk_files = daemon.list_blk_files()?; let xor_key = daemon.read_blk_file_xor_key()?; - let chan = SyncChannel::new(1); + // Buffer of 2 lets the parser produce one batch ahead of the consumer, + // overlapping block-entry construction with the indexer. + let chan = SyncChannel::new(2); let sender = chan.sender(); let mut entry_map: HashMap = @@ -156,7 +165,10 @@ fn blkfiles_fetcher( let blockhash = block.block_hash(); entry_map .remove(&blockhash) - .map(|entry| BlockEntry { block, entry, size }) + .map(|entry| { + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); + BlockEntry { block, entry, size, txids } + }) .or_else(|| { trace!("skipping block {}", blockhash); None @@ -180,7 +192,9 @@ fn blkfiles_fetcher( #[trace] fn blkfiles_reader(blk_files: Vec, xor_key: Option<[u8; 8]>) -> Fetcher> { - let chan = SyncChannel::new(1); + // Buffer of 2 lets the reader read ahead by one blk file while the parser + // is working, overlapping sequential disk I/O with CPU deserialization. + let chan = SyncChannel::new(2); let sender = chan.sender(); Fetcher::from( @@ -218,15 +232,21 @@ fn blkfile_apply_xor_key(xor_key: [u8; 8], blob: &mut [u8]) { #[trace] fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher> { - let chan = SyncChannel::new(1); + // Buffer of 2 lets the parser stay one batch ahead of the fetcher stage. + let chan = SyncChannel::new(2); let sender = chan.sender(); Fetcher::from( chan.into_receiver(), spawn_thread("blkfiles_parser", move || { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(0) // CPU-bound + .thread_name(|i| format!("parse-blocks-{}", i)) + .build() + .unwrap(); blobs.map(|blob| { trace!("parsing {} bytes", blob.len()); - let blocks = parse_blocks(blob, magic).expect("failed to parse blk*.dat file"); + let blocks = parse_blocks(&pool, blob, magic).expect("failed to parse blk*.dat file"); sender .send(blocks) .expect("failed to send blocks from blk*.dat file"); @@ -236,7 +256,7 @@ fn blkfiles_parser(blobs: Fetcher>, magic: u32) -> Fetcher, magic: u32) -> Result> { +fn parse_blocks(pool: &rayon::ThreadPool, blob: Vec, magic: u32) -> Result> { let mut cursor = Cursor::new(&blob); let mut slices = vec![]; let max_pos = blob.len() as u64; @@ -273,11 +293,6 @@ fn parse_blocks(blob: Vec, magic: u32) -> Result> { cursor.set_position(end as u64); } - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(0) // CPU-bound - .thread_name(|i| format!("parse-blocks-{}", i)) - .build() - .unwrap(); Ok(pool.install(|| { slices .into_par_iter() diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index cccb8bbc3..c1ba71980 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -198,6 +198,8 @@ pub struct Indexer { iconfig: IndexerConfig, duration: HistogramVec, tip_metric: Gauge, + sync_height: Gauge, + sync_progress: prometheus::Gauge, } struct IndexerConfig { @@ -205,6 +207,7 @@ struct IndexerConfig { address_search: bool, index_unspendables: bool, network: Network, + block_batch_size: usize, #[cfg(feature = "liquid")] parent_network: crate::chain::BNetwork, } @@ -216,6 +219,7 @@ impl From<&Config> for IndexerConfig { address_search: config.address_search, index_unspendables: config.index_unspendables, network: config.network_type, + block_batch_size: config.initial_sync_batch_size, #[cfg(feature = "liquid")] parent_network: config.parent_network, } @@ -243,6 +247,14 @@ impl Indexer { &["step"], ), tip_metric: metrics.gauge(MetricOpts::new("tip_height", "Current chain tip height")), + sync_height: metrics.gauge(MetricOpts::new( + "initial_sync_height", + "Height of the last block batch completed during initial sync", + )), + sync_progress: metrics.float_gauge(MetricOpts::new( + "initial_sync_progress_pct", + "Initial sync progress as a percentage of the best known chain height", + )), } } @@ -250,20 +262,13 @@ impl Indexer { self.duration.with_label_values(&[name]).start_timer() } - fn headers_to_add(&self, new_headers: &[HeaderEntry]) -> Vec { - let added_blockhashes = self.store.added_blockhashes.read().unwrap(); + // Headers that need any work: either not yet added to txstore or not yet indexed to history. + fn headers_to_process(&self, new_headers: &[HeaderEntry]) -> Vec { + let added = self.store.added_blockhashes.read().unwrap(); + let indexed = self.store.indexed_blockhashes.read().unwrap(); new_headers .iter() - .filter(|e| !added_blockhashes.contains(e.hash())) - .cloned() - .collect() - } - - fn headers_to_index(&self, new_headers: &[HeaderEntry]) -> Vec { - let indexed_blockhashes = self.store.indexed_blockhashes.read().unwrap(); - new_headers - .iter() - .filter(|e| !indexed_blockhashes.contains(e.hash())) + .filter(|e| !added.contains(e.hash()) || !indexed.contains(e.hash())) .cloned() .collect() } @@ -298,6 +303,7 @@ impl Indexer { let tip = daemon.getbestblockhash()?; let (new_headers, reorged_since) = self.get_new_headers(&daemon, &tip)?; + let chain_tip_height = new_headers.last().map(|h| h.height()).unwrap_or(0); // Handle reorgs by undoing the reorged (stale) blocks first if let Some(reorged_since) = reorged_since { @@ -329,47 +335,86 @@ impl Indexer { // Fetch the reorged blocks, then undo their history index db rows. // The txstore db rows are kept for reorged blocks/transactions. - start_fetcher(self.from, &daemon, reorged_headers)? + start_fetcher(self.from, &daemon, reorged_headers, self.iconfig.block_batch_size, chain_tip_height)? .map(|blocks| self.undo_index(&blocks)); } - // Add new blocks to the txstore db - let to_add = self.headers_to_add(&new_headers); + // Single-pass: add to txstore and index to history in the same per-batch loop. + // + // In the old two-pass approach, txstore_db was fully compacted between the add + // and index passes, which pushed all O rows into large SST files deep in the LSM + // tree. With only a small block cache, lookup_txos() during indexing then caused + // nearly every key to require a disk read. + // + // By interleaving add() and index() in the same batch, the O rows written by + // add() are still in the write buffer (or nearby L0 SST files) when index() + // calls lookup_txos() — dramatically increasing cache hit rate. + // + // Crash safety: added_blockhashes / indexed_blockhashes are persisted via the + // "D" done-marker rows. On restart, headers_to_process() re-derives which + // blocks still need work, so partially-processed batches are re-processed safely. + let to_process = self.headers_to_process(&new_headers); debug!( - "adding transactions from {} blocks using {:?}", - to_add.len(), + "processing {} blocks (add + index) using {:?}", + to_process.len(), self.from ); let mut fetcher_count = 0; - let mut blocks_fetched = 0; - let to_add_total = to_add.len(); - - start_fetcher(self.from, &daemon, to_add)?.map(|blocks| - { - if fetcher_count % 25 == 0 && to_add_total > 20 { - info!("adding txes from blocks {}/{} ({:.1}%)", - blocks_fetched, - to_add_total, - blocks_fetched as f32 / to_add_total as f32 * 100.0 - ); - } - fetcher_count += 1; - blocks_fetched += blocks.len(); + let to_process_total = to_process.len(); + + start_fetcher(self.from, &daemon, to_process, self.iconfig.block_batch_size, chain_tip_height)?.map(|blocks| { + if fetcher_count % 25 == 0 && to_process_total > 20 { + let batch_height = blocks.last().map(|b| b.entry.height()).unwrap_or(0); + info!( + "processing blocks {}/{} ({:.1}%)", + batch_height, + chain_tip_height, + batch_height as f32 / chain_tip_height.max(1) as f32 * 100.0 + ); + } + fetcher_count += 1; + + // Add blocks not yet in txstore (idempotent: crash recovery skips already-added blocks) + let to_add: Vec<_> = { + let added = self.store.added_blockhashes.read().unwrap(); + blocks + .iter() + .filter(|b| !added.contains(b.entry.hash())) + .cloned() + .collect() + }; - self.add(&blocks) - }); + // Index blocks not yet in history (O rows for to_add are now in the write buffer) + let to_index: Vec<_> = { + let indexed = self.store.indexed_blockhashes.read().unwrap(); + blocks + .iter() + .filter(|b| !indexed.contains(b.entry.hash())) + .cloned() + .collect() + }; - self.start_auto_compactions(&self.store.txstore_db); + if !to_add.is_empty() || !to_index.is_empty() { + let _batch_timer = self.start_timer("batch_total"); + if !to_add.is_empty() { + self.add(&to_add); + } + if !to_index.is_empty() { + self.index(&to_index); + } + } + if let Some(last) = blocks.last() { + let h = last.entry.height(); + self.sync_height.set(h as i64); + if chain_tip_height > 0 { + self.sync_progress.set(h as f64 / chain_tip_height as f64 * 100.0); + } + } + }); - // Index new blocks to the history db - let to_index = self.headers_to_index(&new_headers); - debug!( - "indexing history from {} blocks using {:?}", - to_index.len(), - self.from - ); - start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks)); + // Compact after all add+index work is done, not between passes. + self.start_auto_compactions(&self.store.txstore_db); self.start_auto_compactions(&self.store.history_db); self.start_auto_compactions(&self.store.cache_db); @@ -1191,15 +1236,15 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec = b.block.txdata.iter().map(|tx| tx.compute_txid()).collect(); - for (tx, txid) in b.block.txdata.iter().zip(txids.iter()) { + for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) { add_transaction(*txid, tx, &mut rows, iconfig); } if !iconfig.light_mode { - rows.push(BlockRow::new_txids(blockhash, &txids).into_row()); + rows.push(BlockRow::new_txids(blockhash, &b.txids).into_row()); rows.push(BlockRow::new_meta(blockhash, &BlockMeta::from(b)).into_row()); } @@ -1289,10 +1334,12 @@ fn index_blocks( block_entries .par_iter() // serialization is CPU-intensive .map(|b| { + assert_eq!(b.txids.len(), b.block.txdata.len()); let mut rows = vec![]; - for tx in &b.block.txdata { - let height = b.entry.height() as u32; - index_transaction(tx, height, previous_txos_map, &mut rows, iconfig); + let height = b.entry.height() as u32; + for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) { + let txid = full_hash(&txid[..]); + index_transaction(tx, txid, height, previous_txos_map, &mut rows, iconfig); } rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed" rows @@ -1304,12 +1351,12 @@ fn index_blocks( // TODO: return an iterator? fn index_transaction( tx: &Transaction, + txid: FullHash, confirmed_height: u32, previous_txos_map: &HashMap, rows: &mut Vec, iconfig: &IndexerConfig, ) { - let txid = full_hash(&tx.compute_txid()[..]); // persist tx confirmation row: // C{txid} → "{block_height}" @@ -1907,11 +1954,14 @@ pub mod bench { address_search: false, index_unspendables: false, network: crate::chain::Network::Regtest, + block_batch_size: 250, }; let height = 702861; let hash = block.block_hash(); let header = block.header.clone(); + let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect(); let block_entry = BlockEntry { + txids, block, entry: HeaderEntry::new(height, hash, header), size: 0u32, // wrong but not needed for benching diff --git a/tests/common.rs b/tests/common.rs index 40e542d28..ea240659a 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -117,10 +117,10 @@ impl TestRunner { asset_db_path: None, // XXX #[cfg(feature = "liquid")] parent_network: bitcoin::Network::Regtest, - initial_sync_compaction: false, db_block_cache_mb: 8, db_parallelism: 2, db_write_buffer_size_mb: 256, + initial_sync_batch_size: 250, //#[cfg(feature = "electrum-discovery")] //electrum_public_hosts: Option, //#[cfg(feature = "electrum-discovery")]