Read and write streaming Avro data
Apache Avro is a commonly used data serialization system in the streaming world. A typical solution is to put data in Avro format in Apache Kafka, metadata in Confluent Schema Registry, and then run queries with a streaming framework that connects to both Kafka and Schema Registry.
Databricks supports the from_avro
and to_avro
functions to build streaming pipelines with Avro data in Kafka and metadata in Schema Registry. The function to_avro
encodes a column as binary in Avro format and from_avro
decodes Avro binary data into a column. Both functions transform one column to another column, and the input/output SQL data type can be a complex type or a primitive type.
Note
The from_avro
and to_avro
functions:
Are available in Python, Scala, and Java.
Can be passed to SQL functions in both batch and streaming queries.
Also see Avro file data source.
Manually specified schema example
Similar to from_json and to_json, you can use from_avro
and to_avro
with any binary column. You can specify the Avro schema manually, as in the following example:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema example
You can also specify a schema as a JSON string. For example, if /tmp/user.avsc
is:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
You can create a JSON string:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Then use the schema in from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Example with Schema Registry
If your cluster has a Schema Registry service, from_avro
can work with it so that you don’t need to specify the Avro schema manually.
The following example demonstrates reading a Kafka topic “t”, assuming the key and value are already registered in Schema Registry as subjects “t-key” and “t-value” of types STRING
and INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
For to_avro
, the default output Avro schema might not match the schema of the target subject in the Schema Registry service for the following reasons:
The mapping from Spark SQL type to Avro schema is not one-to-one. See Supported types for Spark SQL -> Avro conversion.
If the converted output Avro schema is of record type, the record name is
topLevelRecord
and there is no namespace by default.
If the default output schema of to_avro
matches the schema of the target subject, you can do the following:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Otherwise, you must provide the schema of the target subject in the to_avro
function:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Authenticate to an external Confluent Schema Registry
In Databricks Runtime 12.2 LTS and above, you can authenticate to an external Confluent Schema Registry. The following examples demonstrate how to configure your schema registry options to include auth credentials and API keys.
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Use truststore and keystore files in Unity Catalog volumes
In Databricks Runtime 14.3 LTS and above, you can use truststore and keystore files in Unity Catalog volumes to authenticate to a Confluent Schema Registry. Update the configuration in the previous example using the following syntax:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Use schema evolution mode with from_avro
In Databricks Runtime 14.2 and above, you can use schema evolution mode with from_avro
. Enabling schema evolution mode causes the job to throw an UnknownFieldException
after detecting schema evolution. Databricks recommends configuring jobs with schema evolution mode to automatically restart on task failure. See Configure Structured Streaming jobs to restart streaming queries on failure.
Schema evolution is useful if you expect the schema of your source data to evolve over time and ingest all fields from your data source. If your queries already explicitly specify which fields to query in your data source, added fields are ignored regardless of schema evolution.
Use the avroSchemaEvolutionMode
option to enable schema evolution. The following table describes the options for schema evolution mode:
Option |
Behavior |
---|---|
|
Default. Ignores schema evolution and the job continues. |
|
Throws an |
Note
You can change this configuration between streaming jobs and reuse the same checkpoint. Disabling schema evolution can result in dropped columns.
Configure the parse mode
You can configure the parse mode to determine whether you want to fail or emit null records when schema evolution mode is disabled and the schema evolves in a non-backward compatible way. With default settings, from_avro
fails when it observes incompatible schema changes.
Use the mode
option to specify parse mode. The following table describes the option for parse mode:
Option |
Behavior |
---|---|
|
Default. A parsing error throws a |
|
A parsing error is ignored and a null record is emitted. |
Note
With schema evolution enabled, FAILFAST
only throws exceptions if a record is corrupted.
Example using schema evolution and setting parse mode
The following example demonstrates enabling schema evolution and specifying FAILFAST
parse mode with a Confluent Schema Registry:
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)