Reading Avro Files

Apache Avro (TM) is a data serialization system.

Avro provides:

  • Rich data structures.
  • A compact, fast, binary data format.
  • A container file, to store persistent data.
  • Remote procedure call (RPC).
  • Simple integration with dynamic languages. Code generation is not required to read or write data files nor to use or implement RPC protocols. Code generation is an optional optimization, only worth implementing for statically typed languages.

Learn more about Apache Avro.

Installation

The installation steps vary depending on your cluster image version.

Features

Avro Data Source for Spark supports reading and writing Avro data from Spark SQL.

  • Automatic schema conversion: It supports most conversions between Spark SQL and Avro records, making Avro a first-class citizen in Spark.
  • Partitioning: This library allows developers to easily read and write partitioned data without any extra configuration. Just pass the columns you want to partition on, as you would for Parquet.
  • Compression: You can specify the type of compression to use when writing Avro out to disk. The supported types are uncompressed, snappy, and deflate. You can also specify the deflate level.
  • Specifying record names: You can specify the record name and namespace to use by passing a map of parameters with recordName and recordNamespace.

Supported types for Avro -> Spark SQL conversion

This library supports reading all Avro types. It uses the following mapping from Avro types to Spark SQL types:

Avro type      Spark SQL type
boolean        BooleanType
int            IntegerType
long           LongType
float          FloatType
double         DoubleType
bytes          BinaryType
string         StringType
record         StructType
enum           StringType
array          ArrayType
map            MapType
fixed          BinaryType
union          See below

In addition to the types listed above, it supports reading union types. The following three kinds of union are treated as basic union types:

  1. union(int, long) maps to LongType.
  2. union(float, double) maps to DoubleType.
  3. union(something, null), where something is any supported Avro type. This maps to the same Spark SQL type as that of something, with nullable set to true.

All other union types are complex types. They map to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.
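
As a quick illustration of these rules, here is a minimal sketch that converts a hypothetical schema containing both kinds of union, assuming the library's SchemaConverters helper is exposed in your version; the record and field names are made up:

import org.apache.avro.Schema
import com.databricks.spark.avro.SchemaConverters

// Hypothetical record: a nullable string field and a mixed int/string union
val avroSchema = new Schema.Parser().parse("""
  {
    "type": "record", "name": "Example",
    "fields": [
      {"name": "maybeName", "type": ["null", "string"]},
      {"name": "idOrLabel", "type": ["int", "string"]}
    ]
  }
""")

// maybeName becomes a nullable StringType; idOrLabel becomes a StructType
// with fields member0 (int) and member1 (string)
println(SchemaConverters.toSqlType(avroSchema).dataType)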

At the moment, it ignores docs, aliases and other properties present in the Avro file.

Supported types for Spark SQL -> Avro conversion

This library supports writing all Spark SQL types to Avro. For most types, the mapping from Spark SQL types to Avro types is straightforward (for example, IntegerType is converted to int); the few special cases are listed below, with a short example after the table:

Spark SQL type    Avro type
ByteType          int
ShortType         int
DecimalType       string
BinaryType        bytes
TimestampType     long
StructType        record
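
As a rough sketch of these special cases (the column names and output path below are made up), writing a DataFrame with byte, short, decimal, and timestamp columns produces Avro int, string, and long fields:

import com.databricks.spark.avro._
import java.sql.Timestamp
import spark.implicits._

// Hypothetical DataFrame exercising the special-case mappings above
val special = Seq(
  (1.toByte, 2.toShort, BigDecimal("9.99"), Timestamp.valueOf("2012-08-01 00:00:00"))
).toDF("b", "s", "price", "ts")

// b and s are written as Avro int, price as string, ts as long
special.write.avro("/tmp/special_cases")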

Examples

The recommended way to read or write Avro data from Spark SQL is by using Spark’s DataFrame APIs, which are available in Scala, Python, and R.

These examples use an Avro file available for download here: episodes.avro

Scala API

// import needed for the .avro() method to become available
import com.databricks.spark.avro._

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.avro("/tmp/episodes.avro")
df.filter("doctor > 5").write.avro("/tmp/output")

Alternatively, you can specify the format explicitly:

val df = spark.read
    .format("com.databricks.spark.avro")
    .load("/tmp/episodes.avro")

df.filter("doctor > 5").write.format("com.databricks.spark.avro").save("/tmp/output")

You can specify a custom Avro schema:

import java.io.File
import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("user.avsc"))

spark
  .read
  .format("com.databricks.spark.avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro").show()

You can also specify Avro compression options:

import com.databricks.spark.avro._

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.avro("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.avro("/tmp/output")

You can write partitioned Avro records like this:

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/output")

You can specify the record name and namespace like this:

import com.databricks.spark.avro._

val df = spark.read.avro("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "com.databricks.spark.avro"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).avro("/tmp/output")

Python API

# Creates a DataFrame from a specified directory
df = spark.read.format("com.databricks.spark.avro").load("/tmp/episodes.avro")

# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("com.databricks.spark.avro").save("/tmp/output")

SQL API

In pure SQL, you can query Avro data by registering the data file as a temporary table.

CREATE TEMPORARY TABLE episodes
USING com.databricks.spark.avro
OPTIONS (path "/tmp/episodes.avro")
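
Once the temporary table is registered, you can query it like any other table, for example:

SELECT * FROM episodes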