Common data loading patterns
Auto Loader simplifies a number of common data ingestion tasks. This quick reference provides examples for several popular patterns.
Filtering directories or files using glob patterns
Glob patterns can be used for filtering directories and files when provided in the path.
Pattern |
Description |
---|---|
|
Matches any single character |
|
Matches zero or more characters |
|
Matches a single character from character set {a,b,c}. |
|
Matches a single character from the character range {a…z}. |
|
Matches a single character that is not from character set or range {a}.
Note that the |
|
Matches a string from the string set {ab, cd}. |
|
Matches a string from the string set {ab, cde, cfh}. |
Use the path
for providing prefix patterns, for example:
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", <format>) \
.schema(schema) \
.load("<base-path>/*/files")
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", <format>)
.schema(schema)
.load("<base-path>/*/files")
Important
You need to use the option pathGlobFilter
for explicitly providing suffix patterns. The path
only provides a prefix filter.
For example, if you would like to parse only png
files in a directory that contains files with different suffixes, you can do:
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("pathGlobfilter", "*.png") \
.load(<base-path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.option("pathGlobfilter", "*.png")
.load(<base-path>)
Note
The default globbing behavior of Auto Loader is different than the default behavior of other Spark file sources. Add .option("cloudFiles.useStrictGlobber", "true")
to your read to use globbing that matches default Spark behavior against file sources. See the following table for more on globbing:
Pattern |
File path |
Default globber |
Strict globber |
---|---|---|---|
/a/b |
/a/b/c/file.txt |
Yes |
Yes |
/a/b |
/a/b_dir/c/file.txt |
No |
No |
/a/b |
/a/b.txt |
No |
No |
/a/b/ |
/a/b.txt |
No |
No |
/a/*/c/ |
/a/b/c/file.txt |
Yes |
Yes |
/a/*/c/ |
/a/b/c/d/file.txt |
Yes |
Yes |
/a/*/c/ |
/a/b/x/y/c/file.txt |
Yes |
No |
/a/*/c |
/a/b/c_file.txt |
Yes |
No |
/a/*/c/ |
/a/b/c_file.txt |
Yes |
No |
/a/*/c/ |
/a/*/cookie/file.txt |
Yes |
No |
/a/b* |
/a/b.txt |
Yes |
Yes |
/a/b* |
/a/b/file.txt |
Yes |
Yes |
/a/{0.txt,1.txt} |
/a/0.txt |
Yes |
Yes |
/a/*/{0.txt,1.txt} |
/a/0.txt |
No |
No |
/a/b/[cde-h]/i/ |
/a/b/c/i/file.txt |
Yes |
Yes |
Enable easy ETL
An easy way to get your data into Delta Lake without losing any data is to use the following pattern and enabling schema inference with Auto Loader. Databricks recommends running the following code in a Databricks job for it to automatically restart your stream when the schema of your source data changes. By default, the schema is inferred as string types, any parsing errors (there should be none if everything remains as a string) will go to _rescued_data
, and any new columns will fail the stream and evolve the schema.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
.load("<path-to-source-data>") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Prevent data loss in well-structured data
When you know your schema, but want to know whenever you receive unexpected data, Databricks recommends using the rescuedDataColumn
.
spark.readStream.format("cloudFiles") \
.schema(expected_schema) \
.option("cloudFiles.format", "json") \
# will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.schema(expected_schema)
.option("cloudFiles.format", "json")
// will collect all new fields as well as data type mismatches in _rescued_data
.option("cloudFiles.schemaEvolutionMode", "rescue")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
If you want your stream to stop processing if a new field is introduced that doesn’t match your schema, you can add:
.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")
Enable flexible semi-structured data pipelines
When you’re receiving data from a vendor that introduces new columns to the information they provide, you may not be aware of exactly when they do it, or you may not have the bandwidth to update your data pipeline. You can now leverage schema evolution to restart the stream and let Auto Loader update the inferred schema automatically. You can also leverage schemaHints
for some of the “schemaless” fields that the vendor may be providing.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT") \
.load("/api/requests") \
.writeStream \
.option("mergeSchema", "true") \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// will ensure that the headers column gets processed as a map
.option("cloudFiles.schemaHints",
"headers map<string,string>, statusCode SHORT")
.load("/api/requests")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")
Transform nested JSON data
Because Auto Loader infers the top level JSON columns as strings, you can be left with nested JSON objects that require further transformations. You can use the semi-structured data access APIs to further transform complex JSON content.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.load("<source-data-with-nested-json>") \
.selectExpr(
"*",
"tags:page.name", # extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" # extracts {"tags":{"eventType":...}}
)
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.load("<source-data-with-nested-json>")
.selectExpr(
"*",
"tags:page.name", // extracts {"tags":{"page":{"name":...}}}
"tags:page.id::int", // extracts {"tags":{"page":{"id":...}}} and casts to int
"tags:eventType" // extracts {"tags":{"eventType":...}}
)
Infer nested JSON data
When you have nested data, you can use the cloudFiles.inferColumnTypes
option to infer the nested structure of your data and other column types.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
# The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("<source-data-with-nested-json>")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
// The schema location directory keeps track of your data schema over time
.option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
.option("cloudFiles.inferColumnTypes", "true")
.load("<source-data-with-nested-json>")
Load CSV files without headers
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Enforce a schema on CSV files with headers
df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("header", "true") \
.option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
.schema(<schema>) \ # provide a schema here for the files
.load(<path>)
val df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("header", "true")
.option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
.schema(<schema>) // provide a schema here for the files
.load(<path>)
Ingest image or binary data to Delta Lake for ML
Once the data is stored in Delta Lake, you can run distributed inference on the data. See Perform distributed inference using pandas UDF.
spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.load("<path-to-source-data>") \
.writeStream \
.option("checkpointLocation", "<path-to-checkpoint>") \
.start("<path_to_target")
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "binaryFile")
.load("<path-to-source-data>")
.writeStream
.option("checkpointLocation", "<path-to-checkpoint>")
.start("<path_to_target")