| |
| <!DOCTYPE html> |
| <!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7" lang="en"> <![endif]--> |
| <!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8" lang="en"> <![endif]--> |
| <!--[if IE 8]> <html class="no-js lt-ie9" lang="en"> <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en"> <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>Spark Streaming Custom Receivers - Spark 3.5.0 Documentation</title> |
| |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <link rel="preconnect" href="https://fonts.googleapis.com"> |
| <link rel="preconnect" href="https://fonts.gstatic.com" crossorigin> |
| <link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&family=Courier+Prime:wght@400;700&display=swap" rel="stylesheet"> |
| <link href="css/custom.css" rel="stylesheet"> |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| <link rel="stylesheet" href="css/docsearch.min.css" /> |
| <link rel="stylesheet" href="css/docsearch.css"> |
| |
| <!-- Matomo --> |
| <script type="text/javascript"> |
| var _paq = window._paq = window._paq || []; |
| /* tracker methods like "setCustomDimension" should be called before "trackPageView" */ |
| _paq.push(["disableCookies"]); |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| (function() { |
| var u="https://analytics.apache.org/"; |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '40']); |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| <!-- End Matomo Code --> |
| </head> |
| <body class="global"> |
| <!--[if lt IE 7]> |
| <p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p> |
| <![endif]--> |
| |
| <!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html --> |
| |
| <nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar"> |
| <div class="navbar-brand"><a href="index.html"> |
| <img src="img/spark-logo-rev.svg" alt="Apache Spark" width="141" height="72"/></a><span class="version">3.5.0</span> |
| </div> |
| <button class="navbar-toggler" type="button" data-toggle="collapse" |
| data-target="#navbarCollapse" aria-controls="navbarCollapse" |
| aria-expanded="false" aria-label="Toggle navigation"> |
| <span class="navbar-toggler-icon"></span> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarCollapse"> |
| <ul class="navbar-nav me-auto"> |
| <li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a> |
| <div class="dropdown-menu" aria-labelledby="navbarQuickStart"> |
| <a class="dropdown-item" href="quick-start.html">Quick Start</a> |
| <a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a> |
| <a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a> |
| <a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a> |
| <a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a> |
| <a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a> |
| <a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a> |
| <a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a> |
| <a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a> |
| <div class="dropdown-menu" aria-labelledby="navbarAPIDocs"> |
| <a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a> |
| <a class="dropdown-item" href="api/java/index.html">Java</a> |
| <a class="dropdown-item" href="api/python/index.html">Python</a> |
| <a class="dropdown-item" href="api/R/index.html">R</a> |
| <a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a> |
| <div class="dropdown-menu" aria-labelledby="navbarDeploying"> |
| <a class="dropdown-item" href="cluster-overview.html">Overview</a> |
| <a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a> |
| <a class="dropdown-item" href="running-on-mesos.html">Mesos</a> |
| <a class="dropdown-item" href="running-on-yarn.html">YARN</a> |
| <a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a> |
| </div> |
| </li> |
| |
| <li class="nav-item dropdown"> |
| <a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a> |
| <div class="dropdown-menu" aria-labelledby="navbarMore"> |
| <a class="dropdown-item" href="configuration.html">Configuration</a> |
| <a class="dropdown-item" href="monitoring.html">Monitoring</a> |
| <a class="dropdown-item" href="tuning.html">Tuning Guide</a> |
| <a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a> |
| <a class="dropdown-item" href="security.html">Security</a> |
| <a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a> |
| <a class="dropdown-item" href="migration-guide.html">Migration Guide</a> |
| <div class="dropdown-divider"></div> |
| <a class="dropdown-item" href="building-spark.html">Building Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a> |
| <a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a> |
| </div> |
| </li> |
| |
| <li class="nav-item"> |
| <input type="text" id="docsearch-input" placeholder="Search the docs…"> |
| </li> |
| </ul> |
| <!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>--> |
| </div> |
| </nav> |
| |
| |
| |
| <div class="container"> |
| |
| |
| <div class="content mr-3" id="content"> |
| |
| |
| <h1 class="title">Spark Streaming Custom Receivers</h1> |
| |
| |
| <p>Spark Streaming can receive streaming data from any arbitrary data source beyond |
| the ones for which it has built-in support (that is, beyond Kafka, Kinesis, files, sockets, etc.). |
| This requires the developer to implement a <em>receiver</em> that is customized for receiving data from |
| the concerned data source. This guide walks through the process of implementing a custom receiver |
| and using it in a Spark Streaming application. Note that custom receivers can be implemented |
| in Scala or Java.</p> |
| |
| <h2 id="implementing-a-custom-receiver">Implementing a Custom Receiver</h2> |
| |
| <p>This starts with implementing a <strong>Receiver</strong> |
| (<a href="api/scala/org/apache/spark/streaming/receiver/Receiver.html">Scala doc</a>, |
| <a href="api/java/org/apache/spark/streaming/receiver/Receiver.html">Java doc</a>). |
| A custom receiver must extend this abstract class by implementing two methods:</p> |
| |
| <ul> |
| <li><code class="language-plaintext highlighter-rouge">onStart()</code>: Things to do to start receiving data.</li> |
| <li><code class="language-plaintext highlighter-rouge">onStop()</code>: Things to do to stop receiving data.</li> |
| </ul> |
| |
| <p>Both <code class="language-plaintext highlighter-rouge">onStart()</code> and <code class="language-plaintext highlighter-rouge">onStop()</code> must not block indefinitely. Typically, <code class="language-plaintext highlighter-rouge">onStart()</code> would start the threads |
| that are responsible for receiving the data, and <code class="language-plaintext highlighter-rouge">onStop()</code> would ensure that these threads receiving the data |
| are stopped. The receiving threads can also use <code class="language-plaintext highlighter-rouge">isStopped()</code>, a <code class="language-plaintext highlighter-rouge">Receiver</code> method, to check whether they |
| should stop receiving data.</p> |
| |
| <p>Once the data is received, that data can be stored inside Spark |
| by calling <code class="language-plaintext highlighter-rouge">store(data)</code>, which is a method provided by the Receiver class. |
| There are a number of flavors of <code class="language-plaintext highlighter-rouge">store()</code> which allow one to store the received data |
| record-at-a-time or as a whole collection of objects / serialized bytes. Note that the flavor of |
| <code class="language-plaintext highlighter-rouge">store()</code> used to implement a receiver affects its reliability and fault-tolerance semantics. |
| This is discussed <a href="#receiver-reliability">later</a> in more detail.</p> |
| |
| <p>Any exception in the receiving threads should be caught and handled properly to avoid silent |
| failures of the receiver. <code class="language-plaintext highlighter-rouge">restart(&lt;exception&gt;)</code> will restart the receiver by |
| asynchronously calling <code class="language-plaintext highlighter-rouge">onStop()</code> and then calling <code class="language-plaintext highlighter-rouge">onStart()</code> after a delay. |
| <code class="language-plaintext highlighter-rouge">stop(&lt;exception&gt;)</code> will call <code class="language-plaintext highlighter-rouge">onStop()</code> and terminate the receiver. Also, <code class="language-plaintext highlighter-rouge">reportError(&lt;error&gt;)</code> |
| reports an error message to the driver (visible in the logs and UI) without stopping / restarting |
| the receiver.</p> |
| |
| <p>The following is a custom receiver that receives a stream of text over a socket. It treats |
| ‘\n’ delimited lines in the text stream as records and stores them with Spark. If the receiving thread |
| has any error connecting or receiving, the receiver is restarted to make another attempt to connect.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">class</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">port</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> |
| <span class="k">extends</span> <span class="nc">Receiver</span><span class="o">[</span><span class="kt">String</span><span class="o">](</span><span class="nv">StorageLevel</span><span class="o">.</span><span class="py">MEMORY_AND_DISK_2</span><span class="o">)</span> <span class="k">with</span> <span class="nc">Logging</span> <span class="o">{</span> |
| |
| <span class="k">def</span> <span class="nf">onStart</span><span class="o">()</span> <span class="o">{</span> |
| <span class="c1">// Start the thread that receives data over a connection</span> |
| <span class="k">new</span> <span class="nc">Thread</span><span class="o">(</span><span class="s">"Socket Receiver"</span><span class="o">)</span> <span class="o">{</span> |
| <span class="k">override</span> <span class="k">def</span> <span class="nf">run</span><span class="o">()</span> <span class="o">{</span> <span class="nf">receive</span><span class="o">()</span> <span class="o">}</span> |
| <span class="o">}.</span><span class="py">start</span><span class="o">()</span> |
| <span class="o">}</span> |
| |
| <span class="k">def</span> <span class="nf">onStop</span><span class="o">()</span> <span class="o">{</span> |
| <span class="c1">// There is nothing much to do as the thread calling receive()</span> |
| <span class="c1">// is designed to stop by itself if isStopped() returns true</span> |
| <span class="o">}</span> |
| |
| <span class="cm">/** Create a socket connection and receive data until receiver is stopped */</span> |
| <span class="k">private</span> <span class="k">def</span> <span class="nf">receive</span><span class="o">()</span> <span class="o">{</span> |
| <span class="k">var</span> <span class="n">socket</span><span class="k">:</span> <span class="kt">Socket</span> <span class="o">=</span> <span class="kc">null</span> |
| <span class="k">var</span> <span class="n">userInput</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="kc">null</span> |
| <span class="k">try</span> <span class="o">{</span> |
| <span class="c1">// Connect to host:port</span> |
| <span class="n">socket</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">)</span> |
| |
| <span class="c1">// Until stopped or connection broken continue reading</span> |
| <span class="k">val</span> <span class="nv">reader</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">BufferedReader</span><span class="o">(</span> |
| <span class="k">new</span> <span class="nc">InputStreamReader</span><span class="o">(</span><span class="nv">socket</span><span class="o">.</span><span class="py">getInputStream</span><span class="o">(),</span> <span class="nv">StandardCharsets</span><span class="o">.</span><span class="py">UTF_8</span><span class="o">))</span> |
| <span class="n">userInput</span> <span class="k">=</span> <span class="nv">reader</span><span class="o">.</span><span class="py">readLine</span><span class="o">()</span> |
| <span class="nf">while</span><span class="o">(!</span><span class="n">isStopped</span> <span class="o">&&</span> <span class="n">userInput</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nf">store</span><span class="o">(</span><span class="n">userInput</span><span class="o">)</span> |
| <span class="n">userInput</span> <span class="k">=</span> <span class="nv">reader</span><span class="o">.</span><span class="py">readLine</span><span class="o">()</span> |
| <span class="o">}</span> |
| <span class="nv">reader</span><span class="o">.</span><span class="py">close</span><span class="o">()</span> |
| <span class="nv">socket</span><span class="o">.</span><span class="py">close</span><span class="o">()</span> |
| |
| <span class="c1">// Restart in an attempt to connect again when server is active again</span> |
| <span class="nf">restart</span><span class="o">(</span><span class="s">"Trying to connect again"</span><span class="o">)</span> |
| <span class="o">}</span> <span class="k">catch</span> <span class="o">{</span> |
| <span class="k">case</span> <span class="n">e</span><span class="k">:</span> <span class="kt">java.net.ConnectException</span> <span class="o">=></span> |
| <span class="c1">// restart if could not connect to server</span> |
| <span class="nf">restart</span><span class="o">(</span><span class="s">"Error connecting to "</span> <span class="o">+</span> <span class="n">host</span> <span class="o">+</span> <span class="s">":"</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="n">e</span><span class="o">)</span> |
| <span class="k">case</span> <span class="n">t</span><span class="k">:</span> <span class="kt">Throwable</span> <span class="o">=></span> |
| <span class="c1">// restart if there is any other error</span> |
| <span class="nf">restart</span><span class="o">(</span><span class="s">"Error receiving data"</span><span class="o">,</span> <span class="n">t</span><span class="o">)</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></figure> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">JavaCustomReceiver</span> <span class="kd">extends</span> <span class="nc">Receiver</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="o">{</span> |
| |
| <span class="nc">String</span> <span class="n">host</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> |
| <span class="kt">int</span> <span class="n">port</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span> |
| |
| <span class="kd">public</span> <span class="nf">JavaCustomReceiver</span><span class="o">(</span><span class="nc">String</span> <span class="n">host_</span> <span class="o">,</span> <span class="kt">int</span> <span class="n">port_</span><span class="o">)</span> <span class="o">{</span> |
| <span class="kd">super</span><span class="o">(</span><span class="nc">StorageLevel</span><span class="o">.</span><span class="na">MEMORY_AND_DISK_2</span><span class="o">());</span> |
| <span class="n">host</span> <span class="o">=</span> <span class="n">host_</span><span class="o">;</span> |
| <span class="n">port</span> <span class="o">=</span> <span class="n">port_</span><span class="o">;</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStart</span><span class="o">()</span> <span class="o">{</span> |
| <span class="c1">// Start the thread that receives data over a connection</span> |
| <span class="k">new</span> <span class="nf">Thread</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">receive</span><span class="o">).</span><span class="na">start</span><span class="o">();</span> |
| <span class="o">}</span> |
| |
| <span class="nd">@Override</span> |
| <span class="kd">public</span> <span class="kt">void</span> <span class="nf">onStop</span><span class="o">()</span> <span class="o">{</span> |
| <span class="c1">// There is nothing much to do as the thread calling receive()</span> |
| <span class="c1">// is designed to stop by itself if isStopped() returns true</span> |
| <span class="o">}</span> |
| |
| <span class="cm">/** Create a socket connection and receive data until receiver is stopped */</span> |
| <span class="kd">private</span> <span class="kt">void</span> <span class="nf">receive</span><span class="o">()</span> <span class="o">{</span> |
| <span class="nc">Socket</span> <span class="n">socket</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> |
| <span class="nc">String</span> <span class="n">userInput</span> <span class="o">=</span> <span class="kc">null</span><span class="o">;</span> |
| |
| <span class="k">try</span> <span class="o">{</span> |
| <span class="c1">// connect to the server</span> |
| <span class="n">socket</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">Socket</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">);</span> |
| |
| <span class="nc">BufferedReader</span> <span class="n">reader</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">BufferedReader</span><span class="o">(</span> |
| <span class="k">new</span> <span class="nf">InputStreamReader</span><span class="o">(</span><span class="n">socket</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(),</span> <span class="nc">StandardCharsets</span><span class="o">.</span><span class="na">UTF_8</span><span class="o">));</span> |
| |
| <span class="c1">// Until stopped or connection broken continue reading</span> |
| <span class="k">while</span> <span class="o">(!</span><span class="n">isStopped</span><span class="o">()</span> <span class="o">&&</span> <span class="o">(</span><span class="n">userInput</span> <span class="o">=</span> <span class="n">reader</span><span class="o">.</span><span class="na">readLine</span><span class="o">())</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span> |
| <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Received data '"</span> <span class="o">+</span> <span class="n">userInput</span> <span class="o">+</span> <span class="s">"'"</span><span class="o">);</span> |
| <span class="n">store</span><span class="o">(</span><span class="n">userInput</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="n">reader</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> |
| <span class="n">socket</span><span class="o">.</span><span class="na">close</span><span class="o">();</span> |
| |
| <span class="c1">// Restart in an attempt to connect again when server is active again</span> |
| <span class="n">restart</span><span class="o">(</span><span class="s">"Trying to connect again"</span><span class="o">);</span> |
| <span class="o">}</span> <span class="k">catch</span><span class="o">(</span><span class="nc">ConnectException</span> <span class="n">ce</span><span class="o">)</span> <span class="o">{</span> |
| <span class="c1">// restart if could not connect to server</span> |
| <span class="n">restart</span><span class="o">(</span><span class="s">"Could not connect"</span><span class="o">,</span> <span class="n">ce</span><span class="o">);</span> |
| <span class="o">}</span> <span class="k">catch</span><span class="o">(</span><span class="nc">Throwable</span> <span class="n">t</span><span class="o">)</span> <span class="o">{</span> |
| <span class="c1">// restart if there is any other error</span> |
| <span class="n">restart</span><span class="o">(</span><span class="s">"Error receiving data"</span><span class="o">,</span> <span class="n">t</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">}</span> |
| <span class="o">}</span></code></pre></figure> |
| |
| </div> |
| </div> |
| |
| <h2 id="using-the-custom-receiver-in-a-spark-streaming-application">Using the custom receiver in a Spark Streaming application</h2> |
| |
| <p>The custom receiver can be used in a Spark Streaming application by using |
| <code class="language-plaintext highlighter-rouge">streamingContext.receiverStream(&lt;instance of custom receiver&gt;)</code>. This will create |
| an input DStream using data received by the instance of custom receiver, as shown below:</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
| <figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Assuming ssc is the StreamingContext</span> |
| <span class="k">val</span> <span class="nv">customReceiverStream</span> <span class="k">=</span> <span class="nv">ssc</span><span class="o">.</span><span class="py">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">CustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">))</span> |
| <span class="k">val</span> <span class="nv">words</span> <span class="k">=</span> <span class="nv">customReceiverStream</span><span class="o">.</span><span class="py">flatMap</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> |
| <span class="o">...</span></code></pre></figure> |
| |
| <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/v3.5.0/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala">CustomReceiver.scala</a>.</p> |
| |
| </div> |
| <div data-lang="java"> |
| |
| <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Assuming ssc is the JavaStreamingContext</span> |
| <span class="nc">JavaDStream</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">customReceiverStream</span> <span class="o">=</span> <span class="n">ssc</span><span class="o">.</span><span class="na">receiverStream</span><span class="o">(</span><span class="k">new</span> <span class="nc">JavaCustomReceiver</span><span class="o">(</span><span class="n">host</span><span class="o">,</span> <span class="n">port</span><span class="o">));</span> |
| <span class="nc">JavaDStream</span><span class="o"><</span><span class="nc">String</span><span class="o">></span> <span class="n">words</span> <span class="o">=</span> <span class="n">customReceiverStream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-></span> <span class="o">...);</span> |
| <span class="o">...</span></code></pre></figure> |
| |
| <p>The full source code is in the example <a href="https://github.com/apache/spark/blob/v3.5.0/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java">JavaCustomReceiver.java</a>.</p> |
| |
| </div> |
| </div> |
| |
| <h2 id="receiver-reliability">Receiver Reliability</h2> |
| <p>As discussed in brief in the |
| <a href="streaming-programming-guide.html#receiver-reliability">Spark Streaming Programming Guide</a>, |
| there are two kinds of receivers based on their reliability and fault-tolerance semantics.</p> |
| |
| <ol> |
| <li><em>Reliable Receiver</em> - For <em>reliable sources</em> that allow sent data to be acknowledged, a |
| <em>reliable receiver</em> correctly acknowledges to the source that the data has been received |
| and stored in Spark reliably (that is, replicated successfully). Usually, |
| implementing this receiver involves careful consideration of the semantics of source |
| acknowledgements.</li> |
| <li><em>Unreliable Receiver</em> - An <em>unreliable receiver</em> does <em>not</em> send acknowledgement to a source. This can be used for sources that do not support acknowledgement, or even for reliable sources when one does not want or need to go into the complexity of acknowledgement.</li> |
| </ol> |
| |
| <p>To implement a <em>reliable receiver</em>, you have to use <code class="language-plaintext highlighter-rouge">store(multiple-records)</code> to store data. |
| This flavor of <code class="language-plaintext highlighter-rouge">store</code> is a blocking call which returns only after all the given records have |
| been stored inside Spark. If the receiver’s configured storage level uses replication |
| (enabled by default), then this call returns after replication has completed. |
| Thus it ensures that the data is reliably stored, and the receiver can now acknowledge the |
| source appropriately. This ensures that no data is lost when the receiver fails in the middle |
| of replicating data – the buffered data will not be acknowledged and hence will be later resent |
| by the source.</p> |
| |
| <p>An <em>unreliable receiver</em> does not have to implement any of this logic. It can simply receive |
| records from the source and insert them one-at-a-time using <code class="language-plaintext highlighter-rouge">store(single-record)</code>. While it does |
| not get the reliability guarantees of <code class="language-plaintext highlighter-rouge">store(multiple-records)</code>, it has the following advantages:</p> |
| |
| <ul> |
| <li>The system takes care of chunking that data into appropriate sized blocks (look for block |
| interval in the <a href="streaming-programming-guide.html">Spark Streaming Programming Guide</a>).</li> |
| <li>The system takes care of controlling the receiving rates if the rate limits have been specified.</li> |
| <li>Because of these two, unreliable receivers are simpler to implement than reliable receivers.</li> |
| </ul> |
| |
| <p>The following table summarizes the characteristics of both types of receivers:</p> |
| |
| <table class="table table-striped"> |
| <thead> |
| <tr> |
| <th>Receiver Type</th> |
| <th>Characteristics</th> |
| </tr> |
| </thead> |
| <tr> |
| <td><b>Unreliable Receivers</b></td> |
| <td> |
| Simple to implement.<br /> |
| System takes care of block generation and rate control.<br /> |
| No fault-tolerance guarantees, can lose data on receiver failure. |
| </td> |
| </tr> |
| <tr> |
| <td><b>Reliable Receivers</b></td> |
| <td> |
| Strong fault-tolerance guarantees, can ensure zero data loss.<br /> |
| Block generation and rate control to be handled by the receiver implementation.<br /> |
| Implementation complexity depends on the acknowledgement mechanisms of the source. |
| </td> |
| </tr> |
| <tr> |
| <td></td> |
| <td></td> |
| </tr> |
| </table> |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-3.5.1.min.js"></script> |
| <script src="js/vendor/bootstrap.bundle.min.js"></script> |
| |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| <script type="text/javascript" src="js/vendor/docsearch.min.js"></script> |
| <script type="text/javascript"> |
| // DocSearch is entirely free and automated. DocSearch is built in two parts: |
| // 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link |
| // in your website and extracts content from every page it traverses. It then pushes this |
| // content to an Algolia index. |
| // 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index |
| // to your search input and display its results in a dropdown UI. If you want to find more |
| // details on how DocSearch works, check the docs of DocSearch. |
| docsearch({ |
| apiKey: 'd62f962a82bc9abb53471cb7b89da35e', |
| appId: 'RAI69RXRSK', |
| indexName: 'apache_spark', |
| inputSelector: '#docsearch-input', |
| enhancedSearchInput: true, |
| algoliaOptions: { |
| 'facetFilters': ["version:3.5.0"] |
| }, |
| debug: false // Set debug to true if you want to inspect the dropdown |
| }); |
| |
| </script> |
| |
| <!-- MathJax Section --> |
| <script type="text/x-mathjax-config"> |
| MathJax.Hub.Config({ |
| TeX: { equationNumbers: { autoNumber: "AMS" } } |
| }); |
| </script> |
| <script> |
| // Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS. |
| // We could use "//cdn.mathjax...", but that won't support "file://". |
| (function(d, script) { |
| script = d.createElement('script'); |
| script.type = 'text/javascript'; |
| script.async = true; |
| script.onload = function(){ |
| MathJax.Hub.Config({ |
| tex2jax: { |
| inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ], |
| displayMath: [ ["$$","$$"], ["\\[", "\\]"] ], |
| processEscapes: true, |
| skipTags: ['script', 'noscript', 'style', 'textarea', 'pre'] |
| } |
| }); |
| }; |
| script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') + |
| 'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' + |
| '?config=TeX-AMS-MML_HTMLorMML'; |
| d.getElementsByTagName('head')[0].appendChild(script); |
| }(document)); |
| </script> |
| </body> |
| </html> |