Skip to content

Commit 8d73831

Browse files
committed
Handle request-reply with RabbitMQ direct reply-to
This commit provides support for RabbitMQ direct reply-to when performing RPC operations. The client just needs to set up the JMSReplyTo header with a RMQDestination named "amq.rabbitmq.reply-to". This provides better performance than using a temporary queue, but it comes with 2 limitations: * the reply-to destination is an JMS "AMQP" destination, so only ByteMessages and TextMessages are supported. * the client needs to create a MessageConsumer and register a MessageListener to get the response, as direct reply-to doesn't work with basic.get. Fixes #48 (cherry picked from commit b1d7b78) Conflicts: src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java
1 parent 4441d6a commit 8d73831

File tree

5 files changed

+327
-23
lines changed

5 files changed

+327
-23
lines changed

src/main/java/com/rabbitmq/jms/client/MessageListenerConsumer.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ class MessageListenerConsumer implements Consumer, Abortable {
4242
private final boolean requeueOnMessageListenerException;
4343
private final boolean throwExceptionOnStartFailure;
4444

45+
/**
46+
* True when AMQP auto-ack is true as well. Happens
47+
* only when the consumer listens on direct reply to
48+
* pseudo-queue. No ack or nack operation is then performed.
49+
*/
50+
private final boolean skipAck;
51+
4552
/**
4653
* Constructor
4754
* @param messageConsumer to which this Rabbit Consumer belongs
@@ -60,6 +67,7 @@ public MessageListenerConsumer(RMQMessageConsumer messageConsumer, Channel chann
6067
this.rejecting = this.messageConsumer.getSession().getConnection().isStopped();
6168
this.requeueOnMessageListenerException = requeueOnMessageListenerException;
6269
this.throwExceptionOnStartFailure = throwExceptionOnStartFailure;
70+
this.skipAck = messageConsumer.amqpAutoAck();
6371
}
6472

6573
private String getConsTag() {
@@ -108,7 +116,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
108116
if (this.rejecting) {
109117
long dtag = envelope.getDeliveryTag();
110118
logger.debug("basicNack: dtag='{}'", dtag);
111-
this.messageConsumer.getSession().explicitNack(dtag);
119+
nack(dtag);
112120
return;
113121
}
114122
/* Wrap the incoming message in a GetResponse */
@@ -127,25 +135,25 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
127135
} catch(RMQMessageListenerExecutionJMSException e) {
128136
if (e.getCause() instanceof RuntimeException) {
129137
runtimeExceptionInListener = true;
130-
this.messageConsumer.getSession().explicitNack(dtag);
138+
nack(dtag);
131139
this.abort();
132140
} else {
133141
throw e;
134142
}
135143
}
136144
if (!runtimeExceptionInListener) {
137-
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
145+
dealWithAcknowledgments(dtag);
138146
}
139147
} else {
140148
// this is the "historical" behavior, not compliant with the spec
141-
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
149+
dealWithAcknowledgments(dtag);
142150
RMQMessage msg = RMQMessage.convertMessage(this.messageConsumer.getSession(), this.messageConsumer.getDestination(), response);
143151
this.messageConsumer.getSession().deliverMessage(msg, this.messageListener);
144152
}
145153
} else {
146154
// We are unable to deliver the message, nack it
147155
logger.debug("basicNack: dtag='{}' (null MessageListener)", dtag);
148-
this.messageConsumer.getSession().explicitNack(dtag);
156+
nack(dtag);
149157
}
150158
} catch (JMSException x) {
151159
logger.error("Error while delivering message", x);
@@ -156,6 +164,18 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
156164
}
157165
}
158166

167+
private void nack(long dtag) {
168+
if (!skipAck) {
169+
this.messageConsumer.getSession().explicitNack(dtag);
170+
}
171+
}
172+
173+
private void dealWithAcknowledgments(long dtag) {
174+
if (!skipAck) {
175+
this.messageConsumer.dealWithAcknowledgements(this.autoAck, dtag);
176+
}
177+
}
178+
159179
/**
160180
* {@inheritDoc}
161181
*/

src/main/java/com/rabbitmq/jms/client/RMQMessage.java

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,50 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import com.rabbitmq.client.BasicProperties;
55
import com.rabbitmq.client.Channel;
66
import com.rabbitmq.client.Consumer;
77
import com.rabbitmq.client.GetResponse;
88
import com.rabbitmq.jms.admin.RMQDestination;
9-
import com.rabbitmq.jms.client.message.*;
10-
import com.rabbitmq.jms.util.*;
9+
import com.rabbitmq.jms.client.message.RMQBytesMessage;
10+
import com.rabbitmq.jms.client.message.RMQMapMessage;
11+
import com.rabbitmq.jms.client.message.RMQObjectMessage;
12+
import com.rabbitmq.jms.client.message.RMQStreamMessage;
13+
import com.rabbitmq.jms.client.message.RMQTextMessage;
14+
import com.rabbitmq.jms.util.HexDisplay;
15+
import com.rabbitmq.jms.util.IteratorEnum;
16+
import com.rabbitmq.jms.util.RMQJMSException;
17+
import com.rabbitmq.jms.util.Util;
18+
import com.rabbitmq.jms.util.WhiteListObjectInputStream;
1119
import org.slf4j.Logger;
1220
import org.slf4j.LoggerFactory;
1321

