Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 56 additions & 68 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl};

use graph::components::store::ChainHeadStore;
use graph::data::store::ethereum::call;
use graph::derive::CheapClone;
use graph::env::ENV_VARS;
use graph::parking_lot::RwLock;
use graph::prelude::MetricsRegistry;
Expand All @@ -17,6 +16,7 @@ use graph::stable_hash::crypto_stable_hash;
use graph::util::herd_cache::HerdCache;

use std::collections::BTreeMap;
use std::future::Future;
use std::{
collections::HashMap,
convert::{TryFrom, TryInto},
Expand Down Expand Up @@ -1968,13 +1968,6 @@ impl ChainStoreMetrics {
}
}

/// Value type stored in the single shared `lookup_herd` cache.
///
/// Each variant carries the `Arc`-wrapped outcome of one kind of block
/// lookup, so that lookups with different result types can share one
/// `HerdCache`. Callers have to match on the variant they expect to get
/// back from the cache.
#[derive(Clone, CheapClone)]
enum BlocksLookupResult {
/// Outcome of looking up blocks by a list of hashes
/// (built from `blocks_from_store`).
ByHash(Arc<Result<Vec<JsonBlock>, StoreError>>),
/// Outcome of looking up blocks by a list of block numbers
/// (built from `blocks_from_store_by_numbers`); blocks are grouped per
/// block number.
ByNumber(Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>),
/// Outcome of an ancestor-block lookup: the block data together with
/// its pointer, or `None`.
// NOTE(review): presumably `None` means no ancestor was found at the
// requested offset — confirm against `ancestor_block`.
Ancestor(Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>),
}

