diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 6dcf1c9a158e..985b2750502b 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -348,11 +348,14 @@ public boolean canDeleteWhere(Predicate[] predicates) {
       }
     }
 
-    return canDeleteUsingMetadata(deleteExpr);
+    String scanBranch =
+        SparkTableUtil.determineReadBranch(
+            sparkSession(), icebergTable, branch, CaseInsensitiveStringMap.empty());
+    return canDeleteUsingMetadata(deleteExpr, scanBranch);
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
-  private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+  private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) {
     boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
 
     if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
@@ -367,14 +370,14 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (branch != null) {
-      scan = scan.useRef(branch);
+    if (scanBranch != null) {
+      scan = scan.useRef(scanBranch);
     }
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Map<Integer, Evaluator> evaluators = Maps.newHashMap();
       StrictMetricsEvaluator metricsEvaluator =
-          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), deleteExpr);
+          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), scanBranch), deleteExpr);
 
       return Iterables.all(
           tasks,
@@ -411,12 +414,10 @@ public void deleteWhere(Predicate[] predicates) {
             .set("spark.app.id", sparkSession().sparkContext().applicationId())
             .deleteFromRowFilter(deleteExpr);
 
-    if (SparkTableUtil.wapEnabled(table())) {
-      branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch);
-    }
+    String writeBranch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch);
 
-    if (branch != null) {
-      deleteFiles.toBranch(branch);
+    if (writeBranch != null) {
+      deleteFiles.toBranch(writeBranch);
     }
 
     if (!CommitMetadata.commitProperties().isEmpty()) {
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
index 2389bcc17387..7eded57c9d0c 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java
@@ -24,9 +24,11 @@
 import java.util.List;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -185,4 +187,32 @@ public void testDeleteFromTablePartitionedByVarbinary() {
         ImmutableList.of(row(1L, new byte[] {-29, -68, -47})),
         sql("SELECT * FROM %s where data = X'e3bcd1'", tableName));
   }
+
+  @TestTemplate
+  public void testDeleteWithWapBranch() throws NoSuchTableException {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1");
+    try {
+      // all rows go into one file on the WAP branch; main stays empty
+      List<SimpleRecord> records =
+          Lists.newArrayList(
+              new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+      Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+      df.coalesce(1).writeTo(tableName).append();
+
+      // delete a subset of rows - canDeleteWhere and deleteWhere must both
+      // resolve the WAP branch so they scan and commit to the same branch
+      sql("DELETE FROM %s WHERE id = 1", tableName);
+
+      assertEquals(
+          "Should have deleted only the matching row on the WAP branch",
+          ImmutableList.of(row(2L, "b"), row(3L, "c")),
+          sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
 }