Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public void setupTableLocation() {

private RewriteDataFilesSparkAction basicRewrite(Table table) {
// Always compact regardless of input files
table.refresh();
return actions()
.rewriteDataFiles(table)
.option(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "1");
Expand Down Expand Up @@ -293,6 +292,7 @@ public void testBinPackAfterPartitionChange() {
Table table = createTable();

writeRecords(20, SCALE, 20);
table.refresh();
shouldHaveFiles(table, 20);
table.updateSpec().addField(Expressions.ref("c1")).commit();

Expand Down Expand Up @@ -366,7 +366,6 @@ public void testDataFilesRewrittenWithMaxDeleteRatio() throws Exception {

assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).isEmpty();

Expand Down Expand Up @@ -416,7 +415,6 @@ public void testDataFilesRewrittenWithHighDeleteRatio() throws Exception {

assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).hasSize(1);

Expand Down Expand Up @@ -466,7 +464,6 @@ public void testDataFilesNotRewrittenWithLowDeleteRatio() throws Exception {

assertThat(result.rewrittenDataFilesCount()).isEqualTo(0);

table.refresh();
List<DataFile> newDataFiles = TestHelpers.dataFiles(table);
assertThat(newDataFiles).hasSameSizeAs(dataFiles);

Expand All @@ -479,7 +476,6 @@ public void testBinPackWithV2PositionDeletes() throws IOException {
assumeThat(formatVersion).isEqualTo(2);
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
table.refresh();

List<DataFile> dataFiles = TestHelpers.dataFiles(table);
int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum();
Expand All @@ -496,7 +492,6 @@ public void testBinPackWithV2PositionDeletes() throws IOException {
}

rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
long dataSizeBefore = testDataSize(table);
Result result =
Expand All @@ -523,7 +518,6 @@ public void testBinPackWithDVs() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
table.refresh();
List<Object[]> initialRecords = currentDataWithLineage();
Set<Long> rowIds =
initialRecords.stream().map(record -> (Long) record[0]).collect(Collectors.toSet());
Expand Down Expand Up @@ -555,7 +549,6 @@ public void testBinPackWithDVs() throws IOException {
}

rowDelta.commit();
table.refresh();
List<Object[]> recordsWithLineageAfterDelete = currentDataWithLineage();
rowIds.removeAll(rowIdsBeingRemoved);
assertThat(rowIds)
Expand Down Expand Up @@ -634,7 +627,6 @@ public void removeDanglingDVsFromDeleteManifest() throws Exception {
assertThat(result.rewrittenDataFilesCount()).isEqualTo(numDataFiles);
assertThat(result.removedDeleteFilesCount()).isEqualTo(numDataFiles);

table.refresh();
assertThat(TestHelpers.dataFiles(table)).hasSize(1);
assertThat(TestHelpers.deleteFiles(table)).isEmpty();

Expand Down Expand Up @@ -685,7 +677,6 @@ public void testRemoveDangledEqualityDeletesPartitionEvolution() {
.hasSize(1);

// partition evolution
table.refresh();
table.updateSpec().addField(Expressions.ref("c3")).commit();

// data seq = 4, write 2 new data files in both partitions for evolved spec
Expand Down Expand Up @@ -783,7 +774,6 @@ public void testBinPackWithDeleteAllData() throws IOException {
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
Table table = createTablePartitioned(1, 1, 1);
shouldHaveFiles(table, 1);
table.refresh();

List<DataFile> dataFiles = TestHelpers.dataFiles(table);
int total = (int) dataFiles.stream().mapToLong(ContentFile::recordCount).sum();
Expand All @@ -799,7 +789,6 @@ public void testBinPackWithDeleteAllData() throws IOException {
}

rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
long dataSizeBefore = testDataSize(table);

Expand Down Expand Up @@ -836,7 +825,6 @@ public void testBinPackWithStartingSequenceNumber() {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
long dataSizeBefore = testDataSize(table);

Expand All @@ -852,7 +840,6 @@ public void testBinPackWithStartingSequenceNumber() {
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
assertThat(table.currentSnapshot().sequenceNumber())
.as("Table sequence number should be incremented")
.isGreaterThan(oldSequenceNumber);
Expand All @@ -873,7 +860,6 @@ public void testBinPackWithStartingSequenceNumberV1Compatibility() {
Table table = createTablePartitioned(4, 2, SCALE, properties);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
assertThat(oldSequenceNumber).as("Table sequence number should be 0").isZero();
long dataSizeBefore = testDataSize(table);
Expand All @@ -890,7 +876,6 @@ public void testBinPackWithStartingSequenceNumberV1Compatibility() {
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);

table.refresh();
assertThat(table.currentSnapshot().sequenceNumber())
.as("Table sequence number should still be 0")
.isEqualTo(oldSequenceNumber);
Expand Down Expand Up @@ -984,6 +969,7 @@ public void testBinPackCombineMixedFiles() {
// Add one more small file, and one large file
writeRecords(1, SCALE);
writeRecords(1, SCALE * 3);
table.refresh();
shouldHaveFiles(table, 3);

List<Object[]> expectedRecords = currentData();
Expand Down Expand Up @@ -1073,8 +1059,6 @@ public void testPartialProgressEnabled() {
assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

shouldHaveSnapshots(table, 11);
shouldHaveACleanCache(table);

Expand All @@ -1101,8 +1085,6 @@ public void testMultipleGroups() {
assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1130,8 +1112,6 @@ public void testPartialProgressMaxCommits() {
assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1163,8 +1143,6 @@ public void testSingleCommitWithRewriteFailure() {
.isInstanceOf(RuntimeException.class)
.hasMessage("Rewrite Failed");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1197,8 +1175,6 @@ public void testSingleCommitWithCommitFailure() {
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Cannot commit rewrite");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1231,8 +1207,6 @@ public void testCommitFailsWithUncleanableFailure() {
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Arbitrary Failure");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1266,8 +1240,6 @@ public void testParallelSingleCommitWithRewriteFailure() {
.isInstanceOf(CommitFailedException.class)
.hasMessage("Rewrite Failed");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1306,8 +1278,6 @@ public void testPartialProgressWithRewriteFailure() {
assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1349,8 +1319,6 @@ public void testParallelPartialProgressWithRewriteFailure() {
assertThat(result.failedDataFilesCount()).isEqualTo(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1395,8 +1363,6 @@ public void testParallelPartialProgressWithCommitFailure() {
assertThat(result.rewriteResults()).as("Should have 6 fileGroups").hasSize(6);
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1435,8 +1401,6 @@ public void testParallelPartialProgressWithMaxFailedCommits() {
.hasMessageContaining(
"1 rewrite commits failed. This is more than the maximum allowed failures of 0");

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1468,11 +1432,8 @@ public void testParallelPartialProgressWithMaxCommitsLargerThanTotalGroupCount()
.option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_FAILED_COMMITS, "1");
rewrite.execute();

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);
table.refresh();
assertThat(table.snapshots())
.as("Table did not have the expected number of snapshots")
// To tolerate 1 random commit failure
Expand Down Expand Up @@ -1548,8 +1509,6 @@ public void testSortMultipleGroups() {
assertThat(result.rewriteResults()).as("Should have 10 fileGroups").hasSize(10);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1579,8 +1538,6 @@ public void testSimpleSort() {
assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1615,8 +1572,6 @@ public void testSortAfterPartitionChange() {
.hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1646,8 +1601,6 @@ public void testSortCustomSortOrder() {
assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand All @@ -1662,6 +1615,7 @@ public void testSortCustomSortOrderRequiresRepartition() {
int partitions = 4;
Table table = createTable();
writeRecords(20, SCALE, partitions);
table.refresh();
shouldHaveLastCommitUnsorted(table, "c3");

// Add a partition column so this requires repartitioning
Expand All @@ -1685,8 +1639,6 @@ public void testSortCustomSortOrderRequiresRepartition() {
assertThat(result.rewriteResults()).as("Should have 1 fileGroups").hasSize(1);
assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1726,8 +1678,6 @@ public void testAutoSortShuffleOutput() {
.as("Should have written 40+ files")
.hasSizeGreaterThanOrEqualTo(40);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1805,8 +1755,6 @@ public void testZOrderSort() {
.as("Should have written 40+ files")
.hasSizeGreaterThanOrEqualTo(40);

table.refresh();

List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData, postRewriteData);

Expand Down Expand Up @@ -1871,8 +1819,6 @@ public void testZOrderAllTypesSort() {
.as("Should have written 1 file")
.hasSize(1);

table.refresh();

List<Row> postRaw =
spark
.read()
Expand Down Expand Up @@ -2178,15 +2124,13 @@ protected long testDataSize(Table table) {
}

protected void shouldHaveMultipleFiles(Table table) {
table.refresh();
int numFiles = Iterables.size(table.newScan().planFiles());
assertThat(numFiles)
.as(String.format("Should have multiple files, had %d", numFiles))
.isGreaterThan(1);
}

protected void shouldHaveFiles(Table table, int numExpected) {
table.refresh();
List<FileScanTask> files =
StreamSupport.stream(table.newScan().planFiles().spliterator(), false)
.collect(Collectors.toList());
Expand All @@ -2209,7 +2153,6 @@ protected long shouldHaveMinSequenceNumberInPartition(
}

protected void shouldHaveSnapshots(Table table, int expectedSnapshots) {
table.refresh();
assertThat(table.snapshots())
.as("Table did not have the expected number of snapshots")
.hasSize(expectedSnapshots);
Expand Down Expand Up @@ -2262,7 +2205,6 @@ private <T> Pair<T, T> boundsOf(DataFile file, NestedField field, Class<T> javaC

private <T> List<Pair<Pair<T, T>, Pair<T, T>>> checkForOverlappingFiles(
Table table, String column) {
table.refresh();
NestedField field = table.schema().caseInsensitiveFindField(column);
Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();

Expand Down Expand Up @@ -2344,6 +2286,7 @@ protected Table createTable() {
protected Table createTable(int files) {
Table table = createTable();
writeRecords(files, SCALE);
table.refresh();
return table;
}

Expand All @@ -2354,6 +2297,7 @@ protected Table createTablePartitioned(
assertThat(table.currentSnapshot()).as("Table must be empty").isNull();

writeRecords(files, numRecords, partitions);
table.refresh();
return table;
}

Expand Down Expand Up @@ -2406,11 +2350,11 @@ private Table createTypeTestTable() {
.mode("append")
.save(tableLocation);

table.refresh();
return table;
}

protected int averageFileSize(Table table) {
table.refresh();
return (int)
Streams.stream(table.newScan().planFiles())
.mapToLong(FileScanTask::length)
Expand Down
Loading
Loading