Ingest Avro data with Auto Loader
Note
Schema inference for Avro files is available in Databricks Runtime 10.2 and above.
You can use Auto Loader to ingest Avro data into Delta Lake with only a few lines of code. Auto Loader provides the following benefits:
Automatic discovery of new files to process: You do not need special logic to handle late arriving data or to keep track of which files you have already processed.
Scalable file discovery: Auto Loader can ingest billions of files with ease.
Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift in real time. It can also evolve the schema to add new columns and continue the ingestion with the new schema automatically when the stream is restarted.
Data rescue: You can configure Auto Loader to rescue data that cannot be read from your Avro file by placing that data in a rescued data column, which preserves the structure of your Avro record.
You can use the following code to run Auto Loader with schema inference and evolution capabilities on Avro files. Specify cloudFiles as the format to use Auto Loader. To ingest Avro files, set the option cloudFiles.format to avro. In the option cloudFiles.schemaLocation, specify a directory where Auto Loader can persist the schema changes in your source data over time:
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "avro") \
# The schema location directory keeps track of the data schema over time.
.option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
.load("<path_to_source_data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path_to_checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "avro")
// The schema location directory keeps track of the data schema over time.
.option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
.load("<path_to_source_data>")
.writeStream
.option("checkpointLocation", "<path_to_checkpoint>")
.start("<path_to_target")
Schema inference and evolution
To infer the schema, Auto Loader uses a sample of data. By default, Auto Loader merges schemas across the sample of Avro files or throws an exception if there are mismatched column types.
Learn more about schema inference and evolution with Auto Loader in Configuring schema inference and evolution in Auto Loader.
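For example, a minimal sketch, assuming the same placeholder paths as above, that uses the cloudFiles.schemaHints option to pin the type of a specific column while Auto Loader infers the rest of the schema from the sampled Avro files:

# Sketch: schema hints override the inferred type for the listed columns only;
# Auto Loader still infers the remaining columns from the sampled files.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "avro") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.schemaHints", "age int") \
  .load("<path_to_source_data>")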
Rescued data column
The rescued data column contains any data that was not read because it was missing from the given schema, because there was a type mismatch, or because the casing of the column did not match. When Auto Loader infers the schema, the rescued data column is included in the returned schema as _rescued_data by default. You can rename the column, or include it in cases where you provide a schema, by setting the option rescuedDataColumn.
Note
You can provide a rescued data column to all Avro readers in Databricks Runtime by using the option rescuedDataColumn, for example as a DataFrameReader option with spark.read.format("avro").
For example, suppose the file /path/to/table/f0.avro contains the following record:

/path/to/table/f0.avro:
---------------------
{"name":"john","age":20,"lucky_number":4}
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "avro") \
.option("rescuedDataColumn", "_rescue") \
.schema("name string, age int") \
.load("/path/to/table/")
+-------+------+--------------------------------------------+
| name | age | _rescue |
+-------+------+--------------------------------------------+
| john | 20 | { |
| | | "lucky_number": 4, |
| | | "_file_path": "/path/to/table/f0.avro" |
| | | } |
+-------+------+--------------------------------------------+
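As mentioned in the note above, the rescuedDataColumn option also applies to the batch Avro reader. A minimal sketch, assuming the same file and schema as the streaming example:

# Sketch: batch Avro read with a rescued data column. Fields that are not in
# the provided schema (such as lucky_number) are captured in the "_rescue" column.
df = spark.read.format("avro") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .load("/path/to/table/")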
Changing the case-sensitive behavior
When the rescued data column is enabled, fields named in a case other than that of the schema are loaded to the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
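A minimal sketch, assuming the same file and schema as the earlier example, where case-insensitive matching keeps a differently cased field out of the rescued data column:

# Sketch: with readerCaseSensitive set to false, a field named "Age" in the
# Avro data matches the "age" column of the schema instead of being rescued.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "avro") \
  .option("rescuedDataColumn", "_rescue") \
  .option("readerCaseSensitive", "false") \
  .schema("name string, age int") \
  .load("/path/to/table/")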