Coordinating stuff that happens over time - rebus-org/Rebus GitHub Wiki
Sometimes, when building systems, stuff happens over time that's implicit and hard to understand, unless someone's being nice and draws it for you on a piece of paper using some sort of state machine notation.
One example can be some kind of financial trading company. When a new customer is registered, we need to collect some legal information regarding that customer, and we need to do that in several systems. You can probably imagine how this could be drawn as a state machine or a sequence diagram on a piece of paper, but when the time comes to implement these things, at least in my experience, then this stuff will be fragmented across the entire system...
Why is it that we're often content with having these state machines as implicit stuff in our code, when object-oriented programming is all about coding stuff that's present in the real world? Wouldn't it be nice if someone came up with a model for stuff that evolves over time, one that would make modeling these things easy?
A saga is an instance of a state machine whose transitions are triggered by messages. In order to implement something like that, three things must be in place:
- a representation of the saga's state (in Rebus that's just a class that implements ISagaData), stored somewhere persistent
- a model of the transitions and actions that constitute the saga (in Rebus that's a special message handler derived from Saga<TSagaData>)
- a mapping of message fields to saga data fields, which allows existing saga instances to be found for each message that is handled (or, if no instance is found, maybe the message is allowed to initiate a new saga?)
First, I'd make sure that my service was subscribed to events from the CRM system where new customers are registered:
await bus.Subscribe<Crm.Messages.Events.CustomerCreated>();
which would be accompanied by an endpoint mapping that ties all of CRM's messages to CRM:
.Routing(r => r.TypeBased()
.MapAssemblyOf<Crm.Messages.Events.CustomerCreated>("crm.input"))
assuming that the CRM system publishes this bad boy when a new customer is created:
public class CustomerCreated
{
    public string CustomerId { get; set; }
}
Then, I'd make a simple model that would comprise the state of the saga:
public class CollectLegalInfoSagaData : ISagaData
{
    // these two are required by Rebus
    public Guid Id { get; set; }
    public int Revision { get; set; }

    // add your own fields and objects down here:
    public string CrmCustomerId { get; set; }
    public bool GotLegalInfoFromFirstSystem { get; set; }
    public bool GotLegalInfoFromSecondSystem { get; set; }
}
As you can see, our state consists of three pieces of information (in addition to the mandatory Id and Revision properties):
- the CRM ID of the new customer
- whether we got legal information from the first system
- whether we got legal information from the second system
Now, the mapping of messages to sagas would be implemented like this:
public class CollectLegalInfoSaga : Saga<CollectLegalInfoSagaData>,
    IAmInitiatedBy<CustomerCreated>,
    IHandleMessages<LegalInfoAcquiredInFirstSystem>,
    IHandleMessages<LegalInfoAcquiredInSecondSystem>
{
    protected override void CorrelateMessages(ICorrelationConfig<CollectLegalInfoSagaData> config)
    {
        // ensure idempotency by setting up correlation for this one in addition to
        // allowing CustomerCreated to initiate a new saga instance
        config.Correlate<CustomerCreated>(m => m.CustomerId, d => d.CrmCustomerId);

        // ensure proper correlation for the other messages
        config.Correlate<LegalInfoAcquiredInFirstSystem>(m => m.CorrId, d => d.CrmCustomerId);
        config.Correlate<LegalInfoAcquiredInSecondSystem>(m => m.CorrId, d => d.CrmCustomerId);
    }

    (...)
}
As you can see, CustomerCreated is allowed to initiate a new saga if an existing instance cannot be found. We also set up a correlation between the CustomerId of the event and CrmCustomerId, which ensures that we don't get multiple saga instances if, for some reason, a CustomerCreated event is received twice for the same customer.
And then, when legal information has been acquired in the two external systems, we want to find our saga again, allowing it to continue where it left off.
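The find-or-create behaviour described above can be pictured with a small in-memory model. This is only a rough mental model in plain C#, not how Rebus implements it: SagaState, InMemorySagaLookup and FindOrCreate are hypothetical names made up for this sketch, and unlike a real saga (where the handler stores the correlation value) the model sets it when the instance is created.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for the saga data shown above (no Rebus types involved).
public class SagaState
{
    public Guid Id { get; set; }
    public string CrmCustomerId { get; set; }
}

// Hypothetical in-memory model of "find an existing instance or start a new one".
public class InMemorySagaLookup
{
    readonly List<SagaState> _instances = new List<SagaState>();

    public int Count => _instances.Count;

    // Returns the matching instance, a new one if the message may initiate a saga,
    // or null when nothing matches and the message is not allowed to initiate.
    public SagaState FindOrCreate(string correlationValue, bool canInitiate, out bool isNew)
    {
        var existing = _instances.FirstOrDefault(s => s.CrmCustomerId == correlationValue);
        if (existing != null)
        {
            isNew = false;
            return existing;
        }

        if (!canInitiate)
        {
            isNew = false;
            return null;
        }

        var created = new SagaState { Id = Guid.NewGuid(), CrmCustomerId = correlationValue };
        _instances.Add(created);
        isNew = true;
        return created;
    }
}
```

In this model, a second CustomerCreated for the same customer ID finds the existing instance and reports isNew as false, which is the situation the correlation on CustomerCreated is meant to cover.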
That was the declarative part of setting up the saga. The only thing left is to write some code that will actually end up being executed: you see, the (...) above can't do the job, we need real code. So the saga should be equipped with the following handler methods:
public async Task Handle(CustomerCreated customerCreated)
{
    // if saga is not new, we swallow the event - this is how you make the saga idempotent
    if (!IsNew) return;

    // store the CRM customer ID in the saga
    Data.CrmCustomerId = customerCreated.CustomerId;

    // command that legal information be acquired for the customer
    // (bus is assumed to be an IBus held by the saga, e.g. injected through its constructor)
    await bus.Send(new AcquireLegalInformationFromFirstSystem {
        CrmCustomerId = Data.CrmCustomerId,
        CorrId = Data.CrmCustomerId
    });
    await bus.Send(new AcquireLegalInformationFromSecondSystem {
        CrmCustomerId = Data.CrmCustomerId,
        CorrId = Data.CrmCustomerId
    });
}
This handler method checks IsNew and returns early when the saga already exists, so a duplicate CustomerCreated event does not trigger the work twice; together with the correlation set up earlier, this means we have only one saga instance running for each customer ID. Then it saves the ID of the newly created customer and goes on to send requests to the two legal systems, providing the ID of the customer to check. Because our use of the request/reply pattern obligates a correlation ID, it also supplies the customer ID to be used to correlate replies with the saga.
And then, the saga should be able to handle the two responses:
public async Task Handle(LegalInfoAcquiredInFirstSystem first)
{
    Data.GotLegalInfoFromFirstSystem = true;
    await PossiblyPerformCompleteAction();
}

public async Task Handle(LegalInfoAcquiredInSecondSystem second)
{
    Data.GotLegalInfoFromSecondSystem = true;
    await PossiblyPerformCompleteAction();
}
and, since we don't know the order in which the responses will arrive, both handlers must check whether the saga is now complete:
async Task PossiblyPerformCompleteAction()
{
    if (Data.GotLegalInfoFromFirstSystem && Data.GotLegalInfoFromSecondSystem)
    {
        await bus.Publish(new CustomerIsLegallyOk { CrmCustomerId = Data.CrmCustomerId });
        MarkAsComplete();
    }
}
Note how MarkAsComplete() is called when the saga is complete - this will allow the underlying saga persister to delete the saga, if it so desires.
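The walkthrough above uses several message types without showing their definitions. The following is a minimal sketch of what they could look like, assuming plain classes whose properties are inferred only from how the handlers use them (CrmCustomerId and CorrId); the real contracts may well carry more data, such as the actual legal information.

```csharp
// Hypothetical message contracts, inferred only from how the saga handlers use them.

// Commands sent by the saga to the two legal systems.
public class AcquireLegalInformationFromFirstSystem
{
    public string CrmCustomerId { get; set; }
    public string CorrId { get; set; }
}

public class AcquireLegalInformationFromSecondSystem
{
    public string CrmCustomerId { get; set; }
    public string CorrId { get; set; }
}

// Replies that carry the correlation ID back, so the saga can be found again.
public class LegalInfoAcquiredInFirstSystem
{
    public string CorrId { get; set; }
}

public class LegalInfoAcquiredInSecondSystem
{
    public string CorrId { get; set; }
}

// Event published by the saga once both replies have arrived.
public class CustomerIsLegallyOk
{
    public string CrmCustomerId { get; set; }
}
```

The only thing the saga depends on is that each reply echoes the CorrId it was given, since that value is what the correlation configuration matches against CrmCustomerId.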
The term "saga" is stolen from a paper on long-lived transactions in a database context, and it is not entirely accurate according to the original definition, because the Rebus/NServiceBus implementations don't require you to implement any kind of rollback/compensating actions for the case where things deviate from the sunshine scenario.
It's probably just called "Saga" because it sounds cool, way better than "Workflow", "Process Manager", etc. It does, however, correspond pretty accurately to the process manager as defined by Gregor Hohpe.
In the example shown above, we're using properties of incoming messages to correlate with our saga data, like this:
config.Correlate<CustomerCreated>(m => m.CustomerId, d => d.CrmCustomerId);
Since the first function (m => m.CustomerId) is just an ordinary Func<TMessage, object>, you can do anything C# will allow in there, e.g.
config.Correlate<CustomerCreated>(m => $"{m.SomeProperty}/{m.AnotherProperty}", d => d.CorrelationId);
to concatenate the properties SomeProperty and AnotherProperty of incoming messages and use the result as the correlation ID.
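When the correlation value is computed like this, the value stored in the saga data has to be built the same way, or later messages will not find the instance. The following is a minimal sketch, assuming the Rebus NuGet package is referenced; SomethingHappened, CompositeKeySagaData, CompositeKeySaga and BuildKey are hypothetical names used only for illustration. The key is stored explicitly here, just as the CustomerCreated handler above stores CrmCustomerId.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Sagas;

// Hypothetical message with the two properties used in the correlation example.
public class SomethingHappened
{
    public string SomeProperty { get; set; }
    public string AnotherProperty { get; set; }
}

// Hypothetical saga data holding the composite correlation ID.
public class CompositeKeySagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }
    public string CorrelationId { get; set; }
}

public class CompositeKeySaga : Saga<CompositeKeySagaData>, IAmInitiatedBy<SomethingHappened>
{
    // One place to build the key, so that correlation and storage cannot drift apart.
    public static string BuildKey(SomethingHappened m) => $"{m.SomeProperty}/{m.AnotherProperty}";

    protected override void CorrelateMessages(ICorrelationConfig<CompositeKeySagaData> config)
    {
        config.Correlate<SomethingHappened>(m => BuildKey(m), d => d.CorrelationId);
    }

    public Task Handle(SomethingHappened message)
    {
        // Duplicate messages find the existing instance and are ignored.
        if (!IsNew) return Task.CompletedTask;

        // Store exactly the value the correlation function produces.
        Data.CorrelationId = BuildKey(message);
        return Task.CompletedTask;
    }
}
```

Keeping the key construction in a single helper is a design choice of this sketch, not a Rebus requirement; it simply avoids having the same format string in two places.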
While a saga handler must implement at least one IAmInitiatedBy<TMessage> for new instances to be created, it's totally possible (and often very desirable) to have the saga handler implement IAmInitiatedBy<TMessage> for several message types, thus relaxing the assumption that one specific message type is received first.
In some cases, a saga handler will even implement IAmInitiatedBy<TMessage> for all the message types it can handle, because then it can be made tolerant towards receiving messages in any order.
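As an illustration of that last point, here is a minimal sketch of a saga in which every handled message may initiate an instance, assuming the Rebus NuGet package is referenced. OrderTolerantSaga, OrderTolerantSagaData and CompleteIfDone are hypothetical names, and the sketch deliberately leaves out the commands and the final publish from the walkthrough above: it only shows that each handler records what it has seen and stores the correlation value, so it does not matter which message arrives first.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Sagas;

public class CustomerCreated { public string CustomerId { get; set; } }
public class LegalInfoAcquiredInFirstSystem { public string CorrId { get; set; } }
public class LegalInfoAcquiredInSecondSystem { public string CorrId { get; set; } }

// Hypothetical saga data that tracks every message, including the "first" one.
public class OrderTolerantSagaData : ISagaData
{
    public Guid Id { get; set; }
    public int Revision { get; set; }
    public string CrmCustomerId { get; set; }
    public bool GotCustomerCreated { get; set; }
    public bool GotFirst { get; set; }
    public bool GotSecond { get; set; }
}

public class OrderTolerantSaga : Saga<OrderTolerantSagaData>,
    IAmInitiatedBy<CustomerCreated>,
    IAmInitiatedBy<LegalInfoAcquiredInFirstSystem>,
    IAmInitiatedBy<LegalInfoAcquiredInSecondSystem>
{
    protected override void CorrelateMessages(ICorrelationConfig<OrderTolerantSagaData> config)
    {
        config.Correlate<CustomerCreated>(m => m.CustomerId, d => d.CrmCustomerId);
        config.Correlate<LegalInfoAcquiredInFirstSystem>(m => m.CorrId, d => d.CrmCustomerId);
        config.Correlate<LegalInfoAcquiredInSecondSystem>(m => m.CorrId, d => d.CrmCustomerId);
    }

    public Task Handle(CustomerCreated message)
    {
        Data.CrmCustomerId = message.CustomerId;
        Data.GotCustomerCreated = true;
        return CompleteIfDone();
    }

    public Task Handle(LegalInfoAcquiredInFirstSystem message)
    {
        Data.CrmCustomerId = message.CorrId;
        Data.GotFirst = true;
        return CompleteIfDone();
    }

    public Task Handle(LegalInfoAcquiredInSecondSystem message)
    {
        Data.CrmCustomerId = message.CorrId;
        Data.GotSecond = true;
        return CompleteIfDone();
    }

    // Every handler ends here, so completion is detected regardless of arrival order.
    Task CompleteIfDone()
    {
        if (Data.GotCustomerCreated && Data.GotFirst && Data.GotSecond)
        {
            MarkAsComplete();
        }
        return Task.CompletedTask;
    }
}
```

Because every handler writes the same customer ID into CrmCustomerId, whichever message arrives first creates the instance and the others correlate with it afterwards.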