<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.5.0 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet">
<link href="css/custom.css" rel="stylesheet">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<link rel="stylesheet" href="css/docsearch.min.css" />
<link rel="stylesheet" href="css/docsearch.css">
<!-- Matomo -->
<script type="text/javascript">
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body class="global">
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar">
<div class="navbar-brand"><a href="index.html">
<img src="img/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">3.5.0</span>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="quick-start.html">Quick Start</a>
<a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a>
<a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="api/java/index.html">Java</a>
<a class="dropdown-item" href="api/python/index.html">Python</a>
<a class="dropdown-item" href="api/R/index.html">R</a>
<a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="cluster-overview.html">Overview</a>
<a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="running-on-mesos.html">Mesos</a>
<a class="dropdown-item" href="running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="configuration.html">Configuration</a>
<a class="dropdown-item" href="monitoring.html">Monitoring</a>
<a class="dropdown-item" href="tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="security.html">Security</a>
<a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>-->
</div>
</nav>
<div class="container">
<div class="content mr-3" id="content">
<h1 class="title">Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)</h1>
<p>The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka
partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses
the <a href="https://kafka.apache.org/documentation.html#newconsumerapi">new Kafka consumer API</a> instead of the simple API,
there are notable differences in usage.</p>
<h3 id="linking">Linking</h3>
<p>For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see <a href="streaming-programming-guide.html#linking">Linking section</a> in the main programming guide for further information).</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.0
</code></pre></div></div>
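<p>For example, in an sbt build the same dependency might be declared along these lines (a minimal sketch; <code class="language-plaintext highlighter-rouge">%%</code> appends the Scala binary version suffix such as <code class="language-plaintext highlighter-rouge">_2.12</code> for you):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// build.sbt (sketch) -- equivalent of the Maven coordinates above
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.0"
</code></pre></div></div>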
<p><strong>Do not</strong> manually add dependencies on <code class="language-plaintext highlighter-rouge">org.apache.kafka</code> artifacts (e.g. <code class="language-plaintext highlighter-rouge">kafka-clients</code>). The <code class="language-plaintext highlighter-rouge">spark-streaming-kafka-0-10</code> artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard to diagnose ways.</p>
<h3 id="creating-a-direct-stream">Creating a Direct Stream</h3>
<p>Note that the namespace for the import includes the version: <code class="language-plaintext highlighter-rouge">org.apache.spark.streaming.kafka010</code>.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span>
<span class="k">import</span> <span class="nn">org.apache.kafka.common.serialization.StringDeserializer</span>
<span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka010._</span>
<span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent</span>
<span class="k">import</span> <span class="nn">org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe</span>
<span class="k">val</span> <span class="nv">kafkaParams</span> <span class="k">=</span> <span class="nc">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Object</span><span class="o">](</span>
<span class="s">"bootstrap.servers"</span> <span class="o">-&gt;</span> <span class="s">"localhost:9092,anotherhost:9092"</span><span class="o">,</span>
<span class="s">"key.deserializer"</span> <span class="o">-&gt;</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">StringDeserializer</span><span class="o">],</span>
<span class="s">"value.deserializer"</span> <span class="o">-&gt;</span> <span class="n">classOf</span><span class="o">[</span><span class="kt">StringDeserializer</span><span class="o">],</span>
<span class="s">"group.id"</span> <span class="o">-&gt;</span> <span class="s">"use_a_separate_group_id_for_each_stream"</span><span class="o">,</span>
<span class="s">"auto.offset.reset"</span> <span class="o">-&gt;</span> <span class="s">"latest"</span><span class="o">,</span>
<span class="s">"enable.auto.commit"</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="kc">false</span><span class="k">:</span> <span class="kt">java.lang.Boolean</span><span class="o">)</span>
<span class="o">)</span>
<span class="k">val</span> <span class="nv">topics</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="s">"topicA"</span><span class="o">,</span> <span class="s">"topicB"</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">stream</span> <span class="k">=</span> <span class="nv">KafkaUtils</span><span class="o">.</span><span class="py">createDirectStream</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span>
<span class="n">streamingContext</span><span class="o">,</span>
<span class="nc">PreferConsistent</span><span class="o">,</span>
<span class="nc">Subscribe</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="n">topics</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">)</span>
<span class="o">)</span>
<span class="nv">stream</span><span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">record</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nv">record</span><span class="o">.</span><span class="py">key</span><span class="o">,</span> <span class="nv">record</span><span class="o">.</span><span class="py">value</span><span class="o">))</span></code></pre></figure>
<p>Each item in the stream is a <a href="http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html">ConsumerRecord</a>.</p>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">java.util.*</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.SparkConf</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.TaskContext</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.*</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.api.java.function.*</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.streaming.api.java.*</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.spark.streaming.kafka010.*</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.clients.consumer.ConsumerRecord</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.TopicPartition</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.StringDeserializer</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">scala.Tuple2</span><span class="o">;</span>
<span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Object</span><span class="o">&gt;</span> <span class="n">kafkaParams</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;&gt;();</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092,anotherhost:9092"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.deserializer"</span><span class="o">,</span> <span class="nc">StringDeserializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.deserializer"</span><span class="o">,</span> <span class="nc">StringDeserializer</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"group.id"</span><span class="o">,</span> <span class="s">"use_a_separate_group_id_for_each_stream"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"auto.offset.reset"</span><span class="o">,</span> <span class="s">"latest"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"enable.auto.commit"</span><span class="o">,</span> <span class="kc">false</span><span class="o">);</span>
<span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">topics</span> <span class="o">=</span> <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="s">"topicA"</span><span class="o">,</span> <span class="s">"topicB"</span><span class="o">);</span>
<span class="nc">JavaInputDStream</span><span class="o">&lt;</span><span class="nc">ConsumerRecord</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">stream</span> <span class="o">=</span>
<span class="nc">KafkaUtils</span><span class="o">.</span><span class="na">createDirectStream</span><span class="o">(</span>
<span class="n">streamingContext</span><span class="o">,</span>
<span class="nc">LocationStrategies</span><span class="o">.</span><span class="na">PreferConsistent</span><span class="o">(),</span>
<span class="nc">ConsumerStrategies</span><span class="o">.&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span><span class="nc">Subscribe</span><span class="o">(</span><span class="n">topics</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">stream</span><span class="o">.</span><span class="na">mapToPair</span><span class="o">(</span><span class="n">record</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">Tuple2</span><span class="o">&lt;&gt;(</span><span class="n">record</span><span class="o">.</span><span class="na">key</span><span class="o">(),</span> <span class="n">record</span><span class="o">.</span><span class="na">value</span><span class="o">()));</span></code></pre></figure>
</div>
</div>
<p>For possible kafkaParams, see the <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">Kafka consumer config docs</a>.
If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase <code class="language-plaintext highlighter-rouge">heartbeat.interval.ms</code> and <code class="language-plaintext highlighter-rouge">session.timeout.ms</code> appropriately. For batches larger than 5 minutes, this will require changing <code class="language-plaintext highlighter-rouge">group.max.session.timeout.ms</code> on the broker.
Note that the example sets <code class="language-plaintext highlighter-rouge">enable.auto.commit</code> to false; for discussion, see <a href="streaming-kafka-0-10-integration.html#storing-offsets">Storing Offsets</a> below.</p>
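<p>For example, with a 2 minute batch duration you might extend the <code class="language-plaintext highlighter-rouge">kafkaParams</code> from above along these lines (a sketch only; the values are illustrative and must stay within the broker&#8217;s <code class="language-plaintext highlighter-rouge">group.max.session.timeout.ms</code>):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// sketch: timeouts sized for a 2 minute batch duration (illustrative values)
val kafkaParamsWithTimeouts = kafkaParams ++ Map[String, Object](
  "session.timeout.ms" -&gt; (150000: java.lang.Integer),  // longer than the batch duration
  "heartbeat.interval.ms" -&gt; (45000: java.lang.Integer) // roughly a third of session.timeout.ms
)
</code></pre></div></div>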
<h3 id="locationstrategies">LocationStrategies</h3>
<p>The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.</p>
<p>In most cases, you should use <code class="language-plaintext highlighter-rouge">LocationStrategies.PreferConsistent</code> as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use <code class="language-plaintext highlighter-rouge">PreferBrokers</code>, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use <code class="language-plaintext highlighter-rouge">PreferFixed</code>. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).</p>
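<p>A minimal sketch of <code class="language-plaintext highlighter-rouge">PreferFixed</code>, assuming a hypothetical mapping of heavily loaded partitions to executor hosts:</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// hypothetical mapping: pin the hot partitions to specific executor hosts;
// any partition not listed here falls back to a consistent location
val hostMap = Map(
  new TopicPartition("topicA", 0) -&gt; "executor-host-1.example.com",
  new TopicPartition("topicA", 1) -&gt; "executor-host-2.example.com"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)
</code></pre></div></div>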
<p>The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via <code class="language-plaintext highlighter-rouge">spark.streaming.kafka.consumer.cache.maxCapacity</code>.</p>
<p>If you would like to disable the caching for Kafka consumers, you can set <code class="language-plaintext highlighter-rouge">spark.streaming.kafka.consumer.cache.enabled</code> to <code class="language-plaintext highlighter-rouge">false</code>.</p>
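<p>Both settings are ordinary Spark configuration properties, so they can be set on the <code class="language-plaintext highlighter-rouge">SparkConf</code> or passed to <code class="language-plaintext highlighter-rouge">spark-submit</code> via <code class="language-plaintext highlighter-rouge">--conf</code>. A sketch (the application name and value are illustrative):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import org.apache.spark.SparkConf

// sketch: raise the per-executor consumer cache capacity above the default of 64
val conf = new SparkConf()
  .setAppName("KafkaDirectStreamApp") // hypothetical application name
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
// or, to disable consumer caching entirely:
// .set("spark.streaming.kafka.consumer.cache.enabled", "false")
</code></pre></div></div>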
<p>The cache is keyed by topic partition and <code class="language-plaintext highlighter-rouge">group.id</code>, so use a <strong>separate</strong> <code class="language-plaintext highlighter-rouge">group.id</code> for each call to <code class="language-plaintext highlighter-rouge">createDirectStream</code>.</p>
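<p>For example, an application that creates two direct streams could give each its own <code class="language-plaintext highlighter-rouge">group.id</code> (a sketch reusing the imports and <code class="language-plaintext highlighter-rouge">kafkaParams</code> from above; the group names are hypothetical):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// sketch: one group.id per createDirectStream call
val streamA = KafkaUtils.createDirectStream[String, String](
  streamingContext, PreferConsistent,
  Subscribe[String, String](Array("topicA"), kafkaParams + ("group.id" -&gt; "my-app-stream-a")))
val streamB = KafkaUtils.createDirectStream[String, String](
  streamingContext, PreferConsistent,
  Subscribe[String, String](Array("topicB"), kafkaParams + ("group.id" -&gt; "my-app-stream-b")))
</code></pre></div></div>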
<h3 id="consumerstrategies">ConsumerStrategies</h3>
<p>The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup. <code class="language-plaintext highlighter-rouge">ConsumerStrategies</code> provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.</p>
<p><code class="language-plaintext highlighter-rouge">ConsumerStrategies.Subscribe</code>, as shown above, allows you to subscribe to a fixed collection of topics. <code class="language-plaintext highlighter-rouge">SubscribePattern</code> allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using <code class="language-plaintext highlighter-rouge">Subscribe</code> or <code class="language-plaintext highlighter-rouge">SubscribePattern</code> should respond to adding partitions during a running stream. Finally, <code class="language-plaintext highlighter-rouge">Assign</code> allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.</p>
<p>If you have specific consumer setup needs that are not met by the options above, <code class="language-plaintext highlighter-rouge">ConsumerStrategy</code> is a public class that you can extend.</p>
<h3 id="creating-an-rdd">Creating an RDD</h3>
<p>If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span>
<span class="k">val</span> <span class="nv">offsetRanges</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span>
<span class="c1">// topic, partition, inclusive starting offset, exclusive ending offset</span>
<span class="nc">OffsetRange</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">100</span><span class="o">),</span>
<span class="nc">OffsetRange</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">100</span><span class="o">)</span>
<span class="o">)</span>
<span class="k">val</span> <span class="nv">rdd</span> <span class="k">=</span> <span class="nv">KafkaUtils</span><span class="o">.</span><span class="py">createRDD</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="n">sparkContext</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">offsetRanges</span><span class="o">,</span> <span class="nc">PreferConsistent</span><span class="o">)</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Import dependencies and create kafka params as in Create Direct Stream above</span>
<span class="nc">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">{</span>
<span class="c1">// topic, partition, inclusive starting offset, exclusive ending offset</span>
<span class="nc">OffsetRange</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">100</span><span class="o">),</span>
<span class="nc">OffsetRange</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="s">"test"</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="mi">0</span><span class="o">,</span> <span class="mi">100</span><span class="o">)</span>
<span class="o">};</span>
<span class="nc">JavaRDD</span><span class="o">&lt;</span><span class="nc">ConsumerRecord</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">rdd</span> <span class="o">=</span> <span class="nc">KafkaUtils</span><span class="o">.</span><span class="na">createRDD</span><span class="o">(</span>
<span class="n">sparkContext</span><span class="o">,</span>
<span class="n">kafkaParams</span><span class="o">,</span>
<span class="n">offsetRanges</span><span class="o">,</span>
<span class="nc">LocationStrategies</span><span class="o">.</span><span class="na">PreferConsistent</span><span class="o">()</span>
<span class="o">);</span></code></pre></figure>
</div>
</div>
<p>Note that you cannot use <code class="language-plaintext highlighter-rouge">PreferBrokers</code>, because without the stream there is not a driver-side consumer to automatically look up broker metadata for you. Use <code class="language-plaintext highlighter-rouge">PreferFixed</code> with your own metadata lookups if necessary.</p>
<h3 id="obtaining-offsets">Obtaining Offsets</h3>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">stream</span><span class="o">.</span><span class="py">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">offsetRanges</span> <span class="k">=</span> <span class="nv">rdd</span><span class="o">.</span><span class="py">asInstanceOf</span><span class="o">[</span><span class="kt">HasOffsetRanges</span><span class="o">].</span><span class="py">offsetRanges</span>
<span class="nv">rdd</span><span class="o">.</span><span class="py">foreachPartition</span> <span class="o">{</span> <span class="n">iter</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">o</span><span class="k">:</span> <span class="kt">OffsetRange</span> <span class="o">=</span> <span class="nf">offsetRanges</span><span class="o">(</span><span class="nv">TaskContext</span><span class="o">.</span><span class="py">get</span><span class="o">.</span><span class="py">partitionId</span><span class="o">)</span>
<span class="nf">println</span><span class="o">(</span><span class="n">s</span><span class="s">"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="n">rdd</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">((</span><span class="nc">HasOffsetRanges</span><span class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span class="o">()).</span><span class="na">offsetRanges</span><span class="o">();</span>
<span class="n">rdd</span><span class="o">.</span><span class="na">foreachPartition</span><span class="o">(</span><span class="n">consumerRecords</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">OffsetRange</span> <span class="n">o</span> <span class="o">=</span> <span class="n">offsetRanges</span><span class="o">[</span><span class="nc">TaskContext</span><span class="o">.</span><span class="na">get</span><span class="o">().</span><span class="na">partitionId</span><span class="o">()];</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span>
<span class="n">o</span><span class="o">.</span><span class="na">topic</span><span class="o">()</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">o</span><span class="o">.</span><span class="na">partition</span><span class="o">()</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">o</span><span class="o">.</span><span class="na">fromOffset</span><span class="o">()</span> <span class="o">+</span> <span class="s">" "</span> <span class="o">+</span> <span class="n">o</span><span class="o">.</span><span class="na">untilOffset</span><span class="o">());</span>
<span class="o">});</span>
<span class="o">});</span></code></pre></figure>
</div>
</div>
<p>Note that the typecast to <code class="language-plaintext highlighter-rouge">HasOffsetRanges</code> will only succeed if it is done in the first method called on the result of <code class="language-plaintext highlighter-rouge">createDirectStream</code>, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().</p>
<h3 id="storing-offsets">Storing Offsets</h3>
<p>Kafka delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are <a href="streaming-programming-guide.html#semantics-of-output-operations">at-least-once</a>. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets.</p>
<h4 id="checkpoints">Checkpoints</h4>
<p>If you enable Spark <a href="streaming-programming-guide.html#checkpointing">checkpointing</a>, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.</p>
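<p>A minimal sketch of enabling checkpoint-based recovery, assuming an existing <code class="language-plaintext highlighter-rouge">SparkConf</code> (the checkpoint directory and batch interval are hypothetical):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/kafka-app" // hypothetical path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(30)) // sparkConf assumed to exist
  ssc.checkpoint(checkpointDir)
  // create the direct stream and idempotent output operations here, as above
  ssc
}

// recover from the checkpoint if one exists, otherwise build a fresh context
val streamingContext = StreamingContext.getOrCreate(checkpointDir, () =&gt; createContext())
</code></pre></div></div>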
<h4 id="kafka-itself">Kafka itself</h4>
<p>Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets &#8220;enable.auto.commit&#8221; to false. However, you can commit offsets to Kafka after you know your output has been stored, using the <code class="language-plaintext highlighter-rouge">commitAsync</code> API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code. However, Kafka is not transactional, so your outputs must still be idempotent.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="nv">stream</span><span class="o">.</span><span class="py">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">offsetRanges</span> <span class="k">=</span> <span class="nv">rdd</span><span class="o">.</span><span class="py">asInstanceOf</span><span class="o">[</span><span class="kt">HasOffsetRanges</span><span class="o">].</span><span class="py">offsetRanges</span>
<span class="c1">// some time later, after outputs have completed</span>
<span class="nv">stream</span><span class="o">.</span><span class="py">asInstanceOf</span><span class="o">[</span><span class="kt">CanCommitOffsets</span><span class="o">].</span><span class="py">commitAsync</span><span class="o">(</span><span class="n">offsetRanges</span><span class="o">)</span>
<span class="o">}</span></code></pre></figure>
<p>As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is thread-safe, but must occur after outputs if you want meaningful semantics.</p>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="n">rdd</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">((</span><span class="nc">HasOffsetRanges</span><span class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span class="o">()).</span><span class="na">offsetRanges</span><span class="o">();</span>
<span class="c1">// some time later, after outputs have completed</span>
<span class="o">((</span><span class="nc">CanCommitOffsets</span><span class="o">)</span> <span class="n">stream</span><span class="o">.</span><span class="na">inputDStream</span><span class="o">()).</span><span class="na">commitAsync</span><span class="o">(</span><span class="n">offsetRanges</span><span class="o">);</span>
<span class="o">});</span></code></pre></figure>
</div>
</div>
<h4 id="your-own-data-store">Your own data store</h4>
<p>For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you&#8217;re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="c1">// The details depend on your data store, but the general idea looks like this</span>
<span class="c1">// begin from the offsets committed to the database</span>
<span class="k">val</span> <span class="nv">fromOffsets</span> <span class="k">=</span> <span class="nv">selectOffsetsFromYourDatabase</span><span class="o">.</span><span class="py">map</span> <span class="o">{</span> <span class="n">resultSet</span> <span class="k">=&gt;</span>
<span class="k">new</span> <span class="nc">TopicPartition</span><span class="o">(</span><span class="nv">resultSet</span><span class="o">.</span><span class="py">string</span><span class="o">(</span><span class="s">"topic"</span><span class="o">),</span> <span class="nv">resultSet</span><span class="o">.</span><span class="py">int</span><span class="o">(</span><span class="s">"partition"</span><span class="o">))</span> <span class="o">-&gt;</span> <span class="nv">resultSet</span><span class="o">.</span><span class="py">long</span><span class="o">(</span><span class="s">"offset"</span><span class="o">)</span>
<span class="o">}.</span><span class="py">toMap</span>
<span class="k">val</span> <span class="nv">stream</span> <span class="k">=</span> <span class="nv">KafkaUtils</span><span class="o">.</span><span class="py">createDirectStream</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span>
<span class="n">streamingContext</span><span class="o">,</span>
<span class="nc">PreferConsistent</span><span class="o">,</span>
<span class="nc">Assign</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">String</span><span class="o">](</span><span class="nv">fromOffsets</span><span class="o">.</span><span class="py">keys</span><span class="o">.</span><span class="py">toList</span><span class="o">,</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">fromOffsets</span><span class="o">)</span>
<span class="o">)</span>
<span class="nv">stream</span><span class="o">.</span><span class="py">foreachRDD</span> <span class="o">{</span> <span class="n">rdd</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">offsetRanges</span> <span class="k">=</span> <span class="nv">rdd</span><span class="o">.</span><span class="py">asInstanceOf</span><span class="o">[</span><span class="kt">HasOffsetRanges</span><span class="o">].</span><span class="py">offsetRanges</span>
<span class="k">val</span> <span class="nv">results</span> <span class="k">=</span> <span class="nf">yourCalculation</span><span class="o">(</span><span class="n">rdd</span><span class="o">)</span>
<span class="c1">// begin your transaction</span>
<span class="c1">// update results</span>
<span class="c1">// update offsets where the end of existing offsets matches the beginning of this batch of offsets</span>
<span class="c1">// assert that offsets were updated correctly</span>
<span class="c1">// end your transaction</span>
<span class="o">}</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// The details depend on your data store, but the general idea looks like this</span>
<span class="c1">// begin from the offsets committed to the database</span>
<span class="nc">Map</span><span class="o">&lt;</span><span class="nc">TopicPartition</span><span class="o">,</span> <span class="nc">Long</span><span class="o">&gt;</span> <span class="n">fromOffsets</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;&gt;();</span>
<span class="k">for</span> <span class="o">(</span><span class="n">resultSet</span> <span class="o">:</span> <span class="n">selectOffsetsFromYourDatabase</span><span class="o">)</span>
<span class="n">fromOffsets</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="k">new</span> <span class="nc">TopicPartition</span><span class="o">(</span><span class="n">resultSet</span><span class="o">.</span><span class="na">string</span><span class="o">(</span><span class="s">"topic"</span><span class="o">),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">int</span><span class="o">(</span><span class="s">"partition"</span><span class="o">)),</span> <span class="n">resultSet</span><span class="o">.</span><span class="na">long</span><span class="o">(</span><span class="s">"offset"</span><span class="o">));</span>
<span class="o">}</span>
<span class="nc">JavaInputDStream</span><span class="o">&lt;</span><span class="nc">ConsumerRecord</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="nc">KafkaUtils</span><span class="o">.</span><span class="na">createDirectStream</span><span class="o">(</span>
<span class="n">streamingContext</span><span class="o">,</span>
<span class="nc">LocationStrategies</span><span class="o">.</span><span class="na">PreferConsistent</span><span class="o">(),</span>
<span class="nc">ConsumerStrategies</span><span class="o">.&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;</span><span class="nc">Assign</span><span class="o">(</span><span class="n">fromOffsets</span><span class="o">.</span><span class="na">keySet</span><span class="o">(),</span> <span class="n">kafkaParams</span><span class="o">,</span> <span class="n">fromOffsets</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">stream</span><span class="o">.</span><span class="na">foreachRDD</span><span class="o">(</span><span class="n">rdd</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="nc">OffsetRange</span><span class="o">[]</span> <span class="n">offsetRanges</span> <span class="o">=</span> <span class="o">((</span><span class="nc">HasOffsetRanges</span><span class="o">)</span> <span class="n">rdd</span><span class="o">.</span><span class="na">rdd</span><span class="o">()).</span><span class="na">offsetRanges</span><span class="o">();</span>
<span class="nc">Object</span> <span class="n">results</span> <span class="o">=</span> <span class="n">yourCalculation</span><span class="o">(</span><span class="n">rdd</span><span class="o">);</span>
<span class="c1">// begin your transaction</span>
<span class="c1">// update results</span>
<span class="c1">// update offsets where the end of existing offsets matches the beginning of this batch of offsets</span>
<span class="c1">// assert that offsets were updated correctly</span>
<span class="c1">// end your transaction</span>
<span class="o">});</span></code></pre></figure>
</div>
</div>
<h3 id="ssl--tls">SSL / TLS</h3>
<p>The new Kafka consumer <a href="http://kafka.apache.org/documentation.html#security_ssl">supports SSL</a>. To enable it, set kafkaParams appropriately before passing to <code class="language-plaintext highlighter-rouge">createDirectStream</code> / <code class="language-plaintext highlighter-rouge">createRDD</code>. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately <a href="security.html">securing</a> Spark inter-node communication.</p>
<div class="codetabs">
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">kafkaParams</span> <span class="k">=</span> <span class="nc">Map</span><span class="o">[</span><span class="kt">String</span>, <span class="kt">Object</span><span class="o">](</span>
<span class="c1">// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS</span>
<span class="s">"security.protocol"</span> <span class="o">-&gt;</span> <span class="s">"SSL"</span><span class="o">,</span>
<span class="s">"ssl.truststore.location"</span> <span class="o">-&gt;</span> <span class="s">"/some-directory/kafka.client.truststore.jks"</span><span class="o">,</span>
<span class="s">"ssl.truststore.password"</span> <span class="o">-&gt;</span> <span class="s">"test1234"</span><span class="o">,</span>
<span class="s">"ssl.keystore.location"</span> <span class="o">-&gt;</span> <span class="s">"/some-directory/kafka.client.keystore.jks"</span><span class="o">,</span>
<span class="s">"ssl.keystore.password"</span> <span class="o">-&gt;</span> <span class="s">"test1234"</span><span class="o">,</span>
<span class="s">"ssl.key.password"</span> <span class="o">-&gt;</span> <span class="s">"test1234"</span>
<span class="o">)</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">Map</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Object</span><span class="o">&gt;</span> <span class="n">kafkaParams</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">HashMap</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Object</span><span class="o">&gt;();</span>
<span class="c1">// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"security.protocol"</span><span class="o">,</span> <span class="s">"SSL"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.truststore.location"</span><span class="o">,</span> <span class="s">"/some-directory/kafka.client.truststore.jks"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.truststore.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.keystore.location"</span><span class="o">,</span> <span class="s">"/some-directory/kafka.client.keystore.jks"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.keystore.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span>
<span class="n">kafkaParams</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"ssl.key.password"</span><span class="o">,</span> <span class="s">"test1234"</span><span class="o">);</span></code></pre></figure>
</div>
</div>
<h3 id="deploying">Deploying</h3>
<p>As with any Spark application, <code class="language-plaintext highlighter-rouge">spark-submit</code> is used to launch your application.</p>
<p>For Scala and Java applications, if you are using SBT or Maven for project management, then package <code class="language-plaintext highlighter-rouge">spark-streaming-kafka-0-10_2.12</code> and its dependencies into the application JAR. Make sure <code class="language-plaintext highlighter-rouge">spark-core_2.12</code> and <code class="language-plaintext highlighter-rouge">spark-streaming_2.12</code> are marked as <code class="language-plaintext highlighter-rouge">provided</code> dependencies as those are already present in a Spark installation. Then use <code class="language-plaintext highlighter-rouge">spark-submit</code> to launch your application (see <a href="streaming-programming-guide.html#deploying-applications">Deploying section</a> in the main programming guide).</p>
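<p>For example, an sbt build might mark the core artifacts as <code class="language-plaintext highlighter-rouge">provided</code> while bundling the Kafka integration into the application JAR (a sketch, e.g. with sbt-assembly):</p>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// build.sbt (sketch) -- spark-core and spark-streaming come from the Spark installation,
// only the Kafka integration is packaged into the application JAR
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "3.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "3.5.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.0"
)
</code></pre></div></div>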
<h3 id="security">Security</h3>
<p>See <a href="structured-streaming-kafka-integration.html#security">Structured Streaming Security</a>.</p>
<h5 id="additional-caveats">Additional Caveats</h5>
<ul>
<li>A Kafka native sink is not available, so the delegation token is used only on the consumer side.</li>
</ul>
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extracts content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want to find more
// details on how DocSearch works, check the DocSearch docs.
docsearch({
apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
appId: 'RAI69RXRSK',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:3.5.0"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>