Skip to content

Commit eb53845

Browse files
committed
Add URIs option to RMQConnectionFactory
Allows to specify several nodes URIs to connect to. [#157911716] Fixes #50 (cherry picked from commit f313c9c) Conflicts: src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java
1 parent 8d5d8d8 commit eb53845

File tree

2 files changed

+211
-70
lines changed

2 files changed

+211
-70
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 116 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@
3333
import javax.net.ssl.SSLException;
3434
import java.io.IOException;
3535
import java.io.Serializable;
36+
import java.net.URI;
37+
import java.net.URISyntaxException;
3638
import java.security.NoSuchAlgorithmException;
39+
import java.util.ArrayList;
3740
import java.util.List;
3841
import java.util.concurrent.TimeoutException;
3942

@@ -146,6 +149,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
146149
*/
147150
private List<String> trustedPackages = WhiteListObjectInputStream.DEFAULT_TRUSTED_PACKAGES;
148151

152+
/**
153+
* List of nodes URIs to connect to.
154+
*
155+
* @since 1.10.0
156+
*/
157+
private List<URI> uris = new ArrayList<URI>();
158+
149159
/**
150160
* {@inheritDoc}
151161
*/
@@ -163,43 +173,56 @@ public Connection createConnection(List<Address> endpoints) throws JMSException
163173
*/
164174
@Override
165175
public Connection createConnection(String username, String password) throws JMSException {
166-
logger.trace("Creating a connection for username '{}', password 'xxxxxxxx'.", username);
167-
this.username = username;
168-
this.password = password;
169-
// Create a new factory and set the properties
170-
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
171-
setRabbitUri(logger, this, factory, this.getUri());
172-
maybeEnableTLS(factory);
173-
maybeEnableHostnameVerification(factory);
174-
factory.setMetricsCollector(this.metricsCollector);
175-
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(factory);
176+
if (this.uris == null || this.uris.isEmpty()) {
177+
return createConnection(username, password, new ConnectionCreator() {
176178

177-
RMQConnection conn = new RMQConnection(new ConnectionParams()
178-
.setRabbitConnection(rabbitConnection)
179-
.setTerminationTimeout(getTerminationTimeout())
180-
.setQueueBrowserReadMax(getQueueBrowserReadMax())
181-
.setOnMessageTimeoutMs(getOnMessageTimeoutMs())
182-
.setChannelsQos(channelsQos)
183-
.setPreferProducerMessageProperty(preferProducerMessageProperty)
184-
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
185-
.setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
186-
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
187-
);
188-
conn.setTrustedPackages(this.trustedPackages);
189-
logger.debug("Connection {} created.", conn);
190-
return conn;
179+
@Override
180+
public com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception {
181+
return cf.newConnection();
182+
}
183+
});
184+
} else {
185+
final List<Address> addresses = new ArrayList<Address>(this.uris.size());
186+
for (URI uri : this.uris) {
187+
String host = uri.getHost();
188+
int port = uri.getPort();
189+
if (port == -1) {
190+
port = isSsl() ? DEFAULT_RABBITMQ_SSL_PORT :
191+
DEFAULT_RABBITMQ_PORT;
192+
}
193+
addresses.add(new Address(host, port));
194+
}
195+
return createConnection(username, password, new ConnectionCreator() {
196+
197+
@Override
198+
public com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception {
199+
return cf.newConnection(addresses);
200+
}
201+
});
202+
}
191203
}
192204

193-
public Connection createConnection(String username, String password, List<Address> endpoints)
205+
public Connection createConnection(String username, String password, final List<Address> endpoints)
194206
throws JMSException {
207+
return createConnection(username, password, new ConnectionCreator() {
208+
209+
@Override
210+
public com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception {
211+
return cf.newConnection(endpoints);
212+
}
213+
});
214+
}
215+
216+
protected Connection createConnection(String username, String password, ConnectionCreator connectionCreator) throws JMSException {
195217
logger.trace("Creating a connection for username '{}', password 'xxxxxxxx'.", username);
196218
this.username = username;
197219
this.password = password;
198-
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
220+
com.rabbitmq.client.ConnectionFactory cf = createConnectionFactory();
221+
setRabbitUri(logger, this, cf, getUri());
199222
maybeEnableTLS(cf);
200223
maybeEnableHostnameVerification(cf);
201224
cf.setMetricsCollector(this.metricsCollector);
202-
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, endpoints);
225+
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator);
203226

