Promise - advantageous/reakt GitHub Wiki
A Promise is like a non-blocking Future (java.util.concurrent.Future).
With a promise, you get notified when the result changes instead of having to call get, which blocks the calling thread.
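The difference between blocking on get and being notified can be sketched with JDK classes alone. This is only an illustration: CompletableFuture stands in for the notification style and is not Reakt's Promise API, and the class name BlockingVersusNotified and the value "snapshot-1" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; JDK types only, no Reakt dependency.
class BlockingVersusNotified {

    /** Blocking style: the caller waits inside get() until the value exists. */
    static String blockingStyle(ExecutorService pool) {
        Future<String> future = pool.submit(() -> "snapshot-1");
        try {
            return future.get(); // blocks the calling thread until the task finishes
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Notification style: the caller registers a handler and moves on. */
    static CompletableFuture<Void> notifiedStyle(ExecutorService pool,
                                                 AtomicReference<String> sink) {
        return CompletableFuture
                .supplyAsync(() -> "snapshot-1", pool)
                .thenAccept(sink::set); // handler runs once the value is available
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(blockingStyle(pool));

            AtomicReference<String> sink = new AtomicReference<>();
            // join() is used here only so the demo can print the result before exiting.
            notifiedStyle(pool, sink).join();
            System.out.println(sink.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the second method nothing waits on the result; the handler passed to thenAccept plays the role that then plays on a promise.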
A promise is both a Callback (io.advantageous.reakt.Callback) and a Result (io.advantageous.reakt.Result). A promise is a sort of deferred value.
There are three types of promises: a blocking promise, a callback promise, and a replay promise.
A blocking promise is for legacy integration and for testing. A callback promise has its handlers (then, thenExpect and catchError) called back, but usually on a foreign thread. A replay promise gets called back on the caller's thread, not the callee's. The replay promise usually works with a Reactor (a concept from QBit).
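The threading difference between a callback promise and a replay promise can be sketched as follows. This is a minimal illustration under stated assumptions, not Reakt's implementation: ReplaySketch, ThreadingSketch, and the process method are hypothetical names, and process merely stands in for the role the text assigns to a Reactor.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for a replay promise: replies are queued, then replayed. */
class ReplaySketch<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    private final Consumer<T> handler;

    ReplaySketch(Consumer<T> handler) {
        this.handler = handler;
    }

    /** Called by the callee, possibly on a foreign thread; only records the value. */
    void reply(T value) {
        pending.add(value);
    }

    /** Called by the owner on its own thread; this is where the handler runs. */
    void process() {
        T value;
        while ((value = pending.poll()) != null) {
            handler.accept(value);
        }
    }
}

class ThreadingSketch {

    /** Callback style: returns the name of the thread the handler ran on. */
    static String callbackStyle() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        Consumer<String> handler = v -> ranOn.set(Thread.currentThread().getName());
        runOnForeignThread(() -> handler.accept("done")); // handler fires on the callee's thread
        return ranOn.get();
    }

    /** Replay style: returns the name of the thread the handler ran on. */
    static String replayStyle() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        ReplaySketch<String> promise =
                new ReplaySketch<>(v -> ranOn.set(Thread.currentThread().getName()));
        runOnForeignThread(() -> promise.reply("done")); // only queues the value
        promise.process(); // handler fires here, on the caller's thread
        return ranOn.get();
    }

    private static void runOnForeignThread(Runnable task) {
        Thread worker = new Thread(task, "foreign-thread");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the replay handler runs on the owning thread, it can touch that thread's state without extra synchronization, which is the usual motivation for this style.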
Promises allow you to succinctly call async APIs using Java 8 lambda expressions.
private void lookupSnapshot(final Session session,
                            final Callback<Snapshot> callback,
                            final Long timestamp) {

    /* Cassandra query. */
    final Select.Where query = QueryBuilder
            .select().all()
            .from(RESULTS_TABLE)
            .where(eq("created_at", timestamp));

    /* Create a resultSetPromise (which is a callback) and then use it to
     * invoke the Cassandra query.
     */
    final Promise<ResultSet> resultSetPromise = Promises.<ResultSet>promise();

    resultSetPromise
            .thenExpect(expected ->
                    expected
                            .ifEmpty(() -> snapshotNotFoundReject(callback, timestamp))
                            .ifPresent(resultSet -> Expected.ofNullable(resultSet.one())
                                    .ifEmpty(() -> snapshotNotFoundReject(callback, timestamp))
                                    .ifPresent(row -> extractSnapshotFromRow(callback, row))
                            )
            )
            .catchError(error ->
                    callback.reject("Unable to load snapshot", error));

    // Uses the Reakt Guava bridge.
    registerCallback(session.executeAsync(query), resultSetPromise);
}
Reakt Promises and Callbacks are regular Java Consumers (java.util.function.Consumer).
This allows you to use Reakt Promises with projects and libs that have no concept of Reakt.
// Now extends Consumer
public interface Callback<T> extends Consumer<T> {
    ...
    default void accept(T t) { reply(t); }
    default Consumer<Throwable> errorConsumer() { return this::reject; }
    default Consumer<T> consumer() { return this; }
    ...

// A promise is a callback and a result as before
public interface Promise<T> extends Callback<T>, Result<T> {
Let's show an example. Suppose you had the following service, which uses Consumer for callbacks.
...
public class EmployeeService {

    public void lookupEmployee(String id, Consumer<Employee> employeeConsumer,
                               Consumer<Throwable> errorConsumer) {
        // Look up the employee asynchronously.
        employeeConsumer.accept(employee); // or call errorConsumer.accept(exception);
        ...
    }
You could call the above service with a Reakt promise like this.
Promise<Employee> promise = Promises.promise();
promise.catchError((error)->...).then((employee)->...);
employeeService.lookupEmployee("123", promise.consumer(),
        promise.errorConsumer());
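To try the Consumer bridge without Reakt on the classpath, here is a self-contained sketch. Everything in it is an assumption made for illustration: MiniCallback is a hypothetical stand-in that mirrors the shape of the Callback interface shown earlier, EmployeeLookup is a simplified synchronous version of the service, and a String replaces the Employee type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in mirroring the Callback shape above; not the Reakt interface. */
interface MiniCallback<T> extends Consumer<T> {
    void reply(T value);

    void reject(Throwable error);

    @Override
    default void accept(T value) { reply(value); }

    default Consumer<Throwable> errorConsumer() { return this::reject; }

    default Consumer<T> consumer() { return this; }
}

/** A service that knows only java.util.function.Consumer (synchronous for brevity). */
class EmployeeLookup {
    void lookupEmployee(String id, Consumer<String> onEmployee,
                        Consumer<Throwable> onError) {
        if (id.isEmpty()) {
            onError.accept(new IllegalArgumentException("empty id"));
        } else {
            onEmployee.accept("employee-" + id);
        }
    }
}

class ConsumerBridgeDemo {

    /** Runs one successful and one failing lookup and records what the callback saw. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniCallback<String> callback = new MiniCallback<String>() {
            @Override
            public void reply(String value) { events.add("ok:" + value); }

            @Override
            public void reject(Throwable error) { events.add("error:" + error.getMessage()); }
        };

        EmployeeLookup service = new EmployeeLookup();
        // The service sees two plain Consumers and never learns about MiniCallback.
        service.lookupEmployee("123", callback.consumer(), callback.errorConsumer());
        service.lookupEmployee("", callback.consumer(), callback.errorConsumer());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The success path reaches reply through accept, and the failure path reaches reject through errorConsumer, so the service needs no knowledge of the callback type.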