Full example using Promises.all, promises, AsyncCallback and Reakt Guava bridge to implement Cassandra repository. - advantageous/reakt GitHub Wiki

This class uses an AsyncSupplier to connect to Cassandra asynchronously.

This example includes using Promises.all, promises, AsyncCallback and Reakt/Guava bridge to implement a Cassandra repository that is used by QBit and implements a circuit breaker. It also uses blockingPromises for testing.

Example of circuit breaker for Cassandra.

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.time.Duration;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static io.advantageous.reakt.guava.Guava.*;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Cassandra-backed {@link ImprintStorageService}.
 *
 * <p>The Cassandra session is obtained asynchronously from an {@link AsyncSupplier}.
 * A repeating task registered with the reactor acts as a circuit breaker: when too
 * many driver errors were counted during the last period, or when the session is
 * missing or closed, the session is dropped and a new connection is requested.
 * While no session is available, {@link #store} fails fast.
 */
public class CassandraImprintStorageService implements ImprintStorageService {

    /** Table to store impressions. */
    public static final String IMPRESSIONS_TABLE = "imprints";
    /** Key space to do the storage. */
    public static final String KEY_SPACE = "keyspace_imprints";
    /** Logger. */
    private static final Logger logger = getLogger(CassandraImprintStorageService.class);
    /** Number of driver errors tolerated per circuit breaker period before reconnecting. */
    private static final long MAX_ERRORS_PER_PERIOD = 10;
    /** How often, in seconds, the circuit breaker check runs. */
    private static final int CIRCUIT_BREAKER_PERIOD_SECONDS = 5;
    /** Cassandra Session supplier. */
    private final AsyncSupplier<Session> sessionAsyncSupplier;
    /** QBit reactor for repeating tasks and callbacks that execute on the caller's thread. */
    private final Reactor reactor;
    /** Error count from the Cassandra driver for the current circuit breaker period. */
    private final AtomicLong errorCount = new AtomicLong();
    /** Reference to the Cassandra session, which gets connected asynchronously. */
    private Expected<Session> sessionExpected = Expected.empty();

    /**
     * Creates the service, starts connecting to Cassandra and registers the circuit breaker task.
     *
     * @param sessionAsyncSupplier supplier to supply Cassandra session.
     * @param reactor reactor to manage callbacks and repeating tasks.
     */
    public CassandraImprintStorageService(final AsyncSupplier<Session> sessionAsyncSupplier,
                                          final Reactor reactor) {
        this.sessionAsyncSupplier = sessionAsyncSupplier;
        this.reactor = reactor;

        /* Connect the Cassandra session. */
        connectSession();

        /* This makes sure we are connected.
         * Provides a circuit breaker that reconnects automatically if the session is down.
         */
        reactor.addRepeatingTask(Duration.SECONDS.units(CIRCUIT_BREAKER_PERIOD_SECONDS),
                this::cassandraCircuitBreaker);
    }


    /**
     * Store imprints into Cassandra.
     *
     * <p>If no session is currently available the callback is rejected immediately.
     *
     * @param callback callback that receives {@code true} once every imprint was stored,
     *                 or the error if storing failed
     * @param imprints imprints to store
     */
    @Override
    public void store(final Callback<Boolean> callback,
                      final List<Imprint> imprints) {

        sessionExpected()
                /* If we are not connected, fail fast. */
                .ifEmpty(() -> callback.reject("Not connected to Cassandra"))
                /* If we are connected then call Cassandra. */
                .ifPresent(session -> doStoreImprints(session, callback, imprints));

    }

    /**
     * Periodic health check. Reconnects when the error budget of the last period was
     * exceeded, when the session is closed, or when there is no session at all.
     */
    private void cassandraCircuitBreaker() {

        /* Read AND reset the counter so it really covers only the last period.
         * Without the reset the counter only ever grows, and once the threshold is
         * crossed every subsequent run would tear down a perfectly healthy session.
         */
        final long errorsInPeriod = errorCount.getAndSet(0);

        if (errorsInPeriod > MAX_ERRORS_PER_PERIOD) {
            logger.warn("Cassandra circuit breaker tripped after {} errors, reconnecting", errorsInPeriod);

            final Expected<Session> oldExpected = sessionExpected(); // Get the old session.
            setSessionExpected(null); // Callers now fail fast until we are reconnected.

            try {
                oldExpected.ifPresent(Session::close); // Close the old session.
            } catch (Exception ex) {
                logger.error("Shutting down cassandra and it failed", ex);
            }
            connectSession();
            return;
        }

        /* If the Cassandra session is closed or not present, then (re)connect it. */
        sessionExpected()
                .ifPresent(session -> {
                    /* If the session is closed then reconnect. */
                    if (session.isClosed()) {
                        setSessionExpected(null);
                        connectSession();
                    }
                })
                .ifEmpty(this::connectSession);

    }

    /**
     * Requests a new Cassandra session from the supplier. On success the session is
     * published; on failure the session is cleared so that callers fail fast.
     */
    private void connectSession() {

        sessionAsyncSupplier.get(
                Promises.<Session>promise()
                        .then(session -> {
                            logger.info("Cassandra sessionExpected is open");
                            setSessionExpected(session);
                        })
                        .catchError(error -> {
                            logger.error("Error connecting to Cassandra", error);
                            setSessionExpected(null);
                        })
        );
    }


    /**
     * Does the low level Cassandra storage: one async insert per imprint, joined with
     * {@code Promises.all}.
     */
    private void doStoreImprints(final Session session,
                                 final Callback<Boolean> callback,
                                 final List<Imprint> imprints) {

        /* Nothing to store. Answer right away instead of relying on how Promises.all
         * treats an empty list of child promises (it may never complete).
         */
        if (imprints.isEmpty()) {
            callback.accept(true);
            return;
        }

        /* Make many calls to Cassandra using its async lib to store each imprint. */
        final List<Promise<Boolean>> promises = imprints.stream()
                .map(imprint -> doStoreImprint(session, imprint))
                .collect(Collectors.toList());

        /* Uses Reakt Promises.all.
         * Create a parent promise to contain all of the promises we
         * just created for each imprint.
         */
        final Promise<Void> all = Promises.all(promises);

        /* Reply once all of them are stored, or pass on the error. */
        all.then(nil -> callback.accept(true))
                .catchError(callback::fail);

    }


    /**
     * This gets called one time for each imprint passed to the <code>store(callback, imprints)</code> method.
     *
     * @param session cassandra session
     * @param imprint imprint to store
     * @return promise that is replied to with {@code ResultSet.wasApplied()} or rejected with the error
     */
    private Promise<Boolean> doStoreImprint(final Session session,
                                            final Imprint imprint) {

        final ResultSetFuture resultSetFuture = session.executeAsync(QueryBuilder.insertInto(IMPRESSIONS_TABLE)
                .value("id", imprint.getId())
                /* Locale.ROOT keeps the stored name independent of the JVM default locale. */
                .value("metricType", imprint.getMetricType().name().toLowerCase(java.util.Locale.ROOT))
                .value("metricName", imprint.getMetricName())
                .value("provider", imprint.getProvider().toString())
                .value("externalId", imprint.getExternalId())
                .value("value", imprint.getValue())
                .value("created_at", imprint.getTimestamp())
        );

        final Promise<Boolean> returnedPromise = Promises.promise();
        final Promise<ResultSet> promise = Promises.<ResultSet>promise()
                .then(resultSet -> returnedPromise.reply(resultSet.wasApplied()))
                .catchError(error -> {
                    returnedPromise.reject(error);
                    /* Only driver errors count against the circuit breaker. */
                    if (error instanceof DriverException) {
                        logger.error("Error storing imprint", error);
                        errorCount.incrementAndGet();
                    }
                });

        /* Using Guava/Reakt bridge. */
        registerCallback(resultSetFuture, promise);

        return returnedPromise;
    }

    /** Publishes the current session; {@code null} marks the service as disconnected. */
    private synchronized void setSessionExpected(final Session session) {
        this.sessionExpected = Expected.ofNullable(session);
    }

    /** Returns the current session holder, which is empty while disconnected. */
    private synchronized Expected<Session> sessionExpected() {
        return sessionExpected;
    }

    /** Lets the reactor run its pending work whenever the service queue is empty, idle or at its limit. */
    @QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.IDLE, QueueCallbackType.LIMIT})
    public void process() {
        reactor.process();
    }
}

