Promises.all - advantageous/reakt GitHub Wiki
We have all functionality with promises. You can create a promise that waits for all of the promises passed to it to return asynchronously.
Promises.all(promise1, promise2, promise3)
        .catchError(returnPromise::reject)
        .then(v -> returnPromise.resolve(true))
        .invoke();
If every promise you pass to an all promise is invokable, then the all promise is invokable as well, and calling invoke on it will invoke all of the child promises.
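To make the invoke propagation concrete, here is a minimal sketch in plain Java. InvokableToy and its methods are hypothetical names made up for this illustration, not Reakt classes. The sketch only assumes what the paragraph above says: the parent is invokable when every child is, and invoking the parent invokes each child.

```java
/** Hypothetical stand-in for an invokable promise; NOT part of Reakt. */
class InvokableToy {
    private final Runnable action; // null marks a promise that cannot be invoked
    private boolean invoked;

    InvokableToy(Runnable action) {
        this.action = action;
    }

    boolean isInvokable() {
        return action != null;
    }

    boolean wasInvoked() {
        return invoked;
    }

    void invoke() {
        if (action == null) {
            throw new IllegalStateException("Promise is not invokable");
        }
        invoked = true;
        action.run();
    }

    /** The parent is invokable only if every child is; invoking it invokes each child. */
    static InvokableToy all(InvokableToy... children) {
        for (InvokableToy child : children) {
            if (!child.isInvokable()) {
                return new InvokableToy(null);
            }
        }
        return new InvokableToy(() -> {
            for (InvokableToy child : children) {
                child.invoke();
            }
        });
    }
}

class InvokableToyDemo {
    public static void main(String[] args) {
        InvokableToy first = new InvokableToy(() -> System.out.println("first call started"));
        InvokableToy second = new InvokableToy(() -> System.out.println("second call started"));
        InvokableToy all = InvokableToy.all(first, second);
        all.invoke(); // starts both children
        System.out.println(first.wasInvoked() && second.wasInvoked());
    }
}
```

In this sketch a single non-invokable child makes the whole group non-invokable, which mirrors the "if every promise" condition above.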
The Promises.all(list or array) method returns a promise that resolves when all of the promises passed to it have resolved, or rejects with the reason of the first promise that rejects.
Promises.all(
        // Save the Todo item in two tables; don't respond until
        // both calls come back from Cassandra.
        // First call to Cassandra.
        futureToPromise(
                session.executeAsync(insertInto("Todo")
                        .value("id", todo.getId())
                        .value("updatedTime", todo.getUpdatedTime())
                        .value("createdTime", todo.getCreatedTime())
                        .value("name", todo.getName())
                        .value("description", todo.getDescription()))
        ).catchError(error -> recordCassandraError("add.todo", error))
                .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.todo")),
        // Second call to Cassandra.
        futureToPromise(
                session.executeAsync(insertInto("TodoLookup")
                        .value("id", todo.getId())
                        .value("updatedTime", todo.getUpdatedTime()))
        ).catchError(error -> recordCassandraError("add.lookup", error))
                .thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.lookup"))
).catchError(returnPromise::reject)
        .then(v -> returnPromise.resolve(true)).invoke();
/** Employee service. */
EmployeeService employeeService = ...
/* Promise that expects an employee. */
Promise<Employee> promise1 = Promises.promise();
Promise<Employee> promise2 = Promises.promise();
/* Promise that returns when all employees are returned. */
final Promise<Void> allPromise = Promises.all(promise1, promise2);
allPromise.then(nil -> System.out.println("All DONE!"));
assertFalse("Not done yet", allPromise.complete());
/** Call service. */
employeeService.loadEmployee("1", promise1);
/** Still not done because only one service has been called. */
assertFalse("Still not done yet", allPromise.complete());
/** Ok now second service is called. */
employeeService.loadEmployee("2", promise2);
/** Wait some time. */
//...
assertTrue(allPromise.complete());
assertTrue(allPromise.success());
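The completion rule described earlier (resolve once every child has resolved, reject as soon as one child rejects) can be sketched without Reakt. ToyPromise below is a hypothetical, single-threaded stand-in written for this page. It is not how Reakt's AllPromise is implemented, only one way the counting could work.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical, single-threaded promise used only for illustration; NOT Reakt's Promise. */
class ToyPromise<T> {
    private Consumer<T> onValue = value -> {};
    private Consumer<Throwable> onError = error -> {};
    private boolean complete;
    private boolean success;

    ToyPromise<T> then(Consumer<? super T> handler) {
        onValue = onValue.andThen(handler);
        return this;
    }

    ToyPromise<T> catchError(Consumer<Throwable> handler) {
        onError = onError.andThen(handler);
        return this;
    }

    void resolve(T value) {
        if (complete) return; // the first outcome wins
        complete = true;
        success = true;
        onValue.accept(value);
    }

    void reject(Throwable error) {
        if (complete) return; // the first outcome wins
        complete = true;
        success = false;
        onError.accept(error);
    }

    boolean complete() {
        return complete;
    }

    boolean success() {
        return complete && success;
    }

    /** Resolves when every child has resolved; rejects with the first child's error. */
    static ToyPromise<Void> all(ToyPromise<?>... children) {
        ToyPromise<Void> parent = new ToyPromise<>();
        AtomicInteger remaining = new AtomicInteger(children.length);
        for (ToyPromise<?> child : children) {
            child.then(value -> {
                if (remaining.decrementAndGet() == 0) {
                    parent.resolve(null);
                }
            });
            child.catchError(parent::reject);
        }
        return parent;
    }
}

class ToyPromiseDemo {
    public static void main(String[] args) {
        ToyPromise<String> promise1 = new ToyPromise<>();
        ToyPromise<String> promise2 = new ToyPromise<>();
        ToyPromise<Void> allPromise = ToyPromise.all(promise1, promise2);

        promise1.resolve("employee 1");
        System.out.println(allPromise.complete()); // prints false
        promise2.resolve("employee 2");
        System.out.println(allPromise.success()); // prints true
    }
}
```

Because the first outcome wins in this sketch, a rejection from one child completes the parent immediately, and later replies from the other children are ignored.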
We have three types of all promises: callback, blocking, and replay callback. (A replay callback is a callback that gets replayed in the caller's thread.)
...
public interface Promises ...{
    /**
     * All promises must complete.
     * @param promises promises
     * @return return containing promise
     */
    static Promise<Void> all(Promise<?>... promises) {
        return new AllPromise(promises);
    }

    /**
     * All promises must complete.
     * @param promises promises
     * @return return containing promise that is blocking.
     */
    static Promise<Void> allBlocking(Promise<?>... promises) {
        return new AllBlockingPromise(promises);
    }

    /**
     * All promises must complete.
     * @param timeout timeout
     * @param time time
     * @param promises promises
     * @return returns replay promise so promise can be replayed in caller's thread.
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, long time, Promise<?>... promises) {
        return new AllReplayPromise(timeout, time, promises);
    }

    /**
     * All promises must complete.
     * @param timeout timeout
     * @param promises promises
     * @return returns replay promise so promise can be replayed in caller's thread.
     */
    static ReplayPromise<Void> allReplay(final Duration timeout, Promise<?>... promises) {
        return Promises.allReplay(timeout, System.currentTimeMillis(), promises);
    }
...
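To picture what "replayed in the caller's thread" and the timeout and time parameters could mean, here is a rough sketch in plain Java. ReplayToy is a hypothetical class invented for this page, not Reakt's ReplayPromise. It assumes a worker thread only records the result, and that the handler runs later when the caller calls check with the current time.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical replay-style promise; NOT Reakt's ReplayPromise. */
class ReplayToy<T> {
    // Written by the worker thread, read by the caller's thread.
    // A null reference means "no reply yet", so this sketch cannot carry null values.
    private final AtomicReference<T> pending = new AtomicReference<>();
    private final long startTime;
    private final long timeoutMillis;
    private Consumer<T> onValue = value -> {};
    private boolean complete;
    private boolean success;

    ReplayToy(Duration timeout, long startTime) {
        this.timeoutMillis = timeout.toMillis();
        this.startTime = startTime;
    }

    ReplayToy<T> then(Consumer<T> handler) {
        onValue = onValue.andThen(handler);
        return this;
    }

    /** May be called from any thread; it only records the value. */
    void reply(T value) {
        pending.set(value);
    }

    /** Called from the caller's thread; replays a recorded value or times out. */
    void check(long now) {
        if (complete) return;
        T value = pending.get();
        if (value != null) {
            complete = true;
            success = true;
            onValue.accept(value); // the handler runs on the thread that called check
        } else if (now - startTime > timeoutMillis) {
            complete = true; // timed out without a reply
        }
    }

    boolean complete() {
        return complete;
    }

    boolean success() {
        return complete && success;
    }
}

class ReplayToyDemo {
    public static void main(String[] args) throws InterruptedException {
        ReplayToy<String> promise =
                new ReplayToy<>(Duration.ofMillis(1000), System.currentTimeMillis());
        promise.then(name ->
                System.out.println(name + " handled on " + Thread.currentThread().getName()));

        Thread worker = new Thread(() -> promise.reply("Rick"));
        worker.start();
        worker.join();

        promise.check(System.currentTimeMillis()); // the handler fires here, not on the worker
    }
}
```

The design point is that handlers never run on the worker thread, so the caller does not need to synchronize the state its handlers touch. The cost is that someone has to call check periodically.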
Given this test service.
public static class TestService {

    public void simple(Callback<Employee> callback) {
        callback.reply(new Employee("Rick"));
    }

    public void async(final Callback<Employee> callback) {
        new Thread(() -> {
            callback.reply(new Employee("Rick"));
        }).start();
    }

    public void asyncTimeout(final Callback<Employee> callback) {
        new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.reply(new Employee("Rick"));
        }).start();
    }

    public void asyncError(final Callback<Employee> callback) {
        new Thread(() -> {
            callback.reject("Rick");
        }).start();
    }

    public void error(Callback<Employee> callback) {
        callback.reject("Error");
    }

    public void exception(Callback<Employee> callback) {
        callback.reject(new IllegalStateException("Error"));
    }
}
We can have these three example tests.
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import io.advantageous.reakt.promise.ReplayPromise;
@Test
public void testAllBlocking() throws Exception {
    TestService testService = new TestService();
    Promise<Employee> promise1 = Promises.promise();
    Promise<Employee> promise2 = Promises.promise();
    final Promise<Void> promise = Promises.allBlocking(promise1, promise2);
    assertFalse(promise.complete());
    testService.async(promise1);
    assertFalse(promise.complete());
    testService.async(promise2);
    assertTrue(promise.success());
}

@Test
public void testAll() throws Exception {
    /** Test service. */
    TestService testService = new TestService();
    /* Promise that expects an employee. */
    Promise<Employee> promise1 = Promises.promise();
    Promise<Employee> promise2 = Promises.promise();
    /* Promise that returns when all employees are returned. */
    final Promise<Void> promise = Promises.all(promise1, promise2);
    promise.then(nil -> System.out.println("DONE!"));
    assertFalse("Not done yet", promise.complete());
    /** Call service. */
    testService.simple(promise1);
    /** Still not done because only one service has been called. */
    assertFalse(promise.complete());
    /** Ok now second service is called. */
    testService.simple(promise2);
    /** Wait some time. */
    //...
    assertTrue(promise.complete());
    assertTrue(promise.success());
}

@Test
public void testAllReplay() throws Exception {
    TestService testService = new TestService();
    Promise<Employee> promise1 = Promises.promise();
    Promise<Employee> promise2 = Promises.promise();
    final ReplayPromise<Void> promise = Promises.allReplay(Duration.ofMillis(1000),
            promise1, promise2);
    assertFalse(promise.complete());
    testService.async(promise1);
    assertFalse(promise.complete());
    testService.async(promise2);
    for (int index = 0; index < 10; index++) {
        promise.check(System.currentTimeMillis());
        if (promise.complete()) break;
        Thread.sleep(10);
    }
    assertTrue(promise.complete());
    assertTrue(promise.success());
}
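The blocking variant used in testAllBlocking can be pictured as a latch that the caller waits on. The sketch below uses only the JDK. BlockingAllToy and its methods are hypothetical names for this illustration, and nothing here is taken from Reakt's AllBlockingPromise.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical latch-based "all blocking" helper; NOT Reakt's AllBlockingPromise. */
class BlockingAllToy {
    private final CountDownLatch latch;

    BlockingAllToy(int childCount) {
        this.latch = new CountDownLatch(childCount);
    }

    /** Each child calls this once when it completes; safe to call from any thread. */
    void childDone() {
        latch.countDown();
    }

    /** Non-blocking check: true once every child has reported in. */
    boolean complete() {
        return latch.getCount() == 0;
    }

    /** Blocks the caller up to the timeout; true if all children finished in time. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

class BlockingAllToyDemo {
    public static void main(String[] args) {
        BlockingAllToy all = new BlockingAllToy(2);
        new Thread(all::childDone).start(); // first asynchronous call returns
        new Thread(all::childDone).start(); // second asynchronous call returns
        boolean finished = all.await(1, TimeUnit.SECONDS);
        System.out.println("All done: " + finished);
    }
}
```

Blocking like this is convenient in tests and legacy integration code, while the callback and replay styles avoid tying up the calling thread.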