Persisting events - Horusiath/Akkling GitHub Wiki

To persist an actor's state, Akkling.Persistence uses the command/event pattern, a well-known event sourcing primitive. You can read more about it in the Akka.Persistence documentation. The major difference is that Akkling makes no distinction between recovery handlers, command handlers and persist callbacks: all of them are handled by a single actor behavior loop (see example).

To persist an event, you can use the dedicated persistent effects:

  • Persist, PersistAll - these effects take an event or a sequence of events and request that they be stored in the event journal.
  • PersistAsync, PersistAllAsync work similarly to their synchronous cousins, but they can optimize event persistence through additional batching, at the cost of temporary inconsistency of the actor's state.
  • Defer schedules an additional actor loop cycle once the batch of asynchronously persisted messages has been stored.
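As a rough illustration of the single behavior loop and the Persist effect, here is a minimal sketch modelled on the counter example from the Akkling README. The message types (CounterChanged, CounterCommand, CounterMessage) and the actor and system names are hypothetical. The sketch assumes the Akkling.Persistence NuGet package, the default in-memory journal, and the `spawn` / `propsPersist` style of actor creation, which may differ between Akkling versions.

```fsharp
// Script sketch; assumes the Akkling.Persistence NuGet package and the default in-memory journal.
#r "nuget: Akkling.Persistence"

open Akkling
open Akkling.Persistence

// Hypothetical message types, used only for illustration.
type CounterChanged = { Delta: int }

type CounterCommand =
    | Inc
    | Dec
    | GetState

type CounterMessage =
    | Command of CounterCommand
    | Event of CounterChanged

let system = System.create "persisting-sys" <| Configuration.defaultConfig ()

let counter =
    spawn system "counter-1" <| propsPersist (fun mailbox ->
        let rec loop state =
            actor {
                let! msg = mailbox.Receive()
                match msg with
                // Events arrive here both when replayed during recovery and after being persisted.
                | Event changed -> return! loop (state + changed.Delta)
                | Command cmd ->
                    match cmd with
                    | GetState ->
                        mailbox.Sender() <! state
                        return! loop state
                    // Commands only request persistence; the state changes in the Event case.
                    | Inc -> return Persist (Event { Delta = 1 })
                    | Dec -> return Persist (Event { Delta = -1 })
            }
        loop 0)

counter <! Command Inc
counter <! Command Inc
counter <! Command Dec

// Ask the actor for its current state once the commands above have been handled.
let reply: int = counter <? Command GetState |> Async.RunSynchronously
printfn "Current counter state: %i" reply
```

Note that the command branches never touch the state themselves. They only return a Persist effect, and the state is updated when the persisted event comes back through the same loop, which is also the path taken during recovery.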

Handling actor phases

Sometimes you need to distinguish in which phase an actor receives an event; e.g. during the recovery phase you shouldn't run any logic other than updating state. For that you can combine eventsourced context properties with active patterns:

let! msg = ctx.Receive()
match msg with
| event when ctx.IsRecovering () -> 
    // actor is in recovery phase
| Persisted ctx event -> 
    // actor has persisted an event
| Deffered ctx event ->
    // actor has persisted a batch of events and entered the deferred phase
| _ -> ...

Snapshots

As an event stream grows longer, recovering from all of its events may take some time. For that reason you can use an optimization technique called snapshotting. Once a snapshot has been stored (this is an asynchronous operation), a future recovery can first restore the latest available snapshot and then replay only the events that occurred after it.
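The idea can be sketched without any library at all. The following is a hypothetical, library-free model (the types CounterChanged and CounterSnapshot and the function recover are made up for illustration and are not part of Akkling). It shows that starting from a snapshot and replaying only the later events yields the same state as replaying the whole journal.

```fsharp
// Hypothetical, library-free model of snapshot-based recovery (not Akkling API).
type CounterChanged = { SeqNr: int64; Delta: int }
type CounterSnapshot = { TakenAt: int64; State: int }

/// Rebuilds the counter state from an optional snapshot plus the event journal.
let recover (snapshot: CounterSnapshot option) (journal: CounterChanged list) : int =
    // Start from the snapshot if there is one, otherwise from the empty state.
    let startSeqNr, startState =
        match snapshot with
        | Some s -> s.TakenAt, s.State
        | None -> 0L, 0
    journal
    |> List.filter (fun e -> e.SeqNr > startSeqNr) // skip events already covered by the snapshot
    |> List.fold (fun state e -> state + e.Delta) startState

let journal =
    [ { SeqNr = 1L; Delta = 1 }
      { SeqNr = 2L; Delta = 1 }
      { SeqNr = 3L; Delta = -1 }
      { SeqNr = 4L; Delta = 5 } ]

// Full replay folds over all four events.
let fullReplay = recover None journal

// Recovery from a snapshot taken after event 2 replays only events 3 and 4.
let fromSnapshot = recover (Some { TakenAt = 2L; State = 2 }) journal

printfn "full replay: %d, from snapshot: %d" fullReplay fromSnapshot
```

Both paths end in the same state; the snapshot only shortens the amount of replay work.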

To save a snapshot, you can simply send a SaveSnapshot message directly to the snapshot store:

typed ctx.SnapshotStore <! SaveSnapshot(SnapshotMetadata(ctx.Pid, ctx.LastSequenceNr()), state)

If you want to use snapshots during the recovery phase, you can use the following pattern:

spawnPersist system "pid" <| fun ctx -> 
    let rec loop state = 
        actor { 
            let! msg = ctx.Receive()
            match msg with
            | SnapshotOffer snapshot -> return! loop snapshot
            | _ -> ...
        }
    loop initialState

Other persistent lifecycle events

To determine when the recovery cycle has finished or failed, you can use dedicated event hooks:

  • ReplaySucceed triggers once the recovery phase has ended successfully. It's a good place, e.g., to switch the actor's state machine from recovery handlers to command/event handlers.
  • ReplayFailed triggers when an exception occurs during the recovery phase. This always results in stopping the actor. You may want to use it, e.g., to send a negative acknowledgement (NAck) to actors that sent messages to the failing actor.

Example:

let! msg = ctx.Receive()
match msg with
| PersistentLifecycleEvent e ->
    match e with
    | ReplayFailed -> 
        // actor failed to recover
    | ReplaySucceed -> 
        // actor recovered successfully
| _ -> ...