Example of an AsyncSupplier that looks up Cassandra and connects a session.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.reakt.AsyncSupplier;
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.promise.Promises;
import org.slf4j.Logger;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static io.advantageous.reakt.guava.Guava.registerCallback;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * {@link AsyncSupplier} that looks up a Cassandra endpoint, creates the keyspace and
 * table if they do not exist yet, and then supplies a session connected to the keyspace.
 */
public class CassandraSessionSupplier implements AsyncSupplier<Session> {

    /** Round-robin counter used to pick the next endpoint; shared by all instances. */
    private static final AtomicInteger index = new AtomicInteger();
    private static final Logger logger = getLogger(CassandraSessionSupplier.class);
    private final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionsAsyncSupplier;
    private final int replicationFactor;
    private final String keyspace;
    private final String tableName;
    /** Runs the blocking schema statements so that driver callback threads are not blocked. */
    private final ExecutorService executorService = Executors.newFixedThreadPool(3);

    /**
     * @param endpointDefinitionAsyncSupplier supplies the known Cassandra endpoints
     * @param replicationFactor replication factor used when the keyspace has to be created
     * @param keyspace keyspace to create if needed and to connect to
     * @param tableName table to create if needed
     */
    public CassandraSessionSupplier(final AsyncSupplier<List<EndpointDefinition>> endpointDefinitionAsyncSupplier,
                                    final int replicationFactor,
                                    final String keyspace,
                                    final String tableName) {
        this.endpointDefinitionsAsyncSupplier = endpointDefinitionAsyncSupplier;
        this.replicationFactor = replicationFactor;
        this.keyspace = keyspace;
        this.tableName = tableName;
    }


