diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 7e0f6207edc9..2e0186ddcd43 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1473,6 +1473,64 @@ protected void append(String target, Employee... employees) throws NoSuchTableEx
     inputDF.coalesce(1).writeTo(target).append();
   }
 
+  @TestTemplate
+  public void testPartitionDeleteWithWapBranch() {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    sql("INSERT INTO %s VALUES (2, 'a'), (2, 'b')", tableName);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "dev1"),
+        () -> {
+          sql("INSERT INTO %s VALUES (1, 'a'), (2, 'a')", tableName);
+
+          sql("DELETE FROM %s WHERE data = 'a'", tableName);
+
+          assertEquals(
+              "Main branch should keep all original data",
+              ImmutableList.of(row(2L, "a"), row(2L, "b")),
+              sql("SELECT * FROM %s.branch_main ORDER BY id, data", tableName));
+
+          assertEquals(
+              "WAP branch should have data=a row deleted",
+              ImmutableList.of(row(2L, "b")),
+              sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id, data", tableName));
+        });
+  }
+
+  @TestTemplate
+  public void testNonPartitionDeleteWithWapBranch() {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    sql("INSERT INTO %s VALUES (2, 'a'), (2, 'b')", tableName);
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "dev1"),
+        () -> {
+          sql("INSERT INTO %s VALUES (1, 'a'), (2, 'a')", tableName);
+
+          sql("DELETE FROM %s WHERE id = 2", tableName);
+
+          assertEquals(
+              "Main branch should keep all original data",
+              ImmutableList.of(row(2L, "a"), row(2L, "b")),
+              sql("SELECT * FROM %s.branch_main ORDER BY id, data", tableName));
+
+          assertEquals(
+              "WAP branch should have id=2 row deleted",
+              ImmutableList.of(row(1L, "a")),
+              sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id, data", tableName));
+        });
+  }
+
   private RowLevelOperationMode mode(Table table) {
     String modeName = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
     return RowLevelOperationMode.fromName(modeName);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index ae3c0ce0c8bb..7767eb139a30 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkV2Filters;
 import org.apache.iceberg.spark.TimeTravel;
@@ -233,7 +234,12 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (snapshot != null) {
+    String targetBranch =
+        SparkTableUtil.determineReadBranch(
+            spark(), table(), branch, CaseInsensitiveStringMap.empty());
+    if (targetBranch != null) {
+      scan = scan.useRef(targetBranch);
+    } else if (snapshot != null) {
       scan = scan.useSnapshot(snapshot.snapshotId());
     }
 
@@ -275,8 +281,11 @@ public void deleteWhere(Predicate[] predicates) {
             .set("spark.app.id", spark().sparkContext().applicationId())
             .deleteFromRowFilter(deleteExpr);
 
-    if (branch != null) {
-      deleteFiles.toBranch(branch);
+    String targetBranch =
+        SparkTableUtil.determineWriteBranch(
+            spark(), table(), branch, CaseInsensitiveStringMap.empty());
+    if (targetBranch != null) {
+      deleteFiles.toBranch(targetBranch);
     }
 
     if (!CommitMetadata.commitProperties().isEmpty()) {