Spark API options reference

This page lists available input and output options for Spark APIs that read and write data.

DataFrameReader options

Use these options with DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO, and Auto Loader to control how Databricks reads data files.

Example

The following example sets multiLine to True for reading JSON files:

Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")

Common

The following options apply to all file formats.

Key

Default

Valid values

Description

ignoreCorruptFiles

false

true, false

Whether to ignore corrupt files. If true, the Spark jobs will continue to run when encountering corrupted files and the contents that have been read will still be returned. For COPY INTO, you can observe skipped corrupt files as numSkippedCorruptFiles in the operationMetrics column of the Delta Lake history. Available in Databricks Runtime 11.3 LTS and above.

ignoreMissingFiles

false for Auto Loader, true for COPY INTO (legacy)

true, false

Whether to ignore missing files. If true, the Spark jobs continue to run when encountering missing files and the contents are still returned. Available in Databricks Runtime 11.3 LTS and above.

modifiedAfter

None

A timestamp string

An optional timestamp filter. Only files with a modification timestamp after the specified timestamp are ingested.

modifiedBefore

None

A timestamp string

An optional timestamp filter. Only files with a modification timestamp before the specified timestamp are ingested.

pathGlobFilter or fileNamePattern

None

A glob pattern string

An optional glob pattern for selecting files. Equivalent to PATTERN in COPY INTO (legacy). fileNamePattern can be used in read_files.

recursiveFileLookup

false

true, false

When true, this option searches through nested directories even if their names do not follow a partition naming scheme like date=2019-07-01.
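
As a rough sketch of how the common options combine, the following collects several of them in a dictionary and passes them to DataFrameReader.options(). The glob pattern and timestamps are placeholder values, and read_recent_csv is a hypothetical helper name, not part of any API.

```python
# Common file-source options from the table above, collected in one place.
# All values are illustrative placeholders.
COMMON_OPTIONS = {
    "pathGlobFilter": "*.csv",                # only pick up files matching this glob
    "recursiveFileLookup": True,              # also search nested directories
    "modifiedAfter": "2024-01-01T00:00:00",   # only files modified after this time
    "modifiedBefore": "2024-02-01T00:00:00",  # only files modified before this time
    "ignoreCorruptFiles": True,               # keep running when a file is corrupt
}


def read_recent_csv(spark, path):
    """Hypothetical helper: read CSV files under `path` using COMMON_OPTIONS.

    `spark` is an existing SparkSession.
    """
    return spark.read.format("csv").options(**COMMON_OPTIONS).load(path)
```

Keeping the options in one dictionary makes it easier to reuse the same filter window across several reads.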

Avro

The following options apply when reading Avro files.

Key

Default

Valid values

Description

avroSchema

None

An Avro schema string

Optional schema specified by a user in Avro format. When reading Avro, this option can be set to an evolved schema that is compatible but different from the actual Avro schema. The deserialization schema is consistent with the evolved schema. For example, if you set an evolved schema containing one additional column with a default value, the read result contains the new column too.

avroSchemaEvolutionMode

none

none, restart

How to handle schema evolution when using a schema registry. none ignores schema changes and continues the job. restart raises an UnknownFieldException when schema changes are detected and requires a job restart.

datetimeRebaseMode

LEGACY

EXCEPTION, LEGACY, CORRECTED

Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars.

enableStableIdentifiersForUnionType

false

true, false

Whether to use stable field names for Avro Union types. When enabled, union type field names are derived from their type names in lowercase (for example, member_int, member_string). Throws an exception if two type names are identical after lowercasing.

mergeSchema

false

true, false

Whether to infer the schema across multiple files and to merge the schema of each file. mergeSchema for Avro does not relax data types.

mode

FAILFAST

FAILFAST, PERMISSIVE, DROPMALFORMED

Parser mode for handling corrupt records. FAILFAST throws an exception. PERMISSIVE sets malformed fields to null. DROPMALFORMED silently drops bad records.

readerCaseSensitive

true

true, false

Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner.

recursiveFieldMaxDepth

None

0 to 15

The maximum recursion depth for recursive Avro fields. Set to 1 to truncate all recursive fields, 2 to allow one level of recursion, and so on up to 15. When unset or 0, recursive fields are not permitted.

rescuedDataColumn

None

A column name string

Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) into a separate column. This column is included by default when using Auto Loader.

COPY INTO (legacy) does not support the rescued data column because you cannot manually set the schema using COPY INTO. Databricks recommends using Auto Loader for most ingestion scenarios.

For more details, see What is the rescued data column?.

stableIdentifierPrefixForUnionType

member_

Any string

The prefix to use for stable union type field names when enableStableIdentifiersForUnionType=true.
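
The avroSchema description above mentions reading with an evolved schema that adds a column with a default value. A minimal sketch of that idea follows; the User record, its fields, and the read_users helper are assumptions made up for illustration.

```python
import json

# Schema the files were originally written with (assumed for illustration).
written_schema = {
    "type": "record",
    "name": "User",
    "fields": [{"name": "id", "type": "long"}],
}

# Evolved schema: the same record plus one new field that carries a default,
# so files written without the field can still be deserialized.
evolved_schema = {
    **written_schema,
    "fields": written_schema["fields"]
    + [{"name": "country", "type": "string", "default": "unknown"}],
}

# The avroSchema option expects the schema as a JSON string.
avro_schema_json = json.dumps(evolved_schema)


def read_users(spark, path):
    """Hypothetical helper: read Avro files using the evolved schema."""
    return (
        spark.read.format("avro")
        .option("avroSchema", avro_schema_json)
        .load(path)
    )
```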

CSV

The following options apply when reading CSV files.

Key

Default

Valid values

Description

badRecordsPath

None

A path string

The path to store files for recording the information about bad CSV records.

charToEscapeQuoteEscaping

\0

A single character

The character used to escape the character used for escaping quotes. For example, for the following record: [ " a\\", b ]:

  • If the character to escape the '\' is undefined, the record won't be parsed. The parser will read characters: [a],[\],["],[,],[ ],[b] and throw an error because it cannot find a closing quote.
  • If the character to escape the '\' is defined as '\', the record will be read with 2 values: [a\] and [b].

columnNameOfCorruptRecord

_corrupt_record

A column name string

Supported for Auto Loader. Not supported for COPY INTO (legacy). The column for storing records that are malformed and cannot be parsed. If the mode for parsing is set as DROPMALFORMED, this column will be empty.

comment

\0

A single character

Defines the character that represents a line comment when found in the beginning of a line of text. Use '\0' to disable comment skipping.

dateFormat

yyyy-MM-dd

A date format string

The format for parsing date strings.

emptyValue

Empty string

Any string

String representation of an empty value.

enableDateTimeParsingFallback

false

true, false

Whether to fall back to the legacy date and timestamp parsing behavior when a value cannot be parsed with the specified format. When false, parsing failures raise an error or produce null depending on mode.

encoding or charset

UTF-8

A java.nio.charset.Charset name

The name of the encoding of the CSV files. See java.nio.charset.Charset for the list of options. UTF-16 and UTF-32 cannot be used when multiLine is true.

enforceSchema

true

true, false

Whether to forcibly apply the specified or inferred schema to the CSV files. If the option is enabled, headers of CSV files are ignored. This option is ignored by default when using Auto Loader to rescue data and allow schema evolution.

escape

\

A single character

The escape character to use when parsing the data.

extension

csv

A file extension string

The expected filename extension for reads. Files without this extension are filtered out.

failOnUnknownFields

false

true, false

Whether to fail when the CSV record contains columns not present in the schema. When false, unrecognized columns are silently dropped or rescued depending on rescuedDataColumn.

failOnWidenedFields

false

true, false

Whether to fail when a field value cannot be parsed as the declared schema type without widening. When false, type-widened values are silently rescued depending on rescuedDataColumn. Setting failOnUnknownFields=true can mask the effects of this option.

header

false

true, false

Whether the CSV files contain a header. Auto Loader assumes that files have headers when inferring the schema.

ignoreLeadingWhiteSpace

false

true, false

Whether to ignore leading whitespaces for each parsed value.

ignoreTrailingWhiteSpace

false

true, false

Whether to ignore trailing whitespaces for each parsed value.

inferSchema

false

true, false

Whether to infer the data types of the parsed CSV records or to assume all columns are of StringType. Requires an additional pass over the data if set to true. For Auto Loader, use cloudFiles.inferColumnTypes instead.

inputBufferSize

1048576 (1 MB)

Positive integers

The buffer size in bytes for the CSV parser. Useful for tuning memory usage when parsing large CSV files.

lineSep

None, which covers \r, \r\n, and \n

A string

A string between two consecutive CSV records.

locale

US

A java.util.Locale identifier

A Java locale identifier that affects default date, timestamp, and decimal parsing within the CSV.

maxCharsPerColumn

-1

Positive integers, or -1 for unlimited

Maximum number of characters expected from a value to parse. Can be used to avoid memory errors. Defaults to -1, which means unlimited.

maxColumns

20480

Positive integers

The hard limit of how many columns a record can have.

mergeSchema

false

true, false

Whether to infer the schema across multiple files and to merge the schema of each file. Enabled by default for Auto Loader when inferring the schema.

mode

PERMISSIVE

PERMISSIVE, DROPMALFORMED, FAILFAST

Parser mode around handling malformed records.

multiLine

false

true, false

Whether the CSV records span multiple lines.

nanValue

NaN

Any string

The string representation of a not-a-number value when parsing FloatType or DoubleType columns.

negativeInf

-Inf

Any string

The string representation of negative infinity when parsing FloatType or DoubleType columns.

nullValue

Empty string

Any string

String representation of a null value.

parserCaseSensitive (deprecated)

false

true, false

While reading files, whether to align columns declared in the header with the schema case sensitively. This is true by default for Auto Loader. Columns that differ by case will be rescued in the rescuedDataColumn if enabled. This option has been deprecated in favor of readerCaseSensitive.

positiveInf

Inf

Any string

The string representation of positive infinity when parsing FloatType or DoubleType columns.

preferDate

true

true, false

Attempts to infer strings as dates instead of timestamp when possible. You must also use schema inference, either by enabling inferSchema or using cloudFiles.inferColumnTypes with Auto Loader.

quote

"

A single character

The character used for escaping values where the field delimiter is part of the value.

readerCaseSensitive

true

true, false

Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner.

rescuedDataColumn

None

A column name string

Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) into a separate column. This column is included by default when using Auto Loader. For more details, see What is the rescued data column?.

COPY INTO (legacy) does not support the rescued data column because you cannot manually set the schema using COPY INTO. Databricks recommends using Auto Loader for most ingestion scenarios.

sep or delimiter

,

A string

The separator string between columns.

singleVariantColumn

None

A column name string

When set to a column name, reads the entire CSV record into a single VariantType column with that name instead of parsing each field into its own column. Requires header=true.

skipRows

0

Positive integers or 0

The number of rows from the beginning of the CSV file that should be ignored, including commented and empty rows. If header is true, the header will be the first unskipped and uncommented row.

timeFormat

HH:mm:ss

A time format string

The format for parsing TimeType column values.

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

The format for parsing timestamp strings.

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

The format for parsing timestamp without timezone (TimestampNTZType) strings.

timeZone

None

A java.time.ZoneId string

The java.time.ZoneId to use when parsing timestamps and dates.

unescapedQuoteHandling

STOP_AT_DELIMITER

STOP_AT_CLOSING_QUOTE, BACK_TO_DELIMITER, STOP_AT_DELIMITER, SKIP_VALUE, RAISE_ERROR

The strategy for handling unescaped quotes. The behavior of each allowed option is as follows:

  • STOP_AT_CLOSING_QUOTE: If unescaped quotes are found in the input, accumulate the quote character and proceed parsing the value as a quoted value, until a closing quote is found.
  • BACK_TO_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This will make the parser accumulate all characters of the current parsed value until the delimiter defined by sep is found. If no delimiter is found in the value, the parser will continue accumulating characters from the input until a delimiter or line ending is found.
  • STOP_AT_DELIMITER: If unescaped quotes are found in the input, consider the value as an unquoted value. This will make the parser accumulate all characters until the delimiter defined by sep, or a line ending is found in the input.
  • SKIP_VALUE: If unescaped quotes are found in the input, the content parsed for the given value will be skipped (until the next delimiter is found) and the value set in nullValue will be produced instead.
  • RAISE_ERROR: If unescaped quotes are found in the input, a TextParsingException will be thrown.
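
To show how several of the CSV options above fit together, here is a small sketch for a semicolon-separated file with a header row. The option values are placeholders chosen for illustration, and read_semicolon_csv is a hypothetical helper.

```python
# CSV reader options from the table above; values are illustrative only.
CSV_OPTIONS = {
    "header": True,             # the files contain a header row
    "inferSchema": True,        # infer column types (extra pass over the data)
    "sep": ";",                 # column separator instead of the default ","
    "comment": "#",             # lines beginning with "#" are treated as comments
    "nullValue": "NA",          # this string is read as null
    "dateFormat": "dd/MM/yyyy", # pattern used to parse date strings
    "mode": "FAILFAST",         # throw an exception on malformed records
    "unescapedQuoteHandling": "RAISE_ERROR",  # fail on unescaped quotes
}


def read_semicolon_csv(spark, path):
    """Hypothetical helper: read CSV files under `path` with CSV_OPTIONS.

    `spark` is an existing SparkSession.
    """
    return spark.read.format("csv").options(**CSV_OPTIONS).load(path)
```

Choosing FAILFAST with RAISE_ERROR surfaces bad input early; PERMISSIVE with the default STOP_AT_DELIMITER is the more tolerant combination.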

Excel

The following options apply when reading Excel files.

Key

Default

Valid values

Description

dataAddress

None

A cell range or sheet name string

The cell range to read in Excel syntax. If omitted, reads all valid cells from the first sheet. Use SheetName!C5:H10 to read a range from a named sheet, C5:H10 to read a range from the first sheet, or SheetName to read all data from a specific sheet.

headerRows

0

0, 1

Number of initial rows to use as column name headers. When dataAddress is specified, this applies within the cell range. When 0, column names are auto-generated as _c1, _c2, _c3, etc.

ignoreMissingSheet

false

true, false

Whether to silently skip files that do not contain the sheet specified by dataAddress. When false, an error is thrown if a file is missing the requested sheet. Only applies when a sheet name is specified in dataAddress.

includePhoneticRuns

false

true, false

Whether to include phonetic annotations (such as pinyin or furigana) concatenated to cell string values when reading XLSX files.

operation

readSheet

readSheet, listSheets

The operation to perform on the Excel workbook. readSheet reads data from a sheet. listSheets returns a struct with fields sheetIndex: long and sheetName: String for each sheet.

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

Custom format string for timestamp-without-timezone values stored as strings in Excel. Custom date formats follow the formats at Datetime patterns.

dateFormat

yyyy-MM-dd

A date format string

Custom format string for string values read as Date. Custom date formats follow the formats at Datetime patterns.
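
The three dataAddress forms described above (sheet plus range, range only, sheet only) can be assembled with a small helper. This is a sketch only: data_address is a hypothetical function, and it does not handle sheet names that need quoting.

```python
def data_address(sheet=None, cell_range=None):
    """Build a dataAddress value from an optional sheet name and cell range.

    Returns 'Sheet!C5:H10', 'C5:H10', or 'Sheet'. Returns None when neither
    part is given, meaning the option should be omitted entirely.
    """
    if sheet and cell_range:
        return f"{sheet}!{cell_range}"
    return cell_range or sheet or None


# Example values; "Sales" is a made-up sheet name.
print(data_address("Sales", "C5:H10"))
print(data_address(cell_range="C5:H10"))
print(data_address("Sales"))
```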

JSON

The following options apply when reading JSON files.

Key

Default

Valid values

Description

allowBackslashEscapingAnyCharacter

false

true, false

Whether to allow backslashes to escape any character that follows them. If not enabled, only characters that are explicitly listed by the JSON specification can be escaped.

allowComments

false

true, false

Whether to allow Java, C, and C++ style comments (both the '/*' and '//' varieties) within parsed content.

allowNonNumericNumbers

true

true, false

Whether to allow the set of not-a-number (NaN) tokens as legal floating number values.

allowNumericLeadingZeros

false

true, false

Whether to allow integral numbers to start with additional (ignorable) zeroes (for example, 000001).

allowSingleQuotes

true

true, false

Whether to allow the use of single quotes (the apostrophe character, ') for quoting strings (names and string values).

allowUnquotedControlChars

false

true, false

Whether to allow JSON strings to contain unescaped control characters (ASCII characters with value less than 32, including tab and line feed characters) or not.

allowUnquotedFieldNames

false

true, false

Whether to allow use of unquoted field names, which are allowed by JavaScript, but not by the JSON specification.

alternateVariantEncoding

None

Z85

The encoding used for Variant values in the source JSON. Set to Z85 to decode Variant values that have been Base85-encoded instead of stored as inline JSON.

badRecordsPath

None

A path string

The path to store files for recording the information about bad JSON records.

Using the badRecordsPath option in a file-based data source has the following limitations:

  • It is non-transactional and can lead to inconsistent results.
  • Transient errors are treated as failures.

columnNameOfCorruptRecord

_corrupt_record

A column name string

The column for storing records that are malformed and cannot be parsed. If the mode for parsing is set as DROPMALFORMED, this column will be empty.

dateFormat

yyyy-MM-dd

A date format string

The format for parsing date strings.

dropFieldIfAllNull

false

true, false

Whether to ignore columns of all null values or empty arrays and structs during schema inference.

encoding or charset

UTF-8

A java.nio.charset.Charset name

The name of the encoding of the JSON files. See java.nio.charset.Charset for the list of options. You cannot use UTF-16 and UTF-32 when multiLine is true.

inferTimestamp

false

true, false

Whether to try and infer timestamp strings as a TimestampType. When set to true, schema inference might take noticeably longer. You must enable cloudFiles.inferColumnTypes to use with Auto Loader.

lineSep

None, which covers \r, \r\n, and \n

A string

A string between two consecutive JSON records.

locale

US

A java.util.Locale identifier

A Java locale identifier that affects default date, timestamp, and decimal parsing within the JSON.

maxNestingDepth

500

Positive integers

The maximum allowed nesting depth for JSON objects and arrays. Increase this value for deeply nested documents.

maxNumLen

1000

Positive integers

The maximum length of number tokens in the JSON input. Increase this value for JSON with large numeric literals.

maxStringLen

unlimited

Positive integers

The maximum length of string values in the JSON input. Set to limit memory usage when parsing JSON with large strings.

mode

PERMISSIVE

PERMISSIVE, DROPMALFORMED, FAILFAST

Parser mode around handling malformed records.

multiLine

false

true, false

Whether the JSON records span multiple lines.

prefersDecimal

false

true, false

Attempts to infer strings as DecimalType instead of float or double type when possible. You must also use schema inference, either by enabling inferSchema or using cloudFiles.inferColumnTypes with Auto Loader.

primitivesAsString

false

true, false

Whether to infer primitive types like numbers and booleans as StringType.

readerCaseSensitive

true

true, false

Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner. Available in Databricks Runtime 13.3 and above.

rescuedDataColumn

None

A column name string

Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to What is the rescued data column?.

COPY INTO (legacy) does not support the rescued data column because you cannot manually set the schema using COPY INTO. Databricks recommends using Auto Loader for most ingestion scenarios.

singleVariantColumn

None

A column name string

Whether to ingest the entire JSON document, parsed into a single Variant column with the specified string as the column's name. If not set, the JSON fields are ingested into their own columns.

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

The format for parsing timestamp strings.

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

The format for parsing timestamp without timezone (TimestampNTZType) strings.

timeZone

None

A java.time.ZoneId string

The java.time.ZoneId to use when parsing timestamps and dates.

upgradeExceptionAsBadRecord

false

true, false

Whether to treat type upgrade exceptions (for example, when a value can't be widened to the declared column type) as bad records rather than throwing an exception.
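
Several of the allow* options above relax the JSON specification. The sketch below pairs a sample document with the options that correspond to each relaxation it uses, and uses Python's strict json module only to confirm that the document is not valid standard JSON. The document contents and helper names are made up for illustration.

```python
import json

# A document that strict JSON rejects: it has a comment, an unquoted field
# name, single-quoted strings, and a number with leading zeros.
lenient_doc = """{
  // source system export
  id: 007,
  'name': 'Ada'
}"""


def is_strict_json(text):
    """Return True if `text` parses under the standard JSON grammar."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Reader options matching each relaxation used in lenient_doc.
LENIENT_JSON_OPTIONS = {
    "multiLine": True,                 # the record spans several lines
    "allowComments": True,             # the // comment
    "allowUnquotedFieldNames": True,   # id without quotes
    "allowSingleQuotes": True,         # 'name' and 'Ada' (already the default)
    "allowNumericLeadingZeros": True,  # 007
}


def read_lenient_json(spark, path):
    """Hypothetical helper: read JSON files with the relaxed options."""
    return spark.read.format("json").options(**LENIENT_JSON_OPTIONS).load(path)
```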

Kafka

For the full list of Kafka reader options, see DataStreamReader Kafka options. The following options apply only to batch reads using spark.read.format("kafka").

Key

Default

Valid values

Description

endingOffsets

latest

latest, or a JSON offset string

Where to stop reading. In the JSON string, -1 is the latest offset. -2, which is the earliest offset, is not allowed as an ending offset. This is an example JSON offset string: {"topicA":{"0":50,"1":-1}}.

endingOffsetsByTimestamp

None

A JSON timestamp string

Per-partition ending offsets specified as timestamps in milliseconds. For example: {"topicA":{"0":2000,"1":3000}}.

endingTimestamp

None

Positive integers or 0

Global ending timestamp in milliseconds applied to all partitions.
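
The endingOffsets JSON string can be generated rather than typed by hand. The following sketch serializes a nested dictionary and rejects -2, which the table above says is not allowed as an ending offset. The helper names are hypothetical; kafka.bootstrap.servers and subscribe are general Kafka source options that are not listed in this table.

```python
import json


def ending_offsets(offsets_by_topic):
    """Serialize {topic: {partition: offset}} for the endingOffsets option.

    Raises ValueError for -2 (earliest), which is not a valid ending offset.
    An offset of -1 means the latest offset.
    """
    for topic, partitions in offsets_by_topic.items():
        for partition, offset in partitions.items():
            if offset == -2:
                raise ValueError(
                    f"{topic}[{partition}]: -2 (earliest) is not a valid ending offset"
                )
    # Partition numbers must be JSON object keys, so convert them to strings.
    return json.dumps(
        {
            topic: {str(p): o for p, o in partitions.items()}
            for topic, partitions in offsets_by_topic.items()
        }
    )


def read_topic_batch(spark, servers, topic, ending):
    """Hypothetical helper: batch-read one topic up to the given offsets."""
    return (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .option("endingOffsets", ending)
        .load()
    )
```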

ORC

The following options apply when reading ORC files.

Key

Default

Valid values

Description

mergeSchema

false

true, false

Whether to infer the schema across multiple files and to merge the schema of each file.

Parquet

The following options apply when reading Parquet files.

Key

Default

Valid values

Description

datetimeRebaseMode

LEGACY

EXCEPTION, LEGACY, CORRECTED

Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars.

int96RebaseMode

LEGACY

EXCEPTION, LEGACY, CORRECTED

Controls the rebasing of the INT96 timestamp values between Julian and Proleptic Gregorian calendars.

mergeSchema

false

true, false

Whether to infer the schema across multiple files and to merge the schema of each file.

readerCaseSensitive

true

true, false

Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner.

rescuedDataColumn

None

A column name string

Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) into a separate column. This column is included by default when using Auto Loader. For more details, see What is the rescued data column?.

COPY INTO (legacy) does not support the rescued data column because you cannot manually set the schema using COPY INTO. Databricks recommends using Auto Loader for most ingestion scenarios.

State store

Use these options with spark.read.format("statestore") or the read_statestore table-valued function to read Structured Streaming state data. See Read Structured Streaming state information.

Key

Default

Valid values

Description

batchId

Latest batch ID

Positive integers or 0

The target batch to read from. Use to query an earlier state of the query. The batch must be committed but not yet cleaned up.

operatorId

0

Positive integers or 0

The target operator to read from. Use when the query has multiple stateful operators.

storeName

DEFAULT

Any string

The target state store name to read from. Use when the stateful operator has multiple state store instances. You must specify either storeName or joinSide for a stream-stream join, but not both.

joinSide

None

left, right

The target side to read from for a stream-stream join. You must specify either storeName or joinSide for a stream-stream join, but not both.

snapshotStartBatchId

None

Positive integers or 0

The batch ID of the snapshot to use as the starting point when reading state. The reader rebuilds state by replaying changes from this snapshot until batchId. Useful when a snapshot is corrupted. Must be specified together with snapshotPartitionId. Cannot be used with readChangeFeed. Supports the HDFS-backed state store and the RocksDB state store with changelog checkpointing enabled. Available in Databricks Runtime 15.4 LTS and above.

snapshotPartitionId

None

Positive integers or 0

If specified, the query only reads this partition. Must be specified together with snapshotStartBatchId. Cannot be used with readChangeFeed. Available in Databricks Runtime 15.4 LTS and above.

readChangeFeed

false

true, false

When true, returns state changes across the range of batches between changeStartBatchId and changeEndBatchId. Requires changeStartBatchId. Cannot be used with joinSide, batchId, snapshotStartBatchId, or snapshotPartitionId. Available in Databricks Runtime 16.4 LTS and above.

For details, see Read Structured Streaming state changes.

changeStartBatchId

None

Positive integers or 0

The starting batch ID for the change feed range. Required, and only applies, when readChangeFeed is true. Available in Databricks Runtime 16.4 LTS and above.

changeEndBatchId

Latest batch ID

Positive integers or 0

The ending batch ID for the change feed range. Must be greater than or equal to changeStartBatchId. Only applies when readChangeFeed is set to true. Available in Databricks Runtime 16.4 LTS and above.

stateVarName

None

Any string

The state variable name to read. The state variable name is the unique name of each variable within the init function of a StatefulProcessor used by the transformWithState operator. Required when you use the transformWithState operator. Available in Databricks Runtime 16.4 LTS and above.

readRegisteredTimers

false

true, false

When true, reads registered timers used by the transformWithState operator. Only applies to the transformWithState operator. Available in Databricks Runtime 16.4 LTS and above.

flattenCollectionTypes

true

true, false

When true, flattens the records returned for map and list state variables. When false, returns records as a Spark SQL Array or Map. Only applies to the transformWithState operator. Available in Databricks Runtime 16.4 LTS and above.
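
Several state store options are mutually exclusive or must be set together. The sketch below encodes the combination rules stated in the table as a client-side check that runs before the read. Both function names are hypothetical, and the check covers only the rules listed above, not everything the reader itself validates.

```python
def check_statestore_options(opts):
    """Raise ValueError if `opts` breaks a combination rule from the table."""
    keys = set(opts)
    change_feed = str(opts.get("readChangeFeed", "false")).lower() == "true"

    # storeName and joinSide are alternatives.
    if {"storeName", "joinSide"} <= keys:
        raise ValueError("specify storeName or joinSide, not both")

    # The two snapshot options must appear together.
    snapshot_keys = {"snapshotStartBatchId", "snapshotPartitionId"}
    present = snapshot_keys & keys
    if present and present != snapshot_keys:
        raise ValueError("snapshotStartBatchId and snapshotPartitionId go together")

    if change_feed:
        if "changeStartBatchId" not in keys:
            raise ValueError("readChangeFeed requires changeStartBatchId")
        clash = sorted(({"joinSide", "batchId"} | snapshot_keys) & keys)
        if clash:
            raise ValueError(f"readChangeFeed cannot be combined with {clash}")
        end = opts.get("changeEndBatchId")
        if end is not None and int(end) < int(opts["changeStartBatchId"]):
            raise ValueError("changeEndBatchId must be >= changeStartBatchId")
    return opts


def read_state(spark, checkpoint_path, **opts):
    """Hypothetical helper: validate options, then read state data."""
    checked = check_statestore_options(opts)
    return spark.read.format("statestore").options(**checked).load(checkpoint_path)
```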

Text

The following options apply when reading text files.

Key

Default

Valid values

Description

encoding

UTF-8

A java.nio.charset.Charset name

The name of the encoding of the TEXT file line separator. The content of the file is not affected by this option and is read as-is.

lineSep

None, which covers \r, \r\n, and \n

A string

A string between two consecutive TEXT records.

wholeText

false

true, false

Whether to read a file as a single record.

XML

The following options apply when reading XML files.

Key

Default

Valid values

Description

rowTag

None

Any string

The row tag of the XML files to treat as a row. In the example XML <book><page>...</page><page>...</page></book>, the appropriate value is page. This is a required option.

samplingRatio

1.0

0.0 to 1.0

Defines a fraction of rows used for schema inference. XML built-in functions ignore this option.

excludeAttribute

false

true, false

Whether to exclude attributes in elements.

mode

None

PERMISSIVE, DROPMALFORMED, FAILFAST

Mode for dealing with corrupt records during parsing.

  • PERMISSIVE: For corrupted records, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets malformed fields to null. To keep corrupt records, you can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If a schema does not have the field, corrupt records are dropped during parsing. When inferring a schema, the parser implicitly adds a columnNameOfCorruptRecord field in an output schema.
  • DROPMALFORMED: Ignores corrupted records. This mode is unsupported for XML built-in functions.
  • FAILFAST: Throws an exception when the parser meets corrupted records.

inferSchema

true

true, false

If true, attempts to infer an appropriate type for each resulting DataFrame column. If false, all resulting columns are of string type. XML built-in functions ignore this option.

columnNameOfCorruptRecord

spark.sql.columnNameOfCorruptRecord

A column name string

Allows renaming the new field that contains a malformed string created by PERMISSIVE mode.

attributePrefix

_

Any string

The prefix for attributes to differentiate attributes from elements. This will be the prefix for field names. Default is _. Can be empty for reading XML, but not for writing. Also applies to DataFrameWriter XML options.

valueTag

_VALUE

Any string

The tag used for the character data within elements that also have attributes or child elements. You can specify the valueTag field in the schema, or it is added automatically during schema inference when character data is present in elements with other elements or attributes. Also applies to DataFrameWriter XML options.

encoding

UTF-8

A java.nio.charset.Charset name

For reading, decodes the XML files by the given encoding type. For writing, specifies encoding (charset) of saved XML files. XML built-in functions ignore this option. Also applies to DataFrameWriter XML options.

ignoreSurroundingSpaces

true

true, false

Whether white spaces surrounding values must be skipped. Whitespace-only character data are ignored.

rowValidationXSDPath

None

A file path string

Path to an optional XSD file that is used to validate the XML for each row individually. Rows that fail to validate are treated like parse errors. The XSD does not otherwise affect the schema, whether specified or inferred.

ignoreNamespace

false

true, false

If true, namespaces' prefixes on XML elements and attributes are ignored. Tags <abc:author> and <def:author>, for example, are treated as if both are just <author>. Namespaces cannot be ignored on the rowTag element, only its read children. XML parsing is not namespace-aware even if false.

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

Custom timestamp format string that follows the datetime pattern format. This applies to timestamp type. Also applies to DataFrameWriter XML options.

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

Custom format string for timestamp without timezone that follows the datetime pattern format. This applies to TimestampNTZType type. Also applies to DataFrameWriter XML options.

dateFormat

yyyy-MM-dd

A date format string

Custom date format string that follows the datetime pattern format. This applies to date type. Also applies to DataFrameWriter XML options.

locale

en-US

An IETF BCP 47 language tag

Sets a locale as a language tag in IETF BCP 47 format. For instance, locale is used while parsing dates and timestamps.

nullValue

string null

Any string

Sets the string representation of a null value. When this is null, the parser does not write attributes and elements for fields. Also applies to DataFrameWriter XML options.

readerCaseSensitive

true

true, false

Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner.

rescuedDataColumn

None

A column name string

Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) into a separate column. This column is included by default when using Auto Loader. For more details, see What is the rescued data column?. COPY INTO (legacy) does not support the rescued data column because you cannot manually set the schema using COPY INTO. Databricks recommends using Auto Loader for most ingestion scenarios.

singleVariantColumn

None

A column name string

Specifies the name of the single variant column. If this option is specified for reading, parse the entire XML record into a single Variant column with the given option string value as the column's name. If this option is specified for writing, write the value of the single Variant column to XML files. Also applies to DataFrameWriter XML options.

useLegacyXMLParser

true

true, false

Whether to use the legacy XML parser. The legacy parser has less stringent validation for malformed content but is less memory-efficient. Set to false to use the stricter parser instead.

wildcardColName

xs_any

A column name string

The column name used to capture XML elements that match the wildcard (xs:any) schema element. Cannot be used together with rescuedDataColumn.
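The following is a minimal sketch of how several of the XML options above might be combined in a batch read. The values are placeholders, rowTag is an XML option that is not described in this part of the page, and the rescued data column name and the read_xml helper are hypothetical. wildcardColName is left out because it cannot be used together with rescuedDataColumn.

```python
# Option keys come from the XML table above; the values are placeholders.
xml_options = {
    "rowTag": "record",  # assumption: rowTag is not described in this part of the page
    "timestampFormat": "yyyy-MM-dd'T'HH:mm:ss",
    "dateFormat": "yyyy-MM-dd",
    "locale": "en-US",
    "nullValue": "N/A",
    "rescuedDataColumn": "_rescued_data",  # hypothetical column name
}


def read_xml(spark, path):
    """Read XML files at `path` with the options above (requires a SparkSession)."""
    return spark.read.format("xml").options(**xml_options).load(path)
```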

DataStreamReader options

Use these options with DataStreamReader.option() to configure streaming reads from Delta Lake tables and other file-based sources.

For file format options (JSON, CSV, Parquet, and others), see DataFrameReader options.

For Auto Loader (cloudFiles.*) options, see Auto Loader.

Example

The following example sets maxFilesPerTrigger to 10 for a Delta Lake table stream:

Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")

Common

The following options apply to Delta Lake tables and other file-based streaming sources.

Key

Default

Valid values

Description

cleanSource

off

off, delete, archive

How to handle source files after they are processed by the stream. off takes no action. delete permanently deletes the source file. archive moves the file to sourceArchiveDir. When set to archive, sourceArchiveDir must also be set. Does not apply to Delta Lake table streaming.

fileNameOnly

false

true, false

Whether to identify already-processed files by filename only rather than by full path. When true, files at different paths with the same filename are treated as the same file and are not reprocessed. Does not apply to Delta Lake table streaming.

latestFirst

false

true, false

Whether to process the most recently modified files first within each micro-batch. Useful when you want to process the latest data as quickly as possible. When true and maxFilesPerTrigger or maxBytesPerTrigger is set, maxFileAge is ignored. Does not apply to Delta Lake table streaming.

maxBytesPerTrigger

None

Positive integers

Soft maximum for the amount of data processed for each micro-batch. A batch may process more than the limit if the smallest input unit exceeds it. When used together with maxFilesPerTrigger, the micro-batch processes data until whichever limit is reached first.

For Auto Loader, use cloudFiles.maxBytesPerTrigger instead. See Common.

maxCachedFiles

10000

Positive integers or 0

Maximum number of unprocessed files to cache for subsequent micro-batches. Set to 0 to turn off caching. Increase this value when the source directory contains many new files for each trigger. Does not apply to Delta Lake table streaming.

maxFileAge

7d

A duration string such as 7d or 4h

Maximum age of files considered for processing, relative to the timestamp of the most recently modified file rather than the current system time. Files older than this threshold are ignored. Ignored when latestFirst is true and maxFilesPerTrigger or maxBytesPerTrigger is set. Does not apply to Delta Lake table streaming.

maxFilesPerTrigger

1000 for Delta Lake and Auto Loader. No maximum for other file-based sources.

Positive integers

Upper bound for the number of new files processed in each micro-batch. When used together with maxBytesPerTrigger, the micro-batch processes data until whichever limit is reached first.

For Auto Loader, use cloudFiles.maxFilesPerTrigger instead. See Common.

sourceArchiveDir

None

A path string

Path to the archive directory when cleanSource is set to archive. Source files are moved to this path after processing, preserving their relative directory structure. Does not apply to Delta Lake table streaming.
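As an illustration of the cleanSource and sourceArchiveDir relationship described above, the following sketch checks the options before they are passed to a file-based stream. The validate_clean_source and start_json_stream helpers, the paths, and the chosen values are assumptions for the example, not part of any Spark API.

```python
def validate_clean_source(options):
    """Check the cleanSource rules from the table above before starting a stream."""
    mode = options.get("cleanSource", "off")
    if mode not in {"off", "delete", "archive"}:
        raise ValueError(f"Unsupported cleanSource value: {mode}")
    if mode == "archive" and not options.get("sourceArchiveDir"):
        raise ValueError("sourceArchiveDir must be set when cleanSource is archive")
    return options


stream_options = validate_clean_source({
    "maxFilesPerTrigger": "100",
    "latestFirst": "false",
    "cleanSource": "archive",
    "sourceArchiveDir": "/path/to/archive",  # placeholder path
})


def start_json_stream(spark, schema, path):
    """Open a JSON file stream with the options above (requires a SparkSession)."""
    return (
        spark.readStream.format("json")
        .schema(schema)  # a schema is supplied explicitly in this sketch
        .options(**stream_options)
        .load(path)
    )
```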

Auto Loader

Use these options with the cloudFiles source to configure Auto Loader for streaming ingestion from cloud storage. Options specific to the cloudFiles source are prefixed with cloudFiles to keep them in a separate namespace from other Structured Streaming source options.

Common

The following options apply to all Auto Loader configurations.

Key

Default

Valid values

Description

cloudFiles.allowOverwrites

false

true, false

Whether to allow input directory file changes to overwrite existing data.

For configuration caveats, see Does Auto Loader process the file again when the file gets appended or overwritten?.

cloudFiles.backfillInterval

None

A duration string such as 1 day or 1 week

Auto Loader can trigger asynchronous backfills at a given interval. For more information, see Trigger regular backfills using cloudFiles.backfillInterval.

Do not use when cloudFiles.useManagedFileEvents is set to true.

cloudFiles.cleanSource

OFF

OFF, DELETE, MOVE

Whether to automatically delete or move processed files from the input directory. When set to OFF (default), no files are deleted.

When set to DELETE, Auto Loader automatically deletes files 30 days after they are processed. To do this, Auto Loader must have write permissions to the source directory.

When set to MOVE, Auto Loader automatically moves files to the specified location in cloudFiles.cleanSource.moveDestination 30 days after they are processed. To do this, Auto Loader must have write permissions to the source directory as well as to the move location.

A file is considered processed when it has a non-null value for commit_time in the result of the cloud_files_state table-valued function. See cloud_files_state table-valued function. The 30-day additional wait after processing can be configured using cloudFiles.cleanSource.retentionDuration.

Review the following considerations before enabling cloudFiles.cleanSource:

  • Databricks does not recommend using this option if multiple streams consume data from the source location, because the fastest consumer deletes the files before the slower consumers can ingest them.
  • Enabling this feature requires Auto Loader to maintain additional state in its checkpoint, which incurs performance overhead but enables improved observability through the cloud_files_state table-valued function. See cloud_files_state table-valued function.
  • cleanSource uses the current setting to decide whether to MOVE or DELETE a given file. For example, suppose that the setting was MOVE when the file was originally processed but was changed to DELETE when the file became a candidate for cleanup 30 days later. In this case, cleanSource will delete the file.
  • Files are not guaranteed to be cleaned as soon as the retentionDuration expires. To keep costs low, Auto Loader deletes files concurrently with stream processing and terminates as soon as the stream processing is complete or is terminated. Files that were candidates for cleanup but couldn't be cleaned during stream processing are picked up the next time Auto Loader runs.

Available in Databricks Runtime 16.4 and above.

cloudFiles.cleanSource.retentionDuration

30 days

A CalendarInterval string such as 14 days, 2 weeks, or 1 month

Amount of time to wait before processed files become candidates for archival with cleanSource. Must be greater than 7 days for DELETE. No minimum restriction for MOVE.

Available in Databricks Runtime 16.4 and above.

cloudFiles.cleanSource.moveDestination

None

A cloud storage or Unity Catalog volume path

Path to archive processed files to when cloudFiles.cleanSource is set to MOVE. This can be a cloud storage path or a Unity Catalog volume path (for example, /Volumes/my_catalog/my_schema/my_volume/archive/).

The move location must:

  • Not be a child of the source directory. If you place the move destination inside the source directory, the archived files are ingested again.
  • Be in the same external location, volume, or DBFS mount as the source. Cross-bucket and cross-container moves are not supported and result in an error.

Auto Loader must have write permissions to this directory.

Available in Databricks Runtime 16.4 and above.
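The sketch below shows one way the three cleanSource options might be set together, with a simple pre-check that the move destination is not inside the source directory. The bucket name, paths, and the is_child_path helper are hypothetical, and the check is a plain string comparison that does not resolve mounts or volumes.

```python
def is_child_path(candidate, parent):
    """Return True if `candidate` is `parent` itself or nested under it (string check only)."""
    parent = parent.rstrip("/") + "/"
    candidate = candidate.rstrip("/") + "/"
    return candidate.startswith(parent)


source_path = "s3://my-bucket/landing/"        # hypothetical source directory
move_destination = "s3://my-bucket/archive/"   # hypothetical archive directory

# The move destination must not be a child of the source directory.
if is_child_path(move_destination, source_path):
    raise ValueError("moveDestination must not be inside the source directory")

clean_source_options = {
    "cloudFiles.format": "json",
    "cloudFiles.cleanSource": "MOVE",
    "cloudFiles.cleanSource.moveDestination": move_destination,
    "cloudFiles.cleanSource.retentionDuration": "14 days",
}
```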

cloudFiles.format

None (required option)

avro, binaryFile, csv, json, orc, parquet, text, xml

The data file format in the source path.

cloudFiles.includeExistingFiles

true

true, false

Whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. This option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect.

cloudFiles.inferColumnTypes

false

true, false

Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON and CSV datasets. See schema inference for more details.

cloudFiles.maxBytesPerTrigger

None

A byte string such as 10g

The maximum number of new bytes to be processed in every trigger. This is a soft maximum. For example, with a limit of 10g and files that are 3 GB each, Databricks processes 12 GB in a microbatch. When used together with cloudFiles.maxFilesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once() (deprecated).

In Databricks Runtime 18.0 and above, this option is dynamically configured and does not need to be set manually.
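The soft-maximum arithmetic can be illustrated with a small model. This is only a sketch of the behavior described above, assuming a limit of 10 GB and whole files admitted until the running total reaches the limit. It is not Auto Loader's actual batching code, and bytes_admitted is a hypothetical helper.

```python
def bytes_admitted(file_sizes_gb, limit_gb):
    """Admit whole files until the running total reaches or passes the limit."""
    total = 0
    for size in file_sizes_gb:
        if total >= limit_gb:
            break
        total += size
    return total


# Ten 3 GB files with a 10 GB soft limit: 3 + 3 + 3 + 3 = 12 GB in the microbatch.
admitted = bytes_admitted([3] * 10, 10)
```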

cloudFiles.maxFileAge

None

A duration string

How long a file event is tracked for deduplication purposes. Databricks does not recommend tuning this parameter unless you are ingesting data on the order of millions of files an hour. See the section on File event tracking for more details.

Tuning cloudFiles.maxFileAge too aggressively can cause data quality issues such as duplicate ingestion or missing files. Therefore, Databricks recommends a conservative setting for cloudFiles.maxFileAge, such as 90 days, which is similar to what comparable data ingestion solutions recommend.

cloudFiles.maxFilesPerTrigger

1000

Positive integers

The maximum number of new files to be processed in every trigger. When used together with cloudFiles.maxBytesPerTrigger, Databricks consumes up to the lower limit of cloudFiles.maxFilesPerTrigger or cloudFiles.maxBytesPerTrigger, whichever is reached first. This option has no effect when used with Trigger.Once() (deprecated).

In Databricks Runtime 18.0 and above, this option is dynamically configured and does not need to be set manually.

cloudFiles.partitionColumns

None

A comma-separated list of column names

A comma-separated list of Hive-style partition columns that you would like inferred from the directory structure of the files. Hive-style partition columns are key-value pairs combined by an equality sign such as <base-path>/a=x/b=1/c=y/file.format. In this example, the partition columns are a, b, and c. By default these columns are automatically added to your schema if you are using schema inference and specify the <base-path> to load data from. If you specify a schema, Auto Loader expects these columns to be included in the schema. If you do not want these columns as part of your schema, you can specify "" to ignore these columns. In addition, you can use this option when you want columns to be inferred from the file path in complex directory structures, like the example below:

<base-path>/year=2022/week=1/file1.csv
<base-path>/year=2022/month=2/day=3/file2.csv
<base-path>/year=2022/month=2/day=4/file3.csv

Specifying cloudFiles.partitionColumns as year,month,day returns year=2022 for file1.csv, but the month and day columns are null.

month and day are parsed correctly for file2.csv and file3.csv.
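The outcome described for the three example paths can be reproduced with a short sketch. The partition_values function is a hypothetical stand-in that only mimics Hive-style key=value parsing. It is not Auto Loader's parser, and it returns values as strings rather than inferred types.

```python
def partition_values(path, columns):
    """Extract key=value directory segments for the requested columns; missing keys map to None."""
    found = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key in columns:
            found[key] = value
    return {column: found.get(column) for column in columns}


columns = ["year", "month", "day"]
file1 = partition_values("<base-path>/year=2022/week=1/file1.csv", columns)
file2 = partition_values("<base-path>/year=2022/month=2/day=3/file2.csv", columns)
file3 = partition_values("<base-path>/year=2022/month=2/day=4/file3.csv", columns)
```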

cloudFiles.schemaEvolutionMode

addNewColumns when a schema is not specified, none otherwise

addNewColumns, none, rescue, failOnNewColumns

The mode for evolving the schema as new columns are discovered in the data. By default, columns are inferred as strings when inferring JSON datasets. See schema evolution for more details.

cloudFiles.schemaHints

None

A schema string

Schema information that you specify to Auto Loader during schema inference. See schema hints for more details.

cloudFiles.schemaLocation

None (required to infer the schema)

A path string

The location to store the inferred schema and subsequent changes. See schema inference for more details.

cloudFiles.useStrictGlobber

false

true, false

Whether to use a strict globber that matches the default globbing behavior of other file sources in Apache Spark. See Common data loading patterns for more details. Available in Databricks Runtime 12.2 LTS and above.

cloudFiles.validateOptions

true

true, false

Whether to validate Auto Loader options and return an error for unknown or inconsistent options.
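A minimal sketch of an Auto Loader read that relies on schema inference might look like the following. The paths, the schema hint, and the missing_required and start_autoloader helpers are assumptions for illustration. The check only encodes two rules from the table above: cloudFiles.format is required, and cloudFiles.schemaLocation is required to infer the schema.

```python
def missing_required(options, schema_provided=False):
    """List required Auto Loader options that are absent, based on the table above."""
    missing = []
    if "cloudFiles.format" not in options:
        missing.append("cloudFiles.format")
    if not schema_provided and "cloudFiles.schemaLocation" not in options:
        missing.append("cloudFiles.schemaLocation")
    return missing


autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": "/path/to/schema",      # placeholder path
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.schemaHints": "id BIGINT, ts TIMESTAMP",  # hypothetical columns
    "cloudFiles.maxFilesPerTrigger": "500",
}


def start_autoloader(spark, path):
    """Open an Auto Loader stream with the options above (requires a SparkSession)."""
    problems = missing_required(autoloader_options)
    if problems:
        raise ValueError(f"Missing options: {problems}")
    return spark.readStream.format("cloudFiles").options(**autoloader_options).load(path)
```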

Directory listing

The following option applies when using directory listing mode.

Key

Default

Valid values

Description

cloudFiles.useIncrementalListing (deprecated)

auto on Databricks Runtime 17.2 and below, false on Databricks Runtime 17.3 and above

auto, true, false

This feature has been deprecated. Databricks recommends using file notification mode with file events instead of cloudFiles.useIncrementalListing.

Whether to use the incremental listing rather than the full listing in directory listing mode. By default, Auto Loader makes the best effort to automatically detect if a given directory is applicable for the incremental listing. You can explicitly use the incremental listing or use the full directory listing by setting it as true or false respectively.

Incorrectly enabling incremental listing on a non-lexically ordered directory prevents Auto Loader from discovering new files.

Works with Azure Data Lake Storage (abfss://), S3 (s3://), and GCS (gs://).

Available in Databricks Runtime 9.1 LTS and above.

File notification

For information about configuring file notification mode, including required cloud permissions, setup instructions, and authentication methods, see Configure Auto Loader streams in file notification mode.

Key

Default

Valid values

Description

cloudFiles.fetchParallelism

1

Positive integers

Number of threads to use when fetching messages from the queueing service.

Do not use when cloudFiles.useManagedFileEvents is set to true.

cloudFiles.pathRewrites

None

A JSON map string

Required only if you specify a queueUrl that receives file notifications from multiple S3 buckets and you want to use mount points configured for accessing data in these buckets. Use this option to rewrite the prefix of the bucket/key path with the mount point. Only prefixes can be rewritten. For example, for the configuration {"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}, the path s3://<databricks-mounted-bucket>/path/2017/08/fileA.json is rewritten to dbfs:/mnt/data-warehouse/2017/08/fileA.json.

Do not use when cloudFiles.useManagedFileEvents is set to true.
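To make the prefix rewrite concrete, the following sketch applies a rewrite map to a path in the way the example for cloudFiles.pathRewrites describes. The bucket name my-bucket and the rewrite_path helper are hypothetical, and the function only illustrates the mapping. It is not how Auto Loader implements it.

```python
import json


def rewrite_path(path, rewrites):
    """Replace a matching bucket/key prefix with its mount point; other paths are unchanged."""
    _scheme, _sep, rest = path.partition("://")
    for prefix, target in rewrites.items():
        prefix = prefix.rstrip("/")
        if rest == prefix or rest.startswith(prefix + "/"):
            return target.rstrip("/") + rest[len(prefix):]
    return path


rewrites = {"my-bucket/path": "dbfs:/mnt/data-warehouse"}  # hypothetical bucket name

# The option value is passed as a JSON map string.
path_rewrites_option = json.dumps(rewrites)

rewritten = rewrite_path("s3://my-bucket/path/2017/08/fileA.json", rewrites)
```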

cloudFiles.resourceTag

None

Key-value tag strings

A series of key-value tag pairs to help associate and identify related resources, for example:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
  .option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

Do not use when cloudFiles.useManagedFileEvents is set to true. Instead set resource tags using the cloud provider console.

For more information, see Cloud provider resource tags.

cloudFiles.useManagedFileEvents

false

true, false

When set to true, Auto Loader uses the file events service to discover files in your external location. You can use this option only if the load path is in an external location with file events enabled. See Use file notification mode with file events.

File events provide notifications-level performance in file discovery, because Auto Loader can discover new files after the last run. Unlike directory listing, this process does not need to list all files in the directory.

There are some situations when Auto Loader uses directory listing even though the file events option is enabled:

  • During initial load, when includeExistingFiles is set to true, a full directory listing takes place to discover all of the files that were present in the directory before Auto Loader started.
  • The file events service optimizes file discovery by caching the most recently created files. If Auto Loader runs infrequently, this cache can expire, and Auto Loader falls back to directory listing to discover files and update the cache. To avoid this scenario, invoke Auto Loader at least once every seven days.

See When does Auto Loader with file events use directory listing? for a comprehensive list of situations when Auto Loader uses directory listing with this option.

Available in Databricks Runtime 14.3 LTS and above.

cloudFiles.listOnStart

false

true, false

When set to true, Auto Loader performs a full directory listing when the stream starts, instead of starting with the continuation token in the checkpoint. Use this option to recover from errors, such as CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN. See How do I recover from a CF_MANAGED_FILE_EVENTS_INVALID_CONTINUATION_TOKEN error?.

cloudFiles.useNotifications

false

true, false

Whether to use file notification mode to determine when there are new files. If false, use directory listing mode. See Compare Auto Loader file detection modes.

Do not use when cloudFiles.useManagedFileEvents is set to true.
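Several options on this page carry the note "Do not use when cloudFiles.useManagedFileEvents is set to true". The following sketch gathers those options into a simple pre-flight check. The conflicting_options helper is hypothetical and reflects only the notes on this page, not any validation that Auto Loader itself performs.

```python
# Options that this page says not to combine with cloudFiles.useManagedFileEvents.
INCOMPATIBLE_WITH_FILE_EVENTS = (
    "cloudFiles.backfillInterval",
    "cloudFiles.fetchParallelism",
    "cloudFiles.pathRewrites",
    "cloudFiles.resourceTag",  # also matches cloudFiles.resourceTag.<key>
    "cloudFiles.useNotifications",
)


def conflicting_options(options):
    """Return the option keys that should be removed when managed file events are enabled."""
    enabled = str(options.get("cloudFiles.useManagedFileEvents", "false")).lower() == "true"
    if not enabled:
        return []
    return sorted(key for key in options if key.startswith(INCOMPATIBLE_WITH_FILE_EVENTS))


file_events_options = {
    "cloudFiles.format": "json",
    "cloudFiles.useManagedFileEvents": "true",
}
```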

Cloud provider resource tags

Auto Loader adds the following key-value tag pairs by default on a best-effort basis:

  • vendor: Databricks
  • path: The location from where the data is loaded. Unavailable in GCP due to labeling limitations.
  • checkpointLocation: The location of the stream's checkpoint. Unavailable in GCP due to labeling limitations.
  • streamId: A globally unique identifier for the stream.

Databricks reserves these key names, and you cannot overwrite their values.

For more information on AWS, see Amazon SQS cost allocation tags and Configuring tags for an Amazon SNS topic.

Cloud-specific

Auto Loader has options for configuring cloud infrastructure for file notification mode. For required cloud permissions and setup instructions, see Configure Auto Loader streams in file notification mode.

AWS

Specify the following options only if you choose cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:

Key

Default

Valid values

Description

cloudFiles.region

The region of the EC2 instance

An AWS region string

The region where the source S3 bucket resides and where you want to create the AWS SNS and SQS services.

Key

Default

Valid values

Description

cloudFiles.restrictNotificationSetupToSameAWSAccountId

false

true, false

Whether to restrict event notifications to AWS S3 buckets in the same account as the SNS topic. When true, Auto Loader only accepts event notifications from AWS S3 buckets in the same account as the SNS topic.

When false, the access policy does not restrict cross-account bucket and SNS topic setups. This is useful when the SNS topic and bucket path are associated with different accounts.

Available in Databricks Runtime 17.2 and above.

Specify the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:

Key

Default

Valid values

Description

cloudFiles.queueUrl

None

A URL string

The URL of the SQS queue. If specified, Auto Loader directly consumes events from this queue instead of setting up its own AWS SNS and SQS services.

AWS authentication options

Specify the following authentication option to use a Databricks service credential:

Key

Default

Valid values

Description

databricks.serviceCredential

None

Any string

The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above.

When Databricks service credentials or IAM roles are not available, you can specify the following authentication options instead:

Key

Default

Valid values

Description

cloudFiles.awsAccessKey

None

Any string

The AWS access key ID for the user. Must be specified with cloudFiles.awsSecretKey.

cloudFiles.awsSecretKey

None

Any string

The AWS secret access key for the user. Must be specified with cloudFiles.awsAccessKey.

cloudFiles.roleArn

None

An ARN string

The ARN of an IAM role to assume, if needed. The role can be assumed from your cluster's instance profile or by providing credentials with cloudFiles.awsAccessKey and cloudFiles.awsSecretKey.

cloudFiles.roleExternalId

None

Any string

An identifier to use while assuming a role using cloudFiles.roleArn.

cloudFiles.roleSessionName

None

Any string

An optional session name to use while assuming a role using cloudFiles.roleArn.

cloudFiles.stsEndpoint

None

A URL string

An optional endpoint to use for accessing AWS STS when assuming a role using cloudFiles.roleArn.

Azure

You must specify values for all of the following options if you specify cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:

Key

Default

Valid values

Description

cloudFiles.resourceGroup

None

Any string

The Azure Resource Group in which the storage account is created.

cloudFiles.subscriptionId

None

Any string

The Azure Subscription ID in which the resource group is created.

databricks.serviceCredential

None

Any string

The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above.

If a Databricks service credential is not available, you can specify the following authentication options instead:

Key

Default

Valid values

Description

cloudFiles.clientId

None

Any string

The client ID or application ID of the Databricks service principal.

cloudFiles.clientSecret

None

Any string

The client secret of the Databricks service principal.

cloudFiles.connectionString

None

A connection string

The connection string for the storage account, based on either account access key or shared access signature (SAS).

cloudFiles.tenantId

None

Any string

The Azure Tenant ID in which the Databricks service principal is created.

Specify the following option only if you set cloudFiles.useNotifications = true and you want Auto Loader to use an existing queue:

Key

Default

Valid values

Description

cloudFiles.queueName

None

Any string

The name of the Azure queue. If specified, the cloud files source directly consumes events from this queue instead of setting up its own Azure Event Grid and Queue Storage services. In that case, your databricks.serviceCredential or cloudFiles.connectionString requires only read permissions on the queue.

GCP

Auto Loader can automatically set up notification services for you by leveraging Databricks service credentials. The service account created with the Databricks service credential will require the permissions specified in Configure Auto Loader streams in file notification mode.

Key

Default

Valid values

Description

cloudFiles.projectId

None

Any string

The ID of the project that the GCS bucket is in. The Google Cloud Pub/Sub subscription is also created within this project.

databricks.serviceCredential

None

Any string

The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above.

If a Databricks service credential is not available, you can use Google Service Accounts directly. You can either configure your cluster to assume a service account by following Google service setup or specify the following authentication options:

Key

Default

Valid values

Description

cloudFiles.client

None

Any string

The client ID of the Google Service Account.

cloudFiles.clientEmail

None

An email address string

The email of the Google Service Account.

cloudFiles.privateKey

None

A private key string

The private key that's generated for the Google Service Account.

cloudFiles.privateKeyId

None

Any string

The ID of the private key that's generated for the Google Service Account.

Specify the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:

Key

Default

Valid values

Description

cloudFiles.subscription

None

Any string

The name of the Google Cloud Pub/Sub subscription. If specified, the cloud files source consumes events from this subscription instead of setting up its own GCS Notification and Google Cloud Pub/Sub services.

Delta Lake

The following options apply when reading from a Delta Lake table using spark.readStream.

Key

Default

Valid values

Description

allowSourceColumnDrop

None

A version number or always

Set to a Delta table version number or always to allow the stream to continue after columns are dropped from the source table schema. When set to a version number, acknowledges all schema changes up to that version. Requires schemaTrackingLocation. See Rename and drop columns with Delta Lake column mapping.

allowSourceColumnRename

None

A version number or always

Set to a Delta table version number or always to allow the stream to continue after columns are renamed in the source table. When set to a version number, acknowledges all schema changes up to that version. Requires schemaTrackingLocation. See Rename and drop columns with Delta Lake column mapping.

allowSourceColumnTypeChange

None

A version number or always

Set to a Delta table version number or always to allow the stream to continue after column types are changed in the source table. When set to a version number, acknowledges all schema changes up to that version. Requires schemaTrackingLocation. See Type widening.

excludeRegex

None

A Java regex string

A regular expression pattern. Files whose paths match the pattern are excluded from the streaming read. Useful for filtering out files that do not conform to the expected naming convention.

failOnDataLoss

true

true, false

Whether to fail the streaming query if source data has been deleted due to log retention (logRetentionDuration). Set to false to skip missing data and continue processing. See Configure data retention for time travel queries.

ignoreChanges (deprecated)

false

true, false

Available in Databricks Runtime 11.3 LTS and lower. Re-emits rewritten data files after modification operations such as UPDATE, MERGE INTO, DELETE, or OVERWRITE. Unchanged rows may be emitted alongside new rows, so downstream consumers must handle duplicates. Deletes are not propagated downstream. Replaced by skipChangeCommits in Databricks Runtime 12.2 LTS and above.

ignoreDeletes (deprecated)

false

true, false

Ignores transactions that delete data at partition boundaries (full partition drops only). Does not handle non-partition deletes, updates, or other modifications. Use skipChangeCommits instead.

readChangeFeed or readChangeData

false

true, false

Whether to enable reading the change data feed for the streaming query. When enabled, the stream emits row-level changes (inserts, updates, and deletes) with additional metadata columns. See Use Delta Lake change data feed on Databricks.

schemaTrackingLocation

None

A path string

Path to a directory where Delta Lake tracks schema changes for the streaming read. Required when streaming from tables with column mapping enabled and using allowSourceColumn* options to handle schema evolution. Must be within the checkpointLocation of the streaming query. See Rename and drop columns with Delta Lake column mapping.

skipChangeCommits

false

true, false

Ignores transactions that delete or modify existing records and processes only appends. Databricks recommends this option for most workloads that do not use change data feeds. Available in Databricks Runtime 12.2 LTS and above. See Skip upstream change commits with skipChangeCommits.

startingTimestamp

Latest available

A timestamp string such as 2019-01-01T00:00:00.000Z or a date string such as 2019-01-01

Timestamp to start reading from. The stream reads all table changes committed at or after the specified timestamp. If the timestamp precedes all available table commits, the stream starts from the earliest available commit. Cannot be used together with startingVersion. Ignored if the streaming checkpoint already exists.

startingVersion

Latest available

A positive integer, 0, or latest

Delta table version to start reading from. The stream reads all changes committed at or after the specified version. Specify latest to start from only the most recent changes. Cannot be used together with startingTimestamp. Ignored if the streaming checkpoint already exists. See Work with table history.

withEventTimeOrder

false

true, false

Divides the initial table snapshot into event time buckets to prevent records from being incorrectly marked as late events and dropped in stateful queries with watermarks. Cannot be changed after initial snapshot processing has begun without deleting the checkpoint. Available in Databricks Runtime 11.3 LTS and above. See Process initial snapshot without dropping data.
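The constraints in the Delta Lake table above can be sketched as a small check before starting a stream. The validate_delta_stream_options and start_delta_stream helpers and the chosen values are assumptions for illustration. The check covers only two rules stated above: startingVersion and startingTimestamp cannot be combined, and the allowSourceColumn* options require schemaTrackingLocation.

```python
def validate_delta_stream_options(options):
    """Apply two rules from the table above to a dictionary of Delta streaming options."""
    if "startingVersion" in options and "startingTimestamp" in options:
        raise ValueError("startingVersion cannot be used together with startingTimestamp")
    uses_allow = [key for key in options if key.startswith("allowSourceColumn")]
    if uses_allow and "schemaTrackingLocation" not in options:
        raise ValueError(f"{uses_allow} require schemaTrackingLocation")
    return options


delta_options = validate_delta_stream_options({
    "startingVersion": "5",
    "skipChangeCommits": "true",
    "maxFilesPerTrigger": "100",
})


def start_delta_stream(spark, table_path):
    """Open a Delta Lake stream with the options above (requires a SparkSession)."""
    return spark.readStream.format("delta").options(**delta_options).load(table_path)
```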

Kafka

Use these options with either spark.readStream.format("kafka") or spark.read.format("kafka"):

Key

Default

Valid values

Description

assign

None

A JSON string such as {"topicA":[0,1],"topicB":[2,4]}

The specific partitions to consume. You must specify exactly one of the subscribe, subscribePattern, or assign options.

failOnDataLoss

true

true, false

Whether to fail the query if data might have been lost, for example, due to deleted topics or offset truncation. Set to false to skip missing data and continue.

Databricks estimates conservatively whether data might have been lost. However, this might cause false alarms.

fetchoffset.numretries

3

Positive integers or 0

The number of retries when fetching Kafka offsets fails.

fetchoffset.retryintervalms

1000

Positive integers or 0

The interval in milliseconds between offset fetch retries.

groupIdPrefix

spark-kafka-source (streaming), spark-kafka-relation (batch)

Any string

The customized prefix to use for the auto-generated Kafka consumer group ID. If kafka.group.id is explicitly set, the connector ignores this option.

kafka.group.id

None

Any string

The Kafka consumer group ID to use when reading. Use with caution: queries sharing the same group ID interfere with each other and might read only partial data. This can occur when running concurrent batch and streaming workloads, or when restarting queries quickly. If set, groupIdPrefix is ignored. To minimize issues, set the Kafka consumer configuration session.timeout.ms to a small value.

includeHeaders

false

true, false

Whether to include Kafka message headers as a column in the output.

kafkaconsumer.polltimeoutms

None

Positive integers

The timeout in milliseconds for the Kafka consumer poll() call.

kafka.bootstrap.servers

None

A comma-separated list of host:port strings

A comma-separated list of host:port addresses for Kafka brokers. Sets the Kafka client's bootstrap.servers property.

If you find there is no data from Kafka, check this broker address list for incorrect addresses. If the broker address list is incorrect, there might not be any errors. Kafka clients assume the brokers will be available eventually and retry forever when they receive network errors.

maxRecordsPerPartition

None

Positive integers

The maximum number of records for each Spark partition. When set, the connector splits Kafka partitions so that each Spark partition reads at most this many records.

You can also use this option with minPartitions. When both options are set, Spark uses whichever option results in more partitions.

minPartitions

None

Positive integers

The minimum number of Spark partitions to read from Kafka. When set, the connector splits large Kafka partitions to increase parallelism. When not set, Spark creates one partition for each Kafka topic-partition. Useful for handling data skew or peak loads.

This option reinitializes Kafka consumers for each trigger, which might affect performance with SSL.

startingOffsets

latest (streaming), earliest (batch)

earliest, latest, or a JSON offset string

The offset that the query begins the read from. In the JSON string, -1 is the latest offset. -2 is the earliest offset. For example: {"topicA":{"0":23,"1":-2}}.

For streaming queries, this option only applies when a new query starts. Resumed queries always use the checkpoint. During a query, new partitions start reading at the earliest offset.

For batch queries, latest is not allowed.

startingOffsetsByTimestamp

None

A JSON timestamp string such as {"topicA":{"0":1000,"1":2000}}

A list of starting offsets for each partition, specified as timestamps in milliseconds. When no offset exists for a timestamp, query behavior is determined by startingOffsetsByTimestampStrategy.

For streaming queries, this option only applies when a new query starts. Resumed queries always use the checkpoint. During a query, new partitions start reading at the earliest offset.

startingOffsetsByTimestampStrategy

error

error, latest

The strategy to use when no offset is found for a timestamp specified in startingOffsetsByTimestamp or startingTimestamp. error raises an exception. latest uses the latest available offset.

startingTimestamp

None

Positive integers or 0

The global starting timestamp in milliseconds that applies to all partitions. When no offset exists for the timestamp, behavior is controlled by startingOffsetsByTimestampStrategy.

subscribe

None

A comma-separated list of topic names

The topics to subscribe to. You must specify exactly one of the subscribe, subscribePattern, or assign options.

subscribePattern

None

A Java regex string

The pattern used to subscribe to topics. For example, topic.*. You must specify exactly one of the subscribe, subscribePattern, or assign options.
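
As a minimal sketch of how the options in this table fit together, the following helper assembles an options dictionary and enforces the rule that exactly one of subscribe, subscribePattern, or assign is set. The helper name kafka_source_options and the broker addresses are hypothetical and exist only for illustration; they are not part of any Spark or Databricks API.

```python
import json


def kafka_source_options(bootstrap_servers, starting_offsets=None, **subscription):
    """Build a Kafka source options dict (hypothetical helper, not a Spark API)."""
    allowed = {"subscribe", "subscribePattern", "assign"}
    if len(subscription) != 1 or next(iter(subscription)) not in allowed:
        raise ValueError(
            "Specify exactly one of subscribe, subscribePattern, or assign"
        )

    options = {"kafka.bootstrap.servers": bootstrap_servers}
    options.update(subscription)

    if starting_offsets is not None:
        if isinstance(starting_offsets, dict):
            # Per-partition offsets are passed as a compact JSON string.
            # -2 refers to the earliest offset and -1 to the latest offset.
            starting_offsets = json.dumps(starting_offsets, separators=(",", ":"))
        options["startingOffsets"] = starting_offsets
    return options


options = kafka_source_options(
    "broker1.example.com:9092,broker2.example.com:9092",  # placeholder brokers
    starting_offsets={"topicA": {"0": 23, "1": -2}},
    subscribe="topicA",
)
print(options["startingOffsets"])
```

With an active Spark session, a dictionary like this could be applied with spark.read.format("kafka").options(**options).load().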

The following options apply only to streaming reads with spark.readStream.format("kafka"):

Key

Default

Valid values

Description

bytesEstimateWindowLength

300s

Duration strings such as 10m or 600s

The time window used to estimate remaining bytes for the estimatedTotalBytesBehindLatest metric. See Retrieve Kafka metrics.

maxOffsetsPerTrigger

None

Positive integers

The maximum number of offsets to process per trigger interval. Offsets are distributed proportionally across topic partitions.

maxTriggerDelay

15m

Duration strings such as 10m or 600s

The maximum time to wait for minOffsetsPerTrigger to accumulate before triggering.

minOffsetsPerTrigger

None

Positive integers

The minimum number of offsets to accumulate before triggering a micro-batch. When maxTriggerDelay is reached, the micro-batch runs regardless.

For offset options that apply only to batch reads with spark.read.format("kafka"), see DataFrameReader Kafka options.
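
The interaction between minOffsetsPerTrigger, maxOffsetsPerTrigger, and maxTriggerDelay can be easier to follow as a small model. The sketch below is a simplified reading of the descriptions in the table above, not the connector's implementation; the function names and the example numbers are assumptions made for illustration.

```python
def should_trigger(available_offsets, seconds_since_last_batch,
                   min_offsets_per_trigger, max_trigger_delay_seconds=15 * 60):
    """Simplified model: run a micro-batch once enough offsets have
    accumulated, or once maxTriggerDelay (default 15m) has elapsed."""
    if available_offsets >= min_offsets_per_trigger:
        return True
    return seconds_since_last_batch >= max_trigger_delay_seconds


def offsets_in_batch(available_offsets, max_offsets_per_trigger=None):
    """Simplified model: cap the batch size at maxOffsetsPerTrigger when set."""
    if max_offsets_per_trigger is None:
        return available_offsets
    return min(available_offsets, max_offsets_per_trigger)


# Options as they might be passed to spark.readStream.format("kafka").
rate_limit_options = {
    "minOffsetsPerTrigger": "1000",
    "maxOffsetsPerTrigger": "2000",
    "maxTriggerDelay": "15m",
}

print(should_trigger(500, 10, 1000))    # too few offsets, delay not reached
print(should_trigger(500, 900, 1000))   # delay reached, batch runs regardless
print(offsets_in_batch(5000, 2000))     # capped by maxOffsetsPerTrigger
```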

Authentication

Databricks recommends using a Unity Catalog service credential to authenticate to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka).

Key

Default

Valid values

Description

databricks.serviceCredential

None

Any string

The name of a Unity Catalog service credential for authenticating to cloud-managed Kafka services. Available in Databricks Runtime 16.1 and above.

databricks.serviceCredential.scope

None

Any string

The OAuth scope for the service credential. Set this only when Databricks cannot automatically infer the scope for your Kafka service.

When a service credential is not available, use SASL/SSL options (passed through as kafka.* properties). When you use a service credential, you don't need to specify kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.

Key

Default

Valid values

Description

kafka.security.protocol

None

A security protocol string, such as SASL_SSL, SSL, PLAINTEXT

The security protocol for broker communication.

kafka.sasl.mechanism

None

A SASL mechanism string, such as PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER, AWS_MSK_IAM

The SASL mechanism.

kafka.sasl.jaas.config

None

A JAAS configuration string

The JAAS login configuration string.

kafka.sasl.login.callback.handler.class

None

A fully qualified class name

The fully qualified class name of a login callback handler for SASL authentication.

kafka.sasl.client.callback.handler.class

None

A fully qualified class name

The fully qualified class name of a client callback handler for SASL authentication.

kafka.ssl.truststore.location

None

A file path string

The path to the SSL trust store file.

kafka.ssl.truststore.password

None

Any string

The password for the SSL trust store file.

kafka.ssl.keystore.location

None

A file path string

The path to the SSL key store file.

kafka.ssl.keystore.password

None

Any string

The password for the SSL key store file.

For complete authentication setup instructions, see Authentication.
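
The two authentication paths described above can be sketched as a single helper that returns either the service credential options or the pass-through kafka.* properties. The helper name kafka_auth_options and the credential name my-kafka-credential are hypothetical; the option keys themselves come from the tables above.

```python
def kafka_auth_options(service_credential=None, scope=None, sasl_options=None):
    """Return authentication options for a Kafka read (hypothetical helper)."""
    if service_credential is not None:
        # With a service credential, kafka.sasl.mechanism, kafka.sasl.jaas.config,
        # and kafka.security.protocol do not need to be specified.
        options = {"databricks.serviceCredential": service_credential}
        if scope is not None:
            options["databricks.serviceCredential.scope"] = scope
        return options

    # Otherwise, pass SASL/SSL settings through with the kafka. prefix.
    options = {}
    for key, value in (sasl_options or {}).items():
        name = key if key.startswith("kafka.") else f"kafka.{key}"
        options[name] = value
    return options


with_credential = kafka_auth_options(service_credential="my-kafka-credential")
with_sasl = kafka_auth_options(
    sasl_options={"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN"}
)
print(with_credential)
print(with_sasl)
```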

Kinesis

Use these options with spark.readStream.format("kinesis") to read from Amazon Kinesis Data Streams. You must specify either streamName or streamARN, but not both.

Key

Default

Valid values

Description

streamName

None

A comma-separated list of stream names

A comma-separated list of Kinesis stream names to subscribe to.

streamARN

None

A comma-separated list of Kinesis stream ARNs

A comma-separated list of Kinesis stream ARNs. For example, arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2. Available in Databricks Runtime 16.1 and above.

The following options are also available:

Key

Default

Valid values

Description

awsAccessKey

None

Any string

The AWS access key ID. Must be specified with awsSecretKey.

awsSecretKey

None

Any string

The AWS secret access key corresponding to awsAccessKey.

coalesceBinSize

128000000

Positive integers

The approximate target block size in bytes after coalescing.

coalesceThresholdBlockSize

10000000

Positive integers

The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced to the coalesceBinSize.

consumerMode

polling

polling, efo

The consumer type. efo provides enhanced fan-out with dedicated 2 MB/s throughput per shard. Available in Databricks Runtime 11.3 LTS and above.

consumerName

Streaming query ID

A single consumer name or a comma-separated list matching the number of streams

The consumer name used to register the query with the Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above.

consumerNamePrefix

databricks_

Any string

The prefix prepended to consumerName when registering consumers in EFO mode. Available in Databricks Runtime 16.0 and above.

consumerRefreshInterval

300s (max 3600s)

A duration string such as 1s

The interval at which the EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above.

endpoint

Locally resolved region

Any string

The regional endpoint for Kinesis Data Streams.

fetchBufferSize

20gb

A byte string such as 2gb or 10mb

The amount of data to buffer for the next trigger. This is a stopping condition, not a strict upper bound. More data might be buffered than specified.

initialPosition

latest

latest, trim_horizon, earliest, at_timestamp

Where to start reading from in the stream. trim_horizon is an alias for earliest.

For at_timestamp, specify a JSON string using Java timestamp format, such as {"at_timestamp": "06/25/2020 10:23:45 PDT"}. You can also specify a custom format: {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}.

maxFetchDuration

10s

A duration string such as 1m

The duration to buffer prefetched data before making it available for processing.

maxFetchRate

1.0 (max 2.0)

Positive decimals

The maximum data prefetch rate per shard in MB/s. This rate limits fetches to avoid Kinesis throttling. Kinesis allows a maximum of 2.0 MB/s.

maxRecordsPerFetch

10000

Positive integers

The number of records to read per Kinesis API request. The number of records returned might be higher if sub-records were aggregated using the Kinesis Producer Library.

maxShardsPerDescribe

100

Positive integers up to 10000

The maximum number of shards to read per API call when listing shards.

minFetchPeriod

400ms (min 200ms)

A duration string such as 1s

The minimum duration to wait between consecutive prefetch attempts. This limits fetch frequency to avoid Kinesis throttling. 200ms is the minimum because Kinesis allows a maximum of 5 fetches/sec.

region

Locally resolved region

Any string

The region the streams are defined in.

registeredConsumerId

None

A comma-separated list of consumer names or ARNs

A comma-separated list of identifiers for existing EFO consumers. Available in Databricks Runtime 16.1 and above.

registeredConsumerIdType

None

name, ARN

Whether the identifiers in registeredConsumerId are consumer names or ARNs. Available in Databricks Runtime 16.1 and above.

requireConsumerDeregistration

false

true, false

Whether to de-register the enhanced fan-out consumer on query termination. Requires consumerMode set to efo. Available in Databricks Runtime 11.3 LTS and above.

roleArn

None

An ARN string

The ARN of an IAM role to assume when accessing Kinesis.

roleExternalId

None

Any string

An optional external ID to use when assuming the role specified by roleArn. See How to Use an External ID.

roleSessionName

None

Any string

An identifier for the assumed role session. Uniquely identifies a session when the same role is assumed by different principals.

serviceCredential

None

Any string

The name of your Databricks service credential for authenticating to Kinesis. Available in Databricks Runtime 16.1 and above.

stsEndpoint

None

A URL string

A custom endpoint for AWS STS when assuming a role using roleArn.

shardFetchInterval

1s

A duration string such as 2m

The interval at which to poll Kinesis for resharding events.

shardsPerTask

5

Positive integers

The number of Kinesis shards to prefetch in parallel per Spark task. For minimum latency, ensure that the number of cores in the cluster is at least the number of Kinesis shards divided by shardsPerTask.

For more information on reading from Kinesis, see Connect to Amazon Kinesis.
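
A few of the Kinesis rules above involve small calculations or string formats. The following sketch, with hypothetical helper names and a placeholder stream name, checks the streamName/streamARN rule, builds the at_timestamp JSON string for initialPosition, and works out the core count suggested by the shardsPerTask guidance.

```python
import json
import math


def kinesis_options(stream_name=None, stream_arn=None, at_timestamp=None):
    """Build Kinesis source options (hypothetical helper, not a Spark API)."""
    if (stream_name is None) == (stream_arn is None):
        raise ValueError("Specify either streamName or streamARN, but not both")

    if stream_name is not None:
        options = {"streamName": stream_name}
    else:
        options = {"streamARN": stream_arn}

    if at_timestamp is not None:
        # initialPosition accepts a JSON string for at_timestamp.
        options["initialPosition"] = json.dumps({"at_timestamp": at_timestamp})
    return options


def min_cores_for_latency(num_shards, shards_per_task=5):
    """Cores needed so that cores >= shards / shardsPerTask (default 5)."""
    return math.ceil(num_shards / shards_per_task)


# Kinesis allows at most 5 fetches per second, so the shortest
# interval between fetches is 1000 ms / 5 = 200 ms (the minFetchPeriod floor).
MIN_FETCH_PERIOD_MS = 1000 // 5

options = kinesis_options(
    stream_name="my-stream",  # placeholder stream name
    at_timestamp="06/25/2020 10:23:45 PDT",
)
print(options)
print(min_cores_for_latency(48))
```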

Pub/Sub

Use these options with spark.readStream.format("pubsub") to subscribe to Google Pub/Sub. The options subscriptionId, topicId, and projectId are required.

Key

Default

Valid values

Description

subscriptionId

None

Any string

Required. The Pub/Sub subscription ID. The connector creates the subscription if it does not exist.

topicId

None

Any string

Required. The Pub/Sub topic ID.

projectId

None

Any string

Required. The Google Cloud project ID.

numFetchPartitions

Half the number of executors available at stream initialization

Positive integers

The number of parallel Spark tasks that fetch rows from the subscription.

maxBytesPerTrigger

None

Positive integers

A soft limit on the number of bytes to process per micro-batch.

maxRecordsPerFetch

1000

Positive integers

The number of rows to fetch per task before processing.

maxFetchPeriod

10s

A duration string such as 1s or 1m

The time duration for each task to fetch before processing rows. Databricks recommends using the default value.

deleteSubscriptionOnStreamStop

false

true, false

When true, the subscription specified by subscriptionId is deleted when the streaming query ends.

serviceCredential

None

Any string

The name of a Databricks service credential for authenticating to Pub/Sub. Available in Databricks Runtime 16.1 and above.

clientEmail

None

An email address string

The email address of the Google Service Account. Required when not using a service credential.

clientId

None

Any string

The client ID of the Google Service Account. Required when not using a service credential.

privateKey

None

A private key string

The private key for the Google Service Account. Required when not using a service credential.

privateKeyId

None

Any string

The private key ID for the Google Service Account. Required when not using a service credential.

For more information on Pub/Sub, see Subscribe to Google Pub/Sub.
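
The required and credential-related Pub/Sub options can be grouped as follows. This is a sketch under assumptions: the helper name pubsub_options and all identifier values are placeholders, and in practice the service account values would come from a secret store rather than literals.

```python
def pubsub_options(subscription_id, topic_id, project_id,
                   service_credential=None, service_account=None):
    """Build Pub/Sub source options (hypothetical helper, not a Spark API)."""
    options = {
        "subscriptionId": subscription_id,
        "topicId": topic_id,
        "projectId": project_id,
    }
    if service_credential is not None:
        options["serviceCredential"] = service_credential
        return options

    # Without a service credential, all four service account fields are required.
    required = ("clientEmail", "clientId", "privateKey", "privateKeyId")
    account = service_account or {}
    missing = [key for key in required if not account.get(key)]
    if missing:
        raise ValueError(f"Missing service account options: {missing}")
    options.update({key: account[key] for key in required})
    return options


options = pubsub_options(
    "my-subscription", "my-topic", "my-project",  # placeholder identifiers
    service_credential="my-pubsub-credential",    # placeholder credential name
)
print(sorted(options))
```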

Pulsar

Use these options with spark.readStream.format("pulsar") to stream from Apache Pulsar. Available in Databricks Runtime 14.1 and above.

The following options are required. You must specify exactly one of topic, topics, or topicsPattern.

Key

Default

Valid values

Description

service.url

None

A Pulsar service URL string

The service URL of the Pulsar service, for example pulsar://broker.example.com:6650.

topic

None

Any string

A single topic name to consume.

topics

None

A comma-separated list of topic names

A comma-separated list of topic names to consume.

topicsPattern

None

A Java regex string

A Java regex string to match topic names.

The following options are also supported:

Key

Default

Valid values

Description

admin.url

None

A URL string

The Pulsar admin service HTTP URL. Required when maxBytesPerTrigger is set.

allowDifferentTopicSchemas

false

true, false

Whether to turn off automatic schema-based deserialization of topic values when reading multiple topics with different schemas. When true, only the raw values are returned.

failOnDataLoss

true

true, false

Whether to fail the query when data is lost. For example, data loss might occur when topics are deleted or messages expire due to retention policy.

maxBytesPerTrigger

None

Positive integers

A soft limit on the number of bytes to process per micro-batch. Requires admin.url.

pollTimeoutMs

120000

Positive integers

The timeout for reading messages from Pulsar in milliseconds.

predefinedSubscription

None

Any string

The predefined subscription name used by the connector to track Spark application progress.

startingOffsets

latest

latest, earliest, or a JSON offset string

Where to start reading from.

subscriptionPrefix

None

Any string

The prefix used by the connector to generate a random subscription to track Spark application progress.

waitingForNonExistedTopic

false

true, false

Whether the connector waits until the desired topics are created.

You can specify additional Pulsar client, admin, and reader configurations using the following option patterns:

Pattern

Configuration options

pulsar.admin.*

Pulsar admin configuration

pulsar.client.*

Pulsar client configuration, including authentication options such as pulsar.client.authPluginClassName and pulsar.client.authParams.

pulsar.reader.*

Pulsar reader configuration

For more information on Pulsar client and admin authentication options, see Authentication.
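
Two constraints in the Pulsar tables above are easy to miss: exactly one of topic, topics, or topicsPattern must be set, and maxBytesPerTrigger requires admin.url. A minimal sketch that checks both is shown below; the helper name pulsar_source_options and the URLs are hypothetical.

```python
def pulsar_source_options(service_url, admin_url=None,
                          max_bytes_per_trigger=None, **topic_selector):
    """Build Pulsar source options (hypothetical helper, not a Spark API)."""
    allowed = {"topic", "topics", "topicsPattern"}
    if len(topic_selector) != 1 or next(iter(topic_selector)) not in allowed:
        raise ValueError("Specify exactly one of topic, topics, or topicsPattern")
    if max_bytes_per_trigger is not None and admin_url is None:
        raise ValueError("maxBytesPerTrigger requires admin.url")

    options = {"service.url": service_url}
    options.update(topic_selector)
    if admin_url is not None:
        options["admin.url"] = admin_url
    if max_bytes_per_trigger is not None:
        options["maxBytesPerTrigger"] = str(max_bytes_per_trigger)
    return options


options = pulsar_source_options(
    "pulsar://broker.example.com:6650",
    admin_url="http://broker.example.com:8080",  # placeholder admin endpoint
    max_bytes_per_trigger=10_000_000,
    topics="topic1,topic2",
)
print(options)
```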

Authentication

Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends using secrets to store authentication details. See Secret management.

Key

Default

Valid values

Description

pulsar.client.authPluginClassName

None

A fully qualified class name

The fully qualified class name of the authentication plugin. For example, org.apache.pulsar.client.impl.auth.AuthenticationTls.

pulsar.client.authParams

None

A credential string

Authentication credentials passed to the authentication plugin as a string. For example, tlsCertFile:/path/to/my-role.cert.pem,tlsKeyFile:/path/to/my-role.key-pk8.pem.

pulsar.client.useKeyStoreTls

false

true, false

When true, enables KeyStore-based TLS configuration instead of PEM-format files.

pulsar.client.tlsTrustStoreType

None

Any string

The format of the TLS trust store file. For example, JKS.

pulsar.client.tlsTrustStorePath

None

A file path string

The path to the TLS trust store file containing trusted CA certificates. Required when pulsar.client.useKeyStoreTls is true.

pulsar.client.tlsTrustStorePassword

None

Any string

The password for the TLS trust store file.

If the stream uses a PulsarAdmin, you can also set the following options:

Key

Default

Valid values

Description

pulsar.admin.authPluginClassName

None

A fully qualified class name

The fully qualified class name of the authentication plugin for the Pulsar admin client.

pulsar.admin.authParams

None

A credential string

Authentication credentials for the Pulsar admin client authentication plugin.

pulsar.admin.useTls

None

true, false

Whether to use TLS for the Pulsar admin client connection.

pulsar.admin.tlsAllowInsecureConnection

None

true, false

Whether to allow insecure TLS connections for the Pulsar admin client.

pulsar.admin.tlsTrustCertsFilePath

None

A file path string

Path to the trusted TLS certificate file for the Pulsar admin client.

pulsar.admin.useKeyStoreTls

None

true, false

Whether to use KeyStore-based TLS for the Pulsar admin client.

pulsar.admin.tlsTrustStoreType

None

Any string

The format of the TLS trust store for the Pulsar admin client. For example, JKS.

pulsar.admin.tlsTrustStorePath

None

A file path string

Path to the TLS trust store file for the Pulsar admin client. Required when pulsar.admin.useKeyStoreTls is true.

pulsar.admin.tlsTrustStorePassword

None

Any string

Password for the Pulsar admin client TLS trust store.

For authentication examples, see Authenticate to Pulsar.

DataFrameWriter options

Use these options with DataFrameWriter.option() and DataFrameWriterV2.option() to control how Databricks writes data.

Example

The following example sets mergeSchema to True for writing a Delta Lake table:

Python
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")

Avro

The following options apply when writing Avro files.

Key

Default

Valid values

Description

avroSchema

None

A JSON schema string

The full Avro schema as a JSON string. Use this option to convert Spark SQL types to specific Avro types. Applies to Avro file.

avroSchemaUrl

None

A URL string

A URL pointing to an Avro schema file. Use instead of avroSchema when the schema is stored externally. Mutually exclusive with avroSchema. Applies to Avro file.

compression

snappy

uncompressed, deflate, snappy, bzip2, xz, zstandard

Compression codec to use when writing. Applies to Avro file.

recordName

topLevelRecord

Any string

The top-level record name in the output Avro schema. Applies to Avro file.

positionalFieldMatching

false

true, false

Whether to match columns between the Spark schema and the Avro schema by field position instead of by name. Applies to Avro file.

recordNamespace

Empty string

Any string

The namespace for the top-level record in the output Avro schema. Applies to Avro file.
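
Because avroSchema expects the full schema as a JSON string, one convenient approach is to build the schema as a Python dictionary and serialize it. The record fields and the com.example namespace below are invented for illustration.

```python
import json

# Hypothetical schema: the field names and namespace are placeholders.
avro_schema = json.dumps({
    "type": "record",
    "name": "topLevelRecord",
    "namespace": "com.example",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "label", "type": ["null", "string"], "default": None},
    ],
})

# avroSchema and avroSchemaUrl are mutually exclusive, so only one is set.
avro_write_options = {
    "avroSchema": avro_schema,
    "compression": "deflate",
}
print(avro_write_options["avroSchema"])
```

With an existing DataFrame df, these options could then be applied with df.write.format("avro").options(**avro_write_options).save(path).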

Delta Lake and Apache Iceberg

The following options apply when writing Delta Lake and Apache Iceberg tables.

Key

Default

Valid values

Description

clusterByAuto

false

true, false

Whether to enable automatic liquid clustering, where Databricks selects clustering columns based on query patterns. Only valid with mode("overwrite"). Cannot be used with append mode. Available in Databricks Runtime 16.4 and above. Applies to Use liquid clustering for tables.

mergeSchema

None

true, false

Whether to enable schema evolution for the write operation. New columns in the source DataFrame are added to the target table schema. Applies to batch and streaming appends. Applies to Update table schema.

overwriteSchema

None

true, false

Whether to replace the table schema and partitioning when overwriting. Requires mode("overwrite") without replaceWhere. Cannot be used with partitionOverwriteMode. Applies to Update table schema.

partitionOverwriteMode

None

static, dynamic

The partition overwrite mode. Set this to dynamic to overwrite only partitions containing new data, leaving all other partitions unchanged. This is a legacy mode that is not supported on serverless compute or Databricks SQL. Applies to Selectively overwrite data with Delta Lake.

replaceOn

None

A boolean expression string

A boolean expression that matches rows in the target table to replace with rows from the source query. Can reference columns from both the target table and the source query. Rows in the target that match a source row are deleted and replaced. If the source is empty, no deletions occur. Use targetAlias to disambiguate column references. Available in Databricks Runtime 17.1 and above. Applies to Selectively overwrite data with Delta Lake.

replaceUsing

None

A comma-separated list of column names

A comma-separated list of column names used to match rows between the target table and the source query. Both the target and the source must contain all listed columns. Rows in the target that match a source row under equality comparison are deleted and replaced. NULL values are treated as not equal and won't match. Available in Databricks Runtime 16.3 and above. Applies to Selectively overwrite data with Delta Lake.

replaceWhere

None

A predicate expression string

A predicate expression. Atomically overwrites only the records that match the predicate. Applies to Selectively overwrite data with Delta Lake.

targetAlias

None

Any string

A string alias for the target table. Use with replaceOn or replaceWhere to disambiguate column references when the condition references columns from both the target table and the source query. Applies to Selectively overwrite data with Delta Lake.

txnAppId

None

Any string

A unique string identifying the application for idempotent writes in foreachBatch operations. Use together with txnVersion to ensure exactly-once writes to multiple Delta Lake tables. Applies to Use foreachBatch for idempotent table writes.

txnVersion

None

A monotonically increasing integer

A monotonically increasing number used as the transaction version for idempotent writes in foreachBatch operations. Use together with txnAppId to ensure exactly-once writes to multiple Delta Lake tables. Applies to Use foreachBatch for idempotent table writes.

optimizeWrite

None

true, false

Whether to enable Auto Optimize Write for this write operation. Overrides the spark.databricks.delta.optimizeWrite.enabled configuration. Applies to What is Delta Lake in Databricks?.

userMetadata

None

Any string

A user-defined string appended to the commit metadata for the write operation. Visible in the output of DESCRIBE HISTORY. Applies to Enrich tables with custom metadata.
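
Several of the options in this table are only valid in particular combinations. The following sketch encodes the combination rules stated in the descriptions above as a pre-flight check; the function name check_delta_write_options is hypothetical, and the check covers only those stated rules, not everything Databricks validates at write time.

```python
def _enabled(value):
    """Treat True and the string 'true' (any casing) as enabled."""
    return str(value).lower() == "true"


def check_delta_write_options(mode, options):
    """Return a list of rule violations (hypothetical helper)."""
    problems = []
    if _enabled(options.get("clusterByAuto", False)) and mode != "overwrite":
        problems.append("clusterByAuto is only valid with mode('overwrite')")
    if _enabled(options.get("overwriteSchema", False)):
        if mode != "overwrite":
            problems.append("overwriteSchema requires mode('overwrite')")
        if "replaceWhere" in options:
            problems.append("overwriteSchema cannot be combined with replaceWhere")
        if "partitionOverwriteMode" in options:
            problems.append(
                "overwriteSchema cannot be used with partitionOverwriteMode"
            )
    if ("txnAppId" in options) != ("txnVersion" in options):
        problems.append("txnAppId and txnVersion must be used together")
    return problems


print(check_delta_write_options("append", {"clusterByAuto": True}))
print(check_delta_write_options(
    "overwrite", {"replaceWhere": "event_date >= '2024-01-01'"}
))
```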

CSV

The following options apply when writing CSV files.

Key

Default

Valid values

Description

charToEscapeQuoteEscaping

\0 (not enabled)

A single character

The character used to escape the escape character when it differs from the quote character. Applies to csv (DataFrameWriter).

compression

none

none, bzip2, gzip, lz4, snappy, deflate, zstd

Compression codec to use when writing. Applies to csv (DataFrameWriter).

dateFormat

yyyy-MM-dd

A date format string

Format string for date column values. Applies to csv (DataFrameWriter).

emptyValue

Empty string

Any string

The string written for empty (non-null) values. Applies to csv (DataFrameWriter).

encoding

UTF-8

A java.nio.charset.Charset name

The character encoding for the output files. Applies to csv (DataFrameWriter).

escape

\

A single character

The character used to escape quoted values. Applies to csv (DataFrameWriter).

escapeQuotes

true

true, false

Whether to escape quote characters inside quoted field values. Applies to csv (DataFrameWriter).

header

false

true, false

Whether to write column names as the first line of the output. Applies to csv (DataFrameWriter).

ignoreLeadingWhiteSpace

false

true, false

Whether to trim leading whitespace from values when writing. Applies to csv (DataFrameWriter).

ignoreTrailingWhiteSpace

false

true, false

Whether to trim trailing whitespace from values when writing. Applies to csv (DataFrameWriter).

lineSep

\n

A string

The line separator string used between records. Applies to csv (DataFrameWriter).

locale

en-US

A java.util.Locale identifier

A Java locale identifier that affects default date, timestamp, and decimal formatting within the CSV.

nullValue

Empty string

Any string

String written for null values. Applies to csv (DataFrameWriter).

quote

"

A single character

The character used to quote field values that contain the separator. Applies to csv (DataFrameWriter).

quoteAll

false

true, false

Whether to enclose all field values in quotes regardless of content. Applies to csv (DataFrameWriter).

sep

,

A string

The field delimiter character. Applies to csv (DataFrameWriter).

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

The format string for timestamp column values. Applies to csv (DataFrameWriter).

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

Format string for timestamp without timezone (TimestampNTZType) column values.

Excel

The following options apply when writing Excel files.

Key

Default

Valid values

Description

dataAddress

None

A sheet name or cell reference string

The sheet name or starting cell for the write. If omitted, writes to a sheet named Sheet1 starting at cell A1. Accepts a sheet name (SheetName) or a single cell reference (SheetName!A1). Cell ranges are not supported for writes.

dateFormatInWrite

yyyy-mm-dd

An Excel date format string

Excel cell format string applied to Date columns. Uses Excel format syntax.

headerRows

0

0, 1

The number of header rows to write. Set to 1 to write column names as the first row.

timestampNTZFormat

yyyy-mm-dd hh:mm:ss

An Excel timestamp format string

Excel cell format string applied to TimestampNTZ and Timestamp columns. Uses Excel format syntax.

version

xlsx

xlsx, xls

The Excel file format version to write.

JSON

The following options apply when writing JSON files.

Key

Default

Valid values

Description

compression

none

none, bzip2, gzip, lz4, snappy, deflate, zstd

Compression codec to use when writing. Applies to json (DataFrameWriter).

dateFormat

yyyy-MM-dd

A date format string

Format string for date column values. Applies to json (DataFrameWriter).

encoding

UTF-8

A java.nio.charset.Charset name

The character encoding for the output files. Applies to json (DataFrameWriter).

ignoreNullFields

value of spark.sql.jsonGenerator.ignoreNullFields

true, false

Whether to omit fields with null values from the JSON output. Applies to json (DataFrameWriter).

lineSep

\n

A string

The line separator string used between records. Applies to json (DataFrameWriter).

locale

en-US

A java.util.Locale identifier

A Java locale identifier that affects default date, timestamp, and decimal formatting within the JSON.

pretty

false

true, false

Whether to enable pretty (indented, multiline) JSON output.

sortKeys

false

true, false

Whether to sort the keys of JSON objects alphabetically in the output. Useful for producing deterministic output.

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

The format string for timestamp column values. Applies to json (DataFrameWriter).

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

Format string for timestamp without timezone (TimestampNTZType) column values.

writeNonAsciiCharacterAsCodePoint

false

true, false

Whether to encode non-ASCII characters as \uXXXX Unicode escape sequences instead of literal UTF-8 characters in the output.
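
To give a feel for what ignoreNullFields, sortKeys, and writeNonAsciiCharacterAsCodePoint do to a record, the sketch below reproduces similar behavior with Python's standard json module. It is an analogy only: it does not call Spark, and details of Spark's actual output may differ.

```python
import json


def to_json_line(row, ignore_null_fields=True, sort_keys=False,
                 non_ascii_as_code_point=False):
    """Serialize one record in a way that mimics the writer options."""
    if ignore_null_fields:
        # Mimics ignoreNullFields: drop fields whose value is null.
        row = {key: value for key, value in row.items() if value is not None}
    return json.dumps(
        row,
        sort_keys=sort_keys,                   # mimics sortKeys
        ensure_ascii=non_ascii_as_code_point,  # mimics writeNonAsciiCharacterAsCodePoint
        separators=(",", ":"),
    )


record = {"b": None, "a": "caf\u00e9"}
print(to_json_line(record, ignore_null_fields=False))
print(to_json_line(record, sort_keys=True, non_ascii_as_code_point=True))
```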

ORC

The following options apply when writing ORC files.

Key

Default

Valid values

Description

compression

zstd

none, uncompressed, snappy, zlib, lzo, zstd, lz4, brotli

Compression codec to use when writing. Applies to orc (DataFrameWriter).

Parquet

The following options apply when writing Parquet files.

Key

Default

Valid values

Description

compression

snappy

none, uncompressed, snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd

Compression codec to use when writing. Applies to parquet (DataFrameWriter).

spark.sql.parquet.outputTimestampType

INT96

INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS

The physical type used to encode timestamp columns. Use INT96 for compatibility with legacy Parquet readers that do not support the standard timestamp types.

Text

The following options apply when writing text files.

Key

Default

Valid values

Description

compression

none

none, bzip2, gzip, lz4, snappy, deflate, zstd

Compression codec to use when writing. Applies to text (DataFrameWriter).

encoding

UTF-8

A java.nio.charset.Charset name

The character encoding for the output files.

lineSep

\n

A string

The line separator string used between records. Applies to text (DataFrameWriter).

XML

The following options apply when writing XML files.

Key

Default

Valid values

Description

arrayElementName

item

Any string

The element name for array elements that have no explicit name. Applies to xml (DataFrameWriter).

attributePrefix

_

Any string

The prefix prepended to field names that correspond to XML attributes. Applies to xml (DataFrameWriter).

compression

none

none, bzip2, gzip, lz4, snappy, deflate, zstd

Compression codec to use when writing. Applies to xml (DataFrameWriter).

dateFormat

yyyy-MM-dd

A date format string

Format string for date column values. Applies to xml (DataFrameWriter).

declaration

version="1.0" encoding="UTF-8" standalone="yes"

An XML declaration string, or empty string to suppress

The XML declaration string written at the top of each output file. Set to an empty string to suppress the declaration. Applies to xml (DataFrameWriter).

encoding

UTF-8

A java.nio.charset.Charset name

The character encoding for the output files. Applies to xml (DataFrameWriter).

indent

4 spaces

Any string

The string used to indent child elements in the output. Set to an empty string to turn off indentation and write each row on a single line.

locale

en-US

A java.util.Locale identifier

A Java locale identifier that affects default date, timestamp, and decimal formatting within the XML.

nullValue

null

Any string

The string written for null values. When set to null, attributes and child elements for null fields are omitted. Applies to xml (DataFrameWriter).

rootTag

ROWS

Any string

The root element tag that wraps all row elements in the output. Applies to xml (DataFrameWriter).

rowTag

ROW

Any string

The element tag that represents a row in the output. Applies to xml (DataFrameWriter).

singleVariantColumn

None

A column name string

The name of the single Variant column to write to XML files. Applies to xml (DataFrameWriter).

timestampFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]

A timestamp format string

The format string for timestamp column values. Applies to xml (DataFrameWriter).

timestampNTZFormat

yyyy-MM-dd'T'HH:mm:ss[.SSS]

A timestamp format string

Format string for timestamp without timezone column values. Applies to xml (DataFrameWriter).

validateName

true

true, false

Whether to throw an exception if a column name is not a valid XML element identifier. Applies to xml (DataFrameWriter).

valueTag

_VALUE

Any string

The field name used for character data in XML elements that also have attributes or child elements. Applies to xml (DataFrameWriter).
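
The roles of rootTag, rowTag, attributePrefix, and valueTag are easier to see on a concrete record. The following sketch uses Python's xml.etree.ElementTree to build output of roughly the shape these options describe. It is an illustration of the naming conventions, not Spark's XML writer, and the function name row_to_element is hypothetical.

```python
import xml.etree.ElementTree as ET


def row_to_element(row, row_tag="ROW", attribute_prefix="_", value_tag="_VALUE"):
    """Map one record to an XML element using the default tag conventions."""
    element = ET.Element(row_tag)
    for name, value in row.items():
        if value is None:
            continue  # with the default nullValue, null fields are omitted
        if name == value_tag:
            element.text = str(value)  # character data of the element
        elif name.startswith(attribute_prefix):
            # Fields carrying the attribute prefix become XML attributes.
            element.set(name[len(attribute_prefix):], str(value))
        else:
            child = ET.SubElement(element, name)
            child.text = str(value)
    return element


root = ET.Element("ROWS")  # default rootTag
root.append(row_to_element({"_id": 1, "name": "a", "note": None}))
print(ET.tostring(root, encoding="unicode"))
```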

DataStreamWriter options

Use these options with DataStreamWriter.option() to configure streaming writes.

Example

The following example sets the checkpoint location for a stream:

Python
(df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start("/path/to/table"))

Common

The following options apply to all streaming write operations.

Key

Default

Valid values

Description

checkpointLocation

None (required)

A path string

Path to the checkpoint directory for the streaming query. Required for fault tolerance and exactly-once processing guarantees. Each streaming query must use a unique checkpoint location. Databricks recommends storing checkpoints in a Unity Catalog volume or cloud storage path. See Structured Streaming checkpoints.

path

None

A path string

Output path for file-based streaming sinks such as Parquet. Applies to file-based formats only.
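
Because each streaming query must use its own checkpoint location, it can help to check a job's configuration for accidental reuse before starting any streams. The sketch below is one possible check; the function name duplicate_checkpoints, the query names, and the paths are all hypothetical.

```python
from collections import Counter


def duplicate_checkpoints(query_to_checkpoint):
    """Return checkpoint paths that are assigned to more than one query."""
    counts = Counter(path.rstrip("/") for path in query_to_checkpoint.values())
    return sorted(path for path, count in counts.items() if count > 1)


# Placeholder query names and volume paths.
checkpoints = {
    "orders_to_bronze": "/Volumes/main/default/checkpoints/orders",
    "orders_to_silver": "/Volumes/main/default/checkpoints/orders/",
    "clicks_to_bronze": "/Volumes/main/default/checkpoints/clicks",
}
print(duplicate_checkpoints(checkpoints))
```

In this example the first two queries point at the same directory once the trailing slash is ignored, so that path is reported.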

Console sink

The following options apply when writing streams to the console sink.

Key

Default

Valid values

Description

numRows

20

Positive integers

The number of rows to display for each micro-batch when writing to the console sink.

truncate

true

true, false

Whether to truncate long strings when displaying rows. Set to false to show full string values.

Delta Lake

The following options apply when writing a stream to a Delta Lake table using format("delta"). Overwrite-only options such as overwriteSchema, replaceWhere, and partitionOverwriteMode are not supported for streaming writes.

Key

Default

Valid values

Description

mergeSchema

false

true, false

Whether to evolve the Delta Lake table schema when the streaming DataFrame contains new columns. Applies to append output mode only. See Update table schema.

userMetadata

None

Any string

A user-defined string appended to the commit metadata for the write operation. Visible in the output of DESCRIBE HISTORY. See Enrich tables with custom metadata.
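
The two Delta Lake options can be combined on a single append-mode stream. The sketch below is illustrative only: the function name delta_append_writer and its parameters are hypothetical, and df is assumed to be a streaming DataFrame. It returns the configured writer so that the caller decides how to start the query.

```python
def delta_append_writer(df, checkpoint: str, note: str):
    """Configure an append-mode Delta stream with schema evolution enabled.

    `df` is assumed to be a streaming DataFrame (hypothetical input).
    """
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")              # mergeSchema applies to append mode only
        .option("checkpointLocation", checkpoint)
        .option("mergeSchema", "true")     # allow new columns in the target table
        .option("userMetadata", note)      # appears in DESCRIBE HISTORY
    )
```

The returned writer can then be started with .start("/path/to/table").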

File sink

The following option applies when writing a stream to file-based formats (Parquet, JSON, CSV, ORC, text). For format-specific options, see DataFrameWriter options.

Key

Default

Valid values

Description

retention

None

A time string such as 7 days or 24 hours

How long to retain sink metadata files used for fault tolerance and compaction. When not set, metadata files are retained indefinitely.
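
Since retention has no default, it can be left out entirely when indefinite retention is acceptable. The following sketch builds an options dictionary for a file sink under that assumption. The helper name file_sink_options and the paths are hypothetical.

```python
from typing import Dict, Optional


def file_sink_options(output_path: str, checkpoint: str,
                      retention: Optional[str] = None) -> Dict[str, str]:
    """Collect options for a file-based streaming sink.

    `retention` is only included when set, so that omitting it keeps
    the documented behavior of retaining metadata files indefinitely.
    """
    opts = {"path": output_path, "checkpointLocation": checkpoint}
    if retention is not None:
        opts["retention"] = retention  # a time string such as "7 days"
    return opts
```

The dictionary can be passed to a writer with df.writeStream.format("parquet").options(**opts).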

Kafka sink

The following options apply when writing to Kafka.

Key

Default

Valid values

Description

kafka.bootstrap.servers

None

A comma-separated list of host:port strings

Required. A comma-separated list of Kafka broker host:port addresses.

topic

None

Any string

The target Kafka topic for all rows. Required if the DataFrame does not include a topic column.

kafka.*

None

Any Kafka producer configuration value

Any Kafka producer configuration property, passed with the kafka. prefix. For example, kafka.compression.type.
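
The Kafka sink options can be assembled from plain producer property names by adding the kafka. prefix. The sketch below is one way to do this. The helper names kafka_sink_options and kafka_writer are hypothetical, and df is assumed to be a streaming DataFrame that already has a value column, which the Kafka sink requires.

```python
from typing import Dict, Iterable, Optional


def kafka_sink_options(bootstrap_servers: Iterable[str],
                       topic: Optional[str] = None,
                       producer_conf: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build Kafka sink options from broker addresses and producer properties."""
    opts = {"kafka.bootstrap.servers": ",".join(bootstrap_servers)}
    if topic is not None:
        # Omit topic when the DataFrame carries its own topic column.
        opts["topic"] = topic
    for key, value in (producer_conf or {}).items():
        # Producer properties are passed through with the kafka. prefix.
        opts[f"kafka.{key}"] = str(value)
    return opts


def kafka_writer(df, checkpoint: str, options: Dict[str, str]):
    """Configure a Kafka streaming write; `df` must contain a value column."""
    return (
        df.writeStream
        .format("kafka")
        .options(**options)
        .option("checkpointLocation", checkpoint)
    )
```

For example, kafka_sink_options(["host1:9092", "host2:9092"], topic="events", producer_conf={"compression.type": "snappy"}) yields the keys kafka.bootstrap.servers, topic, and kafka.compression.type.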

Memory sink

The following options apply when writing streams to the memory sink.

Key

Default

Valid values

Description

queryName

None (required)

Any string

The name of the in-memory table that the query writes to. Required for the memory sink. Also configurable via .queryName().

mode

exactlyonce

exactlyonce, atleastonce

Delivery guarantee for the memory sink. exactlyonce uses micro-batch mode with exactly-once semantics. atleastonce uses continuous mode with at-least-once semantics.
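
The memory sink is typically used for debugging, with queryName doubling as the name of the in-memory table. The following sketch shows one way to configure it. The function name memory_writer is hypothetical, and df is assumed to be a streaming DataFrame.

```python
def memory_writer(df, table_name: str):
    """Configure a stream that writes to an in-memory table.

    `df` is assumed to be a streaming DataFrame (hypothetical input).
    """
    return (
        df.writeStream
        .format("memory")
        .queryName(table_name)   # also the name of the in-memory table
        .outputMode("append")
    )
```

After the returned writer is started with .start(), the table can be queried by name, for example with spark.table("recent_events") when table_name is "recent_events".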

Spark function options

Some Spark SQL built-in functions accept an options map that controls parsing or serialization behavior. Pass options as a Python dict or a Scala Map[String, String].

Example

The following example parses a JSON column and fails on malformed records:

Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("name", StringType())])
# from_json supports the PERMISSIVE and FAILFAST modes; DROPMALFORMED is not supported.
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "FAILFAST"}))

Avro

Avro functions accept the same options as the corresponding DataFrameReader and DataFrameWriter Avro options.

Example

The following example decodes an Avro column with schema evolution enabled:

Python
from pyspark.sql.avro.functions import from_avro

# json_schema is the Avro schema as a JSON-format string, defined elsewhere.
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))

In addition, the Schema Registry variants of from_avro and to_avro accept the following options:

Key

Default

Valid values

Description

schemaId

None

A schema ID integer

Schema ID from the Confluent Schema Registry to use when decoding Avro data that was encoded with a schema incompatible with jsonFormatSchema. Applies to from_avro only.

confluent.schema.registry.*

None

Any Confluent SR client property value

Confluent Schema Registry client configuration properties. Pass any Confluent SR client property using this prefix, for example confluent.schema.registry.basic.auth.user.info for basic authentication credentials. Required for the Schema Registry variants of from_avro and to_avro.

CSV

CSV functions accept the same options as the corresponding DataFrameReader and DataFrameWriter CSV options.

Example

The following example parses a CSV string column using a custom separator and null-value marker:

Python
from pyspark.sql.functions import from_csv

# from_csv takes the schema as a DDL-formatted string or a Column, not a StructType.
schema = "id INT, name STRING"
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))

JSON

JSON functions accept the same options as the corresponding DataFrameReader and DataFrameWriter JSON options.

Example

The following example writes JSON with NULL fields ignored and pretty formatting enabled:

Python
from pyspark.sql.functions import to_json

df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))

Protobuf

from_protobuf and to_protobuf do not use a file-based DataSource. Protobuf data is always read and written as binary columns using these functions. Options are passed as a Map[String, String] and are case-sensitive.

Example

The following example decodes a Protobuf column using PERMISSIVE mode:

Python
from pyspark.sql.protobuf.functions import from_protobuf

df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
    {"mode": "PERMISSIVE", "enums.as.ints": "true"}))

Protobuf functions use the following options:

Key

Default

Valid values

Description

mode

FAILFAST

FAILFAST, PERMISSIVE

How to handle corrupt records. FAILFAST throws an exception. PERMISSIVE sets malformed fields to null. Applies to from_protobuf.

recursive.fields.max.depth

-1 (disabled)

0 to 10

Maximum recursion depth for recursive Protobuf fields. Set to 0 to turn off recursive field support. Applies to from_protobuf.

convert.any.fields.to.json

false

true, false

Whether to convert Protobuf Any fields to a JSON string instead of a STRUCT. Applies to from_protobuf.

emit.default.values

false

true, false

Whether to emit fields with zero or default values (proto3 semantics). When false, fields with default values are omitted from the output. Applies to from_protobuf.

enums.as.ints

false

true, false

Whether to render enum fields as integer values instead of strings. Applies to from_protobuf.

upcast.unsigned.ints

false

true, false

Whether to upcast uint32 to Long and uint64 to Decimal(20,0) to prevent integer overflow. Applies to from_protobuf.

unwrap.primitive.wrapper.types

false

true, false

Whether to unwrap google.protobuf wrapper types (for example, Int32Value and StringValue) to their corresponding primitive Spark types. Applies to from_protobuf.

retain.empty.message.types

false

true, false

Whether to retain empty Protobuf message types in the output schema by inserting a dummy column. Applies to from_protobuf.

schema.registry.subject

None

Any string

Schema Registry subject name. Required when using the Schema Registry variants of from_protobuf and to_protobuf.

schema.registry.address

None

A host:port string

Schema Registry address (host and port). Required when using the Schema Registry variants of from_protobuf and to_protobuf.

schema.registry.protobuf.name

None

Any string

Specifies which Protobuf message to use when the schema registry subject contains multiple messages. Optional.
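
Because Protobuf options are passed as a string-to-string map with case-sensitive keys, Python booleans and integers have to be converted to strings before they are passed. The sketch below shows one way to do that. The helper name protobuf_options and its underscore-to-dot keyword convention are hypothetical conveniences, not part of the Spark API.

```python
from typing import Dict, Union


def protobuf_options(**flags: Union[bool, int, str]) -> Dict[str, str]:
    """Turn keyword flags into a Protobuf options map of strings.

    Underscores in keyword names become dots, so enums_as_ints=True
    produces the key "enums.as.ints". Keys are kept lowercase as written
    because the options are case-sensitive.
    """
    options = {}
    for name, value in flags.items():
        key = name.replace("_", ".")
        if isinstance(value, bool):
            # Check bool before int: bool is a subclass of int in Python.
            options[key] = "true" if value else "false"
        else:
            options[key] = str(value)
    return options
```

For example, protobuf_options(mode="PERMISSIVE", enums_as_ints=True) produces a dictionary that can be passed as the options argument of from_protobuf.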

XML

XML functions accept the same options as the corresponding DataFrameReader and DataFrameWriter XML options.

Example

The following example writes XML with custom root and row tags:

Python
from pyspark.sql.functions import to_xml

df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))