204227
RMQConnection conn = new RMQConnection(new ConnectionParams()
205228
.setRabbitConnection(rabbitConnection)
@@ -210,42 +233,21 @@ public Connection createConnection(String username, String password, List<Addres
210233
.setPreferProducerMessageProperty(preferProducerMessageProperty)
211234
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
212235
.setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
236+
.setAmqpPropertiesCustomiser(amqpPropertiesCustomiser)
213237
);
214238
conn.setTrustedPackages(this.trustedPackages);
215239
logger.debug("Connection {} created.", conn);
216240
return conn;
217241
}
218242

219-
private com.rabbitmq.client.Connection instantiateNodeConnection(com.rabbitmq.client.ConnectionFactory cf)
220-
throws JMSException {
221-
try {
222-
return cf.newConnection();
223-
} catch (SSLException ssle) {
224-
throw new RMQJMSSecurityException("SSL Exception establishing RabbitMQ Connection", ssle);
225-
} catch (Exception x) {
226-
if (x instanceof IOException) {
227-
IOException ioe = (IOException) x;
228-
String msg = ioe.getMessage();
229-
if (msg!=null) {
230-
if (msg.contains("authentication failure") || msg.contains("refused using authentication"))
231-
throw new RMQJMSSecurityException(ioe);
232-
else if (msg.contains("Connection refused"))
233-
throw new RMQJMSException("RabbitMQ connection was refused. RabbitMQ broker may not be available.", ioe);
234-
}
235-
throw new RMQJMSException(ioe);
236-
} else if (x instanceof TimeoutException) {
237-
TimeoutException te = (TimeoutException) x;
238-
throw new RMQJMSException("Timed out establishing RabbitMQ Connection", te);
239-
} else {
240-
throw new RMQJMSException("Unexpected exception thrown by newConnection()", x);
241-
}
242-
}
243+
protected com.rabbitmq.client.ConnectionFactory createConnectionFactory() {
244+
return new com.rabbitmq.client.ConnectionFactory();
243245
}
244246

245-
private com.rabbitmq.client.Connection instantiateNodeConnection(
246-
com.rabbitmq.client.ConnectionFactory cf, List<Address> endpoints) throws JMSException {
247+
private com.rabbitmq.client.Connection instantiateNodeConnection(com.rabbitmq.client.ConnectionFactory cf, ConnectionCreator connectionCreator)
248+
throws JMSException {
247249
try {
248-
return cf.newConnection(endpoints);
250+
return connectionCreator.create(cf);
249251
} catch (SSLException ssle) {
250252
throw new RMQJMSSecurityException("SSL Exception establishing RabbitMQ Connection", ssle);
251253
} catch (Exception x) {
@@ -301,16 +303,54 @@ public String toString() {
301303
*/
302304
public void setUri(String uriString) throws JMSException {
303305
logger.trace("Set connection factory parameters by URI '{}'", uriString);
304-
// Create a temp factory and set the properties by uri
305-
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
306-
setRabbitUri(logger, this, factory, uriString);
307-
// Now extract our properties from this factory, leaving the rest unchanged.
308-
this.host = factory.getHost();
309-
this.password = factory.getPassword();
310-
this.port = factory.getPort();
311-
this.ssl = factory.isSSL();
312-
this.username = factory.getUsername();
313-
this.virtualHost = factory.getVirtualHost();
306+
if (uriString != null && !uriString.trim().isEmpty()) {
307+
// Create a temp factory and set the properties by uri
308+
com.rabbitmq.client.ConnectionFactory factory = createConnectionFactory();
309+
setRabbitUri(logger, this, factory, uriString);
310+
// Now extract our properties from this factory, leaving the rest unchanged.
311+
this.host = factory.getHost();
312+
this.password = factory.getPassword();
313+
this.port = factory.getPort();
314+
this.ssl = factory.isSSL();
315+
this.username = factory.getUsername();
316+
this.virtualHost = factory.getVirtualHost();
317+
} else {
318+
this.host = null;
319+
this.password = null;
320+
this.port = -1;
321+
this.ssl = false;
322+
this.username = null;
323+
this.virtualHost = null;
324+
}
325+
}
326+
327+
/**
328+
* Sets the nodes URIs to connect to.
329+
*
330+
* @param urisAsStrings
331+
* @throws JMSException
332+
* @since 1.10.0
333+
*/
334+
public void setUris(List<String> urisAsStrings) throws JMSException {
335+
if (urisAsStrings != null && !urisAsStrings.isEmpty()) {
336+
List<URI> uris = new ArrayList<URI>(urisAsStrings.size());
337+
for (String uriAsString : urisAsStrings) {
338+
try {
339+
URI uri = new URI(uriAsString);
340+
if (uri.getScheme() == null || (!"amqp".equals(uri.getScheme()) && !"amqps".equals(uri.getScheme()))) {
341+
throw new IllegalArgumentException("Wrong scheme in AMQP URI: " + uri.getScheme());
342+
}
343+
uris.add(uri);
344+
} catch (URISyntaxException e) {
345+
throw new IllegalArgumentException("Invalid URI: " + uriAsString);
346+
}
347+
}
348+
this.uris = uris;
349+
this.setUri(urisAsStrings.get(0));
350+
} else {
351+
this.uris = null;
352+
setUri(null);
353+
}
314354
}
315355

316356
/**
@@ -810,5 +850,17 @@ public void setHostnameVerification(boolean hostnameVerification) {
810850
public void setHostnameVerifier(HostnameVerifier hostnameVerifier) {
811851
this.hostnameVerifier = hostnameVerifier;
812852
}
853+
854+
public List<String> getUris() {
855+
List<String> urisAsStrings = new ArrayList<String>(uris.size());
856+
for (URI uri : this.uris) {
857+
urisAsStrings.add(uri.toString());
858+
}
859+
return urisAsStrings;
860+
}
861+
862+
private interface ConnectionCreator {
863+
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
864+
}
813865
}
814866

src/test/java/com/rabbitmq/jms/admin/RMQConnectionFactoryTest.java

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,30 @@
1-
package com.rabbitmq.jms.admin;
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
22

3-
import static org.junit.Assert.assertEquals;
3+
package com.rabbitmq.jms.admin;
44

5-
import java.util.Enumeration;
6-
import java.util.Hashtable;
7-
import java.util.Properties;
5+
import com.rabbitmq.client.Address;
6+
import com.rabbitmq.client.AddressResolver;
7+
import com.rabbitmq.client.Connection;
8+
import com.rabbitmq.client.ConnectionFactory;
9+
import org.junit.Before;
10+
import org.junit.Test;
811

912
import javax.naming.CompositeName;
1013
import javax.naming.RefAddr;
1114
import javax.naming.Reference;
1215
import javax.naming.StringRefAddr;
16+
import java.util.Enumeration;
17+
import java.util.Hashtable;
18+
import java.util.List;
19+
import java.util.Properties;
20+
import java.util.concurrent.ExecutorService;
1321

14-
import org.junit.Test;
22+
import static java.util.Arrays.asList;
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertFalse;
25+
import static org.junit.Assert.assertNotNull;
26+
import static org.junit.Assert.assertTrue;
27+
import static org.mockito.Mockito.mock;
1528

1629
public class RMQConnectionFactoryTest {
1730

@@ -194,4 +207,80 @@ public void testConnectionFactoryReferenceUpdated() throws Exception {
194207

195208
assertEquals("Not the correct uri", "amqps://fred:my-password@sillyHost:42/bill", newFactory.getUri());
196209
}
210+
211+
TestRmqConnectionFactory rmqCf;
212+
213+
AddressResolver passedInAddressResolver;
214+
215+
@Before
216+
public void init() {
217+
rmqCf = new TestRmqConnectionFactory();
218+
passedInAddressResolver = null;
219+
}
220+
221+
@Test(expected = IllegalArgumentException.class)
222+
public void shouldFailWhenOneOfUrisIsInvalid() throws Exception {
223+
rmqCf.setUris(asList("amqp://localhost", "invalid-amqp-uri"));
224+
}
225+
226+
@Test
227+
public void firstUriShouldBeAppliedToGlobalSettings() throws Exception {
228+
rmqCf.setUris(asList("amqp://user:pass@host1:10000/vhost", "amqp://user:pass@host2:10000/vhost"));
229+
assertEquals("host1", rmqCf.getHost());
230+
assertEquals("user", rmqCf.getUsername());
231+
assertEquals("pass", rmqCf.getPassword());
232+
assertEquals(10000, rmqCf.getPort());
233+
assertEquals("vhost", rmqCf.getVirtualHost());
234+
assertFalse(rmqCf.isSsl());
235+
}
236+
237+
@Test
238+
public void firstUriShouldBeAppliedToGlobalSettingsTls() throws Exception {
239+
rmqCf.setUris(asList("amqps://user:pass@host1:10000/vhost", "amqps://user:pass@host2:10000/vhost"));
240+
assertEquals("host1", rmqCf.getHost());
241+
assertEquals("user", rmqCf.getUsername());
242+
assertEquals("pass", rmqCf.getPassword());
243+
assertEquals(10000, rmqCf.getPort());
244+
assertEquals("vhost", rmqCf.getVirtualHost());
245+
assertTrue(rmqCf.isSsl());
246+
}
247+
248+
@Test
249+
public void shouldUseSingleAddressWhenSingleUri() throws Exception {
250+
rmqCf.setUri("amqp://localhost:10000");
251+
rmqCf.createConnection("guest", "guest");
252+
assertNotNull(passedInAddressResolver);
253+
List<Address> resolved = passedInAddressResolver.getAddresses();
254+
assertTrue(resolved.size() >= 1);
255+
assertTrue(resolved.size() <= 2);
256+
// don't check host, as there can be some DNS resolution happening
257+
assertEquals(10000, resolved.get(0).getPort());
258+
}
259+
260+
@Test
261+
public void shouldUseSeveralAddressesWhenUrisIsUsed() throws Exception {
262+
rmqCf.setUris(asList("amqps://user:pass@host1:10000/vhost", "amqps://user:pass@host2:10000/vhost"));
263+
rmqCf.createConnection("user", "pass");
264+
assertNotNull(passedInAddressResolver);
265+
assertEquals(2, passedInAddressResolver.getAddresses().size());
266+
assertEquals("host1", passedInAddressResolver.getAddresses().get(0).getHost());
267+
assertEquals(10000, passedInAddressResolver.getAddresses().get(0).getPort());
268+
assertEquals("host2", passedInAddressResolver.getAddresses().get(1).getHost());
269+
assertEquals(10000, passedInAddressResolver.getAddresses().get(1).getPort());
270+
}
271+
272+
class TestRmqConnectionFactory extends RMQConnectionFactory {
273+
274+
@Override
275+
protected ConnectionFactory createConnectionFactory() {
276+
return new ConnectionFactory() {
277+
278+
@Override
279+
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName) {
280+
passedInAddressResolver = addressResolver;
281+
return mock(Connection.class);
282+
}
283+
};
284+
}
285+
}
197286
}

0 commit comments

Comments
 (0)