Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -869,26 +869,33 @@ public void releaseResourceWhenAllDriversAreClosed() {
*/
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
try {
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
LOGGER.warn(
"Release TVList owned by query: allocate size {}, release size {}",
tvList.getReservedMemoryBytes(),
tvListRamSize);
}
if (queryContextSet.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
}
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
tvList.clear();
} else {
// Transfer memory to next query. It must be exception-safe as this method is called
// during FragmentInstanceExecution cleanup. Any exception during this process could
// prevent proper resource cleanup and cause memory leaks.
Pair<Long, Long> releasedBytes =
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
queryContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
// mutable tvlist
TVList list = memChunk.getWorkingTVList();
TVList cloneList = null;
long tvListRamSize = list.calculateRamSize();
list.lockQueryList();
try {
if (copyTimeFilter != null
Expand Down Expand Up @@ -188,7 +189,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
list.setReservedMemoryBytes(tvListRamSize);
}
list.setOwnerQuery(firstQuery);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ protected void maybeReleaseTvList(TVList tvList) {
}

private void tryReleaseTvList(TVList tvList) {
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
if (tvList.getQueryContextSet().isEmpty()) {
Expand All @@ -110,7 +111,8 @@ private void tryReleaseTvList(TVList tvList) {
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
tvList.setReservedMemoryBytes(tvListRamSize);
}
// update current TVList owner to first query in the list
tvList.setOwnerQuery(firstQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.memtable;

import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
Expand Down Expand Up @@ -113,6 +114,21 @@ public void sortTvLists() {
int queryRowCount = entry.getValue();
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
alignedTvList.sort();
long alignedTvListRamSize = alignedTvList.calculateRamSize();
alignedTvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery =
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
alignedTvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
alignedTvList.unlockQueryList();
}
}
}
}
Expand Down Expand Up @@ -339,10 +355,25 @@ public boolean isEmpty() {
@Override
public IPointReader getPointReader() {
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
AlignedTVList tvList = (AlignedTVList) entry.getKey();
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
int queryLength = entry.getValue();
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
tvList.sort();
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
alignedTvList.sort();
long alignedTvListRamSize = alignedTvList.calculateRamSize();
alignedTvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery =
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
alignedTvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
alignedTvList.unlockQueryList();
}
}
}
TsBlock tsBlock = buildTsBlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
Expand Down Expand Up @@ -135,6 +136,20 @@ public void sortTvLists() {
int queryRowCount = entry.getValue();
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
tvList.sort();
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
tvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
tvList.unlockQueryList();
}
}
}
}
Expand Down Expand Up @@ -273,6 +288,20 @@ public IPointReader getPointReader() {
int queryLength = entry.getValue();
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
tvList.sort();
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
tvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
tvList.unlockQueryList();
}
}
}
TsBlock tsBlock = buildTsBlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ public TSDataType getDataType() {
}

