Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,14 @@ public boolean canDeleteWhere(Predicate[] predicates) {
}
}

return canDeleteUsingMetadata(deleteExpr);
String scanBranch =
SparkTableUtil.determineReadBranch(
sparkSession(), icebergTable, branch, CaseInsensitiveStringMap.empty());
return canDeleteUsingMetadata(deleteExpr, scanBranch);
}

// a metadata delete is possible iff matching files can be deleted entirely
private boolean canDeleteUsingMetadata(Expression deleteExpr) {
private boolean canDeleteUsingMetadata(Expression deleteExpr, String scanBranch) {
boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());

if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
Expand All @@ -367,14 +370,14 @@ private boolean canDeleteUsingMetadata(Expression deleteExpr) {
.includeColumnStats()
.ignoreResiduals();

if (branch != null) {
scan = scan.useRef(branch);
if (scanBranch != null) {
scan = scan.useRef(scanBranch);
}

try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
Map<Integer, Evaluator> evaluators = Maps.newHashMap();
StrictMetricsEvaluator metricsEvaluator =
new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), deleteExpr);
new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), scanBranch), deleteExpr);

return Iterables.all(
tasks,
Expand Down Expand Up @@ -411,12 +414,10 @@ public void deleteWhere(Predicate[] predicates) {
.set("spark.app.id", sparkSession().sparkContext().applicationId())
.deleteFromRowFilter(deleteExpr);

if (SparkTableUtil.wapEnabled(table())) {
branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch);
}
String writeBranch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch);

if (branch != null) {
deleteFiles.toBranch(branch);
if (writeBranch != null) {
deleteFiles.toBranch(writeBranch);
}

if (!CommitMetadata.commitProperties().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.util.List;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand Down Expand Up @@ -185,4 +187,32 @@ public void testDeleteFromTablePartitionedByVarbinary() {
ImmutableList.of(row(1L, new byte[] {-29, -68, -47})),
sql("SELECT * FROM %s where data = X'e3bcd1'", tableName));
}

@TestTemplate
public void testDeleteWithWapBranch() throws NoSuchTableException {
sql(
"CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

spark.conf().set(SparkSQLProperties.WAP_BRANCH, "dev1");
try {
// all rows go into one file on the WAP branch; main stays empty
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also insert some rows/files into the main branch first? ideally with a row of matching the predicate of id=1 .

List<SimpleRecord> records =
Lists.newArrayList(
new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
df.coalesce(1).writeTo(tableName).append();

// delete a subset of rows - canDeleteWhere and deleteWhere must both
// resolve the WAP branch so they scan and commit to the same branch
sql("DELETE FROM %s WHERE id = 1", tableName);

assertEquals(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use assertj

assertThat(sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName))
.containsExactlyInAnyOrder(...)

"Should have deleted only the matching row on the WAP branch",
ImmutableList.of(row(2L, "b"), row(3L, "c")),
sql("SELECT * FROM %s VERSION AS OF 'dev1' ORDER BY id", tableName));
} finally {
spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
}
}
}