Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
private List<ColumnCategory> columnTypes;
private List<String> measurementList;
private List<TSDataType> dataTypeList;
private List<IMeasurementSchema> fieldSchemaList;
private int deviceIdSize;

private List<ModsOperationUtil.ModsInfo> modsInfoList;
Expand Down Expand Up @@ -194,7 +195,7 @@ public boolean hasNext() {

long size = 0;
List<AbstractAlignedChunkMetadata> iChunkMetadataList =
reader.getAlignedChunkMetadata(pair.left, true);
reader.getAlignedChunkMetadata(pair.left, false);

Iterator<AbstractAlignedChunkMetadata> chunkMetadataIterator =
iChunkMetadataList.iterator();
Expand All @@ -213,27 +214,7 @@ public boolean hasNext() {
continue;
}

Iterator<IChunkMetadata> iChunkMetadataIterator =
alignedChunkMetadata.getValueChunkMetadataList().iterator();
while (iChunkMetadataIterator.hasNext()) {
IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next();
if (iChunkMetadata == null) {
iChunkMetadataIterator.remove();
continue;
}

if (!modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
pair.getLeft(),
iChunkMetadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
iChunkMetadataIterator.remove();
}
}

if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) {
if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) {
chunkMetadataIterator.remove();
continue;
}
Expand Down Expand Up @@ -267,6 +248,7 @@ public boolean hasNext() {
dataTypeList = new ArrayList<>();
columnTypes = new ArrayList<>();
measurementList = new ArrayList<>();
fieldSchemaList = new ArrayList<>();

for (int i = 0; i < columnSchemaSize; i++) {
final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
Expand All @@ -280,6 +262,9 @@ public boolean hasNext() {
measurementList.add(measurementName);
dataTypeList.add(schema.getType());
}
if (ColumnCategory.FIELD.equals(columnCategory)) {
fieldSchemaList.add(schema);
}
}
}
deviceIdSize = dataTypeList.size();
Expand Down Expand Up @@ -331,9 +316,9 @@ private Tablet buildNextTablet() {
tablet =
new Tablet(
tableName,
measurementList,
dataTypeList,
columnTypes,
new ArrayList<>(measurementList),
new ArrayList<>(dataTypeList),
new ArrayList<>(columnTypes),
rowCountAndMemorySize.getLeft());
tablet.initBitMaps();
isFirstRow = false;
Expand Down Expand Up @@ -376,6 +361,20 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta
long size = timeChunkSize;

final List<Chunk> valueChunkList = new ArrayList<>();
final Map<String, IChunkMetadata> valueChunkMetadataMap =
alignedChunkMetadata.getValueChunkMetadataList().stream()
.filter(Objects::nonNull)
.filter(
metadata ->
!isFieldDeletedByMods(
metadata.getMeasurementUid(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime()))
.collect(
Collectors.toMap(
IChunkMetadata::getMeasurementUid,
metadata -> metadata,
(left, right) -> left));

// To ensure that the Tablet has the same alignedChunk column as the current one,
// you need to create a new Tablet to fill in the data.
Expand All @@ -392,50 +391,99 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta
measurementList.subList(deviceIdSize, measurementList.size()).clear();
dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear();

for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) {
final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset);
boolean hasSelectedField = fieldSchemaList.isEmpty();
boolean hasSelectedNonNullChunk = false;
for (; offset < fieldSchemaList.size(); ++offset) {
final IMeasurementSchema schema = fieldSchemaList.get(offset);
if (isFieldDeletedByMods(
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime())) {
continue;
}

final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName());
Chunk chunk = null;
if (metadata != null) {
final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata);
size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (valueChunkList.isEmpty()) {
chunk = reader.readMemChunk((ChunkMetadata) metadata);
final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk);
if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
if (!hasSelectedNonNullChunk) {
// If the first chunk exceeds the memory limit, we need to allocate more memory
size = newSize;
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size);
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
++offset;
} else {
break;
}
break;
} else {
// Record the column information corresponding to Meta to fill in Tablet
columnTypes.add(ColumnCategory.FIELD);
measurementList.add(metadata.getMeasurementUid());
dataTypeList.add(metadata.getDataType());
valueChunkList.add(chunk);
size = newSize;
}
hasSelectedNonNullChunk = true;
}

columnTypes.add(ColumnCategory.FIELD);
measurementList.add(schema.getMeasurementName());
dataTypeList.add(schema.getType());
valueChunkList.add(chunk);
hasSelectedField = true;
}

if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) {
if (offset >= fieldSchemaList.size()) {
currentChunkMetadata = null;
}

if (!hasSelectedField) {
this.chunkReader = null;
this.batchData = null;
return;
}

this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null);
this.modsInfoList =
ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications);
}

private boolean areAllFieldsDeletedByMods(
final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) {
if (modifications.isEmpty() || fieldSchemaList.isEmpty()) {
return false;
}

for (final IMeasurementSchema schema : fieldSchemaList) {
if (!ModsOperationUtil.isAllDeletedByMods(
currentDeviceID,
schema.getMeasurementName(),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
return false;
}
}
return true;
}

private boolean isFieldDeletedByMods(
final String measurementID, final long startTime, final long endTime) {
return !modifications.isEmpty()
&& ModsOperationUtil.isAllDeletedByMods(
deviceID, measurementID, startTime, endTime, modifications);
}

private boolean fillMeasurementValueColumns(
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes = data.getVector();
final TsPrimitiveType[] primitiveTypes =
Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0];
boolean needFillTime = false;
boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize;

for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) {
final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize];
if (primitiveType == null
|| ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) {
final TsPrimitiveType primitiveType =
i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null;
final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i));
if (!isDeleted) {
hasNonDeletedField = true;
}
if (primitiveType == null || isDeleted) {
switch (dataTypeList.get(i)) {
case TEXT:
case BLOB:
Expand Down Expand Up @@ -480,7 +528,7 @@ private boolean fillMeasurementValueColumns(
throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType());
}
}
return needFillTime;
return needFillTime || hasNonDeletedField;
}

private void fillDeviceIdColumns(
Expand Down
Loading
Loading