Skip to content

Commit 9bdb917

Browse files
Merge pull request #45 from rabbitmq/rabbitmq-jms-client-42-message-customizer
Add AmqpPropertiesCustomiser interface
2 parents bff4eda + a7bb7db commit 9bdb917

File tree

10 files changed

+206
-17
lines changed

10 files changed

+206
-17
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
<rabbitmq.version>4.6.0</rabbitmq.version>
4545
<slf4j-api.version>1.7.25</slf4j-api.version>
4646
<junit.version>4.12</junit.version>
47-
<mockito-core.version>2.12.0</mockito-core.version>
48-
<awaitility.version>3.0.0</awaitility.version>
47+
<mockito-core.version>2.16.0</mockito-core.version>
48+
<awaitility.version>3.1.0</awaitility.version>
4949

5050
<maven.dependency.plugin.version>2.8</maven.dependency.plugin.version>
5151
<maven.compiler.plugin.version>3.5.1</maven.compiler.plugin.version>

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
/* Copyright (c) 2013-2017 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.admin;
33

44
import com.rabbitmq.client.Address;
5+
import com.rabbitmq.jms.client.AmqpPropertiesCustomiser;
56
import com.rabbitmq.jms.client.ConnectionParams;
67
import com.rabbitmq.jms.client.RMQConnection;
78
import com.rabbitmq.jms.util.RMQJMSException;
@@ -72,6 +73,12 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
7273
*/
7374
private boolean cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = false;
7475

76+
/**
77+
* Callback to customise properties of outbound AMQP messages.
78+
* @since 1.9.0
79+
*/
80+
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
81+
7582
/** Default not to use ssl */
7683
private boolean ssl = false;
7784
private String tlsProtocol;
@@ -134,6 +141,7 @@ public Connection createConnection(String username, String password) throws JMSE
134141
.setPreferProducerMessageProperty(preferProducerMessageProperty)
135142
.setRequeueOnMessageListenerException(requeueOnMessageListenerException)
136143
.setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
144+
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
137145
);
138146
conn.setTrustedPackages(this.trustedPackages);
139147
logger.debug("Connection {} created.", conn);
@@ -693,5 +701,10 @@ public void setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose(boolean
693701
public boolean isCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose() {
694702
return this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose;
695703
}
704+
705+
public void setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amqpPropertiesCustomiser) {
706+
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
707+
}
708+
696709
}
697710

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
3+
package com.rabbitmq.jms.client;
4+
5+
import com.rabbitmq.client.AMQP;
6+
7+
import javax.jms.Message;
8+
9+
/**
10+
* Callback to customise properties of outbound AMQP messages.
11+
* @since 1.9.0
12+
*/
13+
public interface AmqpPropertiesCustomiser {
14+
15+
/**
16+
* Customise AMQP message properties.
17+
* @param builder the AMQP properties builder
18+
* @param jmsMessage the outbound JMS message
19+
* @return the customised or a new AMQP properties builder
20+
*/
21+
AMQP.BasicProperties.Builder customise(AMQP.BasicProperties.Builder builder, Message jmsMessage);
22+
23+
}

src/main/java/com/rabbitmq/jms/client/ConnectionParams.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2016-2017 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2016-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import com.rabbitmq.client.Connection;
@@ -54,6 +54,12 @@ public class ConnectionParams {
5454
*/
5555
private boolean cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = false;
5656

57+
/**
58+
* Callback to customise properties of outbound AMQP messages.
59+
* @since 1.9.0
60+
*/
61+
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
62+
5763
public Connection getRabbitConnection() {
5864
return rabbitConnection;
5965
}
@@ -125,4 +131,13 @@ public ConnectionParams setCleanUpServerNamedQueuesForNonDurableTopicsOnSessionC
125131
this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose;
126132
return this;
127133
}
134+
135+
public AmqpPropertiesCustomiser getAmqpPropertiesCustomiser() {
136+
return amqpPropertiesCustomiser;
137+
}
138+
139+
public ConnectionParams setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amqpPropertiesCustomiser) {
140+
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
141+
return this;
142+
}
128143
}

