Skip to content

Commit a12b476

Browse files
authored
Merge pull request #48 from DarioBalinzo/feature/44/add-elastic-ssl-config
Feature/44/add elastic ssl config
2 parents 7f5a07c + 4bef0ef commit a12b476

File tree

8 files changed

+251
-23
lines changed

8 files changed

+251
-23
lines changed

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,33 @@ to avoid data losses when paginating (available starting from versions >= 1.4).
131131
* Type: any
132132
* Importance: low
133133

134+
135+
``es.tls.truststore.location``
136+
Elastic ssl truststore location
137+
138+
* Type: string
139+
* Importance: medium
140+
141+
``es.tls.truststore.password``
142+
Elastic ssl truststore password
143+
144+
* Type: string
145+
* Default: ""
146+
* Importance: medium
147+
148+
``es.tls.keystore.location``
149+
Elasticsearch keystore location
150+
151+
* Type: string
152+
* Importance: medium
153+
154+
``es.tls.keystore.password``
155+
Elasticsearch keystore password
156+
157+
* Type: string
158+
* Default: ""
159+
* Importance: medium
160+
134161
``connection.attempts``
135162
Maximum number of attempts to retrieve a valid Elasticsearch connection.
136163

doc/README.md

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,40 @@ The name of the strictly incrementing field to use to detect new records.
124124
* Importance: high
125125

126126
``incrementing.secondary.field.name``
127-
In case the main incrementing field may have duplicates,
128-
this secondary field is used as a secondary sort field in order
127+
In case the main incrementing field may have duplicates,
128+
this secondary field is used as a secondary sort field in order
129129
to avoid data losses when paginating (available starting from versions >= 1.4).
130130

131131
* Type: any
132132
* Importance: low
133133

134+
135+
``es.tls.truststore.location``
136+
Elastic ssl truststore location
137+
138+
* Type: string
139+
* Importance: medium
140+
141+
``es.tls.truststore.password``
142+
Elastic ssl truststore password
143+
144+
* Type: string
145+
* Default: ""
146+
* Importance: medium
147+
148+
``es.tls.keystore.location``
149+
Elasticsearch keystore location
150+
151+
* Type: string
152+
* Importance: medium
153+
154+
``es.tls.keystore.password``
155+
Elasticsearch keystore password
156+
157+
* Type: string
158+
* Default: ""
159+
* Importance: medium
160+
134161
``connection.attempts``
135162
Maximum number of attempts to retrieve a valid Elasticsearch connection.
136163

@@ -152,6 +179,13 @@ Indices prefix to include in copying.
152179
* Default: ""
153180
* Importance: medium
154181

182+
``index.names``
183+
List of elasticsearch indices: `es1,es2,es3`
184+
185+
* Type: string
186+
* Default: null
187+
* Importance: medium
188+
155189
### Connector Configuration
156190

157191
``poll.interval.ms``
@@ -183,6 +217,15 @@ nested fields. To provide multiple fields use `;` as separator
183217
* Importance: medium
184218
* Default: null
185219

220+
``filters.blacklist``
221+
Blacklist filter for extracting a subset of fields from elastic-search json documents. The blacklist filter supports
222+
nested fields. To provide multiple fields use `;` as separator
223+
(e.g. `customer;order.qty;order.price`).
224+
225+
* Type: string
226+
* Importance: medium
227+
* Default: null
228+
186229
``filters.json_cast``
187230
This filter casts nested fields to json string, avoiding parsing recursively as kafka connect-schema. The json-cast
188231
filter supports nested fields. To provide multiple fields use `;` as separator

src/main/java/com/github/dariobalinzo/ElasticSourceConnector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,27 @@ public void start(Map<String, String> props) {
7474
.withMaxAttempts(maxConnectionAttempts)
7575
.withBackoff(connectionRetryBackoff);
7676

77+
String truststore = config.getString(ElasticSourceConnectorConfig.ES_TRUSTSTORE_CONF);
78+
String truststorePass = config.getString(ElasticSourceConnectorConfig.ES_TRUSTSTORE_PWD_CONF);
79+
String keystore = config.getString(ElasticSourceConnectorConfig.ES_KEYSTORE_CONF);
80+
String keystorePass = config.getString(ElasticSourceConnectorConfig.ES_KEYSTORE_PWD_CONF);
81+
82+
if (truststore != null) {
83+
connectionBuilder.withTrustStore(truststore, truststorePass);
84+
}
85+
86+
if (keystore != null) {
87+
connectionBuilder.withKeyStore(keystore, keystorePass);
88+
}
89+
7790
if (esUser == null || esUser.isEmpty()) {
7891
elasticConnection = connectionBuilder.build();
7992
} else {
8093
elasticConnection = connectionBuilder.withUser(esUser)
8194
.withPassword(esPwd)
8295
.build();
8396
}
97+
8498
elasticRepository = new ElasticRepository(elasticConnection);
8599
}
86100

