From 1a3b0ed6ce563a0e616ec3633624612d14ca1d10 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 09:41:32 -0800 Subject: [PATCH 1/4] fix branch delete for canDeleteWhere where it does not resolve to the correct branch --- .../iceberg/spark/source/SparkTable.java | 4 +++ .../iceberg/spark/sql/TestDeleteFrom.java | 30 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 6dcf1c9a158e..f7fab6128122 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -337,6 +337,10 @@ public boolean canDeleteWhere(Predicate[] predicates) { Preconditions.checkArgument( snapshotId == null, "Cannot delete from table at a specific snapshot: %s", snapshotId); + if (SparkTableUtil.wapEnabled(table())) { + branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); + } + Expression deleteExpr = Expressions.alwaysTrue(); for (Predicate predicate : predicates) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java index 2389bcc17387..7eded57c9d0c 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java @@ -24,9 +24,11 @@ import java.util.List; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkSQLProperties; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -185,4 +187,32 @@ public void testDeleteFromTablePartitionedByVarbinary() { ImmutableList.of(row(1L, new byte[] {-29, -68, -47})), sql("SELECT * FROM %s where data = X'e3bcd1'", tableName)); } + + @TestTemplate + public void testDeleteWithWapBranch() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES ('%s' = 'true')", + tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); + + spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1"); + try { + // all rows go into one file on the WAP branch; main stays empty + List records = + Lists.newArrayList( + new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c")); + Dataset df = spark.createDataFrame(records, SimpleRecord.class); + df.coalesce(1).writeTo(tableName).append(); + + // delete a subset of rows - canDeleteWhere and deleteWhere must both + // resolve the WAP branch so they scan and commit to the same branch + sql("DELETE FROM %s WHERE id = 1", tableName); + + assertEquals( + "Should have deleted only the matching row on the WAP branch", + ImmutableList.of(row(2L, "b"), row(3L, "c")), + sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName)); + } finally { + spark.conf().unset(SparkSQLProperties.WAP_BRANCH); + } + } } From 19a6329285ad637c51435abb3a8a633448899fce Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 12:17:11 -0800 Subject: [PATCH 2/4] fixing test --- .../apache/iceberg/spark/source/SparkTable.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index f7fab6128122..aae81dd77c52 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -337,10 +337,6 @@ public boolean canDeleteWhere(Predicate[] predicates) { Preconditions.checkArgument( snapshotId == null, "Cannot delete from table at a specific snapshot: %s", snapshotId); - if (SparkTableUtil.wapEnabled(table())) { - branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); - } - Expression deleteExpr = Expressions.alwaysTrue(); for (Predicate predicate : predicates) { @@ -352,11 +348,14 @@ public boolean canDeleteWhere(Predicate[] predicates) { } } - return canDeleteUsingMetadata(deleteExpr); + String scanBranch = + SparkTableUtil.determineReadBranch( + sparkSession(), icebergTable, branch, CaseInsensitiveStringMap.empty()); + return canDeleteUsingMetadata(deleteExpr, scanBranch); } // a metadata delete is possible iff matching files can be deleted entirely - private boolean canDeleteUsingMetadata(Expression deleteExpr) { + private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) { boolean caseSensitive = SparkUtil.caseSensitive(sparkSession()); if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) { @@ -371,14 +370,14 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) { .includeColumnStats() .ignoreResiduals(); - if (branch != null) { - scan = scan.useRef(branch); + if (scanBranch != null) { + scan = scan.useRef(scanBranch); } try (CloseableIterable tasks = scan.planFiles()) { Map evaluators = Maps.newHashMap(); StrictMetricsEvaluator metricsEvaluator = - new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), deleteExpr); + new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), scanBranch), deleteExpr); return Iterables.all( tasks, From b576cc56070a7a446fa6afb7ee22542fab05e2f1 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 14:00:43 -0800 Subject: [PATCH 3/4] fixing test --- .../java/org/apache/iceberg/spark/source/SparkTable.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index aae81dd77c52..d033efd69b16 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -414,12 +414,11 @@ public void deleteWhere(Predicate[] predicates) { .set("spark.app.id", sparkSession().sparkContext().applicationId()) .deleteFromRowFilter(deleteExpr); - if (SparkTableUtil.wapEnabled(table())) { - branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); - } + String writeBranch = + SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); - if (branch != null) { - deleteFiles.toBranch(branch); + if (writeBranch != null) { + deleteFiles.toBranch(writeBranch); } if (!CommitMetadata.commitProperties().isEmpty()) { From ac87dce38b0980a9a2388a5bac946dda91695247 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 4 Mar 2026 15:04:43 -0800 Subject: [PATCH 4/4] fix format --- .../main/java/org/apache/iceberg/spark/source/SparkTable.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index d033efd69b16..985b2750502b 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -414,8 +414,7 @@ public void deleteWhere(Predicate[] predicates) { .set("spark.app.id", sparkSession().sparkContext().applicationId()) .deleteFromRowFilter(deleteExpr); - String writeBranch = - SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); + String writeBranch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); if (writeBranch != null) { deleteFiles.toBranch(writeBranch);