From b545a8944aadacdc4b2ea62e2fc1af3986c53d7a Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 15 Jan 2026 11:52:59 +0800 Subject: [PATCH 1/3] feat: add configurable retry parameters and improve retry logging - Add retry_count and retry_delay_secs DSN parameters (default: 3 retries, 10s delay) - Improve retry logs with detailed reasons (503, timeout, connection error, token refresh) - Refactor retry logic with RetryDecision struct in separate retry.rs module --- core/src/client.rs | 170 +++++++++++++++++++++++++++++++-------------- core/src/lib.rs | 1 + core/src/retry.rs | 39 +++++++++++ 3 files changed, 157 insertions(+), 53 deletions(-) create mode 100644 core/src/retry.rs diff --git a/core/src/client.rs b/core/src/client.rs index 9525f546..15bbd894 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -23,6 +23,7 @@ use crate::login::{ }; use crate::presign::{presign_upload_to_stage, PresignMode, PresignedResponse, Reader}; use crate::response::LoadResponse; +use crate::retry::RetryDecision; use crate::stage::StageLocation; use crate::{ error::{Error, Result}, @@ -130,6 +131,9 @@ pub struct APIClient { capability: Capability, queries_need_heartbeat: Mutex>, + + retry_count: u32, + retry_delay_secs: u64, } impl Drop for APIClient { @@ -274,6 +278,12 @@ impl APIClient { } } } + "retry_count" => { + client.retry_count = v.parse()?; + } + "retry_delay_secs" => { + client.retry_delay_secs = v.parse()?; + } _ => { session_state.set(k, v); } @@ -1036,8 +1046,10 @@ impl APIClient { .timeout(self.connect_timeout) .build()?; + let max_retries = self.retry_count; + let retry_delay = Duration::from_secs(self.retry_delay_secs); // avoid recursively call request_helper - for i in 0..3 { + for i in 0..max_retries { let req = request.try_clone().expect("request not cloneable"); match self.cli.execute(req).await { Ok(response) => { @@ -1053,17 +1065,37 @@ impl APIClient { } }; } - if status != StatusCode::SERVICE_UNAVAILABLE || i >= 2 { + if status != 
StatusCode::SERVICE_UNAVAILABLE || i >= max_retries - 1 { return Err(Error::response_error(status, &body)); } + info!( + "retry {}/{} for session token refresh due to: service unavailable (503)", + i + 1, + max_retries + ); } Err(err) => { - if !(err.is_timeout() || err.is_connect()) || i > 2 { + if !(err.is_timeout() || err.is_connect()) || i >= max_retries - 1 { return Err(Error::Request(err.to_string())); } + let reason = if err.is_timeout() { + "request timeout" + } else if err.is_connect() { + "connection error" + } else { + "request error" + }; + info!( + "retry {}/{} for session token refresh due to: {} (error: {})", + i + 1, + max_retries, + reason, + err + ); } }; - sleep(jitter(Duration::from_secs(10))).await; + warn!("retrying session token refresh after {} seconds", retry_delay.as_secs()); + sleep(jitter(retry_delay)).await; } Ok(()) } @@ -1096,9 +1128,11 @@ impl APIClient { ) -> std::result::Result { let mut refreshed = false; let mut retries = 0; + let max_retries = self.retry_count; + let retry_delay = Duration::from_secs(self.retry_delay_secs); loop { let req = request.try_clone().expect("request not cloneable"); - let (err, retry): (Error, bool) = match self.cli.execute(req).await { + let decision = match self.cli.execute(req).await { Ok(response) => { let status = response.status(); if status == StatusCode::OK { @@ -1107,7 +1141,10 @@ impl APIClient { let body = response.bytes().await?; if retry_if_503 && status == StatusCode::SERVICE_UNAVAILABLE { // waiting for server to start - (Error::response_error(status, &body), true) + RetryDecision::retry_with_reason( + Error::response_error(status, &body), + "service unavailable (503), server may be starting", + ) } else { let resp = serde_json::from_slice::(&body); match resp { @@ -1116,91 +1153,116 @@ impl APIClient { if status == StatusCode::UNAUTHORIZED { request.headers_mut().remove(reqwest::header::AUTHORIZATION); if let Some(session_token_info) = &self.session_token_info { - info!( - "will retry 
{} after refresh token on auth error {}", - request.url(), - e - ); - let retry = if need_refresh_token(e.code) + if need_refresh_token(e.code) && !refreshed && refresh_if_401 { self.refresh_session_token(session_token_info.clone()) .await?; refreshed = true; - true + RetryDecision::retry_with_reason( + Error::AuthFailure(e), + "session token expired and refreshed", + ) } else { - false - }; - (Error::AuthFailure(e), retry) + RetryDecision::no_retry(Error::AuthFailure(e)) + } } else if self.auth.can_reload() { - info!( - "will retry {} after reload token on auth error {}", - request.url(), - e - ); let builder = RequestBuilder::from_parts( HttpClient::new(), request.try_clone().unwrap(), ); let builder = self.auth.wrap(builder)?; request = builder.build()?; - (Error::AuthFailure(e), true) + RetryDecision::retry_with_reason( + Error::AuthFailure(e), + "authentication token reloaded", + ) } else { - (Error::AuthFailure(e), false) + RetryDecision::no_retry(Error::AuthFailure(e)) } } else { - (Error::Logic(status, e), false) + RetryDecision::no_retry(Error::Logic(status, e)) } } - Err(_) => ( - Error::Response { - status, - msg: String::from_utf8_lossy(&body).to_string(), - }, - false, - ), + Err(_) => RetryDecision::no_retry(Error::Response { + status, + msg: String::from_utf8_lossy(&body).to_string(), + }), } } } - Err(err) => ( - Error::Request(err.to_string()), - err.is_timeout() || err.is_connect() || err.is_request(), - ), + Err(err) => { + let is_retryable = err.is_timeout() || err.is_connect() || err.is_request(); + if is_retryable { + let reason = if err.is_timeout() { + "request timeout" + } else if err.is_connect() { + "connection error" + } else { + "request error" + }; + RetryDecision::retry_with_reason( + Error::Request(err.to_string()), + reason, + ) + } else { + RetryDecision::no_retry(Error::Request(err.to_string())) + } + } }; - if !retry { - return Err(err.with_context(&format!("{} {}", request.method(), request.url()))); + if !decision.should_retry 
{ + return Err(decision.error.with_context(&format!("{} {}", request.method(), request.url()))); } - match &err { + match &decision.error { Error::AuthFailure(_) => { if refreshed { retries = 0; - } else if retries == 2 { - return Err(err.with_context(&format!( - "{} {} after 3 retries", + } else if retries >= max_retries - 1 { + return Err(decision.error.with_context(&format!( + "{} {} after {} retries", request.method(), - request.url() + request.url(), + max_retries ))); } } _ => { - if retries == 2 { - return Err(err.with_context(&format!( - "{} {} after 3 reties", + if retries >= max_retries - 1 { + return Err(decision.error.with_context(&format!( + "{} {} after {} retries", request.method(), - request.url() + request.url(), + max_retries ))); } retries += 1; - info!( - "will retry {} the {retries}th times on error {}", - request.url(), - err - ); + if let Some(reason) = decision.reason { + info!( + "retry {}/{} for {} due to: {} (error: {})", + retries, + max_retries, + request.url(), + reason, + decision.error + ); + } else { + info!( + "retry {}/{} for {} on error: {}", + retries, + max_retries, + request.url(), + decision.error + ); + } } } - warn!("will retry after 10 seconds"); - sleep(jitter(Duration::from_secs(10))).await; + if let Some(reason) = decision.reason { + warn!("retrying after {} seconds due to: {}", retry_delay.as_secs(), reason); + } else { + warn!("retrying after {} seconds", retry_delay.as_secs()); + } + sleep(jitter(retry_delay)).await; } } @@ -1301,6 +1363,8 @@ impl Default for APIClient { server_version: None, capability: Default::default(), queries_need_heartbeat: Default::default(), + retry_count: 3, + retry_delay_secs: 10, } } } diff --git a/core/src/lib.rs b/core/src/lib.rs index a54fbbd6..193bec95 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -23,6 +23,7 @@ mod pages; mod presign; mod request; mod response; +mod retry; mod capability; mod client_mgr; diff --git a/core/src/retry.rs b/core/src/retry.rs new file mode 100644 
index 00000000..6274e942 --- /dev/null +++ b/core/src/retry.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::Error; + +pub(crate) struct RetryDecision<'a> { + pub(crate) error: Error, + pub(crate) should_retry: bool, + pub(crate) reason: Option<&'a str>, +} + +impl<'a> RetryDecision<'a> { + pub(crate) fn no_retry(error: Error) -> Self { + Self { + error, + should_retry: false, + reason: None, + } + } + + pub(crate) fn retry_with_reason(error: Error, reason: &'a str) -> Self { + Self { + error, + should_retry: true, + reason: Some(reason), + } + } +} From 3aa79a9fe394ffde0dc8e81a7b19d3b685a5cc7a Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 15 Jan 2026 12:01:30 +0800 Subject: [PATCH 2/3] chore: apply cargo fmt Co-Authored-By: Claude Sonnet 4.5 --- core/src/client.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/core/src/client.rs b/core/src/client.rs index 15bbd894..5df378e7 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -1094,7 +1094,10 @@ impl APIClient { ); } }; - warn!("retrying session token refresh after {} seconds", retry_delay.as_secs()); + warn!( + "retrying session token refresh after {} seconds", + retry_delay.as_secs() + ); sleep(jitter(retry_delay)).await; } Ok(()) @@ -1202,17 +1205,18 @@ impl APIClient { } else { "request error" }; - RetryDecision::retry_with_reason( - Error::Request(err.to_string()), - reason, 
- ) + RetryDecision::retry_with_reason(Error::Request(err.to_string()), reason) } else { RetryDecision::no_retry(Error::Request(err.to_string())) } } }; if !decision.should_retry { - return Err(decision.error.with_context(&format!("{} {}", request.method(), request.url()))); + return Err(decision.error.with_context(&format!( + "{} {}", + request.method(), + request.url() + ))); } match &decision.error { Error::AuthFailure(_) => { @@ -1258,7 +1262,11 @@ impl APIClient { } } if let Some(reason) = decision.reason { - warn!("retrying after {} seconds due to: {}", retry_delay.as_secs(), reason); + warn!( + "retrying after {} seconds due to: {}", + retry_delay.as_secs(), + reason + ); } else { warn!("retrying after {} seconds", retry_delay.as_secs()); } From 7194ae043d55bf3ea3295382f4a392b04a406f33 Mon Sep 17 00:00:00 2001 From: everpcpc Date: Thu, 15 Jan 2026 12:17:22 +0800 Subject: [PATCH 3/3] fix: prevent underflow when retry_count=0 When retry_count=0, the expression `max_retries - 1` would underflow: - In debug builds: panic on subtraction - In release builds: wrap to u32::MAX, causing infinite retry loops This fix uses saturating_sub(1), which returns 0 when max_retries=0, so with retry_count=0 every `>= max_retries.saturating_sub(1)` check that is reached passes on the first failure and the request fails immediately with no retries instead of underflowing. Note that refresh_session_token iterates over `0..max_retries`, so with retry_count=0 its loop body never runs and it returns Ok(()) without sending a refresh request.
Fixes applied in: - refresh_session_token: two comparisons with max_retries - 1 - query_request_helper: two comparisons with max_retries - 1 Co-Authored-By: Claude Sonnet 4.5 --- core/src/client.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/client.rs b/core/src/client.rs index 5df378e7..5e111e24 100644 --- a/core/src/client.rs +++ b/core/src/client.rs @@ -1065,7 +1065,9 @@ impl APIClient { } }; } - if status != StatusCode::SERVICE_UNAVAILABLE || i >= max_retries - 1 { + if status != StatusCode::SERVICE_UNAVAILABLE + || i >= max_retries.saturating_sub(1) + { return Err(Error::response_error(status, &body)); } info!( @@ -1075,7 +1077,8 @@ impl APIClient { ); } Err(err) => { - if !(err.is_timeout() || err.is_connect()) || i >= max_retries - 1 { + if !(err.is_timeout() || err.is_connect()) || i >= max_retries.saturating_sub(1) + { return Err(Error::Request(err.to_string())); } let reason = if err.is_timeout() { @@ -1222,7 +1225,7 @@ impl APIClient { Error::AuthFailure(_) => { if refreshed { retries = 0; - } else if retries >= max_retries - 1 { + } else if retries >= max_retries.saturating_sub(1) { return Err(decision.error.with_context(&format!( "{} {} after {} retries", request.method(), @@ -1232,7 +1235,7 @@ impl APIClient { } } _ => { - if retries >= max_retries - 1 { + if retries >= max_retries.saturating_sub(1) { return Err(decision.error.with_context(&format!( "{} {} after {} retries", request.method(),