MQTT - green-ecolution/backend GitHub Wiki
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol commonly used in IoT. Its publish-subscribe model works as follows:
- Publishers send data (messages) to an MQTT broker on specific “topics.”
- Subscribers tell the broker which topics they want to listen to. When new data is published to those topics, the broker delivers it to each subscriber.
This model allows components to be decoupled. For example, sensors can publish data without needing to know which services or devices might be using it.
MQTT Example
Imagine a temperature sensor publishing to a topic called home/livingroom/temperature
every ten minutes. Another service interested in temperature changes subscribes to home/livingroom/#
. The #
wildcard means it will receive any messages under home/livingroom/
, which can include temperature, humidity, or other reading types.
How We Use MQTT
Our backend acts as an MQTT client, subscribing to the topic where TTN publishes sensor data. Below is an example of how we connect to the MQTT broker, set credentials, and listen for messages:
func (m *Mqtt) RunSubscriber(ctx context.Context) {
// Set MQTT client options including broker address and credentials
opts := MQTT.NewClientOptions()
opts.AddBroker(m.cfg.MQTT.Broker)
opts.SetClientID(m.cfg.MQTT.ClientID)
opts.SetUsername(m.cfg.MQTT.Username)
opts.SetPassword(m.cfg.MQTT.Password)
opts.OnConnect = func(_ MQTT.Client) {
slog.Info("connected to mqtt broker")
}
opts.OnConnectionLost = func(_ MQTT.Client, err error) {
slog.Error("lost connection to mqtt broker", "error", err)
}
// Create MQTT client with the configured options
client := MQTT.NewClient(opts)
// Attempt to connect to the broker
if token := client.Connect(); token.Wait() && token.Error() != nil {
slog.Error("error connecting to mqtt broker", "error", token.Error())
return
}
// Subscribe to the configured topic and handle messages with m.handleMqttMessage
token := client.Subscribe(m.cfg.MQTT.Topic, 1, m.handleMqttMessage)
go func(token MQTT.Token) {
_ = token.Wait()
if token.Error() != nil {
slog.Error("error subscribing to mqtt topic", "error", token.Error())
}
}(token)
<-ctx.Done()
slog.Info("shutting down mqtt subscriber")
}
Once subscribed, our system processes incoming messages to extract and store sensor data. The handleMqttMessage
function is responsible for handling received messages, ensuring that sensor data is properly extracted and forwarded for processing:
func (m *Mqtt) handleMqttMessage(_ MQTT.Client, msg MQTT.Message) {
// Convert the raw MQTT message to a structured sensor payload
sensorData, err := m.convertToMqttPayloadResponse(msg)
if err != nil {
slog.Error("error converting mqtt payload", "error", err)
return
}
// Map the response to the domain model
domainPayload := m.mapper.FromResponse(sensorData)
// Forward the data to the SensorService for further processing
_, err = m.svc.SensorService.HandleMessage(context.Background(), domainPayload)
if err != nil {
slog.Error("error handling sensor message", "error", err)
}
}