Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 81 additions & 76 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -455,36 +456,40 @@ private boolean manifestHasDeletedFiles(

boolean isDelete = reader.isDeleteManifestReader();

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.location())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber)
|| (isDelete && isDanglingDV((DeleteFile) file));

if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
ValidationException.check(
allRowsMatch
|| isDelete, // ignore delete files where some records may not match the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.location());

if (allRowsMatch) {
if (failAnyDelete) {
throw new DeleteException(reader.spec().partitionToPath(file.partition()));
try (CloseableIterable<ManifestEntry<F>> liveEntries = reader.liveEntries()) {
for (ManifestEntry<F> entry : liveEntries) {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.location())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber)
|| (isDelete && isDanglingDV((DeleteFile) file));

if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
ValidationException.check(
allRowsMatch || isDelete, // ignore delete files where some records may not match the
// expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.location());

if (allRowsMatch) {
if (failAnyDelete) {
throw new DeleteException(reader.spec().partitionToPath(file.partition()));
}

// as soon as a deleted file is detected, stop scanning
return true;
}

// as soon as a deleted file is detected, stop scanning
return true;
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest entries: %s", manifest);
}

return false;
Expand All @@ -504,58 +509,58 @@ private ManifestFile filterManifestWithDeletedFiles(

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
try {
reader
.liveEntries()
.forEach(
entry -> {
F file = entry.file();
boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile) file);
boolean markedForDelete =
isDanglingDV
|| deletePaths.contains(file.location())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber);
if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
ValidationException.check(
allRowsMatch
|| isDelete, // ignore delete files where some records may not match
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
try (CloseableIterable<ManifestEntry<F>> liveEntries = reader.liveEntries()) {
liveEntries.forEach(
entry -> {
F file = entry.file();
boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile) file);
boolean markedForDelete =
isDanglingDV
|| deletePaths.contains(file.location())
|| deleteFiles.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber);
if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
ValidationException.check(
allRowsMatch
|| isDelete, // ignore delete files where some records may not match
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.location());

if (allRowsMatch) {
writer.delete(entry);
F fileCopy = file.copyWithoutStats();
// add the file here in case it was deleted using an expression. The
// DeleteManifestFilterManager will then remove its matching DV
deleteFiles.add(fileCopy);

if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
file.location());

if (allRowsMatch) {
writer.delete(entry);
F fileCopy = file.copyWithoutStats();
// add the file here in case it was deleted using an expression. The
// DeleteManifestFilterManager will then remove its matching DV
deleteFiles.add(fileCopy);

if (deletedFiles.contains(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
file.location());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(fileCopy);
}
} else {
writer.existing(entry);
}

duplicateDeleteCount += 1;
} else {
writer.existing(entry);
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(fileCopy);
}
});
} else {
writer.existing(entry);
}

} else {
writer.existing(entry);
}
});
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest entries: %s", manifest);
} finally {
writer.close();
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestDeleteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,20 @@ public void removingDataFileByPathAlsoRemovesDV() {
statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING));
}

/**
 * Commits an append followed by a delete of the same data file on a table backed by a {@link
 * TestTables.TrackingFileIO}, then checks that the delete was recorded in the snapshot summary
 * and that the IO reported a peak of exactly one concurrently open input stream.
 */
@TestTemplate
public void testFilterManifestsWithLimitedIOPool() {
  // wrap the shared FileIO so every input stream opened through the table is counted
  TestTables.TrackingFileIO trackingIo = new TestTables.TrackingFileIO(FILE_IO);
  TestTables.TestTableOperations tableOps =
      new TestTables.TestTableOperations("limited-test", tableDir, trackingIo);
  Table table =
      TestTables.create(
          tableDir, "limited-test", SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, tableOps);

  // seed the table with a single data file, then delete that same file
  commit(table, table.newFastAppend().appendFile(FILE_A), branch);
  Snapshot deleteSnapshot = commit(table, table.newDelete().deleteFile(FILE_A), branch);

  // exactly one file must be reported as deleted ...
  assertThat(deleteSnapshot.summary()).containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1");
  // ... and no more than one input stream may have been open at any point in time
  assertThat(trackingIo.getPeakCount()).isEqualTo(1);
}

/**
 * Encodes a {@code long} as an 8-byte little-endian buffer.
 *
 * @param value the value to encode
 * @return a newly allocated little-endian buffer holding {@code value} at index 0; the buffer's
 *     position is left at 0 because an absolute put is used
 */
