Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,22 @@ public void testCloseBlockingWaitingForFetcherShutdown() throws Exception {
closingThread.start();

waitUntil(
() -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() == 2,
() -> findThread(SplitFetcherManager.THREAD_NAME_PREFIX).size() >= 2,
Duration.ofSeconds(30),
"The element queue draining thread should have started.");
for (Thread t : findThread(SplitFetcherManager.THREAD_NAME_PREFIX)) {
// The intent of this check is to ensure no executor thread is tight-looping while
// close() is in progress. A terminated thread is also acceptable: it has finished
// its work and is no longer consuming CPU. If a thread is still RUNNABLE or BLOCKED
// after 30 seconds, waitUntil will fail the test, surfacing the regression we want
// to catch.
waitUntil(
() ->
t.getState().equals(Thread.State.WAITING)
!t.isAlive()
|| t.getState().equals(Thread.State.WAITING)
Comment thread
Dennis-Mircea marked this conversation as resolved.
|| t.getState().equals(Thread.State.TIMED_WAITING),
Duration.ofSeconds(30),
"All the executor threads should be in waiting status.");
"Each executor thread should be terminated or in a (timed) waiting state.");
}

assertThat(fetcherManager.getQueue().getAvailabilityFuture().getNumberOfDependents())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand Down Expand Up @@ -86,6 +85,7 @@ public class AbstractAsyncRunnableStreamOperatorTest {
}

@Test
@SuppressWarnings({"rawtypes"})
void testCreateAsyncExecutionController() throws Exception {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
Expand Down Expand Up @@ -151,10 +151,9 @@ void testAsyncProcessWithKey() throws Exception {
TestOperatorWithAsyncProcessWithKey testOperator =
new TestOperatorWithAsyncProcessWithKey(
new TestKeySelector(), ElementOrder.RECORD_ORDER);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) {
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();
CompletableFuture<Void> future =
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
Expand All @@ -173,8 +172,6 @@ void testAsyncProcessWithKey() throws Exception {
// We don't have the mailbox executor actually running, so the new context is blocked
// and never triggered.
assertThat(testOperator.getProcessed()).isEqualTo(1);
} finally {
testHarness.close();
}
}

Expand All @@ -183,19 +180,15 @@ void testDirectAsyncProcess() throws Exception {
TestOperatorWithDirectAsyncProcess testOperator =
new TestOperatorWithDirectAsyncProcess(
new TestKeySelector(), ElementOrder.RECORD_ORDER);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) {
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();
CompletableFuture<Void> future =
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));

testHarness.drainAsyncRequests();
assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
assertThat(testOperator.getProcessed()).isEqualTo(1);
} finally {
testHarness.close();
}
}

Expand All @@ -207,10 +200,9 @@ void testManyAsyncProcessWithKey() throws Exception {
TestOperatorWithMultipleDirectAsyncProcess testOperator =
new TestOperatorWithMultipleDirectAsyncProcess(
new TestKeySelector(), ElementOrder.RECORD_ORDER, requests);
AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0);
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
try {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
AsyncOneInputStreamOperatorTestHarness.create(testOperator, 128, 1, 0)) {
testHarness.setStateBackend(buildAsyncStateBackend(new HashMapStateBackend()));
testHarness.open();

// Repeat twice
Expand All @@ -227,12 +219,11 @@ void testManyAsyncProcessWithKey() throws Exception {
// This ensures the order is correct according to the priority in AEC.
assertThat(testOperator.getProcessedOrders())
.isEqualTo(testOperator.getExpectedProcessedOrders());
} finally {
testHarness.close();
}
}

