diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 99f5c742d37c..8dbb9dd44b8e 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -30,24 +30,18 @@
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.orc.OrcRowReader;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.parquet.ParquetValueReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -58,8 +52,6 @@
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
-import org.apache.orc.TypeDescription;
-import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -229,44 +221,9 @@ private CloseableIterable<Record> openDeletes(
     LOG.trace("Opening delete file {}", deleteFile.location());
     InputFile inputFile = loadInputFile.apply(deleteFile);
 
-    switch (format) {
-      case AVRO:
-        return Avro.read(inputFile)
-            .project(projection)
-            .reuseContainers()
-            .createResolvingReader(PlannedDataReader::create)
-            .build();
-
-      case PARQUET:
-        return Parquet.read(inputFile)
-            .project(projection)
-            .filter(filter)
-            .reuseContainers()
-            .createReaderFunc(newParquetReaderFunc(projection))
-            .build();
-
-      case ORC:
-        // reusing containers is automatic for ORC, no need to call 'reuseContainers'
-        return ORC.read(inputFile)
-            .project(projection)
-            .filter(filter)
-            .createReaderFunc(newOrcReaderFunc(projection))
-            .build();
-
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Cannot read deletes, %s is not a supported file format: %s",
-                format.name(), inputFile.location()));
-    }
-  }
-
-  private Function<MessageType, ParquetValueReader<?>> newParquetReaderFunc(Schema projection) {
-    return fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema);
-  }
-
-  private Function<TypeDescription, OrcRowReader<?>> newOrcReaderFunc(Schema projection) {
-    return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema);
+    ReadBuilder builder =
+        FormatModelRegistry.readBuilder(format, Record.class, inputFile);
+    return builder.project(projection).reuseContainers().filter(filter).build();
   }
 
   private <I, O> Iterable<O> execute(Iterable<I> objects, Function<I, O> func) {
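The change above replaces BaseDeleteLoader's per-format switch and its two reader-function helpers with a single lookup in the new FormatModelRegistry. Below is a minimal sketch of the resulting call pattern, using only builder calls that appear in this patch; the class and method names are hypothetical, and since the patch does not show ReadBuilder's generic signature, the sketch uses the raw builder type and suppresses the unchecked conversion.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

class DeleteReadSketch {
  @SuppressWarnings("unchecked")
  static CloseableIterable<Record> readDeletes(
      FileFormat format, InputFile inputFile, Schema projection, Expression filter) {
    // One code path for Avro, Parquet, and ORC: the registry resolves the
    // format-specific builder behind the common ReadBuilder interface.
    ReadBuilder builder = FormatModelRegistry.readBuilder(format, Record.class, inputFile);
    return builder.project(projection).reuseContainers().filter(filter).build();
  }
}

One behavioral detail worth noting: filter() and reuseContainers() are now passed uniformly for every format, whereas the old code skipped filter() for Avro and relied on ORC reusing containers implicitly, so honoring or ignoring those hints becomes the registered implementation's responsibility.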
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index 9a1455f80fb0..f18f5785105f 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -22,26 +22,19 @@
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 class GenericReader implements Serializable {
@@ -96,58 +89,19 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
     Map<Integer, ?> partition =
         PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);
 
-    switch (task.file().format()) {
-      case AVRO:
-        Avro.ReadBuilder avro =
-            Avro.read(input)
-                .project(fileProjection)
-                .createResolvingReader(schema -> PlannedDataReader.create(schema, partition))
-                .split(task.start(), task.length());
-
-        if (reuseContainers) {
-          avro.reuseContainers();
-        }
-
-        return avro.build();
-
-      case PARQUET:
-        Parquet.ReadBuilder parquet =
-            Parquet.read(input)
-                .project(fileProjection)
-                .createReaderFunc(
-                    fileSchema ->
-                        GenericParquetReaders.buildReader(fileProjection, fileSchema, partition))
-                .split(task.start(), task.length())
-                .caseSensitive(caseSensitive)
-                .filter(task.residual());
-
-        if (reuseContainers) {
-          parquet.reuseContainers();
-        }
-
-        return parquet.build();
-
-      case ORC:
-        Schema projectionWithoutConstantAndMetadataFields =
-            TypeUtil.selectNot(
-                fileProjection, Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds()));
-        ORC.ReadBuilder orc =
-            ORC.read(input)
-                .project(projectionWithoutConstantAndMetadataFields)
-                .createReaderFunc(
-                    fileSchema ->
-                        GenericOrcReader.buildReader(fileProjection, fileSchema, partition))
-                .split(task.start(), task.length())
-                .caseSensitive(caseSensitive)
-                .filter(task.residual());
-
-        return orc.build();
-
-      default:
-        throw new UnsupportedOperationException(
-            String.format(
-                "Cannot read %s file: %s", task.file().format().name(), task.file().location()));
+    ReadBuilder builder =
+        FormatModelRegistry.readBuilder(task.file().format(), Record.class, input);
+    if (reuseContainers) {
+      builder = builder.reuseContainers();
     }
+
+    return builder
+        .project(fileProjection)
+        .idToConstant(partition)
+        .split(task.start(), task.length())
+        .caseSensitive(caseSensitive)
+        .filter(task.residual())
+        .build();
   }
 
   private class CombinedTaskIterable extends CloseableGroup implements CloseableIterable<Record> {
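Beyond the registry lookup, the GenericReader change drops the ORC-only projection rewrite (TypeUtil.selectNot over the constant and metadata field ids) and stops threading the partition-constants map through per-format reader-function closures: the map is handed to the builder once via idToConstant. A sketch of the unified task-read path follows, under the same assumptions as before (openTaskFile is a hypothetical helper; the raw builder type again stands in for the unshown generics).

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.util.PartitionUtil;

class TaskReadSketch {
  @SuppressWarnings("unchecked")
  static CloseableIterable<Record> openTaskFile(
      FileScanTask task, InputFile input, Schema fileProjection, boolean caseSensitive) {
    // Identity-partition constants are resolved once and passed as a map;
    // previously each format wired them through its own reader function.
    Map<Integer, ?> partition =
        PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);

    ReadBuilder builder =
        FormatModelRegistry.readBuilder(task.file().format(), Record.class, input);
    return builder
        .project(fileProjection)
        .idToConstant(partition)
        .split(task.start(), task.length())
        .caseSensitive(caseSensitive)
        .filter(task.residual())
        .build();
  }
}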
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 58966c666d5d..033b35985046 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -21,13 +21,9 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,8 +35,6 @@
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTableScan;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
@@ -49,19 +43,17 @@
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.data.GenericDeleteFilter;
-import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.avro.PlannedDataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
@@ -70,13 +62,7 @@
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.SerializationUtil;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -320,29 +306,34 @@ public void close() throws IOException {
       currentIterator.close();
     }
 
+    @SuppressWarnings("unchecked")
     private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
       DataFile file = currentTask.file();
       InputFile inputFile =
           encryptionManager.decrypt(
              EncryptedFiles.encryptedInput(io.newInputFile(file.location()), file.keyMetadata()));
 
-      CloseableIterable<T> iterable;
-      switch (file.format()) {
-        case AVRO:
-          iterable = newAvroIterable(inputFile, currentTask, readSchema);
-          break;
-        case ORC:
-          iterable = newOrcIterable(inputFile, currentTask, readSchema);
-          break;
-        case PARQUET:
-          iterable = newParquetIterable(inputFile, currentTask, readSchema);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              String.format("Cannot read %s file: %s", file.format().name(), file.location()));
+      ReadBuilder readBuilder =
+          FormatModelRegistry.readBuilder(file.format(), Record.class, inputFile);
+
+      if (reuseContainers) {
+        readBuilder = readBuilder.reuseContainers();
       }
 
-      return iterable;
+      if (nameMapping != null) {
+        readBuilder = readBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
+      }
+
+      return applyResidualFiltering(
+          (CloseableIterable<T>)
+              readBuilder
+                  .project(readSchema)
+                  .split(currentTask.start(), currentTask.length())
+                  .caseSensitive(caseSensitive)
+                  .filter(currentTask.residual())
+                  .build(),
+          currentTask.residual(),
+          readSchema);
     }
 
     @SuppressWarnings("unchecked")
@@ -369,86 +360,6 @@ private CloseableIterable<T> applyResidualFiltering(
       }
     }
 
-    private CloseableIterable<T> newAvroIterable(
-        InputFile inputFile, FileScanTask task, Schema readSchema) {
-      Avro.ReadBuilder avroReadBuilder =
-          Avro.read(inputFile).project(readSchema).split(task.start(), task.length());
-      if (reuseContainers) {
-        avroReadBuilder.reuseContainers();
-      }
-      if (nameMapping != null) {
-        avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      avroReadBuilder.createResolvingReader(
-          schema ->
-              PlannedDataReader.create(
-                  schema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
-      return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
-    }
-
-    private CloseableIterable<T> newParquetIterable(
-        InputFile inputFile, FileScanTask task, Schema readSchema) {
-      Parquet.ReadBuilder parquetReadBuilder =
-          Parquet.read(inputFile)
-              .project(readSchema)
-              .filter(task.residual())
-              .caseSensitive(caseSensitive)
-              .split(task.start(), task.length());
-      if (reuseContainers) {
-        parquetReadBuilder.reuseContainers();
-      }
-      if (nameMapping != null) {
-        parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-      parquetReadBuilder.createReaderFunc(
-          fileSchema ->
-              GenericParquetReaders.buildReader(
-                  readSchema,
-                  fileSchema,
-                  constantsMap(task, IdentityPartitionConverters::convertConstant)));
-      CloseableIterable<T> parquetIterator = parquetReadBuilder.build();
-      return applyResidualFiltering(parquetIterator, task.residual(), readSchema);
-    }
-
-    private CloseableIterable<T> newOrcIterable(
-        InputFile inputFile, FileScanTask task, Schema readSchema) {
-      Map<Integer, ?> idToConstant =
-          constantsMap(task, IdentityPartitionConverters::convertConstant);
-      Schema readSchemaWithoutConstantAndMetadataFields =
-          TypeUtil.selectNot(
-              readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-      // ORC does not support reuse containers yet
-      ORC.ReadBuilder orcReadBuilder =
-          ORC.read(inputFile)
-              .project(readSchemaWithoutConstantAndMetadataFields)
-              .filter(task.residual())
-              .caseSensitive(caseSensitive)
-              .split(task.start(), task.length());
-      orcReadBuilder.createReaderFunc(
-          fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant));
-
-      if (nameMapping != null) {
-        orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-      CloseableIterable<T> orcIterator = orcReadBuilder.build();
-      return applyResidualFiltering(orcIterator, task.residual(), readSchema);
-    }
-
-    private Map<Integer, ?> constantsMap(
-        FileScanTask task, BiFunction<Type, Object, Object> converter) {
-      PartitionSpec spec = task.spec();
-      Set<Integer> idColumns = spec.identitySourceIds();
-      Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
-      boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
-      if (projectsIdentityPartitionColumns) {
-        return PartitionUtil.constantsMap(task, converter);
-      } else {
-        return Collections.emptyMap();
-      }
-    }
-
     private static Schema readSchema(
         Configuration conf, Schema tableSchema, boolean caseSensitive) {
       Schema readSchema = InputFormatConfig.readSchema(conf);
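The IcebergInputFormat change follows the same pattern and additionally moves the optional settings onto the shared builder: container reuse, name mapping, split range, case sensitivity, and residual filter, which lets the three newXxxIterable helpers and the local constantsMap be deleted. Note that the patched code reassigns the builder (readBuilder = readBuilder.reuseContainers()) instead of mutating it in place as the old format-specific builders did, so conditional configuration looks like the sketch below (openWithOptions is a hypothetical helper under the same raw-type assumption).

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;

class MrReadSketch {
  @SuppressWarnings("unchecked")
  static CloseableIterable<Record> openWithOptions(
      FileFormat format,
      InputFile inputFile,
      Schema readSchema,
      String nameMappingJson,
      boolean reuseContainers) {
    ReadBuilder builder = FormatModelRegistry.readBuilder(format, Record.class, inputFile);

    if (reuseContainers) {
      // the builder call returns a builder, so the result must be kept
      builder = builder.reuseContainers();
    }

    if (nameMappingJson != null) {
      // name mapping assigns field ids to files written without them
      builder = builder.withNameMapping(NameMappingParser.fromJson(nameMappingJson));
    }

    return builder.project(readSchema).build();
  }
}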