Skip to content

Commit 7bc7d94

Browse files
committed
Throw JMS Exception when listening on non-existing destination
[#159272755] Fixes #62 (cherry picked from commit c89705a)
1 parent 0fcb6c8 commit 7bc7d94

File tree

6 files changed

+97
-20
lines changed

6 files changed

+97
-20
lines changed

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import javax.jms.JMSException;
99
import javax.jms.MessageListener;
1010

11+
import com.rabbitmq.jms.util.RMQJMSException;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
1314

@@ -216,7 +217,7 @@ public void stop() {
216217
}
217218

218219
@Override
219-
public void start() {
220+
public void start() throws Exception {
220221
String cT = this.getConsTag();
221222
logger.trace("consumerTag='{}'", cT);
222223
this.rejecting = false;
@@ -226,6 +227,7 @@ public void start() {
226227
} catch (Exception e) {
227228
this.completion.setComplete(); // just in case someone is waiting on it
228229
logger.error("basicConsume (consumerTag='{}') threw exception", cT, e);
230+
throw new RMQJMSException("Error while starting consumer", e);
229231
}
230232
}
231233

src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import javax.jms.Topic;
1818
import javax.jms.TopicSubscriber;
1919

20+
import com.rabbitmq.jms.util.RMQJMSException;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
2223

@@ -179,15 +180,21 @@ public void setMessageListener(MessageListener messageListener) throws JMSExcept
179180
logger.trace("setting MessageListener({})", messageListener);
180181
this.removeListenerConsumer(); // if there is any
181182
this.messageListener = messageListener;
182-
this.setNewListenerConsumer(messageListener); // if needed
183+
try {
184+
this.setNewListenerConsumer(messageListener); // if needed
185+
} catch (JMSException e) {
186+
throw e;
187+
} catch (Exception e) {
188+
throw new RMQJMSException(e);
189+
}
183190
}
184191

185192
/**
186193
* Create a new RabitMQ Consumer, if necessary.
187194
* @param messageListener to drive from Consumer; no Consumer is created if this is null.
188195
* @throws IllegalStateException
189196
*/
190-
private void setNewListenerConsumer(MessageListener messageListener) throws IllegalStateException {
197+
private void setNewListenerConsumer(MessageListener messageListener) throws Exception {
191198
if (messageListener != null) {
192199
MessageListenerConsumer mlConsumer =
193200
new MessageListenerConsumer(this,
@@ -404,7 +411,13 @@ void internalClose() throws JMSException {
404411
/* stop and remove any active subscription - waits for onMessage processing to finish */
405412
this.removeListenerConsumer();
406413

407-
this.abortables.abort(); // abort Consumers of both types that remain
414+
try {
415+
this.abortables.abort(); // abort Consumers of both types that remain
416+
} catch (JMSException e) {
417+
throw e;
418+
} catch (Exception e) {
419+
throw new RMQJMSException(e);
420+
}
408421

409422
this.closed = true;
410423
this.closing = false;
@@ -471,7 +484,7 @@ private boolean getNoLocalNoException() {
471484
*
472485
* @throws InterruptedException if the thread is interrupted
473486
*/
474-
void pause() throws InterruptedException {
487+
void pause() throws Exception {
475488
this.receiveManager.closeGate();
476489
this.receiveManager.waitToClear(new TimeTracker(STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS));
477490
this.abortables.stop();
@@ -484,7 +497,13 @@ void pause() throws InterruptedException {
484497
* @throws javax.jms.JMSException if the thread is interrupted
485498
*/
486499
void resume() throws JMSException {
487-
this.abortables.start(); // async listener restarted
500+
try {
501+
this.abortables.start(); // async listener restarted
502+
} catch (JMSException e) {
503+
throw e;
504+
} catch (Exception e) {
505+
throw new RMQJMSException(e);
506+
}
488507
this.receiveManager.openGate(); // sync listener allowed to run
489508
}
490509

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,9 +1182,14 @@ void pause() throws JMSException {
11821182
for (RMQMessageConsumer consumer : this.consumers) {
11831183
try {
11841184
consumer.pause();
1185+
} catch (JMSException e) {
1186+
throw e;
11851187
} catch (InterruptedException x) {
11861188
logger.error("Consumer({}) pause interrupted", consumer, x);
11871189
throw new RMQJMSException(x);
1190+
} catch (Exception x) {
1191+
logger.error("Error while pausing consumer({})", consumer, x);
1192+
throw new RMQJMSException(x);
11881193
}
11891194
}
11901195
}

src/main/java/com/rabbitmq/jms/util/Abortable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,24 @@ public interface Abortable {
99
* Cause any other implementing threads to terminate fairly quickly, signalling abnormal termination
1010
* to its instigator, if necessary.
1111
* <p>
12-
* Implementations of this method must be thread-safe, and throw no exceptions, except possibly {@link RuntimeException}s.
12+
* Implementations of this method must be thread-safe.
1313
* </p>
1414
*/
15-
void abort();
15+
void abort() throws Exception;
1616

1717
/**
1818
* Cause any other implementing threads to stop temporarily.
1919
* <p>
20-
* Implementations of this method must be thread-safe, and throw no exceptions, except possibly {@link RuntimeException}s.
20+
* Implementations of this method must be thread-safe.
2121
* </p>
2222
*/
23-
void stop();
23+
void stop() throws Exception;
2424

2525
/**
2626
* Cause any other stopped threads to start.
2727
* <p>
28-
* Implementations of this method must be thread-safe, and throw no exceptions, except possibly {@link RuntimeException}s.
28+
* Implementations of this method must be thread-safe.
2929
* </p>
3030
*/
31-
void start();
31+
void start() throws Exception;
3232
}

src/main/java/com/rabbitmq/jms/util/AbortableHolder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ public class AbortableHolder implements Abortable {
1212
private final boolean[] flags = new boolean[] { false, false, false }; // to prevent infinite regress
1313

1414
private enum Action {
15-
ABORT(0) { @Override void doit(Abortable a) { a.abort(); } },
16-
START(1) { @Override void doit(Abortable a) { a.start(); } },
17-
STOP(2) { @Override void doit(Abortable a) { a.stop(); } };
15+
ABORT(0) { @Override void doit(Abortable a) throws Exception { a.abort(); } },
16+
START(1) { @Override void doit(Abortable a) throws Exception { a.start(); } },
17+
STOP(2) { @Override void doit(Abortable a) throws Exception { a.stop(); } };
1818
private final int ind;
1919

2020
Action(int ind) { this.ind = ind; }
2121

2222
int index() { return this.ind; }
2323

24-
abstract void doit(Abortable a);
24+
abstract void doit(Abortable a) throws Exception;
2525
};
2626

2727
/**
@@ -38,13 +38,13 @@ public void remove(Abortable a) {
3838
this.abortableQueue.remove(a);
3939
}
4040

41-
public void abort() { act(Action.ABORT); }
41+
public void abort() throws Exception { act(Action.ABORT); }
4242

43-
public void start() { act(Action.START); }
43+
public void start() throws Exception { act(Action.START); }
4444

45-
public void stop() { act(Action.STOP); }
45+
public void stop() throws Exception { act(Action.STOP); }
4646

47-
private void act(AbortableHolder.Action action) {
47+
private void act(AbortableHolder.Action action) throws Exception {
4848
if (this.flags[action.index()]) return; // prevent infinite
4949
this.flags[action.index()] = true; // regress
5050

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
3+
package com.rabbitmq.integration.tests;
4+
5+
import com.rabbitmq.jms.admin.RMQDestination;
6+
import org.junit.jupiter.api.AfterEach;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
import javax.jms.JMSException;
11+
import javax.jms.MessageConsumer;
12+
import javax.jms.QueueConnection;
13+
import javax.jms.QueueConnectionFactory;
14+
import javax.jms.Session;
15+
16+
import static org.junit.jupiter.api.Assertions.assertThrows;
17+
18+
/**
19+
* To make sure an exception is thrown when starting listening on
20+
* a non-existing destination.
21+
*/
22+
public class ExceptionHandlingIT {
23+
24+
QueueConnectionFactory connFactory;
25+
QueueConnection queueConn;
26+
27+
@BeforeEach
28+
public void beforeTests() throws Exception {
29+
this.connFactory = (QueueConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
30+
.getConnectionFactory();
31+
this.queueConn = connFactory.createQueueConnection();
32+
}
33+
34+
@AfterEach
35+
public void afterTests() throws Exception {
36+
if (queueConn != null)
37+
queueConn.close();
38+
}
39+
40+
@Test
41+
public void consumerOnNonExistingDestination() throws Exception {
42+
queueConn.start();
43+
String queueName = ExceptionHandlingIT.class.getSimpleName() + " " + System.currentTimeMillis();
44+
RMQDestination queue = new RMQDestination(queueName, null, null, queueName);
45+
Session session = queueConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
46+
MessageConsumer messageConsumer = session.createConsumer(queue);
47+
48+
assertThrows(JMSException.class, () -> messageConsumer.setMessageListener(msg -> {
49+
}));
50+
}
51+
}

0 commit comments

Comments
 (0)