Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.encoding.decoder;

import org.apache.tsfile.utils.Binary;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;

/**
 * A {@link Decoder} that forwards every call to a wrapped delegate decoder. Subclasses
 * override individual read methods to post-process values produced by the underlying
 * decoder while inheriting the delegate's behavior for everything else.
 */
public abstract class DecoderWrapper extends Decoder {

  /** The delegate that performs the actual decoding; visible to subclasses. */
  protected final Decoder originDecoder;

  /**
   * Wraps the given decoder, reporting the same encoding type as the delegate.
   *
   * @param originDecoder the decoder to which all calls are forwarded
   */
  public DecoderWrapper(Decoder originDecoder) {
    super(originDecoder.getType());
    this.originDecoder = originDecoder;
  }

  // ---- lifecycle, forwarded verbatim ----

  @Override
  public void reset() {
    originDecoder.reset();
  }

  @Override
  public boolean hasNext(ByteBuffer buffer) throws IOException {
    return originDecoder.hasNext(buffer);
  }

  // ---- primitive reads, forwarded verbatim ----

  @Override
  public boolean readBoolean(ByteBuffer buffer) {
    return originDecoder.readBoolean(buffer);
  }

  @Override
  public short readShort(ByteBuffer buffer) {
    return originDecoder.readShort(buffer);
  }

  @Override
  public int readInt(ByteBuffer buffer) {
    return originDecoder.readInt(buffer);
  }

  @Override
  public long readLong(ByteBuffer buffer) {
    return originDecoder.readLong(buffer);
  }

  @Override
  public float readFloat(ByteBuffer buffer) {
    return originDecoder.readFloat(buffer);
  }

  @Override
  public double readDouble(ByteBuffer buffer) {
    return originDecoder.readDouble(buffer);
  }

  // ---- object reads, forwarded verbatim ----

  @Override
  public Binary readBinary(ByteBuffer buffer) {
    return originDecoder.readBinary(buffer);
  }

