Spark: Add branch support to rewrite_data_files procedure #14964
Conversation
This commit adds branch support to the rewrite_data_files Spark SQL
procedure, allowing users to rewrite data files on specific branches
instead of only the main branch.
Changes:
- Core: Updated RewriteDataFilesCommitManager to accept and apply branch parameter
- Action: Added toBranch() method to RewriteDataFilesSparkAction
- Procedure: Added branch parameter to all Spark versions (v3.4, v3.5, v4.0, v4.1)
Users can now specify branches in two ways:
1. Via table identifier: CALL system.rewrite_data_files('db.table.branch_myBranch')
2. Via explicit parameter: CALL system.rewrite_data_files(table => 'db.table', branch => 'myBranch')
Fixes apache#14813
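At the action level, usage looks roughly like the sketch below. This is not code from the PR; it assumes the SparkActions.get() entry point and a loaded Iceberg Table, and shows the new toBranch() call next to a standard rewrite option:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

class BranchCompactionExample {
  // Compact small files on a branch instead of main; toBranch() is the new call.
  static RewriteDataFiles.Result compactBranch(Table table, String branchName) {
    return SparkActions.get()
        .rewriteDataFiles(table)         // returns RewriteDataFilesSparkAction
        .toBranch(branchName)            // added by this PR
        .option("min-input-files", "2")  // existing rewrite option
        .execute();
  }
}
```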
This commit fixes the compilation error by implementing the missing toBranch() method in RewriteDataFilesSparkAction for Spark versions 3.4, 3.5, and 4.0.
Changes:
- Added toBranch(String targetBranch) method to RewriteDataFilesSparkAction
- Updated commitManager() to pass the branch parameter to RewriteDataFilesCommitManager
- Added comprehensive branch tests to TestRewriteDataFilesProcedure (all versions)
The implementation follows the same pattern as v4.1 and matches how SparkWrite handles branches.
Integration tests passing: iceberg-delta-lake:check
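A sketch of the wiring described above. The five-argument RewriteDataFilesCommitManager constructor (branch last) matches the constructor diff shown later in this thread; the commitManager() signature and field names are assumptions:

```java
// Inside RewriteDataFilesSparkAction (sketch; field names assumed)
public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  this.branch = targetBranch;
  return this;
}

private RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
  // Forward the branch so commits target the requested ref instead of main
  return new RewriteDataFilesCommitManager(
      table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, branch);
}
```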
- Add missing Table import in Spark 3.4 test file
- Fix branch names to use camelCase (testBranch, filteredBranch) to avoid SQL parsing errors
- Ensure files are actually rewritten by inserting multiple small files
- Add min-input-files option to force file compaction (see the sketch below)
- Remove incorrect snapshot ID ordering assertions
- Add explicit assertions to verify files are rewritten and snapshots change
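For illustration, a procedure call shaped like the tests described above might look as follows. This is a sketch using the test base's sql() helper and the catalogName/tableIdent placeholders from the existing tests, not the exact test code:

```java
// Lower min-input-files so the small files on the branch are actually compacted
List<Object[]> output =
    sql(
        "CALL %s.system.rewrite_data_files("
            + "table => '%s', branch => 'testBranch', "
            + "options => map('min-input-files', '2'))",
        catalogName, tableIdent);
```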
The previous implementation incorrectly passed null as a 4th parameter in the 2-arg and 3-arg constructors, which caused them to call the wrong constructor overload. This resulted in snapshotProperties being null, leading to a NullPointerException when commitFileGroups() tried to iterate over properties with forEach(). The issue broke Flink maintenance API tests (TestRewriteDataFiles and TestFlinkTableSinkCompaction) because the Flink DataFileRewriteCommitter uses the 2-arg constructor. Files were not being rewritten as expected.
Changes:
- Line 51: Remove null parameter to call the 3-arg constructor
- Line 56: Remove null parameter to call the 4-arg constructor with a Map
This ensures the constructor chain passes through the existing constructors without introducing null values, and the branch parameter is passed only through the appropriate constructors.
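The corrected chain, per the description above (a sketch; the defaults for the delegated arguments are assumptions):

```java
// 2-arg: delegate to the 3-arg overload; previously a stray null 4th argument
// selected the wrong overload and left snapshotProperties null
public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) {
  this(table, startingSnapshotId, true);
}

// 3-arg: delegate to the 4-arg overload with an empty, non-null map
public RewriteDataFilesCommitManager(
    Table table, long startingSnapshotId, boolean useStartingSequenceNumber) {
  this(table, startingSnapshotId, useStartingSequenceNumber, ImmutableMap.of());
}
```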
When rewrite_data_files was called with a branch parameter, the planner incorrectly used the main branch's snapshot to scan for files to compact, while the commit targeted the specified branch. This caused validation failures when branches diverged.
The fix ensures RewriteDataFilesSparkAction.execute() uses the branch's snapshot ID when a branch is specified, allowing the planner to correctly identify and compact files from the branch. This change applies to all Spark versions (3.4, 3.5, 4.0, 4.1) and fixes all rewrite strategies (binpack, sort, z-order), since they all rely on the snapshot ID passed from RewriteDataFilesSparkAction.
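Inside execute(), the snapshot resolution described above might look like this sketch (variable names assumed; Table.snapshot(String) resolves a ref by name):

```java
// Plan the rewrite from the branch's head rather than main's current snapshot
Snapshot startingSnapshot =
    branch != null ? table.snapshot(branch) : table.currentSnapshot();
long startingSnapshotId = startingSnapshot.snapshotId();
```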
Enhanced testBranchCompactionDoesNotAffectMain to verify that the new snapshot created by rewrite_data_files is a child of the previous branch snapshot. This ensures the compaction is committed to the branch's history chain, not to main.
The assertion checks that:
table.snapshot(branchSnapshotAfterCompaction).parentId() == branchSnapshotBeforeCompaction
This provides stronger validation that the rewrite operation correctly targets and modifies the specified branch.
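With AssertJ, which the tests in this PR already use, the check reads as:

```java
// The compacted snapshot must extend the branch's history, not main's
assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
    .as("Compaction snapshot should be a child of the previous branch snapshot")
    .isEqualTo(branchSnapshotBeforeCompaction);
```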
Apply spotless formatting to multi-line sql() and assertThat() calls across Spark 3.4, 3.5, and 4.0 modules.
Adds a precondition check in RewriteDataFilesSparkAction.execute() to verify that the specified branch exists before attempting to access its snapshot. This provides a clear error message instead of a cryptic NullPointerException when a non-existent branch is specified.
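A sketch of such a check; Iceberg code typically uses the relocated Guava Preconditions, and the exact message text here is an assumption:

```java
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Fail fast with a readable message instead of an NPE later during planning
Preconditions.checkArgument(
    table.snapshot(branch) != null,
    "Cannot rewrite data files: branch %s does not exist in table %s",
    branch, table.name());
```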
Left some comments. In a next PR we will do the backport, which should be easy and clean. Thanks!
Change checkAndApplyFilter and checkAndApplyStrategy to accept and return RewriteDataFilesSparkAction instead of the RewriteDataFiles interface, eliminating unnecessary casts at call sites.
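The pattern here is the usual covariant-return trick: returning the concrete type keeps concrete-only methods reachable without casts. A minimal, self-contained illustration with hypothetical names (not the PR's code):

```java
import java.util.Map;

interface Rewrite {
  Rewrite options(Map<String, String> options);
}

class SparkRewrite implements Rewrite {
  @Override
  public SparkRewrite options(Map<String, String> options) {
    return this; // covariant return preserves the concrete type
  }

  SparkRewrite toBranch(String branch) {
    return this; // concrete-only method
  }
}

class CastDemo {
  static SparkRewrite configure(SparkRewrite rewrite) {
    // No cast needed: options() already returns SparkRewrite
    return rewrite.options(Map.of()).toBranch("myBranch");
  }
}
```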
Remove rewrite_data_files branch support from older Spark versions, keeping the feature only in Spark 4.1.
assertThat(table.currentSnapshot().snapshotId())
    .as("Main snapshot should remain unchanged")
    .isEqualTo(mainSnapshotId);
}
Can we add a comparison between the actualRecords and expectedRecords?
Thank you for your review, done!
// Check if files were actually rewritten
int filesRewritten = (Integer) output.get(0)[0];
int filesAdded = (Integer) output.get(0)[1];
Should we add an assert for filesAdded?
Added
- Use assertEquals with the row() pattern matching existing tests (see the sketch below)
- Add expectedRecords/actualRecords comparison for data integrity
- Use exact file counts (10 files from insertData, compacted to 1)
- Remove conditional assertions in favor of direct assertions
- Simplify test setup to use insertData() consistently
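A sketch of that row()-based pattern, using the sql(), row(), and assertEquals() helpers from the existing test base; the counts follow the 10-files-to-1 scenario in the commit message:

```java
List<Object[]> output =
    sql(
        "CALL %s.system.rewrite_data_files(table => '%s', branch => 'testBranch')",
        catalogName, tableIdent);

// 10 small input files compacted into 1 output file
assertEquals(
    "Action should rewrite 10 data files and add 1 data file",
    row(10, 1),
    Arrays.copyOf(output.get(0), 2));
```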
table -> {
  RewriteDataFiles action = actions().rewriteDataFiles(table).options(options);
  RewriteDataFilesSparkAction action =
      (RewriteDataFilesSparkAction) actions().rewriteDataFiles(table).options(options);
Do we need this cast?
remnant of a previous change
SparkActions.rewriteDataFiles() already returns RewriteDataFilesSparkAction, so the cast is redundant.
pvary
left a comment
+1 from my side
@Guosmilesmile: Any more comments?
Guosmilesmile
left a comment
LGTM!
singhpk234
left a comment
LGTM as well, just a minor / optional suggestion
    long startingSnapshotId,
    boolean useStartingSequenceNumber,
    Map<String, String> snapshotProperties) {
  this(table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, null);
minor: I wonder if we could just set this to the MAIN branch as default, so we don't have to do isNull checks.
I think it makes sense to use null to delegate to SnapshotProducer's default behavior rather than explicitly setting "main".
This avoids imposing an opinion - if the default branch behavior changes in the future, this will automatically follow. The null check pattern is also consistent with how branch handling works elsewhere in the codebase (see SnapshotUtil methods that treat null as "use default").
Then again, it makes sense to remove a lot of isNull checks by making toBranch(null) a no-op. This should make the code cleaner.
@pvary / @singhpk234 any suggestions?
This is something I also considered during the review, but I don't have a strong opinion, so I left it as is.
Leaning +1 on letting null mean “use default branch” and making toBranch(null) a no-op.
Thank you for your review, done:
- toBranch(null) results in no-op
- removed one condition check for branch!=null
Are you good with leaving it as is, @singhpk234?
private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = null;
Same as what Prashant suggested above: I think we can probably default branch to main, and we can always assert on table.snapshot(branch) != null.
Hey @dramaticlly, per @huaxingao's recommendation I kept:
- null -> default branch
- toBranch(null) results in a no-op
This change ensures that calling toBranch(null) does not modify the internal branch state, treating null as "use default branch". This simplifies the calling code and removes redundant null checks.
if (targetBranch != null) {
  this.branch = targetBranch;
}
return this;
nit: newline - if we keep this code
Why do we do this?
It would make sense if we defined the branch differently, like:
private String branch = SnapshotRef.MAIN_BRANCH;
If it is just null, then this code is confusing; it is basically equivalent to:
this.branch = targetBranch;
return this;
Just a little more confusing, except that you cannot reset the value back to null 😄
Thank you, this makes sense. The only logical path out of this that I see is to default it to MAIN_BRANCH.
Updated to align with core Iceberg API behavior. SnapshotProducer.targetBranch() defaults to MAIN_BRANCH and rejects null with IllegalArgumentException - since RewriteDataFilesSparkAction eventually calls through to SnapshotProducer via rewrite.toBranch(), it makes sense to have consistent behavior at the action level.
Changes:
- branch now defaults to SnapshotRef.MAIN_BRANCH instead of null
- toBranch(null) now throws IllegalArgumentException (matching SnapshotProducer)
- Removed null-check guards in execute() that are no longer needed
- Added test testRewriteDataFilesToNullBranchFails
cc: @huaxingao
This change aligns RewriteDataFilesSparkAction.toBranch() with the core Iceberg API behavior in SnapshotProducer.targetBranch():
- Default branch to SnapshotRef.MAIN_BRANCH instead of null
- Reject null branch with IllegalArgumentException
- Remove null-check guards that are no longer needed
Since RewriteDataFilesSparkAction eventually calls rewrite.toBranch(), which invokes SnapshotProducer.targetBranch(), having consistent behavior at the action level prevents confusion and potential runtime errors.
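The resulting method plausibly looks like this sketch; the Preconditions message mirrors SnapshotProducer.targetBranch(), and the exact wording is an assumption:

```java
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Inside RewriteDataFilesSparkAction: default to main, reject null explicitly
private String branch = SnapshotRef.MAIN_BRANCH;

public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
  this.branch = targetBranch;
  return this;
}
```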
Spark: Add branch support to rewrite_data_files procedure
This change enables the rewrite_data_files stored procedure to rewrite
data files on specific branches instead of only on the main branch.
Users can specify branches in two ways:
1. Via table identifier: CALL system.rewrite_data_files('db.table.branch_myBranch')
2. Via explicit parameter: CALL system.rewrite_data_files(table => 'db.table', branch => 'myBranch')
The implementation follows the existing pattern used by SparkWrite and other
branch-aware operations. The commit manager already had branch support built in;
this change wires it through the action and procedure layers.
Fixes #14813