pub struct ChainStore {
logger: Logger,
pool: ConnectionPool,
Expand All @@ -1989,7 +1982,11 @@ pub struct ChainStore {
// with the database and to correctly implement invalidation. So, a
// conservative approach is acceptable.
recent_blocks_cache: RecentBlocksCache,
lookup_herd: HerdCache<BlocksLookupResult>,
// Typed herd caches to avoid thundering herd on concurrent lookups
blocks_by_hash_cache: HerdCache<Arc<Result<Vec<JsonBlock>, StoreError>>>,
blocks_by_number_cache:
HerdCache<Arc<Result<BTreeMap<BlockNumber, Vec<JsonBlock>>, StoreError>>>,
ancestor_cache: HerdCache<Arc<Result<Option<(json::Value, BlockPtr)>, StoreError>>>,
}

impl ChainStore {
Expand All @@ -2005,7 +2002,9 @@ impl ChainStore {
) -> Self {
let recent_blocks_cache =
RecentBlocksCache::new(recent_blocks_cache_capacity, chain.clone(), metrics);
let lookup_herd = HerdCache::new(format!("chain_{}_herd_cache", chain));
let blocks_by_hash_cache = HerdCache::new(format!("chain_{}_blocks_by_hash", chain));
let blocks_by_number_cache = HerdCache::new(format!("chain_{}_blocks_by_number", chain));
let ancestor_cache = HerdCache::new(format!("chain_{}_ancestor", chain));
ChainStore {
logger,
pool,
Expand All @@ -2014,14 +2013,36 @@ impl ChainStore {
status,
chain_head_update_sender,
recent_blocks_cache,
lookup_herd,
blocks_by_hash_cache,
blocks_by_number_cache,
ancestor_cache,
}
}

/// Returns `true` when this store's `status` is `ChainStatus::Ingestible`,
/// and `false` for every other status.
pub fn is_ingestible(&self) -> bool {
matches!(self.status, ChainStatus::Ingestible)
}

/// Run `lookup` through the given herd cache so that identical concurrent
/// requests do not each hit the store.
///
/// The cache entry is identified by the stable hash of `key`. The lookup
/// future's output is wrapped in an `Arc` before being handed to
/// `HerdCache::cached_query`, since the cache stores `Arc<Result<..>>`
/// values.
///
/// Returns `(result, was_cached)`, where `was_cached` is the flag reported
/// by `cached_query`.
async fn cached_lookup<K, T, F>(
    &self,
    cache: &HerdCache<Arc<Result<T, StoreError>>>,
    key: &K,
    lookup: F,
) -> (Result<T, StoreError>, bool)
where
    K: graph::stable_hash::StableHash,
    T: Clone + Send + 'static,
    F: Future<Output = Result<T, StoreError>> + Send + 'static,
{
    // The cache hands out shared values, so the lookup has to yield an `Arc`.
    let shared_lookup = async move { Arc::new(lookup.await) };

    let (shared, was_cached) = cache
        .cached_query(crypto_stable_hash(key), shared_lookup, &self.logger)
        .await;

    // Take ownership without copying when we hold the only reference;
    // otherwise fall back to cloning the shared result.
    let outcome = match Arc::try_unwrap(shared) {
        Ok(owned) => owned,
        Err(still_shared) => (*still_shared).clone(),
    };

    (outcome, was_cached)
}

pub(crate) async fn create(&self, ident: &ChainIdentifier) -> Result<(), Error> {
use public::ethereum_networks::dsl::*;

Expand Down Expand Up @@ -2489,19 +2510,13 @@ impl ChainStoreTrait for ChainStore {
.cloned()
.collect::<Vec<_>>();

let hash = crypto_stable_hash(&missing_numbers);
let this = self.clone();
let lookup_fut = async move {
let res = this.blocks_from_store_by_numbers(missing_numbers).await;
BlocksLookupResult::ByNumber(Arc::new(res))
};
let lookup_herd = self.lookup_herd.cheap_clone();
let logger = self.logger.cheap_clone();
let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
(BlocksLookupResult::ByNumber(res), _) => res,
_ => unreachable!(),
};
let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone());
let missing_clone = missing_numbers.clone();
let (res, _) = self
.cached_lookup(&self.blocks_by_number_cache, &missing_numbers, async move {
this.blocks_from_store_by_numbers(missing_clone).await
})
.await;

match res {
Ok(blocks) => {
Expand Down Expand Up @@ -2575,31 +2590,14 @@ impl ChainStoreTrait for ChainStore {
// the database for one block hash, `h3`, is not much faster
// than looking up `[h1, h3]` though it would require less
// IO bandwidth
let hash = crypto_stable_hash(&hashes);
let this = self.clone();
let lookup_fut = async move {
let res = this.blocks_from_store(hashes).await;
BlocksLookupResult::ByHash(Arc::new(res))
};
let lookup_herd = self.lookup_herd.cheap_clone();
let logger = self.logger.cheap_clone();
// This match can only return ByHash because lookup_fut explicitly constructs
// BlocksLookupResult::ByHash. The cache preserves the exact future result,
// so ByNumber variant is structurally impossible here.
let res = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
(BlocksLookupResult::ByHash(res), _) => res,
(BlocksLookupResult::ByNumber(_) | BlocksLookupResult::Ancestor(_), _) => {
Arc::new(Err(StoreError::Unknown(anyhow::anyhow!(
"Unexpected BlocksLookupResult variant returned from cached block lookup by hash"
))))
}
};
let hashes_clone = hashes.clone();
let (res, _) = self
.cached_lookup(&self.blocks_by_hash_cache, &hashes, async move {
this.blocks_from_store(hashes_clone).await
})
.await;

// Try to avoid cloning a non-concurrent lookup; it's not
// entirely clear whether that will actually avoid a clone
// since it depends on a lot of the details of how the
// `HerdCache` is implemented
let res = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone());
let stored = match res {
Ok(blocks) => {
for block in &blocks {
Expand Down Expand Up @@ -2640,14 +2638,16 @@ impl ChainStoreTrait for ChainStore {
// request the same ancestor block simultaneously. The cache check
// is inside the future so that only one caller checks and populates
// the cache.
let hash = crypto_stable_hash(&(&block_ptr, offset, &root));
let key = (&block_ptr, offset, &root);
let this = self.cheap_clone();
let lookup_fut = async move {
let res: Result<Option<(json::Value, BlockPtr)>, StoreError> = async {
let block_ptr_clone = block_ptr.clone();
let root_clone = root.clone();
let (res, cached) = self
.cached_lookup(&self.ancestor_cache, &key, async move {
// Check the local cache first.
let block_cache = this
.recent_blocks_cache
.get_ancestor(&block_ptr, offset)
.get_ancestor(&block_ptr_clone, offset)
.and_then(|x| Some(x.0).zip(x.1));
if let Some((ptr, data)) = block_cache {
return Ok(Some((data, ptr)));
Expand All @@ -2657,7 +2657,7 @@ impl ChainStoreTrait for ChainStore {
let mut conn = this.pool.get_permitted().await?;
let result = this
.storage
.ancestor_block(&mut conn, block_ptr, offset, root)
.ancestor_block(&mut conn, block_ptr_clone, offset, root_clone)
.await
.map_err(StoreError::from)?;

Expand All @@ -2678,26 +2678,14 @@ impl ChainStoreTrait for ChainStore {
}

Ok(result)
}
})
.await;
BlocksLookupResult::Ancestor(Arc::new(res))
};
let lookup_herd = self.lookup_herd.cheap_clone();
let logger = self.logger.cheap_clone();
let (res, cached) = match lookup_herd.cached_query(hash, lookup_fut, &logger).await {
(BlocksLookupResult::Ancestor(res), cached) => (res, cached),
_ => {
return Err(anyhow!(
"Unexpected BlocksLookupResult variant returned from cached ancestor block lookup"
))
}
};
let result = Arc::try_unwrap(res).unwrap_or_else(|arc| (*arc).clone())?;
let result = res?;

if cached {
// If we had a hit in the herd cache, we never ran lookup_fut
// but we want to pretend that we actually looked the value up
// from the recent blocks cache
// If we had a hit in the herd cache, we never ran the future
// that we pass to cached_lookup but we want to pretend that we
// actually looked the value up from the recent blocks cache
self.recent_blocks_cache.register_hit();
}

Expand Down