Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
*/
private boolean meetInsert;

/**
* Flag indicating whether the latest processed record is a retract operation. Used to determine
* the correct behavior when the last record in a sequence is a retract.
*/
private boolean latestRetract = false;

/**
* Stores the latest KeyValue processed. This is used when we need to initialize the row based
* on the last retract record in case the final result should be a deletion.
*/
private KeyValue latestKV;

protected PartialUpdateMergeFunction(
InternalRow.FieldGetter[] getters,
boolean ignoreDelete,
Expand All @@ -118,13 +130,20 @@ public void reset() {
this.notNullColumnFilled = false;
this.row = new GenericRow(getters.length);
this.latestSequenceNumber = 0;
this.latestRetract = false;
this.latestKV = null;
fieldAggregators.forEach(w -> w.getValue().reset());
}

@Override
public void add(KeyValue kv) {
// refresh key object to avoid reference overwritten
currentKey = kv.key();

// Update latestKV and latestRetract status
latestKV = kv;
latestRetract = false;

currentDeleteRow = false;
if (kv.valueKind().isRetract()) {

Expand All @@ -149,9 +168,10 @@ public void add(KeyValue kv) {
if (removeRecordOnDelete) {
if (kv.valueKind() == RowKind.DELETE) {
currentDeleteRow = true;
row = new GenericRow(getters.length);
initRow(row, kv.value());
// We don't immediately reset the row here anymore, instead we handle it in
// getResult()
}
latestRetract = true;
return;
}

Expand Down Expand Up @@ -311,13 +331,16 @@ private void retractWithSequenceGroup(KeyValue kv) {
if (kv.valueKind() == RowKind.DELETE
&& sequenceGroupPartialDelete.contains(field)) {
currentDeleteRow = true;
row = new GenericRow(getters.length);
initRow(row, kv.value());
// We don't immediately reset the row here anymore, instead we
// handle it in getResult()
return;
} else {
row.setField(field, getters[field].getFieldOrNull(kv.value()));
updatedSequenceFields.add(field);
}
if (sequenceGroupPartialDelete.contains(field)) {
latestRetract = true;
}
}
}
} else {
Expand Down Expand Up @@ -363,6 +386,15 @@ public KeyValue getResult() {
reused = new KeyValue();
}

// If the current row should be deleted (currentDeleteRow is true)
// or there is a latest retract signal (latestRetract is true),
// we emit a delete row to retract previously emitted data.
if (currentDeleteRow || latestRetract) {
row = new GenericRow(getters.length);
initRow(row, latestKV.value());
currentDeleteRow = true;
}

RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT;
return reused.replace(currentKey, latestSequenceNumber, rowKind, row);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,102 @@ public void testDeleteReproduceCorrectSequenceNumber() {
assertThat(func.getResult().sequenceNumber()).isEqualTo(1);
}

@Test
public void testUpdateBeforeRetract() {
Options options = new Options();
options.set("partial-update.remove-record-on-delete", "true");
RowType rowType =
RowType.of(
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());

MergeFunctionFactory<KeyValue> factory =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"));

MergeFunction<KeyValue> func = factory.create();

func.reset();

add(func, RowKind.INSERT, 1, 1, 1, 1, 1);
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 1, 1);

validate(func, null, null, null, null, null);
}

@Test
public void testUpdateBeforeRetractSequenceGroup() {
Options options = new Options();
options.set("fields.f3.sequence-group", "f1,f2");
options.set("fields.f6.sequence-group", "f4,f5");
RowType rowType =
RowType.of(
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
MergeFunction<KeyValue> func =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"))
.create();
func.reset();
add(func, 1, 1, 1, 1, 1, 1, 1);
add(func, 1, 2, 2, 2, 2, 2, null);
validate(func, 1, 2, 2, 2, 1, 1, 1);
add(func, 1, 3, 3, 1, 3, 3, 3);
validate(func, 1, 2, 2, 2, 3, 3, 3);

// delete
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, null);
validate(func, 1, null, null, 3, 3, 3, 3);
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, 4);
validate(func, 1, null, null, 3, null, null, 4);
add(func, 1, 4, 4, 4, 5, 5, 5);
validate(func, 1, 4, 4, 4, 5, 5, 5);
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 6, 1, 1, 6);
validate(func, 1, null, null, 6, null, null, 6);
}

@Test
public void testUpdateBeforeRetractSequenceGroupPartialDelete() {
Options options = new Options();
options.set("fields.f3.sequence-group", "f1,f2");
options.set("fields.f6.sequence-group", "f4,f5");
options.set("partial-update.remove-record-on-sequence-group", "f6");
RowType rowType =
RowType.of(
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT(),
DataTypes.INT());
MergeFunction<KeyValue> func =
PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0"))
.create();
func.reset();
add(func, 1, 1, 1, 1, 1, 1, 1);
add(func, 1, 2, 2, 2, 2, 2, null);
validate(func, 1, 2, 2, 2, 1, 1, 1);
add(func, 1, 3, 3, 1, 3, 3, 3);
validate(func, 1, 2, 2, 2, 3, 3, 3);

// delete
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, null);
validate(func, 1, null, null, 3, 3, 3, 3);
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, 4);
validate(func, null, null, null, null, null, null, null);
add(func, 1, 4, 4, 4, 5, 5, 5);
validate(func, 1, 4, 4, 4, 5, 5, 5);
add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 6, 1, 1, 6);
validate(func, null, null, null, null, null, null, null);
}

private void add(MergeFunction<KeyValue> function, Integer... f) {
add(function, RowKind.INSERT, f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,7 @@ public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
assertThat(result).isEmpty();

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
Expand Down Expand Up @@ -1714,7 +1714,7 @@ public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception {
write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 11, 2, 29, 29, 2));
commit.commit(1, write.prepareCommit(true, 1));
result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
assertThat(result).containsExactlyInAnyOrder("+I[1, 1, NULL, 2, NULL, NULL, 2]");
assertThat(result).isEmpty();

// 3. Update After
write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 11, 2, 30, 30, 3));
Expand Down
Loading