This connector provides sinks that write data into an Apache Cassandra database.
To use this connector, add the following dependency to your project:
{% highlight xml %}
<dependency>
  <groupId>com.alibaba.blink</groupId>
  <artifactId>flink-connector-cassandra{{ site.scala_version_suffix }}</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution [here]({{ site.baseurl}}/dev/linking.html).
There are multiple ways to bring up a Cassandra instance on a local machine:
Flink's Cassandra sink is created by using the static `CassandraSink.addSink(DataStream<IN> input)` method. This method returns a `CassandraSinkBuilder`, which offers methods to further configure the sink and finally to `build()` the sink instance.
The following configuration methods can be used:
A checkpoint committer stores additional information about completed checkpoints in some resource. This information is used to prevent a full replay of the last completed checkpoint in case of a failure. You can use a `CassandraCommitter` to store this information in a separate table in Cassandra. Note that this table will NOT be cleaned up by Flink.
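The role of a checkpoint committer can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `InMemoryCommitter` and `CommittingSink` are hypothetical names invented for this illustration, the map stands in for the separate Cassandra table, and the code does not reproduce Flink's actual classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a checkpoint committer.
final class InMemoryCommitter {
    // subtask index -> highest checkpoint id known to be fully written
    private final Map<Integer, Long> lastCommitted = new HashMap<>();

    void commitCheckpoint(int subtask, long checkpointId) {
        lastCommitted.merge(subtask, checkpointId, Long::max);
    }

    boolean isCheckpointCommitted(int subtask, long checkpointId) {
        Long last = lastCommitted.get(subtask);
        return last != null && checkpointId <= last;
    }
}

// Hypothetical sink that consults the committer before writing a checkpoint's records.
final class CommittingSink {
    // stands in for the target database
    final List<String> database = new ArrayList<>();
    private final InMemoryCommitter committer;

    CommittingSink(InMemoryCommitter committer) {
        this.committer = committer;
    }

    // Returns true if the records were written, false if the checkpoint was skipped.
    boolean writeCheckpoint(int subtask, long checkpointId, List<String> records) {
        if (committer.isCheckpointCommitted(subtask, checkpointId)) {
            return false; // already written before the failure, so do not replay it
        }
        database.addAll(records);
        committer.commitCheckpoint(subtask, checkpointId);
        return true;
    }
}
```

Calling `writeCheckpoint(0, 1L, records)` twice writes the records only once, which is the full replay that the stored information is meant to prevent.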
Flink can provide exactly-once guarantees if the query is idempotent (meaning it can be applied multiple times without changing the result) and checkpointing is enabled. In case of a failure, the failed checkpoint will be replayed completely.
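To see why idempotence matters under replay, consider the following self-contained sketch. It does not use Flink or Cassandra: the `Map` is a stand-in for a table keyed by `word`, and `ReplayDemo`, `upsert`, and `increment` are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ReplayDemo {
    // Idempotent write: overwrites the row, in the way an INSERT on a primary key does.
    static void upsert(Map<String, Long> table, String word, long count) {
        table.put(word, count);
    }

    // Non-idempotent write: adds to the stored value on every call.
    static void increment(Map<String, Long> table, String word, long delta) {
        table.merge(word, delta, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> upserted = new HashMap<>();
        Map<String, Long> incremented = new HashMap<>();

        // The same checkpoint is written twice, as happens after a failure and replay.
        for (int attempt = 0; attempt < 2; attempt++) {
            upsert(upserted, "flink", 3L);
            increment(incremented, "flink", 3L);
        }

        System.out.println(upserted);    // prints {flink=3}
        System.out.println(incremented); // prints {flink=6}
    }
}
```

The replayed upsert leaves the table unchanged, while the replayed increment double-counts the record.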
Furthermore, for non-deterministic programs, the write-ahead log has to be enabled. For such a program the replayed checkpoint may be completely different from the previous attempt, which may leave the database in an inconsistent state, since part of the first attempt may already have been written. The write-ahead log guarantees that the replayed checkpoint is identical to the first attempt. Note that enabling this feature will have an adverse impact on latency.
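As a configuration sketch, the write-ahead log and a `CassandraCommitter` might be enabled on the sink builder as follows. It assumes that your connector version provides `setClusterBuilder(ClusterBuilder)`, `enableWriteAheadLog(CheckpointCommitter)`, and the `CassandraCommitter(ClusterBuilder)` constructor, and that `result` is a Tuple stream such as the one in the word-count examples on this page. The class and method names `WriteAheadLogSinkExample` and `attachSink` are hypothetical; please verify the builder methods against the Javadoc of the version you use.

```java
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

public class WriteAheadLogSinkExample {

    // 'result' is assumed to be a (word, count) Tuple stream.
    public static void attachSink(DataStream<Tuple2<String, Long>> result) throws Exception {
        // Describes how to connect to Cassandra; shared by the sink and the committer.
        ClusterBuilder clusterBuilder = new ClusterBuilder() {
            @Override
            protected Cluster buildCluster(Cluster.Builder builder) {
                return builder.addContactPoint("127.0.0.1").build();
            }
        };

        CassandraSink.addSink(result)
                .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
                .setClusterBuilder(clusterBuilder)
                // assumption: stores checkpoint information in a separate Cassandra table
                .enableWriteAheadLog(new CassandraCommitter(clusterBuilder))
                .build();
    }
}
```

Checkpointing must also be enabled on the execution environment for the write-ahead log to take effect.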
With checkpointing enabled, the Cassandra sink guarantees at-least-once delivery of action requests to the Cassandra instance.
For more details, see the [checkpoints docs]({{ site.baseurl }}/dev/stream/state/checkpointing.html) and the [fault tolerance guarantee docs]({{ site.baseurl }}/dev/connectors/guarantees.html).
The Cassandra sinks currently support both Tuple and POJO data types, and Flink automatically detects which type of input is used. For general use of these streaming data types, please refer to [Supported Data Types]({{ site.baseurl }}/dev/api_concepts.html). We show two implementations based on SocketWindowWordCount, for POJO and Tuple data types respectively.
In all these examples, we assume that the associated keyspace `example` and table `wordcount` have been created.
When storing results with the Java/Scala Tuple data type in a Cassandra sink, you must set a CQL upsert statement (via `setQuery('stmt')`) to persist each record to the database. The upsert query is cached as a `PreparedStatement`, and each Tuple element is converted to a parameter of the statement.

For details about `PreparedStatement` and `BoundStatement`, please see the DataStax Java Driver manual.
{% highlight java %}
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<Tuple2<String, Long>> result = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept empty words, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new Tuple2<String, Long>(word, 1L));
                    }
                }
            }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

CassandraSink.addSink(result)
        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
        .setHost("127.0.0.1")
        .build();
{% endhighlight %}
{% highlight scala %}
// get input data by connecting to the socket
val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')

// parse the data, group it, window it, and aggregate the counts
val result: DataStream[(String, Long)] = text
  // split up the lines in pairs (2-tuples) containing: (word,1)
  .flatMap(_.toLowerCase.split("\\s"))
  .filter(_.nonEmpty)
  .map((_, 1L))
  // group by the tuple field "0" and sum up tuple field "1"
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1)

CassandraSink.addSink(result)
  .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
  .setHost("127.0.0.1")
  .build()

result.print().setParallelism(1)
{% endhighlight %}
The following example streams a POJO data type and stores the same POJO entity back to Cassandra. The POJO implementation must be annotated as described in the DataStax Java Driver manual, because each field of the entity is mapped to an associated column of the designated table using the DataStax Java Driver `com.datastax.driver.mapping.Mapper` class.

The mapping of each table column can be defined through annotations placed on a field declaration in the POJO class. For details of the mapping, please refer to the CQL documentation on Definition of Mapped Classes and CQL Datatypes.
{% highlight java %}
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordCount> result = text
        .flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) {
                // normalize and split the line
                String[] words = value.toLowerCase().split("\\s");

                // emit the pairs
                for (String word : words) {
                    // do not accept empty words, since word is defined as the primary key in the C* table
                    if (!word.isEmpty()) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            }
        })
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordCount>() {
            @Override
            public WordCount reduce(WordCount a, WordCount b) {
                return new WordCount(a.getWord(), a.getCount() + b.getCount());
            }
        });

CassandraSink.addSink(result)
        .setHost("127.0.0.1")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();


// POJO mapped to the table example.wordcount
@Table(keyspace = "example", name = "wordcount")
public class WordCount {

    @Column(name = "word")
    private String word = "";

    @Column(name = "count")
    private long count = 0;

    public WordCount() {}

    public WordCount(String word, long count) {
        this.setWord(word);
        this.setCount(count);
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return getWord() + " : " + getCount();
    }
}
{% endhighlight %}
{% top %}