Infer and evolve the schema using from_json in DLT
This feature is in Public Preview.
This article describes how to infer and evolve the schema of JSON blobs with the from_json SQL function in DLT.
Overview
The from_json SQL function parses a JSON string column and returns a struct value. When used outside of DLT, you must explicitly provide the schema of the returned value using the schema argument. When used with DLT, you can enable schema inference and evolution, which automatically manages the schema of the returned value. This feature simplifies both the initial setup (especially when the schema is unknown) and ongoing operations when the schema changes frequently. It enables seamless processing of arbitrary JSON blobs from streaming data sources such as Auto Loader, Kafka, or Kinesis.
Specifically, when used in DLT, schema inference and evolution for the from_json SQL function can:
- Detect new fields in incoming JSON records (including nested JSON objects)
- Infer the field types and map them to appropriate Spark data types
- Automatically evolve the schema to accommodate new fields
- Automatically handle data that does not conform to the current schema
Syntax: Automatically infer and evolve the schema
If you use from_json with DLT, it can automatically infer and evolve the schema. To enable this, set the schema to NULL and specify the schemaLocationKey option, which enables from_json to infer and keep track of the schema.
- SQL
- Python
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>" [, otherOptions]))
from_json(jsonStr, None, {"schemaLocationKey": "<uniqueKey>"[, otherOptions]})
A query can have multiple from_json expressions, but each expression must have a unique schemaLocationKey. The schemaLocationKey must also be unique per pipeline.
- SQL
- Python
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
from pyspark.sql.functions import col, from_json

(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(
col("value"),
from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("parsedX"),
from_json(col("value"), None, {"schemaLocationKey": "keyY"}).alias("parsedY"))
)
Syntax: Fixed schema
If you want to enforce a particular schema instead, you can use the following from_json syntax to parse a JSON string using that schema:
from_json(jsonStr, schema [, options])
This syntax can be used in any Databricks environment, including DLT. For more information, see the from_json function reference.
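For instance, here is a minimal fixed-schema call (the literal JSON string and schema string are illustrative only):
-- Parse a JSON string with an explicitly provided schema; no inference takes place.
SELECT from_json('{"id": 123, "name": "John"}', 'id BIGINT, name STRING')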
Schema Inference
from_json infers the schema from the first batch of JSON data columns and internally indexes it by its schemaLocationKey (required).
If the JSON string is a single object (for example, {"id": 123, "name": "John"}), from_json infers a schema of type STRUCT and adds a rescued data column (_rescued_data) to the list of fields.
STRUCT<id LONG, name STRING, _rescued_data STRING>
However, if the JSON string has a top-level array (such as [{"id": 123, "name": "John"}]), then from_json wraps the ARRAY in a STRUCT. This approach enables rescuing data that is incompatible with the inferred schema. You have the option to explode the array values into separate rows downstream.
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
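For example, a minimal downstream sketch (assuming a bronze streaming table with a parsed column named jsonCol, as in the examples later in this article) can explode the wrapped array into one row per element:
-- Explode the wrapped array so each array element becomes its own row.
CREATE STREAMING TABLE silver AS
SELECT element.id, element.name
FROM bronze
LATERAL VIEW EXPLODE(jsonCol.value) t AS element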
Override schema inference using schema hints
You can optionally provide schemaHints to influence how from_json infers the type of a column. This is helpful when you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer). You can provide an arbitrary number of hints for column data types using SQL schema specification syntax. The semantics for schema hints are the same as for the Auto Loader schema hints. For example:
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT>')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING'))
FROM STREAM READ_FILES(...)
When the JSON string contains a top-level ARRAY, it is wrapped in a STRUCT. In these cases, schema hints are applied to the ARRAY schema instead of the wrapped STRUCT. For example, consider a JSON string with a top-level array such as:
[{"id": 123, "name": "John"}]
The inferred ARRAY schema is wrapped in a STRUCT:
STRUCT<value ARRAY<id LONG, name STRING>, _rescued_data STRING>
To change the data type of id, specify the schema hint as element.id STRING. To add a new column of type DOUBLE, specify element.new_col DOUBLE. Because of these hints, the schema for the top-level JSON array becomes:
STRUCT<value ARRAY<id STRING, name STRING, new_col DOUBLE>, _rescued_data STRING>
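A minimal sketch of how these hints can be passed to from_json (the schemaLocationKey value shown is a placeholder):
SELECT
-- Retype `id` and add `new_col` inside the wrapped top-level array.
from_json(value, NULL, map('schemaLocationKey', 'arrayExample',
'schemaHints', 'element.id STRING, element.new_col DOUBLE')) parsed
FROM STREAM READ_FILES(...)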
Evolve the schema using schemaEvolutionMode
from_json detects the addition of new columns as it processes your data. When from_json detects a new field, it updates the inferred schema with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. After the schema update, the pipeline restarts automatically with the updated schema.
from_json supports the following modes for schema evolution, which you set using the optional schemaEvolutionMode setting. These modes are consistent with Auto Loader.
Mode | Behavior on reading a new column |
---|---|
addNewColumns (default) | Stream fails. New columns are added to the schema. Existing columns do not evolve data types. |
rescue | Schema is never evolved and stream does not fail due to schema changes. All new columns are recorded in the rescued data column. |
failOnNewColumns | Stream fails. Stream does not restart unless the schema is updated, or the offending data is removed. |
none | Does not evolve the schema, new columns are ignored, and data is not rescued unless the rescuedDataColumn option is set. |
For example:
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns'))
FROM STREAM READ_FILES(...)
Rescued data column
A rescued data column is automatically added to your schema as _rescued_data. You can rename the column by setting the rescuedDataColumn option. For example:
from_json(jsonStr, None, {"schemaLocationKey": "keyX", "rescuedDataColumn": "my_rescued_data"})
When you choose to use the rescued data column, any columns that don’t match the inferred schema are rescued instead of dropped. This might happen because of a data type mismatch, a missing column in the schema, or a column name casing difference.
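For example, a downstream query (a sketch, assuming a bronze table with a parsed column named jsonCol as in the examples in this article) can surface the rows where data was rescued:
-- Inspect rows where at least one value did not fit the inferred schema.
SELECT jsonCol._rescued_data
FROM bronze
WHERE jsonCol._rescued_data IS NOT NULL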
Handle corrupt records
To store records that are malformed and cannot be parsed, add a _corrupt_record column by setting schema hints, as in the following example:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
To rename the corrupt record column, set the columnNameOfCorruptRecord option.
The JSON parser supports three modes for handling corrupt records:
Mode | Description |
---|---|
PERMISSIVE | For corrupted records, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets malformed fields to null. |
DROPMALFORMED | Ignores corrupted records. When you use the rescued data column, data type mismatches do not cause records to be dropped; only corrupt records, such as incomplete or malformed JSON, are dropped. |
FAILFAST | Throws an exception when the parser meets corrupted records. When you use the rescued data column, data type mismatches do not throw an error; only corrupt records, such as incomplete or malformed JSON, throw errors. |
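For example, the following sketch (assuming the bronze table defined above; the target table name is illustrative) separates the corrupt records for inspection:
-- Quarantine records that could not be parsed as JSON.
CREATE STREAMING TABLE quarantine AS
SELECT jsonCol._corrupt_record
FROM bronze
WHERE jsonCol._corrupt_record IS NOT NULL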
Refer to a field in the from_json output
from_json infers the schema during pipeline execution. If a downstream query refers to a from_json field before the from_json function has executed successfully at least one time, the field doesn't resolve and the query is skipped. In the following example, analysis for the silver table query will be skipped until the from_json function in the bronze query has executed and inferred the schema.
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze
If the from_json function and the fields it infers are referred to in the same query, analysis might fail, as in the following example:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
You can fix this by moving the reference to the from_json field into a downstream query (like the bronze/silver example above). Alternatively, you can specify schemaHints that contain the referred from_json fields. For example:
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0
Examples: Automatically infer and evolve the schema
This section provides example code for enabling automatic schema inference and evolution using from_json in DLT.
Create a streaming table from cloud object storage
The following example uses read_files syntax to create a streaming table from cloud object storage.
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
@dlt.table(comment="from_json autoloader example")
def bronze():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "text")
.load("/databricks-datasets/nyctaxi/sample/json/")
.select(from_json(col("value"), None, {"schemaLocationKey": "nycTaxi"}).alias("jsonCol"))
)
Create a streaming table from Kafka
The following example uses read_kafka syntax to create a streaming table from Kafka.
- SQL
- Python
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol
FROM READ_KAFKA(
bootstrapServers => '<server:ip>',
subscribe => 'events',
startingOffsets => 'latest'
)
@dlt.table(comment="from_json kafka example")
def bronze():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.select(col("value"), from_json(col("value"), None, {"schemaLocationKey": "keyX"}).alias("jsonCol"))
)
Examples: Fixed schema
For example code using from_json with a fixed schema, see the from_json function reference.
FAQs
This section answers frequently asked questions about schema inference and evolution support in the from_json function.
What is the difference between from_json and parse_json?
The parse_json function returns a VARIANT value from the JSON string.
VARIANT provides a flexible and efficient way to store semi-structured data. This circumvents schema inference and evolution by doing away with strict types altogether. However, if you want to enforce a schema at write time (for example, because you have a relatively strict schema), from_json might be a better option.
The following table describes the differences between from_json and parse_json:
Function | Use cases | Availability |
---|---|---|
from_json | Schema evolution with from_json infers and maintains a strict schema for the data. Use it when you want to enforce a schema at write time (for example, if you have a relatively strict schema). | Available with schema inference and evolution only in DLT |
parse_json | VARIANT is particularly well-suited to holding data that does not need to be schematized. | Available with and without DLT |
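As an illustration (the schemaLocationKey value is a placeholder), both functions can be applied to the same JSON string column:
SELECT
-- Strictly typed struct whose schema is inferred and evolved by DLT:
from_json(value, NULL, map('schemaLocationKey', 'exampleKey')) parsedStruct,
-- Flexible VARIANT value with no schema enforcement:
parse_json(value) parsedVariant
FROM STREAM READ_FILES(...)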
Can I use from_json schema inference and evolution syntax outside of DLT?
No, you can't use from_json schema inference and evolution syntax outside of DLT.
How do I access the schema inferred by from_json?
View the schema of the target streaming table.
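For example, assuming a target streaming table named bronze as in the examples above:
-- Shows the column names and data types that from_json inferred.
DESCRIBE TABLE bronze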
Can I pass from_json a schema and also do evolution?
No, you can't pass from_json a schema and also do evolution. However, you can provide schema hints to override some or all of the fields inferred by from_json.
What happens to the schema if the table is fully refreshed?
The schema locations associated with the table are cleared, and the schema is re-inferred from scratch.