diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java new file mode 100644 index 000000000000..d70e12be7817 --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowFormatModels.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.arrow.vectorized; + +import org.apache.arrow.vector.NullCheckingForGet; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.parquet.ParquetFormatModel; + +/** Static holder that registers the Arrow vectorized Parquet format model with {@link FormatModelRegistry}; this class is listed by name in FormatModelRegistry.CLASSES_TO_REGISTER. */ public class ArrowFormatModels { + /** Registers a Parquet format model for the {@link ColumnarBatch} class. The reader is built by ArrowReader.VectorizedCombinedScanIterator.buildReader from the expected schema and the file schema; the engineSchema and idToConstant lambda arguments are not used here. */ public static void register() { + FormatModelRegistry.register( + ParquetFormatModel.create( + ColumnarBatch.class, + Object.class, + (schema, fileSchema, engineSchema, idToConstant) -> + ArrowReader.VectorizedCombinedScanIterator.buildReader( + schema, + fileSchema, + NullCheckingForGet.NULL_CHECKING_ENABLED /* setArrowValidityVector */))); + } + + private ArrowFormatModels() {} /* only static members; not meant to be instantiated */ +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java index 06b7baec27d5..68a27bdfb8eb 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java @@ -29,7 +29,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.arrow.vector.NullCheckingForGet; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.iceberg.CombinedScanTask; @@ -40,13 +39,14 @@ import org.apache.iceberg.encryption.EncryptedFiles; import org.apache.iceberg.encryption.EncryptedInputFile; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.formats.ReadBuilder; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.mapping.NameMappingParser; -import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.TypeWithSchemaVisitor; import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -189,8 +189,7 @@ public void close() throws IOException { * Reads the data file and returns an iterator of {@link VectorSchemaRoot}. Only Parquet data file * format is supported. */ - private static final class VectorizedCombinedScanIterator - implements CloseableIterator { + static final class VectorizedCombinedScanIterator implements CloseableIterator { private final Iterator fileItr; private final Map inputFiles; @@ -324,19 +323,8 @@ CloseableIterator open(FileScanTask task) { InputFile location = getInputFile(task); Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); if (task.file().format() == FileFormat.PARQUET) { - Parquet.ReadBuilder builder = - Parquet.read(location) - .project(expectedSchema) - .split(task.start(), task.length()) - .createBatchedReaderFunc( - fileSchema -> - buildReader( - expectedSchema, - fileSchema, /* setArrowValidityVector */ - NullCheckingForGet.NULL_CHECKING_ENABLED)) - .recordsPerBatch(batchSize) - .filter(task.residual()) - .caseSensitive(caseSensitive); + ReadBuilder builder = + FormatModelRegistry.readBuilder(FileFormat.PARQUET, ColumnarBatch.class, location); if (reuseContainers) { builder.reuseContainers(); @@ -345,7 +333,14 @@ CloseableIterator open(FileScanTask task) { builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); } - iter = builder.build(); + iter = + builder + .project(expectedSchema) + .split(task.start(), task.length()) + .recordsPerBatch(batchSize) + .caseSensitive(caseSensitive) + .filter(task.residual()) + .build(); } else { throw new UnsupportedOperationException( "Format: " + task.file().format() + " not supported for batched reads"); @@ -376,7 +371,7 @@ private InputFile getInputFile(FileScanTask task) { * @param fileSchema Schema of the data file. 
* @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors. */ - private static ArrowBatchReader buildReader( + static ArrowBatchReader buildReader( Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) { return (ArrowBatchReader) TypeWithSchemaVisitor.visit( diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index b9adafdbc2c9..e86dd9f97aa1 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -55,7 +55,9 @@ private FormatModelRegistry() {} private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class); // The list of classes which are used for registering the reader and writer builders private static final List CLASSES_TO_REGISTER = - ImmutableList.of("org.apache.iceberg.data.GenericFormatModels"); + ImmutableList.of( + "org.apache.iceberg.data.GenericFormatModels", + "org.apache.iceberg.arrow.vectorized.ArrowFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS =