Skip to content

Commit 2c5f373

Browse files
committed
Add flag to throw exception on consumer startup failure
The flag can be set in the RMQConnectionFactory. An exception will be thrown or ignored when a consumer startup fails, depending on the value of the flag. This flag allows to maintain backward compatibility, but the behavior in 2.0 will be to throw an exception (see #62). [#159272755] References #62 Fixes #53
1 parent 7bc7d94 commit 2c5f373

File tree

8 files changed

+162
-36
lines changed

8 files changed

+162
-36
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
119119
public void postProcess(com.rabbitmq.client.ConnectionFactory connectionFactory) { }
120120
};
121121

122+
/**
123+
* Whether an exception should be thrown or not when consumer startup fails.
124+
* <p>
125+
* Default is false.
126+
*
127+
* @since 1.10.0
128+
*/
129+
private boolean throwExceptionOnConsumerStartFailure = false;
130+
122131
/** Default not to use ssl */
123132
private boolean ssl = false;
124133
private String tlsProtocol;
@@ -249,6 +258,7 @@ protected Connection createConnection(String username, String password, Connecti
249258
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
250259
.setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
251260
.setAmqpPropertiesCustomiser(amqpPropertiesCustomiser)
261+
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
252262
);
253263
conn.setTrustedPackages(this.trustedPackages);
254264
logger.debug("Connection {} created.", conn);
@@ -888,6 +898,18 @@ public void setAmqpConnectionFactoryPostProcessor(AmqpConnectionFactoryPostProce
888898
this.amqpConnectionFactoryPostProcessor = amqpConnectionFactoryPostProcessor;
889899
}
890900

901+
/**
902+
* Whether an exception should be thrown or not when consumer startup fails.
903+
* <p>
904+
* Default is false.
905+
*
906+
* @param throwExceptionOnConsumerStartFailure
907+
* @since 1.10.0
908+
*/
909+
public void setThrowExceptionOnConsumerStartFailure(boolean throwExceptionOnConsumerStartFailure) {
910+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
911+
}
912+
891913
private interface ConnectionCreator {
892914
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
893915
}

src/main/java/com/rabbitmq/jms/client/ConnectionParams.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,27 @@ public class ConnectionParams {
5050
* If set to true, those queues will be deleted when the session is closed.
5151
* If set to false, queues will be deleted when the owning connection is closed.
5252
* Default is false.
53+
*
5354
* @since 1.8.0
5455
*/
5556
private boolean cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = false;
5657

5758
/**
5859
* Callback to customise properties of outbound AMQP messages.
60+
*
5961
* @since 1.9.0
6062
*/
6163
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
6264

65+
/**
66+
* Whether an exception should be thrown or not when consumer startup fails.
67+
* <p>
68+
* Default is false.
69+
*
70+
* @since 1.10.0
71+
*/
72+
private boolean throwExceptionOnConsumerStartFailure;
73+
6374
public Connection getRabbitConnection() {
6475
return rabbitConnection;
6576
}
@@ -140,4 +151,13 @@ public ConnectionParams setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amq
140151
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
141152
return this;
142153
}
154+
155+
public ConnectionParams setThrowExceptionOnConsumerStartFailure(boolean throwExceptionOnConsumerStartFailure) {
156+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
157+
return this;
158+
}
159+
160+
public boolean willThrowExceptionOnConsumerStartFailure() {
161+
return throwExceptionOnConsumerStartFailure;
162+
}
143163
}

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -40,6 +40,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
4040
private final long terminationTimeout;
4141
private volatile boolean rejecting;
4242
private final boolean requeueOnMessageListenerException;
43+
private final boolean throwExceptionOnStartFailure;
4344

4445
/**
4546
* Constructor
@@ -49,7 +50,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
4950
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
5051
*/
5152
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
52-
boolean requeueOnMessageListenerException) {
53+
boolean requeueOnMessageListenerException, boolean throwExceptionOnStartFailure) {
5354
this.messageConsumer = messageConsumer;
5455
this.channel = channel;
5556
this.messageListener = messageListener;
@@ -58,6 +59,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
5859
this.completion = new Completion(); // completed when cancelled.
5960
this.rejecting = this.messageConsumer.getSession().getConnection().isStopped();
6061
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
62+
this.throwExceptionOnStartFailure = throwExceptionOnStartFailure;
6163
}
6264

