Ingest data as semi-structured variant type
Preview
This feature is in Public Preview.
In Databricks Runtime 15.3 and above, you can use the VARIANT type to ingest semi-structured data. This article describes behavior and provides example patterns for ingesting data from cloud object storage using Auto Loader and COPY INTO, streaming records from Kafka, and SQL commands for creating new tables with variant data or inserting new records using the variant type.
See Query variant data.
Create a table with a variant column
VARIANT is a standard SQL type in Databricks Runtime 15.3 and above and is supported by tables backed by Delta Lake. Managed tables on Databricks use Delta Lake by default, so you can create an empty table with a single VARIANT column using the following syntax:
CREATE TABLE table_name (variant_column VARIANT)
Alternatively, you can use the PARSE_JSON function on a JSON string in a CTAS statement to create a table with a variant column. The following example creates a table with two columns:
The id column is extracted from the JSON string as a STRING type.
The variant_column column contains the entire JSON string encoded as VARIANT type.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Note
Databricks recommends extracting the fields that you plan to use to accelerate queries or optimize storage layout and storing them as non-variant columns.
VARIANT columns cannot be used for clustering keys, partitions, or Z-order keys. The VARIANT data type cannot be used for comparisons, grouping, ordering, or set operations. For a full list of limitations, see Limitations.
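The recommendation above to store frequently used fields as non-variant columns can be sketched as a small statement builder. This is a minimal illustration, not a Databricks API: the function name build_ctas and its parameters are hypothetical, it handles only simple top-level field names, and it interpolates identifiers without escaping, so it should only be given trusted input. Fields extracted with the colon syntax from a JSON string arrive as STRING, as in the example above.

```python
def build_ctas(table, source, json_col, fields, variant_col="variant_column"):
    """Build a CTAS statement that extracts `fields` as separate columns
    and keeps the full record as a VARIANT column.

    Hypothetical helper: names are interpolated as-is, with no quoting.
    """
    # One `json_col:field AS field` expression per extracted field.
    extracted = [f"{json_col}:{name} AS {name}" for name in fields]
    # The full record is still stored, parsed, in the variant column.
    columns = extracted + [f"PARSE_JSON({json_col}) AS {variant_col}"]
    select_list = ",\n  ".join(columns)
    return f"CREATE TABLE {table} AS\nSELECT {select_list}\nFROM {source}"


statement = build_ctas("table_name", "source_data", "json_string", ["id", "name"])
print(statement)
```

The resulting string can be passed to spark.sql or pasted into a SQL editor after review.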
Insert data using parse_json
If the target table already contains a column encoded as VARIANT, you can use parse_json to insert JSON string records as VARIANT, as in the following example:
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
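from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  # Alias the parsed column so its name matches the VARIANT column in the target table.
  .select(parse_json(col("json_string")).alias("variant_column"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)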
Ingest data from cloud object storage as variant
In Databricks Runtime 15.3 and above, you can use Auto Loader to load all data from JSON sources as a single VARIANT column in a target table. Because VARIANT is flexible to schema and type changes and maintains case sensitivity and NULL values present in the data source, this pattern is robust to most ingestion scenarios with the following caveats:
Malformed JSON records cannot be encoded using VARIANT type.
VARIANT type can only hold records up to 16 MB in size.
Note
Variant treats overly large records similarly to corrupt records. In the default PERMISSIVE processing mode, overly large records are captured in the _malformed_data column alongside malformed JSON records.
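To get a feel for which records would fall under the two caveats above, you can run a rough local pre-check on sample data before ingestion. The sketch below is an approximation under stated assumptions: classify_record is a hypothetical helper, the limit is taken as 16 * 1024 * 1024 bytes of UTF-8 JSON text (the encoded variant size can differ from the raw text size), and Python's json module stands in for the parser that Databricks uses.

```python
import json

# Assumption: treat "16 MB" as 16 * 1024 * 1024 bytes of raw UTF-8 JSON text.
MAX_VARIANT_BYTES = 16 * 1024 * 1024


def _reject_constant(name):
    # json.loads accepts NaN and Infinity by default; strict JSON does not.
    raise ValueError(f"non-standard JSON constant: {name}")


def classify_record(raw, max_bytes=MAX_VARIANT_BYTES):
    """Return 'ok', 'oversized', or 'malformed' for one JSON string."""
    if len(raw.encode("utf-8")) > max_bytes:
        return "oversized"
    try:
        json.loads(raw, parse_constant=_reject_constant)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return "malformed"
    return "ok"


samples = ['{"id": 1}', '{"id": ', "NaN"]
print([classify_record(s) for s in samples])
```

A check like this is only useful for sampling and debugging. During ingestion, the PERMISSIVE mode behavior described above still decides where such records end up.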
Because all data from the JSON source is recorded as a single VARIANT column, no schema evolution occurs during ingestion and rescuedDataColumn is not supported. The following example assumes that the target table already exists with a single VARIANT column.
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
You can also specify VARIANT when defining a schema or passing schemaHints. The data in the referenced source field must contain a valid JSON string. The following examples demonstrate this syntax:
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
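When several fields should be ingested as variant, the schema hints string from the last example can be assembled programmatically. The following is a minimal sketch: autoloader_json_options is a hypothetical helper name, and it only builds a plain dictionary of the two options already shown above, using the comma-separated "column type" form of the hints string.

```python
def autoloader_json_options(variant_columns):
    """Build Auto Loader options that hint the given columns as VARIANT.

    Hypothetical helper: returns a plain dict and does not touch Spark.
    """
    # Schema hints are a comma-separated list of "column type" pairs.
    hints = ", ".join(f"{name} VARIANT" for name in variant_columns)
    return {
        "cloudFiles.format": "json",
        "cloudFiles.schemaHints": hints,
    }


options = autoloader_json_options(["address", "payload"])
print(options["cloudFiles.schemaHints"])
```

The dictionary can then be applied in one call with spark.readStream.format("cloudFiles").options(**options) in place of the individual .option calls.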
Use COPY INTO with variant
Databricks recommends using Auto Loader over COPY INTO when available.
COPY INTO supports ingesting the entire contents of a JSON data source as a single column. The following example creates a new table with a single VARIANT column and then uses COPY INTO to ingest records from a JSON file source.
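CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  -- The option value names the single variant column and should match the column in the target table.
  FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')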
You can also define any field in a target table as VARIANT. When you run COPY INTO, the corresponding fields in the data source are ingested and cast to VARIANT type, as in the following examples:
-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON;

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON;
Stream Kafka data as variant
Many Kafka streams encode their payloads using JSON. Ingesting Kafka streams using VARIANT makes these workloads robust to schema changes.
The following example demonstrates reading a Kafka streaming source, casting the key as a STRING and the value as VARIANT, and writing out to a target table.
from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    # Name the parsed column explicitly instead of relying on the generated expression name.
    parse_json(col("value").cast("string")).alias("value")
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
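The per-record transformation in the stream above can be illustrated locally without a cluster. In this sketch, decode_kafka_record is a hypothetical helper and the sample records are made up. Python's json.loads stands in for parse_json, so it shows only the decoding steps (binary key and value to string, then string to parsed JSON) and not the actual VARIANT encoding. Records with different shapes pass through the same code path, which is the property that makes the pattern tolerant of schema changes.

```python
import json


def decode_kafka_record(key, value):
    """Mimic the select step: key -> string, value -> parsed JSON.

    Kafka keys and values arrive as bytes; the key may be absent (None).
    """
    key_str = key.decode("utf-8") if key is not None else None
    parsed = json.loads(value.decode("utf-8"))
    return key_str, parsed


# Illustrative records with differing shapes and types.
records = [
    (b"user-1", b'{"id": 1, "name": "a"}'),
    (b"user-2", b'{"id": 2, "name": "b", "tags": ["x", "y"]}'),
    (None, b'{"id": "3"}'),
]

decoded = [decode_kafka_record(k, v) for k, v in records]
for key, payload in decoded:
    print(key, payload)
```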