  @Override
  public BigDecimal readBigDecimal(ByteBuffer buffer) {
    return originDecoder.readBigDecimal(buffer);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.tsfile.file.header;

import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encoding.decoder.DecoderWrapper;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
Expand All @@ -36,6 +38,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.function.LongConsumer;

public class ChunkHeader {
Expand Down Expand Up @@ -64,6 +67,7 @@ public class ChunkHeader {
// the following fields do not need to be serialized.
private int numOfPages;
private final int serializedSize;
private Function<Decoder, DecoderWrapper> replaceDecoder;

public ChunkHeader(
String measurementID,
Expand Down Expand Up @@ -277,6 +281,22 @@ public TSDataType getDataType() {
return dataType;
}

/**
 * Builds the value decoder for this (non-time) chunk: the default decoder for the chunk's
 * encoding and data type, passed through the configured {@code replaceDecoder} function
 * when one has been set.
 *
 * @return the decoder to use for this chunk's value pages
 */
public Decoder calculateDecoderForNonTimeChunk() {
  final Decoder defaultDecoder = Decoder.getDecoderByType(encodingType, dataType);
  if (replaceDecoder != null) {
    return replaceDecoder.apply(defaultDecoder);
  }
  return defaultDecoder;
}

/**
 * Applies the configured decoder-replacement function to {@code decoder}, or returns the
 * given decoder unchanged when no replacement function has been set.
 *
 * @param decoder the decoder to (possibly) wrap
 * @return the wrapped decoder, or {@code decoder} itself if no replacement is configured
 */
public Decoder replaceDecoder(Decoder decoder) {
  return replaceDecoder == null ? decoder : replaceDecoder.apply(decoder);
}

/**
 * Registers a function that wraps (or substitutes) the decoders created for this chunk's
 * value pages; pass {@code null} to fall back to the default decoders.
 *
 * @param replaceDecoder mapping from the default decoder to its wrapped replacement
 */
public void setReplaceDecoder(Function<Decoder, DecoderWrapper> replaceDecoder) {
this.replaceDecoder = replaceDecoder;
}

/**
* serialize to outputStream.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,7 @@ public List<TimeseriesMetadata> getDeviceTimeseriesMetadata(IDeviceID device) th

private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(
IDeviceID device, boolean needChunkMetadata) throws IOException {
readFileMetadata();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this line needed?

MetadataIndexNode metadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,7 @@ private AbstractAlignedPageReader constructAlignedPageReader(
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
encryptParam);
valueDataTypeList.add(valueChunkHeader.getDataType());
valueDecoderList.add(
Decoder.getDecoderByType(
valueChunkHeader.getEncodingType(), valueChunkHeader.getDataType()));
valueDecoderList.add(valueChunkHeader.calculateDecoderForNonTimeChunk());
isAllNull = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.tsfile.read.reader.chunk;

import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.file.MetaMarker;
Expand Down Expand Up @@ -141,7 +140,7 @@ private PageReader constructPageReader(PageHeader pageHeader) {
new LazyLoadPageData(
chunkDataBuffer.array(), currentPagePosition, unCompressor, encryptParam),
chunkHeader.getDataType(),
Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
chunkHeader.calculateDecoderForNonTimeChunk(),
defaultTimeDecoder,
queryFilter);
reader.setDeleteIntervalList(deleteIntervalList);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.tsfile.read.reader;

import org.apache.tsfile.constant.TestConstant;
import org.apache.tsfile.encoding.decoder.DecoderWrapper;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
import org.apache.tsfile.read.reader.chunk.ChunkReader;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

/**
 * Verifies {@code ChunkHeader.setReplaceDecoder}: after registering a {@code DecoderWrapper}
 * factory on a chunk header, values read back through the chunk readers are transformed by
 * the wrapper. Both the non-aligned and the aligned read paths are covered.
 */
public class ReplaceDecoderTest {

// Temporary TsFile written by each test and removed in teardown.
private static final String FILE_PATH =
TestConstant.BASE_OUTPUT_PATH.concat("ReplaceDecoder.tsfile");

@After
public void teardown() {
// Best-effort cleanup; the delete() result is intentionally ignored.
new File(FILE_PATH).delete();
}

@Test
public void testNonAligned() throws IOException, WriteProcessException {
IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
// Write 10 points (timestamp i, value i) to a single non-aligned INT32 series.
try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
writer.registerTimeseries(deviceID, new MeasurementSchema("s1", TSDataType.INT32));
Tablet tablet =
new Tablet(
deviceID.toString(),
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
for (int i = 0; i < 10; i++) {
tablet.addTimestamp(i, i);
tablet.addValue(i, 0, i);
}
writer.writeTree(tablet);
writer.flush();
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
TimeseriesMetadata timeseriesMetadata = reader.getDeviceTimeseriesMetadata(deviceID).get(0);
for (IChunkMetadata iChunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
Chunk chunk = reader.readMemChunk((ChunkMetadata) iChunkMetadata);
// Install a wrapper that shifts every decoded int by +10.
chunk
.getHeader()
.setReplaceDecoder(
decoder ->
new DecoderWrapper(decoder) {
@Override
public int readInt(ByteBuffer buffer) {
return decoder.readInt(buffer) + 10;
}
});
ChunkReader chunkReader = new ChunkReader(chunk);
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
IPointReader pointReader = batchData.getBatchDataIterator();
while (pointReader.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
// Written value equals its timestamp, so value - 10 must equal the timestamp.
Assert.assertEquals(
(int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
}
}
}
}
}

@Test
public void testAligned() throws IOException, WriteProcessException {
IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
// Write 10 points (timestamp i, value i) to a single aligned INT32 series.
try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
writer.registerAlignedTimeseries(
deviceID, Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
Tablet tablet =
new Tablet(
deviceID.toString(),
Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
for (int i = 0; i < 10; i++) {
tablet.addTimestamp(i, i);
tablet.addValue(i, 0, i);
}
writer.writeTree(tablet);
writer.flush();
}
try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
reader.getAlignedChunkMetadata(deviceID, true);

for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
Chunk timeChunk =
reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
Chunk valueChunk =
reader.readMemChunk(
(ChunkMetadata) alignedChunkMetadata.getValueChunkMetadataList().get(0));
// Install the wrapper on the value chunk only; the time chunk stays untouched.
valueChunk
.getHeader()
.setReplaceDecoder(
decoder ->
new DecoderWrapper(decoder) {
@Override
public int readInt(ByteBuffer buffer) {
return decoder.readInt(buffer) + 10;
}
});
AlignedChunkReader chunkReader =
new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk));
while (chunkReader.hasNextSatisfiedPage()) {
BatchData batchData = chunkReader.nextPageData();
IPointReader pointReader = batchData.getBatchDataIterator();
while (pointReader.hasNextTimeValuePair()) {
TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
// Written value equals its timestamp, so value - 10 must equal the timestamp.
Assert.assertEquals(
(int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
}
}
}
}
}
}
Loading