Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.functions.source.legacy.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.legacy.io.TextInputFormat;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
Expand All @@ -57,13 +56,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/**
* IT cases for the {@link ContinuousFileMonitoringFunction} and {@link
* ContinuousFileReaderOperator}.
*/
public class ContinuousFileProcessingITCase extends AbstractTestBaseJUnit4 {
class ContinuousFileProcessingITCase extends AbstractTestBase {

private static final int NO_OF_FILES = 5;
private static final int LINES_PER_FILE = 100;
Expand All @@ -81,8 +81,8 @@ public class ContinuousFileProcessingITCase extends AbstractTestBaseJUnit4 {

// PREPARING FOR THE TESTS

@Before
public void createHDFS() throws IOException {
@BeforeEach
void createHDFS() throws IOException {
baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);

Expand All @@ -102,16 +102,16 @@ public void createHDFS() throws IOException {
hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
}

@After
public void destroyHDFS() {
@AfterEach
void destroyHDFS() {
FileUtil.fullyDelete(baseDir);
hdfsCluster.shutdown();
}

// END OF PREPARATIONS

@Test
public void testProgram() throws Exception {
void testProgram() throws Exception {

/*
* This test checks the interplay between the monitor and the reader
Expand All @@ -138,7 +138,7 @@ public void testProgram() throws Exception {

// the monitor has always DOP 1
DataStream<TimestampedFileInputSplit> splits = env.addSource(monitoringFunction);
Assert.assertEquals(1, splits.getParallelism());
assertThat(splits.getParallelism()).isOne();

TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);

Expand All @@ -148,7 +148,7 @@ public void testProgram() throws Exception {
"FileSplitReader",
typeInfo,
new ContinuousFileReaderOperatorFactory<>(format));
Assert.assertEquals(PARALLELISM, content.getParallelism());
assertThat(content.getParallelism()).isEqualTo(PARALLELISM);

// finally for the sink we set the parallelism to 1 so that we can verify the output
TestingSinkFunction sink = new TestingSinkFunction();
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testProgram() throws Exception {

org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
hdfs.rename(tmpFile.f0, file);
Assert.assertTrue(hdfs.exists(file));
assertThat(hdfs.exists(file)).isTrue();
}

jobFuture.get();
Expand All @@ -217,7 +217,7 @@ private static class TestingSinkFunction extends RichSinkFunction<String> {
@Override
public void open(OpenContext openContext) throws Exception {
// this sink can only work with DOP 1
assertEquals(1, getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
assertThat(getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks()).isOne();

comparator =
new Comparator<String>() {
Expand All @@ -239,7 +239,7 @@ public void invoke(String value) throws Exception {
}

if (!content.add(value + "\n")) {
Assert.fail("Duplicate line: " + value);
fail("Duplicate line: " + value);
System.exit(0);
}

Expand All @@ -252,9 +252,9 @@ public void invoke(String value) throws Exception {
@Override
public void close() {
// check if the data that we collected are the ones they are supposed to be.
Assert.assertEquals(expectedContents.size(), actualContent.size());
assertThat(actualContent).hasSameSizeAs(expectedContents);
for (Integer fileIdx : expectedContents.keySet()) {
Assert.assertTrue(actualContent.keySet().contains(fileIdx));
assertThat(actualContent).containsKey(fileIdx);

List<String> cntnt = new ArrayList<>(actualContent.get(fileIdx));
Collections.sort(cntnt, comparator);
Expand All @@ -263,7 +263,7 @@ public void close() {
for (String line : cntnt) {
cntntStr.append(line);
}
Assert.assertEquals(expectedContents.get(fileIdx), cntntStr.toString());
assertThat(cntntStr).hasToString(expectedContents.get(fileIdx));
}
expectedContents.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,16 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.test.util.MigrationTest;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.OperatingSystem;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.FileOutputStream;
Expand All @@ -62,33 +61,30 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/** Tests that verify the migration from previous Flink version snapshots. */
@RunWith(Parameterized.class)
public class ContinuousFileProcessingMigrationTest implements MigrationTest {
@ExtendWith(ParameterizedTestExtension.class)
class ContinuousFileProcessingMigrationTest implements MigrationTest {

private static final int LINES_PER_FILE = 10;

private static final long INTERVAL = 100;

@Parameterized.Parameters(name = "Migration Savepoint: {0}")
@Parameters(name = "Migration Savepoint: {0}")
public static Collection<FlinkVersion> parameters() {
return FlinkVersion.rangeOf(
FlinkVersion.v1_8, MigrationTest.getMostRecentlyPublishedVersion());
}

private final FlinkVersion testMigrateVersion;

public ContinuousFileProcessingMigrationTest(FlinkVersion testMigrateVersion) {
this.testMigrateVersion = testMigrateVersion;
}

@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
@Parameter private FlinkVersion testMigrateVersion;

@BeforeClass
public static void verifyOS() {
Assume.assumeTrue(
"HDFS cluster cannot be start on Windows without extensions.",
!OperatingSystem.isWindows());
@BeforeAll
static void verifyOS() {
assumeThat(OperatingSystem.isWindows())
.as("HDFS cluster cannot be started on Windows without extensions.")
.isFalse();
}

@SnapshotsGenerator
Expand Down Expand Up @@ -138,10 +134,8 @@ public void writeReaderSnapshot(FlinkVersion flinkGenerateSavepointVersion) thro
+ "-snapshot");
}

@Test
public void testReaderRestore() throws Exception {
File testFolder = tempFolder.newFolder();

@TestTemplate
void testReaderRestore(@TempDir File testFolder) throws Exception {
final OneShotLatch latch = new OneShotLatch();

BlockingFileInputFormat format =
Expand Down Expand Up @@ -182,17 +176,17 @@ public void testReaderRestore() throws Exception {
// compare if the results contain what they should contain and also if
// they are the same, as they should.

Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split1));
assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split2));
assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split3));
assertThat(testHarness.getOutput()).contains(new StreamRecord<>(split4));
}

@SnapshotsGenerator
public void writeMonitoringSourceSnapshot(FlinkVersion flinkGenerateSavepointVersion)
throws Exception {

File testFolder = tempFolder.newFolder();
File testFolder = Files.createTempDirectory("tmpDirPrefix").toFile();

long fileModTime = Long.MIN_VALUE;
for (int i = 0; i < 1; i++) {
Expand Down Expand Up @@ -266,10 +260,8 @@ public void markAsTemporarilyIdle() {}
testHarness.close();
}

@Test
public void testMonitoringSourceRestore() throws Exception {

File testFolder = tempFolder.newFolder();
@TestTemplate
void testMonitoringSourceRestore(@TempDir File testFolder) throws Exception {

TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));

Expand All @@ -292,8 +284,8 @@ public void testMonitoringSourceRestore() throws Exception {

testHarness.open();

Assert.assertEquals(
fileNameAndModTime.f1.longValue(), monitoringFunction.getGlobalModificationTime());
assertThat(monitoringFunction.getGlobalModificationTime())
.isEqualTo(fileNameAndModTime.f1.longValue());
}

private Tuple2<String, Long> getResourceFilename(FlinkVersion version) throws IOException {
Expand Down Expand Up @@ -387,7 +379,7 @@ private Tuple2<File, String> createFileAndFillWithData(
File base, String fileName, int fileIdx, String sampleLine) throws IOException {

File file = new File(base, fileName + fileIdx);
Assert.assertFalse(file.exists());
assertThat(file.exists()).isFalse();

File tmp = new File(base, "." + fileName + fileIdx);
FileOutputStream stream = new FileOutputStream(tmp);
Expand All @@ -401,7 +393,7 @@ private Tuple2<File, String> createFileAndFillWithData(

FileUtils.moveFile(tmp, file);

Assert.assertTrue("No result file present", file.exists());
assertThat(file.exists()).as("No result file present").isTrue();
return new Tuple2<>(file, str.toString());
}

Expand Down
Loading