Skip to content

Commit d6c6d45

Browse files
committed
Add callback before receiving message
This commit adds a callback on message consumption, after the AMQP message is converted into a JMS message, but before it is dispatched to application code. This is used internally to ask to not declare the reply-to queue of a received message. A previous commit would make this change every time, but this broker the compliance test suite. So this is now an opt-in, useful for the server-side of RPC use cases, when we don't want to re-create the temporary reply-to destination. References #69 (cherry picked from commit d449114) Conflicts: src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java src/main/java/com/rabbitmq/jms/client/RMQSession.java src/test/java/com/rabbitmq/integration/tests/RpcIT.java src/test/java/com/rabbitmq/integration/tests/RpcSpringJmsIT.java
1 parent 3830fd8 commit d6c6d45

16 files changed

+353
-52
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
import com.rabbitmq.client.Address;
55
import com.rabbitmq.client.MetricsCollector;
66
import com.rabbitmq.client.NoOpMetricsCollector;
7+
import com.rabbitmq.jms.client.AbstractReceivingContextConsumer;
78
import com.rabbitmq.jms.client.AmqpConnectionFactoryPostProcessor;
89
import com.rabbitmq.jms.client.AmqpPropertiesCustomiser;
910
import com.rabbitmq.jms.client.ConnectionParams;
1011
import com.rabbitmq.jms.client.RMQConnection;
12+
import com.rabbitmq.jms.client.RMQMessage;
13+
import com.rabbitmq.jms.client.ReceivingContext;
14+
import com.rabbitmq.jms.client.ReceivingContextConsumer;
1115
import com.rabbitmq.jms.client.SendingContext;
1216
import com.rabbitmq.jms.client.SendingContextConsumer;
1317
import com.rabbitmq.jms.util.RMQJMSException;
@@ -142,6 +146,16 @@ public void postProcess(com.rabbitmq.client.ConnectionFactory connectionFactory)
142146
public void accept(SendingContext ctx) { }
143147
};
144148

149+
/**
150+
* Callback before receiving a message.
151+
*
152+
* @since 1.11.0
153+
*/
154+
private ReceivingContextConsumer receivingContextConsumer = new ReceivingContextConsumer() {
155+
@Override
156+
public void accept(ReceivingContext ctx) { }
157+
};
158+
145159
/** Default not to use ssl */
146160
private boolean ssl = false;
147161
private String tlsProtocol;
@@ -191,6 +205,13 @@ public void accept(SendingContext ctx) { }
191205
*/
192206
private List<URI> uris = new ArrayList<URI>();
193207

208+
/**
209+
* Whether <code>replyTo</code> destination for consumed messages should be declared.
210+
*
211+
* @since 1.11.0
212+
*/
213+
private boolean declareReplyToDestination = true;
214+
194215
/**
195216
* {@inheritDoc}
196217
*/
@@ -262,6 +283,22 @@ protected Connection createConnection(String username, String password, Connecti
262283
}
263284
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(cf, connectionCreator);
264285