src/main/java/com/rabbitmq/jms/client/RMQConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -100,6 +100,12 @@ public class RMQConnection implements Connection, QueueConnection, TopicConnecti
100100
*/
101101
private final boolean cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose;
102102

103+
/**
104+
* Callback to customise properties of outbound AMQP messages.
105+
* @since 1.9.0
106+
*/
107+
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
108+
103109
/**
104110
* Classes in these packages can be transferred via ObjectMessage.
105111
*
@@ -123,6 +129,7 @@ public RMQConnection(ConnectionParams connectionParams) {
123129
this.preferProducerMessageProperty = connectionParams.willPreferProducerMessageProperty();
124130
this.requeueOnMessageListenerException = connectionParams.willRequeueOnMessageListenerException();
125131
this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose = connectionParams.isCleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose();
132+
this.amqpPropertiesCustomiser = connectionParams.getAmqpPropertiesCustomiser();
126133
}
127134

128135
/**
@@ -171,6 +178,7 @@ public Session createSession(boolean transacted, int acknowledgeMode) throws JMS
171178
.setPreferProducerMessageProperty(this.preferProducerMessageProperty)
172179
.setRequeueOnMessageListenerException(this.requeueOnMessageListenerException)
173180
.setCleanUpServerNamedQueuesForNonDurableTopics(this.cleanUpServerNamedQueuesForNonDurableTopicsOnSessionClose)
181+
.setAmqpPropertiesCustomiser(this.amqpPropertiesCustomiser)
174182
);
175183
session.setTrustedPackages(this.trustedPackages);
176184
this.sessions.add(session);

src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013-2017 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -69,20 +69,27 @@ public class RMQMessageProducer implements MessageProducer, QueueSender, TopicPu
6969

7070
private final SendingStrategy sendingStrategy;
7171

72-
/**
73-
* Create a producer of messages.
74-
* @param session which this producer uses
75-
* @param destination to which this producer sends messages.
76-
* @param preferProducerMessageProperty properties take precedence over respective message properties
77-
*/
78-
public RMQMessageProducer(RMQSession session, RMQDestination destination, boolean preferProducerMessageProperty) {
72+
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
73+
74+
public RMQMessageProducer(RMQSession session, RMQDestination destination, boolean preferProducerMessageProperty, AmqpPropertiesCustomiser amqpPropertiesCustomiser) {
7975
this.session = session;
8076
this.destination = destination;
8177
if (preferProducerMessageProperty) {
8278
sendingStrategy = new PreferMessageProducerPropertySendingStategy();
8379
} else {
8480
sendingStrategy = new PreferMessagePropertySendingStrategy();
8581
}
82+
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser == null ? new NoOpAmqpPropertiesCustomiser() : amqpPropertiesCustomiser;
83+
}
84+
85+
/**
86+
* Create a producer of messages.
87+
* @param session which this producer uses
88+
* @param destination to which this producer sends messages.
89+
* @param preferProducerMessageProperty properties take precedence over respective message properties
90+
*/
91+
public RMQMessageProducer(RMQSession session, RMQDestination destination, boolean preferProducerMessageProperty) {
92+
this(session, destination, preferProducerMessageProperty, new NoOpAmqpPropertiesCustomiser());
8693
}
8794

8895
/**
@@ -292,6 +299,8 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, int del
292299
bob.expiration(rmqExpiration(timeToLive));
293300
bob.headers(msg.toAmqpHeaders());
294301

302+
bob = amqpPropertiesCustomiser.customise(bob, msg);
303+
295304
byte[] data = msg.toAmqpByteArray();
296305

297306
this.session.getChannel().basicPublish(destination.getAmqpExchangeName(), destination.getAmqpRoutingKey(), bob.build(), data);
@@ -461,4 +470,13 @@ public void send(Destination destination, Message message, int deliveryMode, int
461470
private enum MessageExpirationType {
462471
TTL, EXPIRATION
463472
}
473+
474+
private static final class NoOpAmqpPropertiesCustomiser implements AmqpPropertiesCustomiser {
475+
476+
@Override
477+
public AMQP.BasicProperties.Builder customise(AMQP.BasicProperties.Builder builder, Message jmsMessage) {
478+
return builder;
479+
}
480+
481+
}
464482
}

src/main/java/com/rabbitmq/jms/client/RMQSession.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2013-2017 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import java.io.IOException;
@@ -103,6 +103,12 @@ public class RMQSession implements Session, QueueSession, TopicSession {
103103
*/
104104
private final boolean cleanUpServerNamedQueuesForNonDurableTopics;
105105

106+
/**
107+
* Callback to customise properties of outbound AMQP messages.
108+
* @since 1.9.0
109+
*/
110+
private final AmqpPropertiesCustomiser amqpPropertiesCustomiser;
111+
106112
/** The main RabbitMQ channel we use under the hood */
107113
private final Channel channel;
108114
/** Set to true if close() has been called and completed */
@@ -195,6 +201,7 @@ public RMQSession(SessionParams sessionParams) throws JMSException {
195201
this.preferProducerMessageProperty = sessionParams.willPreferProducerMessageProperty();
196202
this.requeueOnMessageListenerException = sessionParams.willRequeueOnMessageListenerException();
197203
this.cleanUpServerNamedQueuesForNonDurableTopics = sessionParams.isCleanUpServerNamedQueuesForNonDurableTopics();
204+
this.amqpPropertiesCustomiser = sessionParams.getAmqpPropertiesCustomiser();
198205

199206
if (transacted) {
200207
this.acknowledgeMode = Session.SESSION_TRANSACTED;
@@ -604,7 +611,7 @@ public MessageProducer createProducer(Destination destination) throws JMSExcepti
604611
illegalStateExceptionIfClosed();
605612
RMQDestination dest = (RMQDestination) destination;
606613
declareDestinationIfNecessary(dest);
607-
RMQMessageProducer producer = new RMQMessageProducer(this, dest, this.preferProducerMessageProperty);
614+
RMQMessageProducer producer = new RMQMessageProducer(this, dest, this.preferProducerMessageProperty, this.amqpPropertiesCustomiser);
608615
this.producers.add(producer);
609616
return producer;
610617
}

src/main/java/com/rabbitmq/jms/client/SessionParams.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/* Copyright (c) 2016-2017 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2016-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.jms.client;
33

44
import javax.jms.Message;
@@ -49,6 +49,12 @@ public class SessionParams {
4949
*/
5050
private boolean cleanUpServerNamedQueuesForNonDurableTopics = false;
5151

52+
/**
53+
* Callback to customise properties of outbound AMQP messages.
54+
* @since 1.9.0
55+
*/
56+
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
57+
5258
public RMQConnection getConnection() {
5359
return connection;
5460
}
@@ -120,4 +126,13 @@ public SessionParams setCleanUpServerNamedQueuesForNonDurableTopics(boolean clea
120126
this.cleanUpServerNamedQueuesForNonDurableTopics = cleanUpServerNamedQueuesForNonDurableTopics;
121127
return this;
122128
}
129+
130+
public AmqpPropertiesCustomiser getAmqpPropertiesCustomiser() {
131+
return amqpPropertiesCustomiser;
132+
}
133+
134+
public SessionParams setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amqpPropertiesCustomiser) {
135+
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
136+
return this;
137+
}
123138
}

