diff --git a/parquet-variant/pom.xml b/parquet-variant/pom.xml new file mode 100644 index 0000000000..365fd826ea --- /dev/null +++ b/parquet-variant/pom.xml @@ -0,0 +1,82 @@ + + + + org.apache.parquet + parquet + ../pom.xml + 1.16.0-SNAPSHOT + + + 4.0.0 + + parquet-variant + jar + + Apache Parquet Variant + https://parquet.apache.org + + + + + + + org.apache.parquet + parquet-cli + ${project.version} + + + org.apache.parquet + parquet-jackson + ${project.version} + runtime + + + ${jackson.groupId} + jackson-core + ${jackson.version} + + + ${jackson.groupId} + jackson-databind + ${jackson-databind.version} + test + + + org.slf4j + slf4j-api + ${slf4j.version} + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.maven.plugins + maven-shade-plugin + + + + + diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/DefaultScalarToJson.java b/parquet-variant/src/main/java/org/apache/parquet/variant/DefaultScalarToJson.java new file mode 100644 index 0000000000..d9d5813472 --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/DefaultScalarToJson.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.variant; + +import com.fasterxml.jackson.core.JsonGenerator; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; +import java.util.Base64; +import java.util.Locale; +import java.util.UUID; + +/** + * This converts Variant scalar values to JSON. + */ +public class DefaultScalarToJson implements Variant.ScalarToJson { + /** The format for a timestamp without time zone. */ + private static final DateTimeFormatter TIMESTAMP_NTZ_FORMATTER = new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral('T') + .appendPattern("HH:mm:ss") + .appendFraction(ChronoField.MICRO_OF_SECOND, 6, 6, true) + .toFormatter(Locale.US); + + /** The format for a timestamp without time zone, with nanosecond precision. */ + private static final DateTimeFormatter TIMESTAMP_NANOS_NTZ_FORMATTER = new DateTimeFormatterBuilder() + .append(DateTimeFormatter.ISO_LOCAL_DATE) + .appendLiteral('T') + .appendPattern("HH:mm:ss") + .appendFraction(ChronoField.NANO_OF_SECOND, 9, 9, true) + .toFormatter(Locale.US); + + /** The format for a timestamp with time zone. */ + private static final DateTimeFormatter TIMESTAMP_FORMATTER = new DateTimeFormatterBuilder() + .append(TIMESTAMP_NTZ_FORMATTER) + .appendOffset("+HH:MM", "+00:00") + .toFormatter(Locale.US); + + /** The format for a timestamp with time zone, with nanosecond precision. */ + private static final DateTimeFormatter TIMESTAMP_NANOS_FORMATTER = new DateTimeFormatterBuilder() + .append(TIMESTAMP_NANOS_NTZ_FORMATTER) + .appendOffset("+HH:MM", "+00:00") + .toFormatter(Locale.US); + + /** The format for a time. */ + private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder() + .appendPattern("HH:mm:ss") + .appendFraction(ChronoField.MICRO_OF_SECOND, 6, 6, true) + .toFormatter(Locale.US); + + public void writeNull(JsonGenerator gen) throws IOException { + gen.writeNull(); + } + + public void writeBoolean(JsonGenerator gen, boolean value) throws IOException { + gen.writeBoolean(value); + } + + public void writeByte(JsonGenerator gen, byte value) throws IOException { + gen.writeNumber(value); + } + + public void writeShort(JsonGenerator gen, short value) throws IOException { + gen.writeNumber(value); + } + + public void writeInt(JsonGenerator gen, int value) throws IOException { + gen.writeNumber(value); + } + + public void writeLong(JsonGenerator gen, long value) throws IOException { + gen.writeNumber(value); + } + + public void writeFloat(JsonGenerator gen, float value) throws IOException { + gen.writeNumber(value); + } + + public void writeDouble(JsonGenerator gen, double value) throws IOException { + gen.writeNumber(value); + } + + public void writeString(JsonGenerator gen, String value) throws IOException { + gen.writeString(value); + } + + public void writeBinary(JsonGenerator gen, byte[] value) throws IOException { + gen.writeString(Base64.getEncoder().encodeToString(value)); + } + + public void writeDecimal(JsonGenerator gen, BigDecimal value) throws IOException { + gen.writeNumber(value.toPlainString()); + } + + public void writeUUID(JsonGenerator gen, UUID value) throws IOException { + gen.writeString(value.toString()); + } + + public void writeDate(JsonGenerator gen, int value) throws IOException { + gen.writeString(LocalDate.ofEpochDay(value).toString()); + } + + public void writeTime(JsonGenerator gen, long microsSinceMidnight) throws IOException { + gen.writeString(TIME_FORMATTER.format(LocalTime.ofNanoOfDay(microsSinceMidnight * 1_000))); + } + + public void writeTimestamp(JsonGenerator gen, long microsSinceEpoch) throws IOException { + gen.writeString( + TIMESTAMP_FORMATTER.format(microsToInstant(microsSinceEpoch).atZone(ZoneOffset.UTC))); + } + + public void writeTimestampNtz(JsonGenerator gen, long microsSinceEpoch) throws IOException { + gen.writeString( + TIMESTAMP_NTZ_FORMATTER.format(microsToInstant(microsSinceEpoch).atZone(ZoneOffset.UTC))); + } + + public void writeTimestampNanos(JsonGenerator gen, long nanosSinceEpoch) throws IOException { + gen.writeString( + TIMESTAMP_NANOS_FORMATTER.format(nanosToInstant(nanosSinceEpoch).atZone(ZoneOffset.UTC))); + } + + public void writeTimestampNanosNtz(JsonGenerator gen, long nanosSinceEpoch) throws IOException { + gen.writeString(TIMESTAMP_NANOS_NTZ_FORMATTER.format( + nanosToInstant(nanosSinceEpoch).atZone(ZoneOffset.UTC))); + } + + protected Instant microsToInstant(long microsSinceEpoch) { + return Instant.EPOCH.plus(microsSinceEpoch, ChronoUnit.MICROS); + } + + protected Instant nanosToInstant(long timestampNanos) { + return Instant.EPOCH.plus(timestampNanos, ChronoUnit.NANOS); + } +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/MalformedVariantException.java b/parquet-variant/src/main/java/org/apache/parquet/variant/MalformedVariantException.java new file mode 100644 index 0000000000..3ecc707a11 --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/MalformedVariantException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +/** + * An exception indicating that the Variant is malformed. + */ +public class MalformedVariantException extends RuntimeException { + public MalformedVariantException(String message) { + super(message); + } +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/UnknownVariantTypeException.java b/parquet-variant/src/main/java/org/apache/parquet/variant/UnknownVariantTypeException.java new file mode 100644 index 0000000000..3cdacb5d99 --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/UnknownVariantTypeException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +/** + * An exception indicating that the Variant contains an unknown type. + */ +public class UnknownVariantTypeException extends RuntimeException { + public final int typeId; + + /** + * @param typeId the type id that was unknown + */ + public UnknownVariantTypeException(int typeId) { + super("Unknown type in Variant. id: " + typeId); + this.typeId = typeId; + } + + /** + * @return the type id that was unknown + */ + public int typeId() { + return typeId; + } +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java new file mode 100644 index 0000000000..c278dae93e --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/Variant.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.variant; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import java.io.CharArrayWriter; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.parquet.cli.util.RuntimeIOException; + +/** + * This Variant class holds the Variant-encoded value and metadata binary values. + */ +public final class Variant { + /** The buffer that contains the Variant value. */ + final ByteBuffer value; + + /** The buffer that contains the Variant metadata. */ + final ByteBuffer metadata; + + /** + * The threshold to switch from linear search to binary search when looking up a field by key in + * an object. This is a performance optimization to avoid the overhead of binary search for a + * short list. + */ + static final int BINARY_SEARCH_THRESHOLD = 32; + + public Variant(byte[] value, byte[] metadata) { + this(value, 0, metadata, 0); + } + + Variant(byte[] value, int valuePos, byte[] metadata, int metadataPos) { + this( + ByteBuffer.wrap(value, valuePos, value.length - valuePos), + ByteBuffer.wrap(metadata, metadataPos, metadata.length - metadataPos)); + } + + Variant(ByteBuffer value, ByteBuffer metadata) { + this.value = value.asReadOnlyBuffer(); + this.value.mark(); + + this.metadata = metadata.asReadOnlyBuffer(); + this.metadata.mark(); + + // There is currently only one allowed version. + if ((metadata.get(metadata.position()) & VariantUtil.VERSION_MASK) != VariantUtil.VERSION) { + throw new UnsupportedOperationException(String.format( + "Unsupported variant metadata version: %02X", + metadata.get(metadata.position()) & VariantUtil.VERSION_MASK)); + } + } + + /** + * @return the boolean value + */ + public boolean getBoolean() { + return VariantUtil.getBoolean(value); + } + + /** + * @return the byte value + */ + public byte getByte() { + long longValue = VariantUtil.getLong(value); + if (longValue < Byte.MIN_VALUE || longValue > Byte.MAX_VALUE) { + throw new IllegalStateException("Value out of range for byte: " + longValue); + } + return (byte) longValue; + } + + /** + * @return the short value + */ + public short getShort() { + long longValue = VariantUtil.getLong(value); + if (longValue < Short.MIN_VALUE || longValue > Short.MAX_VALUE) { + throw new IllegalStateException("Value out of range for short: " + longValue); + } + return (short) longValue; + } + + /** + * @return the int value + */ + public int getInt() { + long longValue = VariantUtil.getLong(value); + if (longValue < Integer.MIN_VALUE || longValue > Integer.MAX_VALUE) { + throw new IllegalStateException("Value out of range for int: " + longValue); + } + return (int) longValue; + } + + /** + * @return the long value + */ + public long getLong() { + return VariantUtil.getLong(value); + } + + /** + * @return the double value + */ + public double getDouble() { + return VariantUtil.getDouble(value); + } + + /** + * @return the decimal value + */ + public BigDecimal getDecimal() { + return VariantUtil.getDecimal(value); + } + + /** + * @return the float value + */ + public float getFloat() { + return VariantUtil.getFloat(value); + } + + /** + * @return the binary value + */ + public byte[] getBinary() { + return VariantUtil.getBinary(value); + } + + /** + * @return the UUID value + */ + public UUID getUUID() { + return VariantUtil.getUUID(value); + } + + /** + * @return the string value + */ + public String getString() { + return VariantUtil.getString(value); + } + + /** + * @return the type of the variant value + */ + public VariantUtil.Type getType() { + return VariantUtil.getType(value); + } + + /** + * @return the number of object fields in the variant. `getType()` must be `Type.OBJECT`. + */ + public int numObjectElements() { + return VariantUtil.getObjectInfo(value).numElements; + } + + /** + * Returns the object field Variant value whose key is equal to `key`. + * Return null if the key is not found. `getType()` must be `Type.OBJECT`. + * @param key the key to look up + * @return the field value whose key is equal to `key`, or null if key is not found + */ + public Variant getFieldByKey(String key) { + VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); + // Use linear search for a short list. Switch to binary search when the length reaches + // `BINARY_SEARCH_THRESHOLD`. + if (info.numElements < BINARY_SEARCH_THRESHOLD) { + for (int i = 0; i < info.numElements; ++i) { + ObjectField field = getFieldAtIndex( + i, + value, + metadata, + info.idSize, + info.offsetSize, + value.position() + info.idStartOffset, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + if (field.key.equals(key)) { + return field.value; + } + } + } else { + int low = 0; + int high = info.numElements - 1; + while (low <= high) { + // Use unsigned right shift to compute the middle of `low` and `high`. This is not only a + // performance optimization, because it can properly handle the case where `low + high` + // overflows int. + int mid = (low + high) >>> 1; + ObjectField field = getFieldAtIndex( + mid, + value, + metadata, + info.idSize, + info.offsetSize, + value.position() + info.idStartOffset, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + int cmp = field.key.compareTo(key); + if (cmp < 0) { + low = mid + 1; + } else if (cmp > 0) { + high = mid - 1; + } else { + return field.value; + } + } + } + return null; + } + + /** + * A field in a Variant object. + */ + public static final class ObjectField { + public final String key; + public final Variant value; + + public ObjectField(String key, Variant value) { + this.key = key; + this.value = value; + } + } + + /** + * Returns the ObjectField at the `index` slot. Return null if `index` is out of the bound of + * `[0, objectSize())`. `getType()` must be `Type.OBJECT`. + * @param index the index of the object field to get + * @return the ObjectField at the `index` slot, or null if `index` is out of bounds + */ + public ObjectField getFieldAtIndex(int index) { + VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); + if (index < 0 || index >= info.numElements) { + return null; + } + return getFieldAtIndex( + index, + value, + metadata, + info.idSize, + info.offsetSize, + value.position() + info.idStartOffset, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + } + + private static ObjectField getFieldAtIndex( + int index, + ByteBuffer value, + ByteBuffer metadata, + int idSize, + int offsetSize, + int idStart, + int offsetStart, + int dataStart) { + // idStart, offsetStart, and dataStart are absolute positions in the `value` buffer. + int id = VariantUtil.readUnsigned(value, idStart + idSize * index, idSize); + int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); + String key = VariantUtil.getMetadataKey(metadata, id); + Variant v = new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + return new ObjectField(key, v); + } + + /** + * @return the number of array elements. `getType()` must be `Type.ARRAY`. + */ + public int numArrayElements() { + return VariantUtil.getArrayInfo(value).numElements; + } + + /** + * Returns the array element Variant value at the `index` slot. Returns null if `index` is + * out of the bound of `[0, arraySize())`. `getType()` must be `Type.ARRAY`. + * @param index the index of the array element to get + * @return the array element Variant at the `index` slot, or null if `index` is out of bounds + */ + public Variant getElementAtIndex(int index) { + VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value); + if (index < 0 || index >= info.numElements) { + return null; + } + return getElementAtIndex( + index, + value, + metadata, + info.offsetSize, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + } + + private static Variant getElementAtIndex( + int index, ByteBuffer value, ByteBuffer metadata, int offsetSize, int offsetStart, int dataStart) { + // offsetStart and dataStart are absolute positions in the `value` buffer. + int offset = VariantUtil.readUnsigned(value, offsetStart + offsetSize * index, offsetSize); + return new Variant(VariantUtil.slice(value, dataStart + offset), metadata); + } + + /** + * An interface to write Variant scalar values to a JSON generator. + */ + public interface ScalarToJson { + void writeNull(JsonGenerator gen) throws IOException; + + void writeBoolean(JsonGenerator gen, boolean value) throws IOException; + + void writeByte(JsonGenerator gen, byte value) throws IOException; + + void writeShort(JsonGenerator gen, short value) throws IOException; + + void writeInt(JsonGenerator gen, int value) throws IOException; + + void writeLong(JsonGenerator gen, long value) throws IOException; + + void writeFloat(JsonGenerator gen, float value) throws IOException; + + void writeDouble(JsonGenerator gen, double value) throws IOException; + + void writeString(JsonGenerator gen, String value) throws IOException; + + void writeBinary(JsonGenerator gen, byte[] value) throws IOException; + + void writeDecimal(JsonGenerator gen, BigDecimal value) throws IOException; + + void writeUUID(JsonGenerator gen, UUID value) throws IOException; + + void writeDate(JsonGenerator gen, int value) throws IOException; + + void writeTime(JsonGenerator gen, long microsSinceMidnight) throws IOException; + + void writeTimestamp(JsonGenerator gen, long microsSinceEpoch) throws IOException; + + void writeTimestampNtz(JsonGenerator gen, long microsSinceEpoch) throws IOException; + + void writeTimestampNanos(JsonGenerator gen, long nanosSinceEpoch) throws IOException; + + void writeTimestampNanosNtz(JsonGenerator gen, long nanosSinceEpoch) throws IOException; + } + + /** + * @return the JSON representation of the variant + * @throws MalformedVariantException if the variant is malformed + */ + public String toJson() { + return toJson(new DefaultScalarToJson()); + } + + /** + * @param scalarWriter the writer to use for writing scalar values + * @return the JSON representation of the variant + * @throws MalformedVariantException if the variant is malformed + */ + public String toJson(ScalarToJson scalarWriter) { + try (CharArrayWriter writer = new CharArrayWriter(); + JsonGenerator gen = new JsonFactory().createGenerator(writer)) { + toJsonImpl(value, metadata, gen, scalarWriter); + gen.flush(); + return writer.toString(); + } catch (IOException e) { + throw new RuntimeIOException("Failed to convert variant to json", e); + } + } + + private static void toJsonImpl(ByteBuffer value, ByteBuffer metadata, JsonGenerator gen, ScalarToJson scalarWriter) + throws IOException { + switch (VariantUtil.getType(value)) { + case OBJECT: { + VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(value); + gen.writeStartObject(); + for (int i = 0; i < info.numElements; ++i) { + ObjectField field = getFieldAtIndex( + i, + value, + metadata, + info.idSize, + info.offsetSize, + value.position() + info.idStartOffset, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + gen.writeFieldName(field.key); + toJsonImpl(field.value.value, field.value.metadata, gen, scalarWriter); + } + gen.writeEndObject(); + break; + } + case ARRAY: { + VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(value); + gen.writeStartArray(); + for (int i = 0; i < info.numElements; ++i) { + Variant v = getElementAtIndex( + i, + value, + metadata, + info.offsetSize, + value.position() + info.offsetStartOffset, + value.position() + info.dataStartOffset); + toJsonImpl(v.value, v.metadata, gen, scalarWriter); + } + gen.writeEndArray(); + break; + } + case NULL: + scalarWriter.writeNull(gen); + break; + case BOOLEAN: + scalarWriter.writeBoolean(gen, VariantUtil.getBoolean(value)); + break; + case BYTE: + scalarWriter.writeByte(gen, (byte) VariantUtil.getLong(value)); + break; + case SHORT: + scalarWriter.writeShort(gen, (short) VariantUtil.getLong(value)); + break; + case INT: + scalarWriter.writeInt(gen, (int) VariantUtil.getLong(value)); + break; + case LONG: + scalarWriter.writeLong(gen, VariantUtil.getLong(value)); + break; + case STRING: + scalarWriter.writeString(gen, VariantUtil.getString(value)); + break; + case BINARY: + scalarWriter.writeBinary(gen, VariantUtil.getBinary(value)); + break; + case FLOAT: + scalarWriter.writeFloat(gen, VariantUtil.getFloat(value)); + break; + case DOUBLE: + scalarWriter.writeDouble(gen, VariantUtil.getDouble(value)); + break; + case DECIMAL4: + case DECIMAL8: + case DECIMAL16: + scalarWriter.writeDecimal(gen, VariantUtil.getDecimal(value)); + break; + case DATE: + scalarWriter.writeDate(gen, (int) VariantUtil.getLong(value)); + break; + case TIMESTAMP: + scalarWriter.writeTimestamp(gen, VariantUtil.getLong(value)); + break; + case TIMESTAMP_NTZ: + scalarWriter.writeTimestampNtz(gen, VariantUtil.getLong(value)); + break; + case TIME: + scalarWriter.writeTime(gen, VariantUtil.getLong(value)); + break; + case TIMESTAMP_NANOS: + scalarWriter.writeTimestampNanos(gen, VariantUtil.getLong(value)); + break; + case TIMESTAMP_NANOS_NTZ: + scalarWriter.writeTimestampNanosNtz(gen, VariantUtil.getLong(value)); + break; + case UUID: + scalarWriter.writeUUID(gen, VariantUtil.getUUID(value)); + break; + default: + throw new IllegalArgumentException("Unsupported type: " + VariantUtil.getType(value)); + } + } +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java new file mode 100644 index 0000000000..41434cc8a8 --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantBuilder.java @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.exc.InputCoercionException; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; + +/** + * Builder for creating Variant value and metadata. + */ +public class VariantBuilder { + /** + * Creates a VariantBuilder. + * @param allowDuplicateKeys if true, only the last occurrence of a duplicate key will be kept. + * Otherwise, an exception will be thrown. + */ + public VariantBuilder(boolean allowDuplicateKeys) { + this.allowDuplicateKeys = allowDuplicateKeys; + } + + /** + * Parse a JSON string as a Variant value. + * @param json the JSON string to parse + * @return the Variant value + * @throws IOException if any JSON parsing error happens + * the size limit + */ + public static Variant parseJson(String json) throws IOException { + return parseJson(json, new VariantBuilder(false)); + } + + /** + * Parse a JSON string as a Variant value. + * @param json the JSON string to parse + * @param builder the VariantBuilder to use for building the Variant + * @return the Variant value + * @throws IOException if any JSON parsing error happens + */ + public static Variant parseJson(String json, VariantBuilder builder) throws IOException { + try (JsonParser parser = new JsonFactory().createParser(json)) { + parser.nextToken(); + return parseJson(parser, builder); + } + } + + /** + * Parse a JSON parser as a Variant value. + * @param parser the JSON parser to use + * @param builder the VariantBuilder to use for building the Variant + * @return the Variant value + * @throws IOException if any JSON parsing error happens + */ + public static Variant parseJson(JsonParser parser, VariantBuilder builder) throws IOException { + builder.buildFromJsonParser(parser); + return builder.result(); + } + + /** + * @return the Variant value + */ + public Variant result() { + int numKeys = dictionaryKeys.size(); + // Use long to avoid overflow in accumulating lengths. + long dictionaryStringSize = 0; + for (byte[] key : dictionaryKeys) { + dictionaryStringSize += key.length; + } + // Determine the number of bytes required per offset entry. + // The largest offset is the one-past-the-end value, which is total string size. It's very + // unlikely that the number of keys could be larger, but incorporate that into the calculation + // in case of pathological data. + long maxSize = Math.max(dictionaryStringSize, numKeys); + int offsetSize = getMinIntegerSize((int) maxSize); + + int offsetStart = 1 + offsetSize; + int stringStart = offsetStart + (numKeys + 1) * offsetSize; + long metadataSize = stringStart + dictionaryStringSize; + + byte[] metadata = new byte[(int) metadataSize]; + int headerByte = VariantUtil.VERSION | ((offsetSize - 1) << 6); + VariantUtil.writeLong(metadata, 0, headerByte, 1); + VariantUtil.writeLong(metadata, 1, numKeys, offsetSize); + int currentOffset = 0; + for (int i = 0; i < numKeys; ++i) { + VariantUtil.writeLong(metadata, offsetStart + i * offsetSize, currentOffset, offsetSize); + byte[] key = dictionaryKeys.get(i); + System.arraycopy(key, 0, metadata, stringStart + currentOffset, key.length); + currentOffset += key.length; + } + VariantUtil.writeLong(metadata, offsetStart + numKeys * offsetSize, currentOffset, offsetSize); + return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); + } + + public void appendString(String str) { + byte[] text = str.getBytes(StandardCharsets.UTF_8); + boolean longStr = text.length > VariantUtil.MAX_SHORT_STR_SIZE; + checkCapacity((longStr ? 1 + VariantUtil.U32_SIZE : 1) + text.length); + if (longStr) { + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.LONG_STR); + VariantUtil.writeLong(writeBuffer, writePos, text.length, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + } else { + writeBuffer[writePos++] = VariantUtil.shortStrHeader(text.length); + } + System.arraycopy(text, 0, writeBuffer, writePos, text.length); + writePos += text.length; + } + + public void appendNull() { + checkCapacity(1); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.NULL); + } + + public void appendBoolean(boolean b) { + checkCapacity(1); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(b ? VariantUtil.TRUE : VariantUtil.FALSE); + } + + /** + * Appends a long value to the variant builder. The actual encoded integer type depends on the + * value range of the long value. + * @param l the long value to append + */ + public void appendLong(long l) { + if (l == (byte) l) { + checkCapacity(1 + 1); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.INT8); + VariantUtil.writeLong(writeBuffer, writePos, l, 1); + writePos += 1; + } else if (l == (short) l) { + checkCapacity(1 + 2); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.INT16); + VariantUtil.writeLong(writeBuffer, writePos, l, 2); + writePos += 2; + } else if (l == (int) l) { + checkCapacity(1 + 4); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.INT32); + VariantUtil.writeLong(writeBuffer, writePos, l, 4); + writePos += 4; + } else { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.INT64); + VariantUtil.writeLong(writeBuffer, writePos, l, 8); + writePos += 8; + } + } + + public void appendDouble(double d) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.DOUBLE); + VariantUtil.writeLong(writeBuffer, writePos, Double.doubleToLongBits(d), 8); + writePos += 8; + } + + /** + * Appends a decimal value to the variant builder. The actual encoded decimal type depends on the + * precision and scale of the decimal value. + * @param d the decimal value to append + */ + public void appendDecimal(BigDecimal d) { + BigInteger unscaled = d.unscaledValue(); + if (d.scale() <= VariantUtil.MAX_DECIMAL4_PRECISION && d.precision() <= VariantUtil.MAX_DECIMAL4_PRECISION) { + checkCapacity(2 + 4); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.DECIMAL4); + writeBuffer[writePos++] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos, unscaled.intValueExact(), 4); + writePos += 4; + } else if (d.scale() <= VariantUtil.MAX_DECIMAL8_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL8_PRECISION) { + checkCapacity(2 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.DECIMAL8); + writeBuffer[writePos++] = (byte) d.scale(); + VariantUtil.writeLong(writeBuffer, writePos, unscaled.longValueExact(), 8); + writePos += 8; + } else { + assert d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION + && d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION; + checkCapacity(2 + 16); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.DECIMAL16); + writeBuffer[writePos++] = (byte) d.scale(); + // `toByteArray` returns a big-endian representation. We need to copy it reversely and sign + // extend it to 16 bytes. + byte[] bytes = unscaled.toByteArray(); + for (int i = 0; i < bytes.length; ++i) { + writeBuffer[writePos + i] = bytes[bytes.length - 1 - i]; + } + byte sign = (byte) (bytes[0] < 0 ? -1 : 0); + for (int i = bytes.length; i < 16; ++i) { + writeBuffer[writePos + i] = sign; + } + writePos += 16; + } + } + + public void appendDate(int daysSinceEpoch) { + checkCapacity(1 + 4); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.DATE); + VariantUtil.writeLong(writeBuffer, writePos, daysSinceEpoch, 4); + writePos += 4; + } + + public void appendTimestamp(long microsSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.TIMESTAMP); + VariantUtil.writeLong(writeBuffer, writePos, microsSinceEpoch, 8); + writePos += 8; + } + + public void appendTimestampNtz(long microsSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.TIMESTAMP_NTZ); + VariantUtil.writeLong(writeBuffer, writePos, microsSinceEpoch, 8); + writePos += 8; + } + + public void appendTime(long microsSinceMidnight) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.TIME); + VariantUtil.writeLong(writeBuffer, writePos, microsSinceMidnight, 8); + writePos += 8; + } + + public void appendTimestampNanos(long nanosSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.TIMESTAMP_NANOS); + VariantUtil.writeLong(writeBuffer, writePos, nanosSinceEpoch, 8); + writePos += 8; + } + + public void appendTimestampNanosNtz(long nanosSinceEpoch) { + checkCapacity(1 + 8); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.TIMESTAMP_NANOS_NTZ); + VariantUtil.writeLong(writeBuffer, writePos, nanosSinceEpoch, 8); + writePos += 8; + } + + public void appendFloat(float f) { + checkCapacity(1 + 4); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.FLOAT); + VariantUtil.writeLong(writeBuffer, writePos, Float.floatToIntBits(f), 8); + writePos += 4; + } + + public void appendBinary(byte[] binary) { + checkCapacity(1 + VariantUtil.U32_SIZE + binary.length); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.BINARY); + VariantUtil.writeLong(writeBuffer, writePos, binary.length, VariantUtil.U32_SIZE); + writePos += VariantUtil.U32_SIZE; + System.arraycopy(binary, 0, writeBuffer, writePos, binary.length); + writePos += binary.length; + } + + public void appendUUID(java.util.UUID uuid) { + checkCapacity(1 + VariantUtil.UUID_SIZE); + writeBuffer[writePos++] = VariantUtil.primitiveHeader(VariantUtil.UUID); + + ByteBuffer bb = + ByteBuffer.wrap(writeBuffer, writePos, VariantUtil.UUID_SIZE).order(ByteOrder.BIG_ENDIAN); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + writePos += VariantUtil.UUID_SIZE; + } + + /** + * Adds a key to the Variant dictionary. If the key already exists, the dictionary is unmodified. + * @param key the key to add + * @return the id of the key + */ + public int addKey(String key) { + return dictionary.computeIfAbsent(key, newKey -> { + int id = dictionaryKeys.size(); + dictionaryKeys.add(newKey.getBytes(StandardCharsets.UTF_8)); + return id; + }); + } + + /** + * @return the current write position of the variant builder + */ + public int getWritePos() { + return writePos; + } + + /** + * Finish writing a Variant object after all of its fields have already been written. The process + * is as follows: + * 1. The caller calls `getWritePos()` before writing any fields to obtain the `start` parameter. + * 2. The caller appends all the object fields to the builder. In the meantime, it should maintain + * the `fields` parameter. Before appending each field, it should append an entry to `fields` to + * record the offset of the field. The offset is computed as `getWritePos() - start`. + * 3. The caller calls `finishWritingObject` to finish writing the Variant object. + * + * This method will sort the fields by key. If there are duplicate field keys: + * - when `allowDuplicateKeys` is true, the field with the greatest offset value (the last + * appended one) is kept. + * - otherwise, throw an exception. + * @param start the start position of the object in the write buffer + * @param fields the list of `FieldEntry` in the object + * @throws VariantDuplicateKeyException if there are duplicate keys and `allowDuplicateKeys` is + * false + */ + public void finishWritingObject(int start, ArrayList fields) { + int size = fields.size(); + Collections.sort(fields); + int maxId = size == 0 ? 0 : fields.get(0).id; + if (allowDuplicateKeys) { + int distinctPos = 0; + // Maintain a list of distinct keys in-place. + for (int i = 1; i < size; ++i) { + maxId = Math.max(maxId, fields.get(i).id); + if (fields.get(i).id == fields.get(i - 1).id) { + // Found a duplicate key. Keep the field with the greater offset. + if (fields.get(distinctPos).offset < fields.get(i).offset) { + fields.set(distinctPos, fields.get(distinctPos).withNewOffset(fields.get(i).offset)); + } + } else { + // Found a distinct key. Add the field to the list. + ++distinctPos; + fields.set(distinctPos, fields.get(i)); + } + } + if (distinctPos + 1 < fields.size()) { + size = distinctPos + 1; + // Resize `fields` to `size`. + fields.subList(size, fields.size()).clear(); + // Sort the fields by offsets so that we can move the value data of each field to the new + // offset without overwriting the fields after it. + fields.sort(Comparator.comparingInt(f -> f.offset)); + int currentOffset = 0; + for (int i = 0; i < size; ++i) { + int oldOffset = fields.get(i).offset; + int fieldSize = VariantUtil.valueSize(writeBuffer, start + oldOffset); + System.arraycopy(writeBuffer, start + oldOffset, writeBuffer, start + currentOffset, fieldSize); + fields.set(i, fields.get(i).withNewOffset(currentOffset)); + currentOffset += fieldSize; + } + writePos = start + currentOffset; + // Change back to the sort order by field keys, required by the Variant specification. + Collections.sort(fields); + } + } else { + for (int i = 1; i < size; ++i) { + maxId = Math.max(maxId, fields.get(i).id); + String key = fields.get(i).key; + if (key.equals(fields.get(i - 1).key)) { + throw new VariantDuplicateKeyException(key); + } + } + } + int dataSize = writePos - start; + boolean largeSize = size > VariantUtil.U8_MAX; + int sizeBytes = largeSize ? VariantUtil.U32_SIZE : 1; + int idSize = getMinIntegerSize(maxId); + int offsetSize = getMinIntegerSize(dataSize); + // The space for header byte, object size, id list, and offset list. + int headerSize = 1 + sizeBytes + size * idSize + (size + 1) * offsetSize; + checkCapacity(headerSize); + // Shift the just-written field data to make room for the object header section. + System.arraycopy(writeBuffer, start, writeBuffer, start + headerSize, dataSize); + writePos += headerSize; + writeBuffer[start] = VariantUtil.objectHeader(largeSize, idSize, offsetSize); + VariantUtil.writeLong(writeBuffer, start + 1, size, sizeBytes); + int idStart = start + 1 + sizeBytes; + int offsetStart = idStart + size * idSize; + for (int i = 0; i < size; ++i) { + VariantUtil.writeLong(writeBuffer, idStart + i * idSize, fields.get(i).id, idSize); + VariantUtil.writeLong(writeBuffer, offsetStart + i * offsetSize, fields.get(i).offset, offsetSize); + } + VariantUtil.writeLong(writeBuffer, offsetStart + size * offsetSize, dataSize, offsetSize); + } + + /** + * Finish writing a Variant array after all of its elements have already been written. The process + * is similar to that of `finishWritingObject`. + * @param start the start position of the array in the write buffer + * @param offsets the list of offsets of the array elements + */ + public void finishWritingArray(int start, ArrayList offsets) { + int dataSize = writePos - start; + int size = offsets.size(); + boolean largeSize = size > VariantUtil.U8_MAX; + int sizeBytes = largeSize ? VariantUtil.U32_SIZE : 1; + int offsetSize = getMinIntegerSize(dataSize); + // The space for header byte, object size, and offset list. + int headerSize = 1 + sizeBytes + (size + 1) * offsetSize; + checkCapacity(headerSize); + // Shift the just-written field data to make room for the header section. + System.arraycopy(writeBuffer, start, writeBuffer, start + headerSize, dataSize); + writePos += headerSize; + writeBuffer[start] = VariantUtil.arrayHeader(largeSize, offsetSize); + VariantUtil.writeLong(writeBuffer, start + 1, size, sizeBytes); + int offsetStart = start + 1 + sizeBytes; + for (int i = 0; i < size; ++i) { + VariantUtil.writeLong(writeBuffer, offsetStart + i * offsetSize, offsets.get(i), offsetSize); + } + VariantUtil.writeLong(writeBuffer, offsetStart + size * offsetSize, dataSize, offsetSize); + } + + /** + * Appends a Variant value to the Variant builder. The input Variant keys must be inserted into + * the builder dictionary and rebuilt with new field ids. For scalar values in the input + * Variant, we can directly copy the binary slice. + * @param v the Variant value to append + */ + public void appendVariant(Variant v) { + appendVariantImpl(v.value, v.value.position(), v.metadata); + } + + private void appendVariantImpl(ByteBuffer value, int valuePos, ByteBuffer metadata) { + VariantUtil.checkIndex(valuePos, value.limit()); + int basicType = value.get(valuePos) & VariantUtil.BASIC_TYPE_MASK; + switch (basicType) { + case VariantUtil.OBJECT: { + VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(VariantUtil.slice(value, valuePos)); + ArrayList fields = new ArrayList<>(info.numElements); + int start = writePos; + for (int i = 0; i < info.numElements; ++i) { + int id = VariantUtil.readUnsigned( + value, valuePos + info.idStartOffset + info.idSize * i, info.idSize); + int offset = VariantUtil.readUnsigned( + value, valuePos + info.offsetStartOffset + info.offsetSize * i, info.offsetSize); + int elementPos = valuePos + info.dataStartOffset + offset; + String key = VariantUtil.getMetadataKey(metadata, id); + int newId = addKey(key); + fields.add(new FieldEntry(key, newId, writePos - start)); + appendVariantImpl(value, elementPos, metadata); + } + finishWritingObject(start, fields); + break; + } + case VariantUtil.ARRAY: { + VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(VariantUtil.slice(value, valuePos)); + ArrayList offsets = new ArrayList<>(info.numElements); + int start = writePos; + for (int i = 0; i < info.numElements; ++i) { + int offset = VariantUtil.readUnsigned( + value, valuePos + info.offsetStartOffset + info.offsetSize * i, info.offsetSize); + int elementPos = valuePos + info.dataStartOffset + offset; + offsets.add(writePos - start); + appendVariantImpl(value, elementPos, metadata); + } + finishWritingArray(start, offsets); + break; + } + default: + shallowAppendVariantImpl(value, valuePos); + break; + } + } + + private void shallowAppendVariantImpl(ByteBuffer value, int valuePos) { + int size = VariantUtil.valueSize(value, valuePos); + VariantUtil.checkIndex(valuePos + size - 1, value.limit()); + checkCapacity(size); + VariantUtil.slice(value, valuePos).get(writeBuffer, writePos, size); + writePos += size; + } + + private void checkCapacity(int additionalBytes) { + int requiredBytes = writePos + additionalBytes; + if (requiredBytes > writeBuffer.length) { + // Allocate a new buffer with a capacity of the next power of 2 of `requiredBytes`. + int newCapacity = Integer.highestOneBit(requiredBytes); + newCapacity = newCapacity < requiredBytes ? newCapacity * 2 : newCapacity; + byte[] newValue = new byte[newCapacity]; + System.arraycopy(writeBuffer, 0, newValue, 0, writePos); + writeBuffer = newValue; + } + } + + /** + * Class to store the information of a Variant object field. We need to collect all fields of + * an object, sort them by their keys, and build the Variant object in sorted order. + */ + public static final class FieldEntry implements Comparable { + final String key; + final int id; + final int offset; + + public FieldEntry(String key, int id, int offset) { + this.key = key; + this.id = id; + this.offset = offset; + } + + FieldEntry withNewOffset(int newOffset) { + return new FieldEntry(key, id, newOffset); + } + + @Override + public int compareTo(FieldEntry other) { + return key.compareTo(other.key); + } + } + + private void buildFromJsonParser(JsonParser parser) throws IOException { + JsonToken token = parser.currentToken(); + if (token == null) { + throw new JsonParseException(parser, "Unexpected null token"); + } + switch (token) { + case START_OBJECT: { + ArrayList fields = new ArrayList<>(); + int start = writePos; + while (parser.nextToken() != JsonToken.END_OBJECT) { + String key = parser.currentName(); + parser.nextToken(); + int id = addKey(key); + fields.add(new FieldEntry(key, id, writePos - start)); + buildFromJsonParser(parser); + } + finishWritingObject(start, fields); + break; + } + case START_ARRAY: { + ArrayList offsets = new ArrayList<>(); + int start = writePos; + while (parser.nextToken() != JsonToken.END_ARRAY) { + offsets.add(writePos - start); + buildFromJsonParser(parser); + } + finishWritingArray(start, offsets); + break; + } + case VALUE_STRING: + appendString(parser.getText()); + break; + case VALUE_NUMBER_INT: + try { + appendLong(parser.getLongValue()); + } catch (InputCoercionException ignored) { + // If the value doesn't fit any integer type, try to parse it as decimal instead. + if (!tryParseDecimal(parser.getText())) { + throw new JsonParseException(parser, "Cannot parse token as int/decimal. token: " + token); + } + } + break; + case VALUE_NUMBER_FLOAT: + parseAndAppendFloatingPoint(parser); + break; + case VALUE_TRUE: + appendBoolean(true); + break; + case VALUE_FALSE: + appendBoolean(false); + break; + case VALUE_NULL: + appendNull(); + break; + default: + throw new JsonParseException(parser, "Unexpected token " + token); + } + } + + /** + * Returns the size (number of bytes) of the smallest unsigned integer type that can store + * `value`. It must be within `[0, U24_MAX]`. + * @param value the value to get the size for + * @return the size (number of bytes) of the smallest unsigned integer type that can store `value` + */ + private int getMinIntegerSize(int value) { + assert value >= 0 && value <= VariantUtil.U24_MAX; + if (value <= VariantUtil.U8_MAX) { + return VariantUtil.U8_SIZE; + } + if (value <= VariantUtil.U16_MAX) { + return VariantUtil.U16_SIZE; + } + return VariantUtil.U24_SIZE; + } + + /** + * Parse a JSON number as a floating point value. If the number can be parsed as a decimal, it + * will be appended as a decimal value. Otherwise, it will be appended as a double value. + * @param parser the JSON parser to use + */ + private void parseAndAppendFloatingPoint(JsonParser parser) throws IOException { + if (!tryParseDecimal(parser.getText())) { + appendDouble(parser.getDoubleValue()); + } + } + + /** + * Try to parse a JSON number as a decimal. The input must only use the decimal format + * (an integer value with an optional '.' in it) and must not use scientific notation. It also + * must fit into the precision limitation of decimal types. + * @param input the input string to parse as decimal + * @return whether the parsing succeeds + */ + private boolean tryParseDecimal(String input) { + for (int i = 0; i < input.length(); ++i) { + char ch = input.charAt(i); + if (ch != '-' && ch != '.' && !(ch >= '0' && ch <= '9')) { + return false; + } + } + BigDecimal d = new BigDecimal(input); + if (d.scale() <= VariantUtil.MAX_DECIMAL16_PRECISION && d.precision() <= VariantUtil.MAX_DECIMAL16_PRECISION) { + appendDecimal(d); + return true; + } + return false; + } + + /** The buffer for building the Variant value. The first `writePos` bytes have been written. */ + private byte[] writeBuffer = new byte[128]; + + private int writePos = 0; + /** The dictionary for mapping keys to monotonically increasing ids. */ + private final HashMap dictionary = new HashMap<>(); + /** The keys in the dictionary, in id order. */ + private final ArrayList dictionaryKeys = new ArrayList<>(); + + private final boolean allowDuplicateKeys; +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantDuplicateKeyException.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantDuplicateKeyException.java new file mode 100644 index 0000000000..12e94416c4 --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantDuplicateKeyException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +/** + * An exception indicating that the Variant contains a duplicate key. + */ +public class VariantDuplicateKeyException extends RuntimeException { + public final String key; + + /** + * @param key the key that was duplicated + */ + public VariantDuplicateKeyException(String key) { + super("Failed to build Variant because of duplicate object key: " + key); + this.key = key; + } + + /** + * @return the key that was duplicated + */ + public String getKey() { + return key; + } +} diff --git a/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java new file mode 100644 index 0000000000..9c4c13472e --- /dev/null +++ b/parquet-variant/src/main/java/org/apache/parquet/variant/VariantUtil.java @@ -0,0 +1,795 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.parquet.variant; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; + +/** + * This class defines constants related to the Variant format and provides functions for + * manipulating Variant binaries. + * + * A Variant is made up of 2 binaries: value and metadata. A Variant value consists of a one-byte + * header and a number of content bytes (can be zero). The header byte is divided into upper 6 bits + * (called "type info") and lower 2 bits (called "basic type"). The content format is explained in + * the below constants for all possible basic type and type info values. + * + * The Variant metadata includes a version id and a dictionary of distinct strings (case-sensitive). + * Its binary format is: + * - Version: 1-byte unsigned integer. The only acceptable value is 1 currently. + * - Dictionary size: 4-byte little-endian unsigned integer. The number of keys in the + * dictionary. + * - Offsets: (size + 1) * 4-byte little-endian unsigned integers. `offsets[i]` represents the + * starting position of string i, counting starting from the address of `offsets[0]`. Strings + * must be stored contiguously, so we don’t need to store the string size, instead, we compute it + * with `offset[i + 1] - offset[i]`. + * - UTF-8 string data. + */ +public class VariantUtil { + public static final int BASIC_TYPE_BITS = 2; + public static final int BASIC_TYPE_MASK = 0b00000011; + public static final int PRIMITIVE_TYPE_MASK = 0b00111111; + /** The inclusive maximum value of the type info value. It is the size limit of `SHORT_STR`. */ + public static final int MAX_SHORT_STR_SIZE = 0b00111111; + + // The basic types + + /** + * Primitive value. + * The type info value must be one of the values in the "Primitive" section below. + */ + public static final int PRIMITIVE = 0; + /** + * Short string value. + * The type info value is the string size, which must be in `[0, MAX_SHORT_STR_SIZE]`. + * The string content bytes directly follow the header byte. + */ + public static final int SHORT_STR = 1; + /** + * Object value. + * The content contains a size, a list of field ids, a list of field offsets, and + * the actual field values. The list of field ids has `size` ids, while the list of field offsets + * has `size + 1` offsets, where the last offset represents the total size of the field values + * data. The list of fields ids must be sorted by the field name in alphabetical order. + * Duplicate field names within one object are not allowed. + * 5 bits in the type info are used to specify the integer type of the object header. It is + * 0_b4_b3b2_b1b0 (MSB is 0), where: + * - b4: the integer type of size. When it is 0/1, `size` is a little-endian 1/4-byte + * unsigned integer. + * - b3b2: the integer type of ids. When the 2 bits are 0/1/2, the id list contains + * 1/2/3-byte little-endian unsigned integers. + * - b1b0: the integer type of offset. When the 2 bits are 0/1/2, the offset list contains + * 1/2/3-byte little-endian unsigned integers. + */ + public static final int OBJECT = 2; + /** + * Array value. + * The content contains a size, a list of field offsets, and the actual element values. + * It is similar to an object without the id list. The length of the offset list + * is `size + 1`, where the last offset represent the total size of the element data. + * Its type info is: 000_b2_b1b0: + * - b2: the type of size. + * - b1b0: the integer type of offset. + */ + public static final int ARRAY = 3; + + // The primitive types + + /** JSON Null value. Empty content. */ + public static final int NULL = 0; + /** True value. Empty content. */ + public static final int TRUE = 1; + /** False value. Empty content. */ + public static final int FALSE = 2; + /** 1-byte little-endian signed integer. */ + public static final int INT8 = 3; + /** 2-byte little-endian signed integer. */ + public static final int INT16 = 4; + /** 4-byte little-endian signed integer. */ + public static final int INT32 = 5; + /** 4-byte little-endian signed integer. */ + public static final int INT64 = 6; + /** 8-byte IEEE double. */ + public static final int DOUBLE = 7; + /** 4-byte decimal. Content is 1-byte scale + 4-byte little-endian signed integer. */ + public static final int DECIMAL4 = 8; + /** 8-byte decimal. Content is 1-byte scale + 8-byte little-endian signed integer. */ + public static final int DECIMAL8 = 9; + /** 16-byte decimal. Content is 1-byte scale + 16-byte little-endian signed integer. */ + public static final int DECIMAL16 = 10; + /** + * Date value. Content is 4-byte little-endian signed integer that represents the + * number of days from the Unix epoch. + */ + public static final int DATE = 11; + /** + * Timestamp value. Content is 8-byte little-endian signed integer that represents the number of + * microseconds elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. It is displayed to users in + * their local time zones and may be displayed differently depending on the execution environment. + */ + public static final int TIMESTAMP = 12; + /** + * Timestamp_ntz value. It has the same content as `TIMESTAMP` but should always be interpreted + * as if the local time zone is UTC. + */ + public static final int TIMESTAMP_NTZ = 13; + /** 4-byte IEEE float. */ + public static final int FLOAT = 14; + /** + * Binary value. The content is (4-byte little-endian unsigned integer representing the binary + * size) + (size bytes of binary content). + */ + public static final int BINARY = 15; + /** + * Long string value. The content is (4-byte little-endian unsigned integer representing the + * string size) + (size bytes of string content). + */ + public static final int LONG_STR = 16; + /** + * Time value. Values can be from 00:00:00 to 23:59:59.999999. + * Content is 8-byte little-endian unsigned integer that represents the number of microseconds + * since midnight. + */ + public static final int TIME = 17; + /** + * Timestamp nanos value. Similar to `TIMESTAMP`, but represents the number of nanoseconds + * elapsed since the Unix epoch, 1970-01-01 00:00:00 UTC. + */ + public static final int TIMESTAMP_NANOS = 18; + /** + * Timestamp nanos (without timestamp) value. It has the same content as `TIMESTAMP_NANOS` but + * should always be interpreted as if the local time zone is UTC. + */ + public static final int TIMESTAMP_NANOS_NTZ = 19; + /** + * UUID value. The content is a 16-byte binary, encoded using big-endian. + * For example, UUID 00112233-4455-6677-8899-aabbccddeeff is encoded as the bytes + * 00 11 22 33 44 55 66 77 88 99 aa bb cc dd ee ff. + */ + public static final int UUID = 20; + + // The metadata version. + public static final byte VERSION = 1; + // The lower 4 bits of the first metadata byte contain the version. + public static final byte VERSION_MASK = 0x0F; + + // Constants for various unsigned integer sizes. + public static final int U8_MAX = 0xFF; + public static final int U16_MAX = 0xFFFF; + public static final int U24_MAX = 0xFFFFFF; + public static final int U8_SIZE = 1; + public static final int U16_SIZE = 2; + public static final int U24_SIZE = 3; + public static final int U32_SIZE = 4; + + // Max decimal precision for each decimal type. + public static final int MAX_DECIMAL4_PRECISION = 9; + public static final int MAX_DECIMAL8_PRECISION = 18; + public static final int MAX_DECIMAL16_PRECISION = 38; + + // The size (in bytes) of a UUID. + public static final int UUID_SIZE = 16; + + /** + * Write the least significant `numBytes` bytes in `value` into `bytes[pos, pos + numBytes)` in + * little endian. + * @param bytes The byte array to write into + * @param pos The starting index of the byte array to write into + * @param value The value to write + * @param numBytes The number of bytes to write + */ + public static void writeLong(byte[] bytes, int pos, long value, int numBytes) { + for (int i = 0; i < numBytes; ++i) { + bytes[pos + i] = (byte) ((value >>> (8 * i)) & 0xFF); + } + } + + public static byte primitiveHeader(int type) { + return (byte) (type << 2 | PRIMITIVE); + } + + public static byte shortStrHeader(int size) { + return (byte) (size << 2 | SHORT_STR); + } + + public static byte objectHeader(boolean largeSize, int idSize, int offsetSize) { + return (byte) (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 4)) + | ((idSize - 1) << (BASIC_TYPE_BITS + 2)) + | ((offsetSize - 1) << BASIC_TYPE_BITS) + | OBJECT); + } + + public static byte arrayHeader(boolean largeSize, int offsetSize) { + return (byte) (((largeSize ? 1 : 0) << (BASIC_TYPE_BITS + 2)) | ((offsetSize - 1) << BASIC_TYPE_BITS) | ARRAY); + } + + /** + * Check the validity of an array index `pos`. + * @param pos The index to check + * @param length The length of the array + * @throws IllegalArgumentException if the index is out of bound + */ + public static void checkIndex(int pos, int length) { + if (pos < 0 || pos >= length) { + throw new IllegalArgumentException( + String.format("Invalid byte-array offset (%d). length: %d", pos, length)); + } + } + + /** + * Reads a little-endian signed long value from `buffer[pos, pos + numBytes)`. + * @param buffer The ByteBuffer to read from + * @param pos The starting index of the buffer to read from + * @param numBytes The number of bytes to read + * @return The long value + */ + static long readLong(ByteBuffer buffer, int pos, int numBytes) { + checkIndex(pos, buffer.limit()); + checkIndex(pos + numBytes - 1, buffer.limit()); + long result = 0; + // All bytes except the most significant byte should be unsigned-extended and shifted + // (so we need & 0xFF`). The most significant byte should be sign-extended and is handled + // after the loop. + for (int i = 0; i < numBytes - 1; ++i) { + long unsignedByteValue = buffer.get(pos + i) & 0xFF; + result |= unsignedByteValue << (8 * i); + } + long signedByteValue = buffer.get(pos + numBytes - 1); + result |= signedByteValue << (8 * (numBytes - 1)); + return result; + } + + /** + * Read a little-endian unsigned int value from `bytes[pos, pos + numBytes)`. The value must fit + * into a non-negative int (`[0, Integer.MAX_VALUE]`). + */ + static int readUnsigned(byte[] bytes, int pos, int numBytes) { + checkIndex(pos, bytes.length); + checkIndex(pos + numBytes - 1, bytes.length); + int result = 0; + // Similar to the `readLong` loop, but all bytes should be unsigned-extended. + for (int i = 0; i < numBytes; ++i) { + int unsignedByteValue = bytes[pos + i] & 0xFF; + result |= unsignedByteValue << (8 * i); + } + if (result < 0) { + throw new IllegalArgumentException(String.format("Failed to read unsigned int. numBytes: %d", numBytes)); + } + return result; + } + + /** + * Read a little-endian unsigned int value from `bytes[pos, pos + numBytes)`. The value must fit + * into a non-negative int (`[0, Integer.MAX_VALUE]`). + */ + static int readUnsigned(ByteBuffer bytes, int pos, int numBytes) { + checkIndex(pos, bytes.limit()); + checkIndex(pos + numBytes - 1, bytes.limit()); + int result = 0; + // Similar to the `readLong` loop, but all bytes should be unsigned-extended. + for (int i = 0; i < numBytes; ++i) { + int unsignedByteValue = bytes.get(pos + i) & 0xFF; + result |= unsignedByteValue << (8 * i); + } + if (result < 0) { + throw new IllegalArgumentException(String.format("Failed to read unsigned int. numBytes: %d", numBytes)); + } + return result; + } + + /** + * The value type of Variant value. It is determined by the header byte. + */ + public enum Type { + OBJECT, + ARRAY, + NULL, + BOOLEAN, + BYTE, + SHORT, + INT, + LONG, + STRING, + DOUBLE, + DECIMAL4, + DECIMAL8, + DECIMAL16, + DATE, + TIMESTAMP, + TIMESTAMP_NTZ, + FLOAT, + BINARY, + TIME, + TIMESTAMP_NANOS, + TIMESTAMP_NANOS_NTZ, + UUID + } + + /** + * Returns the value type of Variant value `value[pos...]`. It is only legal to call `get*` if + * `getType` returns the corresponding type. For example, it is only legal to call + * `getLong` if this method returns `Type.Long`. + * @param value The Variant value to get the type from + * @return The type of the Variant value + */ + public static Type getType(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + switch (basicType) { + case SHORT_STR: + return Type.STRING; + case OBJECT: + return Type.OBJECT; + case ARRAY: + return Type.ARRAY; + default: + switch (typeInfo) { + case NULL: + return Type.NULL; + case TRUE: + case FALSE: + return Type.BOOLEAN; + case INT8: + return Type.BYTE; + case INT16: + return Type.SHORT; + case INT32: + return Type.INT; + case INT64: + return Type.LONG; + case DOUBLE: + return Type.DOUBLE; + case DECIMAL4: + return Type.DECIMAL4; + case DECIMAL8: + return Type.DECIMAL8; + case DECIMAL16: + return Type.DECIMAL16; + case DATE: + return Type.DATE; + case TIMESTAMP: + return Type.TIMESTAMP; + case TIMESTAMP_NTZ: + return Type.TIMESTAMP_NTZ; + case FLOAT: + return Type.FLOAT; + case BINARY: + return Type.BINARY; + case LONG_STR: + return Type.STRING; + case TIME: + return Type.TIME; + case TIMESTAMP_NANOS: + return Type.TIMESTAMP_NANOS; + case TIMESTAMP_NANOS_NTZ: + return Type.TIMESTAMP_NANOS_NTZ; + case UUID: + return Type.UUID; + default: + throw new UnknownVariantTypeException(typeInfo); + } + } + } + + /** + * Computes the actual size (in bytes) of the Variant value at `value[pos...]`. + * `value.length - pos` is an upper bound of the size, but the actual size may be smaller. + * @param value The Variant value + * @param pos The starting index of the Variant value + * @return The actual size of the Variant value + */ + public static int valueSize(byte[] value, int pos) { + return valueSize(ByteBuffer.wrap(value), pos); + } + + public static int valueSize(ByteBuffer value, int pos) { + checkIndex(pos, value.limit()); + int basicType = value.get(pos) & BASIC_TYPE_MASK; + int typeInfo = (value.get(pos) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + switch (basicType) { + case SHORT_STR: + return 1 + typeInfo; + case OBJECT: { + VariantUtil.ObjectInfo info = VariantUtil.getObjectInfo(slice(value, pos)); + return info.dataStartOffset + + readUnsigned( + value, + pos + info.offsetStartOffset + info.numElements * info.offsetSize, + info.offsetSize); + } + case ARRAY: { + VariantUtil.ArrayInfo info = VariantUtil.getArrayInfo(slice(value, pos)); + return info.dataStartOffset + + readUnsigned( + value, + pos + info.offsetStartOffset + info.numElements * info.offsetSize, + info.offsetSize); + } + default: + switch (typeInfo) { + case NULL: + case TRUE: + case FALSE: + return 1; + case INT8: + return 2; + case INT16: + return 3; + case INT32: + case DATE: + case FLOAT: + return 5; + case INT64: + case DOUBLE: + case TIMESTAMP: + case TIMESTAMP_NTZ: + case TIME: + case TIMESTAMP_NANOS: + case TIMESTAMP_NANOS_NTZ: + return 9; + case DECIMAL4: + return 6; + case DECIMAL8: + return 10; + case DECIMAL16: + return 18; + case BINARY: + case LONG_STR: + return 1 + U32_SIZE + readUnsigned(value, pos + 1, U32_SIZE); + case UUID: + return 1 + UUID_SIZE; + default: + throw new UnknownVariantTypeException(typeInfo); + } + } + } + + private static IllegalArgumentException unexpectedType(Type type) { + return new IllegalArgumentException("Expected type to be " + type); + } + + private static IllegalArgumentException unexpectedType(Type[] types) { + return new IllegalArgumentException("Expected type to be one of: " + Arrays.toString(types)); + } + + public static boolean getBoolean(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE || (typeInfo != TRUE && typeInfo != FALSE)) { + throw unexpectedType(Type.BOOLEAN); + } + return typeInfo == TRUE; + } + + /** + * Returns a long value from Variant value `value[pos...]`. + * It is only legal to call it if `getType` returns one of Type.BYTE, SHORT, INT, LONG, + * DATE, TIMESTAMP, TIMESTAMP_NTZ, TIME, TIMESTAMP_NANOS, TIMESTAMP_NANOS_NTZ. + * If the type is `DATE`, the return value is guaranteed to fit into an int and + * represents the number of days from the Unix epoch. + * If the type is `TIMESTAMP/TIMESTAMP_NTZ`, the return value represents the number of + * microseconds from the Unix epoch. + * If the type is `TIME`, the return value represents the number of microseconds since midnight. + * If the type is `TIMESTAMP_NANOS/TIMESTAMP_NANOS_NTZ`, the return value represents the number of + * nanoseconds from the Unix epoch. + * @param value The Variant value + * @return The long value + */ + public static long getLong(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + String exceptionMessage = + "Expect type to be one of: BYTE, SHORT, INT, LONG, TIMESTAMP, TIMESTAMP_NTZ, TIME, TIMESTAMP_NANOS, TIMESTAMP_NANOS_NTZ"; + if (basicType != PRIMITIVE) { + throw new IllegalStateException(exceptionMessage); + } + switch (typeInfo) { + case INT8: + return readLong(value, value.position() + 1, 1); + case INT16: + return readLong(value, value.position() + 1, 2); + case INT32: + case DATE: + return readLong(value, value.position() + 1, 4); + case INT64: + case TIMESTAMP: + case TIMESTAMP_NTZ: + case TIME: + case TIMESTAMP_NANOS: + case TIMESTAMP_NANOS_NTZ: + return readLong(value, value.position() + 1, 8); + default: + throw new IllegalStateException(exceptionMessage); + } + } + + public static double getDouble(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE || typeInfo != DOUBLE) { + throw unexpectedType(Type.DOUBLE); + } + return Double.longBitsToDouble(readLong(value, value.position() + 1, 8)); + } + + public static BigDecimal getDecimalWithOriginalScale(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE) { + throw unexpectedType(new Type[] {Type.DECIMAL4, Type.DECIMAL8, Type.DECIMAL16}); + } + // Interpret the scale byte as unsigned. If it is a negative byte, the unsigned value must be + // greater than `MAX_DECIMAL16_PRECISION` and will trigger an error in `checkDecimal`. + int scale = value.get(value.position() + 1) & 0xFF; + BigDecimal result; + switch (typeInfo) { + case DECIMAL4: + result = BigDecimal.valueOf(readLong(value, value.position() + 2, 4), scale); + break; + case DECIMAL8: + result = BigDecimal.valueOf(readLong(value, value.position() + 2, 8), scale); + break; + case DECIMAL16: + checkIndex(value.position() + 17, value.limit()); + byte[] bytes = new byte[16]; + // Copy the bytes reversely because the `BigInteger` constructor expects a big-endian + // representation. + for (int i = 0; i < 16; ++i) { + bytes[i] = value.get(value.position() + 17 - i); + } + result = new BigDecimal(new BigInteger(bytes), scale); + break; + default: + throw unexpectedType(new Type[] {Type.DECIMAL4, Type.DECIMAL8, Type.DECIMAL16}); + } + return result; + } + + public static BigDecimal getDecimal(ByteBuffer value) { + return getDecimalWithOriginalScale(value); + } + + public static float getFloat(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE || typeInfo != FLOAT) { + throw unexpectedType(Type.FLOAT); + } + return Float.intBitsToFloat((int) readLong(value, value.position() + 1, 4)); + } + + public static byte[] getBinary(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE || typeInfo != BINARY) { + throw unexpectedType(Type.BINARY); + } + int start = value.position() + 1 + U32_SIZE; + int length = readUnsigned(value, value.position() + 1, U32_SIZE); + checkIndex(start + length - 1, value.limit()); + byte[] ret = new byte[length]; + slice(value, start).get(ret); + return ret; + } + + public static String getString(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType == SHORT_STR || (basicType == PRIMITIVE && typeInfo == LONG_STR)) { + int start; + int length; + if (basicType == SHORT_STR) { + start = value.position() + 1; + length = typeInfo; + } else { + start = value.position() + 1 + U32_SIZE; + length = readUnsigned(value, value.position() + 1, U32_SIZE); + } + checkIndex(start + length - 1, value.limit()); + if (value.hasArray()) { + // If the buffer is backed by an array, we can use the array directly. + return new String(value.array(), value.arrayOffset() + start, length); + } else { + // If the buffer is not backed by an array, we need to copy the bytes into a new array. + byte[] valueArray = new byte[length]; + slice(value, start).get(valueArray); + return new String(valueArray); + } + } + throw unexpectedType(Type.STRING); + } + + public static java.util.UUID getUUID(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != PRIMITIVE || typeInfo != UUID) { + throw unexpectedType(Type.UUID); + } + int start = value.position() + 1; + checkIndex(start + UUID_SIZE - 1, value.limit()); + ByteBuffer bb = VariantUtil.slice(value, start).order(ByteOrder.BIG_ENDIAN); + return new java.util.UUID(bb.getLong(), bb.getLong()); + } + + /** + * Slices the `value` buffer starting from `start` index. + * @param value The ByteBuffer to slice + * @param start The starting index of the slice + * @return The sliced ByteBuffer + */ + public static ByteBuffer slice(ByteBuffer value, int start) { + int oldPos = value.position(); + value.position(start); + ByteBuffer newSlice = value.slice(); + value.position(oldPos); + return newSlice; + } + + /** + * A helper class representing the details of a Variant object, used for `ObjectHandler`. + */ + public static class ObjectInfo { + /** Number of object fields. */ + public final int numElements; + /** The integer size of the field id list. */ + public final int idSize; + /** The integer size of the offset list. */ + public final int offsetSize; + /** The byte offset (from the beginning of the Variant object) of the field id list. */ + public final int idStartOffset; + /** The byte offset (from the beginning of the Variant object) of the offset list. */ + public final int offsetStartOffset; + /** The byte offset (from the beginning of the Variant object) of the field data. */ + public final int dataStartOffset; + + public ObjectInfo( + int numElements, + int idSize, + int offsetSize, + int idStartOffset, + int offsetStartOffset, + int dataStartOffset) { + this.numElements = numElements; + this.idSize = idSize; + this.offsetSize = offsetSize; + this.idStartOffset = idStartOffset; + this.offsetStartOffset = offsetStartOffset; + this.dataStartOffset = dataStartOffset; + } + } + + /** + * Parses the object at `value[pos...]`, and returns the object details. + */ + public static ObjectInfo getObjectInfo(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != OBJECT) { + throw unexpectedType(Type.OBJECT); + } + // Refer to the comment of the `OBJECT` constant for the details of the object header encoding. + // Suppose `typeInfo` has a bit representation of 0_b4_b3b2_b1b0, the following line extracts + // b4 to determine whether the object uses a 1/4-byte size. + boolean largeSize = ((typeInfo >> 4) & 0x1) != 0; + int sizeBytes = (largeSize ? U32_SIZE : 1); + int numElements = readUnsigned(value, value.position() + 1, sizeBytes); + // Extracts b3b2 to determine the integer size of the field id list. + int idSize = ((typeInfo >> 2) & 0x3) + 1; + // Extracts b1b0 to determine the integer size of the offset list. + int offsetSize = (typeInfo & 0x3) + 1; + int idStartOffset = 1 + sizeBytes; + int offsetStartOffset = idStartOffset + numElements * idSize; + int dataStartOffset = offsetStartOffset + (numElements + 1) * offsetSize; + return new ObjectInfo(numElements, idSize, offsetSize, idStartOffset, offsetStartOffset, dataStartOffset); + } + + /** + * A helper class representing the details of a Variant array, used for `ArrayHandler`. + */ + public static class ArrayInfo { + /** Number of object fields. */ + public final int numElements; + /** The integer size of the offset list. */ + public final int offsetSize; + /** The byte offset (from the beginning of the Variant array) of the offset list. */ + public final int offsetStartOffset; + /** The byte offset (from the beginning of the Variant array) of the field data. */ + public final int dataStartOffset; + + public ArrayInfo(int numElements, int offsetSize, int offsetStartOffset, int dataStartOffset) { + this.numElements = numElements; + this.offsetSize = offsetSize; + this.offsetStartOffset = offsetStartOffset; + this.dataStartOffset = dataStartOffset; + } + } + + /** + * Parses the array at `value[pos...]`, and returns the array details. + */ + public static ArrayInfo getArrayInfo(ByteBuffer value) { + checkIndex(value.position(), value.limit()); + int basicType = value.get(value.position()) & BASIC_TYPE_MASK; + int typeInfo = (value.get(value.position()) >> BASIC_TYPE_BITS) & PRIMITIVE_TYPE_MASK; + if (basicType != ARRAY) { + throw unexpectedType(Type.ARRAY); + } + // Refer to the comment of the `ARRAY` constant for the details of the object header encoding. + // Suppose `typeInfo` has a bit representation of 000_b2_b1b0, the following line extracts + // b2 to determine whether the object uses a 1/4-byte size. + boolean largeSize = ((typeInfo >> 2) & 0x1) != 0; + int sizeBytes = (largeSize ? U32_SIZE : 1); + int numElements = readUnsigned(value, value.position() + 1, sizeBytes); + // Extracts b1b0 to determine the integer size of the offset list. + int offsetSize = (typeInfo & 0x3) + 1; + int offsetStartOffset = 1 + sizeBytes; + int dataStartOffset = offsetStartOffset + (numElements + 1) * offsetSize; + return new ArrayInfo(numElements, offsetSize, offsetStartOffset, dataStartOffset); + } + + /** + * Returns a key at `id` in the Variant metadata. + * + * @param metadata The Variant metadata + * @param id The key id + * @return The key + * @throws MalformedVariantException if the Variant is malformed + * @throws IllegalArgumentException the id is out of bounds + */ + public static String getMetadataKey(ByteBuffer metadata, int id) { + // Extracts the highest 2 bits in the metadata header to determine the integer size of the + // offset list. + int offsetSize = ((metadata.get(metadata.position()) >> 6) & 0x3) + 1; + int dictSize = readUnsigned(metadata, metadata.position() + 1, offsetSize); + if (id >= dictSize) { + throw new IllegalArgumentException( + String.format("Invalid dictionary id: %d. dictionary size: %d", id, dictSize)); + } + // The offset list after the header byte, and a `dictSize` with `offsetSize` bytes. + int offsetListPos = metadata.position() + 1 + offsetSize; + // The data starts after the offset list, and `(dictSize + 1)` offset values. + int dataPos = offsetListPos + (dictSize + 1) * offsetSize; + int offset = readUnsigned(metadata, offsetListPos + (id) * offsetSize, offsetSize); + int nextOffset = readUnsigned(metadata, offsetListPos + (id + 1) * offsetSize, offsetSize); + if (offset > nextOffset) { + throw new MalformedVariantException( + String.format("Invalid offset: %d. next offset: %d", offset, nextOffset)); + } + checkIndex(dataPos + nextOffset - 1, metadata.limit()); + if (metadata.hasArray()) { + return new String(metadata.array(), metadata.arrayOffset() + dataPos + offset, nextOffset - offset); + } else { + // ByteBuffer does not have an array, so we need to use the `get` method to read the bytes. + byte[] metadataArray = new byte[nextOffset - offset]; + slice(metadata, dataPos + offset).get(metadataArray); + return new String(metadataArray); + } + } +} diff --git a/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantEncoding.java b/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantEncoding.java new file mode 100644 index 0000000000..fbeac8e24f --- /dev/null +++ b/parquet-variant/src/test/java/org/apache/parquet/variant/TestVariantEncoding.java @@ -0,0 +1,604 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.variant; + +import com.fasterxml.jackson.core.*; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.SecureRandom; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.Base64; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestVariantEncoding { + private static final Logger LOG = LoggerFactory.getLogger(TestVariantEncoding.class); + private static final String RANDOM_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + private static final List SAMPLE_JSON_VALUES = Arrays.asList( + "null", + "true", + "false", + "12", + "-9876543210", + "4.5678E123", + "8.765E-2", + "\"string value\"", + "-9876.543", + "234.456789", + "{\"a\": 1, \"b\": {\"e\": -4, \"f\": 5.5}, \"c\": true}", + "[1, -2, 4.5, -6.7, \"str\", true]"); + + /** Random number generator for generating random strings */ + private static SecureRandom random = new SecureRandom(); + /** Object mapper for comparing json values */ + private final ObjectMapper mapper = new ObjectMapper(); + + private void checkJson(String expected, String actual) { + try { + StreamReadConstraints.overrideDefaultStreamReadConstraints( + StreamReadConstraints.builder().maxNestingDepth(100000).build()); + Assert.assertEquals(mapper.readTree(expected), mapper.readTree(actual)); + } catch (IOException e) { + Assert.fail("Failed to parse json: " + e); + } + } + + private void checkJson(String jsonValue) { + try { + StreamReadConstraints.overrideDefaultStreamReadConstraints( + StreamReadConstraints.builder().maxNestingDepth(100000).build()); + Variant v = VariantBuilder.parseJson(jsonValue); + checkJson(jsonValue, v.toJson()); + } catch (IOException e) { + Assert.fail("Failed to parse json: " + jsonValue + " " + e); + } + } + + private void checkType(Variant v, int expectedBasicType, VariantUtil.Type expectedType) { + Assert.assertEquals(expectedBasicType, v.value.get(v.value.position()) & VariantUtil.BASIC_TYPE_MASK); + Assert.assertEquals(expectedType, v.getType()); + } + + private long microsSinceEpoch(Instant instant) { + return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + instant.getNano() / 1000; + } + + private long nanosSinceEpoch(Instant instant) { + return TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano(); + } + + private String randomString(int len) { + StringBuilder sb = new StringBuilder(len); + for (int i = 0; i < len; i++) { + sb.append(RANDOM_CHARS.charAt(random.nextInt(RANDOM_CHARS.length()))); + } + return sb.toString(); + } + + private void testVariant(Variant v, Consumer consumer) { + consumer.accept(v); + // Create new Variant with different byte offsets + byte[] newValue = new byte[v.value.capacity() + 50]; + byte[] newMetadata = new byte[v.metadata.capacity() + 50]; + Arrays.fill(newValue, (byte) 0xFF); + Arrays.fill(newMetadata, (byte) 0xFF); + v.value.position(0); + v.value.get(newValue, 25, v.value.capacity()); + v.value.position(0); + v.metadata.position(0); + v.metadata.get(newMetadata, 25, v.metadata.capacity()); + v.metadata.position(0); + Variant v2 = new Variant( + ByteBuffer.wrap(newValue, 25, v.value.capacity()), + ByteBuffer.wrap(newMetadata, 25, v.metadata.capacity())); + consumer.accept(v2); + } + + @Test + public void testNullJson() { + checkJson("null"); + } + + @Test + public void testBooleanJson() { + Arrays.asList("true", "false").forEach(this::checkJson); + } + + @Test + public void testIntegerJson() { + Arrays.asList( + "0", + Byte.toString(Byte.MIN_VALUE), + Byte.toString(Byte.MAX_VALUE), + Short.toString(Short.MIN_VALUE), + Short.toString(Short.MAX_VALUE), + Integer.toString(Integer.MIN_VALUE), + Integer.toString(Integer.MAX_VALUE), + Long.toString(Long.MIN_VALUE), + Long.toString(Long.MAX_VALUE)) + .forEach(this::checkJson); + } + + @Test + public void testFloatJson() { + Arrays.asList( + Float.toString(Float.MIN_VALUE), Float.toString(Float.MAX_VALUE), + Double.toString(Double.MIN_VALUE), Double.toString(Double.MAX_VALUE)) + .forEach(this::checkJson); + } + + @Test + public void testStringJson() { + Arrays.asList("\"short string\"", "\"long string: " + new String(new char[1000]).replace("\0", "x") + "\"") + .forEach(this::checkJson); + } + + @Test + public void testDecimalJson() { + Arrays.asList( + "12.34", "-43.21", + "10.2147483647", "-1021474836.47", + "109223372036854775.807", "-109.223372036854775807") + .forEach(this::checkJson); + } + + @Test + public void testNullBuilder() { + VariantBuilder vb = new VariantBuilder(false); + vb.appendNull(); + testVariant(vb.result(), v -> checkType(v, VariantUtil.NULL, VariantUtil.Type.NULL)); + } + + @Test + public void testBooleanBuilder() { + Arrays.asList(true, false).forEach(b -> { + VariantBuilder vb = new VariantBuilder(false); + vb.appendBoolean(b); + testVariant(vb.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.BOOLEAN); + Assert.assertEquals(b, v.getBoolean()); + }); + }); + } + + @Test + public void testIntegerBuilder() { + Arrays.asList( + 0L, + (long) Byte.MIN_VALUE, + (long) Byte.MAX_VALUE, + (long) Short.MIN_VALUE, + (long) Short.MAX_VALUE, + (long) Integer.MIN_VALUE, + (long) Integer.MAX_VALUE, + Long.MIN_VALUE, + Long.MAX_VALUE) + .forEach(l -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendLong(l); + testVariant(vb2.result(), v -> { + if (Byte.MIN_VALUE <= l && l <= Byte.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.BYTE); + } else if (Short.MIN_VALUE <= l && l <= Short.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.SHORT); + } else if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.INT); + } else { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.LONG); + } + Assert.assertEquals((long) l, v.getLong()); + }); + }); + + Arrays.asList( + 0, + (int) Byte.MIN_VALUE, + (int) Byte.MAX_VALUE, + (int) Short.MIN_VALUE, + (int) Short.MAX_VALUE, + Integer.MIN_VALUE, + Integer.MAX_VALUE) + .forEach(i -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendLong((long) i); + testVariant(vb2.result(), v -> { + if (Byte.MIN_VALUE <= i && i <= Byte.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.BYTE); + } else if (Short.MIN_VALUE <= i && i <= Short.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.SHORT); + } else { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.INT); + } + Assert.assertEquals((int) i, v.getInt()); + }); + }); + + Arrays.asList((short) 0, (short) Byte.MIN_VALUE, (short) Byte.MAX_VALUE, Short.MIN_VALUE, Short.MAX_VALUE) + .forEach(s -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendLong(s); + testVariant(vb2.result(), v -> { + if (Byte.MIN_VALUE <= s && s <= Byte.MAX_VALUE) { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.BYTE); + } else { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.SHORT); + } + Assert.assertEquals((short) s, v.getShort()); + }); + }); + + Arrays.asList((byte) 0, Byte.MIN_VALUE, Byte.MAX_VALUE).forEach(b -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendLong(b); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.BYTE); + Assert.assertEquals((byte) b, v.getByte()); + }); + }); + } + + @Test + public void testFloatBuilder() { + Arrays.asList(Float.MIN_VALUE, 0f, Float.MAX_VALUE).forEach(f -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendFloat(f); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.FLOAT); + Assert.assertEquals(f, v.getFloat(), 0.000001); + }); + }); + } + + @Test + public void testDoubleBuilder() { + Arrays.asList(Double.MIN_VALUE, 0d, Double.MAX_VALUE).forEach(d -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendDouble(d); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.DOUBLE); + Assert.assertEquals(d, v.getDouble(), 0.000001); + }); + }); + } + + @Test + public void testStringBuilder() { + IntStream.range(VariantUtil.MAX_SHORT_STR_SIZE - 3, VariantUtil.MAX_SHORT_STR_SIZE + 3) + .forEach(len -> { + VariantBuilder vb2 = new VariantBuilder(false); + String s = randomString(len); + vb2.appendString(s); + testVariant(vb2.result(), v -> { + if (len <= VariantUtil.MAX_SHORT_STR_SIZE) { + checkType(v, VariantUtil.SHORT_STR, VariantUtil.Type.STRING); + } else { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.STRING); + } + Assert.assertEquals(s, v.getString()); + }); + }); + } + + @Test + public void testDecimalBuilder() { + // decimal4 + Arrays.asList(new BigDecimal("123.456"), new BigDecimal("-987.654")).forEach(d -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendDecimal(d); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.DECIMAL4); + Assert.assertEquals(d, v.getDecimal()); + }); + }); + + // decimal8 + Arrays.asList(new BigDecimal("10.2147483647"), new BigDecimal("-1021474836.47")) + .forEach(d -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendDecimal(d); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.DECIMAL8); + Assert.assertEquals(d, v.getDecimal()); + }); + }); + + // decimal16 + Arrays.asList(new BigDecimal("109223372036854775.807"), new BigDecimal("-109.223372036854775807")) + .forEach(d -> { + VariantBuilder vb2 = new VariantBuilder(false); + vb2.appendDecimal(d); + testVariant(vb2.result(), v -> { + checkType(v, VariantUtil.PRIMITIVE, VariantUtil.Type.DECIMAL16); + Assert.assertEquals(d, v.getDecimal()); + }); + }); + } + + @Test + public void testVariantBuilder() throws IOException { + Variant subV = + VariantBuilder.parseJson("{\"a\": 1.1, \"b\": {\"d\": [[1], \"foo\"]}, \"c\": [true, {\"a\": 2}]}"); + testVariant(subV, v -> { + VariantBuilder vb = new VariantBuilder(false); + vb.appendVariant(v); + testVariant(vb.result(), v2 -> { + checkType(v2, VariantUtil.OBJECT, VariantUtil.Type.OBJECT); + checkJson(v.toJson(), v2.toJson()); + }); + }); + } + + @Test + public void testDate() { + VariantBuilder vb = new VariantBuilder(false); + int days = Math.toIntExact(LocalDate.of(2024, 12, 16).toEpochDay()); + vb.appendDate(days); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"2024-12-16\"", v.toJson()); + Assert.assertEquals(days, v.getInt()); + }); + } + + @Test + public void testTimestamp() { + DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; + VariantBuilder vb = new VariantBuilder(false); + long micros = microsSinceEpoch(Instant.from(dtf.parse("2024-12-16T10:23:45.321456-08:00"))); + vb.appendTimestamp(micros); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"2024-12-16T18:23:45.321456+00:00\"", v.toJson()); + Assert.assertEquals(micros, v.getLong()); + }); + } + + @Test + public void testTimestampNtz() { + DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; + VariantBuilder vb = new VariantBuilder(false); + long micros = microsSinceEpoch(Instant.from(dtf.parse("2024-01-01T23:00:00.000001Z"))); + vb.appendTimestampNtz(micros); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"2024-01-01T23:00:00.000001\"", v.toJson()); + Assert.assertEquals(micros, v.getLong()); + }); + } + + @Test + public void testTime() { + for (String timeStr : Arrays.asList( + "00:00:00.000000", "00:00:00.000120", "12:00:00.000000", "12:00:00.002300", "23:59:59.999999")) { + VariantBuilder vb = new VariantBuilder(false); + long micros = LocalTime.parse(timeStr).toNanoOfDay() / 1_000; + vb.appendTime(micros); + testVariant(vb.result(), v -> { + Assert.assertEquals(String.format("\"%s\"", timeStr), v.toJson()); + Assert.assertEquals(micros, v.getLong()); + }); + } + } + + @Test + public void testTimestampNanos() { + DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; + VariantBuilder vb = new VariantBuilder(false); + long nanos = nanosSinceEpoch(Instant.from(dtf.parse("2024-12-16T10:23:45.321456987-08:00"))); + vb.appendTimestampNanos(nanos); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"2024-12-16T18:23:45.321456987+00:00\"", v.toJson()); + Assert.assertEquals(nanos, v.getLong()); + }); + } + + @Test + public void testTimestampNanosNtz() { + DateTimeFormatter dtf = DateTimeFormatter.ISO_DATE_TIME; + VariantBuilder vb = new VariantBuilder(false); + long nanos = nanosSinceEpoch(Instant.from(dtf.parse("2024-01-01T23:00:00.839280983Z"))); + vb.appendTimestampNanosNtz(nanos); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"2024-01-01T23:00:00.839280983\"", v.toJson()); + Assert.assertEquals(nanos, v.getLong()); + }); + } + + @Test + public void testBinary() { + VariantBuilder vb = new VariantBuilder(false); + byte[] binary = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + vb.appendBinary(binary); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"" + Base64.getEncoder().encodeToString(binary) + "\"", v.toJson()); + Assert.assertArrayEquals(binary, v.getBinary()); + }); + } + + @Test + public void testUUID() { + VariantBuilder vb = new VariantBuilder(false); + byte[] uuid = new byte[] {0, 17, 34, 51, 68, 85, 102, 119, -120, -103, -86, -69, -52, -35, -18, -1}; + long msb = ByteBuffer.wrap(uuid, 0, 8).order(ByteOrder.BIG_ENDIAN).getLong(); + long lsb = ByteBuffer.wrap(uuid, 8, 8).order(ByteOrder.BIG_ENDIAN).getLong(); + UUID expected = new UUID(msb, lsb); + + vb.appendUUID(expected); + testVariant(vb.result(), v -> { + Assert.assertEquals("\"00112233-4455-6677-8899-aabbccddeeff\"", v.toJson()); + Assert.assertEquals(expected, v.getUUID()); + }); + } + + @Test + public void testObject() { + // simple object + StringBuilder sb = new StringBuilder(); + sb.append("{"); + for (int i = 0; i < SAMPLE_JSON_VALUES.size(); i++) { + if (i > 0) sb.append(", "); + sb.append("\"field" + i + "\": ").append(SAMPLE_JSON_VALUES.get(i)); + } + sb.append("}"); + checkJson(sb.toString()); + + // wide object + sb = new StringBuilder(); + sb.append("{"); + for (int i = 0; i < 50000; i++) { + if (i > 0) sb.append(", "); + sb.append("\"field" + i + "\": ").append(SAMPLE_JSON_VALUES.get(i % SAMPLE_JSON_VALUES.size())); + } + sb.append("}"); + checkJson(sb.toString()); + + // deep object + sb = new StringBuilder(); + // Jackson object mapper hit a stack overflow if json is too deep + for (int i = 0; i < 500; i++) { + sb.append("{").append("\"field" + i + "\": "); + } + sb.append("{"); + for (int i = 0; i < SAMPLE_JSON_VALUES.size(); i++) { + if (i > 0) sb.append(", "); + sb.append("\"field" + i + "\": ").append(SAMPLE_JSON_VALUES.get(i)); + } + sb.append("}"); + for (int i = 0; i < 500; i++) { + sb.append("}"); + } + checkJson(sb.toString()); + } + + @Test + public void testGetObjectFields() throws IOException { + // Create small object for linear search + StringBuilder sb = new StringBuilder(); + sb.append("{"); + for (int i = 0; i < Variant.BINARY_SEARCH_THRESHOLD / 2; i++) { + if (i > 0) sb.append(", "); + sb.append("\"field" + i + "\": ").append(i); + } + sb.append("}"); + testVariant(VariantBuilder.parseJson(sb.toString()), v -> { + Assert.assertEquals(Variant.BINARY_SEARCH_THRESHOLD / 2, v.numObjectElements()); + for (int i = 0; i < Variant.BINARY_SEARCH_THRESHOLD / 2; i++) { + String actual = v.getFieldByKey("field" + i).toJson(); + Assert.assertEquals(String.valueOf(i), actual); + // check by index + Variant.ObjectField field = v.getFieldAtIndex(i); + Assert.assertTrue(field.key.startsWith("field")); + Assert.assertEquals(field.key.substring("field".length()), field.value.toJson()); + } + }); + + // Create larger object for binary search + sb = new StringBuilder(); + sb.append("{"); + for (int i = 0; i < 2 * Variant.BINARY_SEARCH_THRESHOLD; i++) { + if (i > 0) sb.append(", "); + sb.append("\"field" + i + "\": ").append(i); + } + sb.append("}"); + testVariant(VariantBuilder.parseJson(sb.toString()), v -> { + Assert.assertEquals(2 * Variant.BINARY_SEARCH_THRESHOLD, v.numObjectElements()); + for (int i = 0; i < 2 * Variant.BINARY_SEARCH_THRESHOLD; i++) { + String actual = v.getFieldByKey("field" + i).toJson(); + Assert.assertEquals(String.valueOf(i), actual); + // check by index + Variant.ObjectField field = v.getFieldAtIndex(i); + Assert.assertTrue(field.key.startsWith("field")); + Assert.assertEquals(field.key.substring("field".length()), field.value.toJson()); + } + }); + } + + @Test + public void testArray() throws IOException { + // simple array + StringBuilder sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < SAMPLE_JSON_VALUES.size(); i++) { + if (i > 0) sb.append(", "); + sb.append(SAMPLE_JSON_VALUES.get(i)); + } + sb.append("]"); + checkJson(sb.toString()); + // Check array elements + testVariant(VariantBuilder.parseJson(sb.toString()), v -> { + Assert.assertEquals(SAMPLE_JSON_VALUES.size(), v.numArrayElements()); + for (int i = 0; i < SAMPLE_JSON_VALUES.size(); i++) { + String actual = v.getElementAtIndex(i).toJson(); + checkJson(SAMPLE_JSON_VALUES.get(i), actual); + } + }); + + // large array + sb = new StringBuilder(); + sb.append("["); + for (int i = 0; i < 50000; i++) { + if (i > 0) sb.append(", "); + sb.append(SAMPLE_JSON_VALUES.get(i % SAMPLE_JSON_VALUES.size())); + } + sb.append("]"); + checkJson(sb.toString()); + // Check array elements + testVariant(VariantBuilder.parseJson(sb.toString()), v -> { + Assert.assertEquals(50000, v.numArrayElements()); + for (int i = 0; i < 50000; i++) { + String actual = v.getElementAtIndex(i).toJson(); + checkJson(SAMPLE_JSON_VALUES.get(i % SAMPLE_JSON_VALUES.size()), actual); + } + }); + } + + @Test + public void testAllowDuplicateKeys() { + // disallow duplicate keys + try { + VariantBuilder.parseJson("{\"a\": 1, \"a\": 2}"); + Assert.fail("Expected VariantDuplicateKeyException with duplicate keys"); + } catch (IOException e) { + Assert.fail("Expected VariantDuplicateKeyException with duplicate keys"); + } catch (VariantDuplicateKeyException e) { + // Expected + } + + // allow duplicate keys + try { + Variant v = VariantBuilder.parseJson("{\"a\": 1, \"a\": 2}", new VariantBuilder(true)); + Assert.assertEquals(1, v.numObjectElements()); + Assert.assertEquals(VariantUtil.Type.BYTE, v.getFieldByKey("a").getType()); + Assert.assertEquals(2, v.getFieldByKey("a").getLong()); + } catch (Exception e) { + Assert.fail("Unexpected exception: " + e); + } + } +} diff --git a/pom.xml b/pom.xml index 2496171867..5f49bf1764 100644 --- a/pom.xml +++ b/pom.xml @@ -165,6 +165,7 @@ parquet-protobuf parquet-thrift parquet-hadoop-bundle + parquet-variant