@Override
public long calculateRamSize() {
public synchronized long calculateRamSize() {
return timestamps.size() * alignedTvListArrayMemCost();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,20 @@ public abstract class TVList implements WALEntryValue {
// Index relation: arrayIndex -> elementIndex
protected List<BitMap> bitMap;

// lock to provide synchronization for query list
// Guards queryContextSet, ownerQuery, and reservedMemoryBytes.
// Always acquire this lock before accessing/modifying these fields.
private final ReentrantLock queryListLock = new ReentrantLock();

// set of query that this TVList is used
protected final Set<QueryContext> queryContextSet;

// the owner query which is obligated to release the TVList.
// When it is null, the TVList is owned by insert thread and released after flush.
protected QueryContext ownerQuery;

// Reserved memory by the query. Ensure to acquire queryListLock before update.
protected long reservedMemoryBytes = 0L;

protected boolean sorted = true;
protected long maxTime;
protected long minTime;
Expand Down Expand Up @@ -151,14 +156,26 @@ public static long tvListArrayMemCost(TSDataType type) {
return size;
}

public long calculateRamSize() {
public synchronized long calculateRamSize() {
return timestamps.size() * tvListArrayMemCost();
}

public synchronized boolean isSorted() {
return sorted;
}

public void setReservedMemoryBytes(long bytes) {
this.reservedMemoryBytes = bytes;
}

public void addReservedMemoryBytes(long bytes) {
this.reservedMemoryBytes += bytes;
}

public long getReservedMemoryBytes() {
return reservedMemoryBytes;
}

public abstract void sort();

public void increaseReferenceCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
package org.apache.iotdb.db.queryengine.execution.fragment;

import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
Expand All @@ -30,15 +35,26 @@
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;

import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -49,6 +65,7 @@
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -157,6 +174,70 @@ public void testTVListOwnerTransfer() throws InterruptedException {
}
}

@Test
public void testTVListCloneForQuery() {
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);

ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");

try {
String deviceId = "d1";
String measurementId = "s1";
IMemTable memTable = createMemTable(deviceId, measurementId);
assertEquals(1, memTable.getMemTableMap().size());
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
assertEquals(1, memChunkGroup.getMemChunkMap().size());
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
TVList tvList = memChunk.getWorkingTVList();
assertFalse(tvList.isSorted());

// FragmentInstance Context
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
FragmentInstanceStateMachine stateMachine1 =
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext1 =
createFragmentInstanceContext(id1, stateMachine1);

FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
FragmentInstanceStateMachine stateMachine2 =
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext2 =
createFragmentInstanceContext(id2, stateMachine2);

// query on memtable
MeasurementPath fullPath =
new MeasurementPath(
deviceId,
measurementId,
new MeasurementSchema(
measurementId,
TSDataType.INT32,
TSEncoding.RLE,
CompressionType.UNCOMPRESSED,
Collections.emptyMap()));
ReadOnlyMemChunk readOnlyMemChunk1 =
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
ReadOnlyMemChunk readOnlyMemChunk2 =
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);
Comment on lines +221 to +222
Copy link

Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test creates two ReadOnlyMemChunk instances (readOnlyMemChunk1 and readOnlyMemChunk2) but only uses readOnlyMemChunk1. The unused variable readOnlyMemChunk2 should either be used in the test logic or removed to improve code clarity.

Suggested change
ReadOnlyMemChunk readOnlyMemChunk2 =
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);

Copilot uses AI. Check for mistakes.

IPointReader pointReader = readOnlyMemChunk1.getPointReader();
while (pointReader.hasNextTimeValuePair()) {
pointReader.nextTimeValuePair();
}
assertTrue(tvList.isSorted());
assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes());
} catch (QueryProcessException
| IOException
| MetadataException
| MemoryNotEnoughException
| IllegalArgumentException e) {
fail(e.getMessage());
} finally {
instanceNotificationExecutor.shutdown();
}
}

private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
throws CpuNotEnoughException {
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
Expand Down Expand Up @@ -201,4 +282,20 @@ private TVList buildTVList() {
}
return tvList;
}

private IMemTable createMemTable(String deviceId, String measurementId)
throws IllegalPathException {
IMemTable memTable = new PrimitiveMemTable("root.test", "1");

int rows = 100;
for (int i = 0; i < 100; i++) {
Copy link

Copilot AI Dec 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable rows is declared with the value 100, but it's not actually used in the loop logic. The loop iterates from 0 to 99, and the timestamp is calculated as rows - i - 1, but this could be simplified to just use the literal 100 or properly use the rows variable in the loop condition.

Suggested change
for (int i = 0; i < 100; i++) {
for (int i = 0; i < rows; i++) {

Copilot uses AI. Check for mistakes.
memTable.write(
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
Collections.singletonList(
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
rows - i - 1,
new Object[] {i + 10});
}
return memTable;
}
}
Loading