MQTT - dgt30-eng/General GitHub Wiki

Connection with the DGT3.0 platform trough MQTT

Requirements

The following requirements must be fulfilled to establish a connection to the platform, otherwise the connection will be rejected.

  • Register the IP from which the connection is requested (Consumer/Publisher).
  • Obtain the MQTT credentials (User and password).
  • Obtain a Client ID

To meet these requirements it will be necessary to contact the DGT3.0 platform through [email protected].

Previous considerations

To make the connection to the MQTT broker it is possible to use one of the many languages that have specific libraries for it and even directly with libraries such as Mosquito MQTT.

In this case, we are going to see an example made with Java and SpringBoot.

We will use the paho eclipse library for the connection, for this we will include it in a Maven project, adding the following dependency:

<dependency>
     <groupId>org.eclipse.paho</groupId>
     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     <version>1.2.5</version>
</dependency>

Consumer MQTT example

Client configuration

When using the library, the first thing we must do to receive messages from an MQTT broker is to obtain an implementation of the IMqttClient interface. This interface contains all the methods required by an application to establish a connection with the server and receive messages.

The library has two implementations of this interface, one asynchronous (MqttAsyncClient) and one synchronous (MqttClient). In our case, we will focus on the synchronous version.

The configuration itself is a two-step process: first we create an instance of the MqttClient class and connect it to the server, then we receive and handle messages through the MQTT channel.

Creating a new MQTT instance and connecting to the server

First, we will create a Bean with the MQTT broker configuration. For this example we create it in a configuration class.

To connect our instance we will make use of the connect() method, optionally passing an MqttConnectOptions instance that allows us to customize some aspects of the protocol. In particular, we can use those options to add additional information such as security credentials, session recovery mode, reconnection mode, etc. The MqttConnectionOptions class exposes those options as attributes that we can set using the methods provided by the library itself.

URL format: protocol://url:port

ex: ssl://preproduction.cmobility30.es:8883

@Bean
    public void enableMqtt() throws MqttException {
        try (MqttClient client = new MqttClient(
            URL,  //URL of the platform in the previously defined format (String)
            "client_read_XX",  
            new MemoryPersistence())) {

            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(userDGT);   //User provided by the DGT3.0 platform. 
 (String)
            mqttConnectOptions.setPassword(passwordDGT.toCharArray());  //Password provided by DGT3.0 platform (String)
            mqttConnectOptions.setCleanSession(false);   //Keep connection status
            mqttConnectOptions.setAutomaticReconnect(true);   //Reconnect in case of network failure

            client.connect(mqttConnectOptions);
            client.setCallback(listenerMq);   //Instance of the class that implements “MqttCallback”, as described below
            client.subscribe(topic);   //Topic from which messages will be received
        } catch (MqttException e) {
            LOGGER.error("MQTTException: "+ e.getMessage());
        }
    }

MQTT listener creation

It will be necessary to create a class that implements the “MqttCallback” class, which will be used to receive the messages through the MQTT channel which is previously configured. This listener will look like the following:

import org.eclipse.paho.client.mqttv3.MqttCallback;

@Component
public class ListenerMq implements MqttCallback {

    @Override
    public void connectionLost(Throwable cause) {
        LOGGER.error("Connection lost with mqtt: " + cause);
        //Add logic in case of connection loss
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws InterruptedException {
        LOGGER.info(topic + ": " + message.toString());
        //Add logic in the case of receiving a message from the topic
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        //Add logic
    }
}

MQTT publisher example

In this case we are going to show an example of a publisher to the DGT3.0 platform through the MQTT interface. For this we will also use Java on the springboot framework.

Client configuration

First, we are going to configure the connection to the broker. To do this, we will use a function similar to the following inside a configuration class:

    private final String brokerUrl = "ssl://preproduction.cmobility30.es:8883"; 
    @Bean
    public MqttAsyncClient mqttClient() throws MqttException {
        MqttAsyncClient client = new MqttAsyncClient(brokerUrl, MqttClient.generateClientId());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(userDGT);  //User provided by the DGT3.0 platform (String)
        options.setPassword(passwordDGT.toCharArray());  //Password provided by DGT3.0 platform (String)

        client.connect(options, null, new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                System.out.println("Connected to broker: " + brokerUrl);
            }
    
            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                System.err.println("Failed to connect to broker: " + exception.getMessage());
            }
        });
        return client;
    }

Once the microservice has been started, if everything has gone well, the following log should be displayed:

Connected to broker: ssl://preproduction.cmobility30.es:8883

Sending messages to the platform

For this example we have exposed a service class, which injects the MqttAsyncClient class and is able to publish messages to a topic with the publish method.

@Service
public class MqttPublisherService {

    @Autowired
    private MqttAsyncClient mqttClient;

    public void publish(String topic, String payload) {
        try {
            mqttClient.publish(topic, new MqttMessage(payload.getBytes()));
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

The following is an example of a controller where we use the above to publish a message:

@RestController
public class MqttController {

    @Autowired
    private MqttPublisherService mqttPublisherService;

    @PostMapping("/publish")
    public void publish() {
        mqttPublisherService.publish(topic, message);
    }
}

Example MQTT Explorer connection

In order to be able to test the connection to the DGT3.0 platform in a quick and easy way, it is possible to connect using the MQTT Explorer tool.

  • The configuration for the connection should be similar to the one provided in the following image, but using the credentials previously provided by the platform.

    image

  • Then, in the ‘Advanced’ tab, we will be able to configure the reading topics, as well as the corresponding client ID..

    image

  • Once we have logged in, we will be able to view the messages that are being posted in the topics, as shown in the following image.

    image


These examples show a possible implementation for the connection of both a client and a server to an MQTT broker, as well as an example of a connection with the MQTT explorer tool for testing purposes.

Further information can be found here.

⚠️ **GitHub.com Fallback** ⚠️