Type widening
This feature is in Public Preview in Databricks Runtime 15.4 LTS and above.
Tables with type widening enabled allow you to change column data types to a wider type without rewriting underlying data files. You can either change column types manually or use schema evolution to evolve column types.
Type widening is available in Databricks Runtime 15.4 LTS and above. Tables with type widening enabled can only be read in Databricks Runtime 15.4 LTS and above.
Type widening requires Delta Lake. All Unity Catalog managed tables use Delta Lake by default.
Supported type changes
You can widen types according to the following rules:
| Source type | Supported wider types |
|---|---|
| byte | short, int, long, decimal, double |
| short | int, long, decimal, double |
| int | long, decimal, double |
| long | decimal |
| float | double |
| decimal | decimal with greater precision and scale |
| date | timestampNTZ |
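The widening rules in the table can be sketched as a simple lookup. This is a hand-written illustration, not an API that Databricks provides; decimal targets are simplified to the string "decimal" here, because their precision and scale constraints are checked separately (see the rules below).

```python
# Illustrative encoding of the supported type-widening rules.
# Widening to decimal additionally requires the precision/scale
# constraints described in the surrounding text.
SUPPORTED_WIDENINGS = {
    "byte": {"short", "int", "long", "decimal", "double"},
    "short": {"int", "long", "decimal", "double"},
    "int": {"long", "decimal", "double"},
    "long": {"decimal"},
    "float": {"double"},
    "date": {"timestampNTZ"},
}

def is_supported_widening(source: str, target: str) -> bool:
    """Return True if a column of `source` type can be widened to `target`."""
    return target in SUPPORTED_WIDENINGS.get(source, set())

print(is_supported_widening("int", "long"))   # True
print(is_supported_widening("long", "int"))   # False: narrowing is never allowed
```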
Type changes are supported for top-level columns and fields nested inside structs, maps, and arrays.
Spark truncates the fractional part of a value by default when an operation promotes an integer type to a decimal or double and a downstream ingestion writes the value back to an integer column. For details about assignment policy behavior, see Store assignment.
When changing any numeric type to decimal, the total precision must be equal to or greater than the starting precision. If you also increase the scale, the total precision must increase by a corresponding amount.
The minimum target for byte, short, and int types is decimal(10,0). The minimum target for long is decimal(20,0).
If you want to add two decimal places to a field with decimal(10,1), the minimum target is decimal(12,3).
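The precision rule can be expressed as a quick calculation. The helper below is purely illustrative (it is not part of any Databricks API) and assumes the rule stated above: precision must grow by at least as much as the scale grows.

```python
# Illustrative calculation of the smallest allowed decimal target
# when widening an existing decimal column.
def min_decimal_target(precision: int, scale: int, added_scale: int = 0):
    """Return (precision, scale) of the minimum target decimal type."""
    return (precision + added_scale, scale + added_scale)

# decimal(10,1) widened by two decimal places -> decimal(12,3)
print(min_decimal_target(10, 1, added_scale=2))  # (12, 3)
```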
Enable type widening
You can enable type widening on an existing table by setting the delta.enableTypeWidening table property to true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
You can also enable type widening during table creation:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
When you enable type widening, it sets the typeWidening table feature, which upgrades the reader and writer protocols. You must use Databricks Runtime 15.4 LTS or above to interact with tables that have type widening enabled. If external clients also interact with the table, verify that they support this table feature. See Delta Lake feature compatibility and protocols.
Manually apply a type change
Use the ALTER COLUMN command to manually change types:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
This operation updates the table schema without rewriting the underlying data files. See ALTER TABLE for more details.
Widen types with automatic schema evolution
Schema evolution works with type widening to update data types in target tables to match the type of incoming data.
Without type widening enabled, schema evolution always attempts to downcast data to match column types in the target table. If you don't want to automatically widen data types in your target tables, disable type widening before you run workloads with schema evolution enabled.
To use schema evolution to widen the data type of a column during ingestion, you must meet the following conditions:
- The write command runs with automatic schema evolution enabled.
- The target table has type widening enabled.
- The source column type is wider than the target column type.
- Type widening supports the type change.
Type mismatches that don't meet all of these conditions follow normal schema enforcement rules. See Schema enforcement.
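The decision above can be summarized as a boolean check. This is purely illustrative (Databricks implements this logic internally); it just encodes that all four conditions must hold for automatic widening to occur.

```python
# Illustrative check: automatic type widening happens only when every
# condition listed above holds; otherwise normal schema enforcement applies.
def widening_applies(schema_evolution_enabled: bool,
                     type_widening_enabled: bool,
                     source_is_wider: bool,
                     change_is_supported: bool) -> bool:
    return (schema_evolution_enabled
            and type_widening_enabled
            and source_is_wider
            and change_is_supported)

print(widening_applies(True, True, True, True))   # True: column widens
print(widening_applies(True, False, True, True))  # False: schema enforcement applies
```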
The following examples demonstrate how type widening works with schema evolution for common write operations.
- Python
- Scala
- SQL
# Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
# Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
# Example 2: Automatic type widening in MERGE INTO
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
// Create target table with INT column and source table with BIGINT column
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
// Example 1: Automatic type widening in saveAsTable()
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
// Example 2: Automatic type widening in MERGE INTO
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
-- Create target table with INT column and source table with BIGINT column
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
-- Example 1: Automatic type widening in INSERT INTO
-- Enable schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled = true;
-- Insert data with a BIGINT id column - automatically widens the target's INT column to BIGINT
INSERT INTO target_table SELECT * FROM source_table;
-- Example 2: Automatic type widening in MERGE INTO
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
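After either write, you can confirm that the column widened by inspecting the schema. The table and column names below match the example tables created above:

```sql
-- The id column should now report BIGINT instead of INT
DESCRIBE TABLE target_table;
```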
Disable the type widening table feature
You can prevent accidental type widening on enabled tables by setting the delta.enableTypeWidening table property to false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
This setting prevents future type changes to the table, but doesn't remove the type widening table feature or undo previous type changes.
If you need to completely remove the type widening table feature, use the DROP FEATURE command as shown in the following example:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Tables that enabled type widening using Databricks Runtime 15.4 LTS require dropping feature typeWidening-preview instead.
When dropping type widening, Databricks rewrites all data files that don't conform to the current table schema. See Drop a Delta Lake table feature and downgrade table protocol.
Streaming from a Delta table
Support for type widening in Structured Streaming is available in Databricks Runtime 16.4 LTS and above.
When streaming from a Delta table with type widening enabled, you can configure automatic type widening for streaming queries by enabling schema evolution with the mergeSchema option on the target table. The target table must have type widening enabled. See Enable type widening.
- Python
- Scala
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
When mergeSchema is enabled and the target table has type widening enabled:
- Type changes are applied automatically to the downstream table without requiring manual intervention.
- New columns are added automatically to the downstream table schema.
Without mergeSchema enabled, values are handled according to the spark.sql.storeAssignmentPolicy configuration, which by default downcasts values to match the target column type. For more information about assignment policy behavior, see Store assignment.
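To see why downcasting can be lossy, here is a plain-Python illustration of a 32-bit integer wraparound. This is not Spark's actual implementation; Spark's behavior depends on spark.sql.storeAssignmentPolicy and may raise an error or store NULL instead of wrapping.

```python
# Illustration only: what an unchecked downcast from a 64-bit to a
# 32-bit integer would do - keep the low 32 bits and reinterpret the sign.
def downcast_to_int32(value: int) -> int:
    v = value & 0xFFFFFFFF          # keep the low 32 bits
    return v - 0x100000000 if v >= 0x80000000 else v

print(downcast_to_int32(123))    # 123 (small values survive)
print(downcast_to_int32(2**31))  # -2147483648 (overflow wraps to negative)
```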
Manual type change acknowledgment
When streaming from a Delta table, you can provide a schema tracking location to track non-additive schema changes including type changes. Providing a schema tracking location is required in Databricks Runtime 18.0 and below, and it is optional in Databricks Runtime 18.1 and above.
- Python
- Scala
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
After providing a schema tracking location, the stream evolves its tracked schema when it detects a type change and then stops. At that time, you can take any necessary action to handle the type change, such as enabling type widening on the downstream table or updating the streaming query.
To resume processing, set the Spark configuration spark.databricks.delta.streaming.allowSourceColumnTypeChange or the DataFrame reader option allowSourceColumnTypeChange:
- Python
- Scala
- SQL
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
The checkpoint ID <checkpoint_id> and Delta Lake source table version <delta_source_table_version> are displayed in the error message when the stream stops.
Lakeflow Spark Declarative Pipelines
Support for type widening in Lakeflow Spark Declarative Pipelines is available in the PREVIEW channel.
You can enable type widening for Lakeflow Spark Declarative Pipelines at the pipeline level or for individual tables. Type widening allows column types to be automatically widened during pipeline execution without requiring a full refresh of streaming tables. Type changes in materialized views always trigger a full recompute, and when a type change is applied to a source table, materialized views that depend on that table require a full recompute to reflect the new types.
Enable type widening for an entire pipeline
To enable type widening for all tables in a pipeline, set the pipeline configuration pipelines.enableTypeWidening:
- JSON
- YAML
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
configuration:
pipelines.enableTypeWidening: 'true'
Enable type widening for specific tables
You can also enable type widening for individual tables by setting the table property delta.enableTypeWidening:
- Python
- SQL
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibility with downstream readers
Tables with type widening enabled can only be read in Databricks Runtime 15.4 LTS and above. If you want a table with type widening enabled in your pipeline to be readable by readers on Databricks Runtime 14.3 and below, you must either:
- Disable type widening by removing the delta.enableTypeWidening or pipelines.enableTypeWidening property or setting it to false, and trigger a full refresh of the table.
- Enable Compatibility Mode on your table.
Delta Sharing
Support for type widening in Delta Sharing is available in Databricks Runtime 16.1 and above.
Sharing a Delta Lake table with type widening enabled is supported in Databricks-to-Databricks Delta Sharing. The provider and recipient must be on Databricks Runtime 16.1 or above.
To read Change Data Feed from a Delta Lake table with type widening enabled using Delta Sharing, you must set the response format to delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
Reading Change Data Feed across type changes is not supported. You must instead split the operation into two separate reads, one ending at the table version containing the type change, and the other starting at the version containing the type change.
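For example, if the type change happened at a hypothetical table version 10, the two reads could look like the following sketch. It requires a Databricks workspace and a shared table; the version numbers and the table path are placeholders.

```python
# Sketch: split the change feed read around the type change (version 10 here).
# First read: ending at the version containing the type change.
before = (spark.read
    .format("deltaSharing")
    .option("responseFormat", "delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "<start version>")
    .option("endingVersion", "10")
    .load("<table>"))

# Second read: starting at the version containing the type change.
after = (spark.read
    .format("deltaSharing")
    .option("responseFormat", "delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", "10")
    .option("endingVersion", "<end version>")
    .load("<table>"))
```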
Limitations
Apache Iceberg compatibility
Apache Iceberg doesn't support all type changes covered by type widening. See Iceberg Schema Evolution. In particular, Databricks does not support the following type changes:
- byte, short, int, or long to decimal or double
- decimal scale increase
- date to timestampNTZ
When you enable UniForm with Iceberg compatibility on a Delta Lake table, applying one of these type changes results in an error. See Read Delta tables with Iceberg clients.
If you apply one of these unsupported type changes to a Delta Lake table, you have two options:
- Regenerate Iceberg metadata: Use the following command to regenerate Iceberg metadata without the type widening table feature:

  ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')

  This allows you to maintain UniForm compatibility after applying incompatible type changes.
- Drop the type widening table feature: See Disable the type widening table feature.
Type-dependent functions
Some SQL functions return results that depend on the input data type. For example, the hash function returns different hash values for the same logical value when the argument type differs: hash(1::INT) returns a different result than hash(1::BIGINT).
Other type-dependent functions include xxhash64, bit_get, bit_reverse, and typeof.
For stable results in queries that use these functions, explicitly cast values to the desired type:
- Python
- Scala
- SQL
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
spark.read.table("table_name")
  .selectExpr("hash(CAST(column_name AS BIGINT))")
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Other limitations
- You cannot provide a schema tracking location using SQL when streaming from a Delta Lake table with a type change.
- You cannot share a table with type widening enabled to non-Databricks consumers using Delta Sharing.