diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 26f61118b7..fe0a51e032 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -38,6 +38,7 @@ jobs: # Order here is sensitive, as it will be used to determine the order of publishing package: - "crates/iceberg" + - "crates/storage/object_store" - "crates/storage/opendal" - "crates/catalog/glue" - "crates/catalog/hms" diff --git a/Cargo.lock b/Cargo.lock index 39812b010f..c6dbdf0a82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3540,6 +3540,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "iceberg-storage-object_store" +version = "0.9.0" +dependencies = [ + "async-trait", + "bytes", + "dashmap", + "futures", + "iceberg", + "object_store", + "serde", + "tokio", + "typetag", + "url", +] + [[package]] name = "iceberg-storage-opendal" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 6a361ecbd8..8c675547fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,7 @@ iceberg-catalog-rest = { version = "0.9.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.9.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.9.0", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.9.0", path = "./crates/integrations/datafusion" } +iceberg-storage-object_store = { version = "0.9.0", path = "./crates/storage/object_store" } iceberg-storage-opendal = { version = "0.9.0", path = "./crates/storage/opendal" } indicatif = "0.18" itertools = "0.13" diff --git a/crates/storage/object_store/Cargo.toml b/crates/storage/object_store/Cargo.toml new file mode 100644 index 0000000000..4a78aacd29 --- /dev/null +++ b/crates/storage/object_store/Cargo.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-storage-object_store" +edition = { workspace = true } +version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg object_store storage implementation" +keywords = ["iceberg", "object-store", "storage", "s3"] + +[features] +default = ["object_store-s3"] +object_store-s3 = ["object_store/aws"] + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +dashmap = { workspace = true } +futures = { workspace = true } +iceberg = { workspace = true } +object_store = { version = "0.12" } +serde = { workspace = true } +typetag = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/storage/object_store/src/lib.rs b/crates/storage/object_store/src/lib.rs new file mode 100644 index 0000000000..3d6a2748f5 --- /dev/null +++ b/crates/storage/object_store/src/lib.rs @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `object_store`-based storage implementation for Apache Iceberg. +//! +//! This crate provides [`ObjectStoreStorage`] and [`ObjectStoreStorageFactory`], +//! which implement the [`Storage`](iceberg::io::Storage) and +//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate +//! using the [`object_store`](https://docs.rs/object_store) crate as the backend. +//! +//! Currently only S3 storage is supported (via the `object_store-s3` feature flag, +//! enabled by default). + +#[cfg(feature = "object_store-s3")] +mod s3; + +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use dashmap::DashMap; +use futures::StreamExt; +use futures::stream::BoxStream; +#[cfg(feature = "object_store-s3")] +use iceberg::io::S3Config; +use iceberg::io::{ + FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, + StorageFactory, +}; +use iceberg::{Error, ErrorKind, Result}; +use object_store::path::Path as ObjectStorePath; +use object_store::{ObjectStore, PutPayload, WriteMultipart}; +#[cfg(feature = "object_store-s3")] +use s3::{build_s3_store, parse_s3_url}; +use serde::{Deserialize, Serialize}; + +/// Convert an `object_store::Error` into an `iceberg::Error`. +fn from_object_store_error(e: object_store::Error) -> Error { + Error::new(ErrorKind::Unexpected, "Failure in doing io operation").with_source(e) +} + +/// `object_store`-based storage factory. +/// +/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances +/// backed by the `object_store` crate. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ObjectStoreStorageFactory { + /// S3 storage factory. + #[cfg(feature = "object_store-s3")] + S3, +} + +#[typetag::serde(name = "ObjectStoreStorageFactory")] +impl StorageFactory for ObjectStoreStorageFactory { + #[allow(unused_variables)] + fn build(&self, config: &StorageConfig) -> Result> { + match self { + #[cfg(feature = "object_store-s3")] + ObjectStoreStorageFactory::S3 => { + let s3_config = S3Config::try_from(config)?; + Ok(Arc::new(ObjectStoreStorage::S3 { + config: Arc::new(s3_config), + store_cache: Arc::new(DashMap::new()), + })) + } + } + } +} + +/// `object_store`-based storage implementation. +/// +/// Stores are cached per bucket to avoid rebuilding the client on every operation. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ObjectStoreStorage { + /// S3 storage variant. + #[cfg(feature = "object_store-s3")] + S3 { + /// Parsed S3 configuration from iceberg core. + config: Arc, + /// Per-bucket store cache. + #[serde(skip, default)] + store_cache: Arc>>, + }, +} + +impl ObjectStoreStorage { + /// Get or create a cached store and extract the relative `ObjectStorePath`. + fn get_store_and_path(&self, path: &str) -> Result<(Arc, ObjectStorePath)> { + match self { + #[cfg(feature = "object_store-s3")] + ObjectStoreStorage::S3 { + config, + store_cache, + } => { + let (_scheme, bucket, relative) = parse_s3_url(path)?; + + let store = store_cache + .entry(bucket.to_string()) + .or_try_insert_with(|| build_s3_store(config, bucket))? + .value() + .clone(); + + Ok((store, ObjectStorePath::from(relative))) + } + } + } +} + +#[typetag::serde(name = "ObjectStoreStorage")] +#[async_trait] +impl Storage for ObjectStoreStorage { + async fn exists(&self, path: &str) -> Result { + let (store, object_path) = self.get_store_and_path(path)?; + match store.head(&object_path).await { + Ok(_) => Ok(true), + Err(object_store::Error::NotFound { .. }) => Ok(false), + Err(e) => Err(from_object_store_error(e)), + } + } + + async fn metadata(&self, path: &str) -> Result { + let (store, object_path) = self.get_store_and_path(path)?; + let meta = store + .head(&object_path) + .await + .map_err(from_object_store_error)?; + Ok(FileMetadata { + size: meta.size as u64, + }) + } + + async fn read(&self, path: &str) -> Result { + let (store, object_path) = self.get_store_and_path(path)?; + let result = store + .get(&object_path) + .await + .map_err(from_object_store_error)?; + result.bytes().await.map_err(from_object_store_error) + } + + async fn reader(&self, path: &str) -> Result> { + let (store, object_path) = self.get_store_and_path(path)?; + Ok(Box::new(ObjectStoreReader { + store, + path: object_path, + })) + } + + async fn write(&self, path: &str, bs: Bytes) -> Result<()> { + let (store, object_path) = self.get_store_and_path(path)?; + store + .put(&object_path, PutPayload::from_bytes(bs)) + .await + .map_err(from_object_store_error)?; + Ok(()) + } + + async fn writer(&self, path: &str) -> Result> { + let (store, object_path) = self.get_store_and_path(path)?; + let upload = store + .put_multipart(&object_path) + .await + .map_err(from_object_store_error)?; + let writer = WriteMultipart::new(upload); + Ok(Box::new(ObjectStoreWriter { + writer: Some(writer), + })) + } + + async fn delete(&self, path: &str) -> Result<()> { + let (store, object_path) = self.get_store_and_path(path)?; + store + .delete(&object_path) + .await + .map_err(from_object_store_error)?; + Ok(()) + } + + async fn delete_prefix(&self, path: &str) -> Result<()> { + let (store, object_path) = self.get_store_and_path(path)?; + let prefix = if object_path.as_ref().ends_with('/') { + object_path + } else { + ObjectStorePath::from(format!("{}/", object_path.as_ref())) + }; + + let mut list_stream = store.list(Some(&prefix)); + while let Some(entry) = list_stream.next().await { + let entry = entry.map_err(from_object_store_error)?; + store + .delete(&entry.location) + .await + .map_err(from_object_store_error)?; + } + Ok(()) + } + + async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { + while let Some(path) = paths.next().await { + let (store, object_path) = self.get_store_and_path(&path)?; + store + .delete(&object_path) + .await + .map_err(from_object_store_error)?; + } + Ok(()) + } + + fn new_input(&self, path: &str) -> Result { + Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) + } + + fn new_output(&self, path: &str) -> Result { + Ok(OutputFile::new(Arc::new(self.clone()), path.to_string())) + } +} + +/// Reader that implements `FileRead` using `object_store`. +struct ObjectStoreReader { + store: Arc, + path: ObjectStorePath, +} + +#[async_trait] +impl FileRead for ObjectStoreReader { + async fn read(&self, range: Range) -> Result { + let opts = object_store::GetOptions { + range: Some((range.start..range.end).into()), + ..Default::default() + }; + let result = self + .store + .get_opts(&self.path, opts) + .await + .map_err(from_object_store_error)?; + result.bytes().await.map_err(from_object_store_error) + } +} + +/// Writer that implements `FileWrite` using `object_store` multipart upload. +struct ObjectStoreWriter { + writer: Option, +} + +#[async_trait] +impl FileWrite for ObjectStoreWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let writer = self + .writer + .as_mut() + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Writer has already been closed"))?; + writer.write(&bs); + Ok(()) + } + + async fn close(&mut self) -> Result<()> { + let writer = self + .writer + .take() + .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Writer has already been closed"))?; + writer.finish().await.map_err(from_object_store_error)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "object_store-s3")] + fn make_s3_storage() -> ObjectStoreStorage { + ObjectStoreStorage::S3 { + config: Arc::new(S3Config::default()), + store_cache: Arc::new(DashMap::new()), + } + } + + #[cfg(feature = "object_store-s3")] + #[test] + fn test_store_cache_reuses_store() { + let storage = make_s3_storage(); + let (store1, _) = storage + .get_store_and_path("s3://test-bucket/file1.parquet") + .unwrap(); + let (store2, _) = storage + .get_store_and_path("s3://test-bucket/file2.parquet") + .unwrap(); + assert!(Arc::ptr_eq(&store1, &store2)); + } + + #[cfg(feature = "object_store-s3")] + #[test] + fn test_store_cache_different_buckets() { + let storage = make_s3_storage(); + let (store1, _) = storage + .get_store_and_path("s3://bucket-a/file.parquet") + .unwrap(); + let (store2, _) = storage + .get_store_and_path("s3://bucket-b/file.parquet") + .unwrap(); + assert!(!Arc::ptr_eq(&store1, &store2)); + } + + #[cfg(feature = "object_store-s3")] + #[test] + fn test_relative_path_extraction() { + let storage = make_s3_storage(); + let (_, path) = storage + .get_store_and_path("s3://my-bucket/data/file.parquet") + .unwrap(); + assert_eq!(path.as_ref(), "data/file.parquet"); + } +} diff --git a/crates/storage/object_store/src/s3.rs b/crates/storage/object_store/src/s3.rs new file mode 100644 index 0000000000..8ff03ded67 --- /dev/null +++ b/crates/storage/object_store/src/s3.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use iceberg::io::S3Config; +use iceberg::{Error, ErrorKind, Result}; +use object_store::ObjectStore; +use object_store::aws::AmazonS3Builder; +use url::Url; + +/// Parse an absolute S3 URL into (scheme, bucket, relative_path). +/// +/// Accepts `s3://` and `s3a://` schemes. +pub(crate) fn parse_s3_url(path: &str) -> Result<(&str, &str, &str)> { + let url = Url::parse(path).map_err(|e| { + Error::new(ErrorKind::DataInvalid, format!("Invalid URL: {path}")).with_source(e) + })?; + + let scheme = &path[..url.scheme().len()]; + match scheme { + "s3" | "s3a" => {} + _ => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported S3 scheme: {scheme} in url: {path}"), + )); + } + } + + let bucket_str = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {path}, missing bucket"), + ) + })?; + + let prefix_len = scheme.len() + "://".len() + bucket_str.len() + "/".len(); + let relative = if path.len() > prefix_len { + &path[prefix_len..] + } else { + "" + }; + + let bucket_start = scheme.len() + "://".len(); + let bucket = &path[bucket_start..bucket_start + bucket_str.len()]; + + Ok((scheme, bucket, relative)) +} + +/// Build an `AmazonS3` store from iceberg's `S3Config` for a given bucket. +pub(crate) fn build_s3_store(config: &S3Config, bucket: &str) -> Result> { + let mut builder = AmazonS3Builder::new().with_bucket_name(bucket); + + if let Some(ref endpoint) = config.endpoint { + builder = builder.with_endpoint(endpoint); + if endpoint.starts_with("http://") { + builder = builder.with_allow_http(true); + } + } + if let Some(ref access_key_id) = config.access_key_id { + builder = builder.with_access_key_id(access_key_id); + } + if let Some(ref secret_access_key) = config.secret_access_key { + builder = builder.with_secret_access_key(secret_access_key); + } + if let Some(ref session_token) = config.session_token { + builder = builder.with_token(session_token); + } + if let Some(ref region) = config.region { + builder = builder.with_region(region); + } + if config.enable_virtual_host_style { + builder = builder.with_virtual_hosted_style_request(true); + } + if config.allow_anonymous { + builder = builder.with_skip_signature(true); + } + + let store = builder.build().map_err(|e| { + Error::new(ErrorKind::Unexpected, "Failed to build S3 object store").with_source(e) + })?; + Ok(Arc::new(store)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_s3_url() { + let (scheme, bucket, relative) = + parse_s3_url("s3://my-bucket/path/to/file.parquet").unwrap(); + assert_eq!(scheme, "s3"); + assert_eq!(bucket, "my-bucket"); + assert_eq!(relative, "path/to/file.parquet"); + } + + #[test] + fn test_parse_s3a_url() { + let (scheme, bucket, relative) = + parse_s3_url("s3a://my-bucket/path/to/file.parquet").unwrap(); + assert_eq!(scheme, "s3a"); + assert_eq!(bucket, "my-bucket"); + assert_eq!(relative, "path/to/file.parquet"); + } + + #[test] + fn test_parse_s3_url_unsupported_scheme() { + assert!(parse_s3_url("gs://my-bucket/file.parquet").is_err()); + } + + #[test] + fn test_parse_s3_url_bucket_only() { + let (scheme, bucket, relative) = parse_s3_url("s3://my-bucket/").unwrap(); + assert_eq!(scheme, "s3"); + assert_eq!(bucket, "my-bucket"); + assert_eq!(relative, ""); + } +}