53 changes: 5 additions & 48 deletions data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -30,24 +30,18 @@
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.deletes.PositionDeleteIndexUtil;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.orc.OrcRowReader;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -58,8 +52,6 @@
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.orc.TypeDescription;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -229,44 +221,9 @@ private CloseableIterable<Record> openDeletes(
LOG.trace("Opening delete file {}", deleteFile.location());
InputFile inputFile = loadInputFile.apply(deleteFile);

switch (format) {
case AVRO:
return Avro.read(inputFile)
.project(projection)
.reuseContainers()
.createResolvingReader(PlannedDataReader::create)
.build();

case PARQUET:
return Parquet.read(inputFile)
.project(projection)
.filter(filter)
.reuseContainers()
.createReaderFunc(newParquetReaderFunc(projection))
.build();

case ORC:
// reusing containers is automatic for ORC, no need to call 'reuseContainers'
return ORC.read(inputFile)
.project(projection)
.filter(filter)
.createReaderFunc(newOrcReaderFunc(projection))
.build();

default:
throw new UnsupportedOperationException(
String.format(
"Cannot read deletes, %s is not a supported file format: %s",
format.name(), inputFile.location()));
}
}

private Function<MessageType, ParquetValueReader<?>> newParquetReaderFunc(Schema projection) {
return fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema);
}

private Function<TypeDescription, OrcRowReader<?>> newOrcReaderFunc(Schema projection) {
return fileSchema -> GenericOrcReader.buildReader(projection, fileSchema);
ReadBuilder<Record, ?> builder =
FormatModelRegistry.readBuilder(format, Record.class, inputFile);
return builder.project(projection).reuseContainers().filter(filter).build();
}

private <I, O> Iterable<O> execute(Iterable<I> objects, Function<I, O> func) {
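For context, the same pattern replaces the per-format switch in each file touched by this change: resolve a ReadBuilder from FormatModelRegistry for the file's format and the desired in-memory type, then configure projection, filtering, and container reuse on the unified builder. The sketch below only uses calls visible in this diff (readBuilder, project, reuseContainers, filter, build); the helper method and its name are illustrative, not part of the patch.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;

class FormatModelReadExample {
  // Hypothetical helper: opens Avro, Parquet, or ORC data the same way through the
  // registry instead of branching on the file format.
  static CloseableIterable<Record> openRecords(
      FileFormat format, InputFile inputFile, Schema projection, Expression filter) {
    ReadBuilder<Record, ?> builder =
        FormatModelRegistry.readBuilder(format, Record.class, inputFile);
    return builder
        .project(projection) // columns to read
        .reuseContainers() // reuse Record containers between rows
        .filter(filter) // row filter, pushed down where the format supports it
        .build();
  }
}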
74 changes: 14 additions & 60 deletions data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -22,26 +22,19 @@
import java.util.Map;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;

class GenericReader implements Serializable {
@@ -96,58 +89,19 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
Map<Integer, ?> partition =
PartitionUtil.constantsMap(task, IdentityPartitionConverters::convertConstant);

switch (task.file().format()) {
case AVRO:
Avro.ReadBuilder avro =
Avro.read(input)
.project(fileProjection)
.createResolvingReader(schema -> PlannedDataReader.create(schema, partition))
.split(task.start(), task.length());

if (reuseContainers) {
avro.reuseContainers();
}

return avro.build();

case PARQUET:
Parquet.ReadBuilder parquet =
Parquet.read(input)
.project(fileProjection)
.createReaderFunc(
fileSchema ->
GenericParquetReaders.buildReader(fileProjection, fileSchema, partition))
.split(task.start(), task.length())
.caseSensitive(caseSensitive)
.filter(task.residual());

if (reuseContainers) {
parquet.reuseContainers();
}

return parquet.build();

case ORC:
Schema projectionWithoutConstantAndMetadataFields =
TypeUtil.selectNot(
fileProjection, Sets.union(partition.keySet(), MetadataColumns.metadataFieldIds()));
ORC.ReadBuilder orc =
ORC.read(input)
.project(projectionWithoutConstantAndMetadataFields)
.createReaderFunc(
fileSchema ->
GenericOrcReader.buildReader(fileProjection, fileSchema, partition))
.split(task.start(), task.length())
.caseSensitive(caseSensitive)
.filter(task.residual());

return orc.build();

default:
throw new UnsupportedOperationException(
String.format(
"Cannot read %s file: %s", task.file().format().name(), task.file().location()));
ReadBuilder<Record, ?> builder =
FormatModelRegistry.readBuilder(task.file().format(), Record.class, input);
if (reuseContainers) {
builder = builder.reuseContainers();
}

return builder
.project(fileProjection)
.idToConstant(partition)
.split(task.start(), task.length())
.caseSensitive(caseSensitive)
.filter(task.residual())
.build();
}

private class CombinedTaskIterable extends CloseableGroup implements CloseableIterable<Record> {
135 changes: 23 additions & 112 deletions mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -21,13 +21,9 @@
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -39,8 +35,6 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SerializableTable;
@@ -49,19 +43,17 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.avro.PlannedDataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -70,13 +62,7 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SerializationUtil;
import org.apache.iceberg.util.ThreadPools;

@@ -320,29 +306,34 @@ public void close() throws IOException {
currentIterator.close();
}

@SuppressWarnings("unchecked")
private CloseableIterable<T> openTask(FileScanTask currentTask, Schema readSchema) {
DataFile file = currentTask.file();
InputFile inputFile =
encryptionManager.decrypt(
EncryptedFiles.encryptedInput(io.newInputFile(file.location()), file.keyMetadata()));

CloseableIterable<T> iterable;
switch (file.format()) {
case AVRO:
iterable = newAvroIterable(inputFile, currentTask, readSchema);
break;
case ORC:
iterable = newOrcIterable(inputFile, currentTask, readSchema);
break;
case PARQUET:
iterable = newParquetIterable(inputFile, currentTask, readSchema);
break;
default:
throw new UnsupportedOperationException(
String.format("Cannot read %s file: %s", file.format().name(), file.location()));
ReadBuilder<Record, ?> readBuilder =
FormatModelRegistry.readBuilder(file.format(), Record.class, inputFile);

if (reuseContainers) {
readBuilder = readBuilder.reuseContainers();
}

return iterable;
if (nameMapping != null) {
readBuilder = readBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

return applyResidualFiltering(
(CloseableIterable<T>)
readBuilder
.project(readSchema)
.split(currentTask.start(), currentTask.length())
.caseSensitive(caseSensitive)
.filter(currentTask.residual())
.build(),
currentTask.residual(),
readSchema);
}

@SuppressWarnings("unchecked")
@@ -369,86 +360,6 @@ private CloseableIterable<T> applyResidualFiltering(
}
}

private CloseableIterable<T> newAvroIterable(
InputFile inputFile, FileScanTask task, Schema readSchema) {
Avro.ReadBuilder avroReadBuilder =
Avro.read(inputFile).project(readSchema).split(task.start(), task.length());
if (reuseContainers) {
avroReadBuilder.reuseContainers();
}
if (nameMapping != null) {
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}

avroReadBuilder.createResolvingReader(
schema ->
PlannedDataReader.create(
schema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
}

private CloseableIterable<T> newParquetIterable(
InputFile inputFile, FileScanTask task, Schema readSchema) {
Parquet.ReadBuilder parquetReadBuilder =
Parquet.read(inputFile)
.project(readSchema)
.filter(task.residual())
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
if (reuseContainers) {
parquetReadBuilder.reuseContainers();
}
if (nameMapping != null) {
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
parquetReadBuilder.createReaderFunc(
fileSchema ->
GenericParquetReaders.buildReader(
readSchema,
fileSchema,
constantsMap(task, IdentityPartitionConverters::convertConstant)));
CloseableIterable<T> parquetIterator = parquetReadBuilder.build();
return applyResidualFiltering(parquetIterator, task.residual(), readSchema);
}

private CloseableIterable<T> newOrcIterable(
InputFile inputFile, FileScanTask task, Schema readSchema) {
Map<Integer, ?> idToConstant =
constantsMap(task, IdentityPartitionConverters::convertConstant);
Schema readSchemaWithoutConstantAndMetadataFields =
TypeUtil.selectNot(
readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));

// ORC does not support reuse containers yet
ORC.ReadBuilder orcReadBuilder =
ORC.read(inputFile)
.project(readSchemaWithoutConstantAndMetadataFields)
.filter(task.residual())
.caseSensitive(caseSensitive)
.split(task.start(), task.length());
orcReadBuilder.createReaderFunc(
fileSchema -> GenericOrcReader.buildReader(readSchema, fileSchema, idToConstant));

if (nameMapping != null) {
orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
CloseableIterable<T> orcIterator = orcReadBuilder.build();
return applyResidualFiltering(orcIterator, task.residual(), readSchema);
}

private Map<Integer, ?> constantsMap(
FileScanTask task, BiFunction<Type, Object, Object> converter) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
} else {
return Collections.emptyMap();
}
}

private static Schema readSchema(
Configuration conf, Schema tableSchema, boolean caseSensitive) {
Schema readSchema = InputFormatConfig.readSchema(conf);