Skip to content

Commit 44e87cc

Browse files
committed
Merge branch '5.0.x' into 5.1.x
2 parents c01018e + 116e415 commit 44e87cc

File tree

2 files changed

+38
-14
lines changed

2 files changed

+38
-14
lines changed

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchWriter.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -280,17 +280,14 @@ private void tryWriteRecord(
280280
String index,
281281
boolean ignoreKey,
282282
boolean ignoreSchema) {
283-
283+
IndexableRecord record = null;
284284
try {
285-
IndexableRecord record = converter.convertRecord(
286-
sinkRecord,
287-
index,
288-
type,
289-
ignoreKey,
290-
ignoreSchema);
291-
if (record != null) {
292-
bulkProcessor.add(record, flushTimeoutMs);
293-
}
285+
record = converter.convertRecord(
286+
sinkRecord,
287+
index,
288+
type,
289+
ignoreKey,
290+
ignoreSchema);
294291
} catch (ConnectException convertException) {
295292
if (dropInvalidMessage) {
296293
log.error(
@@ -305,6 +302,9 @@ private void tryWriteRecord(
305302
throw convertException;
306303
}
307304
}
305+
if (record != null) {
306+
bulkProcessor.add(record, flushTimeoutMs);
307+
}
308308
}
309309

310310
/**

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchWriterTest.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,14 @@
1717

1818
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
1919
import com.fasterxml.jackson.databind.ObjectMapper;
20-
2120
import org.apache.kafka.connect.data.Decimal;
2221
import org.apache.kafka.connect.data.Schema;
2322
import org.apache.kafka.connect.data.SchemaBuilder;
2423
import org.apache.kafka.connect.data.Struct;
2524
import org.apache.kafka.connect.errors.ConnectException;
2625
import org.apache.kafka.connect.errors.DataException;
2726
import org.apache.kafka.connect.sink.SinkRecord;
28-
import org.junit.Rule;
27+
import org.hamcrest.MatcherAssert;
2928
import org.junit.Before;
3029
import org.junit.Test;
3130

@@ -41,9 +40,8 @@
4140
import java.util.Map;
4241
import java.util.Set;
4342

44-
import org.junit.rules.ExpectedException;
45-
4643
import static io.confluent.connect.elasticsearch.DataConverter.BehaviorOnNullValues;
44+
import static org.hamcrest.Matchers.containsString;
4745

4846
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
4947
public class ElasticsearchWriterTest extends ElasticsearchSinkTestBase {
@@ -458,6 +456,32 @@ public void testDropInvalidRecord() throws Exception {
458456
verifySearchResults(outputRecords, ignoreKey, ignoreSchema);
459457
}
460458

459+
@Test
460+
public void testDropInvalidRecordThrowsOnOtherErrors() throws Exception {
461+
ignoreSchema = true;
462+
Collection<SinkRecord> inputRecords = new ArrayList<>();
463+
464+
Schema structSchema = SchemaBuilder.struct().name("struct")
465+
.field("bytes", SchemaBuilder.BYTES_SCHEMA)
466+
.build();
467+
468+
Struct struct = new Struct(structSchema);
469+
struct.put("bytes", new byte[]{42});
470+
471+
SinkRecord validRecord = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, structSchema, struct, 1);
472+
473+
inputRecords.add(validRecord);
474+
475+
final ElasticsearchWriter nonStrictWriter = initWriter(client, true);
476+
// stop the bulk processor
477+
nonStrictWriter.stop();
478+
479+
// try to write on a stopped writer, should throw
480+
ConnectException e = assertThrows(ConnectException.class,
481+
() -> nonStrictWriter.write(inputRecords));
482+
MatcherAssert.assertThat(e.getMessage(), containsString("Stopping"));
483+
}
484+
461485
private Collection<SinkRecord> prepareData(int numRecords) {
462486
Collection<SinkRecord> records = new ArrayList<>();
463487
for (int i = 0; i < numRecords; ++i) {

0 commit comments

Comments
 (0)