14-
import javax.jms.*;
15-
import java.io.*;
22+
import javax.jms.BytesMessage;
23+
import javax.jms.DeliveryMode;
24+
import javax.jms.Destination;
25+
import javax.jms.JMSException;
26+
import javax.jms.MapMessage;
27+
import javax.jms.Message;
28+
import javax.jms.MessageFormatException;
29+
import javax.jms.MessageNotWriteableException;
30+
import javax.jms.ObjectMessage;
31+
import javax.jms.StreamMessage;
32+
import javax.jms.TextMessage;
33+
import java.io.ByteArrayInputStream;
34+
import java.io.ByteArrayOutputStream;
35+
import java.io.IOException;
36+
import java.io.ObjectInput;
37+
import java.io.ObjectOutput;
38+
import java.io.ObjectOutputStream;
39+
import java.io.Serializable;
1640
import java.lang.reflect.Constructor;
1741
import java.lang.reflect.InvocationTargetException;
1842
import java.nio.charset.Charset;
19-
import java.util.*;
43+
import java.util.Date;
44+
import java.util.Enumeration;
45+
import java.util.HashMap;
46+
import java.util.List;
47+
import java.util.Map;
2048
import java.util.Map.Entry;
2149

2250
/**
@@ -26,6 +54,8 @@ public abstract class RMQMessage implements Message, Cloneable {
2654
/** Logger shared with derived classes */
2755
protected final Logger logger = LoggerFactory.getLogger(RMQMessage.class);
2856

57+
private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
58+
2959
protected void loggerDebugByteArray(String format, byte[] buffer, Object arg) {
3060
if (logger.isDebugEnabled()) {
3161
StringBuilder bufferOutput = new StringBuilder("Byte array, length ").append(buffer.length).append(" :\n");
@@ -876,6 +906,9 @@ static RMQMessage convertJmsMessage(RMQSession session, RMQDestination dest, Get
876906
// message.setJMSDestination(dest); // DO NOT set the destination bug#57214768
877907
// JMSProperties already set
878908
message.setReadonly(true); // Set readOnly - mandatory for received messages
909+
910+
maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
911+
879912
return message;
880913
}
881914

@@ -892,12 +925,36 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination
892925
message.setJMSDestination(dest); // We cannot know the original destination, so set local one
893926
message.setJMSPropertiesFromAmqpProperties(props);
894927
message.setReadonly(true); // Set readOnly - mandatory for received messages
928+
929+
maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
930+
895931
return message;
896932
} catch (IOException x) {
897933
throw new RMQJMSException(x);
898934
}
899935
}
900936

