1515
1616package io .confluent .connect .elasticsearch ;
1717
18+ import java .net .URI ;
19+ import java .net .URISyntaxException ;
1820import java .util .HashMap ;
1921import java .util .HashSet ;
2022import java .util .List ;
2123import java .util .Set ;
24+ import java .util .concurrent .TimeUnit ;
2225import org .apache .kafka .common .config .AbstractConfig ;
2326import org .apache .kafka .common .config .ConfigDef ;
2427import org .apache .kafka .common .config .ConfigDef .Importance ;
2528import org .apache .kafka .common .config .ConfigDef .Type ;
29+ import org .apache .kafka .common .config .ConfigDef .Validator ;
2630import org .apache .kafka .common .config .ConfigDef .Width ;
2731
2832import java .util .Map ;
33+ import org .apache .kafka .common .config .ConfigException ;
2934import org .apache .kafka .common .config .types .Password ;
3035
3136import static io .confluent .connect .elasticsearch .DataConverter .BehaviorOnNullValues ;
3237import static io .confluent .connect .elasticsearch .bulk .BulkProcessor .BehaviorOnMalformedDoc ;
38+ import static org .apache .kafka .common .config .ConfigDef .Range .between ;
3339
3440public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
3541
@@ -96,7 +102,7 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
96102 + "space to be made available by completed requests as records are added. If this timeout "
97103 + "is exceeded the task will fail." ;
98104 private static final String FLUSH_TIMEOUT_MS_DISPLAY = "Flush Timeout (ms)" ;
99- private static final int FLUSH_TIMEOUT_MS_DEFAULT = 10000 ;
105+ private static final int FLUSH_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 10 ) ;
100106
101107 public static final String MAX_RETRIES_CONFIG = "max.retries" ;
102108 private static final String MAX_RETRIES_DOC =
@@ -120,15 +126,15 @@ public class ElasticsearchSinkConnectorConfig extends AbstractConfig {
120126 + "The task fails if the client fails to connect to the server in this "
121127 + "interval, and will need to be restarted." ;
122128 private static final String CONNECTION_TIMEOUT_MS_DISPLAY = "Connection Timeout" ;
123- private static final int CONNECTION_TIMEOUT_MS_DEFAULT = 1000 ;
129+ private static final int CONNECTION_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 1 ) ;
124130
125131 public static final String READ_TIMEOUT_MS_CONFIG = "read.timeout.ms" ;
126132 private static final String READ_TIMEOUT_MS_CONFIG_DOC = "How long to wait in "
127133 + "milliseconds for the Elasticsearch server to send a response. The task fails "
128134 + "if any read operation times out, and will need to be restarted to resume "
129135 + "further operations." ;
130136 private static final String READ_TIMEOUT_MS_DISPLAY = "Read Timeout" ;
131- private static final int READ_TIMEOUT_MS_DEFAULT = 3000 ;
137+ private static final int READ_TIMEOUT_MS_DEFAULT = ( int ) TimeUnit . SECONDS . toMillis ( 3 ) ;
132138
133139 public static final String CREATE_INDICES_AT_START_CONFIG = "auto.create.indices.at.start" ;
134140 private static final String CREATE_INDICES_AT_START_DOC = "Auto create the Elasticsearch"
@@ -235,6 +241,8 @@ private static void addConnectorConfigs(ConfigDef configDef) {
235241 .define (
236242 CONNECTION_URL_CONFIG ,
237243 Type .LIST ,
244+ ConfigDef .NO_DEFAULT_VALUE ,
245+ new UrlListValidator (),
238246 Importance .HIGH ,
239247 CONNECTION_URL_DOC ,
240248 CONNECTOR_GROUP ,
@@ -265,6 +273,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
265273 BATCH_SIZE_CONFIG ,
266274 Type .INT ,
267275 BATCH_SIZE_DEFAULT ,
276+ between (1 , 1000000 ),
268277 Importance .MEDIUM ,
269278 BATCH_SIZE_DOC ,
270279 CONNECTOR_GROUP ,
@@ -275,6 +284,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
275284 MAX_IN_FLIGHT_REQUESTS_CONFIG ,
276285 Type .INT ,
277286 MAX_IN_FLIGHT_REQUESTS_DEFAULT ,
287+ between (1 , 1000 ),
278288 Importance .MEDIUM ,
279289 MAX_IN_FLIGHT_REQUESTS_DOC ,
280290 CONNECTOR_GROUP ,
@@ -285,6 +295,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
285295 MAX_BUFFERED_RECORDS_CONFIG ,
286296 Type .INT ,
287297 MAX_BUFFERED_RECORDS_DEFAULT ,
298+ between (1 , Integer .MAX_VALUE ),
288299 Importance .LOW ,
289300 MAX_BUFFERED_RECORDS_DOC ,
290301 CONNECTOR_GROUP ,
@@ -295,6 +306,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
295306 LINGER_MS_CONFIG ,
296307 Type .LONG ,
297308 LINGER_MS_DEFAULT ,
309+ between (0 , TimeUnit .DAYS .toMillis (7 )),
298310 Importance .LOW ,
299311 LINGER_MS_DOC ,
300312 CONNECTOR_GROUP ,
@@ -305,6 +317,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
305317 FLUSH_TIMEOUT_MS_CONFIG ,
306318 Type .LONG ,
307319 FLUSH_TIMEOUT_MS_DEFAULT ,
320+ between (TimeUnit .SECONDS .toMillis (1 ), Long .MAX_VALUE ),
308321 Importance .LOW ,
309322 FLUSH_TIMEOUT_MS_DOC ,
310323 CONNECTOR_GROUP ,
@@ -315,6 +328,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
315328 MAX_RETRIES_CONFIG ,
316329 Type .INT ,
317330 MAX_RETRIES_DEFAULT ,
331+ between (0 , Integer .MAX_VALUE ),
318332 Importance .LOW ,
319333 MAX_RETRIES_DOC ,
320334 CONNECTOR_GROUP ,
@@ -325,6 +339,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
325339 RETRY_BACKOFF_MS_CONFIG ,
326340 Type .LONG ,
327341 RETRY_BACKOFF_MS_DEFAULT ,
342+ between (0 , TimeUnit .DAYS .toMillis (1 )),
328343 Importance .LOW ,
329344 RETRY_BACKOFF_MS_DOC ,
330345 CONNECTOR_GROUP ,
@@ -335,6 +350,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
335350 CONNECTION_TIMEOUT_MS_CONFIG ,
336351 Type .INT ,
337352 CONNECTION_TIMEOUT_MS_DEFAULT ,
353+ between (0 , TimeUnit .HOURS .toMillis (12 )),
338354 Importance .LOW ,
339355 CONNECTION_TIMEOUT_MS_CONFIG_DOC ,
340356 CONNECTOR_GROUP ,
@@ -345,6 +361,7 @@ private static void addConnectorConfigs(ConfigDef configDef) {
345361 READ_TIMEOUT_MS_CONFIG ,
346362 Type .INT ,
347363 READ_TIMEOUT_MS_DEFAULT ,
364+ between (0 , TimeUnit .DAYS .toMillis (7 )),
348365 Importance .LOW ,
349366 READ_TIMEOUT_MS_CONFIG_DOC ,
350367 CONNECTOR_GROUP ,
@@ -588,6 +605,29 @@ private Map<String, String> parseMapConfig(List<String> values) {
588605 return map ;
589606 }
590607
608+ private static class UrlListValidator implements Validator {
609+
610+ @ Override
611+ public void ensureValid (String name , Object value ) {
612+ @ SuppressWarnings ("unchecked" )
613+ List <String > urls = (List <String >) value ;
614+ for (String url : urls ) {
615+ try {
616+ new URI (url );
617+ } catch (URISyntaxException e ) {
618+ throw new ConfigException (
619+ name , value , "The provided url '" + url + "' is not a valid url."
620+ );
621+ }
622+ }
623+ }
624+
625+ @ Override
626+ public String toString () {
627+ return "List of valid URIs." ;
628+ }
629+ }
630+
591631 public static void main (String [] args ) {
592632 System .out .println (CONFIG .toEnrichedRst ());
593633 }
0 commit comments