Ingest Parquet data with Auto Loader

Note

Schema inference for Parquet files is available in Databricks Runtime 11.1 and above.

Using Auto Loader to ingest Parquet data into Delta Lake takes only a few lines of code. By leveraging Auto Loader, you get the following benefits:

  • Automatic discovery of new files to process: You don’t need special logic to handle late-arriving data or to keep track of which files have already been processed.

  • Scalable file discovery: Auto Loader can ingest billions of files without a hiccup.

  • Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift on the fly. It can also evolve the schema to add new columns and restart the stream with the new schema automatically.

  • Data rescue: You can configure Auto Loader to rescue data that couldn’t be read properly in a rescued data column that preserves the structure of your nested record.

You can use the following code to run Auto Loader with schema inference and evolution capabilities on Parquet files. You specify cloudFiles as the format to leverage Auto Loader. To ingest Parquet files, set the option cloudFiles.format to parquet. With the option cloudFiles.schemaLocation, specify a directory where Auto Loader can persist the schema of your source data as it changes over time:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Schema inference and evolution

Each Parquet file is self-describing and associated with a typed schema. To infer the schema of the Parquet data, Auto Loader samples a subset of Parquet files and merges the schemas of these individual files.

If a column has different data types in two Parquet files, Auto Loader determines whether one data type can be safely upcast to the other. If upcasting is possible, Auto Loader merges the two schemas and chooses the more encompassing data type for the column; otherwise the inference fails. For example, a: int and a: double can be merged as a: double; a: double and a: string can be merged as a: string; but a: int and a: struct cannot be merged. Note that, after merging a: int and a: double as a: double, Auto Loader can read Parquet files with column a: double as normal, but for Parquet files with a: int, Auto Loader reads a as part of the rescued data column, because its data type differs from the inferred schema. Users can still safely upcast the rescued a: int values and backfill a: double later.
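The following sketch is not part of the original example; it shows one possible way to do that backfill, assuming the schema was inferred as a: double and the rescued data column keeps its default name _rescued_data:

from pyspark.sql import functions as F

# Hypothetical backfill: rows whose Parquet file stored a as int land in the
# rescued data column as JSON text; pull the value out, cast it to double,
# and prefer the typed column when it is already populated.
df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>")

backfilled = df.withColumn(
  "a",
  F.coalesce(
    F.col("a"),
    F.get_json_object(F.col("_rescued_data"), "$.a").cast("double")
  )
)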

Unless case sensitivity is enabled, the columns abc, Abc, and ABC are considered the same column for the purposes of schema inference. Which case is chosen is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used. Once a selection has been made and the schema is inferred, Auto Loader does not consider the casing variants that were not selected to be consistent with the schema. These columns may be found in the rescued data column.
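For example, a minimal sketch that pins a particular casing with a schema hint (the column name abc is illustrative):

# Enforce the casing (and type) you want for the ambiguous column via a schema hint;
# variants with other casings are then routed to the rescued data column.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("cloudFiles.schemaHints", "abc string") \
  .load("<path_to_source_data>")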

Learn more about schema inference and evolution with Auto Loader in Configuring schema inference and evolution in Auto Loader.

Rescued data column

Because each Parquet file is self-describing and carries its own schema, Parquet data can have the following problems:

  • A column can appear with different data types in different files

  • A column can appear with different cases in different files (for example, as “foo” and “Foo”)

  • A new column can appear in a subset of files

  • There may not be a globally consistent schema across all files

The rescued data column addresses these issues and will capture data:

  1. Missing from the given schema

  2. Containing a type mismatch

  3. From fields with inconsistent name casing

Auto Loader includes the rescued data column as part of the inferred schema with the default name _rescued_data. You can rename the column or include it when you provide a schema by setting the option rescuedDataColumn.

/path/to/table/f0.parquet (data shown as JSON):
----------------------------------------------
{"name":"john","age":20,"lucky_number":4}
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .load("/path/to/table/")
+-------+------+-----------------------------------------------+
| name  | age  |                 _rescue                       |
+-------+------+-----------------------------------------------+
| john  | 20   | {                                             |
|       |      |    "lucky_number": 4,                         |
|       |      |    "_file_path": "/path/to/table/f0.parquet"  |
|       |      | }                                             |
+-------+------+-----------------------------------------------+

Changing the case-sensitive behavior

When the rescued data column is enabled, fields named in a case other than that of the schema are loaded into the _rescued_data column. You can change this behavior by setting the option readerCaseSensitive to false, in which case Auto Loader reads data in a case-insensitive way.
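For example, a minimal sketch reusing the placeholder paths from above:

# With readerCaseSensitive set to false, Auto Loader matches Parquet column
# names case-insensitively instead of routing "Foo" vs. "foo" to rescued data.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "parquet") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .option("readerCaseSensitive", "false") \
  .load("<path_to_source_data>")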