From 067c35bd045d5c0c15586e1a8cb3006ffb38a078 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 24 Mar 2026 11:18:13 -0400 Subject: [PATCH 1/2] Use separate tokio runtime for IO in datafusion-cli --- Cargo.lock | 2 + datafusion-cli/Cargo.toml | 2 + datafusion-cli/src/main.rs | 2 + datafusion-cli/src/object_storage.rs | 369 ++++++++++++++++++++++++++- 4 files changed, 361 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c06c7856e30c..7ba66224fb269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1888,6 +1888,7 @@ dependencies = [ "async-trait", "aws-config", "aws-credential-types", + "bytes", "chrono", "clap", "ctor", @@ -1906,6 +1907,7 @@ dependencies = [ "regex", "rstest", "rustyline", + "tempfile", "testcontainers-modules", "tokio", "url", diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 3fe6be964c3f6..d0d7c8cc7eb26 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -39,6 +39,7 @@ arrow = { workspace = true } async-trait = { workspace = true } aws-config = "1.8.14" aws-credential-types = "1.2.13" +bytes = { workspace = true } chrono = { workspace = true } clap = { version = "4.5.60", features = ["cargo", "derive"] } datafusion = { workspace = true, features = [ @@ -77,4 +78,5 @@ ctor = { workspace = true } insta = { workspace = true } insta-cmd = "0.6.0" rstest = { workspace = true } +tempfile = { workspace = true } testcontainers-modules = { workspace = true, features = ["minio"] } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6bfe1160ecdd6..dd2df145a1167 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -34,6 +34,7 @@ use datafusion_cli::catalog::DynamicObjectStoreCatalog; use datafusion_cli::functions::{ ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc, }; +use datafusion_cli::object_storage::init_object_store_io_runtime; use datafusion_cli::object_storage::instrumented::{ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, }; @@ -172,6 +173,7 @@ pub async fn main() -> ExitCode { /// Main CLI entrypoint async fn main_inner() -> Result<()> { env_logger::init(); + init_object_store_io_runtime(); let args = Args::parse(); if !args.quiet { diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 34787838929f1..8d920360c38fe 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -22,6 +22,7 @@ use aws_config::BehaviorVersion; use aws_credential_types::provider::{ ProvideCredentials, SharedCredentialsProvider, error::CredentialsError, }; +use bytes::Bytes; use datafusion::{ common::{ config::ConfigEntry, config::ConfigExtension, config::ConfigField, @@ -31,20 +32,33 @@ use datafusion::{ error::{DataFusionError, Result}, execution::context::SessionState, }; +use futures::{ + FutureExt, StreamExt, + stream::{self, BoxStream}, +}; use log::debug; use object_store::{ ClientOptions, CredentialProvider, Error::Generic, - ObjectStore, + GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart, aws::{AmazonS3Builder, AmazonS3ConfigKey, AwsCredential}, + client::SpawnedReqwestConnector, gcp::GoogleCloudStorageBuilder, http::HttpBuilder, + path::Path, }; use std::{ any::Any, error::Error, fmt::{Debug, Display}, - sync::Arc, + future::Future, + ops::Range, + sync::{Arc, LazyLock}, +}; +use tokio::{ + runtime::{Builder as RuntimeBuilder, Handle, Runtime}, + sync::{Mutex, mpsc}, }; use url::Url; @@ -60,6 +74,305 @@ async fn resolve_bucket_region( Ok("eu-central-1".to_string()) } +const OBJECT_STORE_IO_THREADS: usize = 4; +const OBJECT_STORE_CHANNEL_CAPACITY: usize = 16; + +static OBJECT_STORE_IO_RUNTIME: LazyLock = LazyLock::new(|| { + RuntimeBuilder::new_multi_thread() + .worker_threads(OBJECT_STORE_IO_THREADS) + .thread_name("df-cli-object-store-io") + .enable_all() + .build() + .expect("object store IO runtime should initialize") +}); + +pub fn init_object_store_io_runtime() { + let _ = &*OBJECT_STORE_IO_RUNTIME; +} + +fn object_store_io_handle() -> Handle { + OBJECT_STORE_IO_RUNTIME.handle().clone() +} + +fn spawned_reqwest_connector() -> SpawnedReqwestConnector { + SpawnedReqwestConnector::new(object_store_io_handle()) +} + +fn join_error(err: tokio::task::JoinError) -> object_store::Error { + Generic { + store: "TokioRuntime", + source: Box::new(err), + } + .into() +} + +async fn spawn_on_io_runtime(future: F) -> object_store::Result +where + F: Future> + Send + 'static, + T: Send + 'static, +{ + object_store_io_handle() + .spawn(future) + .await + .map_err(join_error)? +} + +fn forward_stream_on_io_runtime( + build_stream: F, +) -> BoxStream<'static, object_store::Result> +where + T: Send + 'static, + F: FnOnce() -> BoxStream<'static, object_store::Result> + Send + 'static, +{ + let (tx, rx) = mpsc::channel(OBJECT_STORE_CHANNEL_CAPACITY); + object_store_io_handle().spawn(async move { + let mut stream = build_stream(); + while let Some(item) = stream.next().await { + if tx.send(item).await.is_err() { + break; + } + } + }); + + stream::unfold(rx, |mut rx| async move { + rx.recv().await.map(|item| (item, rx)) + }) + .boxed() +} + +fn normalize_get_result(result: GetResult) -> GetResult { + let GetResult { + payload, + meta, + range, + attributes, + } = result; + + let payload = match payload { + #[cfg(not(target_arch = "wasm32"))] + GetResultPayload::File(file, path) => { + let meta_for_stream = meta.clone(); + let range_for_stream = range.clone(); + let attributes_for_stream = attributes.clone(); + GetResultPayload::Stream(forward_stream_on_io_runtime(move || { + GetResult { + payload: GetResultPayload::File(file, path), + meta: meta_for_stream, + range: range_for_stream, + attributes: attributes_for_stream, + } + .into_stream() + })) + } + GetResultPayload::Stream(stream) => { + GetResultPayload::Stream(forward_stream_on_io_runtime(move || stream)) + } + }; + + GetResult { + payload, + meta, + range, + attributes, + } +} + +#[derive(Debug)] +struct IoMultipartUpload { + inner: Arc>>, + handle: Handle, +} + +#[async_trait] +impl MultipartUpload for IoMultipartUpload { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let inner = Arc::clone(&self.inner); + let handle = self.handle.clone(); + async move { + let future = { + let mut upload = inner.lock().await; + upload.put_part(data) + }; + handle.spawn(future).await.map_err(join_error)? + } + .boxed() + } + + async fn complete(&mut self) -> object_store::Result { + let inner = Arc::clone(&self.inner); + self.handle + .spawn(async move { + let mut upload = inner.lock().await; + upload.complete().await + }) + .await + .map_err(join_error)? + } + + async fn abort(&mut self) -> object_store::Result<()> { + let inner = Arc::clone(&self.inner); + self.handle + .spawn(async move { + let mut upload = inner.lock().await; + upload.abort().await + }) + .await + .map_err(join_error)? + } +} + +#[derive(Debug)] +struct IoObjectStore { + inner: Arc, + handle: Handle, +} + +impl IoObjectStore { + fn new(inner: Arc) -> Self { + Self { + inner, + handle: object_store_io_handle(), + } + } +} + +impl Display for IoObjectStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Display::fmt(&self.inner, f) + } +} + +#[async_trait] +#[deny(clippy::missing_trait_methods)] +impl ObjectStore for IoObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> object_store::Result { + let inner = Arc::clone(&self.inner); + let location = location.clone(); + spawn_on_io_runtime(async move { inner.put_opts(&location, payload, opts).await }) + .await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> object_store::Result> { + let inner = Arc::clone(&self.inner); + let location = location.clone(); + let handle = self.handle.clone(); + let upload = spawn_on_io_runtime(async move { + inner.put_multipart_opts(&location, opts).await + }) + .await?; + Ok(Box::new(IoMultipartUpload { + inner: Arc::new(Mutex::new(upload)), + handle, + })) + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + let inner = Arc::clone(&self.inner); + let location = location.clone(); + let result = + spawn_on_io_runtime(async move { inner.get_opts(&location, options).await }) + .await?; + Ok(normalize_get_result(result)) + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> object_store::Result> { + let inner = Arc::clone(&self.inner); + let location = location.clone(); + let ranges = ranges.to_vec(); + spawn_on_io_runtime(async move { inner.get_ranges(&location, &ranges).await }) + .await + } + + fn delete_stream( + &self, + locations: BoxStream<'static, object_store::Result>, + ) -> BoxStream<'static, object_store::Result> { + let inner = Arc::clone(&self.inner); + forward_stream_on_io_runtime(move || inner.delete_stream(locations)) + } + + fn list( + &self, + prefix: Option<&Path>, + ) -> BoxStream<'static, object_store::Result> { + let inner = Arc::clone(&self.inner); + let prefix = prefix.cloned(); + forward_stream_on_io_runtime(move || inner.list(prefix.as_ref())) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, object_store::Result> { + let inner = Arc::clone(&self.inner); + let prefix = prefix.cloned(); + let offset = offset.clone(); + forward_stream_on_io_runtime(move || { + inner.list_with_offset(prefix.as_ref(), &offset) + }) + } + + async fn list_with_delimiter( + &self, + prefix: Option<&Path>, + ) -> object_store::Result { + let inner = Arc::clone(&self.inner); + let prefix = prefix.cloned(); + spawn_on_io_runtime( + async move { inner.list_with_delimiter(prefix.as_ref()).await }, + ) + .await + } + + async fn copy_opts( + &self, + from: &Path, + to: &Path, + options: object_store::CopyOptions, + ) -> object_store::Result<()> { + let inner = Arc::clone(&self.inner); + let from = from.clone(); + let to = to.clone(); + spawn_on_io_runtime(async move { inner.copy_opts(&from, &to, options).await }) + .await + } + + async fn rename_opts( + &self, + from: &Path, + to: &Path, + options: object_store::RenameOptions, + ) -> object_store::Result<()> { + let inner = Arc::clone(&self.inner); + let from = from.clone(); + let to = to.clone(); + spawn_on_io_runtime(async move { inner.rename_opts(&from, &to, options).await }) + .await + } +} + +fn wrap_object_store(store: Arc) -> Arc { + Arc::new(IoObjectStore::new(store)) +} + pub async fn get_s3_object_store_builder( url: &Url, aws_options: &AwsOptions, @@ -161,6 +474,8 @@ async fn get_s3_object_store_builder_inner( builder = builder.with_skip_signature(*skip_signature); } + builder = builder.with_http_connector(spawned_reqwest_connector()); + Ok(builder) } @@ -286,6 +601,8 @@ fn get_object_store_builder( builder = builder.with_endpoint(endpoint); } + builder = builder.with_http_connector(spawned_reqwest_connector()); + Ok(builder) } @@ -308,6 +625,8 @@ pub fn get_gcs_object_store_builder( builder = builder.with_application_credentials(application_credentials_path); } + builder = builder.with_http_connector(spawned_reqwest_connector()); + Ok(builder) } @@ -513,6 +832,7 @@ impl ConfigExtension for GcpOptions { const PREFIX: &'static str = "gcp"; } +/// Given a URL scheme, URL, and table_options, return the appropriate [`ObjectStore`] instance pub(crate) async fn get_object_store( state: &SessionState, scheme: &str, @@ -520,7 +840,7 @@ pub(crate) async fn get_object_store( table_options: &TableOptions, resolve_region: bool, ) -> Result, DataFusionError> { - let store: Arc = match scheme { + match scheme { "s3" => { let Some(options) = table_options.extensions.get::() else { return exec_err!( @@ -529,7 +849,7 @@ pub(crate) async fn get_object_store( }; let builder = get_s3_object_store_builder(url, options, resolve_region).await?; - Arc::new(builder.build()?) + Ok(Arc::new(builder.build()?)) } "oss" => { let Some(options) = table_options.extensions.get::() else { @@ -538,7 +858,7 @@ pub(crate) async fn get_object_store( ); }; let builder = get_oss_object_store_builder(url, options)?; - Arc::new(builder.build()?) + Ok(Arc::new(builder.build()?)) } "cos" => { let Some(options) = table_options.extensions.get::() else { @@ -547,7 +867,7 @@ pub(crate) async fn get_object_store( ); }; let builder = get_cos_object_store_builder(url, options)?; - Arc::new(builder.build()?) + Ok(Arc::new(builder.build()?)) } "gs" | "gcs" => { let Some(options) = table_options.extensions.get::() else { @@ -556,26 +876,27 @@ pub(crate) async fn get_object_store( ); }; let builder = get_gcs_object_store_builder(url, options)?; - Arc::new(builder.build()?) + Ok(Arc::new(builder.build()?)) } - "http" | "https" => Arc::new( + "http" | "https" => Ok(Arc::new( HttpBuilder::new() .with_client_options(ClientOptions::new().with_allow_http(true)) + .with_http_connector(spawned_reqwest_connector()) .with_url(url.origin().ascii_serialization()) .build()?, - ), + )), _ => { // For other types, try to get from `object_store_registry`: - state + let store = state .runtime_env() .object_store_registry .get_store(url) .map_err(|_| { exec_datafusion_err!("Unsupported object store scheme: {}", scheme) - })? + })?; + Ok(wrap_object_store(store)) } - }; - Ok(store) + } } #[cfg(test)] @@ -590,7 +911,10 @@ mod tests { prelude::SessionContext, }; - use object_store::{aws::AmazonS3ConfigKey, gcp::GoogleConfigKey}; + use object_store::{ + ObjectStoreExt, aws::AmazonS3ConfigKey, gcp::GoogleConfigKey, + local::LocalFileSystem, + }; #[tokio::test] async fn s3_object_store_builder_default() -> Result<()> { @@ -660,6 +984,23 @@ mod tests { Ok(()) } + #[tokio::test] + async fn io_object_store_wraps_local_get_result_as_stream() -> Result<()> { + let temp_dir = tempfile::tempdir().unwrap(); + let store = wrap_object_store(Arc::new( + LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap(), + )); + let location = Path::from("test.txt"); + + store.put(&location, "hello".into()).await?; + let result = store.get_opts(&location, GetOptions::default()).await?; + + assert!(matches!(&result.payload, GetResultPayload::Stream(_))); + assert_eq!(result.bytes().await?, Bytes::from_static(b"hello")); + + Ok(()) + } + #[tokio::test] async fn s3_object_store_builder() -> Result<()> { // "fake" is uppercase to ensure the values are not lowercased when parsed From 2808312c44de91fe7d75e293cd3237733b13caeb Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 24 Mar 2026 11:21:49 -0400 Subject: [PATCH 2/2] clippy --- datafusion-cli/src/object_storage.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs index 8d920360c38fe..8ca9901a1cba3 100644 --- a/datafusion-cli/src/object_storage.rs +++ b/datafusion-cli/src/object_storage.rs @@ -103,7 +103,6 @@ fn join_error(err: tokio::task::JoinError) -> object_store::Error { store: "TokioRuntime", source: Box::new(err), } - .into() } async fn spawn_on_io_runtime(future: F) -> object_store::Result