@Test
@SuppressWarnings({"rawtypes", "unchecked"})
void testCheckpointDrain() throws Exception {
try (AsyncOneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
Expand All @@ -244,14 +235,20 @@ void testCheckpointDrain() throws Exception {
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.setAsyncKeyedContextElement(
new StreamRecord<>(Tuple2.of(5, "5")), new TestKeySelector());
// Block the async processing supplier until we have observed the in-flight record,
// otherwise the request can complete before the assertion runs and make the test
// flaky.
CompletableFuture<Void> blocker = new CompletableFuture<>();
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.asyncProcess(
() -> {
blocker.get();
return null;
});
((AbstractAsyncRunnableStreamOperator<String>) testHarness.getOperator())
.postProcessElement();
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
blocker.complete(null);
testHarness.drainAsyncRequests();
assertThat(asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
}
Expand Down Expand Up @@ -325,9 +322,7 @@ void testWatermarkHooks() throws Exception {
});

testOperator.setPostProcessFunction(
(watermark) -> {
testOperator.output(watermark.getTimestamp() + 100L);
});
(watermark) -> testOperator.output(watermark.getTimestamp() + 100L));

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
try (AsyncKeyedTwoInputStreamOperatorTestHarness<Integer, Long, Long, Long> testHarness =
Expand Down Expand Up @@ -372,8 +367,6 @@ void testWatermarkStatus() throws Exception {
createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) {
testHarness.open();
TestOperator testOperator = (TestOperator) testHarness.getOperator();
ThrowingConsumer<StreamRecord<Tuple2<Integer, String>>, Exception> processor =
RecordProcessorUtils.getRecordProcessor(testOperator);
testHarness.processElementInternal(new StreamRecord<>(Tuple2.of(5, "5")));
testHarness.processWatermarkInternal(new Watermark(205L));
CompletableFuture<Void> future =
Expand Down Expand Up @@ -480,8 +473,7 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
synchronized (objectToWait) {
objectToWait.wait();
}
asyncProcess(() -> processed.decrementAndGet())
.thenAccept((a) -> processed.incrementAndGet());
asyncProcess(processed::decrementAndGet).thenAccept((a) -> processed.incrementAndGet());
}

@Override
Expand All @@ -499,15 +491,15 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
"EventTimer-" + timer.getKey() + "-" + timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(
new StreamRecord<>(
Expand Down Expand Up @@ -538,11 +530,7 @@ private static class TestOperatorWithAsyncProcessWithKey extends TestOperator {

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
asyncProcessWithKey(
element.getValue().f0,
() -> {
processed.incrementAndGet();
});
asyncProcessWithKey(element.getValue().f0, processed::incrementAndGet);
synchronized (objectToWait) {
objectToWait.wait();
}
Expand All @@ -558,7 +546,7 @@ private static class TestOperatorWithDirectAsyncProcess extends TestOperator {
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
asyncProcess(processed::decrementAndGet).thenAccept((e) -> processed.addAndGet(2));
}
}
Expand All @@ -579,7 +567,7 @@ private static class TestOperatorWithMultipleDirectAsyncProcess extends TestOper
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
for (int i = 0; i < numAsyncProcesses; i++) {
final int finalI = i;
if (i < numAsyncProcesses - 1) {
Expand Down Expand Up @@ -625,17 +613,17 @@ private static class TestOperatorWithAsyncProcessTimer extends TestOperator {
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
public void processElement(StreamRecord<Tuple2<Integer, String>> element) {
processed.incrementAndGet();
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
asyncProcessWithKey(timer.getKey(), () -> super.onEventTime(timer));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
asyncProcessWithKey(timer.getKey(), () -> super.onProcessingTime(timer));
}
}
Expand Down Expand Up @@ -691,23 +679,23 @@ public Watermark postProcessWatermark(Watermark watermark) throws Exception {
}

@Override
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
output.collect(new StreamRecord<>(timer.getTimestamp()));
}

@Override
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) {
assertThat(getCurrentKey()).isEqualTo(timer.getKey());
}

@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
public void processElement1(StreamRecord<Long> element) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}

@Override
public void processElement2(StreamRecord<Long> element) throws Exception {
public void processElement2(StreamRecord<Long> element) {
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getValue());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,21 @@ void testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause(

waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM);

if (enabledRescaleHistory(configuration)) {
// The rescale-history bookkeeping (merging the still-open UPDATE_REQUIREMENT rescale
// with the new RECOVERABLE_FAILOVER one) is recorded asynchronously by the scheduler
// and is not synchronized with the parallelism/RUNNING signal we waited for above.
// Poll until the expected merged state is observed to avoid flakiness.
waitUntilConditionWithTimeout(
() -> {
List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph);
return rescaleHistory.size() == 2
&& rescaleHistory.get(0).getTriggerCause()
== TriggerCause.RECOVERABLE_FAILOVER;
},
10000);
}

final ExecutionGraphInfo executionGraphInfo =
miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
runAdaptedParameterizedAssertion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
Expand Down Expand Up @@ -204,12 +205,27 @@ void testMultipleJobVertexFinishedTaskExceedRatio() throws Exception {
executionGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()[2];
ev23.getCurrentExecutionAttempt().markFinished();

// Pin the FINISHED timestamp of each baseline task to its DEPLOYING timestamp so that
// the resulting baseline execution time is 0. With a baseline of 0 and a lower bound of
// 0, every still-running task is unconditionally classified as slow. Without this, on
// fast machines the deploy/finish/detect calls can land in the same millisecond and
// leave the running tasks with execution time <= baseline, making the test flaky.
zeroOutFinishedExecutionTime(ev13);
zeroOutFinishedExecutionTime(ev23);

final Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks =
slowTaskDetector.findSlowTasks(executionGraph);

assertThat(slowTasks).hasSize(4);
}

private static void zeroOutFinishedExecutionTime(ExecutionVertex executionVertex) {
final long[] stateTimestamps =
executionVertex.getCurrentExecutionAttempt().getStateTimestamps();
stateTimestamps[ExecutionState.FINISHED.ordinal()] =
stateTimestamps[ExecutionState.DEPLOYING.ordinal()];
}

@Test
void testFinishedTaskExceedRatioInDynamicGraph() throws Exception {
final int parallelism = 3;
Expand Down