getPseudoServices() {
+ return this.pseudoServices;
+ }
+
+ /**
+ * Sets the client logic.
+ *
+ * @param clogic the new client logic
+ */
+ @Override
+ public void setClientLogic(ILlClientLogic clogic) {
+ return;
+ }
+
+ /**
+ * Gets the host implementation SP.
+ *
+ * @return the host implementation SP
+ */
+ @Override
+ public String getHostImplementationSp() {
+ return this.hostImplementation;
+ }
+
+ /**
+ * Adds the shutdown hook.
+ *
+ * @param hook the hook
+ */
+ public void addShutdownHook(Thread hook) {
+ Runtime.getRuntime().addShutdownHook(hook);
+ }
+
+ /**
+ * Sets the flag to run promethus measures.
+ */
+ private void setRunPromethusMeasures() {
+ this.services.forEach((k, v) -> {
+ if (v.isExposedToPrometheus()) {
+ this.runPrometheusServer = true;
+ }
+ });
+ }
+
+ /**
+ * Start prometheus measures.
+ *
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ private void startPrometheusMeasures() throws IOException {
+ this.prometheusMeasuresServer = new HTTPServer.Builder().withPort(this.prometheusMeasuresPort).build();
+ logger.info("Prometheus measures are now availabe for scraping.");
+ }
+
+ /**
+ * Run prometheus measures.
+ *
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ private void runPrometheusMeasures() throws IOException {
+ this.setRunPromethusMeasures();
+ if (this.runPrometheusServer) {
+ DefaultExports.initialize();
+ logger.info("Attempting to start Prometheus measures on port {}", this.prometheusMeasuresPort);
+ this.startPrometheusMeasures();
+ } else {
+ logger.info(
+ "No service is configured to be exposed for Prometheus. Server will not run and, therefore, no measures will be exposed.");
+ }
+ }
+}
diff --git a/src/main/java/at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync.java b/src/main/java/at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync.java
index ecbc6f7..5acf8a6 100644
--- a/src/main/java/at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync.java
+++ b/src/main/java/at/ac/ait/lablink/core/connection/mqtt/impl/MqttClientSync.java
@@ -5,863 +5,877 @@
package at.ac.ait.lablink.core.connection.mqtt.impl;
-import at.ac.ait.lablink.core.connection.IConnectionHandler;
-import at.ac.ait.lablink.core.connection.ex.LowLevelCommRuntimeException;
-import at.ac.ait.lablink.core.connection.mqtt.IMqttConnectionListener;
-import at.ac.ait.lablink.core.connection.mqtt.IMqttPublisher;
-import at.ac.ait.lablink.core.connection.mqtt.IMqttReceiverCallback;
-import at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber;
-import at.ac.ait.lablink.core.connection.mqtt.impl.MqttUtils;
-import at.ac.ait.lablink.core.ex.LlCoreRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import at.ac.ait.lablink.core.connection.IConnectionHandler;
+import at.ac.ait.lablink.core.connection.ex.LowLevelCommRuntimeException;
+import at.ac.ait.lablink.core.connection.mqtt.IMqttConnectionListener;
+import at.ac.ait.lablink.core.connection.mqtt.IMqttPublisher;
+import at.ac.ait.lablink.core.connection.mqtt.IMqttReceiverCallback;
+import at.ac.ait.lablink.core.connection.mqtt.IMqttSubscriber;
+import at.ac.ait.lablink.core.ex.LlCoreRuntimeException;
/**
* Implementation of the low level MQTT client.
*
- * The low level MQTT client works as a wrapper interface to the MQTT client library. It
- * extends the library with additional functionality like an automatic re-establishment of a lost
- * connection to the broker or the possibility to register different listeners, which are informed
- * by the client if the connection state changes.
+ *
+ * The low level MQTT client works as a wrapper interface to the MQTT client
+ * library. It extends the library with additional functionality like an
+ * automatic re-establishment of a lost connection to the broker or the
+ * possibility to register different listeners, which are informed by the client
+ * if the connection state changes.
*
- *
This MQTT client uses a synchronous communication with the MQTT core (sending methods).
- * Therefore, every method call of the core library will be block until it's finished. This client
- * can be used for simple use cases, where the reaction time of a method call isn't very important.
+ *
+ * This MQTT client uses a synchronous communication with the MQTT core (sending
+ * methods). Therefore, every method call of the core library will be block
+ * until it's finished. This client can be used for simple use cases, where the
+ * reaction time of a method call isn't very important.
*
*
Functionality of the low Level client
*
- * LowLevel client properties
- * The lowLevel client should have the ability to reconnect to the broker, if it lost its
- * connection. Therefore a reconnection time period is necessary, where it tries to reconnect to
- * a broker. The client tries to reconnect to the broker for certain times or for infinite. The
- * reconnection functionality should be optionally be disabled by the settings.
+ *
+ * LowLevel client properties
+ * The lowLevel client should have the ability to reconnect to the broker, if it
+ * lost its connection. Therefore a reconnection time period is necessary, where
+ * it tries to reconnect to a broker. The client tries to reconnect to the
+ * broker for certain times or for infinite. The reconnection functionality
+ * should be optionally be disabled by the settings.
*
- *
The client needs some address parameters for the connection to the broker. These parameters
- * should be used as properties in a configuration file.
+ *
+ * The client needs some address parameters for the connection to the broker.
+ * These parameters should be used as properties in a configuration file.
*
- *
The first implementation won't be able to change the properties during runtime. For future
- * improvements the change of the properties should be handled dynamically during runtime.
- * Therefore parameters concerning the connection settings (e.g., broker address), should only be
- * handled during a disconnected period.
+ *
+ * The first implementation won't be able to change the properties during
+ * runtime. For future improvements the change of the properties should be
+ * handled dynamically during runtime. Therefore parameters concerning the
+ * connection settings (e.g., broker address), should only be handled during a
+ * disconnected period.
*
*
- *
Initialization (Object creation)
- * After initialization the client isn't connected to the MQTT broker. The system isn't
- * allowed to automatically reconnect to the broker. In this state it isn't possible to subscribe
- * or publish a topic. It should be possible to add the MqttConnectionListeners or the
- * MqttReceiver callback.
+ *
+ * Initialization (Object creation)
+ * After initialization the client isn't connected to the MQTT broker. The
+ * system isn't allowed to automatically reconnect to the broker. In this state
+ * it isn't possible to subscribe or publish a topic. It should be possible to
+ * add the MqttConnectionListeners or the MqttReceiver callback.
*
*
- *
Registering a callback method
- * Some callback methods (IMqttReceiverCallback or IMqttConnectionListener) could be registered
- * to the lowLevel client. The registrations could be dynamically added or removed from the
- * client. The client will inform the callback methods during its operation. If no callback is
- * registered to the client, the client will work in a correct way and it will drop all received
+ *
+ * Registering a callback method
+ * Some callback methods (IMqttReceiverCallback or IMqttConnectionListener)
+ * could be registered to the lowLevel client. The registrations could be
+ * dynamically added or removed from the client. The client will inform the
+ * callback methods during its operation. If no callback is registered to the
+ * client, the client will work in a correct way and it will drop all received
* messages.
*
*
- *
Connecting to the MQTT broker
- * After the initialization of the client it is possible to connect the client to a broker.
- * Therefore the connect method is called. After a successful connection establishment the client
- * will inform all MqttConnectionListeners by calling the
- * onEstablishedMqttConnection() method. If the connection to the broker can't be
- * established the client will inform the caller with an LowLevelCommRuntimeException
- * and the reconnection will be activated.
+ *
+ * Connecting to the MQTT broker
+ * After the initialization of the client it is possible to connect the client
+ * to a broker. Therefore the connect method is called. After a successful
+ * connection establishment the client will inform all MqttConnectionListeners
+ * by calling the onEstablishedMqttConnection() method. If the
+ * connection to the broker can't be established the client will inform the
+ * caller with an LowLevelCommRuntimeException and the reconnection
+ * will be activated.
*
*
- *
Operation Mode (connection established)
- * During the established connection it is possible to subscribe and publish messages or
- * receive messages from the broker.
+ *
+ * Operation Mode (connection established)
+ * During the established connection it is possible to subscribe and publish
+ * messages or receive messages from the broker.
*
*
- *
Receiving a MQTT message
- * If a message is received by the MQTT client it will redirect the incoming message to the
- * registered MqttCallbackReceiver
+ *
+ * Receiving a MQTT message
+ * If a message is received by the MQTT client it will redirect the incoming
+ * message to the registered MqttCallbackReceiver
*
*
- *
Disconnecting the MQTT client
- * By calling the disconnect method the client will perform the disconnection procedure.
- * Therefore it will call the onDisconnectingMqttConnection of all registered
- * MqttConnectionListeners and then it will disconnect from the Mqtt broker.
- * If the disconnection fails a LowLevelCommRuntimeException will be thrown.
+ *
+ * Disconnecting the MQTT client
+ * By calling the disconnect method the client will perform the disconnection
+ * procedure. Therefore it will call the
+ * onDisconnectingMqttConnection of all registered
+ * MqttConnectionListeners and then it will disconnect from the Mqtt broker. If
+ * the disconnection fails a LowLevelCommRuntimeException will be
+ * thrown.
*
- *
The manual called disconnect method disables an automatic reconnection.
+ *
+ * The manual called disconnect method disables an automatic reconnection.
*
*
- *
Lost the connection to the MQTT broker
- * If the client lost the connection to the broker (e.g., network errors, broker crashes) the
- * MQTT lib will call the lost connection handler. This call will be redirected to all registered
- * MqttConnectionListeners by calling the method onLostMqttConnection(). The client
- * will trigger the automatic reconnection sequence (if enabled) and tries to reconnect to the
- * MQTT broker.
+ *
+ * Lost the connection to the MQTT broker
+ * If the client lost the connection to the broker (e.g., network errors, broker
+ * crashes) the MQTT lib will call the lost connection handler. This call will
+ * be redirected to all registered MqttConnectionListeners by calling the method
+ * onLostMqttConnection(). The client will trigger the automatic
+ * reconnection sequence (if enabled) and tries to reconnect to the MQTT broker.
*
- *
During the disconnected state the client throws an exception if the publish or subscribe
- * methods are called.
+ *
+ * During the disconnected state the client throws an exception if the publish
+ * or subscribe methods are called.
*/
@SuppressWarnings("FieldCanBeLocal")
-public class MqttClientSync
- implements MqttCallback, IMqttPublisher, IConnectionHandler, IMqttSubscriber {
-
- private static final Logger logger = LoggerFactory.getLogger(MqttClientSync.class);
-
- // Preset properties of the class
-
- /* Quality of service for published messages */
- @SuppressWarnings("FieldCanBeLocal") private final int qualityOfService = 0;
-
- /* Default settings of the class */
-
- private final String defaultBrokerAddress = "localhost";
- private final int defaultBrokerPort = 1883;
- private final String defaultConnectionProtocol = "tcp";
- private final boolean defaultEnableReconnection = true;
- private final int defaultReconnectInterval = 10;
- private final int defaultReconnectNumberOfTries = -1;
- private final int defaultReceivedMessagesQueueSize = 2048;
-
- private int mqttConnectionTimeout = MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT;
-
- /* Client ID for MQTT communication (Not the clientId of the LablinkClient) */
- private final String clientId;
-
- /*
- * current address string of the broker.
- * The broker address uses the representation of the MQTT
- * library ({@link MqttClient}), e.g., "tcp://localhost:1883".
- */
- private final String brokerAddress;
-
- /* Mqtt synchronous client for publishing and receiving MQTT messages */
- private MqttClient mqttClient = null;
-
- /* Registered component to handle received messages */
- private IMqttReceiverCallback receiveCallback;
-
- private final Object receiveCallbackSyncMonitor = new Object();
-
- /* Registered connection listeners that should be informed about a state change. */
- private final List
- connectionListeners =
- new ArrayList();
-
- private final Object connectionListenersSyncMonitor = new Object();
-
- /* Current state of the client for reconnection handling */
- private ELlClientState currentClientState = ELlClientState.DISCONNECTED_FROM_BROKER;
-
-
- /* Own timer thread which handles the reconnection functionality */
- private final ReconnectionThread reconnectionThread;
-
- /* Worker thread for handling received messages */
- private final ReceivedMessageConsumer receivedMessageConsumerThread;
-
- private final Object publishMonitor = new Object();
-
-
- /**
- * Constructor with optional configuration object
- *
- * The MqttClientSync can be configured with a Configuration object. This object
- * can be memory based or it can be loaded from a resources/properties file. The configuration
- * will only be updated or taken during the creation of the client.
- * The following list shows the current implemented configuration properties withs their default
- * values (between brackets):
- *
- * - lowLevelComm.enableReconnection (true, boolean): Switch for enabling the
- * automatic reconnection, if the connection to the MQTT broker is lost.
- * - lowLevelComm.reconnectInterval (10, int): Time interval between two
- * reconnection tries in Seconds.
- * - lowLevelComm.reconnectNumberOfTries (-1, int): Maximum number of reconnection
- * tries. After this number of tries the client will switch to the disconnecting state. With
- * -1 the reconnection will be try forever (infinite)
- * - lowLevelComm.brokerAddress ("localhost", string): Address of the MQTT broker to
- * be connected.
- * - lowLevelComm.brokerPort (1883, int): Port of the MQTT broker to be connected.
- * - lowLevelComm.connectionProtocol ("tcp", string): Communication Protocol for the
- * MQTT broker. Usually tcp or ssl
- * - lowLevelComm.mqttConnectionTimeout (30, int): Mqtt Connection Timeout in
- * seconds
- * - lowLevelComm.receivedMessagesQueueSize (100, int): Queue Size for incoming
- * (received) messages. Incoming messages will be buffered in a queue and decoupled from the
- * incoming Mqtt thread.
- *
- * TODO add config parameters for SSL connection in the future
- *
- * @param mqttClientId MQTT client identifier (not the LablinkClient identifier)
- * For identification of the client within the broker.
- * @param config Configuration object that is used to parametrize the MQTT client.
- * Different parameters can be set. If no parameter is set, the client
- * will use the default settings.
- */
- public MqttClientSync(String mqttClientId, Configuration config) {
-
- if (config == null) {
- logger.info("No configuration is set for low-level MQTT client. Use default configuration.");
- config = new BaseConfiguration(); /* Initialize empty configuration */
- }
-
- logger.info("Initialize low-level MQTT client '{}'.", mqttClientId);
- this.clientId = mqttClientId;
-
- // Read configuration for MQTT broker address
- String brokerAddress = config.getString("lowLevelComm.brokerAddress", defaultBrokerAddress);
- int brokerPort = config.getInt("lowLevelComm.brokerPort", defaultBrokerPort);
- String
- connectionProtocol =
- config.getString("lowLevelComm.connectionProtocol", defaultConnectionProtocol);
-
- this.brokerAddress = createMqttBrokerAddress(brokerAddress, brokerPort, connectionProtocol);
- logger.info("BrokerAddress: {}", this.brokerAddress);
-
- this.mqttConnectionTimeout =
- config.getInt("lowLevelComm.mqttConnectionTimeout",
- MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT);
- logger.info("Connection Timeout: {}", this.mqttConnectionTimeout + "s");
-
- // Read configuration for Reconnection handling
- reconnectionThread = new ReconnectionThread(this);
-
- this.reconnectionThread.setEnableReconnection(
- config.getBoolean("lowLevelComm.enableReconnection", defaultEnableReconnection));
- this.reconnectionThread.setReconnectionInterval(
- config.getInt("lowLevelComm.reconnectInterval", defaultReconnectInterval) * 1000);
- this.reconnectionThread.setReconnectionTries(
- config.getInt("lowLevelComm.reconnectNumberOfTries", defaultReconnectNumberOfTries));
-
- logger.info("Reconnection Settings: Enabled: {} Interval: {}ms NoOfTries: {}",
- reconnectionThread.isEnableReconnection(), reconnectionThread.getReconnectionInterval(),
- reconnectionThread.getReconnectNumberOfTries());
-
- reconnectionThread.start();
-
- // Activate worker for receiving messages
- int
- queueSize =
- config.getInt("lowLevelComm.receivedMessagesQueueSize", defaultReceivedMessagesQueueSize);
- receivedMessageConsumerThread = new ReceivedMessageConsumer(queueSize, this.clientId);
- logger.info("ReceivedMessageConsumer: Queue Size: {}", queueSize);
- receivedMessageConsumerThread.start();
- }
-
- @Override
- public String toString() {
- return "MqttClientSync(" + clientId + ", " + brokerAddress + ')';
- }
-
- /**
- * Create the Address String for the MQTT broker.
- *
- * The method generated the address string for the MQTT broker without a validation of the
- * input parameters.
- *
- * @param brokerAddress Address of the broker (e.g., "localhost")
- * @param brokerPort Port of the broker (e.g., 1883)
- * @param connectionProtocol protocol of the connection (e.g., "tcp" or "ssh")
- * @return generated address string for the MQTT broker
- */
- private String createMqttBrokerAddress(String brokerAddress, int brokerPort,
- String connectionProtocol) {
- return String.format("%s://%s:%d", connectionProtocol, brokerAddress, brokerPort);
- }
-
- /**
- * Factory method for creating the Mqtt client.
- * Can be mocked for unit tests.
- *
- * @param brokerAddress Address of the broker
- * @param clientId ID of the client
- * @return the created MqttClient
- * @throws MqttException will be thrown by the Mqtt client creation
- */
- private static MqttClient createMqttClient(String brokerAddress, String clientId)
- throws MqttException {
- return new MqttClient(brokerAddress, clientId, null);
- }
-
- /**
- * Get the actual lowLevelMqttReceiver which contains the callback handler for received messages.
- *
- * @return actual used lowLevelMqttReceiver
- */
- public IMqttReceiverCallback getReceiveCallback() {
- return receiveCallback;
- }
-
- /**
- * Set the IMqttReceiverCallback which contains the callback handler for received messages.
- *
- * @param receiveCallback IMqttReceiverCallback to be set
- */
- public void setReceiveCallback(IMqttReceiverCallback receiveCallback) {
- if (receiveCallback == null) {
- throw new LlCoreRuntimeException("Set ReceiveCallback failed: Parameter is a null.");
- }
-
- logger.debug("Set new ReceiveCallback: {}", receiveCallback);
- synchronized (this.receiveCallbackSyncMonitor) {
- this.receiveCallback = receiveCallback;
- this.receivedMessageConsumerThread.setReceiveCallback(this.receiveCallback);
- }
- }
-
-
- /**
- * Add a IMqttConnectionListener to the client. This connection listener will be informed by the
- * the client, if an event regarding the connection will occur.
- *
- * @param listener IMqttConnectionListener to be added
- */
- public void addMqttConnectionListener(IMqttConnectionListener listener) {
-
- if (listener == null) {
- throw new LlCoreRuntimeException("Add ConnectionListener failed: Parameter is a null.");
- }
-
- if (!this.connectionListeners.contains(listener)) {
- logger.debug("Add connection listener: {}", listener.toString());
-
- synchronized (this.connectionListenersSyncMonitor) {
- this.connectionListeners.add(listener);
- }
- }
- }
-
- /**
- * Remove a connectionListener from the client.
- *
- * @param listener IMqttConnectionListener to be removed
- */
- public void removeConnectionListener(IMqttConnectionListener listener) {
- logger.trace("Remove connection listener: {}", listener.toString());
- synchronized (this.connectionListenersSyncMonitor) {
- this.connectionListeners.remove(listener);
- }
- }
-
- /**
- * Read the registered connection listeners (for testing purposes)
- *
- * @return the connection listeners.
- */
- List getConnectionListeners() {
- return connectionListeners;
- }
-
- /**
- * Read the clientId of the broker.
- *
- * @return the clientId of the broker.
- */
- public String getClientId() {
- return clientId;
- }
-
- /**
- * Read the broker address that the client uses.
- * The broker address uses the representation of the MQTT library ({@link MqttClient}),
- * e.g., "tcp://localhost:1883".
- *
- * @return broker address of the client.
- */
- public String getBrokerAddress() {
- return brokerAddress;
- }
-
- /**
- * Set the actual state of the client (thread safe) and trigger the reconnection timer.
- *
- * @param currentClientState set the current state
- */
- private synchronized void setCurrentClientStateAndTriggerReconnect(
- ELlClientState currentClientState) {
-
- this.currentClientState = currentClientState;
-
- synchronized (this.reconnectionThread) {
- this.reconnectionThread.notify();
- }
- }
-
- ELlClientState getCurrentClientState() {
- return currentClientState;
- }
-
- /**
- * Shutdown the MQTT lowLevel client.
- *
- * This method will be used for cleanup purposes. It should be called before the program's end.
- * It will disconnect from the broker and clean up its states.
- */
- public void shutdown() {
- if (isConnected()) {
- try {
- disconnect();
- } catch (LowLevelCommRuntimeException ex) {
- logger.warn("Error while disconnecting from broker during shutdown.");
- }
- }
-
- this.reconnectionThread.shutdown();
- this.receivedMessageConsumerThread.shutdown();
- }
-
- @Override
- public void connect() {
-
- // Create MQTT client if it doesn't exist.
- if (this.mqttClient == null) {
- try {
- this.mqttClient = MqttClientSync.createMqttClient(this.brokerAddress, this.clientId);
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
- }
-
- // Ignore connect method if the client is already connected
- if (isConnected()) {
- return;
- }
-
- // Connect to broker
- MqttConnectOptions mqttOpt = new MqttConnectOptions();
- mqttOpt.setCleanSession(true);
- mqttOpt.setConnectionTimeout(mqttConnectionTimeout);
- this.mqttClient.setCallback(this);
-
- try {
- this.mqttClient.connect(mqttOpt);
- this.setCurrentClientStateAndTriggerReconnect(ELlClientState.CONNECTED_TO_BROKER);
-
- synchronized (this.connectionListenersSyncMonitor) {
- for (IMqttConnectionListener listener : this.connectionListeners) {
- listener.onEstablishedMqttConnection();
- }
- }
- } catch (MqttException ex) {
- this.setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
- throw new LowLevelCommRuntimeException(ex);
- }
- logger.info("MqttClient connected to broker {}", mqttClient.getServerURI());
-
- }
-
- @Override
- public void disconnect() {
- this.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
- if (isConnected()) {
-
- synchronized (this.connectionListenersSyncMonitor) {
- for (IMqttConnectionListener listener : this.connectionListeners) {
- listener.onDisconnectingMqttConnection();
- }
- }
-
- try {
- this.mqttClient.disconnect();
- logger.info("MqttClient disconnected from broker {}", mqttClient.getServerURI());
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
- }
- }
-
- @Override
- public boolean isConnected() {
- return this.mqttClient != null && this.mqttClient.isConnected();
- }
-
- @Override
- public void publish(String mqttTopic, byte[] mqttPayload) {
- if (!this.isConnected()) {
- throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
- }
-
- MqttUtils.validateMqttTopic(mqttTopic);
-
- MqttMessage mqttMsg = new MqttMessage(mqttPayload);
- mqttMsg.setQos(this.qualityOfService);
-
- synchronized (this.publishMonitor) {
- try {
- this.mqttClient.publish(mqttTopic, mqttMsg);
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
- }
- }
-
- @Override
- public void subscribe(String mqttTopic) {
- if (!this.isConnected()) {
- throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
- }
-
- try {
- this.mqttClient.subscribe(mqttTopic);
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
-
- }
-
- @Override
- public void subscribe(List mqttTopics) {
- if (!this.isConnected()) {
- throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
- }
-
- for (String mqttTopic : mqttTopics) {
- MqttUtils.validateMqttSubscription(mqttTopic);
- }
-
- try {
- this.mqttClient.subscribe(mqttTopics.toArray(new String[0]));
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
- }
-
- @Override
- public void unsubscribe(String mqttTopic) {
-
- if (!this.isConnected()) {
- throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
- }
- try {
- this.mqttClient.unsubscribe(mqttTopic);
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
-
- }
-
- @Override
- public void unsubscribe(List mqttTopics) {
- if (!this.isConnected()) {
- throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
- }
-
- for (String mqttTopic : mqttTopics) {
- MqttUtils.validateMqttSubscription(mqttTopic);
- }
-
- try {
- this.mqttClient.unsubscribe(mqttTopics.toArray(new String[0]));
- } catch (MqttException ex) {
- throw new LowLevelCommRuntimeException(ex);
- }
- }
-
- @Override
- public void connectionLost(Throwable throwable) {
-
- logger.warn("MQTT connection lost: {}", throwable.toString());
-
- synchronized (this.connectionListenersSyncMonitor) {
- for (IMqttConnectionListener listener : this.connectionListeners) {
- listener.onLostMqttConnection();
- }
- }
-
- this.setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- logger.trace("New MQTT message received: Topic({}) ReceivedMessage()", topic,
- mqttMessage.toString());
-
- synchronized (this.receiveCallbackSyncMonitor) {
- if (this.receiveCallback != null && !mqttMessage.isDuplicate()) {
- this.receivedMessageConsumerThread.addNewMessage(topic, mqttMessage.getPayload());
- }
- }
- }
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken mqttDeliveryToken) {
- /* not used by the synchronous client */
- }
-
- /**
- * Inner enumeration to identify the current state of the client.
- *
- */
- enum ELlClientState {
- DISCONNECTED_FROM_BROKER, CONNECTED_TO_BROKER, TRY_TO_RECONNECT
- }
-
- /**
- * Thread for controlling the reconnection to the broker.
- *
- * The client provides the functionality to automatically reconnect to a broker, if the
- * connection can't be created or the connection was lost.
- *
- *
The reconnection thread will periodically call the connect method of the client and tries to
- * reconnect from it. The state variable of the client controls the reconnection. If it is set
- * by the client the reconnection thread will be notified and it will trigger the reconnection if
- * it is necessary.
- */
- private class ReconnectionThread extends Thread {
-
- private final MqttClientSync parent;
- private boolean keepRunning = true;
-
- /* Config values for reconnection */
- private boolean enableReconnection;
- private int reconnectionInterval; // in milliseconds
- private int reconnectionTries; // -1 or positive number
-
- /* actual reconnection try counter */
- private int actualTry = 1;
-
- /**
- * Default constructor.
- *
- * @param parent MqttClientSync where the reconnection should be handled
- */
- ReconnectionThread(MqttClientSync parent) {
- this.parent = parent;
- this.setDaemon(true);
- this.setName("ReconnectionThread: " + parent.getClientId());
-
- enableReconnection = parent.defaultEnableReconnection;
- reconnectionInterval = parent.defaultReconnectInterval;
- reconnectionTries = parent.defaultReconnectNumberOfTries;
- }
-
- /**
- * Shutdown and cleanup procedure.
- *
- */
- synchronized void shutdown() {
- this.keepRunning = false;
- this.interrupt();
- }
-
- @Override
- public void run() {
-
- logger.trace("Reconnection timer thread started (activated: {}, Interval: {}, Tries: {})",
- enableReconnection, reconnectionInterval, reconnectionTries);
-
- while (this.keepRunning) {
-
- ELlClientState beginClientState = parent.getCurrentClientState();
- logger.trace("ReconnectionTimerThread activated: {}", currentClientState);
-
- ELlClientState currentClientState = beginClientState;
-
- if (!this.enableReconnection) {
- if (currentClientState == ELlClientState.TRY_TO_RECONNECT) {
- parent.setCurrentClientStateAndTriggerReconnect(
- ELlClientState.DISCONNECTED_FROM_BROKER
- );
- currentClientState = parent.getCurrentClientState();
- }
- }
-
- if (currentClientState == ELlClientState.CONNECTED_TO_BROKER
- || currentClientState == ELlClientState.DISCONNECTED_FROM_BROKER) {
- actualTry = 1;
- }
-
- if (this.enableReconnection && currentClientState == ELlClientState.TRY_TO_RECONNECT) {
- handleReconnection();
- }
-
- try {
- synchronized (this) {
- if (beginClientState != parent.getCurrentClientState()) {
- continue;
- }
- if (currentClientState == ELlClientState.TRY_TO_RECONNECT) {
- this.wait(reconnectionInterval);
- } else {
- this.wait();
- }
- }
- } catch (InterruptedException ign) {
- // This is expected
- }
-
- }
- }
-
- private void handleReconnection() {
-
- logger.trace("Reconnection try: {}", actualTry);
- try {
- parent.connect();
- } catch (LowLevelCommRuntimeException ex) {
- logger.debug("Reconnection try ({}) was not successful.", actualTry);
- }
-
- if (this.reconnectionTries != -1 && actualTry >= this.reconnectionTries) {
- logger.warn("Maximum number of reconnection tries exceeds. Stop to reconnect");
- parent.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
- actualTry = 1;
- } else {
- actualTry++;
- }
- }
-
-
- void setEnableReconnection(boolean enableReconnection) {
- this.enableReconnection = enableReconnection;
- }
-
- void setReconnectionInterval(int reconnectionInterval) {
-
- if (reconnectionInterval > 0) {
- this.reconnectionInterval = reconnectionInterval;
- } else {
- throw new IllegalArgumentException(String.format(
- "False reconnection interval in milliseconds (%d) was "
- + "set. The parameter should be greater than 0.", reconnectionInterval));
- }
- }
-
- void setReconnectionTries(int reconnectionTries) {
-
- if (reconnectionTries == -1 || reconnectionTries > 0) {
- this.reconnectionTries = reconnectionTries;
- } else {
- throw new IllegalArgumentException(String.format(
- "False reconnection tries (%d) want to be set. The parameter should "
- + "be -1 for infinite tries or greater than 0", reconnectionTries));
- }
- }
-
- int getReconnectNumberOfTries() {
- return this.reconnectionTries;
- }
-
- int getReconnectionInterval() {
- return reconnectionInterval;
- }
-
- boolean isEnableReconnection() {
- return enableReconnection;
- }
- }
-
- /**
- * Worker thread to handle received messages.
- *
- *
This inner class is used to implement a blocking queue for the received messages. This is
- * necessary to decouple the Mqtt receiving callback from the further message handling.
- * Especially if the message callback publishes new messages this decoupling is necessary.
- */
- private class ReceivedMessageConsumer extends Thread {
-
- /**
- * Data Bean for received messages.
- *
- */
- class ReceivedMessage {
- public String topic;
- public byte[] payload;
- }
-
- private boolean isRunning = true;
-
- private final BlockingQueue receivedMsgQueue;
-
- private IMqttReceiverCallback receiverCallback;
-
- private final Object syncMonitor = new Object();
-
- /**
- * Construct the receiving messages' handler.
- *
- * @param queueSize Size of the Queue for receiving messages
- * @param clientId client ID
- */
- ReceivedMessageConsumer(int queueSize, String clientId) {
- this.setDaemon(true);
- this.setName("ReceivedMessageConsumer: " + clientId);
- receivedMsgQueue = new ArrayBlockingQueue(queueSize);
- }
-
- /**
- * Set the callback handler for handling receiving messages.
- *
- * @param receiveCallback handler to be set
- */
- void setReceiveCallback(IMqttReceiverCallback receiveCallback) {
- synchronized (this.syncMonitor) {
- this.receiverCallback = receiveCallback;
- }
- }
-
- /**
- * Add a new received message to the worker queue.
- *
- * @param topic of the received message
- * @param payload of the received message
- */
- void addNewMessage(String topic, byte[] payload) {
-
- ReceivedMessage msg = new ReceivedMessage();
- msg.topic = topic;
- msg.payload = payload;
-
- try {
- receivedMsgQueue.put(msg);
- } catch (InterruptedException ex) {
- // expected
- }
- }
-
- /**
- * Shutdown the ReceivedMessageConsumer Thread.
- *
- */
- void shutdown() {
- isRunning = false;
- this.interrupt();
- }
-
- @Override
- public void run() {
-
- while (isRunning) {
- try {
- ReceivedMessage msg = receivedMsgQueue.take();
- logger.trace(
- "Process received message (Topic: {} IPayload: {}) No of waiting objects: {}",
- msg.topic, new String(msg.payload), receivedMsgQueue.size()
- );
-
- synchronized (this.syncMonitor) {
- if (this.receiverCallback == null) {
- logger.warn("No ReceiverCallback is set in ReceivedMessageConsumerThread.");
- continue;
- }
- this.receiverCallback.handleRawMqttMessage(msg.topic, msg.payload);
- }
- } catch (InterruptedException ignore) {
- // ignore
- }
- }
- }
- }
+public class MqttClientSync implements MqttCallback, IMqttPublisher, IConnectionHandler, IMqttSubscriber {
+
+ private static final Logger logger = LoggerFactory.getLogger(MqttClientSync.class);
+
+ // Preset properties of the class
+
+ /* Quality of service for published messages */
+ @SuppressWarnings("FieldCanBeLocal")
+ private final int qualityOfService = 0;
+
+ /* Default settings of the class */
+
+ private final String defaultBrokerAddress = "localhost";
+ private final int defaultBrokerPort = 1883;
+ private final String defaultConnectionProtocol = "tcp";
+ private final boolean defaultEnableReconnection = true;
+ private final int defaultReconnectInterval = 10;
+ private final int defaultReconnectNumberOfTries = -1;
+ private final int defaultReceivedMessagesQueueSize = 2048;
+
+ private int mqttConnectionTimeout = MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT;
+
+ /* Client ID for MQTT communication (Not the clientId of the LablinkClient) */
+ private final String clientId;
+
+ /*
+ * current address string of the broker. The broker address uses the
+ * representation of the MQTT library ({@link MqttClient}), e.g.,
+ * "tcp://localhost:1883".
+ */
+ private final String brokerAddress;
+
+ /* Mqtt synchronous client for publishing and receiving MQTT messages */
+ private MqttClient mqttClient = null;
+
+ /* Registered component to handle received messages */
+ private IMqttReceiverCallback receiveCallback;
+
+ private final Object receiveCallbackSyncMonitor = new Object();
+
+ /*
+ * Registered connection listeners that should be informed about a state change.
+ */
+ private final List connectionListeners = new ArrayList();
+
+ private final Object connectionListenersSyncMonitor = new Object();
+
+ /* Current state of the client for reconnection handling */
+ private ELlClientState currentClientState = ELlClientState.DISCONNECTED_FROM_BROKER;
+
+ /* Own timer thread which handles the reconnection functionality */
+ private final ReconnectionThread reconnectionThread;
+
+ /* Worker thread for handling received messages */
+ private final ReceivedMessageConsumer receivedMessageConsumerThread;
+
+ private final Object publishMonitor = new Object();
+
+ /**
+ * Constructor with optional configuration object
+ *
+ *
+ * The MqttClientSync can be configured with a Configuration
+ * object. This object can be memory based or it can be loaded from a
+ * resources/properties file. The configuration will only be updated or taken
+ * during the creation of the client.
+ * The following list shows the current implemented configuration properties
+ * withs their default values (between brackets):
+ *
+ * - lowLevelComm.enableReconnection (true, boolean): Switch for
+ * enabling the automatic reconnection, if the connection to the MQTT broker is
+ * lost.
+ * - lowLevelComm.reconnectInterval (10, int): Time interval between
+ * two reconnection tries in Seconds.
+ * - lowLevelComm.reconnectNumberOfTries (-1, int): Maximum number of
+ * reconnection tries. After this number of tries the client will switch to the
+ * disconnecting state. With -1 the reconnection will be try forever
+ * (infinite)
+ * - lowLevelComm.brokerAddress ("localhost", string): Address of the
+ * MQTT broker to be connected.
+ * - lowLevelComm.brokerPort (1883, int): Port of the MQTT broker to be
+ * connected.
+ * - lowLevelComm.connectionProtocol ("tcp", string): Communication
+ * Protocol for the MQTT broker. Usually tcp or ssl
+ * - lowLevelComm.mqttConnectionTimeout (30, int): Mqtt Connection
+ * Timeout in seconds
+ * - lowLevelComm.receivedMessagesQueueSize (100, int): Queue Size for
+ * incoming (received) messages. Incoming messages will be buffered in a queue
+ * and decoupled from the incoming Mqtt thread.
+ *
+ * TODO add config parameters for SSL connection in the future
+ *
+ * @param mqttClientId MQTT client identifier (not the LablinkClient identifier)
+ * For identification of the client within the broker.
+ * @param config Configuration object that is used to parametrize the MQTT
+ * client. Different parameters can be set. If no parameter
+ * is set, the client will use the default settings.
+ */
+ public MqttClientSync(String mqttClientId, Configuration config) {
+
+ if (config == null) {
+ logger.info("No configuration is set for low-level MQTT client. Use default configuration.");
+ config = new BaseConfiguration(); /* Initialize empty configuration */
+ }
+
+ logger.info("Initialize low-level MQTT client '{}'.", mqttClientId);
+ this.clientId = mqttClientId;
+
+ // Read configuration for MQTT broker address
+ String brokerAddress = config.getString("lowLevelComm.brokerAddress", defaultBrokerAddress);
+ int brokerPort = config.getInt("lowLevelComm.brokerPort", defaultBrokerPort);
+ String connectionProtocol = config.getString("lowLevelComm.connectionProtocol", defaultConnectionProtocol);
+
+ this.brokerAddress = createMqttBrokerAddress(brokerAddress, brokerPort, connectionProtocol);
+ logger.info("BrokerAddress: {}", this.brokerAddress);
+
+ this.mqttConnectionTimeout = config.getInt("lowLevelComm.mqttConnectionTimeout",
+ MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT);
+ logger.info("Connection Timeout: {}", this.mqttConnectionTimeout + "s");
+
+ // Read configuration for Reconnection handling
+ reconnectionThread = new ReconnectionThread(this);
+
+ this.reconnectionThread
+ .setEnableReconnection(config.getBoolean("lowLevelComm.enableReconnection", defaultEnableReconnection));
+ this.reconnectionThread.setReconnectionInterval(
+ config.getInt("lowLevelComm.reconnectInterval", defaultReconnectInterval) * 1000);
+ this.reconnectionThread.setReconnectionTries(
+ config.getInt("lowLevelComm.reconnectNumberOfTries", defaultReconnectNumberOfTries));
+
+ logger.info("Reconnection Settings: Enabled: {} Interval: {}ms NoOfTries: {}",
+ reconnectionThread.isEnableReconnection(), reconnectionThread.getReconnectionInterval(),
+ reconnectionThread.getReconnectNumberOfTries());
+
+ reconnectionThread.start();
+
+ // Activate worker for receiving messages
+ int queueSize = config.getInt("lowLevelComm.receivedMessagesQueueSize", defaultReceivedMessagesQueueSize);
+ receivedMessageConsumerThread = new ReceivedMessageConsumer(queueSize, this.clientId);
+ logger.info("ReceivedMessageConsumer: Queue Size: {}", queueSize);
+ receivedMessageConsumerThread.start();
+ }
+
+ @Override
+ public String toString() {
+ return "MqttClientSync(" + clientId + ", " + brokerAddress + ')';
+ }
+
+ /**
+ * Create the Address String for the MQTT broker.
+ *
+ *
+ * The method generated the address string for the MQTT broker without a
+ * validation of the input parameters.
+ *
+ * @param brokerAddress Address of the broker (e.g., "localhost")
+ * @param brokerPort Port of the broker (e.g., 1883)
+ * @param connectionProtocol protocol of the connection (e.g., "tcp" or "ssh")
+ * @return generated address string for the MQTT broker
+ */
+ private String createMqttBrokerAddress(String brokerAddress, int brokerPort, String connectionProtocol) {
+ return String.format("%s://%s:%d", connectionProtocol, brokerAddress, brokerPort);
+ }
+
+ /**
+ * Factory method for creating the Mqtt client. Can be mocked for unit tests.
+ *
+ * @param brokerAddress Address of the broker
+ * @param clientId ID of the client
+ * @return the created MqttClient
+ * @throws MqttException will be thrown by the Mqtt client creation
+ */
+ private static MqttClient createMqttClient(String brokerAddress, String clientId) throws MqttException {
+ return new MqttClient(brokerAddress, clientId, null);
+ }
+
+ /**
+ * Get the actual lowLevelMqttReceiver which contains the callback handler for
+ * received messages.
+ *
+ * @return actual used lowLevelMqttReceiver
+ */
+ public IMqttReceiverCallback getReceiveCallback() {
+ return receiveCallback;
+ }
+
+ /**
+ * Set the IMqttReceiverCallback which contains the callback handler for
+ * received messages.
+ *
+ * @param receiveCallback IMqttReceiverCallback to be set
+ */
+ public void setReceiveCallback(IMqttReceiverCallback receiveCallback) {
+ if (receiveCallback == null) {
+ throw new LlCoreRuntimeException("Set ReceiveCallback failed: Parameter is a null.");
+ }
+
+ logger.debug("Set new ReceiveCallback: {}", receiveCallback);
+ synchronized (this.receiveCallbackSyncMonitor) {
+ this.receiveCallback = receiveCallback;
+ this.receivedMessageConsumerThread.setReceiveCallback(this.receiveCallback);
+ }
+ }
+
+ /**
+ * Add a IMqttConnectionListener to the client. This connection listener will be
+ * informed by the the client, if an event regarding the connection will occur.
+ *
+ * @param listener IMqttConnectionListener to be added
+ */
+ public void addMqttConnectionListener(IMqttConnectionListener listener) {
+
+ if (listener == null) {
+ throw new LlCoreRuntimeException("Add ConnectionListener failed: Parameter is a null.");
+ }
+
+ if (!this.connectionListeners.contains(listener)) {
+ logger.debug("Add connection listener: {}", listener.toString());
+
+ synchronized (this.connectionListenersSyncMonitor) {
+ this.connectionListeners.add(listener);
+ }
+ }
+ }
+
+ /**
+ * Remove a connectionListener from the client.
+ *
+ * @param listener IMqttConnectionListener to be removed
+ */
+ public void removeConnectionListener(IMqttConnectionListener listener) {
+ logger.trace("Remove connection listener: {}", listener.toString());
+ synchronized (this.connectionListenersSyncMonitor) {
+ this.connectionListeners.remove(listener);
+ }
+ }
+
+ /**
+ * Read the registered connection listeners (for testing purposes)
+ *
+ * @return the connection listeners.
+ */
+ List getConnectionListeners() {
+ return connectionListeners;
+ }
+
+ /**
+ * Read the clientId of the broker.
+ *
+ * @return the clientId of the broker.
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ /**
+ * Read the broker address that the client uses. The broker address uses the
+ * representation of the MQTT library ({@link MqttClient}), e.g.,
+ * "tcp://localhost:1883".
+ *
+ * @return broker address of the client.
+ */
+ public String getBrokerAddress() {
+ return brokerAddress;
+ }
+
+ /**
+ * Set the actual state of the client (thread safe) and trigger the reconnection
+ * timer.
+ *
+ * @param currentClientState set the current state
+ */
+ private synchronized void setCurrentClientStateAndTriggerReconnect(ELlClientState currentClientState) {
+
+ this.currentClientState = currentClientState;
+
+ synchronized (this.reconnectionThread) {
+ this.reconnectionThread.notify();
+ }
+ }
+
+ ELlClientState getCurrentClientState() {
+ return currentClientState;
+ }
+
+ /**
+ * Shutdown the MQTT lowLevel client.
+ *
+ *
+ * This method will be used for cleanup purposes. It should be called before the
+ * program's end. It will disconnect from the broker and clean up its states.
+ */
+ public void shutdown() {
+ if (isConnected()) {
+ try {
+ disconnect();
+ } catch (LowLevelCommRuntimeException ex) {
+ logger.warn("Error while disconnecting from broker during shutdown.");
+ }
+ }
+
+ this.reconnectionThread.shutdown();
+ this.receivedMessageConsumerThread.shutdown();
+ }
+
+ @Override
+ public void connect() {
+
+ // Create MQTT client if it doesn't exist.
+ if (this.mqttClient == null) {
+ try {
+ this.mqttClient = MqttClientSync.createMqttClient(this.brokerAddress, this.clientId);
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ }
+
+ // Ignore connect method if the client is already connected
+ if (isConnected()) {
+ return;
+ }
+
+ // Connect to broker
+ MqttConnectOptions mqttOpt = new MqttConnectOptions();
+ mqttOpt.setCleanSession(true);
+ mqttOpt.setConnectionTimeout(mqttConnectionTimeout);
+ this.mqttClient.setCallback(this);
+
+ try {
+ this.mqttClient.connect(mqttOpt);
+ this.setCurrentClientStateAndTriggerReconnect(ELlClientState.CONNECTED_TO_BROKER);
+
+ synchronized (this.connectionListenersSyncMonitor) {
+ for (IMqttConnectionListener listener : this.connectionListeners) {
+ listener.onEstablishedMqttConnection();
+ }
+ }
+ } catch (MqttException ex) {
+ this.setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ logger.info("MqttClient connected to broker {}", mqttClient.getServerURI());
+
+ }
+
+ @Override
+ public void disconnect() {
+ this.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
+ if (isConnected()) {
+
+ synchronized (this.connectionListenersSyncMonitor) {
+ for (IMqttConnectionListener listener : this.connectionListeners) {
+ listener.onDisconnectingMqttConnection();
+ }
+ }
+
+ try {
+ this.mqttClient.disconnect();
+ logger.info("MqttClient disconnected from broker {}", mqttClient.getServerURI());
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return this.mqttClient != null && this.mqttClient.isConnected();
+ }
+
+ @Override
+ public void publish(String mqttTopic, byte[] mqttPayload) {
+ if (!this.isConnected()) {
+ throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
+ }
+
+ MqttUtils.validateMqttTopic(mqttTopic);
+
+ MqttMessage mqttMsg = new MqttMessage(mqttPayload);
+ mqttMsg.setQos(this.qualityOfService);
+
+ synchronized (this.publishMonitor) {
+ try {
+ this.mqttClient.publish(mqttTopic, mqttMsg);
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public void subscribe(String mqttTopic) {
+ if (!this.isConnected()) {
+ throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
+ }
+
+ try {
+ this.mqttClient.subscribe(mqttTopic);
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+
+ }
+
+ @Override
+ public void subscribe(List mqttTopics) {
+ if (!this.isConnected()) {
+ throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
+ }
+
+ for (String mqttTopic : mqttTopics) {
+ MqttUtils.validateMqttSubscription(mqttTopic);
+ }
+
+ try {
+ this.mqttClient.subscribe(mqttTopics.toArray(new String[0]));
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void unsubscribe(String mqttTopic) {
+
+ if (!this.isConnected()) {
+ throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
+ }
+ try {
+ this.mqttClient.unsubscribe(mqttTopic);
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+
+ }
+
+ @Override
+ public void unsubscribe(List mqttTopics) {
+ if (!this.isConnected()) {
+ throw new LowLevelCommRuntimeException("MqttClientSync isn't connected to a broker");
+ }
+
+ for (String mqttTopic : mqttTopics) {
+ MqttUtils.validateMqttSubscription(mqttTopic);
+ }
+
+ try {
+ this.mqttClient.unsubscribe(mqttTopics.toArray(new String[0]));
+ } catch (MqttException ex) {
+ throw new LowLevelCommRuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void connectionLost(Throwable throwable) {
+
+ logger.warn("MQTT connection lost: {}", throwable.toString());
+
+ synchronized (this.connectionListenersSyncMonitor) {
+ for (IMqttConnectionListener listener : this.connectionListeners) {
+ listener.onLostMqttConnection();
+ }
+ }
+
+ this.setCurrentClientStateAndTriggerReconnect(ELlClientState.TRY_TO_RECONNECT);
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+ logger.trace("New MQTT message received: Topic({}) ReceivedMessage()", topic, mqttMessage.toString());
+
+ synchronized (this.receiveCallbackSyncMonitor) {
+ if (this.receiveCallback != null && !mqttMessage.isDuplicate()) {
+ this.receivedMessageConsumerThread.addNewMessage(topic, mqttMessage.getPayload());
+ }
+ }
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken mqttDeliveryToken) {
+ /* not used by the synchronous client */
+ }
+
+ /**
+ * Inner enumeration to identify the current state of the client.
+ *
+ */
+ enum ELlClientState {
+ DISCONNECTED_FROM_BROKER, CONNECTED_TO_BROKER, TRY_TO_RECONNECT
+ }
+
+ /**
+ * Thread for controlling the reconnection to the broker.
+ *
+ *
+ * The client provides the functionality to automatically reconnect to a broker,
+ * if the connection can't be created or the connection was lost.
+ *
+ *
+ * The reconnection thread will periodically call the connect method of the
+ * client and tries to reconnect from it. The state variable of the client
+ * controls the reconnection. If it is set by the client the reconnection thread
+ * will be notified and it will trigger the reconnection if it is necessary.
+ */
+ private class ReconnectionThread extends Thread {
+
+ private final MqttClientSync parent;
+ private boolean keepRunning = true;
+
+ /* Config values for reconnection */
+ private boolean enableReconnection;
+ private int reconnectionInterval; // in milliseconds
+ private int reconnectionTries; // -1 or positive number
+
+ /* actual reconnection try counter */
+ private int actualTry = 1;
+
+ /**
+ * Default constructor.
+ *
+ * @param parent MqttClientSync where the reconnection should be handled
+ */
+ ReconnectionThread(MqttClientSync parent) {
+ this.parent = parent;
+ this.setDaemon(true);
+ this.setName("ReconnectionThread: " + parent.getClientId());
+
+ enableReconnection = parent.defaultEnableReconnection;
+ reconnectionInterval = parent.defaultReconnectInterval;
+ reconnectionTries = parent.defaultReconnectNumberOfTries;
+ }
+
+ /**
+ * Shutdown and cleanup procedure.
+ *
+ */
+ synchronized void shutdown() {
+ this.keepRunning = false;
+ this.interrupt();
+ }
+
+ @Override
+ public void run() {
+
+ logger.trace("Reconnection timer thread started (activated: {}, Interval: {}, Tries: {})",
+ enableReconnection, reconnectionInterval, reconnectionTries);
+
+ while (this.keepRunning) {
+
+ ELlClientState beginClientState = parent.getCurrentClientState();
+ logger.trace("ReconnectionTimerThread activated: {}", currentClientState);
+
+ ELlClientState currentClientState = beginClientState;
+
+ if (!this.enableReconnection) {
+ if (currentClientState == ELlClientState.TRY_TO_RECONNECT) {
+ parent.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
+ currentClientState = parent.getCurrentClientState();
+ }
+ }
+
+ if (currentClientState == ELlClientState.CONNECTED_TO_BROKER
+ || currentClientState == ELlClientState.DISCONNECTED_FROM_BROKER) {
+ actualTry = 1;
+ }
+
+ if (this.enableReconnection && currentClientState == ELlClientState.TRY_TO_RECONNECT) {
+ handleReconnection();
+ }
+
+ try {
+ synchronized (this) {
+ if (beginClientState != parent.getCurrentClientState()) {
+ continue;
+ }
+ if (currentClientState == ELlClientState.TRY_TO_RECONNECT) {
+ this.wait(reconnectionInterval);
+ } else {
+ this.wait();
+ }
+ }
+ } catch (InterruptedException ign) {
+ // This is expected
+ }
+
+ }
+ }
+
+ private void handleReconnection() {
+
+ logger.trace("Reconnection try: {}", actualTry);
+ try {
+ parent.connect();
+ } catch (LowLevelCommRuntimeException ex) {
+ logger.debug("Reconnection try ({}) was not successful.", actualTry);
+ }
+
+ if (this.reconnectionTries != -1 && actualTry >= this.reconnectionTries) {
+ logger.warn("Maximum number of reconnection tries exceeds. Stop to reconnect");
+ parent.setCurrentClientStateAndTriggerReconnect(ELlClientState.DISCONNECTED_FROM_BROKER);
+ actualTry = 1;
+ } else {
+ actualTry++;
+ }
+ }
+
+ void setEnableReconnection(boolean enableReconnection) {
+ this.enableReconnection = enableReconnection;
+ }
+
+ void setReconnectionInterval(int reconnectionInterval) {
+
+ if (reconnectionInterval > 0) {
+ this.reconnectionInterval = reconnectionInterval;
+ } else {
+ throw new IllegalArgumentException(String.format("False reconnection interval in milliseconds (%d) was "
+ + "set. The parameter should be greater than 0.", reconnectionInterval));
+ }
+ }
+
+ void setReconnectionTries(int reconnectionTries) {
+
+ if (reconnectionTries == -1 || reconnectionTries > 0) {
+ this.reconnectionTries = reconnectionTries;
+ } else {
+ throw new IllegalArgumentException(
+ String.format("False reconnection tries (%d) want to be set. The parameter should "
+ + "be -1 for infinite tries or greater than 0", reconnectionTries));
+ }
+ }
+
+ int getReconnectNumberOfTries() {
+ return this.reconnectionTries;
+ }
+
+ int getReconnectionInterval() {
+ return reconnectionInterval;
+ }
+
+ boolean isEnableReconnection() {
+ return enableReconnection;
+ }
+ }
+
+ /**
+ * Worker thread to handle received messages.
+ *
+ *
+ * This inner class is used to implement a blocking queue for the received
+ * messages. This is necessary to decouple the Mqtt receiving callback from the
+ * further message handling. Especially if the message callback publishes new
+ * messages this decoupling is necessary.
+ */
+ private class ReceivedMessageConsumer extends Thread {
+
+ /**
+ * Data Bean for received messages.
+ *
+ */
+ class ReceivedMessage {
+ public String topic;
+ public byte[] payload;
+ }
+
+ private boolean isRunning = true;
+
+ private final BlockingQueue receivedMsgQueue;
+
+ private IMqttReceiverCallback receiverCallback;
+
+ private final Object syncMonitor = new Object();
+
+ /**
+ * Construct the receiving messages' handler.
+ *
+ * @param queueSize Size of the Queue for receiving messages
+ * @param clientId client ID
+ */
+ ReceivedMessageConsumer(int queueSize, String clientId) {
+ this.setDaemon(true);
+ this.setName("ReceivedMessageConsumer: " + clientId);
+ receivedMsgQueue = new ArrayBlockingQueue(queueSize);
+ }
+
+ /**
+ * Set the callback handler for handling receiving messages.
+ *
+ * @param receiveCallback handler to be set
+ */
+ void setReceiveCallback(IMqttReceiverCallback receiveCallback) {
+ synchronized (this.syncMonitor) {
+ this.receiverCallback = receiveCallback;
+ }
+ }
+
+ /**
+ * Add a new received message to the worker queue.
+ *
+ * @param topic of the received message
+ * @param payload of the received message
+ */
+ void addNewMessage(String topic, byte[] payload) {
+
+ ReceivedMessage msg = new ReceivedMessage();
+ msg.topic = topic;
+ msg.payload = payload;
+
+ try {
+ receivedMsgQueue.put(msg);
+ } catch (InterruptedException ex) {
+ // expected
+ }
+ }
+
+ /**
+ * Shutdown the ReceivedMessageConsumer Thread.
+ *
+ */
+ void shutdown() {
+ isRunning = false;
+ this.interrupt();
+ }
+
+ @Override
+ public void run() {
+
+ while (isRunning) {
+ try {
+ ReceivedMessage msg = receivedMsgQueue.take();
+ logger.trace("Process received message (Topic: {} IPayload: {}) No of waiting objects: {}",
+ msg.topic, new String(msg.payload), receivedMsgQueue.size());
+
+ synchronized (this.syncMonitor) {
+ if (this.receiverCallback == null) {
+ logger.warn("No ReceiverCallback is set in ReceivedMessageConsumerThread.");
+ continue;
+ }
+ this.receiverCallback.handleRawMqttMessage(msg.topic, msg.payload);
+ }
+ } catch (InterruptedException ignore) {
+ // ignore
+ }
+ }
+ }
+ }
}
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlService.java b/src/main/java/at/ac/ait/lablink/core/service/LlService.java
index 58deca7..18e47f4 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlService.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlService.java
@@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Map.Entry;
+// TODO: Auto-generated Javadoc
/**
* Abstract class extending the Lablink service base class.
*
@@ -19,6 +20,7 @@ public abstract class LlService extends LlServiceBase {
/** The current state value of the service. */
private T curState;
+ /** The notifiers. */
protected Map> notifiers =
new HashMap>();
@@ -42,6 +44,16 @@ public LlService(boolean readonly) {
super(readonly);
}
+ /**
+ * Instantiates a new ll service.
+ *
+ * @param readonly the readonly
+ * @param exposedToPrometheus the exposed to prometheus
+ */
+ public LlService(boolean readonly, boolean exposedToPrometheus) {
+ super(readonly, exposedToPrometheus);
+ }
+
/**
* Instantiates a new Lablink service with the name provided. The resulting instance will be
* read/write enabled.
@@ -63,6 +75,13 @@ public LlService(String name, boolean readonly) {
super(name, readonly);
}
+ /**
+ * Instantiates a new ll service.
+ *
+ * @param name the name
+ * @param readonly the readonly
+ * @param exposedToPrometheus the exposed to prometheus
+ */
public LlService(String name, boolean readonly, boolean exposedToPrometheus) {
super(name, readonly, exposedToPrometheus);
}
@@ -100,6 +119,12 @@ public T getCurState() {
return curState;
}
+ /**
+ * Notify state change.
+ *
+ * @param oldVal the old val
+ * @param newVal the new val
+ */
private void notifyStateChange(T oldVal, T newVal) {
this.setGage();
@@ -146,6 +171,11 @@ public void setCurState(T curVal) {
*/
public abstract boolean set(T newval);
+ /**
+ * Adds the state change notifier.
+ *
+ * @param notifier the notifier
+ */
public void addStateChangeNotifier(IServiceStateChangeNotifier notifier) {
this.notifiers.put(this.notifiers.size() + 1, notifier);
logger.debug("Another notifier added for service [{}].", this.getName());
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java b/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
index 4be0236..d8dbbba 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
@@ -5,168 +5,171 @@
package at.ac.ait.lablink.core.service;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.lang.RandomStringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import io.prometheus.client.Gauge;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* Base class for Lablink services.
*/
public abstract class LlServiceBase implements Cloneable {
- protected static final Logger logger = LogManager.getLogger("LlServiceBase");
-
- /** The name of the service. */
- protected String name;
-
- /** The read-only flag. */
- protected boolean readOnly = false;
- protected boolean exposeToPrometheus = false;
-
- protected Gauge serviceGage;
-
- /**
- * Instantiates a new instance with random alpha-numeric
- * name and read-only flag set to {@code false}.
- */
- public LlServiceBase() {
- this(RandomStringUtils.randomAlphabetic(10), false);
- }
-
- /**
- * Instantiates a new instance.
- *
- * @param name service name
- * @param readonly read-only flag
- */
- public LlServiceBase(String name, boolean readonly) {
- this(name, readonly, true);
- }
-
- public LlServiceBase(String name, boolean readonly, boolean exposedToPrometheus) {
- this.setName(name);
- this.setReadOnly(readonly);
- this.exposeToPrometheus = exposedToPrometheus;
- serviceGage = Gauge.build().name(name).register();
- }
-
- /**
- * Instantiates a new instance with read-only flag set to {@code false}.
- *
- * @param name service name
- */
- public LlServiceBase(String name) {
- this(name, false);
- }
-
- /**
- * Instantiates a new instance with random alpha-numeric name.
- *
- * @param readonly read-only flag
- */
- public LlServiceBase(boolean readonly) {
- this(RandomStringUtils.randomAlphabetic(10), readonly);
- }
-
- /**
- * Gets the name.
- *
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the name.
- *
- * @param name the name to set
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Checks if is read-only.
- *
- * @return read-only flag
- */
- public boolean isReadOnly() {
- return readOnly;
- }
-
- /**
- * Sets the read-only flag.
- *
- * @param readOnly read-only flag
- */
- public void setReadOnly(boolean readOnly) {
- this.readOnly = readOnly;
- }
-
- /** The properties. */
- protected Map properties =
- new HashMap();
-
- /**
- * Gets the collection of properties stored.
- *
- * @return the properties
- */
- public Map getProperties() {
- return properties;
- }
-
- /**
- * Sets the properties as a complete collection.
- *
- * @param properties the properties
- */
- public void setProperties(Map properties) {
- this.properties = properties;
- }
-
- /**
- * Adds the property.
- *
- * @param key the key
- * @param val the val
- */
- public void addProperty(ELlServiceProperties key, String val) {
- this.properties.put(key, val);
- logger.debug("Property [{}] updated with value [{}] for service [{}]", key, val, this.name);
- }
-
- /**
- * Gets the property.
- *
- * @param key the key
- * @return the property
- */
- public String getProperty(ELlServiceProperties key) {
- return this.properties.get(key);
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException {
- return super.clone();
- }
-
- public Object duplicate() throws CloneNotSupportedException {
- LlServiceBase base = (LlServiceBase) super.clone();
- return base;
- }
-
- public boolean isExposedToPrometheus() {
- return this.exposeToPrometheus;
- }
-
- protected void setGage() {
-
- }
+ protected static final Logger logger = LogManager.getLogger("LlServiceBase");
+
+ /** The name of the service. */
+ protected String name;
+
+ /** The read-only flag. */
+ protected boolean readOnly = false;
+ protected boolean exposeToPrometheus = false;
+
+ protected Gauge serviceGage;
+
+ /**
+ * Instantiates a new instance with random alpha-numeric name and read-only flag
+ * set to {@code false}.
+ */
+ public LlServiceBase() {
+ this(RandomStringUtils.randomAlphabetic(10), false);
+ }
+
+ /**
+ * Instantiates a new instance.
+ *
+ * @param name service name
+ * @param readonly read-only flag
+ */
+ public LlServiceBase(String name, boolean readonly) {
+ this(name, readonly, true);
+ }
+
+ public LlServiceBase(String name, boolean readonly, boolean exposedToPrometheus) {
+ this.setName(name);
+ this.setReadOnly(readonly);
+ this.exposeToPrometheus = exposedToPrometheus;
+ serviceGage = Gauge.build().namespace("lablinksim").name(name).help(name).register();
+ }
+
+ /**
+ * Instantiates a new instance with read-only flag set to {@code false}.
+ *
+ * @param name service name
+ */
+ public LlServiceBase(String name) {
+ this(name, false);
+ }
+
+ /**
+ * Instantiates a new instance with random alpha-numeric name.
+ *
+ * @param readonly read-only flag
+ */
+ public LlServiceBase(boolean readonly) {
+ this(RandomStringUtils.randomAlphabetic(10), readonly);
+ }
+
+ public LlServiceBase(boolean readonly, boolean exposedToPrometheus) {
+ this(RandomStringUtils.randomAlphabetic(10), readonly, exposedToPrometheus);
+ }
+
+ /**
+ * Gets the name.
+ *
+ * @return the name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the name.
+ *
+ * @param name the name to set
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Checks if is read-only.
+ *
+ * @return read-only flag
+ */
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+
+ /**
+ * Sets the read-only flag.
+ *
+ * @param readOnly read-only flag
+ */
+ public void setReadOnly(boolean readOnly) {
+ this.readOnly = readOnly;
+ }
+
+ /** The properties. */
+ protected Map properties = new HashMap();
+
+ /**
+ * Gets the collection of properties stored.
+ *
+ * @return the properties
+ */
+ public Map getProperties() {
+ return properties;
+ }
+
+ /**
+ * Sets the properties as a complete collection.
+ *
+ * @param properties the properties
+ */
+ public void setProperties(Map properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Adds the property.
+ *
+ * @param key the key
+ * @param val the val
+ */
+ public void addProperty(ELlServiceProperties key, String val) {
+ this.properties.put(key, val);
+ logger.debug("Property [{}] updated with value [{}] for service [{}]", key, val, this.name);
+ }
+
+ /**
+ * Gets the property.
+ *
+ * @param key the key
+ * @return the property
+ */
+ public String getProperty(ELlServiceProperties key) {
+ return this.properties.get(key);
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
+
+ public Object duplicate() throws CloneNotSupportedException {
+ LlServiceBase base = (LlServiceBase) super.clone();
+ return base;
+ }
+
+ public boolean isExposedToPrometheus() {
+ return this.exposeToPrometheus;
+ }
+
+ protected void setGage() {
+
+ }
}
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServiceDouble.java b/src/main/java/at/ac/ait/lablink/core/service/LlServiceDouble.java
index b83a71c..dc25656 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServiceDouble.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServiceDouble.java
@@ -5,33 +5,87 @@
package at.ac.ait.lablink.core.service;
+// TODO: Auto-generated Javadoc
+/**
+ * The Class LlServiceDouble.
+ */
public abstract class LlServiceDouble extends LlService {
+ /**
+ * Default constructor. (1)
+ */
+ public LlServiceDouble() {
+ super(false, true);
+ init();
+ }
+
+ /**
+ * Constructor with service name only. (2)
+ *
+ * @param name the name
+ */
public LlServiceDouble(String name) {
- super(name);
- setCurState(0.0);
- this.exposeToPrometheus = true;
+ super(name, false, true);
+ init();
}
- public LlServiceDouble() {
- super();
- setCurState(0.0);
+ /**
+ * Constructor with readonly flag only. (3)
+ *
+ * @param readonly the readonly
+ */
+ public LlServiceDouble(boolean readonly) {
+ super(readonly, true);
+ init();
}
+ /**
+ * Instantiates a new ll service double. (4)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ */
public LlServiceDouble(String name, boolean readonly) {
- super(name, readonly);
- setCurState(0.0);
- this.exposeToPrometheus = true;
+ super(name, readonly, true);
+ init();
}
-
- public LlServiceDouble(boolean readonly) {
- super(readonly);
- setCurState(0.0);
- this.exposeToPrometheus = true;
+
+ /**
+ * Instantiates a new ll service double. (5)
+ *
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServiceDouble(boolean readonly, boolean expose) {
+ super(readonly, expose);
+ init();
}
+ /**
+ * Instantiates a new ll service double. (6)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServiceDouble(String name, boolean readonly, boolean expose) {
+ super(name, readonly, expose);
+ init();
+ }
+
+ /**
+ * Sets the gage.
+ */
@Override
protected void setGage() {
this.serviceGage.set(this.getCurState());
}
+
+ /**
+ * Inits the.
+ */
+ private void init() {
+ setCurState(0.0);
+ this.serviceGage.set(this.getCurState());
+ }
}
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServiceLong.java b/src/main/java/at/ac/ait/lablink/core/service/LlServiceLong.java
index 1303ff8..a7b9f74 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServiceLong.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServiceLong.java
@@ -7,32 +7,75 @@
public abstract class LlServiceLong extends LlService {
+ /**
+ * Default constructor. (1)
+ */
public LlServiceLong() {
- super();
- setCurState(0L);
- this.exposeToPrometheus = true;
+ super(false, true);
+ init();
}
-
+
+ /**
+ * Constructor with service name only. (2)
+ *
+ * @param name the name
+ */
public LlServiceLong(String name) {
- super(name);
- setCurState(0L);
- this.exposeToPrometheus = true;
+ super(name, false, true);
+ init();
+ }
+
+ /**
+ * Constructor with readonly flag only. (3)
+ *
+ * @param readonly the readonly
+ */
+ public LlServiceLong(boolean readonly) {
+ super(readonly, true);
+ init();
}
+ /**
+ * Instantiates a new ll service long. (4)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ */
public LlServiceLong(String name, boolean readonly) {
- super(name, readonly);
- setCurState(0L);
- this.exposeToPrometheus = true;
+ super(name, readonly, true);
+ init();
+ }
+
+ /**
+ * Instantiates a new ll service long. (5)
+ *
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServiceLong(boolean readonly, boolean expose) {
+ super(readonly, expose);
+ init();
}
- public LlServiceLong(boolean readonly) {
- super(readonly);
- setCurState(0L);
- this.exposeToPrometheus = true;
+ /**
+ * Instantiates a new ll service long. (6)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServiceLong(String name, boolean readonly, boolean expose) {
+ super(name, readonly, expose);
+ init();
}
@Override
protected void setGage() {
this.serviceGage.set(this.getCurState());
}
+
+ private void init() {
+ setCurState(0L);
+ this.serviceGage.set(this.getCurState());
+ }
}
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudo.java b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudo.java
index da035a6..29ebfed 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudo.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudo.java
@@ -88,7 +88,7 @@ public LlServicePseudo(String name, boolean readonly, boolean exposedToPrometheu
}
/**
- * Instantiates a new ll service pscudo.
+ * Instantiates a new ll service pseudo.
*
* @param name the name
*/
@@ -97,7 +97,7 @@ public LlServicePseudo(String name) {
}
/**
- * Instantiates a new ll service pscudo.
+ * Instantiates a new ll service pseudo.
*
* @param readonly the readonly
*/
@@ -105,6 +105,10 @@ public LlServicePseudo(boolean readonly) {
super(readonly);
}
+ public LlServicePseudo(boolean readonly, boolean exposedToPrometheus) {
+ super(readonly, exposedToPrometheus);
+ }
+
/**
* Gets the service data type class.
*
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoDouble.java b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoDouble.java
index 46de784..304afb7 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoDouble.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoDouble.java
@@ -7,28 +7,75 @@
public class LlServicePseudoDouble extends LlServicePseudo {
+ /**
+ * Default constructor. (1)
+ */
public LlServicePseudoDouble() {
- super();
- this.set(0.0);
+ super(false, true);
+ init();
+ }
+
+ /**
+ * Constructor with service name only. (2)
+ *
+ * @param name the name
+ */
+ public LlServicePseudoDouble(String name) {
+ super(name, false, true);
+ init();
}
- public LlServicePseudoDouble(String name, boolean readonly) {
- super(name, readonly);
- this.set(0.0);
+ /**
+ * Constructor with readonly flag only. (3)
+ *
+ * @param readonly the readonly
+ */
+ public LlServicePseudoDouble(boolean readonly) {
+ super(readonly, true);
+ init();
}
- public LlServicePseudoDouble(String name) {
- super(name);
- this.set(0.0);
+ /**
+ * Instantiates a new ll service PseudoDouble. (4)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ */
+ public LlServicePseudoDouble(String name, boolean readonly) {
+ super(name, readonly, true);
+ init();
+ }
+
+ /**
+ * Instantiates a new ll service PseudoDouble. (5)
+ *
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServicePseudoDouble(boolean readonly, boolean expose) {
+ super(readonly, expose);
+ init();
}
- public LlServicePseudoDouble(boolean readonly) {
- super(readonly);
- this.set(0.0);
+ /**
+ * Instantiates a new ll service PseudoDouble. (6)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServicePseudoDouble(String name, boolean readonly, boolean expose) {
+ super(name, readonly, expose);
+ init();
}
@Override
protected void setGage() {
this.serviceGage.set(this.get());
}
+
+ private void init() {
+ set(0.0);
+ this.serviceGage.set(this.get());
+ }
}
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoLong.java b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoLong.java
index e8a8f50..0c405e3 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoLong.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServicePseudoLong.java
@@ -7,21 +7,66 @@
public class LlServicePseudoLong extends LlServicePseudo {
+ /**
+ * Default constructor. (1)
+ */
public LlServicePseudoLong() {
- super();
- this.set(0L);
+ super(false, true);
+ init();
+ }
+
+ /**
+ * Constructor with service name only. (2)
+ *
+ * @param name the name
+ */
+ public LlServicePseudoLong(String name) {
+ super(name, false, true);
+ init();
}
- public LlServicePseudoLong(String name, boolean readonly) {
- super(name, readonly);
+ /**
+ * Constructor with readonly flag only. (3)
+ *
+ * @param readonly the readonly
+ */
+ public LlServicePseudoLong(boolean readonly) {
+ super(readonly, true);
+ init();
}
- public LlServicePseudoLong(String name) {
- super(name);
+ /**
+ * Instantiates a new ll service PseudoLong. (4)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ */
+ public LlServicePseudoLong(String name, boolean readonly) {
+ super(name, readonly, true);
+ init();
+ }
+
+ /**
+ * Instantiates a new ll service PseudoLong. (5)
+ *
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServicePseudoLong(boolean readonly, boolean expose) {
+ super(readonly, expose);
+ init();
}
- public LlServicePseudoLong(boolean readonly) {
- super(readonly);
+ /**
+ * Instantiates a new ll service PseudoLong. (6)
+ *
+ * @param name the name
+ * @param readonly the readonly
+ * @param expose the expose
+ */
+ public LlServicePseudoLong(String name, boolean readonly, boolean expose) {
+ super(name, readonly, expose);
+ init();
}
@Override
@@ -29,4 +74,8 @@ protected void setGage() {
this.serviceGage.set(this.get());
}
+ private void init() {
+ set(0L);
+ this.serviceGage.set(this.get());
+ }
}
diff --git a/src/test/java/at/ac/ait/lablink/core/services/ServiceIT.java b/src/test/java/at/ac/ait/lablink/core/services/ServiceIT.java
index dea2584..899cbb5 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/ServiceIT.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/ServiceIT.java
@@ -3,7 +3,7 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service;
+package at.ac.ait.lablink.core.services;
import static org.junit.Assert.fail;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/BooleanValueTest.java b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/BooleanValueTest.java
index e382129..0594e9a 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/BooleanValueTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/BooleanValueTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.datapoint.payloads;
+package at.ac.ait.lablink.core.services.datapoint.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.datapoint.payloads.BooleanValue;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DataPointPropertiesTest.java b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DataPointPropertiesTest.java
index ae141ef..e0dc08f 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DataPointPropertiesTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DataPointPropertiesTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.datapoint.payloads;
+package at.ac.ait.lablink.core.services.datapoint.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.datapoint.payloads.DataPointProperties;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DoubleValueTest.java b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DoubleValueTest.java
index e7a8af4..c09874c 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DoubleValueTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/DoubleValueTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.datapoint.payloads;
+package at.ac.ait.lablink.core.services.datapoint.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.datapoint.payloads.DoubleValue;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/LongValueTest.java b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/LongValueTest.java
index fdfd946..c71c4f1 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/LongValueTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/LongValueTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.datapoint.payloads;
+package at.ac.ait.lablink.core.services.datapoint.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.datapoint.payloads.LongValue;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/StringValueTest.java b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/StringValueTest.java
index 8dec4cf..eb6e853 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/StringValueTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/datapoint/payloads/StringValueTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.datapoint.payloads;
+package at.ac.ait.lablink.core.services.datapoint.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.datapoint.payloads.StringValue;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncClientConfigMessageTest.java b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncClientConfigMessageTest.java
index 30de710..decd3fc 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncClientConfigMessageTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncClientConfigMessageTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.sync.payloads;
+package at.ac.ait.lablink.core.services.sync.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.sync.payloads.SyncClientConfigMessage;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoReplyTest.java b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoReplyTest.java
index 5a9e7bd..e87279b 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoReplyTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoReplyTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.sync.payloads;
+package at.ac.ait.lablink.core.services.sync.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.sync.payloads.SyncGoReply;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoRequestTest.java b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoRequestTest.java
index 34192e8..e5757a5 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoRequestTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncGoRequestTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.sync.payloads;
+package at.ac.ait.lablink.core.services.sync.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.sync.payloads.SyncGoRequest;
import org.junit.Before;
diff --git a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncParamMessageTest.java b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncParamMessageTest.java
index da44994..148d82f 100644
--- a/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncParamMessageTest.java
+++ b/src/test/java/at/ac/ait/lablink/core/services/sync/payloads/SyncParamMessageTest.java
@@ -3,9 +3,10 @@
// Distributed under the terms of the Modified BSD License.
//
-package at.ac.ait.lablink.core.service.sync.payloads;
+package at.ac.ait.lablink.core.services.sync.payloads;
import at.ac.ait.lablink.core.connection.encoding.encodables.PayloadBaseTest;
+import at.ac.ait.lablink.core.service.sync.payloads.SyncParamMessage;
import org.junit.Before;
From 53106ff81d208bcdcb6d4c04db836448ab1def8c Mon Sep 17 00:00:00 2001
From: Jawad Kazmi <78086114+kazmijawad@users.noreply.github.com>
Date: Sun, 19 Dec 2021 15:35:25 +0100
Subject: [PATCH 5/5] namespace updated
---
src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java b/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
index d8dbbba..44986ee 100644
--- a/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
+++ b/src/main/java/at/ac/ait/lablink/core/service/LlServiceBase.java
@@ -52,7 +52,7 @@ public LlServiceBase(String name, boolean readonly, boolean exposedToPrometheus)
this.setName(name);
this.setReadOnly(readonly);
this.exposeToPrometheus = exposedToPrometheus;
- serviceGage = Gauge.build().namespace("lablinksim").name(name).help(name).register();
+ serviceGage = Gauge.build().namespace("ait-lablink").name(name).help(name).register();
}
/**