Skip to content

Commit 1df026d

Browse files
committed
Merge branch '5.2.x' into 5.3.x
2 parents 72c5314 + 92ea1d8 commit 1df026d

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -445,8 +445,20 @@ public void farmerTaskPropogatesException() {
445445
bulkProcessor.add(43, addTimeoutMs);
446446

447447
Runnable farmer = bulkProcessor.farmerTask();
448-
ConnectException e = assertThrows(
449-
ConnectException.class, () -> farmer.run());
448+
ConnectException e = assertThrows(ConnectException.class, () -> {
449+
farmer.run();
450+
// There's a small race condition in the farmer task where a failure on a batch thread
451+
// causes the stopRequested flag of the BulkProcessor to get set, and subsequently checked
452+
// on the farmer task thread, before the batch thread has time to actually complete and
453+
// throw an exception. When this happens, the invocation of farmer::run does not throw an
454+
// exception. However, we can still verify that a batch failed and that the bulk processor
455+
// is in the expected state by invoking BulkProcessor::throwIfFailed, which throws the first
456+
// error encountered on any batch thread. Even if the aforementioned race condition occurs,
457+
// the error should still have been captured by the bulk processor and if it is not present
458+
// by this point, it is a legitimate sign of a bug in the processor instead of a benign race
459+
// condition that just makes testing a little more complicated.
460+
bulkProcessor.throwIfFailed();
461+
});
450462
assertThat(e.getMessage(), containsString(errorInfo));
451463
}
452464

0 commit comments

Comments
 (0)