diff --git a/core/src/client.rs b/core/src/client.rs index 9525f546..5e111e24 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -23,6 +23,7 @@ use crate::login::{ }; use crate::presign::{presign_upload_to_stage, PresignMode, PresignedResponse, Reader}; use crate::response::LoadResponse; +use crate::retry::RetryDecision; use crate::stage::StageLocation; use crate::{ error::{Error, Result}, @@ -130,6 +131,9 @@ pub struct APIClient { capability: Capability, queries_need_heartbeat: Mutex>, + + retry_count: u32, + retry_delay_secs: u64, } impl Drop for APIClient { @@ -274,6 +278,12 @@ impl APIClient { } } } + "retry_count" => { + client.retry_count = v.parse()?; + } + "retry_delay_secs" => { + client.retry_delay_secs = v.parse()?; + } _ => { session_state.set(k, v); } @@ -1036,8 +1046,10 @@ impl APIClient { .timeout(self.connect_timeout) .build()?; + let max_retries = self.retry_count; + let retry_delay = Duration::from_secs(self.retry_delay_secs); // avoid recursively call request_helper - for i in 0..3 { + for i in 0..max_retries { let req = request.try_clone().expect("request not cloneable"); match self.cli.execute(req).await { Ok(response) => { @@ -1053,17 +1065,43 @@ impl APIClient { } }; } - if status != StatusCode::SERVICE_UNAVAILABLE || i >= 2 { + if status != StatusCode::SERVICE_UNAVAILABLE + || i >= max_retries.saturating_sub(1) + { return Err(Error::response_error(status, &body)); } + info!( + "retry {}/{} for session token refresh due to: service unavailable (503)", + i + 1, + max_retries + ); } Err(err) => { - if !(err.is_timeout() || err.is_connect()) || i > 2 { + if !(err.is_timeout() || err.is_connect()) || i >= max_retries.saturating_sub(1) + { return Err(Error::Request(err.to_string())); } + let reason = if err.is_timeout() { + "request timeout" + } else if err.is_connect() { + "connection error" + } else { + "request error" + }; + info!( + "retry {}/{} for session token refresh due to: {} (error: {})", + i + 1, + max_retries, + reason, + err + ); } }; - sleep(jitter(Duration::from_secs(10))).await; + warn!( + "retrying session token refresh after {} seconds", + retry_delay.as_secs() + ); + sleep(jitter(retry_delay)).await; } Ok(()) } @@ -1096,9 +1134,11 @@ impl APIClient { ) -> std::result::Result { let mut refreshed = false; let mut retries = 0; + let max_retries = self.retry_count; + let retry_delay = Duration::from_secs(self.retry_delay_secs); loop { let req = request.try_clone().expect("request not cloneable"); - let (err, retry): (Error, bool) = match self.cli.execute(req).await { + let decision = match self.cli.execute(req).await { Ok(response) => { let status = response.status(); if status == StatusCode::OK { @@ -1107,7 +1147,10 @@ impl APIClient { let body = response.bytes().await?; if retry_if_503 && status == StatusCode::SERVICE_UNAVAILABLE { // waiting for server to start - (Error::response_error(status, &body), true) + RetryDecision::retry_with_reason( + Error::response_error(status, &body), + "service unavailable (503), server may be starting", + ) } else { let resp = serde_json::from_slice::(&body); match resp { @@ -1116,91 +1159,121 @@ impl APIClient { if status == StatusCode::UNAUTHORIZED { request.headers_mut().remove(reqwest::header::AUTHORIZATION); if let Some(session_token_info) = &self.session_token_info { - info!( - "will retry {} after refresh token on auth error {}", - request.url(), - e - ); - let retry = if need_refresh_token(e.code) + if need_refresh_token(e.code) && !refreshed && refresh_if_401 { self.refresh_session_token(session_token_info.clone()) .await?; refreshed = true; - true + RetryDecision::retry_with_reason( + Error::AuthFailure(e), + "session token expired and refreshed", + ) } else { - false - }; - (Error::AuthFailure(e), retry) + RetryDecision::no_retry(Error::AuthFailure(e)) + } } else if self.auth.can_reload() { - info!( - "will retry {} after reload token on auth error {}", - request.url(), - e - ); let builder = RequestBuilder::from_parts( HttpClient::new(), request.try_clone().unwrap(), ); let builder = self.auth.wrap(builder)?; request = builder.build()?; - (Error::AuthFailure(e), true) + RetryDecision::retry_with_reason( + Error::AuthFailure(e), + "authentication token reloaded", + ) } else { - (Error::AuthFailure(e), false) + RetryDecision::no_retry(Error::AuthFailure(e)) } } else { - (Error::Logic(status, e), false) + RetryDecision::no_retry(Error::Logic(status, e)) } } - Err(_) => ( - Error::Response { - status, - msg: String::from_utf8_lossy(&body).to_string(), - }, - false, - ), + Err(_) => RetryDecision::no_retry(Error::Response { + status, + msg: String::from_utf8_lossy(&body).to_string(), + }), } } } - Err(err) => ( - Error::Request(err.to_string()), - err.is_timeout() || err.is_connect() || err.is_request(), - ), + Err(err) => { + let is_retryable = err.is_timeout() || err.is_connect() || err.is_request(); + if is_retryable { + let reason = if err.is_timeout() { + "request timeout" + } else if err.is_connect() { + "connection error" + } else { + "request error" + }; + RetryDecision::retry_with_reason(Error::Request(err.to_string()), reason) + } else { + RetryDecision::no_retry(Error::Request(err.to_string())) + } + } }; - if !retry { - return Err(err.with_context(&format!("{} {}", request.method(), request.url()))); + if !decision.should_retry { + return Err(decision.error.with_context(&format!( + "{} {}", + request.method(), + request.url() + ))); } - match &err { + match &decision.error { Error::AuthFailure(_) => { if refreshed { retries = 0; - } else if retries == 2 { - return Err(err.with_context(&format!( - "{} {} after 3 retries", + } else if retries >= max_retries.saturating_sub(1) { + return Err(decision.error.with_context(&format!( + "{} {} after {} retries", request.method(), - request.url() + request.url(), + max_retries ))); } } _ => { - if retries == 2 { - return Err(err.with_context(&format!( - "{} {} after 3 reties", + if retries >= max_retries.saturating_sub(1) { + return Err(decision.error.with_context(&format!( + "{} {} after {} retries", request.method(), - request.url() + request.url(), + max_retries ))); } retries += 1; - info!( - "will retry {} the {retries}th times on error {}", - request.url(), - err - ); + if let Some(reason) = decision.reason { + info!( + "retry {}/{} for {} due to: {} (error: {})", + retries, + max_retries, + request.url(), + reason, + decision.error + ); + } else { + info!( + "retry {}/{} for {} on error: {}", + retries, + max_retries, + request.url(), + decision.error + ); + } } } - warn!("will retry after 10 seconds"); - sleep(jitter(Duration::from_secs(10))).await; + if let Some(reason) = decision.reason { + warn!( + "retrying after {} seconds due to: {}", + retry_delay.as_secs(), + reason + ); + } else { + warn!("retrying after {} seconds", retry_delay.as_secs()); + } + sleep(jitter(retry_delay)).await; } } @@ -1301,6 +1374,8 @@ impl Default for APIClient { server_version: None, capability: Default::default(), queries_need_heartbeat: Default::default(), + retry_count: 3, + retry_delay_secs: 10, } } } diff --git a/core/src/lib.rs b/core/src/lib.rs index a54fbbd6..193bec95 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -23,6 +23,7 @@ mod pages; mod presign; mod request; mod response; +mod retry; mod capability; mod client_mgr; diff --git a/core/src/retry.rs b/core/src/retry.rs new file mode 100644 index 00000000..6274e942 --- /dev/null +++ b/core/src/retry.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Error; + +pub(crate) struct RetryDecision<'a> { + pub(crate) error: Error, + pub(crate) should_retry: bool, + pub(crate) reason: Option<&'a str>, +} + +impl<'a> RetryDecision<'a> { + pub(crate) fn no_retry(error: Error) -> Self { + Self { + error, + should_retry: false, + reason: None, + } + } + + pub(crate) fn retry_with_reason(error: Error, reason: &'a str) -> Self { + Self { + error, + should_retry: true, + reason: Some(reason), + } + } +}