Skip to content

Commit 3830fd8

Browse files
committed
Do not declare reply-to destination for consumed messages
This commit instructs to not declare a reply-to RMQDestination for consumed messages. This avoids trying to create a second time a temporary reply-to destination (reply-to destinations are supposed to be created by a RPC client before sending the request). This happens to also fix #47. Fixes #69 (cherry picked from commit 09282e4) Conflicts: src/test/java/com/rabbitmq/integration/tests/RpcIT.java src/test/java/com/rabbitmq/integration/tests/RpcWithAmqpDirectReplyIT.java
1 parent 05992b8 commit 3830fd8

File tree

4 files changed

+102
-40
lines changed

4 files changed

+102
-40
lines changed

src/main/java/com/rabbitmq/jms/client/RMQMessage.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,7 @@ static RMQMessage convertJmsMessage(RMQSession session, RMQDestination dest, Get
908908
message.setReadonly(true); // Set readOnly - mandatory for received messages
909909

910910
maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
911+
doNotDeclareReplyToDestination(message);
911912

912913
return message;
913914
}
@@ -927,6 +928,7 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination
927928
message.setReadonly(true); // Set readOnly - mandatory for received messages
928929

929930
maybeSetupDirectReplyTo(message, response.getProps().getReplyTo());
931+
doNotDeclareReplyToDestination(message);
930932

931933
return message;
932934
} catch (IOException x) {
@@ -955,6 +957,23 @@ private static void maybeSetupDirectReplyTo(RMQMessage message, String replyTo)
955957
}
956958
}
957959

