Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/api/method/get_indexer_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::common::typedefs::context::Context;
use solana_client::nonblocking::rpc_client::RpcClient;

// TODO: Make this an environment variable.
pub const HEALTH_CHECK_SLOT_DISTANCE: i64 = 20;
pub const HEALTH_CHECK_SLOT_DISTANCE: i64 = 50;

// TODO: Make sure that get_indexer_health formatting matches the Solana RPC formatting.
pub async fn get_indexer_health(
Expand Down
120 changes: 104 additions & 16 deletions src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::BTreeMap,
sync::{atomic::Ordering, Arc},
time::Duration,
};

use async_stream::stream;
Expand All @@ -19,8 +20,35 @@ use crate::{
monitor::{start_latest_slot_updater, LATEST_SLOT},
};

const BLOCK_NOT_AVAILABLE_ERROR: i64 = -32004;
const BLOCK_NOT_AVAILABLE_BACKOFF_MS: u64 = 200;
const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockFetchRetryAction {
RetryImmediately,
RetryAfter(Duration),
Skip,
}

fn classify_block_fetch_error(
kind: &solana_client::client_error::ClientErrorKind,
) -> BlockFetchRetryAction {
match kind {
solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError {
code,
..
}) if *code == BLOCK_NOT_AVAILABLE_ERROR => {
BlockFetchRetryAction::RetryAfter(Duration::from_millis(BLOCK_NOT_AVAILABLE_BACKOFF_MS))
}
solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError {
code,
..
}) if SKIPPED_BLOCK_ERRORS.contains(code) => BlockFetchRetryAction::Skip,
_ => BlockFetchRetryAction::RetryImmediately,
}
}

fn get_slot_stream(rpc_client: Arc<RpcClient>, start_slot: u64) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
Expand Down Expand Up @@ -86,7 +114,6 @@ fn pop_cached_blocks_to_index(
} else if min_slot < last_indexed_slot {
block_cache.remove(&min_slot);
} else {

break;
}
}
Expand Down Expand Up @@ -117,24 +144,85 @@ pub async fn fetch_block_with_infinite_retries(
}
return Some(parse_ui_confirmed_blocked(block, slot).unwrap());
}
Err(e) => {
log::error!("Error fetching block {}: {}", slot, e);
if let solana_client::client_error::ClientErrorKind::RpcError(
RpcError::RpcResponseError { code, .. },
) = *e.kind
{
if SKIPPED_BLOCK_ERRORS.contains(&code) {
metric! {
statsd_count!("rpc_skipped_block", 1);
}
log::info!("Skipped block: {}", slot);
return None;
Err(e) => match classify_block_fetch_error(&e.kind) {
BlockFetchRetryAction::RetryAfter(backoff) => {
metric! {
statsd_count!("rpc_block_not_available", 1);
}
log::debug!(
"Block {} not yet available from RPC, retrying in {}ms: {}",
slot,
backoff.as_millis(),
e
);
tokio::time::sleep(backoff).await;
}
metric! {
statsd_count!("rpc_block_fetch_failed", 1);
BlockFetchRetryAction::Skip => {
metric! {
statsd_count!("rpc_skipped_block", 1);
}
log::info!("Skipped block: {}", slot);
return None;
}
}
BlockFetchRetryAction::RetryImmediately => {
log::error!("Error fetching block {}: {}", slot, e);
metric! {
statsd_count!("rpc_block_fetch_failed", 1);
}
}
},
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use solana_client::rpc_request::RpcResponseErrorData;

fn rpc_response_error(code: i64) -> solana_client::client_error::ClientErrorKind {
solana_client::client_error::ClientErrorKind::RpcError(RpcError::RpcResponseError {
code,
message: "test".to_string(),
data: RpcResponseErrorData::Empty,
})
}

#[test]
fn classify_block_fetch_error_returns_backoff_for_block_not_available() {
assert_eq!(
classify_block_fetch_error(&rpc_response_error(BLOCK_NOT_AVAILABLE_ERROR)),
BlockFetchRetryAction::RetryAfter(Duration::from_millis(
BLOCK_NOT_AVAILABLE_BACKOFF_MS,
))
);
}

#[test]
fn classify_block_fetch_error_skips_known_skipped_block_codes() {
for code in SKIPPED_BLOCK_ERRORS {
assert_eq!(
classify_block_fetch_error(&rpc_response_error(code)),
BlockFetchRetryAction::Skip
);
}
}

#[test]
fn classify_block_fetch_error_retries_immediately_for_other_rpc_codes() {
assert_eq!(
classify_block_fetch_error(&rpc_response_error(-32005)),
BlockFetchRetryAction::RetryImmediately
);
}

#[test]
fn classify_block_fetch_error_retries_immediately_for_non_rpc_response_errors() {
assert_eq!(
classify_block_fetch_error(&solana_client::client_error::ClientErrorKind::Io(
std::io::Error::other("test")
)),
BlockFetchRetryAction::RetryImmediately
);
}
}