Skip to content

Commit 9a5ee3d

Browse files
Merge pull request #64 from rabbitmq/rabbitmq-jms-client-53-add-flag-to-throw-exception
Add flag to throw exception on consumer startup failure
2 parents 0fcb6c8 + 2c5f373 commit 9a5ee3d

File tree

10 files changed

+242
-39
lines changed

10 files changed

+242
-39
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
119119
public void postProcess(com.rabbitmq.client.ConnectionFactory connectionFactory) { }
120120
};
121121

122+
/**
123+
* Whether an exception should be thrown or not when consumer startup fails.
124+
* <p>
125+
* Default is false.
126+
*
127+
* @since 1.10.0
128+
*/
129+
private boolean throwExceptionOnConsumerStartFailure = false;
130+
122131
/** Default not to use ssl */
123132
private boolean ssl = false;
124133
private String tlsProtocol;
@@ -249,6 +258,7 @@ protected Connection createConnection(String username, String password, Connecti
249258
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
250259
.setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
251260
.setAmqpPropertiesCustomiser(amqpPropertiesCustomiser)
261+
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
252262
);
253263
conn.setTrustedPackages(this.trustedPackages);
254264
logger.debug("Connection {} created.", conn);
@@ -888,6 +898,18 @@ public void setAmqpConnectionFactoryPostProcessor(AmqpConnectionFactoryPostProce
888898
this.amqpConnectionFactoryPostProcessor = amqpConnectionFactoryPostProcessor;
889899
}
890900

901+
/**
902+
* Whether an exception should be thrown or not when consumer startup fails.
903+
* <p>
904+
* Default is false.
905+
*
906+
* @param throwExceptionOnConsumerStartFailure
907+
* @since 1.10.0
908+
*/
909+
public void setThrowExceptionOnConsumerStartFailure(boolean throwExceptionOnConsumerStartFailure) {
910+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
911+
}
912+
891913
private interface ConnectionCreator {
892914
com.rabbitmq.client.Connection create(com.rabbitmq.client.ConnectionFactory cf) throws Exception;
893915
}

src/main/java/com/rabbitmq/jms/client/ConnectionParams.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,27 @@ public class ConnectionParams {
5050
* If set to true, those queues will be deleted when the session is closed.
5151
* If set to false, queues will be deleted when the owning connection is closed.
5252
* Default is false.
53+
*
5354
* @since 1.8.0
5455
*/
5556
private boolean cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = false;
5657

5758
/**
5859
* Callback to customise properties of outbound AMQP messages.
60+
*
5961
* @since 1.9.0
6062
*/
6163
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
6264

65+
/**
66+
* Whether an exception should be thrown or not when consumer startup fails.
67+
* <p>
68+
* Default is false.
69+
*
70+
* @since 1.10.0
71+
*/
72+
private boolean throwExceptionOnConsumerStartFailure;
73+
6374
public Connection getRabbitConnection() {
6475
return rabbitConnection;
6576
}
@@ -140,4 +151,13 @@ public ConnectionParams setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amq
140151
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
141152
return this;
142153
}
154+
155+
public ConnectionParams setThrowExceptionOnConsumerStartFailure(boolean throwExceptionOnConsumerStartFailure) {
156+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
157+
return this;
158+
}
159+
160+
public boolean willThrowExceptionOnConsumerStartFailure() {
161+
return throwExceptionOnConsumerStartFailure;
162+
}
143163
}

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -8,6 +8,7 @@
88
import javax.jms.JMSException;
99
import javax.jms.MessageListener;
1010

11+
import com.rabbitmq.jms.util.RMQJMSException;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
1314

@@ -39,6 +40,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
3940
private final long terminationTimeout;
4041
private volatile boolean rejecting;
4142
private final boolean requeueOnMessageListenerException;
43+
private final boolean throwExceptionOnStartFailure;
4244

4345
/**
4446
* Constructor
@@ -48,7 +50,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
4850
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
4951
*/
5052
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
51-
boolean requeueOnMessageListenerException) {
53+
boolean requeueOnMessageListenerException, boolean throwExceptionOnStartFailure) {
5254
this.messageConsumer = messageConsumer;
5355
this.channel = channel;
5456
this.messageListener = messageListener;
@@ -57,6 +59,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
5759
this.completion = new Completion(); // completed when cancelled.
5860
this.rejecting = this.messageConsumer.getSession().getConnection().isStopped();
5961
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
62+
this.throwExceptionOnStartFailure = throwExceptionOnStartFailure;
6063
}
6164

