@@ -447,8 +447,20 @@ public void farmerTaskPropogatesException() {
447447 bulkProcessor .add (43 , addTimeoutMs );
448448
449449 Runnable farmer = bulkProcessor .farmerTask ();
450- ConnectException e = assertThrows (
451- ConnectException .class , () -> farmer .run ());
450+ ConnectException e = assertThrows (ConnectException .class , () -> {
451+ farmer .run ();
452+ // There's a small race condition in the farmer task where a failure on a batch thread
453+ // causes the stopRequested flag of the BulkProcessor to get set, and subsequently checked
454+ // on the farmer task thread, before the batch thread has time to actually complete and
455+ // throw an exception. When this happens, the invocation of farmer::run does not throw an
456+ // exception. However, we can still verify that a batch failed and that the bulk processor
457+ // is in the expected state by invoking BulkProcessor::throwIfFailed, which throws the first
458+ // error encountered on any batch thread. Even if the aforementioned race condition occurs,
459+ // the error should still have been captured by the bulk processor and if it is not present
460+ // by this point, it is a legitimate sign of a bug in the processor instead of a benign race
461+ // condition that just makes testing a little more complicated.
462+ bulkProcessor .throwIfFailed ();
463+ });
452464 assertThat (e .getMessage (), containsString (errorInfo ));
453465 }
454466
0 commit comments