Skip to content

Commit d87bcf9

Browse files
committed
Add support for metrics collection
Add a MetricsCollector property to RMQConnectionFactory. [#157312331] Fixes #49 (cherry picked from commit 2195e91) Conflicts: src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java src/test/java/com/rabbitmq/integration/tests/AbstractITQueue.java
1 parent ae91916 commit d87bcf9

File tree

3 files changed

+162
-3
lines changed

3 files changed

+162
-3
lines changed

src/main/java/com/rabbitmq/jms/admin/RMQConnectionFactory.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
package com.rabbitmq.jms.admin;
33

44
import com.rabbitmq.client.Address;
5+
import com.rabbitmq.client.MetricsCollector;
6+
import com.rabbitmq.client.NoOpMetricsCollector;
57
import com.rabbitmq.jms.client.AmqpPropertiesCustomiser;
68
import com.rabbitmq.jms.client.ConnectionParams;
79
import com.rabbitmq.jms.client.RMQConnection;
@@ -11,8 +13,21 @@
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
1315

14-
import javax.jms.*;
15-
import javax.naming.*;
16+
import javax.jms.Connection;
17+
import javax.jms.ConnectionFactory;
18+
import javax.jms.JMSException;
19+
import javax.jms.Message;
20+
import javax.jms.MessageListener;
21+
import javax.jms.MessageProducer;
22+
import javax.jms.QueueConnection;
23+
import javax.jms.QueueConnectionFactory;
24+
import javax.jms.TopicConnection;
25+
import javax.jms.TopicConnectionFactory;
26+
import javax.naming.NamingException;
27+
import javax.naming.RefAddr;
28+
import javax.naming.Reference;
29+
import javax.naming.Referenceable;
30+
import javax.naming.StringRefAddr;
1631
import javax.net.ssl.SSLContext;
1732
import javax.net.ssl.SSLException;
1833
import java.io.IOException;
@@ -21,7 +36,9 @@
2136
import java.util.List;
2237
import java.util.concurrent.TimeoutException;
2338

24-
import static com.rabbitmq.jms.util.UriCodec.*;
39+
import static com.rabbitmq.jms.util.UriCodec.encHost;
40+
import static com.rabbitmq.jms.util.UriCodec.encSegment;
41+
import static com.rabbitmq.jms.util.UriCodec.encUserinfo;
2542

2643
/**
2744
* RabbitMQ Implementation of JMS {@link ConnectionFactory}
@@ -79,6 +96,13 @@ public class RMQConnectionFactory implements ConnectionFactory, Referenceable, S
7996
*/
8097
private AmqpPropertiesCustomiser amqpPropertiesCustomiser;
8198

99+
/**
100+
* Collector for AMQP-client metrics.
101+
*
102+
* @since 1.10.0
103+
*/
104+
private MetricsCollector metricsCollector = new NoOpMetricsCollector();
105+
82106
/** Default not to use ssl */
83107
private boolean ssl = false;
84108
private String tlsProtocol;
@@ -130,6 +154,8 @@ public Connection createConnection(String username, String password) throws JMSE
130154
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
131155
setRabbitUri(logger, this, factory, this.getUri());
132156
maybeEnableTLS(factory);
157+
158+
factory.setMetricsCollector(this.metricsCollector);
133159
com.rabbitmq.client.Connection rabbitConnection = instantiateNodeConnection(factory);
134160

135161
RMQConnection conn = new RMQConnection(new ConnectionParams()
@@ -706,5 +732,15 @@ public void setAmqpPropertiesCustomiser(AmqpPropertiesCustomiser amqpPropertiesC
706732
this.amqpPropertiesCustomiser = amqpPropertiesCustomiser;
707733
}
708734

735+
/**
736+
* Set the collector for AMQP-client metrics.
737+
*
738+
* @param metricsCollector
739+
* @since 1.10.0
740+
* @see com.rabbitmq.client.ConnectionFactory#setMetricsCollector(MetricsCollector)
741+
*/
742+
public void setMetricsCollector(MetricsCollector metricsCollector) {
743+
this.metricsCollector = metricsCollector;
744+
}
709745
}
710746

src/test/java/com/rabbitmq/integration/tests/AbstractITQueue.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import org.junit.After;
1414
import org.junit.Before;
15+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
1516

1617
public abstract class AbstractITQueue {
1718
QueueConnectionFactory connFactory;
@@ -21,9 +22,14 @@ public abstract class AbstractITQueue {
2122
public void beforeTests() throws Exception {
2223
this.connFactory = (QueueConnectionFactory) AbstractTestConnectionFactory.getTestConnectionFactory()
2324
.getConnectionFactory();
25+
customise((RMQConnectionFactory) connFactory);
2426
this.queueConn = connFactory.createQueueConnection();
2527
}
2628

29+
protected void customise(RMQConnectionFactory connectionFactory) {
30+
31+
}
32+
2733
protected static void drainQueue(QueueSession session, Queue queue) throws Exception {
2834
QueueReceiver receiver = session.createReceiver(queue);
2935
int n=0;
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/* Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. */
2+
package com.rabbitmq.integration.tests;
3+
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
import com.rabbitmq.client.impl.AbstractMetricsCollector;
7+
import com.rabbitmq.jms.admin.RMQConnectionFactory;
8+
import org.junit.Test;
9+
10+
import javax.jms.DeliveryMode;
11+
import javax.jms.Queue;
12+
import javax.jms.QueueReceiver;
13+
import javax.jms.QueueSender;
14+
import javax.jms.QueueSession;
15+
import javax.jms.Session;
16+
import java.io.Serializable;
17+
18+
import static org.junit.Assert.assertEquals;
19+
20+
/**
21+
* Integration test to check metrics collection is properly set up.
22+
*/
23+
public class MetricsCollectionIT extends AbstractITQueue {
24+
25+
private static final String QUEUE_NAME = "test.queue." + MetricsCollectionIT.class.getCanonicalName();
26+
private static final long TEST_RECEIVE_TIMEOUT = 1000; // one second
27+
28+
TestMetricsCollector metricsCollector = new TestMetricsCollector();
29+
30+
@Override
31+
protected void customise(RMQConnectionFactory connectionFactory) {
32+
connectionFactory.setMetricsCollector(metricsCollector);
33+
}
34+
35+
@Test
36+
public void testSendAndReceiveLongTextMessage() throws Exception {
37+
assertEquals(1, metricsCollector.connectionCount);
38+
assertEquals(0, metricsCollector.channelCount);
39+
assertEquals(0, metricsCollector.publishedMessageCount);
40+
assertEquals(0, metricsCollector.consumedMessageCount);
41+
sendAndConsumeMessage();
42+
assertEquals(1, metricsCollector.channelCount);
43+
assertEquals(1, metricsCollector.publishedMessageCount);
44+
assertEquals(1, metricsCollector.consumedMessageCount);
45+
}
46+
47+
protected void sendAndConsumeMessage() throws Exception {
48+
MessageTestType mtt = MessageTestType.TEXT;
49+
try {
50+
queueConn.start();
51+
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
52+
Queue queue = queueSession.createQueue(QUEUE_NAME);
53+
54+
drainQueue(queueSession, queue);
55+
56+
QueueSender queueSender = queueSession.createSender(queue);
57+
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
58+
queueSender.send(mtt.gen(queueSession, (Serializable) queue));
59+
} finally {
60+
reconnect();
61+
}
62+
63+
queueConn.start();
64+
QueueSession queueSession = queueConn.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
65+
Queue queue = queueSession.createQueue(QUEUE_NAME);
66+
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
67+
mtt.check(queueReceiver.receive(TEST_RECEIVE_TIMEOUT), (Serializable) queue);
68+
}
69+
70+
static class TestMetricsCollector extends AbstractMetricsCollector {
71+
72+
volatile int connectionCount = 0;
73+
volatile int channelCount = 0;
74+
volatile int publishedMessageCount = 0;
75+
volatile int consumedMessageCount = 0;
76+
77+
@Override
78+
protected void incrementConnectionCount(Connection connection) {
79+
connectionCount++;
80+
}
81+
82+
@Override
83+
protected void decrementConnectionCount(Connection connection) {
84+
connectionCount--;
85+
}
86+
87+
@Override
88+
protected void incrementChannelCount(Channel channel) {
89+
channelCount++;
90+
}
91+
92+
@Override
93+
protected void decrementChannelCount(Channel channel) {
94+
channelCount--;
95+
}
96+
97+
@Override
98+
protected void markPublishedMessage() {
99+
publishedMessageCount++;
100+
}
101+
102+
@Override
103+
protected void markConsumedMessage() {
104+
consumedMessageCount++;
105+
}
106+
107+
@Override
108+
protected void markAcknowledgedMessage() {
109+
110+
}
111+
112+
@Override
113+
protected void markRejectedMessage() {
114+
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)