6265
private String getConsTag() {
@@ -216,7 +219,7 @@ public void stop() {
216219
}
217220

218221
@Override
219-
public void start() {
222+
public void start() throws Exception {
220223
String cT = this.getConsTag();
221224
logger.trace("consumerTag='{}'", cT);
222225
this.rejecting = false;
@@ -226,6 +229,9 @@ public void start() {
226229
} catch (Exception e) {
227230
this.completion.setComplete(); // just in case someone is waiting on it
228231
logger.error("basicConsume (consumerTag='{}') threw exception", cT, e);
232+
if (throwExceptionOnStartFailure) {
233+
throw new RMQJMSException("Error while starting consumer", e);
234+
}
229235
}
230236
}
231237

src/main/java/com/rabbitmq/jms/client/RMQConnection.java

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,32 @@
11
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.ShutdownListener;
6+
import com.rabbitmq.client.ShutdownSignalException;
7+
import com.rabbitmq.jms.util.RMQJMSException;
8+
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import javax.jms.Connection;
13+
import javax.jms.ConnectionConsumer;
14+
import javax.jms.ConnectionMetaData;
15+
import javax.jms.Destination;
16+
import javax.jms.ExceptionListener;
17+
import javax.jms.IllegalStateException;
18+
import javax.jms.InvalidClientIDException;
19+
import javax.jms.JMSException;
20+
import javax.jms.Message;
21+
import javax.jms.MessageProducer;
22+
import javax.jms.Queue;
23+
import javax.jms.QueueConnection;
24+
import javax.jms.QueueSession;
25+
import javax.jms.ServerSessionPool;
26+
import javax.jms.Session;
27+
import javax.jms.Topic;
28+
import javax.jms.TopicConnection;
29+
import javax.jms.TopicSession;
430
import java.io.IOException;
531
import java.util.ArrayList;
632
import java.util.Collections;
@@ -10,18 +36,6 @@
1036
import java.util.concurrent.atomic.AtomicBoolean;
1137
import java.util.concurrent.atomic.AtomicReference;
1238

13-
import javax.jms.*;
14-
import javax.jms.IllegalStateException;
15-
16-
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
17-
import org.slf4j.Logger;
18-
import org.slf4j.LoggerFactory;
19-
20-
import com.rabbitmq.client.Channel;
21-
import com.rabbitmq.client.ShutdownListener;
22-
import com.rabbitmq.client.ShutdownSignalException;
23-
import com.rabbitmq.jms.util.RMQJMSException;
24-
2539
/**
2640
* Implementation of the {@link Connection}, {@link QueueConnection} and {@link TopicConnection} interfaces.
2741
* A {@link RMQConnection} object holds a list of {@link RMQSession} objects as well as the actual
@@ -102,10 +116,20 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
102116

103117
/**
104118
* Callback to customise properties of outbound AMQP messages.
119+
*
105120
* @since 1.9.0
106121
*/
107122
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
108123

124+
/**
125+
* Whether an exception should be thrown or not when consumer startup fails.
126+
* <p>
127+
* Default is false.
128+
*
129+
* @since 1.10.0
130+
*/
131+
private final boolean throwExceptionOnConsumerStartFailure;
132+
109133
/**
110134
* Classes in these packages can be transferred via ObjectMessage.
111135
*
@@ -130,6 +154,7 @@ public RMQConnection(ConnectionParams connectionParams) {
130154
this.requeueOnMessageListenerException = connectionParams.willRequeueOnMessageListenerException();
131155
this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = connectionParams.isCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose();
132156
this.amqpPropertiesCustomiser = connectionParams.getAmqpPropertiesCustomiser();
157+
this.throwExceptionOnConsumerStartFailure = connectionParams.willThrowExceptionOnConsumerStartFailure();
133158
}
134159

135160
/**
@@ -179,6 +204,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
179204
.setRequeueOnMessageListenerException(this.requeueOnMessageListenerException)
180205
.setCleanUpServerNamedQueuesForNonDurableTopics(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
181206
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
207+
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
182208
);
183209
session.setTrustedPackages(this.trustedPackages);
184210
this.sessions.add(session);

src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -17,6 +17,7 @@
1717
import javax.jms.Topic;
1818
import javax.jms.TopicSubscriber;
1919

20+
import com.rabbitmq.jms.util.RMQJMSException;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

@@ -83,6 +84,11 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
8384
*/
8485
private final boolean requeueOnMessageListenerException;
8586

87+
/**
88+
* Whether an exception should be thrown or not when consumer startup fails.
89+
*/
90+
private final boolean throwExceptionOnConsumerStartFailure;
91+
8692
/**
8793
* Creates a RMQMessageConsumer object. Internal constructor used by {@link RMQSession}
8894
*
@@ -92,8 +98,10 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
9298
* unique name.
9399
* @param paused - true if the connection is {@link javax.jms.Connection#stop}ped, false otherwise.
94100
* @param requeueOnMessageListenerException true to requeue message on RuntimeException in listener, false otherwise
101+
* @param throwExceptionOnConsumerStartFailure true to throw an exception if start-up fails, false otherwise
95102
*/
96-
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException) {
103+
RMQMessageConsumer(RMQSession session, RMQDestination destination, String uuidTag, boolean paused, String messageSelector, boolean requeueOnMessageListenerException,
104+
boolean throwExceptionOnConsumerStartFailure) {
97105
this.session = session;
98106
this.destination = destination;
99107
this.uuidTag = uuidTag;
@@ -103,6 +111,7 @@ public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, Topic
103111
this.receiveManager.openGate();
104112
this.autoAck = session.isAutoAck();
105113
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
114+
this.throwExceptionOnConsumerStartFailure = throwExceptionOnConsumerStartFailure;
106115
}
107116

108117
/**
@@ -179,23 +188,30 @@ public void setMessageListener(MessageListener messageListener) throws JMSExcept
179188
logger.trace("setting MessageListener({})", messageListener);
180189
this.removeListenerConsumer(); // if there is any
181190
this.messageListener = messageListener;
182-
this.setNewListenerConsumer(messageListener); // if needed
191+
try {
192+
this.setNewListenerConsumer(messageListener); // if needed
193+
} catch (JMSException e) {
194+
throw e;
195+
} catch (Exception e) {
196+
throw new RMQJMSException(e);
197+
}
183198
}
184199

185200
/**
186201
* Create a new RabitMQ Consumer, if necessary.
187202
* @param messageListener to drive from Consumer; no Consumer is created if this is null.
188203
* @throws IllegalStateException
189204
*/
190-
private void setNewListenerConsumer(MessageListener messageListener) throws IllegalStateException {
205+
private void setNewListenerConsumer(MessageListener messageListener) throws Exception {
191206
if (messageListener != null) {
192207
MessageListenerConsumer mlConsumer =
193208
new MessageListenerConsumer(this,
194209
getSession().getChannel(),
195210
messageListener,
196211
TimeUnit.MILLISECONDS.toNanos(this.session.getConnection()
197212
.getTerminationTimeout()),
198-
this.requeueOnMessageListenerException);
213+
this.requeueOnMessageListenerException,
214+
this.throwExceptionOnConsumerStartFailure);
199215
if (this.listenerConsumer.compareAndSet(null, mlConsumer)) {
200216
this.abortables.add(mlConsumer);
201217
if (!this.getSession().getConnection().isStopped()) {
@@ -404,7 +420,13 @@ void internalClose() throws JMSException {
404420
/* stop and remove any active subscription - waits for onMessage processing to finish */
405421
this.removeListenerConsumer();
406422

407-
this.abortables.abort(); // abort Consumers of both types that remain
423+
try {
424+
this.abortables.abort(); // abort Consumers of both types that remain
425+
} catch (JMSException e) {
426+
throw e;
427+
} catch (Exception e) {
428+
throw new RMQJMSException(e);
429+
}
408430

409431
this.closed = true;
410432
this.closing = false;
@@ -471,7 +493,7 @@ private boolean getNoLocalNoException() {
471493
*
472494
* @throws InterruptedException if the thread is interrupted
473495
*/
474-
void pause() throws InterruptedException {
496+
void pause() throws Exception {
475497
this.receiveManager.closeGate();
476498
this.receiveManager.waitToClear(new TimeTracker(STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS));
477499
this.abortables.stop();
@@ -484,7 +506,13 @@ void pause() throws InterruptedException {
484506
* @throws javax.jms.JMSException if the thread is interrupted
485507
*/
486508
void resume() throws JMSException {
487-
this.abortables.start(); // async listener restarted
509+
try {
510+
this.abortables.start(); // async listener restarted
511+
} catch (JMSException e) {
512+
throw e;
513+
} catch (Exception e) {
514+
throw new RMQJMSException(e);
515+
}
488516
this.receiveManager.openGate(); // sync listener allowed to run
489517
}
490518

0 commit comments

Comments
 (0)