286+
ReceivingContextConsumer rcc;
287+
if (this.declareReplyToDestination) {
288+
rcc = this.receivingContextConsumer;
289+
} else {
290+
rcc = new AbstractReceivingContextConsumer() {
291+
292+
@Override
293+
public void accept(ReceivingContext ctx) throws JMSException {
294+
RMQMessage.doNotDeclareReplyToDestination(ctx.getMessage());
295+
}
296+
};
297+
if (this.receivingContextConsumer != null) {
298+
rcc = ((AbstractReceivingContextConsumer) rcc).andThen(this.receivingContextConsumer);
299+
}
300+
}
301+
265302
RMQConnection conn = new RMQConnection(new ConnectionParams()
266303
.setRabbitConnection(rabbitConnection)
267304
.setTerminationTimeout(getTerminationTimeout())
@@ -274,6 +311,7 @@ protected Connection createConnection(String username, String password, Connecti
274311
.setAmqpPropertiesCustomiser(amqpPropertiesCustomiser)
275312
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
276313
.setSendingContextConsumer(sendingContextConsumer)
314+
.setReceivingContextConsumer(rcc)
277315
);
278316
conn.setTrustedPackages(this.trustedPackages);
279317
logger.debug("Connection {} created.", conn);
@@ -941,5 +979,35 @@ private interface ConnectionCreator {
941979
public void setSendingContextConsumer(SendingContextConsumer sendingContextConsumer) {
942980
this.sendingContextConsumer = sendingContextConsumer;
943981
}
982+
983+
/**
984+
* Set callback called before dispatching a received message to application code.
985+
* Can be used to customize messages before they handed
986+
* over application.
987+
*
988+
* @param receivingContextConsumer
989+
* @see ReceivingContextConsumer
990+
* @since 1.11.0
991+
*/
992+
public void setReceivingContextConsumer(ReceivingContextConsumer receivingContextConsumer) {
993+
this.receivingContextConsumer = receivingContextConsumer;
994+
}
995+
996+
/**
997+
* Whether <code>replyTo</code> destination for consumed messages should be declared.
998+
* <p>
999+
* Default is <code>true</code>. Set this value to <code>false</code> for the
1000+
* <b>server-side of RPC</b>, this avoids creating a temporary reply-to destination on
1001+
* both client and server, leading to an error.
1002+
* <p>
1003+
* This is implemented as {@link ReceivingContextConsumer}.
1004+
*
1005+
* @see RMQConnectionFactory#setReceivingContextConsumer(ReceivingContextConsumer)
1006+
* @since 1.11.0
1007+
*/
1008+
public void setDeclareReplyToDestination(boolean declareReplyToDestination) {
1009+
this.declareReplyToDestination = declareReplyToDestination;
1010+
}
1011+
9441012
}
9451013

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.rabbitmq.jms.client;
2+
3+
import javax.jms.JMSException;
4+
5+
/**
6+
* Composeable {@link ReceivingContextConsumer}.
7+
*
8+
* @see ReceivingContextConsumer
9+
* @since 1.11.0
10+
*/
11+
public abstract class AbstractReceivingContextConsumer implements ReceivingContextConsumer {
12+
13+
/**
14+
* Returns a composed {@code AbstractReceivingContextConsumer} that performs, in sequence, this
15+
* operation followed by the {@code after} operation. If performing either
16+
* operation throws an exception, it is relayed to the caller of the
17+
* composed operation. If performing this operation throws an exception,
18+
* the {@code after} operation will not be performed.
19+
*
20+
* @param after the operation to perform after this operation
21+
* @return a composed {@code ReceivingContextConsumer} that performs in sequence this
22+
* operation followed by the {@code after} operation
23+
* @throws NullPointerException if {@code after} is null
24+
*/
25+
public AbstractReceivingContextConsumer andThen(final ReceivingContextConsumer after) {
26+
if (after == null) {
27+
throw new NullPointerException();
28+
}
29+
return new AbstractReceivingContextConsumer() {
30+
31+
@Override
32+
public void accept(ReceivingContext ctx) throws JMSException {
33+
AbstractReceivingContextConsumer.this.accept(ctx);
34+
after.accept(ctx);
35+
}
36+
};
37+
}
38+
}

src/main/java/com/rabbitmq/jms/client/BrowsingConsumer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2014 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2014-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -25,13 +25,17 @@ class BrowsingConsumer extends DefaultConsumer {
2525
private final RMQSession session;
2626
private final RMQDestination dest;
2727

28-
public BrowsingConsumer(Channel channel, RMQSession session, RMQDestination dest, int messagesExpected, java.util.Queue<RMQMessage> msgQueue, SqlEvaluator evaluator) {
28+
private final ReceivingContextConsumer receivingContextConsumer;
29+
30+
public BrowsingConsumer(Channel channel, RMQSession session, RMQDestination dest, int messagesExpected, java.util.Queue<RMQMessage> msgQueue, SqlEvaluator evaluator,
31+
ReceivingContextConsumer receivingContextConsumer) {
2932
super(channel);
3033
this.messagesExpected = messagesExpected;
3134
this.msgQueue = msgQueue;
3235
this.evaluator = evaluator;
3336
this.session = session;
3437
this.dest = dest;
38+
this.receivingContextConsumer = receivingContextConsumer;
3539
}
3640

3741
public boolean finishesInTime(int browsingConsumerTimeout) {
@@ -61,7 +65,8 @@ public void handleDelivery(String consumerTag,
6165
throws IOException {
6266
if (this.messagesExpected==0) return;
6367
try {
64-
RMQMessage msg = RMQMessage.convertMessage(this.session, this.dest, new GetResponse(envelope, properties, body, --this.messagesExpected));
68+
RMQMessage msg = RMQMessage.convertMessage(this.session, this.dest,
69+
new GetResponse(envelope, properties, body, --this.messagesExpected), this.receivingContextConsumer);
6570
if (evaluator==null || evaluator.evaluate(msg.toHeaders()))
6671
this.msgQueue.add(msg);
6772
} catch (JMSException e) {

src/main/java/com/rabbitmq/jms/client/BrowsingMessageEnumeration.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2014 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2014-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.util.Enumeration;
@@ -14,20 +14,23 @@ class BrowsingMessageEnumeration implements Enumeration<RMQMessage> {
1414
private static final int BROWSING_CONSUMER_TIMEOUT = 10000; // ms
1515
private final java.util.Queue<RMQMessage> msgQueue;
1616

17-
public BrowsingMessageEnumeration(RMQSession session, RMQDestination dest, Channel channel, SqlEvaluator evaluator, int readMax) {
17+
public BrowsingMessageEnumeration(RMQSession session, RMQDestination dest, Channel channel, SqlEvaluator evaluator, int readMax,
18+
ReceivingContextConsumer receivingContextConsumer) {
1819
java.util.Queue<RMQMessage> msgQ = new ConcurrentLinkedQueue<RMQMessage>();
19-
populateQueue(msgQ, channel, session, dest, evaluator, readMax);
20+
populateQueue(msgQ, channel, session, dest, evaluator, readMax, receivingContextConsumer);
2021
this.msgQueue = msgQ;
2122
}
2223

23-
private static void populateQueue(java.util.Queue<RMQMessage> msgQueue, Channel channel, RMQSession session, RMQDestination dest, SqlEvaluator evaluator, int readMax) {
24+
private static void populateQueue(java.util.Queue<RMQMessage> msgQueue, Channel channel, RMQSession session, RMQDestination dest, SqlEvaluator evaluator,
25+
int readMax, ReceivingContextConsumer receivingContextConsumer) {
2426
try {
2527
String destQueueName = dest.getQueueName();
2628
int qCount = getNumberOfMessages(channel, destQueueName);
2729
if (qCount > 0) { // we need to read them
2830
int messagesExpected = (readMax<=0) ? qCount : Math.min(readMax, qCount);
2931
channel.basicQos(messagesExpected); // limit subsequent consumers to the expected number of messages unacknowledged
30-
BrowsingConsumer bc = new BrowsingConsumer(channel, session, dest, messagesExpected, msgQueue, evaluator);
32+
BrowsingConsumer bc = new BrowsingConsumer(channel, session, dest, messagesExpected, msgQueue,
33+
evaluator, receivingContextConsumer);
3134
String consumerTag = channel.basicConsume(destQueueName, bc);
3235
if (bc.finishesInTime(BROWSING_CONSUMER_TIMEOUT))
3336
return;

src/main/java/com/rabbitmq/jms/client/BrowsingMessageQueue.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2014 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2014-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.util.Enumeration;
@@ -23,13 +23,16 @@ class BrowsingMessageQueue implements QueueBrowser {
2323
private final SqlEvaluator evaluator;
2424
private final RMQSession session;
2525
private final int queueBrowserReadMax;
26+
private final ReceivingContextConsumer receivingContextConsumer;
2627

27-
public BrowsingMessageQueue(RMQSession session, RMQDestination dest, String selector, int queueBrowserReadMax) throws JMSException {
28+
public BrowsingMessageQueue(RMQSession session, RMQDestination dest, String selector,
29+
int queueBrowserReadMax, ReceivingContextConsumer receivingContextConsumer) throws JMSException {
2830
this.dest = dest;
2931
this.selector = selector;
3032
this.session = session;
3133
this.evaluator = setEvaluator(selector);
3234
this.queueBrowserReadMax = queueBrowserReadMax;
35+
this.receivingContextConsumer = receivingContextConsumer;
3336
}
3437

3538
private static final SqlEvaluator setEvaluator(String selector) throws JMSException {
@@ -53,7 +56,8 @@ public String getMessageSelector() throws JMSException {
5356
@SuppressWarnings("rawtypes")
5457
public Enumeration getEnumeration() throws JMSException {
5558
Channel chan = this.session.getBrowsingChannel();
56-
Enumeration e = new BrowsingMessageEnumeration(this.session, this.dest, chan, this.evaluator, this.queueBrowserReadMax);
59+
Enumeration e = new BrowsingMessageEnumeration(this.session, this.dest, chan, this.evaluator,
60+
this.queueBrowserReadMax, this.receivingContextConsumer);
5761
session.closeBrowsingChannel(chan); // this should requeue all the messages browsed
5862
return e;
5963
}

src/main/java/com/rabbitmq/jms/client/ConnectionParams.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ public class ConnectionParams {
7878
*/
7979
private SendingContextConsumer sendingContextConsumer;
8080

81+
/**
82+
* Callback before receiving a message.
83+
*
84+
* @since 1.11.0
85+
*/
86+
private ReceivingContextConsumer receivingContextConsumer;
87+
8188
public Connection getRabbitConnection() {
8289
return rabbitConnection;
8390
}
@@ -176,4 +183,13 @@ public ConnectionParams setSendingContextConsumer(SendingContextConsumer sending
176183
this.sendingContextConsumer = sendingContextConsumer;
177184
return this;
178185
}
186+
187+
public ReceivingContextConsumer getReceivingContextConsumer() {
188+
return receivingContextConsumer;
189+
}
190+
191+
public ConnectionParams setReceivingContextConsumer(ReceivingContextConsumer receivingContextConsumer) {
192+
this.receivingContextConsumer = receivingContextConsumer;
193+
return this;
194+
}
179195
}

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class MessageListenerConsumer implements Consumer, Abortable {
4949
*/
5050
private final boolean skipAck;
5151

52+
private final ReceivingContextConsumer receivingContextConsumer;
53+
5254
/**
5355
* Constructor
5456
* @param messageConsumer to which this Rabbit Consumer belongs
@@ -57,7 +59,7 @@ class MessageListenerConsumer implements Consumer, Abortable {
5759
* @param terminationTimeout wait time (in nanoseconds) for cancel to take effect
5860
*/
5961
public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel channel, MessageListener messageListener, long terminationTimeout,
60-
boolean requeueOnMessageListenerException, boolean throwExceptionOnStartFailure) {
62+
boolean requeueOnMessageListenerException, boolean throwExceptionOnStartFailure, ReceivingContextConsumer receivingContextConsumer) {
6163
this.messageConsumer = messageConsumer;
6264
this.channel = channel;
6365
this.messageListener = messageListener;
@@ -68,6 +70,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
6870
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
6971
this.throwExceptionOnStartFailure = throwExceptionOnStartFailure;
7072
this.skipAck = messageConsumer.amqpAutoAck();
73+
this.receivingContextConsumer = receivingContextConsumer;
7174
}
7275

7376
private String getConsTag() {
@@ -128,7 +131,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
128131
// requeuing in case of RuntimeException from the listener
129132
// see https://github.com/rabbitmq/rabbitmq-jms-client/issues/23
130133
// see section 4.5.2 of JMS 1.1 specification
131-
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
134+
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(),
135+
response, this.receivingContextConsumer);
132136
boolean runtimeExceptionInListener = false;
133137
try {
134138
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
@@ -147,7 +151,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
147151
} else {
148152
// this is the "historical" behavior, not compliant with the spec
149153
dealWithAcknowledgments(dtag);
150-
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
154+
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(),
155+
response, this.receivingContextConsumer);
151156
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
152157
}
153158
} else {

src/main/java/com/rabbitmq/jms/client/RMQConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
137137
*/
138138
private final SendingContextConsumer sendingContextConsumer;
139139

140+
private final ReceivingContextConsumer receivingContextConsumer;
141+
140142
/**
141143
* Classes in these packages can be transferred via ObjectMessage.
142144
*
@@ -163,6 +165,7 @@ public RMQConnection(ConnectionParams connectionParams) {
163165
this.amqpPropertiesCustomiser = connectionParams.getAmqpPropertiesCustomiser();
164166
this.throwExceptionOnConsumerStartFailure = connectionParams.willThrowExceptionOnConsumerStartFailure();
165167
this.sendingContextConsumer = connectionParams.getSendingContextConsumer();
168+
this.receivingContextConsumer = connectionParams.getReceivingContextConsumer();
166169
}
167170

168171
/**
@@ -214,6 +217,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
214217
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
215218
.setThrowExceptionOnConsumerStartFailure(this.throwExceptionOnConsumerStartFailure)
216219
.setSendingContextConsumer(this.sendingContextConsumer)
220+
.setReceivingContextConsumer(this.receivingContextConsumer)
217221
);
218222
session.setTrustedPackages(this.trustedPackages);
219223
this.sessions.add(session);

0 commit comments

Comments
 (0)