Read and Write Streaming Avro Data with DataFrames

Apache Avro is a commonly used data serialization system in the streaming world, and a frequent requirement is to read and write Avro data in Apache Kafka. A common solution is to put data in Kafka in Avro format, store the metadata in Confluent Schema Registry, and run queries with a streaming framework that connects to both Kafka and Schema Registry.

In Databricks, the from_avro and to_avro functions, provided by the Avro data source, can be used in Spark Structured Streaming to build streaming pipelines with Avro data in Kafka and metadata in Schema Registry.

Note

The from_avro and to_avro functions are:

  • In preview and may change in future releases.
  • Available only in Scala and Java.
  • Normal SQL functions that can be used in both batch and streaming queries (see the batch sketch after this note).

For other aspects of Avro, see Avro Files.
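
Because they are normal SQL functions, from_avro and to_avro are not limited to streaming DataFrames. A minimal batch sketch, assuming a SparkSession named spark is in scope:

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder
import spark.implicits._

// Encode an int column to Avro binary, then decode it back.
val batchDF = Seq(1, 2, 3).toDF("value")
val roundTripped = batchDF
  .select(to_avro($"value").as("avro"))
  .select(from_avro($"avro", SchemaBuilder.builder().intType()).as("value"))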

Basic Example

Similar to from_json and to_json, you can use from_avro and to_avro with any binary column, but you must specify the Avro schema manually.

import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save it to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .option("checkpointLocation", "/tmp/checkpoint") // required for streaming writes; path is a placeholder
  .start()
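
The schema argument is not limited to primitive types. A record schema built with SchemaBuilder decodes into a struct column. A minimal sketch, assuming (hypothetically) that the Kafka value holds Avro records with a string field name and an int field age, and that rawDF is the DataFrame loaded from Kafka before decoding:

// Hypothetical record schema matching the assumed payload.
val personSchema = SchemaBuilder.record("person").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

// rawDF has the binary Avro "value" column as loaded from Kafka.
// The schema of the resulting DataFrame is: <person: struct<name: string, age: int>>
val peopleDF = rawDF.select(from_avro($"value", personSchema).as("person"))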

Example with Schema Registry

If your cluster has a Schema Registry service, from_avro can work with it so that you don’t need to specify the Avro schema manually.

import org.apache.spark.sql.avro._

// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are decoded into string
// and int types with Avro and Schema Registry. The schema of the resulting
// DataFrame is: <key: string, value: int>.
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

For to_avro, the default output Avro schema might not match the schema of the target subject in the Schema Registry service for the following reasons:

  • The mapping from Spark SQL type to Avro schema is not one-to-one. See the Supported types for Spark SQL -> Avro conversion.
  • If the converted output Avro schema is of record type, the record name is “topLevelRecord” and there is no namespace by default. The sketch after this list shows how to inspect the default schema.
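
To check what to_avro would produce, you can derive the default Avro schema for a column's Spark SQL type with Spark's SchemaConverters. A minimal sketch, assuming dataDF from the examples above and that SchemaConverters is available on your cluster:

import org.apache.spark.sql.avro.SchemaConverters

// Derive the default Avro schema for the "value" column of dataDF.
// For struct columns this produces a record named "topLevelRecord"
// with an empty namespace, which may not match the registered subject.
val defaultSchema = SchemaConverters.toAvroType(
  dataDF.schema("value").dataType, nullable = false)
println(defaultSchema.toString(true))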

If the default output schema of to_avro matches the schema of the target subject, you can do the following:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .option("checkpointLocation", "/tmp/checkpoint") // required for streaming writes; path is a placeholder
  .start()

Otherwise, you must provide the schema of the target subject in the to_avro function:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .option("checkpointLocation", "/tmp/checkpoint") // required for streaming writes; path is a placeholder
  .start()
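
For illustration only: if the value registered for "t-value" were a plain int, as in the basic example, the JSON schema string could look like the following. The actual string must match the subject's registered schema.

// Hypothetical: a JSON-format Avro schema for an int value. Replace with
// the schema actually registered for subject "t-value".
val avroSchema = """{"type": "int"}"""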