    /**
     * Looks up the Cassandra endpoints, picks one and connects to it.
     *
     * @param callback replied to with the keyspace-bound session, or rejected if the lookup,
     *                 the schema setup or the connection fails
     */
    @Override
    public void get(final Callback<Session> callback) {

        logger.info("Loading Cassandra Session {} {}", keyspace, tableName);
        endpointDefinitionsAsyncSupplier.get(
                Promises.<List<EndpointDefinition>>promise()
                        .thenExpect(listExpected ->
                                listExpected.filter(endpointDefinitions -> endpointDefinitions.size() > 0)
                                        .map(this::getEndPointDef)
                                        .ifEmpty(() -> callback.reject("Cassandra was not found"))
                                        .ifPresent(endpointDefinition ->
                                                createCassandraSessionWithEndpoint(callback, endpointDefinition))
                        )
                        .catchError((e) -> callback.reject("Unable to lookup cassandra", e)));

    }

    /** Connects without a keyspace first, so that the keyspace and table can be created. */
    private void createCassandraSessionWithEndpoint(final Callback<Session> callback,
                                                    final EndpointDefinition endpointDefinition) {

        /* Use Reakt/Guava bridge. */
        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(),
                Promises.<Session>promise()
                        .catchError(e -> callback.reject("Unable to load initial session", e))
                        .thenExpect(sessionExpected ->
                                sessionExpected.ifEmpty(() -> callback.reject("Empty session returned from Cassandra Cluster"))
                                        .ifPresent((Consumer<Session>) sessionWithoutKeyspace ->
                                                buildDBIfNeeded(sessionWithoutKeyspace, callback, endpointDefinition))
                        ));

    }

    /**
     * Picks the next endpoint in round-robin order.
     *
     * <p>The counter is advanced in one atomic step and reduced modulo the list size.
     * A separate check, reset and increment could interleave between threads and
     * produce an out-of-range index. {@code floorMod} also keeps the index valid
     * after the counter overflows.
     *
     * @param endpointDefinitions non-empty list of endpoints (the caller filters out empty lists)
     */
    private EndpointDefinition getEndPointDef(final List<EndpointDefinition> endpointDefinitions) {
        final int next = Math.floorMod(index.getAndIncrement(), endpointDefinitions.size());
        return endpointDefinitions.get(next);
    }

    /** Runs the blocking schema setup on the executor and then loads the keyspace-bound session. */
    private void buildDBIfNeeded(final Session sessionWithoutKeyspace,
                                 final Callback<Session> callback,
                                 final EndpointDefinition endpointDefinition) {

        executorService.execute(() -> {

            try {
                doBuildDatabase(sessionWithoutKeyspace);
            } catch (Exception ex) {
                callback.reject("Unable to create database", ex);
                return;
            }
            loadSession(callback, endpointDefinition);
        });

    }

    /**
     * Creates the keyspace and the table if they do not exist and closes the bootstrap session.
     *
     * @param sessionWithoutKeyspace session that is not bound to a keyspace; closed by this method
     */
    private void doBuildDatabase(final Session sessionWithoutKeyspace) {

        logger.info("Initializing Cassandra Tables if needed {} {}", keyspace, tableName);
        try {
            sessionWithoutKeyspace.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION "
                    + "= {'class':'SimpleStrategy', 'replication_factor':" + replicationFactor + "};");
            sessionWithoutKeyspace.execute("USE " + keyspace);
            /* The primary key must name declared columns. The table has no artistId
             * column, its identifier column is "id".
             */
            sessionWithoutKeyspace.execute(
                    "CREATE TABLE IF NOT EXISTS " + tableName +
                            " (id bigint,\n" +
                            " metricType text,\n" +
                            " metricName text,\n" +
                            " provider text,\n" +
                            " externalId text,\n" +
                            " value bigint,\n" +
                            " created_at timestamp,\n" +
                            " primary key (id, created_at))\n" +
                            "WITH CLUSTERING ORDER BY (created_at desc);");
        } finally {
            /* Close the bootstrap session even if a statement failed.
             * NOTE(review): the Cluster this session was created from is not closed
             * here - confirm whether it should be released as well.
             */
            sessionWithoutKeyspace.close();
        }
    }

    /** Connects to the keyspace and hands the resulting session to the callback. */
    private void loadSession(final Callback<Session> callback,
                             final EndpointDefinition endpointDefinition) {

        logger.info("Loading session with keyspace {} {}", keyspace, tableName);

        registerCallback(
                Cluster.builder()
                        .withPort(endpointDefinition.getPort())
                        .addContactPoints(endpointDefinition.getHost())
                        .build().connectAsync(keyspace),
                Promises.<Session>promise()
                        .catchError(e -> callback.reject("Unable to load session", e))
                        .then(callback::reply)
        );

    }


}

