Ingest CSV data with Auto Loader

Note

Schema inference for CSV files is available in Databricks Runtime 8.3 and above.

Using Auto Loader to ingest CSV data into Delta Lake takes only a few lines of code. By leveraging Auto Loader, you get the following benefits:

  • Automatic discovery of new files to process: You don’t need special logic to handle late-arriving data or to track which files have already been processed.
  • Scalable file discovery: Auto Loader can ingest billions of files.
  • Schema inference and evolution: Auto Loader can infer your data schema and detect schema drift on the fly. It can also evolve the schema to add new columns and restart the stream with the new schema automatically.
  • Data rescue: You can configure Auto Loader to rescue data that couldn’t be parsed from your CSV files in a rescued data column.

You can use the following code to run Auto Loader with schema inference and evolution capabilities on CSV files. Specify cloudFiles as the format to use Auto Loader, and set the option cloudFiles.format to csv. In the option cloudFiles.schemaLocation, specify a directory where Auto Loader can persist your source data’s schema as it changes over time:

Python

# The schema location directory keeps track of your data schema over time
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path-to-target>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path-to-target>")

Auto Loader provides additional functionality to help make ingesting CSV data easier. In this section, we describe some of the behavior differences between the Apache Spark built-in CSV parser and Auto Loader.

Schema inference

To infer the schema, Auto Loader uses a sample of your data. When inferring the schema for CSV data, Auto Loader assumes that the files contain headers. If your CSV files do not contain headers, provide the option .option("header", "false"). In addition, Auto Loader merges the schemas of all the files in the sample to come up with a global schema. Auto Loader can then read each file according to its header and parse the CSV correctly. This behavior differs from the built-in Apache Spark CSV parser. The following example demonstrates the difference:

f0.csv:
-------
name,age,lucky_number
john,20,4

f1.csv:
-------
age,lucky_number,name
25,7,nadia

f2.csv:
-------
height,lucky_number
1.81,five

Apache Spark behavior:

+-------+------+--------------+
| name  | age  | lucky_number | <-- uses just the first file to infer schema
+-------+------+--------------+
| john  | 20   | 4            |
| 25    | 7    | nadia        | <-- all files are assumed to have the same schema
| 1.81  | five | null         |
+-------+------+--------------+

Auto Loader behavior:

+-------+------+--------------+--------+
| name  | age  | lucky_number | height | <-- schema is merged across files
+-------+------+--------------+--------+
| john  | 20   | 4            | null   |
| nadia | 25   | 7            | null   | <-- columns are parsed according to order specified in header
| null  | null | five         | 1.81   | <-- lucky_number's data type will be relaxed to a string
+-------+------+--------------+--------+

Note

To get the same schema inference and parsing semantics with the CSV reader in Databricks Runtime, you can use spark.read.option("mergeSchema", "true").csv(<path>)
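If your CSV files do not include a header row, the following minimal sketch (paths are placeholders) shows the header option mentioned above; without headers, columns are typically named positionally (_c0, _c1, …), as with the Spark CSV reader, unless you supply a schema or schema hints:

# A minimal sketch, assuming placeholder paths: reading headerless CSV files.
# Without a header row, columns are named positionally (_c0, _c1, ...)
# unless you provide a schema or schema hints.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "false") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>")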

By default, Auto Loader infers columns in your CSV data as string columns. Because CSV data can contain many data types, inferring the data as string helps avoid schema evolution issues such as numeric type mismatches (integers, longs, floats). If you want to infer specific column types, set the option cloudFiles.inferColumnTypes to true. You don’t need to set inferSchema to true if you set cloudFiles.inferColumnTypes to true.
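For example, a minimal sketch (paths are placeholders) that turns on column type inference:

# A minimal sketch, assuming placeholder paths. cloudFiles.inferColumnTypes
# infers concrete types (int, double, and so on) instead of defaulting to string.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>")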

Note

Unless case sensitivity is enabled, the columns foo, Foo, and FOO are considered the same column. Which casing is used to represent the column is arbitrary and depends on the sampled data. You can use schema hints to enforce which casing should be used. In addition, similar to the JSON parser, the parsing of records is case sensitive by default. For example, if you define the column as foo in your schema, a column that appears as FOO is rescued instead of being parsed as foo.
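For example, a minimal sketch (the column name and type are placeholders) that uses a schema hint to pin a column’s casing and type:

# A minimal sketch, assuming a placeholder column name and paths. The hint
# fixes the column's casing to "foo" and its type to string, regardless of
# what the sampled data contains.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("cloudFiles.schemaHints", "foo STRING") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>")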

Learn more about schema inference and evolution with Auto Loader in Schema inference and evolution in Auto Loader.

Rescued data column

The rescued data column contains any data that wasn’t parsed, either because it was missing from the given schema, because there was a type mismatch, or because the casing of the column didn’t match. When the schema is inferred, the rescued data column is included in the schema returned by Auto Loader as _rescued_data by default. You can rename the column, or include it in cases where you provide a schema, by setting the option rescuedDataColumn.

With CSV data, the following additional cases can also be rescued:

  • Columns whose header is an empty string or null will be rescued as _c${index}, where index represents the zero-based ordinal of the column in the file.
  • If the same column name appears with the exact same casing multiple times in the header, every instance of the column except the first will be rescued as ${columnName}_${index}, where index represents the zero-based ordinal of the column in the file.

Note

You can provide a rescued data column to all CSV parsers in Databricks Runtime by using the option rescuedDataColumn, for example, as an option to spark.read.csv on the DataFrameReader or to the from_csv function within a SELECT query.
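For example, a minimal sketch (the path and schema are placeholders) that applies rescuedDataColumn to the batch DataFrameReader rather than Auto Loader:

# A minimal sketch, assuming a placeholder path and schema. Values that do not
# fit the provided schema are collected in the _rescue column.
spark.read \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescue") \
  .schema("name string, age int") \
  .csv("/path/to/table/")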

The following streaming example shows how the rescued data column captures data that does not fit the provided schema. The lucky_number column is not part of the schema, so its values are rescued:

/path/to/table/f0.csv:
---------------------
name,age,lucky_number
john,20,4

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescue") \
  .option("header", "true") \
  .schema("name string, age int") \
  .load("/path/to/table/")

+-------+------+------------------------------------------+
| name  | age  |                _rescue                   |
+-------+------+------------------------------------------+
| john  | 20   | {                                        |
|       |      |    "lucky_number": 4,                    |
|       |      |    "_file_path": "/path/to/table/f0.csv" |
|       |      | }                                        |
+-------+------+------------------------------------------+

Malformed record behavior

The CSV parser supports three modes when parsing malformed records: PERMISSIVE (default), DROPMALFORMED, and FAILFAST. When used together with rescuedDataColumn, data type mismatches do not cause records to be dropped in DROPMALFORMED mode or throw an error in FAILFAST mode. Only corrupt records, that is, incomplete or malformed CSV, are dropped or throw an error, respectively. If you use badRecordsPath when parsing CSV, data type mismatches are not considered bad records when you use rescuedDataColumn. Only incomplete and malformed CSV records are stored in badRecordsPath.
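For example, a minimal sketch (paths are placeholders, and it assumes the CSV parser’s mode option is passed through as shown) that keeps FAILFAST semantics for truly malformed rows while rescuing type mismatches:

# A minimal sketch, assuming placeholder paths. With rescuedDataColumn set,
# type mismatches land in _rescue; only incomplete or malformed CSV rows
# cause FAILFAST to raise an error.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("mode", "FAILFAST") \
  .option("rescuedDataColumn", "_rescue") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>")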

Auto Loader treats the following as malformed records:

  • Any row that doesn’t have the same number of tokens as the header, if headers are enabled
  • Any row that doesn’t have the same number of tokens as the data schema, if headers are disabled

For files that contain records spanning multiple lines, you can set the option multiLine to true. You might also need to configure quote ('"' by default) and escape ('\' by default) to get the parsing behavior you want.
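The following is a minimal sketch (paths are placeholders) for reading CSV files whose records span multiple lines, with the quote and escape characters set explicitly:

# A minimal sketch, assuming placeholder paths. multiLine lets a quoted field
# contain newlines; quote and escape are shown with their default values.
spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("multiLine", "true") \
  .option("quote", "\"") \
  .option("escape", "\\") \
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<path-to-source-data>")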