Skip to content

Commit ba8f7ee

Browse files
author
Lev Zemlyanov
authored
CC-12530: do not throw exception on flush after task close (#455)
* CC-12530: do not throw exception on flush after task close Signed-off-by: Lev Zemlyanov <lev@confluent.io>
1 parent 0710974 commit ba8f7ee

File tree

6 files changed

+21
-10
lines changed

6 files changed

+21
-10
lines changed

src/main/java/io/confluent/connect/elasticsearch/DataConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ private Object preProcessLogicalValue(String schemaName, Object value) {
323323
}
324324

325325
private Object preProcessArrayValue(Object value, Schema schema, Schema newSchema) {
326-
Collection collection = (Collection) value;
326+
Collection<?> collection = (Collection<?>) value;
327327
List<Object> result = new ArrayList<>();
328328
for (Object element: collection) {
329329
result.add(preProcessValue(element, schema.valueSchema(), newSchema.valueSchema()));

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTask.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
public class ElasticsearchSinkTask extends SinkTask {
3636

3737
private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
38-
private ElasticsearchWriter writer;
38+
private volatile ElasticsearchWriter writer;
3939
private ElasticsearchClient client;
4040
private Boolean createIndicesAtStartTime;
4141

@@ -124,8 +124,12 @@ public void put(Collection<SinkRecord> records) throws ConnectException {
124124

125125
@Override
126126
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
127-
log.trace("Flushing data to Elasticsearch with the following offsets: {}", offsets);
128-
writer.flush();
127+
if (writer != null) {
128+
log.debug("Flushing data to Elasticsearch with the following offsets: {}", offsets);
129+
writer.flush();
130+
} else {
131+
log.debug("Could not flush data to Elasticsearch because ESWriter already closed.");
132+
}
129133
}
130134

131135
@Override
@@ -138,6 +142,7 @@ public void stop() throws ConnectException {
138142
log.info("Stopping ElasticsearchSinkTask.");
139143
if (writer != null) {
140144
writer.stop();
145+
writer = null;
141146
}
142147
if (client != null) {
143148
client.close();

src/main/java/io/confluent/connect/elasticsearch/jest/JestElasticsearchClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public BulkRequest createBulkRequest(List<IndexableRecord> batch) {
316316
}
317317

318318
// visible for testing
319-
protected BulkableAction toBulkableAction(IndexableRecord record) {
319+
protected BulkableAction<?> toBulkableAction(IndexableRecord record) {
320320
// If payload is null, the record was a tombstone and we should delete from the index.
321321
return record.payload != null ? toIndexRequest(record) : toDeleteRequest(record);
322322
}

src/test/java/io/confluent/connect/elasticsearch/DataConverterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public void stringKeyedMapCompactFormat() {
235235
SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(),
236236
preProcessedSchema
237237
);
238-
HashMap newValue = (HashMap) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
238+
HashMap<?, ?> newValue = (HashMap<?, ?>) converter.preProcessValue(origValue, origSchema, preProcessedSchema);
239239
assertEquals(origValue, newValue);
240240
}
241241

src/test/java/io/confluent/connect/elasticsearch/ElasticsearchSinkTaskTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ public void testCreateAndWriteToIndexNotCreatedAtStartTime() {
152152

153153
}
154154

155+
@Test
156+
public void testStopThenFlushDoesNotThrow() {
157+
ElasticsearchSinkTask task = new ElasticsearchSinkTask();
158+
task.start(createProps(), client);
159+
task.stop();
160+
task.flush(new HashMap<>());
161+
}
162+
155163
private boolean verifyIndexExist(InternalTestCluster cluster, String ... indices) {
156164
ActionFuture<IndicesExistsResponse> action = cluster
157165
.client()

src/test/java/io/confluent/connect/elasticsearch/bulk/BulkProcessorTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,10 +379,8 @@ public void ignoreOrWarnOnMalformedDoc() throws InterruptedException {
379379
// Test both IGNORE and WARN options
380380
// There is no difference in logic between IGNORE and WARN, except for the logging.
381381
// Test to ensure they both work the same logically
382-
final List<BehaviorOnMalformedDoc> behaviorsToTest = new ArrayList<BehaviorOnMalformedDoc>() {{
383-
add(BehaviorOnMalformedDoc.WARN);
384-
add(BehaviorOnMalformedDoc.IGNORE);
385-
}};
382+
final List<BehaviorOnMalformedDoc> behaviorsToTest =
383+
Arrays.asList(BehaviorOnMalformedDoc.WARN, BehaviorOnMalformedDoc.IGNORE);
386384

387385
for(BehaviorOnMalformedDoc behaviorOnMalformedDoc : behaviorsToTest)
388386
{

0 commit comments

Comments
 (0)