diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 9e9d4580c3..a7e0171337 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -659,6 +659,17 @@ impl Catalog for GlueCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Asynchronously checks the existence of a specified table /// in the database. /// diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index bd78193732..4a030c1104 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -604,6 +604,17 @@ impl Catalog for HmsCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Asynchronously checks the existence of a specified table /// in the database. 
/// diff --git a/crates/catalog/loader/tests/common/mod.rs b/crates/catalog/loader/tests/common/mod.rs index 90b72df8ab..600cd9b6f4 100644 --- a/crates/catalog/loader/tests/common/mod.rs +++ b/crates/catalog/loader/tests/common/mod.rs @@ -335,7 +335,7 @@ pub fn assert_map_contains(expected: &HashMap<String, String>, actual: &HashMap<String, String>) { pub async fn cleanup_namespace_dyn(catalog: &dyn Catalog, namespace: &NamespaceIdent) { if let Ok(tables) = catalog.list_tables(namespace).await { for table in tables { - let _ = catalog.drop_table(&table).await; + let _ = catalog.purge_table(&table).await; } } let _ = catalog.drop_namespace(namespace).await; diff --git a/crates/catalog/loader/tests/table_suite.rs b/crates/catalog/loader/tests/table_suite.rs index 6b7a3a822c..cdc9b11043 100644 --- a/crates/catalog/loader/tests/table_suite.rs +++ b/crates/catalog/loader/tests/table_suite.rs @@ -274,3 +274,80 @@ async fn test_catalog_drop_table_missing_errors(#[case] kind: CatalogKind) -> Result<()> { assert!(catalog.drop_table(&table_ident).await.is_err()); Ok(()) } + +// Common behavior: purge_table removes the table from the catalog. 
+#[rstest] +#[case::rest_catalog(CatalogKind::Rest)] +#[case::glue_catalog(CatalogKind::Glue)] +#[case::hms_catalog(CatalogKind::Hms)] +#[case::sql_catalog(CatalogKind::Sql)] +#[case::s3tables_catalog(CatalogKind::S3Tables)] +#[case::memory_catalog(CatalogKind::Memory)] +#[tokio::test] +async fn test_catalog_purge_table(#[case] kind: CatalogKind) -> Result<()> { + let Some(harness) = load_catalog(kind).await else { + return Ok(()); + }; + let catalog = harness.catalog; + let namespace = NamespaceIdent::new(normalize_test_name_with_parts!( + "catalog_purge_table", + harness.label + )); + + cleanup_namespace_dyn(catalog.as_ref(), &namespace).await; + catalog.create_namespace(&namespace, HashMap::new()).await?; + + let table_name = normalize_test_name_with_parts!("catalog_purge_table", harness.label, "table"); + let table = catalog + .create_table(&namespace, table_creation(table_name)) + .await?; + let ident = table.identifier().clone(); + + assert!(catalog.table_exists(&ident).await?); + + // Capture metadata location and file_io before purge so we can verify + // that the underlying files are actually deleted. + let metadata_location = table.metadata_location().map(|s| s.to_string()); + let file_io = table.file_io().clone(); + + catalog.purge_table(&ident).await?; + assert!(!catalog.table_exists(&ident).await?); + + if let Some(location) = &metadata_location { + assert!( + !file_io.exists(location).await?, + "Metadata file should have been deleted after purge" + ); + } + + catalog.drop_namespace(&namespace).await?; + + Ok(()) +} + +// Common behavior: purging a missing table should error. 
+#[rstest] +#[case::rest_catalog(CatalogKind::Rest)] +#[case::glue_catalog(CatalogKind::Glue)] +#[case::hms_catalog(CatalogKind::Hms)] +#[case::sql_catalog(CatalogKind::Sql)] +#[case::s3tables_catalog(CatalogKind::S3Tables)] +#[case::memory_catalog(CatalogKind::Memory)] +#[tokio::test] +async fn test_catalog_purge_table_missing_errors(#[case] kind: CatalogKind) -> Result<()> { + let Some(harness) = load_catalog(kind).await else { + return Ok(()); + }; + let catalog = harness.catalog; + let namespace = NamespaceIdent::new(normalize_test_name_with_parts!( + "catalog_purge_table_missing_errors", + harness.label + )); + + cleanup_namespace_dyn(catalog.as_ref(), &namespace).await; + catalog.create_namespace(&namespace, HashMap::new()).await?; + + let table_ident = TableIdent::new(namespace.clone(), "missing".to_string()); + assert!(catalog.purge_table(&table_ident).await.is_err()); + Ok(()) +} diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 3551b05160..7d5df24d52 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -363,6 +363,35 @@ impl RestCatalog { } } + /// Sends a DELETE request for the given table, optionally requesting purge. 
+ async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> { + let context = self.context().await?; + + let mut request_builder = context + .client + .request(Method::DELETE, context.config.table_endpoint(table)); + + if purge { + request_builder = request_builder.query(&[("purgeRequested", "true")]); + } + + let request = request_builder.build()?; + let http_response = context.client.query_catalog(request).await?; + + match http_response.status() { + StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), + StatusCode::NOT_FOUND => Err(Error::new( + ErrorKind::TableNotFound, + "Tried to drop a table that does not exist", + )), + _ => Err(deserialize_unexpected_catalog_error( + http_response, + context.client.disable_header_redaction(), + ) + .await), + } + } + /// Gets the [`RestContext`] from the catalog. async fn context(&self) -> Result<&RestContext> { self.ctx @@ -828,27 +857,13 @@ impl Catalog for RestCatalog { /// Drop a table from the catalog. async fn drop_table(&self, table: &TableIdent) -> Result<()> { - let context = self.context().await?; - - let request = context - .client - .request(Method::DELETE, context.config.table_endpoint(table)) - .build()?; - - let http_response = context.client.query_catalog(request).await?; + self.delete_table(table, false).await + } - match http_response.status() { - StatusCode::NO_CONTENT | StatusCode::OK => Ok(()), - StatusCode::NOT_FOUND => Err(Error::new( - ErrorKind::TableNotFound, - "Tried to drop a table that does not exist", - )), - _ => Err(deserialize_unexpected_catalog_error( - http_response, - context.client.disable_header_redaction(), - ) - .await), - } + /// Drop a table from the catalog and purge its data by sending + /// `purgeRequested=true` to the REST server. + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + self.delete_table(table, true).await } /// Check if a table exists in the catalog. 
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index a416c38f22..b88bd77d29 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -562,15 +562,18 @@ impl Catalog for S3TablesCatalog { Ok(self.load_table_with_version_token(table_ident).await?.0) } - /// Drops an existing table from the s3tables catalog. + /// Not supported for S3Tables. Use `purge_table` instead. /// - /// Validates the table identifier and then deletes the corresponding - /// table from the s3tables catalog. - /// - /// This function can return an error in the following situations: - /// - Errors from the underlying database deletion process, converted using - /// `from_aws_sdk_error`. - async fn drop_table(&self, table: &TableIdent) -> Result<()> { + /// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog. + async fn drop_table(&self, _table: &TableIdent) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "drop_table is not supported for S3Tables; use purge_table instead", + )) + } + + /// Purge a table from the S3 Tables catalog. + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let req = self + .s3tables_client + .delete_table() diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 195f6c9de4..7e468e7e37 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -757,6 +757,17 @@ impl Catalog for SqlCatalog { Ok(()) } + async fn purge_table(&self, table: &TableIdent) -> Result<()> { + let table_info = self.load_table(table).await?; + self.drop_table(table).await?; + iceberg::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + async fn load_table(&self, identifier: &TableIdent) -> Result<Table> { if !self.table_exists(identifier).await? 
{ return no_such_table_err(identifier); diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 25ae004417..8fa5c479c3 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -326,6 +326,17 @@ impl Catalog for MemoryCatalog { Ok(()) } + async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> { + let table_info = self.load_table(table_ident).await?; + self.drop_table(table_ident).await?; + crate::catalog::utils::drop_table_data( + table_info.file_io(), + table_info.metadata(), + table_info.metadata_location(), + ) + .await + } + /// Check if a table exists in the catalog. async fn table_exists(&self, table_ident: &TableIdent) -> Result { let root_namespace_state = self.root_namespace_state.lock().await; diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 06326917ec..f296cf2260 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -19,6 +19,7 @@ pub mod memory; mod metadata_location; +pub(crate) mod utils; use std::collections::HashMap; use std::fmt::{Debug, Display}; @@ -98,6 +99,14 @@ pub trait Catalog: Debug + Sync + Send { /// Drop a table from the catalog, or returns error if it doesn't exist. async fn drop_table(&self, table: &TableIdent) -> Result<()>; + /// Drop a table from the catalog and delete the underlying table data. + /// + /// Implementations should load the table metadata, drop the table + /// from the catalog, then delete all associated data and metadata files. + /// The [`drop_table_data`](utils::drop_table_data) utility function can + /// be used for the file cleanup step. + async fn purge_table(&self, table: &TableIdent) -> Result<()>; + /// Check if a table exists in the catalog. 
async fn table_exists(&self, table: &TableIdent) -> Result<bool>; diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs new file mode 100644 index 0000000000..d450f9df80 --- /dev/null +++ b/crates/iceberg/src/catalog/utils.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utility functions for catalog operations. + +use std::collections::HashSet; + +use futures::{TryStreamExt, stream}; + +use crate::Result; +use crate::io::FileIO; +use crate::spec::TableMetadata; + +const DELETE_CONCURRENCY: usize = 10; + +/// Deletes all data and metadata files referenced by the given table metadata. +/// +/// This mirrors the Java implementation's `CatalogUtil.dropTableData`. +/// It collects all manifest files, manifest lists, previous metadata files, +/// statistics files, and partition statistics files, then deletes them. +/// +/// Data files within manifests are only deleted if the `gc.enabled` table +/// property is `true` (the default), to avoid corrupting other tables that +/// may share the same data files. 
+pub async fn drop_table_data( + io: &FileIO, + metadata: &TableMetadata, + metadata_location: Option<&str>, +) -> Result<()> { + let mut manifest_lists_to_delete: HashSet<String> = HashSet::new(); + let mut manifests_to_delete: HashSet<String> = HashSet::new(); + + // Load all manifest lists concurrently + let results: Vec<_> = + futures::future::try_join_all(metadata.snapshots().map(|snapshot| async { + let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list)) + })) + .await?; + + for (manifest_list_location, manifest_list) in results { + if !manifest_list_location.is_empty() { + manifest_lists_to_delete.insert(manifest_list_location); + } + for manifest_file in manifest_list.entries() { + manifests_to_delete.insert(manifest_file.manifest_path.clone()); + } + } + + // Delete data files only if gc.enabled is true, to avoid corrupting shared tables + if metadata.table_properties()?.gc_enabled { + delete_data_files(io, &manifests_to_delete).await?; + } + + // Delete manifest files + io.delete_stream(stream::iter(manifests_to_delete)).await?; + + // Delete manifest lists + io.delete_stream(stream::iter(manifest_lists_to_delete)) + .await?; + + // Delete previous metadata files + let prev_metadata_paths: Vec<String> = metadata + .metadata_log() + .iter() + .map(|m| m.metadata_file.clone()) + .collect(); + io.delete_stream(stream::iter(prev_metadata_paths)).await?; + + // Delete statistics files + let stats_paths: Vec<String> = metadata + .statistics_iter() + .map(|s| s.statistics_path.clone()) + .collect(); + io.delete_stream(stream::iter(stats_paths)).await?; + + // Delete partition statistics files + let partition_stats_paths: Vec<String> = metadata + .partition_statistics_iter() + .map(|s| s.statistics_path.clone()) + .collect(); + io.delete_stream(stream::iter(partition_stats_paths)) + .await?; + + // Delete the current metadata file + if let Some(location) = metadata_location { + io.delete(location).await?; + } + 
+ Ok(()) +} + +/// Reads manifests concurrently and deletes the data files referenced within. +async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> { + stream::iter(manifest_paths.iter().map(Ok)) + .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move { + let input = io.new_input(manifest_path)?; + let manifest_content = input.read().await?; + let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; + + let data_file_paths = manifest + .entries() + .iter() + .map(|entry| entry.data_file.file_path().to_string()) + .collect::<Vec<_>>(); + + io.delete_stream(stream::iter(data_file_paths)).await + }) + .await +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8b345deb6e..d06c04e520 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -71,6 +71,7 @@ pub use error::{Error, ErrorKind, Result}; mod catalog; +pub use catalog::utils::drop_table_data; pub use catalog::*; pub mod table; diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 6e08318479..07c157304e 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -114,6 +114,9 @@ pub struct TableProperties { pub metadata_compression_codec: CompressionCodec, /// Whether to use `FanoutWriter` for partitioned tables. pub write_datafusion_fanout_enabled: bool, + /// Whether garbage collection is enabled on drop. + /// When `false`, data files will not be deleted when a table is dropped. + pub gc_enabled: bool, } impl TableProperties { @@ -212,6 +215,13 @@ impl TableProperties { pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled"; /// Default value for fanout writer enabled pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT: bool = true; + + /// Property key for enabling garbage collection on drop. 
+ /// When set to `false`, data files will not be deleted when a table is dropped. + /// Defaults to `true`. + pub const PROPERTY_GC_ENABLED: &str = "gc.enabled"; + /// Default value for gc.enabled + pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true; } impl TryFrom<&HashMap<String, String>> for TableProperties { @@ -256,6 +266,11 @@ TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED, TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT, )?, + gc_enabled: parse_property( + props, + TableProperties::PROPERTY_GC_ENABLED, + TableProperties::PROPERTY_GC_ENABLED_DEFAULT, + )?, }) } } @@ -294,6 +309,10 @@ mod tests { table_properties.metadata_compression_codec, CompressionCodec::None ); + assert_eq!( + table_properties.gc_enabled, + TableProperties::PROPERTY_GC_ENABLED_DEFAULT + ); } #[test] @@ -377,12 +396,17 @@ TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES.to_string(), "512".to_string(), ), + ( + TableProperties::PROPERTY_GC_ENABLED.to_string(), + "false".to_string(), + ), ]); let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!(table_properties.commit_num_retries, 10); assert_eq!(table_properties.commit_max_retry_wait_ms, 20); assert_eq!(table_properties.write_format_default, "avro".to_string()); assert_eq!(table_properties.write_target_file_size_bytes, 512); + assert!(!table_properties.gc_enabled); } #[test] @@ -429,6 +453,17 @@ assert!(table_properties.to_string().contains( "Invalid value for write.target-file-size-bytes: invalid digit found in string" )); + + let invalid_gc_enabled = HashMap::from([( + TableProperties::PROPERTY_GC_ENABLED.to_string(), + "notabool".to_string(), + )]); + let table_properties = TableProperties::try_from(&invalid_gc_enabled).unwrap_err(); + assert!( + table_properties + .to_string() + .contains("Invalid value for gc.enabled") + ); } #[test]