@@ -445,8 +445,20 @@ public void farmerTaskPropogatesException() {
445445 bulkProcessor .add (43 , addTimeoutMs );
446446
447447 Runnable farmer = bulkProcessor .farmerTask ();
448- ConnectException e = assertThrows (
449- ConnectException .class , () -> farmer .run ());
448+ ConnectException e = assertThrows (ConnectException .class , () -> {
449+ farmer .run ();
450+ // There's a small race condition in the farmer task where a failure on a batch thread
451+ // causes the stopRequested flag of the BulkProcessor to get set, and subsequently checked
452+ // on the farmer task thread, before the batch thread has time to actually complete and
453+ // throw an exception. When this happens, the invocation of farmer::run does not throw an
454+ // exception. However, we can still verify that a batch failed and that the bulk processor
455+ // is in the expected state by invoking BulkProcessor::throwIfFailed, which throws the first
456+ // error encountered on any batch thread. Even if the aforementioned race condition occurs,
457+ // the error should still have been captured by the bulk processor and if it is not present
458+ // by this point, it is a legitimate sign of a bug in the processor instead of a benign race
459+ // condition that just makes testing a little more complicated.
460+ bulkProcessor .throwIfFailed ();
461+ });
450462 assertThat (e .getMessage (), containsString (errorInfo ));
451463 }
452464
0 commit comments