Intro Tour - deanrad/rx-helper GitHub Wiki
A Quick Tour of the Rx-Helper API
`trigger`/`process`, and `on`
API Intro 1.1 — Rx-Helper is a way to code in a reactive, event-oriented way, vs a more direct imperative way you may be used to. If you can use and write functions that return Promises, you have a good taste of what this can be like. Rx-Helper will take it up a notch, allowing you to manage all the input, as well as side effects for your component, or your whole app! It will work on the front, or back end, and is not prescriptive or opinionated about which framework you use, whether you're coding a console app, WebSocket server, or a Single Page front-end App.
We'll define the different parts of Rx-Helper as we go, but the most important is the event. An event is an object with a String `type` field and, optionally, a `payload`. Events are the blood that runs through the veins of your app: everything that happens (after your initial setup) is driven by them. Let's see some code:
Hello World, and Source-Code ordering
Assume we have the following:
import { trigger, on } from "rx-helper";
const log = (...args) => console.log(...args);
Then our minimal hello world is:
trigger("hello");
on("hello", () => log("Hello World!"));
// OOPS! That did nothing!
Or rather:
on("hello", () => log("Hello World!"));
trigger("hello");
// "Hello World!"
You set up event handlers, then trigger events—that's all there is to it!
A word of caution: As with all event-driven programs, the assumptions you might have about source-code order and when things execute might need to be reconsidered. Handlers must be set up first! This takes only a little practice to get used to. Remember this mantra: "Events drive everything"
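To see why ordering matters, here is a minimal sketch of a synchronous event bus. The `makeBus` function and its internals are hypothetical and written only for illustration; this is not how Rx-Helper is implemented, just the smallest thing that shows the same behavior.

```javascript
// A hypothetical, synchronous event bus; NOT Rx-Helper's implementation.
function makeBus() {
  const handlers = new Map(); // event type -> array of handler functions

  return {
    on(type, handler) {
      if (!handlers.has(type)) handlers.set(type, []);
      handlers.get(type).push(handler);
    },
    trigger(type, payload) {
      // Only handlers registered *before* this call can be notified.
      const matching = handlers.get(type) || [];
      matching.forEach(handler => handler(payload));
      return matching.length; // how many handlers ran
    }
  };
}

const bus = makeBus();
const greetings = [];

const ranEarly = bus.trigger("hello"); // no handler yet, so nothing happens
bus.on("hello", () => greetings.push("Hello World!"));
const ranLate = bus.trigger("hello"); // the handler is in place now

console.log(ranEarly, ranLate, greetings);
```

The first `trigger` finds nobody listening, so the event is simply gone by the time the handler is attached.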
Event Input and Handling
Events are objects that can carry more than just their string `type`: they can also have a `payload`. Both are combined into a single Object, with a string `type` field and a `payload` field of any type. The Flux Standard Action is the specification they conform to. To inspect the event, you can pluck the `event` field from the first argument to your handler.
app.on("greeting", ({ event: { payload: person } }) => {
  log("Hello", person);
})
["Babar", "Isiah"].forEach(person => {
app.trigger("greeting", person)
})
// "Hello Babar"
// "Hello Isiah"
For symmetry with `trigger`, the event payload can also be grabbed from the second argument. If you're not using the first argument, `_` is a conventional placeholder.
app.on("greeting", (_, person) => {
  log("Hello", person);
})
["Babar", "Isiah"].forEach(person => {
app.trigger("greeting", person)
})
// "Hello Babar"
// "Hello Isiah"
Lastly, the original way of processing a single event was called `process`. It still exists, and takes the entire event. Handling doesn't change whether it was `process` or `trigger` that was called; it's all driven by the event itself!
app.process({ type: "greeting", payload: { crazy: "stuff" } });
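One way to picture the relationship between the two entry points is a sketch in which `trigger` just builds the event object and hands it to the processing function. Everything below is a hypothetical stand-in, not Rx-Helper's source; the processing function is named `processEvent` here only to avoid clashing with Node's global `process`.

```javascript
// Hypothetical sketch: `trigger` as sugar over event processing.
const handlers = [];

// Handlers get ({ event }, payload), mirroring the two styles shown above.
function on(type, handler) {
  handlers.push({ type, handler });
}

function processEvent(event) {
  handlers
    .filter(h => h.type === event.type)
    .forEach(h => h.handler({ event }, event.payload));
  return event;
}

// Builds the event object from its parts, then defers to processEvent.
function trigger(type, payload) {
  return processEvent({ type, payload });
}

const seen = [];
on("greeting", ({ event }) => seen.push(event.payload)); // first-argument style
on("greeting", (_, person) => seen.push(person)); // second-argument style

trigger("greeting", "Babar");
processEvent({ type: "greeting", payload: "Isiah" });

console.log(seen);
```

Both handlers see the same payload either way, because by the time they run there is only an event object.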
Listen Up - Don't Throw An Error!
As you've seen, a handler is a function that runs in response to events. The advantage of using events and handlers over regular function calls is that handlers have built-in error-isolation.
Error isolation is important! Imagine a speaker presenting to a crowd - if one of their handlers falls asleep, or has an issue with what they say, should that handler interrupt handlers nearby? Or, interrupt the speaker and interfere with all handlers? Rx-Helper handlers are polite, and if they are not they are silently removed! As this example shows:
const logEveryone = (_, person) => {
log("Asked to greet " + person);
};
const onlyLikesBethany = (_, person) => {
if (person !== "Bethany") {
throw new Error("I only greet Bethany!");
}
};
app.on("greeting", onlyLikesBethany);
app.on("greeting", logEveryone);
["Bob", "Bethany", "Babar"].forEach(person =>
app.process({ type: "greeting", payload: person })
);
// Error: I only greet Bethany!
// Asked to greet Bob
// Asked to greet Bethany
// Asked to greet Babar
Note the following:
- The running of `logEveryone` completed, regardless of the handler `onlyLikesBethany`.
- It did not matter that the handler which errored was declared 'first' in the source code.
- The `forEach` loop continued to process events - it did not 'see' the exception.
If you've heard of the Actor Model, the concurrency approach Erlang used to power telephony switches, you'll understand why killing a misbehaving handler is the right thing to do. We'll show you the proper way to deal with handler errors later, but for now, remember that allowing an exception to escape a handler will kill that handler! However, it will not kill the rest of the program - and that is what error isolation is for. Rest assured that there is a way for the caller of `trigger`/`process` to find out about these handler errors, but that is an advanced topic as it will seldom be needed.
In short, handlers, when attached to the app via `on`, act like little Job Boxes running their own processes, independently from each other. They're where you'll set up async handling, whose errors would not be immediately visible to the `trigger`-er anyway.
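If it helps to see the mechanics, error isolation can be sketched in a few lines of plain JavaScript. The `makeBus` below is a hypothetical toy, not Rx-Helper's implementation; it only mirrors the behavior described above: a throwing handler is removed, its neighbors keep running, and the caller never sees the exception.

```javascript
// Hypothetical sketch of error isolation; NOT Rx-Helper's implementation.
function makeBus() {
  let handlers = [];
  const errors = [];

  return {
    errors,
    on(handler) {
      handlers.push(handler);
    },
    trigger(payload) {
      // Iterate over a copy so removals do not disturb this pass.
      for (const handler of [...handlers]) {
        try {
          handler(payload);
        } catch (err) {
          errors.push(err.message);
          // The misbehaving handler is removed; the others are untouched.
          handlers = handlers.filter(h => h !== handler);
        }
      }
    }
  };
}

const bus = makeBus();
const greeted = [];

bus.on(person => {
  if (person !== "Bethany") throw new Error("I only greet Bethany!");
});
bus.on(person => greeted.push(person));

["Bob", "Bethany", "Babar"].forEach(person => bus.trigger(person));

console.log(greeted); // all three names: the second handler never missed an event
console.log(bus.errors); // one entry: the failing handler was removed after Bob
```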
But in the event that you must use a handler for authentication or validation, or to prevent future action from being taken, there is a second way to attach a handler: via `filter`.
filter
API Intro 1.2 - Suppose the function `onlyLikesBethany` was intended to only let events for Bethany into the app. No handlers should run unless the event has "Bethany" in its payload. We can achieve that by throwing an exception, but from a handler attached via `filter`.
app.filter("greeting", onlyLikesBethany);
app.on("greeting", logEveryone);
// Bob not allowed in.
// Asked to greet Bethany
// Babar not allowed in.
Unlike handlers attached with `on`, where the order they are attached has no effect on the app, handlers attached via `filter`:
- Are run sequentially, in the order they were attached with `filter`
- If they throw, they'll continue to be run on successive events - throwing is how they filter!
- If they throw, they'll prevent any `on` handlers (or successive filters) from being run.
Because of this, if we need to check for filter exceptions, we'll need synchronous try-catch exception handling in our `forEach`.
["Bob", "Bethany", "Babar"].forEach(person => {
try {
app.process({ type: "greeting", payload: person });
} catch (ex) {
console.log(person + " not allowed in.");
}
});
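The difference between the two kinds of attachment can be sketched as a tiny pipeline: filters first, synchronously and unprotected, then handlers, each wrapped in its own try-catch. The names `filter`, `on`, and `processEvent` below are hypothetical stand-ins written for this sketch, not the library's own code.

```javascript
// Hypothetical sketch of filters-before-handlers; NOT Rx-Helper's implementation.
const filters = [];
const handlers = [];

const filter = fn => filters.push(fn);
const on = fn => handlers.push(fn);

function processEvent(event) {
  // Filters run in attachment order; an exception here reaches the caller
  // and stops everything that would have come after it.
  filters.forEach(fn => fn(event));

  // Handlers only run for events that passed every filter.
  handlers.forEach(fn => {
    try {
      fn(event);
    } catch (err) {
      /* isolated: handler errors never reach the caller */
    }
  });
  return event;
}

const admitted = [];
const rejected = [];

filter(({ payload }) => {
  if (payload !== "Bethany") throw new Error("I only greet Bethany!");
});
on(({ payload }) => admitted.push(payload));

["Bob", "Bethany", "Babar"].forEach(person => {
  try {
    processEvent({ type: "greeting", payload: person });
  } catch (ex) {
    rejected.push(person);
  }
});

console.log(admitted, rejected);
```

Note that the filter is still in place for Babar even though it threw for Bob: throwing does not remove a filter.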
Filters are perfect for the following:
- Validation: Checking type-safeness or required field validity
- Aggregation: Running a reducer to update a store, and throwing if violations are detected.
- Logging: You'll see a message before any `on` handlers are run.
- Timestamping: You can modify the event in a filter, and future handlers will see that change.
In short- for running synchronous functions of short duration, filters are your friend!
Async, and Paying it Forward with Return Values
API Intro 2.1 — Filter return values
You might realize now that Filters, and handlers attached via `on` (called Handlers), can do anything - they're just functions. That's right, but it's not the entire spirit of a Filter.
If a filter throws an error upon an event, the event is canceled. Otherwise, you get filter return values on the object returned, which is a clone of the event - how handy!
// Add a random system-generated id onto every event, and log all events.
// Tip: /.*/ matches everything
app.filter(/.*/, ({ event }) => randomId(), { name: "systemId" });
app.on(/.*/, ({ event }) => {
log(event);
});
// Make use of the fields on the returned event
["Bob", "Bethany", "Babar"].forEach(person => {
  const result = trigger("greeting", person);
  log(result);
  log(`${result.payload} given systemId ${result.systemId}`);
});
// Sample output for the last event, Babar:
// Object {type: "greeting", payload: "Babar"}
// Object {type: "greeting", payload: "Babar", systemId: "2bc5aac", completed: Promise}
The return value from `process`/`trigger` is the event object created, supplemented with a field for the return value from each filter. You don't have to use them, but you could! We made use of a HandlerConfig argument to give the filter a name. Otherwise a name is assigned automatically, derived from your event pattern if possible, but if you intend to use the `result` capability of filters, it is best to name them. A greppable code-base is an easy code-base!
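As a rough mental model, you can picture each filter's return value being stored on a copy of the event under the filter's name. The sketch below is hypothetical (the `nameLength` filter and the sequential id generator are invented for illustration) and leaves out pattern matching entirely.

```javascript
// Hypothetical sketch: filter results stored under the filter's name.
const filters = [];

function filter(fn, { name }) {
  filters.push({ fn, name });
}

function trigger(type, payload) {
  const event = { type, payload };
  const result = { ...event }; // a clone the filters can decorate
  for (const { fn, name } of filters) {
    result[name] = fn({ event });
  }
  return result;
}

let nextId = 0;
filter(() => `id-${++nextId}`, { name: "systemId" });
filter(({ event }) => String(event.payload).length, { name: "nameLength" });

const result = trigger("greeting", "Babar");
console.log(result);
```

With explicit names, `result.systemId` and `result.nameLength` are easy to find with a text search, which is the point of the "greppable" advice above.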
API Intro 2.2— Handler return values
But what about that other field returned - `completed`?
// Object {type: "greeting", payload: "Babar", systemId: "2bc5aac", completed: Promise}
The filter didn't put that field there. The handlers did.
The `completed` Promise is the master Promise for all handlers (async handlers via `on`) that ran upon this event. (Read that again, soak it in - got it?)
// Add a random system-generated id onto every event.
app.filter(/.*/, ({ event }) => randomId(), { name: "systemId" });
app.on(
/.*/,
({ event }) => {
const thisMoment = round(performance.now() % 10000);
log("S: creating at " + thisMoment);
return thisMoment;
},
{ name: "serverCreatedAt" }
);
// Make use of the fields on the returned event
["Bob", "Bethany", "Babar"].forEach(person => {
const result = trigger("greeting", person);
log("C: ", result);
result.completed.then(({ serverCreatedAt }) => {
log("C: completed.serverCreatedAt: " + serverCreatedAt);
});
});
// S: creating at 417
// C:
// Object {type: "greeting", payload: "Babar", systemId: "a8e4a92", completed: Promise}
// C: completed.serverCreatedAt: 417
// https://codesandbox.io/s/staging-fire-l1hdq
And so that you don't have to wait on all handlers, just the one you are interested in, the Promise available under the handler name will be just for that handler's possibly async result. So, `result.completed.subTask` will be available possibly sooner than `result.completed` because it will not have to wait for `result.completed.anotherTask`.
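One way such a "master Promise with named children" could be assembled is with `Promise.all` plus one Promise per handler name. This is only a sketch under that assumption: `runHandlers` and `delay` are hypothetical helpers, and Rx-Helper may well build its `completed` value differently.

```javascript
// Hypothetical sketch of a `completed` Promise with per-handler Promises.
function runHandlers(event, namedHandlers) {
  const perHandler = {};
  for (const [name, handler] of Object.entries(namedHandlers)) {
    // Promise.resolve accepts both plain values and Promises.
    perHandler[name] = Promise.resolve(handler(event));
  }

  const names = Object.keys(perHandler);
  // The "master" Promise resolves once every handler has finished,
  // with an object of results keyed by handler name.
  const all = Promise.all(Object.values(perHandler)).then(values =>
    Object.fromEntries(names.map((name, i) => [name, values[i]]))
  );

  // Expose each handler's own Promise under its name as well.
  return Object.assign(all, perHandler);
}

const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

const completed = runHandlers(
  { type: "greeting", payload: "Babar" },
  {
    subTask: () => delay(10, "fast result"),
    anotherTask: () => delay(50, "slow result")
  }
);

const order = [];
completed.subTask.then(value => order.push(`subTask: ${value}`));
completed.then(all => {
  order.push(`all: ${Object.keys(all).join(", ")}`);
  console.log(order);
});
```

The `subTask` entry lands in `order` first, because it does not wait for the slower `anotherTask`.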
But now you really want to know - how do I return a value, if it's an async function I'm calling? Although returning a value works fine for synchronous functions, returning a Promise is much more common for async work. That will work with `rx-helper`, but it's not the true Spirit (capital S), of HandlerS, oops, handlers :) Handlers might just as well return one of the older siblings of Promises, Observables.
Observables have been around for a very long time - longer, in fact, than Promises. They are implemented in a dozen languages through the ReactiveX project, and are on their way to being a standard language feature (currently a Stage 1 Proposal).
But they can be hard to use, and RxHelper aims to help with some helpers. Let's check out `after`, the surly cousin of `setTimeout`:
import { after } from "rx-helper";
app.on(
/.*/,
({ event }) => {
return after(500, () => {
const thisMoment = round(performance.now() % 10000);
log("S: creating at " + thisMoment);
return thisMoment;
});
},
{ name: "serverCreatedAt" }
);
The helper `after` takes a time in milliseconds, and either a value or a function that does some work and then returns a value, and creates an `Observable` of that value after that amount of time. This stuff makes me giddy, being able to move into async so easily - no `await` or confusing generator syntax. Just an object that you can think of as an unstarted Job. That is one lens through which you can view an Observable: the definition of a Job that is not yet started, such as a string you'd pass to a `system` command, like `ls -l`. The work to get those results isn't done until you ask the Job to start. When you return an Observable from a Handler, the instance of an `Agent` living in the variable `app` will choose when to start that Job. By default it will run it immediately.
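The "unstarted Job" idea can be demonstrated without any library. The `lazyAfter` function below is a hypothetical stand-in that only mimics the laziness described above; it is not the `after` exported by rx-helper and does not return a real RxJS Observable.

```javascript
// Hypothetical stand-in for an Observable-returning `after`; it only mimics
// the laziness described above and is NOT the helper exported by rx-helper.
function lazyAfter(ms, valueOrFn) {
  return {
    // Nothing is scheduled until someone subscribes.
    subscribe({ next, complete }) {
      const timer = setTimeout(() => {
        const value = typeof valueOrFn === "function" ? valueOrFn() : valueOrFn;
        next(value);
        complete();
      }, ms);
      // Returning a cancel function lets the caller abandon the Job.
      return () => clearTimeout(timer);
    }
  };
}

let started = 0;
const job = lazyAfter(20, () => {
  started += 1;
  return "done";
});

console.log("defined, started =", started); // the Job is only a description so far

const cancel = job.subscribe({
  next: value => console.log("result:", value),
  complete: () => console.log("complete, started =", started)
});
```

Whoever holds `job` decides when (and whether) to call `subscribe`, which is the role the text above assigns to the Agent.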
But one of the big challenges in async is managing the concurrency of results, so that it remains configurable. Sometimes you want to enqueue these jobs, as though on a Job Box of `concurrency: 1`; other times the default, parallel, run-immediately mode will suffice. There's no one right answer.
With Rx-Helper, if you set things up this way:
- Assign a Handler to listen for events matching a pattern
- Return Observables from your handler
You are guaranteed to be able to adjust exactly WHEN and HOW the different instances of the Job interact. So you can leave the definition of the Job uncontaminated by details about whether it's to be run in parallel, in a queue, or if each should cut off the last.
So let's understand the full life cycle of the default concurrency mode, used when none is specified: `parallel`.
Run at once - parallel
In this listing, note the order of the log statements:
app.filter(/.*/, ({ event }) => round(performance.now() % 10000), {
name: "clientTime"
});
app.on(/.*/, ({ event }) => {
return after(500, () => {
const thisMoment = round(performance.now() % 10000);
log("S: creating at " + thisMoment);
return thisMoment;
});
},
{ name: "serverCreatedAt" }
);
["Bob", "Bethany"].forEach(person => {
const result = trigger("greeting", person);
log(`C: ${result.payload} processed at`, result.clientTime);
result.completed.serverCreatedAt.then( serverCreatedAt => {
log(`C: ${result.payload} completed.serverCreatedAt: ` + serverCreatedAt);
});
});
// C: Bob processed at 437
// C: Bethany processed at 439
// S: creating at 940
// C: Bob completed.serverCreatedAt: 940
// S: creating at 941
// C: Bethany completed.serverCreatedAt: 941
All of the calls to `trigger` complete as fast as they can run through the filters, not held up at all by the `serverCreatedAt` handler. Even though it was async, the client was able to chain a `then` handler onto the `completed.serverCreatedAt` Promise without becoming blocked. Everyone is happy - nobody waiting on anyone else - and no `async` or `await` to be found!
2.2
Let's see how to return Observables when they're not as simple as `after`:
import { Observable } from 'rxjs'
app.on(/.*/, ({ event }) => {
return new Observable(notify => {
setTimeout(() => {
...
notify.next(thisMoment);
notify.complete(); // Don't forget to complete Observables!
}, 500);
});
},
{ name: "serverCreatedAt" }
);
Like Promises, Observables, when created via `new`, receive some callbacks. Only with Observables, they are called `{next, error, complete}` instead of `(resolve, reject)`. They're also bundled into an Object, whose technical name is an Observer. But I like to name the parameter `notify`; `notify.next` just seems a very natural way to express what calling the `next` method of the Observer does.
Be sure to complete your Observables by calling `notify.complete`, or the system can get really bogged down! Note that either `complete` or `error` will resolve or reject the `completed` Promise for this handler. To provide a value to that `completed` Promise (to resolve with a value), you must call `next` prior to `complete` - it's two separate calls. While this may seem strange if you come from Promises, keep in mind that Observables can call `next` any number of times, so calling `next`, then `complete` is equivalent to calling a single `resolve` with a Promise.
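The "`next`, then `complete` equals `resolve`" equivalence can be made concrete with a small adapter. `lastValueAsPromise` below is a hypothetical helper written for this sketch (it is not part of Rx-Helper, and it ignores unsubscription); it simply remembers the latest `next` value and resolves with it on `complete`.

```javascript
// Hypothetical helper: adapts a {next, error, complete} producer to a Promise.
function lastValueAsPromise(producer) {
  return new Promise((resolve, reject) => {
    let latest; // stays undefined if `next` is never called
    producer({
      next: value => {
        latest = value;
      },
      error: reject,
      complete: () => resolve(latest)
    });
  });
}

const calls = [];
const producer = notify => {
  calls.push("next");
  notify.next(417);
  calls.push("complete");
  notify.complete(); // without this, the Promise would never settle
};

lastValueAsPromise(producer).then(value => console.log("resolved with", value));
```

If the producer skipped `complete`, the Promise would stay pending forever, which is the "bogged down" situation the text warns about.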
It's not usually necessary with Rx-Helper to call `notify.error`, unless you want the handler to be killed. Usually it's best to `catchError` and return an event of a custom type with meaning to your app, then gracefully `complete`. This keeps your handler alive to process more events, and is more appropriate for the occasional AJAX error scenario that would otherwise result in a rejected Promise.
API Intro 2.3 — Concurrency Modes
In that example, we are happy with the parallelism - but we will see some frequent cases where that is not the case. Returning Jobs (Observables) from Handlers allows the maximum possible flexibility in tuning the operation of the Jobs, and using plug-and-play concurrency modes is less error-prone than trying to figure out just the right combination of `setTimeout` and cancelation to make it work in an air-tight fashion.
It's been said before and it's still true. Async. Hard. Is.
But the question of what to do when a handler is already running a Job and another is `trigger`-ed often falls into one of 4 modes, as pictured here:
Concurrency options go right in the config, as one of these strings:
No-Loss | Limit 1 |
---|---|
Parallel | Cutoff (replace) |
Serial | Mute (drop) |
It's easy to see how the concurrency modes differ, and easy to imagine uses for each one.
- `parallel` is a fine default: just execute Jobs returned from Handlers with no fixed restriction. (Async lookups on a page)
- `serial` - Jobs aren't begun until the previous one has finished. (Adding a song to a playback queue)
- `cutoff` - A limit of 1 at a time - replacing any old Job with a new one by canceling the first, then starting the new. (Autocomplete search results)
- `mute` - A limit of 1 at a time. Ignoring/dropping new Jobs while another is processing. (Elevator buttons)
You might recognize these as corresponding to the RxJS operators `mergeMap`, `concatMap`, `switchMap`, and `exhaustMap`, only named friendlier :) That's the RxHelper making it easier for you!
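To compare the four modes side by side, here is a deliberately simplified sketch in which Jobs are started and finished by hand, so the outcome is deterministic. `makeRunner`, `submit`, and `finish` are hypothetical names invented for this illustration; a real implementation would lean on the RxJS operators named above rather than on this bookkeeping.

```javascript
// Hypothetical runner; Jobs are started and finished by hand to keep it deterministic.
function makeRunner(mode) {
  const log = [];
  const running = new Set();
  const queue = [];

  const start = name => {
    running.add(name);
    log.push(`start ${name}`);
  };

  return {
    log,
    // Called when a handler returns a new Job.
    submit(name) {
      if (mode === "parallel" || running.size === 0) {
        start(name);
      } else if (mode === "serial") {
        queue.push(name); // wait for the current Job to finish
      } else if (mode === "cutoff") {
        running.forEach(old => log.push(`cancel ${old}`));
        running.clear();
        start(name); // the new Job replaces the old one
      } else if (mode === "mute") {
        log.push(`drop ${name}`); // ignore Jobs while one is running
      }
    },
    // Called when a Job completes; canceled or dropped Jobs are ignored.
    finish(name) {
      if (!running.delete(name)) return;
      log.push(`finish ${name}`);
      if (mode === "serial" && queue.length > 0) start(queue.shift());
    }
  };
}

const results = {};
for (const mode of ["parallel", "serial", "cutoff", "mute"]) {
  const runner = makeRunner(mode);
  runner.submit("A");
  runner.submit("B"); // arrives while A is still running
  runner.finish("A");
  runner.finish("B");
  results[mode] = runner.log.join(" | ");
}
console.log(results);
```

The same two Jobs, A and B, produce four different histories, and the Job definitions themselves never change; only the mode does.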
API Intro 2.4 — HandlerConfig {type: 'nextSection'}
So far we've been describing Observable-returning Handlers whose results are visible to the caller of `trigger`/`process`. And that is one possible consumer of that data. But the power of Handlers is that their return values, which come from the Observables they return, can become events that `app` sees, as though `trigger` was called inside their handler functions!
So while you could do this:
on("greeting", () => {
return after(500, () => {
trigger("timestamp", round(performance.now() % 10000));
});
});
It's better practice to do this.
on(
"greeting",
() => {
return after(500, () => getTimeStamp());
},
{ type: "timestamp" }
);
The main benefit of this is that the Handler does not need to know what event types its return values are processed as. The handler says what gets returned, and config specifies how. Another benefit is that the function you call in the handler does not need to call `trigger`, which keeps your business logic function from having a dependency on RxHelper.
So, to recap, Rx-Helper lets you get events into the `app` by applying the `type` parameter of config, which specifies that a Handler's return values should become the payloads of events of that `type`.
If it sounds confusing - don't worry. There's an article in The Wiki describing why that's cool, but maybe all you need is an example to see:
app.on(
"greeting",
({ event }) => {
return after(500, () => {
const thisMoment = round(performance.now() % 10000);
log("S: creating at " + thisMoment);
return thisMoment;
});
},
{ name: "serverCreatedAt", type: "timestamp" }
);
app.on("timestamp", (_, payload) => {
log(`Timestamped another at ${payload}!`);
});
// S: creating at 1091
// Timestamped another at 1091!
// S: creating at 1094
// Timestamped another at 1094!
Awww - the two handlers are sending love messages, and timestamps are their love language! The Observable returned from the `greeting` handler feeds the app's events with events of type `timestamp`. The caller of `trigger`/`process` knows no different, but there are now events in the system that weren't there before. This provides events that new behaviors can hang off of, like our silly logger! But it's not silly at all when you find it is ridiculously easy to scale the complexity of your app by chaining handlers this way.
Lastly, if your handler Observable already returns events that have `type` and `payload` fields, and you want them to go back through the app, you can pass `true` to the `processResults` config field:
on(
"greeting",
() => {
return after(500, () => ({ type: "timestamp", payload: getTimeStamp() }));
},
{ processResults: true }
);
This does not require you to call `trigger`; however, it makes it less easy to see at a glance, in a standard configurable place, what new event types this handler is responsible for creating. Of course, if the handler may return several, or dynamic, event types, `processResults` is made for just that case.
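To contrast the two config options in one place, here is a simplified, synchronous sketch. It is hypothetical throughout: plain return values stand in for the Observables a real handler would return, the `farewell` event type is invented for the example, and the `on`/`trigger` functions are toys rather than Rx-Helper's own.

```javascript
// Hypothetical sketch of the `type` and `processResults` options; synchronous
// return values stand in for the Observables a real handler would return.
const handlers = [];
const seen = [];

function on(type, handler, config = {}) {
  handlers.push({ type, handler, config });
}

function trigger(type, payload) {
  const event = { type, payload };
  seen.push(type);
  for (const { type: pattern, handler, config } of handlers) {
    if (pattern !== type) continue;
    const value = handler({ event }, payload);
    if (config.type) {
      // The return value becomes the payload of a new event of that type.
      trigger(config.type, value);
    } else if (config.processResults) {
      // The return value is already a whole event; send it back through.
      trigger(value.type, value.payload);
    }
  }
  return event;
}

const stamps = [];
on("greeting", () => 1091, { type: "timestamp" });
on("farewell", () => ({ type: "timestamp", payload: 1094 }), { processResults: true });
on("timestamp", (_, payload) => stamps.push(payload));

trigger("greeting", "Babar");
trigger("farewell", "Babar");

console.log(seen, stamps);
```

With `type`, the new event type is visible in the config; with `processResults`, you have to read the handler body to learn what it emits, which is the trade-off described above.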