Data bus - rebus-org/Rebus GitHub Wiki

When building systems with messaging, it is usually advised that message sizes are kept small. This is because big messages can ruin the nice, low-latency, easy-flowing properties of messaging-based systems by clogging up queues and consuming unnecessary resources.

Sometimes, though, you need to transfer big chunks of data, e.g. image, sound, and video files, big CSV files, etc.

This is where the "data bus" comes in :) The data bus is a dead-simple implementation of the claim check pattern.

The basic principle is that the bus provides a mechanism that is capable of storing an attachment in some kind of data store, and then a reference to that attachment (the "claim check") is passed around in messages instead of the full payload.

This way, whoever gets the claim check will be able to retrieve the full payload if desired, possibly forwarding the message with the claim check multiple times without paying the cost of transferring the actual data around.

How to configure it

Simply use the .DataBus() method on the configurer and build further on it to select a way to store data, like so:

Configure.With(...)
    .(...)
    .DataBus(d => d.StoreInFileSystem(@"\\SOMEMACHINE\DATA"))
    .Start();

where StoreInFileSystem in this case configures Rebus to store data as files on a network share somewhere.

Rebus comes with built-in data bus storage options for the file system, SQL Server, an in-memory store (useful for testing), and Azure blobs.
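For example, the in-memory store can come in handy in tests, because no external infrastructure is needed. The following is a sketch assuming the `Rebus.DataBus.InMem` package, whose `InMemDataStore` and `StoreInMemory` names should be verified against the package you have installed:

```csharp
// Sketch: configure in-memory data bus storage, e.g. for tests
// (assumes the Rebus.DataBus.InMem package)
var dataStore = new InMemDataStore();

Configure.With(...)
    .(...)
    .DataBus(d => d.StoreInMemory(dataStore))
    .Start();
```

Keeping a reference to the data store object lets a test inspect stored attachments after the fact.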

How is it used?

When you want to create an attachment, you reach for Rebus' advanced API and use the DataBus property to create the attachment, like so:

using (var source = File.OpenRead(@"raw-audio.wav"))
{
    var attachment = await bus.Advanced.DataBus.CreateAttachment(source);

    await bus.Send(new ProcessRecording(attachment));
}

As you can see, CreateAttachment asynchronously pipes the source stream into the configured data store and returns an attachment to the caller. This attachment is the claim check - it holds nothing more than a generated ID for the attachment.

The ID is inside the returned DataBusAttachment, which can be included in messages as-is if the serializer supports it (all of Rebus' currently supported serializers do). Alternatively, the contained ID can be sent as a string.
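If you prefer to pass the raw ID, you can pull it off the attachment and carry it in your own message type. This is a sketch where `ProcessRecordingById` is a hypothetical message type that holds the ID as a plain string, and the `Id` property name is assumed from the attachment:

```csharp
using (var source = File.OpenRead(@"raw-audio.wav"))
{
    var attachment = await bus.Advanced.DataBus.CreateAttachment(source);

    // ProcessRecordingById is a hypothetical message type that
    // carries only the attachment ID as a string
    await bus.Send(new ProcessRecordingById(attachment.Id));
}
```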

On the receiving end, the attachment can be retrieved in a message handler by calling the OpenRead method on DataBusAttachment, like so:

public async Task Handle(ProcessRecording message)
{
    var attachment = message.Attachment;
    var destinationFilePath = Path.GetTempFileName();

    using (var destination = File.OpenWrite(destinationFilePath))
    using (var stream = await attachment.OpenRead())
    {
        await stream.CopyToAsync(destination);
    }

    // ... process the file here
    await ProcessWaveFile(destinationFilePath);
}

If you choose to transfer the ID as a string, you can simply use the static OpenRead method on DataBusAttachment, like so:

using (var stream = await DataBusAttachment.OpenRead(attachmentId))
{
    // ...
}

Metadata

When you create an attachment, you can add some custom metadata to it if you like. If I wanted to add the username of the current user account to a new attachment, I could do it like this:

var metadata = new Dictionary<string, string> {
    { "username", Thread.CurrentPrincipal.Identity.Name }
};

var attachment = await bus.Advanced.DataBus.CreateAttachment(source, metadata);

and then at the receiving end, I could retrieve the metadata (without retrieving the entire payload!) like this:

var attachment = message.Attachment;
var metadata = await attachment.GetMetadata();

which can be pretty neat to include e.g. original file names and other things along with an attachment.

Default metadata

By default, the data bus will provide the following metadata fields:

  • Rbs2DataBusLength: Length (in bytes) of the stored data
  • Rbs2DataBusSaveTime: ISO8601-encoded time of when the data was saved
  • Rbs2DataBusReadTime: ISO8601-encoded time of when the data was last read (updated every time a read operation of the payload is commenced; absent if the data has not been read yet)
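The built-in fields above can be read from the same metadata dictionary as the custom ones. This is a sketch assuming the keys appear verbatim as the strings listed above:

```csharp
// Sketch: read the built-in metadata fields
// (keys assumed to match the list above verbatim)
var metadata = await attachment.GetMetadata();

var length = long.Parse(metadata["Rbs2DataBusLength"]);
var saveTime = DateTimeOffset.Parse(metadata["Rbs2DataBusSaveTime"]);
```

This can be useful e.g. to reject oversized attachments before streaming the payload.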