From 37b35addb745e80218e1d474ab1c8706346ee098 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com>
Date: Mon, 9 Dec 2024 15:14:38 +0800
Subject: [PATCH 1/2] Add FlushChunkMetadataListener

---
 .../writer/FlushChunkMetadataListener.java    | 33 +++++++++++++++++++
 .../tsfile/write/writer/TsFileIOWriter.java   | 13 ++++++++
 2 files changed, 46 insertions(+)
 create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
new file mode 100644
index 000000000..2937763e0
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.write.writer;
+
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.List;
+
+@FunctionalInterface
+public interface FlushChunkMetadataListener {
+
+  // measurement id -> chunk metadata list
+  void onFlush(List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList);
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 672fbb8e7..66e432000 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -66,6 +66,7 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
 import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
@@ -131,6 +132,8 @@ public class TsFileIOWriter implements AutoCloseable {
 
   protected String encryptKey;
 
+  private final List<FlushChunkMetadataListener> flushListeners = new CopyOnWriteArrayList<>();
+
   /** empty construct function. */
   protected TsFileIOWriter() {
     if (TS_FILE_CONFIG.getEncryptFlag()) {
@@ -210,6 +213,10 @@ public void setEncryptParam(String encryptLevel, String encryptType, String encr
     this.encryptKey = encryptKey;
   }
 
+  public void addFlushListener(FlushChunkMetadataListener listener) {
+    flushListeners.add(listener);
+  }
+
   /**
    * Writes given bytes to output stream. This method is called when total memory size exceeds the
    * chunk group size threshold.
@@ -723,6 +730,12 @@ protected int sortAndFlushChunkMetadata() throws IOException {
       lastSerializePath = seriesPath;
       logger.debug("Flushing {}", seriesPath);
     }
+
+    // notify the listeners
+    for (final FlushChunkMetadataListener listener : flushListeners) {
+      listener.onFlush(sortedChunkMetadataList);
+    }
+
     // clear the cache metadata to release the memory
     chunkGroupMetadataList.clear();
     if (chunkMetadataList != null) {

From 4fbc7fcf9f6e0a0ca0afaa4f595cc03572f9ab3a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=A9=AC=E5=AD=90=E5=9D=A4?= <55695098+DanielWang2035@users.noreply.github.com>
Date: Tue, 10 Dec 2024 14:18:13 +0800
Subject: [PATCH 2/2] Fix and add test

---
 .../writer/FlushChunkMetadataListener.java    |  6 +--
 .../tsfile/write/writer/TsFileIOWriter.java   | 36 +++++++++++++---
 .../TsFileIOWriterMemoryControlTest.java      | 43 +++++++++++++++++++
 3 files changed, 75 insertions(+), 10 deletions(-)

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
index 2937763e0..237cb1913 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/FlushChunkMetadataListener.java
@@ -20,7 +20,7 @@
 package org.apache.tsfile.write.writer;
 
 import org.apache.tsfile.file.metadata.IChunkMetadata;
-import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.Pair;
 
 import java.util.List;
@@ -28,6 +28,6 @@
 @FunctionalInterface
 public interface FlushChunkMetadataListener {
 
-  // measurement id -> chunk metadata list
-  void onFlush(List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList);
+  // Pair<device id, measurement id> -> chunk metadata list
+  void onFlush(List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>> sortedChunkMetadataList);
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 66e432000..54244ed08 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -66,7 +66,6 @@
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.TreeMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.addCurrentIndexNodeToQueue;
 import static org.apache.tsfile.file.metadata.MetadataIndexConstructor.checkAndBuildLevelIndex;
@@ -131,7 +131,7 @@ public class TsFileIOWriter implements AutoCloseable {
 
   protected String encryptKey;
 
-  private final List<FlushChunkMetadataListener> flushListeners = new CopyOnWriteArrayList<>();
+  private final List<FlushChunkMetadataListener> flushListeners = new ArrayList<>();
 
   /** empty construct function. */
   protected TsFileIOWriter() {
@@ -672,6 +671,19 @@ public void setMaxPlanIndex(long maxPlanIndex) {
     this.maxPlanIndex = maxPlanIndex;
   }
 
+  public long getMaxMetadataSize() {
+    return maxMetadataSize;
+  }
+
+  /**
+   * Set the max memory size of chunk metadata. Note that the chunk metadata already in memory may
+   * exceed the new size, so the caller should call {@link #checkMetadataSizeAndMayFlush()} after
+   * this to avoid violating memory control.
+   */
+  public void setMaxMetadataSize(long maxMetadataSize) {
+    this.maxMetadataSize = maxMetadataSize;
+  }
+
   /**
    * Check if the size of chunk metadata in memory is greater than the given threshold. If so, the
    * chunk metadata will be written to a temp files. Notice! If you are writing a aligned device
@@ -711,29 +723,39 @@ public int checkMetadataSizeAndMayFlush() throws IOException {
   protected int sortAndFlushChunkMetadata() throws IOException {
     int writtenSize = 0;
     // group by series
-    List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
+    final List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
         TSMIterator.sortChunkMetadata(
             chunkGroupMetadataList, currentChunkGroupDeviceId, chunkMetadataList);
     if (tempOutput == null) {
       tempOutput = new LocalTsFileOutput(new FileOutputStream(chunkMetadataTempFile));
     }
     hasChunkMetadataInDisk = true;
+
+    // This list is the same as sortedChunkMetadataList, but Path is replaced by Pair<IDeviceID, String>
+    final List<Pair<Pair<IDeviceID, String>, List<IChunkMetadata>>>
+        sortedChunkMetadataListForCallBack = new ArrayList<>();
+
     for (Pair<Path, List<IChunkMetadata>> pair : sortedChunkMetadataList) {
-      Path seriesPath = pair.left;
-      boolean isNewPath = !seriesPath.equals(lastSerializePath);
+      final Path seriesPath = pair.left;
+      final boolean isNewPath = !seriesPath.equals(lastSerializePath);
       if (isNewPath) {
         // record the count of path to construct bloom filter later
         pathCount++;
       }
-      List<IChunkMetadata> iChunkMetadataList = pair.right;
+      final List<IChunkMetadata> iChunkMetadataList = pair.right;
       writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
       lastSerializePath = seriesPath;
+      sortedChunkMetadataListForCallBack.add(
+          new Pair<>(
+              new Pair<>(seriesPath.getIDeviceID(), seriesPath.getMeasurement()),
+              iChunkMetadataList));
       logger.debug("Flushing {}", seriesPath);
     }
 
     // notify the listeners
     for (final FlushChunkMetadataListener listener : flushListeners) {
-      listener.onFlush(sortedChunkMetadataList);
+      listener.onFlush(sortedChunkMetadataListForCallBack);
     }
 
     // clear the cache metadata to release the memory
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
index bb47294e5..77ffdab0c 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/writer/TsFileIOWriterMemoryControlTest.java
@@ -56,6 +56,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TsFileIOWriterMemoryControlTest {
   private static File testFile = new File("target", "1-1-0-0.tsfile");
@@ -248,6 +249,48 @@ public void testSerializeAndDeserializeMixedChunkMetadata() throws IOException {
     }
   }
 
+  /** The following test is for calling listeners after flushing chunk metadata. */
+  @Test
+  public void testFlushChunkMetadataListener() throws IOException {
+    try (TsFileIOWriter writer = new TsFileIOWriter(testFile, 1024 * 1024 * 10)) {
+      final AtomicInteger cnt1 = new AtomicInteger(0);
+      final AtomicInteger cnt2 = new AtomicInteger(0);
+      writer.addFlushListener(sortedChunkMetadataList -> cnt1.incrementAndGet());
+      writer.addFlushListener(sortedChunkMetadataList -> cnt2.incrementAndGet());
+      List<ChunkMetadata> originChunkMetadataList = new ArrayList<>();
+      for (int i = 0; i < 10; ++i) {
+        IDeviceID deviceId = sortedDeviceId.get(i);
+        writer.startChunkGroup(deviceId);
+        generateIntData(0, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateBooleanData(1, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateFloatData(2, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateDoubleData(3, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        generateTextData(4, 0L, new ArrayList<>()).writeToFileWriter(writer);
+        originChunkMetadataList.addAll(writer.chunkMetadataList);
+        writer.endChunkGroup();
+      }
+      writer.sortAndFlushChunkMetadata();
+      writer.tempOutput.flush();
+
+      TSMIterator iterator =
+          TSMIterator.getTSMIteratorInDisk(
+              writer.chunkMetadataTempFile,
+              writer.chunkGroupMetadataList,
+              writer.endPosInCMTForDevice);
+      for (int i = 0; iterator.hasNext(); ++i) {
+        Pair<Path, TimeseriesMetadata> timeseriesMetadataPair = iterator.next();
+        TimeseriesMetadata timeseriesMetadata = timeseriesMetadataPair.right;
+        Assert.assertEquals(sortedSeriesId.get(i % 5), timeseriesMetadata.getMeasurementId());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getDataType(), timeseriesMetadata.getTsDataType());
+        Assert.assertEquals(
+            originChunkMetadataList.get(i).getStatistics(), timeseriesMetadata.getStatistics());
+      }
+      Assert.assertEquals(1, cnt1.get());
+      Assert.assertEquals(1, cnt2.get());
+    }
+  }
+
   /** The following tests is for writing normal series in different nums. */
   /**
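
Editor's note, not part of the patch: below is a minimal usage sketch of the listener API added above, relying only on what the two patches show (the TsFileIOWriter(File, long) constructor used in the test, addFlushListener, and the onFlush payload of Pair<Pair<IDeviceID, String>, List<IChunkMetadata>> from PATCH 2/2). The example class name, output file, and print format are hypothetical; writing of chunk groups is elided and would follow the test above.

// FlushListenerExample.java -- illustrative sketch only, not part of this patch.
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.TsFileIOWriter;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class FlushListenerExample {

  public static void main(String[] args) throws IOException {
    File file = new File("target", "listener-example.tsfile"); // hypothetical output path
    // 10 MB threshold for in-memory chunk metadata, mirroring the test above.
    try (TsFileIOWriter writer = new TsFileIOWriter(file, 1024 * 1024 * 10)) {
      // The callback fires each time chunk metadata is flushed to the temp file;
      // entries are sorted by series and keyed by (device id, measurement id).
      writer.addFlushListener(
          sortedChunkMetadataList -> {
            for (Pair<Pair<IDeviceID, String>, List<IChunkMetadata>> entry :
                sortedChunkMetadataList) {
              System.out.printf(
                  "flushed %d chunk metadata for %s.%s%n",
                  entry.right.size(), entry.left.left, entry.left.right);
            }
          });
      // ... write chunk groups here; once the in-memory chunk metadata exceeds the
      // threshold, checkMetadataSizeAndMayFlush() spills it and the listener is invoked.
    }
  }
}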