| |
| <!DOCTYPE html> |
<html class="no-js">
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1"> |
| <title>Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 2.4.5 Documentation</title> |
| |
| |
| |
| |
| <link rel="stylesheet" href="css/bootstrap.min.css"> |
| <style> |
| body { |
| padding-top: 60px; |
| padding-bottom: 40px; |
| } |
| </style> |
| <meta name="viewport" content="width=device-width"> |
| <link rel="stylesheet" href="css/bootstrap-responsive.min.css"> |
| <link rel="stylesheet" href="css/main.css"> |
| |
| <script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script> |
| |
| <link rel="stylesheet" href="css/pygments-default.css"> |
| |
| |
| |
| |
| </head> |
| <body> |
| |
| |
| <div class="container-wrapper"> |
| |
| |
| <div class="content" id="content"> |
| |
| <h1 class="title">Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)</h1> |
| |
| |
| <p>Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.</p> |
| |
| <h2 id="linking">Linking</h2> |
| <p>For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:</p> |
| |
| <pre><code>groupId = org.apache.spark |
| artifactId = spark-sql-kafka-0-10_2.12 |
| version = 2.4.5 |
| </code></pre> |
| |
<p>For Python applications, you need to add the above library and its dependencies when deploying your
application. See the <a href="#deploying">Deploying</a> subsection below.</p>
| |
<p>For experimenting in <code>spark-shell</code>, you also need to add the above library and its dependencies when invoking <code>spark-shell</code>. See the <a href="#deploying">Deploying</a> subsection below.</p>
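<p>For example, with an sbt build, the above artifact can be declared roughly as follows. This is a minimal sketch, not taken from the guide; adjust the Spark version and Scala binary version to match your project.</p>

<pre><code class="language-scala">// build.sbt (sketch): Spark SQL is usually "provided" by the cluster,
// while the Kafka source/sink artifact is shipped with the application.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
)
</code></pre>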
| |
| <h2 id="reading-data-from-kafka">Reading Data from Kafka</h2> |
| |
| <h3 id="creating-a-kafka-source-for-streaming-queries">Creating a Kafka Source for Streaming Queries</h3> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]</code></pre></figure>
| |
| </div> |
| <div data-lang="java"> |
| |
<figure class="highlight"><pre><code class="language-java" data-lang="java">// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");</code></pre></figure>
| |
| </div> |
| <div data-lang="python"> |
| |
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")</code></pre></figure>
| |
| </div> |
| </div> |
| |
| <h3 id="creating-a-kafka-source-for-batch-queries">Creating a Kafka Source for Batch Queries</h3> |
| <p>If you have a use case that is better suited to batch processing, |
| you can create a Dataset/DataFrame for a defined range of offsets.</p> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Subscribe to 1 topic, defaulting to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]</code></pre></figure>
| |
| </div> |
| <div data-lang="java"> |
| |
<figure class="highlight"><pre><code class="language-java" data-lang="java">// Subscribe to 1 topic, defaulting to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");</code></pre></figure>
| |
| </div> |
| <div data-lang="python"> |
| |
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Subscribe to 1 topic, defaulting to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")</code></pre></figure>
| |
| </div> |
| </div> |
| |
| <p>Each row in the source has the following schema:</p> |
| <table class="table"> |
| <tr><th>Column</th><th>Type</th></tr> |
| <tr> |
| <td>key</td> |
| <td>binary</td> |
| </tr> |
| <tr> |
| <td>value</td> |
| <td>binary</td> |
| </tr> |
| <tr> |
| <td>topic</td> |
| <td>string</td> |
| </tr> |
| <tr> |
| <td>partition</td> |
| <td>int</td> |
| </tr> |
| <tr> |
| <td>offset</td> |
| <td>long</td> |
| </tr> |
| <tr> |
| <td>timestamp</td> |
  <td>timestamp</td>
| </tr> |
| <tr> |
| <td>timestampType</td> |
| <td>int</td> |
| </tr> |
| </table> |
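<p>Since the key and value columns are binary, records have to be decoded with ordinary DataFrame operations. The sketch below (Scala) assumes a hypothetical topic whose values are JSON documents with <code>id</code> and <code>ts</code> fields; it is an illustration, not part of the connector API.</p>

<pre><code class="language-scala">import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical schema of the JSON payload carried in the Kafka value
val payloadSchema = new StructType()
  .add("id", StringType)
  .add("ts", TimestampType)

val parsed = df
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS json")
  .select(col("key"), from_json(col("json"), payloadSchema).as("data"))
  .select("key", "data.id", "data.ts")
</code></pre>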
| |
| <p>The following options must be set for the Kafka source |
| for both batch and streaming queries.</p> |
| |
| <table class="table"> |
<tr><th>Option</th><th>Value</th><th>Meaning</th></tr>
| <tr> |
| <td>assign</td> |
| <td>json string {"topicA":[0,1],"topicB":[2,4]}</td> |
| <td>Specific TopicPartitions to consume. |
| Only one of "assign", "subscribe" or "subscribePattern" |
| options can be specified for Kafka source.</td> |
| </tr> |
| <tr> |
| <td>subscribe</td> |
| <td>A comma-separated list of topics</td> |
  <td>The topic list to subscribe to.
| Only one of "assign", "subscribe" or "subscribePattern" |
| options can be specified for Kafka source.</td> |
| </tr> |
| <tr> |
| <td>subscribePattern</td> |
| <td>Java regex string</td> |
| <td>The pattern used to subscribe to topic(s). |
| Only one of "assign, "subscribe" or "subscribePattern" |
| options can be specified for Kafka source.</td> |
| </tr> |
| <tr> |
| <td>kafka.bootstrap.servers</td> |
| <td>A comma-separated list of host:port</td> |
| <td>The Kafka "bootstrap.servers" configuration.</td> |
| </tr> |
| </table> |
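<p>As an illustration of the "assign" strategy described above, the JSON-encoded partition list is passed in place of a topic subscription. A minimal Scala sketch (topic names and partition numbers are placeholders):</p>

<pre><code class="language-scala">// Consume only partitions 0 and 1 of topicA and partition 2 of topicB
val assigned = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"topicA":[0,1],"topicB":[2]}""")
  .load()
</code></pre>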
| |
| <p>The following configurations are optional:</p> |
| |
| <table class="table"> |
<tr><th>Option</th><th>Value</th><th>Default</th><th>Query type</th><th>Meaning</th></tr>
| <tr> |
| <td>startingOffsets</td> |
| <td>"earliest", "latest" (streaming only), or json string |
| """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ |
| </td> |
| <td>"latest" for streaming, "earliest" for batch</td> |
| <td>streaming and batch</td> |
  <td>The start point when a query is started, either "earliest", which is from the earliest offsets,
  "latest", which is just from the latest offsets, or a json string specifying a starting offset for
  each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
  Note: for batch queries, latest (either implicitly or by using -1 in json) is not allowed.
  For streaming queries, this only applies when a new query is started; resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  earliest.</td>
| </tr> |
| <tr> |
| <td>endingOffsets</td> |
| <td>latest or json string |
| {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} |
| </td> |
| <td>latest</td> |
| <td>batch query</td> |
  <td>The end point when a batch query is ended, either "latest", which refers to the
  latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1
  as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.</td>
| </tr> |
| <tr> |
| <td>failOnDataLoss</td> |
| <td>true or false</td> |
| <td>true</td> |
| <td>streaming query</td> |
  <td>Whether to fail the query when it's possible that data has been lost (e.g., topics are deleted, or
  offsets are out of range). This may be a false alarm; you can disable it when it doesn't work
  as you expect. Batch queries will always fail if they fail to read any data from the provided
  offsets due to lost data.</td>
| </tr> |
| <tr> |
| <td>kafkaConsumer.pollTimeoutMs</td> |
| <td>long</td> |
| <td>512</td> |
| <td>streaming and batch</td> |
| <td>The timeout in milliseconds to poll data from Kafka in executors.</td> |
| </tr> |
| <tr> |
| <td>fetchOffset.numRetries</td> |
| <td>int</td> |
| <td>3</td> |
| <td>streaming and batch</td> |
| <td>Number of times to retry before giving up fetching Kafka offsets.</td> |
| </tr> |
| <tr> |
| <td>fetchOffset.retryIntervalMs</td> |
| <td>long</td> |
| <td>10</td> |
| <td>streaming and batch</td> |
  <td>Milliseconds to wait before retrying to fetch Kafka offsets.</td>
| </tr> |
| <tr> |
| <td>maxOffsetsPerTrigger</td> |
| <td>long</td> |
| <td>none</td> |
| <td>streaming and batch</td> |
| <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td> |
| </tr> |
| <tr> |
| <td>minPartitions</td> |
| <td>int</td> |
| <td>none</td> |
| <td>streaming</td> |
  <td>Desired minimum number of partitions to read from Kafka.
  By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
  If you set this option to a value greater than your topicPartitions, Spark will divide large
  Kafka partitions into smaller pieces. Please note that this configuration is only a hint: the
  number of Spark tasks will be approximately <code>minPartitions</code>. It can be less or more depending on
  rounding errors or Kafka partitions that didn't receive any new data.</td>
| </tr> |
| </table> |
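<p>To illustrate the rate-limiting and parallelism options above, a streaming read might combine <code>maxOffsetsPerTrigger</code> and <code>minPartitions</code> as in the following sketch; the numbers are placeholders to be tuned for your workload.</p>

<pre><code class="language-scala">// Cap each trigger at roughly 10000 offsets and ask for at least 64 Spark partitions
val throttled = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("maxOffsetsPerTrigger", "10000")
  .option("minPartitions", "64")
  .option("startingOffsets", "latest")
  .load()
</code></pre>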
| |
| <h2 id="writing-data-to-kafka">Writing Data to Kafka</h2> |
| |
<p>Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
Apache Kafka only supports at-least-once write semantics. Consequently, when writing either Streaming Queries
or Batch Queries to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
if writing the query is successful, then you can assume that the query output was written at least once. A possible
solution to remove duplicates when reading the written data is to introduce a primary (unique) key
that can be used to perform de-duplication when reading.</p>
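<p>As a sketch of that de-duplication idea (in Scala, assuming the producer embeds a hypothetical unique <code>eventId</code> and an <code>eventTime</code> in each JSON value), a downstream reader could drop duplicates on the key, with a watermark so the de-duplication state stays bounded:</p>

<pre><code class="language-scala">import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Hypothetical payload written by the upstream query
val payload = new StructType()
  .add("eventId", StringType)
  .add("eventTime", TimestampType)

val deduped = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .select(from_json(col("value").cast("string"), payload).as("data"))
  .select("data.*")
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("eventId", "eventTime")
</code></pre>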
| |
<p>The DataFrame being written to Kafka should have the following columns in its schema:</p>
| <table class="table"> |
| <tr><th>Column</th><th>Type</th></tr> |
| <tr> |
| <td>key (optional)</td> |
| <td>string or binary</td> |
| </tr> |
| <tr> |
| <td>value (required)</td> |
| <td>string or binary</td> |
| </tr> |
| <tr> |
| <td>topic (*optional)</td> |
| <td>string</td> |
| </tr> |
| </table> |
<p>* The topic column is required if the “topic” configuration option is not specified.</p>
| |
<p>The value column is the only required column. If a key column is not specified then
a <code>null</code>-valued key column will be automatically added (see Kafka semantics on
how <code>null</code>-valued keys are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the “topic” configuration
option is set; i.e., the “topic” configuration option overrides the topic column.</p>
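<p>For example, structured data can be assembled into the required key and value columns before it is handed to the Kafka sink. The following sketch assumes a hypothetical <code>events</code> DataFrame with <code>userId</code>, <code>action</code> and <code>eventTime</code> columns:</p>

<pre><code class="language-scala">import org.apache.spark.sql.functions.{col, struct, to_json}

// Build the key/value layout expected by the Kafka sink
val forKafka = events.select(
  col("userId").cast("string").as("key"),
  to_json(struct(col("userId"), col("action"), col("eventTime"))).as("value")
)
</code></pre>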
| |
| <p>The following options must be set for the Kafka sink |
| for both batch and streaming queries.</p> |
| |
| <table class="table"> |
<tr><th>Option</th><th>Value</th><th>Meaning</th></tr>
| <tr> |
| <td>kafka.bootstrap.servers</td> |
| <td>A comma-separated list of host:port</td> |
| <td>The Kafka "bootstrap.servers" configuration.</td> |
| </tr> |
| </table> |
| |
| <p>The following configurations are optional:</p> |
| |
| <table class="table"> |
<tr><th>Option</th><th>Value</th><th>Default</th><th>Query type</th><th>Meaning</th></tr>
| <tr> |
| <td>topic</td> |
| <td>string</td> |
| <td>none</td> |
| <td>streaming and batch</td> |
| <td>Sets the topic that all rows will be written to in Kafka. This option overrides any |
| topic column that may exist in the data.</td> |
| </tr> |
| </table> |
| |
| <h3 id="creating-a-kafka-sink-for-streaming-queries">Creating a Kafka Sink for Streaming Queries</h3> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()</code></pre></figure>
| |
| </div> |
| <div data-lang="java"> |
| |
<figure class="highlight"><pre><code class="language-java" data-lang="java">// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();</code></pre></figure>
| |
| </div> |
| <div data-lang="python"> |
| |
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()</code></pre></figure>
| |
| </div> |
| </div> |
| |
| <h3 id="writing-the-output-of-batch-queries-to-kafka">Writing the output of Batch Queries to Kafka</h3> |
| |
| <div class="codetabs"> |
| <div data-lang="scala"> |
| |
<figure class="highlight"><pre><code class="language-scala" data-lang="scala">// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()</code></pre></figure>
| |
| </div> |
| <div data-lang="java"> |
| |
<figure class="highlight"><pre><code class="language-java" data-lang="java">// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();</code></pre></figure>
| |
| </div> |
| <div data-lang="python"> |
| |
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()</code></pre></figure>
| |
| </div> |
| </div> |
| |
| <h2 id="kafka-specific-configurations">Kafka Specific Configurations</h2> |
| |
<p>Kafka’s own configurations can be set via <code>DataStreamReader.option</code> with the <code>kafka.</code> prefix, e.g.,
<code>stream.option("kafka.bootstrap.servers", "host:port")</code>. For possible Kafka parameters, see the
<a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">Kafka consumer config docs</a> for
parameters related to reading data, and the <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka producer config docs</a>
for parameters related to writing data.</p>
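<p>For example, a reader connecting to a secured cluster can forward consumer properties by prefixing them with <code>kafka.</code>. The security settings below are placeholders; use whatever your cluster actually requires.</p>

<pre><code class="language-scala">// Options starting with "kafka." are passed through to the underlying Kafka consumer
val secured = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .load()
</code></pre>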
| |
| <p>Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:</p> |
| |
| <ul> |
  <li><strong>group.id</strong>: The Kafka source will create a unique group id for each query automatically.</li>
  <li><strong>auto.offset.reset</strong>: Set the source option <code>startingOffsets</code> to specify
  where to start instead. Structured Streaming manages which offsets are consumed internally, rather
  than relying on the Kafka consumer to do it. This ensures that no data is missed when new
  topics/partitions are dynamically subscribed. Note that <code>startingOffsets</code> only applies when a new
  streaming query is started, and that resuming will always pick up from where the query left off.</li>
  <li><strong>key.deserializer</strong>: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
  DataFrame operations to explicitly deserialize the keys.</li>
  <li><strong>value.deserializer</strong>: Values are always deserialized as byte arrays with ByteArrayDeserializer.
  Use DataFrame operations to explicitly deserialize the values.</li>
  <li><strong>key.serializer</strong>: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
  DataFrame operations to explicitly serialize the keys into either strings or byte arrays.</li>
  <li><strong>value.serializer</strong>: Values are always serialized with ByteArraySerializer or StringSerializer. Use
  DataFrame operations to explicitly serialize the values into either strings or byte arrays.</li>
  <li><strong>enable.auto.commit</strong>: The Kafka source doesn’t commit any offsets.</li>
  <li><strong>interceptor.classes</strong>: The Kafka source always reads keys and values as byte arrays. It’s not safe to
  use ConsumerInterceptor as it may break the query.</li>
| </ul> |
| |
| <h2 id="deploying">Deploying</h2> |
| |
<p>As with any Spark application, <code>spark-submit</code> is used to launch your application. <code>spark-sql-kafka-0-10_2.12</code>
and its dependencies can be added directly to <code>spark-submit</code> using <code>--packages</code>, for example:</p>
| |
| <pre><code>./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 ... |
| </code></pre> |
| |
<p>For experimenting in <code>spark-shell</code>, you can also use <code>--packages</code> to add <code>spark-sql-kafka-0-10_2.12</code> and its dependencies directly, for example:</p>
| |
| <pre><code>./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5 ... |
| </code></pre> |
| |
| <p>See <a href="submitting-applications.html">Application Submission Guide</a> for more details about submitting |
| applications with external dependencies.</p> |
| |
| |
| </div> |
| |
| <!-- /container --> |
| </div> |
| |
| <script src="js/vendor/jquery-1.12.4.min.js"></script> |
| <script src="js/vendor/bootstrap.min.js"></script> |
| <script src="js/vendor/anchor.min.js"></script> |
| <script src="js/main.js"></script> |
| |
| </body> |
| </html> |