<!DOCTYPE html>
<html class="no-js">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Apache Avro Data Source Guide - Spark 4.1.0-preview1 Documentation</title>
<link rel="stylesheet" href="css/bootstrap.min.css">
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=DM+Sans:ital,wght@0,400;0,500;0,700;1,400;1,500;1,700&Courier+Prime:wght@400;700&display=swap" rel="stylesheet">
<link href="css/custom.css" rel="stylesheet">
<script src="js/vendor/modernizr-2.6.1-respond-1.1.0.min.js"></script>
<link rel="stylesheet" href="css/pygments-default.css">
<link rel="stylesheet" href="css/docsearch.min.css" />
<link rel="stylesheet" href="css/docsearch.css">
<!-- Matomo -->
<script>
var _paq = window._paq = window._paq || [];
/* tracker methods like "setCustomDimension" should be called before "trackPageView" */
_paq.push(["disableCookies"]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '40']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
<!-- End Matomo Code -->
</head>
<body class="global">
<!-- This code is taken from http://twitter.github.com/bootstrap/examples/hero.html -->
<nav class="navbar navbar-expand-lg navbar-dark p-0 px-4 fixed-top" style="background: #1d6890;" id="topbar">
<div class="navbar-brand"><a href="index.html">
<img src="https://spark.apache.org/images/spark-logo-rev.svg" width="141" height="72"/></a><span class="version">4.1.0-preview1</span>
</div>
<button class="navbar-toggler" type="button" data-toggle="collapse"
data-target="#navbarCollapse" aria-controls="navbarCollapse"
aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarCollapse">
<ul class="navbar-nav me-auto">
<li class="nav-item"><a href="index.html" class="nav-link">Overview</a></li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarQuickStart" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Programming Guides</a>
<div class="dropdown-menu" aria-labelledby="navbarQuickStart">
<a class="dropdown-item" href="quick-start.html">Quick Start</a>
<a class="dropdown-item" href="rdd-programming-guide.html">RDDs, Accumulators, Broadcasts Vars</a>
<a class="dropdown-item" href="sql-programming-guide.html">SQL, DataFrames, and Datasets</a>
<a class="dropdown-item" href="streaming/index.html">Structured Streaming</a>
<a class="dropdown-item" href="streaming-programming-guide.html">Spark Streaming (DStreams)</a>
<a class="dropdown-item" href="ml-guide.html">MLlib (Machine Learning)</a>
<a class="dropdown-item" href="graphx-programming-guide.html">GraphX (Graph Processing)</a>
<a class="dropdown-item" href="sparkr.html">SparkR (R on Spark)</a>
<a class="dropdown-item" href="api/python/getting_started/index.html">PySpark (Python on Spark)</a>
<a class="dropdown-item" href="declarative-pipelines-programming-guide.html">Declarative Pipelines</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarAPIDocs" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">API Docs</a>
<div class="dropdown-menu" aria-labelledby="navbarAPIDocs">
<a class="dropdown-item" href="api/python/index.html">Python</a>
<a class="dropdown-item" href="api/scala/org/apache/spark/index.html">Scala</a>
<a class="dropdown-item" href="api/java/index.html">Java</a>
<a class="dropdown-item" href="api/R/index.html">R</a>
<a class="dropdown-item" href="api/sql/index.html">SQL, Built-in Functions</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarDeploying" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">Deploying</a>
<div class="dropdown-menu" aria-labelledby="navbarDeploying">
<a class="dropdown-item" href="cluster-overview.html">Overview</a>
<a class="dropdown-item" href="submitting-applications.html">Submitting Applications</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="spark-standalone.html">Spark Standalone</a>
<a class="dropdown-item" href="running-on-yarn.html">YARN</a>
<a class="dropdown-item" href="running-on-kubernetes.html">Kubernetes</a>
</div>
</li>
<li class="nav-item dropdown">
<a href="#" class="nav-link dropdown-toggle" id="navbarMore" role="button" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">More</a>
<div class="dropdown-menu" aria-labelledby="navbarMore">
<a class="dropdown-item" href="configuration.html">Configuration</a>
<a class="dropdown-item" href="monitoring.html">Monitoring</a>
<a class="dropdown-item" href="tuning.html">Tuning Guide</a>
<a class="dropdown-item" href="job-scheduling.html">Job Scheduling</a>
<a class="dropdown-item" href="security.html">Security</a>
<a class="dropdown-item" href="hardware-provisioning.html">Hardware Provisioning</a>
<a class="dropdown-item" href="migration-guide.html">Migration Guide</a>
<div class="dropdown-divider"></div>
<a class="dropdown-item" href="building-spark.html">Building Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/contributing.html">Contributing to Spark</a>
<a class="dropdown-item" href="https://spark.apache.org/third-party-projects.html">Third Party Projects</a>
</div>
</li>
<li class="nav-item">
<input type="text" id="docsearch-input" placeholder="Search the docs…">
</li>
</ul>
<!--<span class="navbar-text navbar-right"><span class="version-text">v4.1.0-preview1</span></span>-->
</div>
</nav>
<div class="container">
<div class="left-menu-wrapper">
<div class="left-menu">
<h3><a href="sql-programming-guide.html">Spark SQL Guide</a></h3>
<ul>
<li>
<a href="sql-getting-started.html">
Getting Started
</a>
</li>
<li>
<a href="sql-data-sources.html">
Data Sources
</a>
</li>
<ul>
<li>
<a href="sql-data-sources-load-save-functions.html">
Generic Load/Save Functions
</a>
</li>
<li>
<a href="sql-data-sources-generic-options.html">
Generic File Source Options
</a>
</li>
<li>
<a href="sql-data-sources-parquet.html">
Parquet Files
</a>
</li>
<li>
<a href="sql-data-sources-orc.html">
ORC Files
</a>
</li>
<li>
<a href="sql-data-sources-json.html">
JSON Files
</a>
</li>
<li>
<a href="sql-data-sources-csv.html">
CSV Files
</a>
</li>
<li>
<a href="sql-data-sources-text.html">
Text Files
</a>
</li>
<li>
<a href="sql-data-sources-xml.html">
XML Files
</a>
</li>
<li>
<a href="sql-data-sources-hive-tables.html">
Hive Tables
</a>
</li>
<li>
<a href="sql-data-sources-jdbc.html">
JDBC To Other Databases
</a>
</li>
<li>
<a href="sql-data-sources-avro.html">
Avro Files
</a>
</li>
<li>
<a href="sql-data-sources-protobuf.html">
Protobuf data
</a>
</li>
<li>
<a href="sql-data-sources-binaryFile.html">
Whole Binary Files
</a>
</li>
<li>
<a href="sql-data-sources-troubleshooting.html">
Troubleshooting
</a>
</li>
</ul>
<li>
<a href="sql-performance-tuning.html">
Performance Tuning
</a>
</li>
<li>
<a href="sql-distributed-sql-engine.html">
Distributed SQL Engine
</a>
</li>
<li>
<a href="sql-pyspark-pandas-with-arrow.html">
PySpark Usage Guide for Pandas with Apache Arrow
</a>
</li>
<li>
<a href="sql-migration-guide.html">
Migration Guide
</a>
</li>
<li>
<a href="sql-ref.html">
SQL Reference
</a>
</li>
<li>
<a href="sql-error-conditions.html">
Error Conditions
</a>
</li>
</ul>
</div>
</div>
<input id="nav-trigger" class="nav-trigger" checked type="checkbox">
<label for="nav-trigger"></label>
<div class="content-with-sidebar mr-3" id="content">
<h1 class="title">Apache Avro Data Source Guide</h1>
<ul id="markdown-toc">
<li><a href="#deploying" id="markdown-toc-deploying">Deploying</a></li>
<li><a href="#load-and-save-functions" id="markdown-toc-load-and-save-functions">Load and Save Functions</a></li>
<li><a href="#to_avro-and-from_avro" id="markdown-toc-to_avro-and-from_avro">to_avro() and from_avro()</a></li>
<li><a href="#data-source-option" id="markdown-toc-data-source-option">Data Source Option</a></li>
<li><a href="#configuration" id="markdown-toc-configuration">Configuration</a></li>
<li><a href="#compatibility-with-databricks-spark-avro" id="markdown-toc-compatibility-with-databricks-spark-avro">Compatibility with Databricks spark-avro</a></li>
<li><a href="#supported-types-for-avro---spark-sql-conversion" id="markdown-toc-supported-types-for-avro---spark-sql-conversion">Supported types for Avro -&gt; Spark SQL conversion</a></li>
<li><a href="#supported-types-for-spark-sql---avro-conversion" id="markdown-toc-supported-types-for-spark-sql---avro-conversion">Supported types for Spark SQL -&gt; Avro conversion</a></li>
<li><a href="#handling-circular-references-of-avro-fields" id="markdown-toc-handling-circular-references-of-avro-fields">Handling circular references of Avro fields</a></li>
</ul>
<p>Since the Spark 2.4 release, <a href="https://spark.apache.org/docs/latest/sql-programming-guide.html">Spark SQL</a> has provided built-in support for reading and writing Apache Avro data.</p>
<h2 id="deploying">Deploying</h2>
<p>The <code class="language-plaintext highlighter-rouge">spark-avro</code> module is external and not included in <code class="language-plaintext highlighter-rouge">spark-submit</code> or <code class="language-plaintext highlighter-rouge">spark-shell</code> by default.</p>
<p>As with any Spark application, <code class="language-plaintext highlighter-rouge">spark-submit</code> is used to launch your application. <code class="language-plaintext highlighter-rouge">spark-avro_2.13</code>
and its dependencies can be added directly to <code class="language-plaintext highlighter-rouge">spark-submit</code> using <code class="language-plaintext highlighter-rouge">--packages</code>, for example:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:4.1.0-preview1 ...
</code></pre></div></div>
<p>For experimenting in <code class="language-plaintext highlighter-rouge">spark-shell</code>, you can also use <code class="language-plaintext highlighter-rouge">--packages</code> to add <code class="language-plaintext highlighter-rouge">org.apache.spark:spark-avro_2.13</code> and its dependencies directly:</p>
<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code>./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:4.1.0-preview1 ...
</code></pre></div></div>
<p>See <a href="submitting-applications.html">Application Submission Guide</a> for more details about submitting applications with external dependencies.</p>
<h2 id="load-and-save-functions">Load and Save Functions</h2>
<p>Since the <code class="language-plaintext highlighter-rouge">spark-avro</code> module is external, there is no <code class="language-plaintext highlighter-rouge">.avro</code> API in
<code class="language-plaintext highlighter-rouge">DataFrameReader</code> or <code class="language-plaintext highlighter-rouge">DataFrameWriter</code>.</p>
<p>To load/save data in Avro format, you need to specify the data source option <code class="language-plaintext highlighter-rouge">format</code> as <code class="language-plaintext highlighter-rouge">avro</code> (or <code class="language-plaintext highlighter-rouge">org.apache.spark.sql.avro</code>).</p>
<div class="codetabs">
<div data-lang="python">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="n">df</span> <span class="o">=</span> <span class="n">spark</span><span class="p">.</span><span class="n">read</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">avro</span><span class="sh">"</span><span class="p">).</span><span class="nf">load</span><span class="p">(</span><span class="sh">"</span><span class="s">examples/src/main/resources/users.avro</span><span class="sh">"</span><span class="p">)</span>
<span class="n">df</span><span class="p">.</span><span class="nf">select</span><span class="p">(</span><span class="sh">"</span><span class="s">name</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">favorite_color</span><span class="sh">"</span><span class="p">).</span><span class="n">write</span><span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">avro</span><span class="sh">"</span><span class="p">).</span><span class="nf">save</span><span class="p">(</span><span class="sh">"</span><span class="s">namesAndFavColors.avro</span><span class="sh">"</span><span class="p">)</span></code></pre></figure>
</div>
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">val</span> <span class="nv">usersDF</span> <span class="k">=</span> <span class="nv">spark</span><span class="o">.</span><span class="py">read</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"avro"</span><span class="o">).</span><span class="py">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/users.avro"</span><span class="o">)</span>
<span class="nv">usersDF</span><span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"favorite_color"</span><span class="o">).</span><span class="py">write</span><span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"avro"</span><span class="o">).</span><span class="py">save</span><span class="o">(</span><span class="s">"namesAndFavColors.avro"</span><span class="o">)</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">usersDF</span> <span class="o">=</span> <span class="n">spark</span><span class="o">.</span><span class="na">read</span><span class="o">().</span><span class="na">format</span><span class="o">(</span><span class="s">"avro"</span><span class="o">).</span><span class="na">load</span><span class="o">(</span><span class="s">"examples/src/main/resources/users.avro"</span><span class="o">);</span>
<span class="n">usersDF</span><span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="s">"name"</span><span class="o">,</span> <span class="s">"favorite_color"</span><span class="o">).</span><span class="na">write</span><span class="o">().</span><span class="na">format</span><span class="o">(</span><span class="s">"avro"</span><span class="o">).</span><span class="na">save</span><span class="o">(</span><span class="s">"namesAndFavColors.avro"</span><span class="o">);</span></code></pre></figure>
</div>
<div data-lang="r">
<figure class="highlight"><pre><code class="language-r" data-lang="r"><span class="n">df</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">read.df</span><span class="p">(</span><span class="s2">"examples/src/main/resources/users.avro"</span><span class="p">,</span><span class="w"> </span><span class="s2">"avro"</span><span class="p">)</span><span class="w">
</span><span class="n">write.df</span><span class="p">(</span><span class="n">select</span><span class="p">(</span><span class="n">df</span><span class="p">,</span><span class="w"> </span><span class="s2">"name"</span><span class="p">,</span><span class="w"> </span><span class="s2">"favorite_color"</span><span class="p">),</span><span class="w"> </span><span class="s2">"namesAndFavColors.avro"</span><span class="p">,</span><span class="w"> </span><span class="s2">"avro"</span><span class="p">)</span></code></pre></figure>
</div>
</div>
<h2 id="to_avro-and-from_avro">to_avro() and from_avro()</h2>
<p>The Avro package provides the function <code class="language-plaintext highlighter-rouge">to_avro()</code> to encode a column as binary in Avro
format, and <code class="language-plaintext highlighter-rouge">from_avro()</code> to decode Avro binary data into a column. Both functions transform one column into
another column, and the input/output SQL data type can be a complex type or a primitive type.</p>
<p>Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each
Kafka key-value record is augmented with some metadata, such as its ingestion timestamp into Kafka, its offset in Kafka, etc.</p>
<ul>
<li>If the &#8220;value&#8221; field that contains your data is in Avro, you could use <code class="language-plaintext highlighter-rouge">from_avro()</code> to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.</li>
<li><code class="language-plaintext highlighter-rouge">to_avro()</code> can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.</li>
</ul>
<div class="codetabs">
<div data-lang="python">
<figure class="highlight"><pre><code class="language-python" data-lang="python"><span class="kn">from</span> <span class="n">pyspark.sql.avro.functions</span> <span class="kn">import</span> <span class="n">from_avro</span><span class="p">,</span> <span class="n">to_avro</span>
<span class="c1"># `from_avro` requires Avro schema in JSON string format.
</span><span class="n">jsonFormatSchema</span> <span class="o">=</span> <span class="nf">open</span><span class="p">(</span><span class="sh">"</span><span class="s">examples/src/main/resources/user.avsc</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">r</span><span class="sh">"</span><span class="p">).</span><span class="nf">read</span><span class="p">()</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">spark</span>\
<span class="p">.</span><span class="n">readStream</span>\
<span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka.bootstrap.servers</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">host1:port1,host2:port2</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">subscribe</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">topic1</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">load</span><span class="p">()</span>
<span class="c1"># 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
</span><span class="n">output</span> <span class="o">=</span> <span class="n">df</span>\
<span class="p">.</span><span class="nf">select</span><span class="p">(</span><span class="nf">from_avro</span><span class="p">(</span><span class="sh">"</span><span class="s">value</span><span class="sh">"</span><span class="p">,</span> <span class="n">jsonFormatSchema</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">user</span><span class="sh">"</span><span class="p">))</span>\
<span class="p">.</span><span class="nf">where</span><span class="p">(</span><span class="sh">'</span><span class="s">user.favorite_color == </span><span class="sh">"</span><span class="s">red</span><span class="sh">"'</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">select</span><span class="p">(</span><span class="nf">to_avro</span><span class="p">(</span><span class="sh">"</span><span class="s">user.name</span><span class="sh">"</span><span class="p">).</span><span class="nf">alias</span><span class="p">(</span><span class="sh">"</span><span class="s">value</span><span class="sh">"</span><span class="p">))</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">output</span>\
<span class="p">.</span><span class="n">writeStream</span>\
<span class="p">.</span><span class="nf">format</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">kafka.bootstrap.servers</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">host1:port1,host2:port2</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">option</span><span class="p">(</span><span class="sh">"</span><span class="s">topic</span><span class="sh">"</span><span class="p">,</span> <span class="sh">"</span><span class="s">topic2</span><span class="sh">"</span><span class="p">)</span>\
<span class="p">.</span><span class="nf">start</span><span class="p">()</span></code></pre></figure>
</div>
<div data-lang="scala">
<figure class="highlight"><pre><code class="language-scala" data-lang="scala"><span class="k">import</span> <span class="nn">org.apache.spark.sql.avro.functions._</span>
<span class="c1">// `from_avro` requires Avro schema in JSON string format.</span>
<span class="k">val</span> <span class="nv">jsonFormatSchema</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">String</span><span class="o">(</span><span class="nv">Files</span><span class="o">.</span><span class="py">readAllBytes</span><span class="o">(</span><span class="nv">Paths</span><span class="o">.</span><span class="py">get</span><span class="o">(</span><span class="s">"./examples/src/main/resources/user.avsc"</span><span class="o">)))</span>
<span class="k">val</span> <span class="nv">df</span> <span class="k">=</span> <span class="n">spark</span>
<span class="o">.</span><span class="py">readStream</span>
<span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"subscribe"</span><span class="o">,</span> <span class="s">"topic1"</span><span class="o">)</span>
<span class="o">.</span><span class="py">load</span><span class="o">()</span>
<span class="c1">// 1. Decode the Avro data into a struct;</span>
<span class="c1">// 2. Filter by column `favorite_color`;</span>
<span class="c1">// 3. Encode the column `name` in Avro format.</span>
<span class="k">val</span> <span class="nv">output</span> <span class="k">=</span> <span class="n">df</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">from_avro</span><span class="o">(</span><span class="n">$</span><span class="s">"value"</span><span class="o">,</span> <span class="n">jsonFormatSchema</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"user"</span><span class="o">)</span>
<span class="o">.</span><span class="py">where</span><span class="o">(</span><span class="s">"user.favorite_color == \"red\""</span><span class="o">)</span>
<span class="o">.</span><span class="py">select</span><span class="o">(</span><span class="nf">to_avro</span><span class="o">(</span><span class="n">$</span><span class="s">"user.name"</span><span class="o">)</span> <span class="n">as</span> <span class="n">$</span><span class="s">"value"</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">query</span> <span class="k">=</span> <span class="n">output</span>
<span class="o">.</span><span class="py">writeStream</span>
<span class="o">.</span><span class="py">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"topic2"</span><span class="o">)</span>
<span class="o">.</span><span class="py">start</span><span class="o">()</span></code></pre></figure>
</div>
<div data-lang="java">
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">functions</span><span class="o">.</span><span class="na">col</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">static</span> <span class="n">org</span><span class="o">.</span><span class="na">apache</span><span class="o">.</span><span class="na">spark</span><span class="o">.</span><span class="na">sql</span><span class="o">.</span><span class="na">avro</span><span class="o">.</span><span class="na">functions</span><span class="o">.*;</span>
<span class="c1">// `from_avro` requires Avro schema in JSON string format.</span>
<span class="nc">String</span> <span class="n">jsonFormatSchema</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">String</span><span class="o">(</span><span class="nc">Files</span><span class="o">.</span><span class="na">readAllBytes</span><span class="o">(</span><span class="nc">Paths</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"./examples/src/main/resources/user.avsc"</span><span class="o">)));</span>
<span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">df</span> <span class="o">=</span> <span class="n">spark</span>
<span class="o">.</span><span class="na">readStream</span><span class="o">()</span>
<span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"subscribe"</span><span class="o">,</span> <span class="s">"topic1"</span><span class="o">)</span>
<span class="o">.</span><span class="na">load</span><span class="o">();</span>
<span class="c1">// 1. Decode the Avro data into a struct;</span>
<span class="c1">// 2. Filter by column `favorite_color`;</span>
<span class="c1">// 3. Encode the column `name` in Avro format.</span>
<span class="nc">Dataset</span><span class="o">&lt;</span><span class="nc">Row</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">df</span>
<span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">from_avro</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"value"</span><span class="o">),</span> <span class="n">jsonFormatSchema</span><span class="o">).</span><span class="na">as</span><span class="o">(</span><span class="s">"user"</span><span class="o">))</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="s">"user.favorite_color == \"red\""</span><span class="o">)</span>
<span class="o">.</span><span class="na">select</span><span class="o">(</span><span class="n">to_avro</span><span class="o">(</span><span class="n">col</span><span class="o">(</span><span class="s">"user.name"</span><span class="o">)).</span><span class="na">as</span><span class="o">(</span><span class="s">"value"</span><span class="o">));</span>
<span class="nc">StreamingQuery</span> <span class="n">query</span> <span class="o">=</span> <span class="n">output</span>
<span class="o">.</span><span class="na">writeStream</span><span class="o">()</span>
<span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"kafka.bootstrap.servers"</span><span class="o">,</span> <span class="s">"host1:port1,host2:port2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">option</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"topic2"</span><span class="o">)</span>
<span class="o">.</span><span class="na">start</span><span class="o">();</span></code></pre></figure>
</div>
<div data-lang="r">
<figure class="highlight"><pre><code class="language-r" data-lang="r"><span class="c1"># `from_avro` requires Avro schema in JSON string format.</span><span class="w">
</span><span class="n">jsonFormatSchema</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">paste0</span><span class="p">(</span><span class="n">readLines</span><span class="p">(</span><span class="s2">"examples/src/main/resources/user.avsc"</span><span class="p">),</span><span class="w"> </span><span class="n">collapse</span><span class="o">=</span><span class="s2">" "</span><span class="p">)</span><span class="w">
</span><span class="n">df</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">read.stream</span><span class="p">(</span><span class="w">
</span><span class="s2">"kafka"</span><span class="p">,</span><span class="w">
</span><span class="n">kafka.bootstrap.servers</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"host1:port1,host2:port2"</span><span class="p">,</span><span class="w">
</span><span class="n">subscribe</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"topic1"</span><span class="w">
</span><span class="p">)</span><span class="w">
</span><span class="c1"># 1. Decode the Avro data into a struct;</span><span class="w">
</span><span class="c1"># 2. Filter by column `favorite_color`;</span><span class="w">
</span><span class="c1"># 3. Encode the column `name` in Avro format.</span><span class="w">
</span><span class="n">output</span><span class="w"> </span><span class="o">&lt;-</span><span class="w"> </span><span class="n">select</span><span class="p">(</span><span class="w">
</span><span class="n">filter</span><span class="p">(</span><span class="w">
</span><span class="n">select</span><span class="p">(</span><span class="n">df</span><span class="p">,</span><span class="w"> </span><span class="n">alias</span><span class="p">(</span><span class="n">from_avro</span><span class="p">(</span><span class="s2">"value"</span><span class="p">,</span><span class="w"> </span><span class="n">jsonFormatSchema</span><span class="p">),</span><span class="w"> </span><span class="s2">"user"</span><span class="p">)),</span><span class="w">
</span><span class="n">column</span><span class="p">(</span><span class="s2">"user.favorite_color"</span><span class="p">)</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="s2">"red"</span><span class="w">
</span><span class="p">),</span><span class="w">
</span><span class="n">alias</span><span class="p">(</span><span class="n">to_avro</span><span class="p">(</span><span class="s2">"user.name"</span><span class="p">),</span><span class="w"> </span><span class="s2">"value"</span><span class="p">)</span><span class="w">
</span><span class="p">)</span><span class="w">
</span><span class="n">write.stream</span><span class="p">(</span><span class="w">
</span><span class="n">output</span><span class="p">,</span><span class="w">
</span><span class="s2">"kafka"</span><span class="p">,</span><span class="w">
</span><span class="n">kafka.bootstrap.servers</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"host1:port1,host2:port2"</span><span class="p">,</span><span class="w">
</span><span class="n">topic</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s2">"topic2"</span><span class="w">
</span><span class="p">)</span></code></pre></figure>
</div>
<div data-lang="sql">
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">TABLE</span> <span class="n">t</span> <span class="k">AS</span>
<span class="k">SELECT</span> <span class="n">NAMED_STRUCT</span><span class="p">(</span><span class="s1">'u'</span><span class="p">,</span> <span class="n">NAMED_STRUCT</span><span class="p">(</span><span class="s1">'member0'</span><span class="p">,</span> <span class="n">member0</span><span class="p">,</span> <span class="s1">'member1'</span><span class="p">,</span> <span class="n">member1</span><span class="p">))</span> <span class="k">AS</span> <span class="n">s</span>
<span class="k">FROM</span> <span class="k">VALUES</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="k">NULL</span><span class="p">),</span> <span class="p">(</span><span class="k">NULL</span><span class="p">,</span> <span class="s1">'a'</span><span class="p">)</span> <span class="n">tab</span><span class="p">(</span><span class="n">member0</span><span class="p">,</span> <span class="n">member1</span><span class="p">);</span>
<span class="k">DECLARE</span> <span class="n">avro_schema</span> <span class="n">STRING</span><span class="p">;</span>
<span class="k">SET</span> <span class="k">VARIABLE</span> <span class="n">avro_schema</span> <span class="o">=</span>
<span class="s1">'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }'</span><span class="p">;</span>
<span class="k">SELECT</span> <span class="n">TO_AVRO</span><span class="p">(</span><span class="n">s</span><span class="p">,</span> <span class="n">avro_schema</span><span class="p">)</span> <span class="k">AS</span> <span class="k">RESULT</span> <span class="k">FROM</span> <span class="n">t</span><span class="p">;</span>
<span class="k">SELECT</span> <span class="n">FROM_AVRO</span><span class="p">(</span><span class="k">result</span><span class="p">,</span> <span class="n">avro_schema</span><span class="p">,</span> <span class="k">MAP</span><span class="p">()).</span><span class="n">u</span> <span class="k">FROM</span> <span class="p">(</span>
<span class="k">SELECT</span> <span class="n">TO_AVRO</span><span class="p">(</span><span class="n">s</span><span class="p">,</span> <span class="n">avro_schema</span><span class="p">)</span> <span class="k">AS</span> <span class="k">RESULT</span> <span class="k">FROM</span> <span class="n">t</span><span class="p">);</span>
<span class="k">DROP</span> <span class="k">TEMPORARY</span> <span class="k">VARIABLE</span> <span class="n">avro_schema</span><span class="p">;</span>
<span class="k">DROP</span> <span class="k">TABLE</span> <span class="n">t</span><span class="p">;</span></code></pre></figure>
</div>
</div>
<h2 id="data-source-option">Data Source Option</h2>
<p>Data source options of Avro can be set via:</p>
<ul>
<li>the <code class="language-plaintext highlighter-rouge">.option</code> method on <code class="language-plaintext highlighter-rouge">DataFrameReader</code> or <code class="language-plaintext highlighter-rouge">DataFrameWriter</code>.</li>
<li>the <code class="language-plaintext highlighter-rouge">options</code> parameter in function <code class="language-plaintext highlighter-rouge">from_avro</code>.</li>
</ul>
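<p>For example, a minimal sketch (the paths and the <code class="language-plaintext highlighter-rouge">binaryDF</code> DataFrame are hypothetical):</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># A read-side option on DataFrameReader (option values are strings).
df = spark.read.format("avro") \
    .option("datetimeRebaseMode", "CORRECTED") \
    .load("/data/events")

# A write-side option on DataFrameWriter.
df.write.format("avro") \
    .option("compression", "deflate") \
    .save("/data/events_deflate")

# Options for from_avro are passed as a dict, e.g. the parse mode.
from pyspark.sql.avro.functions import from_avro
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
# `binaryDF` is assumed to hold a binary `value` column (e.g. from Kafka).
decoded = binaryDF.select(
    from_avro("value", jsonFormatSchema, {"mode": "PERMISSIVE"}).alias("user"))</code></pre></figure>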
<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Scope</th><th>Since Version</th></tr></thead>
<tr>
<td><code>avroSchema</code></td>
<td>None</td>
<td>Optional schema provided by a user in JSON format.
<ul>
<li>
When reading Avro files or calling the function <code>from_avro</code>, this option can be set to an evolved schema, which is compatible with but different from
the actual Avro schema. The deserialization schema will be consistent with the evolved schema.
For example, if we set an evolved schema containing one additional column with a default value,
the reading result in Spark will contain the new column too (see the sketch after this table). Note that when using this option with
<code>from_avro</code>, you still need to pass the actual Avro schema as a parameter to the function.
</li>
<li>
When writing Avro, this option can be set if the expected output Avro schema doesn't match the
schema converted by Spark. For example, the expected schema of one column is of "enum" type,
instead of "string" type in the default converted schema.
</li>
</ul>
</td>
<td> read, write and function <code>from_avro</code></td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>recordName</code></td>
<td>topLevelRecord</td>
<td>Top-level record name in the write result, which is required by the Avro spec.</td>
<td>write</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>recordNamespace</code></td>
<td>""</td>
<td>Record namespace in the write result.</td>
<td>write</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>ignoreExtension</code></td>
<td>true</td>
<td>This option controls whether files without the <code>.avro</code> extension are ignored on read.<br /> If the option is enabled, all files (with and without the <code>.avro</code> extension) are loaded.<br /> The option has been deprecated and will be removed in a future release. Please use the general data source option <a href="./sql-data-sources-generic-options.html#path-glob-filter">pathGlobFilter</a> for filtering file names.</td>
<td>read</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>compression</code></td>
<td>snappy</td>
<td>The <code>compression</code> option specifies the compression codec used on write.<br />
Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code>, <code>xz</code> and <code>zstandard</code>.<br /> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> is taken into account.</td>
<td>write</td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>mode</code></td>
<td>FAILFAST</td>
<td>The <code>mode</code> option specifies the parse mode for the function <code>from_avro</code>.<br />
Currently supported modes are:
<ul>
<li><code>FAILFAST</code>: Throws an exception on processing a corrupted record.</li>
<li><code>PERMISSIVE</code>: Corrupt records are processed as a null result. Therefore, the
data schema is forced to be fully nullable, which might differ from the one the user provided.</li>
</ul>
</td>
<td>function <code>from_avro</code></td>
<td>2.4.0</td>
</tr>
<tr>
<td><code>datetimeRebaseMode</code></td>
<td>(value of <code>spark.sql.avro.datetimeRebaseModeInRead</code> configuration)</td>
<td>The <code>datetimeRebaseMode</code> option specifies the rebasing mode for the values of the <code>date</code>, <code>timestamp-micros</code>, <code>timestamp-millis</code> logical types from the Julian to the Proleptic Gregorian calendar.<br />
Currently supported modes are:
<ul>
<li><code>EXCEPTION</code>: fails in reads of ancient dates/timestamps that are ambiguous between the two calendars.</li>
<li><code>CORRECTED</code>: loads dates/timestamps without rebasing.</li>
<li><code>LEGACY</code>: performs rebasing of ancient dates/timestamps from the Julian to Proleptic Gregorian calendar.</li>
</ul>
</td>
<td>read and function <code>from_avro</code></td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>positionalFieldMatching</code></td>
<td>false</td>
<td>This can be used in tandem with the <code>avroSchema</code> option to adjust the behavior for matching the fields in the provided Avro schema with those in the SQL schema. By default, the matching is performed using field names, ignoring their positions. If this option is set to "true", the matching is based on the positions of the fields.</td>
<td>read and write</td>
<td>3.2.0</td>
</tr>
<tr>
<td><code>enableStableIdentifiersForUnionType</code></td>
<td>false</td>
<td>If set to true, when an Avro schema is deserialized into a Spark SQL schema, the Avro union type is transformed into a structure whose field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. member_int or member_string. If two user-defined type names, or a user-defined type name and a built-in type name, are identical regardless of case, an exception will be raised. In all other cases, the field names can be uniquely identified.</td>
<td>read</td>
<td>3.5.0</td>
</tr>
<tr>
<td><code>stableIdentifierPrefixForUnionType</code></td>
<td>member_</td>
<td>When <code>enableStableIdentifiersForUnionType</code> is enabled, this option configures the prefix for the fields of the Avro union type.</td>
<td>read</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>recursiveFieldMaxDepth</code></td>
<td>-1</td>
<td>If this option is set to a negative value or 0, recursive fields are not permitted. Setting it to 1 drops all recursive fields, 2 allows recursive fields to be recursed once, 3 allows them to be recursed twice, and so on, up to 15. Values larger than 15 are not allowed, in order to avoid inadvertently creating very large schemas. If an Avro message has a depth beyond this limit, the returned Spark struct is truncated at the recursion limit. An example of usage can be found in the section <a href="#handling-circular-references-of-avro-fields">Handling circular references of Avro fields</a>.</td>
<td>read</td>
<td>4.0.0</td>
</tr>
</table>
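<p>As an illustration of the <code class="language-plaintext highlighter-rouge">avroSchema</code> option, the following hedged sketch reads files with an evolved schema that adds one defaulted field (the schema and path are illustrative):</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Evolved reader schema: adds an `age` field with a default value, so
# files written without that field can still be read.
evolvedSchema = """{
  "type": "record", "name": "topLevelRecord",
  "fields": [
    {"name": "name", "type": ["string", "null"]},
    {"name": "age", "type": "int", "default": -1}
  ]
}"""
df = spark.read.format("avro") \
    .option("avroSchema", evolvedSchema) \
    .load("/data/users_avro")
df.printSchema()  # includes the new `age` column, filled with the default</code></pre></figure>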
<h2 id="configuration">Configuration</h2>
<p>Configuration of Avro can be done via <code class="language-plaintext highlighter-rouge">spark.conf.set</code> or by running <code class="language-plaintext highlighter-rouge">SET key=value</code> commands using SQL.</p>
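<p>For example, either of the following sets the write codec (a minimal sketch using configs from the table below):</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Programmatically, via spark.conf.set:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

# Or equivalently, with a SQL SET command:
spark.sql("SET spark.sql.avro.compression.codec=deflate")</code></pre></figure>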
<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td>
<td>true</td>
<td>
If it is set to true, the data source provider <code>com.databricks.spark.avro</code> is mapped
to the built-in but external Avro data source module for backward compatibility.
<br /><b>Note:</b> the SQL config has been deprecated in Spark 3.2 and might be removed in the future.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.compression.codec</td>
<td>snappy</td>
<td>
Compression codec used when writing Avro files. Supported codecs: uncompressed, deflate,
snappy, bzip2, xz and zstandard. The default codec is snappy.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.deflate.level</td>
<td>-1</td>
<td>
Compression level for the deflate codec used when writing Avro files. Valid values are in
the range from 1 to 9 inclusive, or -1. The default value is -1, which corresponds to level 6
in the current implementation.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.xz.level</td>
<td>6</td>
<td>
Compression level for the xz codec used when writing Avro files. Valid values are in
the range from 1 to 9 inclusive. The default value is 6 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.zstandard.level</td>
<td>3</td>
<td>
Compression level for the zstandard codec used when writing Avro files.
The default value is 3 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.zstandard.bufferPool.enabled</td>
<td>false</td>
<td>
If true, enables the buffer pool of the ZSTD JNI library when writing Avro files.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.datetimeRebaseModeInRead</td>
<td><code>EXCEPTION</code></td>
<td>The rebasing mode for the values of the <code>date</code>, <code>timestamp-micros</code>, <code>timestamp-millis</code> logical types from the Julian to Proleptic Gregorian calendar:<br />
<ul>
<li><code>EXCEPTION</code>: Spark will fail the reading if it sees ancient dates/timestamps that are ambiguous between the two calendars.</li>
<li><code>CORRECTED</code>: Spark will not rebase and will read the dates/timestamps as they are.</li>
<li><code>LEGACY</code>: Spark will rebase dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar to the Proleptic Gregorian calendar when reading Avro files.</li>
</ul>
This config is only effective if the writer info (like Spark, Hive) of the Avro files is unknown.
</td>
<td>3.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.datetimeRebaseModeInWrite</td>
<td><code>EXCEPTION</code></td>
<td>The rebasing mode for the values of the <code>date</code>, <code>timestamp-micros</code>, <code>timestamp-millis</code> logical types from the Proleptic Gregorian to the Julian calendar:<br />
<ul>
<li><code>EXCEPTION</code>: Spark will fail the writing if it sees ancient dates/timestamps that are ambiguous between the two calendars.</li>
<li><code>CORRECTED</code>: Spark will not rebase and will write the dates/timestamps as they are.</li>
<li><code>LEGACY</code>: Spark will rebase dates/timestamps from the Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files.</li>
</ul>
</td>
<td>3.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.filterPushdown.enabled</td>
<td>true</td>
<td>
When true, enables filter pushdown to the Avro data source.
</td>
<td>3.1.0</td>
</tr>
</table>
<h2 id="compatibility-with-databricks-spark-avro">Compatibility with Databricks spark-avro</h2>
<p>This Avro data source module is originally from and compatible with Databricks&#8217;s open source repository
<a href="https://github.com/databricks/spark-avro">spark-avro</a>.</p>
<p>By default, with the SQL configuration <code class="language-plaintext highlighter-rouge">spark.sql.legacy.replaceDatabricksSparkAvro.enabled</code> enabled, the data source provider <code class="language-plaintext highlighter-rouge">com.databricks.spark.avro</code> is
mapped to this built-in Avro module. For Spark tables created with the <code class="language-plaintext highlighter-rouge">Provider</code> property set to <code class="language-plaintext highlighter-rouge">com.databricks.spark.avro</code> in the
catalog metastore, the mapping is essential for loading these tables if you are using this built-in Avro module.</p>
<p>Note that in Databricks&#8217;s <a href="https://github.com/databricks/spark-avro">spark-avro</a>, the implicit classes
<code class="language-plaintext highlighter-rouge">AvroDataFrameWriter</code> and <code class="language-plaintext highlighter-rouge">AvroDataFrameReader</code> were created for the shortcut function <code class="language-plaintext highlighter-rouge">.avro()</code>. In this
built-in but external module, both implicit classes are removed. Please use <code class="language-plaintext highlighter-rouge">.format("avro")</code> on
<code class="language-plaintext highlighter-rouge">DataFrameWriter</code> or <code class="language-plaintext highlighter-rouge">DataFrameReader</code> instead.</p>
<p>If you prefer using your own build of the <code class="language-plaintext highlighter-rouge">spark-avro</code> jar file, you can simply disable the configuration
<code class="language-plaintext highlighter-rouge">spark.sql.legacy.replaceDatabricksSparkAvro.enabled</code> and use the <code class="language-plaintext highlighter-rouge">--jars</code> option when deploying your
applications, as in the sketch below. Read the <a href="submitting-applications.html#advanced-dependency-management">Advanced Dependency Management</a> section in the Application
Submission Guide for more details.</p>
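<div class="language-plaintext highlighter-rouge"><div class="highlight"><pre class="highlight"><code># A sketch of such a deployment (the jar path is hypothetical):
./bin/spark-submit \
  --conf spark.sql.legacy.replaceDatabricksSparkAvro.enabled=false \
  --jars /path/to/your/spark-avro_2.13.jar ...
</code></pre></div></div>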
<h2 id="supported-types-for-avro---spark-sql-conversion">Supported types for Avro -&gt; Spark SQL conversion</h2>
<p>Currently, Spark supports reading all Avro <a href="https://avro.apache.org/docs/1.12.0/specification/#primitive-types">primitive types</a> and <a href="https://avro.apache.org/docs/1.12.0/specification/#complex-types">complex types</a> under records.</p>
<table>
<thead><tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr></thead>
<tr>
<td>boolean</td>
<td>BooleanType</td>
</tr>
<tr>
<td>int</td>
<td>IntegerType</td>
</tr>
<tr>
<td>long</td>
<td>LongType</td>
</tr>
<tr>
<td>float</td>
<td>FloatType</td>
</tr>
<tr>
<td>double</td>
<td>DoubleType</td>
</tr>
<tr>
<td>string</td>
<td>StringType</td>
</tr>
<tr>
<td>enum</td>
<td>StringType</td>
</tr>
<tr>
<td>fixed</td>
<td>BinaryType</td>
</tr>
<tr>
<td>bytes</td>
<td>BinaryType</td>
</tr>
<tr>
<td>record</td>
<td>StructType</td>
</tr>
<tr>
<td>array</td>
<td>ArrayType</td>
</tr>
<tr>
<td>map</td>
<td>MapType</td>
</tr>
<tr>
<td>union</td>
<td>See below</td>
</tr>
</table>
<p>In addition to the types listed above, it supports reading <code class="language-plaintext highlighter-rouge">union</code> types. The following three types are considered basic <code class="language-plaintext highlighter-rouge">union</code> types:</p>
<ol>
<li><code class="language-plaintext highlighter-rouge">union(int, long)</code> will be mapped to LongType.</li>
<li><code class="language-plaintext highlighter-rouge">union(float, double)</code> will be mapped to DoubleType.</li>
<li><code class="language-plaintext highlighter-rouge">union(something, null)</code>, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.
All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.</li>
</ol>
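<p>For illustration, a hedged sketch (the path and data are hypothetical): a field declared in Avro as the union <code class="language-plaintext highlighter-rouge">["int", "string"]</code> surfaces as such a struct:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Suppose the files under this path contain a field `u` whose Avro type
# is the complex union ["int", "string"].
df = spark.read.format("avro").load("/data/union_example")
df.printSchema()
# root
#  |-- u: struct (nullable = true)
#  |    |-- member0: integer (nullable = true)
#  |    |-- member1: string (nullable = true)</code></pre></figure>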
<p>It also supports reading the following Avro <a href="https://avro.apache.org/docs/1.12.0/specification/#logical-types">logical types</a>:</p>
<table>
<thead><tr><th><b>Avro logical type</b></th><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr></thead>
<tr>
<td>date</td>
<td>int</td>
<td>DateType</td>
</tr>
<tr>
<td>timestamp-millis</td>
<td>long</td>
<td>TimestampType</td>
</tr>
<tr>
<td>timestamp-micros</td>
<td>long</td>
<td>TimestampType</td>
</tr>
<tr>
<td>decimal</td>
<td>fixed</td>
<td>DecimalType</td>
</tr>
<tr>
<td>decimal</td>
<td>bytes</td>
<td>DecimalType</td>
</tr>
</table>
<p>At the moment, it ignores docs, aliases and other properties present in the Avro file.</p>
<h2 id="supported-types-for-spark-sql---avro-conversion">Supported types for Spark SQL -&gt; Avro conversion</h2>
<p>Spark supports writing all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases, which are listed below:</p>
<table>
<thead><tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr></thead>
<tr>
<td>ByteType</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>ShortType</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>BinaryType</td>
<td>bytes</td>
<td></td>
</tr>
<tr>
<td>DateType</td>
<td>int</td>
<td>date</td>
</tr>
<tr>
<td>TimestampType</td>
<td>long</td>
<td>timestamp-micros</td>
</tr>
<tr>
<td>DecimalType</td>
<td>fixed</td>
<td>decimal</td>
</tr>
</table>
<p>You can also specify the whole output Avro schema with the option <code class="language-plaintext highlighter-rouge">avroSchema</code>, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require a user-specified Avro schema (see the sketch after this table):</p>
<table>
<thead><tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr></thead>
<tr>
<td>BinaryType</td>
<td>fixed</td>
<td></td>
</tr>
<tr>
<td>StringType</td>
<td>enum</td>
<td></td>
</tr>
<tr>
<td>TimestampType</td>
<td>long</td>
<td>timestamp-millis</td>
</tr>
<tr>
<td>DecimalType</td>
<td>bytes</td>
<td>decimal</td>
</tr>
</table>
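<p>For example, a hedged sketch (the schema, names, and output path are illustrative): a StringType column can be written as an Avro enum by supplying the full output schema:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># Full output schema declaring the column as an Avro enum; the column
# values must be valid enum symbols.
avroSchema = """{
  "type": "record", "name": "topLevelRecord",
  "fields": [
    {"name": "suit",
     "type": {"type": "enum", "name": "Suit",
              "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}}
  ]
}"""
df = spark.createDataFrame([("SPADES",), ("HEARTS",)], ["suit"])
df.write.format("avro").option("avroSchema", avroSchema).save("/tmp/suits_avro")</code></pre></figure>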
<h2 id="handling-circular-references-of-avro-fields">Handling circular references of Avro fields</h2>
<p>In Avro, a circular reference occurs when the type of a field is defined in one of the parent records. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior.
To read Avro data with a schema that has a circular reference, users can use the <code class="language-plaintext highlighter-rouge">recursiveFieldMaxDepth</code> option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, the Spark Avro data source does not permit recursive fields, with <code class="language-plaintext highlighter-rouge">recursiveFieldMaxDepth</code> set to -1. However, you can set this option to a value from 1 to 15 if needed.</p>
<p>Setting <code class="language-plaintext highlighter-rouge">recursiveFieldMaxDepth</code> to 1 drops all recursive fields, setting it to 2 allows a recursive field to be recursed once, and setting it to 3 allows it to be recursed twice. A <code class="language-plaintext highlighter-rouge">recursiveFieldMaxDepth</code> value greater than 15 is not allowed, as it can lead to performance issues and even stack overflows.</p>
<p>The SQL schema for the Avro message below will vary based on the value of <code class="language-plaintext highlighter-rouge">recursiveFieldMaxDepth</code>; a usage sketch follows the schema.</p>
<div data-lang="avro">
<div class="d-none">
This div is only used to make markdown editor/viewer happy and does not display on web
```avro
</div>
<figure class="highlight"><pre><code class="language-avro" data-lang="avro">{
"type": "record",
"name": "Node",
"fields": [
{"name": "Id", "type": "int"},
{"name": "Next", "type": ["null", "Node"]}
]
}
// The Avro schema defined above would be converted into a Spark SQL column with the following
// structure, based on the `recursiveFieldMaxDepth` value.
1: struct&lt;Id: int&gt;
2: struct&lt;Id: int, Next: struct&lt;Id: int&gt;&gt;
3: struct&lt;Id: int, Next: struct&lt;Id: int, Next: struct&lt;Id: int&gt;&gt;&gt;</code></pre></figure>
<div class="d-none">
```
</div>
</div>
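<p>For example, a minimal sketch (the path is hypothetical) that allows one level of recursion when reading such data:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># With recursiveFieldMaxDepth set to 2, the recursive `Next` field
# appears once and is then truncated.
df = spark.read.format("avro") \
    .option("recursiveFieldMaxDepth", "2") \
    .load("/data/linked_nodes")
df.printSchema()
# root
#  |-- Id: integer (nullable = true)
#  |-- Next: struct (nullable = true)
#  |    |-- Id: integer (nullable = true)</code></pre></figure>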
</div>
<!-- /container -->
</div>
<script src="js/vendor/jquery-3.5.1.min.js"></script>
<script src="js/vendor/bootstrap.bundle.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>
<script type="text/javascript" src="js/vendor/docsearch.min.js"></script>
<script type="text/javascript">
// DocSearch is entirely free and automated. DocSearch is built in two parts:
// 1. a crawler which we run on our own infrastructure every 24 hours. It follows every link
// in your website and extracts content from every page it traverses. It then pushes this
// content to an Algolia index.
// 2. a JavaScript snippet to be inserted in your website that will bind this Algolia index
// to your search input and display its results in a dropdown UI. If you want to find more
// details on how DocSearch works, check the DocSearch docs.
docsearch({
apiKey: 'd62f962a82bc9abb53471cb7b89da35e',
appId: 'RAI69RXRSK',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:4.1.0-preview1"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
</script>
<!-- MathJax Section -->
<script type="text/x-mathjax-config">
MathJax.Hub.Config({
TeX: { equationNumbers: { autoNumber: "AMS" } }
});
</script>
<script>
// Note that we load MathJax this way to work with local file (file://), HTTP and HTTPS.
// We could use "//cdn.mathjax...", but that won't support "file://".
(function(d, script) {
script = d.createElement('script');
script.type = 'text/javascript';
script.async = true;
script.onload = function(){
MathJax.Hub.Config({
tex2jax: {
inlineMath: [ ["$", "$"], ["\\\\(","\\\\)"] ],
displayMath: [ ["$$","$$"], ["\\[", "\\]"] ],
processEscapes: true,
skipTags: ['script', 'noscript', 'style', 'textarea', 'pre']
}
});
};
script.src = ('https:' == document.location.protocol ? 'https://' : 'http://') +
'cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.1/MathJax.js' +
'?config=TeX-AMS-MML_HTMLorMML';
d.getElementsByTagName('head')[0].appendChild(script);
}(document));
</script>
</body>
</html>