From df057d35d8e8dccf9c2a93439f4795d629ac02c1 Mon Sep 17 00:00:00 2001 From: Miles 'wedtm' Smith Date: Fri, 20 Mar 2026 14:58:51 +0000 Subject: [PATCH 1/2] fix: add a backoff for getBlock on -32004 from upstream --- src/ingester/fetchers/poller.rs | 120 +++++++++++++++++++++++++++----- 1 file changed, 104 insertions(+), 16 deletions(-) diff --git a/src/ingester/fetchers/poller.rs b/src/ingester/fetchers/poller.rs index dec7f812..3a19f799 100644 --- a/src/ingester/fetchers/poller.rs +++ b/src/ingester/fetchers/poller.rs @@ -1,6 +1,7 @@ use std::{ collections::BTreeMap, sync::{atomic::Ordering, Arc}, + time::Duration, }; use async_stream::stream; @@ -19,8 +20,35 @@ use crate::{ monitor::{start_latest_slot_updater, LATEST_SLOT}, }; +const BLOCK_NOT_AVAILABLE_ERROR: i64 = -32004; +const BLOCK_NOT_AVAILABLE_BACKOFF_MS: u64 = 200; const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009]; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum BlockFetchRetryAction { + RetryImmediately, + RetryAfter(Duration), + Skip, +} + +fn classify_block_fetch_error( + kind: &solana_client::client_error::ClientErrorKind, +) -> BlockFetchRetryAction { + match kind { + solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError { + code, + .. + }) if *code == BLOCK_NOT_AVAILABLE_ERROR => { + BlockFetchRetryAction::RetryAfter(Duration::from_millis(BLOCK_NOT_AVAILABLE_BACKOFF_MS)) + } + solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError { + code, + .. + }) if SKIPPED_BLOCK_ERRORS.contains(code) => BlockFetchRetryAction::Skip, + _ => BlockFetchRetryAction::RetryImmediately, + } +} + fn get_slot_stream(rpc_client: Arc, start_slot: u64) -> impl Stream { stream! { start_latest_slot_updater(rpc_client.clone()).await; @@ -86,7 +114,6 @@ fn pop_cached_blocks_to_index( } else if min_slot < last_indexed_slot { block_cache.remove(&min_slot); } else { - break; } } @@ -117,24 +144,85 @@ pub async fn fetch_block_with_infinite_retries( } return Some(parse_ui_confirmed_blocked(block, slot).unwrap()); } - Err(e) => { - log::error!("Error fetching block {}: {}", slot, e); - if let solana_client::client_error::ClientErrorKind::RpcError( - RpcError::RpcResponseError { code, .. }, - ) = *e.kind - { - if SKIPPED_BLOCK_ERRORS.contains(&code) { - metric! { - statsd_count!("rpc_skipped_block", 1); - } - log::info!("Skipped block: {}", slot); - return None; + Err(e) => match classify_block_fetch_error(&e.kind) { + BlockFetchRetryAction::RetryAfter(backoff) => { + metric! { + statsd_count!("rpc_block_not_available", 1); } + log::debug!( + "Block {} not yet available from RPC, retrying in {}ms: {}", + slot, + backoff.as_millis(), + e + ); + tokio::time::sleep(backoff).await; } - metric! { - statsd_count!("rpc_block_fetch_failed", 1); + BlockFetchRetryAction::Skip => { + metric! { + statsd_count!("rpc_skipped_block", 1); + } + log::info!("Skipped block: {}", slot); + return None; } - } + BlockFetchRetryAction::RetryImmediately => { + log::error!("Error fetching block {}: {}", slot, e); + metric! { + statsd_count!("rpc_block_fetch_failed", 1); + } + } + }, } } } + +#[cfg(test)] +mod tests { + use super::*; + use solana_client::rpc_request::RpcResponseErrorData; + + fn rpc_response_error(code: i64) -> solana_client::client_error::ClientErrorKind { + solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError { + code, + message: "test".to_string(), + data: RpcResponseErrorData::Empty, + }) + } + + #[test] + fn classify_block_fetch_error_returns_backoff_for_block_not_available() { + assert_eq!( + classify_block_fetch_error(&rpc_response_error(BLOCK_NOT_AVAILABLE_ERROR)), + BlockFetchRetryAction::RetryAfter(Duration::from_millis( + BLOCK_NOT_AVAILABLE_BACKOFF_MS, + )) + ); + } + + #[test] + fn classify_block_fetch_error_skips_known_skipped_block_codes() { + for code in SKIPPED_BLOCK_ERRORS { + assert_eq!( + classify_block_fetch_error(&rpc_response_error(code)), + BlockFetchRetryAction::Skip + ); + } + } + + #[test] + fn classify_block_fetch_error_retries_immediately_for_other_rpc_codes() { + assert_eq!( + classify_block_fetch_error(&rpc_response_error(-32005)), + BlockFetchRetryAction::RetryImmediately + ); + } + + #[test] + fn classify_block_fetch_error_retries_immediately_for_non_rpc_response_errors() { + assert_eq!( + classify_block_fetch_error(&solana_client::client_error::ClientErrorKind::Io( + std::io::Error::other("test") + )), + BlockFetchRetryAction::RetryImmediately + ); + } +} From f3bd984d0082dd2dfe4c33cab5e5de64896a6f88 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Fri, 20 Mar 2026 20:20:59 +0100 Subject: [PATCH 2/2] Increase health check slot distance to 50 --- src/api/method/get_indexer_health.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/method/get_indexer_health.rs b/src/api/method/get_indexer_health.rs index 40d5a55e..9700a4ba 100644 --- a/src/api/method/get_indexer_health.rs +++ b/src/api/method/get_indexer_health.rs @@ -5,7 +5,7 @@ use crate::common::typedefs::context::Context; use solana_client::nonblocking::rpc_client::RpcClient; // TODO: Make this an environment variable. -pub const HEALTH_CHECK_SLOT_DISTANCE: i64 = 20; +pub const HEALTH_CHECK_SLOT_DISTANCE: i64 = 50; // TODO: Make sure that get_indexer_health formatting matches the Solana RPC formatting. pub async fn get_indexer_health(