Inter thread communication bus - zephyr-bus/zbus GitHub Wiki

Introduction

Embedded systems are everywhere, playing different roles in our lives. Embedded software grows in complexity as technology evolves. To keep up with this increasing complexity, practitioners and researchers have launched numerous development solutions: frameworks, libraries, RTOSes, and so forth. Zephyr is one of these solutions. However, as with the majority of popular RTOSes (see the table below), Zephyr offers only a limited set of Inter-Process Communication (IPC) mechanisms, which makes developing multi-threaded systems hard. Because of that, developers sometimes write coupled code to decrease the number of threads in the solution. Communication between threads becomes a nightmare when the system requires a more complex inter-thread communication approach; for example, there is no straightforward way of achieving decoupled many-to-many thread communication in Zephyr. Linux solves part of that problem with D-Bus, a message bus system that gives applications a simple way to talk to one another. I suggest an alternative message bus with a focus on speed, memory, and energy consumption.

| RTOS | Shared memory | Message queue | Stream/Pipe | Mailbox | Event flag/Signal | Comments | Reference |
|------|---------------|---------------|-------------|---------|-------------------|----------|-----------|
| Amazon FreeRTOS | true | true | true | false | true | | https://freertos.org/a00106.html |
| Mbed | true | true | false | false | true | | https://os.mbed.com/docs/mbed-os/v6.15/apis/scheduling-rtos-and-event-handling.html |
| RIOT | true | true | false | false | false | | https://doc.riot-os.org/group__core.html |
| NuttX | true | true | true | false | true | FILE stream is very powerful | https://nuttx.apache.org/docs/latest/reference/index.html |
| Zephyr | true | true | true | true | true | Event flag is made by k_poll_signal | https://docs.zephyrproject.org/3.0.0/reference/kernel/index.html#data-passing |
| Azure RTOS (ThreadX) | true | true | false | false | true | | https://docs.microsoft.com/pt-br/azure/rtos/threadx/chapter4 |

Problem description

To the best of my knowledge, Zephyr does not offer any IPC capable of many-to-many thread communication; there is no straightforward way to build it in a decoupled fashion. The closest available IPC is the mailbox, but it does not actually deliver a message to many readers: it delivers only to the first one. Developers must reinvent the wheel every time they need many-to-many thread communication.

Proposed change

Add a fast and decoupled inter-process communication mechanism (a message bus, hereafter referred to as zbus, from "Zephyr-bus") to Zephyr to enable a many-to-many communication model. Using this IPC, developers will be able to set up thread communication easily, even for a many-to-many model. Besides the communication capabilities, as proposed, the bus offers asynchronous and structured communication with time, space, and synchronization decoupling between the communicating entities. Even ISRs (Interrupt Service Routines) can send messages through the bus. In the current implementation, the bus is built from a static shared-memory region, semaphores, and message queues working together.

Detailed RFC

This RFC describes zbus, a message bus aimed at improving the communication capabilities of Zephyr threads. It is implemented based on the publish/subscribe pattern. Message data is transmitted using a managed shared-memory approach, which provides good performance. The figure below provides an overview of zbus.

The only operations threads can perform are publishing to, subscribing to, and reading channels. Publish and read operations are performed by a thread that wants to change or read a channel's message, respectively. To know whether a channel's message has changed, a thread can subscribe to the channel and receive that notification. Publishing and reading can be done at execution time, but subscription must be done at compile time.

Proposed change (Detailed)

The figure below presents the internal details of zbus. For simplicity, there is a single internal thread that keeps track of changes and notifies the subscribers when a channel is published. The description of each component is as follows:

  • Event dispatcher is the thread responsible for monitoring the change queue and sending the notifications to the subscribers' threads. The subscribers are mapped to channels at the subscribers' table;
  • Channel changed queue is a message queue where, after publishing, the changed channel's id will be passed. The Event dispatcher uses this message queue to keep the subscribers updated about the channels;
  • Channels structure stores all the channel data. This is a shared memory portion that is globally accessible, but never accessed directly;
  • Channel is formed by metadata and message:
    • Metadata is the information related to a channel to help the bus to work. Channels can be:
      • Read-only: the channel cannot be published. The only way to set its content is via the initial message value. It is useful for constants, like a version;
      • On change: with this flag set to true, the event dispatcher will only notify the threads if the channel's message changes in fact. If someone publishes to the same channel the same message value, the dispatcher will ignore and not notify the subscribers;
      • Message size: this field stores the channel's message size;
      • Message reference: this field stores the channel's message reference;
      • Semaphore: this field references the semaphore used to manage the channel's access;
      • Subscribers: this field stores the channel's subscribers list;
    • Message is the information exchanged by threads.

