EventSource - unders/mywiki GitHub Wiki
Event Store
Event store = Database + Message Broker
Entity id Entity type Event id Event Type Event data
101 Order 99 OrderCreated Serialized Aggregate Order Data
101 Order 100 OrderApproved Serialized Aggregate Order Data
- Save Aggregate Events
- Get aggregate Events by entity id
- Subscribe to events (Event Type)
Snapshot
Periodically snapshot to avoid loading all events
Current state
currentState = foldl(applyEvent, initial state, events)
Request Handling
- Order service
pastEvents = eventStore.findEvents(entityId)
o := order.New()
o.applyEvents(pastEvents)
newEvents = processCmd(order.SomeCommand)
o.applyEvents(newEvents)
eventStore.saveEvents(newEvents) - optimistic locking
Subscription
Event Store publishing Events consumed by other services
eventStore.subscribe(eventTypes)
Talks
Event Sourcing You are doing it wrong by David Schmitz
Summary
- ES + DDD == 💜
- Needs more upfront design
- Not enough books
- Avoid frameworks
When building event sourced system, we allways use:
- Domain Driven Design
- Event Driven Architecture
- Distributed Systems
Technical Level
- Event Payload - each event has a payload
- Events - belongs to a stream
- Streams - events is stored in streams as data and event type, ordered in creation order
- EventStore (a database) - stream are stored in an event store using unique identifiers
- Projections - Drives current state from stream of events
- Aggregate - Projections build aggregates
- Read Model - Aggregates can be stored in a read model.
Example
- stream: user-xxx
- events: UserCreated, UserOnboarded, UserRelocated
- Projections: HandleUserCreated, HandleUserOnboarded, HandleUserOnboarded
- User Aggregate: Represents the user at the time of the last event read,
Read Model
readModel = Stream.Of(events).LeftFold(handlers)
,
but you may not need a read model, depending on your strategy for storing events:
- Maybe all events of an aggregate type in a single stream?
- Better: One stream per aggregate: User-A, User-B, ...
GET users/B -> User Microservice ->
handlers = {
"UserCreated", (current, event) => {},
"UserOnboarded", (current, event) => {},
"UserRelocated", (current, event) => {},
}
aggregate := readAggregateFromStream(
"user",
"B",
{},
fromStartOfStream,
handlers
)
Since an event is in past tense and we should be able to reapply handlers forever, that means that the handler cannot have any side effects; that stuff must be handler somewhere else...
How do we write data
- A Use Case example: Only withdraw money, if the bank accounts holds enough money; events: MoneyDeposited, MoneyWithdrawn
The correctness of the business result depends on the order of events. The aggregate is responsible for enforcing business invariants -> the aggregate is a transaction boundary.
PUT accounts/1234 -> Account Microservice ->
{ aggregate, lastVersionNumber} = readAggregateFromStream(...)
events = executeBusinessLogic(...)
emitEvents('account', '1234', events, lastVersionNumber)
Since executeBusinessLogic
can contain side effects, how do we solve it? We must lock the stream...?
Papers
- Versioning in an Event Sourced system - Gregory Young
- The dark side of Event sourcing: Managing data conversion - Michiel Overeen
- Effective Aggregate Design Part 1: Modelling a Single Aggregate - Vaughn Vernon
- Exploring CQRS and Event Sourcing (Microsoft) - patterns and practices.