diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 62f82a1f5b47..14995b887bfd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -869,11 +869,18 @@ public void releaseResourceWhenAllDriversAreClosed() { */ private void releaseTVListOwnedByQuery() { for (TVList tvList : tvListSet) { + long tvListRamSize = tvList.calculateRamSize(); tvList.lockQueryList(); Set queryContextSet = tvList.getQueryContextSet(); try { queryContextSet.remove(this); if (tvList.getOwnerQuery() == this) { + if (tvList.getReservedMemoryBytes() != tvListRamSize) { + LOGGER.warn( + "Release TVList owned by query: allocate size {}, release size {}", + tvList.getReservedMemoryBytes(), + tvListRamSize); + } if (queryContextSet.isEmpty()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -881,14 +888,14 @@ private void releaseTVListOwnedByQuery() { tvList, this.getId()); } - memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize()); + memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes()); tvList.clear(); } else { // Transfer memory to next query. It must be exception-safe as this method is called // during FragmentInstanceExecution cleanup. Any exception during this process could // prevent proper resource cleanup and cause memory leaks. Pair releasedBytes = - memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize()); + memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes()); FragmentInstanceContext queryContext = (FragmentInstanceContext) queryContextSet.iterator().next(); queryContext diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index a66f19108ea6..b00c8737bd9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -148,6 +148,7 @@ protected Map prepareTvListMapForQuery( // mutable tvlist TVList list = memChunk.getWorkingTVList(); TVList cloneList = null; + long tvListRamSize = list.calculateRamSize(); list.lockQueryList(); try { if (copyTimeFilter != null @@ -188,7 +189,8 @@ protected Map prepareTvListMapForQuery( if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize()); + memoryReservationManager.reserveMemoryCumulatively(tvListRamSize); + list.setReservedMemoryBytes(tvListRamSize); } list.setOwnerQuery(firstQuery); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 58aabe4a5ac1..9db0196a3a11 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -99,6 +99,7 @@ protected void maybeReleaseTvList(TVList tvList) { } private void tryReleaseTvList(TVList tvList) { + long tvListRamSize = tvList.calculateRamSize(); tvList.lockQueryList(); try { if (tvList.getQueryContextSet().isEmpty()) { @@ -110,7 +111,8 @@ private void tryReleaseTvList(TVList tvList) { if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize()); + memoryReservationManager.reserveMemoryCumulatively(tvListRamSize); + tvList.setReservedMemoryBytes(tvListRamSize); } // update current TVList owner to first query in the list tvList.setOwnerQuery(firstQuery); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java index 6d13b49110e1..d00424856bae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader; @@ -113,6 +114,21 @@ public void sortTvLists() { int queryRowCount = entry.getValue(); if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) { alignedTvList.sort(); + long alignedTvListRamSize = alignedTvList.calculateRamSize(); + alignedTvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = + (FragmentInstanceContext) alignedTvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + alignedTvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + alignedTvList.unlockQueryList(); + } } } } @@ -339,10 +355,25 @@ public boolean isEmpty() { @Override public IPointReader getPointReader() { for (Map.Entry entry : alignedTvListQueryMap.entrySet()) { - AlignedTVList tvList = (AlignedTVList) entry.getKey(); + AlignedTVList alignedTvList = (AlignedTVList) entry.getKey(); int queryLength = entry.getValue(); - if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) { - tvList.sort(); + if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) { + alignedTvList.sort(); + long alignedTvListRamSize = alignedTvList.calculateRamSize(); + alignedTvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = + (FragmentInstanceContext) alignedTvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + alignedTvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + alignedTvList.unlockQueryList(); + } } } TsBlock tsBlock = buildTsBlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java index 64a8868c8c75..3438759cfa90 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader; @@ -135,6 +136,20 @@ public void sortTvLists() { int queryRowCount = entry.getValue(); if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) { tvList.sort(); + long tvListRamSize = tvList.calculateRamSize(); + tvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + tvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + tvList.unlockQueryList(); + } } } } @@ -273,6 +288,20 @@ public IPointReader getPointReader() { int queryLength = entry.getValue(); if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) { tvList.sort(); + long tvListRamSize = tvList.calculateRamSize(); + tvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + tvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + tvList.unlockQueryList(); + } } } TsBlock tsBlock = buildTsBlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index e0c08fbdd43f..e4230787a761 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -836,7 +836,7 @@ public TSDataType getDataType() { } @Override - public long calculateRamSize() { + public synchronized long calculateRamSize() { return timestamps.size() * alignedTvListArrayMemCost(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index fedc3830ad8a..073b03f0a963 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -75,8 +75,10 @@ public abstract class TVList implements WALEntryValue { // Index relation: arrayIndex -> elementIndex protected List bitMap; - // lock to provide synchronization for query list + // Guards queryContextSet, ownerQuery, and reservedMemoryBytes. + // Always acquire this lock before accessing/modifying these fields. private final ReentrantLock queryListLock = new ReentrantLock(); + // set of query that this TVList is used protected final Set queryContextSet; @@ -84,6 +86,9 @@ public abstract class TVList implements WALEntryValue { // When it is null, the TVList is owned by insert thread and released after flush. protected QueryContext ownerQuery; + // Reserved memory by the query. Ensure to acquire queryListLock before update. + protected long reservedMemoryBytes = 0L; + protected boolean sorted = true; protected long maxTime; protected long minTime; @@ -151,7 +156,7 @@ public static long tvListArrayMemCost(TSDataType type) { return size; } - public long calculateRamSize() { + public synchronized long calculateRamSize() { return timestamps.size() * tvListArrayMemCost(); } @@ -159,6 +164,18 @@ public synchronized boolean isSorted() { return sorted; } + public void setReservedMemoryBytes(long bytes) { + this.reservedMemoryBytes = bytes; + } + + public void addReservedMemoryBytes(long bytes) { + this.reservedMemoryBytes += bytes; + } + + public long getReservedMemoryBytes() { + return reservedMemoryBytes; + } + public abstract void sort(); public void increaseReferenceCount() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index e0655edc55a4..30b9df14b3ac 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -20,7 +20,12 @@ package org.apache.iotdb.db.queryengine.execution.fragment; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException; @@ -30,15 +35,26 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink; import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; +import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; +import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import com.google.common.collect.ImmutableMap; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import org.mockito.Mockito; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collections; @@ -49,6 +65,7 @@ import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -157,6 +174,70 @@ public void testTVListOwnerTransfer() throws InterruptedException { } } + @Test + public void testTVListCloneForQuery() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + try { + String deviceId = "d1"; + String measurementId = "s1"; + IMemTable memTable = createMemTable(deviceId, measurementId); + assertEquals(1, memTable.getMemTableMap().size()); + IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); + assertEquals(1, memChunkGroup.getMemChunkMap().size()); + IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); + TVList tvList = memChunk.getWorkingTVList(); + assertFalse(tvList.isSorted()); + + // FragmentInstance Context + FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); + FragmentInstanceStateMachine stateMachine1 = + new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext1 = + createFragmentInstanceContext(id1, stateMachine1); + + FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); + FragmentInstanceStateMachine stateMachine2 = + new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext2 = + createFragmentInstanceContext(id2, stateMachine2); + + // query on memtable + MeasurementPath fullPath = + new MeasurementPath( + deviceId, + measurementId, + new MeasurementSchema( + measurementId, + TSDataType.INT32, + TSEncoding.RLE, + CompressionType.UNCOMPRESSED, + Collections.emptyMap())); + ReadOnlyMemChunk readOnlyMemChunk1 = + memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null); + ReadOnlyMemChunk readOnlyMemChunk2 = + memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null); + + IPointReader pointReader = readOnlyMemChunk1.getPointReader(); + while (pointReader.hasNextTimeValuePair()) { + pointReader.nextTimeValuePair(); + } + assertTrue(tvList.isSorted()); + assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes()); + } catch (QueryProcessException + | IOException + | MetadataException + | MemoryNotEnoughException + | IllegalArgumentException e) { + fail(e.getMessage()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) throws CpuNotEnoughException { IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); @@ -201,4 +282,20 @@ private TVList buildTVList() { } return tvList; } + + private IMemTable createMemTable(String deviceId, String measurementId) + throws IllegalPathException { + IMemTable memTable = new PrimitiveMemTable("root.test", "1"); + + int rows = 100; + for (int i = 0; i < 100; i++) { + memTable.write( + DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)), + Collections.singletonList( + new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)), + rows - i - 1, + new Object[] {i + 10}); + } + return memTable; + } }