Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ slow-timeout = { period = "120s", terminate-after = 3 }
filter = 'test(rpc_snapshot_test_)'
slow-timeout = { period = "120s", terminate-after = 3 }
retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true }

# These tests download test snapshots from the network, which can take a while.
# There might be some network issues, so we allow some retries with backoff.
# Jitter is enabled to avoid [thundering herd issues](https://en.wikipedia.org/wiki/Thundering_herd_problem).
[[profile.default.overrides]]
filter = 'test(state_compute_)'
slow-timeout = { period = "120s", terminate-after = 3 }
retries = { backoff = "exponential", count = 3, delay = "5s", jitter = true }
12 changes: 4 additions & 8 deletions src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::utils::misc::env::is_env_truthy;
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt, select};
use futures::{Future, FutureExt};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
Expand Down Expand Up @@ -738,13 +738,9 @@ async fn maybe_set_snapshot_path(
async fn propagate_error(
services: &mut JoinSet<Result<(), anyhow::Error>>,
) -> anyhow::Result<std::convert::Infallible> {
while !services.is_empty() {
select! {
option = services.join_next().fuse() => {
if let Some(Ok(Err(error_message))) = option {
return Err(error_message)
}
},
while let Some(result) = services.join_next().await {
if let Ok(Err(error_message)) = result {
return Err(error_message);
}
}
std::future::pending().await
Expand Down
20 changes: 16 additions & 4 deletions src/state_manager/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub mod state_compute {
use std::{
path::{Path, PathBuf},
sync::{Arc, LazyLock},
time::Duration,
};
use url::Url;

Expand All @@ -215,11 +216,22 @@ pub mod state_compute {
let url = Url::parse(&format!(
"https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/state_compute/{chain}_{epoch}.forest.car.zst"
))?;
Ok(
download_file_with_cache(&url, &SNAPSHOT_CACHE_DIR, DownloadFileOption::NonResumable)
.await?
.path,
Ok(crate::utils::retry(
crate::utils::RetryArgs {
timeout: Some(Duration::from_secs(30)),
max_retries: Some(5),
delay: Some(Duration::from_secs(1)),
},
|| {
download_file_with_cache(
&url,
&SNAPSHOT_CACHE_DIR,
DownloadFileOption::NonResumable,
)
},
)
.await?
.path)
}

pub async fn prepare_state_compute(
Expand Down
2 changes: 1 addition & 1 deletion src/state_migration/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn test_state_migration(
retry(
RetryArgs {
timeout: Some(timeout),
max_retries: Some(5),
max_retries: Some(15),
..Default::default()
},
|| async {
Expand Down
12 changes: 3 additions & 9 deletions src/tool/subcommands/api_cmd/test_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,11 @@ mod tests {
let cache_dir = project_dir.cache_dir().join("test").join("rpc-snapshots");
let path = crate::utils::retry(
crate::utils::RetryArgs {
timeout: Some(Duration::from_secs(if crate::utils::is_ci() {
20
} else {
120
})),
timeout: Some(Duration::from_secs(30)),
max_retries: Some(5),
..Default::default()
},
|| async {
download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable).await
delay: Some(Duration::from_secs(1)),
},
|| download_file_with_cache(&url, &cache_dir, DownloadFileOption::NonResumable),
)
.await
.unwrap()
Expand Down
46 changes: 20 additions & 26 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,9 @@ pub mod stream;
pub mod version;

use anyhow::{Context as _, bail};
use futures::{
Future, FutureExt,
future::{FusedFuture, pending},
select,
};
use futures::Future;
use multiaddr::{Multiaddr, Protocol};
use std::{pin::Pin, str::FromStr, time::Duration};
use std::{str::FromStr, time::Duration};
use tokio::time::sleep;
use tracing::error;
use url::Url;
Expand Down Expand Up @@ -125,29 +121,26 @@ where
F: Future<Output = Result<T, E>>,
E: std::fmt::Debug,
{
let mut timeout: Pin<Box<dyn FusedFuture<Output = ()>>> = match args.timeout {
Some(duration) => Box::pin(sleep(duration).fuse()),
None => Box::pin(pending()),
};
let max_retries = args.max_retries.unwrap_or(usize::MAX);
let mut task = Box::pin(
async {
for _ in 0..max_retries {
match make_fut().await {
Ok(ok) => return Ok(ok),
Err(err) => error!("retrying operation after {err:?}"),
}
if let Some(delay) = args.delay {
sleep(delay).await;
}
let task = async {
for _ in 0..max_retries {
match make_fut().await {
Ok(ok) => return Ok(ok),
Err(err) => error!("retrying operation after {err:?}"),
}
if let Some(delay) = args.delay {
sleep(delay).await;
}
Err(RetryError::RetriesExceeded)
}
.fuse(),
);
select! {
_ = timeout => Err(RetryError::TimeoutExceeded),
res = task => res,
Err(RetryError::RetriesExceeded)
};

if let Some(timeout) = args.timeout {
tokio::time::timeout(timeout, task)
.await
.map_err(|_| RetryError::TimeoutExceeded)?
} else {
task.await
}
}

Expand Down Expand Up @@ -187,6 +180,7 @@ mod tests {
mod files;

use RetryError::{RetriesExceeded, TimeoutExceeded};
use futures::future::pending;
use std::{future::ready, sync::atomic::AtomicUsize};

use super::*;
Expand Down
Loading