Skip to content

Commit e88e0f6

Browse files
Merge branch '5.0.x' into 5.1.x
2 parents c1918a2 + 3c002f8 commit e88e0f6

File tree

4 files changed

+318
-1
lines changed

4 files changed

+318
-1
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<hamcrest.version>1.3</hamcrest.version>
4545
<mockito.version>2.13.0</mockito.version>
4646
<jest.version>6.3.1</jest.version>
47-
<test.containers.version>1.11.4</test.containers.version>
47+
<test.containers.version>1.15.0-rc2</test.containers.version>
4848
<kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
4949
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
5050
</properties>

src/main/java/io/confluent/connect/elasticsearch/ElasticsearchSinkConnector.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package io.confluent.connect.elasticsearch;
1717

18+
import org.apache.kafka.common.config.Config;
1819
import org.apache.kafka.common.config.ConfigDef;
1920
import org.apache.kafka.common.config.ConfigException;
2021
import org.apache.kafka.connect.connector.Task;
@@ -74,4 +75,10 @@ public void stop() throws ConnectException {
7475
public ConfigDef config() {
7576
return ElasticsearchSinkConnectorConfig.CONFIG;
7677
}
78+
79+
@Override
80+
public Config validate(Map<String, String> connectorConfigs) {
81+
Validator validator = new Validator(connectorConfigs);
82+
return validator.validate();
83+
}
7784
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/**
2+
* Copyright 2020 Confluent Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
**/
16+
17+
package io.confluent.connect.elasticsearch;
18+
19+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
20+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
21+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
22+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
23+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
24+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG;
25+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
26+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG;
27+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
28+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
29+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
30+
31+
import java.util.List;
32+
import java.util.Map;
33+
import java.util.function.Function;
34+
import java.util.stream.Collectors;
35+
import org.apache.kafka.common.config.Config;
36+
import org.apache.kafka.common.config.ConfigException;
37+
import org.apache.kafka.common.config.ConfigValue;
38+
39+
public class Validator {
40+
41+
private ElasticsearchSinkConnectorConfig config;
42+
private Map<String, ConfigValue> values;
43+
private List<ConfigValue> validations;
44+
45+
public Validator(Map<String, String> props) {
46+
try {
47+
this.config = new ElasticsearchSinkConnectorConfig(props);
48+
} catch (ConfigException e) {
49+
// some configs are invalid
50+
}
51+
52+
validations = ElasticsearchSinkConnectorConfig.CONFIG.validate(props);
53+
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));
54+
}
55+
56+
public Config validate() {
57+
if (config == null) {
58+
// individual configs are invalid, no point in validating combinations
59+
return new Config(validations);
60+
}
61+
62+
validateCredentials();
63+
validateIgnoreConfigs();
64+
validateLingerMs();
65+
validateMaxBufferedRecords();
66+
67+
return new Config(validations);
68+
}
69+
70+
71+
private void validateCredentials() {
72+
boolean onlyOneSet = config.username() != null ^ config.password() != null;
73+
if (onlyOneSet) {
74+
String errorMessage = String.format(
75+
"Both %s and %s must be set.", CONNECTION_USERNAME_CONFIG, CONNECTION_PASSWORD_CONFIG
76+
);
77+
addErrorMessage(CONNECTION_USERNAME_CONFIG, errorMessage);
78+
addErrorMessage(CONNECTION_PASSWORD_CONFIG, errorMessage);
79+
}
80+
}
81+
82+
private void validateIgnoreConfigs() {
83+
if (config.ignoreKey() && !config.ignoreKeyTopics().isEmpty()) {
84+
String errorMessage = String.format(
85+
"%s can not be set if %s is true.", IGNORE_KEY_TOPICS_CONFIG, IGNORE_KEY_CONFIG
86+
);
87+
addErrorMessage(IGNORE_KEY_CONFIG, errorMessage);
88+
addErrorMessage(IGNORE_KEY_TOPICS_CONFIG, errorMessage);
89+
}
90+
91+
if (config.ignoreSchema() && !config.ignoreSchemaTopics().isEmpty()) {
92+
String errorMessage = String.format(
93+
"%s can not be set if %s is true.", IGNORE_SCHEMA_TOPICS_CONFIG, IGNORE_SCHEMA_CONFIG
94+
);
95+
addErrorMessage(IGNORE_SCHEMA_CONFIG, errorMessage);
96+
addErrorMessage(IGNORE_SCHEMA_TOPICS_CONFIG, errorMessage);
97+
}
98+
}
99+
100+
private void validateLingerMs() {
101+
if (config.lingerMs() > config.flushTimeoutMs()) {
102+
String errorMessage = String.format(
103+
"%s (%d) can not be larger than %s (%d).",
104+
LINGER_MS_CONFIG, config.lingerMs(), FLUSH_TIMEOUT_MS_CONFIG, config.flushTimeoutMs()
105+
);
106+
addErrorMessage(LINGER_MS_CONFIG, errorMessage);
107+
addErrorMessage(FLUSH_TIMEOUT_MS_CONFIG, errorMessage);
108+
}
109+
}
110+
111+
private void validateMaxBufferedRecords() {
112+
if (config.maxBufferedRecords() < config.batchSize() * config.maxInFlightRequests()) {
113+
String errorMessage = String.format(
114+
"%s (%d) must be larger than or equal to %s (%d) x %s (%d).",
115+
MAX_BUFFERED_RECORDS_CONFIG, config.maxBufferedRecords(),
116+
BATCH_SIZE_CONFIG, config.batchSize(),
117+
MAX_IN_FLIGHT_REQUESTS_CONFIG, config.maxInFlightRequests()
118+
);
119+
120+
addErrorMessage(MAX_BUFFERED_RECORDS_CONFIG, errorMessage);
121+
addErrorMessage(BATCH_SIZE_CONFIG, errorMessage);
122+
addErrorMessage(MAX_IN_FLIGHT_REQUESTS_CONFIG, errorMessage);
123+
}
124+
}
125+
126+
private void addErrorMessage(String property, String error) {
127+
values.get(property).addErrorMessage(error);
128+
}
129+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/**
2+
* Copyright 2020 Confluent Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
**/
16+
17+
package io.confluent.connect.elasticsearch;
18+
19+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
20+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
21+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
22+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
23+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.FLUSH_TIMEOUT_MS_CONFIG;
24+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
25+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_TOPICS_CONFIG;
26+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_CONFIG;
27+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_SCHEMA_TOPICS_CONFIG;
28+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.LINGER_MS_CONFIG;
29+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_BUFFERED_RECORDS_CONFIG;
30+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
31+
import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.TYPE_NAME_CONFIG;
32+
import static org.junit.Assert.assertFalse;
33+
import static org.junit.Assert.assertTrue;
34+
35+
import java.util.HashMap;
36+
import java.util.Map;
37+
import org.apache.kafka.common.config.Config;
38+
import org.apache.kafka.common.config.ConfigValue;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
42+
public class ValidatorTest {
43+
44+
private Map<String, String> props;
45+
private Validator validator;
46+
47+
@Before
48+
public void setup() {
49+
props = new HashMap<>();
50+
props.put(TYPE_NAME_CONFIG, "type");
51+
props.put(CONNECTION_URL_CONFIG, "localhost:8080");
52+
}
53+
54+
@Test
55+
public void testInvalidCredentials() {
56+
props.put(CONNECTION_USERNAME_CONFIG, "username");
57+
validator = new Validator(props);
58+
59+
Config result = validator.validate();
60+
assertHasErrorMessage(result, CONNECTION_USERNAME_CONFIG, "must be set");
61+
assertHasErrorMessage(result, CONNECTION_PASSWORD_CONFIG, "must be set");
62+
props.remove(CONNECTION_USERNAME_CONFIG);
63+
64+
props.put(CONNECTION_PASSWORD_CONFIG, "password");
65+
validator = new Validator(props);
66+
result = validator.validate();
67+
assertHasErrorMessage(result, CONNECTION_USERNAME_CONFIG, "must be set");
68+
assertHasErrorMessage(result, CONNECTION_PASSWORD_CONFIG, "must be set");
69+
}
70+
71+
@Test
72+
public void testValidCredentials() {
73+
// username and password not set
74+
validator = new Validator(props);
75+
76+
Config result = validator.validate();
77+
assertNoErrors(result);
78+
79+
// both set
80+
props.put(CONNECTION_USERNAME_CONFIG, "username");
81+
props.put(CONNECTION_PASSWORD_CONFIG, "password");
82+
validator = new Validator(props);
83+
84+
result = validator.validate();
85+
assertNoErrors(result);
86+
}
87+
88+
@Test
89+
public void testInvalidIgnoreConfigs() {
90+
props.put(IGNORE_KEY_CONFIG, "true");
91+
props.put(IGNORE_KEY_TOPICS_CONFIG, "some,topics");
92+
props.put(IGNORE_SCHEMA_CONFIG, "true");
93+
props.put(IGNORE_SCHEMA_TOPICS_CONFIG, "some,other,topics");
94+
validator = new Validator(props);
95+
96+
Config result = validator.validate();
97+
assertHasErrorMessage(result, IGNORE_KEY_CONFIG, "is true");
98+
assertHasErrorMessage(result, IGNORE_KEY_TOPICS_CONFIG, "is true");
99+
assertHasErrorMessage(result, IGNORE_SCHEMA_CONFIG, "is true");
100+
assertHasErrorMessage(result, IGNORE_SCHEMA_TOPICS_CONFIG, "is true");
101+
}
102+
103+
@Test
104+
public void testValidIgnoreConfigs() {
105+
// topics configs not set
106+
props.put(IGNORE_KEY_CONFIG, "true");
107+
props.put(IGNORE_SCHEMA_CONFIG, "true");
108+
validator = new Validator(props);
109+
110+
Config result = validator.validate();
111+
assertNoErrors(result);
112+
113+
// ignore configs are false
114+
props.put(IGNORE_KEY_CONFIG, "false");
115+
props.put(IGNORE_KEY_TOPICS_CONFIG, "some,topics");
116+
props.put(IGNORE_SCHEMA_CONFIG, "false");
117+
props.put(IGNORE_SCHEMA_TOPICS_CONFIG, "some,other,topics");
118+
validator = new Validator(props);
119+
120+
result = validator.validate();
121+
assertNoErrors(result);
122+
}
123+
124+
@Test
125+
public void testInvalidLingerMs() {
126+
props.put(LINGER_MS_CONFIG, "2");
127+
props.put(FLUSH_TIMEOUT_MS_CONFIG, "1");
128+
validator = new Validator(props);
129+
130+
Config result = validator.validate();
131+
assertHasErrorMessage(result, LINGER_MS_CONFIG, "can not be larger than");
132+
assertHasErrorMessage(result, FLUSH_TIMEOUT_MS_CONFIG, "can not be larger than");
133+
}
134+
135+
@Test
136+
public void testValidLingerMs() {
137+
props.put(LINGER_MS_CONFIG, "1");
138+
props.put(FLUSH_TIMEOUT_MS_CONFIG, "2");
139+
validator = new Validator(props);
140+
141+
Config result = validator.validate();
142+
assertNoErrors(result);
143+
}
144+
145+
@Test
146+
public void testInvalidMaxBufferedRecords() {
147+
props.put(MAX_BUFFERED_RECORDS_CONFIG, "1");
148+
props.put(BATCH_SIZE_CONFIG, "2");
149+
props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "2");
150+
validator = new Validator(props);
151+
152+
Config result = validator.validate();
153+
assertHasErrorMessage(result, MAX_BUFFERED_RECORDS_CONFIG, "must be larger than or equal to");
154+
assertHasErrorMessage(result, BATCH_SIZE_CONFIG, "must be larger than or equal to");
155+
assertHasErrorMessage(result, MAX_IN_FLIGHT_REQUESTS_CONFIG, "must be larger than or equal to");
156+
}
157+
158+
@Test
159+
public void testValidMaxBufferedRecords() {
160+
props.put(MAX_BUFFERED_RECORDS_CONFIG, "5");
161+
props.put(BATCH_SIZE_CONFIG, "2");
162+
props.put(MAX_IN_FLIGHT_REQUESTS_CONFIG, "2");
163+
validator = new Validator(props);
164+
165+
Config result = validator.validate();
166+
assertNoErrors(result);
167+
}
168+
169+
private static void assertHasErrorMessage(Config config, String property, String msg) {
170+
for (ConfigValue configValue : config.configValues()) {
171+
if (configValue.name().equals(property)) {
172+
assertFalse(configValue.errorMessages().isEmpty());
173+
assertTrue(configValue.errorMessages().get(0).contains(msg));
174+
}
175+
}
176+
}
177+
178+
private static void assertNoErrors(Config config) {
179+
config.configValues().forEach(c -> assertTrue(c.errorMessages().isEmpty()));
180+
}
181+
}

0 commit comments

Comments
 (0)