Flink - nMoncho/helenus GitHub Wiki

Helenus integrates with Apache Flink by providing an implementation of a Source and SinkFunction for DataStreams. And an implementation of a InputFormat and OutputFormatBase for DataSets.

Our implementation follows Flink Cassandra Connector closely, but may change in the future.

Installation

Include the library into you project definition:

libraryDependencies += "net.nmoncho" %% "helenus-flink" % "1.7.0"

Setup

With Flink, reading from and writing to Cassandra is a bit different compared to Akka or Pekko. With Flink we don't define ScalaPreparedStatements, we define functions that given a [CqlSession] will produce a ScalaPreparedStatement. The reason behind this design decision stems from the fact that Flink distributes a computation over different machines, each which will create a session, prepare, bind, and execute the statement.

In Flink Cassandra Connector users define a CQL statement in the form of a String. This looses all the type-safety that's Helenus provides. We found out we could still use Helenus type-safety features was by providing ScalaPreparedStatement this way.

While using Helenus, creating a Flink application is done in the same way as other Flink application:

Reading from Cassandra

For a DataStream we can read from Cassandra by defining a ScalaPreparedStatement and using the asSource extension method:

import net.nmoncho.helenus._
import net.nmoncho.helenus.api.RowMapper
import net.nmoncho.helenus.api.cql.Adapter
import net.nmoncho.helenus.flink._
import net.nmoncho.helenus.flink.source.CassandraSource
import net.nmoncho.helenus.flink.typeinfo.TypeInformationDerivation._

case class Person(id: Int, name: String, city: String)

// Generic Derivation cannot be used :(
// implicit val rowMapper: RowMapper[Person] = RowMapper[Person] // this won't work

implicit val rowMapper: RowMapper[Person] = new RowMapper[Person] {
  override def apply(row: Row): Person = Person(
    row.getCol[Int]("id"),
    row.getCol[String]("name"),
    row.getCol[String]("city")
  )
}

implicit val personTypeInfo: TypeInformation[Person] = Pojo[Person]

val peopleQuery = (session: CqlSession) =>
  "SELECT * FROM flink_people".toCQL(session).prepareUnit.as[Person].apply()

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
  .setParallelism(2)

val dataStream: DataStream[Person] = streamEnv.fromSource(
  peopleQuery.asSource(
    CassandraSource
      .Config()
  ),
  WatermarkStrategy.noWatermarks(),
  "Cassandra Source"
)

For a DataSet we can write to Cassandra defining a ScalaPreparedStatement and using the asInputFormat extension method:

val execEnv = ExecutionEnvironment.getExecutionEnvironment

execEnv.setParallelism(2)

val dataSource: DataSource[Person] = execEnv.createDataSource(
  peopleQuery.asInputFormat(
    CassandraSource
      .Config()
  )
)

On RowMappers definition

Due to some serialization issue, we need to define RowMappers manually, like:

This may change in future releases once we understand where the problem is coming from and how can it be solved.

Writing to Cassandra

For a DataStream we can write to Cassandra by defining a ScalaPreparedStatement and using the addCassandraSink extension method on the DataStream:

import net.nmoncho.helenus.flink.sink.CassandraSink
import org.apache.flink.api.common.functions.MapFunction

implicit val personAdapter: Adapter[Person, (Int, String, String)] = new Adapter[Person, (Int, String, String)] with Serializable {
  def apply(p: Person): (Int, String, String) =
    (p.id, p.name, p.city)
}

val inputDataStream: DataStream[Person] = streamEnv.fromElements(
  Person(1, "John", "Rome"),
  Person(2, "Ralph", "New York"),
  Person(3, "Tim", "London")
)

val result: DataStream[(Int, String, String)] =
  inputDataStream.map(new MapFunction[Person, (Int, String, String)] {
    override def map(p: Person): (Int, String, String) =
      (p.id, p.name, p.city)
  })

result
  .addCassandraSink(
    "INSERT INTO flink_people(id, name, city) VALUES (?, ?, ?)"
      .toCQL(_)
      .prepare[Int, String, String],
    CassandraSink.Config()
  )

For a DataSet we can write to Cassandra defining a ScalaPreparedStatement and using the addCassandraOutput extension method on the DataSet:

import net.nmoncho.helenus.flink.sink.CassandraSink

val inputDataSet: DataSet[Person] = execEnv.fromElements(
  Person(1, "John", "Rome"),
  Person(2, "Ralph", "New York"),
  Person(3, "Tim", "London")
)

inputDataSet
  .addCassandraOutput(
    "INSERT INTO flink_people(id, name, city) VALUES (?, ?, ?)"
      .toCQL(_)
      .prepare[Int, String, String]
      .from[Person],
    CassandraSink.Config()
  )

Both extension method, addCassandraSink and addCassandraOutput, expect a function of the type CqlSession => ScalaPreparedStatement[T, Out], where Out is ignored.