Saving DStream to Geode

Spark Streaming extends the core Spark API to allow high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources, such as Akka, Kafka, Flume, Twitter, ZeroMQ, and TCP sockets, and the results can be stored in Geode.

A Simple Spark Streaming App: Stateful Network Word Count

Create a StreamingContext with a SparkConf configuration

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(1))
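
The snippet above assumes a sparkConf is already defined. A minimal sketch of one, assuming the connector's GeodeLocatorPropKey (the spark.geode.locators property) and a hypothetical locator address:

import org.apache.spark.SparkConf
import io.pivotal.geode.spark.connector.GeodeLocatorPropKey

// Hypothetical app name and locator address; point the locator
// property at your own Geode locator(s)
val sparkConf = new SparkConf()
  .setAppName("StatefulNetworkWordCount")
  .set(GeodeLocatorPropKey, "localhost[10334]")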

Create a DStream that will connect to a netcat server at host:port

val lines = ssc.socketTextStream(host, port)
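
For local testing, you can feed this stream by starting a netcat server (for example, nc -lk 9999) and typing lines into it; host and port are placeholders for that server's address.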

Count each word in each batch

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
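
For example, a batch containing the single line "to be or not to be" yields the pairs (to, 2), (be, 2), (or, 1), (not, 1) for that batch.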

Use updateStateByKey to maintain a running count of each word seen in the text data stream. Here the running count is the state, and it is an integer. We define the update function as:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
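
For example, if a word appears twice in the current batch (values = Seq(1, 1)) and its previous count is Some(3), the function returns Some(5).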

This is applied on a DStream of word pairs (the wordCounts DStream of (word, 1) pairs from the earlier step):

val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
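
Note that stateful transformations such as updateStateByKey require a checkpoint directory to be set on the StreamingContext before the computation starts. The path below is a placeholder; use a reliable location such as HDFS in production:

// Required by updateStateByKey; replace with a durable path in production
ssc.checkpoint("/tmp/spark-checkpoint")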

Print a few of the counts to the console, then start the computation:

runningCounts.print()
ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate

Spark Streaming With Geode

Now let's save the running word counts to the Geode region str_int_region. Simply replace print() with saveToGeode():

import io.pivotal.geode.spark.connector.streaming._
runningCounts.saveToGeode("str_int_region")
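
Note that saveToGeode does not create the region: str_int_region must already exist in the Geode cluster, so create it ahead of time, for example with gfsh.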

You can also use the overload of saveToGeode that takes an explicit GeodeConnectionConf parameter:

runningCounts.saveToGeode("str_int_region", connConf)
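
A sketch of constructing connConf, assuming GeodeConnectionConf can be built from the SparkConf that carries the locator setting:

import io.pivotal.geode.spark.connector.GeodeConnectionConf

// Assumes spark.geode.locators is set on sparkConf (see the setup above)
val connConf = GeodeConnectionConf(sparkConf)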

See the [Spark Streaming Programming Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html) for more details about Spark Streaming programming.

Next: Geode OQL