Infer and evolve the schema using from_json in DLT

Preview: This feature is in Public Preview.

This article describes how to infer and evolve the schema of JSON blobs with the from_json SQL function in DLT.

Overview

The from_json SQL function parses a JSON string column and returns a struct value. When used outside of DLT, you must explicitly provide the schema of the returned value using the schema argument. When used with DLT, you can enable schema inference and evolution, which automatically manages the schema of the returned value. This feature simplifies both the initial setup (especially when the schema is unknown) and ongoing operations when the schema changes frequently. It enables seamless processing of arbitrary JSON blobs from streaming data sources such as Auto Loader, Kafka, or Kinesis.

Specifically, when used in DLT, schema inference and evolution for the from_json SQL function can:

  • Detect new fields in incoming JSON records (including nested JSON objects)
  • Infer the field types and map them to appropriate Spark data types
  • Automatically evolve the schema to accommodate new fields
  • Automatically handle data that does not conform to the current schema

Syntax: Automatically infer and evolve the schema

If you use from_json with DLT, it can automatically infer and evolve the schema. To enable this, set the schema argument to NULL and specify the schemaLocationKey option, which from_json uses to infer the schema and track it over time.

SQL
from_json(jsonStr, NULL, map("schemaLocationKey", "<uniqueKey>"[, otherOptions]))

A query can have multiple from_json expressions, but each expression must have a unique schemaLocationKey. The schemaLocationKey must also be unique per pipeline.

SQL
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) parsedX,
from_json(value, NULL, map('schemaLocationKey', 'keyY')) parsedY
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Syntax: Fixed schema

If you want to enforce a particular schema instead, you can use the following from_json syntax to parse JSON string using that schema:

from_json(jsonStr, schema [, options])

This syntax can be used in any Databricks environment, including DLT. For more information, see the from_json function reference.
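
For example, the following standalone query parses a literal JSON string against an explicit schema (the literal values here are illustrative):

SQL
SELECT from_json('{"id": 5, "name": "John"}', 'id INT, name STRING') AS parsed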

Schema Inference

from_json infers the schema from the first batch of JSON data and stores it internally, indexed by the required schemaLocationKey.

If the JSON string is a single object (for example, {"id": 123, "name": "John"}), from_json infers a schema of type STRUCT and appends a rescued data column to the list of fields:

STRUCT<id LONG, name STRING, _rescued_data STRING>

However, if the JSON string has a top-level array (such as [{"id": 123, "name": "John"}]), then from_json wraps the ARRAY in a STRUCT. This approach enables rescuing data that is incompatible with the inferred schema. You can optionally explode the array values into separate rows downstream.

STRUCT<value ARRAY<STRUCT<id LONG, name STRING>>, _rescued_data STRING>
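
For example, assuming a bronze streaming table like the ones later in this article whose parsed column is named jsonCol (table and column names here are illustrative), a downstream query can explode the wrapped value array into one row per element:

SQL
CREATE STREAMING TABLE silver AS
SELECT item.id, item.name
FROM bronze
LATERAL VIEW EXPLODE(jsonCol.value) t AS item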

Override schema inference using schema hints

You can optionally provide schemaHints to influence how from_json infers the type of a column. This is helpful when you know that a column is of a specific data type, or if you want to choose a more general data type (for example, a double instead of an integer). You can provide an arbitrary number of hints for column data types using SQL schema specification syntax. The semantics for schema hints are the same as for the Auto Loader schema hints. For example:

SQL
SELECT
-- The JSON `{"a": 1}` will treat `a` as a BIGINT
from_json(data, NULL, map('schemaLocationKey', 'w', 'schemaHints', '')),
-- The JSON `{"a": 1}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'x', 'schemaHints', 'a STRING')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a MAP<STRING, BIGINT>
from_json(data, NULL, map('schemaLocationKey', 'y', 'schemaHints', 'a MAP<STRING, BIGINT>')),
-- The JSON `{"a": {"b": 1}}` will treat `a` as a STRING
from_json(data, NULL, map('schemaLocationKey', 'z', 'schemaHints', 'a STRING'))
FROM STREAM READ_FILES(...)

When the JSON string contains a top-level ARRAY, it is wrapped in a STRUCT. In these cases, schema hints are applied to the ARRAY schema instead of the wrapped STRUCT. For example, consider a JSON string with a top-level array such as:

[{"id": 123, "name": "John"}]

The inferred ARRAY schema is wrapped in a STRUCT:

STRUCT<value ARRAY<STRUCT<id LONG, name STRING>>, _rescued_data STRING>

To change the data type of id, specify the schema hint as element.id STRING. To add a new column of type DOUBLE, specify element.new_col DOUBLE. Because of these hints, the schema for the top-level JSON array becomes:

STRUCT<value ARRAY<STRUCT<id STRING, name STRING, new_col DOUBLE>>, _rescued_data STRING>
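
Put together, a query applying these hints might look like the following sketch (the schemaLocationKey value and source path are illustrative):

SQL
SELECT
  from_json(value, NULL,
    map('schemaLocationKey', 'arrayKey',
        'schemaHints', 'element.id STRING, element.new_col DOUBLE')) parsed
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')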

Evolve the schema using schemaEvolutionMode

from_json detects the addition of new columns as it processes your data. When from_json detects a new field, it updates the inferred schema with the latest schema by merging new columns to the end of the schema. The data types of existing columns remain unchanged. After the schema update, the pipeline restarts automatically with the updated schema.

from_json supports the following modes for schema evolution, which you set using the optional schemaEvolutionMode setting. These modes are consistent with Auto Loader.

Each mode determines the behavior when a new column is read:

  • addNewColumns (default): The stream fails. New columns are added to the schema. Existing columns do not evolve data types.
  • rescue: The schema is never evolved and the stream does not fail due to schema changes. All new columns are recorded in the rescued data column.
  • failOnNewColumns: The stream fails. The stream does not restart unless the schemaHints are updated or the offending data is removed.
  • none: The schema is not evolved, new columns are ignored, and data is not rescued unless the rescuedDataColumn option is set. The stream does not fail due to schema changes.

For example:

SQL
SELECT
-- If a new column appears, the pipeline will automatically add it to the schema:
from_json(a, NULL, map('schemaLocationKey', 'w', 'schemaEvolutionMode', 'addNewColumns')),
-- If a new column appears, the pipeline will add it to the rescued data column:
from_json(b, NULL, map('schemaLocationKey', 'x', 'schemaEvolutionMode', 'rescue')),
-- If a new column appears, the pipeline will ignore it:
from_json(c, NULL, map('schemaLocationKey', 'y', 'schemaEvolutionMode', 'none')),
-- If a new column appears, the pipeline will fail:
from_json(d, NULL, map('schemaLocationKey', 'z', 'schemaEvolutionMode', 'failOnNewColumns'))
FROM STREAM READ_FILES(...)

Rescued data column

A rescued data column is automatically added to your schema as _rescued_data. You can rename the column by setting the rescuedDataColumn option. For example:

from_json(jsonStr, NULL, map('schemaLocationKey', 'keyX', 'rescuedDataColumn', 'my_rescued_data'))

When you choose to use the rescued data column, any columns that don’t match the inferred schema are rescued instead of dropped. This might happen because of a data type mismatch, a missing column in the schema, or a column name casing difference.
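
The following is a minimal end-to-end sketch (the schemaLocationKey, table, and column names are illustrative) that renames the rescued data column and inspects it downstream:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
  from_json(value, NULL,
    map('schemaLocationKey', 'keyX',
        'rescuedDataColumn', 'my_rescued_data')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE rescued_rows AS
SELECT jsonCol.my_rescued_data
FROM bronze
WHERE jsonCol.my_rescued_data IS NOT NULL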

Handle corrupt records

To store records that are malformed and cannot be parsed, add a _corrupt_record column by setting schema hints, as in the following example:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL,
map('schemaLocationKey', 'nycTaxi',
'schemaHints', '_corrupt_record STRING',
'columnNameOfCorruptRecord', '_corrupt_record')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

To rename the corrupt record column, set the columnNameOfCorruptRecord option.

The JSON parser supports three modes for handling corrupt records:

  • PERMISSIVE: For corrupted records, puts the malformed string into a field configured by columnNameOfCorruptRecord and sets malformed fields to null. To keep corrupt records, you can set a string-type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have the field, corrupt records are dropped during parsing. When inferring a schema, the parser implicitly adds a columnNameOfCorruptRecord field to the output schema.
  • DROPMALFORMED: Ignores corrupted records. When you use DROPMALFORMED mode with rescuedDataColumn, data type mismatches do not cause records to be dropped; only corrupt records, such as incomplete or malformed JSON, are dropped.
  • FAILFAST: Throws an exception when the parser meets corrupted records. When you use FAILFAST mode with rescuedDataColumn, data type mismatches do not throw an error; only corrupt records, such as incomplete or malformed JSON, throw errors.
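
Downstream queries can then route malformed records to a separate table for inspection. The following is a minimal sketch, assuming the bronze table defined above (the quarantine table name is illustrative):

SQL
CREATE STREAMING TABLE quarantine AS
SELECT jsonCol._corrupt_record
FROM bronze
WHERE jsonCol._corrupt_record IS NOT NULL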

Refer to a field in the from_json output

from_json infers the schema during pipeline execution. If a downstream query refers to a from_json field before the from_json function has executed successfully at least one time, the field doesn't resolve and the query is skipped. In the following example, analysis for the silver table query will be skipped until the from_json function in the bronze query has executed and inferred the schema.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

CREATE STREAMING TABLE silver AS
SELECT jsonCol.VendorID, jsonCol.total_amount
FROM bronze

If the from_json function and the fields it infers are referenced in the same query, analysis might fail, as in the following example:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

You can fix this by moving the reference to the from_json field into a downstream query (as in the bronze/silver example above). Alternatively, you can specify schemaHints that contain the referenced from_json fields. For example:

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi', 'schemaHints', 'total_amount DOUBLE')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')
WHERE jsonCol.total_amount > 100.0

Examples: Automatically infer and evolve the schema

This section provides example code for enabling automatic schema inference and evolution using from_json in DLT.

Create a streaming table from cloud object storage

The following example uses read_files syntax to create a streaming table from cloud object storage.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
from_json(value, NULL, map('schemaLocationKey', 'nycTaxi')) jsonCol
FROM STREAM READ_FILES('/databricks-datasets/nyctaxi/sample/json/', format => 'text')

Create a streaming table from Kafka

The following example uses read_kafka syntax to create a streaming table from Kafka.

SQL
CREATE STREAMING TABLE bronze AS
SELECT
value,
from_json(value, NULL, map('schemaLocationKey', 'keyX')) jsonCol
FROM STREAM READ_KAFKA(
bootstrapServers => '<server:ip>',
subscribe => 'events',
startingOffsets => 'latest'
)

Examples: Fixed schema

For example code using from_json with a fixed schema, see from_json function.

FAQs

This section answers frequently asked questions about schema inference and evolution support in the from_json function.

What is the difference between from_json and parse_json?

The parse_json function returns a VARIANT value from the JSON string.

VARIANT provides a flexible and efficient way to store semi-structured data. This circumvents schema inference and evolution by doing away with strict types altogether. However, if you want to enforce a schema at write time (for example, because you have a relatively strict schema), from_json might be a better option.

The following describes the differences between from_json and parse_json:

from_json

Schema evolution with from_json maintains the schema. This is helpful when:

  • You want to enforce your data schema (for example, reviewing every schema change before persisting it).
  • You want to optimize storage and require low query latency and cost.
  • You want to fail on data with mismatched types.
  • You want to extract partial results from corrupted JSON records and store the malformed record in the _corrupt_record column. In contrast, VARIANT ingestion returns an error for invalid JSON.

Availability: Schema inference and evolution with from_json are available only in DLT.

parse_json

VARIANT is particularly well suited to holding data that does not need to be schematized. For example:

  • You want to keep the data semi-structured because it's flexible.
  • The schema changes too quickly to cast the data into a schema without frequent stream failures and restarts.
  • You don't want to fail on data with mismatched types. (VARIANT ingestion always succeeds for valid JSON records, even if there are type mismatches.)
  • Your users don't want to deal with a rescued data column containing fields that don't conform to the schema.

Availability: parse_json is available with and without DLT.

Can I use from_json schema inference and evolution syntax outside of DLT?

No, you can't use from_json schema inference and evolution syntax outside of DLT.

How do I access the schema inferred by from_json?

View the schema of the target streaming table.
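
For example, assuming a target streaming table named bronze as in the earlier examples, a standard DESCRIBE statement shows the inferred struct:

SQL
-- The jsonCol column displays the full inferred STRUCT, including the rescued data column
DESCRIBE TABLE bronze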

Can I pass from_json a schema and also do evolution?

No, you can't pass from_json a schema and also do evolution. However, you can provide schema hints to override some or all the fields inferred by from_json.

What happens to the schema if the table is fully refreshed?

The schema locations associated with the table are cleared, and the schema is re-inferred from scratch.