diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 3b6a89d6640c..0f30189d572d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -92,6 +92,18 @@ public class PartialUpdateMergeFunction implements MergeFunction { */ private boolean meetInsert; + /** + * Flag indicating whether the latest processed record is a retract operation. Used to determine + * the correct behavior when the last record in a sequence is a retract. + */ + private boolean latestRetract = false; + + /** + * Stores the latest KeyValue processed. This is used when we need to initialize the row based + * on the last retract record in case the final result should be a deletion. + */ + private KeyValue latestKV; + protected PartialUpdateMergeFunction( InternalRow.FieldGetter[] getters, boolean ignoreDelete, @@ -118,6 +130,8 @@ public void reset() { this.notNullColumnFilled = false; this.row = new GenericRow(getters.length); this.latestSequenceNumber = 0; + this.latestRetract = false; + this.latestKV = null; fieldAggregators.forEach(w -> w.getValue().reset()); } @@ -125,6 +139,11 @@ public void reset() { public void add(KeyValue kv) { // refresh key object to avoid reference overwritten currentKey = kv.key(); + + // Update latestKV and latestRetract status + latestKV = kv; + latestRetract = false; + currentDeleteRow = false; if (kv.valueKind().isRetract()) { @@ -149,9 +168,10 @@ public void add(KeyValue kv) { if (removeRecordOnDelete) { if (kv.valueKind() == RowKind.DELETE) { currentDeleteRow = true; - row = new GenericRow(getters.length); - initRow(row, kv.value()); + // We don't immediately reset the row here anymore, instead we handle it in + // getResult() } + latestRetract = true; return; } @@ -311,13 +331,16 @@ private void retractWithSequenceGroup(KeyValue kv) { if (kv.valueKind() == RowKind.DELETE && sequenceGroupPartialDelete.contains(field)) { currentDeleteRow = true; - row = new GenericRow(getters.length); - initRow(row, kv.value()); + // We don't immediately reset the row here anymore, instead we + // handle it in getResult() return; } else { row.setField(field, getters[field].getFieldOrNull(kv.value())); updatedSequenceFields.add(field); } + if (sequenceGroupPartialDelete.contains(field)) { + latestRetract = true; + } } } } else { @@ -363,6 +386,15 @@ public KeyValue getResult() { reused = new KeyValue(); } + // If the current row should be deleted (currentDeleteRow is true) + // or there is a latest retract signal (latestRetract is true), + // we emit a delete row to retract previously emitted data. + if (currentDeleteRow || latestRetract) { + row = new GenericRow(getters.length); + initRow(row, latestKV.value()); + currentDeleteRow = true; + } + RowKind rowKind = currentDeleteRow || !meetInsert ? RowKind.DELETE : RowKind.INSERT; return reused.replace(currentKey, latestSequenceNumber, rowKind, row); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java index 5e88d2758ede..e8db80a90e4f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java @@ -882,6 +882,102 @@ public void testDeleteReproduceCorrectSequenceNumber() { assertThat(func.getResult().sequenceNumber()).isEqualTo(1); } + @Test + public void testUpdateBeforeRetract() { + Options options = new Options(); + options.set("partial-update.remove-record-on-delete", "true"); + RowType rowType = + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT()); + + MergeFunctionFactory factory = + PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0")); + + MergeFunction func = factory.create(); + + func.reset(); + + add(func, RowKind.INSERT, 1, 1, 1, 1, 1); + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 1, 1); + + validate(func, null, null, null, null, null); + } + + @Test + public void testUpdateBeforeRetractSequenceGroup() { + Options options = new Options(); + options.set("fields.f3.sequence-group", "f1,f2"); + options.set("fields.f6.sequence-group", "f4,f5"); + RowType rowType = + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT()); + MergeFunction func = + PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0")) + .create(); + func.reset(); + add(func, 1, 1, 1, 1, 1, 1, 1); + add(func, 1, 2, 2, 2, 2, 2, null); + validate(func, 1, 2, 2, 2, 1, 1, 1); + add(func, 1, 3, 3, 1, 3, 3, 3); + validate(func, 1, 2, 2, 2, 3, 3, 3); + + // delete + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, null); + validate(func, 1, null, null, 3, 3, 3, 3); + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, 4); + validate(func, 1, null, null, 3, null, null, 4); + add(func, 1, 4, 4, 4, 5, 5, 5); + validate(func, 1, 4, 4, 4, 5, 5, 5); + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 6, 1, 1, 6); + validate(func, 1, null, null, 6, null, null, 6); + } + + @Test + public void testUpdateBeforeRetractSequenceGroupPartialDelete() { + Options options = new Options(); + options.set("fields.f3.sequence-group", "f1,f2"); + options.set("fields.f6.sequence-group", "f4,f5"); + options.set("partial-update.remove-record-on-sequence-group", "f6"); + RowType rowType = + RowType.of( + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT(), + DataTypes.INT()); + MergeFunction func = + PartialUpdateMergeFunction.factory(options, rowType, ImmutableList.of("f0")) + .create(); + func.reset(); + add(func, 1, 1, 1, 1, 1, 1, 1); + add(func, 1, 2, 2, 2, 2, 2, null); + validate(func, 1, 2, 2, 2, 1, 1, 1); + add(func, 1, 3, 3, 1, 3, 3, 3); + validate(func, 1, 2, 2, 2, 3, 3, 3); + + // delete + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, null); + validate(func, 1, null, null, 3, 3, 3, 3); + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 3, 1, 1, 4); + validate(func, null, null, null, null, null, null, null); + add(func, 1, 4, 4, 4, 5, 5, 5); + validate(func, 1, 4, 4, 4, 5, 5, 5); + add(func, RowKind.UPDATE_BEFORE, 1, 1, 1, 6, 1, 1, 6); + validate(func, null, null, null, null, null, null, null); + } + private void add(MergeFunction function, Integer... f) { add(function, RowKind.INSERT, f); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 6e6519e8ec40..f91634bab856 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -1635,7 +1635,7 @@ public void testPartialUpdateRemoveRecordOnDelete() throws Exception { write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); commit.commit(1, write.prepareCommit(true, 1)); result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); - assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + assertThat(result).isEmpty(); // 3. Update After write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); @@ -1714,7 +1714,7 @@ public void testPartialUpdateRemoveRecordOnSequenceGroup() throws Exception { write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 11, 2, 29, 29, 2)); commit.commit(1, write.prepareCommit(true, 1)); result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); - assertThat(result).containsExactlyInAnyOrder("+I[1, 1, NULL, 2, NULL, NULL, 2]"); + assertThat(result).isEmpty(); // 3. Update After write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 11, 2, 30, 30, 3));