diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java index ae4b911f75425..498d06a1b654b 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingITCase.java @@ -34,16 +34,15 @@ import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; import org.apache.flink.streaming.api.functions.source.legacy.ContinuousFileMonitoringFunction; import org.apache.flink.streaming.api.legacy.io.TextInputFormat; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.ExceptionUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; @@ -57,13 +56,14 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** * IT cases for the {@link ContinuousFileMonitoringFunction} and {@link * ContinuousFileReaderOperator}. */ -public class ContinuousFileProcessingITCase extends AbstractTestBaseJUnit4 { +class ContinuousFileProcessingITCase extends AbstractTestBase { private static final int NO_OF_FILES = 5; private static final int LINES_PER_FILE = 100; @@ -81,8 +81,8 @@ public class ContinuousFileProcessingITCase extends AbstractTestBaseJUnit4 { // PREPARING FOR THE TESTS - @Before - public void createHDFS() throws IOException { + @BeforeEach + void createHDFS() throws IOException { baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile(); FileUtil.fullyDelete(baseDir); @@ -102,8 +102,8 @@ public void createHDFS() throws IOException { hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf); } - @After - public void destroyHDFS() { + @AfterEach + void destroyHDFS() { FileUtil.fullyDelete(baseDir); hdfsCluster.shutdown(); } @@ -111,7 +111,7 @@ public void destroyHDFS() { // END OF PREPARATIONS @Test - public void testProgram() throws Exception { + void testProgram() throws Exception { /* * This test checks the interplay between the monitor and the reader @@ -138,7 +138,7 @@ public void testProgram() throws Exception { // the monitor has always DOP 1 DataStream splits = env.addSource(monitoringFunction); - Assert.assertEquals(1, splits.getParallelism()); + assertThat(splits.getParallelism()).isOne(); TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); @@ -148,7 +148,7 @@ public void testProgram() throws Exception { "FileSplitReader", typeInfo, new ContinuousFileReaderOperatorFactory<>(format)); - Assert.assertEquals(PARALLELISM, content.getParallelism()); + assertThat(content.getParallelism()).isEqualTo(PARALLELISM); // finally for the sink we set the parallelism to 1 so that we can verify the output TestingSinkFunction sink = new TestingSinkFunction(); @@ -201,7 +201,7 @@ public void testProgram() throws Exception { org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i); hdfs.rename(tmpFile.f0, file); - Assert.assertTrue(hdfs.exists(file)); + assertThat(hdfs.exists(file)).isTrue(); } jobFuture.get(); @@ -217,7 +217,7 @@ private static class TestingSinkFunction extends RichSinkFunction { @Override public void open(OpenContext openContext) throws Exception { // this sink can only work with DOP 1 - assertEquals(1, getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks()); + assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks()).isOne(); comparator = new Comparator() { @@ -239,7 +239,7 @@ public void invoke(String value) throws Exception { } if (!content.add(value + "\n")) { - Assert.fail("Duplicate line: " + value); + fail("Duplicate line: " + value); System.exit(0); } @@ -252,9 +252,9 @@ public void invoke(String value) throws Exception { @Override public void close() { // check if the data that we collected are the ones they are supposed to be. - Assert.assertEquals(expectedContents.size(), actualContent.size()); + assertThat(actualContent).hasSameSizeAs(expectedContents); for (Integer fileIdx : expectedContents.keySet()) { - Assert.assertTrue(actualContent.keySet().contains(fileIdx)); + assertThat(actualContent).containsKey(fileIdx); List cntnt = new ArrayList<>(actualContent.get(fileIdx)); Collections.sort(cntnt, comparator); @@ -263,7 +263,7 @@ public void close() { for (String line : cntnt) { cntntStr.append(line); } - Assert.assertEquals(expectedContents.get(fileIdx), cntntStr.toString()); + assertThat(cntntStr).hasToString(expectedContents.get(fileIdx)); } expectedContents.clear(); } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java index 6505ca96d3fc8..e24b0bcf32f41 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java @@ -41,17 +41,16 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; import org.apache.flink.test.util.MigrationTest; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.OperatingSystem; import org.apache.commons.io.FileUtils; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.FileOutputStream; @@ -62,33 +61,30 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + /** Tests that verify the migration from previous Flink version snapshots. */ -@RunWith(Parameterized.class) -public class ContinuousFileProcessingMigrationTest implements MigrationTest { +@ExtendWith(ParameterizedTestExtension.class) +class ContinuousFileProcessingMigrationTest implements MigrationTest { private static final int LINES_PER_FILE = 10; private static final long INTERVAL = 100; - @Parameterized.Parameters(name = "Migration Savepoint: {0}") + @Parameters(name = "Migration Savepoint: {0}") public static Collection parameters() { return FlinkVersion.rangeOf( FlinkVersion.v1_8, MigrationTest.getMostRecentlyPublishedVersion()); } - private final FlinkVersion testMigrateVersion; - - public ContinuousFileProcessingMigrationTest(FlinkVersion testMigrateVersion) { - this.testMigrateVersion = testMigrateVersion; - } - - @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + @Parameter private FlinkVersion testMigrateVersion; - @BeforeClass - public static void verifyOS() { - Assume.assumeTrue( - "HDFS cluster cannot be start on Windows without extensions.", - !OperatingSystem.isWindows()); + @BeforeAll + static void verifyOS() { + assumeThat(OperatingSystem.isWindows()) + .as("HDFS cluster cannot be started on Windows without extensions.") + .isFalse(); } @SnapshotsGenerator @@ -138,10 +134,8 @@ public void writeReaderSnapshot(FlinkVersion flinkGenerateSavepointVersion) thro + "-snapshot"); } - @Test - public void testReaderRestore() throws Exception { - File testFolder = tempFolder.newFolder(); - + @TestTemplate + void testReaderRestore(@TempDir File testFolder) throws Exception { final OneShotLatch latch = new OneShotLatch(); BlockingFileInputFormat format = @@ -182,17 +176,17 @@ public void testReaderRestore() throws Exception { // compare if the results contain what they should contain and also if // they are the same, as they should. - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3))); - Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4))); + assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split1)); + assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split2)); + assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split3)); + assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split4)); } @SnapshotsGenerator public void writeMonitoringSourceSnapshot(FlinkVersion flinkGenerateSavepointVersion) throws Exception { - File testFolder = tempFolder.newFolder(); + File testFolder = Files.createTempDirectory("tmpDirPrefix").toFile(); long fileModTime = Long.MIN_VALUE; for (int i = 0; i < 1; i++) { @@ -266,10 +260,8 @@ public void markAsTemporarilyIdle() {} testHarness.close(); } - @Test - public void testMonitoringSourceRestore() throws Exception { - - File testFolder = tempFolder.newFolder(); + @TestTemplate + void testMonitoringSourceRestore(@TempDir File testFolder) throws Exception { TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath())); @@ -292,8 +284,8 @@ public void testMonitoringSourceRestore() throws Exception { testHarness.open(); - Assert.assertEquals( - fileNameAndModTime.f1.longValue(), monitoringFunction.getGlobalModificationTime()); + assertThat(monitoringFunction.getGlobalModificationTime()) + .isEqualTo(fileNameAndModTime.f1.longValue()); } private Tuple2 getResourceFilename(FlinkVersion version) throws IOException { @@ -387,7 +379,7 @@ private Tuple2 createFileAndFillWithData( File base, String fileName, int fileIdx, String sampleLine) throws IOException { File file = new File(base, fileName + fileIdx); - Assert.assertFalse(file.exists()); + assertThat(file.exists()).isFalse(); File tmp = new File(base, "." + fileName + fileIdx); FileOutputStream stream = new FileOutputStream(tmp); @@ -401,7 +393,7 @@ private Tuple2 createFileAndFillWithData( FileUtils.moveFile(tmp, file); - Assert.assertTrue("No result file present", file.exists()); + assertThat(file.exists()).as("No result file present").isTrue(); return new Tuple2<>(file, str.toString()); } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index 0383ea28e8b7f..074fb72393ce6 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -49,13 +49,10 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.FileNotFoundException; @@ -71,10 +68,15 @@ import java.util.TreeSet; import java.util.UUID; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; + /** * Tests for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}. */ -public class ContinuousFileProcessingTest { +class ContinuousFileProcessingTest { private static final int NO_OF_FILES = 5; private static final int LINES_PER_FILE = 10; @@ -85,17 +87,15 @@ public class ContinuousFileProcessingTest { private static String hdfsURI; private static MiniDFSCluster hdfsCluster; - @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); + @TempDir static File hdfsDir; - @BeforeClass - public static void createHDFS() { - Assume.assumeTrue( - "HDFS cluster cannot be start on Windows without extensions.", - !OperatingSystem.isWindows()); + @BeforeAll + static void createHDFS() { + assumeThat(OperatingSystem.isWindows()) + .as("HDFS cluster cannot be started on Windows without extensions.") + .isFalse(); try { - File hdfsDir = tempFolder.newFolder(); - org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration(); hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath()); @@ -115,19 +115,19 @@ public static void createHDFS() { } catch (Throwable e) { e.printStackTrace(); - Assert.fail("Test failed " + e.getMessage()); + fail("Test failed " + e.getMessage()); } } - @AfterClass - public static void destroyHDFS() { + @AfterAll + static void destroyHDFS() { if (hdfsCluster != null) { hdfsCluster.shutdown(); } } @Test - public void testInvalidPathSpecification() throws Exception { + void testInvalidPathSpecification() throws Exception { String invalidPath = "hdfs://" @@ -140,24 +140,19 @@ public void testInvalidPathSpecification() throws Exception { ContinuousFileMonitoringFunction monitoringFunction = new ContinuousFileMonitoringFunction<>( format, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); - try { - monitoringFunction.run( - new DummySourceContext() { - @Override - public void collect(TimestampedFileInputSplit element) { - // we should never arrive here with an invalid path - Assert.fail("Test passes with an invalid path."); - } - }); - - // we should never arrive here with an invalid path - Assert.fail("Test passed with an invalid path."); - - } catch (FileNotFoundException e) { - Assert.assertEquals( - "The provided file path " + format.getFilePaths()[0] + " does not exist.", - e.getMessage()); - } + assertThatThrownBy( + () -> + monitoringFunction.run( + new DummySourceContext() { + @Override + public void collect(TimestampedFileInputSplit element) { + // we should never arrive here with an invalid path + fail("Test passes with an invalid path."); + } + })) + .isInstanceOf(FileNotFoundException.class) + .hasMessage( + "The provided file path " + format.getFilePaths()[0] + " does not exist."); } private OneInputStreamOperatorTestHarness createHarness( @@ -171,7 +166,7 @@ private OneInputStreamOperatorTestHarness crea } @Test - public void testFileReadingOperatorWithEventTime() throws Exception { + void testFileReadingOperatorWithEventTime() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; Set filesCreated = new HashSet<>(); @@ -218,7 +213,7 @@ public void testFileReadingOperatorWithEventTime() throws Exception { // we are in event time, which emits no watermarks, so the last watermark will mark the // of the input stream. - Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size()); + assertThat(tester.getOutput()).hasSize(NO_OF_FILES * LINES_PER_FILE + 1); Map> actualFileContents = new HashMap<>(); Object lastElement = null; @@ -241,13 +236,12 @@ public void testFileReadingOperatorWithEventTime() throws Exception { } // check if the last element is the LongMax watermark - Assert.assertTrue(lastElement instanceof Watermark); - Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp()); + assertThat(lastElement).isInstanceOf(Watermark.class); + assertThat(((Watermark) lastElement).getTimestamp()).isEqualTo(Long.MAX_VALUE); - Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); + assertThat(actualFileContents).hasSameSizeAs(expectedFileContents); for (Integer fileIdx : expectedFileContents.keySet()) { - Assert.assertTrue( - "file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); + assertThat(actualFileContents).as("file" + fileIdx + " not found").containsKey(fileIdx); List cntnt = actualFileContents.get(fileIdx); Collections.sort( @@ -263,7 +257,7 @@ public int compare(String o1, String o2) { for (String line : cntnt) { cntntStr.append(line); } - Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); + assertThat(cntntStr).hasToString(expectedFileContents.get(fileIdx)); } for (org.apache.hadoop.fs.Path file : filesCreated) { @@ -272,7 +266,7 @@ public int compare(String o1, String o2) { } @Test - public void testReaderSnapshotRestore() throws Exception { + void testReaderSnapshotRestore() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; TimestampedFileInputSplit split1 = @@ -339,13 +333,13 @@ public void testReaderSnapshotRestore() throws Exception { // compare if the results contain what they should contain and also if // they are the same, as they should. - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3))); - Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4))); + assertThat(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1))).isTrue(); + assertThat(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2))).isTrue(); + assertThat(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3))).isTrue(); + assertThat(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4))).isTrue(); - Assert.assertArrayEquals( - initTestInstance.getOutput().toArray(), restoredTestInstance.getOutput().toArray()); + assertThat(restoredTestInstance.getOutput()) + .containsExactlyElementsOf(initTestInstance.getOutput()); } private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) { @@ -406,7 +400,7 @@ public void close() {} //// Monitoring Function Tests ////// @Test - public void testFilePathFiltering() throws Exception { + void testFilePathFiltering() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; Set filesCreated = new HashSet<>(); @@ -448,7 +442,7 @@ public boolean filterPath(Path filePath) { monitoringFunction.open(DefaultOpenContext.INSTANCE); monitoringFunction.run(context); - Assert.assertArrayEquals(filesKept.toArray(), context.getSeenFiles().toArray()); + assertThat(context.getSeenFiles()).isEqualTo(filesKept); // finally delete the files created for the test. for (org.apache.hadoop.fs.Path file : filesCreated) { @@ -457,7 +451,7 @@ public boolean filterPath(Path filePath) { } @Test - public void testNestedFilesProcessing() throws Exception { + void testNestedFilesProcessing() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; final Set filesCreated = new HashSet<>(); @@ -469,7 +463,7 @@ public void testNestedFilesProcessing() throws Exception { org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path( testBasePath + "/" + "firstLevelDir" + "/" + "secondLevelDir"); - Assert.assertFalse(hdfs.exists(firstLevelDir)); + assertThat(hdfs.exists(firstLevelDir)).isFalse(); hdfs.mkdirs(firstLevelDir); hdfs.mkdirs(secondLevelDir); @@ -509,7 +503,7 @@ public void testNestedFilesProcessing() throws Exception { monitoringFunction.open(DefaultOpenContext.INSTANCE); monitoringFunction.run(context); - Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray()); + assertThat(context.getSeenFiles()).isEqualTo(filesToBeRead); // finally delete the dirs and the files created for the test. for (org.apache.hadoop.fs.Path file : filesCreated) { @@ -520,7 +514,7 @@ public void testNestedFilesProcessing() throws Exception { } @Test - public void testSortingOnModTime() throws Exception { + void testSortingOnModTime() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; final long[] modTimes = new long[NO_OF_FILES]; @@ -548,7 +542,7 @@ public void testSortingOnModTime() throws Exception { monitoringFunction.open(DefaultOpenContext.INSTANCE); monitoringFunction.run(context); - Assert.assertEquals(splits.length, context.getCounter()); + assertThat(context.getCounter()).isEqualTo(splits.length); // delete the created files. for (int i = 0; i < NO_OF_FILES; i++) { @@ -557,7 +551,7 @@ public void testSortingOnModTime() throws Exception { } @Test - public void testProcessOnce() throws Exception { + void testProcessOnce() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; final OneShotLatch latch = new OneShotLatch(); @@ -566,7 +560,7 @@ public void testProcessOnce() throws Exception { Tuple2 bootstrap = createFileAndFillWithData( testBasePath, "file", NO_OF_FILES + 1, "This is test line."); - Assert.assertTrue(hdfs.exists(bootstrap.f0)); + assertThat(hdfs.exists(bootstrap.f0)).isTrue(); // the source is supposed to read only this file. final Set filesToBeRead = new TreeSet<>(); @@ -596,7 +590,7 @@ public void run() { context.close(); } catch (Exception e) { - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } }; @@ -617,7 +611,7 @@ public void run() { // wait until the monitoring thread exits t.join(); - Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray()); + assertThat(context.getSeenFiles()).isEqualTo(filesToBeRead); // finally delete the files created for the test. hdfs.delete(bootstrap.f0, false); @@ -627,7 +621,7 @@ public void run() { } @Test - public void testFunctionRestore() throws Exception { + void testFunctionRestore() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; org.apache.hadoop.fs.Path path = null; @@ -707,14 +701,14 @@ public void run() { testHarnessCopy.initializeState(snapshot); testHarnessCopy.open(); - Assert.assertNull(error[0]); - Assert.assertEquals(fileModTime, monitoringFunctionCopy.getGlobalModificationTime()); + assertThat(error[0]).isNull(); + assertThat(monitoringFunctionCopy.getGlobalModificationTime()).isEqualTo(fileModTime); hdfs.delete(path, false); } @Test - public void testProcessContinuously() throws Exception { + void testProcessContinuously() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; final OneShotLatch latch = new OneShotLatch(); @@ -723,7 +717,7 @@ public void testProcessContinuously() throws Exception { Tuple2 bootstrap = createFileAndFillWithData( testBasePath, "file", NO_OF_FILES + 1, "This is test line."); - Assert.assertTrue(hdfs.exists(bootstrap.f0)); + assertThat(hdfs.exists(bootstrap.f0)).isTrue(); final Set filesToBeRead = new TreeSet<>(); filesToBeRead.add(bootstrap.f0.getName()); @@ -749,7 +743,7 @@ public void run() { monitoringFunction.open(DefaultOpenContext.INSTANCE); monitoringFunction.run(context); } catch (Exception e) { - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } }; @@ -771,7 +765,7 @@ public void run() { // wait until the monitoring thread exits t.join(); - Assert.assertArrayEquals(filesToBeRead.toArray(), context.getSeenFiles().toArray()); + assertThat(context.getSeenFiles()).isEqualTo(filesToBeRead); // finally delete the files created for the test. hdfs.delete(bootstrap.f0, false); @@ -861,13 +855,13 @@ public void collect(TimestampedFileInputSplit element) { new org.apache.hadoop.fs.Path(element.getPath().getPath())) .getModificationTime(); - Assert.assertTrue(modTime >= lastSeenModTime); - Assert.assertEquals(expectedModificationTimes[splitCounter], modTime); + assertThat(modTime).isGreaterThanOrEqualTo(lastSeenModTime); + assertThat(modTime).isEqualTo(expectedModificationTimes[splitCounter]); lastSeenModTime = modTime; splitCounter++; } catch (IOException e) { - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } } @@ -899,7 +893,7 @@ public void close() {} private static int getLineNo(String line) { String[] tkns = line.split("\\s"); - Assert.assertEquals(6, tkns.length); + assertThat(tkns).hasSize(6); return Integer.parseInt(tkns[tkns.length - 1]); } @@ -916,7 +910,7 @@ private static Tuple2 createFileAndFillWithDa org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileRandSuffix); - Assert.assertFalse(hdfs.exists(file)); + assertThat(hdfs.exists(file)).isFalse(); org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileRandSuffix); @@ -931,7 +925,7 @@ private static Tuple2 createFileAndFillWithDa hdfs.rename(tmp, file); - Assert.assertTrue("No result file present", hdfs.exists(file)); + assertThat(hdfs.exists(file)).as("No result file present").isTrue(); return new Tuple2<>(file, str.toString()); } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java index 7fa404074384f..838466208ad2d 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java @@ -28,17 +28,16 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.NetUtils; -import org.apache.flink.util.TestLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -46,15 +45,13 @@ import java.io.IOException; import java.net.URI; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; /** * Tests for distributing files with {@link org.apache.flink.api.common.cache.DistributedCache} via * HDFS. */ -public class DistributedCacheDfsTest extends TestLogger { +class DistributedCacheDfsTest { private static final String testFileContent = "Goethe - Faust: Der Tragoedie erster Teil\n" @@ -74,11 +71,11 @@ public class DistributedCacheDfsTest extends TestLogger { + "Da flammt ein blitzendes Verheeren Dem Pfade vor des Donnerschlags. Doch\n" + "deine Boten, Herr, verehren Das sanfte Wandeln deines Tags."; - @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + @TempDir static File dataDir; - @ClassRule - public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = - new MiniClusterWithClientResource( + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) @@ -90,10 +87,8 @@ public class DistributedCacheDfsTest extends TestLogger { private static Path testFile; private static Path testDir; - @BeforeClass - public static void setup() throws Exception { - File dataDir = TEMP_FOLDER.newFolder(); - + @BeforeAll + static void setup() throws Exception { conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); hdfsCluster = builder.build(); @@ -123,13 +118,13 @@ private static Path writeFile(FileSystem dfs, Path rootDir, String fileName) return file; } - @AfterClass - public static void teardown() { + @AfterAll + static void teardown() { hdfsCluster.shutdown(); } @Test - public void testDistributedFileViaDFS() throws Exception { + void testDistributedFileViaDFS() throws Exception { createJobWithRegisteredCachedFiles().execute("Distributed Cache Via Blob Test Program"); } @@ -139,7 +134,7 @@ public void testDistributedFileViaDFS() throws Exception { * cover this cases. */ @Test - public void testSubmittingJobViaRestClusterClient() throws Exception { + void testSubmittingJobViaRestClusterClient() throws Exception { RestClusterClient restClusterClient = new RestClusterClient<>( MINI_CLUSTER_RESOURCE.getClientConfiguration(), @@ -158,7 +153,7 @@ public void testSubmittingJobViaRestClusterClient() throws Exception { jobResult.getSerializedThrowable().isPresent() ? jobResult.getSerializedThrowable().get().getFullStringifiedStackTrace() : "Job failed."; - assertTrue(messageInCaseOfFailure, jobResult.isSuccess()); + assertThat(jobResult.isSuccess()).as(messageInCaseOfFailure).isTrue(); } private StreamExecutionEnvironment createJobWithRegisteredCachedFiles() { @@ -183,19 +178,19 @@ public String map(Integer value) throws Exception { getRuntimeContext().getDistributedCache().getFile("test_data").toURI()); Path path = new Path(actualFile.toUri()); - assertFalse(path.getFileSystem().isDistributedFS()); + assertThat(path.getFileSystem().isDistributedFS()).isFalse(); DataInputStream in = new DataInputStream(actualFile.getFileSystem().open(actualFile)); String contents = in.readUTF(); - assertEquals(testFileContent, contents); + assertThat(contents).isEqualTo(testFileContent); final Path actualDir = new Path(getRuntimeContext().getDistributedCache().getFile("test_dir").toURI()); FileStatus fileStatus = actualDir.getFileSystem().getFileStatus(actualDir); - assertTrue(fileStatus.isDir()); + assertThat(fileStatus.isDir()).isTrue(); FileStatus[] fileStatuses = actualDir.getFileSystem().listStatus(actualDir); - assertEquals(2, fileStatuses.length); + assertThat(fileStatuses).hasSize(2); return contents; } diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 4331b16ed6c6d..2ec94a8a6b396 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory; import org.apache.flink.streaming.api.legacy.io.TextOutputFormat; import org.apache.flink.streaming.examples.wordcount.WordCount; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.OperatingSystem; @@ -45,14 +46,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; @@ -60,31 +58,33 @@ import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; /** * This test should logically be located in the 'flink-runtime' tests. However, this project has * already all dependencies required (flink-examples-streaming). Also, the ParallelismOneExecEnv is * here. */ -public class HDFSTest { +class HDFSTest { protected String hdfsURI; private MiniDFSCluster hdfsCluster; private org.apache.hadoop.fs.Path hdPath; protected org.apache.hadoop.fs.FileSystem hdfs; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir java.nio.file.Path temporaryFolder; - @BeforeClass - public static void verifyOS() { - Assume.assumeTrue( - "HDFS cluster cannot be started on Windows without extensions.", - !OperatingSystem.isWindows()); + @BeforeAll + static void verifyOS() { + assumeThat(OperatingSystem.isWindows()) + .as("HDFS cluster cannot be started on Windows without extensions.") + .isFalse(); } - @Before - public void createHDFS() { + @BeforeEach + void createHDFS() { try { Configuration hdConf = new Configuration(); @@ -111,12 +111,12 @@ public void createHDFS() { } catch (Throwable e) { e.printStackTrace(); - Assert.fail("Test failed " + e.getMessage()); + fail("Test failed " + e.getMessage()); } } - @After - public void destroyHDFS() { + @AfterEach + void destroyHDFS() { try { hdfs.delete(hdPath, false); hdfsCluster.shutdown(); @@ -126,13 +126,13 @@ public void destroyHDFS() { } @Test - public void testHDFS() { + void testHDFS() { Path file = new Path(hdfsURI + hdPath); org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result"); try { FileSystem fs = file.getFileSystem(); - assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); + assertThat(fs).as("Must be HadoopFileSystem").isInstanceOf(HadoopFileSystem.class); DopOneTestEnvironment.setAsContext(); try { @@ -144,12 +144,12 @@ public void testHDFS() { }); } catch (Throwable t) { t.printStackTrace(); - Assert.fail("Test failed with " + t.getMessage()); + fail("Test failed with " + t.getMessage()); } finally { DopOneTestEnvironment.unsetAsContext(); } - assertTrue("No result file present", hdfs.exists(result)); + assertThat(hdfs.exists(result)).as("No result file present").isTrue(); // validate output: StringWriter writer = new StringWriter(); @@ -163,16 +163,16 @@ public void testHDFS() { String resultString = writer.toString(); - Assert.assertEquals("(hdfs,10)\n" + "(hello,10)\n", resultString); + assertThat(resultString).isEqualTo("(hdfs,10)\n" + "(hello,10)\n"); } catch (IOException e) { e.printStackTrace(); - Assert.fail("Error in test: " + e.getMessage()); + fail("Error in test: " + e.getMessage()); } } @Test - public void testChangingFileNames() { + void testChangingFileNames() { org.apache.hadoop.fs.Path hdfsPath = new org.apache.hadoop.fs.Path(hdfsURI + "/hdfsTest"); Path path = new Path(hdfsPath.toString()); @@ -191,18 +191,15 @@ public void testChangingFileNames() { outputFormat.writeRecord(type); outputFormat.close(); - assertTrue("No result file present", hdfs.exists(hdfsPath)); + assertThat(hdfs.exists(hdfsPath)).as("No result file present").isTrue(); FileStatus[] files = hdfs.listStatus(hdfsPath); - Assert.assertEquals(2, files.length); - for (FileStatus file : files) { - assertTrue( - "1".equals(file.getPath().getName()) - || "2".equals(file.getPath().getName())); - } + assertThat(files) + .extracting(file -> file.getPath().getName()) + .containsExactlyInAnyOrder("1", "2"); } catch (IOException e) { e.printStackTrace(); - Assert.fail(e.getMessage()); + fail(e.getMessage()); } } @@ -212,19 +209,20 @@ public void testChangingFileNames() { * org.apache.flink.runtime.blob.BlobServer} directly. */ @Test - public void testBlobServerRecovery() throws Exception { + void testBlobServerRecovery() throws Exception { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.set( - BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServerOptions.STORAGE_DIRECTORY, + TempDirUtils.newFolder(temporaryFolder).getAbsolutePath()); config.set(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); try { TestingBlobHelpers.testBlobServerRecovery( - config, blobStoreService, temporaryFolder.newFolder()); + config, blobStoreService, TempDirUtils.newFolder(temporaryFolder)); } finally { blobStoreService.cleanupAllData(); blobStoreService.close(); @@ -236,19 +234,20 @@ public void testBlobServerRecovery() throws Exception { * recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}. */ @Test - public void testBlobServerCorruptedFile() throws Exception { + void testBlobServerCorruptedFile() throws Exception { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.set( - BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServerOptions.STORAGE_DIRECTORY, + TempDirUtils.newFolder(temporaryFolder).getAbsolutePath()); config.set(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); try { TestingBlobHelpers.testGetFailsFromCorruptFile( - config, blobStoreService, temporaryFolder.newFolder()); + config, blobStoreService, TempDirUtils.newFolder(temporaryFolder)); } finally { blobStoreService.cleanupAllData(); blobStoreService.close(); @@ -260,19 +259,20 @@ public void testBlobServerCorruptedFile() throws Exception { * any participating BlobServer when uploaded via a BLOB cache. */ @Test - public void testBlobCacheRecovery() throws Exception { + void testBlobCacheRecovery() throws Exception { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.set( - BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServerOptions.STORAGE_DIRECTORY, + TempDirUtils.newFolder(temporaryFolder).getAbsolutePath()); config.set(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); try { TestingBlobHelpers.testBlobCacheRecovery( - config, blobStoreService, temporaryFolder.newFolder()); + config, blobStoreService, TempDirUtils.newFolder(temporaryFolder)); } finally { blobStoreService.cleanupAllData(); blobStoreService.close(); @@ -284,19 +284,20 @@ public void testBlobCacheRecovery() throws Exception { * recognised during the download via a BLOB cache. */ @Test - public void testBlobCacheCorruptedFile() throws Exception { + void testBlobCacheCorruptedFile() throws Exception { org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration(); config.set(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); config.set( - BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); + BlobServerOptions.STORAGE_DIRECTORY, + TempDirUtils.newFolder(temporaryFolder).getAbsolutePath()); config.set(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config); try { TestingBlobHelpers.testGetFailsFromCorruptFile( - new JobID(), config, blobStoreService, temporaryFolder.newFolder()); + new JobID(), config, blobStoreService, TempDirUtils.newFolder(temporaryFolder)); } finally { blobStoreService.cleanupAllData(); blobStoreService.close();