Schema inference and evolution in Auto Loader

Note

JSON format support is available in Databricks Runtime 8.2 and above; CSV format support is available in Databricks Runtime 8.3 and above. For details on each format, see Data formats.

Auto Loader can automatically detect the introduction of new columns to your data and restart, so you don’t have to manage the tracking and handling of schema changes yourself. Auto Loader can also “rescue” data that was unexpected (for example, of differing data types) in a JSON blob column that you can choose to access later using the semi-structured data access APIs.

Schema inference

To infer the schema, Auto Loader samples the first 50 GB or 1000 files that it discovers, whichever limit is crossed first. To avoid incurring this inference cost at every stream startup, and to be able to provide a stable schema across stream restarts, you must set the option cloudFiles.schemaLocation. Auto Loader creates a hidden directory _schemas at this location to track schema changes to the input data over time. If your stream contains a single cloudFiles source to ingest data, you can provide the checkpoint location as cloudFiles.schemaLocation. Otherwise, provide a unique directory for this option. If your input data returns an unexpected schema for your stream, check that your schema location is being used by only a single Auto Loader source.
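
For example, here is a minimal sketch (paths are placeholder values) that reuses the checkpoint location as the schema location for a stream with a single cloudFiles source:

# The stream has a single cloudFiles source, so the checkpoint path can double as the schema location.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target>")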

Note

To change the size of the sample that’s used, you can set the following SQL configurations:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes (byte string, for example 10gb)

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles (integer)
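
For example, a sketch of setting these configurations from a notebook; the values shown are illustrative, not recommendations:

# Illustrative sample-size values; tune them for your data volume.
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes", "10gb")
spark.conf.set("spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles", "2000")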

By default, Auto Loader infers columns in text-based file formats like CSV and JSON as string columns. In JSON datasets, nested columns are also inferred as string columns. Since JSON and CSV data is self-describing and can support many data types, inferring the data as string can help avoid schema evolution issues such as numeric type mismatches (integers, longs, floats). If you want to retain the original Spark schema inference behavior, set the option cloudFiles.inferColumnTypes to true.
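
For example, a minimal sketch (paths are placeholders) that opts into Spark-style type inference:

# Infer numeric and other non-string types instead of treating every column as a string.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<path_to_source_data>")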

Note

Unless case sensitivity is enabled, the columns foo, Foo, and FOO are considered the same column. The selection of which case the column will be represented in is arbitrary and depends on the sampled data. You can use schema hints to enforce which case should be used.
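
For example, if the sampled data contains both foo and FOO, a hint with a hypothetical column name such as the following pins the casing (and type) that is used:

# Ensure the column is represented as lowercase "foo" in the inferred schema.
.option("cloudFiles.schemaHints", "foo string")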

Auto Loader also attempts to infer partition columns from the underlying directory structure of the data if the data is laid out in Hive style partitioning. For example, a file path such as base_path/event=click/date=2021-04-01/f0.json would result in the inference of date and event as partition columns. The data types for these columns will be strings unless you set cloudFiles.inferColumnTypes to true. If the underlying directory structure contains conflicting Hive partitions or doesn’t contain Hive style partitioning, the partition columns are ignored. You can provide the option cloudFiles.partitionColumns as a comma-separated list of column names to always try to parse the given columns from the file path if these columns exist as key=value pairs in your directory structure.

When Auto Loader infers the schema, a rescued data column is automatically added to your schema as _rescued_data. See the section on rescued data column and schema evolution for details.
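
For example, after the stream has written to its target, a sketch (target path is a placeholder) of checking whether any rows had data rescued:

# Rows with a non-null _rescued_data column contain data that did not fit the inferred schema.
rescued = spark.read.format("delta").load("<path_to_target>") \
  .where("_rescued_data IS NOT NULL")
display(rescued)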

Note

Binary file (binaryFile) and text file formats have fixed data schemas, but they also support partition column inference. The partition columns are inferred at each stream restart unless you specify cloudFiles.schemaLocation. Because cloudFiles.schemaLocation is not a required option for these formats, Databricks recommends setting cloudFiles.schemaLocation or cloudFiles.partitionColumns for them to avoid potential errors or information loss.
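
For example, a sketch (paths and the partition column name are placeholders) for a binaryFile stream that pins both options:

# binaryFile has a fixed data schema; schemaLocation and partitionColumns keep
# partition inference stable across stream restarts.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .option("cloudFiles.partitionColumns", "date") \
  .load("<path_to_source_data>")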

Schema hints

The data types that are inferred may not always be exactly what you’re looking for. By using schema hints, you can superimpose the information that you know and expect on an inferred schema.

By default, Apache Spark has a standard approach for inferring the type of data columns. For example, it infers nested JSON as structs and integers as longs. In contrast, Auto Loader considers all columns as strings. When you know that a column is of a specific data type, or if you want to choose an even more general data type (for example, a double instead of an integer), you can provide an arbitrary number of hints for column data types as follows:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

See the documentation on data types for the list of supported data types.

If a column is not present at the start of the stream, you can also use schema hints to add that column to the inferred schema.

Here is an example of an inferred schema to see the behavior with schema hints. Inferred schema:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

By specifying the following schema hints:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

you will get:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Note

Array and Map schema hints support is available in Databricks Runtime 9.1 (Beta) and Databricks Runtime 9.1 Photon (Beta) and above.

Here is an example of an inferred schema with complex datatypes to see the behavior with schema hints. Inferred schema:

|-- products: array<string>
|-- locations: array<int>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

By specifying the following schema hints:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

you will get:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Note

Schema hints are used only if you do not provide a schema to Auto Loader. You can use schema hints whether cloudFiles.inferColumnTypes is enabled or disabled.

Schema evolution

Auto Loader detects the addition of new columns as it processes your data. By default, the addition of a new column causes your stream to stop with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema. New columns are merged to the end of the schema, and the data types of existing columns remain unchanged. By running your Auto Loader stream within a Databricks job, you can have the stream restart automatically after such schema changes.

Auto Loader supports the following modes for schema evolution, which you set in the option cloudFiles.schemaEvolutionMode:

  • addNewColumns: The default mode when a schema is not provided to Auto Loader. When a new column is detected, the streaming job fails with an UnknownFieldException, new columns are added to the schema, and existing columns do not evolve their data types. addNewColumns is not allowed when a schema is provided for the stream; if you want to use this mode, provide your schema as a schema hint instead.
  • failOnNewColumns: If Auto Loader detects a new column, the stream will fail. It will not restart unless the provided schema is updated, or the offending data file is removed.
  • rescue: The stream runs with the very first inferred or provided schema. Any data type changes or new columns that are added are rescued in the rescued data column that is automatically added to your stream’s schema as _rescued_data. In this mode, your stream will not fail due to schema changes.
  • none: The default mode when a schema is provided. Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescued data column is provided separately as an option.
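
For example, a sketch of the none mode where the rescued data column is still requested explicitly, matching the bullets above:

# With a provided schema and mode "none", data is rescued only because rescuedDataColumn is set.
.option("cloudFiles.schemaEvolutionMode", "none")
.option("rescuedDataColumn", "_rescued_data")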

Partition columns are not considered for schema evolution. If you have an initial directory structure like base_path/event=click/date=2021-04-01/f0.json and then start receiving new files as base_path/event=click/date=2021-04-01/hour=01/f1.json, the hour column is ignored. To capture information for new partition columns, set cloudFiles.partitionColumns to event,date,hour.
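
A sketch of that setting, using the directory layout above:

# Parse event, date, and hour from paths such as base_path/event=click/date=2021-04-01/hour=01/f1.json
.option("cloudFiles.partitionColumns", "event,date,hour")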

Rescued data column

The rescued data column ensures that you never lose or miss out on data during ETL. The rescued data column contains any data that wasn’t parsed, either because it was missing from the given schema, because there was a type mismatch, or because the casing of the column in the record or file didn’t match that in the schema. The rescued data column is returned as a JSON blob containing the rescued columns and the source file path of the record (the source file path is available in Databricks Runtime 8.3 and above). The rescued data column is part of the schema returned by Auto Loader as _rescued_data by default when the schema is being inferred. You can rename the column, or include it in cases where you provide a schema, by setting the option rescuedDataColumn.
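
For example, a sketch (the expected_schema variable and column name are placeholders) of providing a schema while renaming the rescued data column:

# Keep unparsed data in a column named "_rescue" even though a schema is provided.
.schema(expected_schema)
.option("rescuedDataColumn", "_rescue")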

Since the default value of cloudFiles.inferColumnTypes is false, and cloudFiles.schemaEvolutionMode is addNewColumns when the schema is being inferred, rescuedDataColumn captures only columns that have a different case than that in the schema.

The JSON and CSV parsers support three modes when parsing records: PERMISSIVE, DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records—that is, incomplete or malformed JSON or CSV—are dropped or throw errors. If you use badRecordsPath when parsing JSON or CSV, data type mismatches are not considered as bad records when using the rescuedDataColumn. Only incomplete and malformed JSON or CSV records are stored in badRecordsPath.
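
For example, a sketch (paths and schema are placeholders) that combines the rescued data column with badRecordsPath:

# Type mismatches land in the rescued data column; only malformed JSON records are written to badRecordsPath.
spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  .option("rescuedDataColumn", "_rescued_data") \
  .option("badRecordsPath", "<path_to_bad_records>") \
  .load("<path_to_source_data>")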

Limitations

  • Schema evolution is not supported in Python applications running on Databricks Runtime 8.2 and 8.3 that use foreachBatch. You can use foreachBatch in Scala instead.

Example use cases

Enable easy ETL

An easy way to get your data into Delta Lake without losing any data is to use the following pattern and enable schema inference with Auto Loader. Databricks recommends running the following code in a Databricks job so that it automatically restarts your stream when the schema of your source data changes. By default, the schema is inferred as string types, any parsing errors (there should be none if everything remains as a string) go to _rescued_data, and any new columns fail the stream and evolve the schema.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Prevent data loss in well-structured data

When you know your schema, but want to know whenever you receive unexpected data, Databricks recommends using the rescuedDataColumn.

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

If you want your stream to stop processing if a new field is introduced that doesn’t match your schema, you can add:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Enable flexible semi-structured data pipelines

When you’re receiving data from a vendor that introduces new columns to the information they provide, you may not be aware of exactly when they do it, or you may not have the bandwidth to update your data pipeline. You can now leverage schema evolution to restart the stream and let Auto Loader update the inferred schema automatically. You can also leverage schemaHints for some of the “schemaless” fields that the vendor may be providing.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Frequently asked questions (FAQs)

How does Auto Loader infer schema?

When the DataFrame is first defined, Auto Loader lists your source directory and chooses the most recent (by file modification time) 50 GB or 1000 files, and uses those to infer your data schema.

Auto Loader also infers partition columns by examining the source directory structure and looks for file paths that contain the /key=value/ structure. If the source directory has an inconsistent structure, for example:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

Auto Loader infers the partition columns as empty. Use cloudFiles.partitionColumns to explicitly parse columns from the directory structure.

How does Auto Loader behave when the source folder is empty?

If the source directory is empty, Auto Loader requires you to provide a schema as there is no data to perform inference.
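
For example, a sketch (column names are hypothetical) of providing an explicit schema when the source directory is still empty:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema; replace with the columns you expect to arrive.
expected_schema = StructType([
    StructField("event", StringType(), True),
    StructField("payload", StringType(), True),
])

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  .load("<path_to_source_data>")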

When does Auto Loader infer schema? Does it evolve automatically after every micro-batch?

The schema is inferred when the DataFrame is first defined within your code. During each micro-batch, schema changes are evaluated on the fly; therefore, you don’t need to worry about performance hits. When the stream restarts, it picks up the evolved schema from the schema location and starts executing without any overhead of inference.

What’s the performance impact on ingesting the data when using Auto Loader schema inference?

You should expect the initial schema inference to take a couple of minutes for very large source directories. You shouldn’t observe significant performance hits otherwise during stream execution. If you run your code in a Databricks notebook, you can see status updates that specify when Auto Loader is listing your directory for sampling and inferring your data schema.