Skip to content

Commit c1918a2

Browse files
Merge branch '5.0.x' into 5.1.x
2 parents a07a0b2 + ba8f7ee commit c1918a2

File tree

6 files changed

+21
-10
lines changed

6 files changed

+21
-10
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ private Object preProcessLogicalValue(String schemaName, Object value) {
322322
}
323323

324324
private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
325-
Collection collection = (Collection) value;
325+
Collection<?> collection = (Collection<?>) value;
326326
List<Object> result = new ArrayList<>();
327327
for (Object element: collection) {
328328
result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
public class ElasticsearchSinkTask extends SinkTask {
3535

3636
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
37-
private ElasticsearchWriter writer;
37+
private volatile ElasticsearchWriter writer;
3838
private ElasticsearchClient client;
3939
private Boolean createIndicesAtStartTime;
4040

@@ -123,8 +123,12 @@ public void put(Collection<SinkRecord> records) throws ConnectException {
123123

124124
@Override
125125
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
126-
log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets);
127-
writer.flush();
126+
if (writer != null) {
127+
log.debug("Flushing data to Elasticsearch with the following offsets: {}", offsets);
128+
writer.flush();
129+
} else {
130+
log.debug("Could not flush data to Elasticsearch because ESWriter already closed.");
131+
}
128132
}
129133

130134
@Override
@@ -137,6 +141,7 @@ public void stop() throws ConnectException {
137141
log.info("Stopping ElasticsearchSinkTask.");
138142
if (writer != null) {
139143
writer.stop();
144+
writer = null;
140145
}
141146
if (client != null) {
142147
client.close();

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ public BulkRequest createBulkRequest(List<IndexableRecord> batch) {
320320
}
321321

322322
// visible for testing
323-
protected BulkableAction toBulkableAction(IndexableRecord record) {
323+
protected BulkableAction<?> toBulkableAction(IndexableRecord record) {
324324
// If payload is null, the record was a tombstone and we should delete from the index.
325325
return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record);
326326
}

src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public void stringKeyedMapCompactFormat() {
234234
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(),
235235
preProcessedSchema
236236
);
237-
HashMap newValue = (HashMap) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
237+
HashMap<?, ?> newValue = (HashMap<?, ?>) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
238238
assertEquals(origValue, newValue);
239239
}
240240

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
151151

152152
}
153153

154+
@Test
155+
public void testStopThenFlushDoesNotThrow() {
156+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
157+
task.start(createProps(), client);
158+
task.stop();
159+
task.flush(new HashMap<>());
160+
}
161+
154162
private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
155163
ActionFuture<IndicesExistsResponse> action = cluster
156164
.client()

src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -378,10 +378,8 @@ public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
378378
// Test both IGNORE and WARN options
379379
// There is no difference in logic between IGNORE and WARN, except for the logging.
380380
// Test to ensure they both work the same logically
381-
final List<BehaviorOnMalformedDoc> behaviorsToTest = new ArrayList<BehaviorOnMalformedDoc>() {{
382-
add(BehaviorOnMalformedDoc.WARN);
383-
add(BehaviorOnMalformedDoc.IGNORE);
384-
}};
381+
final List<BehaviorOnMalformedDoc> behaviorsToTest =
382+
Arrays.asList(BehaviorOnMalformedDoc.WARN, BehaviorOnMalformedDoc.IGNORE);
385383

386384
for(BehaviorOnMalformedDoc behaviorOnMalformedDoc : behaviorsToTest)
387385
{

0 commit comments

Comments
 (0)