Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/format/table/transaction.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ The following operations are retryable conflicts with DataReplacement:
- CreateIndex (only if the field being replaced is being indexed)
- Rewrite (only if overlapping fragments)
- Update (only if overlapping fragments)
- Delete (only if overlapping fragments)
- Merge (always)

### UpdateMemWalState
Expand Down
3 changes: 2 additions & 1 deletion rust/lance/src/dataset/tests/dataset_merge_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,8 @@ async fn test_datafile_replacement_error() {
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, new_data_file)],
},
Some(2),
// read at the current version (after the Merge above)
Some(dataset.manifest.version),
None,
None,
Arc::new(Default::default()),
Expand Down
111 changes: 107 additions & 4 deletions rust/lance/src/io/commit/conflict_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -904,13 +904,42 @@ impl<'a> TransactionRebase<'a> {
match &other_transaction.operation {
Operation::Append { .. }
| Operation::Clone { .. }
| Operation::Delete { .. }
| Operation::Update { .. }
| Operation::Merge { .. }
| Operation::UpdateConfig { .. }
| Operation::ReserveFragments { .. }
| Operation::Project { .. }
| Operation::UpdateBases { .. } => Ok(()),
Operation::Merge { .. } => {
// Merge rewrites the whole fragment list; always conflict
// (symmetric with check_merge_txn).
Err(self.retryable_conflict_err(other_transaction, other_version))
}
Operation::Update {
updated_fragments,
removed_fragment_ids,
..
}
| Operation::Delete {
updated_fragments,
deleted_fragment_ids: removed_fragment_ids,
..
} => {
// A concurrent Update/Delete that changed one of our target
// fragments makes our positional column file stale; conflict so
// the committer rebuilds (lance otherwise accepts it silently).
for replacement in replacements {
let touches_our_fragment = updated_fragments
.iter()
.map(|f| f.id)
.chain(removed_fragment_ids.iter().copied())
.any(|id| id == replacement.0);
if touches_our_fragment {
return Err(
self.retryable_conflict_err(other_transaction, other_version)
);
}
}
Comment on lines +926 to +940

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to conflict if a delete removed a fragment that we updated? Should this be asymmetric?

DataReplacement followed by Delete -> Disallowed?
Delete followed by DataReplacement -> Allowed (but changes discarded silently)

This is a genuine question, I don't really know the answer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the specific case of Delete followed by DataReplacement, this will throw an error here if you hit it today:
https://github.com/lance-format/lance/blob/main/rust/lance/src/dataset/transaction.rs#L2229,L2231

I assume we could probably work around this in the DataReplacement operation, but handling it in the conflict mechanism seems like a more certain approach to me.

For the other direction, we definitely need to conflict, or the wrong rows may get deleted.

Ok(())
}
Operation::CreateIndex { new_indices, .. } => {
// A data replacement only conflicts if it is updating the field that
// is being indexed.
Expand Down Expand Up @@ -3258,7 +3287,7 @@ mod tests {
(
"DataReplacement vs Rewrite on different fragment",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01)],
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())],
},
Operation::Rewrite {
groups: vec![RewriteGroup {
Expand All @@ -3270,6 +3299,80 @@ mod tests {
},
Compatible,
),
// A concurrent Update/Delete on a fragment we replace a column in must
// conflict, else the stale positional file is applied silently.
(
"DataReplacement vs Update on same fragment",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())],
},
Operation::Update {
updated_fragments: vec![Fragment::new(0)],
removed_fragment_ids: vec![],
new_fragments: vec![],
fields_modified: vec![],
merged_generations: Vec::new(),
fields_for_preserving_frag_bitmap: vec![],
update_mode: None,
inserted_rows_filter: None,
updated_fragment_offsets: None,
},
Retryable,
),
(
"DataReplacement vs Update on different fragment",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())],
},
Operation::Update {
updated_fragments: vec![Fragment::new(1)],
removed_fragment_ids: vec![],
new_fragments: vec![],
fields_modified: vec![],
merged_generations: Vec::new(),
fields_for_preserving_frag_bitmap: vec![],
update_mode: None,
inserted_rows_filter: None,
updated_fragment_offsets: None,
},
Compatible,
),
(
"DataReplacement vs Delete on same fragment",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())],
},
Operation::Delete {
deleted_fragment_ids: vec![],
updated_fragments: vec![Fragment::new(0)],
predicate: "a > 0".to_string(),
},
Retryable,
),
(
"DataReplacement vs Delete that removes the fragment",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01.clone())],
},
Operation::Delete {
deleted_fragment_ids: vec![0],
updated_fragments: vec![],
predicate: "a > 0".to_string(),
},
Retryable,
),
// Merge rewrites the whole fragment list -> always conflicts.
(
"DataReplacement vs Merge",
Operation::DataReplacement {
replacements: vec![DataReplacementGroup(0, data_file_frag0_fields01)],
},
Operation::Merge {
fragments: vec![Fragment::new(0)],
schema: lance_core::datatypes::Schema::default(),
},
Retryable,
),
];

for (description, op1, op2, expected) in cases {
Expand Down
Loading