From 2f9e7c508a75208e9c5382c23d1c7a653e6d78a9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 6 May 2026 10:07:26 +0800 Subject: [PATCH] fix-1 --- .../scan/TsFileInsertionEventScanParser.java | 21 ++++++-- .../event/TsFileInsertionEventParserTest.java | 50 +++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index aeb7aaadf286..32823459fcfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -563,11 +563,6 @@ private void moveToNextChunkReader() final long chunkSize = timeChunkSize + valueChunkSize; if (chunkSize + chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (valueChunkList.size() == 1 - && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - PipeDataNodeResourceManager.memory() - .forceResize(allocatedMemoryBlockForChunk, chunkSize); - } needReturn = recordAlignedChunk(valueChunkList, marker); } } @@ -577,9 +572,11 @@ private void moveToNextChunkReader() firstChunkHeader4NextSequentialValueChunks = chunkHeader; return; } + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } else { chunkHeader = firstChunkHeader4NextSequentialValueChunks; firstChunkHeader4NextSequentialValueChunks = null; + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); } Chunk chunk = @@ -720,6 +717,20 @@ private boolean recordAlignedChunk(final List valueChunkList, final byte return false; } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( + final List valueChunkList, final ChunkHeader valueChunkHeader) { + if (!valueChunkList.isEmpty() || lastIndex < 0) { + return; + } + + final long chunkSize = + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) + + valueChunkHeader.getDataSize(); + if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); + } + } + @Override public void close() { super.close(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index a2e7c558ea0b..8d02cc8a9985 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -57,6 +58,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; @@ -120,6 +122,46 @@ public void testScanParser() throws Exception { System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception { + final long originalPipeMaxReaderChunkSize = + PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + + alignedTsFile = new File("single-aligned-value-chunk.tsfile"); + final List schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + + final Tablet tablet = new Tablet("root.sg.d", schemaList, 2); + tablet.addTimestamp(0, 1); + tablet.addValue("s1", 0, 1L); + tablet.addTimestamp(1, 2); + tablet.addValue("s1", 1, 2L); + + try { + try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) { + writer.registerAlignedTimeseries(new PartialPath("root.sg.d"), schemaList); + writer.writeAligned(tablet); + } + + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0); + } + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@ -666,4 +708,12 @@ private int getNonNullSize(final Tablet tablet) { } return count; } + + private PipeMemoryBlock getAllocatedChunkMemory(final TsFileInsertionEventScanParser parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionEventScanParser.class.getDeclaredField("allocatedMemoryBlockForChunk"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } }