Skip to content
11 changes: 11 additions & 0 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,17 @@ impl Catalog for GlueCatalog {
Ok(())
}

async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
}

/// Asynchronously checks the existence of a specified table
/// in the database.
///
Expand Down
11 changes: 11 additions & 0 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,17 @@ impl Catalog for HmsCatalog {
Ok(())
}

async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
}

/// Asynchronously checks the existence of a specified table
/// in the database.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/catalog/loader/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ pub fn assert_map_contains(expected: &HashMap<String, String>, actual: &HashMap<
pub async fn cleanup_namespace_dyn(catalog: &dyn Catalog, namespace: &NamespaceIdent) {
if let Ok(tables) = catalog.list_tables(namespace).await {
for table in tables {
let _ = catalog.drop_table(&table).await;
let _ = catalog.purge_table(&table).await;
}
}
let _ = catalog.drop_namespace(namespace).await;
Expand Down
77 changes: 77 additions & 0 deletions crates/catalog/loader/tests/table_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,80 @@ async fn test_catalog_drop_table_missing_errors(#[case] kind: CatalogKind) -> Re
assert!(catalog.drop_table(&table_ident).await.is_err());
Ok(())
}

// Common behavior: purge_table removes the table from the catalog.
#[rstest]
#[case::rest_catalog(CatalogKind::Rest)]
#[case::glue_catalog(CatalogKind::Glue)]
#[case::hms_catalog(CatalogKind::Hms)]
#[case::sql_catalog(CatalogKind::Sql)]
#[case::s3tables_catalog(CatalogKind::S3Tables)]
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_purge_table(#[case] kind: CatalogKind) -> Result<()> {
let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
let catalog = harness.catalog;
let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
"catalog_purge_table",
harness.label
));

cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
catalog.create_namespace(&namespace, HashMap::new()).await?;

let table_name = normalize_test_name_with_parts!("catalog_purge_table", harness.label, "table");
let table = catalog
.create_table(&namespace, table_creation(table_name))
.await?;
let ident = table.identifier().clone();

assert!(catalog.table_exists(&ident).await?);

// Capture metadata location and file_io before purge so we can verify
// that the underlying files are actually deleted.
let metadata_location = table.metadata_location().map(|s| s.to_string());
let file_io = table.file_io().clone();

catalog.purge_table(&ident).await?;
assert!(!catalog.table_exists(&ident).await?);

if let Some(location) = &metadata_location {
assert!(
!file_io.exists(location).await?,
"Metadata file should have been deleted after purge"
);
}

catalog.drop_namespace(&namespace).await?;

Ok(())
}

// Common behavior: purging a missing table should error.
#[rstest]
#[case::rest_catalog(CatalogKind::Rest)]
#[case::glue_catalog(CatalogKind::Glue)]
#[case::hms_catalog(CatalogKind::Hms)]
#[case::sql_catalog(CatalogKind::Sql)]
#[case::s3tables_catalog(CatalogKind::S3Tables)]
#[case::memory_catalog(CatalogKind::Memory)]
#[tokio::test]
async fn test_catalog_purge_table_missing_errors(#[case] kind: CatalogKind) -> Result<()> {
let Some(harness) = load_catalog(kind).await else {
return Ok(());
};
let catalog = harness.catalog;
let namespace = NamespaceIdent::new(normalize_test_name_with_parts!(
"catalog_purge_table_missing_errors",
harness.label
));

cleanup_namespace_dyn(catalog.as_ref(), &namespace).await;
catalog.create_namespace(&namespace, HashMap::new()).await?;

let table_ident = TableIdent::new(namespace.clone(), "missing".to_string());
assert!(catalog.purge_table(&table_ident).await.is_err());
Ok(())
}
55 changes: 35 additions & 20 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,35 @@ impl RestCatalog {
}
}

