diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 7d146d924667..db74601ffaeb 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -35,6 +35,7 @@ import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.expressions.StrictMetricsEvaluator; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -455,36 +456,40 @@ private boolean manifestHasDeletedFiles( boolean isDelete = reader.isDeleteManifestReader(); - for (ManifestEntry entry : reader.liveEntries()) { - F file = entry.file(); - boolean markedForDelete = - deletePaths.contains(file.location()) - || deleteFiles.contains(file) - || dropPartitions.contains(file.specId(), file.partition()) - || (isDelete - && entry.isLive() - && entry.dataSequenceNumber() > 0 - && entry.dataSequenceNumber() < minSequenceNumber) - || (isDelete && isDanglingDV((DeleteFile) file)); - - if (markedForDelete || evaluator.rowsMightMatch(file)) { - boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file); - ValidationException.check( - allRowsMatch - || isDelete, // ignore delete files where some records may not match the expression - "Cannot delete file where some, but not all, rows match filter %s: %s", - this.deleteExpression, - file.location()); - - if (allRowsMatch) { - if (failAnyDelete) { - throw new DeleteException(reader.spec().partitionToPath(file.partition())); + try (CloseableIterable> liveEntries = reader.liveEntries()) { + for (ManifestEntry entry : liveEntries) { + F file = entry.file(); + boolean markedForDelete = + deletePaths.contains(file.location()) + || deleteFiles.contains(file) + || dropPartitions.contains(file.specId(), file.partition()) + || (isDelete + && entry.isLive() + && entry.dataSequenceNumber() > 0 + && entry.dataSequenceNumber() < minSequenceNumber) + || (isDelete && isDanglingDV((DeleteFile) file)); + + if (markedForDelete || evaluator.rowsMightMatch(file)) { + boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file); + ValidationException.check( + allRowsMatch || isDelete, // ignore delete files where some records may not match the + // expression + "Cannot delete file where some, but not all, rows match filter %s: %s", + this.deleteExpression, + file.location()); + + if (allRowsMatch) { + if (failAnyDelete) { + throw new DeleteException(reader.spec().partitionToPath(file.partition())); + } + + // as soon as a deleted file is detected, stop scanning + return true; } - - // as soon as a deleted file is detected, stop scanning - return true; } } + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest entries: %s", manifest); } return false; @@ -504,58 +509,58 @@ private ManifestFile filterManifestWithDeletedFiles( try { ManifestWriter writer = newManifestWriter(reader.spec()); - try { - reader - .liveEntries() - .forEach( - entry -> { - F file = entry.file(); - boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile) file); - boolean markedForDelete = - isDanglingDV - || deletePaths.contains(file.location()) - || deleteFiles.contains(file) - || dropPartitions.contains(file.specId(), file.partition()) - || (isDelete - && entry.isLive() - && entry.dataSequenceNumber() > 0 - && entry.dataSequenceNumber() < minSequenceNumber); - if (markedForDelete || evaluator.rowsMightMatch(file)) { - boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file); - ValidationException.check( - allRowsMatch - || isDelete, // ignore delete files where some records may not match - // the expression - "Cannot delete file where some, but not all, rows match filter %s: %s", - this.deleteExpression, + try (CloseableIterable> liveEntries = reader.liveEntries()) { + liveEntries.forEach( + entry -> { + F file = entry.file(); + boolean isDanglingDV = isDelete && isDanglingDV((DeleteFile) file); + boolean markedForDelete = + isDanglingDV + || deletePaths.contains(file.location()) + || deleteFiles.contains(file) + || dropPartitions.contains(file.specId(), file.partition()) + || (isDelete + && entry.isLive() + && entry.dataSequenceNumber() > 0 + && entry.dataSequenceNumber() < minSequenceNumber); + if (markedForDelete || evaluator.rowsMightMatch(file)) { + boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file); + ValidationException.check( + allRowsMatch + || isDelete, // ignore delete files where some records may not match + // the expression + "Cannot delete file where some, but not all, rows match filter %s: %s", + this.deleteExpression, + file.location()); + + if (allRowsMatch) { + writer.delete(entry); + F fileCopy = file.copyWithoutStats(); + // add the file here in case it was deleted using an expression. The + // DeleteManifestFilterManager will then remove its matching DV + deleteFiles.add(fileCopy); + + if (deletedFiles.contains(file)) { + LOG.warn( + "Deleting a duplicate path from manifest {}: {}", + manifest.path(), file.location()); - - if (allRowsMatch) { - writer.delete(entry); - F fileCopy = file.copyWithoutStats(); - // add the file here in case it was deleted using an expression. The - // DeleteManifestFilterManager will then remove its matching DV - deleteFiles.add(fileCopy); - - if (deletedFiles.contains(file)) { - LOG.warn( - "Deleting a duplicate path from manifest {}: {}", - manifest.path(), - file.location()); - duplicateDeleteCount += 1; - } else { - // only add the file to deletes if it is a new delete - // this keeps the snapshot summary accurate for non-duplicate data - deletedFiles.add(fileCopy); - } - } else { - writer.existing(entry); - } - + duplicateDeleteCount += 1; } else { - writer.existing(entry); + // only add the file to deletes if it is a new delete + // this keeps the snapshot summary accurate for non-duplicate data + deletedFiles.add(fileCopy); } - }); + } else { + writer.existing(entry); + } + + } else { + writer.existing(entry); + } + }); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to close manifest entries: %s", manifest); } finally { writer.close(); } diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java index 68e5fa8b560e..21aae21125f0 100644 --- a/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestDeleteFiles.java @@ -667,6 +667,20 @@ public void removingDataFileByPathAlsoRemovesDV() { statuses(ManifestEntry.Status.DELETED, ManifestEntry.Status.EXISTING)); } + @TestTemplate + public void testFilterManifestsWithLimitedIOPool() { + TestTables.TrackingFileIO io = new TestTables.TrackingFileIO(FILE_IO); + TestTables.TestTableOperations ops = + new TestTables.TestTableOperations("limited-test", tableDir, io); + Table testTable = + TestTables.create( + tableDir, "limited-test", SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, ops); + commit(testTable, testTable.newFastAppend().appendFile(FILE_A), branch); + Snapshot deleteSnap = commit(testTable, testTable.newDelete().deleteFile(FILE_A), branch); + assertThat(deleteSnap.summary()).containsEntry(SnapshotSummary.DELETED_FILES_PROP, "1"); + assertThat(io.getPeakCount()).isEqualTo(1); + } + private static ByteBuffer longToBuffer(long value) { return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value); } diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index 13c859f86065..d250235d81dd 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -21,7 +21,9 @@ import static org.apache.iceberg.TableMetadata.newTableMetadata; import java.io.File; +import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -31,6 +33,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -408,4 +411,106 @@ public void deleteFiles(Iterable pathsToDelete) throws BulkDeletionFailu throw new RuntimeException("Expected to mock this function"); } } + + /** A {@link FileIO} that enforces a limit on concurrent open input streams. */ + public static class TrackingFileIO implements FileIO { + + private final FileIO delegate; + private final AtomicInteger count = new AtomicInteger(0); + private final AtomicInteger peakCount = new AtomicInteger(0); + + TrackingFileIO(FileIO delegate) { + this.delegate = delegate; + } + + @Override + public InputFile newInputFile(String path) { + return new TrackingInputFile(delegate.newInputFile(path), count, peakCount); + } + + @Override + public OutputFile newOutputFile(String path) { + return delegate.newOutputFile(path); + } + + @Override + public void deleteFile(String path) { + delegate.deleteFile(path); + } + + public int getPeakCount() { + return peakCount.get(); + } + } + + private static class TrackingInputFile implements InputFile { + + private final InputFile delegate; + private final AtomicInteger count; + private final AtomicInteger peakCount; + + TrackingInputFile(InputFile delegate, AtomicInteger count, AtomicInteger peakCount) { + this.delegate = delegate; + this.count = count; + this.peakCount = peakCount; + } + + @Override + public long getLength() { + return delegate.getLength(); + } + + @Override + public SeekableInputStream newStream() { + peakCount.accumulateAndGet(count.incrementAndGet(), Math::max); + return new TrackingSeekableInputStream(delegate.newStream(), count); + } + + @Override + public String location() { + return delegate.location(); + } + + @Override + public boolean exists() { + return delegate.exists(); + } + } + + private static class TrackingSeekableInputStream extends SeekableInputStream { + + private final SeekableInputStream delegate; + private final AtomicInteger count; + + TrackingSeekableInputStream(SeekableInputStream delegate, AtomicInteger count) { + this.delegate = delegate; + this.count = count; + } + + @Override + public long getPos() throws IOException { + return delegate.getPos(); + } + + @Override + public void seek(long newPos) throws IOException { + delegate.seek(newPos); + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return delegate.read(b, off, len); + } + + @Override + public void close() throws IOException { + delegate.close(); + count.decrementAndGet(); + } + } }