From f5a344762d5f4b0e9b57c604be9c910c9737e0a8 Mon Sep 17 00:00:00 2001 From: Jing chen He Date: Wed, 13 May 2026 12:28:10 -0700 Subject: [PATCH] fix: advance _row_last_updated_at_version for UPDATE COLUMNS FROM on stable-row-id tables --- .../UpdateColumnsBackfillBatchWrite.java | 31 +++++++++++++++++-- .../update/BaseUpdateColumnsBackfillTest.java | 29 +++++------------ 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/UpdateColumnsBackfillBatchWrite.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/UpdateColumnsBackfillBatchWrite.java index 5e654e01a..e90a271f2 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/write/UpdateColumnsBackfillBatchWrite.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/write/UpdateColumnsBackfillBatchWrite.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -128,6 +129,12 @@ public void commit(WriterCommitMessage[] messages) { .findFirst() .orElse(new long[0]); + Map mergedUpdatedFragmentOffsets = new HashMap<>(); + Arrays.stream(messages) + .map(m -> (TaskCommit) m) + .map(TaskCommit::getUpdatedFragmentOffsets) + .forEach(m -> m.forEach(mergedUpdatedFragmentOffsets::put)); + if (updatedFragments.isEmpty()) { logger.info("No updated fragments to commit."); return; @@ -144,12 +151,14 @@ public void commit(WriterCommitMessage[] messages) { .map(Fragment::metadata) .forEach(updatedFragments::add); - // Commit update operation using CommitBuilder + // Commit update operation using CommitBuilder. Pass matched physical row offsets per + // fragment so Lance can partially refresh _row_last_updated_at_version (stable row IDs). Update update = Update.builder() .updatedFragments(updatedFragments) .fieldsModified(fieldsModified) .updateMode(Optional.of(Update.UpdateMode.RewriteColumns)) + .updatedFragmentOffsets(mergedUpdatedFragmentOffsets) .build(); long version = Objects.requireNonNull( @@ -168,6 +177,7 @@ public void commit(WriterCommitMessage[] messages) { public static class UpdateColumnsWriter extends AbstractBackfillWriter { private final List updatedFragments = new ArrayList<>(); + private final Map updatedFragmentOffsets = new HashMap<>(); private long[] fieldsModified; public UpdateColumnsWriter( @@ -197,11 +207,16 @@ protected void processFragment(Fragment fragment, ArrowArrayStream stream) { LanceDataset.ROW_ADDRESS_COLUMN.name()); updatedFragments.add(result.getUpdatedFragment()); fieldsModified = result.getFieldsModified(); + long[] rowOffsets = result.getUpdatedRowOffsets(); + if (rowOffsets != null && rowOffsets.length > 0) { + updatedFragmentOffsets.put( + (long) fragment.getId(), Arrays.copyOf(rowOffsets, rowOffsets.length)); + } } @Override protected WriterCommitMessage buildCommitMessage() { - return new TaskCommit(updatedFragments, fieldsModified); + return new TaskCommit(updatedFragments, fieldsModified, updatedFragmentOffsets); } } @@ -266,10 +281,16 @@ public String toString() { public static class TaskCommit implements WriterCommitMessage { private final List updatedFragments; private final long[] fieldsModified; + private final Map updatedFragmentOffsets; - TaskCommit(List updatedFragments, long[] fieldsModified) { + TaskCommit( + List updatedFragments, + long[] fieldsModified, + Map updatedFragmentOffsets) { this.updatedFragments = updatedFragments; this.fieldsModified = fieldsModified; + this.updatedFragmentOffsets = + updatedFragmentOffsets != null ? updatedFragmentOffsets : Collections.emptyMap(); } List getUpdatedFragments() { @@ -279,5 +300,9 @@ List getUpdatedFragments() { long[] getFieldsModified() { return fieldsModified; } + + Map getUpdatedFragmentOffsets() { + return updatedFragmentOffsets; + } } } diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseUpdateColumnsBackfillTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseUpdateColumnsBackfillTest.java index cffb7390a..e03ab0182 100644 --- a/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseUpdateColumnsBackfillTest.java +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/update/BaseUpdateColumnsBackfillTest.java @@ -51,6 +51,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Base test class for UPDATE COLUMNS FROM command. @@ -270,19 +271,10 @@ public void testUpdatePreservesRowIdAndFragId() { } /** - * Pins down the version-column behavior of UPDATE COLUMNS FROM on a stable-row-id table. - * - *

UPDATE COLUMNS goes through Lance's {@code Update} operation, which (unlike ADD COLUMNS via - * {@code Merge} and unlike row-level UPDATE) does not bump {@code - * _row_last_updated_at_version}. CDF consumers therefore cannot detect column-level rewrites via - * the version columns today. - * - *

This test pins down current behavior so a future change to make UPDATE COLUMNS CDF-aware - * shows up as a deliberate test update rather than a silent regression. - * - *

Tracking upstream fix: https://github.com/lance-format/lance/issues/6734 — once that lands, - * flip the {@code _row_last_updated_at_version} assertion below from {@code assertEquals} to a - * strict-greater check (mirroring the ADD COLUMNS version test). + * UPDATE COLUMNS FROM on a stable-row-id table must preserve {@code _row_created_at_version} and + * advance {@code _row_last_updated_at_version} for rewritten rows. The connector passes matched + * physical row offsets on commit so Lance can partially refresh last-updated metadata (see + * lance-format/lance#6734 and the Java {@code Update.updatedFragmentOffsets} API). */ @Test public void testUpdateColumnsPreservesCreatedAtAndAdvancesLastUpdatedWithStableRowIds() { @@ -319,14 +311,9 @@ public void testUpdateColumnsPreservesCreatedAtAndAdvancesLastUpdatedWithStableR b.getLong(1), a.getLong(1), "_row_created_at_version must be unchanged for id=" + b.getInt(0)); - // Known gap (lance-format/lance#6734): UPDATE COLUMNS does not currently advance - // last_updated. When that issue is fixed, flip this assertion to a strict-greater check. - assertEquals( - b.getLong(2), - a.getLong(2), - "_row_last_updated_at_version is currently NOT advanced by UPDATE COLUMNS (id=" - + b.getInt(0) - + ") — if this changes, update the assertion"); + assertTrue( + a.getLong(2) > b.getLong(2), + "_row_last_updated_at_version must advance after UPDATE COLUMNS for id=" + b.getInt(0)); } }