Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

/** A base writer factory to be extended by query engine integrations. */
/**
* A base writer factory to be extended by query engine integrations.
*
* @deprecated since version 1.11.0, will be removed in 1.12.0. Use {@link
* RegistryBasedFileWriterFactory} instead.
*/
@Deprecated
public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T>, Serializable {
private final Table table;
private final FileFormat dataFileFormat;
Expand Down Expand Up @@ -75,13 +81,6 @@ protected BaseFileWriterFactory(
this.positionDeleteRowSchema = null;
}

/**
* @deprecated This constructor is deprecated as of version 1.11.0 and will be removed in 1.12.0.
* Position deletes that include row data are no longer supported. Use {@link
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Position deletes that include row data are no longer supported

Is this still true? I noticed it's missing in the new deprecation message.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the deprecation comment here because this method was introduced in the current release cycle, before we decided to deprecate the entire class. Now that the class itself is deprecated, and the replacement class doesn’t provide a way to set the position‑delete row‑data schema, the constructor is effectively deprecated as well. However, since there’s no new equivalent method, we don’t really have a suitable place to add a dedicated deprecation message.

* #BaseFileWriterFactory(Table, FileFormat, Schema, SortOrder, FileFormat, int[], Schema,
* SortOrder, Map)} instead.
*/
@Deprecated
protected BaseFileWriterFactory(
Table table,
FileFormat dataFileFormat,
Expand Down
157 changes: 143 additions & 14 deletions data/src/main/java/org/apache/iceberg/data/GenericFileWriterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,37 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.formats.FormatModelRegistry;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
public class GenericFileWriterFactory extends RegistryBasedFileWriterFactory<Record, Schema> {
private static final Logger LOG = LoggerFactory.getLogger(GenericFileWriterFactory.class);

private Table table;
private FileFormat format;
private Schema positionDeleteRowSchema;
private Map<String, String> writerProperties;

GenericFileWriterFactory(
Table table,
Expand All @@ -50,13 +66,16 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
ImmutableMap.of());
ImmutableMap.of(),
null,
null);
}

/**
Expand All @@ -80,14 +99,20 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema,
writerProperties);
writerProperties,
null,
null);
this.table = table;
this.format = dataFileFormat;
this.positionDeleteRowSchema = positionDeleteRowSchema;
this.writerProperties = writerProperties != null ? writerProperties : ImmutableMap.of();
}

/**
Expand All @@ -107,64 +132,168 @@ public class GenericFileWriterFactory extends BaseFileWriterFactory<Record> {
super(
table,
dataFileFormat,
Record.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema);
ImmutableMap.of(),
dataSchema,
equalityDeleteRowSchema);
this.table = table;
this.format = dataFileFormat;
this.positionDeleteRowSchema = positionDeleteRowSchema;
}

static Builder builderFor(Table table) {
return new Builder(table);
}

@Override
/**
* Sets {@code DataWriter::create} as the writer function of the given Avro data write builder.
*
* @param builder the Avro data write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureDataWrite(Avro.DataWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}

@Override
/**
* Sets {@code DataWriter::create} as the writer function of the given Avro delete write builder
* used for equality deletes.
*
* @param builder the Avro delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}

@Override
/**
* Sets {@code DataWriter::create} as the writer function of the given Avro delete write builder
* used for position deletes.
*
* @param builder the Avro delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
builder.createWriterFunc(DataWriter::create);
}

@Override
/**
* Sets {@code GenericParquetWriter::create} as the writer function of the given Parquet data
* write builder.
*
* @param builder the Parquet data write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}

@Override
/**
* Sets {@code GenericParquetWriter::create} as the writer function of the given Parquet delete
* write builder used for equality deletes.
*
* @param builder the Parquet delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}

@Override
/**
* Sets {@code GenericParquetWriter::create} as the writer function of the given Parquet delete
* write builder used for position deletes.
*
* @param builder the Parquet delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
}

@Override
/**
* Sets {@code GenericOrcWriter::buildWriter} as the writer function of the given ORC data write
* builder.
*
* @param builder the ORC data write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureDataWrite(ORC.DataWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}

@Override
/**
* Sets {@code GenericOrcWriter::buildWriter} as the writer function of the given ORC delete
* write builder used for equality deletes.
*
* @param builder the ORC delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}

@Override
/**
* Sets {@code GenericOrcWriter::buildWriter} as the writer function of the given ORC delete
* write builder used for position deletes.
*
* @param builder the ORC delete write builder to configure
* @deprecated Since 1.11.0, will be removed in 1.12.0. It won't be called starting in 1.11.0 as
* the configuration is done by the {@link FormatModelRegistry}.
*/
@Deprecated
protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
builder.createWriterFunc(GenericOrcWriter::buildWriter);
}

@Override
public PositionDeleteWriter<Record> newPositionDeleteWriter(
EncryptedOutputFile file, PartitionSpec spec, StructLike partition) {
if (positionDeleteRowSchema == null) {
return super.newPositionDeleteWriter(file, spec, partition);
} else {
LOG.warn(
"Deprecated feature used. Position delete row schema is used to create the position delete writer.");
Map<String, String> properties = table == null ? ImmutableMap.of() : table.properties();
MetricsConfig metricsConfig =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this part of the code actually mimic

public PositionDeleteWriter<T> newPositionDeleteWriter(
to maintain existing behavior? Because right now I think it's a copy of
public PositionDeleteWriter<Record> newPosDeleteWriter(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally misunderstood this comment.
Fixed now. Please check

table == null
? MetricsConfig.forPositionDelete()
: MetricsConfig.forPositionDelete(table);

try {
return switch (format) {
case AVRO ->
Avro.writeDeletes(file)
.setAll(properties)
.setAll(writerProperties)
.metricsConfig(metricsConfig)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
.overwrite()
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
case ORC ->
ORC.writeDeletes(file)
.setAll(properties)
.setAll(writerProperties)
.metricsConfig(metricsConfig)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
.overwrite()
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
case PARQUET ->
Parquet.writeDeletes(file)
.setAll(properties)
.setAll(writerProperties)
.metricsConfig(metricsConfig)
.createWriterFunc(GenericParquetWriter::create)
.withPartition(partition)
.overwrite()
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();
default ->
throw new UnsupportedOperationException(
"Cannot write pos-deletes for unsupported file format: " + format);
};
} catch (IOException e) {
throw new UncheckedIOException("Failed to create new position delete writer", e);
}
}
}

public static class Builder {
private final Table table;
private FileFormat dataFileFormat;
Expand Down
Loading