Ingest data as semi-structured variant type

Preview

This feature is in Public Preview.

In Databricks Runtime 15.3 and above, you can use the VARIANT type to ingest semi-structured data. This article describes behavior and provides example patterns for ingesting data from cloud object storage using Auto Loader and COPY INTO, streaming records from Kafka, and SQL commands for creating new tables with variant data or inserting new records using the variant type.

See Query variant data.

Create a table with a variant column

VARIANT is a standard SQL type in Databricks Runtime 15.3 and above and is supported by tables backed by Delta Lake. Managed tables on Databricks use Delta Lake by default, so you can create an empty table with a single VARIANT column using the following syntax:

CREATE TABLE table_name (variant_column VARIANT)

Alternatively, you can use a CTAS statement with the PARSE_JSON function to create a table with a variant column from JSON strings. The following example creates a table with two columns:

  • The id column is extracted from the JSON string as a STRING type.

  • The variant_column column contains the entire JSON string encoded as VARIANT type.

CREATE TABLE table_name AS
  SELECT json_string:id AS id,
    PARSE_JSON(json_string) variant_column
  FROM source_data

Note

VARIANT columns cannot be used for clustering keys, partitions, or Z-order keys. Data stored with VARIANT type cannot be used for comparisons and ordering.

Databricks recommends extracting the fields that you plan to use to accelerate queries and optimize storage layout, and storing them as non-variant columns.
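
One way to apply this recommendation is to promote frequently filtered fields to typed columns while keeping the full record as VARIANT. The following sketch only builds the SQL text, combining the `:` path syntax from the CTAS example with a `::` cast. The helper name `build_ctas`, the `event_ts` field, and the chosen types are illustrative assumptions, not part of the examples in this article.

```python
def build_ctas(table, source, json_col, typed_fields, variant_col="variant_column"):
    """Build a CTAS statement that extracts typed columns next to a VARIANT column.

    typed_fields maps a top-level JSON field name to the SQL type to cast it to.
    All names are assumed to be trusted identifiers; no quoting or escaping is done.
    """
    select_list = [
        f"{json_col}:{field}::{sql_type} AS {field}"
        for field, sql_type in typed_fields.items()
    ]
    # Keep the complete record as VARIANT alongside the extracted columns.
    select_list.append(f"PARSE_JSON({json_col}) AS {variant_col}")
    columns = ",\n    ".join(select_list)
    return f"CREATE TABLE {table} AS\n  SELECT {columns}\n  FROM {source}"


statement = build_ctas(
    "table_name",
    "source_data",
    "json_string",
    {"id": "STRING", "event_ts": "TIMESTAMP"},  # hypothetical fields and types
)
print(statement)
```

On a cluster, the resulting statement could be passed to `spark.sql(statement)`. The extracted columns, unlike the VARIANT column, can then be used for clustering or partitioning.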

Insert data using parse_json

If the target table already contains a column encoded as VARIANT, you can use parse_json to insert JSON string records as VARIANT, as in the following example:

INSERT INTO table_name (variant_column)
  SELECT PARSE_JSON(json_string)
  FROM source_data

from pyspark.sql.functions import col, parse_json

(spark.read
  .table("source_data")
  .select(parse_json(col("json_string")).alias("variant_column"))
  .write
  .mode("append")
  .saveAsTable("table_name")
)

Ingest data from cloud object storage as variant

In Databricks Runtime 15.3 and above, you can use Auto Loader to load all data from JSON sources as a single VARIANT column in a target table. Because VARIANT is flexible to schema and type changes and maintains case sensitivity and NULL values present in the data source, this pattern is robust to most ingestion scenarios with the following caveats:

  • Malformed JSON records cannot be encoded using VARIANT type.

  • VARIANT type can only hold records up to 16 MB in size.

Note

Variant treats overly large records the same way as corrupt records. In the default PERMISSIVE processing mode, overly large records are captured in the _malformed_data column alongside malformed JSON records.
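
The two caveats can be approximated with a local check on sample records before ingestion. This is a minimal sketch using only the Python standard library. The function name `classify_record` and the labels it returns are assumptions for illustration. It measures the raw UTF-8 size of the JSON text, which may differ from the size of the encoded VARIANT value, and Python's JSON parser may disagree with Spark's in edge cases.

```python
import json

MAX_RECORD_BYTES = 16 * 1024 * 1024  # assumption: "16 MB" is read as 16 MiB


def classify_record(raw: str, max_bytes: int = MAX_RECORD_BYTES) -> str:
    """Return 'too_large', 'malformed', or 'ok' for one JSON text record."""
    # Size check first, on the raw text only (a proxy for the encoded size).
    if len(raw.encode("utf-8")) > max_bytes:
        return "too_large"
    try:
        json.loads(raw)
    except json.JSONDecodeError:
        return "malformed"
    return "ok"


# A small limit is passed here only to keep the demo data short.
samples = ['{"id": 1, "Name": null}', '{"id": 2,', '"' + "x" * 64 + '"']
print([classify_record(s, max_bytes=32) for s in samples])
# ['ok', 'malformed', 'too_large']
```

Records flagged by a check like this are the ones you would expect to land in the _malformed_data column in PERMISSIVE mode.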

Because all data from the JSON source is recorded as a single VARIANT column, no schema evolution occurs during ingestion. The following example assumes that the target table already exists with a single VARIANT column.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("singleVariantColumn", "variant_column")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

You can also specify VARIANT when defining a schema or passing schemaHints. The data in the referenced source field must contain a valid JSON string. The following examples demonstrate this syntax:

# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("name STRING, address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .schema("payload VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)

# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaHints", "address VARIANT")
  .load("/Volumes/catalog_name/schema_name/volume_name/path")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
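
When several fields should be kept as variant, the hint string can be assembled from a mapping instead of written by hand. The sketch below assumes the comma-separated `column type` form used in the schema hints example. The helper name `build_schema_hints` and the `metadata` field are hypothetical.

```python
def build_schema_hints(hints: dict) -> str:
    """Join {column: type} pairs into a cloudFiles.schemaHints string."""
    return ", ".join(f"{column} {sql_type}" for column, sql_type in hints.items())


schema_hints = build_schema_hints({"address": "VARIANT", "metadata": "VARIANT"})
print(schema_hints)  # address VARIANT, metadata VARIANT

# The result would be passed to Auto Loader as:
#   .option("cloudFiles.schemaHints", schema_hints)
```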

Use COPY INTO with variant

Databricks recommends using Auto Loader over COPY INTO when available.

COPY INTO supports ingesting the entire contents of a JSON data source as a single column. The following example creates a new table with a single VARIANT column and then uses COPY INTO to ingest records from a JSON file source.

CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON
  FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')

You can also define any field in a target table as VARIANT. When you run COPY INTO, the corresponding fields in the data source are ingested and cast to VARIANT type, as in the following examples:

-- Extracts the `address` field from the JSON record and casts to variant
CREATE TABLE table_name (address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

-- Extracts `name` and `address` from the JSON record and casts `address` to variant
CREATE TABLE table_name (name STRING, address VARIANT);
COPY INTO table_name
  FROM '/Volumes/catalog_name/schema_name/volume_name/path'
  FILEFORMAT = JSON

Stream Kafka data as variant

Many Kafka streams encode their payloads using JSON. Ingesting Kafka streams using VARIANT makes these workloads robust to schema changes.

The following example demonstrates reading a Kafka streaming source, casting the key as a STRING and the value as VARIANT, and writing out to a target table.

from pyspark.sql.functions import col, parse_json

(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()
  .select(
    col("key").cast("string"),
    parse_json(col("value").cast("string")).alias("value")
  ).writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("table_name")
)
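
To see what the `select` step does to a single record, the transformation can be mimicked locally on sample payloads. This sketch is not Spark code: it uses the standard `json` module as a stand-in for `parse_json`, which produces a VARIANT value rather than a Python object. The function name `decode_kafka_record` and the sample key and payload are made up for illustration.

```python
import json
from typing import Any, Optional, Tuple


def decode_kafka_record(key: Optional[bytes], value: bytes) -> Tuple[Optional[str], Any]:
    """Mimic the stream's select: key -> string, value -> parsed JSON."""
    key_str = key.decode("utf-8") if key is not None else None
    # Raises ValueError (JSONDecodeError) if the payload is not valid JSON.
    parsed = json.loads(value.decode("utf-8"))
    return key_str, parsed


key, payload = decode_kafka_record(b"device-42", b'{"temp": 21.5, "Tags": ["a", null]}')
print(key, payload["temp"], payload["Tags"])
```

Running a few captured payloads through a check like this can show whether a topic contains values that are not valid JSON before the stream is started.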