Here is a test that shows using blocking promises to simplify the testing.

Using a blocking promise from the test

import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.reakt.Reakt;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;
import org.junit.Test;


public class CassandraImprintStorageServiceTest {

    /** Pause, in milliseconds, between creating the service and calling store. */
    private static final long CONNECT_WAIT_MILLIS = 1000;

    /**
     * Stores two imprints through the service and blocks until the result arrives.
     */
    @Test
    public void testStore() throws Exception {

        /* Looks up Cassandra in the lookup service. */
        final CassandraSessionSupplier sessionSupplier = new CassandraSessionSupplier(
                endpointDefinitionsAsyncSupplier(), 2, KEY_SPACE, IMPRESSIONS_TABLE);

        /* Reactor used to react to callbacks in the same thread as the caller. */
        final Reactor callbackReactor = ReactorBuilder.reactorBuilder().build();

        /* The service under test. */
        final CassandraImprintStorageService storageService =
                new CassandraImprintStorageService(sessionSupplier, callbackReactor);

        Thread.sleep(CONNECT_WAIT_MILLIS);

        /* This is a test, so a blocking promise is fine for receiving the result. */
        final Promise<Boolean> storeResult = Promises.<Boolean>blockingPromise()
                .then(worked -> System.out.println("Did it work? " + worked))
                .catchError(error -> error.printStackTrace());

        storageService.store(Reakt.convertPromise(storeResult),
                asList(
                        imprint(1L, MetricType.PLAYS, "flips", "flipgram", 100),
                        imprint(2L, MetricType.REACH, "views", "facebook", 50_000)
                )
        );

        assertTrue(storeResult.get());
    }

    /** Builds an imprint for external id "track", timestamped with the current time. */
    private static Imprint imprint(final long artistId,
                                   final MetricType metricType,
                                   final String metricName,
                                   final String provider,
                                   final int value) {
        return ImprintBuilder.imprintBuilder()
                .setArtistId(artistId).setMetricType(metricType)
                .setExternalId("track").setTimestamp(System.currentTimeMillis())
                .setMetricName(metricName)
                .setProvider(provider)
                .setValue(value).build();
    }

}

Here is another test, showing how to test an AsyncSupplier.

Using blocking promise from a test.

import com.datastax.driver.core.Session;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.promise.Promises;

import org.junit.Test;

public class CassandraSessionSupplierTest {

    /**
     * Asks the supplier for a session and blocks until it is delivered.
     */
    @Test
    public void testGet() throws Exception {

        final CassandraSessionSupplier sessionSupplier = new CassandraSessionSupplier(
                endpointDefinitionsAsyncSupplier(), 2, KEY_SPACE, IMPRESSIONS_TABLE);

        /* A blocking promise keeps the test simple; errors are printed to stderr. */
        final Promise<Session> sessionPromise = Promises.<Session>blockingPromise()
                .catchError(error -> {
                    System.err.println("Big problems");
                    error.printStackTrace();
                });

        sessionSupplier.get(sessionPromise);

        final Session connectedSession = sessionPromise.get();
        assertNotNull(connectedSession);
    }
}
⚠️ **GitHub.com Fallback** ⚠️