From c6ef5f6b88809b446b035afd1baeb5719a9d3de6 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 12 Dec 2025 16:34:40 -0800 Subject: [PATCH] store: Refactor the herd cache for recent blocks lookups --- store/postgres/src/chain_store.rs | 124 ++++++++++++++---------------- 1 file changed, 56 insertions(+), 68 deletions(-) diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index eadde677f96..8ea6ebb8067 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -7,7 +7,6 @@ use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl}; use graph::components::store::ChainHeadStore; use graph::data::store::ethereum::call; -use graph::derive::CheapClone; use graph::env::ENV_VARS; use graph::parking_lot::RwLock; use graph::prelude::MetricsRegistry; @@ -17,6 +16,7 @@ use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; use std::collections::BTreeMap; +use std::future::Future; use std::{ collections::HashMap, convert::{TryFrom, TryInto}, @@ -1968,13 +1968,6 @@ impl ChainStoreMetrics { } } -#[derive(Clone, CheapClone)] -enum BlocksLookupResult { - ByHash(Arc, StoreError>>), - ByNumber(Arc>, StoreError>>), - Ancestor(Arc, StoreError>>), -} - pub struct ChainStore { logger: Logger, pool: ConnectionPool, @@ -1989,7 +1982,11 @@ pub struct ChainStore { // with the database and to correctly implement invalidation. So, a // conservative approach is acceptable. recent_blocks_cache: RecentBlocksCache, - lookup_herd: HerdCache, + // Typed herd caches to avoid thundering herd on concurrent lookups + blocks_by_hash_cache: HerdCache, StoreError>>>, + blocks_by_number_cache: + HerdCache>, StoreError>>>, + ancestor_cache: HerdCache, StoreError>>>, } impl ChainStore { @@ -2005,7 +2002,9 @@ impl ChainStore { ) -> Self { let recent_blocks_cache = RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics); - let lookup_herd = HerdCache::new(format!("chain_{}_herd_cache", chain)); + let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain)); + let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain)); + let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain)); ChainStore { logger, pool, @@ -2014,7 +2013,9 @@ impl ChainStore { status, chain_head_update_sender, recent_blocks_cache, - lookup_herd, + blocks_by_hash_cache, + blocks_by_number_cache, + ancestor_cache, } } @@ -2022,6 +2023,26 @@ impl ChainStore { matches!(self.status, ChainStatus::Ingestible) } + /// Execute a cached query, avoiding thundering herd for identical requests. + /// Returns `(result, was_cached)`. + async fn cached_lookup( + &self, + cache: &HerdCache>>, + key: &K, + lookup: F, + ) -> (Result, bool) + where + K: graph::stable_hash::StableHash, + T: Clone + Send + 'static, + F: Future> + Send + 'static, + { + let hash = crypto_stable_hash(key); + let lookup_fut = async move { Arc::new(lookup.await) }; + let (arc_result, cached) = cache.cached_query(hash, lookup_fut, &self.logger).await; + let result = Arc::try_unwrap(arc_result).unwrap_or_else(|arc| (*arc).clone()); + (result, cached) + } + pub(crate) async fn create(&self, ident: &ChainIdentifier) -> Result<(), Error> { use public::ethereum_networks::dsl::*; @@ -2489,19 +2510,13 @@ impl ChainStoreTrait for ChainStore { .cloned() .collect::>(); - let hash = crypto_stable_hash(&missing_numbers); let this = self.clone(); - let lookup_fut = async move { - let res = this.blocks_from_store_by_numbers(missing_numbers).await; - BlocksLookupResult::ByNumber(Arc::new(res)) - }; - let lookup_herd = self.lookup_herd.cheap_clone(); - let logger = self.logger.cheap_clone(); - let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { - (BlocksLookupResult::ByNumber(res), _) => res, - _ => unreachable!(), - }; - let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone()); + let missing_clone = missing_numbers.clone(); + let (res, _) = self + .cached_lookup(&self.blocks_by_number_cache, &missing_numbers, async move { + this.blocks_from_store_by_numbers(missing_clone).await + }) + .await; match res { Ok(blocks) => { @@ -2575,31 +2590,14 @@ impl ChainStoreTrait for ChainStore { // the database for one block hash, `h3`, is not much faster // than looking up `[h1, h3]` though it would require less // IO bandwidth - let hash = crypto_stable_hash(&hashes); let this = self.clone(); - let lookup_fut = async move { - let res = this.blocks_from_store(hashes).await; - BlocksLookupResult::ByHash(Arc::new(res)) - }; - let lookup_herd = self.lookup_herd.cheap_clone(); - let logger = self.logger.cheap_clone(); - // This match can only return ByHash because lookup_fut explicitly constructs - // BlocksLookupResult::ByHash. The cache preserves the exact future result, - // so ByNumber variant is structurally impossible here. - let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { - (BlocksLookupResult::ByHash(res), _) => res, - (BlocksLookupResult::ByNumber(_) | BlocksLookupResult::Ancestor(_), _) => { - Arc::new(Err(StoreError::Unknown(anyhow::anyhow!( - "Unexpected BlocksLookupResult variant returned from cached block lookup by hash" - )))) - } - }; + let hashes_clone = hashes.clone(); + let (res, _) = self + .cached_lookup(&self.blocks_by_hash_cache, &hashes, async move { + this.blocks_from_store(hashes_clone).await + }) + .await; - // Try to avoid cloning a non-concurrent lookup; it's not - // entirely clear whether that will actually avoid a clone - // since it depends on a lot of the details of how the - // `HerdCache` is implemented - let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone()); let stored = match res { Ok(blocks) => { for block in &blocks { @@ -2640,14 +2638,16 @@ impl ChainStoreTrait for ChainStore { // request the same ancestor block simultaneously. The cache check // is inside the future so that only one caller checks and populates // the cache. - let hash = crypto_stable_hash(&(&block_ptr, offset, &root)); + let key = (&block_ptr, offset, &root); let this = self.cheap_clone(); - let lookup_fut = async move { - let res: Result, StoreError> = async { + let block_ptr_clone = block_ptr.clone(); + let root_clone = root.clone(); + let (res, cached) = self + .cached_lookup(&self.ancestor_cache, &key, async move { // Check the local cache first. let block_cache = this .recent_blocks_cache - .get_ancestor(&block_ptr, offset) + .get_ancestor(&block_ptr_clone, offset) .and_then(|x| Some(x.0).zip(x.1)); if let Some((ptr, data)) = block_cache { return Ok(Some((data, ptr))); @@ -2657,7 +2657,7 @@ impl ChainStoreTrait for ChainStore { let mut conn = this.pool.get_permitted().await?; let result = this .storage - .ancestor_block(&mut conn, block_ptr, offset, root) + .ancestor_block(&mut conn, block_ptr_clone, offset, root_clone) .await .map_err(StoreError::from)?; @@ -2678,26 +2678,14 @@ impl ChainStoreTrait for ChainStore { } Ok(result) - } + }) .await; - BlocksLookupResult::Ancestor(Arc::new(res)) - }; - let lookup_herd = self.lookup_herd.cheap_clone(); - let logger = self.logger.cheap_clone(); - let (res, cached) = match lookup_herd.cached_query(hash, lookup_fut, &logger).await { - (BlocksLookupResult::Ancestor(res), cached) => (res, cached), - _ => { - return Err(anyhow!( - "Unexpected BlocksLookupResult variant returned from cached ancestor block lookup" - )) - } - }; - let result = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone())?; + let result = res?; if cached { - // If we had a hit in the herd cache, we never ran lookup_fut - // but we want to pretend that we actually looked the value up - // from the recent blocks cache + // If we had a hit in the herd cache, we never ran the future + // that we pass to cached_lookup but we want to pretend that we + // actually looked the value up from the recent blocks cache self.recent_blocks_cache.register_hit(); }