Introduction
MQTT (MQ Telemetry Transport) is one of the most widely used protocols in IoT.
macchina.io EDGE comes with an MQTT client implementation based on the widely-used Eclipse Paho C library. The MQTT client supports both MQTT 3.1.1 and 5. The client can be used from both C++ code and JavaScript. The following examples will show JavaScript code. For C++ usage, please refer to the reference documentation, specifically, the IoT::MQTT::MQTTClient class. The methods shown in the samples are the same for JavaScript and C++.
Setting Up MQTT Connections
In macchina.io EDGE, MQTT connections to brokers are configured in the global configuration. Multiple connections are supported. For every connection, a configuration needs to be provided.
All client configuration settings must be under the mqtt.clients.<id> configuration hierarchy, where <id> identifies the connection, so that it can be found via the service registry.
A minimal configuration could look like this:
mqtt.clients.eclipse.serverURI = tcp://mqtt.eclipseprojects.io:1883 mqtt.clients.eclipse.clientId = ${system.nodeId}
Note: ${system.nodeId} expands to the MAC address of the first active Ethernet/Wi-Fi interface on the system.
After changing the MQTT clients configuration in the global configuration file, macchina.io EDGE must be restarted. It's also possible to add or modify a MQTT client configuration using the Settings application. After changing the configuration, the MQTT bundle (io.macchina.mqtt.client) must be restarted.
Required Connection Settings
To configure a connection, at least the following configuration settings have to be provided:
- serverURI: The address (protocol, host name and port number) of the server, using an URI scheme. Use the tcp scheme for an unencrypted TCP connection (e.g.: tcp://iot.eclipse.org:1883) or the ssl scheme for an encrypted TLS connection (e.g.: ssl://ssl://data.iot.us-west-2.amazonaws.com:8883). For a WebSocket-based MQTT connection, the ws and wss (TLS) schemes can be used. This setting is also used to identify the connection if an additional list of server URIs is specified (see serverURIs).
- clientId: The client ID to be used when connecting to the broker, to identify the persistent MQTT session to (re-)establish. Can be left empty (default if not specified), but in this case the cleanStart (for MQTT 5) or cleanSession (for MQTT 3) must be set to true.
Optional Connection Settings
The following additional and optional configuration settings to control the connection behavior can be specified:
- mqttVersion: Specifies the MQTT version to use. Valid values are: 0 (default) - start with MQTT version 3.1.1, and if that fails, fall back to MQTT 3.1; 3 - connect with MQTT version 3.1 only; 4 - connect with MQTT version 3.1.1 only; 5 - connect with MQTT version 5 only.
- username: Specifies the username used for authenticating against the MQTT broker.
- password: Specifies the password used for authenticating against the MQTT broker.
- persistence.path: If set to a filesystem path, enables file-based persistence for MQTT messages. The messages will be persistet in the given directory. If not specified, or set to an empty string, disables message persistence (messages will only be kept in RAM).
- keepAliveInterval: Specifies the interval in seconds in which heartbeat messages are sent if no other activity happens on the MQTT connection. Defaults to 60.
- connectTimeout: Specifies the timeout in seconds when reconnecting to the MQTT broker. Defaults to 30.
- initialConnectTimeout: If non-zero, and if connectRetries is also non-zero, attempt to connect first with this timeout, given in seconds. Defaults to zero.
- connectRetries: Specifies the number of retries if initial connect attempt does not succeed. Defaults to zero. If this is greater than zero, the first connect attempt to the broker will be made using initialConnectTimeout if the latter is greater than zero, otherwise connectTimeout. If not successful, additional attempts will be made connecting to the broker, again using initialConnectTimeout, if greater than zero, or connectTimeout, otherwise. If retryConnectWithExponentialBackoff is true, the timeout will be doubled with each successive attempt. The final attempt will be made with such a timeout so that the total timeout specified in connectTimeout will not be exceeded.
- retryConnectWithExponentialBackoff: If set to true, and if both connectRetries and initialConnectTimeout are greater than zero, then the connect timeout, starting with initialConnectTimeout, will be doubled after every unsuccessful connection attempt. Defaults to false.
- cleanSession: If set to true, start with a clean session (MQTT 3 only). Defaults to false.
- cleanStart: If set to true, start with a clean session (MQTT 5 only). Defaults to false.
- reliable: If set to true, only one message at a time can be "in flight". Defaults to false.
- maxInflightMessages: Specifies the maximum number of messages in flight. Defaults to 0.
- serverURIs: A comma-separated (or semicolon-separated) list of URIs to try to connect to. The client will try each of the URIs specified, until it success connecting. If not specified (or empty), the URI specified in serverURI will be used. For the URI format, please see the serverURI setting.
Last Will and Testament Settings
For setting the last will and testament, the following options can be used:
- will.topic: Specifies the topic name for the last will message.
- will.message: Specifies the message content for the last will message. Currently, only UTF-8 text can be specified.
- will.qos: Specifies the QoS level of the last will message. Valid values are 0, 1 and 2, corresponding to the respective Quality of Service levels 0 - At most once delivery, 1 - At least once delivery, and 2 - Exactly once delivery. Defaults to zero.
- will.retained: Specifies the retained flag for the last will message (true of false). Defaults to false.
SSL/TLS Settings
The following settings are used for TLS connections (ssl:// or wss:// URIs).
- ssl.trustStore: Specifies the path to a file containing a list of server certificates trusted by the client, in PEM format. Typically contains a list of root certificates.
- ssl.disableDefaultTrustStore: If set to true, disables the built-in OpenSSL root certificates. Defaults to false.
- ssl.caPath: Specifies the path to a directory containing server certificates trusted by the client, in PEM format.
- ssl.keyStore: Specifies the path to a file containing the certificate chain of the client, optionally also including the client's private key, in PEM format.
- ssl.privateKey: Specifies the path to a file containing the private key of the client's certificate. Only used if ssl.keyStore does not contain the private key.
- ssl.privateKeyPassword: Specifies the password used to decrypt the private key, if it is encrypted.
- ssl.enabledCipherSuites: Specifies a list of enabled SSL/TLS cipher suites the client will present to the server during the SSL/TLS handshake. If not specified, enables all cipher suites supported by the client and TLS version. See the OpenSSL documentation for the cipher list format.
- ssl.version: Specifies the TLS version used to connect to the MQTT broker. Valid values are default (which is the default and supports all available protocols), tls1.0, tls1.1 and tls1.2. Only used for OpenSSL versions less than 1.1.
The MQTTClient Service
For every MQTT connection configured, a IoT::MQTT::MQTTClient service instance will be created and registered with the service registry. There are different ways of finding a specific MQTTClient instance. Typically, an instance will be located by its ID (which is the <id> part of the full setting path). Given the above example of a minimum configuration, the ID would be eclipse. Additionally, the client service could also be found via the serverURI.
The following JavaScript code snipped looks up a specific MQTT client.
const mqttClientRefs = serviceRegistry.find('io.macchina.mqtt.id == "eclipse"'); if (mqttClientRefs.length > 0) { var mqttClient = mqttClientRefs[0].instance(); // ... }
For a C++ example, please see the MQTTSubscribe example bundle in the samples directory.
Note that depending on the MQTT version used on the connection (3.1/3.1.1 or 5), different methods need to be used for some operations.
The following service properties will be set for the MQTTClient service and can be used in queries:
- io.macchina.protocol: always set to io.macchina.mqtt.
- io.macchina.mqtt.clientId: set to the client ID specified in the configuration.
- io.macchina.mqtt.serverURI: set to the server URI (primary) specified in the configuration.
- io.macchina.mqtt.id: set to the <id> path component of the client's configuration settings tree.
- io.macchina.mqtt.version: set to the MQTT version configured in the mqttVersion setting.
Publishing a Message
To publish a message on an MQTT 3.1 or 3.1.1 connection, use the publish() or publishMessage() method. publish() provides a simplified interface for publishing a text-only payload message on a given subject with a certain QoS level.
For MQTT version 5, use publish5() and publishMessage5() instead.
publish(topic, payload [, qos])
Publishes a message with the given payload (string) on the given topic (string), with the given QoS level (defaults to 0). Returns a delivery token (integer) that can be used to verify that the message has been delivered via the messageDelivered event. For use with MQTT 3.1 and 3.1.1 only.
Example:
mqttClient.publish('myTopic', 'Hello, world!');
publishMessage(topic, message)
Publishes a message (given as message object containing all the details) on the given topic. For use with MQTT 3.1 and 3.1.1 only. Returns a delivery token (integer) that can be used to verify that the message has been delivered via the messageDelivered event. For use with MQTT 3.1 and 3.1.1 only.
The message object supports the following properties:
- payload (string): The payload of the message. Can be Unicode text only (UTF-8). Cannot be specified together with binaryPayload.
- binaryPayload (buffer): A binary payload, given via a Buffer object. Cannot be specified together with payload.
- qos (integer): The QoS level of the message. 0 = at most once deliver, 1 = at least once delivery, 2 = exactly once delivery. Defaults to 0.
- retained (boolean): The retained flag of the message. If true, the broker will retain a copy of the message and deliver it to new subscribers to that topic.
Example:
mqttClient.publishMessage('myTopic', { payload: 'Hello, world!', qos: 1, retained: true });
publish5(topic, payload [, qos [, retained [, properties]]])
Publishes a message with the given payload (string) on the given topic (string), with the given QoS level (integer, defaults to 0), retained flag (boolean, defaults to false) and optional message properties. For use with MQTT 5 only. Returns a PublishResult (object) containing a deliveryToken and response object with additional status or error information.
Message properties are given as array of Property objects containing an identifier (defaults to 38 for user property) and exactly one of the following value properties:
- byteValue: a single byte value
- uint16Value: a single 16-bit unsigned integer value
- uint32Value: a single 32-bit unsigned integer value
- binaryValue: a buffer containing binary data
- stringValue: a UTF-8 encoded string
For a user property (identifier is 38), the name of the user property also needs to be specified as a string.
For a list of valid property identifiers, please refer to the MQTT specification of the C++ documentation.
Example:
mqttClient.publish5('myTopic', 'Hello, world!', 1, true, [ { name: 'myProperty', stringValue: 'someString' } ] );
publishMessage5(topic, message)
Publishes a message (given as message object containing all the details) on the given topic. For use with MQTT 5 only. Returns a PublishResult (object) containing a deliveryToken and response object with additional status or error information.
The message object supports the following properties:
- payload (string): The payload of the message. Can be Unicode text only (UTF-8). Cannot be specified together with binaryPayload.
- binaryPayload (buffer): A binary payload, given via a Buffer object. Cannot be specified together with payload.
- qos (integer): The QoS level of the message. 0 = at most once deliver, 1 = at least once delivery, 2 = exactly once delivery. Defaults to 0.
- retained (boolean): The retained flag of the message. If true, the broker will retain a copy of the message and deliver it to new subscribers to that topic.
- properties (array): An array of message property objects. See publish5() for details.
Example:
mqttClient.publishMessage5('myTopic', { payload: 'Hello, world!', qos: 1, retained: true, properties: [ { name: 'myProperty', stringValue: 'someString' } ] });
Subscribing to Topics and Receiving Messages
In order to receive messages, a MQTT client first needs to subscribe to one or more topics. Incoming messages are then reported via the messageArrived event.
To subscribe to a single topic, the subscribe() (MQTT 3) and subscribe5() (MQTT 5) methods can be used. To subscribe to multiple topics at once, the subscribeMany() and subscribeMany5() methods are available. Corresponding methods to unsubscribe one or many topics are available as well.
subscribe(topic [, qos])
This method subscribes to the given topic (string) with the given optional QoS level (integer 0-2, default 0). For use with MQTT 3.1 and 3.1.1 only.
Example:
mqttClient.subscribe('MyTopic', 0);
subscribe5(topic [, qos [, options [, properties]]])
This method subscribes to the given topic (string) with the given optional QoS level (integer 0-2, default 0), options and properties. For use with MQTT 5 only.
Returns a Response object with result status information.
The options object supports the following properties:
- noLocal (boolean): Set to true to not receive own publications. Set to false for original MQTT behaviour - all messages matching the subscription are received.
- retainAsPublished (boolean): To keep the retain flag as on the original publish message, set to true. If false, defaults to the original MQTT behaviour where the retain flag is only set on publications sent by a broker if in response to a subscribe request.
- retainHandling (integer): Specifies how retained messages are handled. 0 - send retained messages at the time of the subscribe; 1 - send retained messages on subscribe only if the subscription is new; 2 - do not send retained messages at all
Subscription properties, if provided, are given as array of Property objects
Example:
mqttClient.subscribe5('MyTopic', 0, { noLocal: true });
subscribeMany(topicsAndQos)
This methods subscribes to multiple topics, given in an array together with respective QoS levels, in one go. For use with MQTT 3.1 and 3.1.1 only.
Example:
mqttClient.subscribeMany([ { topic: 'MyTopic', qos: 1 }, { topic: 'AnotherTopic', qos: 0 } ]);
subscribeMany5(topicsAndQoS [, options [, properties]])
This methods subscribes to multiple topics, given in an array together with respective QoS levels, in one go. For use with MQTT 5 only.
Returns a Response object with result status information.
For options and properties arguments, please see subscribe5().
Example:
mqttClient.subscribeMany5( [ { topic: 'MyTopic', qos: 1 }, { topic: 'AnotherTopic', qos: 0 } ], { noLocal: true } );
unsubscribe(topic)
Unsubscribes from a single topic. For use with MQTT 3.1 and 3.1.1 only.
unsubscribe5(topic [, properties])
Unsubscribes from a single topic. For use with MQTT 5 only.
unsubscribeMany(topics)
Unsubscribes from multiple topics given in an array in one go. For use with MQTT 3.1 and 3.1.1 only.
Example:
mqttClient.unsubscribeMany(['MyTopic', 'AnotherTopic']);
unsubscribeMany5(topics [, properties])
Unsubscribes from multiple topics given in an array in one go. For use with MQTT 5 only.
messageArrived
Received messages are delivered via the messageArrived event. A MessageArrivedEvent object will be provided to the event handler, containing the following properties:
- topic (string): The topic the messages was published to.
- message: An message object containing the message payload and other information. See publishMessage() for more information.
- dup (boolean): The dup flag indicates whether or not this message is a duplicate. It is only meaningful when receiving QoS1 messages. When true, the client application should take appropriate action to deal with the duplicate message.
Example:
mqttClient.on('messageArrived', ev => { logger.information('Message arrived on topic "%s" with payload "%s".', ev.data.topic, ev.data.message.payload); });
Important: When subscribing to an event, make sure that the object that provides the event is protected from the JavaScript garbage collector. This is typically done by storing the object in a global variable. Subscribing to an event on an object stored in a local variable (in a function or block) declared with let or const will typically work for some time, until the garbage collector decides to remove the object. After this, no further events will be delivered.
Connection management
The MQTTClient service automatically manages the connection to the broker. The connection will be established the first time the service is used. If the connection is lost, the service will automatically attempt to reconnect. The status of the connection is reported via events.
It's also possible to explicitly initiate the connection to the broker. This can only be done if the connection has not already been established. This makes sense for MQTT version 5, where properties can be specified for the connection and also for the last will and testament. In order to specify these properties, the connect5() and connectAsync5() methods can be called before any other methods are called.
connect()
If not already connected, will connect to the MQTT broker to establish a MQTT 3.1 or 3.1.1 connection. This method blocks until the connection has been established, or an error occurs.
Returns a ConnectionInfo object providing details about the connection, such as the used MQTT version (mqttVersion property) or whether a previously established session was present (sessionPresent).
connectAsync()
If not already connected, will connect to the MQTT broker to establish a MQTT 3.1 or 3.1.1 connection. The connection will be made in a separate thread, and the successful connection will be reported via the connectionEstablished event.
connect5([connectProperties [, willProperties]])
If not already connected, will connect to the MQTT broker to establish a MQTT 5 connection. This method blocks until the connection has been established, or an error occurs.
Returns a ConnectionInfo object providing details about the connection, such as the used MQTT version (mqttVersion property) or whether a previously established session was present (sessionPresent).
MQTT properties (array) can be specified for the connection and the last will/testament. These will also be used in subsequent automatic reconnects.
connectAsync5([connectProperties [, willProperties]])
If not already connected, will connect to the MQTT broker to establish a MQTT 5 connection. The connection will be made in a separate thread, and the successful connection will be reported via the connectionEstablished event.
MQTT properties (array) can be specified for the connection and the last will/testament. These will also be used in subsequent automatic reconnects.
disconnect(timeout)
Disconnects from the server. For use with MQTT 3.1 and 3.1.1 only. Explicitly disconnecting from the server using this method will also disable automatic reconnect.
In order to allow the client time to complete handling of messages that are in-flight when this function is called, a timeout period is specified (in milliseconds). When the timeout period has expired, the client disconnects even if there are still outstanding message acknowledgements. The next time the client connects to the same server, any QoS 1 or 2 messages which have not completed will be retried depending on the clean session settings for both the previous and the new connection.
disconnect5(timeout [, reason [, properties]])
Disconnects from the server. For use with MQTT 5 only. Explicitly disconnecting from the server using this method will also disable automatic reconnect. MQTT V5 reason code and properties can be given.
In order to allow the client time to complete handling of messages that are in-flight when this function is called, a timeout period is specified (in milliseconds). When the timeout period has expired, the client disconnects even if there are still outstanding message acknowledgements. The next time the client connects to the same server, any QoS 1 or 2 messages which have not completed will be retried depending on the clean session settings for both the previous and the new connection.
connectionEstablished
This event is fired when a connection to the broker has been established. A ConnectionEstablishedEvent object containing a connectionInfo object property is provided to the event handler.
This event is supported with all MQTT versions.
connectionLost
This event is fired when the connection to the server has been lost due to a network issue, or due to the server unexpectedly dropping the connection.
A ConnectionLostEvent object containing a cause property (string) describing the cause for the connection loss (if known) is passed to the event handler.
This event is supported with all MQTT versions.
disconnected
This event is fired when the server orderly closes the connection from the client by sending a disconnect MQTT packet. A DisconnectedEvent object containing a reasonCode and properties is passed to the event handler.
This event is only supported with MQTT 5.