-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: reserve memory for sorting indices during query execution #16959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev/1.3
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -20,7 +20,12 @@ | |||||
| package org.apache.iotdb.db.queryengine.execution.fragment; | ||||||
|
|
||||||
| import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; | ||||||
| import org.apache.iotdb.commons.exception.IllegalPathException; | ||||||
| import org.apache.iotdb.commons.exception.MetadataException; | ||||||
| import org.apache.iotdb.commons.path.MeasurementPath; | ||||||
| import org.apache.iotdb.commons.path.PartialPath; | ||||||
| import org.apache.iotdb.db.conf.IoTDBDescriptor; | ||||||
| import org.apache.iotdb.db.exception.query.QueryProcessException; | ||||||
| import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; | ||||||
| import org.apache.iotdb.db.queryengine.common.PlanFragmentId; | ||||||
| import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException; | ||||||
|
|
@@ -30,15 +35,26 @@ | |||||
| import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink; | ||||||
| import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.DataRegion; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; | ||||||
| import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; | ||||||
| import org.apache.iotdb.db.utils.datastructure.AlignedTVList; | ||||||
| import org.apache.iotdb.db.utils.datastructure.TVList; | ||||||
|
|
||||||
| import com.google.common.collect.ImmutableMap; | ||||||
| import org.apache.tsfile.enums.TSDataType; | ||||||
| import org.apache.tsfile.file.metadata.enums.CompressionType; | ||||||
| import org.apache.tsfile.file.metadata.enums.TSEncoding; | ||||||
| import org.apache.tsfile.read.reader.IPointReader; | ||||||
| import org.apache.tsfile.write.schema.MeasurementSchema; | ||||||
| import org.junit.Test; | ||||||
| import org.mockito.Mockito; | ||||||
|
|
||||||
| import java.io.ByteArrayOutputStream; | ||||||
| import java.io.IOException; | ||||||
| import java.io.PrintStream; | ||||||
| import java.util.ArrayList; | ||||||
| import java.util.Collections; | ||||||
|
|
@@ -49,6 +65,7 @@ | |||||
| import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; | ||||||
| import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; | ||||||
| import static org.junit.Assert.assertEquals; | ||||||
| import static org.junit.Assert.assertFalse; | ||||||
| import static org.junit.Assert.assertTrue; | ||||||
| import static org.junit.Assert.fail; | ||||||
|
|
||||||
|
|
@@ -157,6 +174,70 @@ public void testTVListOwnerTransfer() throws InterruptedException { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
| public void testTVListCloneForQuery() { | ||||||
| IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); | ||||||
|
|
||||||
| ExecutorService instanceNotificationExecutor = | ||||||
| IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); | ||||||
|
|
||||||
| try { | ||||||
| String deviceId = "d1"; | ||||||
| String measurementId = "s1"; | ||||||
| IMemTable memTable = createMemTable(deviceId, measurementId); | ||||||
| assertEquals(1, memTable.getMemTableMap().size()); | ||||||
| IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); | ||||||
| assertEquals(1, memChunkGroup.getMemChunkMap().size()); | ||||||
| IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); | ||||||
| TVList tvList = memChunk.getWorkingTVList(); | ||||||
| assertFalse(tvList.isSorted()); | ||||||
|
|
||||||
| // FragmentInstance Context | ||||||
| FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); | ||||||
| FragmentInstanceStateMachine stateMachine1 = | ||||||
| new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); | ||||||
| FragmentInstanceContext fragmentInstanceContext1 = | ||||||
| createFragmentInstanceContext(id1, stateMachine1); | ||||||
|
|
||||||
| FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); | ||||||
| FragmentInstanceStateMachine stateMachine2 = | ||||||
| new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); | ||||||
| FragmentInstanceContext fragmentInstanceContext2 = | ||||||
| createFragmentInstanceContext(id2, stateMachine2); | ||||||
|
|
||||||
| // query on memtable | ||||||
| MeasurementPath fullPath = | ||||||
| new MeasurementPath( | ||||||
| deviceId, | ||||||
| measurementId, | ||||||
| new MeasurementSchema( | ||||||
| measurementId, | ||||||
| TSDataType.INT32, | ||||||
| TSEncoding.RLE, | ||||||
| CompressionType.UNCOMPRESSED, | ||||||
| Collections.emptyMap())); | ||||||
| ReadOnlyMemChunk readOnlyMemChunk1 = | ||||||
| memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null); | ||||||
| ReadOnlyMemChunk readOnlyMemChunk2 = | ||||||
| memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null); | ||||||
|
|
||||||
| IPointReader pointReader = readOnlyMemChunk1.getPointReader(); | ||||||
| while (pointReader.hasNextTimeValuePair()) { | ||||||
| pointReader.nextTimeValuePair(); | ||||||
| } | ||||||
| assertTrue(tvList.isSorted()); | ||||||
| assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes()); | ||||||
| } catch (QueryProcessException | ||||||
| | IOException | ||||||
| | MetadataException | ||||||
| | MemoryNotEnoughException | ||||||
| | IllegalArgumentException e) { | ||||||
| fail(e.getMessage()); | ||||||
| } finally { | ||||||
| instanceNotificationExecutor.shutdown(); | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) | ||||||
| throws CpuNotEnoughException { | ||||||
| IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); | ||||||
|
|
@@ -201,4 +282,20 @@ private TVList buildTVList() { | |||||
| } | ||||||
| return tvList; | ||||||
| } | ||||||
|
|
||||||
| private IMemTable createMemTable(String deviceId, String measurementId) | ||||||
| throws IllegalPathException { | ||||||
| IMemTable memTable = new PrimitiveMemTable("root.test", "1"); | ||||||
|
|
||||||
| int rows = 100; | ||||||
| for (int i = 0; i < 100; i++) { | ||||||
|
||||||
| for (int i = 0; i < 100; i++) { | |
| for (int i = 0; i < rows; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test creates two ReadOnlyMemChunk instances (readOnlyMemChunk1 and readOnlyMemChunk2) but only uses readOnlyMemChunk1. The unused variable readOnlyMemChunk2 should either be used in the test logic or removed to improve code clarity.