<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js lt-ie9 lt-ie8 lt-ie7"> <![endif]-->
<!--[if IE 7]> <html class="no-js lt-ie9 lt-ie8"> <![endif]-->
<!--[if IE 8]> <html class="no-js lt-ie9"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js"> <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Protobuf Data Source Guide - Spark 3.5.0 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet">
<link href="css/custom.css" rel="stylesheet">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<link rel="stylesheet" href="css/docsearch.min.css" />
<link rel="stylesheet" href="css/docsearch.css">
<!-- Matomo -->
<script type="text/javascript">
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body class="global">
<!--[if lt IE 7]>
<p class="chromeframe">You are using an outdated browser. <a href="https://browsehappy.com/">Upgrade your browser today</a> or <a href="http://www.google.com/chromeframe/?redirect=true">install Google Chrome Frame</a> to better experience this site.</p>
<![endif]-->
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar">
<div class="navbar-brand"><a href="index.html">
<img src="img/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">3.5.0</span>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="quick-start.html">Quick Start</a>
<a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="structured-streaming-programming-guide.html">Structured Streaming</a>
<a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="api/java/index.html">Java</a>
<a class="dropdown-item" href="api/python/index.html">Python</a>
<a class="dropdown-item" href="api/R/index.html">R</a>
<a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="cluster-overview.html">Overview</a>
<a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="running-on-mesos.html">Mesos</a>
<a class="dropdown-item" href="running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="configuration.html">Configuration</a>
<a class="dropdown-item" href="monitoring.html">Monitoring</a>
<a class="dropdown-item" href="tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="security.html">Security</a>
<a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v3.5.0</span></span>-->
</div>
</nav>
<div class="container">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
<ul>
<li>
<a href="sql-getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="sql-data-sources.html">
Data Sources
</a>
</li>
<ul>
<li>
<a href="sql-data-sources-load-save-functions.html">
Generic Load/Save Functions
</a>
</li>
<li>
<a href="sql-data-sources-generic-options.html">
Generic File Source Options
</a>
</li>
<li>
<a href="sql-data-sources-parquet.html">
Parquet Files
</a>
</li>
<li>
<a href="sql-data-sources-orc.html">
ORC Files
</a>
</li>
<li>
<a href="sql-data-sources-json.html">
JSON Files
</a>
</li>
<li>
<a href="sql-data-sources-csv.html">
CSV Files
</a>
</li>
<li>
<a href="sql-data-sources-text.html">
Text Files
</a>
</li>
<li>
<a href="sql-data-sources-hive-tables.html">
Hive Tables
</a>
</li>
<li>
<a href="sql-data-sources-jdbc.html">
JDBC To Other Databases
</a>
</li>
<li>
<a href="sql-data-sources-avro.html">
Avro Files
</a>
</li>
<li>
<a href="sql-data-sources-protobuf.html">
Protobuf data
</a>
</li>
<li>
<a href="sql-data-sources-binaryFile.html">
Whole Binary Files
</a>
</li>
<li>
<a href="sql-data-sources-troubleshooting.html">
Troubleshooting
</a>
</li>
</ul>
<li>
<a href="sql-performance-tuning.html">
Performance Tuning
</a>
</li>
<li>
<a href="sql-distributed-sql-engine.html">
Distributed SQL Engine
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html">
PySpark Usage Guide for Pandas with Apache Arrow
</a>
</li>
<li>
<a href="sql-migration-guide.html">
Migration Guide
</a>
</li>
<li>
<a href="sql-ref.html">
SQL Reference
</a>
</li>
<li>
<a href="sql-error-conditions.html">
Error Conditions
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar mr-3" id="content">
<h1 class="title">Protobuf Data Source Guide</h1>
<p>Since the Spark 3.4.0 release, <a href="https://spark.apache.org/docs/latest/sql-programming-guide.html">Spark SQL</a> provides built-in support for reading and writing Protobuf data.</p>
<h2 id="deploying">Deploying</h2>
<p>The <code class="language-plaintext highlighter-rouge">spark-protobuf</code> module is external and not included in <code class="language-plaintext highlighter-rouge">spark-submit</code> or <code class="language-plaintext highlighter-rouge">spark-shell</code> by default.</p>
<p>As with any Spark application, <code class="language-plaintext highlighter-rouge">spark-submit</code> is used to launch your application. The <code class="language-plaintext highlighter-rouge">spark-protobuf_2.12</code> module
and its dependencies can be added directly to <code class="language-plaintext highlighter-rouge">spark-submit</code> using <code class="language-plaintext highlighter-rouge">--packages</code>, for example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.0 ...
</code></pre></div></div>
<p>For experimenting in <code class="language-plaintext highlighter-rouge">spark-shell</code>, you can also use <code class="language-plaintext highlighter-rouge">--packages</code> to add <code class="language-plaintext highlighter-rouge">org.apache.spark:spark-protobuf_2.12</code> and its dependencies directly:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.0 ...
</code></pre></div></div>
<p>See <a href="submitting-applications.html">Application Submission Guide</a> for more details about submitting applications with external dependencies.</p>
<h2 id="to_protobuf-and-from_protobuf">to_protobuf() and from_protobuf()</h2>
<p>The spark-protobuf package provides the function <code class="language-plaintext highlighter-rouge">to_protobuf()</code> to encode a column as binary in Protobuf
format, and <code class="language-plaintext highlighter-rouge">from_protobuf()</code> to decode Protobuf binary data into a column. Both functions transform one column into
another column, and the input/output SQL data type can be a complex type or a primitive type.</p>
<p>Using Protobuf messages as columns is useful when reading from or writing to a streaming source like Kafka. Each
Kafka key-value record is augmented with metadata such as the ingestion timestamp into Kafka, the offset in Kafka, and so on.</p>
<ul>
<li>If the &#8220;value&#8221; field that contains your data is in Protobuf format, you can use <code class="language-plaintext highlighter-rouge">from_protobuf()</code> to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a different sink.</li>
<li><code class="language-plaintext highlighter-rouge">to_protobuf()</code> can be used to turn structs into a Protobuf message. This is particularly useful when you want to re-encode multiple columns into a single column when writing data out to Kafka.</li>
</ul>
<p>The Spark SQL schema is generated based on the Protobuf descriptor file or Protobuf class passed to <code class="language-plaintext highlighter-rouge">from_protobuf</code> and <code class="language-plaintext highlighter-rouge">to_protobuf</code>. The specified Protobuf class or Protobuf descriptor file must match the data; otherwise, the behavior is undefined: it may fail or return arbitrary results.</p>
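<p>Both functions can take the schema either as a Protobuf descriptor file or as the fully qualified name of a shaded generated Java class, as shown in the examples below. As a minimal, illustrative sketch (the file names here are placeholders, not part of the guide), a descriptor file for a .proto definition can be generated with <code class="language-plaintext highlighter-rouge">protoc</code>:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Generate a descriptor set (including imports) from a .proto file; the file names are placeholders.
protoc --include_imports --descriptor_set_out=app_event.desc app_event.proto
</code></pre></div></div>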
<h3 id="python">Python</h3>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">pyspark.sql.protobuf.functions</span> <span class="kn">import</span> <span class="n">from_protobuf</span><span class="p">,</span> <span class="n">to_protobuf</span>
<span class="c1"># `from_protobuf` and `to_protobuf` provides two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span>\
<span class="p">.</span><span class="n">readStream</span>\
<span class="p">.</span><span class="nb">format</span><span class="p">(</span><span class="s">"kafka"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">option</span><span class="p">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="p">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">option</span><span class="p">(</span><span class="s">"subscribe"</span><span class="p">,</span> <span class="s">"topic1"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">load</span><span class="p">()</span>
<span class="c1"># 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for a given .proto file.
</span><span class="n">output</span> <span class="o">=</span> <span class="n">df</span>\
<span class="p">.</span><span class="n">select</span><span class="p">(</span><span class="n">from_protobuf</span><span class="p">(</span><span class="s">"value"</span><span class="p">,</span> <span class="s">"AppEvent"</span><span class="p">,</span> <span class="n">descriptorFilePath</span><span class="p">).</span><span class="n">alias</span><span class="p">(</span><span class="s">"event"</span><span class="p">))</span>\
<span class="p">.</span><span class="n">where</span><span class="p">(</span><span class="s">'event.name == "alice"'</span><span class="p">)</span>\
<span class="p">.</span><span class="n">select</span><span class="p">(</span><span class="n">to_protobuf</span><span class="p">(</span><span class="s">"event"</span><span class="p">,</span> <span class="s">"AppEvent"</span><span class="p">,</span> <span class="n">descriptorFilePath</span><span class="p">).</span><span class="n">alias</span><span class="p">(</span><span class="s">"event"</span><span class="p">))</span>
<span class="c1"># Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary results. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
</span>
<span class="n">output</span> <span class="o">=</span> <span class="n">df</span>\
<span class="p">.</span><span class="n">select</span><span class="p">(</span><span class="n">from_protobuf</span><span class="p">(</span><span class="s">"value"</span><span class="p">,</span> <span class="s">"org.sparkproject.spark_protobuf.protobuf.AppEvent"</span><span class="p">).</span><span class="n">alias</span><span class="p">(</span><span class="s">"event"</span><span class="p">))</span>\
<span class="p">.</span><span class="n">where</span><span class="p">(</span><span class="s">'event.name == "alice"'</span><span class="p">)</span>
<span class="n">output</span><span class="p">.</span><span class="n">printSchema</span><span class="p">()</span>
<span class="c1"># root
# |-- event: struct (nullable = true)
# |    |-- name: string (nullable = true)
# |    |-- id: long (nullable = true)
# |    |-- context: string (nullable = true)
</span>
<span class="n">output</span> <span class="o">=</span> <span class="n">output</span>
<span class="p">.</span><span class="n">select</span><span class="p">(</span><span class="n">to_protobuf</span><span class="p">(</span><span class="s">"event"</span><span class="p">,</span> <span class="s">"org.sparkproject.spark_protobuf.protobuf.AppEvent"</span><span class="p">).</span><span class="n">alias</span><span class="p">(</span><span class="s">"event"</span><span class="p">))</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">output</span>\
<span class="p">.</span><span class="n">writeStream</span>\
<span class="p">.</span><span class="nb">format</span><span class="p">(</span><span class="s">"kafka"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">option</span><span class="p">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="p">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">option</span><span class="p">(</span><span class="s">"topic"</span><span class="p">,</span> <span class="s">"topic2"</span><span class="p">)</span>\
<span class="p">.</span><span class="n">start</span><span class="p">()</span>
</code></pre></div></div>
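<p>The streaming example above can also be adapted to batch DataFrames. The following is a minimal sketch (not part of the original guide) that round-trips a struct column through Protobuf binary using the same <code class="language-plaintext highlighter-rouge">AppEvent</code> schema; it assumes an active <code class="language-plaintext highlighter-rouge">spark</code> session and that <code class="language-plaintext highlighter-rouge">descriptor_file</code> points to a descriptor file generated from the .proto shown above.</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code># Batch round-trip sketch: encode a struct to Protobuf binary and decode it back.
from pyspark.sql.functions import struct
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

descriptor_file = "/path/to/app_event.desc"  # hypothetical path to the generated descriptor set

# A small DataFrame whose columns match the AppEvent fields (name, id, context).
events = spark.createDataFrame(
    [("alice", 1, "signup"), ("bob", 2, "login")],
    ["name", "id", "context"])

# Encode the struct into Protobuf binary, then decode it back into a struct column.
encoded = events.select(
    to_protobuf(struct("name", "id", "context"), "AppEvent", descriptor_file).alias("value"))
decoded = encoded.select(
    from_protobuf("value", "AppEvent", descriptor_file).alias("event"))

decoded.select("event.name", "event.id", "event.context").show()
</code></pre></div></div>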
<h3 id="scala">Scala</h3>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">import</span> <span class="nn">org.apache.spark.sql.protobuf.functions._</span>
<span class="c1">// `from_protobuf` and `to_protobuf` provides two schema choices. Via Protobuf descriptor file,</span>
<span class="c1">// or via shaded Java class.</span>
<span class="c1">// give input .proto protobuf schema</span>
<span class="c1">// syntax = "proto3"</span>
<span class="c1">// message AppEvent {</span>
<span class="c1">// string name = 1;</span>
<span class="c1">// int64 id = 2;</span>
<span class="c1">// string context = 3;</span>
<span class="c1">// }</span>
<span class="k">val</span> <span class="nv">df</span> <span class="k">=</span> <span class="nv">spark</span>
<span class="o">.</span><span class="py">readStream</span>
<span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"subscribe"</span><span class="o">,</span> <span class="s">"topic1"</span><span class="o">)</span>
<span class="o">.</span><span class="py">load</span><span class="o">()</span>
<span class="c1">// 1. Decode the Protobuf data of schema `AppEvent` into a struct;</span>
<span class="c1">// 2. Filter by column `name`;</span>
<span class="c1">// 3. Encode the column `event` in Protobuf format.</span>
<span class="c1">// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.</span>
<span class="k">val</span> <span class="nv">output</span> <span class="k">=</span> <span class="nv">df</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">from_protobuf</span><span class="o">(</span><span class="n">$</span><span class="s">"value"</span><span class="o">,</span> <span class="s">"AppEvent"</span><span class="o">,</span> <span class="n">descriptorFilePath</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"event"</span><span class="o">)</span>
<span class="o">.</span><span class="py">where</span><span class="o">(</span><span class="s">"event.name == \"alice\""</span><span class="o">)</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">to_protobuf</span><span class="o">(</span><span class="n">$</span><span class="s">"user"</span><span class="o">,</span> <span class="s">"AppEvent"</span><span class="o">,</span> <span class="n">descriptorFilePath</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"event"</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">query</span> <span class="k">=</span> <span class="nv">output</span>
<span class="o">.</span><span class="py">writeStream</span>
<span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"topic2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">start</span><span class="o">()</span>
<span class="c1">// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf</span>
<span class="c1">// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:</span>
<span class="c1">// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the</span>
<span class="c1">// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at</span>
<span class="c1">// https://github.com/rangadi/shaded-protobuf-classes.</span>
<span class="k">var</span> <span class="n">output</span> <span class="k">=</span> <span class="nv">df</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">from_protobuf</span><span class="o">(</span><span class="n">$</span><span class="s">"value"</span><span class="o">,</span> <span class="s">"org.example.protos..AppEvent"</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"event"</span><span class="o">)</span>
<span class="o">.</span><span class="py">where</span><span class="o">(</span><span class="s">"event.name == \"alice\""</span><span class="o">)</span>
<span class="nv">output</span><span class="o">.</span><span class="py">printSchema</span><span class="o">()</span>
<span class="c1">// root</span>
<span class="c1">// |--event: struct (nullable = true)</span>
<span class="c1">// | |-- name : string (nullable = true)</span>
<span class="c1">// | |-- id: long (nullable = true)</span>
<span class="c1">// | |-- context: string (nullable = true)</span>
<span class="n">output</span> <span class="k">=</span> <span class="nv">output</span><span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">to_protobuf</span><span class="o">(</span><span class="n">$</span><span class="s">"event"</span><span class="o">,</span> <span class="s">"org.sparkproject.spark_protobuf.protobuf.AppEvent"</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"event"</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">query</span> <span class="k">=</span> <span class="nv">output</span>
<span class="o">.</span><span class="py">writeStream</span>
<span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"topic2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">start</span><span class="o">()</span>
</code></pre></div></div>
<h3 id="java">Java</h3>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">functions</span><span class="o">.</span><span class="na">col</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">protobuf</span><span class="o">.</span><span class="na">functions</span><span class="o">.*;</span>
<span class="c1">// `from_protobuf` and `to_protobuf` provides two schema choices. Via Protobuf descriptor file,</span>
<span class="c1">// or via shaded Java class.</span>
<span class="c1">// give input .proto protobuf schema</span>
<span class="c1">// syntax = "proto3"</span>
<span class="c1">// message AppEvent {</span>
<span class="c1">// string name = 1;</span>
<span class="c1">// int64 id = 2;</span>
<span class="c1">// string context = 3;</span>
<span class="c1">// }</span>
<span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span>
<span class="o">.</span><span class="na">readStream</span><span class="o">()</span>
<span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"subscribe"</span><span class="o">,</span> <span class="s">"topic1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">load</span><span class="o">();</span>
<span class="c1">// 1. Decode the Protobuf data of schema `AppEvent` into a struct;</span>
<span class="c1">// 2. Filter by column `name`;</span>
<span class="c1">// 3. Encode the column `event` in Protobuf format.</span>
<span class="c1">// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.</span>
<span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">df</span>
<span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">from_protobuf</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"value"</span><span class="o">),</span> <span class="s">"AppEvent"</span><span class="o">,</span> <span class="n">descriptorFilePath</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"event"</span><span class="o">))</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">"event.name == \"alice\""</span><span class="o">)</span>
<span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">to_protobuf</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"event"</span><span class="o">),</span> <span class="s">"AppEvent"</span><span class="o">,</span> <span class="n">descriptorFilePath</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"event"</span><span class="o">));</span>
<span class="c1">// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf</span>
<span class="c1">// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:</span>
<span class="c1">// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the</span>
<span class="c1">// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at</span>
<span class="c1">// https://github.com/rangadi/shaded-protobuf-classes.</span>
<span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">df</span>
<span class="o">.</span><span class="na">select</span><span class="o">(</span>
<span class="n">from_protobuf</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"value"</span><span class="o">),</span>
<span class="s">"org.sparkproject.spark_protobuf.protobuf.AppEvent"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"event"</span><span class="o">))</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">"event.name == \"alice\""</span><span class="o">)</span>
<span class="n">output</span><span class="o">.</span><span class="na">printSchema</span><span class="o">()</span>
<span class="c1">// root</span>
<span class="c1">// |--event: struct (nullable = true)</span>
<span class="c1">// | |-- name : string (nullable = true)</span>
<span class="c1">// | |-- id: long (nullable = true)</span>
<span class="c1">// | |-- context: string (nullable = true)</span>
<span class="n">output</span> <span class="o">=</span> <span class="n">output</span><span class="o">.</span><span class="na">select</span><span class="o">(</span>
<span class="n">to_protobuf</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"event"</span><span class="o">),</span>
<span class="s">"org.sparkproject.spark_protobuf.protobuf.AppEvent"</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"event"</span><span class="o">));</span>
<span class="nc">StreamingQuery</span> <span class="n">query</span> <span class="o">=</span> <span class="n">output</span>
<span class="o">.</span><span class="na">writeStream</span><span class="o">()</span>
<span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"topic2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">start</span><span class="o">();</span>
</code></pre></div></div>
<h2 id="supported-types-for-protobuf---spark-sql-conversion">Supported types for Protobuf -&gt; Spark SQL conversion</h2>
<p>Currently Spark supports reading <a href="https://developers.google.com/protocol-buffers/docs/proto3#scalar">protobuf scalar types</a>, <a href="https://developers.google.com/protocol-buffers/docs/proto3#enum">enum types</a>, <a href="https://developers.google.com/protocol-buffers/docs/proto3#nested">nested types</a>, and <a href="https://developers.google.com/protocol-buffers/docs/proto3#maps">map types</a> within Protobuf messages.
In addition to these types, <code class="language-plaintext highlighter-rouge">spark-protobuf</code> also supports Protobuf <code class="language-plaintext highlighter-rouge">OneOf</code> fields, which allow you to handle messages that can have multiple possible sets of fields, of which only one set can be present at a time. This is useful when the data you are working with is not always in the same format and you need to handle messages with different sets of fields without encountering errors.</p>
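<p>As a hedged illustration of the <code class="language-plaintext highlighter-rouge">OneOf</code> mapping (the message and field names below are made up for this example), each branch of a <code class="language-plaintext highlighter-rouge">oneof</code> becomes its own nullable field in the resulting struct, and only the branch that is set is expected to be non-null for a given record:</p>
<div class="language-proto highlighter-rouge"><div class="highlight"><pre class="highlight"><code>syntax = "proto3";
message Payment {
  string order_id = 1;
  oneof method {
    string card_token = 2;
    string bank_account = 3;
  }
}
// Read with from_protobuf, this is expected to yield roughly:
// struct&lt;order_id: string, card_token: string, bank_account: string&gt;
</code></pre></div></div>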
<table class="table">
<tr><th><b>Protobuf type</b></th><th><b>Spark SQL type</b></th></tr>
<tr>
<td>boolean</td>
<td>BooleanType</td>
</tr>
<tr>
<td>int</td>
<td>IntegerType</td>
</tr>
<tr>
<td>long</td>
<td>LongType</td>
</tr>
<tr>
<td>float</td>
<td>FloatType</td>
</tr>
<tr>
<td>double</td>
<td>DoubleType</td>
</tr>
<tr>
<td>string</td>
<td>StringType</td>
</tr>
<tr>
<td>enum</td>
<td>StringType</td>
</tr>
<tr>
<td>bytes</td>
<td>BinaryType</td>
</tr>
<tr>
<td>Message</td>
<td>StructType</td>
</tr>
<tr>
<td>repeated</td>
<td>ArrayType</td>
</tr>
<tr>
<td>map</td>
<td>MapType</td>
</tr>
<tr>
<td>OneOf</td>
<td>Struct</td>
</tr>
<tr>
<td>Any</td>
<td>StructType</td>
</tr>
</table>
<p>It also supports reading the following Protobuf well-known types: <a href="https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#timestamp">Timestamp</a> and <a href="https://developers.google.com/protocol-buffers/docs/reference/google.protobuf#duration">Duration</a>.</p>
<table class="table">
<tr><th><b>Protobuf logical type</b></th><th><b>Protobuf schema</b></th><th><b>Spark SQL type</b></th></tr>
<tr>
<td>duration</td>
<td>MessageType{seconds: Long, nanos: Int}</td>
<td>DayTimeIntervalType</td>
</tr>
<tr>
<td>timestamp</td>
<td>MessageType{seconds: Long, nanos: Int}</td>
<td>TimestampType</td>
</tr>
</table>
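<p>As an illustrative sketch (the message and field names are placeholders), the well-known types map as in the table above when they appear as fields of your message:</p>
<div class="language-proto highlighter-rouge"><div class="highlight"><pre class="highlight"><code>syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

message SessionEvent {
  string name = 1;
  google.protobuf.Timestamp event_time = 2;  // read as Spark TimestampType
  google.protobuf.Duration elapsed = 3;      // read as Spark DayTimeIntervalType
}
</code></pre></div></div>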
<h2 id="supported-types-for-spark-sql---protobuf-conversion">Supported types for Spark SQL -&gt; Protobuf conversion</h2>
<p>Spark supports writing all Spark SQL types to Protobuf. For most types, the mapping from Spark types to Protobuf types is straightforward (e.g., IntegerType is converted to int).</p>
<table class="table">
<tr><th><b>Spark SQL type</b></th><th><b>Protobuf type</b></th></tr>
<tr>
<td>BooleanType</td>
<td>boolean</td>
</tr>
<tr>
<td>IntegerType</td>
<td>int</td>
</tr>
<tr>
<td>LongType</td>
<td>long</td>
</tr>
<tr>
<td>FloatType</td>
<td>float</td>
</tr>
<tr>
<td>DoubleType</td>
<td>double</td>
</tr>
<tr>
<td>StringType</td>
<td>string</td>
</tr>
<tr>
<td>StringType</td>
<td>enum</td>
</tr>
<tr>
<td>BinaryType</td>
<td>bytes</td>
</tr>
<tr>
<td>StructType</td>
<td>message</td>
</tr>
<tr>
<td>ArrayType</td>
<td>repeated</td>
</tr>
<tr>
<td>MapType</td>
<td>map</td>
</tr>
</table>
<h2 id="handling-circular-references-protobuf-fields">Handling circular references protobuf fields</h2>
<p>One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
To address this issue, spark-protobuf can check for circular references through field types. This allows users to use the <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code> option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, <code class="language-plaintext highlighter-rouge">spark-protobuf</code> does not permit recursive fields, which corresponds to setting <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code> to -1. However, you can set this option to a value from 0 to 10 if needed.</p>
<p>Setting <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code> to 0 drops all recursive fields, setting it to 1 allows a recursive field to be recursed once, and setting it to 2 allows it to be recursed twice. A <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code> value greater than 10 is not allowed, as it can lead to performance issues and even stack overflows.</p>
<p>The SQL schema for the Protobuf message below will vary based on the value of <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code>.</p>
<div class="language-proto highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="na">syntax</span> <span class="o">=</span> <span class="s">"proto3"</span>
<span class="kd">message</span> <span class="nc">Person</span> <span class="p">{</span>
<span class="kt">string</span> <span class="na">name</span> <span class="o">=</span> <span class="mi">1</span><span class="p">;</span>
<span class="n">Person</span> <span class="na">bff</span> <span class="o">=</span> <span class="mi">2</span>
<span class="p">}</span>
<span class="c1">// The protobuf schema defined above, would be converted into a Spark SQL columns with the following</span>
<span class="c1">// structure based on `recursive.fields.max.depth` value.</span>
<span class="mi">0</span><span class="o">:</span> <span class="n">struct</span><span class="o">&lt;</span><span class="n">name</span><span class="o">:</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="n">null</span><span class="err">&gt;</span>
<span class="mi">1</span><span class="o">:</span> <span class="n">struct</span><span class="o">&lt;</span><span class="n">name</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="o">&lt;</span><span class="n">name</span><span class="o">:</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="n">null</span><span class="err">&gt;&gt;</span>
<span class="mi">2</span><span class="o">:</span> <span class="n">struct</span><span class="o">&lt;</span><span class="n">name</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="o">&lt;</span><span class="n">name</span><span class="o">:</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="n">struct</span><span class="o">&lt;</span><span class="n">name</span><span class="o">:</span> <span class="kt">string</span><span class="p">,</span> <span class="n">bff</span><span class="o">:</span> <span class="n">null</span><span class="err">&gt;&gt;&gt;</span> <span class="o">...</span>
</code></pre></div></div>
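<p>As a hedged example of passing the option (the descriptor path is a placeholder, and <code class="language-plaintext highlighter-rouge">df</code> is assumed to contain <code class="language-plaintext highlighter-rouge">Person</code> records as Protobuf binary in a <code class="language-plaintext highlighter-rouge">value</code> column), <code class="language-plaintext highlighter-rouge">recursive.fields.max.depth</code> is supplied through the options map of <code class="language-plaintext highlighter-rouge">from_protobuf</code>:</p>
<div class="language-python highlighter-rouge"><div class="highlight"><pre class="highlight"><code>from pyspark.sql.protobuf.functions import from_protobuf

# Allow the recursive `bff` field of Person to be expanded twice.
parsed = df.select(
    from_protobuf(
        "value", "Person", "/path/to/person.desc",  # hypothetical descriptor file
        options={"recursive.fields.max.depth": "2"}
    ).alias("person"))
</code></pre></div></div>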
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extract content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want to find more
// details on how works DocSearch, check the docs of DocSearch.
docsearch({
apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
appId: 'RAI69RXRSK',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:3.5.0"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>