---
layout: global
title: Apache Avro Data Source Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

Since the Spark 2.4 release, Spark SQL provides built-in support for reading and writing Apache Avro data.

## Deploying

The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default.

As with any Spark application, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be added directly to `spark-submit` using `--packages`, such as:

{% highlight bash %}
./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
{% endhighlight %}

For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly:

{% highlight bash %}
./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
{% endhighlight %}

See the [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.

## Load and Save Functions

Since the `spark-avro` module is external, there is no `.avro` API in `DataFrameReader` or `DataFrameWriter`.

To load/save data in Avro format, you need to specify the data source option `format` as `avro` (or `org.apache.spark.sql.avro`).

{% highlight python %}
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
{% endhighlight %}

{% highlight scala %}
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
{% endhighlight %}

{% highlight java %}
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
{% endhighlight %}

{% highlight r %}
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
{% endhighlight %}

## to_avro() and from_avro()

The Avro package provides the function `to_avro` to encode a column as binary in Avro format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.

Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.

* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.

{% highlight python %}
from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
{% endhighlight %}

{% highlight scala %}
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.avro.functions._
import spark.implicits._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as "user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as "value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
{% endhighlight %}

{% highlight java %}
import java.nio.file.Files;
import java.nio.file.Paths;

import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.avro.functions.to_avro;
import static org.apache.spark.sql.functions.col;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
{% endhighlight %}

{% highlight r %}
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)
{% endhighlight %}

## Data Source Option

Data source options of Avro can be set via:

* the `.option` method on `DataFrameReader` or `DataFrameWriter`.
* the `options` parameter in function `from_avro`.
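
For example, a minimal sketch of both mechanisms is shown below. It assumes a batch DataFrame `df` to write out, plus `kafkaDF` and `jsonFormatSchema` standing in for the streaming example above, and it uses the three-argument overload of `from_avro` that accepts an options map. The option names used here (`compression` for the writer, `mode` for `from_avro`) are illustrative; the full set of supported options may vary by Spark version.

{% highlight scala %}
import java.util.Collections

import org.apache.spark.sql.avro.functions.from_avro

// 1. Option on DataFrameWriter: pick the compression codec for the output files.
df.write
  .format("avro")
  .option("compression", "deflate")
  .save("users_deflate.avro")

// 2. Options map passed to from_avro: e.g. be permissive about corrupt records
//    instead of failing the query.
val fromAvroOptions = Collections.singletonMap("mode", "PERMISSIVE")
val decoded = kafkaDF.select(
  from_avro(kafkaDF("value"), jsonFormatSchema, fromAvroOptions).as("user"))
{% endhighlight %}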

## Configuration

Configuration of Avro can be done via `spark.conf.set` or by running `SET key=value` commands using SQL.
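
For instance, here is a sketch of both forms, using the Avro compression settings (`spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`) as example keys; consult the Avro-related entries in the Spark SQL configuration for the complete list and default values.

{% highlight scala %}
// Programmatic form: write Avro output with deflate compression at level 5.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

// Equivalent SQL form:
spark.sql("SET spark.sql.avro.compression.codec=deflate")
{% endhighlight %}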

## Compatibility with Databricks spark-avro

This Avro data source module is originally from and compatible with Databricks's open source repository [spark-avro](https://github.com/databricks/spark-avro).

By default, with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is mapped to this built-in Avro module. For Spark tables created with the `Provider` property `com.databricks.spark.avro` in the catalog meta store, this mapping is essential for loading those tables if you are using the built-in Avro module.

Note that in Databricks's spark-avro, the implicit classes `AvroDataFrameWriter` and `AvroDataFrameReader` were created as shortcuts for the `.avro()` function. In this built-in but external module, both implicit classes are removed. Please use `.format("avro")` in `DataFrameWriter` or `DataFrameReader` instead.

If you prefer using your own build of the `spark-avro` jar file, you can simply disable the configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` and use the `--jars` option when deploying your applications. Read the Advanced Dependency Management section in the Application Submission Guide for more details.
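
As a sketch of what the mapping means in practice (reusing the file path from the earlier examples), with the configuration at its default the Databricks provider name resolves to this built-in module, and it can be switched off when you want your own `spark-avro` build to serve that provider name instead:

{% highlight scala %}
// With spark.sql.legacy.replaceDatabricksSparkAvro.enabled = true (the default),
// the provider name com.databricks.spark.avro is served by the built-in module,
// so both reads below use the same implementation.
val df1 = spark.read.format("com.databricks.spark.avro")
  .load("examples/src/main/resources/users.avro")
val df2 = spark.read.format("avro")
  .load("examples/src/main/resources/users.avro")

// Disable the mapping if a separately built spark-avro jar (deployed via --jars)
// should handle the com.databricks.spark.avro provider instead.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
{% endhighlight %}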

## Supported types for Avro -> Spark SQL conversion

Currently Spark supports reading all primitive types and complex types under records of Avro.

In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types:

1. `union(int, long)` will be mapped to `LongType`.
2. `union(float, double)` will be mapped to `DoubleType`.
3. `union(something, null)`, where `something` is any supported Avro type. This will be mapped to the same Spark SQL type as that of `something`, with `nullable` set to `true`.

All other `union` types are considered complex. They will be mapped to `StructType` where field names are `member0`, `member1`, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.
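
For example, the sketch below shows an illustrative record schema that exercises each of these rules, together with the Spark SQL types it reads back as (the schema and file path are made up for the example):

{% highlight scala %}
// Illustrative Avro schema covering the union rules above:
//   ["int", "long"]     -> LongType
//   ["float", "double"] -> DoubleType
//   ["string", "null"]  -> StringType with nullable = true
//   ["int", "string"]   -> StructType with fields member0 and member1
val unionAvroSchema =
  """{
    |  "type": "record",
    |  "name": "Example",
    |  "fields": [
    |    {"name": "id",    "type": ["int", "long"]},
    |    {"name": "score", "type": ["float", "double"]},
    |    {"name": "label", "type": ["string", "null"]},
    |    {"name": "mixed", "type": ["int", "string"]}
    |  ]
    |}""".stripMargin

val unionDF = spark.read
  .format("avro")
  .option("avroSchema", unionAvroSchema)
  .load("/path/to/union_data.avro")  // illustrative path
unionDF.printSchema()
{% endhighlight %}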

It also supports reading the following Avro logical types:

## Supported types for Spark SQL -> Avro conversion

Spark supports writing all Spark SQL types to Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. `IntegerType` gets converted to `int`); however, there are a few special cases, which are listed below:

You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require a user-specified Avro schema:
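
For example, supplying the whole output schema on write looks like the sketch below. It reuses the `usersDF` from the load/save example above; the schema and output path are illustrative, and the record's fields must match the DataFrame's column names and be compatible with its types.

{% highlight scala %}
// Supply the complete output Avro schema so that columns are written with the
// exact Avro types declared in it, rather than the default mapping.
val outputAvroSchema =
  """{
    |  "type": "record",
    |  "name": "User",
    |  "namespace": "example.avro",
    |  "fields": [
    |    {"name": "name", "type": "string"},
    |    {"name": "favorite_color", "type": ["string", "null"]}
    |  ]
    |}""".stripMargin

usersDF
  .select("name", "favorite_color")
  .write
  .format("avro")
  .option("avroSchema", outputAvroSchema)
  .save("usersWithSchema.avro")
{% endhighlight %}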