6365
private String getConsTag() {
@@ -227,7 +229,9 @@ public void start() throws Exception {
227229
} catch (Exception e) {
228230
this.completion.setComplete(); // just in case someone is waiting on it
229231
logger.error("basicConsume (consumerTag='{}') threw exception", cT, e);
230-
throw new RMQJMSException("Error while starting consumer", e);
232+
if (throwExceptionOnStartFailure) {
233+
throw new RMQJMSException("Error while starting consumer", e);
234+
}
231235
}
232236
}
233237

src/main/java/com/rabbitmq/jms/client/RMQConnection.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,32 @@
11
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.ShutdownListener;
6+
import com.rabbitmq.client.ShutdownSignalException;
7+
import com.rabbitmq.jms.util.RMQJMSException;
8+
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import javax.jms.Connection;
13+
import javax.jms.ConnectionConsumer;
14+
import javax.jms.ConnectionMetaData;
15+
import javax.jms.Destination;
16+
import javax.jms.ExceptionListener;
17+
import javax.jms.IllegalStateException;
18+
import javax.jms.InvalidClientIDException;
19+
import javax.jms.JMSException;
20+
import javax.jms.Message;
21+
import javax.jms.MessageProducer;
22+
import javax.jms.Queue;
23+
import javax.jms.QueueConnection;
24+
import javax.jms.QueueSession;
25+
import javax.jms.ServerSessionPool;
26+
import javax.jms.Session;
27+
import javax.jms.Topic;
28+
import javax.jms.TopicConnection;
29+
import javax.jms.TopicSession;
430
import java.io.IOException;
531
import java.util.ArrayList;
632
import java.util.Collections;
@@ -10,18 +36,6 @@
1036
import java.util.concurrent.atomic.AtomicBoolean;
1137
import java.util.concurrent.atomic.AtomicReference;
1238

13-
import javax.jms.*;
14-
import javax.jms.IllegalStateException;
15-
16-
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
19-
20-
import com.rabbitmq.client.Channel;
21-
import com.rabbitmq.client.ShutdownListener;
22-
import com.rabbitmq.client.ShutdownSignalException;
23-
import com.rabbitmq.jms.util.RMQJMSException;
24-
2539
/**
2640
* Implementation of the {@link Connection}, {@link QueueConnection} and {@link TopicConnection} interfaces.
2741
* A {@link RMQConnection} object holds a list of {@link RMQSession} objects as well as the actual
@@ -102,10 +116,20 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
102116

103117
/**
104118
* Callback to customise properties of outbound AMQP messages.
119+
*
105120
* @since 1.9.0
106121
*/
107122
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
108123

124+
/**
125+
* Whether an exception should be thrown or not when consumer startup fails.
126+
* <p>
127+
* Default is false.
128+
*
129+
* @since 1.10.0
130+
*/
131+
private final boolean throwExceptionOnConsumerStartFailure;
132+
109133
/**
110134
* Classes in these packages can be transferred via ObjectMessage.
111135
*
@@ -130,6 +154,7 @@ public RMQConnection(ConnectionParams connectionParams) {
130154
this.requeueOnMessageListenerException = connectionParams.willRequeueOnMessageListenerException();
131155
this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = connectionParams.isCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose();
132156
this.amqpPropertiesCustomiser = connectionParams.getAmqpPropertiesCustomiser();
157+
this.throwExceptionOnConsumerStartFailure = connectionParams.willThrowExceptionOnConsumerStartFailure();
133158
}
134159

135160
/**
@@ -179,6 +204,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
179204
.setRequeueOnMessageListenerException(this.requeueOnMessageListenerException)
180205
.setCleanUpServerNamedQueuesForNonDurableTopics(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
181206
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
207+
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
182208
);
183209
session.setTrustedPackages(this.trustedPackages);
184210
this.sessions.add(session);

src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -84,6 +84,11 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
8484
*/
8585
private final boolean requeueOnMessageListenerException;
8686

87+
/**
88+
* Whether an exception should be thrown or not when consumer startup fails.
89+
*/
90+
private final boolean throwExceptionOnConsumerStartFailure;
91+
8792
/**
8893
* Creates a RMQMessageConsumer object. Internal constructor used by {@link RMQSession}
8994
*
@@ -93,8 +98,10 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
9398
* unique name.
9499
* @param paused - true if the connection is {@link javax.jms.Connection#stop}ped, false otherwise.
95100
* @param requeueOnMessageListenerException true to requeue message on RuntimeException in listener, false otherwise
101+
* @param throwExceptionOnConsumerStartFailure true to throw an exception if start-up fails, false otherwise
96102
*/
97-
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException) {
103+
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException,
104+
boolean throwExceptionOnConsumerStartFailure) {
98105
this.session = session;
99106
this.destination = destination;
100107
this.uuidTag = uuidTag;
@@ -104,6 +111,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
104111
this.receiveManager.openGate();
105112
this.autoAck = session.isAutoAck();
106113
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
114+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
107115
}
108116

