
Conversation


@97harsh 97harsh commented Jan 5, 2026

Spark: Add branch support to rewrite_data_files procedure

This change enables the rewrite_data_files stored procedure to rewrite
data files on specific branches instead of only on the main branch.

Implementation:

  • Core: Extended RewriteDataFilesCommitManager to accept and use branch parameter
  • Action: Added toBranch() method to RewriteDataFilesSparkAction (v3.4, v3.5, v4.0, v4.1)
  • Procedure: Added optional branch parameter to RewriteDataFilesProcedure (all versions)
  • Tests: Added branch-specific test coverage for all Spark versions

Users can specify branches in two ways:

  1. Table identifier: CALL system.rewrite_data_files('db.table.branch_myBranch')
  2. Explicit parameter: CALL system.rewrite_data_files(table => 'db.table', branch => 'myBranch')

The implementation follows the existing pattern used by SparkWrite and other
branch-aware operations. The commit manager already had branch support built in;
this change wires it through the action and procedure layers.
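
A hedged sketch of the equivalent action-level call; SparkActions.get(), rewriteDataFiles(), and execute() are existing Iceberg APIs, while toBranch() is the method this change adds:

SparkActions.get(spark)
    .rewriteDataFiles(table)
    .toBranch("myBranch")
    .execute();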

Fixes #14813

@97harsh 97harsh marked this pull request as draft January 5, 2026 09:41
This commit fixes the compilation error by implementing the missing toBranch()
method in RewriteDataFilesSparkAction for Spark versions 3.4, 3.5, and 4.0.

Changes:
- Added toBranch(String targetBranch) method to RewriteDataFilesSparkAction
- Updated commitManager() to pass branch parameter to RewriteDataFilesCommitManager
- Added comprehensive branch tests to TestRewriteDataFilesProcedure (all versions)

The implementation follows the same pattern as v4.1 and matches how SparkWrite
handles branches. Integration tests pass: iceberg-delta-lake:check
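
A minimal sketch of the described additions, with field and helper names assumed from the commit message rather than taken from the actual source:

public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  this.branch = targetBranch;
  return this;
}

private RewriteDataFilesCommitManager commitManager(long startingSnapshotId) {
  // pass the branch through so commits target it instead of main
  return new RewriteDataFilesCommitManager(
      table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, branch);
}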
- Add missing Table import in Spark 3.4 test file
- Fix branch names to use camelCase (testBranch, filteredBranch) to avoid SQL parsing errors
- Ensure files are actually rewritten by inserting multiple small files
- Add min-input-files option to force file compaction
- Remove incorrect snapshot ID ordering assertions
- Add explicit assertions to verify files are rewritten and snapshots change
@97harsh 97harsh marked this pull request as ready for review January 5, 2026 12:06
The previous implementation incorrectly passed null as a 4th parameter in
the 2-arg and 3-arg constructors, which caused them to call the wrong
constructor overload. This resulted in snapshotProperties being null,
leading to a NullPointerException when commitFileGroups() tried to iterate
over properties with forEach().

The issue broke Flink maintenance API tests (TestRewriteDataFiles and
TestFlinkTableSinkCompaction) because the Flink DataFileRewriteCommitter
uses the 2-arg constructor. Files were not being rewritten as expected.

Changes:
- Line 51: Remove null parameter to call 3-arg constructor
- Line 56: Remove null parameter to call 4-arg constructor with Map

This ensures the constructor chain properly passes through existing
constructors without introducing null values, and branch parameter is
correctly passed only through the appropriate constructors.
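
A self-contained illustration of the overload pitfall described above (not the actual Iceberg source): an explicit trailing null binds to the Map parameter of the wrong overload, so the non-null default is skipped and forEach() throws.

import java.util.Map;

class CommitManagerSketch {
  private final Map<String, String> snapshotProperties;
  private final String branch;

  CommitManagerSketch(long snapshotId) {
    // Buggy form: this(snapshotId, false, null) binds null to the Map below.
    this(snapshotId, false); // fixed: chain to the overload that defaults the Map
  }

  CommitManagerSketch(long snapshotId, boolean useStartingSequenceNumber) {
    this(snapshotId, useStartingSequenceNumber, Map.of()); // non-null default
  }

  CommitManagerSketch(long snapshotId, boolean useSeq, Map<String, String> props) {
    this(snapshotId, useSeq, props, null); // null branch delegates to the default branch
  }

  CommitManagerSketch(
      long snapshotId, boolean useSeq, Map<String, String> props, String branch) {
    this.snapshotProperties = props;
    this.branch = branch;
  }

  void commitFileGroups() {
    // throws NullPointerException if props was bound to an explicit null
    snapshotProperties.forEach((k, v) -> {});
  }
}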
@97harsh 97harsh marked this pull request as draft January 5, 2026 13:53
When rewrite_data_files was called with a branch parameter, the planner
incorrectly used the main branch's snapshot to scan for files to compact,
while the commit targeted the specified branch. This caused validation
failures when branches diverged.

The fix ensures RewriteDataFilesSparkAction.execute() uses the branch's
snapshot ID when a branch is specified, allowing the planner to correctly
identify and compact files from the branch.

This change applies to all Spark versions (3.4, 3.5, 4.0, 4.1) and fixes
all rewrite strategies (binpack, sort, z-order) since they all rely on
the snapshot ID passed from RewriteDataFilesSparkAction.
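
A hedged sketch of the planner-side fix; SnapshotUtil.latestSnapshot(Table, String) is a real Iceberg core utility, while the surrounding variable names are assumptions:

Snapshot startingSnapshot =
    branch != null ? SnapshotUtil.latestSnapshot(table, branch) : table.currentSnapshot();
long startingSnapshotId = startingSnapshot.snapshotId();
// the planner now scans the branch's snapshot, matching the commit target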
Enhanced testBranchCompactionDoesNotAffectMain to verify that the new
snapshot created by rewrite_data_files is a child of the previous branch
snapshot. This ensures the compaction is committed to the branch's history
chain, not to main.

The assertion checks that:
  table.snapshot(branchSnapshotAfterCompaction).parentId() == branchSnapshotBeforeCompaction

This provides stronger validation that the rewrite operation correctly
targets and modifies the specified branch.
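
The same check written with AssertJ (a sketch; the snapshot-id variable names follow the commit message):

assertThat(table.snapshot(branchSnapshotAfterCompaction).parentId())
    .as("Compaction snapshot should be a child of the previous branch snapshot")
    .isEqualTo(branchSnapshotBeforeCompaction);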
@97harsh 97harsh marked this pull request as ready for review January 5, 2026 15:20
Apply spotless formatting to multi-line sql() and assertThat() calls
across Spark 3.4, 3.5, and 4.0 modules.
Adds a precondition check in RewriteDataFilesSparkAction.execute() to
verify that the specified branch exists before attempting to access its
snapshot. This provides a clear error message instead of a cryptic
NullPointerException when a non-existent branch is specified.
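
A minimal sketch of such a check, assuming Guava's Preconditions (used throughout Iceberg); the exact message wording is an assumption:

Preconditions.checkArgument(
    table.refs().containsKey(branch),
    "Branch %s does not exist in table %s", branch, table.name());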
@97harsh 97harsh requested a review from pvary January 6, 2026 03:47
Contributor

pvary commented Jan 6, 2026

Left some comments.
Could you please remove the Spark 3.4, 3.5, and 4.0 changes for now? That would make the PR easier to review and make it easier to apply the changes requested by the reviewers.

In a follow-up PR we can do the backport, which should be easy and clean.

Thanks!

…thods

Change checkAndApplyFilter and checkAndApplyStrategy to accept and return
RewriteDataFilesSparkAction instead of the RewriteDataFiles interface,
eliminating unnecessary casts at call sites.
Remove rewrite_data_files branch support from older Spark versions,
keeping the feature only in Spark 4.1.
assertThat(table.currentSnapshot().snapshotId())
.as("Main snapshot should remain unchanged")
.isEqualTo(mainSnapshotId);
}
Contributor

Can we add a comparison between the actualRecords and expectedRecords?

Author

Thank you for your review, Done!


// Check if files were actually rewritten
int filesRewritten = (Integer) output.get(0)[0];
int filesAdded = (Integer) output.get(0)[1];
Contributor

Should we add an assert for filesAdded?

Author

Added

- Use assertEquals with row() pattern matching existing tests
- Add expectedRecords/actualRecords comparison for data integrity
- Use exact file counts (10 files from insertData, compacted to 1)
- Remove conditional assertions in favor of direct assertions
- Simplify test setup to use insertData() consistently
@97harsh 97harsh requested a review from pvary January 7, 2026 08:16
table -> {
RewriteDataFiles action = actions().rewriteDataFiles(table).options(options);
RewriteDataFilesSparkAction action =
(RewriteDataFilesSparkAction) actions().rewriteDataFiles(table).options(options);
Contributor

Do we need this cast?

Author

Remnant of a previous change.

SparkActions.rewriteDataFiles() already returns RewriteDataFilesSparkAction,
so the cast is redundant.
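
The resulting call site, based on the diff above, with the cast dropped:

RewriteDataFilesSparkAction action = actions().rewriteDataFiles(table).options(options);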
Contributor

@pvary pvary left a comment

+1 from my side
@Guosmilesmile: Any more comments?

Contributor

@Guosmilesmile Guosmilesmile left a comment

LGTM!

Contributor

@singhpk234 singhpk234 left a comment

LGTM as well, just a minor / optional suggestion

long startingSnapshotId,
boolean useStartingSequenceNumber,
Map<String, String> snapshotProperties) {
this(table, startingSnapshotId, useStartingSequenceNumber, snapshotProperties, null);
Contributor

minor: I wonder if we could just set this to the MAIN branch as the default, so we don't have to do null checks

Author

I think it makes sense to use null to delegate to SnapshotProducer's default behavior rather than explicitly setting "main".
This avoids imposing an opinion - if the default branch behavior changes in the future, this will automatically follow. The null check pattern is also consistent with how branch handling works elsewhere in the codebase (see SnapshotUtil methods that treat null as "use default").

Author

Then again, it makes sense to remove a lot of isNull checks by making toBranch(null) a no-op. This should make the code cleaner.

Author

@pvary / @singhpk234 any suggestions?

Contributor

This is something I also considered during the review, but I don't have a strong opinion, so I left it as is.

@singhpk234?

Contributor

Leaning +1 on letting null mean "use default branch" and making toBranch(null) a no-op.

Author

Thank you for your review, done

  • toBranch(null) results in no-op
  • removed one condition check for branch!=null

@97harsh 97harsh requested a review from singhpk234 January 7, 2026 17:21
Author

97harsh commented Jan 7, 2026

Are you good with leaving it as is, @singhpk234?
Can we merge if so?

private boolean removeDanglingDeletes;
private boolean useStartingSequenceNumber;
private boolean caseSensitive;
private String branch = null;
Contributor

Same as what Prashant suggested above: I think we can probably default branch to main and can always assert that table.snapshot(branch) != null.

Author

Hey @dramaticlly, per @huaxingao's recommendation I kept:

  • null->default branch
  • toBranch(null) results in no-op

This change ensures that calling toBranch(null) does not modify the
internal branch state, treating null as "use default branch". This
simplifies the calling code and removes redundant null checks.
Author

97harsh commented Jan 8, 2026

@huaxingao
Can we merge if this looks good?

if (targetBranch != null) {
this.branch = targetBranch;
}
return this;
Contributor

nit: newline - if we keep this code

Contributor

Why do we do this?

It would make sense if we would define the branch differently, like:

private String branch = SnapshotRef.MAIN_BRANCH;

If it is just null, then this code is just confusing. It is basically equal to

this.branch = targetBranch;
return this;

Just a little more confusing, because the only difference is that you cannot reset the value back to null 😄

Author

Thank you, this makes sense. The only logical path out of this I see is to default it to MAIN_BRANCH.

Updated to align with core Iceberg API behavior. SnapshotProducer.targetBranch() defaults to MAIN_BRANCH and rejects null with IllegalArgumentException. Since RewriteDataFilesSparkAction eventually calls through to SnapshotProducer via rewrite.toBranch(), it makes sense to have consistent behavior at the action level.

Changes:

  • branch now defaults to SnapshotRef.MAIN_BRANCH instead of null
  • toBranch(null) now throws IllegalArgumentException (matching SnapshotProducer)
  • Removed null-check guards in execute() that are no longer needed
  • Added test testRewriteDataFilesToNullBranchFails

cc: @huaxingao

…ject null

This change aligns RewriteDataFilesSparkAction.toBranch() with the core
Iceberg API behavior in SnapshotProducer.targetBranch():
- Default branch to SnapshotRef.MAIN_BRANCH instead of null
- Reject null branch with IllegalArgumentException
- Remove null-check guards that are no longer needed

Since RewriteDataFilesSparkAction eventually calls rewrite.toBranch()
which invokes SnapshotProducer.targetBranch(), having consistent
behavior at the action level prevents confusion and potential runtime
errors.
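
A sketch of the final shape described above; SnapshotRef.MAIN_BRANCH and Guava's Preconditions are real APIs, while the field wiring is assumed:

private String branch = SnapshotRef.MAIN_BRANCH;

public RewriteDataFilesSparkAction toBranch(String targetBranch) {
  Preconditions.checkArgument(targetBranch != null, "Invalid branch name: null");
  this.branch = targetBranch;
  return this;
}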