937+
/**
938+
* Properly assign JMSReplyTo header when using direct reply to.
939+
* <p>
940+
* On a received request message, the AMQP reply-to property is
941+
* set to a specific <code>amq.rabbitmq.reply-to.ID</code> value.
942+
* We must use this value for the JMS reply to destination if
943+
* we want to send the response back to the destination the sender
944+
* is waiting.
945+
*
946+
* @param message
947+
* @param replyTo
948+
* @throws JMSException
949+
* @since 1.11.0
950+
*/
951+
private static void maybeSetupDirectReplyTo(RMQMessage message, String replyTo) throws JMSException {
952+
if (replyTo != null && replyTo.startsWith(DIRECT_REPLY_TO)) {
953+
RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo);
954+
message.setJMSReplyTo(replyToDestination);
955+
}
956+
}
957+
901958
/**
902959
* Generate the headers for an AMQP message.
903960
* <p>

src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
public class RMQMessageConsumer implements MessageConsumer, QueueReceiver, TopicSubscriber {
4545
private final Logger logger = LoggerFactory.getLogger(RMQMessageConsumer.class);
4646

47+
private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
48+
4749
private static final int DEFAULT_BATCHING_SIZE = 5;
4850
private static final long STOP_TIMEOUT_MS = 1000; // ONE SECOND
4951
/** The destination that this consumer belongs to */
@@ -289,8 +291,9 @@ void basicConsume(Consumer consumer, String consTag) throws IOException {
289291
logger.debug("consuming from queue '{}' with tag '{}'", name, consTag);
290292
getSession().getChannel()
291293
.basicConsume(name, /* the name of the queue */
292-
false, /* autoack is ALWAYS false, otherwise we risk acking messages that are received
293-
* to the client but the client listener(onMessage) has not yet been invoked */
294+
amqpAutoAck(), /* autoack is true only when listening on direct-reply-to, otherwise
295+
* autoack is ALWAYS false, since we risk acking messages that are received
296+
* to the client but the client listener(onMessage) has not yet been invoked with autoack = true */
294297
consTag, /* the consumer tag to use */
295298
this.noLocal, /* RabbitMQ accepts but does not support noLocal=true for subscriptions */
296299
false, /* exclusive will always be false: exclusive consumer access true means only this
@@ -553,4 +556,19 @@ GetResponse getFromRabbitQueue() {
553556
}
554557
return null;
555558
}
559+
560+
/**
561+
* Whether the underlying AMQP consumer uses auto-ack or not.
562+
*
563+
* Auto-ack is enabled only for when consuming on direct reply to.
564+
*
565+
* @return
566+
*/
567+
protected boolean amqpAutoAck() {
568+
return isDirectReplyTo();
569+
}
570+
571+
private boolean isDirectReplyTo() {
572+
return this.destination.isAmqp() && DIRECT_REPLY_TO.equals(this.destination.getDestinationName());
573+
}
556574
}

src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

4-
import java.io.IOException;
4+
import com.rabbitmq.client.AMQP;
5+
import com.rabbitmq.jms.admin.RMQDestination;
6+
import com.rabbitmq.jms.client.message.RMQBytesMessage;
7+
import com.rabbitmq.jms.client.message.RMQTextMessage;
8+
import com.rabbitmq.jms.util.RMQJMSException;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
511

612
import javax.jms.Destination;
713
import javax.jms.InvalidDestinationException;
@@ -12,15 +18,7 @@
1218
import javax.jms.QueueSender;
1319
import javax.jms.Topic;
1420
import javax.jms.TopicPublisher;
15-
16-
import org.slf4j.Logger;
17-
import org.slf4j.LoggerFactory;
18-
19-
import com.rabbitmq.client.AMQP;
20-
import com.rabbitmq.jms.admin.RMQDestination;
21-
import com.rabbitmq.jms.client.message.RMQBytesMessage;
22-
import com.rabbitmq.jms.client.message.RMQTextMessage;
23-
import com.rabbitmq.jms.util.RMQJMSException;
21+
import java.io.IOException;
2422

2523
import static com.rabbitmq.jms.client.RMQMessage.JMS_MESSAGE_DELIVERY_MODE;
2624
import static com.rabbitmq.jms.client.RMQMessage.JMS_MESSAGE_EXPIRATION;
@@ -33,6 +31,8 @@ public class RMQMessageProducer implements MessageProducer, QueueSender, TopicPu
3331

3432
private final Logger logger = LoggerFactory.getLogger(RMQMessageProducer.class);
3533

34+
private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";
35+
3636
/**
3737
* The destination that we send our message to
3838
*/
@@ -318,6 +318,8 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, int del
318318

319319
bob = amqpPropertiesCustomiser.customise(bob, msg);
320320

321+
maybeSetReplyToPropertyToDirectReplyTo(bob, msg);
322+
321323
byte[] data = msg.toAmqpByteArray();
322324

323325
this.session.getChannel().basicPublish(destination.getAmqpExchangeName(), destination.getAmqpRoutingKey(), bob.build(), data);
@@ -341,6 +343,8 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, int de
341343
bob.expiration(rmqExpiration(timeToLive));
342344
bob.headers(msg.toHeaders());
343345

346+
maybeSetReplyToPropertyToDirectReplyTo(bob, msg);
347+
344348
byte[] data = msg.toByteArray();
345349

346350
this.session.getChannel().basicPublish(destination.getAmqpExchangeName(), destination.getAmqpRoutingKey(), bob.build(), data);
@@ -349,6 +353,29 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, int de
349353
}
350354
}
351355

356+
/**
357+
* Set AMQP reply-to property to direct-reply-to if necessary.
358+
* <p>
359+
* Set the <code>reply-to</code> property to <code>amq.rabbitmq.reply-to</code>
360+
* if the <code>JMSReplyTo</code> header is set to a destination with that
361+
* name.
362+
* <p>
363+
* For outbound RPC request.
364+
*
365+
* @param builder
366+
* @param msg
367+
* @throws JMSException
368+
* @since 1.11.0
369+
*/
370+
private static void maybeSetReplyToPropertyToDirectReplyTo(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException {
371+
if (msg.getJMSReplyTo() != null && msg.getJMSReplyTo() instanceof RMQDestination) {
372+
RMQDestination replyTo = (RMQDestination) msg.getJMSReplyTo();
373+
if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName())) {
374+
builder.replyTo(DIRECT_REPLY_TO);
375+
}
376+
}
377+
}
378+
352379
/** This is dictated by `erlang:send_after' on which rabbitmq depends to implement TTL:
353380
* <br/><code>-define(MAX_EXPIRY_TIMER, 4294967295)</code>.
354381
*/

0 commit comments

Comments
 (0)