Short examples - advantageous/reakt GitHub Wiki
/**
 * Repository abstraction for {@code Todo} items.
 * Only {@code loadTodos} is shown here; the {@code ...} line stands for members elided from this excerpt.
 */
public interface TodoRepo {
/** Returns a {@code Promise} whose result is the {@code List} of {@code Todo} items. */
Promise<List<Todo>> loadTodos();
...
}
// Load the todos and forward the outcome to returnPromise:
// success resolves it with the loaded list, failure rejects it with a message.
todoRep.loadTodos()
        .then(todos -> {
            logger.info("list todos");
            returnPromise.resolve(todos);
        })
        .catchError(error -> {
            // The failure message names the load operation so that logs and callers
            // are not pointed at the unrelated "add todo" path.
            logger.error("Unable to load todos from repo", error);
            returnPromise.reject("Unable to load todos from repo");
        })
        .invoke();
reactor.runTaskAfter(Duration.ofSeconds(60), () -> {
logger.info("Registering health check and recovery for repo");
reactor.addRepeatingTask(Duration.ofSeconds(30), this::circuitBreakerTest);
});
// Connect to the repo, tracking failed attempts in notConnectedCount.
// The "..." lines mark code elided from this excerpt.
connect().catchError(error -> {
// Failure path: count the failed attempt and log it together with the running total.
notConnectedCount++;
logger.error("Not connected to repo " + notConnectedCount, error);
...
}).thenSafe(connected -> {
...
// Success path: a connection was made, so the failure counter starts over.
notConnectedCount = 0;
// NOTE(review): presumably invokeWithReactor makes the handlers run via the reactor — confirm against the Reakt API.
}).invokeWithReactor(reactor);
/**
 * Circuit breaker guarding the repo {@code Session}. It is created with
 * {@code Breaker.opened()}, i.e. without a session being passed in.
 */
private Breaker<Session> sessionBreaker = Breaker.opened();
reactor.runTaskAfter(Duration.ofSeconds(60), () -> {
logger.info("Registering health check and recovery for repo");
reactor.addRepeatingTask(Duration.ofSeconds(30), this::circuitBreakerTest);
});
/**
 * Health check and recovery routine for the repo session.
 * When {@code sessionBreaker} reports broken, it records a metric, closes the old
 * session if it is still open, and attempts to reconnect. Repeated reconnect
 * failures mark the service as failing; a successful reconnect clears that state.
 * The {@code ...} line marks code elided from this excerpt.
 */
private void circuitBreakerTest() {
// Everything below runs only inside the ifBroken callback.
sessionBreaker.ifBroken(() -> {
serviceMgmt.increment("repo.breaker.broken");
// Clean up the old session: close it if it is not already closed.
// A failure to close is only logged as a warning so the reconnect below still runs.
sessionBreaker.cleanup(session -> {
try {
if (!session.isClosed()) { session.close(); }
} catch (Exception ex) { logger.warn("unable to clean up old session", ex); }
});
// Connect to the repo again.
connect().catchError(error -> {
// Failure path: count the failed attempt and log it with the running total.
notConnectedCount++;
logger.error("Not connected to repo " + notConnectedCount, error);
...
// After more than 10 failed attempts, record a fatal-error metric and mark the service as failing.
if (notConnectedCount > 10) {
logger.error("Attempts to reconnect to Repo failed. Mark it.");
serviceMgmt.increment("repo.connect.error.fatal");
serviceMgmt.setFailingWithError(error);
}
}).thenSafe(connected -> {
// Success path: if the service had been marked failing, record the recovery and clear the failing state.
if (serviceMgmt.isFailing()) {
serviceMgmt.increment("repo.connect.recover");
serviceMgmt.recover();
}
// Connected again, so the failure counter starts over.
notConnectedCount = 0;
// NOTE(review): presumably invokeWithReactor makes the handlers run via the reactor — confirm against the Reakt API.
}).invokeWithReactor(reactor);
});
}
The circuitBreakerTest method is registered after an initial 60-second delay and then runs every 30 seconds.
reactor.runTaskAfter(Duration.ofSeconds(60), () -> {
reactor.addRepeatingTask(Duration.ofSeconds(30),
this::circuitBreakerTest);
});
// Replace the breaker with an operational one wrapping the given session.
// NOTE(review): 10 is presumably the error limit and the lambda the "should the breaker
// trip" test — confirm against the Breaker API. As written, the lambda is true only when
// the session is still open AND more than 25 critical repo errors have been counted;
// verify that this is the intended condition.
sessionBreaker = Breaker.operational(
        session,
        10,
        current -> !current.isClosed() && criticalRepoErrors.get() > 25);
/**
 * Adds a todo using the session held by {@code sessionBreaker}.
 *
 * @param todo the todo to add
 * @return an invokable promise; it is rejected with a "not connected" message when the
 *         breaker is broken, otherwise it is handed to {@code doAddTodo} together with
 *         the todo and the session
 */
@Override
public Promise<Boolean> addTodo(final Todo todo) {
    logger.info("Add Todo called");
    return invokablePromise(result -> sessionBreaker
            .ifBroken(() -> {
                // No usable session: fail the promise, then log and count the broken breaker.
                final String notConnected = "Not connected to repo while adding todo";
                result.reject(notConnected);
                logger.error(notConnected);
                serviceMgmt.increment("repo.breaker.broken");
            })
            .ifOperational(activeSession -> doAddTodo(todo, result, activeSession)));
}