Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 128 additions & 53 deletions core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::login::{
};
use crate::presign::{presign_upload_to_stage, PresignMode, PresignedResponse, Reader};
use crate::response::LoadResponse;
use crate::retry::RetryDecision;
use crate::stage::StageLocation;
use crate::{
error::{Error, Result},
Expand Down Expand Up @@ -130,6 +131,9 @@ pub struct APIClient {
capability: Capability,

queries_need_heartbeat: Mutex<HashMap<String, QueryState>>,

retry_count: u32,
retry_delay_secs: u64,
}

impl Drop for APIClient {
Expand Down Expand Up @@ -274,6 +278,12 @@ impl APIClient {
}
}
}
"retry_count" => {
client.retry_count = v.parse()?;
}
"retry_delay_secs" => {
client.retry_delay_secs = v.parse()?;
}
_ => {
session_state.set(k, v);
}
Expand Down Expand Up @@ -1036,8 +1046,10 @@ impl APIClient {
.timeout(self.connect_timeout)
.build()?;

let max_retries = self.retry_count;
let retry_delay = Duration::from_secs(self.retry_delay_secs);
// avoid recursively call request_helper
for i in 0..3 {
for i in 0..max_retries {
let req = request.try_clone().expect("request not cloneable");
match self.cli.execute(req).await {
Ok(response) => {
Expand All @@ -1053,17 +1065,43 @@ impl APIClient {
}
};
}
if status != StatusCode::SERVICE_UNAVAILABLE || i >= 2 {
if status != StatusCode::SERVICE_UNAVAILABLE
|| i >= max_retries.saturating_sub(1)
{
return Err(Error::response_error(status, &body));
}
info!(
"retry {}/{} for session token refresh due to: service unavailable (503)",
i + 1,
max_retries
);
}
Err(err) => {
if !(err.is_timeout() || err.is_connect()) || i > 2 {
if !(err.is_timeout() || err.is_connect()) || i >= max_retries.saturating_sub(1)
{
return Err(Error::Request(err.to_string()));
}
let reason = if err.is_timeout() {
"request timeout"
} else if err.is_connect() {
"connection error"
} else {
"request error"
};
info!(
"retry {}/{} for session token refresh due to: {} (error: {})",
i + 1,
max_retries,
reason,
err
);
}
};
sleep(jitter(Duration::from_secs(10))).await;
warn!(
"retrying session token refresh after {} seconds",
retry_delay.as_secs()
);
sleep(jitter(retry_delay)).await;
}
Ok(())
}
Expand Down Expand Up @@ -1096,9 +1134,11 @@ impl APIClient {
) -> std::result::Result<Response, Error> {
let mut refreshed = false;
let mut retries = 0;
let max_retries = self.retry_count;
let retry_delay = Duration::from_secs(self.retry_delay_secs);
loop {
let req = request.try_clone().expect("request not cloneable");
let (err, retry): (Error, bool) = match self.cli.execute(req).await {
let decision = match self.cli.execute(req).await {
Ok(response) => {
let status = response.status();
if status == StatusCode::OK {
Expand All @@ -1107,7 +1147,10 @@ impl APIClient {
let body = response.bytes().await?;
if retry_if_503 && status == StatusCode::SERVICE_UNAVAILABLE {
// waiting for server to start
(Error::response_error(status, &body), true)
RetryDecision::retry_with_reason(
Error::response_error(status, &body),
"service unavailable (503), server may be starting",
)
} else {
let resp = serde_json::from_slice::<ResponseWithErrorCode>(&body);
match resp {
Expand All @@ -1116,91 +1159,121 @@ impl APIClient {
if status == StatusCode::UNAUTHORIZED {
request.headers_mut().remove(reqwest::header::AUTHORIZATION);
if let Some(session_token_info) = &self.session_token_info {
info!(
"will retry {} after refresh token on auth error {}",
request.url(),
e
);
let retry = if need_refresh_token(e.code)
if need_refresh_token(e.code)
&& !refreshed
&& refresh_if_401
{
self.refresh_session_token(session_token_info.clone())
.await?;
refreshed = true;
true
RetryDecision::retry_with_reason(
Error::AuthFailure(e),
"session token expired and refreshed",
)
} else {
false
};
(Error::AuthFailure(e), retry)
RetryDecision::no_retry(Error::AuthFailure(e))
}
} else if self.auth.can_reload() {
info!(
"will retry {} after reload token on auth error {}",
request.url(),
e
);
let builder = RequestBuilder::from_parts(
HttpClient::new(),
request.try_clone().unwrap(),
);
let builder = self.auth.wrap(builder)?;
request = builder.build()?;
(Error::AuthFailure(e), true)
RetryDecision::retry_with_reason(
Error::AuthFailure(e),
"authentication token reloaded",
)
} else {
(Error::AuthFailure(e), false)
RetryDecision::no_retry(Error::AuthFailure(e))
}
} else {
(Error::Logic(status, e), false)
RetryDecision::no_retry(Error::Logic(status, e))
}
}
Err(_) => (
Error::Response {
status,
msg: String::from_utf8_lossy(&body).to_string(),
},
false,
),
Err(_) => RetryDecision::no_retry(Error::Response {
status,
msg: String::from_utf8_lossy(&body).to_string(),
}),
}
}
}
Err(err) => (
Error::Request(err.to_string()),
err.is_timeout() || err.is_connect() || err.is_request(),
),
Err(err) => {
let is_retryable = err.is_timeout() || err.is_connect() || err.is_request();
if is_retryable {
let reason = if err.is_timeout() {
"request timeout"
} else if err.is_connect() {
"connection error"
} else {
"request error"
};
RetryDecision::retry_with_reason(Error::Request(err.to_string()), reason)
} else {
RetryDecision::no_retry(Error::Request(err.to_string()))
}
}
};
if !retry {
return Err(err.with_context(&format!("{} {}", request.method(), request.url())));
if !decision.should_retry {
return Err(decision.error.with_context(&format!(
"{} {}",
request.method(),
request.url()
)));
}
match &err {
match &decision.error {
Error::AuthFailure(_) => {
if refreshed {
retries = 0;
} else if retries == 2 {
return Err(err.with_context(&format!(
"{} {} after 3 retries",
} else if retries >= max_retries.saturating_sub(1) {
return Err(decision.error.with_context(&format!(
Comment thread
everpcpc marked this conversation as resolved.
"{} {} after {} retries",
request.method(),
request.url()
request.url(),
max_retries
)));
}
}
_ => {
if retries == 2 {
return Err(err.with_context(&format!(
"{} {} after 3 reties",
if retries >= max_retries.saturating_sub(1) {
return Err(decision.error.with_context(&format!(
"{} {} after {} retries",
request.method(),
request.url()
request.url(),
max_retries
)));
}
retries += 1;
info!(
"will retry {} the {retries}th times on error {}",
request.url(),
err
);
if let Some(reason) = decision.reason {
info!(
"retry {}/{} for {} due to: {} (error: {})",
retries,
max_retries,
request.url(),
reason,
decision.error
);
} else {
info!(
"retry {}/{} for {} on error: {}",
retries,
max_retries,
request.url(),
decision.error
);
}
}
}
warn!("will retry after 10 seconds");
sleep(jitter(Duration::from_secs(10))).await;
if let Some(reason) = decision.reason {
warn!(
"retrying after {} seconds due to: {}",
retry_delay.as_secs(),
reason
);
} else {
warn!("retrying after {} seconds", retry_delay.as_secs());
}
sleep(jitter(retry_delay)).await;
}
}

Expand Down Expand Up @@ -1301,6 +1374,8 @@ impl Default for APIClient {
server_version: None,
capability: Default::default(),
queries_need_heartbeat: Default::default(),
retry_count: 3,
retry_delay_secs: 10,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod pages;
mod presign;
mod request;
mod response;
mod retry;

mod capability;
mod client_mgr;
Expand Down
39 changes: 39 additions & 0 deletions core/src/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::error::Error;

/// Outcome of evaluating a failed attempt: the error itself, whether the
/// operation should be attempted again, and an optional reason for retrying.
///
/// The `'a` lifetime is the lifetime of the borrowed `reason` string.
pub(crate) struct RetryDecision<'a> {
/// The error associated with this decision.
pub(crate) error: Error,
/// `true` when built via `retry_with_reason`, `false` when built via `no_retry`.
pub(crate) should_retry: bool,
/// Short explanation of why a retry is warranted; `None` for `no_retry` decisions.
pub(crate) reason: Option<&'a str>,
}

impl<'a> RetryDecision<'a> {
pub(crate) fn no_retry(error: Error) -> Self {
Self {
error,
should_retry: false,
reason: None,
}
}

pub(crate) fn retry_with_reason(error: Error, reason: &'a str) -> Self {
Self {
error,
should_retry: true,
reason: Some(reason),
}
}
}
Loading