The actions available for a thread in zbus are:

  • Subscribe: when a thread needs to keep track of changes to a channel, it subscribes to it (compile-time). Whenever necessary, based on the On change flag, the event dispatcher notifies the subscribers. The notification is only an indication of the changed channel (an id, mapped here to the zbus_index_<channel's name> value), not the message itself. Reading the message is a discretionary action taken by the subscriber; sometimes the subscriber has no interest in reading the message, only in knowing that it changed. This design improves speed and lets the thread with the highest priority access the message first. All threads are notified at the same time, but their reaction order depends on their priority;
  • Publish: when a thread aims to change a channel's message, it publishes to it (execution-time). There is type-checking before this action: if the developer uses a different message type, a compilation error is raised;
  • Read: when a thread is interested in a channel's message, it reads from the channel and retrieves the stored message without changing it (execution-time). The same type-checking used for publishing is applied here.

The type checking for publishing (reading is similar) is done by the code illustrated below:

#define zbus_chan_pub(chan, msg, timeout)                                               \
    ({                                                                                  \
        {                                                                               \
            __typeof__(__zbus_channels_instance()->chan) chan##__aux__;                 \
            __typeof__(msg) msg##__aux__;                                               \
            (void) (&chan##__aux__ == &msg##__aux__);                                   \
        }                                                                               \
        __ZBUS_LOG_DBG("[ZBUS] %s pub " #chan " at %s:%d", (k_is_in_isr() ? "ISR" : ""),  \
                     __FILE__, __LINE__);                                               \
        __zbus_chan_pub(ZBUS_CHANNEL_METADATA_GET(chan), (uint8_t *) &msg, sizeof(msg), \
                      timeout);                                                         \
    })

API description

To use zbus in its current implementation, the developer needs to define the messages and the channels by creating the zbus_messages.h and zbus_channels.h files. The messages file must contain all types used to define the messages, as well as the message definitions. The channels are defined by filling in the parameters of a macro, as in the code below:

ZBUS_CHANNEL(<channel's name>,
             <persistent>,
             <on_change>,
             <read_only>,
             <message type>,
             ZBUS_CHANNEL_SUBSCRIBERS(<subscribers list>),
             ZBUS_INIT(<initial message value>)
)

The <subscribers list> is a list of the subscribers' message queues. If we want a thread to receive notifications for a channel, we need to pass a queue for that. The thread checks this queue to learn about change events. The <initial message value> can be a struct initialization or just a zero for default initialization, for example ZBUS_INIT(.field1 = 10, .field2 = false). After the channel definition, zbus provides zbus_channel_index_t, an enum with the channels' ids. The id generated for a channel is zbus_index_<channel's name>, of type zbus_channel_index_t. The event dispatcher uses these ids to send notifications.

The publishing and reading actions are executed by calling the macros zbus_chan_pub(<channel's name>, <message>, <timeout>) and zbus_chan_read(<channel's name>, <message>, <timeout>). The fields are:

  • <channel's name>: the name of the channel, without quotes;
  • <message>: the channel's message value, not its reference. The macro does what is needed;
  • <timeout>: a regular Zephyr timeout. For ISR calls you must use K_NO_WAIT.

Example of use

This example illustrates how subscribers can react to a notification. In this case, we have an immediate callback, a work queue callback, and a thread consuming the notification. The sensors thread generates data samples and publishes them to the channel sensor_data. All subscribers receive the notification or have their callbacks executed directly by the event dispatcher (high priority). The event dispatcher must have the highest priority among the user's threads to guarantee proper execution. Consequently, the developer must be careful with callback subscribers, because they run in the event dispatcher's context (with high priority). A good approach for low-priority actions is to use a work queue to execute the actual work: the callback only submits the work item, avoiding long execution in the dispatcher's context.

Messages definition file. Here we have the version and sensor messages:

#ifndef _ZBUS_MESSAGES_H_
#define _ZBUS_MESSAGES_H_
#include <zephyr.h>

struct version {
    uint8_t major;
    uint8_t minor;
    uint16_t build;
};

struct sensor_msg {
    uint32_t temp;
    uint32_t press;
    uint32_t humidity;
};

#endif  // _ZBUS_MESSAGES_H_

Channels definition file. Here we have the read-only version channel and the sensor_data channel:

ZBUS_CHANNEL(version,                        /* Name */
             false,                          /* Persistent */
             false,                          /* On changes only */
             true,                           /* Read only */
             struct version,                 /* Message type */
             ZBUS_CHANNEL_SUBSCRIBERS_EMPTY, /* Subscribers */
             ZBUS_INIT(.major = 0, .minor = 1,
                       .build = 1023) /* Initial value major 0, minor 1, build 1023 */
)

ZBUS_CHANNEL(sensor_data,       /* Name */
             false,             /* Persistent */
             false,             /* On changes only */
             false,             /* Read only */
             struct sensor_msg, /* Message type */
             ZBUS_CHANNEL_SUBSCRIBERS(fast_handler1, 
                                      delay_handler1, 
                                      thread_handler1), /* Subscribers */
             ZBUS_INIT(0)                               /* Initial value {0} */
)

Assuming a trivial implementation of a sensor thread that generates sensor samples and publishes them to the sensor_data channel:

#include <logging/log.h>
#include "zbus.h"
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

void peripheral_thread(void)
{
    struct sensor_msg sm = {0};
    while (1) {
        sm.press += 1;
        sm.temp += 10;
        sm.humidity += 100;
        LOG_DBG("Sending sensor data...");
        zbus_chan_pub(sensor_data, sm, K_MSEC(250));
        // k_msleep(10);
    }
}

K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 5, 0, 0);

Assuming a trivial implementation of the subscribers using three different approaches: a callback, a work queue, and a thread:

#include <stdint.h>

#include <logging/log.h>
#include "kernel.h"
#include "sys/util_macro.h"
#include "zbus.h"
#include "zbus_messages.h"
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

void fh1_cb(zbus_channel_index_t idx);
ZBUS_SUBSCRIBER_REGISTER_CALLBACK(fast_handler1, fh1_cb);

void dh1_cb(zbus_channel_index_t idx);
ZBUS_SUBSCRIBER_REGISTER_CALLBACK(delay_handler1, dh1_cb);

struct sensor_msg msg = {0};
void fh1_cb(zbus_channel_index_t idx)
{
    zbus_chan_read_by_index_unsafe(idx, msg, K_NO_WAIT);
    printk("Sensor msg processed by CALLBACK fh1: temp = %u, press = %u, humidity = %u\n",
           msg.temp, msg.press, msg.humidity);
}

struct sensor_wq_info {
    struct k_work work;
    zbus_channel_index_t idx;
    uint8_t handle;
};

struct sensor_wq_info wq_handler1 = {.handle = 1};

void wq_dh_cb(struct k_work *item)
{
    struct sensor_wq_info *sens = CONTAINER_OF(item, struct sensor_wq_info, work);
    zbus_chan_read_by_index_unsafe(sens->idx, msg, K_NO_WAIT);
    printk("Sensor msg processed by WORK QUEUE handler dh%u: temp = %u, press = %u, "
           "humidity = %u\n",
           sens->handle, msg.temp, msg.press, msg.humidity);
}

void dh1_cb(zbus_channel_index_t idx)
{
    wq_handler1.idx = idx;
    k_work_submit(&wq_handler1.work);
}

void main(void)
{
    k_work_init(&wq_handler1.work, wq_dh_cb);

    struct version v = {0};
    zbus_chan_read(version, v, K_NO_WAIT);

    LOG_DBG("Sensor sample started, version %u.%u-%u!", v.major, v.minor, v.build);
}

ZBUS_SUBSCRIBER_REGISTER(thread_handler1, 4);
void thread_handler1_task(void)
{
    zbus_channel_index_t idx = ZBUS_CHANNEL_COUNT;
    while (!k_msgq_get(thread_handler1.queue, &idx, K_FOREVER)) {
        zbus_chan_read_by_index_unsafe(idx, msg, K_NO_WAIT);
        printk("Sensor msg processed by THREAD handler: temp = %u, press = %u, "
               "humidity = %u\n",
               msg.temp, msg.press, msg.humidity);
    }
}

K_THREAD_DEFINE(thread_handler1_id, 1024, thread_handler1_task, NULL, NULL, NULL, 3, 0,
                0);

The sequence of activities based on the code above is illustrated as follows.

The sequence starts with the sensors thread generating a sample and publishing it to the sensor_data channel. The bus immediately executes the callbacks: one of them does its work directly, while the other submits a work item to the system work queue. After the callbacks, the system notifies the thread handler about the change by sending it the id of the sensor_data channel. Assuming the system work queue has a higher priority than the thread handler, it executes the work first. The thread handler then wakes up, reads the content of the channel, and prints it. These actions run in a loop and repeat indefinitely.

Benchmark

The benchmark was designed to transfer 256000 bytes from the producer to the consumer. The only variable was the size of the channel's message, from 1 byte to 256 bytes. The board where the benchmark was executed is a hifive1_revb, and the code is in the repository samples.

| Message size (bytes) | Callback average (ms) | Callback average data rate (B/s) | Thread average (ms) | Thread average data rate (B/s) |
|---:|---:|---:|---:|---:|
| 1 | 41797.00 | 6124.84 | 58322.67 | 4389.37 |
| 2 | 21096.67 | 12134.62 | 29263.00 | 8748.25 |
| 4 | 10656.00 | 24024.02 | 14734.67 | 17373.99 |
| 8 | 5450.67 | 46966.73 | 7478.33 | 34232.23 |
| 16 | 2838.67 | 90183.18 | 3859.00 | 66338.43 |
| 32 | 1531.00 | 167210.97 | 2044.33 | 125224.20 |
| 64 | 880.33 | 290798.94 | 1133.00 | 225948.81 |
| 128 | 555.00 | 461261.26 | 680.00 | 376470.59 |
| 256 | 391.00 | 654731.46 | 453.00 | 565121.41 |
The table shows how the message size impacts transfer time. Another influential factor is the subscriber style: as expected, it is faster when synchronous (callback) and slower when asynchronous (thread). In the best-case scenario, the developer can expect to transfer from about 4,000 to 650,000 bytes per second using zbus, depending on the number of threads, the message sizes, the subscriber style, and other factors. Still, a well-designed system would work properly in many usage scenarios, for example a Bluetooth scan process, sensor capture, or streaming of bytes.

Benefits

In this section I will describe some benefits of using zbus:

  1. Increased abstraction: it can perform one-to-one, one-to-many, and many-to-many communication models seamlessly. The developer can use zbus alone for almost all the communication needed. The only limitation is performance, but for control communication it seems to be enough (this needs more performance measurements);
  2. Threads do not need to know each other: communication is oriented by channel, not by destination. A channel is defined by a struct, so it works as an API; by using C structs as the message definition, we implicitly get an Interface Definition Language. Another feature is the compile-time type checking, which avoids using wrong message types during publishing and reading;
  3. Well-known publish/subscribe pattern: it is becoming more common in the embedded systems field with the advent of the Internet of Things, so I believe there will not be much friction in adopting it;
  4. Improved testability: the developer can easily replace threads with stubs or mocks that act as the real ones. With that, we can increase the observability and controllability of the code when needed, by inspecting the messages exchanged and injecting messages, respectively;
  5. All communication can be done in an asynchronous, reactive way, making it easy to build power-efficient systems.

Extensibility

The bus is designed to be extensible, enabling it to be monitored and even replicated to different targets. It is possible to capture all the messages exchanged on the bus and to inject messages as well. It is also possible to replicate the changes from one bus to another over an interface such as serial or BLE. For now, this is a developer activity; there is no code in zbus related to the replication process.

Future work

I imagine that using zbus will increase the abstraction and reusability of Zephyr threads. A set of correlated channels forms a Port, meaning the Port exposes all the APIs needed to use some thread (as a service). A device driver interface could be written using zbus; it would be easy to use, without adding extra driver API calls to the user code. The sensor driver API would be an example: the fetch trigger, the data, and other related things could be channels. Imagine the code below could be real:

// ...
void button_pressed(const struct device *dev, struct gpio_callback *cb,
		    uint32_t pins)
{
    struct trigger_msg fetch = {true};
    zbus_chan_pub(BME280_SENSOR_FETCH, fetch, K_NO_WAIT);
}
void some_thread(void)
{
    zbus_channel_index_t idx = 0;
    struct bme280_msg sensor_data = {0};
    while (1) {
        if (!k_msgq_get(&some_thread_queue, &idx, K_FOREVER)) {
            if (idx == zbus_index_bme280_sensor_fetch_done) {
                zbus_chan_read(BME280_SENSOR_DATA, sensor_data, K_MSEC(100));
                // you can read sensor data by:
                // sensor_data.temp
                // sensor_data.press
                // sensor_data.humidity
            }
        }
    }
}
// ...

Perhaps all of the repetitive and error-prone device initialization could be done by the driver and started only from the DTS: no sensor API calls and no sensor initialization code needed, just the "service" enabled in DTS and everything running properly. It would be necessary to add an abstraction layer on top of the bus that initializes and manages the "BME280 sensor service". I did that for GNSS in my tests, and I could swap the GNSS module with no changes on the consumer side; the "GNSS service API" stayed the same, only the adapter had to change. I come from both industry and academia; I am currently a Ph.D. candidate, and my work is to define an architecture that enables great maintainability and abstraction using software engineering techniques, of which zbus is a part. This sensor abstraction example is a small taste of that.

Dependencies

The implementation depends only on semaphores and message queues. The rest of the code is plain C and there is no dynamic allocation there.

Concerns and Unresolved Questions

The main concerns about the solution:

  • The heavy use of the preprocessor may generate unreadable compilation errors when the developer makes a mistake;
  • There is no pub/sub loop checking. That is not an easy task, considering a loop can occur through a chain of calls. If a thread subscribes to a channel and also publishes to it, a high-priority loop may occur (the event dispatcher must have a high priority), causing a kind of starvation;
  • The bus is not made for streaming purposes; it should be used for control messages only. Performance still needs to be measured;
  • It will increase the footprint of the solution. Each channel allocates the message structure plus a metadata portion of 32 bytes, and each subscriber allocates a message queue to receive change notifications;
  • I have implemented systems using this approach before, and the results were very interesting to me, but the code needs many more tests. I did some initial testing, but plenty more is required.

Alternatives

I could not find any direct alternative with this set of features that is able to run on constrained devices.

Community alternatives suggestions:

The table below is a superficial comparison of the suggested alternatives. It possibly contains bias, because I do not know or understand the Event Manager as well as I do zbus.

| Comparison item | zbus | Event Manager (NCS) | Laird messaging bus |
|---|---|---|---|
| Made for | Increasing code quality by enabling reuse and increasing testability | Reducing the number of threads using events and callbacks | Multiple-receiver broadcast messaging framework system |
| Metaphor | Messaging bus | Event manager | Event manager |
| Message definition time | Compile-time | Compile-time | Compile-time, though a custom message could be created dynamically, but the receivers would need to know how to parse the struct |
| Message definition approach | Centralized: a single file describes channels and subscriptions | Decentralized: one file per message, and subscription happens in every place | Decentralized: multiple files, usually one per module, all combined into an auto-generated output file by CMake |
| Message allocation style | Static (compile-time) | Dynamic or static (execution-time, it uses a "weak" alloc function) | Dynamic (execution-time) |
| Message persistency | Persistent: still exists after being processed | Transient: deleted after being processed | Both: cleaned up if DISPATCH_OK is returned by a handler, otherwise remains |
| Message distribution pattern (see Reference [1]) | Publish/subscribe | Message passing (event/listener) | Message passing (event/listener) |
| Subscription style | Compile-time, but it can be disabled at execution time by masking the subscriber | Compile-time, but run-time could be added using extension hooks | Compile-time, but execution-time filtering was planned and partially added |
| Message transmission style | Direct transmission when using the callback style. Two-step for asynchronous transmission: first, the event dispatcher sends the id of the changed channel to the subscriber; second, the subscriber decides whether or not to read the content. The transmission order is defined by the subscriber's position in the subscribing list | Direct transmission. It transmits the data to the listeners one by one; priorities can be defined to regulate the transmission order | Direct transmission. Sends an event to listeners one by one for them to process (if a broadcast); unicast goes to a single listener. The order is defined by the order in which listeners register themselves at startup |
| Subscriber execution styles | Synchronous (by callback), asynchronous (by queue) | Synchronous (by callback) | Asynchronous (by queue) |
| Implementation approach | Static memory, semaphores, and queues | Dynamic memory, kernel spinlock, LD files, sys APIs | Dynamic memory (buffer pool), message queues |
| Uses code generation (tools/scripts) | No, only macros | No, only macros | CMake auto-generates 3 sets of output files (message IDs, message codes, and message types containing structs), which can be globally included |
| Extension mechanism (you can add your own functionality) | Yes (uart_bridge, remote_mock) | Yes (app_event_manager_profiler_tracer, event_manager_proxy) | No, not built in |
| Maturity | Feature-complete | Production-ready | Production-ready |

Initial implementation

The current implementation relies on several preprocessor macros and will possibly change. It is a simple implementation and a proof of concept of the bus. You can take a look at the PoC code here.

References

  1. Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec. 2003. The many faces of publish/subscribe. ACM Comput. Surv. 35, 2 (June 2003), 114–131. https://doi.org/10.1145/857076.857078