109117
/**
@@ -202,7 +210,8 @@ private void setNewListenerConsumer(MessageListener messageListener) throws Exce
202210
messageListener,
203211
TimeUnit.MILLISECONDS.toNanos(this.session.getConnection()
204212
.getTerminationTimeout()),
205-
this.requeueOnMessageListenerException);
213+
this.requeueOnMessageListenerException,
214+
this.throwExceptionOnConsumerStartFailure);
206215
if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
207216
this.abortables.add(mlConsumer);
208217
if (!this.getSession().getConnection().isStopped()) {

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public class RMQSession implements Session, QueueSession, TopicSession {
9292
* {@link javax.jms.MessageListener} or not.
9393
* Default is false.
9494
*/
95-
private boolean requeueOnMessageListenerException = false;
95+
private final boolean requeueOnMessageListenerException;
9696

9797
/**
9898
* Whether using auto-delete for server-named queues for non-durable topics.
@@ -109,6 +109,13 @@ public class RMQSession implements Session, QueueSession, TopicSession {
109109
*/
110110
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
111111

112+
/**
113+
* Whether an exception should be thrown or not when consumer startup fails.
114+
*
115+
* @since 1.10.0
116+
*/
117+
private final boolean throwExceptionOnConsumerStartFailure;
118+
112119
/** The main RabbitMQ channel we use under the hood */
113120
private final Channel channel;
114121
/** Set to true if close() has been called and completed */
@@ -202,6 +209,7 @@ public RMQSession(SessionParams sessionParams) throws JMSException {
202209
this.requeueOnMessageListenerException = sessionParams.willRequeueOnMessageListenerException();
203210
this.cleanUpServerNamedQueuesForNonDurableTopics = sessionParams.isCleanUpServerNamedQueuesForNonDurableTopics();
204211
this.amqpPropertiesCustomiser = sessionParams.getAmqpPropertiesCustomiser();
212+
this.throwExceptionOnConsumerStartFailure = sessionParams.willThrowExceptionOnConsumerStartFailure();
205213

206214
if (transacted) {
207215
this.acknowledgeMode = Session.SESSION_TRANSACTED;
@@ -688,7 +696,8 @@ private RMQMessageConsumer createConsumerInternal(RMQDestination dest, String uu
688696
throw new RMQJMSException("RabbitMQ Exception creating Consumer", x);
689697
}
690698
}
691-
RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(), jmsSelector, this.requeueOnMessageListenerException);
699+
RMQMessageConsumer consumer = new RMQMessageConsumer(this, dest, consumerTag, getConnection().isStopped(), jmsSelector,
700+
this.requeueOnMessageListenerException, this.throwExceptionOnConsumerStartFailure);
692701
this.consumers.add(consumer);
693702
return consumer;
694703
}

src/main/java/com/rabbitmq/jms/client/SessionParams.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,29 @@ public class SessionParams {
4444
* Whether using auto-delete for server-named queues for non-durable topics.
4545
* If set to true, those queues will be deleted when the session is closed.
4646
* If set to false, queues will be deleted when the owning connection is closed.
47+
* <p>
4748
* Default is false.
49+
*
4850
* @since 1.8.0
4951
*/
5052
private boolean cleanUpServerNamedQueuesForNonDurableTopics = false;
5153

5254
/**
5355
* Callback to customise properties of outbound AMQP messages.
56+
*
5457
* @since 1.9.0
5558
*/
5659
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
5760

61+
/**
62+
* Whether an exception should be thrown or not when consumer startup fails.
63+
* <p>
64+
* Default is false.
65+
*
66+
* @since 1.10.0
67+
*/
68+
private boolean throwExceptionOnConsumerStartFailure= false;
69+
5870
public RMQConnection getConnection() {
5971
return connection;
6072
}
@@ -135,4 +147,13 @@ public SessionParams setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amqpPr
135147
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
136148
return this;
137149
}
150+
151+
public SessionParams setThrowExceptionOnConsumerStartFailure(boolean throwExceptionOnConsumerStartFailure) {
152+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
153+
return this;
154+
}
155+
156+
public boolean willThrowExceptionOnConsumerStartFailure() {
157+
return throwExceptionOnConsumerStartFailure;
158+
}
138159
}

0 commit comments

Comments
 (0)