From 0c00e82ef316e0700ce54ed13a80c1d6ea0d99a4 Mon Sep 17 00:00:00 2001 From: Purushottam Sinha Date: Mon, 1 Jun 2026 21:40:12 +0530 Subject: [PATCH 1/2] [FLINK-39769][JUnit5 Migration] Migrate the JUnit5 for Module: flink-examples-streaming Migrate the remaining JUnit 4 tests in flink-examples-streaming to JUnit 5: - StreamingExamplesITCase and SocketWindowWordCountITCase: replace @RunWith(Parameterized.class) with native JUnit 5 @ParameterizedTest + @ValueSource on the tests that read asyncState, and move the base class from AbstractTestBaseJUnit4 to AbstractTestBase. In StreamingExamplesITCase the two tests that don't use asyncState become plain @Test. - TopSpeedWindowingExampleITCase: drop TestLogger and replace the @ClassRule MiniClusterWithClientResource / TemporaryFolder with @RegisterExtension MiniClusterExtension and @TempDir. - Convert the remaining org.junit.Assert.fail to AssertJ. --- .../test/StreamingExamplesITCase.java | 34 +++++++------------ .../TopSpeedWindowingExampleITCase.java | 28 +++++++-------- .../socket/SocketWindowWordCountITCase.java | 26 +++++--------- 3 files changed, 35 insertions(+), 53 deletions(-) diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java index afba4f53369e9..6cf99c4994b85 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java @@ -34,34 +34,24 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.test.examples.join.WindowJoinData; import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.commons.io.FileUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.file.Files; -import java.util.Arrays; -import java.util.Collection; import static org.apache.flink.test.util.TestBaseUtils.checkLinesAgainstRegexp; import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory; /** Integration test for streaming programs in Java examples. */ -@RunWith(Parameterized.class) -public class StreamingExamplesITCase extends AbstractTestBaseJUnit4 { - - @Parameterized.Parameter public boolean asyncState; - - @Parameterized.Parameters - public static Collection setup() { - return Arrays.asList(false, true); - } +class StreamingExamplesITCase extends AbstractTestBase { @Test - public void testWindowJoin() throws Exception { + void testWindowJoin() throws Exception { final String resultPath = Files.createTempDirectory("result-path").toUri().toString(); @@ -111,7 +101,7 @@ public Tuple2 map(String value) throws Exception { } @Test - public void testSessionWindowing() throws Exception { + void testSessionWindowing() throws Exception { final String resultPath = getTempDirPath("result"); org.apache.flink.streaming.examples.windowing.SessionWindowing.main( new String[] {"--output", resultPath}); @@ -120,8 +110,9 @@ public void testSessionWindowing() throws Exception { // state here. } - @Test - public void testWindowWordCount() throws Exception { + @ParameterizedTest(name = "asyncState: {0}") + @ValueSource(booleans = {false, true}) + void testWindowWordCount(boolean asyncState) throws Exception { final String windowSize = "25"; final String slideSize = "15"; final String textPath = createTempFile("text.txt", WordCountData.TEXT); @@ -153,8 +144,9 @@ public void testWindowWordCount() throws Exception { checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)"); } - @Test - public void testWordCount() throws Exception { + @ParameterizedTest(name = "asyncState: {0}") + @ValueSource(booleans = {false, true}) + void testWordCount(boolean asyncState) throws Exception { final String textPath = createTempFile("text.txt", WordCountData.TEXT); final String resultPath = getTempDirPath("result"); diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java index 72dd508de25e2..5d578fa2a5e60 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -20,37 +20,37 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing; import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; -import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.util.FileUtils; -import org.apache.flink.util.TestLogger; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory; /** Tests for {@link TopSpeedWindowing}. */ -public class TopSpeedWindowingExampleITCase extends TestLogger { +class TopSpeedWindowingExampleITCase { - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - - @ClassRule - public static MiniClusterWithClientResource miniClusterResource = - new MiniClusterWithClientResource( + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_EXTENSION = + new MiniClusterExtension( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) .build()); + @TempDir static File temporaryFolder; + @Test - public void testTopSpeedWindowingExampleITCase() throws Exception { - File inputFile = temporaryFolder.newFile(); + void testTopSpeedWindowingExampleITCase() throws Exception { + File inputFile = new File(temporaryFolder, "input"); + inputFile.createNewFile(); FileUtils.writeFileUtf8(inputFile, TopSpeedWindowingExampleData.CAR_DATA); - final String resultPath = temporaryFolder.newFolder().toURI().toString(); + final String resultPath = new File(temporaryFolder, "result").toURI().toString(); TopSpeedWindowing.main( new String[] { diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java index 94bd5b9c4e0c6..18b83f8f2c51a 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java @@ -20,12 +20,11 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.streaming.examples.socket.SocketWindowWordCount; import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.AbstractTestBaseJUnit4; +import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.NetUtils; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -35,24 +34,15 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; -import java.util.Arrays; -import java.util.Collection; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; /** Tests for {@link SocketWindowWordCount}. */ -@RunWith(Parameterized.class) -public class SocketWindowWordCountITCase extends AbstractTestBaseJUnit4 { +class SocketWindowWordCountITCase extends AbstractTestBase { - @Parameterized.Parameter public boolean asyncState; - - @Parameterized.Parameters - public static Collection setup() { - return Arrays.asList(false, true); - } - - @Test - public void testJavaProgram() throws Exception { + @ParameterizedTest(name = "asyncState: {0}") + @ValueSource(booleans = {false, true}) + void testJavaProgram(boolean asyncState) throws Exception { InetAddress localhost = InetAddress.getByName("localhost"); // suppress sysout messages from this example From ac6e2e02bdcd97720b928a80c0e9a28295315749 Mon Sep 17 00:00:00 2001 From: Purushottam Sinha Date: Tue, 23 Jun 2026 01:28:06 +0530 Subject: [PATCH 2/2] [FLINK-39769][tests] Register TestLoggerExtension via service loader for flink-examples-streaming Restores per-test logging after dropping `extends TestLogger` in the JUnit5 migration; matches the pattern used by other migrated modules. No test-jar exclude needed since this module's test-jar is not consumed elsewhere. --- .../org.junit.jupiter.api.extension.Extension | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 flink-examples/flink-examples-streaming/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension diff --git a/flink-examples/flink-examples-streaming/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-examples/flink-examples-streaming/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension new file mode 100644 index 0000000000000..75ae4ae1db36e --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.util.TestLoggerExtension +org.apache.flink.util.TestNameProviderExtension