/// Sends a DELETE request for the given table, optionally requesting purge.
async fn delete_table(&self, table: &TableIdent, purge: bool) -> Result<()> {
let context = self.context().await?;

let mut request_builder = context
.client
.request(Method::DELETE, context.config.table_endpoint(table));

if purge {
request_builder = request_builder.query(&[("purgeRequested", "true")]);
}

let request = request_builder.build()?;
let http_response = context.client.query_catalog(request).await?;

match http_response.status() {
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(Error::new(
ErrorKind::TableNotFound,
"Tried to drop a table that does not exist",
)),
_ => Err(deserialize_unexpected_catalog_error(
http_response,
context.client.disable_header_redaction(),
)
.await),
}
}

/// Gets the [`RestContext`] from the catalog.
async fn context(&self) -> Result<&RestContext> {
self.ctx
Expand Down Expand Up @@ -828,27 +857,13 @@ impl Catalog for RestCatalog {

/// Drop a table from the catalog.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let context = self.context().await?;

let request = context
.client
.request(Method::DELETE, context.config.table_endpoint(table))
.build()?;

let http_response = context.client.query_catalog(request).await?;
self.delete_table(table, false).await
}

match http_response.status() {
StatusCode::NO_CONTENT | StatusCode::OK => Ok(()),
StatusCode::NOT_FOUND => Err(Error::new(
ErrorKind::TableNotFound,
"Tried to drop a table that does not exist",
)),
_ => Err(deserialize_unexpected_catalog_error(
http_response,
context.client.disable_header_redaction(),
)
.await),
}
/// Drop a table from the catalog and purge its data by sending
/// `purgeRequested=true` to the REST server.
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
self.delete_table(table, true).await
}

/// Check if a table exists in the catalog.
Expand Down
19 changes: 11 additions & 8 deletions crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,18 @@ impl Catalog for S3TablesCatalog {
Ok(self.load_table_with_version_token(table_ident).await?.0)
}

/// Drops an existing table from the s3tables catalog.
/// Not supported for S3Tables. Use `purge_table` instead.
///
/// Validates the table identifier and then deletes the corresponding
/// table from the s3tables catalog.
///
/// This function can return an error in the following situations:
/// - Errors from the underlying database deletion process, converted using
/// `from_aws_sdk_error`.
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
/// S3 Tables doesn't support soft delete, so dropping a table will permanently remove it from the catalog.
async fn drop_table(&self, _table: &TableIdent) -> Result<()> {
Err(Error::new(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrorKind::FeatureUnsupported,
"drop_table is not supported for S3Tables; use purge_table instead",
))
}

/// Purge a table from the S3 Tables catalog.
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let req = self
.s3tables_client
.delete_table()
Expand Down
11 changes: 11 additions & 0 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,17 @@ impl Catalog for SqlCatalog {
Ok(())
}

async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
}

async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
if !self.table_exists(identifier).await? {
return no_such_table_err(identifier);
Expand Down
11 changes: 11 additions & 0 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,17 @@ impl Catalog for MemoryCatalog {
Ok(())
}

async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
let table_info = self.load_table(table_ident).await?;
self.drop_table(table_ident).await?;
crate::catalog::utils::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
}

/// Check if a table exists in the catalog.
async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
let root_namespace_state = self.root_namespace_state.lock().await;
Expand Down
9 changes: 9 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

pub mod memory;
mod metadata_location;
pub(crate) mod utils;

use std::collections::HashMap;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -98,6 +99,14 @@ pub trait Catalog: Debug + Sync + Send {
/// Drop a table from the catalog, or returns error if it doesn't exist.
async fn drop_table(&self, table: &TableIdent) -> Result<()>;

/// Drop a table from the catalog and delete the underlying table data.
///
/// Implementations should load the table metadata, drop the table
/// from the catalog, then delete all associated data and metadata files.
/// The [`drop_table_data`](utils::drop_table_data) utility function can
/// be used for the file cleanup step.
async fn purge_table(&self, table: &TableIdent) -> Result<()>;

/// Check if a table exists in the catalog.
async fn table_exists(&self, table: &TableIdent) -> Result<bool>;

Expand Down
Loading
Loading