private static ByteBuffer longToBuffer(long value) {
  ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
  buffer.order(ByteOrder.LITTLE_ENDIAN);
  // absolute write: does not advance the buffer position
  buffer.putLong(0, value);
  return buffer;
}
Expand Down
105 changes: 105 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestTables.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.iceberg.TableMetadata.newTableMetadata;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand All @@ -31,6 +33,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -408,4 +411,106 @@ public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailu
throw new RuntimeException("Expected to mock this function");
}
}

/**
 * A {@link FileIO} wrapper that tracks how many input streams opened through it are open at the
 * same time.
 *
 * <p>Every {@link InputFile} handed out shares two counters with this instance: the number of
 * currently open streams and the highest value that number has reached. No limit is enforced;
 * the peak is only recorded so that tests can assert on it via {@link #getPeakCount()}.
 *
 * <p>Output files and deletes are passed straight through to the wrapped {@link FileIO}.
 */
public static class TrackingFileIO implements FileIO {

  private final FileIO wrapped;
  // number of input streams that are open right now
  private final AtomicInteger openStreams = new AtomicInteger(0);
  // largest value openStreams has reached so far
  private final AtomicInteger peakOpenStreams = new AtomicInteger(0);

  TrackingFileIO(FileIO delegate) {
    this.wrapped = delegate;
  }

  /** Returns the highest number of simultaneously open input streams observed so far. */
  public int getPeakCount() {
    return peakOpenStreams.get();
  }

  @Override
  public InputFile newInputFile(String path) {
    InputFile source = wrapped.newInputFile(path);
    return new TrackingInputFile(source, openStreams, peakOpenStreams);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return wrapped.newOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    wrapped.deleteFile(path);
  }
}

/**
 * An {@link InputFile} wrapper that counts the streams opened from it.
 *
 * <p>Each successfully opened stream increments the shared {@code count} and raises the shared
 * {@code peakCount} if a new maximum is reached. The returned stream is given the same {@code
 * count} so that it can decrement it again when it is closed. All other calls are forwarded to
 * the wrapped file unchanged.
 */
private static class TrackingInputFile implements InputFile {

  private final InputFile delegate;
  private final AtomicInteger count;
  private final AtomicInteger peakCount;

  /**
   * @param delegate the file that actually serves reads
   * @param count shared counter of currently open streams
   * @param peakCount shared high-water mark of {@code count}
   */
  TrackingInputFile(InputFile delegate, AtomicInteger count, AtomicInteger peakCount) {
    this.delegate = delegate;
    this.count = count;
    this.peakCount = peakCount;
  }

  @Override
  public long getLength() {
    return delegate.getLength();
  }

  /**
   * Opens a stream on the wrapped file and registers it with the shared counters.
   *
   * @return a stream that decrements the open-stream counter when closed
   */
  @Override
  public SeekableInputStream newStream() {
    // Open the underlying stream before touching the counters: if the open fails, nothing was
    // counted, so the open-stream count cannot be left permanently inflated.
    SeekableInputStream stream = delegate.newStream();
    peakCount.accumulateAndGet(count.incrementAndGet(), Math::max);
    return new TrackingSeekableInputStream(stream, count);
  }

  @Override
  public String location() {
    return delegate.location();
  }

  @Override
  public boolean exists() {
    return delegate.exists();
  }
}

/**
 * A {@link SeekableInputStream} wrapper that decrements a shared open-stream counter exactly once
 * when it is closed.
 *
 * <p>All positioning and read calls are forwarded to the wrapped stream unchanged.
 */
private static class TrackingSeekableInputStream extends SeekableInputStream {

  private final SeekableInputStream delegate;
  private final AtomicInteger count;
  // guards against decrementing the counter more than once when close() is called repeatedly
  private boolean closed;

  /**
   * @param delegate the stream that actually serves reads
   * @param count shared counter of currently open streams; decremented on first close
   */
  TrackingSeekableInputStream(SeekableInputStream delegate, AtomicInteger count) {
    this.delegate = delegate;
    this.count = count;
  }

  @Override
  public long getPos() throws IOException {
    return delegate.getPos();
  }

  @Override
  public void seek(long newPos) throws IOException {
    delegate.seek(newPos);
  }

  @Override
  public int read() throws IOException {
    return delegate.read();
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    return delegate.read(b, off, len);
  }

  /**
   * Closes the wrapped stream and releases this stream's slot in the shared counter.
   *
   * <p>Only the first call has an effect; later calls return immediately. The counter is
   * decremented even when closing the wrapped stream fails.
   *
   * @throws IOException if closing the wrapped stream fails
   */
  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }

    closed = true;
    try {
      delegate.close();
    } finally {
      // always release the slot, otherwise a failing close would skew every later peak reading
      count.decrementAndGet();
    }
  }
}
}
Loading