src/test/java/com/rabbitmq/integration/tests/AbstractAmqpITQueue.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
/* Copyright (c) 2013 Pivotal Software, Inc. All rights reserved. */
1+
/* Copyright (c) 2013-2018 Pivotal Software, Inc. All rights reserved. */
22
package com.rabbitmq.integration.tests;
33

44
import javax.jms.QueueConnection;
55
import javax.jms.QueueConnectionFactory;
66

7+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
78
import org.junit.After;
89
import org.junit.Before;
910

@@ -23,12 +24,17 @@ public abstract class AbstractAmqpITQueue {
2324
public void setUp() throws Exception {
2425
this.connFactory = (QueueConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
2526
.getConnectionFactory();
27+
customise((RMQConnectionFactory) connFactory);
2628
this.queueConn = connFactory.createQueueConnection();
2729

2830
this.rabbitConn = rabbitConnFactory.newConnection();
2931
this.channel = rabbitConn.createChannel();
3032
}
3133

34+
protected void customise(RMQConnectionFactory connectionFactory) {
35+
36+
}
37+
3238
@After
3339
public void tearDown() throws Exception {
3440
if (this.queueConn != null) this.queueConn.close();
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
package com.rabbitmq.integration.tests;
3+
4+
import com.rabbitmq.client.AMQP;
5+
import com.rabbitmq.client.GetResponse;
6+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
7+
import com.rabbitmq.jms.admin.RMQDestination;
8+
import com.rabbitmq.jms.client.AmqpPropertiesCustomiser;
9+
import org.junit.Test;
10+
11+
import javax.jms.BytesMessage;
12+
import javax.jms.DeliveryMode;
13+
import javax.jms.Message;
14+
import javax.jms.Queue;
15+
import javax.jms.QueueReceiver;
16+
import javax.jms.QueueSender;
17+
import javax.jms.QueueSession;
18+
import javax.jms.Session;
19+
import javax.jms.TextMessage;
20+
import java.util.Arrays;
21+
import java.util.Enumeration;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.Map;
25+
import java.util.Set;
26+
27+
import static org.junit.Assert.assertArrayEquals;
28+
import static org.junit.Assert.assertEquals;
29+
import static org.junit.Assert.assertNotNull;
30+
31+
/**
32+
* Integration test to test the AMQP properties customiser is applied correctly.
33+
*/
34+
public class AmqpPropertiesCustomiserIT extends AbstractAmqpITQueue {
35+
36+
private static final String QUEUE_NAME = "test.queue."+AmqpPropertiesCustomiserIT.class.getCanonicalName();
37+
public static final String MESSAGE = "hello";
38+
public static final String TEXT_PLAIN = "text/plain";
39+
40+
@Override
41+
protected void customise(RMQConnectionFactory connectionFactory) {
42+
connectionFactory.setAmqpPropertiesCustomiser(new AmqpPropertiesCustomiser() {
43+
44+
@Override
45+
public AMQP.BasicProperties.Builder customise(AMQP.BasicProperties.Builder builder, Message jmsMessage) {
46+
builder.contentType(TEXT_PLAIN);
47+
return builder;
48+
}
49+
});
50+
}
51+
52+
@Test
53+
public void customiserIsApplied() throws Exception {
54+
55+
channel.queueDeclare(QUEUE_NAME,
56+
false, // durable
57+
true, // exclusive
58+
true, // autoDelete
59+
null // options
60+
);
61+
62+
queueConn.start();
63+
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
64+
Queue queue = new RMQDestination(QUEUE_NAME, "", QUEUE_NAME, null); // write-only AMQP-mapped queue
65+
66+
QueueSender queueSender = queueSession.createSender(queue);
67+
68+
TextMessage message = queueSession.createTextMessage(MESSAGE);
69+
70+
queueSender.send(message);
71+
queueConn.close();
72+
73+
GetResponse response = channel.basicGet(QUEUE_NAME, false);
74+
assertNotNull("basicGet failed to retrieve a response", response);
75+
76+
byte[] body = response.getBody();
77+
assertNotNull("body of response is null", body);
78+
79+
assertEquals("body of response is not correct", MESSAGE, new String(body));
80+
81+
assertEquals("body of response is not correct", TEXT_PLAIN, response.getProps().getContentType());
82+
}
83+
84+
}

0 commit comments

Comments
 (0)