src/main/java/com/github/dariobalinzo/ElasticSourceConnectorConfig.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ public class ElasticSourceConnectorConfig extends AbstractConfig {
5050
private final static String ES_PWD_DOC = "Elasticsearch password";
5151
private final static String ES_PWD_DISPLAY = "Elasticsearch password";
5252

53+
public final static String ES_KEYSTORE_CONF = "es.tls.keystore.location";
54+
private final static String ES_KEYSTORE_DOC = "Elasticsearch keystore location";
55+
56+
public final static String ES_KEYSTORE_PWD_CONF = "es.tls.keystore.password";
57+
private final static String ES_KEYSTORE_PWD_DOC = "Elasticsearch keystore password";
58+
59+
public final static String ES_TRUSTSTORE_CONF = "es.tls.truststore.location";
60+
private final static String ES_TRUSTSTORE_DOC = "Elasticsearch truststore location";
61+
62+
public final static String ES_TRUSTSTORE_PWD_CONF = "es.tls.truststore.password";
63+
private final static String ES_TRUSTSTORE_PWD_DOC = "Elasticsearch truststore password";
64+
5365
public static final String CONNECTION_ATTEMPTS_CONFIG = "connection.attempts";
5466
private static final String CONNECTION_ATTEMPTS_DOC
5567
= "Maximum number of attempts to retrieve a valid Elasticsearch connection.";
@@ -200,6 +212,46 @@ private static void addDatabaseOptions(ConfigDef config) {
200212
++orderInGroup,
201213
Width.SHORT,
202214
ES_PWD_DISPLAY
215+
).define(
216+
ES_KEYSTORE_CONF,
217+
Type.STRING,
218+
null,
219+
Importance.MEDIUM,
220+
ES_KEYSTORE_DOC,
221+
DATABASE_GROUP,
222+
++orderInGroup,
223+
Width.SHORT,
224+
ES_KEYSTORE_DOC
225+
).define(
226+
ES_KEYSTORE_PWD_CONF,
227+
Type.STRING,
228+
"",
229+
Importance.MEDIUM,
230+
ES_KEYSTORE_PWD_DOC,
231+
DATABASE_GROUP,
232+
++orderInGroup,
233+
Width.SHORT,
234+
ES_KEYSTORE_PWD_DOC
235+
).define(
236+
ES_TRUSTSTORE_CONF,
237+
Type.STRING,
238+
null,
239+
Importance.MEDIUM,
240+
ES_TRUSTSTORE_DOC,
241+
DATABASE_GROUP,
242+
++orderInGroup,
243+
Width.SHORT,
244+
ES_TRUSTSTORE_DOC
245+
).define(
246+
ES_TRUSTSTORE_PWD_CONF,
247+
Type.STRING,
248+
"",
249+
Importance.MEDIUM,
250+
ES_TRUSTSTORE_PWD_DOC,
251+
DATABASE_GROUP,
252+
++orderInGroup,
253+
Width.SHORT,
254+
ES_TRUSTSTORE_PWD_DOC
203255
).define(
204256
CONNECTION_ATTEMPTS_CONFIG,
205257
Type.STRING,

src/main/java/com/github/dariobalinzo/elastic/ElasticConnection.java

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,61 +21,114 @@
2121
import org.apache.http.auth.UsernamePasswordCredentials;
2222
import org.apache.http.client.CredentialsProvider;
2323
import org.apache.http.impl.client.BasicCredentialsProvider;
24+
import org.apache.http.ssl.SSLContextBuilder;
25+
import org.apache.http.ssl.SSLContexts;
2426
import org.elasticsearch.client.RestClient;
2527
import org.elasticsearch.client.RestHighLevelClient;
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

31+
import javax.net.ssl.SSLContext;
2932
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.nio.file.Files;
35+
import java.nio.file.Path;
36+
import java.nio.file.Paths;
37+
import java.security.KeyStore;
3038
import java.util.Arrays;
39+
import java.util.Objects;
3140

3241
public class ElasticConnection {
3342
public final static Logger logger = LoggerFactory.getLogger(ElasticConnection.class);
3443

3544
private RestHighLevelClient client;
3645
private final long connectionRetryBackoff;
3746
private final int maxConnectionAttempts;
47+
private final String hosts;
48+
private final String protocol;
49+
private final int port;
50+
private final SSLContext sslContext;
51+
private final CredentialsProvider credentialsProvider;
3852

3953
ElasticConnection(ElasticConnectionBuilder builder) {
40-
String hosts = builder.hosts;
54+
hosts = builder.hosts;
55+
protocol = builder.protocol;
56+
port = builder.port;
57+
4158
String user = builder.user;
4259
String pwd = builder.pwd;
43-
String protocol = builder.protocol;
44-
int port = builder.port;
45-
46-
if (user == null) {
47-
createConnectionWithNoAuth(hosts, protocol, port);
60+
if (user != null) {
61+
credentialsProvider = new BasicCredentialsProvider();
62+
credentialsProvider.setCredentials(AuthScope.ANY,
63+
new UsernamePasswordCredentials(user, pwd));
4864
} else {
49-
createConnectionUsingAuth(hosts, protocol, port, user, pwd);
65+
credentialsProvider = null;
5066
}
5167

68+
sslContext = builder.trustStorePath == null ? null :
69+
getSslContext(
70+
builder.trustStorePath,
71+
builder.trustStorePassword,
72+
builder.keyStorePath,
73+
builder.keyStorePassword
74+
);
75+
76+
createConnection();
77+
5278
this.maxConnectionAttempts = builder.maxConnectionAttempts;
5379
this.connectionRetryBackoff = builder.connectionRetryBackoff;
5480
}
5581

56-
private void createConnectionWithNoAuth(String hosts, String protocol, int port) {
57-
logger.info("elastic auth disabled");
58-
HttpHost[] hostList = parseHosts(hosts, protocol, port);
59-
client = new RestHighLevelClient(RestClient.builder(hostList));
60-
}
61-
62-
private void createConnectionUsingAuth(String hosts, String protocol, int port, String user, String pwd) {
63-
logger.info("elastic auth enabled");
64-
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
65-
credentialsProvider.setCredentials(AuthScope.ANY,
66-
new UsernamePasswordCredentials(user, pwd));
67-
68-
82+
private void createConnection() {
6983
HttpHost[] hostList = parseHosts(hosts, protocol, port);
7084

7185
client = new RestHighLevelClient(
7286
RestClient.builder(hostList)
7387
.setHttpClientConfigCallback(
74-
httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
88+
httpClientBuilder -> {
89+
if (credentialsProvider != null) {
90+
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
91+
}
92+
if (sslContext != null) {
93+
httpClientBuilder.setSSLContext(sslContext);
94+
}
95+
return httpClientBuilder;
96+
}
7597
)
7698
);
7799
}
78100

101+
private SSLContext getSslContext(String trustStoreConf, String trustStorePass,
102+
String keyStoreConf, String keyStorePass) {
103+
104+
Objects.requireNonNull(trustStoreConf, "truststore location is required");
105+
Objects.requireNonNull(trustStorePass, "truststore password is required");
106+
107+
try {
108+
Path trustStorePath = Paths.get(trustStoreConf);
109+
KeyStore truststore = KeyStore.getInstance("pkcs12");
110+
try (InputStream is = Files.newInputStream(trustStorePath)) {
111+
truststore.load(is, trustStorePass.toCharArray());
112+
}
113+
SSLContextBuilder sslBuilder = SSLContexts.custom()
114+
.loadTrustMaterial(truststore, null);
115+
116+
if (keyStoreConf != null) {
117+
Objects.requireNonNull(keyStorePass, "keystore password is required");
118+
Path keyStorePath = Paths.get(keyStoreConf);
119+
KeyStore keyStore = KeyStore.getInstance("pkcs12");
120+
try (InputStream is = Files.newInputStream(keyStorePath)) {
121+
keyStore.load(is, keyStorePass.toCharArray());
122+
}
123+
sslBuilder.loadKeyMaterial(keyStore, keyStorePass.toCharArray());
124+
}
125+
126+
return sslBuilder.build();
127+
} catch (Exception e) {
128+
throw new SslContextException(e);
129+
}
130+
}
131+
79132
private HttpHost[] parseHosts(String hosts, String protocol, int port) {
80133
return Arrays.stream(hosts.split(";"))
81134
.map(host -> new HttpHost(host, port, protocol))

src/main/java/com/github/dariobalinzo/elastic/ElasticConnectionBuilder.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public class ElasticConnectionBuilder {
2626
String user;
2727
String pwd;
2828

29+
String trustStorePath;
30+
String trustStorePassword;
31+
String keyStorePath;
32+
String keyStorePassword;
33+
2934
public ElasticConnectionBuilder(String hosts, int port) {
3035
this.hosts = hosts;
3136
this.port = port;
@@ -56,6 +61,18 @@ public ElasticConnectionBuilder withBackoff(long connectionRetryBackoff) {
5661
return this;
5762
}
5863

64+
public ElasticConnectionBuilder withTrustStore(String path, String password) {
65+
this.trustStorePath = path;
66+
this.trustStorePassword = password;
67+
return this;
68+
}
69+
70+
public ElasticConnectionBuilder withKeyStore(String path, String password) {
71+
this.keyStorePath = path;
72+
this.keyStorePassword = password;
73+
return this;
74+
}
75+
5976
public ElasticConnection build() {
6077
return new ElasticConnection(this);
6178
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.github.dariobalinzo.elastic;
2+
3+
public class SslContextException extends RuntimeException {
4+
public SslContextException(Exception e) {
5+
super(e);
6+
}
7+
}

0 commit comments

Comments
 (0)