| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>Spark Streaming Programming Guide - Spark 1.0.0 Documentation</title> |
| <meta name="description" content=""> |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <meta name="viewport" content="width=device-width"> |
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| <!-- Google analytics script --> |
| <script type="text/javascript"> |
| var _gaq = _gaq || []; |
| _gaq.push(['_setAccount', 'UA-32518208-1']); |
| _gaq.push(['_trackPageview']); |
| |
| (function() { |
| var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true; |
| ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; |
| var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s); |
| })(); |
| </script> |
| |
| |
| </head> |
| <body> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="http://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <div class="navbar navbar-fixed-top" id="topbar"> |
| <div class="navbar-inner"> |
| <div class="container"> |
| <div class="brand"><a href="index.html"> |
| <img src="img/spark-logo-hd.png" style="height:50px;"/></a><span class="version">1.0.0</span> |
| </div> |
| <ul class="nav"> |
| <!--TODO(andyk): Add class="active" attribute to li some how.--> |
| <li><a href="index.html">Overview</a></li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="quick-start.html">Quick Start</a></li> |
| <li><a href="programming-guide.html">Spark Programming Guide</a></li> |
| <li class="divider"></li> |
| <li><a href="streaming-programming-guide.html">Spark Streaming</a></li> |
| <li><a href="sql-programming-guide.html">Spark SQL</a></li> |
| <li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li> |
| <li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li> |
| <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">API Docs<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="api/scala/index.html#org.apache.spark.package">Scaladoc</a></li> |
| <li><a href="api/java/index.html">Javadoc</a></li> |
| <li><a href="api/python/index.html">Python API</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="cluster-overview.html">Overview</a></li> |
| <li><a href="submitting-applications.html">Submitting Applications</a></li> |
| <li class="divider"></li> |
| <li><a href="ec2-scripts.html">Amazon EC2</a></li> |
| <li><a href="spark-standalone.html">Standalone Mode</a></li> |
| <li><a href="running-on-mesos.html">Mesos</a></li> |
| <li><a href="running-on-yarn.html">YARN</a></li> |
| </ul> |
| </li> |
| |
| <li class="dropdown"> |
| <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="configuration.html">Configuration</a></li> |
| <li><a href="monitoring.html">Monitoring</a></li> |
| <li><a href="tuning.html">Tuning Guide</a></li> |
| <li><a href="job-scheduling.html">Job Scheduling</a></li> |
| <li><a href="security.html">Security</a></li> |
| <li><a href="hardware-provisioning.html">Hardware Provisioning</a></li> |
| <li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li> |
| <li class="divider"></li> |
| <li><a href="building-with-maven.html">Building Spark with Maven</a></li> |
| <li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li> |
| </ul> |
| </li> |
| </ul> |
| <!--<p class="navbar-text pull-right"><span class="version-text">v1.0.0</span></p>--> |
| </div> |
| </div> |
| </div> |
| |
| <div class="container" id="content"> |
| |
| <h1 class="title">Spark Streaming Programming Guide</h1> |
| |
| |
| <ul id="markdown-toc"> |
| <li><a href="#overview">Overview</a></li> |
| <li><a href="#a-quick-example">A Quick Example</a></li> |
| <li><a href="#basics">Basics</a> <ul> |
| <li><a href="#linking">Linking</a></li> |
| <li><a href="#initializing">Initializing</a></li> |
| <li><a href="#dstreams">DStreams</a></li> |
| <li><a href="#input-sources">Input Sources</a></li> |
| <li><a href="#operations">Operations</a> <ul> |
| <li><a href="#transformations">Transformations</a></li> |
| <li><a href="#output-operations">Output Operations</a></li> |
| </ul> |
| </li> |
| <li><a href="#persistence">Persistence</a></li> |
| <li><a href="#rdd-checkpointing">RDD Checkpointing</a></li> |
| <li><a href="#deployment">Deployment</a></li> |
| <li><a href="#monitoring">Monitoring</a></li> |
| </ul> |
| </li> |
| <li><a href="#performance-tuning">Performance Tuning</a> <ul> |
| <li><a href="#reducing-the-processing-time-of-each-batch">Reducing the Processing Time of each Batch</a> <ul> |
| <li><a href="#level-of-parallelism-in-data-receiving">Level of Parallelism in Data Receiving</a></li> |
| <li><a href="#level-of-parallelism-in-data-processing">Level of Parallelism in Data Processing</a></li> |
| <li><a href="#data-serialization">Data Serialization</a></li> |
| <li><a href="#task-launching-overheads">Task Launching Overheads</a></li> |
| </ul> |
| </li> |
| <li><a href="#setting-the-right-batch-size">Setting the Right Batch Size</a></li> |
| <li><a href="#memory-tuning">Memory Tuning</a></li> |
| </ul> |
| </li> |
| <li><a href="#fault-tolerance-properties">Fault-tolerance Properties</a> <ul> |
| <li><a href="#failure-of-a-worker-node">Failure of a Worker Node</a></li> |
| <li><a href="#failure-of-the-driver-node">Failure of the Driver Node</a></li> |
| </ul> |
| </li> |
| <li><a href="#migration-guide-from-091-or-below-to-1x">Migration Guide from 0.9.1 or below to 1.x</a></li> |
| <li><a href="#where-to-go-from-here">Where to Go from Here</a></li> |
| </ul> |
| |
| <h1 id="overview">Overview</h1> |
<p>Spark Streaming is an extension of the core Spark API that enables high-throughput,
fault-tolerant stream processing of live data streams. Data can be ingested from many sources
like Kafka, Flume, Twitter, ZeroMQ, or plain old TCP sockets, and can be processed using complex
algorithms expressed with high-level functions like <code>map</code>, <code>reduce</code>, <code>join</code> and <code>window</code>.
Finally, processed data can be pushed out to filesystems, databases,
and live dashboards. In fact, you can apply Spark’s built-in
<a href="mllib-guide.html">machine learning</a> and
<a href="graphx-programming-guide.html">graph processing</a> algorithms to data streams.</p>
| |
| <p style="text-align: center;"> |
| <img src="img/streaming-arch.png" title="Spark Streaming architecture" alt="Spark Streaming" width="70%" /> |
| </p> |
| |
| <p>Internally, it works as follows. Spark Streaming receives live input data streams and divides |
| the data into batches, which are then processed by the Spark engine to generate the final |
| stream of results in batches.</p> |
| |
| <p style="text-align: center;"> |
| <img src="img/streaming-flow.png" title="Spark Streaming data flow" alt="Spark Streaming" width="70%" /> |
| </p> |
| |
<p>Spark Streaming provides a high-level abstraction called a <em>discretized stream</em> or <em>DStream</em>,
which represents a continuous stream of data. DStreams can be created either from input data
streams from sources such as Kafka and Flume, or by applying high-level
operations on other DStreams. Internally, a DStream is represented as a sequence of
<a href="api/scala/index.html#org.apache.spark.rdd.RDD">RDDs</a>.</p>
| |
| <p>This guide shows you how to start writing Spark Streaming programs with DStreams. You can |
| write Spark Streaming programs in Scala or Java, both of which are presented in this guide. You |
| will find tabs throughout this guide that let you choose between Scala and Java |
| code snippets.</p> |
| |
| <hr /> |
| |
| <h1 id="a-quick-example">A Quick Example</h1> |
| <p>Before we go into the details of how to write your own Spark Streaming program, |
| let’s take a quick look at what a simple Spark Streaming program looks like. Let’s say we want to |
| count the number of words in text data received from a data server listening on a TCP |
| socket. All you need to |
| do is as follows.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| <p>First, we import the names of the Spark Streaming classes, and some implicit |
| conversions from StreamingContext into our environment, to add useful methods to |
| other classes we need (like DStream).</p> |
| |
| <p><a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> is the |
| main entry point for all streaming functionality.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.streaming.StreamingContext._</span> |
| </code></pre></div> |
| |
| <p>Then we create a |
| <a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> object. |
| Besides Spark’s configuration, we specify that any DStream will be processed |
| in 1 second batches.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.api.java.function._</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.streaming._</span> |
| <span class="k">import</span> <span class="nn">org.apache.spark.streaming.api._</span> |
| <span class="c1">// Create a StreamingContext with a local master</span> |
| <span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="s">"local"</span><span class="o">,</span> <span class="s">"NetworkWordCount"</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span> |
| </code></pre></div> |
| |
| <p>Using this context, we then create a new DStream |
| by specifying the IP address and port of the data server.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="c1">// Create a DStream that will connect to serverIP:serverPort, like localhost:9999</span> |
| <span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">)</span> |
| </code></pre></div> |
| |
| <p>This <code>lines</code> DStream represents the stream of data that will be received from the data |
| server. Each record in this DStream is a line of text. Next, we want to split the lines by |
| space into words.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="c1">// Split each line into words</span> |
| <span class="k">val</span> <span class="n">words</span> <span class="k">=</span> <span class="n">lines</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> |
| </code></pre></div> |
| |
| <p><code>flatMap</code> is a one-to-many DStream operation that creates a new DStream by |
| generating multiple new records from each record in the source DStream. In this case, |
| each line will be split into multiple words and the stream of words is represented as the |
| <code>words</code> DStream. Next, we want to count these words.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.streaming.StreamingContext._</span> |
| <span class="c1">// Count each word in each batch</span> |
| <span class="k">val</span> <span class="n">pairs</span> <span class="k">=</span> <span class="n">words</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">word</span> <span class="k">=></span> <span class="o">(</span><span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> |
| <span class="k">val</span> <span class="n">wordCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKey</span><span class="o">(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">)</span> |
| |
| <span class="c1">// Print a few of the counts to the console</span> |
| <span class="n">wordCounts</span><span class="o">.</span><span class="n">print</span><span class="o">()</span> |
| </code></pre></div> |
| |
| <p>The <code>words</code> DStream is further mapped (one-to-one transformation) to a DStream of <code>(word, |
| 1)</code> pairs, which is then reduced to get the frequency of words in each batch of data. |
| Finally, <code>wordCounts.print()</code> will print a few of the counts generated every second.</p> |
| |
| <p>Note that when these lines are executed, Spark Streaming only sets up the computation it |
| will perform when it is started, and no real processing has started yet. To start the processing |
after all the transformations have been set up, we finally call</p>
| |
| <div class="highlight"><pre><code class="scala"><span class="n">ssc</span><span class="o">.</span><span class="n">start</span><span class="o">()</span> <span class="c1">// Start the computation</span> |
| <span class="n">ssc</span><span class="o">.</span><span class="n">awaitTermination</span><span class="o">()</span> <span class="c1">// Wait for the computation to terminate</span> |
| </code></pre></div> |
| |
| <p>The complete code can be found in the Spark Streaming example |
| <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala">NetworkWordCount</a>. |
| <br /></p> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <p>First, we create a |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html">JavaStreamingContext</a> object, |
| which is the main entry point for all streaming |
functionality. Besides Spark’s configuration, we specify that any DStream will be processed
| in 1 second batches.</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.*</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.streaming.*</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> |
| <span class="kn">import</span> <span class="nn">scala.Tuple2</span><span class="o">;</span> |
| <span class="c1">// Create a StreamingContext with a local master</span> |
| <span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContext</span><span class="o">(</span><span class="s">"local"</span><span class="o">,</span> <span class="s">"JavaNetworkWordCount"</span><span class="o">,</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">1000</span><span class="o">))</span> |
| </code></pre></div> |
| |
| <p>Using this context, we then create a new DStream |
| by specifying the IP address and port of the data server.</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="c1">// Create a DStream that will connect to serverIP:serverPort, like localhost:9999</span> |
| <span class="n">JavaReceiverInputDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">);</span> |
| </code></pre></div> |
| |
| <p>This <code>lines</code> DStream represents the stream of data that will be received from the data |
server. Each record in this stream is a line of text. Then, we want to split the lines by
| space into words.</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="c1">// Split each line into words</span> |
| <span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">lines</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">call</span><span class="o">(</span><span class="n">String</span> <span class="n">x</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">x</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">));</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| </code></pre></div> |
| |
| <p><code>flatMap</code> is a DStream operation that creates a new DStream by |
| generating multiple new records from each record in the source DStream. In this case, |
| each line will be split into multiple words and the stream of words is represented as the |
| <code>words</code> DStream. Note that we defined the transformation using a |
| <a href="api/scala/index.html#org.apache.spark.api.java.function.FlatMapFunction">FlatMapFunction</a> object. |
| As we will discover along the way, there are a number of such convenience classes in the Java API |
| that help define DStream transformations.</p> |
| |
| <p>Next, we want to count these words.</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="c1">// Count each word in each batch</span> |
| <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">pairs</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">map</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">PairFunction</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">call</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="k">new</span> <span class="n">Tuple2</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>(</span><span class="n">s</span><span class="o">,</span> <span class="mi">1</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">reduceByKey</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Integer</span> <span class="n">call</span><span class="o">(</span><span class="n">Integer</span> <span class="n">i1</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">i2</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| <span class="n">wordCounts</span><span class="o">.</span><span class="na">print</span><span class="o">();</span> <span class="c1">// Print a few of the counts to the console</span> |
| </code></pre></div> |
| |
| <p>The <code>words</code> DStream is further mapped (one-to-one transformation) to a DStream of <code>(word, |
| 1)</code> pairs, using a <a href="api/scala/index.html#org.apache.spark.api.java.function.PairFunction">PairFunction</a> |
| object. Then, it is reduced to get the frequency of words in each batch of data, |
| using a <a href="api/scala/index.html#org.apache.spark.api.java.function.Function2">Function2</a> object. |
| Finally, <code>wordCounts.print()</code> will print a few of the counts generated every second.</p> |
| |
| <p>Note that when these lines are executed, Spark Streaming only sets up the computation it |
| will perform when it is started, and no real processing has started yet. To start the processing |
after all the transformations have been set up, we finally call</p>
| |
| <div class="highlight"><pre><code class="java"><span class="n">jssc</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> <span class="c1">// Start the computation</span> |
| <span class="n">jssc</span><span class="o">.</span><span class="na">awaitTermination</span><span class="o">();</span> <span class="c1">// Wait for the computation to terminate</span> |
| </code></pre></div> |
| |
| <p>The complete code can be found in the Spark Streaming example |
| <a href="https://github.com/apache/spark/blob/master/examples/src/main/java/index.html?org/apache/spark/examples/streaming/JavaNetworkWordCount.java">JavaNetworkWordCount</a>. |
| <br /></p> |
| |
| </div> |
| </div> |
| |
| <p>If you have already <a href="index.html#downloading">downloaded</a> and <a href="index.html#building">built</a> Spark, |
| you can run this example as follows. You will first need to run Netcat |
| (a small utility found in most Unix-like systems) as a data server by using</p> |
| |
| <div class="highlight"><pre><code class="bash"><span class="nv">$ </span>nc -lk 9999 |
| </code></pre></div> |
| |
| <p>Then, in a different terminal, you can start the example by using</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="bash"><span class="nv">$ </span>./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="bash"><span class="nv">$ </span>./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999 |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>Then, any lines typed in the terminal running the netcat server will be counted and printed on |
| screen every second. It will look something like this.</p> |
| |
| <table width="100%"> |
<tr>
    <td>
| |
| <div class="highlight"><pre><code class="bash"><span class="c"># TERMINAL 1:</span> |
| <span class="c"># Running Netcat</span> |
| |
| <span class="nv">$ </span>nc -lk 9999 |
| |
| hello world |
| |
| |
| |
| ... |
| </code></pre></div> |
| |
| </td> |
| <td width="2%"></td> |
| <td> |
| |
| <div class="highlight"><pre><code class="bash"><span class="c"># TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount</span> |
| |
| <span class="nv">$ </span>./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 |
| ... |
| ------------------------------------------- |
| Time: 1357008430000 ms |
| ------------------------------------------- |
| <span class="o">(</span>hello,1<span class="o">)</span> |
| <span class="o">(</span>world,1<span class="o">)</span> |
| ... |
| </code></pre></div> |
| |
| </td> |
</tr>
</table>
| |
| <p>You can also use Spark Streaming directly from the Spark shell:</p> |
| |
| <div class="highlight"><pre><code class="bash"><span class="nv">$ </span>bin/spark-shell |
| </code></pre></div> |
| |
| <p>… and create your StreamingContext by wrapping the existing interactive shell |
| SparkContext object, <code>sc</code>:</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">sc</span><span class="o">,</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span> |
| </code></pre></div> |
| |
| <p>When working with the shell, you may also need to send a <code>^D</code> to your netcat session |
| to force the pipeline to print the word counts to the console at the sink.</p> |
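
<p>Putting it together, a complete word count pipeline entered at the shell prompt might look like
the following sketch (this simply reconstructs the quick example interactively, assuming the netcat
server from above is still running on port 9999):</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.streaming.StreamingContext._  // implicit conversions for pair DStream operations

// Build the same word count pipeline interactively
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word =&gt; (word, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()   // word counts are printed to the console every second
</code></pre></div>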
| |
| <hr /> |
| |
| <h1 id="basics">Basics</h1> |
| |
| <p>Next, we move beyond the simple example and elaborate on the basics of Spark Streaming that you |
| need to know to write your streaming applications.</p> |
| |
| <h2 id="linking">Linking</h2> |
| |
| <p>To write your own Spark Streaming program, you will have to add the following dependency to your |
| SBT or Maven project:</p> |
| |
| <pre><code>groupId = org.apache.spark |
| artifactId = spark-streaming_2.10 |
| version = 1.0.0 |
| </code></pre> |
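
<p>For example, in an SBT build these coordinates might be expressed as in the following sketch
(<code>%%</code> appends the Scala version, resolving to <code>spark-streaming_2.10</code>; the file name
<code>build.sbt</code> is just the conventional location):</p>

<pre><code>// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
</code></pre>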
| |
| <p>For ingesting data from sources like Kafka and Flume that are not present in the Spark |
| Streaming core |
| API, you will have to add the corresponding |
| artifact <code>spark-streaming-xyz_2.10</code> to the dependencies. For example, |
| some of the common ones are as follows.</p> |
| |
| <table class="table"> |
| <tr><th>Source</th><th>Artifact</th></tr> |
| <tr><td> Kafka </td><td> spark-streaming-kafka_2.10 </td></tr> |
| <tr><td> Flume </td><td> spark-streaming-flume_2.10 </td></tr> |
| <tr><td> Twitter </td><td> spark-streaming-twitter_2.10 </td></tr> |
| <tr><td> ZeroMQ </td><td> spark-streaming-zeromq_2.10 </td></tr> |
| <tr><td> MQTT </td><td> spark-streaming-mqtt_2.10 </td></tr> |
| <tr><td> </td><td></td></tr> |
| </table> |
| |
<p>For an up-to-date list of supported sources and artifacts, please refer to the
<a href="http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%221.0.0%22">Apache repository</a>.</p>
| |
| <h2 id="initializing">Initializing</h2> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>To initialize a Spark Streaming program in Scala, a |
| <a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext"><code>StreamingContext</code></a> |
| object has to be created, which is the main entry point of all Spark Streaming functionality. |
| A <code>StreamingContext</code> object can be created by using</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(</span><span class="n">master</span><span class="o">,</span> <span class="n">appName</span><span class="o">,</span> <span class="n">batchDuration</span><span class="o">,</span> <span class="o">[</span><span class="kt">sparkHome</span><span class="o">],</span> <span class="o">[</span><span class="kt">jars</span><span class="o">])</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <p>To initialize a Spark Streaming program in Java, a |
| <a href="api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext"><code>JavaStreamingContext</code></a> |
| object has to be created, which is the main entry point of all Spark Streaming functionality. |
| A <code>JavaStreamingContext</code> object can be created by using</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">new</span> <span class="nc">JavaStreamingContext</span><span class="o">(</span><span class="n">master</span><span class="o">,</span> <span class="n">appName</span><span class="o">,</span> <span class="n">batchInterval</span><span class="o">,</span> <span class="o">[</span><span class="kt">sparkHome</span><span class="o">],</span> <span class="o">[</span><span class="kt">jars</span><span class="o">])</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
<p>The <code>master</code> parameter is a standard <a href="programming-guide.html#master-urls">Spark cluster URL</a>
and can be “local” for local testing. The <code>appName</code> parameter is the name of your program,
which will be shown on your cluster’s web UI. The batch duration (<code>batchDuration</code> in Scala,
<code>batchInterval</code> in Java) is the size of the batches, as explained earlier. Finally, the last two
parameters are needed to deploy your code to a cluster if running in distributed mode, as described in the
<a href="programming-guide.html#deploying-code-on-a-cluster">Spark programming guide</a>.
Additionally, the underlying SparkContext can be accessed as
<code>ssc.sparkContext</code>.</p>
| |
| <p>The batch interval must be set based on the latency requirements of your application |
| and available cluster resources. See the <a href="#setting-the-right-batch-size">Performance Tuning</a> |
| section for more details.</p> |
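
<p>As an illustrative sketch (the master URL, application name, and batch duration below are
hypothetical values, not recommendations), a context processing data in 5 second batches could be
created, and its underlying SparkContext accessed, as follows:</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example: local master, 5 second batches
val ssc = new StreamingContext("local", "MyStreamingApp", Seconds(5))

// The underlying SparkContext is available for ordinary RDD operations
val sc = ssc.sparkContext
</code></pre></div>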
| |
| <h2 id="dstreams">DStreams</h2> |
| <p><em>Discretized Stream</em> or <em>DStream</em> is the basic abstraction provided by Spark Streaming. |
| It represents a continuous stream of data, either the input data stream received from source, |
| or the processed data stream generated by transforming the input stream. Internally, |
| it is represented by a continuous sequence of RDDs, which is Spark’s abstraction of an immutable, |
| distributed dataset. Each RDD in a DStream contains data from a certain interval, |
| as shown in the following figure.</p> |
| |
| <p style="text-align: center;"> |
| <img src="img/streaming-dstream.png" title="Spark Streaming data flow" alt="Spark Streaming" width="70%" /> |
| </p> |
| |
<p>Any operation applied on a DStream translates to operations on the underlying RDDs. For example,
in the <a href="#a-quick-example">earlier example</a> of converting a stream of lines to words,
the <code>flatMap</code> operation is applied on each RDD in the <code>lines</code> DStream to generate the RDDs of the
<code>words</code> DStream. This is shown in the following figure.</p>
| |
| <p style="text-align: center;"> |
| <img src="img/streaming-dstream-ops.png" title="Spark Streaming data flow" alt="Spark Streaming" width="70%" /> |
| </p> |
| |
<p>These underlying RDD transformations are computed by the Spark engine. The DStream operations
hide most of these details and provide the developer with a higher-level API for convenience.
These operations are discussed in detail in later sections.</p>
| |
| <h2 id="input-sources">Input Sources</h2> |
| |
<p>We have already taken a look at <code>ssc.socketTextStream(...)</code> in the <a href="#a-quick-example">quick
example</a>, which creates a DStream from text
data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides
methods for creating DStreams from files and Akka actors as input sources.</p>
| |
| <p>Specifically, for files, the DStream can be created as</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="scala"><span class="n">ssc</span><span class="o">.</span><span class="n">fileStream</span><span class="o">(</span><span class="n">dataDirectory</span><span class="o">)</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="java"><span class="n">jssc</span><span class="o">.</span><span class="na">fileStream</span><span class="o">(</span><span class="n">dataDirectory</span><span class="o">);</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
<p>Spark Streaming will monitor the directory <code>dataDirectory</code> on any Hadoop-compatible filesystem
and process any files created in that directory. Note that</p>
| |
| <ul> |
| <li>The files must have the same data format.</li> |
| <li>The files must be created in the <code>dataDirectory</code> by atomically <em>moving</em> or <em>renaming</em> them into |
| the data directory.</li> |
  <li>Once moved, the files must not be changed.</li>
| </ul> |
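
<p>As a sketch of how this might look in practice (the directory path below is hypothetical), the
<code>textFileStream</code> variant can be used for plain text files, where each line of every new file
becomes a record in the stream:</p>

<div class="highlight"><pre><code class="scala">// Hypothetical directory on a Hadoop-compatible filesystem
val logLines = ssc.textFileStream("hdfs://namenode:8020/user/logs/")

// Files atomically moved into the directory are picked up and processed line by line
logLines.filter(_.contains("ERROR")).print()
</code></pre></div>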
| |
| <p>For more details on streams from files, Akka actors and sockets, |
see the API documentation of the relevant functions in
| <a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> for |
| Scala and <a href="api/scala/index.html#org.apache.spark.streaming.api.java.JavaStreamingContext">JavaStreamingContext</a> |
| for Java.</p> |
| |
| <p>Additional functionality for creating DStreams from sources such as Kafka, Flume, and Twitter |
| can be imported by adding the right dependencies as explained in an |
| <a href="#linking">earlier</a> section. To take the |
| case of Kafka, after adding the artifact <code>spark-streaming-kafka_2.10</code> to the |
| project dependencies, you can create a DStream from Kafka as</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka._</span> |
| <span class="nc">KafkaUtils</span><span class="o">.</span><span class="n">createStream</span><span class="o">(</span><span class="n">ssc</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="o">...)</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="java"><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.kafka.*</span><span class="o">;</span> |
| <span class="n">KafkaUtils</span><span class="o">.</span><span class="na">createStream</span><span class="o">(</span><span class="n">jssc</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="o">...);</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>For more details on these additional sources, see the corresponding <a href="#where-to-go-from-here">API documentation</a>. |
| Furthermore, you can also implement your own custom receiver for your sources. See the |
| <a href="streaming-custom-receivers.html">Custom Receiver Guide</a>.</p> |
| |
| <h2 id="operations">Operations</h2> |
| <p>There are two kinds of DStream operations - <em>transformations</em> and <em>output operations</em>. Similar to |
| RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams |
| with transformed data. After applying a sequence of transformations to the input streams, output |
operations need to be called, which write data out to an external data sink, such as a filesystem or a
| database.</p> |
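
<p>As a small sketch of this distinction (using the <code>lines</code> DStream from the earlier example),
transformations alone do not trigger any processing; an output operation is what causes each batch to be
computed once the context is started:</p>

<div class="highlight"><pre><code class="scala">val upper = lines.map(_.toUpperCase)   // transformation only: nothing is processed yet
upper.print()                          // output operation: every batch will now be computed and printed
</code></pre></div>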
| |
| <h3 id="transformations">Transformations</h3> |
<p>DStreams support many of the transformations available on normal Spark RDDs. Some of the
| common ones are as follows.</p> |
| |
| <table class="table"> |
| <tr><th style="width:25%">Transformation</th><th>Meaning</th></tr> |
| <tr> |
| <td> <b>map</b>(<i>func</i>) </td> |
| <td> Return a new DStream by passing each element of the source DStream through a |
| function <i>func</i>. </td> |
| </tr> |
| <tr> |
| <td> <b>flatMap</b>(<i>func</i>) </td> |
| <td> Similar to map, but each input item can be mapped to 0 or more output items. </td> |
| </tr> |
| <tr> |
| <td> <b>filter</b>(<i>func</i>) </td> |
| <td> Return a new DStream by selecting only the records of the source DStream on which |
| <i>func</i> returns true. </td> |
| </tr> |
| <tr> |
| <td> <b>repartition</b>(<i>numPartitions</i>) </td> |
| <td> Changes the level of parallelism in this DStream by creating more or fewer partitions. </td> |
| </tr> |
| <tr> |
| <td> <b>union</b>(<i>otherStream</i>) </td> |
| <td> Return a new DStream that contains the union of the elements in the source DStream and |
| <i>otherDStream</i>. </td> |
| </tr> |
| <tr> |
| <td> <b>count</b>() </td> |
| <td> Return a new DStream of single-element RDDs by counting the number of elements in each RDD |
| of the source DStream. </td> |
| </tr> |
| <tr> |
| <td> <b>reduce</b>(<i>func</i>) </td> |
| <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the |
| source DStream using a function <i>func</i> (which takes two arguments and returns one). |
| The function should be associative so that it can be computed in parallel. </td> |
| </tr> |
| <tr> |
| <td> <b>countByValue</b>() </td> |
| <td> When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs |
| where the value of each key is its frequency in each RDD of the source DStream. </td> |
| </tr> |
| <tr> |
| <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td> |
| <td> When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the |
| values for each key are aggregated using the given reduce function. <b>Note:</b> By default, |
| this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number |
| is determined by the config property <code>spark.default.parallelism</code>) to do the grouping. |
| You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td> |
| </tr> |
| <tr> |
| <td> <b>join</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td> |
| <td> When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) |
| pairs with all pairs of elements for each key. </td> |
| </tr> |
| <tr> |
| <td> <b>cogroup</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td> |
| <td> When called on DStream of (K, V) and (K, W) pairs, return a new DStream of |
| (K, Seq[V], Seq[W]) tuples.</td> |
| </tr> |
| <tr> |
| <td> <b>transform</b>(<i>func</i>) </td> |
| <td> Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. |
| This can be used to do arbitrary RDD operations on the DStream. </td> |
| </tr> |
| <tr> |
| <td> <b>updateStateByKey</b>(<i>func</i>) </td> |
| <td> Return a new "state" DStream where the state for each key is updated by applying the |
| given function on the previous state of the key and the new values for the key. This can be |
| used to maintain arbitrary state data for each key.</td> |
| </tr> |
| <tr><td></td><td></td></tr> |
| </table> |
| |
| <p>The last two transformations are worth highlighting again.</p> |
| |
| <h4>UpdateStateByKey Operation</h4> |
| |
| <p>The <code>updateStateByKey</code> operation allows you to maintain arbitrary state while continuously updating |
it with new information. To use this, you will have to follow two steps.</p>
| |
| <ol> |
| <li>Define the state - The state can be of arbitrary data type.</li> |
| <li>Define the state update function - Specify with a function how to update the state using the |
previous state and the new values from the input stream.</li>
| </ol> |
| |
| <p>Let’s illustrate this with an example. Say you want to maintain a running count of each word |
| seen in a text data stream. Here, the running count is the state and it is an integer. We |
| define the update function as</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">def</span> <span class="n">updateFunction</span><span class="o">(</span><span class="n">newValues</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Int</span><span class="o">],</span> <span class="n">runningCount</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">])</span><span class="k">:</span> <span class="kt">Option</span><span class="o">[</span><span class="kt">Int</span><span class="o">]</span> <span class="k">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">newCount</span> <span class="k">=</span> <span class="o">...</span> <span class="c1">// add the new values with the previous running count to get the new count</span> |
| <span class="nc">Some</span><span class="o">(</span><span class="n">newCount</span><span class="o">)</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| |
| <p>This is applied on a DStream containing words (say, the <code>pairs</code> DStream containing <code>(word, |
| 1)</code> pairs in the <a href="#a-quick-example">earlier example</a>).</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">runningCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">updateStateByKey</span><span class="o">[</span><span class="kt">Int</span><span class="o">](</span><span class="n">updateFunction</span> <span class="k">_</span><span class="o">)</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="java"><span class="kn">import</span> <span class="nn">com.google.common.base.Optional</span><span class="o">;</span> |
| <span class="n">Function2</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>></span> <span class="n">updateFunction</span> <span class="o">=</span> |
| <span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">call</span><span class="o">(</span><span class="n">List</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">values</span><span class="o">,</span> <span class="n">Optional</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">state</span><span class="o">)</span> <span class="o">{</span> |
| <span class="n">Integer</span> <span class="n">newSum</span> <span class="o">=</span> <span class="o">...</span> <span class="c1">// add the new values with the previous running count to get the new count</span> |
| <span class="k">return</span> <span class="n">Optional</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">newSum</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">};</span> |
| </code></pre></div> |
| |
| <p>This is applied on a DStream containing words (say, the <code>pairs</code> DStream containing <code>(word, |
| 1)</code> pairs in the <a href="#a-quick-example">quick example</a>).</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">runningCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">updateStateByKey</span><span class="o">(</span><span class="n">updateFunction</span><span class="o">);</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>The update function will be called for each word, with <code>newValues</code> having a sequence of 1’s (from |
| the <code>(word, 1)</code> pairs) and the <code>runningCount</code> having the previous count. For the complete |
| Scala code, take a look at the example |
| <a href="https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala">StatefulNetworkWordCount</a>.</p> |
| |
| <h4>Transform Operation</h4> |
| |
| <p>The <code>transform</code> operation (along with its variations like <code>transformWith</code>) allows |
| arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD |
| operation that is not exposed in the DStream API. |
| For example, the functionality of joining every batch in a data stream |
| with another dataset is not directly exposed in the DStream API. However, |
you can easily use <code>transform</code> to do this. This enables very powerful possibilities. For example,
you could do real-time data cleaning by joining the input data stream with precomputed
spam information (maybe generated with Spark as well) and then filtering based on it.</p>
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="scala"><span class="k">val</span> <span class="n">spamInfoRDD</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">newAPIHadoopRDD</span><span class="o">(...)</span> <span class="c1">// RDD containing spam information</span> |
| |
| <span class="k">val</span> <span class="n">cleanedDStream</span> <span class="k">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="n">transform</span><span class="o">(</span><span class="n">rdd</span> <span class="k">=></span> <span class="o">{</span> |
| <span class="n">rdd</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">spamInfoRDD</span><span class="o">).</span><span class="n">filter</span><span class="o">(...)</span> <span class="c1">// join data stream with spam information to do data cleaning</span> |
| <span class="o">...</span> |
| <span class="o">})</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="java"><span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span> |
| <span class="c1">// RDD containing spam information</span> |
| <span class="kd">final</span> <span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Double</span><span class="o">></span> <span class="n">spamInfoRDD</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">sparkContext</span><span class="o">().</span><span class="na">newAPIHadoopRDD</span><span class="o">(...);</span> |
| |
| <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">cleanedDStream</span> <span class="o">=</span> <span class="n">wordCounts</span><span class="o">.</span><span class="na">transform</span><span class="o">(</span> |
| <span class="k">new</span> <span class="n">Function</span><span class="o"><</span><span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>,</span> <span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">call</span><span class="o">(</span><span class="n">JavaPairRDD</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">rdd</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="n">rdd</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">spamInfoRDD</span><span class="o">).</span><span class="na">filter</span><span class="o">(...);</span> <span class="c1">// join data stream with spam information to do data cleaning</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| <span class="o">});</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
| <p>In fact, you can also use <a href="mllib-guide.html">machine learning</a> and |
| <a href="graphx-programming-guide.html">graph computation</a> algorithms in the <code>transform</code> method.</p> |
| |
| <h4>Window Operations</h4> |
| |
| <p>Finally, Spark Streaming also provides <em>windowed computations</em>, which allow you to apply |
transformations over a sliding window of data. The following figure illustrates this sliding
| window.</p> |
| |
| <p style="text-align: center;"> |
| <img src="img/streaming-dstream-window.png" title="Spark Streaming data flow" alt="Spark Streaming" width="60%" /> |
| </p> |
| |
| <p>As shown in the figure, every time the window <em>slides</em> over a source DStream, |
| the source RDDs that fall within the window are combined and operated upon to produce the |
RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time
| units of data, and slides by 2 time units. This shows that any window-based operation needs to |
| specify two parameters.</p> |
| |
| <ul> |
| <li><i>window length</i> - The duration of the window (3 in the figure)</li> |
| <li><i>slide interval</i> - The interval at which the window-based operation is performed (2 in |
| the figure).</li> |
| </ul> |
| |
| <p>These two parameters must be multiples of the batch interval of the source DStream (1 in the |
| figure).</p> |
| |
| <p>Let’s illustrate the window operations with an example. Say, you want to extend the |
| <a href="#a-quick-example">earlier example</a> by generating word counts over last 30 seconds of data, |
| every 10 seconds. To do this, we have to apply the <code>reduceByKey</code> operation on the <code>pairs</code> DStream of |
| <code>(word, 1)</code> pairs over the last 30 seconds of data. This is done using the |
| operation <code>reduceByKeyAndWindow</code>.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <div class="highlight"><pre><code class="scala"><span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span> |
| <span class="k">val</span> <span class="n">windowedWordCounts</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">reduceByKeyAndWindow</span><span class="o">((</span><span class="n">a</span><span class="k">:</span><span class="kt">Int</span><span class="o">,</span><span class="n">b</span><span class="k">:</span><span class="kt">Int</span><span class="o">)</span> <span class="k">=></span> <span class="o">(</span><span class="n">a</span> <span class="o">+</span> <span class="n">b</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="nc">Seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">))</span> |
| </code></pre></div> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <div class="highlight"><pre><code class="java"><span class="c1">// Reduce function adding two integers, defined separately for clarity</span> |
| <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">reduceFunc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Function2</span><span class="o"><</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">Integer</span><span class="o">>()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">Integer</span> <span class="n">call</span><span class="o">(</span><span class="n">Integer</span> <span class="n">i1</span><span class="o">,</span> <span class="n">Integer</span> <span class="n">i2</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span> |
| <span class="k">return</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">};</span> |
| |
| <span class="c1">// Reduce last 30 seconds of data, every 10 seconds</span> |
| <span class="n">JavaPairDStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">></span> <span class="n">windowedWordCounts</span> <span class="o">=</span> <span class="n">pairs</span><span class="o">.</span><span class="na">reduceByKeyAndWindow</span><span class="o">(</span><span class="n">reduceFunc</span><span class="o">,</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">30000</span><span class="o">),</span> <span class="k">new</span> <span class="n">Duration</span><span class="o">(</span><span class="mi">10000</span><span class="o">));</span> |
| </code></pre></div> |
| |
| </div> |
| </div> |
| |
<p>Some of the common window-based operations are as follows. All of these operations take the
two parameters described above - <i>windowLength</i> and <i>slideInterval</i>.</p>
| |
| <table class="table"> |
| <tr><th style="width:25%">Transformation</th><th>Meaning</th></tr> |
| <tr> |
| <td> <b>window</b>(<i>windowLength</i>, <i>slideInterval</i>) </td> |
| <td> Return a new DStream which is computed based on windowed batches of the source DStream. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>countByWindow</b>(<i>windowLength</i>, <i>slideInterval</i>) </td> |
| <td> Return a sliding window count of elements in the stream. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td> |
| <td> Return a new single-element stream, created by aggregating elements in the stream over a |
| sliding interval using <i>func</i>. The function should be associative so that it can be computed |
| correctly in parallel. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>, |
| [<i>numTasks</i>]) </td> |
| <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) |
| pairs where the values for each key are aggregated using the given reduce function <i>func</i> |
| over batches in a sliding window. <b>Note:</b> By default, this uses Spark's default number of |
| parallel tasks (2 for local mode, and in cluster mode the number is determined by the config |
| property <code>spark.default.parallelism</code>) to do the grouping. You can pass an optional |
| <code>numTasks</code> argument to set a different number of tasks. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>invFunc</i>, <i>windowLength</i>, |
| <i>slideInterval</i>, [<i>numTasks</i>]) </td> |
<td> A more efficient version of the above <code>reduceByKeyAndWindow()</code> where the reduce
value of each window is calculated incrementally using the reduce values of the previous window.
This is done by reducing the new data that enters the sliding window, and "inverse reducing" the
old data that leaves the window. An example would be that of "adding" and "subtracting" counts
of keys as the window slides. However, it is applicable only to "invertible reduce functions",
that is, those reduce functions which have a corresponding "inverse reduce" function (taken as
parameter <i>invFunc</i>). Like in <code>reduceByKeyAndWindow</code>, the number of reduce tasks
is configurable through an optional argument. An example is shown below the table.
</td>
| </tr> |
| <tr> |
| <td> <b>countByValueAndWindow</b>(<i>windowLength</i>, |
| <i>slideInterval</i>, [<i>numTasks</i>]) </td> |
| <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the |
| value of each key is its frequency within a sliding window. Like in |
| <code>reduceByKeyAndWindow</code>, the number of reduce tasks is configurable through an |
| optional argument. |
| </td> |
| </tr> |
| <tr><td></td><td></td></tr> |
| </table> |
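<p>For example, the incremental version of <code>reduceByKeyAndWindow</code> (with an inverse reduce
function) can be used on the <code>pairs</code> DStream from the earlier word count examples as
follows. This is a minimal sketch; note that checkpointing (described later) must be enabled for
this variant to work.</p>

<div class="highlight"><pre><code class="scala">// Incrementally reduce the last 30 seconds of data, every 10 seconds:
// new data entering the window is added, old data leaving the window is subtracted
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce function
  (a: Int, b: Int) => a - b,   // inverse reduce function
  Seconds(30),                 // window length
  Seconds(10)                  // slide interval
)
</code></pre></div>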
| |
| <h3 id="output-operations">Output Operations</h3> |
| <p>When an output operator is called, it triggers the computation of a stream. Currently the following |
| output operators are defined:</p> |
| |
| <table class="table"> |
| <tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr> |
| <tr> |
| <td> <b>print</b>() </td> |
<td> Prints the first ten elements of every batch of data in a DStream on the driver. </td>
| </tr> |
| <tr> |
| <td> <b>foreachRDD</b>(<i>func</i>) </td> |
| <td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from |
| the stream. This function should have side effects, such as printing output, saving the RDD to |
| external files, or writing it over the network to an external system. </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td> |
| <td> Save this DStream's contents as a <code>SequenceFile</code> of serialized objects. The file |
| name at each batch interval is generated based on <i>prefix</i> and |
| <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. |
| </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td> |
<td> Save this DStream's contents as text files. The file name at each batch interval is
| generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td> |
| </tr> |
| <tr> |
| <td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td> |
| <td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is |
| generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td> |
| </tr> |
| <tr><td></td><td></td></tr> |
| </table> |
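<p>For example, <code>foreachRDD</code> and <code>saveAsTextFiles</code> (described in the table above)
can be used as follows. This is a minimal sketch; <code>wordCounts</code> is assumed to be a DStream
of (word, count) pairs as in the earlier examples, and the prefix passed to
<code>saveAsTextFiles</code> is just an illustrative name.</p>

<div class="highlight"><pre><code class="scala">// Apply an arbitrary action to every RDD generated by the stream
wordCounts.foreachRDD(rdd => {
  // Any RDD operation can be used here, e.g. pushing the data to an external system
  println("Records in this batch: " + rdd.count())
})

// Save every batch as text files named "wordCounts-TIME_IN_MS.txt"
wordCounts.saveAsTextFiles("wordCounts", "txt")
</code></pre></div>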
| |
| <p>The complete list of DStream operations is available in the API documentation. For the Scala API, |
| see <a href="api/scala/index.html#org.apache.spark.streaming.dstream.DStream">DStream</a> |
| and <a href="api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions">PairDStreamFunctions</a>. |
| For the Java API, see <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html">JavaDStream</a> |
| and <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html">JavaPairDStream</a>.</p> |
| |
| <h2 id="persistence">Persistence</h2> |
<p>Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is,
calling the <code>persist()</code> method on a DStream will automatically persist every RDD of that DStream in
memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple
operations on the same data). For window-based operations like <code>reduceByWindow</code> and
<code>reduceByKeyAndWindow</code> and state-based operations like <code>updateStateByKey</code>, this is implicitly true.
Hence, DStreams generated by window-based operations are automatically persisted in memory, without
the developer calling <code>persist()</code>.</p>
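<p>A DStream that is used in multiple operations can be persisted explicitly, as in the following
sketch (assuming <code>lines</code> is a DStream of text lines as in the earlier examples).</p>

<div class="highlight"><pre><code class="scala">val words = lines.flatMap(_.split(" "))
words.persist()   // default storage level for DStreams is StorageLevel.MEMORY_ONLY_SER

// Both of these computations reuse the persisted 'words' DStream
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
val charCounts = words.map(word => (word.length, 1)).reduceByKey(_ + _)
</code></pre></div>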
| |
<p>For input streams that receive data over the network (such as Kafka, Flume, sockets, etc.), the
| default persistence level is set to replicate the data to two nodes for fault-tolerance.</p> |
| |
| <p>Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in |
| memory. This is further discussed in the <a href="#memory-tuning">Performance Tuning</a> section. More |
| information on different persistence levels can be found in |
| <a href="programming-guide.html#rdd-persistence">Spark Programming Guide</a>.</p> |
| |
| <h2 id="rdd-checkpointing">RDD Checkpointing</h2> |
| <p>A <em>stateful operation</em> is one which operates over multiple batches of data. This includes all |
| window-based operations and the <code>updateStateByKey</code> operation. Since stateful operations have a |
| dependency on previous batches of data, they continuously accumulate metadata over time. |
| To clear this metadata, streaming supports periodic <em>checkpointing</em> by saving intermediate data |
| to HDFS. Note that checkpointing also incurs the cost of saving to HDFS which may cause the |
| corresponding batch to take longer to process. Hence, the interval of checkpointing needs to be |
| set carefully. At small batch sizes (say 1 second), checkpointing every batch may significantly |
| reduce operation throughput. Conversely, checkpointing too slowly causes the lineage and task |
sizes to grow, which may have detrimental effects. Typically, a checkpoint interval of 5 - 10
times the sliding interval of a DStream is a good setting to try.</p>
| |
<p>To enable checkpointing, the developer has to provide the HDFS path to which RDDs will be saved.
| This is done by using</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="n">ssc</span><span class="o">.</span><span class="n">checkpoint</span><span class="o">(</span><span class="n">hdfsPath</span><span class="o">)</span> <span class="c1">// assuming ssc is the StreamingContext or JavaStreamingContext</span> |
| </code></pre></div> |
| |
| <p>The interval of checkpointing of a DStream can be set by using</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="n">dstream</span><span class="o">.</span><span class="n">checkpoint</span><span class="o">(</span><span class="n">checkpointInterval</span><span class="o">)</span> |
| </code></pre></div> |
| |
| <p>For DStreams that must be checkpointed (that is, DStreams created by <code>updateStateByKey</code> and |
| <code>reduceByKeyAndWindow</code> with inverse function), the checkpoint interval of the DStream is by |
default set to a multiple of the DStream’s sliding interval such that it is at least 10 seconds.</p>
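<p>For example, a stateful DStream created by <code>updateStateByKey</code> can be given an explicit
checkpoint interval as in the following sketch (the update function shown is just a running count
over the <code>pairs</code> DStream from the earlier examples).</p>

<div class="highlight"><pre><code class="scala">// Update function: maintain a running count per key
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(runningCount.getOrElse(0) + newValues.sum)

// Create the stateful DStream and checkpoint it every 10 seconds
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
runningCounts.checkpoint(Seconds(10))
</code></pre></div>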
| |
| <h2 id="deployment">Deployment</h2> |
| <p>A Spark Streaming application is deployed on a cluster in the same way as any other Spark application. |
| Please refer to the <a href="cluster-overview.html">deployment guide</a> for more details.</p> |
| |
<p>If a running Spark Streaming application needs to be upgraded (with new application code), then
there are two possible mechanisms.</p>
| |
| <ul> |
| <li> |
<p>The upgraded Spark Streaming application is started and run in parallel to the existing application.
Once the new one (receiving the same data as the old one) has been warmed up and is ready
for prime time, the old one can be brought down. Note that this can be done for data sources that support
sending the data to two destinations (i.e., the earlier and upgraded applications).</p>
| </li> |
| <li> |
<p>The existing application is shut down gracefully (see
<a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext"><code>StreamingContext.stop(...)</code></a>
or <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html"><code>JavaStreamingContext.stop(...)</code></a>
for graceful shutdown options), which ensures that data that has been received is completely
processed before shutdown (see the sketch after this list). Then the
upgraded application can be started, and it will resume processing from the same point where the earlier
application left off. Note that this can be done only with input sources that support source-side buffering
(like Kafka and Flume), as data needs to be buffered while the previous application is down and
the upgraded application is not yet up.</p>
| </li> |
| </ul> |
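<p>For example, a graceful shutdown in the second approach can be triggered as in the following
sketch (the parameter names correspond to the <code>StreamingContext.stop(...)</code> options linked
above; adjust as needed for your version).</p>

<div class="highlight"><pre><code class="scala">// Stop gracefully: wait for all received data to be processed before shutting down,
// and also stop the underlying SparkContext
ssc.stop(stopSparkContext = true, stopGracefully = true)
</code></pre></div>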
| |
| <h2 id="monitoring">Monitoring</h2> |
| <p>Beyond Spark’s <a href="monitoring.html">monitoring capabilities</a>, there are additional capabilities |
| specific to Spark Streaming. When a StreamingContext is used, the |
| <a href="monitoring.html#web-interfaces">Spark web UI</a> shows |
an additional <code>Streaming</code> tab which shows statistics about running receivers (whether
receivers are active, number of records received, receiver errors, etc.)
| and completed batches (batch processing times, queueing delays, etc.). This can be used to |
| monitor the progress of the streaming application.</p> |
| |
<p>The following two metrics in the web UI are particularly important -
<em>Processing Time</em> and <em>Scheduling Delay</em> (under <em>Batch Processing Statistics</em>). The first is the
time to process each batch of data, and the second is the time a batch waits in a queue
for the processing of previous batches to finish. If the batch processing time is consistently more
than the batch interval and/or the queueing delay keeps increasing, then it indicates that the system is
not able to process the batches as fast as they are being generated and is falling behind.
| In that case, consider |
| <a href="#reducing-the-processing-time-of-each-batch">reducing</a> the batch processing time.</p> |
| |
| <p>The progress of a Spark Streaming program can also be monitored using the |
| <a href="api/scala/index.html#org.apache.spark.scheduler.StreamingListener">StreamingListener</a> interface, |
| which allows you to get receiver status and processing times. Note that this is a developer API |
| and it is likely to be improved upon (i.e., more information reported) in the future.</p> |
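<p>A minimal sketch of such a listener is shown below; it assumes the field names exposed by
<code>BatchInfo</code> in this release and simply logs the processing delay of every completed batch.</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Log the processing delay of every completed batch
class BatchTimingListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    println("Batch " + info.batchTime + " took " + info.processingDelay.getOrElse(-1L) + " ms")
  }
}

// Register the listener on the StreamingContext
ssc.addStreamingListener(new BatchTimingListener)
</code></pre></div>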
| |
| <hr /> |
| |
| <h1 id="performance-tuning">Performance Tuning</h1> |
<p>Getting the best performance out of a Spark Streaming application on a cluster requires a bit of
tuning. This section explains a number of the parameters and configurations that can be tuned to
improve the performance of your application. At a high level, you need to consider two things:</p>
| |
| <ol> |
| <li> |
| Reducing the processing time of each batch of data by efficiently using cluster resources. |
| </li> |
| <li> |
| Setting the right batch size such that the batches of data can be processed as fast as they |
| are received (that is, data processing keeps up with the data ingestion). |
| </li> |
| </ol> |
| |
| <h2 id="reducing-the-processing-time-of-each-batch">Reducing the Processing Time of each Batch</h2> |
| <p>There are a number of optimizations that can be done in Spark to minimize the processing time of |
| each batch. These have been discussed in detail in <a href="tuning.html">Tuning Guide</a>. This section |
| highlights some of the most important ones.</p> |
| |
| <h3 id="level-of-parallelism-in-data-receiving">Level of Parallelism in Data Receiving</h3> |
<p>Receiving data over the network (like Kafka, Flume, socket, etc.) requires the data to be deserialized
| and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider |
| parallelizing the data receiving. Note that each input DStream |
| creates a single receiver (running on a worker machine) that receives a single stream of data. |
| Receiving multiple data streams can therefore be achieved by creating multiple input DStreams |
| and configuring them to receive different partitions of the data stream from the source(s). |
| For example, a single Kafka input stream receiving two topics of data can be split into two |
| Kafka input streams, each receiving only one topic. This would run two receivers on two workers, |
thus allowing data to be received in parallel, and increasing overall throughput.</p>
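<p>As a sketch of this approach, two Kafka input streams (one per topic) can be created and then
unioned before further processing. The topic names, <code>zkQuorum</code> and <code>group</code> below
are placeholders for your own Kafka/ZooKeeper settings.</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.streaming.kafka.KafkaUtils

// One input stream per topic, so each topic gets its own receiver on a worker
val stream1 = KafkaUtils.createStream(ssc, zkQuorum, group, Map("topic1" -> 1))
val stream2 = KafkaUtils.createStream(ssc, zkQuorum, group, Map("topic2" -> 1))

// Union the two streams so that downstream processing sees a single DStream
val unifiedStream = stream1.union(stream2)
</code></pre></div>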
| |
| <p>Another parameter that should be considered is the receiver’s blocking interval. For most receivers, |
| the received data is coalesced together into large blocks of data before storing inside Spark’s memory. |
The number of blocks in each batch determines the number of tasks that will be used to process
the received data in a map-like transformation. This blocking interval is determined by the
| <a href="configuration.html">configuration parameter</a> <code>spark.streaming.blockInterval</code> and the default value |
| is 200 milliseconds.</p> |
| |
| <p>An alternative to receiving data with multiple input streams / receivers is to explicitly repartition |
| the input data stream (using <code>inputStream.repartition(<number of partitions>)</code>). |
| This distributes the received batches of data across all the machines in the cluster |
| before further processing.</p> |
| |
| <h3 id="level-of-parallelism-in-data-processing">Level of Parallelism in Data Processing</h3> |
<p>Cluster resources may be under-utilized if the number of parallel tasks used in any stage of the
| computation is not high enough. For example, for distributed reduce operations like <code>reduceByKey</code> |
and <code>reduceByKeyAndWindow</code>, the default number of parallel tasks is decided by the
<a href="configuration.html#spark-properties">config property</a> <code>spark.default.parallelism</code>. You can pass the level of
parallelism as an argument (see the
<a href="api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions"><code>PairDStreamFunctions</code></a>
documentation), or set the <a href="configuration.html#spark-properties">config property</a>
| <code>spark.default.parallelism</code> to change the default.</p> |
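<p>For example, the number of reduce tasks can be passed explicitly as a second argument (a
sketch using the <code>pairs</code> DStream from the earlier examples).</p>

<div class="highlight"><pre><code class="scala">// Use 16 parallel reduce tasks instead of the default spark.default.parallelism
val wordCounts = pairs.reduceByKey((a: Int, b: Int) => a + b, 16)
</code></pre></div>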
| |
| <h3 id="data-serialization">Data Serialization</h3> |
| <p>The overhead of data serialization can be significant, especially when sub-second batch sizes are |
| to be achieved. There are two aspects to it.</p> |
| |
| <ul> |
| <li> |
<p><strong>Serialization of RDD data in Spark</strong>: Please refer to the detailed discussion on data
serialization in the <a href="tuning.html">Tuning Guide</a>. However, note that unlike core Spark, in
Spark Streaming RDDs are by default persisted as serialized byte arrays to minimize pauses related to GC
(see the configuration sketch after this list).</p>
| </li> |
| <li> |
| <p><strong>Serialization of input data</strong>: To ingest external data into Spark, data received as bytes |
(say, from the network) needs to be deserialized from bytes and re-serialized into Spark’s
| serialization format. Hence, the deserialization overhead of input data may be a bottleneck.</p> |
| </li> |
| </ul> |
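<p>For example, Kryo serialization of RDD data can be enabled through the <code>SparkConf</code> used to
create the streaming context, as in the following sketch (the application name is illustrative;
registering your own classes with Kryo, as described in the Tuning Guide, is also recommended).</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Use Kryo for serializing the data stored by Spark Streaming
val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(1))
</code></pre></div>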
| |
| <h3 id="task-launching-overheads">Task Launching Overheads</h3> |
<p>If the number of tasks launched per second is high (say, 50 or more per second), then the overhead
of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second
| latencies. The overhead can be reduced by the following changes:</p> |
| |
| <ul> |
| <li> |
<p><strong>Task Serialization</strong>: Using Kryo serialization for serializing tasks can reduce the task
sizes, and therefore reduce the time taken to send them to the slaves.</p>
| </li> |
| <li> |
| <p><strong>Execution mode</strong>: Running Spark in Standalone mode or coarse-grained Mesos mode leads to |
| better task launch times than the fine-grained Mesos mode. Please refer to the |
| <a href="running-on-mesos.html">Running on Mesos guide</a> for more details.</p> |
| </li> |
| </ul> |
| |
| <p>These changes may reduce batch processing time by 100s of milliseconds, |
| thus allowing sub-second batch size to be viable.</p> |
| |
| <h2 id="setting-the-right-batch-size">Setting the Right Batch Size</h2> |
| <p>For a Spark Streaming application running on a cluster to be stable, the system should be able to |
| process data as fast as it is being received. In other words, batches of data should be processed |
| as fast as they are being generated. Whether this is true for an application can be found by |
| <a href="#monitoring">monitoring</a> the processing times in the streaming web UI, where the batch |
| processing time should be less than the batch interval.</p> |
| |
| <p>Depending on the nature of the streaming |
| computation, the batch interval used may have significant impact on the data rates that can be |
| sustained by the application on a fixed set of cluster resources. For example, let us |
consider the earlier NetworkWordCount example. For a particular data rate, the system may be able
| to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not |
| every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in |
| production can be sustained.</p> |
| |
| <p>A good approach to figure out the right batch size for your application is to test it with a |
| conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system |
| is able to keep up with data rate, you can check the value of the end-to-end delay experienced |
| by each processed batch (either look for “Total delay” in Spark driver log4j logs, or use the |
| <a href="api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener">StreamingListener</a> |
| interface). |
If the delay is maintained to be comparable to the batch size, then the system is stable. Otherwise,
if the delay is continuously increasing, it means that the system is unable to keep up and is
therefore unstable. Once you have an idea of a stable configuration, you can try increasing the
data rate and/or reducing the batch size. Note that a momentary increase in the delay due to
temporary data rate increases may be fine as long as the delay reduces back to a low value
(i.e., less than the batch size).</p>
| |
| <h2 id="memory-tuning">Memory Tuning</h2> |
<p>Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail
in the <a href="tuning.html">Tuning Guide</a>. It is recommended that you read that. In this section,
we highlight a few customizations that are strongly recommended to minimize GC related pauses
in Spark Streaming applications and achieve more consistent batch processing times.</p>
| |
| <ul> |
| <li> |
| <p><strong>Default persistence level of DStreams</strong>: Unlike RDDs, the default persistence level of DStreams |
| serializes the data in memory (that is, |
| <a href="api/scala/index.html#org.apache.spark.storage.StorageLevel$">StorageLevel.MEMORY_ONLY_SER</a> for |
| DStream compared to |
| <a href="api/scala/index.html#org.apache.spark.storage.StorageLevel$">StorageLevel.MEMORY_ONLY</a> for RDDs). |
| Even though keeping the data serialized incurs higher serialization/deserialization overheads, |
| it significantly reduces GC pauses.</p> |
| </li> |
| <li> |
<p><strong>Clearing persistent RDDs</strong>: By default, all persistent RDDs generated by Spark Streaming will
be cleared from memory based on Spark’s in-built policy (LRU). If <code>spark.cleaner.ttl</code> is set,
then persistent RDDs that are older than that value are periodically cleared. As mentioned
<a href="#operation">earlier</a>, this needs to be carefully set based on the operations used in the Spark
Streaming program. However, a smarter unpersisting of RDDs can be enabled by setting the
<a href="configuration.html#spark-properties">configuration property</a> <code>spark.streaming.unpersist</code> to
<code>true</code>. This makes the system figure out which RDDs are no longer necessary to keep around and
unpersists them (see the sketch after this list). This is likely to reduce
the RDD memory usage of Spark, potentially improving GC behavior as well.</p>
| </li> |
| <li> |
| <p><strong>Concurrent garbage collector</strong>: Using the concurrent mark-and-sweep GC further |
| minimizes the variability of GC pauses. Even though concurrent GC is known to reduce the |
| overall processing throughput of the system, its use is still recommended to achieve more |
| consistent batch processing times.</p> |
| </li> |
| </ul> |
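<p>The last two recommendations can be applied through configuration, as in the following sketch
(assuming <code>conf</code> is the <code>SparkConf</code> used to create the streaming context; the exact
JVM flags depend on your GC tuning).</p>

<div class="highlight"><pre><code class="scala">// Let Spark Streaming unpersist RDDs that are no longer needed,
// instead of relying only on the LRU policy or spark.cleaner.ttl
conf.set("spark.streaming.unpersist", "true")

// Run executors with the concurrent mark-and-sweep garbage collector
conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
</code></pre></div>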
| |
| <hr /> |
| |
| <h1 id="fault-tolerance-properties">Fault-tolerance Properties</h1> |
| <p>In this section, we are going to discuss the behavior of Spark Streaming application in the event |
| of a node failure. To understand this, let us remember the basic fault-tolerance properties of |
| Spark’s RDDs.</p> |
| |
| <ol> |
| <li>An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD |
| remembers the lineage of deterministic operations that were used on a fault-tolerant input |
| dataset to create it.</li> |
| <li>If any partition of an RDD is lost due to a worker node failure, then that partition can be |
| re-computed from the original fault-tolerant dataset using the lineage of operations.</li> |
| </ol> |
| |
| <p>Since all data transformations in Spark Streaming are based on RDD operations, as long as the input |
dataset is present, all intermediate data can be recomputed. Keeping these properties in mind, we are
| going to discuss the failure semantics in more detail.</p> |
| |
| <h2 id="failure-of-a-worker-node">Failure of a Worker Node</h2> |
| <p>There are two failure behaviors based on which input sources are used.</p> |
| |
| <ol> |
<li><em>Using HDFS files as input source</em> - Since the data is reliably stored on HDFS, all data can be
re-computed and therefore no data will be lost due to any failure.</li>
| <li><em>Using any input source that receives data through a network</em> - For network-based data sources |
| like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster |
(default replication factor is 2). So if a worker node fails, then the system can recompute the
lost data from the left over copy of the input data. However, if the worker node where a network
| receiver was running fails, then a tiny bit of data may be lost, that is, the data received by |
| the system but not yet replicated to other node(s). The receiver will be started on a different |
| node and it will continue to receive data.</li> |
| </ol> |
| |
| <p>Since all data is modeled as RDDs with their lineage of deterministic operations, any recomputation |
| always leads to the same result. As a result, all DStream transformations are guaranteed to have |
<em>exactly-once</em> semantics. That is, the final transformed result will be the same even if there
was a worker node failure. However, output operations (like <code>foreachRDD</code>) have <em>at-least once</em>
| semantics, that is, the transformed data may get written to an external entity more than once in |
| the event of a worker failure. While this is acceptable for saving to HDFS using the |
| <code>saveAs*Files</code> operations (as the file will simply get over-written by the same data), |
| additional transactions-like mechanisms may be necessary to achieve exactly-once semantics |
| for output operations.</p> |
| |
| <h2 id="failure-of-the-driver-node">Failure of the Driver Node</h2> |
| <p>For a streaming application to operate 24/7, Spark Streaming allows a streaming computation |
| to be resumed even after the failure of the driver node. Spark Streaming periodically writes the |
metadata information of the DStreams set up through the <code>StreamingContext</code> to an
HDFS directory (this can be any Hadoop-compatible filesystem). This periodic
| <em>checkpointing</em> can be enabled by setting the checkpoint |
| directory using <code>ssc.checkpoint(<checkpoint directory>)</code> as described |
| <a href="#rdd-checkpointing">earlier</a>. On failure of the driver node, |
| the lost <code>StreamingContext</code> can be recovered from this information, and restarted.</p> |
| |
| <p>To allow a Spark Streaming program to be recoverable, it must be written in a way such that |
| it has the following behavior:</p> |
| |
| <ol> |
| <li>When the program is being started for the first time, it will create a new StreamingContext, |
| set up all the streams and then call start().</li> |
| <li>When the program is being restarted after failure, it will re-create a StreamingContext |
| from the checkpoint data in the checkpoint directory.</li> |
| </ol> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <p>This behavior is made simple by using <code>StreamingContext.getOrCreate</code>. This is used as follows.</p> |
| |
| <div class="highlight"><pre><code class="scala"><span class="c1">// Function to create and setup a new StreamingContext</span> |
| <span class="k">def</span> <span class="n">functionToCreateContext</span><span class="o">()</span><span class="k">:</span> <span class="kt">StreamingContext</span> <span class="o">=</span> <span class="o">{</span> |
| <span class="k">val</span> <span class="n">ssc</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StreamingContext</span><span class="o">(...)</span> <span class="c1">// new context</span> |
| <span class="k">val</span> <span class="n">lines</span> <span class="k">=</span> <span class="n">ssc</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(...)</span> <span class="c1">// create DStreams</span> |
| <span class="o">...</span> |
| <span class="n">ssc</span><span class="o">.</span><span class="n">checkpoint</span><span class="o">(</span><span class="n">checkpointDirectory</span><span class="o">)</span> <span class="c1">// set checkpoint directory</span> |
| <span class="n">ssc</span> |
| <span class="o">}</span> |
| |
| <span class="c1">// Get StreaminContext from checkpoint data or create a new one</span> |
| <span class="k">val</span> <span class="n">context</span> <span class="k">=</span> <span class="nc">StreamingContext</span><span class="o">.</span><span class="n">getOrCreate</span><span class="o">(</span><span class="n">checkpointDirectory</span><span class="o">,</span> <span class="n">functionToCreateContext</span> <span class="k">_</span><span class="o">)</span> |
| |
| <span class="c1">// Do additional setup on context that needs to be done,</span> |
| <span class="c1">// irrespective of whether it is being started or restarted</span> |
| <span class="n">context</span><span class="o">.</span> <span class="o">...</span> |
| |
| <span class="c1">// Start the context</span> |
| <span class="n">context</span><span class="o">.</span><span class="n">start</span><span class="o">()</span> |
| <span class="n">context</span><span class="o">.</span><span class="n">awaitTermination</span><span class="o">()</span> |
| </code></pre></div> |
| |
| <p>If the <code>checkpointDirectory</code> exists, then the context will be recreated from the checkpoint data. |
| If the directory does not exist (i.e., running for the first time), |
| then the function <code>functionToCreateContext</code> will be called to create a new |
| context and set up the DStreams. See the Scala example |
| <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala">RecoverableNetworkWordCount</a>. |
| This example appends the word counts of network data into a file.</p> |
| |
| <p>You can also explicitly create a <code>StreamingContext</code> from the checkpoint data and start the |
| computation by using <code>new StreamingContext(checkpointDirectory)</code>.</p> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <p>This behavior is made simple by using <code>JavaStreamingContext.getOrCreate</code>. This is used as follows.</p> |
| |
| <div class="highlight"><pre><code class="java"><span class="c1">// Create a factory object that can create a and setup a new JavaStreamingContext</span> |
| <span class="n">JavaStreamingContextFactory</span> <span class="n">contextFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContextFactory</span><span class="o">()</span> <span class="o">{</span> |
| <span class="nd">@Override</span> <span class="kd">public</span> <span class="n">JavaStreamingContext</span> <span class="n">create</span><span class="o">()</span> <span class="o">{</span> |
| <span class="n">JavaStreamingContext</span> <span class="n">jssc</span> <span class="o">=</span> <span class="k">new</span> <span class="n">JavaStreamingContext</span><span class="o">(...);</span> <span class="c1">// new context</span> |
| <span class="n">JavaDStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span> <span class="n">lines</span> <span class="o">=</span> <span class="n">jssc</span><span class="o">.</span><span class="na">socketTextStream</span><span class="o">(...);</span> <span class="c1">// create DStreams</span> |
| <span class="o">...</span> |
| <span class="n">jssc</span><span class="o">.</span><span class="na">checkpoint</span><span class="o">(</span><span class="n">checkpointDirectory</span><span class="o">);</span> <span class="c1">// set checkpoint directory</span> |
| <span class="k">return</span> <span class="n">jssc</span><span class="o">;</span> |
| <span class="o">}</span> |
| <span class="o">};</span> |
| |
| <span class="c1">// Get JavaStreamingContext from checkpoint data or create a new one</span> |
| <span class="n">JavaStreamingContext</span> <span class="n">context</span> <span class="o">=</span> <span class="n">JavaStreamingContext</span><span class="o">.</span><span class="na">getOrCreate</span><span class="o">(</span><span class="n">checkpointDirectory</span><span class="o">,</span> <span class="n">contextFactory</span><span class="o">);</span> |
| |
| <span class="c1">// Do additional setup on context that needs to be done,</span> |
| <span class="c1">// irrespective of whether it is being started or restarted</span> |
| <span class="n">context</span><span class="o">.</span> <span class="o">...</span> |
| |
| <span class="c1">// Start the context</span> |
| <span class="n">context</span><span class="o">.</span><span class="na">start</span><span class="o">();</span> |
| <span class="n">context</span><span class="o">.</span><span class="na">awaitTermination</span><span class="o">();</span> |
| </code></pre></div> |
| |
| <p>If the <code>checkpointDirectory</code> exists, then the context will be recreated from the checkpoint data. |
| If the directory does not exist (i.e., running for the first time), |
| then the function <code>contextFactory</code> will be called to create a new |
context and set up the DStreams. See the Java example
| <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaRecoverableWordCount.scala">JavaRecoverableWordCount</a> |
| (note that this example is missing in the 0.9 release, so you can test it using the master branch). |
| This example appends the word counts of network data into a file.</p> |
| |
| <p>You can also explicitly create a <code>JavaStreamingContext</code> from the checkpoint data and start |
| the computation by using <code>new JavaStreamingContext(checkpointDirectory)</code>.</p> |
| |
| </div> |
| </div> |
| |
| <p><strong>Note</strong>: If Spark Streaming and/or the Spark Streaming program is recompiled, |
| you <em>must</em> create a new <code>StreamingContext</code> or <code>JavaStreamingContext</code>, |
| not recreate from checkpoint data. This is because trying to load a |
| context from checkpoint data may fail if the data was generated before recompilation of the |
| classes. So, if you are using <code>getOrCreate</code>, then make sure that the checkpoint directory is |
| explicitly deleted every time recompiled code needs to be launched.</p> |
| |
| <p>This failure recovery can be done automatically using Spark’s |
| <a href="spark-standalone.html">standalone cluster mode</a>, which allows the driver of any Spark application |
| to be launched within the cluster and be restarted on failure (see |
| <a href="spark-standalone.html#launching-applications-inside-the-cluster">supervise mode</a>). This can be |
| tested locally by launching the above example using the supervise mode in a |
local standalone cluster and killing the java process running the driver (will be shown as
<em>DriverWrapper</em> when <code>jps</code> is run to show all active Java processes). The driver should be
automatically restarted, and the word counts will continue.</p>
| |
| <p>For other deployment environments like Mesos and Yarn, you have to restart the driver through other |
| mechanisms.</p> |
| |
| <h4>Recovery Semantics</h4> |
| |
| <p>There are two different failure behaviors based on which input sources are used.</p> |
| |
| <ol> |
<li><em>Using HDFS files as input source</em> - Since the data is reliably stored on HDFS, all data can be
re-computed and therefore no data will be lost due to any failure.</li>
<li><em>Using any input source that receives data through a network</em> - The received input data is
replicated in memory to multiple nodes. Since all the data in the Spark worker’s memory is lost
when the Spark driver fails, the past input data will not be accessible when the driver recovers.
| Hence, if stateful and window-based operations are used |
| (like <code>updateStateByKey</code>, <code>window</code>, <code>countByValueAndWindow</code>, etc.), then the intermediate state |
| will not be recovered completely.</li> |
| </ol> |
| |
| <p>In future releases, we will support full recoverability for all input sources. Note that for |
| non-stateful transformations like <code>map</code>, <code>count</code>, and <code>reduceByKey</code>, with <em>all</em> input streams, |
| the system, upon restarting, will continue to receive and process new data.</p> |
| |
| <p>To better understand the behavior of the system under driver failure with a HDFS source, let’s |
| consider what will happen with a file input stream. Specifically, in the case of the file input |
| stream, it will correctly identify new files that were created while the driver was down and |
process them in the same way as it would have if the driver had not failed. To explain further
in the case of the file input stream, we shall use an example. Let’s say files are being generated
every second, and a Spark Streaming program reads every new file and outputs the number of lines
| in the file. This is what the sequence of outputs would be with and without a driver failure.</p> |
| |
| <table class="table"> |
| <!-- Results table headers --> |
| <tr> |
| <th> Time </th> |
| <th> Number of lines in input file </th> |
| <th> Output without driver failure </th> |
| <th> Output with driver failure </th> |
| </tr> |
| <tr> |
| <td>1</td> |
| <td>10</td> |
| <td>10</td> |
| <td>10</td> |
| </tr> |
| <tr> |
| <td>2</td> |
| <td>20</td> |
| <td>20</td> |
| <td>20</td> |
| </tr> |
| <tr> |
| <td>3</td> |
| <td>30</td> |
| <td>30</td> |
| <td>30</td> |
| </tr> |
| <tr> |
| <td>4</td> |
| <td>40</td> |
| <td>40</td> |
| <td>[DRIVER FAILS]<br />no output</td> |
| </tr> |
| <tr> |
| <td>5</td> |
| <td>50</td> |
| <td>50</td> |
| <td>no output</td> |
| </tr> |
| <tr> |
| <td>6</td> |
| <td>60</td> |
| <td>60</td> |
| <td>no output</td> |
| </tr> |
| <tr> |
| <td>7</td> |
| <td>70</td> |
| <td>70</td> |
| <td>[DRIVER RECOVERS]<br />40, 50, 60, 70</td> |
| </tr> |
| <tr> |
| <td>8</td> |
| <td>80</td> |
| <td>80</td> |
| <td>80</td> |
| </tr> |
| <tr> |
| <td>9</td> |
| <td>90</td> |
| <td>90</td> |
| <td>90</td> |
| </tr> |
| <tr> |
| <td>10</td> |
| <td>100</td> |
| <td>100</td> |
| <td>100</td> |
| </tr> |
| </table> |
| |
| <p>If the driver had crashed in the middle of the processing of time 3, then it will process time 3 |
| and output 30 after recovery.</p> |
| |
| <hr /> |
| |
| <h1 id="migration-guide-from-091-or-below-to-1x">Migration Guide from 0.9.1 or below to 1.x</h1> |
| <p>Between Spark 0.9.1 and Spark 1.0, there were a few API changes made to ensure future API stability. |
| This section elaborates the steps required to migrate your existing code to 1.0.</p> |
| |
<p><strong>Input DStreams</strong>: All operations that create an input stream (e.g., <code>StreamingContext.socketStream</code>,
<code>FlumeUtils.createStream</code>, etc.) now return
| <a href="api/scala/index.html#org.apache.spark.streaming.dstream.InputDStream">InputDStream</a> / |
| <a href="api/scala/index.html#org.apache.spark.streaming.dstream.ReceiverInputDStream">ReceiverInputDStream</a> |
| (instead of DStream) for Scala, and <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaInputDStream.html">JavaInputDStream</a> / |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaPairInputDStream.html">JavaPairInputDStream</a> / |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaReceiverInputDStream.html">JavaReceiverInputDStream</a> / |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaPairReceiverInputDStream.html">JavaPairReceiverInputDStream</a> |
| (instead of JavaDStream) for Java. This ensures that functionality specific to input streams can |
| be added to these classes in the future without breaking binary compatibility. |
| Note that your existing Spark Streaming applications should not require any change |
| (as these new classes are subclasses of DStream/JavaDStream) but may require recompilation with Spark 1.0.</p> |
| |
<p><strong>Custom Network Receivers</strong>: Since the earliest release of Spark Streaming, custom network receivers could be defined
in Scala using the class NetworkReceiver. However, the API was limited in terms of error handling
and reporting, and could not be used from Java. Starting with Spark 1.0, this class has been
| replaced by <a href="api/scala/index.html#org.apache.spark.streaming.receiver.Receiver">Receiver</a> which has |
| the following advantages.</p> |
| |
| <ul> |
<li>Methods like <code>stop</code> and <code>restart</code> have been added for better control of the lifecycle of a receiver. See
| the <a href="streaming-custom-receiver.html">custom receiver guide</a> for more details.</li> |
| <li>Custom receivers can be implemented using both Scala and Java.</li> |
| </ul> |
| |
| <p>To migrate your existing custom receivers from the earlier NetworkReceiver to the new Receiver, you have |
| to do the following.</p> |
| |
| <ul> |
| <li>Make your custom receiver class extend |
| <a href="api/scala/index.html#org.apache.spark.streaming.receiver.Receiver"><code>org.apache.spark.streaming.receiver.Receiver</code></a> |
| instead of <code>org.apache.spark.streaming.dstream.NetworkReceiver</code>.</li> |
<li>Earlier, a BlockGenerator object had to be created by the custom receiver, to which received data was
added to be stored in Spark. It had to be explicitly started and stopped from the <code>onStart()</code> and <code>onStop()</code>
methods. The new Receiver class makes this unnecessary as it adds a set of methods named <code>store(<data>)</code>
that can be called to store the data in Spark. So, to migrate your custom network receiver, remove any
BlockGenerator object (it does not exist any more in Spark 1.0 anyway), and use the <code>store(...)</code> methods on
received data, as illustrated by the sketch after this list.</li>
| </ul> |
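<p>A minimal sketch of a migrated receiver is shown below. The <code>receiveRecord()</code> helper is
hypothetical and stands in for whatever connection logic your existing receiver already has; the
point is that received data is handed to Spark through <code>store(...)</code>.</p>

<div class="highlight"><pre><code class="scala">import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Start a thread that receives data and stores it in Spark
    new Thread("Custom Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do here; the receiving thread checks isStopped() and exits on its own
  }

  private def receive() {
    while (!isStopped()) {
      val record = receiveRecord()   // hypothetical helper that fetches the next record
      if (record != null) {
        store(record)                // store() replaces the old BlockGenerator-based approach
      }
    }
  }

  // Hypothetical connection logic; replace with your receiver's existing code
  private def receiveRecord(): String = null
}
</code></pre></div>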
| |
<p><strong>Actor-based Receivers</strong>: Data could have been received using any Akka Actors by extending the actor class with the
<code>org.apache.spark.streaming.receivers.Receiver</code> trait. This has been renamed to
<a href="api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper"><code>org.apache.spark.streaming.receiver.ActorHelper</code></a>
and the <code>pushBlock(...)</code> methods to store received data have been renamed to <code>store(...)</code>. Other helper classes in
| the <code>org.apache.spark.streaming.receivers</code> package were also moved |
| to <a href="api/scala/index.html#org.apache.spark.streaming.receiver.package"><code>org.apache.spark.streaming.receiver</code></a> |
| package and renamed for better clarity.</p> |
| |
| <hr /> |
| |
| <h1 id="where-to-go-from-here">Where to Go from Here</h1> |
| |
| <ul> |
| <li>API documentation |
| <ul> |
| <li>Scala docs |
| <ul> |
| <li><a href="api/scala/index.html#org.apache.spark.streaming.StreamingContext">StreamingContext</a> and |
| <a href="api/scala/index.html#org.apache.spark.streaming.dstream.DStream">DStream</a></li> |
| <li><a href="api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$">KafkaUtils</a>, |
| <a href="api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$">FlumeUtils</a>, |
| <a href="api/scala/index.html#org.apache.spark.streaming.twitter.TwitterUtils$">TwitterUtils</a>, |
| <a href="api/scala/index.html#org.apache.spark.streaming.zeromq.ZeroMQUtils$">ZeroMQUtils</a>, and |
| <a href="api/scala/index.html#org.apache.spark.streaming.mqtt.MQTTUtils$">MQTTUtils</a></li> |
| </ul> |
| </li> |
| <li>Java docs |
| <ul> |
| <li><a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html">JavaStreamingContext</a>, |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html">JavaDStream</a> and |
| <a href="api/java/index.html?org/apache/spark/streaming/api/java/PairJavaDStream.html">PairJavaDStream</a></li> |
| <li><a href="api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html">KafkaUtils</a>, |
| <a href="api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html">FlumeUtils</a>, |
| <a href="api/java/index.html?org/apache/spark/streaming/twitter/TwitterUtils.html">TwitterUtils</a>, |
| <a href="api/java/index.html?org/apache/spark/streaming/zeromq/ZeroMQUtils.html">ZeroMQUtils</a>, and |
| <a href="api/java/index.html?org/apache/spark/streaming/mqtt/MQTTUtils.html">MQTTUtils</a></li> |
| </ul> |
| </li> |
| </ul> |
| </li> |
| <li>More examples in <a href="https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming">Scala</a> |
| and <a href="https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming">Java</a></li> |
| <li><a href="http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf">Paper</a> and <a href="http://youtu.be/g171ndOHgJ0">video</a> describing Spark Streaming.</li> |
| </ul> |
| |
| |
| </div> <!-- /container --> |
| |
| <script src="js/vendor/jquery-1.8.0.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script type="text/javascript" |
| src="http://cdn.mathjax.org/mathjax/latest/MathJax.js?config=TeX-AMS-MML_HTMLorMML"></script> |
| <script> |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| </script> |
| </body> |
| </html> |