Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -128,6 +129,12 @@ public void commit(WriterCommitMessage[] messages) {
.findFirst()
.orElse(new long[0]);

Map<Long, long[]> mergedUpdatedFragmentOffsets = new HashMap<>();
Arrays.stream(messages)
.map(m -> (TaskCommit) m)
.map(TaskCommit::getUpdatedFragmentOffsets)
.forEach(m -> m.forEach(mergedUpdatedFragmentOffsets::put));

if (updatedFragments.isEmpty()) {
logger.info("No updated fragments to commit.");
return;
Expand All @@ -144,12 +151,14 @@ public void commit(WriterCommitMessage[] messages) {
.map(Fragment::metadata)
.forEach(updatedFragments::add);

// Commit update operation using CommitBuilder
// Commit update operation using CommitBuilder. Pass matched physical row offsets per
// fragment so Lance can partially refresh _row_last_updated_at_version (stable row IDs).
Update update =
Update.builder()
.updatedFragments(updatedFragments)
.fieldsModified(fieldsModified)
.updateMode(Optional.of(Update.UpdateMode.RewriteColumns))
.updatedFragmentOffsets(mergedUpdatedFragmentOffsets)
.build();
long version =
Objects.requireNonNull(
Expand All @@ -168,6 +177,7 @@ public void commit(WriterCommitMessage[] messages) {

public static class UpdateColumnsWriter extends AbstractBackfillWriter {
private final List<FragmentMetadata> updatedFragments = new ArrayList<>();
private final Map<Long, long[]> updatedFragmentOffsets = new HashMap<>();
private long[] fieldsModified;

public UpdateColumnsWriter(
Expand Down Expand Up @@ -197,11 +207,16 @@ protected void processFragment(Fragment fragment, ArrowArrayStream stream) {
LanceDataset.ROW_ADDRESS_COLUMN.name());
updatedFragments.add(result.getUpdatedFragment());
fieldsModified = result.getFieldsModified();
long[] rowOffsets = result.getUpdatedRowOffsets();
if (rowOffsets != null && rowOffsets.length > 0) {
updatedFragmentOffsets.put(
(long) fragment.getId(), Arrays.copyOf(rowOffsets, rowOffsets.length));
}
}

@Override
protected WriterCommitMessage buildCommitMessage() {
return new TaskCommit(updatedFragments, fieldsModified);
return new TaskCommit(updatedFragments, fieldsModified, updatedFragmentOffsets);
}
}

Expand Down Expand Up @@ -266,10 +281,16 @@ public String toString() {
public static class TaskCommit implements WriterCommitMessage {
private final List<FragmentMetadata> updatedFragments;
private final long[] fieldsModified;
private final Map<Long, long[]> updatedFragmentOffsets;

TaskCommit(List<FragmentMetadata> updatedFragments, long[] fieldsModified) {
TaskCommit(
List<FragmentMetadata> updatedFragments,
long[] fieldsModified,
Map<Long, long[]> updatedFragmentOffsets) {
this.updatedFragments = updatedFragments;
this.fieldsModified = fieldsModified;
this.updatedFragmentOffsets =
updatedFragmentOffsets != null ? updatedFragmentOffsets : Collections.emptyMap();
}

List<FragmentMetadata> getUpdatedFragments() {
Expand All @@ -279,5 +300,9 @@ List<FragmentMetadata> getUpdatedFragments() {
long[] getFieldsModified() {
return fieldsModified;
}

Map<Long, long[]> getUpdatedFragmentOffsets() {
return updatedFragmentOffsets;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Base test class for UPDATE COLUMNS FROM command.
Expand Down Expand Up @@ -270,19 +271,10 @@ public void testUpdatePreservesRowIdAndFragId() {
}

/**
* Pins down the version-column behavior of UPDATE COLUMNS FROM on a stable-row-id table.
*
* <p>UPDATE COLUMNS goes through Lance's {@code Update} operation, which (unlike ADD COLUMNS via
* {@code Merge} and unlike row-level UPDATE) does <strong>not</strong> bump {@code
* _row_last_updated_at_version}. CDF consumers therefore cannot detect column-level rewrites via
* the version columns today.
*
* <p>This test pins down current behavior so a future change to make UPDATE COLUMNS CDF-aware
* shows up as a deliberate test update rather than a silent regression.
*
* <p>Tracking upstream fix: https://github.com/lance-format/lance/issues/6734 — once that lands,
* flip the {@code _row_last_updated_at_version} assertion below from {@code assertEquals} to a
* strict-greater check (mirroring the ADD COLUMNS version test).
* UPDATE COLUMNS FROM on a stable-row-id table must preserve {@code _row_created_at_version} and
* advance {@code _row_last_updated_at_version} for rewritten rows. The connector passes matched
* physical row offsets on commit so Lance can partially refresh last-updated metadata (see
* lance-format/lance#6734 and the Java {@code Update.updatedFragmentOffsets} API).
*/
@Test
public void testUpdateColumnsPreservesCreatedAtAndAdvancesLastUpdatedWithStableRowIds() {
Expand Down Expand Up @@ -319,14 +311,9 @@ public void testUpdateColumnsPreservesCreatedAtAndAdvancesLastUpdatedWithStableR
b.getLong(1),
a.getLong(1),
"_row_created_at_version must be unchanged for id=" + b.getInt(0));
// Known gap (lance-format/lance#6734): UPDATE COLUMNS does not currently advance
// last_updated. When that issue is fixed, flip this assertion to a strict-greater check.
assertEquals(
b.getLong(2),
a.getLong(2),
"_row_last_updated_at_version is currently NOT advanced by UPDATE COLUMNS (id="
+ b.getInt(0)
+ ") — if this changes, update the assertion");
assertTrue(
a.getLong(2) > b.getLong(2),
"_row_last_updated_at_version must advance after UPDATE COLUMNS for id=" + b.getInt(0));
}
}

Expand Down
Loading