1616package com .hivemq .extensions .influxdb ;
1717
1818import com .hivemq .client .mqtt .mqtt5 .Mqtt5Client ;
19+ import com .influxdb .client .InfluxDBClient ;
20+ import com .influxdb .client .InfluxDBClientFactory ;
21+ import com .influxdb .client .domain .InfluxQLQuery ;
1922import io .github .sgtsilvio .gradle .oci .junit .jupiter .OciImages ;
20- import org .influxdb .InfluxDB ;
21- import org .influxdb .dto .Query ;
2223import org .jetbrains .annotations .NotNull ;
24+ import org .jetbrains .annotations .Nullable ;
2325import org .junit .jupiter .api .Test ;
24- import org .slf4j .Logger ;
25- import org .slf4j .LoggerFactory ;
2626import org .testcontainers .containers .InfluxDBContainer ;
2727import org .testcontainers .containers .Network ;
2828import org .testcontainers .hivemq .HiveMQContainer ;
3131import org .testcontainers .utility .MountableFile ;
3232
3333import static org .awaitility .Awaitility .await ;
34- import static org .influxdb .querybuilder .BuiltQuery .QueryBuilder .select ;
3534
3635@ SuppressWarnings ({"resource" })
3736@ Testcontainers
3837public class InfluxDbExtensionIT {
3938
40- private static final @ NotNull String INFLUXDB_NAME = "hivemq" ;
41-
42- private static final @ NotNull Logger LOG = LoggerFactory .getLogger (InfluxDbExtensionIT .class );
39+ private static final @ NotNull String INFLUXDB_DATABASE = "hivemq" ;
4340
4441 private final @ NotNull Network network = Network .newNetwork ();
4542
@@ -50,7 +47,8 @@ public class InfluxDbExtensionIT {
5047 .withNetwork (network )
5148 .withCopyToContainer (MountableFile .forClasspathResource ("influxdb.properties" ),
5249 "/opt/hivemq/extensions/hivemq-influxdb-extension/influxdb.properties" )
53- .withLogConsumer (outputFrame -> System .out .print ("HIVEMQ: " + outputFrame .getUtf8String ()));
50+ .withLogConsumer (outputFrame -> System .out .print ("HIVEMQ: " + outputFrame .getUtf8String ()))
51+ .withEnv ("HIVEMQ_DISABLE_STATISTICS" , "true" );
5452
5553 @ Container
5654 private final @ NotNull InfluxDBContainer <?> influxDB =
@@ -60,49 +58,51 @@ public class InfluxDbExtensionIT {
6058
6159 @ Test
6260 void testMetricsAreForwardedToInfluxDB () {
63- final var influxDbClient = influxDB .getNewInfluxDB ();
64- influxDbClient .setDatabase ("hivemq" );
65-
66- final var query = influxDbClient .query (new Query ("CREATE DATABASE \" " + INFLUXDB_NAME + "\" " ));
67- LOG .info ("created database with query result: {}" , query );
68- influxDbClient .setDatabase (INFLUXDB_NAME );
61+ try (final var influxDBClient = InfluxDBClientFactory .create (influxDB .getUrl ())) {
62+ final var createDbQuery = new InfluxQLQuery ("CREATE DATABASE \" %s\" " .formatted (INFLUXDB_DATABASE ), "" );
63+ influxDBClient .getInfluxQLQueryApi ().query (createDbQuery );
6964
70- final var mqttClient =
71- Mqtt5Client .builder ().serverHost (hivemq .getHost ()).serverPort (hivemq .getMqttPort ()).buildBlocking ();
72- mqttClient .connect ();
73- mqttClient .publishWith ().topic ("my/topic1" ).send ();
74- mqttClient .publishWith ().topic ("my/topic2" ).send ();
75- mqttClient .publishWith ().topic ("my/topic3" ).send ();
76- mqttClient .disconnect ();
65+ final var mqttClient =
66+ Mqtt5Client .builder ().serverHost (hivemq .getHost ()).serverPort (hivemq .getMqttPort ()).buildBlocking ();
67+ mqttClient .connect ();
68+ mqttClient .publishWith ().topic ("my/topic1" ).send ();
69+ mqttClient .publishWith ().topic ("my/topic2" ).send ();
70+ mqttClient .publishWith ().topic ("my/topic3" ).send ();
71+ mqttClient .disconnect ();
7772
78- await ().until (() -> getMetricMax (influxDbClient , "com.hivemq.messages.incoming.publish.count" ) == 3 );
79- await ().until (() -> getMetricMax (influxDbClient , "com.hivemq.messages.incoming.connect.count" ) == 1 );
73+ await ().until (() -> getMetricMax (influxDBClient , "com.hivemq.messages.incoming.publish.count" ) == 3 );
74+ await ().until (() -> getMetricMax (influxDBClient , "com.hivemq.messages.incoming.connect.count" ) == 1 );
75+ }
8076 }
8177
82- private long getMetricMax (final @ NotNull InfluxDB client , final @ NotNull String metric ) {
83- var acc = 0L ;
84- final var queryResult = client .query (select ("count" ).from (INFLUXDB_NAME , metric ));
85- for (final var result : queryResult .getResults ()) {
86- final var series = result .getSeries ();
87- if (series == null ) {
88- break ;
89- }
90- final var values = series .get (series .size () - 1 ).getValues ();
91- if (values == null ) {
92- break ;
93- }
94- long max = 0 ;
95- for (final var value : values ) {
96- if (value == null ) {
97- break ;
98- }
99- final var val = (double ) value .get (1 );
100- if (max < val ) {
101- max = (long ) val ;
78+ private long getMetricMax (final @ NotNull InfluxDBClient client , final @ NotNull String metric ) {
79+ final var influxQL = String .format ("SELECT MAX(count) FROM \" %s\" " , metric );
80+ final var query = new InfluxQLQuery (influxQL , INFLUXDB_DATABASE );
81+ final var result = client .getInfluxQLQueryApi ().query (query );
82+ long max = 0 ;
83+ for (final var queryResult : result .getResults ()) {
84+ for (final var series : queryResult .getSeries ()) {
85+ for (final var record : series .getValues ()) {
86+ final var value = getValue (record .getValueByKey ("max" ));
87+ if (value > max ) {
88+ max = value ;
89+ }
10290 }
10391 }
104- acc += max ;
10592 }
106- return acc ;
93+ return max ;
94+ }
95+
96+ private static long getValue (final @ Nullable Object valueField ) {
97+ if (valueField instanceof Number ) {
98+ return ((Number ) valueField ).longValue ();
99+ } else if (valueField != null ) {
100+ try {
101+ // try to parse as double if it's a string
102+ return (long ) Double .parseDouble (valueField .toString ());
103+ } catch (final NumberFormatException ignored ) {
104+ }
105+ }
106+ return Long .MIN_VALUE ;
107107 }
108108}
0 commit comments