From ce0fa98cb84095751ced7a2ecd6d715f195154cb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 14:06:29 +0800 Subject: [PATCH 1/2] fix --- ...sertionEventTableParserTabletIterator.java | 144 ++++++++++++------ .../plan/analyze/load/LoadTsFileAnalyzer.java | 99 +++++++++--- .../event/TsFileInsertionEventParserTest.java | 138 +++++++++++++++++ .../analyze/load/LoadTsFileAnalyzerTest.java | 110 +++++++++++++ 4 files changed, 420 insertions(+), 71 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index f05cf872c798b..36479a1c701dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator columnTypes; private List measurementList; private List dataTypeList; + private List fieldSchemaList; private int deviceIdSize; private List modsInfoList; @@ -194,7 +195,7 @@ public boolean hasNext() { long size = 0; List iChunkMetadataList = - reader.getAlignedChunkMetadata(pair.left, true); + reader.getAlignedChunkMetadata(pair.left, false); Iterator chunkMetadataIterator = iChunkMetadataList.iterator(); @@ -213,27 +214,7 @@ public boolean hasNext() { continue; } - Iterator iChunkMetadataIterator = - alignedChunkMetadata.getValueChunkMetadataList().iterator(); - while (iChunkMetadataIterator.hasNext()) { - IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next(); - if (iChunkMetadata == null) { - iChunkMetadataIterator.remove(); - continue; - } - - if (!modifications.isEmpty() - && ModsOperationUtil.isAllDeletedByMods( - pair.getLeft(), - iChunkMetadata.getMeasurementUid(), - alignedChunkMetadata.getStartTime(), - alignedChunkMetadata.getEndTime(), - modifications)) { - iChunkMetadataIterator.remove(); - } - } - - if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) { + if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) { chunkMetadataIterator.remove(); continue; } @@ -267,6 +248,7 @@ public boolean hasNext() { dataTypeList = new ArrayList<>(); columnTypes = new ArrayList<>(); measurementList = new ArrayList<>(); + fieldSchemaList = new ArrayList<>(); for (int i = 0; i < columnSchemaSize; i++) { final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i); @@ -280,6 +262,9 @@ public boolean hasNext() { measurementList.add(measurementName); dataTypeList.add(schema.getType()); } + if (ColumnCategory.FIELD.equals(columnCategory)) { + fieldSchemaList.add(schema); + } } } deviceIdSize = dataTypeList.size(); @@ -331,9 +316,9 @@ private Tablet buildNextTablet() { tablet = new Tablet( tableName, - measurementList, - dataTypeList, - columnTypes, + new ArrayList<>(measurementList), + new ArrayList<>(dataTypeList), + new ArrayList<>(columnTypes), rowCountAndMemorySize.getLeft()); tablet.initBitMaps(); isFirstRow = false; @@ -376,6 +361,20 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta long size = timeChunkSize; final List valueChunkList = new ArrayList<>(); + final Map valueChunkMetadataMap = + alignedChunkMetadata.getValueChunkMetadataList().stream() + .filter(Objects::nonNull) + .filter( + metadata -> + !isFieldDeletedByMods( + metadata.getMeasurementUid(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime())) + .collect( + Collectors.toMap( + IChunkMetadata::getMeasurementUid, + metadata -> metadata, + (left, right) -> left)); // To ensure that the Tablet has the same alignedChunk column as the current one, // you need to create a new Tablet to fill in the data. @@ -392,50 +391,99 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta measurementList.subList(deviceIdSize, measurementList.size()).clear(); dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear(); - for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) { - final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset); + boolean hasSelectedField = fieldSchemaList.isEmpty(); + boolean hasSelectedNonNullChunk = false; + for (; offset < fieldSchemaList.size(); ++offset) { + final IMeasurementSchema schema = fieldSchemaList.get(offset); + if (isFieldDeletedByMods( + schema.getMeasurementName(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime())) { + continue; + } + + final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName()); + Chunk chunk = null; if (metadata != null) { - final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata); - size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); - if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (valueChunkList.isEmpty()) { + chunk = reader.readMemChunk((ChunkMetadata) metadata); + final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); + if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + if (!hasSelectedNonNullChunk) { // If the first chunk exceeds the memory limit, we need to allocate more memory + size = newSize; PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size); - columnTypes.add(ColumnCategory.FIELD); - measurementList.add(metadata.getMeasurementUid()); - dataTypeList.add(metadata.getDataType()); - valueChunkList.add(chunk); - ++offset; + } else { + break; } - break; } else { - // Record the column information corresponding to Meta to fill in Tablet - columnTypes.add(ColumnCategory.FIELD); - measurementList.add(metadata.getMeasurementUid()); - dataTypeList.add(metadata.getDataType()); - valueChunkList.add(chunk); + size = newSize; } + hasSelectedNonNullChunk = true; } + + columnTypes.add(ColumnCategory.FIELD); + measurementList.add(schema.getMeasurementName()); + dataTypeList.add(schema.getType()); + valueChunkList.add(chunk); + hasSelectedField = true; } - if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) { + if (offset >= fieldSchemaList.size()) { currentChunkMetadata = null; } + if (!hasSelectedField) { + this.chunkReader = null; + this.batchData = null; + return; + } + this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null); this.modsInfoList = ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications); } + private boolean areAllFieldsDeletedByMods( + final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) { + if (modifications.isEmpty() || fieldSchemaList.isEmpty()) { + return false; + } + + for (final IMeasurementSchema schema : fieldSchemaList) { + if (!ModsOperationUtil.isAllDeletedByMods( + currentDeviceID, + schema.getMeasurementName(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime(), + modifications)) { + return false; + } + } + return true; + } + + private boolean isFieldDeletedByMods( + final String measurementID, final long startTime, final long endTime) { + return !modifications.isEmpty() + && ModsOperationUtil.isAllDeletedByMods( + deviceID, measurementID, startTime, endTime, modifications); + } + private boolean fillMeasurementValueColumns( final BatchData data, final Tablet tablet, final int rowIndex) { - final TsPrimitiveType[] primitiveTypes = data.getVector(); + final TsPrimitiveType[] primitiveTypes = + Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0]; boolean needFillTime = false; + boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize; for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) { - final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize]; - if (primitiveType == null - || ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) { + final TsPrimitiveType primitiveType = + i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null; + final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i)); + if (!isDeleted) { + hasNonDeletedField = true; + } + if (primitiveType == null || isDeleted) { switch (dataTypeList.get(i)) { case TEXT: case BLOB: @@ -480,7 +528,7 @@ private boolean fillMeasurementValueColumns( throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); } } - return needFillTime; + return needFillTime || hasNonDeletedField; } private void fillDeviceIdColumns( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index e1e6d59719145..680add7ec5cd6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -54,11 +54,17 @@ import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.MetadataIndexNode; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; +import org.apache.tsfile.read.controller.IMetadataQuerier; +import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; +import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +75,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -381,7 +388,7 @@ private void analyzeSingleTsFile(final File tsFile, int i) throws Exception { .getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount()); // check if the tsfile is empty - if (!timeseriesMetadataIterator.hasNext()) { + if (!isTableModelFile && !timeseriesMetadataIterator.hasNext()) { throw new LoadEmptyFileException(tsFile.getAbsolutePath()); } @@ -410,7 +417,7 @@ && handleSingleMiniFile(i)) { sessionInfo.getDatabaseName().orElse(null), SqlDialect.TABLE); context.setSession(newSessionInfo); - doAnalyzeSingleTableFile(tsFile, reader, timeseriesMetadataIterator, tableSchemaMap); + doAnalyzeSingleTableFile(tsFile, reader, tableSchemaMap); } else { final SessionInfo newSessionInfo = new SessionInfo( @@ -525,14 +532,11 @@ private void doAnalyzeSingleTreeFile( private void doAnalyzeSingleTableFile( final File tsFile, final TsFileSequenceReader reader, - final TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataIterator, final Map tableSchemaMap) - throws IOException, LoadAnalyzeException { + throws IOException, LoadAnalyzeException, LoadEmptyFileException { // construct tsfile resource final TsFileResource tsFileResource = constructTsFileResource(reader, tsFile); - long writePointCount = 0; - if (Objects.isNull(databaseForTableData)) { // If database is not specified, use the database from current session. // If still not specified, throw an exception. @@ -553,23 +557,9 @@ private void doAnalyzeSingleTableFile( getOrCreateTableSchemaCache().setTableSchemaMap(tableSchemaMap); getOrCreateTableSchemaCache().setCurrentModificationsAndTimeIndex(tsFileResource, reader); - while (timeseriesMetadataIterator.hasNext()) { - final Map> device2TimeseriesMetadata = - timeseriesMetadataIterator.next(); - - // Update time index no matter if resource file exists or not, because resource file may be - // untrusted - TsFileResourceUtils.updateTsFileResource( - device2TimeseriesMetadata, - tsFileResource, - IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()); - getOrCreateTableSchemaCache().setCurrentTimeIndex(tsFileResource.getTimeIndex()); - - for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) { - getOrCreateTableSchemaCache().autoCreateAndVerify(deviceId); - } - - writePointCount += getWritePointCount(device2TimeseriesMetadata); + final long writePointCount = updateTableTsFileResourceAndVerifySchema(reader, tsFileResource); + if (tsFileResource.getDevices().isEmpty()) { + throw new LoadEmptyFileException(tsFile.getAbsolutePath()); } getOrCreateTableSchemaCache().flush(); @@ -589,6 +579,50 @@ private void doAnalyzeSingleTableFile( addWritePointCount(writePointCount); } + private long updateTableTsFileResourceAndVerifySchema( + final TsFileSequenceReader reader, final TsFileResource tsFileResource) + throws IOException, LoadAnalyzeException { + long writePointCount = 0; + final IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader); + final List tableNames = + new ArrayList<>(metadataQuerier.getWholeFileMetadata().getTableSchemaMap().keySet()); + + for (final String tableName : tableNames) { + final MetadataIndexNode tableRoot = + metadataQuerier.getWholeFileMetadata().getTableMetadataIndexNode(tableName); + if (Objects.isNull(tableRoot)) { + continue; + } + + final Iterator> deviceIterator = + metadataQuerier.deviceIterator(tableRoot, null); + while (deviceIterator.hasNext()) { + final IDeviceID deviceId = deviceIterator.next().getLeft(); + boolean hasChunk = false; + + for (final AbstractAlignedChunkMetadata alignedChunkMetadata : + reader.getAlignedChunkMetadata(deviceId, false)) { + if (Objects.isNull(alignedChunkMetadata) + || Objects.isNull(alignedChunkMetadata.getTimeChunkMetadata())) { + continue; + } + + hasChunk = true; + tsFileResource.updateStartTime(deviceId, alignedChunkMetadata.getStartTime()); + tsFileResource.updateEndTime(deviceId, alignedChunkMetadata.getEndTime()); + writePointCount += getTableWritePointCount(alignedChunkMetadata); + } + + if (hasChunk) { + getOrCreateTableSchemaCache().setCurrentTimeIndex(tsFileResource.getTimeIndex()); + getOrCreateTableSchemaCache().autoCreateAndVerify(deviceId); + } + } + } + + return writePointCount; + } + private TsFileResource constructTsFileResource( final TsFileSequenceReader reader, final File tsFile) throws IOException { final TsFileResource tsFileResource = new TsFileResource(tsFile); @@ -636,6 +670,25 @@ private static long getWritePointCount( .sum(); } + private static long getTableWritePointCount( + final AbstractAlignedChunkMetadata alignedChunkMetadata) { + long writePointCount = 0; + boolean hasValueChunkMetadata = false; + for (final IChunkMetadata valueChunkMetadata : + alignedChunkMetadata.getValueChunkMetadataList()) { + if (Objects.nonNull(valueChunkMetadata) + && Objects.nonNull(valueChunkMetadata.getStatistics())) { + hasValueChunkMetadata = true; + writePointCount += valueChunkMetadata.getStatistics().getCount(); + } + } + return hasValueChunkMetadata + || Objects.isNull(alignedChunkMetadata.getTimeChunkMetadata()) + || Objects.isNull(alignedChunkMetadata.getTimeChunkMetadata().getStatistics()) + ? writePointCount + : alignedChunkMetadata.getTimeChunkMetadata().getStatistics().getCount(); + } + private void addWritePointCount(long writePointCount) { if (isTableModelStatement) { loadTsFileTableStatement.addWritePointCount(writePointCount); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index a2e7c558ea0b5..9d1b476f73666 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -24,18 +24,30 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.pipe.api.access.Row; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; @@ -45,9 +57,12 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -60,12 +75,14 @@ import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.junit.Assert.fail; @@ -96,6 +113,7 @@ public void tearDown() throws Exception { .getConfig() .setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); if (alignedTsFile != null) { + ModificationFile.getExclusiveMods(alignedTsFile).delete(); alignedTsFile.delete(); } if (nonalignedTsFile != null) { @@ -120,6 +138,126 @@ public void testScanParser() throws Exception { System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testTableParserWithAllNullFields() throws Exception { + alignedTsFile = new File("table-all-null.tsfile"); + writeTableTsFileWithNullableFields(true); + + assertParsedTablets(parseTableTablets(false), Arrays.asList("tag1", "s1", "s2"), 3, true); + } + + @Test + public void testTableParserWithMixedAllNullFields() throws Exception { + alignedTsFile = new File("table-mixed-all-null.tsfile"); + writeTableTsFileWithNullableFields(false); + + assertParsedTablets(parseTableTablets(false), Arrays.asList("tag1", "s1", "s2"), 3, false); + } + + @Test + public void testTableParserWithAllNullFieldsAndDeletedValueChunk() throws Exception { + alignedTsFile = new File("table-all-null-with-mod.tsfile"); + writeTableTsFileWithNullableFields(false); + try (final ModificationFile modificationFile = + new ModificationFile(ModificationFile.getExclusiveMods(alignedTsFile), false)) { + modificationFile.write( + new TableDeletionEntry( + new DeletionPredicate( + "table1", new IDPredicate.NOP(), Collections.singletonList("s2")), + new TimeRange(100, 102))); + } + + assertParsedTablets(parseTableTablets(true), Arrays.asList("tag1", "s1"), 3, true); + } + + private void writeTableTsFileWithNullableFields(final boolean allFieldsNull) throws Exception { + final List tableSchemaList = + Arrays.asList( + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE)); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Schema schema = new Schema(); + schema.registerTableSchema(new TableSchema("table1", tableSchemaList, columnCategoryList)); + try (final TsFileIOWriter writer = new TsFileIOWriter(alignedTsFile)) { + writer.setSchema(schema); + final IDeviceID deviceID = new StringArrayDeviceID(new String[] {"table1", "tagA"}); + writer.startChunkGroup(deviceID); + + final AlignedChunkWriterImpl chunkWriter = + new AlignedChunkWriterImpl(tableSchemaList.subList(1, tableSchemaList.size())); + for (int i = 0; i < 3; i++) { + final long time = 100 + i; + chunkWriter.getTimeChunkWriter().write(time); + chunkWriter.getValueChunkWriterByIndex(0).write(time, 0L, true); + chunkWriter.getValueChunkWriterByIndex(1).write(time, 1.0 + i, allFieldsNull); + } + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + writer.endFile(); + } + } + + private List parseTableTablets(final boolean isWithMod) throws IOException { + final List parsedTablets = new ArrayList<>(); + try (final TsFileInsertionEventTableParser parser = + new TsFileInsertionEventTableParser( + alignedTsFile, + new TablePattern(true, null, null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + null, + isWithMod)) { + for (final TabletInsertionEvent event : parser.toTabletInsertionEvents()) { + Assert.assertTrue(event instanceof PipeRawTabletInsertionEvent); + parsedTablets.add(((PipeRawTabletInsertionEvent) event).convertToTablet()); + } + } + return parsedTablets; + } + + private void assertParsedTablets( + final List tablets, + final List expectedColumns, + final int expectedRowCount, + final boolean expectS2Null) { + Assert.assertFalse(tablets.isEmpty()); + int rowCount = 0; + for (final Tablet tablet : tablets) { + if (tablet.getRowSize() == 0) { + continue; + } + + Assert.assertEquals("table1", tablet.getTableName()); + Assert.assertEquals( + expectedColumns, + tablet.getSchemas().stream() + .map(IMeasurementSchema::getMeasurementName) + .collect(Collectors.toList())); + Assert.assertEquals(ColumnCategory.TAG, tablet.getColumnTypes().get(0)); + for (int i = 1; i < tablet.getColumnTypes().size(); i++) { + Assert.assertEquals(ColumnCategory.FIELD, tablet.getColumnTypes().get(i)); + } + + for (int i = 0; i < tablet.getRowSize(); i++, rowCount++) { + Assert.assertEquals(100 + rowCount, tablet.getTimestamp(i)); + Assert.assertFalse(tablet.isNull(i, 0)); + Assert.assertTrue(tablet.isNull(i, 1)); + if (expectedColumns.size() > 2) { + Assert.assertEquals(expectS2Null, tablet.isNull(i, 2)); + if (!expectS2Null) { + Assert.assertEquals(1.0 + rowCount, (double) tablet.getValue(i, 2), 0.0); + } + } + } + } + Assert.assertEquals(expectedRowCount, rowCount); + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java new file mode 100644 index 0000000000000..9e86ca60728b0 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.analyze.load; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class LoadTsFileAnalyzerTest { + + @Test + public void testTableWritePointCountFallbackToTimeChunkWhenAllFieldsNull() throws Exception { + final File tsFile = new File("load-table-all-null.tsfile"); + writeTableTsFileWithAllNullFields(tsFile); + + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IDeviceID deviceID = new StringArrayDeviceID(new String[] {"table1", "tagA"}); + final List alignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, false); + Assert.assertEquals(1, alignedChunkMetadataList.size()); + + final AbstractAlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); + Assert.assertNotNull(alignedChunkMetadata.getTimeChunkMetadata()); + Assert.assertEquals( + 3, alignedChunkMetadata.getTimeChunkMetadata().getStatistics().getCount()); + Assert.assertEquals( + 0, + alignedChunkMetadata.getValueChunkMetadataList().stream() + .filter(Objects::nonNull) + .count()); + + final Method method = + LoadTsFileAnalyzer.class.getDeclaredMethod( + "getTableWritePointCount", AbstractAlignedChunkMetadata.class); + method.setAccessible(true); + Assert.assertEquals(3L, method.invoke(null, alignedChunkMetadata)); + } finally { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + } + } + + private void writeTableTsFileWithAllNullFields(final File tsFile) throws Exception { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + + final List tableSchemaList = + Arrays.asList( + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE)); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Schema schema = new Schema(); + schema.registerTableSchema(new TableSchema("table1", tableSchemaList, columnCategoryList)); + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.setSchema(schema); + writer.startChunkGroup(new StringArrayDeviceID(new String[] {"table1", "tagA"})); + + final AlignedChunkWriterImpl chunkWriter = + new AlignedChunkWriterImpl(tableSchemaList.subList(1, tableSchemaList.size())); + for (int i = 0; i < 3; i++) { + final long time = 100 + i; + chunkWriter.getTimeChunkWriter().write(time); + chunkWriter.getValueChunkWriterByIndex(0).write(time, 0L, true); + chunkWriter.getValueChunkWriterByIndex(1).write(time, 0.0, true); + } + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + writer.endFile(); + } + } +} From e2cb0837669ff3b56829fd852b605b2ec29e70d8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 19:38:42 +0800 Subject: [PATCH 2/2] fix --- .../plan/analyze/load/LoadTsFileAnalyzer.java | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 680add7ec5cd6..8944975fc412b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; @@ -53,6 +54,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.EncryptUtils; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.external.commons.io.FileUtils; import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -60,11 +62,14 @@ import org.apache.tsfile.file.metadata.MetadataIndexNode; import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; import org.apache.tsfile.read.controller.IMetadataQuerier; import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.TsPrimitiveType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +80,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -583,6 +589,18 @@ private long updateTableTsFileResourceAndVerifySchema( final TsFileSequenceReader reader, final TsFileResource tsFileResource) throws IOException, LoadAnalyzeException { long writePointCount = 0; + if (tsFileResource.getTimeIndex().getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) { + tsFileResource.setTimeIndex( + IoTDBDescriptor.getInstance().getConfig().getTimeIndexLevel().getTimeIndex()); + } + Map> deviceLastValues = + IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad() + ? new HashMap<>() + : null; + long lastValuesMemCost = 0; + final long lastValuesMemoryBudget = + IoTDBDescriptor.getInstance().getConfig().getCacheLastValuesMemoryBudgetInByte(); + final IMetadataQuerier metadataQuerier = new MetadataQuerierByFileImpl(reader); final List tableNames = new ArrayList<>(metadataQuerier.getWholeFileMetadata().getTableSchemaMap().keySet()); @@ -611,6 +629,18 @@ private long updateTableTsFileResourceAndVerifySchema( tsFileResource.updateStartTime(deviceId, alignedChunkMetadata.getStartTime()); tsFileResource.updateEndTime(deviceId, alignedChunkMetadata.getEndTime()); writePointCount += getTableWritePointCount(alignedChunkMetadata); + if (deviceLastValues != null) { + lastValuesMemCost = + updateTableDeviceLastValues( + deviceLastValues, + deviceId, + alignedChunkMetadata, + lastValuesMemCost, + lastValuesMemoryBudget); + if (lastValuesMemCost > lastValuesMemoryBudget) { + deviceLastValues = null; + } + } } if (hasChunk) { @@ -620,9 +650,89 @@ private long updateTableTsFileResourceAndVerifySchema( } } + tsFileResource.setLastValues(convertTableDeviceLastValues(deviceLastValues)); return writePointCount; } + private static long updateTableDeviceLastValues( + final Map> deviceLastValues, + final IDeviceID deviceId, + final AbstractAlignedChunkMetadata alignedChunkMetadata, + long lastValuesMemCost, + final long lastValuesMemoryBudget) { + for (final IChunkMetadata chunkMetadata : alignedChunkMetadata.getValueChunkMetadataList()) { + if (Objects.isNull(chunkMetadata)) { + continue; + } + + Map deviceMap = deviceLastValues.get(deviceId); + if (Objects.isNull(deviceMap)) { + deviceMap = new HashMap<>(); + deviceLastValues.put(deviceId, deviceMap); + lastValuesMemCost += RamUsageEstimator.shallowSizeOf(deviceMap); + lastValuesMemCost += deviceId.ramBytesUsed(); + } + + final int previousSize = deviceMap.size(); + final String measurement = chunkMetadata.getMeasurementUid(); + final TimeValuePair oldPair = deviceMap.get(measurement); + if (Objects.nonNull(oldPair) && oldPair.getTimestamp() > chunkMetadata.getEndTime()) { + continue; + } + + final TimeValuePair newPair = buildLastValuePair(chunkMetadata); + if (Objects.nonNull(oldPair)) { + lastValuesMemCost -= oldPair.getSize(); + } + if (Objects.nonNull(newPair)) { + deviceMap.put(measurement, newPair); + lastValuesMemCost += newPair.getSize(); + } else { + deviceMap.remove(measurement); + } + lastValuesMemCost += + (long) (deviceMap.size() - previousSize) * RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY; + if (lastValuesMemCost > lastValuesMemoryBudget) { + return lastValuesMemCost; + } + } + return lastValuesMemCost; + } + + private static TimeValuePair buildLastValuePair(final IChunkMetadata chunkMetadata) { + if (Objects.isNull(chunkMetadata.getStatistics()) + || Objects.equals(chunkMetadata.getDataType(), TSDataType.BLOB)) { + return null; + } + + final TsPrimitiveType lastValue = + TsPrimitiveType.getByType( + Objects.equals(chunkMetadata.getDataType(), TSDataType.VECTOR) + ? TSDataType.INT64 + : chunkMetadata.getDataType(), + chunkMetadata.getStatistics().getLastValue()); + return new TimeValuePair(chunkMetadata.getEndTime(), lastValue); + } + + private static Map>> convertTableDeviceLastValues( + final Map> deviceLastValues) { + if (Objects.isNull(deviceLastValues)) { + return null; + } + + final Map>> finalDeviceLastValues = + new HashMap<>(deviceLastValues.size()); + for (final Map.Entry> entry : + deviceLastValues.entrySet()) { + final List> lastValues = new ArrayList<>(entry.getValue().size()); + for (final Map.Entry lastValueEntry : entry.getValue().entrySet()) { + lastValues.add(new Pair<>(lastValueEntry.getKey(), lastValueEntry.getValue())); + } + finalDeviceLastValues.put(entry.getKey(), lastValues); + } + return finalDeviceLastValues; + } + private TsFileResource constructTsFileResource( final TsFileSequenceReader reader, final File tsFile) throws IOException { final TsFileResource tsFileResource = new TsFileResource(tsFile);