diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java index 03f9df713a80a..20e5826740ecb 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java @@ -129,16 +129,22 @@ public void testCloseBlockingWaitingForFetcherShutdown() throws Exception { closingThread.start(); waitUntil( - () -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() == 2, + () -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() >= 2, Duration.ofSeconds(30), "The element queue draining thread should have started."); for (Thread t : findThread(SplitFetcherManager.THREAD_NAME_PREFIX)) { + // The intent of this check is to ensure no executor thread is tight-looping while + // close() is in progress. A terminated thread is also acceptable: it has finished + // its work and is no longer consuming CPU. If a thread is still RUNNABLE or BLOCKED + // after 30 seconds, waitUntil will fail the test, surfacing the regression we want + // to catch. waitUntil( () -> - t.getState().equals(Thread.State.WAITING) + !t.isAlive() + || t.getState().equals(Thread.State.WAITING) || t.getState().equals(Thread.State.TIMED_WAITING), Duration.ofSeconds(30), - "All the executor threads should be in waiting status."); + "Each executor thread should be terminated or in a (timed) waiting state."); } assertThat(fetcherManager.getQueue().getAvailabilityFuture().getNumberOfDependents()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncRunnableStreamOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncRunnableStreamOperatorTest.java index dedf01de245d4..1ecbc2768c242 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncRunnableStreamOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncRunnableStreamOperatorTest.java @@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -86,6 +85,7 @@ public class AbstractAsyncRunnableStreamOperatorTest { } @Test + @SuppressWarnings({"rawtypes"}) void testCreateAsyncExecutionController() throws Exception { try (AsyncOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { @@ -151,10 +151,9 @@ void testAsyncProcessWithKey() throws Exception { TestOperatorWithAsyncProcessWithKey testOperator = new TestOperatorWithAsyncProcessWithKey( new TestKeySelector(), ElementOrder.RECORD_ORDER); - AsyncOneInputStreamOperatorTestHarness, String> testHarness = - AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0); - testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); - try { + try (AsyncOneInputStreamOperatorTestHarness, String> testHarness = + AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) { + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); testHarness.open(); CompletableFuture future = testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5"))); @@ -173,8 +172,6 @@ void testAsyncProcessWithKey() throws Exception { // We don't have the mailbox executor actually running, so the new context is blocked // and never triggered. assertThat(testOperator.getProcessed()).isEqualTo(1); - } finally { - testHarness.close(); } } @@ -183,19 +180,15 @@ void testDirectAsyncProcess() throws Exception { TestOperatorWithDirectAsyncProcess testOperator = new TestOperatorWithDirectAsyncProcess( new TestKeySelector(), ElementOrder.RECORD_ORDER); - AsyncOneInputStreamOperatorTestHarness, String> testHarness = - AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0); - testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); - try { + try (AsyncOneInputStreamOperatorTestHarness, String> testHarness = + AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) { + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); testHarness.open(); - CompletableFuture future = - testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5"))); + testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5"))); testHarness.drainAsyncRequests(); assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); assertThat(testOperator.getProcessed()).isEqualTo(1); - } finally { - testHarness.close(); } } @@ -207,10 +200,9 @@ void testManyAsyncProcessWithKey() throws Exception { TestOperatorWithMultipleDirectAsyncProcess testOperator = new TestOperatorWithMultipleDirectAsyncProcess( new TestKeySelector(), ElementOrder.RECORD_ORDER, requests); - AsyncOneInputStreamOperatorTestHarness, String> testHarness = - AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0); - testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); - try { + try (AsyncOneInputStreamOperatorTestHarness, String> testHarness = + AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) { + testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend())); testHarness.open(); // Repeat twice @@ -227,12 +219,11 @@ void testManyAsyncProcessWithKey() throws Exception { // This ensures the order is correct according to the priority in AEC. assertThat(testOperator.getProcessedOrders()) .isEqualTo(testOperator.getExpectedProcessedOrders()); - } finally { - testHarness.close(); } } @Test + @SuppressWarnings({"rawtypes", "unchecked"}) void testCheckpointDrain() throws Exception { try (AsyncOneInputStreamOperatorTestHarness, String> testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { @@ -244,14 +235,20 @@ void testCheckpointDrain() throws Exception { ((AbstractAsyncRunnableStreamOperator) testHarness.getOperator()) .setAsyncKeyedContextElement( new StreamRecord<>(Tuple2.of(5, "5")), new TestKeySelector()); + // Block the async processing supplier until we have observed the in-flight record, + // otherwise the request can complete before the assertion runs and make the test + // flaky. + CompletableFuture blocker = new CompletableFuture<>(); ((AbstractAsyncRunnableStreamOperator) testHarness.getOperator()) .asyncProcess( () -> { + blocker.get(); return null; }); ((AbstractAsyncRunnableStreamOperator) testHarness.getOperator()) .postProcessElement(); assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(1); + blocker.complete(null); testHarness.drainAsyncRequests(); assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0); } @@ -325,9 +322,7 @@ void testWatermarkHooks() throws Exception { }); testOperator.setPostProcessFunction( - (watermark) -> { - testOperator.output(watermark.getTimestamp() + 100L); - }); + (watermark) -> testOperator.output(watermark.getTimestamp() + 100L)); ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = @@ -372,8 +367,6 @@ void testWatermarkStatus() throws Exception { createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); TestOperator testOperator = (TestOperator) testHarness.getOperator(); - ThrowingConsumer>, Exception> processor = - RecordProcessorUtils.getRecordProcessor(testOperator); testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5"))); testHarness.processWatermarkInternal(new Watermark(205L)); CompletableFuture future = @@ -480,8 +473,7 @@ public void processElement(StreamRecord> element) throws synchronized (objectToWait) { objectToWait.wait(); } - asyncProcess(() -> processed.decrementAndGet()) - .thenAccept((a) -> processed.incrementAndGet()); + asyncProcess(processed::decrementAndGet).thenAccept((a) -> processed.incrementAndGet()); } @Override @@ -499,7 +491,7 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index } @Override - public void onEventTime(InternalTimer timer) throws Exception { + public void onEventTime(InternalTimer timer) { assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( @@ -507,7 +499,7 @@ public void onEventTime(InternalTimer timer) throws Exce } @Override - public void onProcessingTime(InternalTimer timer) throws Exception { + public void onProcessingTime(InternalTimer timer) { assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect( new StreamRecord<>( @@ -538,11 +530,7 @@ private static class TestOperatorWithAsyncProcessWithKey extends TestOperator { @Override public void processElement(StreamRecord> element) throws Exception { - asyncProcessWithKey( - element.getValue().f0, - () -> { - processed.incrementAndGet(); - }); + asyncProcessWithKey(element.getValue().f0, processed::incrementAndGet); synchronized (objectToWait) { objectToWait.wait(); } @@ -558,7 +546,7 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator { } @Override - public void processElement(StreamRecord> element) throws Exception { + public void processElement(StreamRecord> element) { asyncProcess(processed::decrementAndGet).thenAccept((e) -> processed.addAndGet(2)); } } @@ -579,7 +567,7 @@ private static class TestOperatorWithMultipleDirectAsyncProcess extends TestOper } @Override - public void processElement(StreamRecord> element) throws Exception { + public void processElement(StreamRecord> element) { for (int i = 0; i < numAsyncProcesses; i++) { final int finalI = i; if (i < numAsyncProcesses - 1) { @@ -625,17 +613,17 @@ private static class TestOperatorWithAsyncProcessTimer extends TestOperator { } @Override - public void processElement(StreamRecord> element) throws Exception { + public void processElement(StreamRecord> element) { processed.incrementAndGet(); } @Override - public void onEventTime(InternalTimer timer) throws Exception { + public void onEventTime(InternalTimer timer) { asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer)); } @Override - public void onProcessingTime(InternalTimer timer) throws Exception { + public void onProcessingTime(InternalTimer timer) { asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer)); } } @@ -691,23 +679,23 @@ public Watermark postProcessWatermark(Watermark watermark) throws Exception { } @Override - public void onEventTime(InternalTimer timer) throws Exception { + public void onEventTime(InternalTimer timer) { assertThat(getCurrentKey()).isEqualTo(timer.getKey()); output.collect(new StreamRecord<>(timer.getTimestamp())); } @Override - public void onProcessingTime(InternalTimer timer) throws Exception { + public void onProcessingTime(InternalTimer timer) { assertThat(getCurrentKey()).isEqualTo(timer.getKey()); } @Override - public void processElement1(StreamRecord element) throws Exception { + public void processElement1(StreamRecord element) { timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue()); } @Override - public void processElement2(StreamRecord element) throws Exception { + public void processElement2(StreamRecord element) { timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java index fd12bc5de7fe7..de55ab317e24e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java @@ -541,6 +541,21 @@ void testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause( waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + if (enabledRescaleHistory(configuration)) { + // The rescale-history bookkeeping (merging the still-open UPDATE_REQUIREMENT rescale + // with the new RECOVERABLE_FAILOVER one) is recorded asynchronously by the scheduler + // and is not synchronized with the parallelism/RUNNING signal we waited for above. + // Poll until the expected merged state is observed to avoid flakiness. + waitUntilConditionWithTimeout( + () -> { + List rescaleHistory = getRescaleHistory(miniCluster, jobGraph); + return rescaleHistory.size() == 2 + && rescaleHistory.get(0).getTriggerCause() + == TriggerCause.RECOVERABLE_FAILOVER; + }, + 10000); + } + final ExecutionGraphInfo executionGraphInfo = miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); runAdaptedParameterizedAssertion( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java index 5ec9baeb3ccd0..db8b3eac011d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SlowTaskDetectorOptions; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; @@ -204,12 +205,27 @@ void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception { executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2]; ev23.getCurrentExecutionAttempt().markFinished(); + // Pin the FINISHED timestamp of each baseline task to its DEPLOYING timestamp so that + // the resulting baseline execution time is 0. With a baseline of 0 and a lower bound of + // 0, every still-running task is unconditionally classified as slow. Without this, on + // fast machines the deploy/finish/detect calls can land in the same millisecond and + // leave the running tasks with execution time <= baseline, making the test flaky. + zeroOutFinishedExecutionTime(ev13); + zeroOutFinishedExecutionTime(ev23); + final Map> slowTasks = slowTaskDetector.findSlowTasks(executionGraph); assertThat(slowTasks).hasSize(4); } + private static void zeroOutFinishedExecutionTime(ExecutionVertex executionVertex) { + final long[] stateTimestamps = + executionVertex.getCurrentExecutionAttempt().getStateTimestamps(); + stateTimestamps[ExecutionState.FINISHED.ordinal()] = + stateTimestamps[ExecutionState.DEPLOYING.ordinal()]; + } + @Test void testFinishedTaskExceedRatioInDynamicGraph() throws Exception { final int parallelism = 3;