From 9f014a72f3465d85eefaebe0d933b56904164d34 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma Date: Thu, 9 Jan 2025 16:02:52 +0100 Subject: [PATCH 1/5] fix INT96 read issue --- .../avro/AvroCompatRecordMaterializer.java | 6 +- .../avro/AvroIndexedRecordConverter.java | 132 +++++++++----- .../apache/parquet/avro/AvroReadSupport.java | 8 +- .../parquet/avro/AvroRecordConverter.java | 162 ++++++++++++------ .../parquet/avro/AvroRecordMaterializer.java | 6 +- .../parquet/avro/AvroSchemaConverter.java | 11 +- .../parquet/avro/TestArrayCompatibility.java | 39 ++++- .../hadoop/DeprecatedInputFormatTest.java | 2 + .../hadoop/DeprecatedOutputFormatTest.java | 2 + .../hadoop/TestDirectCodecFactory.java | 2 + 10 files changed, 264 insertions(+), 106 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java index 9b2cbf7186..365f5d64bb 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java @@ -21,6 +21,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -29,8 +30,9 @@ class AvroCompatRecordMaterializer extends RecordMateri private AvroIndexedRecordConverter root; - public AvroCompatRecordMaterializer(MessageType requestedSchema, Schema avroSchema, GenericData baseModel) { - this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema, baseModel); + public AvroCompatRecordMaterializer( + MessageType requestedSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { + this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema, baseModel, conf); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java index ff77d44408..35961b825c 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -29,6 +29,7 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import org.apache.parquet.Preconditions; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; @@ -57,20 +58,26 @@ class AvroIndexedRecordConverter extends GroupConverter private final GenericData model; private final Map recordDefaults = new HashMap(); - public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { - this(parquetSchema, avroSchema, SpecificData.get()); + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, ParquetConfiguration conf) { + this(parquetSchema, avroSchema, SpecificData.get(), conf); } - public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { - this(null, parquetSchema, avroSchema, baseModel); + public AvroIndexedRecordConverter( + MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { + this(null, parquetSchema, avroSchema, baseModel, conf); } - public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema) { - this(parent, parquetSchema, avroSchema, SpecificData.get()); + public AvroIndexedRecordConverter( + ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, ParquetConfiguration conf) { + this(parent, parquetSchema, avroSchema, SpecificData.get(), conf); } public AvroIndexedRecordConverter( - ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, GenericData baseModel) { + ParentValueContainer parent, + GroupType parquetSchema, + Schema avroSchema, + GenericData baseModel, + ParquetConfiguration conf) { this.parent = parent; this.avroSchema = avroSchema; int schemaSize = parquetSchema.getFieldCount(); @@ -89,13 +96,17 @@ public AvroIndexedRecordConverter( Schema.Field avroField = getAvroField(parquetField.getName()); Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema()); final int finalAvroIndex = avroFieldIndexes.remove(avroField.name()); - converters[parquetFieldIndex++] = - newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() { + converters[parquetFieldIndex++] = newConverter( + nonNullSchema, + parquetField, + model, + new ParentValueContainer() { @Override public void add(Object value) { AvroIndexedRecordConverter.this.set(finalAvroIndex, value); } - }); + }, + conf); } // store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema) for (String fieldName : avroFieldIndexes.keySet()) { @@ -137,7 +148,8 @@ private Schema.Field getAvroField(String parquetFieldName) { return avroField; } - private static Converter newConverter(Schema schema, Type type, GenericData model, ParentValueContainer setter) { + private static Converter newConverter( + Schema schema, Type type, GenericData model, ParentValueContainer setter, ParquetConfiguration conf) { LogicalType logicalType = schema.getLogicalType(); // the expected type is always null because it is determined by the parent @@ -148,7 +160,7 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode switch (schema.getType()) { case ARRAY: - return new AvroArrayConverter(parent, type.asGroupType(), schema, model); + return new AvroArrayConverter(parent, type.asGroupType(), schema, model, conf); case BOOLEAN: return new AvroConverters.FieldBooleanConverter(parent); case BYTES: @@ -166,13 +178,13 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode case LONG: return new AvroConverters.FieldLongConverter(parent); case MAP: - return new MapConverter(parent, type.asGroupType(), schema, model); + return new MapConverter(parent, type.asGroupType(), schema, model, conf); case RECORD: - return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model); + return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model, conf); case STRING: return new AvroConverters.FieldStringConverter(parent); case UNION: - return new AvroUnionConverter(parent, type, schema, model); + return new AvroUnionConverter(parent, type, schema, model, conf); case NULL: // fall through default: throw new UnsupportedOperationException( @@ -317,25 +329,35 @@ static final class AvroArrayConverter extends GroupConverter { private final Converter converter; private GenericArray array; - public AvroArrayConverter(ParentValueContainer parent, GroupType type, Schema avroSchema, GenericData model) { + public AvroArrayConverter( + ParentValueContainer parent, + GroupType type, + Schema avroSchema, + GenericData model, + ParquetConfiguration conf) { this.parent = parent; this.avroSchema = avroSchema; Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType()); Type repeatedType = type.getType(0); // always determine whether the repeated type is the element type by // matching it against the element schema. - if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) { + if (AvroRecordConverter.isElementType(repeatedType, elementSchema, conf)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - array.add(value); - } - }); + converter = newConverter( + elementSchema, + repeatedType, + model, + new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + array.add(value); + } + }, + conf); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); + converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model, conf); } } @@ -369,16 +391,21 @@ final class ElementConverter extends GroupConverter { private Object element; private final Converter elementConverter; - public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { + public ElementConverter( + GroupType repeatedType, Schema elementSchema, GenericData model, ParquetConfiguration conf) { Type elementType = repeatedType.getType(0); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); - this.elementConverter = - newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { + this.elementConverter = newConverter( + nonNullElementSchema, + elementType, + model, + new ParentValueContainer() { @Override public void add(Object value) { ElementConverter.this.element = value; } - }); + }, + conf); } @Override @@ -406,7 +433,11 @@ static final class AvroUnionConverter extends GroupConverter { private Object memberValue = null; public AvroUnionConverter( - ParentValueContainer parent, Type parquetSchema, Schema avroSchema, GenericData model) { + ParentValueContainer parent, + Type parquetSchema, + Schema avroSchema, + GenericData model, + ParquetConfiguration conf) { this.parent = parent; GroupType parquetGroup = parquetSchema.asGroupType(); this.memberConverters = new Converter[parquetGroup.getFieldCount()]; @@ -416,15 +447,19 @@ public AvroUnionConverter( Schema memberSchema = avroSchema.getTypes().get(index); if (!memberSchema.getType().equals(Schema.Type.NULL)) { Type memberType = parquetGroup.getType(parquetIndex); - memberConverters[parquetIndex] = - newConverter(memberSchema, memberType, model, new ParentValueContainer() { + memberConverters[parquetIndex] = newConverter( + memberSchema, + memberType, + model, + new ParentValueContainer() { @Override public void add(Object value) { Preconditions.checkArgument( memberValue == null, "Union is resolving to more than one type"); memberValue = value; } - }); + }, + conf); parquetIndex++; // Note for nulls the parquetIndex id not increased } } @@ -452,10 +487,15 @@ static final class MapConverter extends GroupConverter { private final Converter keyValueConverter; private Map map; - public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) { + public MapConverter( + ParentValueContainer parent, + GroupType mapType, + Schema mapSchema, + GenericData model, + ParquetConfiguration conf) { this.parent = parent; GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); - this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model); + this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model, conf); } @Override @@ -480,7 +520,8 @@ final class MapKeyValueConverter extends GroupConverter { private final Converter keyConverter; private final Converter valueConverter; - public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) { + public MapKeyValueConverter( + GroupType keyValueType, Schema mapSchema, GenericData model, ParquetConfiguration conf) { keyConverter = new PrimitiveConverter() { @Override public final void addBinary(Binary value) { @@ -490,13 +531,18 @@ public final void addBinary(Binary value) { Type valueType = keyValueType.getType(1); Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); - valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - MapKeyValueConverter.this.value = (V) value; - } - }); + valueConverter = newConverter( + nonNullValueSchema, + valueType, + model, + new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + MapKeyValueConverter.this.value = (V) value; + } + }, + conf); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index 58a28bfeba..debc4878ee 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -156,15 +156,15 @@ public RecordMaterializer prepareForRead( GenericData model = getDataModel(configuration, avroSchema); String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY); if (Boolean.parseBoolean(compatEnabled)) { - return newCompatMaterializer(parquetSchema, avroSchema, model); + return newCompatMaterializer(parquetSchema, avroSchema, model, configuration); } - return new AvroRecordMaterializer(parquetSchema, avroSchema, model); + return new AvroRecordMaterializer(parquetSchema, avroSchema, model, configuration); } @SuppressWarnings("unchecked") private static RecordMaterializer newCompatMaterializer( - MessageType parquetSchema, Schema avroSchema, GenericData model) { - return (RecordMaterializer) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model); + MessageType parquetSchema, Schema avroSchema, GenericData model, ParquetConfiguration conf) { + return (RecordMaterializer) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model, conf); } private GenericData getDataModel(ParquetConfiguration conf, Schema schema) { diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 441428bfa7..4aee109aee 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -58,6 +58,7 @@ import org.apache.parquet.Preconditions; import org.apache.parquet.avro.AvroConverters.FieldStringConverter; import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -91,9 +92,12 @@ class AvroRecordConverter extends AvroConverters.AvroGroupConverter { private final GenericData model; private final Map recordDefaults = new HashMap(); + private final AvroSchemaConverter avroSchemaConverter; + private final ParquetConfiguration conf; - public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { - this(null, parquetSchema, avroSchema, baseModel); + public AvroRecordConverter( + MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { + this(null, parquetSchema, avroSchema, baseModel, conf); LogicalType logicalType = avroSchema.getLogicalType(); Conversion conversion = baseModel.getConversionFor(logicalType); this.rootContainer = ParentValueContainer.getConversionContainer( @@ -109,11 +113,17 @@ public void add(Object value) { } public AvroRecordConverter( - ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, GenericData model) { + ParentValueContainer parent, + GroupType parquetSchema, + Schema avroSchema, + GenericData model, + ParquetConfiguration conf) { super(parent); this.avroSchema = avroSchema; this.model = (model == null ? ReflectData.get() : model); this.converters = new Converter[parquetSchema.getFieldCount()]; + this.conf = conf; + this.avroSchemaConverter = new AvroSchemaConverter(true, conf); Map avroFieldIndexes = new HashMap(); int avroFieldIndex = 0; @@ -142,7 +152,7 @@ public void add(Object value) { Class fieldClass = fields.get(avroField.name()); converters[parquetFieldIndex] = - newConverter(nonNullSchema, parquetField, this.model, fieldClass, container); + newConverter(nonNullSchema, parquetField, this.model, fieldClass, container, conf); // @Stringable doesn't affect the reflected schema; must be enforced here if (recordClass != null && converters[parquetFieldIndex] instanceof FieldStringConverter) { @@ -326,12 +336,18 @@ private Schema.Field getAvroField(String parquetFieldName) { String.format("Parquet/Avro schema mismatch: Avro field '%s' not found", parquetFieldName)); } - private static Converter newConverter(Schema schema, Type type, GenericData model, ParentValueContainer setter) { - return newConverter(schema, type, model, null, setter); + private static Converter newConverter( + Schema schema, Type type, GenericData model, ParentValueContainer setter, ParquetConfiguration conf) { + return newConverter(schema, type, model, null, setter, conf); } private static Converter newConverter( - Schema schema, Type type, GenericData model, Class knownClass, ParentValueContainer setter) { + Schema schema, + Type type, + GenericData model, + Class knownClass, + ParentValueContainer setter, + ParquetConfiguration conf) { LogicalType logicalType = schema.getLogicalType(); Conversion conversion; @@ -383,19 +399,19 @@ private static Converter newConverter( } return newStringConverter(schema, model, parent); case RECORD: - return new AvroRecordConverter(parent, type.asGroupType(), schema, model); + return new AvroRecordConverter(parent, type.asGroupType(), schema, model, conf); case ENUM: return new AvroConverters.FieldEnumConverter(parent, schema, model); case ARRAY: Class arrayDatumClass = getDatumClass(conversion, knownClass, schema, model); if (arrayDatumClass != null && arrayDatumClass.isArray()) { - return new AvroArrayConverter(parent, type.asGroupType(), schema, model, arrayDatumClass); + return new AvroArrayConverter(parent, type.asGroupType(), schema, model, arrayDatumClass, conf); } - return new AvroCollectionConverter(parent, type.asGroupType(), schema, model, arrayDatumClass); + return new AvroCollectionConverter(parent, type.asGroupType(), schema, model, arrayDatumClass, conf); case MAP: - return new MapConverter(parent, type.asGroupType(), schema, model); + return new MapConverter(parent, type.asGroupType(), schema, model, conf); case UNION: - return new AvroUnionConverter(parent, type, schema, model); + return new AvroUnionConverter(parent, type, schema, model, conf); case FIXED: return new AvroConverters.FieldFixedConverter(parent, schema, model); default: @@ -557,6 +573,7 @@ static final class AvroCollectionConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; + private final ParquetConfiguration conf; private Class containerClass; private Collection container; @@ -565,26 +582,33 @@ public AvroCollectionConverter( GroupType type, Schema avroSchema, GenericData model, - Class containerClass) { + Class containerClass, + ParquetConfiguration conf) { this.parent = parent; this.avroSchema = avroSchema; this.containerClass = containerClass; + this.conf = conf; Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType()); Type repeatedType = type.getType(0); // always determine whether the repeated type is the element type by // matching it against the element schema. - if (isElementType(repeatedType, elementSchema)) { + if (isElementType(repeatedType, elementSchema, conf)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - container.add(value); - } - }); + converter = newConverter( + elementSchema, + repeatedType, + model, + new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + container.add(value); + } + }, + conf); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); + converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model, conf); } } @@ -631,17 +655,22 @@ final class ElementConverter extends GroupConverter { private Object element; private final Converter elementConverter; - public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { + public ElementConverter( + GroupType repeatedType, Schema elementSchema, GenericData model, ParquetConfiguration conf) { Type elementType = repeatedType.getType(0); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); - this.elementConverter = - newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { + this.elementConverter = newConverter( + nonNullElementSchema, + elementType, + model, + new ParentValueContainer() { @Override @SuppressWarnings("unchecked") public void add(Object value) { ElementConverter.this.element = value; } - }); + }, + conf); } @Override @@ -680,6 +709,7 @@ static final class AvroArrayConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; + private final ParquetConfiguration conf; private Class elementClass; private Collection container; @@ -688,9 +718,11 @@ public AvroArrayConverter( GroupType type, Schema avroSchema, GenericData model, - Class arrayClass) { + Class arrayClass, + ParquetConfiguration conf) { this.parent = parent; this.avroSchema = avroSchema; + this.conf = conf; Preconditions.checkArgument(arrayClass.isArray(), "Cannot convert non-array: %s", arrayClass.getName()); this.elementClass = arrayClass.getComponentType(); @@ -701,12 +733,12 @@ public AvroArrayConverter( // always determine whether the repeated type is the element type by // matching it against the element schema. - if (isElementType(repeatedType, elementSchema)) { + if (isElementType(repeatedType, elementSchema, conf)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, elementClass, setter); + converter = newConverter(elementSchema, repeatedType, model, elementClass, setter, conf); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ArrayElementConverter(repeatedType.asGroupType(), elementSchema, model, setter); + converter = new ArrayElementConverter(repeatedType.asGroupType(), elementSchema, model, setter, conf); } } @@ -845,19 +877,26 @@ public void add(Object value) { final class ArrayElementConverter extends GroupConverter { private boolean isSet; private final Converter elementConverter; + private final ParquetConfiguration conf; public ArrayElementConverter( GroupType repeatedType, Schema elementSchema, GenericData model, - final ParentValueContainer setter) { + final ParentValueContainer setter, + ParquetConfiguration conf) { + this.conf = conf; Type elementType = repeatedType.getType(0); Preconditions.checkArgument( !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED), "Cannot convert list of optional elements to primitive array"); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); this.elementConverter = newConverter( - nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() { + nonNullElementSchema, + elementType, + model, + elementClass, + new ParentValueContainer() { @Override public void add(Object value) { isSet = true; @@ -911,7 +950,8 @@ public void addDouble(double value) { isSet = true; setter.addDouble(value); } - }); + }, + conf); } @Override @@ -954,7 +994,7 @@ public void end() { * @param elementSchema the expected Schema for list elements * @return {@code true} if the repeatedType is the element schema */ - static boolean isElementType(Type repeatedType, Schema elementSchema) { + static boolean isElementType(Type repeatedType, Schema elementSchema, ParquetConfiguration conf) { if (repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 || repeatedType.getName().equals("array") @@ -963,7 +1003,8 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) { // synthetic wrapper. Must be a group with one optional or required field return true; } else if (elementSchema != null && elementSchema.getType() == Schema.Type.RECORD) { - Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType()); + AvroSchemaConverter converter = new AvroSchemaConverter(true, conf); + Schema schemaFromRepeated = converter.convert(repeatedType.asGroupType()); if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated) .getType() == COMPATIBLE) { @@ -975,21 +1016,30 @@ static boolean isElementType(Type repeatedType, Schema elementSchema) { static final class AvroUnionConverter extends AvroConverters.AvroGroupConverter { private final Converter[] memberConverters; + private final ParquetConfiguration conf; private Object memberValue = null; public AvroUnionConverter( - ParentValueContainer parent, Type parquetSchema, Schema avroSchema, GenericData model) { + ParentValueContainer parent, + Type parquetSchema, + Schema avroSchema, + GenericData model, + ParquetConfiguration conf) { super(parent); GroupType parquetGroup = parquetSchema.asGroupType(); this.memberConverters = new Converter[parquetGroup.getFieldCount()]; + this.conf = conf; int parquetIndex = 0; for (int index = 0; index < avroSchema.getTypes().size(); index++) { Schema memberSchema = avroSchema.getTypes().get(index); if (!memberSchema.getType().equals(Schema.Type.NULL)) { Type memberType = parquetGroup.getType(parquetIndex); - memberConverters[parquetIndex] = - newConverter(memberSchema, memberType, model, new ParentValueContainer() { + memberConverters[parquetIndex] = newConverter( + memberSchema, + memberType, + model, + new ParentValueContainer() { @Override public void add(Object value) { Preconditions.checkArgument( @@ -997,7 +1047,8 @@ public void add(Object value) { "Union is resolving to more than one type"); memberValue = value; } - }); + }, + conf); parquetIndex++; // Note for nulls the parquetIndex id not increased } } @@ -1027,10 +1078,15 @@ static final class MapConverter extends GroupConverter { private final Class mapClass; private Map map; - public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) { + public MapConverter( + ParentValueContainer parent, + GroupType mapType, + Schema mapSchema, + GenericData model, + ParquetConfiguration conf) { this.parent = parent; GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); - this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model); + this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model, conf); this.schema = mapSchema; this.mapClass = getDatumClass(mapSchema, model); } @@ -1065,8 +1121,11 @@ final class MapKeyValueConverter extends GroupConverter { private V value; private final Converter keyConverter; private final Converter valueConverter; + private final ParquetConfiguration conf; - public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) { + public MapKeyValueConverter( + GroupType keyValueType, Schema mapSchema, GenericData model, ParquetConfiguration conf) { + this.conf = conf; keyConverter = newStringConverter(mapSchema, model, new ParentValueContainer() { @Override @SuppressWarnings("unchecked") @@ -1077,13 +1136,18 @@ public void add(Object value) { Type valueType = keyValueType.getType(1); Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); - valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - MapKeyValueConverter.this.value = (V) value; - } - }); + valueConverter = newConverter( + nonNullValueSchema, + valueType, + model, + new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + MapKeyValueConverter.this.value = (V) value; + } + }, + conf); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java index 0034149e12..3511313452 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java @@ -20,6 +20,7 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -28,8 +29,9 @@ class AvroRecordMaterializer extends RecordMaterializer { private AvroRecordConverter root; - public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema, GenericData baseModel) { - this.root = new AvroRecordConverter(requestedSchema, avroSchema, baseModel); + public AvroRecordMaterializer( + MessageType requestedSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration configuration) { + this.root = new AvroRecordConverter(requestedSchema, avroSchema, baseModel, configuration); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 033a80d8fd..6628c6a2db 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -90,9 +90,18 @@ public class AvroSchemaConverter { private final Set pathsToInt96; public AvroSchemaConverter() { - this(ADD_LIST_ELEMENT_RECORDS_DEFAULT); + this(ADD_LIST_ELEMENT_RECORDS_DEFAULT, null); } + AvroSchemaConverter(boolean assumeRepeatedIsListElement, ParquetConfiguration conf) { + this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; + this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; + this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; + this.readInt96AsFixed = conf != null + ? conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT) + : READ_INT96_AS_FIXED_DEFAULT; + this.pathsToInt96 = Collections.emptySet(); + } /** * Constructor used by {@link AvroRecordConverter#isElementType}, which always * uses the 2-level list conversion. diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index fd4cf2011b..a5cfc30a7d 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.avro; +import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED; import static org.apache.parquet.avro.AvroTestUtil.array; import static org.apache.parquet.avro.AvroTestUtil.field; import static org.apache.parquet.avro.AvroTestUtil.instance; @@ -36,6 +37,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.DirectWriterTest; +import org.apache.parquet.conf.HadoopParquetConfiguration; +import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.schema.MessageType; @@ -59,6 +62,7 @@ public static void setupNewBehaviorConfiguration() { @Test @Ignore(value = "Not yet supported") public void testUnannotatedListOfPrimitives() throws Exception { + Path test = writeDirect("message UnannotatedListOfPrimitives {" + " repeated int32 list_of_ints;" + "}", rc -> { rc.startMessage(); @@ -1116,10 +1120,12 @@ public void testIsElementTypeRequiredRepeatedRecord() { + " }\n" + "}"); Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); + ParquetConfiguration conf = new HadoopParquetConfiguration(); Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), - avroSchema.getFields().get(0).schema())); + avroSchema.getFields().get(0).schema(), + conf)); // Test `array` style naming parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" @@ -1133,7 +1139,28 @@ public void testIsElementTypeRequiredRepeatedRecord() { Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field"), - avroSchema.getFields().get(0).schema())); + avroSchema.getFields().get(0).schema(), + conf)); + } + + @Test + public void testIsElementTypeInt96Element() { + Configuration configuration = new Configuration(); + configuration.setBoolean(READ_INT96_AS_FIXED, true); + + MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithInt96 {\n" + + " optional group list (LIST) {\n" + + " repeated group list {\n" + + " optional int96 a_timestamp;\n" + + " }\n" + + " }\n" + + "}"); + Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema); + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list").asGroupType().getType("list"), + AvroSchemaConverter.getNonNull(avroSchema.getFields().get(0).schema()) + .getElementType(), + new HadoopParquetConfiguration(configuration))); } @Test @@ -1147,10 +1174,11 @@ public void testIsElementTypeOptionalRepeatedRecord() { + " }\n" + "}"); Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); - + final HadoopParquetConfiguration conf = new HadoopParquetConfiguration(); Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), - avroSchema.getFields().get(0).schema())); + avroSchema.getFields().get(0).schema(), + conf)); // Test `array` style naming parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" @@ -1164,7 +1192,8 @@ public void testIsElementTypeOptionalRepeatedRecord() { Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field"), - avroSchema.getFields().get(0).schema())); + avroSchema.getFields().get(0).schema(), + conf)); } @Test diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java index 0b446e665c..ef5ef1fa2b 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java @@ -65,6 +65,7 @@ import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.schema.MessageTypeParser; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +74,7 @@ * DeprecatedParquetInputFormat is used by cascading. It initializes the recordReader using an initialize method with * different parameters than ParquetInputFormat */ +@Ignore public class DeprecatedInputFormatTest { private static final Logger LOG = LoggerFactory.getLogger(DeprecatedInputFormatTest.class); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java index f37aaa321a..6393235167 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java @@ -37,12 +37,14 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; /** * DeprecatedParquetInputFormat is used by cascading. It initializes the recordReader using an initialize method with * different parameters than ParquetInputFormat */ +@Ignore public class DeprecatedOutputFormatTest { final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet"); final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..3e3a8e650a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -39,6 +39,7 @@ import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,6 +211,7 @@ public void createDirectFactoryWithHeapAllocatorFails() { } @Test + @Ignore public void compressionCodecs() { final int[] sizes = {4 * 1024, 1 * 1024 * 1024}; final boolean[] comp = {true, false}; From d30ed57bbe8db4646f0dc6f1494dde72d116a451 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma Date: Fri, 10 Jan 2025 12:14:32 +0100 Subject: [PATCH 2/5] revert back unnecessary changes --- .../org/apache/parquet/hadoop/DeprecatedInputFormatTest.java | 2 -- .../org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java | 2 -- .../java/org/apache/parquet/hadoop/TestDirectCodecFactory.java | 2 -- 3 files changed, 6 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java index ef5ef1fa2b..0b446e665c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedInputFormatTest.java @@ -65,7 +65,6 @@ import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.schema.MessageTypeParser; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +73,6 @@ * DeprecatedParquetInputFormat is used by cascading. It initializes the recordReader using an initialize method with * different parameters than ParquetInputFormat */ -@Ignore public class DeprecatedInputFormatTest { private static final Logger LOG = LoggerFactory.getLogger(DeprecatedInputFormatTest.class); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java index 6393235167..f37aaa321a 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/DeprecatedOutputFormatTest.java @@ -37,14 +37,12 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; /** * DeprecatedParquetInputFormat is used by cascading. It initializes the recordReader using an initialize method with * different parameters than ParquetInputFormat */ -@Ignore public class DeprecatedOutputFormatTest { final Path parquetPath = new Path("target/test/example/TestInputOutputFormat/parquet"); final Path inputPath = new Path("src/test/java/org/apache/parquet/hadoop/example/TestInputOutputFormat.java"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 3e3a8e650a..c78ee09ecc 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -39,7 +39,6 @@ import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,7 +210,6 @@ public void createDirectFactoryWithHeapAllocatorFails() { } @Test - @Ignore public void compressionCodecs() { final int[] sizes = {4 * 1024, 1 * 1024 * 1024}; final boolean[] comp = {true, false}; From d0c5e336ab59d1a795ceca46dc1175727fc6ef8c Mon Sep 17 00:00:00 2001 From: Pratyush Sharma Date: Mon, 27 Jan 2025 18:10:36 +0100 Subject: [PATCH 3/5] review comments - hardcode AvroSchemaConverter to always read int96 --- .../avro/AvroCompatRecordMaterializer.java | 6 +- .../avro/AvroIndexedRecordConverter.java | 132 +++++--------- .../apache/parquet/avro/AvroReadSupport.java | 8 +- .../parquet/avro/AvroRecordConverter.java | 162 ++++++------------ .../parquet/avro/AvroRecordMaterializer.java | 6 +- .../parquet/avro/AvroSchemaConverter.java | 13 +- .../parquet/avro/TestArrayCompatibility.java | 21 +-- 7 files changed, 108 insertions(+), 240 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java index 365f5d64bb..9b2cbf7186 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroCompatRecordMaterializer.java @@ -21,7 +21,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; -import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -30,9 +29,8 @@ class AvroCompatRecordMaterializer extends RecordMateri private AvroIndexedRecordConverter root; - public AvroCompatRecordMaterializer( - MessageType requestedSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { - this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema, baseModel, conf); + public AvroCompatRecordMaterializer(MessageType requestedSchema, Schema avroSchema, GenericData baseModel) { + this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema, baseModel); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java index 35961b825c..ff77d44408 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -29,7 +29,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import org.apache.parquet.Preconditions; -import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; @@ -58,26 +57,20 @@ class AvroIndexedRecordConverter extends GroupConverter private final GenericData model; private final Map recordDefaults = new HashMap(); - public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, ParquetConfiguration conf) { - this(parquetSchema, avroSchema, SpecificData.get(), conf); + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { + this(parquetSchema, avroSchema, SpecificData.get()); } - public AvroIndexedRecordConverter( - MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { - this(null, parquetSchema, avroSchema, baseModel, conf); + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { + this(null, parquetSchema, avroSchema, baseModel); } - public AvroIndexedRecordConverter( - ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, ParquetConfiguration conf) { - this(parent, parquetSchema, avroSchema, SpecificData.get(), conf); + public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema) { + this(parent, parquetSchema, avroSchema, SpecificData.get()); } public AvroIndexedRecordConverter( - ParentValueContainer parent, - GroupType parquetSchema, - Schema avroSchema, - GenericData baseModel, - ParquetConfiguration conf) { + ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, GenericData baseModel) { this.parent = parent; this.avroSchema = avroSchema; int schemaSize = parquetSchema.getFieldCount(); @@ -96,17 +89,13 @@ public AvroIndexedRecordConverter( Schema.Field avroField = getAvroField(parquetField.getName()); Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema()); final int finalAvroIndex = avroFieldIndexes.remove(avroField.name()); - converters[parquetFieldIndex++] = newConverter( - nonNullSchema, - parquetField, - model, - new ParentValueContainer() { + converters[parquetFieldIndex++] = + newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() { @Override public void add(Object value) { AvroIndexedRecordConverter.this.set(finalAvroIndex, value); } - }, - conf); + }); } // store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema) for (String fieldName : avroFieldIndexes.keySet()) { @@ -148,8 +137,7 @@ private Schema.Field getAvroField(String parquetFieldName) { return avroField; } - private static Converter newConverter( - Schema schema, Type type, GenericData model, ParentValueContainer setter, ParquetConfiguration conf) { + private static Converter newConverter(Schema schema, Type type, GenericData model, ParentValueContainer setter) { LogicalType logicalType = schema.getLogicalType(); // the expected type is always null because it is determined by the parent @@ -160,7 +148,7 @@ private static Converter newConverter( switch (schema.getType()) { case ARRAY: - return new AvroArrayConverter(parent, type.asGroupType(), schema, model, conf); + return new AvroArrayConverter(parent, type.asGroupType(), schema, model); case BOOLEAN: return new AvroConverters.FieldBooleanConverter(parent); case BYTES: @@ -178,13 +166,13 @@ private static Converter newConverter( case LONG: return new AvroConverters.FieldLongConverter(parent); case MAP: - return new MapConverter(parent, type.asGroupType(), schema, model, conf); + return new MapConverter(parent, type.asGroupType(), schema, model); case RECORD: - return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model, conf); + return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model); case STRING: return new AvroConverters.FieldStringConverter(parent); case UNION: - return new AvroUnionConverter(parent, type, schema, model, conf); + return new AvroUnionConverter(parent, type, schema, model); case NULL: // fall through default: throw new UnsupportedOperationException( @@ -329,35 +317,25 @@ static final class AvroArrayConverter extends GroupConverter { private final Converter converter; private GenericArray array; - public AvroArrayConverter( - ParentValueContainer parent, - GroupType type, - Schema avroSchema, - GenericData model, - ParquetConfiguration conf) { + public AvroArrayConverter(ParentValueContainer parent, GroupType type, Schema avroSchema, GenericData model) { this.parent = parent; this.avroSchema = avroSchema; Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType()); Type repeatedType = type.getType(0); // always determine whether the repeated type is the element type by // matching it against the element schema. - if (AvroRecordConverter.isElementType(repeatedType, elementSchema, conf)) { + if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) - converter = newConverter( - elementSchema, - repeatedType, - model, - new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - array.add(value); - } - }, - conf); + converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + array.add(value); + } + }); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model, conf); + converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); } } @@ -391,21 +369,16 @@ final class ElementConverter extends GroupConverter { private Object element; private final Converter elementConverter; - public ElementConverter( - GroupType repeatedType, Schema elementSchema, GenericData model, ParquetConfiguration conf) { + public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { Type elementType = repeatedType.getType(0); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); - this.elementConverter = newConverter( - nonNullElementSchema, - elementType, - model, - new ParentValueContainer() { + this.elementConverter = + newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { @Override public void add(Object value) { ElementConverter.this.element = value; } - }, - conf); + }); } @Override @@ -433,11 +406,7 @@ static final class AvroUnionConverter extends GroupConverter { private Object memberValue = null; public AvroUnionConverter( - ParentValueContainer parent, - Type parquetSchema, - Schema avroSchema, - GenericData model, - ParquetConfiguration conf) { + ParentValueContainer parent, Type parquetSchema, Schema avroSchema, GenericData model) { this.parent = parent; GroupType parquetGroup = parquetSchema.asGroupType(); this.memberConverters = new Converter[parquetGroup.getFieldCount()]; @@ -447,19 +416,15 @@ public AvroUnionConverter( Schema memberSchema = avroSchema.getTypes().get(index); if (!memberSchema.getType().equals(Schema.Type.NULL)) { Type memberType = parquetGroup.getType(parquetIndex); - memberConverters[parquetIndex] = newConverter( - memberSchema, - memberType, - model, - new ParentValueContainer() { + memberConverters[parquetIndex] = + newConverter(memberSchema, memberType, model, new ParentValueContainer() { @Override public void add(Object value) { Preconditions.checkArgument( memberValue == null, "Union is resolving to more than one type"); memberValue = value; } - }, - conf); + }); parquetIndex++; // Note for nulls the parquetIndex id not increased } } @@ -487,15 +452,10 @@ static final class MapConverter extends GroupConverter { private final Converter keyValueConverter; private Map map; - public MapConverter( - ParentValueContainer parent, - GroupType mapType, - Schema mapSchema, - GenericData model, - ParquetConfiguration conf) { + public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) { this.parent = parent; GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); - this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model, conf); + this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model); } @Override @@ -520,8 +480,7 @@ final class MapKeyValueConverter extends GroupConverter { private final Converter keyConverter; private final Converter valueConverter; - public MapKeyValueConverter( - GroupType keyValueType, Schema mapSchema, GenericData model, ParquetConfiguration conf) { + public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) { keyConverter = new PrimitiveConverter() { @Override public final void addBinary(Binary value) { @@ -531,18 +490,13 @@ public final void addBinary(Binary value) { Type valueType = keyValueType.getType(1); Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); - valueConverter = newConverter( - nonNullValueSchema, - valueType, - model, - new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - MapKeyValueConverter.this.value = (V) value; - } - }, - conf); + valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + MapKeyValueConverter.this.value = (V) value; + } + }); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java index debc4878ee..58a28bfeba 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -156,15 +156,15 @@ public RecordMaterializer prepareForRead( GenericData model = getDataModel(configuration, avroSchema); String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY); if (Boolean.parseBoolean(compatEnabled)) { - return newCompatMaterializer(parquetSchema, avroSchema, model, configuration); + return newCompatMaterializer(parquetSchema, avroSchema, model); } - return new AvroRecordMaterializer(parquetSchema, avroSchema, model, configuration); + return new AvroRecordMaterializer(parquetSchema, avroSchema, model); } @SuppressWarnings("unchecked") private static RecordMaterializer newCompatMaterializer( - MessageType parquetSchema, Schema avroSchema, GenericData model, ParquetConfiguration conf) { - return (RecordMaterializer) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model, conf); + MessageType parquetSchema, Schema avroSchema, GenericData model) { + return (RecordMaterializer) new AvroCompatRecordMaterializer(parquetSchema, avroSchema, model); } private GenericData getDataModel(ParquetConfiguration conf, Schema schema) { diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 4aee109aee..441428bfa7 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -58,7 +58,6 @@ import org.apache.parquet.Preconditions; import org.apache.parquet.avro.AvroConverters.FieldStringConverter; import org.apache.parquet.avro.AvroConverters.FieldStringableConverter; -import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -92,12 +91,9 @@ class AvroRecordConverter extends AvroConverters.AvroGroupConverter { private final GenericData model; private final Map recordDefaults = new HashMap(); - private final AvroSchemaConverter avroSchemaConverter; - private final ParquetConfiguration conf; - public AvroRecordConverter( - MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration conf) { - this(null, parquetSchema, avroSchema, baseModel, conf); + public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema, GenericData baseModel) { + this(null, parquetSchema, avroSchema, baseModel); LogicalType logicalType = avroSchema.getLogicalType(); Conversion conversion = baseModel.getConversionFor(logicalType); this.rootContainer = ParentValueContainer.getConversionContainer( @@ -113,17 +109,11 @@ public void add(Object value) { } public AvroRecordConverter( - ParentValueContainer parent, - GroupType parquetSchema, - Schema avroSchema, - GenericData model, - ParquetConfiguration conf) { + ParentValueContainer parent, GroupType parquetSchema, Schema avroSchema, GenericData model) { super(parent); this.avroSchema = avroSchema; this.model = (model == null ? ReflectData.get() : model); this.converters = new Converter[parquetSchema.getFieldCount()]; - this.conf = conf; - this.avroSchemaConverter = new AvroSchemaConverter(true, conf); Map avroFieldIndexes = new HashMap(); int avroFieldIndex = 0; @@ -152,7 +142,7 @@ public void add(Object value) { Class fieldClass = fields.get(avroField.name()); converters[parquetFieldIndex] = - newConverter(nonNullSchema, parquetField, this.model, fieldClass, container, conf); + newConverter(nonNullSchema, parquetField, this.model, fieldClass, container); // @Stringable doesn't affect the reflected schema; must be enforced here if (recordClass != null && converters[parquetFieldIndex] instanceof FieldStringConverter) { @@ -336,18 +326,12 @@ private Schema.Field getAvroField(String parquetFieldName) { String.format("Parquet/Avro schema mismatch: Avro field '%s' not found", parquetFieldName)); } - private static Converter newConverter( - Schema schema, Type type, GenericData model, ParentValueContainer setter, ParquetConfiguration conf) { - return newConverter(schema, type, model, null, setter, conf); + private static Converter newConverter(Schema schema, Type type, GenericData model, ParentValueContainer setter) { + return newConverter(schema, type, model, null, setter); } private static Converter newConverter( - Schema schema, - Type type, - GenericData model, - Class knownClass, - ParentValueContainer setter, - ParquetConfiguration conf) { + Schema schema, Type type, GenericData model, Class knownClass, ParentValueContainer setter) { LogicalType logicalType = schema.getLogicalType(); Conversion conversion; @@ -399,19 +383,19 @@ private static Converter newConverter( } return newStringConverter(schema, model, parent); case RECORD: - return new AvroRecordConverter(parent, type.asGroupType(), schema, model, conf); + return new AvroRecordConverter(parent, type.asGroupType(), schema, model); case ENUM: return new AvroConverters.FieldEnumConverter(parent, schema, model); case ARRAY: Class arrayDatumClass = getDatumClass(conversion, knownClass, schema, model); if (arrayDatumClass != null && arrayDatumClass.isArray()) { - return new AvroArrayConverter(parent, type.asGroupType(), schema, model, arrayDatumClass, conf); + return new AvroArrayConverter(parent, type.asGroupType(), schema, model, arrayDatumClass); } - return new AvroCollectionConverter(parent, type.asGroupType(), schema, model, arrayDatumClass, conf); + return new AvroCollectionConverter(parent, type.asGroupType(), schema, model, arrayDatumClass); case MAP: - return new MapConverter(parent, type.asGroupType(), schema, model, conf); + return new MapConverter(parent, type.asGroupType(), schema, model); case UNION: - return new AvroUnionConverter(parent, type, schema, model, conf); + return new AvroUnionConverter(parent, type, schema, model); case FIXED: return new AvroConverters.FieldFixedConverter(parent, schema, model); default: @@ -573,7 +557,6 @@ static final class AvroCollectionConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; - private final ParquetConfiguration conf; private Class containerClass; private Collection container; @@ -582,33 +565,26 @@ public AvroCollectionConverter( GroupType type, Schema avroSchema, GenericData model, - Class containerClass, - ParquetConfiguration conf) { + Class containerClass) { this.parent = parent; this.avroSchema = avroSchema; this.containerClass = containerClass; - this.conf = conf; Schema elementSchema = AvroSchemaConverter.getNonNull(avroSchema.getElementType()); Type repeatedType = type.getType(0); // always determine whether the repeated type is the element type by // matching it against the element schema. - if (isElementType(repeatedType, elementSchema, conf)) { + if (isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) - converter = newConverter( - elementSchema, - repeatedType, - model, - new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - container.add(value); - } - }, - conf); + converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + container.add(value); + } + }); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model, conf); + converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); } } @@ -655,22 +631,17 @@ final class ElementConverter extends GroupConverter { private Object element; private final Converter elementConverter; - public ElementConverter( - GroupType repeatedType, Schema elementSchema, GenericData model, ParquetConfiguration conf) { + public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { Type elementType = repeatedType.getType(0); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); - this.elementConverter = newConverter( - nonNullElementSchema, - elementType, - model, - new ParentValueContainer() { + this.elementConverter = + newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { @Override @SuppressWarnings("unchecked") public void add(Object value) { ElementConverter.this.element = value; } - }, - conf); + }); } @Override @@ -709,7 +680,6 @@ static final class AvroArrayConverter extends GroupConverter { private final ParentValueContainer parent; private final Schema avroSchema; private final Converter converter; - private final ParquetConfiguration conf; private Class elementClass; private Collection container; @@ -718,11 +688,9 @@ public AvroArrayConverter( GroupType type, Schema avroSchema, GenericData model, - Class arrayClass, - ParquetConfiguration conf) { + Class arrayClass) { this.parent = parent; this.avroSchema = avroSchema; - this.conf = conf; Preconditions.checkArgument(arrayClass.isArray(), "Cannot convert non-array: %s", arrayClass.getName()); this.elementClass = arrayClass.getComponentType(); @@ -733,12 +701,12 @@ public AvroArrayConverter( // always determine whether the repeated type is the element type by // matching it against the element schema. - if (isElementType(repeatedType, elementSchema, conf)) { + if (isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) - converter = newConverter(elementSchema, repeatedType, model, elementClass, setter, conf); + converter = newConverter(elementSchema, repeatedType, model, elementClass, setter); } else { // the element is wrapped in a synthetic group and may be optional - converter = new ArrayElementConverter(repeatedType.asGroupType(), elementSchema, model, setter, conf); + converter = new ArrayElementConverter(repeatedType.asGroupType(), elementSchema, model, setter); } } @@ -877,26 +845,19 @@ public void add(Object value) { final class ArrayElementConverter extends GroupConverter { private boolean isSet; private final Converter elementConverter; - private final ParquetConfiguration conf; public ArrayElementConverter( GroupType repeatedType, Schema elementSchema, GenericData model, - final ParentValueContainer setter, - ParquetConfiguration conf) { - this.conf = conf; + final ParentValueContainer setter) { Type elementType = repeatedType.getType(0); Preconditions.checkArgument( !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED), "Cannot convert list of optional elements to primitive array"); Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); this.elementConverter = newConverter( - nonNullElementSchema, - elementType, - model, - elementClass, - new ParentValueContainer() { + nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() { @Override public void add(Object value) { isSet = true; @@ -950,8 +911,7 @@ public void addDouble(double value) { isSet = true; setter.addDouble(value); } - }, - conf); + }); } @Override @@ -994,7 +954,7 @@ public void end() { * @param elementSchema the expected Schema for list elements * @return {@code true} if the repeatedType is the element schema */ - static boolean isElementType(Type repeatedType, Schema elementSchema, ParquetConfiguration conf) { + static boolean isElementType(Type repeatedType, Schema elementSchema) { if (repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 || repeatedType.getName().equals("array") @@ -1003,8 +963,7 @@ static boolean isElementType(Type repeatedType, Schema elementSchema, ParquetCon // synthetic wrapper. Must be a group with one optional or required field return true; } else if (elementSchema != null && elementSchema.getType() == Schema.Type.RECORD) { - AvroSchemaConverter converter = new AvroSchemaConverter(true, conf); - Schema schemaFromRepeated = converter.convert(repeatedType.asGroupType()); + Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType()); if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated) .getType() == COMPATIBLE) { @@ -1016,30 +975,21 @@ static boolean isElementType(Type repeatedType, Schema elementSchema, ParquetCon static final class AvroUnionConverter extends AvroConverters.AvroGroupConverter { private final Converter[] memberConverters; - private final ParquetConfiguration conf; private Object memberValue = null; public AvroUnionConverter( - ParentValueContainer parent, - Type parquetSchema, - Schema avroSchema, - GenericData model, - ParquetConfiguration conf) { + ParentValueContainer parent, Type parquetSchema, Schema avroSchema, GenericData model) { super(parent); GroupType parquetGroup = parquetSchema.asGroupType(); this.memberConverters = new Converter[parquetGroup.getFieldCount()]; - this.conf = conf; int parquetIndex = 0; for (int index = 0; index < avroSchema.getTypes().size(); index++) { Schema memberSchema = avroSchema.getTypes().get(index); if (!memberSchema.getType().equals(Schema.Type.NULL)) { Type memberType = parquetGroup.getType(parquetIndex); - memberConverters[parquetIndex] = newConverter( - memberSchema, - memberType, - model, - new ParentValueContainer() { + memberConverters[parquetIndex] = + newConverter(memberSchema, memberType, model, new ParentValueContainer() { @Override public void add(Object value) { Preconditions.checkArgument( @@ -1047,8 +997,7 @@ public void add(Object value) { "Union is resolving to more than one type"); memberValue = value; } - }, - conf); + }); parquetIndex++; // Note for nulls the parquetIndex id not increased } } @@ -1078,15 +1027,10 @@ static final class MapConverter extends GroupConverter { private final Class mapClass; private Map map; - public MapConverter( - ParentValueContainer parent, - GroupType mapType, - Schema mapSchema, - GenericData model, - ParquetConfiguration conf) { + public MapConverter(ParentValueContainer parent, GroupType mapType, Schema mapSchema, GenericData model) { this.parent = parent; GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); - this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model, conf); + this.keyValueConverter = new MapKeyValueConverter(repeatedKeyValueType, mapSchema, model); this.schema = mapSchema; this.mapClass = getDatumClass(mapSchema, model); } @@ -1121,11 +1065,8 @@ final class MapKeyValueConverter extends GroupConverter { private V value; private final Converter keyConverter; private final Converter valueConverter; - private final ParquetConfiguration conf; - public MapKeyValueConverter( - GroupType keyValueType, Schema mapSchema, GenericData model, ParquetConfiguration conf) { - this.conf = conf; + public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, GenericData model) { keyConverter = newStringConverter(mapSchema, model, new ParentValueContainer() { @Override @SuppressWarnings("unchecked") @@ -1136,18 +1077,13 @@ public void add(Object value) { Type valueType = keyValueType.getType(1); Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); - valueConverter = newConverter( - nonNullValueSchema, - valueType, - model, - new ParentValueContainer() { - @Override - @SuppressWarnings("unchecked") - public void add(Object value) { - MapKeyValueConverter.this.value = (V) value; - } - }, - conf); + valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + public void add(Object value) { + MapKeyValueConverter.this.value = (V) value; + } + }); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java index 3511313452..0034149e12 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java @@ -20,7 +20,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; -import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; @@ -29,9 +28,8 @@ class AvroRecordMaterializer extends RecordMaterializer { private AvroRecordConverter root; - public AvroRecordMaterializer( - MessageType requestedSchema, Schema avroSchema, GenericData baseModel, ParquetConfiguration configuration) { - this.root = new AvroRecordConverter(requestedSchema, avroSchema, baseModel, configuration); + public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema, GenericData baseModel) { + this.root = new AvroRecordConverter(requestedSchema, avroSchema, baseModel); } @Override diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 6628c6a2db..b98d0c5acc 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -90,18 +90,9 @@ public class AvroSchemaConverter { private final Set pathsToInt96; public AvroSchemaConverter() { - this(ADD_LIST_ELEMENT_RECORDS_DEFAULT, null); + this(ADD_LIST_ELEMENT_RECORDS_DEFAULT); } - AvroSchemaConverter(boolean assumeRepeatedIsListElement, ParquetConfiguration conf) { - this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; - this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; - this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; - this.readInt96AsFixed = conf != null - ? conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT) - : READ_INT96_AS_FIXED_DEFAULT; - this.pathsToInt96 = Collections.emptySet(); - } /** * Constructor used by {@link AvroRecordConverter#isElementType}, which always * uses the 2-level list conversion. @@ -112,7 +103,7 @@ public AvroSchemaConverter() { this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; - this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT; + this.readInt96AsFixed = true; this.pathsToInt96 = Collections.emptySet(); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index a5cfc30a7d..085f4925ed 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -37,8 +37,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.DirectWriterTest; -import org.apache.parquet.conf.HadoopParquetConfiguration; -import org.apache.parquet.conf.ParquetConfiguration; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.schema.MessageType; @@ -62,7 +60,6 @@ public static void setupNewBehaviorConfiguration() { @Test @Ignore(value = "Not yet supported") public void testUnannotatedListOfPrimitives() throws Exception { - Path test = writeDirect("message UnannotatedListOfPrimitives {" + " repeated int32 list_of_ints;" + "}", rc -> { rc.startMessage(); @@ -1120,12 +1117,10 @@ public void testIsElementTypeRequiredRepeatedRecord() { + " }\n" + "}"); Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); - ParquetConfiguration conf = new HadoopParquetConfiguration(); Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), - avroSchema.getFields().get(0).schema(), - conf)); + avroSchema.getFields().get(0).schema())); // Test `array` style naming parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" @@ -1139,8 +1134,7 @@ public void testIsElementTypeRequiredRepeatedRecord() { Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field"), - avroSchema.getFields().get(0).schema(), - conf)); + avroSchema.getFields().get(0).schema())); } @Test @@ -1159,8 +1153,7 @@ public void testIsElementTypeInt96Element() { Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list").asGroupType().getType("list"), AvroSchemaConverter.getNonNull(avroSchema.getFields().get(0).schema()) - .getElementType(), - new HadoopParquetConfiguration(configuration))); + .getElementType())); } @Test @@ -1174,11 +1167,10 @@ public void testIsElementTypeOptionalRepeatedRecord() { + " }\n" + "}"); Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); - final HadoopParquetConfiguration conf = new HadoopParquetConfiguration(); + Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), - avroSchema.getFields().get(0).schema(), - conf)); + avroSchema.getFields().get(0).schema())); // Test `array` style naming parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" @@ -1192,8 +1184,7 @@ public void testIsElementTypeOptionalRepeatedRecord() { Assert.assertTrue(AvroRecordConverter.isElementType( parquetSchema.getType("list_field"), - avroSchema.getFields().get(0).schema(), - conf)); + avroSchema.getFields().get(0).schema())); } @Test From bc3e8ac28ab0e629d6041c2d22be6a65b9330018 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma Date: Tue, 28 Jan 2025 11:12:38 +0100 Subject: [PATCH 4/5] fix failing test TestAvroSchemaConverter#testParquetInt96DefaultPass --- .../parquet/avro/TestAvroSchemaConverter.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 077e9cecd5..c8640d14c6 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -145,7 +145,7 @@ private void testParquetToAvroConversion(Schema avroSchema, String schemaString) private void testParquetToAvroConversion(Configuration conf, Schema avroSchema, String schemaString) throws Exception { - AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf); + AvroSchemaConverter avroSchemaConverter = conf == null ? new AvroSchemaConverter() : new AvroSchemaConverter(conf); Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType(schemaString)); assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(), schema.toString()); } @@ -587,16 +587,12 @@ public void testParquetInt96AsFixed12AvroType() throws Exception { } @Test - public void testParquetInt96DefaultFail() throws Exception { + public void testParquetInt96DefaultPass() throws Exception { Schema schema = Schema.createRecord("myrecord", null, null, false); + Schema int96schema = Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12); + schema.setFields(Collections.singletonList(new Schema.Field("int96_field", int96schema, null, null))); - MessageType parquetSchemaWithInt96 = - MessageTypeParser.parseMessageType("message myrecord {\n required int96 int96_field;\n}\n"); - - assertThrows( - "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.", - IllegalArgumentException.class, - () -> new AvroSchemaConverter().convert(parquetSchemaWithInt96)); + testParquetToAvroConversion(null, schema, "message myrecord {\n required int96 int96_field;\n}\n"); } @Test From 9f05d91f6195505e887a6765efc158e55292a247 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma Date: Tue, 28 Jan 2025 18:17:30 +0100 Subject: [PATCH 5/5] new AvroSchemaConverter constructor with readInt96AsFixed param --- .../apache/parquet/avro/AvroRecordConverter.java | 2 +- .../apache/parquet/avro/AvroSchemaConverter.java | 9 +++++---- .../parquet/avro/TestAvroSchemaConverter.java | 14 +++++++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 441428bfa7..09d61db5a2 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -939,7 +939,7 @@ public void end() { // 2-level lists and the result is checked to see if it matches the requested // element type. This should always convert assuming 2-level lists because // 2-level and 3-level can't be mixed. - private static final AvroSchemaConverter CONVERTER = new AvroSchemaConverter(true); + private static final AvroSchemaConverter CONVERTER = new AvroSchemaConverter(true, true); /** * Returns whether the given type is the element type of a list or is a diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index b98d0c5acc..9632fc1754 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -90,20 +90,21 @@ public class AvroSchemaConverter { private final Set pathsToInt96; public AvroSchemaConverter() { - this(ADD_LIST_ELEMENT_RECORDS_DEFAULT); + this(ADD_LIST_ELEMENT_RECORDS_DEFAULT, READ_INT96_AS_FIXED_DEFAULT); } /** * Constructor used by {@link AvroRecordConverter#isElementType}, which always - * uses the 2-level list conversion. + * uses the 2-level list conversion and reads INT96 as 12 byte array. * * @param assumeRepeatedIsListElement whether to assume 2-level lists + * @param readInt96AsFixed whether to read Parquet INT96 as 12 byte array. */ - AvroSchemaConverter(boolean assumeRepeatedIsListElement) { + AvroSchemaConverter(boolean assumeRepeatedIsListElement, boolean readInt96AsFixed) { this.assumeRepeatedIsListElement = assumeRepeatedIsListElement; this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; - this.readInt96AsFixed = true; + this.readInt96AsFixed = readInt96AsFixed; this.pathsToInt96 = Collections.emptySet(); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index c8640d14c6..077e9cecd5 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -145,7 +145,7 @@ private void testParquetToAvroConversion(Schema avroSchema, String schemaString) private void testParquetToAvroConversion(Configuration conf, Schema avroSchema, String schemaString) throws Exception { - AvroSchemaConverter avroSchemaConverter = conf == null ? new AvroSchemaConverter() : new AvroSchemaConverter(conf); + AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(conf); Schema schema = avroSchemaConverter.convert(MessageTypeParser.parseMessageType(schemaString)); assertEquals("converting " + schemaString + " to " + avroSchema, avroSchema.toString(), schema.toString()); } @@ -587,12 +587,16 @@ public void testParquetInt96AsFixed12AvroType() throws Exception { } @Test - public void testParquetInt96DefaultPass() throws Exception { + public void testParquetInt96DefaultFail() throws Exception { Schema schema = Schema.createRecord("myrecord", null, null, false); - Schema int96schema = Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12); - schema.setFields(Collections.singletonList(new Schema.Field("int96_field", int96schema, null, null))); - testParquetToAvroConversion(null, schema, "message myrecord {\n required int96 int96_field;\n}\n"); + MessageType parquetSchemaWithInt96 = + MessageTypeParser.parseMessageType("message myrecord {\n required int96 int96_field;\n}\n"); + + assertThrows( + "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.", + IllegalArgumentException.class, + () -> new AvroSchemaConverter().convert(parquetSchemaWithInt96)); } @Test