Ingest JSON data with Auto Loader

Note

Schema inference for JSON files is available in Databricks Runtime 8.2 and above.

Using Auto Loader to ingest JSON data into Delta Lake takes only a few lines of code. By leveraging Auto Loader, you get the following benefits:

  • Automatic discovery of new files to process: You don’t need special logic to handle late-arriving data or to track which files have already been processed.
  • Scalable file discovery: Auto Loader can ingest billions of files without a hiccup.
  • Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift on the fly. It can also evolve the schema to add new columns and restart the stream with the new schema automatically.
  • Data rescue: You can configure Auto Loader to rescue data that couldn’t be parsed from your JSON in a rescued data column that preserves the structure of your JSON record.

You can use the following code to run Auto Loader with schema inference and evolution capabilities on JSON files. Specify cloudFiles as the format to leverage Auto Loader, and set the option cloudFiles.format to json to ingest JSON files. In the option cloudFiles.schemaLocation, specify a directory where Auto Loader can track the schema of your source data as it changes over time:

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Schema inference and evolution

To infer the schema, Auto Loader uses a sample of your data. By default, Auto Loader infers all top-level columns in your JSON data as string columns. Because JSON data is self-describing and can support many data types, inferring the data as strings helps you avoid schema evolution issues such as numeric type mismatches (integers, longs, floats). If you want to retain the original Spark schema inference behavior, set the option cloudFiles.inferColumnTypes to true.
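
For example, a minimal sketch that opts back into typed inference; the placeholder paths are the same as in the examples above:

# Sketch only: infer typed columns (for example long, double) instead of all strings
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>")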

Learn more about schema inference and evolution with Auto Loader in Schema inference and evolution in Auto Loader.

Rescued data column

JSON is a popular format because it lets you store arbitrary data without adhering to a fixed schema. However, this flexibility can cause difficulties for data engineers. In particular, JSON data can have the following problems:

  • A column can appear with different data types in different records
  • A column can appear with different cases in different records (for example, as “foo” and “Foo”)
  • A new column can appear in a subset of records
  • There may not be a globally consistent schema across all records
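
For example, records like the following (a hypothetical illustration) exhibit the first three problems in a single file: foo appears both as a number and as a string, with two different casings, and baz appears only in the last record.

{"foo": 1, "bar": "a"}
{"Foo": "one", "bar": "b"}
{"foo": 2, "bar": "c", "baz": true}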

To address the first three problems, use the rescued data column. The rescued data column contains any data that wasn’t parsed: data that was missing from the given schema, data that had a type mismatch, and data whose column casing didn’t match the schema. When Auto Loader infers the schema, it includes the rescued data column in the returned schema as _rescued_data by default. You can rename the column, or include it in cases where you provide a schema, by setting the option rescuedDataColumn.

Because cloudFiles.inferColumnTypes defaults to false, and cloudFiles.schemaEvolutionMode defaults to addNewColumns when the schema is being inferred, rescuedDataColumn captures only columns whose casing differs from that in the schema.

The JSON parser supports three modes for handling malformed records: PERMISSIVE (the default), DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or to throw an error in FAILFAST mode. Only corrupt records, that is, incomplete or malformed JSON, are dropped or throw an error, respectively. If you use badRecordsPath when parsing JSON, data type mismatches are not considered bad records when you use rescuedDataColumn; only incomplete and malformed JSON records are stored in badRecordsPath.
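
As a minimal sketch of this behavior (the paths and the _rescue column name are placeholders), a record whose age value cannot be cast to int is kept, with the mismatched value in the rescued data column, while truncated or otherwise invalid JSON is redirected to badRecordsPath:

# Sketch only: placeholder paths; rescued type mismatches land in _rescue,
# malformed JSON records are written to badRecordsPath
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("rescuedDataColumn", "_rescue") \
  .option("badRecordsPath", "<path_to_bad_records>") \
  .schema("name string, age int") \
  .load("<path_to_source_data>")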

Note

You can provide a rescued data column to all JSON parsers in Databricks Runtime by using the option rescuedDataColumn, for example as an option to spark.read.json on the DataFrameReader or to the from_json function within a SELECT query.

For example, given the following file and a user-provided schema that omits lucky_number, the extra column is rescued along with the path of the file it came from:

/path/to/table/f0.json:
---------------------
{"name":"john","age":20,"lucky_number":4}

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .load("/path/to/table/")
+-------+------+--------------------------------------------+
| name  | age  |                 _rescue                    |
+-------+------+--------------------------------------------+
| john  | 20   | {                                          |
|       |      |    "lucky_number": 4,                      |
|       |      |    "_file_path": "/path/to/table/f0.json"  |
|       |      | }                                          |
+-------+------+--------------------------------------------+
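
The same option can also be set on the batch DataFrameReader mentioned in the note above; a minimal sketch, reusing the illustrative /path/to/table/ directory and _rescue column name:

spark.read \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .json("/path/to/table/")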

Transform nested JSON data

Because Auto Loader infers the top-level JSON columns as strings, you may be left with nested JSON objects that require further transformation. You can leverage the semi-structured data access APIs to further transform complex JSON content.

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<source_data_with_nested_json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )
spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<source_data_with_nested_json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )