Ingest data as semi-structured variant type
This feature is in Public Preview.
In Databricks Runtime 15.3 and above, you can use the `VARIANT` type to ingest semi-structured data. This article describes `VARIANT` behavior and provides example patterns for ingesting data from cloud object storage using Auto Loader and `COPY INTO`, streaming records from Kafka, and SQL commands for creating new tables with variant data or inserting new records using the variant type. The following table summarizes the supported file formats and the Databricks Runtime versions that support them:
| File format | Supported Databricks Runtime version |
|---|---|
| JSON | 15.3 and above |
| XML | 16.4 and above |
See Query variant data.
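For context on the query side, fields of a variant column can be extracted with the `:` path syntax and cast to a concrete type with `::`. A minimal sketch, assuming a hypothetical table with a `VARIANT` column named `variant_column` that holds objects with a nested `address` field:

```sql
-- Hypothetical table and field names; extract a nested field from the
-- variant column and cast it to a string.
SELECT variant_column:address:city::STRING AS city
FROM table_name
```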
Create a table with a variant column
`VARIANT` is a standard SQL type in Databricks Runtime 15.3 and above and is supported by tables backed by Delta Lake. Managed tables on Databricks use Delta Lake by default, so you can create an empty table with a single `VARIANT` column using the following syntax:

```sql
CREATE TABLE table_name (variant_column VARIANT)
```
Alternatively, you can use the `PARSE_JSON` function on a JSON string or the `FROM_XML` function on an XML string in a CTAS statement to create a table with a variant column. The following example creates a table with two columns:

- The `id` column, extracted from the JSON string as a `STRING` type.
- The `variant_column` column, containing the entire JSON string encoded as the `VARIANT` type.

```sql
CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data
```
Databricks recommends extracting fields that you plan to use in queries and storing them as non-variant columns to accelerate queries and optimize storage layout. `VARIANT` columns cannot be used as clustering keys, partition keys, or Z-order keys. The `VARIANT` data type cannot be used for comparisons, grouping, ordering, or set operations. For a full list of limitations, see Limitations.
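Because a `VARIANT` column cannot serve as a clustering, partition, or Z-order key, a common pattern is to extract a frequently filtered field into its own column and cluster on that column instead. A minimal sketch, assuming hypothetical table and field names:

```sql
-- Hypothetical names: cluster on the extracted `id` column,
-- not on the variant column.
CREATE TABLE events (
  id STRING,
  variant_column VARIANT
)
CLUSTER BY (id);

INSERT INTO events
SELECT json_string:id AS id,
  PARSE_JSON(json_string) AS variant_column
FROM source_data;
```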
Insert data using parse_json
If the target table already contains a column encoded as `VARIANT`, you can use `parse_json` to insert JSON string records as `VARIANT`, as in the following example:
**SQL**

```sql
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
```

**Python**

```python
from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  # Alias the parsed column to match the target column name.
  .select(parse_json(col("json_string")).alias("variant_column"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)
```
Insert data using from_xml
If the target table already contains a column encoded as `VARIANT`, you can use `from_xml` to insert XML string records as `VARIANT`. For example:
**SQL**

```sql
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
```

**Python**

```python
from pyspark.sql.functions import col, from_xml

(spark.read
  .table("source_data")
  # Alias the parsed column to match the target column name.
  .select(from_xml(col("xml_string"), "variant").alias("variant_column"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)
```
Ingest data from cloud object storage as variant
Auto Loader can load all data from supported file sources as a single `VARIANT` column in a target table. Because `VARIANT` is flexible to schema and type changes and maintains case sensitivity and `NULL` values present in the data source, this pattern is robust to most ingestion scenarios, with the following caveats:

- Malformed records cannot be encoded using the `VARIANT` type.
- The `VARIANT` type can only hold records up to 16 MB in size.

Variant treats overly large records similarly to corrupt records. In the default `PERMISSIVE` processing mode, overly large records are captured in the `corruptRecordColumn`.
Because the entire record is recorded as a single `VARIANT` column, no schema evolution occurs during ingestion, and `rescuedDataColumn` is not supported. The following example assumes that the target table already exists with a single `VARIANT` column.

```python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
```
You can also specify `VARIANT` when defining a schema or passing `schemaHints`. The data in the referenced source field must contain a valid record. The following examples demonstrate this syntax:
```python
# Define the schema.
# Writes the column `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
```

```python
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
```

```python
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
```
Use COPY INTO with variant
Databricks recommends using Auto Loader over `COPY INTO` when available.
`COPY INTO` supports ingesting the entire contents of a supported data source as a single column. The following example creates a new table with a single `VARIANT` column and then uses `COPY INTO` to ingest records from a JSON file source.
```sql
CREATE TABLE table_name (variant_column VARIANT);

-- The singleVariantColumn option names the target variant column.
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
```
Stream Kafka data as variant
Many Kafka streams encode their payloads using JSON. Ingesting Kafka streams using `VARIANT` makes these workloads robust to schema changes.

The following example demonstrates reading a Kafka streaming source, casting the `key` as a `STRING` and the `value` as `VARIANT`, and writing to a target table.
```python
from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string"))
  )
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
```