Skip to content

Commit 6846306

Browse files
committed
Throw exception only for queues when selector is not null
Selectors are supported with topics. [#159272967] Fixes #52 (cherry picked from commit 920afc1)
1 parent 12b6d16 commit 6846306

File tree

2 files changed

+116
-3
lines changed

2 files changed

+116
-3
lines changed

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -746,8 +746,10 @@ public MessageConsumer createConsumer(Destination destination, String messageSel
746746
illegalStateExceptionIfClosed();
747747
if (nullOrEmpty(messageSelector)) {
748748
return createConsumer(destination);
749+
} else if (isTopic(destination)) {
750+
return createConsumerInternal((RMQDestination) destination, null, false, messageSelector);
749751
} else {
750-
// we are not implementing this method yet
752+
// selectors are not supported for queues
751753
throw new UnsupportedOperationException();
752754
}
753755
}
@@ -756,6 +758,10 @@ private static boolean nullOrEmpty(String str) {
756758
return str==null || str.trim().isEmpty();
757759
}
758760

761+
private static boolean isTopic(Destination destination) {
762+
return !((RMQDestination) destination).isQueue();
763+
}
764+
759765
/**
760766
* {@inheritDoc}
761767
* @throws UnsupportedOperationException - method not implemented until we support selectors
@@ -767,8 +773,12 @@ public MessageConsumer createConsumer(Destination destination, String messageSel
767773
RMQMessageConsumer consumer = (RMQMessageConsumer)createConsumer(destination);
768774
consumer.setNoLocal(noLocal);
769775
return consumer;
770-
} else {
771-
// we are not implementing this method yet
776+
} else if (isTopic(destination)) {
777+
RMQMessageConsumer consumer = createConsumerInternal((RMQDestination) destination, null, false, messageSelector);
778+
consumer.setNoLocal(noLocal);
779+
return consumer;
780+
} else {
781+
// selectors are not supported for queues
772782
throw new UnsupportedOperationException();
773783
}
774784
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
package com.rabbitmq.integration.tests;
3+
4+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
5+
import com.rabbitmq.jms.client.message.RMQTextMessage;
6+
import org.junit.jupiter.api.AfterEach;
7+
import org.junit.jupiter.api.BeforeEach;
8+
import org.junit.jupiter.api.Test;
9+
10+
import javax.jms.DeliveryMode;
11+
import javax.jms.JMSException;
12+
import javax.jms.MessageConsumer;
13+
import javax.jms.Queue;
14+
import javax.jms.QueueConnection;
15+
import javax.jms.QueueSession;
16+
import javax.jms.Session;
17+
import javax.jms.TextMessage;
18+
import javax.jms.Topic;
19+
import javax.jms.TopicConnection;
20+
import javax.jms.TopicPublisher;
21+
import javax.jms.TopicSession;
22+
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
26+
/**
27+
* To make sure Session#createConsumer works for at least topic when a
28+
* selector is provided.
29+
*
30+
* See https://github.com/rabbitmq/rabbitmq-jms-client/issues/52
31+
*/
32+
public class SelectorAppliedToTopicNotQueueIT {
33+
34+
private static final String TOPIC_NAME = "test.topic." + SelectorAppliedToTopicNotQueueIT.class.getCanonicalName();
35+
private static final String QUEUE_NAME = "test.queue." + SimpleQueueMessageIT.class.getCanonicalName();
36+
private static final String MESSAGE1 = "Hello " + SelectorAppliedToTopicNotQueueIT.class.getName();
37+
38+
RMQConnectionFactory connFactory;
39+
TopicConnection topicConn;
40+
QueueConnection queueConn;
41+
42+
@BeforeEach
43+
public void beforeTests() throws Exception {
44+
this.connFactory =
45+
(RMQConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
46+
.getConnectionFactory();
47+
this.topicConn = connFactory.createTopicConnection();
48+
this.queueConn = connFactory.createQueueConnection();
49+
}
50+
51+
@AfterEach
52+
public void afterTests() throws Exception {
53+
if (this.topicConn != null)
54+
this.topicConn.close();
55+
if (queueConn != null)
56+
queueConn.close();
57+
}
58+
59+
@Test
60+
public void sendAndReceiveSupportedOnTopic() throws Exception {
61+
topicConn.start();
62+
TopicSession topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
63+
Topic topic = topicSession.createTopic(TOPIC_NAME);
64+
MessageConsumer receiver = topicSession.createConsumer(topic, "boolProp");
65+
sendAndReceive(topicSession, topic, receiver);
66+
}
67+
68+
@Test
69+
public void sendAndReceiveSupportedOnTopicNoLocal() throws Exception {
70+
topicConn.start();
71+
TopicSession topicSession = topicConn.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
72+
Topic topic = topicSession.createTopic(TOPIC_NAME);
73+
MessageConsumer receiver = topicSession.createConsumer(topic, "boolProp", false);
74+
sendAndReceive(topicSession, topic, receiver);
75+
}
76+
77+
private void sendAndReceive(TopicSession topicSession, Topic topic, MessageConsumer receiver) throws JMSException {
78+
TopicPublisher sender = topicSession.createPublisher(topic);
79+
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
80+
TextMessage message = topicSession.createTextMessage(MESSAGE1);
81+
message.setBooleanProperty("boolProp", true);
82+
sender.send(message);
83+
RMQTextMessage tmsg = (RMQTextMessage) receiver.receive();
84+
String t = tmsg.getText();
85+
assertEquals(MESSAGE1, t);
86+
}
87+
88+
@Test
89+
public void sendAndReceiveNotSupportedOnQueue() throws Exception {
90+
queueConn.start();
91+
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
92+
Queue queue = queueSession.createQueue(QUEUE_NAME);
93+
assertThrows(UnsupportedOperationException.class, () -> queueSession.createConsumer(queue, "boolProp"));
94+
}
95+
96+
@Test
97+
public void sendAndReceiveNotSupportedOnQueueNoLocal() throws Exception {
98+
queueConn.start();
99+
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
100+
Queue queue = queueSession.createQueue(QUEUE_NAME);
101+
assertThrows(UnsupportedOperationException.class, () -> queueSession.createConsumer(queue, "boolProp", false));
102+
}
103+
}

0 commit comments

Comments
 (0)