Ingest Avro data with Auto Loader

Note

Schema inference for Avro files is available in Databricks Runtime 10.2 and above.

You can use Auto Loader to ingest Avro data into Delta Lake with only a few lines of code. Auto Loader provides the following benefits:

  • Automatic discovery of new files to process: You do not need special logic to handle late-arriving data or to keep track of which files you have already processed.

  • Scalable file discovery: Auto Loader can ingest billions of files with ease.

  • Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift in real time. It can also evolve the schema to add new columns and automatically continue ingestion with the new schema when the stream is restarted.

  • Data rescue: You can configure Auto Loader to rescue data that cannot be read from your Avro file by placing that data in a rescued data column, which preserves the structure of your Avro record.

You can use the following code to run Auto Loader with schema inference and evolution capabilities on Avro files. Specify cloudFiles as the format to use Auto Loader. To ingest Avro files, set the option cloudFiles.format to avro. In the option cloudFiles.schemaLocation, specify a directory where Auto Loader can persist and track changes to the schema of your source data over time:

Python:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  # The schema location directory keeps track of the data schema over time.
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))

Scala:

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  // The schema location directory keeps track of the data schema over time.
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>")

Schema inference and evolution

To infer the schema, Auto Loader samples your data. By default, Auto Loader merges the schemas it finds across the sample of Avro files and throws an exception if it encounters mismatched column types.
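
If you want to pin the types of specific columns while letting Auto Loader infer the rest, you can supply schema hints. The following is a minimal sketch; the column names in the hint (age, lucky_number) and the paths are placeholders:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  # Pin selected column types; Auto Loader infers all remaining columns.
  .option("cloudFiles.schemaHints", "age int, lucky_number bigint")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>"))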

For more information, see Schema inference and evolution in Auto Loader.

Rescued data column

The rescued data column contains any data that was not read because it was missing from the given schema, because of a type mismatch, or because the casing of a column name did not match. When Auto Loader infers the schema, the rescued data column is included in the returned schema as _rescued_data by default. You can rename the column, or include it when you provide your own schema, by setting the option rescuedDataColumn.

Note

You can provide a rescued data column to all Avro readers in Databricks Runtime by using the option rescuedDataColumn, for example by passing it as an option to spark.read.format("avro") on the DataFrameReader.
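
A minimal batch-read sketch, assuming you provide a schema (the path, schema, and column name are placeholders):

df = (spark.read.format("avro")
  # Fields that do not match the provided schema land in the rescued data column.
  .option("rescuedDataColumn", "_rescued_data")
  .schema("name string, age int")
  .load("/path/to/table/"))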

For example, suppose Auto Loader reads the following file with the provided schema name string, age int. The unexpected lucky_number field is captured in the rescued data column:

/path/to/table/f0.avro:
---------------------
{"name":"john","age":20,"lucky_number":4}
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "avro") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .load("/path/to/table/")
+-------+------+--------------------------------------------+
| name  | age  |                 _rescue                    |
+-------+------+--------------------------------------------+
| john  | 20   | {                                          |
|       |      |    "lucky_number": 4,                      |
|       |      |    "_file_path": "/path/to/table/f0.avro"  |
|       |      | }                                          |
+-------+------+--------------------------------------------+

Changing case sensitivity behavior

By default, the configuration spark.sql.caseSensitive determines whether a column name in the input data matches the expected schema. When the configuration is set to true, columns that differ only in case are also rescued. You can set the option readerCaseSensitive to control the case sensitivity behavior independently of the SQL configuration.
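
For example, a minimal sketch that reads the file from the earlier example case-insensitively, regardless of spark.sql.caseSensitive (paths and column names are placeholders):

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  .option("rescuedDataColumn", "_rescue")
  # Match column names case-insensitively instead of rescuing case mismatches.
  .option("readerCaseSensitive", "false")
  .schema("name string, age int")
  .load("/path/to/table/"))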