Skip to content

Commit aff70b7

Browse files
author
bosiew.tian
committed
[core] Support reading sequence_number in AuditLogTable and BinlogTable
1 parent 816a76f commit aff70b7

File tree

21 files changed

+1512
-218
lines changed

21 files changed

+1512
-218
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.paimon.table.source.ChainSplit;
5151
import org.apache.paimon.table.source.DataSplit;
5252
import org.apache.paimon.table.source.DeletionFile;
53+
import org.apache.paimon.table.source.KeyValueSystemFieldsRecordReader;
5354
import org.apache.paimon.table.source.Split;
5455
import org.apache.paimon.types.DataField;
5556
import org.apache.paimon.types.RowType;
@@ -62,8 +63,11 @@
6263
import java.io.IOException;
6364
import java.util.ArrayList;
6465
import java.util.Arrays;
66+
import java.util.Collections;
6567
import java.util.Comparator;
68+
import java.util.HashMap;
6669
import java.util.List;
70+
import java.util.Map;
6771
import java.util.Set;
6872
import java.util.stream.Collectors;
6973

@@ -97,6 +101,11 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
97101
@Nullable private int[][] outerProjection;
98102
@Nullable private VariantAccessInfo[] variantAccess;
99103

104+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> systemFieldExtractors =
105+
Collections.emptyList();
106+
107+
@Nullable private int[] projection = null;
108+
100109
private boolean forceKeepDelete = false;
101110

102111
public MergeFileSplitRead(
@@ -137,18 +146,31 @@ public MergeFileSplitRead withReadKeyType(RowType readKeyType) {
137146
return this;
138147
}
139148

149+
public List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> getSystemFieldExtractors() {
150+
return systemFieldExtractors;
151+
}
152+
153+
@Nullable
154+
public int[] getProjection() {
155+
return projection;
156+
}
157+
140158
@Override
141159
public MergeFileSplitRead withReadType(RowType readType) {
160+
this.systemFieldExtractors = collectSystemFieldExtractors(readType);
161+
this.projection = createProjection(readType);
162+
142163
// todo: replace projectedFields with readType
143164
RowType tableRowType = tableSchema.logicalRowType();
165+
List<String> fieldNames = tableSchema.fieldNames();
144166
int[][] projectedFields =
145167
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
168+
.filter(i -> i >= 0) // Filter out system fields (index = -1)
146169
.mapToObj(i -> new int[] {i})
147170
.toArray(int[][]::new);
148171
int[][] newProjectedFields = projectedFields;
149172
if (sequenceFields.size() > 0) {
150173
// make sure projection contains sequence fields
151-
List<String> fieldNames = tableSchema.fieldNames();
152174
List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
153175
int[] lackFields =
154176
sequenceFields.stream()
@@ -408,4 +430,95 @@ public UserDefinedSeqComparator createUdsComparator() {
408430
return UserDefinedSeqComparator.create(
409431
readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
410432
}
433+
434+
/**
435+
* Collects system field extractors for the requested read type.
436+
*
437+
* @param readType the requested read type (may contain system fields)
438+
* @return list of extractors for system fields present in readType
439+
*/
440+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor>
441+
collectSystemFieldExtractors(RowType readType) {
442+
if (readType == null) {
443+
return Collections.emptyList();
444+
}
445+
446+
List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> extractors = new ArrayList<>();
447+
for (String fieldName : readType.getFieldNames()) {
448+
KeyValueSystemFieldsRecordReader.SystemFieldExtractor extractor =
449+
KeyValueSystemFieldsRecordReader.getExtractor(fieldName);
450+
if (extractor != null) {
451+
extractors.add(extractor);
452+
}
453+
}
454+
return extractors;
455+
}
456+
457+
/**
458+
* Creates a projection array to reorder fields from natural order to requested order.
459+
*
460+
* <p>Natural order is [system fields... + physical fields...]. This method creates a projection
461+
* array to map from natural order to the requested readType order.
462+
*
463+
* <p>Physical fields in natural order follow the same order as they appear in readType (not
464+
* table schema order).
465+
*
466+
* <p>Example: readType = [pt, rowkind, col1], systemFieldExtractors = [rowkind] Natural order:
467+
* [rowkind(0), pt(1), col1(2)] (physical fields pt, col1 in readType order) Requested order:
468+
* [pt, rowkind, col1] Projection: [1, 0, 2]
469+
*
470+
* @param readType the requested read type (may contain system fields)
471+
* @return projection array, or null if fields are already in natural order
472+
*/
473+
@Nullable
474+
private int[] createProjection(RowType readType) {
475+
if (readType == null || systemFieldExtractors.isEmpty()) {
476+
return null;
477+
}
478+
479+
List<String> readFieldNames = readType.getFieldNames();
480+
List<String> tableFieldNames = tableSchema.fieldNames();
481+
int systemFieldCount = systemFieldExtractors.size();
482+
483+
// Build a map from field name to its index in natural order
484+
Map<String, Integer> naturalOrderMap = new HashMap<>();
485+
486+
// System fields are first in natural order
487+
for (int i = 0; i < systemFieldExtractors.size(); i++) {
488+
for (String sysFieldName : readFieldNames) {
489+
if (KeyValueSystemFieldsRecordReader.getExtractor(sysFieldName)
490+
== systemFieldExtractors.get(i)) {
491+
naturalOrderMap.put(sysFieldName, i);
492+
break;
493+
}
494+
}
495+
}
496+
497+
// Physical fields follow system fields, in the order they appear in readType
498+
int physicalFieldIndex = 0;
499+
for (String fieldName : readFieldNames) {
500+
// Skip if it's a system field
501+
if (KeyValueSystemFieldsRecordReader.getExtractor(fieldName) != null) {
502+
continue;
503+
}
504+
// Only include if it's a valid table field
505+
if (tableFieldNames.contains(fieldName)) {
506+
naturalOrderMap.put(fieldName, systemFieldCount + physicalFieldIndex);
507+
physicalFieldIndex++;
508+
}
509+
}
510+
511+
// Create projection array
512+
int[] projection = new int[readFieldNames.size()];
513+
boolean needsProjection = false;
514+
for (int i = 0; i < readFieldNames.size(); i++) {
515+
String fieldName = readFieldNames.get(i);
516+
projection[i] = naturalOrderMap.get(fieldName);
517+
if (projection[i] != i) {
518+
needsProjection = true;
519+
}
520+
}
521+
522+
return needsProjection ? projection : null;
523+
}
411524
}

0 commit comments

Comments
 (0)