From 97bb07682855a6c6a8ca40d9db26c2b9353ac5b0 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 17 Mar 2026 15:39:15 -0700 Subject: [PATCH 1/5] Add test suite --- Cargo.lock | 129 +++++- crates/storage/common/Cargo.toml | 47 +++ crates/storage/common/src/lib.rs | 16 + crates/storage/common/tests/common/mod.rs | 171 ++++++++ .../storage/common/tests/credential_suite.rs | 142 +++++++ crates/storage/common/tests/file_io_suite.rs | 349 +++++++++++++++++ .../storage/common/tests/resolving_suite.rs | 368 ++++++++++++++++++ 7 files changed, 1205 insertions(+), 17 deletions(-) create mode 100644 crates/storage/common/Cargo.toml create mode 100644 crates/storage/common/src/lib.rs create mode 100644 crates/storage/common/tests/common/mod.rs create mode 100644 crates/storage/common/tests/credential_suite.rs create mode 100644 crates/storage/common/tests/file_io_suite.rs create mode 100644 crates/storage/common/tests/resolving_suite.rs diff --git a/Cargo.lock b/Cargo.lock index 39812b010f..06b8c4dd14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -109,7 +109,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -120,7 +120,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -968,6 +968,21 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" +[[package]] +name = "bit-set" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3" +dependencies = [ + "bit-vec", +] + +[[package]] +name = "bit-vec" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" + [[package]] name = "bitflags" version = "2.11.0" @@ -1044,7 +1059,7 @@ version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d314cc62af2b6b0c65780555abb4d02a03dd3b799cd42419044f0c38d99738c0" dependencies = [ - "darling 0.20.11", + "darling 0.23.0", "ident_case", "prettyplease", "proc-macro2", @@ -1257,7 +1272,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -2418,7 +2433,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2571,7 +2586,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3248,7 +3263,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.2", "tokio", "tower-service", "tracing", @@ -3540,6 +3555,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "iceberg-storage-common" +version = "0.9.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "futures", + "iceberg", + "iceberg-storage-opendal", + "iceberg_test_utils", + "proptest", + "reqsign", + "reqwest", + "rstest", + "tempfile", + "tokio", +] + [[package]] name = "iceberg-storage-opendal" version = "0.9.0" @@ -3811,7 +3845,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4338,7 +4372,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4958,6 +4992,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proptest" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" +dependencies = [ + "bit-set", + "bit-vec", + "bitflags", + "num-traits", + "rand 0.9.2", + "rand_chacha 0.9.0", + "rand_xorshift", + "regex-syntax", + "rusty-fork", + "tempfile", + "unarray", +] + [[package]] name = "prost" version = "0.14.3" @@ -4975,7 +5028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.13.0", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -4994,7 +5047,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn", @@ -5045,6 +5098,12 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a651516ddc9168ebd67b24afd085a718be02f8858fe406591b013d101ce2f40" +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + [[package]] name = "quick-xml" version = "0.37.5" @@ -5078,7 +5137,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.5.10", + "socket2 0.6.2", "thiserror 2.0.18", "tokio", "tracing", @@ -5115,7 +5174,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.2", "tracing", "windows-sys 0.60.2", ] @@ -5215,6 +5274,15 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_xorshift" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a" +dependencies = [ + "rand_core 0.9.5", +] + [[package]] name = "recursive" version = "0.1.1" @@ -5557,7 +5625,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5646,6 +5714,18 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rusty-fork" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2" +dependencies = [ + "fnv", + "quick-error", + "tempfile", + "wait-timeout", +] + [[package]] name = "rustyline" version = "17.0.2" @@ -6543,10 +6623,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7041,6 +7121,12 @@ dependencies = [ "typify-impl", ] +[[package]] +name = "unarray" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -7232,6 +7318,15 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "wait-timeout" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ac3b126d3914f9849036f826e054cbabdc8519970b8998ddaf3b5bd3c65f11" +dependencies = [ + "libc", +] + [[package]] name = "walkdir" version = "2.5.0" @@ -7441,7 +7536,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/crates/storage/common/Cargo.toml b/crates/storage/common/Cargo.toml new file mode 100644 index 0000000000..5019f1fb03 --- /dev/null +++ b/crates/storage/common/Cargo.toml @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-storage-common" +edition = { workspace = true } +version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } +homepage = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg Storage Common Test Suite" +keywords = ["iceberg", "storage"] + +[dependencies] +iceberg = { workspace = true } + +[dev-dependencies] +anyhow = { workspace = true } +iceberg = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["opendal-all"] } +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +rstest = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +bytes = { workspace = true } +futures = { workspace = true } +async-trait = { workspace = true } +reqsign = { version = "0.16.3", default-features = false } +reqwest = { workspace = true } +tempfile = { workspace = true } +proptest = { version = "1" } diff --git a/crates/storage/common/src/lib.rs b/crates/storage/common/src/lib.rs new file mode 100644 index 0000000000..b248758bc1 --- /dev/null +++ b/crates/storage/common/src/lib.rs @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. diff --git a/crates/storage/common/tests/common/mod.rs b/crates/storage/common/tests/common/mod.rs new file mode 100644 index 0000000000..dbcb7cd28b --- /dev/null +++ b/crates/storage/common/tests/common/mod.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared helpers for storage integration suites. + +#![allow(dead_code)] + +use std::collections::HashMap; +use std::sync::Arc; + +use iceberg::io::{ + FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, S3_ACCESS_KEY_ID, S3_ENDPOINT, + S3_REGION, S3_SECRET_ACCESS_KEY, +}; +use iceberg_storage_opendal::{OpenDalResolvingStorageFactory, OpenDalStorageFactory}; +use iceberg_test_utils::{get_gcs_endpoint, get_minio_endpoint, set_up}; +use tempfile::TempDir; +use tokio::time::sleep; + +static FAKE_GCS_BUCKET: &str = "test-bucket"; + +#[derive(Debug, Clone, Copy)] +pub enum StorageKind { + OpenDalS3, + OpenDalGcs, + OpenDalResolving, +} + +pub struct StorageHarness { + pub file_io: FileIO, + pub label: &'static str, + pub base_path: String, + pub _tempdirs: Vec, +} + +/// Creates a `FileIO` for the given backend. Returns `None` when infrastructure +/// is unavailable (e.g., MinIO Docker container not running). +pub async fn load_storage(kind: StorageKind) -> Option { + set_up(); + match kind { + StorageKind::OpenDalS3 => load_opendal_s3().await, + StorageKind::OpenDalGcs => load_opendal_gcs().await, + StorageKind::OpenDalResolving => load_opendal_resolving().await, + } +} + +async fn load_opendal_s3() -> Option { + let minio_endpoint = get_minio_endpoint(); + + let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load: None, + })) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_ACCESS_KEY_ID, "admin".to_string()), + (S3_SECRET_ACCESS_KEY, "password".to_string()), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + // Poll until MinIO is ready + let mut retries = 0; + while retries < 30 { + if file_io.exists("s3://bucket1/").await.unwrap_or(false) { + return Some(StorageHarness { + file_io, + label: "opendal_s3", + base_path: "s3://bucket1/".to_string(), + _tempdirs: Vec::new(), + }); + } + sleep(std::time::Duration::from_secs(1)).await; + retries += 1; + } + + None +} + +async fn load_opendal_gcs() -> Option { + let gcs_endpoint = get_gcs_endpoint(); + + // Create the test bucket via HTTP + let mut bucket_data = HashMap::new(); + bucket_data.insert("name", FAKE_GCS_BUCKET); + + let client = reqwest::Client::new(); + let endpoint = format!("{gcs_endpoint}/storage/v1/b"); + if client.post(&endpoint).json(&bucket_data).send().await.is_err() { + return None; + } + + let file_io = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::Gcs)) + .with_props(vec![ + (GCS_SERVICE_PATH, gcs_endpoint), + (GCS_NO_AUTH, "true".to_string()), + ]) + .build(); + + // Poll until GCS emulator is ready + let base_path = format!("gs://{FAKE_GCS_BUCKET}/"); + let mut retries = 0; + while retries < 30 { + if file_io.exists(&base_path).await.unwrap_or(false) { + return Some(StorageHarness { + file_io, + label: "opendal_gcs", + base_path, + _tempdirs: Vec::new(), + }); + } + sleep(std::time::Duration::from_secs(1)).await; + retries += 1; + } + + None +} + +async fn load_opendal_resolving() -> Option { + let minio_endpoint = get_minio_endpoint(); + + let file_io = FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_ACCESS_KEY_ID, "admin".to_string()), + (S3_SECRET_ACCESS_KEY, "password".to_string()), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + // Poll until MinIO is ready + let mut retries = 0; + while retries < 30 { + if file_io.exists("s3://bucket1/").await.unwrap_or(false) { + return Some(StorageHarness { + file_io, + label: "opendal_resolving", + base_path: "s3://bucket1/".to_string(), + _tempdirs: Vec::new(), + }); + } + sleep(std::time::Duration::from_secs(1)).await; + retries += 1; + } + + None +} + +/// Generates a unique file path under `harness.base_path` to avoid conflicts +/// between concurrent test runs. +pub fn unique_path(harness: &StorageHarness, test_name: &str) -> String { + format!( + "{}{}", + harness.base_path, + iceberg_test_utils::normalize_test_name(test_name) + ) +} diff --git a/crates/storage/common/tests/credential_suite.rs b/crates/storage/common/tests/credential_suite.rs new file mode 100644 index 0000000000..1a1ebbb2cf --- /dev/null +++ b/crates/storage/common/tests/credential_suite.rs @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Custom AWS credential loader tests (OpenDAL S3 only). + +mod common; + +use std::sync::Arc; + +use async_trait::async_trait; +use common::{load_storage, StorageKind}; +use iceberg::io::{FileIOBuilder, S3_ENDPOINT, S3_REGION}; +use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory}; +use iceberg_test_utils::get_minio_endpoint; +use reqsign::{AwsCredential, AwsCredentialLoad}; +use reqwest::Client; +use rstest::rstest; + +/// Mock credential loader for testing custom AWS credential injection. +struct MockCredentialLoader { + credential: Option, +} + +impl MockCredentialLoader { + fn new(credential: Option) -> Self { + Self { credential } + } + + fn new_minio() -> Self { + Self::new(Some(AwsCredential { + access_key_id: "admin".to_string(), + secret_access_key: "password".to_string(), + session_token: None, + expires_in: None, + })) + } +} + +#[async_trait] +impl AwsCredentialLoad for MockCredentialLoader { + async fn load_credential(&self, _client: Client) -> anyhow::Result> { + Ok(self.credential.clone()) + } +} + +#[test] +fn test_custom_aws_credential_loader_instantiation() { + let mock_loader = MockCredentialLoader::new_minio(); + let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); + + let _builder = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load: Some(custom_loader), + })) + .with_props(vec![ + (S3_ENDPOINT, "http://localhost:9000".to_string()), + ("bucket", "test-bucket".to_string()), + (S3_REGION, "us-east-1".to_string()), + ]); +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[tokio::test] +async fn test_s3_with_custom_credential_loader_success( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(_harness) = load_storage(kind).await else { + return Ok(()); + }; + + let mock_loader = MockCredentialLoader::new_minio(); + let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); + let minio_endpoint = get_minio_endpoint(); + + let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load: Some(custom_loader), + })) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + match file_io_with_custom_creds.exists("s3://bucket1/any").await { + Ok(_) => {} + Err(e) => panic!("Failed to check existence of bucket: {e}"), + } + + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[tokio::test] +async fn test_s3_with_custom_credential_loader_failure( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(_harness) = load_storage(kind).await else { + return Ok(()); + }; + + let mock_loader = MockCredentialLoader::new(None); + let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); + let minio_endpoint = get_minio_endpoint(); + + let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { + configured_scheme: "s3".to_string(), + customized_credential_load: Some(custom_loader), + })) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + match file_io_with_custom_creds.exists("s3://bucket1/any").await { + Ok(_) => panic!("Expected error, but got Ok"), + Err(e) => { + assert!(e + .to_string() + .contains("no valid credential found and anonymous access is not allowed")); + } + } + + Ok(()) +} diff --git a/crates/storage/common/tests/file_io_suite.rs b/crates/storage/common/tests/file_io_suite.rs new file mode 100644 index 0000000000..52c8248221 --- /dev/null +++ b/crates/storage/common/tests/file_io_suite.rs @@ -0,0 +1,349 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared FileIO integration tests parameterized over storage backends. + +mod common; + +use std::sync::Arc; + +use common::{load_storage, unique_path, StorageKind}; +use futures::StreamExt; +use iceberg::io::{FileIOBuilder, LocalFsStorageFactory}; +use rstest::rstest; + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_exists(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + // Non-existent path should return false + let non_existent = unique_path(&harness, "non_existent_file_that_does_not_exist"); + assert!(!harness.file_io.exists(&non_existent).await.unwrap()); + // Bucket root should return true + assert!(harness.file_io.exists(&harness.base_path).await.unwrap()); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_write(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_write"); + // Clean up from any previous test runs + let _ = harness.file_io.delete(&path).await; + assert!(!harness.file_io.exists(&path).await.unwrap()); + let output_file = harness.file_io.new_output(&path).unwrap(); + output_file.write("123".into()).await.unwrap(); + assert!(harness.file_io.exists(&path).await.unwrap()); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_read(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_read"); + let output_file = harness.file_io.new_output(&path).unwrap(); + output_file.write("test_input".into()).await.unwrap(); + let input_file = harness.file_io.new_input(&path).unwrap(); + let buffer = input_file.read().await.unwrap(); + assert_eq!(buffer, "test_input".as_bytes()); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_delete"); + // Write a file + harness + .file_io + .new_output(&path) + .unwrap() + .write("delete_me".into()) + .await + .unwrap(); + assert!(harness.file_io.exists(&path).await.unwrap()); + // Delete it + harness.file_io.delete(&path).await.unwrap(); + // Verify it's gone + assert!(!harness.file_io.exists(&path).await.unwrap()); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete_nonexistent(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_delete_nonexistent"); + // Delete a non-existent path should succeed (no-op) + harness.file_io.delete(&path).await.unwrap(); + Ok(()) +} + + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete_stream(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let base = unique_path(&harness, "test_file_io_delete_stream"); + let paths: Vec = (0..5).map(|i| format!("{base}/file-{i}")).collect(); + for path in &paths { + let _ = harness.file_io.delete(path).await; + harness + .file_io + .new_output(path) + .unwrap() + .write("delete-me".into()) + .await + .unwrap(); + assert!(harness.file_io.exists(path).await.unwrap()); + } + let stream = futures::stream::iter(paths.clone()).boxed(); + harness.file_io.delete_stream(stream).await.unwrap(); + for path in &paths { + assert!(!harness.file_io.exists(path).await.unwrap()); + } + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete_stream_empty(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let stream = futures::stream::empty().boxed(); + harness.file_io.delete_stream(stream).await.unwrap(); + Ok(()) +} + + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete_prefix(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let prefix = unique_path(&harness, "test_file_io_delete_prefix"); + // Write files under the prefix + let paths: Vec = (0..3).map(|i| format!("{prefix}/file-{i}")).collect(); + for path in &paths { + harness + .file_io + .new_output(path) + .unwrap() + .write("data".into()) + .await + .unwrap(); + assert!(harness.file_io.exists(path).await.unwrap()); + } + // Delete the prefix + harness.file_io.delete_prefix(&prefix).await.unwrap(); + // Verify all files are gone + for path in &paths { + assert!(!harness.file_io.exists(path).await.unwrap()); + } + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_delete_prefix_nonexistent(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let prefix = unique_path(&harness, "test_file_io_delete_prefix_nonexistent"); + // Delete a non-existent prefix should succeed (no-op) + harness.file_io.delete_prefix(&prefix).await.unwrap(); + Ok(()) +} + + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_metadata(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_metadata"); + let content = "metadata_test_content"; + harness + .file_io + .new_output(&path) + .unwrap() + .write(content.into()) + .await + .unwrap(); + let input_file = harness.file_io.new_input(&path).unwrap(); + let metadata = input_file.metadata().await.unwrap(); + assert_eq!(metadata.size, content.len() as u64); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_metadata_nonexistent(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_metadata_nonexistent"); + let input_file = harness.file_io.new_input(&path).unwrap(); + let result = input_file.metadata().await; + assert!(result.is_err()); + Ok(()) +} + + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_range_read(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_range_read"); + let content = b"0123456789abcdef"; + harness + .file_io + .new_output(&path) + .unwrap() + .write(bytes::Bytes::from_static(content)) + .await + .unwrap(); + let input_file = harness.file_io.new_input(&path).unwrap(); + let reader = input_file.reader().await.unwrap(); + // Read a range in the middle + let range_data = reader.read(4..10).await.unwrap(); + assert_eq!(range_data.as_ref(), &content[4..10]); + Ok(()) +} + + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_streaming_write(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_streaming_write"); + let output_file = harness.file_io.new_output(&path).unwrap(); + let mut writer = output_file.writer().await.unwrap(); + writer + .write(bytes::Bytes::from("streaming_content")) + .await + .unwrap(); + writer.close().await.unwrap(); + // Read back and verify + let input_file = harness.file_io.new_input(&path).unwrap(); + let buffer = input_file.read().await.unwrap(); + assert_eq!(buffer, bytes::Bytes::from("streaming_content")); + Ok(()) +} + +#[rstest] +#[case::opendal_s3(StorageKind::OpenDalS3)] +#[case::opendal_gcs(StorageKind::OpenDalGcs)] +#[tokio::test] +async fn test_file_io_streaming_write_double_close( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + let path = unique_path(&harness, "test_file_io_streaming_write_double_close"); + let output_file = harness.file_io.new_output(&path).unwrap(); + let mut writer = output_file.writer().await.unwrap(); + writer + .write(bytes::Bytes::from("data")) + .await + .unwrap(); + writer.close().await.unwrap(); + // Second close should error + let result = writer.close().await; + assert!(result.is_err()); + Ok(()) +} + + +#[tokio::test] +async fn test_file_io_builder_with_prop() { + let builder = + FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).with_prop("key1", "value1"); + assert_eq!(builder.config().get("key1"), Some(&"value1".to_string())); +} + +#[tokio::test] +async fn test_file_io_builder_with_props() { + let builder = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).with_props(vec![ + ("key1", "value1"), + ("key2", "value2"), + ("key3", "value3"), + ]); + assert_eq!(builder.config().get("key1"), Some(&"value1".to_string())); + assert_eq!(builder.config().get("key2"), Some(&"value2".to_string())); + assert_eq!(builder.config().get("key3"), Some(&"value3".to_string())); +} + +#[tokio::test] +async fn test_file_io_builder_build_returns_file_io() { + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)) + .with_prop("some_key", "some_value") + .build(); + assert_eq!( + file_io.config().get("some_key"), + Some(&"some_value".to_string()) + ); +} diff --git a/crates/storage/common/tests/resolving_suite.rs b/crates/storage/common/tests/resolving_suite.rs new file mode 100644 index 0000000000..b16cae7358 --- /dev/null +++ b/crates/storage/common/tests/resolving_suite.rs @@ -0,0 +1,368 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! OpenDAL resolving storage tests (OpenDAL resolving backend only). + +mod common; + +use std::sync::Arc; + +use async_trait::async_trait; +use common::{load_storage, unique_path, StorageKind}; +use iceberg::io::{FileIOBuilder, S3_ENDPOINT, S3_REGION}; +use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalResolvingStorageFactory}; +use iceberg_test_utils::{get_minio_endpoint, set_up}; +use reqsign::{AwsCredential, AwsCredentialLoad}; +use reqwest::Client; +use rstest::rstest; + +fn temp_fs_path(name: &str) -> String { + let dir = std::env::temp_dir().join("iceberg_resolving_tests"); + std::fs::create_dir_all(&dir).unwrap(); + let path = dir.join(name); + // Clean up from previous runs + let _ = std::fs::remove_file(&path); + format!("file:/{}", path.display()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_mixed_scheme_write_and_read(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let s3_path = unique_path(&harness, "test_mixed_scheme_write_and_read"); + let fs_path = temp_fs_path("mixed_write_and_read.txt"); + let mem_path = "memory://test_mixed_scheme_write_and_read"; + + // Write to all three schemes + harness + .file_io + .new_output(&s3_path) + .unwrap() + .write("from_s3".into()) + .await + .unwrap(); + harness + .file_io + .new_output(&fs_path) + .unwrap() + .write("from_fs".into()) + .await + .unwrap(); + harness + .file_io + .new_output(mem_path) + .unwrap() + .write("from_memory".into()) + .await + .unwrap(); + + // Read back from all three + assert_eq!( + harness + .file_io + .new_input(&s3_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("from_s3") + ); + assert_eq!( + harness + .file_io + .new_input(&fs_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("from_fs") + ); + assert_eq!( + harness + .file_io + .new_input(mem_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("from_memory") + ); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_mixed_scheme_exists_independently(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let s3_path = unique_path(&harness, "test_mixed_scheme_exists_independently"); + let fs_path = temp_fs_path("mixed_exists_independently.txt"); + let mem_path = "memory://test_mixed_scheme_exists_independently"; + + // Clean up S3 from previous runs + let _ = harness.file_io.delete(&s3_path).await; + + // None exist initially + assert!(!harness.file_io.exists(&s3_path).await.unwrap()); + assert!(!harness.file_io.exists(&fs_path).await.unwrap()); + assert!(!harness.file_io.exists(mem_path).await.unwrap()); + + // Write only to fs + harness + .file_io + .new_output(&fs_path) + .unwrap() + .write("fs_only".into()) + .await + .unwrap(); + + // Only fs exists + assert!(!harness.file_io.exists(&s3_path).await.unwrap()); + assert!(harness.file_io.exists(&fs_path).await.unwrap()); + assert!(!harness.file_io.exists(mem_path).await.unwrap()); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_mixed_scheme_delete_one_keeps_others( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let s3_path = unique_path(&harness, "test_mixed_scheme_delete_one_keeps_others"); + let fs_path = temp_fs_path("mixed_delete_one_keeps_others.txt"); + let mem_path = "memory://test_mixed_scheme_delete_one_keeps_others"; + + // Write to all three + harness + .file_io + .new_output(&s3_path) + .unwrap() + .write("s3".into()) + .await + .unwrap(); + harness + .file_io + .new_output(&fs_path) + .unwrap() + .write("fs".into()) + .await + .unwrap(); + harness + .file_io + .new_output(mem_path) + .unwrap() + .write("mem".into()) + .await + .unwrap(); + + // Delete only the fs file + harness.file_io.delete(&fs_path).await.unwrap(); + + // fs gone, S3 and memory still there + assert!(harness.file_io.exists(&s3_path).await.unwrap()); + assert!(!harness.file_io.exists(&fs_path).await.unwrap()); + assert!(harness.file_io.exists(mem_path).await.unwrap()); + + assert_eq!( + harness + .file_io + .new_input(&s3_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("s3") + ); + assert_eq!( + harness + .file_io + .new_input(mem_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("mem") + ); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_mixed_scheme_interleaved_operations( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let s3_path = unique_path(&harness, "test_mixed_scheme_interleaved"); + let fs_path = temp_fs_path("mixed_interleaved.txt"); + let mem_path = "memory://test_mixed_scheme_interleaved"; + + // Interleave: write fs, write memory, write s3 + harness + .file_io + .new_output(&fs_path) + .unwrap() + .write("fs_data".into()) + .await + .unwrap(); + harness + .file_io + .new_output(mem_path) + .unwrap() + .write("mem_data".into()) + .await + .unwrap(); + harness + .file_io + .new_output(&s3_path) + .unwrap() + .write("s3_data".into()) + .await + .unwrap(); + + // Read in reverse order: s3, memory, fs + assert_eq!( + harness + .file_io + .new_input(&s3_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("s3_data") + ); + assert_eq!( + harness + .file_io + .new_input(mem_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("mem_data") + ); + assert_eq!( + harness + .file_io + .new_input(&fs_path) + .unwrap() + .read() + .await + .unwrap(), + bytes::Bytes::from("fs_data") + ); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_invalid_scheme(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let result = harness.file_io.exists("unknown://bucket/key").await; + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Unsupported storage scheme")); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_missing_scheme(#[case] kind: StorageKind) -> iceberg::Result<()> { + let Some(harness) = load_storage(kind).await else { + return Ok(()); + }; + + let result = harness.file_io.exists("no-scheme-path").await; + assert!(result.is_err()); + + Ok(()) +} + +#[rstest] +#[case::opendal_resolving(StorageKind::OpenDalResolving)] +#[tokio::test] +async fn test_resolving_with_custom_credential_loader( + #[case] kind: StorageKind, +) -> iceberg::Result<()> { + let Some(_harness) = load_storage(kind).await else { + return Ok(()); + }; + + struct MinioCredentialLoader; + + #[async_trait] + impl AwsCredentialLoad for MinioCredentialLoader { + async fn load_credential( + &self, + _client: Client, + ) -> anyhow::Result> { + Ok(Some(AwsCredential { + access_key_id: "admin".to_string(), + secret_access_key: "password".to_string(), + session_token: None, + expires_in: None, + })) + } + } + + set_up(); + let minio_endpoint = get_minio_endpoint(); + + let factory = OpenDalResolvingStorageFactory::new() + .with_s3_credential_loader(CustomAwsCredentialLoader::new(Arc::new( + MinioCredentialLoader, + ))); + + let file_io = FileIOBuilder::new(Arc::new(factory)) + .with_props(vec![ + (S3_ENDPOINT, minio_endpoint), + (S3_REGION, "us-east-1".to_string()), + ]) + .build(); + + // Should be able to access S3 using the custom credential loader + assert!(file_io.exists("s3://bucket1/").await.unwrap()); + + Ok(()) +} From 7d8088621cd95eff2f192602c60c4810d1307f87 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 13:38:32 -0700 Subject: [PATCH 2/5] fix fmt --- crates/storage/common/tests/common/mod.rs | 12 +++++++--- .../storage/common/tests/credential_suite.rs | 9 +++---- crates/storage/common/tests/file_io_suite.rs | 16 +++---------- .../storage/common/tests/resolving_suite.rs | 24 +++++++++---------- 4 files changed, 28 insertions(+), 33 deletions(-) diff --git a/crates/storage/common/tests/common/mod.rs b/crates/storage/common/tests/common/mod.rs index dbcb7cd28b..37404c306b 100644 --- a/crates/storage/common/tests/common/mod.rs +++ b/crates/storage/common/tests/common/mod.rs @@ -23,8 +23,8 @@ use std::collections::HashMap; use std::sync::Arc; use iceberg::io::{ - FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, S3_ACCESS_KEY_ID, S3_ENDPOINT, - S3_REGION, S3_SECRET_ACCESS_KEY, + FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, + S3_SECRET_ACCESS_KEY, }; use iceberg_storage_opendal::{OpenDalResolvingStorageFactory, OpenDalStorageFactory}; use iceberg_test_utils::{get_gcs_endpoint, get_minio_endpoint, set_up}; @@ -100,7 +100,13 @@ async fn load_opendal_gcs() -> Option { let client = reqwest::Client::new(); let endpoint = format!("{gcs_endpoint}/storage/v1/b"); - if client.post(&endpoint).json(&bucket_data).send().await.is_err() { + if client + .post(&endpoint) + .json(&bucket_data) + .send() + .await + .is_err() + { return None; } diff --git a/crates/storage/common/tests/credential_suite.rs b/crates/storage/common/tests/credential_suite.rs index 1a1ebbb2cf..37d4858772 100644 --- a/crates/storage/common/tests/credential_suite.rs +++ b/crates/storage/common/tests/credential_suite.rs @@ -22,7 +22,7 @@ mod common; use std::sync::Arc; use async_trait::async_trait; -use common::{load_storage, StorageKind}; +use common::{StorageKind, load_storage}; use iceberg::io::{FileIOBuilder, S3_ENDPOINT, S3_REGION}; use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory}; use iceberg_test_utils::get_minio_endpoint; @@ -132,9 +132,10 @@ async fn test_s3_with_custom_credential_loader_failure( match file_io_with_custom_creds.exists("s3://bucket1/any").await { Ok(_) => panic!("Expected error, but got Ok"), Err(e) => { - assert!(e - .to_string() - .contains("no valid credential found and anonymous access is not allowed")); + assert!( + e.to_string() + .contains("no valid credential found and anonymous access is not allowed") + ); } } diff --git a/crates/storage/common/tests/file_io_suite.rs b/crates/storage/common/tests/file_io_suite.rs index 52c8248221..d6880c1245 100644 --- a/crates/storage/common/tests/file_io_suite.rs +++ b/crates/storage/common/tests/file_io_suite.rs @@ -21,7 +21,7 @@ mod common; use std::sync::Arc; -use common::{load_storage, unique_path, StorageKind}; +use common::{StorageKind, load_storage, unique_path}; use futures::StreamExt; use iceberg::io::{FileIOBuilder, LocalFsStorageFactory}; use rstest::rstest; @@ -116,7 +116,6 @@ async fn test_file_io_delete_nonexistent(#[case] kind: StorageKind) -> iceberg:: Ok(()) } - #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] #[case::opendal_gcs(StorageKind::OpenDalGcs)] @@ -159,7 +158,6 @@ async fn test_file_io_delete_stream_empty(#[case] kind: StorageKind) -> iceberg: Ok(()) } - #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] #[case::opendal_gcs(StorageKind::OpenDalGcs)] @@ -204,7 +202,6 @@ async fn test_file_io_delete_prefix_nonexistent(#[case] kind: StorageKind) -> ic Ok(()) } - #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] #[case::opendal_gcs(StorageKind::OpenDalGcs)] @@ -243,7 +240,6 @@ async fn test_file_io_metadata_nonexistent(#[case] kind: StorageKind) -> iceberg Ok(()) } - #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] #[case::opendal_gcs(StorageKind::OpenDalGcs)] @@ -269,7 +265,6 @@ async fn test_file_io_range_read(#[case] kind: StorageKind) -> iceberg::Result<( Ok(()) } - #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] #[case::opendal_gcs(StorageKind::OpenDalGcs)] @@ -306,10 +301,7 @@ async fn test_file_io_streaming_write_double_close( let path = unique_path(&harness, "test_file_io_streaming_write_double_close"); let output_file = harness.file_io.new_output(&path).unwrap(); let mut writer = output_file.writer().await.unwrap(); - writer - .write(bytes::Bytes::from("data")) - .await - .unwrap(); + writer.write(bytes::Bytes::from("data")).await.unwrap(); writer.close().await.unwrap(); // Second close should error let result = writer.close().await; @@ -317,11 +309,9 @@ async fn test_file_io_streaming_write_double_close( Ok(()) } - #[tokio::test] async fn test_file_io_builder_with_prop() { - let builder = - FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).with_prop("key1", "value1"); + let builder = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).with_prop("key1", "value1"); assert_eq!(builder.config().get("key1"), Some(&"value1".to_string())); } diff --git a/crates/storage/common/tests/resolving_suite.rs b/crates/storage/common/tests/resolving_suite.rs index b16cae7358..455fab009f 100644 --- a/crates/storage/common/tests/resolving_suite.rs +++ b/crates/storage/common/tests/resolving_suite.rs @@ -22,7 +22,7 @@ mod common; use std::sync::Arc; use async_trait::async_trait; -use common::{load_storage, unique_path, StorageKind}; +use common::{StorageKind, load_storage, unique_path}; use iceberg::io::{FileIOBuilder, S3_ENDPOINT, S3_REGION}; use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalResolvingStorageFactory}; use iceberg_test_utils::{get_minio_endpoint, set_up}; @@ -297,10 +297,12 @@ async fn test_invalid_scheme(#[case] kind: StorageKind) -> iceberg::Result<()> { let result = harness.file_io.exists("unknown://bucket/key").await; assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("Unsupported storage scheme")); + assert!( + result + .unwrap_err() + .to_string() + .contains("Unsupported storage scheme") + ); Ok(()) } @@ -333,10 +335,7 @@ async fn test_resolving_with_custom_credential_loader( #[async_trait] impl AwsCredentialLoad for MinioCredentialLoader { - async fn load_credential( - &self, - _client: Client, - ) -> anyhow::Result> { + async fn load_credential(&self, _client: Client) -> anyhow::Result> { Ok(Some(AwsCredential { access_key_id: "admin".to_string(), secret_access_key: "password".to_string(), @@ -349,10 +348,9 @@ async fn test_resolving_with_custom_credential_loader( set_up(); let minio_endpoint = get_minio_endpoint(); - let factory = OpenDalResolvingStorageFactory::new() - .with_s3_credential_loader(CustomAwsCredentialLoader::new(Arc::new( - MinioCredentialLoader, - ))); + let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader( + CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)), + ); let file_io = FileIOBuilder::new(Arc::new(factory)) .with_props(vec![ From 978d4e0978be45e6e33d8e96f822b1f3db8eb63c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 18 Mar 2026 14:00:26 -0700 Subject: [PATCH 3/5] comment out gcs for delete_stream --- crates/storage/common/tests/file_io_suite.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/storage/common/tests/file_io_suite.rs b/crates/storage/common/tests/file_io_suite.rs index d6880c1245..cdb6ecfd5b 100644 --- a/crates/storage/common/tests/file_io_suite.rs +++ b/crates/storage/common/tests/file_io_suite.rs @@ -118,7 +118,9 @@ async fn test_file_io_delete_nonexistent(#[case] kind: StorageKind) -> iceberg:: #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] -#[case::opendal_gcs(StorageKind::OpenDalGcs)] +// We use fake-gcs-server for testing and it doesn't support batch delete +// https://github.com/fsouza/fake-gcs-server/issues/1443 +// #[case::opendal_gcs(StorageKind::OpenDalGcs)] #[tokio::test] async fn test_file_io_delete_stream(#[case] kind: StorageKind) -> iceberg::Result<()> { let Some(harness) = load_storage(kind).await else { @@ -147,7 +149,9 @@ async fn test_file_io_delete_stream(#[case] kind: StorageKind) -> iceberg::Resul #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] -#[case::opendal_gcs(StorageKind::OpenDalGcs)] +// We use fake-gcs-server for testing and it doesn't support batch delete +// https://github.com/fsouza/fake-gcs-server/issues/1443 +// #[case::opendal_gcs(StorageKind::OpenDalGcs)] #[tokio::test] async fn test_file_io_delete_stream_empty(#[case] kind: StorageKind) -> iceberg::Result<()> { let Some(harness) = load_storage(kind).await else { From a7a55eb7c319f7e018e286cd4da5259ba388f7d6 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 19 Mar 2026 15:13:48 -0700 Subject: [PATCH 4/5] disable delete_prefix test for gcs --- crates/storage/common/tests/file_io_suite.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/storage/common/tests/file_io_suite.rs b/crates/storage/common/tests/file_io_suite.rs index cdb6ecfd5b..ce349f9eee 100644 --- a/crates/storage/common/tests/file_io_suite.rs +++ b/crates/storage/common/tests/file_io_suite.rs @@ -164,7 +164,9 @@ async fn test_file_io_delete_stream_empty(#[case] kind: StorageKind) -> iceberg: #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] -#[case::opendal_gcs(StorageKind::OpenDalGcs)] +// We use fake-gcs-server for testing and it doesn't support batch delete +// https://github.com/fsouza/fake-gcs-server/issues/1443 +// #[case::opendal_gcs(StorageKind::OpenDalGcs)] #[tokio::test] async fn test_file_io_delete_prefix(#[case] kind: StorageKind) -> iceberg::Result<()> { let Some(harness) = load_storage(kind).await else { @@ -194,7 +196,9 @@ async fn test_file_io_delete_prefix(#[case] kind: StorageKind) -> iceberg::Resul #[rstest] #[case::opendal_s3(StorageKind::OpenDalS3)] -#[case::opendal_gcs(StorageKind::OpenDalGcs)] +// We use fake-gcs-server for testing and it doesn't support batch delete +// https://github.com/fsouza/fake-gcs-server/issues/1443 +// #[case::opendal_gcs(StorageKind::OpenDalGcs)] #[tokio::test] async fn test_file_io_delete_prefix_nonexistent(#[case] kind: StorageKind) -> iceberg::Result<()> { let Some(harness) = load_storage(kind).await else { From 12a42be9dbb4b63d450f3fecf1916f4af06c9c19 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 19 Mar 2026 15:29:14 -0700 Subject: [PATCH 5/5] remove old tests --- Cargo.lock | 2 - crates/storage/opendal/Cargo.toml | 7 - .../storage/opendal/tests/file_io_gcs_test.rs | 97 ------ .../storage/opendal/tests/file_io_s3_test.rs | 249 --------------- .../opendal/tests/resolving_storage_test.rs | 297 ------------------ 5 files changed, 652 deletions(-) delete mode 100644 crates/storage/opendal/tests/file_io_gcs_test.rs delete mode 100644 crates/storage/opendal/tests/file_io_s3_test.rs delete mode 100644 crates/storage/opendal/tests/resolving_storage_test.rs diff --git a/Cargo.lock b/Cargo.lock index 06b8c4dd14..0a6aee350a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3584,12 +3584,10 @@ dependencies = [ "cfg-if", "futures", "iceberg", - "iceberg_test_utils", "opendal", "reqsign", "reqwest", "serde", - "tokio", "typetag", "url", ] diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 84f7e1147a..8120e35298 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -50,10 +50,3 @@ serde = { workspace = true } typetag = { workspace = true } url = { workspace = true } futures = { workspace = true } - -[dev-dependencies] -async-trait = { workspace = true } -iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } -reqsign = { version = "0.16.3", default-features = false } -reqwest = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/storage/opendal/tests/file_io_gcs_test.rs b/crates/storage/opendal/tests/file_io_gcs_test.rs deleted file mode 100644 index 5e04491131..0000000000 --- a/crates/storage/opendal/tests/file_io_gcs_test.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Integration tests for FileIO Google Cloud Storage (GCS). -//! -//! These tests assume Docker containers are started externally via `make docker-up`. - -#[cfg(feature = "opendal-gcs")] -mod tests { - use std::collections::HashMap; - use std::sync::Arc; - - use bytes::Bytes; - use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; - use iceberg_storage_opendal::OpenDalStorageFactory; - use iceberg_test_utils::{get_gcs_endpoint, set_up}; - - static FAKE_GCS_BUCKET: &str = "test-bucket"; - - async fn get_file_io_gcs() -> FileIO { - set_up(); - - let gcs_endpoint = get_gcs_endpoint(); - - // A bucket must exist for FileIO - create_bucket(FAKE_GCS_BUCKET, &gcs_endpoint).await.unwrap(); - - FileIOBuilder::new(Arc::new(OpenDalStorageFactory::Gcs)) - .with_props(vec![ - (GCS_SERVICE_PATH, gcs_endpoint), - (GCS_NO_AUTH, "true".to_string()), - ]) - .build() - } - - // Create a bucket against the emulated GCS storage server. - async fn create_bucket(name: &str, server_endpoint: &str) -> anyhow::Result<()> { - let mut bucket_data = HashMap::new(); - bucket_data.insert("name", name); - - let client = reqwest::Client::new(); - let endpoint = format!("{server_endpoint}/storage/v1/b"); - client.post(endpoint).json(&bucket_data).send().await?; - Ok(()) - } - - fn get_gs_path() -> String { - format!("gs://{FAKE_GCS_BUCKET}") - } - - #[tokio::test] - async fn gcs_exists() { - let file_io = get_file_io_gcs().await; - assert!(file_io.exists(format!("{}/", get_gs_path())).await.unwrap()); - } - - #[tokio::test] - async fn gcs_write() { - let gs_file = format!("{}/write-file", get_gs_path()); - let file_io = get_file_io_gcs().await; - let output = file_io.new_output(&gs_file).unwrap(); - output - .write(bytes::Bytes::from_static(b"iceberg-gcs!")) - .await - .expect("Write to test output file"); - assert!(file_io.exists(gs_file).await.unwrap()) - } - - #[tokio::test] - async fn gcs_read() { - let gs_file = format!("{}/read-gcs", get_gs_path()); - let file_io = get_file_io_gcs().await; - let output = file_io.new_output(&gs_file).unwrap(); - output - .write(bytes::Bytes::from_static(b"iceberg!")) - .await - .expect("Write to test output file"); - assert!(file_io.exists(&gs_file).await.unwrap()); - - let input = file_io.new_input(gs_file).unwrap(); - assert_eq!(input.read().await.unwrap(), Bytes::from_static(b"iceberg!")); - } -} diff --git a/crates/storage/opendal/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs deleted file mode 100644 index 207a4454d7..0000000000 --- a/crates/storage/opendal/tests/file_io_s3_test.rs +++ /dev/null @@ -1,249 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Integration tests for FileIO S3. -//! -//! These tests assume Docker containers are started externally via `make docker-up`. -//! Each test uses unique file paths based on module path to avoid conflicts. -#[cfg(feature = "opendal-s3")] -mod tests { - use std::sync::Arc; - - use async_trait::async_trait; - use futures::StreamExt; - use iceberg::io::{ - FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, - }; - use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory}; - use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; - use reqsign::{AwsCredential, AwsCredentialLoad}; - use reqwest::Client; - - async fn get_file_io() -> FileIO { - set_up(); - - let minio_endpoint = get_minio_endpoint(); - - FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - customized_credential_load: None, - })) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_ACCESS_KEY_ID, "admin".to_string()), - (S3_SECRET_ACCESS_KEY, "password".to_string()), - (S3_REGION, "us-east-1".to_string()), - ]) - .build() - } - - #[tokio::test] - async fn test_file_io_s3_exists() { - let file_io = get_file_io().await; - assert!(!file_io.exists("s3://bucket2/any").await.unwrap()); - assert!(file_io.exists("s3://bucket1/").await.unwrap()); - } - - #[tokio::test] - async fn test_file_io_s3_output() { - let file_io = get_file_io().await; - // Use unique file path based on module path to avoid conflicts - let output_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_file_io_s3_output") - ); - // Clean up from any previous test runs - let _ = file_io.delete(&output_path).await; - assert!(!file_io.exists(&output_path).await.unwrap()); - let output_file = file_io.new_output(&output_path).unwrap(); - { - output_file.write("123".into()).await.unwrap(); - } - assert!(file_io.exists(&output_path).await.unwrap()); - } - - #[tokio::test] - async fn test_file_io_s3_input() { - let file_io = get_file_io().await; - // Use unique file path based on module path to avoid conflicts - let file_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_file_io_s3_input") - ); - let output_file = file_io.new_output(&file_path).unwrap(); - { - output_file.write("test_input".into()).await.unwrap(); - } - - let input_file = file_io.new_input(&file_path).unwrap(); - - { - let buffer = input_file.read().await.unwrap(); - assert_eq!(buffer, "test_input".as_bytes()); - } - } - - // Mock credential loader for testing - struct MockCredentialLoader { - credential: Option, - } - - impl MockCredentialLoader { - fn new(credential: Option) -> Self { - Self { credential } - } - - fn new_minio() -> Self { - Self::new(Some(AwsCredential { - access_key_id: "admin".to_string(), - secret_access_key: "password".to_string(), - session_token: None, - expires_in: None, - })) - } - } - - #[async_trait] - impl AwsCredentialLoad for MockCredentialLoader { - async fn load_credential(&self, _client: Client) -> anyhow::Result> { - Ok(self.credential.clone()) - } - } - - #[test] - fn test_custom_aws_credential_loader_instantiation() { - // Test creating CustomAwsCredentialLoader with mock loader - let mock_loader = MockCredentialLoader::new_minio(); - let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); - - // Test that the loader can be used in FileIOBuilder with OpenDalStorageFactory - let _builder = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - customized_credential_load: Some(custom_loader), - })) - .with_props(vec![ - (S3_ENDPOINT, "http://localhost:9000".to_string()), - ("bucket", "test-bucket".to_string()), - (S3_REGION, "us-east-1".to_string()), - ]); - } - - #[tokio::test] - async fn test_s3_with_custom_credential_loader_integration() { - let _file_io = get_file_io().await; - - // Create a mock credential loader - let mock_loader = MockCredentialLoader::new_minio(); - let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); - - let minio_endpoint = get_minio_endpoint(); - - // Build FileIO with custom credential loader via OpenDalStorageFactory - let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - customized_credential_load: Some(custom_loader), - })) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_REGION, "us-east-1".to_string()), - ]) - .build(); - - // Test that the FileIO was built successfully with the custom loader - match file_io_with_custom_creds.exists("s3://bucket1/any").await { - Ok(_) => {} - Err(e) => panic!("Failed to check existence of bucket: {e}"), - } - } - - #[tokio::test] - async fn test_s3_with_custom_credential_loader_integration_failure() { - let _file_io = get_file_io().await; - - // Create a mock credential loader with no credentials - let mock_loader = MockCredentialLoader::new(None); - let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader)); - - let minio_endpoint = get_minio_endpoint(); - - // Build FileIO with custom credential loader via OpenDalStorageFactory - let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 { - configured_scheme: "s3".to_string(), - customized_credential_load: Some(custom_loader), - })) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_REGION, "us-east-1".to_string()), - ]) - .build(); - - // Test that the FileIO was built successfully with the custom loader - match file_io_with_custom_creds.exists("s3://bucket1/any").await { - Ok(_) => panic!( - "Expected error, but got Ok - the credential loader should fail to provide valid credentials" - ), - Err(e) => { - assert!( - e.to_string() - .contains("no valid credential found and anonymous access is not allowed") - ); - } - } - } - - #[tokio::test] - async fn test_file_io_s3_delete_stream() { - let file_io = get_file_io().await; - - // Write multiple files - let paths: Vec = (0..5) - .map(|i| { - format!( - "s3://bucket1/{}/file-{i}", - normalize_test_name_with_parts!("test_file_io_s3_delete_stream") - ) - }) - .collect(); - for path in &paths { - let _ = file_io.delete(path).await; - file_io - .new_output(path) - .unwrap() - .write("delete-me".into()) - .await - .unwrap(); - assert!(file_io.exists(path).await.unwrap()); - } - - // Delete via delete_stream - let stream = futures::stream::iter(paths.clone()).boxed(); - file_io.delete_stream(stream).await.unwrap(); - - // Verify all files are gone - for path in &paths { - assert!(!file_io.exists(path).await.unwrap()); - } - } - - #[tokio::test] - async fn test_file_io_s3_delete_stream_empty() { - let file_io = get_file_io().await; - let stream = futures::stream::empty().boxed(); - // Should succeed with no-op - file_io.delete_stream(stream).await.unwrap(); - } -} diff --git a/crates/storage/opendal/tests/resolving_storage_test.rs b/crates/storage/opendal/tests/resolving_storage_test.rs deleted file mode 100644 index 4572ad2c2d..0000000000 --- a/crates/storage/opendal/tests/resolving_storage_test.rs +++ /dev/null @@ -1,297 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Integration tests for OpenDalResolvingStorage. -//! -//! These tests assume Docker containers are started externally via `make docker-up`. -//! Each test uses unique file paths based on module path to avoid conflicts. - -#[cfg(all( - feature = "opendal-s3", - feature = "opendal-fs", - feature = "opendal-memory" -))] -mod tests { - use std::sync::Arc; - - use iceberg::io::{ - FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, - }; - use iceberg_storage_opendal::OpenDalResolvingStorageFactory; - use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; - - fn get_resolving_file_io() -> iceberg::io::FileIO { - set_up(); - - let minio_endpoint = get_minio_endpoint(); - - FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new())) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_ACCESS_KEY_ID, "admin".to_string()), - (S3_SECRET_ACCESS_KEY, "password".to_string()), - (S3_REGION, "us-east-1".to_string()), - ]) - .build() - } - - fn temp_fs_path(name: &str) -> String { - let dir = std::env::temp_dir().join("iceberg_resolving_tests"); - std::fs::create_dir_all(&dir).unwrap(); - let path = dir.join(name); - // Clean up from previous runs - let _ = std::fs::remove_file(&path); - format!("file:/{}", path.display()) - } - - #[tokio::test] - async fn test_mixed_scheme_write_and_read() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_scheme_write_and_read") - ); - let fs_path = temp_fs_path("mixed_write_and_read.txt"); - let mem_path = "memory://test_mixed_scheme_write_and_read"; - - // Write to all three schemes - file_io - .new_output(&s3_path) - .unwrap() - .write("from_s3".into()) - .await - .unwrap(); - file_io - .new_output(&fs_path) - .unwrap() - .write("from_fs".into()) - .await - .unwrap(); - file_io - .new_output(mem_path) - .unwrap() - .write("from_memory".into()) - .await - .unwrap(); - - // Read back from all three - assert_eq!( - file_io.new_input(&s3_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("from_s3") - ); - assert_eq!( - file_io.new_input(&fs_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("from_fs") - ); - assert_eq!( - file_io.new_input(mem_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("from_memory") - ); - } - - #[tokio::test] - async fn test_mixed_scheme_exists_independently() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_scheme_exists_independently") - ); - let fs_path = temp_fs_path("mixed_exists_independently.txt"); - let mem_path = "memory://test_mixed_scheme_exists_independently"; - - // Clean up S3 from previous runs - let _ = file_io.delete(&s3_path).await; - - // None exist initially - assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(&fs_path).await.unwrap()); - assert!(!file_io.exists(mem_path).await.unwrap()); - - // Write only to fs - file_io - .new_output(&fs_path) - .unwrap() - .write("fs_only".into()) - .await - .unwrap(); - - // Only fs exists - assert!(!file_io.exists(&s3_path).await.unwrap()); - assert!(file_io.exists(&fs_path).await.unwrap()); - assert!(!file_io.exists(mem_path).await.unwrap()); - } - - #[tokio::test] - async fn test_mixed_scheme_delete_one_keeps_others() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_scheme_delete_one_keeps_others") - ); - let fs_path = temp_fs_path("mixed_delete_one_keeps_others.txt"); - let mem_path = "memory://test_mixed_scheme_delete_one_keeps_others"; - - // Write to all three - file_io - .new_output(&s3_path) - .unwrap() - .write("s3".into()) - .await - .unwrap(); - file_io - .new_output(&fs_path) - .unwrap() - .write("fs".into()) - .await - .unwrap(); - file_io - .new_output(mem_path) - .unwrap() - .write("mem".into()) - .await - .unwrap(); - - // Delete only the fs file - file_io.delete(&fs_path).await.unwrap(); - - // fs gone, S3 and memory still there - assert!(file_io.exists(&s3_path).await.unwrap()); - assert!(!file_io.exists(&fs_path).await.unwrap()); - assert!(file_io.exists(mem_path).await.unwrap()); - - assert_eq!( - file_io.new_input(&s3_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("s3") - ); - assert_eq!( - file_io.new_input(mem_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("mem") - ); - } - - #[tokio::test] - async fn test_mixed_scheme_interleaved_operations() { - let file_io = get_resolving_file_io(); - - let s3_path = format!( - "s3://bucket1/{}", - normalize_test_name_with_parts!("test_mixed_scheme_interleaved") - ); - let fs_path = temp_fs_path("mixed_interleaved.txt"); - let mem_path = "memory://test_mixed_scheme_interleaved"; - - // Interleave: write fs, write memory, write s3 - file_io - .new_output(&fs_path) - .unwrap() - .write("fs_data".into()) - .await - .unwrap(); - file_io - .new_output(mem_path) - .unwrap() - .write("mem_data".into()) - .await - .unwrap(); - file_io - .new_output(&s3_path) - .unwrap() - .write("s3_data".into()) - .await - .unwrap(); - - // Read in reverse order: s3, memory, fs - assert_eq!( - file_io.new_input(&s3_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("s3_data") - ); - assert_eq!( - file_io.new_input(mem_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("mem_data") - ); - assert_eq!( - file_io.new_input(&fs_path).unwrap().read().await.unwrap(), - bytes::Bytes::from("fs_data") - ); - } - - #[tokio::test] - async fn test_invalid_scheme() { - let file_io = get_resolving_file_io(); - let result = file_io.exists("unknown://bucket/key").await; - assert!(result.is_err()); - assert!( - result - .unwrap_err() - .to_string() - .contains("Unsupported storage scheme"), - ); - } - - #[tokio::test] - async fn test_missing_scheme() { - let file_io = get_resolving_file_io(); - let result = file_io.exists("no-scheme-path").await; - assert!(result.is_err()); - } - - #[cfg(feature = "opendal-s3")] - #[tokio::test] - async fn test_with_custom_credential_loader() { - use async_trait::async_trait; - use iceberg_storage_opendal::CustomAwsCredentialLoader; - use reqsign::{AwsCredential, AwsCredentialLoad}; - use reqwest::Client; - - struct MinioCredentialLoader; - - #[async_trait] - impl AwsCredentialLoad for MinioCredentialLoader { - async fn load_credential( - &self, - _client: Client, - ) -> anyhow::Result> { - Ok(Some(AwsCredential { - access_key_id: "admin".to_string(), - secret_access_key: "password".to_string(), - session_token: None, - expires_in: None, - })) - } - } - - set_up(); - let minio_endpoint = get_minio_endpoint(); - - let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader( - CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)), - ); - - let file_io = FileIOBuilder::new(Arc::new(factory)) - .with_props(vec![ - (S3_ENDPOINT, minio_endpoint), - (S3_REGION, "us-east-1".to_string()), - ]) - .build(); - - // Should be able to access S3 using the custom credential loader - assert!(file_io.exists("s3://bucket1/").await.unwrap()); - } -}