960+
/**
961+
* Indicates to not declare a reply-to {@link RMQDestination}.
962+
* <p>
963+
* This is used for inbound messages, when the reply-to destination
964+
* is supposed to be already created. This avoids trying to create
965+
* a second time a temporary queue.
966+
*
967+
* @param message
968+
* @throws JMSException
969+
* @since 1.11.0
970+
*/
971+
private static void doNotDeclareReplyToDestination(RMQMessage message) throws JMSException {
972+
if (message.getJMSReplyTo() != null && message.getJMSReplyTo() instanceof RMQDestination) {
973+
((RMQDestination) message.getJMSReplyTo()).setDeclared(true);
974+
}
975+
}
976+
958977
/**
959978
* Generate the headers for an AMQP message.
960979
* <p>

src/test/java/com/rabbitmq/integration/tests/RpcIT.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import static org.hamcrest.Matchers.instanceOf;
3232
import static org.hamcrest.Matchers.is;
3333
import static org.junit.Assert.assertNotNull;
34-
import static org.junit.Assert.assertNull;
3534

3635
/**
3736
*
@@ -94,16 +93,7 @@ public void tearDown() throws Exception {
9493
}
9594

9695
@Test
97-
public void noResponseWhenServerTriesToRecreateTemporaryResponseQueue() throws Exception {
98-
setupRpcServer();
99-
100-
String messageContent = UUID.randomUUID().toString();
101-
Message response = doRpc(messageContent);
102-
assertNull(response);
103-
}
104-
105-
@Test
106-
public void responseOkWhenServerDoesNotRecreateTemporaryResponseQueue() throws Exception {
96+
public void rpc() throws Exception {
10797
setupRpcServer(destinationAlreadyDeclaredForRpcResponse());
10898

10999
String messageContent = UUID.randomUUID().toString();
@@ -148,15 +138,6 @@ void setupRpcServer(SendingContextConsumer sendingContextConsumer) throws Except
148138
rpcServer = new RpcServer(serverConnection);
149139
}
150140

151-
void setupRpcServer() throws Exception {
152-
setupRpcServer(new SendingContextConsumer() {
153-
154-
@Override
155-
public void accept(SendingContext ctx) {
156-
}
157-
});
158-
}
159-
160141
private static class RpcServer {
161142

162143
Session session;

src/test/java/com/rabbitmq/integration/tests/RpcWithAmqpDirectReplyIT.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.integration.tests;
33

4-
import com.rabbitmq.jms.admin.RMQConnectionFactory;
54
import com.rabbitmq.jms.admin.RMQDestination;
6-
import com.rabbitmq.jms.client.SendingContext;
7-
import com.rabbitmq.jms.client.SendingContextConsumer;
85
import org.junit.After;
96
import org.junit.Before;
107
import org.junit.Test;
@@ -52,19 +49,6 @@ protected static void drainQueue(Session session, Queue queue) throws Exception
5249
}
5350
}
5451

55-
static SendingContextConsumer destinationAlreadyDeclaredForRpcResponse() {
56-
return new SendingContextConsumer() {
57-
58-
@Override
59-
public void accept(SendingContext ctx) throws JMSException {
60-
if (ctx.getMessage().getJMSCorrelationID() != null && ctx.getDestination() instanceof RMQDestination) {
61-
RMQDestination destination = (RMQDestination) ctx.getDestination();
62-
destination.setDeclared(true);
63-
}
64-
}
65-
};
66-
}
67-
6852
@Before
6953
public void init() throws Exception {
7054
ConnectionFactory connectionFactory = AbstractTestConnectionFactory.getTestConnectionFactory()
@@ -93,7 +77,7 @@ public void tearDown() throws Exception {
9377

9478
@Test
9579
public void responseOkWhenServerDoesNotRecreateTemporaryResponseQueue() throws Exception {
96-
setupRpcServer(destinationAlreadyDeclaredForRpcResponse());
80+
setupRpcServer();
9781

9882
String messageContent = UUID.randomUUID().toString();
9983
Message response = doRpc(messageContent);
@@ -128,10 +112,9 @@ public void onMessage(Message msg) {
128112
return queue.poll(2, TimeUnit.SECONDS);
129113
}
130114

131-
void setupRpcServer(SendingContextConsumer sendingContextConsumer) throws Exception {
132-
RMQConnectionFactory connectionFactory = (RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
115+
void setupRpcServer() throws Exception {
116+
ConnectionFactory connectionFactory = AbstractTestConnectionFactory.getTestConnectionFactory()
133117
.getConnectionFactory();
134-
connectionFactory.setSendingContextConsumer(sendingContextConsumer);
135118
serverConnection = connectionFactory.createConnection();
136119
serverConnection.start();
137120
Session session = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
package com.rabbitmq.integration.tests;
3+
4+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
5+
import com.rabbitmq.jms.client.SendingContext;
6+
import com.rabbitmq.jms.client.SendingContextConsumer;
7+
import org.junit.After;
8+
import org.junit.Before;
9+
import org.junit.Test;
10+
11+
import javax.jms.Connection;
12+
import javax.jms.Message;
13+
import javax.jms.MessageConsumer;
14+
import javax.jms.MessageProducer;
15+
import javax.jms.Queue;
16+
import javax.jms.Session;
17+
import javax.jms.TextMessage;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import static org.junit.Assert.assertEquals;
21+
22+
/**
23+
*
24+
*/
25+
public class SendingContextConsumerIT {
26+
27+
private static final String QUEUE_NAME = "test.queue." + SendingContextConsumerIT.class.getCanonicalName();
28+
final AtomicInteger sentCount = new AtomicInteger(0);
29+
Connection connection;
30+
31+
protected static void drainQueue(Session session, Queue queue) throws Exception {
32+
MessageConsumer receiver = session.createConsumer(queue);
33+
Message msg = receiver.receiveNoWait();
34+
while (msg != null) {
35+
msg = receiver.receiveNoWait();
36+
}
37+
}
38+
39+
@Before
40+
public void init() throws Exception {
41+
RMQConnectionFactory connectionFactory = (RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
42+
.getConnectionFactory();
43+
connectionFactory.setSendingContextConsumer(new SendingContextConsumer() {
44+
45+
@Override
46+
public void accept(SendingContext ctx) {
47+
sentCount.incrementAndGet();
48+
}
49+
});
50+
connection = connectionFactory.createConnection();
51+
connection.start();
52+
}
53+
54+
@After
55+
public void tearDown() throws Exception {
56+
if (connection != null) {
57+
connection.close();
58+
}
59+
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
60+
com.rabbitmq.client.Connection c = cf.newConnection();
61+
try {
62+
c.createChannel().queueDelete(QUEUE_NAME);
63+
} finally {
64+
c.close();
65+
}
66+
}
67+
68+
@Test
69+
public void sendingContextConsumerShouldBeCalledWhenSendingMessage() throws Exception {
70+
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
71+
TextMessage message = session.createTextMessage("hello");
72+
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
73+
int initialCount = sentCount.get();
74+
producer.send(message);
75+
assertEquals(initialCount + 1, sentCount.get());
76+
producer.send(message);
77+
assertEquals(initialCount + 2, sentCount.get());
78+
}
79+
}

0 commit comments

Comments
 (0)