Clustering MQTT - padogrid/padogrid GitHub Wiki



Starting Mosquitto Cluster

PadoGrid clusters Mosquitto brokers like any other clustering product, and the same cluster lifecycle management commands apply to Mosquitto brokers. For example, the following commands create and run a new default cluster named mymosquitto consisting of three (3) Mosquitto brokers.

create_cluster -product mosquitto
switch_cluster mymosquitto
start_cluster

Running Client Commands

PadoGrid includes two (2) MQTT client commands for publishing and subscribing to virtual clusters: vc_publish and vc_subscribe. They work similarly to Mosquitto's mosquitto_pub and mosquitto_sub, except that they can publish and subscribe to one or more brokers via a virtual cluster. They are also fully integrated with PadoGrid's MQTT clusters, so they automatically create a virtual cluster over a physical cluster. This feature is shown in the following examples.

Terminal 1

# Virtual cluster subscription
vc_subscribe -t test/#

# Mosquitto equivalent of vc_subscribe. mosquitto_sub can subscribe to only a single broker,
# so run each of the following commands in its own terminal.
mosquitto_sub -p 1883 -t test/#
mosquitto_sub -p 1884 -t test/#
mosquitto_sub -p 1885 -t test/#

Terminal 2

# Virtual cluster publication
vc_publish -t test/topic1 -m "hello, world"

# Mosquitto equivalent of vc_publish. mosquitto_pub cannot achieve this except by trying
# each endpoint in turn until one of them accepts the message.
for port in 1883 1884 1885; do
   # mosquitto_pub exits with a non-zero status if it cannot connect and publish
   if mosquitto_pub -p "$port" -t test/topic1 -m "hello, world" 2> /dev/null; then
       break
   fi
done

The above commands publish and subscribe to the mymosquitto cluster started in the previous section. Both commands have options for explicitly specifying the cluster and endpoints, as shown in the examples below.

# Messaging by cluster name
vc_subscribe -cluster mymosquitto -qos 2 -t test/#
vc_publish -cluster mymosquitto -qos 2 -t test/topic1 -m "hello, world"

# Messaging by endpoints
vc_subscribe -endpoints tcp://localhost:1883-1885 -qos 2 -t test/#
vc_publish -endpoints tcp://localhost:1883-1885 -qos 2 -t test/topic1 -m "hello, world"

# Messaging by configuration file
vc_subscribe -config mqtt5-client.yaml -qos 2 -t test/#
vc_publish -config mqtt5-client.yaml -qos 2 -t test/topic1 -m "hello, world"

Running perf_test

PadoGrid also provides the perf_test app for measuring MQTT broker performance metrics. As with other products, you create and run an instance of the perf_test app as follows.

Open two terminals and execute the following.

Terminal 1

create_app -product mosquitto -app perf_test -name perf_test_mosquitto
cd_app perf_test_mosquitto/bin_sh
./subscribe_topic test/#

Terminal 2

cd_app perf_test_mosquitto/bin_sh
./test_group -run

By default, the test_group script runs the etc/group.properties file, which is configured to concurrently publish data to two topics, test/topic1 and test/topic2. The first topic publishes 1 KiB payloads, and the second topic publishes 10 KiB payloads. You can view the throughput and latency metrics recorded in the results directory. You should see the subscriber (Terminal 1) receiving messages from the two topics.

Virtual Clusters

To provide a clustering service for Mosquitto, PadoGrid provides the HaMqttClient API, which client applications use to create virtual clusters. HaMqttClient is built on Paho, allowing a client application to cluster any Paho-supported MQTT brokers such as Mosquitto.

Creating a virtual cluster is typically done in a configuration file. For example, the perf_test app is configured as follows.

persistence:
  className: MqttDefaultFilePersistence
  properties:
    - key: path
      value: ${persistence.dir}
clusters:
  - name: ${cluster.name}
    connections:
      - connection:
          serverURIs: [tcp://localhost:1883-1885]

[Diagram: Virtual Cluster]

See Default HaMqttClient Configuration File for details.

The persistence element defines Paho's persistence settings. HaMqttClient supports Paho's MemoryPersistence and MqttDefaultFilePersistence as well as custom application classes. The properties element defines the arguments to the persistence class. In the example, it sets the path property to the value of the system property persistence.dir, which perf_test supplies with the JVM argument -Dpersistence.dir.

✏️ You can pass in system properties using the ${} notation and environment variables using the ${env:} notation in the configuration file.
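
To illustrate the two notations, here is a minimal Java sketch of placeholder substitution. The PlaceholderResolver class is hypothetical and is not PadoGrid's implementation. It assumes `${name}` is looked up in system properties and `${env:NAME}` in environment variables. Leaving unresolved placeholders unchanged is also an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver illustrating ${name} and ${env:NAME} substitution.
// NOT PadoGrid's implementation; keeping unknown placeholders as-is is an assumption.
final class PlaceholderResolver {
    // Group 1 is "env:" when present; group 2 is the property or variable name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(env:)?([^}]+)\\}");

    private final Map<String, String> systemProperties;
    private final Map<String, String> environment;

    PlaceholderResolver(Map<String, String> systemProperties, Map<String, String> environment) {
        this.systemProperties = systemProperties;
        this.environment = environment;
    }

    // Builds a resolver from the running JVM, e.g. values passed with -Dpersistence.dir=...
    static PlaceholderResolver fromJvm() {
        Map<String, String> props = new HashMap<>();
        for (String name : System.getProperties().stringPropertyNames()) {
            props.put(name, System.getProperty(name));
        }
        return new PlaceholderResolver(props, System.getenv());
    }

    String resolve(String text) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String name = m.group(2);
            String value = (m.group(1) != null) ? environment.get(name) : systemProperties.get(name);
            // Keep the original placeholder text when no value is found.
            m.appendReplacement(out, Matcher.quoteReplacement(value != null ? value : m.group()));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        PlaceholderResolver r = new PlaceholderResolver(
                Map.of("persistence.dir", "/tmp/perf_test/persist"), Map.of("HOME", "/home/user"));
        System.out.println(r.resolve("value: ${persistence.dir}")); // value: /tmp/perf_test/persist
        System.out.println(r.resolve("home: ${env:HOME}"));         // home: /home/user
    }
}
```

In a real application, PlaceholderResolver.fromJvm() would pick up the -Dpersistence.dir value that perf_test passes to the JVM.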

The clusters element defines one or more virtual clusters. Each cluster name must be unique within the configuration file.

The connection element is equivalent to Paho's MqttConnectionOptions. You can configure any options supported by MqttConnectionOptions using this element.

The most important connection option is serverURIs. It defines a list of endpoints that form a virtual cluster. By default, HaMqttClient sets this option to [tcp://localhost:1883-1885] as shown above. This effectively clusters three (3) brokers with the port numbers 1883, 1884, and 1885 running on localhost.

The serverURIs option supports the range wildcard, -, for listing ports and the last octet of IPv4 addresses.

tcp|ssl|ws|wss://<host-name>[:<port-range>]
tcp|ssl|ws|wss://<first-three-IPv4-octets>.<octet-range>[:<port-range>]

The following shows the use of the range wildcard.

Supported

# defaults to port 1883
tcp://10.1.2.1-10
# ports specified
tcp://10.1.2.1-10:1883
tcp://10.1.2.1-10:1883-1885
tcp://test.mosquitto.org:1883-1884

Not Supported

tcp://10.1.2-5.1
tcp://10.1-2.2.1
tcp://10-12.1.2.1
# IPv6 is not supported

HaMqttClient supports the same four protocols as Paho: tcp, ssl, ws, and wss.
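
The range rules above can be modeled with a short Java sketch. ServerUriExpander is a hypothetical helper and is not HaMqttClient's parser. It expands a port range and a range in the last IPv4 octet, rejects ranges in any other octet and IPv6 literals, and leaves the default port to the client when no port is given. The host-then-port ordering of the result is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical expander for the serverURIs range wildcard; not PadoGrid's implementation.
final class ServerUriExpander {
    // <scheme>://<host>[:<port>[-<port>]]; '[' and ']' are excluded so IPv6 literals fail
    private static final Pattern URI =
            Pattern.compile("(tcp|ssl|ws|wss)://([^:/\\[\\]]+)(?::(\\d+)(?:-(\\d+))?)?");
    // a.b.c.<octet>[-<octet>]: only the last IPv4 octet may be a range
    private static final Pattern IPV4_RANGE =
            Pattern.compile("(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})\\.(\\d{1,3})(?:-(\\d{1,3}))?");
    // Digits, dots, and dashes only: an IPv4 address with a misplaced range
    private static final Pattern NUMERIC_HOST = Pattern.compile("[\\d.-]+");

    static List<String> expand(String uri) {
        Matcher m = URI.matcher(uri);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported URI: " + uri);
        }
        String scheme = m.group(1);
        String host = m.group(2);

        List<String> hosts = new ArrayList<>();
        Matcher ip = IPV4_RANGE.matcher(host);
        if (ip.matches()) {
            for (int octet : range(ip.group(2), ip.group(3))) {
                hosts.add(ip.group(1) + "." + octet);
            }
        } else if (NUMERIC_HOST.matcher(host).matches()) {
            throw new IllegalArgumentException("Range allowed only in the last IPv4 octet: " + uri);
        } else {
            hosts.add(host); // plain host name such as test.mosquitto.org
        }

        List<String> ports = new ArrayList<>();
        if (m.group(3) == null) {
            ports.add(""); // no port given: leave the default (e.g., 1883 for tcp) to the client
        } else {
            for (int port : range(m.group(3), m.group(4))) {
                ports.add(":" + port);
            }
        }

        List<String> result = new ArrayList<>();
        for (String h : hosts) {
            for (String p : ports) {
                result.add(scheme + "://" + h + p);
            }
        }
        return result;
    }

    // Inclusive range; a missing upper bound means a single value.
    private static List<Integer> range(String first, String last) {
        int lo = Integer.parseInt(first);
        int hi = (last == null) ? lo : Integer.parseInt(last);
        if (hi < lo) {
            throw new IllegalArgumentException("Invalid range: " + first + "-" + last);
        }
        List<Integer> values = new ArrayList<>();
        for (int v = lo; v <= hi; v++) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(expand("tcp://localhost:1883-1885"));
        // [tcp://localhost:1883, tcp://localhost:1884, tcp://localhost:1885]
    }
}
```

For example, tcp://10.1.2.1-10:1883-1885 expands to 10 hosts × 3 ports = 30 endpoints.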

serverURIs can mix different protocols. For example, the following creates a virtual cluster with both TCP and WebSocket endpoints.

clusters:
  - name: ${cluster.name}
    connections:
      - connection:
          # Free Internet brokers
          serverURIs: [tcp://test.mosquitto.org:1883, ws://test.mosquitto.org:8080, tcp://broker.emqx.io:1883, ws://broker.emqx.io:8083, tcp://broker.hivemq.com:1883, ws://broker.hivemq.com:8000]

Working with Multiple Clusters

You can configure any number of virtual clusters limited only by your machine's available system resources. By default, each cluster attempts to make connections to all brokers. If you have too many clusters and brokers, then you could potentially run out of system resources such as file descriptors, CPUs, and memory.

HaMqttClient is built on top of Paho's MqttClient, which spawns four (4) threads per endpoint connection, so a virtual cluster with many endpoints can consume a large number of threads. For a large number of endpoints, it is recommended that you split them into multiple virtual clusters and aggregate them via bridges (see Bridging Clusters).

The clusters element takes an array of clusters. Each cluster must have a unique name; otherwise, one cluster definition overwrites the other. The following shows an example.

clusters:
  - name: mqtt-test
    connections:
      - connection:
          serverURIs: 
            - tcp://test.mosquitto.org:1883
            - ws://test.mosquitto.org:8080
            - tcp://broker.emqx.io:1883
            - ws://broker.emqx.io:8083
            - tcp://broker.hivemq.com:1883
            - ws://broker.hivemq.com:8000
  - name: mylocal
    connections:
      - connection:
          serverURIs: [tcp://localhost:1883-1885]
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://10.1.2.1-5]

[Diagram: Multiple Clusters]

The example above has three (3) clusters defined. The mqtt-test cluster clusters the brokers available on the Internet. These brokers are free to use and maintained by the respective companies. The mylocal cluster clusters a set of localhost brokers. The enterprise cluster clusters a range of IP addresses with the default port, 1883.
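
Using the figure of four threads per endpoint connection quoted earlier, the following Java sketch gives a rough estimate of the Paho threads needed by the three clusters above. It assumes the default behavior, where every cluster connects to all of its endpoints, and it counts only Paho client threads. ThreadEstimate is a hypothetical helper.

```java
import java.util.Map;

// Hypothetical back-of-the-envelope estimate; counts only Paho client threads.
final class ThreadEstimate {
    // Figure quoted in the text: Paho's MqttClient spawns four threads per endpoint connection.
    static final int THREADS_PER_CONNECTION = 4;

    // Assumes each cluster connects to all of its endpoints (the default behavior).
    static int estimateThreads(Map<String, Integer> endpointsPerCluster) {
        int total = 0;
        for (int endpoints : endpointsPerCluster.values()) {
            total += endpoints * THREADS_PER_CONNECTION;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> clusters = Map.of(
                "mqtt-test", 6,    // six public broker URIs
                "mylocal", 3,      // tcp://localhost:1883-1885
                "enterprise", 5);  // tcp://10.1.2.1-5
        System.out.println(estimateThreads(clusters)); // 56
    }
}
```

That is (6 + 3 + 5) × 4 = 56 threads before the application creates any threads of its own.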

Using the HaMqttClient API, you obtain each instance by name as follows.

HaMqttClient mqttTestCluster = HaCluster.getHaMqttClient("mqtt-test");
HaMqttClient mylocalCluster = HaCluster.getHaMqttClient("mylocal");
HaMqttClient enterpriseCluster = HaCluster.getHaMqttClient("enterprise");

Once you have an HaMqttClient instance, you can access the cluster with the same API as Paho's MqttClient. There is no API difference between MqttClient and HaMqttClient, as shown in the example below. The primary benefit of HaMqttClient is that it provides a single entry point to a cluster consisting of many brokers.

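// Register a Paho MqttCallback (body elided), then use the cluster as you would an MqttClient
mqttTestCluster.setCallback(new MqttCallback() {...});
mqttTestCluster.subscribe("test/#", 2);
mqttTestCluster.publish("test/topic1", "hello, world".getBytes(), 2, false);
mqttTestCluster.disconnect();
mqttTestCluster.close();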

Bridging Clusters

Mosquitto supports broker bridges for forwarding messages from broker to broker. Bridging is important for applications that do not have direct access to edge brokers. By bridging a central broker and edge brokers, applications can send data to, or receive data from, the edge brokers via the central broker.

HaMqttClient also supports bridging similar to Mosquitto, except that it is done on the client side. With HaMqttClient, you can:

  • Bridge clusters as opposed to brokers
  • Filter topics to control both incoming and outgoing messages
  • Override QoS per topic filter

The subsequent sections show how applications can access edge clusters via an enterprise (central) cluster.

Example 1: Bridging Incoming Messages

This example shows how to configure the application to receive filtered messages from the edge cluster and forward them to the enterprise cluster. It does not override the edge cluster's QoS. The default QoS value of -1 passes on the subscriber's QoS to the publisher that forwards the messages.

clusters:
  - name: edge
    connections:
      - connection:
          serverURIs: [tcp://localhost:31001-31020]
    bridges:
      # 'in' forwards incoming messages to the target cluster, 'enterprise'
      in:
        - cluster: enterprise
          topicFilters: [test/#]
          qos: -1
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32010]

A common use case for bridging incoming messages is to place the bridge app in a DMZ between firewalls to give enterprise applications access to the edge devices, as shown in the diagram below. However, this way of bridging does not provide HA, since the bridge app itself cannot be clustered. Data loss may occur if the bridge app fails.

[Diagram: MQTT In Bridge]
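
Each bridge entry pairs topicFilters with a qos rule. The following Java sketch models how such an entry could decide whether to forward a message and with which QoS. BridgeRule is hypothetical and is not part of the HaMqttClient API. The matching follows standard MQTT wildcard semantics: `+` matches one level and `#` matches all remaining levels, including the parent level. It assumes HaMqttClient's topicFilters use those same semantics. A qos of -1 passes the received QoS through, as described in Example 1.

```java
import java.util.List;

// Hypothetical model of one bridge entry (topicFilters + qos); not the HaMqttClient API.
final class BridgeRule {
    private final List<String> topicFilters;
    private final int qos; // -1 keeps the QoS of the received message

    BridgeRule(List<String> topicFilters, int qos) {
        this.topicFilters = List.copyOf(topicFilters);
        this.qos = qos;
    }

    // True if any configured filter matches the topic, so the message should be forwarded.
    boolean shouldForward(String topic) {
        for (String filter : topicFilters) {
            if (matches(filter, topic)) {
                return true;
            }
        }
        return false;
    }

    // QoS used when forwarding: the override, or the received QoS when qos is -1.
    int forwardQos(int receivedQos) {
        return qos == -1 ? receivedQos : qos;
    }

    // Standard MQTT wildcard matching: '+' matches one level, '#' matches all remaining levels.
    static boolean matches(String filter, String topic) {
        // Wildcards in the first level do not match topics starting with '$' (e.g., $SYS/...).
        if (topic.startsWith("$") && (filter.startsWith("+") || filter.startsWith("#"))) {
            return false;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // also matches the parent level, e.g., test/# matches test
            }
            if (i >= t.length || (!f[i].equals("+") && !f[i].equals(t[i]))) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        BridgeRule in = new BridgeRule(List.of("test/#"), -1);
        System.out.println(in.shouldForward("test/topic1")); // true
        System.out.println(in.forwardQos(1));                // 1 (passed through)
        BridgeRule out = new BridgeRule(List.of("test/#"), 2);
        System.out.println(out.forwardQos(0));               // 2 (overridden)
    }
}
```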

Example 2: Bridging Outgoing Messages

This example shows how to configure the application to publish messages to the edge cluster and to send the filtered messages directly to the enterprise cluster. It overrides the edge cluster's QoS with 2.

clusters:
  - name: edge
    connections:
      - connection:
          serverURIs: [tcp://localhost:31001-31020]
    bridges:
      # 'out' sends outgoing messages to the target cluster, 'enterprise'
      out:
        - cluster: enterprise
          topicFilters: [test/#]
          qos: 2
  - name: enterprise
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32010]

With outgoing messages, edge devices themselves directly bridge the enterprise cluster as shown in the diagram below. This means, unlike incoming messages, outgoing messages fully utilize HA. Data loss will not occur until edge devices themselves fail.

[Diagram: MQTT Out Bridge]

Understanding HA Support for Bridged Clusters

Because the virtual clusters are created and maintained by each application, the HA support is tightly coupled with applications.

For incoming bridges, if the application goes down, then the enterprise application connected to the enterprise cluster stops receiving data. The edge applications connected to the edge cluster may continue to send data, but the data never reaches the enterprise application because the bridge hosted by the application has been taken down. This leads to data loss, which means incoming bridges do not support HA.

For outgoing bridges, since the application itself is publishing data, if the application goes down, no additional data will be sent. This means there is no data loss since data is no longer published. Hence, outgoing bridges fully support HA.

In addition to bridged clusters, we can also leverage bridged brokers to provide HA. This is achieved by creating sticky clusters. But first, before we dive into sticky clusters, we need to define FoS levels.

FoS (Failover Service)

FoS provides a failover service over MQTT brokers. FoS has four (4) levels of service: 0, 1, 2, and 3. FoS is primarily used for creating sticky clusters to provide HA services over bridged brokers. Bridged brokers forward messages from broker to broker whereas bridged clusters forward messages from cluster to cluster.

❗ If FoS is not set or set to an invalid value, then it defaults to 0. With FoS 0, the virtual cluster must not contain bridged brokers. Otherwise, the application will receive duplicate messages since it subscribes to all the brokers in the cluster.

FoS 0 - all subscribers, all connections

FoS 0 is the default level. It makes subscriptions and connections to all endpoints. Unless you want to create a sticky cluster, this level should be used.

[Diagram: FoS 0]

FoS 1 - one subscriber, two connections

This level opens connections to two endpoints but subscribes to topics on only one of them, the sticky endpoint. No subscriptions are made on the other endpoint until the sticky endpoint fails, at which point the subscriptions are made on the other endpoint. Failover is quick, but the subscription delay could lead to a brief period of data loss.

[Diagram: FoS 1]

FoS 2 - two subscribers, two connections

Like FoS 1, this level opens two (2) connections but increases the number of endpoints that make subscriptions to two (2). That means, with FoS 2, HaMqttClient receives duplicate messages, one from each endpoint. HaMqttClient, however, sticks to one of the endpoints and delivers data only from that endpoint to the application. It discards the data from the other endpoint. The other endpoint is a standby and replaces the sticky endpoint upon failure. Failover is immediate with no data loss.

[Diagram: FoS 2]
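
The following simplified Java model shows the FoS 2 delivery rule. Both endpoints are subscribed, only copies from the sticky endpoint reach the application, and on failure the already-subscribed standby takes over without re-subscribing. StickyDelivery is hypothetical and is not HaMqttClient's implementation. The model ignores timing, in-flight messages, and reconnection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of FoS 2 sticky delivery; ignores timing and in-flight messages.
final class StickyDelivery {
    private final List<String> subscribers; // subscribed endpoints; index 0 is the sticky one
    private final Consumer<String> application;

    StickyDelivery(List<String> subscribers, Consumer<String> application) {
        this.subscribers = new ArrayList<>(subscribers);
        this.application = application;
    }

    // Current sticky endpoint, or null if every subscribed endpoint has failed.
    String stickyEndpoint() {
        return subscribers.isEmpty() ? null : subscribers.get(0);
    }

    // Invoked for each copy of a message received from any subscribed endpoint.
    boolean onMessage(String endpoint, String payload) {
        if (!endpoint.equals(stickyEndpoint())) {
            return false; // duplicate from the standby endpoint: discard
        }
        application.accept(payload);
        return true;
    }

    // The standby is already subscribed, so it becomes sticky without re-subscribing.
    void onEndpointFailure(String endpoint) {
        subscribers.remove(endpoint);
    }

    public static void main(String[] args) {
        StickyDelivery d = new StickyDelivery(
                List.of("tcp://localhost:32001", "tcp://localhost:32002"), System.out::println);
        d.onMessage("tcp://localhost:32001", "m1"); // delivered: prints m1
        d.onMessage("tcp://localhost:32002", "m1"); // duplicate discarded
        d.onEndpointFailure("tcp://localhost:32001");
        d.onMessage("tcp://localhost:32002", "m2"); // delivered by the former standby: prints m2
    }
}
```

By contrast, FoS 1 subscribes on the second endpoint only after the failure, which is where its brief data-loss window comes from.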

FoS 3 - select subscribers, select connections

FoS 3 allows you to configure both subscriberCount and liveEndpointCount. By default, FoS 3 makes subscriptions to all of the endpoints, which is expensive and wasteful if there are many endpoints. The higher the counts, the higher the degree of HA, but at the expense of more duplicate data and network traffic. Hence, at this level, it is important that you adjust subscriberCount and liveEndpointCount to minimize the impact on application performance.

[Diagram: FoS 3]

FoS Parameters

FoS simplifies the subscription configuration by overriding the following parameters.

| Parameter | Default Value | Description |
|---|---|---|
| fos | 0 | Failover Service level. Valid values are 0, 1, 2, and 3. |
| subscriberCount | -1 | Maximum number of clients allowed to make subscriptions. -1 for all clients. |
| liveEndpointCount | -1 | Maximum number of client connections allowed. -1 for all clients. |

FoS overrides the default values of the above parameters as shown in the table below. For example, fos: 1 sets subscriberCount: 1 and liveEndpointCount: 2. These parameters are hard-wired and not configurable for FoS 0, 1, and 2. To configure them, select FoS 3. If FoS is not set or is invalid, then HaMqttClient defaults to FoS 0.

| fos | subscriberCount | liveEndpointCount | Parameters Configurable? |
|---|---|---|---|
| 0 | -1 | -1 | No |
| 1 | 1 | 2 | No |
| 2 | 2 | 2 | No |
| 3 | -1 | -1 | Yes |

In the table above, the subscriberCount and liveEndpointCount values for FoS 0, 1, and 2 are fixed and cannot be configured.
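
The override rules can be captured in a small Java sketch. FosParameters is hypothetical and is not a PadoGrid API. It maps a FoS level to its effective counts and treats an unset or invalid level as FoS 0. For a cluster of a given size, it reports how many connections are opened and how many endpoints subscribe. With bridged brokers, the subscriber count is also the number of copies of each message the client receives. Capping subscribers at the number of connections is an assumption.

```java
// Hypothetical helper mirroring the FoS table; not a PadoGrid API.
final class FosParameters {
    final int fos;
    final int subscriberCount;   // -1 means all endpoints
    final int liveEndpointCount; // -1 means all endpoints

    private FosParameters(int fos, int subscriberCount, int liveEndpointCount) {
        this.fos = fos;
        this.subscriberCount = subscriberCount;
        this.liveEndpointCount = liveEndpointCount;
    }

    // FoS 0-2 use hard-wired values; FoS 3 uses the configured counts; unset or invalid becomes 0.
    static FosParameters resolve(Integer fos, int configuredSubscriberCount, int configuredLiveEndpointCount) {
        int level = (fos == null || fos < 0 || fos > 3) ? 0 : fos;
        switch (level) {
            case 1:
                return new FosParameters(1, 1, 2);
            case 2:
                return new FosParameters(2, 2, 2);
            case 3:
                return new FosParameters(3, configuredSubscriberCount, configuredLiveEndpointCount);
            default:
                return new FosParameters(0, -1, -1);
        }
    }

    // Connections actually opened for a cluster with the given number of endpoints.
    int effectiveConnections(int endpointCount) {
        return liveEndpointCount < 0 ? endpointCount : Math.min(liveEndpointCount, endpointCount);
    }

    // Endpoints holding subscriptions; with bridged brokers, each delivers its own copy of a message.
    int effectiveSubscribers(int endpointCount) {
        int connections = effectiveConnections(endpointCount);
        return subscriberCount < 0 ? connections : Math.min(subscriberCount, connections);
    }

    public static void main(String[] args) {
        FosParameters fos2 = resolve(2, -1, -1);
        System.out.println(fos2.effectiveConnections(2) + " " + fos2.effectiveSubscribers(2)); // 2 2
    }
}
```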

Sticky Clusters

A sticky cluster is a special cluster that sticks to a single broker for all subscriptions. By sticking to one broker, it blocks the delivery of data from other brokers in the cluster. This pattern is particularly useful for bridging brokers with HA intact.

To create a sticky cluster, we also need help from the MQTT brokers. The brokers must be capable of providing the following bridging services.

  1. Ability to bridge multiple brokers.
  2. Ability to prevent cyclic (looping) messages. The bridged brokers must not send messages back to the originating broker.

Mosquitto supports both services, but it can bridge only two (2) brokers at a time. Unfortunately, this limits the maximum size of a sticky cluster of Mosquitto brokers to two (2) endpoints.

With bridged brokers and FoS set to 1, 2, or 3, we can create a virtual cluster with HA fully intact. These FoS levels stick to a single broker, guaranteeing the delivery of non-duplicate data to the application.

A sticky cluster is created by configuring fos and serverURIs. For example, the following creates a sticky cluster named enterprise with FoS 2.

clusters:
  - name: enterprise
    fos: 2
    connections:
      - connection:
          serverURIs: [tcp://localhost:32001-32002]

[Diagram: Sticky Cluster]

To complete the sticky cluster, the brokers themselves must be bridged. The following Mosquitto example configures the tcp://localhost:32001 broker to bridge tcp://localhost:32002.

The first broker's mosquitto.conf file:

connection bridged-01
address localhost:32002
remote_clientid bridged-01
cleansession false
notifications false
start_type automatic
topic # both 0
bridge_protocol_version mqttv50
try_private true

❗ The second broker must not be configured with a bridge; otherwise, cyclic (looping) messages will occur.

FoS Selection Guidelines

It is important that you select the right FoS level for your application based on the following guidelines.

  • For non-bridged brokers, always select FoS 0 (default).

  • For bridged brokers, select FoS 2 to avoid data loss during failover.

  • For bridged brokers, select FoS 3 and apply Archetype 7 to attain a higher level of availability and scalability than FoS 2.

  • For bridged brokers, select FoS 1 if your application has resource constraints and can tolerate a brief period of data loss during failover.


