Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -897,26 +897,33 @@ public void releaseResourceWhenAllDriversAreClosed() {
*/
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
try {
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
if (tvList.getReservedMemoryBytes() != tvListRamSize) {
LOGGER.warn(
"Release TVList owned by query: allocate size {}, release size {}",
tvList.getReservedMemoryBytes(),
tvListRamSize);
}
if (queryContextSet.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"TVList {} is released by the query, FragmentInstance Id is {}",
tvList,
this.getId());
}
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
tvList.clear();
} else {
// Transfer memory to next query. It must be exception-safe as this method is called
// during FragmentInstanceExecution cleanup. Any exception during this process could
// prevent proper resource cleanup and cause memory leaks.
Pair<Long, Long> releasedBytes =
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
queryContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
// mutable tvlist
TVList list = memChunk.getWorkingTVList();
TVList cloneList = null;
long tvListRamSize = list.calculateRamSize();
list.lockQueryList();
try {
if (copyTimeFilter != null
Expand Down Expand Up @@ -193,7 +194,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
list.setReservedMemoryBytes(tvListRamSize);
}
list.setOwnerQuery(firstQuery);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ protected void maybeReleaseTvList(TVList tvList) {
}

private void tryReleaseTvList(TVList tvList) {
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
if (tvList.getQueryContextSet().isEmpty()) {
Expand All @@ -112,7 +113,8 @@ private void tryReleaseTvList(TVList tvList) {
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext) firstQuery).getMemoryReservationContext();
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
tvList.setReservedMemoryBytes(tvListRamSize);
}
// update current TVList owner to first query in the list
tvList.setOwnerQuery(firstQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.storageengine.dataregion.memtable;

import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
Expand Down Expand Up @@ -118,6 +119,21 @@ public void sortTvLists() {
int queryRowCount = entry.getValue();
if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) {
alignedTvList.sort();
long alignedTvListRamSize = alignedTvList.calculateRamSize();
alignedTvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery =
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
alignedTvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
alignedTvList.unlockQueryList();
}
}
}
}
Expand Down Expand Up @@ -356,10 +372,25 @@ public boolean isEmpty() {
@Override
public IPointReader getPointReader() {
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
AlignedTVList tvList = (AlignedTVList) entry.getKey();
AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
int queryLength = entry.getValue();
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
tvList.sort();
if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) {
alignedTvList.sort();
long alignedTvListRamSize = alignedTvList.calculateRamSize();
alignedTvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery =
(FragmentInstanceContext) alignedTvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
alignedTvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
alignedTvList.unlockQueryList();
}
}
}
TsBlock tsBlock = buildTsBlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
Expand Down Expand Up @@ -135,6 +136,20 @@ public void sortTvLists() {
int queryRowCount = entry.getValue();
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
tvList.sort();
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
tvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
tvList.unlockQueryList();
}
}
}
}
Expand Down Expand Up @@ -274,6 +289,20 @@ public IPointReader getPointReader() {
int queryLength = entry.getValue();
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
tvList.sort();
long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
if (ownerQuery != null) {
long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
if (deltaBytes > 0) {
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
tvList.addReservedMemoryBytes(deltaBytes);
}
}
} finally {
tvList.unlockQueryList();
}
}
}
TsBlock tsBlock = buildTsBlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,7 @@ public TSDataType getDataType() {
}

@Override
public long calculateRamSize() {
public synchronized long calculateRamSize() {
return timestamps.size() * alignedTvListArrayMemCost();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,20 @@ public abstract class TVList implements WALEntryValue {
// Index relation: arrayIndex -> elementIndex
protected List<BitMap> bitMap;

// lock to provide synchronization for query list
// Guards queryContextSet, ownerQuery, and reservedMemoryBytes.
// Always acquire this lock before accessing/modifying these fields.
private final ReentrantLock queryListLock = new ReentrantLock();

// set of query that this TVList is used
protected final Set<QueryContext> queryContextSet;

// the owner query which is obligated to release the TVList.
// When it is null, the TVList is owned by insert thread and released after flush.
protected QueryContext ownerQuery;

// Reserved memory by the query. Ensure to acquire queryListLock before update.
protected long reservedMemoryBytes = 0L;

protected boolean sorted = true;
protected long maxTime;
protected long minTime;
Expand Down Expand Up @@ -157,14 +162,26 @@ public static long tvListArrayMemCost(TSDataType type) {
return size;
}

public long calculateRamSize() {
public synchronized long calculateRamSize() {
return timestamps.size() * tvListArrayMemCost();
}

public synchronized boolean isSorted() {
return sorted;
}

public void setReservedMemoryBytes(long bytes) {
this.reservedMemoryBytes = bytes;
}

public void addReservedMemoryBytes(long bytes) {
this.reservedMemoryBytes += bytes;
}

public long getReservedMemoryBytes() {
return reservedMemoryBytes;
}

public abstract void sort();

public void increaseReferenceCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
package org.apache.iotdb.db.queryengine.execution.fragment;

import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
Expand All @@ -30,15 +35,27 @@
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;

import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -49,6 +66,7 @@
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -157,6 +175,69 @@ public void testTVListOwnerTransfer() throws InterruptedException {
}
}

@Test
public void testTVListCloneForQuery() {
IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);

ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");

try {
String deviceId = "d1";
String measurementId = "s1";
IMemTable memTable = createMemTable(deviceId, measurementId);
assertEquals(1, memTable.getMemTableMap().size());
IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next();
assertEquals(1, memChunkGroup.getMemChunkMap().size());
IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next();
TVList tvList = memChunk.getWorkingTVList();
assertFalse(tvList.isSorted());

// FragmentInstance Context
FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1");
FragmentInstanceStateMachine stateMachine1 =
new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext1 =
createFragmentInstanceContext(id1, stateMachine1);

FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2");
FragmentInstanceStateMachine stateMachine2 =
new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext2 =
createFragmentInstanceContext(id2, stateMachine2);

// query on memtable
NonAlignedFullPath fullPath =
new NonAlignedFullPath(
IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId),
new MeasurementSchema(
measurementId,
TSDataType.INT32,
TSEncoding.RLE,
CompressionType.UNCOMPRESSED,
Collections.emptyMap()));
ReadOnlyMemChunk readOnlyMemChunk1 =
memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null);
ReadOnlyMemChunk readOnlyMemChunk2 =
memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null);

IPointReader pointReader = readOnlyMemChunk1.getPointReader();
while (pointReader.hasNextTimeValuePair()) {
pointReader.nextTimeValuePair();
}
assertTrue(tvList.isSorted());
assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes());
} catch (QueryProcessException
| IOException
| MetadataException
| MemoryNotEnoughException
| IllegalArgumentException e) {
fail(e.getMessage());
} finally {
instanceNotificationExecutor.shutdown();
}
}

private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor)
throws CpuNotEnoughException {
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
Expand Down Expand Up @@ -201,4 +282,20 @@ private TVList buildTVList() {
}
return tvList;
}

private IMemTable createMemTable(String deviceId, String measurementId)
throws IllegalPathException {
IMemTable memTable = new PrimitiveMemTable("root.test", "1");

int rows = 100;
for (int i = 0; i < 100; i++) {
memTable.write(
DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
Collections.singletonList(
new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
rows - i - 1,
new Object[] {i + 10});
}
return memTable;
}
}
Loading