Automatic type widening with Auto Loader
This feature is in Public Preview in Databricks Runtime 16.4 and above.
Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage. It also reduces pipeline maintenance by automatically handling complex schema changes. For example, you can configure Auto Loader to automatically detect the schema of loaded data, allowing you to initialize tables without explicitly declaring the data schema. You can also evolve the table schema as new columns are introduced, eliminating the need to manually track and apply schema changes over time. Auto Loader can even rescue data that is unexpected (for example, because of differing data types) in a rescued data column, helping you avoid data loss.
However, the rescued data column requires that you manually deal with any data type changes.
To automatically handle some of these data type changes, use type widening in Auto Loader. Delta Lake supports several data type widening changes without requiring a data rewrite or user intervention. See Delta Lake Type widening. The new schema evolution mode, addNewColumnsWithTypeWidening, automatically evolves the schema on compatible data type changes.
You can widen primitive types like int to long, float to double, and more. Type widening is available for all file formats with schema evolution support in Auto Loader. This includes text formats (like JSON, CSV, or XML) and binary formats (like Avro or Parquet). There is no change in schema evolution behavior for existing schema evolution modes (such as addNewColumns, rescue, failOnNewColumns, or none).
Supported type changes
The following type changes are supported:
| Source type | Supported wider types |
|---|---|
| `byte` | `short`, `int`, `long`, `decimal`, `double` |
| `short` | `int`, `long`, `decimal`, `double` |
| `int` | `long`, `decimal`, `double` |
| `long` | `decimal` |
| `float` | `double` |
| `decimal` | `decimal` with greater precision and scale |
| `date` | `timestampNTZ` |
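The widening matrix above can be modeled as a simple lookup. The following is an illustrative sketch only (the dictionary and helper function are hypothetical, not part of the Auto Loader API):

```python
# Hypothetical model of the supported widening matrix above;
# illustrative only, not an Auto Loader API.
SUPPORTED_WIDENINGS = {
    "byte": {"short", "int", "long", "decimal", "double"},
    "short": {"int", "long", "decimal", "double"},
    "int": {"long", "decimal", "double"},
    "long": {"decimal"},
    "float": {"double"},
    "decimal": {"decimal"},  # only to greater precision and scale
    "date": {"timestampNTZ"},
}

def is_supported_widening(source: str, target: str) -> bool:
    """Return True if the source type can be widened to the target type."""
    return target in SUPPORTED_WIDENINGS.get(source, set())
```

For example, `int` can widen to `long`, but a narrowing change such as `long` to `int` is not supported.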
When widening any numeric type to decimal, Auto Loader widens to decimal with precision equal to or greater than the starting precision. If you increase the scale, the total precision increases by a corresponding amount.
The starting precision of integer types is the following:
| Type | Starting precision |
|---|---|
| `byte` | 3 |
| `short` | 5 |
| `int` | 10 |
| `long` | 20 |
For example, if the current type of a column is int, and a file is read where that column has the type decimal(5, 2), Auto Loader widens the type of that column to decimal(12, 2): the starting precision of int (10) plus the scale (2).
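The precision rule above can be sketched as follows (a hypothetical helper for illustration, not an Auto Loader API):

```python
# Illustrative sketch of the decimal-widening precision rule described
# above: the widened precision is at least the integer type's starting
# precision plus the incoming scale. Not part of the Auto Loader API.
STARTING_PRECISION = {"byte": 3, "short": 5, "int": 10, "long": 20}

def widened_decimal(source_type: str, precision: int, scale: int) -> tuple:
    """Return the (precision, scale) wide enough for both types."""
    start = STARTING_PRECISION[source_type]
    return (max(precision, start + scale), scale)
```

For the example above, `widened_decimal("int", 5, 2)` yields `(12, 2)`, matching decimal(12, 2).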
Prerequisites
To use type widening with Auto Loader, you must meet the following requirements:
- Use Databricks Runtime 16.4 or above.
- If the write sink is a Delta Lake table, enable type widening for the Delta Lake table using one of the following methods:

  - If using an existing table:

    ```sql
    ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
    ```

  - If creating a new table with type widening enabled:

    ```sql
    CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
    ```
For more information about type widening in Delta Lake tables, see Type widening.
Enable type widening with schema evolution
To use type widening with Auto Loader, specify addNewColumnsWithTypeWidening when using schema evolution. Auto Loader detects the addition of new columns and type changes as it processes your data.
- Python

```python
query = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", True)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(availableNow=True)
  .toTable("table_name")
)
```

- Scala

```scala
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", <schemaPath>)
  .option("cloudFiles.schemaEvolutionMode", "addNewColumnsWithTypeWidening")
  .load(<inputPath>)
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", <checkpointPath>)
  .trigger(Trigger.AvailableNow())
  .toTable("table_name")
```
When Auto Loader detects a new column or a type change that's supported by type widening, the stream stops with an UnknownFieldException. Before your stream throws this error, Auto Loader performs schema inference on the latest micro-batch of data and updates the schema location with the latest schema by widening the existing columns or merging new columns to the end of the schema.
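Because the schema location is updated before the stream stops, restarting the stream picks up the widened schema and processing continues. The restart loop can be sketched as follows, using a stub in place of a real Spark streaming query (the exception class and helper here are stand-ins for illustration, not Spark APIs):

```python
# Illustrative restart loop: Auto Loader stops the stream after
# persisting the updated schema, so the next start picks it up.
# `start_stream` is a stand-in for starting and awaiting your
# actual streaming query.
class UnknownFieldException(Exception):
    """Stand-in for the exception Auto Loader raises on schema changes."""

def run_with_schema_retries(start_stream, max_retries: int = 3) -> None:
    for _ in range(max_retries):
        try:
            start_stream()
            return  # stream finished (e.g. an availableNow trigger)
        except UnknownFieldException:
            # The schema location was already updated; just restart.
            continue
    raise RuntimeError("exceeded schema evolution retries")
```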
Schema evolution behavior on data type changes
If you were to ingest a CSV file with the following content, Auto Loader infers the schema as `STRUCT<id INT, name STRING, _rescued_data STRING>`:

```
id, name
1, John
2, Mary
```
The target table looks as follows:

| id | name | _rescued_data |
|---|---|---|
| 1 | John | NULL |
| 2 | Mary | NULL |
Now, ingest another CSV file where the values in the `id` column are wider than the INT type:

```
id, name, age
2147483648, Bob, 25
```
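The new `id` value overflows a 32-bit signed integer, which is why it no longer fits the inferred INT column but does fit a 64-bit LONG:

```python
# 2147483648 exceeds the maximum 32-bit signed integer value
# (2147483647), so it cannot be stored in an INT column; it fits
# comfortably in a 64-bit LONG.
INT_MAX = 2**31 - 1
LONG_MAX = 2**63 - 1

new_id = 2147483648
fits_int = new_id <= INT_MAX    # False
fits_long = new_id <= LONG_MAX  # True
```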
The following table explains the behavior and output with different schema evolution modes in Auto Loader:
| Mode | Behavior with supported widenable data type change |
|---|---|
| `addNewColumns` (default) | Data type does not evolve and the stream does not fail due to the data type change. Columns with type mismatched values are set to `NULL`, or rescued in the rescued data column if it is enabled. |
| `rescue` | Schema does not evolve and the stream does not fail due to any schema changes. Columns with type mismatched values are rescued in the rescued data column. |
| `failOnNewColumns` | Data type does not evolve and the stream does not fail due to the data type change. Columns with type mismatched values are set to `NULL`, or rescued in the rescued data column if it is enabled. |
| `none` | Does not evolve the schema, new columns are ignored, and data is not rescued unless the `rescuedDataColumn` option is set. Columns with type mismatched values are set to `NULL`. |
| `addNewColumnsWithTypeWidening` | Stream fails. New columns are added to the schema, and supported data type changes are widened. Unsupported data type changes (for example, `int` to `string`) do not evolve the data type; mismatched values are set to `NULL`, or rescued in the rescued data column if it is enabled. |
Example results
The following table shows the inferred schema and values for each schema evolution mode after ingesting the second CSV file:
| Mode | Inferred schema and values |
|---|---|
| `addNewColumns` (default) | Schema: `STRUCT<id INT, name STRING, age INT, _rescued_data STRING>` Values: `(NULL, Bob, 25, {"id":"2147483648"})` |
| `rescue` | Schema: `STRUCT<id INT, name STRING, _rescued_data STRING>` Values: `(NULL, Bob, {"id":"2147483648","age":"25"})` |
| `failOnNewColumns` | The stream fails because of the new `age` column. Schema remains `STRUCT<id INT, name STRING, _rescued_data STRING>`. |
| `none` | Schema: `STRUCT<id INT, name STRING, _rescued_data STRING>` Values: `(NULL, Bob, NULL)` |
| `addNewColumnsWithTypeWidening` | Schema: `STRUCT<id BIGINT, name STRING, age INT, _rescued_data STRING>` Values: `(2147483648, Bob, 25, NULL)` |
Limitations
- The option `prefersDecimal` cannot be set to `false` when using `addNewColumnsWithTypeWidening`. When `addNewColumnsWithTypeWidening` is specified, the default value of `prefersDecimal` is `true`.
- `date` to `timestampNTZ` widening is only supported for Parquet files.