@@ -446,8 +446,20 @@ public void farmerTaskPropogatesException() {
446446 bulkProcessor .add (43 , addTimeoutMs );
447447
448448 Runnable farmer = bulkProcessor .farmerTask ();
449- ConnectException e = assertThrows (
450- ConnectException .class , () -> farmer .run ());
449+ ConnectException e = assertThrows (ConnectException .class , () -> {
450+ farmer .run ();
451+ // There's a small race condition in the farmer task where a failure on a batch thread
452+ // causes the stopRequested flag of the BulkProcessor to get set, and subsequently checked
453+ // on the farmer task thread, before the batch thread has time to actually complete and
454+ // throw an exception. When this happens, the invocation of farmer::run does not throw an
455+ // exception. However, we can still verify that a batch failed and that the bulk processor
456+ // is in the expected state by invoking BulkProcessor::throwIfFailed, which throws the first
457+ // error encountered on any batch thread. Even if the aforementioned race condition occurs,
458+ // the error should still have been captured by the bulk processor and if it is not present
459+ // by this point, it is a legitimate sign of a bug in the processor instead of a benign race
460+ // condition that just makes testing a little more complicated.
461+ bulkProcessor .throwIfFailed ();
462+ });
451463 assertThat (e .getMessage (), containsString (errorInfo ));
452464 }
453465
0 commit comments