diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index d94b83e4c973..01d865a610e0 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -177,7 +177,6 @@ public void setupTableLocation() { private RewriteDataFilesSparkAction basicRewrite(Table table) { // Always compact regardless of input files - table.refresh(); return actions() .rewriteDataFiles(table) .option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1"); @@ -293,6 +292,7 @@ public void testBinPackAfterPartitionChange() { Table table = createTable(); writeRecords(20, SCALE, 20); + table.refresh(); shouldHaveFiles(table, 20); table.updateSpec().addField(Expressions.ref("c1")).commit(); @@ -366,7 +366,6 @@ public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception { assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); - table.refresh(); List newDataFiles = TestHelpers.dataFiles(table); assertThat(newDataFiles).isEmpty(); @@ -416,7 +415,6 @@ public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception { assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); - table.refresh(); List newDataFiles = TestHelpers.dataFiles(table); assertThat(newDataFiles).hasSize(1); @@ -466,7 +464,6 @@ public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception { assertThat(result.rewrittenDataFilesCount()).isEqualTo(0); - table.refresh(); List newDataFiles = TestHelpers.dataFiles(table); assertThat(newDataFiles).hasSameSizeAs(dataFiles); @@ -479,7 +476,6 @@ public void testBinPackWithV2PositionDeletes() throws IOException { assumeThat(formatVersion).isEqualTo(2); Table table = createTablePartitioned(4, 2); shouldHaveFiles(table, 8); - table.refresh(); List dataFiles = TestHelpers.dataFiles(table); int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum(); @@ -496,7 +492,6 @@ public void testBinPackWithV2PositionDeletes() throws IOException { } rowDelta.commit(); - table.refresh(); List expectedRecords = currentData(); long dataSizeBefore = testDataSize(table); Result result = @@ -523,7 +518,6 @@ public void testBinPackWithDVs() throws IOException { assumeThat(formatVersion).isGreaterThanOrEqualTo(3); Table table = createTablePartitioned(4, 2); shouldHaveFiles(table, 8); - table.refresh(); List initialRecords = currentDataWithLineage(); Set rowIds = initialRecords.stream().map(record -> (Long) record[0]).collect(Collectors.toSet()); @@ -555,7 +549,6 @@ public void testBinPackWithDVs() throws IOException { } rowDelta.commit(); - table.refresh(); List recordsWithLineageAfterDelete = currentDataWithLineage(); rowIds.removeAll(rowIdsBeingRemoved); assertThat(rowIds) @@ -634,7 +627,6 @@ public void removeDanglingDVsFromDeleteManifest() throws Exception { assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles); assertThat(result.removedDeleteFilesCount()).isEqualTo(numDataFiles); - table.refresh(); assertThat(TestHelpers.dataFiles(table)).hasSize(1); assertThat(TestHelpers.deleteFiles(table)).isEmpty(); @@ -685,7 +677,6 @@ public void testRemoveDangledEqualityDeletesPartitionEvolution() { .hasSize(1); // partition evolution - table.refresh(); table.updateSpec().addField(Expressions.ref("c3")).commit(); // data seq = 4, write 2 new data files in both partitions for evolved spec @@ -783,7 +774,6 @@ public void testBinPackWithDeleteAllData() throws IOException { assumeThat(formatVersion).isGreaterThanOrEqualTo(2); Table table = createTablePartitioned(1, 1, 1); shouldHaveFiles(table, 1); - table.refresh(); List dataFiles = TestHelpers.dataFiles(table); int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum(); @@ -799,7 +789,6 @@ public void testBinPackWithDeleteAllData() throws IOException { } rowDelta.commit(); - table.refresh(); List expectedRecords = currentData(); long dataSizeBefore = testDataSize(table); @@ -836,7 +825,6 @@ public void testBinPackWithStartingSequenceNumber() { Table table = createTablePartitioned(4, 2); shouldHaveFiles(table, 8); List expectedRecords = currentData(); - table.refresh(); long oldSequenceNumber = table.currentSnapshot().sequenceNumber(); long dataSizeBefore = testDataSize(table); @@ -852,7 +840,6 @@ public void testBinPackWithStartingSequenceNumber() { List actualRecords = currentData(); assertEquals("Rows must match", expectedRecords, actualRecords); - table.refresh(); assertThat(table.currentSnapshot().sequenceNumber()) .as("Table sequence number should be incremented") .isGreaterThan(oldSequenceNumber); @@ -873,7 +860,6 @@ public void testBinPackWithStartingSequenceNumberV1Compatibility() { Table table = createTablePartitioned(4, 2, SCALE, properties); shouldHaveFiles(table, 8); List expectedRecords = currentData(); - table.refresh(); long oldSequenceNumber = table.currentSnapshot().sequenceNumber(); assertThat(oldSequenceNumber).as("Table sequence number should be 0").isZero(); long dataSizeBefore = testDataSize(table); @@ -890,7 +876,6 @@ public void testBinPackWithStartingSequenceNumberV1Compatibility() { List actualRecords = currentData(); assertEquals("Rows must match", expectedRecords, actualRecords); - table.refresh(); assertThat(table.currentSnapshot().sequenceNumber()) .as("Table sequence number should still be 0") .isEqualTo(oldSequenceNumber); @@ -984,6 +969,7 @@ public void testBinPackCombineMixedFiles() { // Add one more small file, and one large file writeRecords(1, SCALE); writeRecords(1, SCALE * 3); + table.refresh(); shouldHaveFiles(table, 3); List expectedRecords = currentData(); @@ -1073,8 +1059,6 @@ public void testPartialProgressEnabled() { assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - shouldHaveSnapshots(table, 11); shouldHaveACleanCache(table); @@ -1101,8 +1085,6 @@ public void testMultipleGroups() { assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1130,8 +1112,6 @@ public void testPartialProgressMaxCommits() { assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1163,8 +1143,6 @@ public void testSingleCommitWithRewriteFailure() { .isInstanceOf(RuntimeException.class) .hasMessage("Rewrite Failed"); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1197,8 +1175,6 @@ public void testSingleCommitWithCommitFailure() { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Cannot commit rewrite"); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1231,8 +1207,6 @@ public void testCommitFailsWithUncleanableFailure() { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Arbitrary Failure"); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1266,8 +1240,6 @@ public void testParallelSingleCommitWithRewriteFailure() { .isInstanceOf(CommitFailedException.class) .hasMessage("Rewrite Failed"); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1306,8 +1278,6 @@ public void testPartialProgressWithRewriteFailure() { assertThat(result.failedDataFilesCount()).isEqualTo(6); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1349,8 +1319,6 @@ public void testParallelPartialProgressWithRewriteFailure() { assertThat(result.failedDataFilesCount()).isEqualTo(6); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1395,8 +1363,6 @@ public void testParallelPartialProgressWithCommitFailure() { assertThat(result.rewriteResults()).as("Should have 6 fileGroups").hasSize(6); assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1435,8 +1401,6 @@ public void testParallelPartialProgressWithMaxFailedCommits() { .hasMessageContaining( "1 rewrite commits failed. This is more than the maximum allowed failures of 0"); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1468,11 +1432,8 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount() .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "1"); rewrite.execute(); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); - table.refresh(); assertThat(table.snapshots()) .as("Table did not have the expected number of snapshots") // To tolerate 1 random commit failure @@ -1548,8 +1509,6 @@ public void testSortMultipleGroups() { assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1579,8 +1538,6 @@ public void testSimpleSort() { assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1615,8 +1572,6 @@ public void testSortAfterPartitionChange() { .hasSize(1); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1646,8 +1601,6 @@ public void testSortCustomSortOrder() { assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1662,6 +1615,7 @@ public void testSortCustomSortOrderRequiresRepartition() { int partitions = 4; Table table = createTable(); writeRecords(20, SCALE, partitions); + table.refresh(); shouldHaveLastCommitUnsorted(table, "c3"); // Add a partition column so this requires repartitioning @@ -1685,8 +1639,6 @@ public void testSortCustomSortOrderRequiresRepartition() { assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1); assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1726,8 +1678,6 @@ public void testAutoSortShuffleOutput() { .as("Should have written 40+ files") .hasSizeGreaterThanOrEqualTo(40); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1805,8 +1755,6 @@ public void testZOrderSort() { .as("Should have written 40+ files") .hasSizeGreaterThanOrEqualTo(40); - table.refresh(); - List postRewriteData = currentData(); assertEquals("We shouldn't have changed the data", originalData, postRewriteData); @@ -1871,8 +1819,6 @@ public void testZOrderAllTypesSort() { .as("Should have written 1 file") .hasSize(1); - table.refresh(); - List postRaw = spark .read() @@ -2178,7 +2124,6 @@ protected long testDataSize(Table table) { } protected void shouldHaveMultipleFiles(Table table) { - table.refresh(); int numFiles = Iterables.size(table.newScan().planFiles()); assertThat(numFiles) .as(String.format("Should have multiple files, had %d", numFiles)) @@ -2186,7 +2131,6 @@ protected void shouldHaveMultipleFiles(Table table) { } protected void shouldHaveFiles(Table table, int numExpected) { - table.refresh(); List files = StreamSupport.stream(table.newScan().planFiles().spliterator(), false) .collect(Collectors.toList()); @@ -2209,7 +2153,6 @@ protected long shouldHaveMinSequenceNumberInPartition( } protected void shouldHaveSnapshots(Table table, int expectedSnapshots) { - table.refresh(); assertThat(table.snapshots()) .as("Table did not have the expected number of snapshots") .hasSize(expectedSnapshots); @@ -2262,7 +2205,6 @@ private Pair boundsOf(DataFile file, NestedField field, Class javaC private List, Pair>> checkForOverlappingFiles( Table table, String column) { - table.refresh(); NestedField field = table.schema().caseInsensitiveFindField(column); Class javaClass = (Class) field.type().typeId().javaClass(); @@ -2344,6 +2286,7 @@ protected Table createTable() { protected Table createTable(int files) { Table table = createTable(); writeRecords(files, SCALE); + table.refresh(); return table; } @@ -2354,6 +2297,7 @@ protected Table createTablePartitioned( assertThat(table.currentSnapshot()).as("Table must be empty").isNull(); writeRecords(files, numRecords, partitions); + table.refresh(); return table; } @@ -2406,11 +2350,11 @@ private Table createTypeTestTable() { .mode("append") .save(tableLocation); + table.refresh(); return table; } protected int averageFileSize(Table table) { - table.refresh(); return (int) Streams.stream(table.newScan().planFiles()) .mapToLong(FileScanTask::length) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index 829ac761c876..7adbe760e763 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -251,7 +251,6 @@ public void testRewriteAll() throws Exception { @TestTemplate public void testRewriteFilter() throws Exception { Table table = createTablePartitioned(4, 2, SCALE); - table.refresh(); List dataFiles = TestHelpers.dataFiles(table); writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); @@ -260,7 +259,6 @@ public void testRewriteFilter() throws Exception { List deleteFiles = deleteFiles(table); assertThat(deleteFiles).hasSize(8); - table.refresh(); List expectedRecords = records(table); List expectedDeletes = deleteRecords(table); assertThat(expectedRecords).hasSize(12000); @@ -536,7 +534,6 @@ public void testSomePartitionsDanglingDeletes() throws Exception { @TestTemplate public void testRewriteFilterRemoveDangling() throws Exception { Table table = createTablePartitioned(4, 2, SCALE); - table.refresh(); List dataFiles = TestHelpers.dataFiles(table); writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles, true); @@ -545,7 +542,6 @@ public void testRewriteFilterRemoveDangling() throws Exception { List deleteFiles = deleteFiles(table); assertThat(deleteFiles).hasSize(8); - table.refresh(); List expectedRecords = records(table); List expectedDeletes = deleteRecords(table); assertThat(expectedRecords).hasSize(12000); // 16000 data - 4000 delete rows @@ -705,7 +701,6 @@ public void testSchemaEvolution() throws Exception { List newSchemaDeleteFiles = except(deleteFiles(table), deleteFiles); assertThat(newSchemaDeleteFiles).hasSize(4); - table.refresh(); List expectedDeletes = deleteRecords(table); List expectedRecords = records(table); assertThat(expectedDeletes).hasSize(4000); // 4 files * 1000 per file