Ingest Avro data with Auto Loader

Note

Schema inference for Avro files is available in Databricks Runtime 10.2 and above.

You can use Auto Loader to ingest Avro data into Delta Lake with only a few lines of code. Auto Loader provides the following benefits:

  • Automatic discovery of new files to process: You do not need special logic to handle late-arriving data or to keep track of which files you have already processed.

  • Scalable file discovery: Auto Loader can ingest billions of files with ease.

  • Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift in real time. It can also evolve the schema to add new columns and continue the ingestion with the new schema automatically when the stream is restarted.

  • Data rescue: You can configure Auto Loader to rescue data that cannot be read from your Avro file by placing that data in a rescued data column, which preserves the structure of your Avro record.

You can use the following code to run Auto Loader with schema inference and evolution capabilities on Avro files. Specify cloudFiles as the format to use Auto Loader. To ingest Avro files, set the option cloudFiles.format to avro. In the option cloudFiles.schemaLocation, specify a directory that Auto Loader can use to persist the schema changes in your source data over time:

Python:

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  # The schema location directory keeps track of the data schema over time.
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>"))

Scala:

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "avro")
  // The schema location directory keeps track of the data schema over time.
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target>")

Schema inference and evolution

To infer the schema, Auto Loader uses a sample of data. By default, Auto Loader merges schemas across the sample of Avro files or throws an exception if there are mismatched column types.
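The merge-or-fail behavior described above can be pictured with a small pure-Python model. This is only a sketch of the idea, not Auto Loader's implementation: the function merge_schemas and the dictionary representation of a schema (column name mapped to a type name) are assumptions made for illustration.

```python
def merge_schemas(schemas):
    """Union column -> type mappings; raise if a column has conflicting types."""
    merged = {}
    for schema in schemas:
        for column, col_type in schema.items():
            if column in merged and merged[column] != col_type:
                raise ValueError(
                    f"Mismatched types for column {column!r}: "
                    f"{merged[column]} vs {col_type}"
                )
            merged.setdefault(column, col_type)
    return merged


# Hypothetical schemas taken from two sampled Avro files.
sampled = [
    {"name": "string", "age": "int"},
    {"name": "string", "lucky_number": "int"},
]
merged = merge_schemas(sampled)
print(merged)  # {'name': 'string', 'age': 'int', 'lucky_number': 'int'}
```

In this model, passing two sampled schemas that disagree on a column's type (for example age as int in one file and string in another) raises an error instead of picking one of the types.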

Learn more about schema inference and evolution with Auto Loader in Configuring schema inference and evolution in Auto Loader.

Rescued data column

The rescued data column contains any data that was not read because it was missing from the given schema, because there was a type mismatch, or because the casing of the column did not match. By default, when the schema is being inferred, the rescued data column is included in the schema returned by Auto Loader as _rescued_data. To rename the column, or to include it when you provide your own schema, set the option rescuedDataColumn.

Note

You can provide a rescued data column to all Avro readers in Databricks Runtime by using the option rescuedDataColumn, for example as an option on the DataFrameReader returned by spark.read.format("avro").
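As a minimal sketch of the batch case mentioned in this note, the option can be set on a regular DataFrameReader. The helper read_avro_with_rescue, its parameter names, and the default column name _rescue are hypothetical; spark is assumed to be an existing SparkSession on Databricks Runtime.

```python
def read_avro_with_rescue(spark, path, schema, rescue_column="_rescue"):
    """Batch-read Avro files, sending unreadable fields to `rescue_column`.

    `spark` is assumed to be an existing SparkSession; this helper and its
    parameter names are hypothetical and exist only for this example.
    """
    return (
        spark.read.format("avro")
        # The option named in the note above; the column name is arbitrary.
        .option("rescuedDataColumn", rescue_column)
        .schema(schema)  # a DDL string such as "name string, age int"
        .load(path)
    )
```

In a notebook where spark is already defined, a call might look like read_avro_with_rescue(spark, "/path/to/table/", "name string, age int").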

/path/to/table/f0.avro:
---------------------
{"name":"john","age":20,"lucky_number":4}

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "avro") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .load("/path/to/table/")

+-------+------+--------------------------------------------+
| name  | age  |                 _rescue                    |
+-------+------+--------------------------------------------+
| john  | 20   | {                                          |
|       |      |    "lucky_number": 4,                      |
|       |      |    "_file_path": "/path/to/table/f0.avro"  |
|       |      | }                                          |
+-------+------+--------------------------------------------+
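To make the result above easier to follow, here is a toy pure-Python model of how one record could be split into schema columns and rescued data. It is a sketch under simplifying assumptions, not the reader's actual logic: split_record is a hypothetical function, it handles only fields missing from the schema (not type or case mismatches), and returning None when nothing is rescued is a choice made for this example.

```python
import json


def split_record(record, schema_columns, file_path):
    """Split one record into schema columns and a rescued-data JSON string."""
    kept = {col: record.get(col) for col in schema_columns}
    rescued = {k: v for k, v in record.items() if k not in schema_columns}
    if not rescued:
        return kept, None  # nothing to rescue (None is this sketch's choice)
    # Record where the rescued fields came from, as in the table above.
    rescued["_file_path"] = file_path
    return kept, json.dumps(rescued)


record = {"name": "john", "age": 20, "lucky_number": 4}
kept, rescued = split_record(record, ["name", "age"], "/path/to/table/f0.avro")
print(kept)     # {'name': 'john', 'age': 20}
print(rescued)  # {"lucky_number": 4, "_file_path": "/path/to/table/f0.avro"}
```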

Changing the case-sensitive behavior

When the rescued data column is enabled, fields whose names differ in case from the names in the schema are loaded into the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
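One way to wire this option into a stream is sketched below. The helpers avro_stream_options and read_avro_stream are hypothetical names introduced for this example, and spark is assumed to be an existing SparkSession on Databricks; only the option names come from the text above.

```python
def avro_stream_options(case_sensitive=True, rescued_column="_rescued_data"):
    """Build Auto Loader reader options for Avro (hypothetical helper)."""
    return {
        "cloudFiles.format": "avro",
        "rescuedDataColumn": rescued_column,
        # Option values are passed as strings.
        "readerCaseSensitive": "true" if case_sensitive else "false",
    }


def read_avro_stream(spark, path, schema, case_sensitive=False):
    """Start an Auto Loader read; case-insensitive by default in this sketch."""
    return (
        spark.readStream.format("cloudFiles")
        .options(**avro_stream_options(case_sensitive=case_sensitive))
        .schema(schema)  # a DDL string such as "name string, age int"
        .load(path)
    )
```

Keeping the options in a plain dictionary makes it easy to inspect or log the exact settings before the stream starts.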