## Saving DStream to Geode
Spark Streaming extends the core API to allow high-throughput, fault-tolerant
stream processing of live data streams. Data can be ingested from many
sources such as Akka, Kafka, Flume, Twitter, ZeroMQ, TCP sockets, etc.
Results can be stored in Geode.
### A Simple Spark Streaming App: Stateful Network Word Count
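Create a `StreamingContext` from a `SparkConf` configuration, with a batch interval of one second:
```
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sparkConf, Seconds(1))
```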
Create a DStream that connects to a netcat server at `host:port`:
```
val lines = ssc.socketTextStream(host, port)
```
Count the occurrences of each word in each batch:
```
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
```
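To make the per-batch computation concrete, the following minimal sketch reproduces the same split, map, and reduce steps on an ordinary Scala collection, with no Spark dependency. The object `BatchWordCount`, the method `countWords`, and the sample lines are hypothetical, and `groupBy` followed by a sum stands in for `reduceByKey`.

```scala
object BatchWordCount {
  // Counts the words in one batch of lines, mirroring flatMap -> map -> reduceByKey.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))        // split each line into words
      .map(word => (word, 1))       // pair each word with a count of 1
      .groupBy(_._1)                // gather the pairs that share a word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

  def main(args: Array[String]): Unit = {
    val batch = Seq("a b a", "b c") // hypothetical contents of a single batch
    println(countWords(batch))
  }
}
```

In the streaming job, the same computation runs independently on every one-second batch, so these counts do not yet accumulate across batches.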
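Use `updateStateByKey` to maintain a running count of each word seen in the text
data stream. Here the state is the running count, which is an integer. Because
`updateStateByKey` is a stateful operation, Spark Streaming requires a checkpoint
directory to be set on the context (`ssc.checkpoint(...)`) before the computation
starts. We define the update function as: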
```
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
```
The update function is applied to a DStream of key-value pairs, here the
`wordCounts` DStream of `(word, count)` pairs from the earlier step:
```
val runningCounts = wordCounts.updateStateByKey[Int](updateFunc)
```
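As a rough illustration of how the state evolves, the sketch below replays the same update function over a few batches for a single word, using plain Scala and no Spark. The object `RunningCountDemo`, the `replay` helper, and the sample batches are hypothetical; in a real job Spark Streaming invokes the update function itself on every batch.

```scala
object RunningCountDemo {
  // Same update function as in the text: add this batch's values to the previous state.
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  // Hypothetical helper: feeds successive batches of values for one key through
  // updateFunc, starting with no state.
  def replay(batches: Seq[Seq[Int]]): Option[Int] =
    batches.foldLeft(Option.empty[Int])((state, values) => updateFunc(values, state))

  def main(args: Array[String]): Unit = {
    // Per-batch counts for one word: 2 in batch 1, absent in batch 2, 1 in batch 3.
    val batches = Seq(Seq(2), Seq.empty[Int], Seq(1))
    println(replay(batches)) // prints Some(3)
  }
}
```

The empty batch leaves the count unchanged, which is why a word keeps its running total even when it does not appear in the current interval.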
Print a few of the counts to the console, then start the computation:
```
runningCounts.print()
ssc.start()
ssc.awaitTermination() // Wait for the computation to terminate
```
#### Spark Streaming With Geode
Now let's save the running word count to the Geode region `str_int_region`. To do
so, simply replace `print()` with `saveToGeode()`:
```
import io.pivotal.geode.spark.connector.streaming._
runningCounts.saveToGeode("str_int_region")
```
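One way to picture the effect, assuming `saveToGeode` writes each `(word, count)` pair of every batch as a key-value entry in the region, is a map whose entries are overwritten batch by batch. The sketch below is a stand-in written in plain Scala: the `Region` type alias and the `saveBatch` helper are hypothetical and are not part of the connector API.

```scala
object RegionSketch {
  // Hypothetical stand-in for the Geode region: an immutable map from word to count.
  type Region = Map[String, Int]

  // Hypothetical helper: stores every pair of a batch, replacing any earlier
  // value held under the same key.
  def saveBatch(region: Region, batch: Seq[(String, Int)]): Region =
    region ++ batch

  def main(args: Array[String]): Unit = {
    val afterFirst = saveBatch(Map.empty, Seq("a" -> 2, "b" -> 1))         // running counts after batch 1
    val afterSecond = saveBatch(afterFirst, Seq("a" -> 5, "b" -> 1, "c" -> 1)) // running counts after batch 2
    println(afterSecond("a")) // prints 5
  }
}
```

Under that assumption, the region ends up holding the most recent running total for every word seen so far.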
You can also use the version of `saveToGeode` that takes a `GeodeConnectionConf` parameter:
```
runningCounts.saveToGeode("str_int_region", connConf)
```
See the [Spark Streaming Programming Guide](http://spark.apache.org/docs/latest/streaming-programming-guide.html)
for more details about Spark Streaming programming.
Next: [Geode OQL](8_oql.md)