1616
1717package io .confluent .connect .elasticsearch ;
1818
19+ import java .net .URI ;
20+ import java .net .URISyntaxException ;
1921import java .util .HashMap ;
2022import java .util .HashSet ;
2123import java .util .List ;
2224import java .util .Set ;
25+ import java .util .concurrent .TimeUnit ;
2326import org .apache .kafka .common .config .AbstractConfig ;
2427import org .apache .kafka .common .config .ConfigDef ;
2528import org .apache .kafka .common .config .ConfigDef .Importance ;
2629import org .apache .kafka .common .config .ConfigDef .Type ;
30+ import org .apache .kafka .common .config .ConfigDef .Validator ;
2731import org .apache .kafka .common .config .ConfigDef .Width ;
2832
2933import java .util .Map ;
34+ import org .apache .kafka .common .config .ConfigException ;
3035import org .apache .kafka .common .config .types .Password ;
3136
3237import static io .confluent .connect .elasticsearch .DataConverter .BehaviorOnNullValues ;
3338import static io .confluent .connect .elasticsearch .bulk .BulkProcessor .BehaviorOnMalformedDoc ;
39+ import static org .apache .kafka .common .config .ConfigDef .Range .between ;
3440
3541public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
3642
@@ -97,7 +103,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
97103 + "space to be made available by completed requests as records are added. If this timeout "
98104 + "is exceeded the task will fail." ;
99105 private static final String FLUSH_TIMEOUT_MS_DISPLAY = "Flush Timeout (ms)" ;
100- private static final int FLUSH_TIMEOUT_MS_DEFAULT = 10000 ;
106+ private static final int FLUSH_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 10 ) ;
101107
102108 public static final String MAX_RETRIES_CONFIG = "max.retries" ;
103109 private static final String MAX_RETRIES_DOC =
@@ -121,15 +127,15 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
121127 + "The task fails if the client fails to connect to the server in this "
122128 + "interval, and will need to be restarted." ;
123129 private static final String CONNECTION_TIMEOUT_MS_DISPLAY = "Connection Timeout" ;
124- private static final int CONNECTION_TIMEOUT_MS_DEFAULT = 1000 ;
130+ private static final int CONNECTION_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 1 ) ;
125131
126132 public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms" ;
127133 private static final String READ_TIMEOUT_MS_CONFIG_DOC = "How long to wait in "
128134 + "milliseconds for the Elasticsearch server to send a response. The task fails "
129135 + "if any read operation times out, and will need to be restarted to resume "
130136 + "further operations." ;
131137 private static final String READ_TIMEOUT_MS_DISPLAY = "Read Timeout" ;
132- private static final int READ_TIMEOUT_MS_DEFAULT = 3000 ;
138+ private static final int READ_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 3 ) ;
133139
134140 public static final String CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start" ;
135141 private static final String CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
@@ -236,6 +242,8 @@ private static void addConnectorConfigs(ConfigDef configDef) {
236242 .define (
237243 CONNECTION_URL_CONFIG ,
238244 Type .LIST ,
245+ ConfigDef .NO_DEFAULT_VALUE ,
246+ new UrlListValidator (),
239247 Importance .HIGH ,
240248 CONNECTION_URL_DOC ,
241249 CONNECTOR_GROUP ,
@@ -266,6 +274,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
266274 BATCH_SIZE_CONFIG ,
267275 Type .INT ,
268276 BATCH_SIZE_DEFAULT ,
277+ between (1 , 1000000 ),
269278 Importance .MEDIUM ,
270279 BATCH_SIZE_DOC ,
271280 CONNECTOR_GROUP ,
@@ -276,6 +285,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
276285 MAX_IN_FLIGHT_REQUESTS_CONFIG ,
277286 Type .INT ,
278287 MAX_IN_FLIGHT_REQUESTS_DEFAULT ,
288+ between (1 , 1000 ),
279289 Importance .MEDIUM ,
280290 MAX_IN_FLIGHT_REQUESTS_DOC ,
281291 CONNECTOR_GROUP ,
@@ -286,6 +296,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
286296 MAX_BUFFERED_RECORDS_CONFIG ,
287297 Type .INT ,
288298 MAX_BUFFERED_RECORDS_DEFAULT ,
299+ between (1 , Integer .MAX_VALUE ),
289300 Importance .LOW ,
290301 MAX_BUFFERED_RECORDS_DOC ,
291302 CONNECTOR_GROUP ,
@@ -296,6 +307,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
296307 LINGER_MS_CONFIG ,
297308 Type .LONG ,
298309 LINGER_MS_DEFAULT ,
310+ between (0 , TimeUnit .DAYS .toMillis (7 )),
299311 Importance .LOW ,
300312 LINGER_MS_DOC ,
301313 CONNECTOR_GROUP ,
@@ -306,6 +318,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
306318 FLUSH_TIMEOUT_MS_CONFIG ,
307319 Type .LONG ,
308320 FLUSH_TIMEOUT_MS_DEFAULT ,
321+ between (TimeUnit .SECONDS .toMillis (1 ), Long .MAX_VALUE ),
309322 Importance .LOW ,
310323 FLUSH_TIMEOUT_MS_DOC ,
311324 CONNECTOR_GROUP ,
@@ -316,6 +329,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
316329 MAX_RETRIES_CONFIG ,
317330 Type .INT ,
318331 MAX_RETRIES_DEFAULT ,
332+ between (0 , Integer .MAX_VALUE ),
319333 Importance .LOW ,
320334 MAX_RETRIES_DOC ,
321335 CONNECTOR_GROUP ,
@@ -326,6 +340,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
326340 RETRY_BACKOFF_MS_CONFIG ,
327341 Type .LONG ,
328342 RETRY_BACKOFF_MS_DEFAULT ,
343+ between (0 , TimeUnit .DAYS .toMillis (1 )),
329344 Importance .LOW ,
330345 RETRY_BACKOFF_MS_DOC ,
331346 CONNECTOR_GROUP ,
@@ -336,6 +351,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
336351 CONNECTION_TIMEOUT_MS_CONFIG ,
337352 Type .INT ,
338353 CONNECTION_TIMEOUT_MS_DEFAULT ,
354+ between (0 , TimeUnit .HOURS .toMillis (12 )),
339355 Importance .LOW ,
340356 CONNECTION_TIMEOUT_MS_CONFIG_DOC ,
341357 CONNECTOR_GROUP ,
@@ -346,6 +362,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
346362 READ_TIMEOUT_MS_CONFIG ,
347363 Type .INT ,
348364 READ_TIMEOUT_MS_DEFAULT ,
365+ between (0 , TimeUnit .DAYS .toMillis (7 )),
349366 Importance .LOW ,
350367 READ_TIMEOUT_MS_CONFIG_DOC ,
351368 CONNECTOR_GROUP ,
@@ -589,6 +606,29 @@ private Map<String, String> parseMapConfig(List<String> values) {
589606 return map ;
590607 }
591608
609+ private static class UrlListValidator implements Validator {
610+
611+ @ Override
612+ public void ensureValid (String name , Object value ) {
613+ @ SuppressWarnings ("unchecked" )
614+ List <String > urls = (List <String >) value ;
615+ for (String url : urls ) {
616+ try {
617+ new URI (url );
618+ } catch (URISyntaxException e ) {
619+ throw new ConfigException (
620+ name , value , "The provided url '" + url + "' is not a valid url."
621+ );
622+ }
623+ }
624+ }
625+
626+ @ Override
627+ public String toString () {
628+ return "List of valid URIs." ;
629+ }
630+ }
631+
592632 public static void main (String [] args ) {
593633 System .out .println (CONFIG .toEnrichedRst ());
594634 }
0 commit comments