Spark API options reference
This page lists available input and output options for Spark APIs that read and write data.
DataFrameReader options
Use these options with DataFrameReader.option(), DataFrameReader.options(), read_files, COPY INTO, and Auto Loader to control how Databricks reads data files.
Example
The following example sets multiLine to True for reading JSON files:
- Python
df = spark.read.format("json").option("multiLine", True).load("/path/to/data")
- Scala
val df = spark.read.format("json").option("multiLine", "true").load("/path/to/data")
- SQL
SELECT * FROM read_files("/path/to/data", format => "json", multiLine => true)
Common
The following options apply to all file formats.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to ignore corrupt files. If true, Spark jobs continue to run when encountering corrupted files, and the contents that have been read are still returned. For |
|
|
| Whether to ignore missing files. If true, the Spark jobs continue to run when encountering missing files and the contents are still returned. Available in Databricks Runtime 11.3 LTS and above. |
| None | A timestamp string | An optional timestamp as a filter to only ingest files that have a modification timestamp after the specified timestamp. |
| None | A timestamp string | An optional timestamp as a filter to only ingest files that have a modification timestamp before the specified timestamp. |
| None | A glob pattern string | A potential glob pattern for choosing files. Equivalent to |
|
|
| When |
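As a rough illustration of how the file-selection options in this table can be combined, the following Python sketch builds an options dictionary and passes it to DataFrameReader.options(). The option keys modifiedAfter, modifiedBefore, and pathGlobFilter are the standard Apache Spark names and are an assumption here, as are the helper names. spark is assumed to be an existing SparkSession.

```python
def common_file_options(modified_after=None, modified_before=None, glob=None):
    """Build a dict of file-selection options, omitting any that are unset."""
    candidates = {
        "modifiedAfter": modified_after,    # assumed key: only files modified after this timestamp
        "modifiedBefore": modified_before,  # assumed key: only files modified before this timestamp
        "pathGlobFilter": glob,             # assumed key: glob pattern for choosing files
    }
    return {key: value for key, value in candidates.items() if value is not None}


def read_selected_json(spark, path, **selection):
    """Read JSON files under `path`, restricted by the selection options (hypothetical helper)."""
    options = common_file_options(**selection)
    return spark.read.format("json").options(**options).load(path)
```

Leaving unset options out of the dictionary means Spark falls back to its own defaults rather than receiving explicit nulls.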
Avro
The following options apply when reading Avro files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | An Avro schema string | Optional schema specified by a user in Avro format. When reading Avro, this option can be set to an evolved schema that is compatible but different from the actual Avro schema. The deserialization schema is consistent with the evolved schema. For example, if you set an evolved schema containing one additional column with a default value, the read result contains the new column too. |
|
|
| How to handle schema evolution when using a schema registry. |
|
|
| Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars. |
|
|
| Whether to use stable field names for Avro Union types. When enabled, union type field names are derived from their type names in lowercase (for example, |
|
|
| Whether to infer the schema across multiple files and to merge the schema of each file. |
|
|
| Parser mode for handling corrupt records. |
|
|
| Specifies the case sensitivity behavior when |
| None |
| The maximum recursion depth for recursive Avro fields. Set to |
| None | A column name string | Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to What is the rescued data column?. |
|
| Any string | The prefix to use for stable union type field names when |
CSV
The following options apply when reading CSV files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A path string | The path to store files for recording the information about bad CSV records. |
|
| A single character | The character used to escape the character used for escaping quotes. For example, for the following record:
|
|
| A column name string | Supported for Auto Loader. Not supported for |
|
| A single character | Defines the character that represents a line comment when found in the beginning of a line of text. Use |
|
| A date format string | The format for parsing date strings. |
| Empty string | Any string | String representation of an empty value. |
|
|
| Whether to fall back to the legacy date and timestamp parsing behavior when a value cannot be parsed with the specified format. When |
|
| A | The name of the encoding of the CSV files. See |
|
|
| Whether to forcibly apply the specified or inferred schema to the CSV files. If the option is enabled, headers of CSV files are ignored. This option is ignored by default when using Auto Loader to rescue data and allow schema evolution. |
|
| A single character | The escape character to use when parsing the data. |
|
| A file extension string | The expected filename extension for reads. Files without this extension are filtered out. |
|
|
| Whether to fail when the CSV record contains columns not present in the schema. When |
|
|
| Whether to fail when a field value cannot be parsed as the declared schema type without widening. When |
|
|
| Whether the CSV files contain a header. Auto Loader assumes that files have headers when inferring the schema. |
|
|
| Whether to ignore leading whitespaces for each parsed value. |
|
|
| Whether to ignore trailing whitespaces for each parsed value. |
|
|
| Whether to infer the data types of the parsed CSV records or to assume all columns are of |
|
| Positive integers | The buffer size in bytes for the CSV parser. Useful for tuning memory usage when parsing large CSV files. |
| None, which covers | A string | A string between two consecutive CSV records. |
|
| A | A Java locale identifier that affects default date, timestamp, and decimal parsing within the CSV. |
|
| Positive integers, or | Maximum number of characters expected from a value to parse. Can be used to avoid memory errors. Defaults to |
|
| Positive integers | The hard limit of how many columns a record can have. |
|
|
| Whether to infer the schema across multiple files and to merge the schema of each file. Enabled by default for Auto Loader when inferring the schema. |
|
|
| Parser mode for handling malformed records. |
|
|
| Whether the CSV records span multiple lines. |
|
| Any string | The string representation of a not-a-number value when parsing |
|
| Any string | The string representation of negative infinity when parsing |
| Empty string | Any string | String representation of a null value. |
|
|
| While reading files, whether to align columns declared in the header with the schema case sensitively. This is |
|
| Any string | The string representation of positive infinity when parsing |
|
|
| Attempts to infer strings as dates instead of timestamps when possible. You must also use schema inference, either by enabling |
|
| A single character | The character used for escaping values where the field delimiter is part of the value. |
|
|
| Specifies the case sensitivity behavior when |
| None | A column name string | Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to What is the rescued data column?.
|
|
| A string | The separator string between columns. |
| None | A column name string | When set to a column name, reads the entire CSV record into a single |
|
| Positive integers or | The number of rows from the beginning of the CSV file that should be ignored, including commented and empty rows. If |
|
| A time format string | The format for parsing |
|
| A timestamp format string | The format for parsing timestamp strings. |
|
| A timestamp format string | The format for parsing timestamp without timezone ( |
| None | A | The |
|
|
| The strategy for handling unescaped quotes. The behavior of each allowed option is as follows:
|
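The following Python sketch shows one way a few of the CSV options described above might be set together. The keys sep, header, inferSchema, and multiLine are the standard Apache Spark CSV option names and are an assumption here, because the key column is not shown in this table. The helper names are hypothetical, and spark is assumed to be an existing SparkSession.

```python
def csv_read_options(delimiter=",", has_header=True, infer_types=False, multiline=False):
    """Return a dict of CSV reader options (keys assumed from Apache Spark)."""
    if not delimiter:
        raise ValueError("delimiter must be a non-empty string")
    return {
        "sep": delimiter,            # separator string between columns
        "header": has_header,        # whether the files contain a header row
        "inferSchema": infer_types,  # infer column types instead of reading strings
        "multiLine": multiline,      # whether records span multiple lines
    }


def read_csv(spark, path, **settings):
    """Read CSV files under `path` using the options built above (hypothetical helper)."""
    return spark.read.format("csv").options(**csv_read_options(**settings)).load(path)
```

PySpark converts Python booleans to the strings "true" and "false" when options are applied, so the dictionary can hold native values.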
Excel
The following options apply when reading Excel files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A cell range or sheet name string | The cell range to read in Excel syntax. If omitted, reads all valid cells from the first sheet. Use |
|
|
| Number of initial rows to use as column name headers. When |
|
|
| Whether to silently skip files that do not contain the sheet specified by |
|
|
| Whether to include phonetic annotations (such as pinyin or furigana) concatenated to cell string values when reading XLSX files. |
|
|
| The operation to perform on the Excel workbook. |
|
| A timestamp format string | Custom format string for timestamp-without-timezone values stored as strings in Excel. Custom date formats follow the formats at Datetime patterns. |
|
| A date format string | Custom format string for string values read as |
JSON
The following options apply when reading JSON files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to allow backslashes to escape any character that follows them. If not enabled, only characters that are explicitly listed by the JSON specification can be escaped. |
|
|
| Whether to allow the use of Java, C, and C++ style comments ( |
|
|
| Whether to allow the set of not-a-number ( |
|
|
| Whether to allow integral numbers to start with additional (ignorable) zeroes (for example, |
|
|
| Whether to allow use of single quotes (apostrophe, character |
|
|
| Whether to allow JSON strings to contain unescaped control characters (ASCII characters with value less than 32, including tab and line feed characters). |
|
|
| Whether to allow use of unquoted field names, which are allowed by JavaScript, but not by the JSON specification. |
| None |
| The encoding used for Variant values in the source JSON. Set to |
| None | A path string | The path to store files for recording the information about bad JSON records. Using the
|
|
| A column name string | The column for storing records that are malformed and cannot be parsed. If the |
|
| A date format string | The format for parsing date strings. |
|
|
| Whether to ignore columns of all null values or empty arrays and structs during schema inference. |
|
| A | The name of the encoding of the JSON files. See |
|
|
| Whether to try and infer timestamp strings as a |
| None, which covers | A string | A string between two consecutive JSON records. |
|
| A | A Java locale identifier that affects default date, timestamp, and decimal parsing within the JSON. |
|
| Positive integers | The maximum allowed nesting depth for JSON objects and arrays. Increase this value for deeply nested documents. |
|
| Positive integers | The maximum length of number tokens in the JSON input. Increase this value for JSON with large numeric literals. |
| unlimited | Positive integers | The maximum length of string values in the JSON input. Set to limit memory usage when parsing JSON with large strings. |
|
|
| Parser mode for handling malformed records. |
|
|
| Whether the JSON records span multiple lines. |
|
|
| Attempts to infer strings as |
|
|
| Whether to infer primitive types like numbers and booleans as |
|
|
| Specifies the case sensitivity behavior when |
| None | A column name string | Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to What is the rescued data column?.
|
| None | A column name string | Whether to ingest the entire JSON document, parsed into a single Variant column with the specified string as the column's name. If not set, the JSON fields are ingested into their own columns. |
|
| A timestamp format string | The format for parsing timestamp strings. |
|
| A timestamp format string | The format for parsing timestamp without timezone ( |
| None | A | The |
|
|
| Whether to treat type upgrade exceptions (for example, when a value can't be widened to the declared column type) as bad records rather than throwing an exception. |
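As a hedged sketch of how the lenient-parsing options above could be grouped, the following Python helper turns on several relaxations at once and optionally routes malformed records to a named column. The keys allowComments, allowSingleQuotes, allowUnquotedFieldNames, mode, and columnNameOfCorruptRecord are the standard Apache Spark JSON option names and are assumed here. The helper names and the PERMISSIVE mode value are likewise assumptions, and spark is assumed to be an existing SparkSession.

```python
# Relaxations of strict JSON parsing (keys assumed from Apache Spark).
LENIENT_JSON_OPTIONS = {
    "allowComments": True,
    "allowSingleQuotes": True,
    "allowUnquotedFieldNames": True,
}


def json_read_options(lenient=False, multiline=False, corrupt_column=None):
    """Return JSON reader options; malformed records go to `corrupt_column` if given."""
    options = dict(LENIENT_JSON_OPTIONS) if lenient else {}
    options["multiLine"] = multiline
    if corrupt_column is not None:
        options["mode"] = "PERMISSIVE"  # assumed mode that keeps malformed records
        options["columnNameOfCorruptRecord"] = corrupt_column
    return options


def read_json(spark, path, **settings):
    """Read JSON files under `path` with the options built above (hypothetical helper)."""
    return spark.read.format("json").options(**json_read_options(**settings)).load(path)
```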
Kafka
For the full list of Kafka reader options, see DataStreamReader Kafka options. The following options apply only to batch reads using spark.read.format("kafka").
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Where to stop reading. In the JSON string, |
| None | A JSON timestamp string | Per-partition ending offsets specified as timestamps in milliseconds. For example: |
| None | Positive integers or | Global ending timestamp in milliseconds applied to all partitions. |
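The per-partition ending offsets are passed as a JSON string, which is easy to get wrong by hand. The Python sketch below builds that string with the standard json module. The option keys endingOffsets, subscribe, and kafka.bootstrap.servers are the Apache Spark Kafka source names and are assumed here, as is the convention that an offset of -1 refers to the latest offset. The helper names are hypothetical, and spark is assumed to be an existing SparkSession.

```python
import json

LATEST = -1  # assumed convention: -1 refers to the latest offset


def ending_offsets(topic, partition_offsets):
    """Serialize {partition: offset} for one topic into the JSON string Spark expects."""
    per_partition = {str(p): int(o) for p, o in partition_offsets.items()}
    return json.dumps({topic: per_partition})


def read_kafka_batch(spark, servers, topic, partition_offsets):
    """Batch-read a topic up to the given per-partition offsets (hypothetical helper)."""
    return (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("subscribe", topic)
        .option("endingOffsets", ending_offsets(topic, partition_offsets))
        .load()
    )
```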
ORC
The following options apply when reading ORC files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to infer the schema across multiple files and to merge the schema of each file. |
Parquet
The following options apply when reading Parquet files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Controls the rebasing of the DATE and TIMESTAMP values between Julian and Proleptic Gregorian calendars. |
|
|
| Controls the rebasing of the INT96 timestamp values between Julian and Proleptic Gregorian calendars. |
|
|
| Whether to infer the schema across multiple files and to merge the schema of each file. |
|
|
| Specifies the case sensitivity behavior when |
| None | A column name string | Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, refer to What is the rescued data column?.
|
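A minimal Python sketch of setting schema merging and calendar rebasing together when reading Parquet files follows. The keys mergeSchema and datetimeRebaseMode, and the mode values EXCEPTION, CORRECTED, and LEGACY, come from the Apache Spark Parquet data source and are assumptions here, because the key and valid-value columns are not shown in this table. The helper names are hypothetical, and spark is assumed to be an existing SparkSession.

```python
# Rebase modes as named in Apache Spark (assumed, not taken from the table above).
REBASE_MODES = ("EXCEPTION", "CORRECTED", "LEGACY")


def parquet_read_options(merge_schema=False, rebase_mode=None):
    """Return Parquet reader options, validating the rebase mode if one is given."""
    options = {"mergeSchema": merge_schema}
    if rebase_mode is not None:
        mode = rebase_mode.upper()
        if mode not in REBASE_MODES:
            raise ValueError(f"rebase_mode must be one of {REBASE_MODES}")
        options["datetimeRebaseMode"] = mode
    return options


def read_parquet(spark, path, **settings):
    """Read Parquet files under `path` with the options built above (hypothetical helper)."""
    return spark.read.format("parquet").options(**parquet_read_options(**settings)).load(path)
```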
State store
Use these options with spark.read.format("statestore") or the read_statestore table-valued function to read Structured Streaming state data. See Read Structured Streaming state information.
Key | Default | Valid values | Description |
|---|---|---|---|
| Latest batch ID | Positive integers or | The target batch to read from. Use to query an earlier state of the query. The batch must be committed but not yet cleaned up. |
|
| Positive integers or | The target operator to read from. Use when the query has multiple stateful operators. |
|
| Any string | The target state store name to read from. Use when the stateful operator has multiple state store instances. You must specify either |
| None |
| The target side to read from for a stream-stream join. You must specify either |
| None | Positive integers or | The batch ID of the snapshot to use as the starting point when reading state. The reader rebuilds state by replaying changes from this snapshot until |
| None | Positive integers or | If specified, the query only reads this partition. Must specify together with |
|
|
| When For details, see Read Structured Streaming state changes. |
| None | Positive integers or | The starting batch ID for the change feed range. Required when |
| Latest batch ID | Positive integers or | The ending batch ID for the change feed range. Must be greater than or equal to |
| None | Any string | The state variable name to read. The state variable name is the unique name of each variable within the |
|
|
| When |
|
|
| When |
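The sketch below shows how the batch and operator targets might be passed to the statestore format from Python. The keys batchId and operatorId are the Apache Spark state data source names and are assumed here, as is treating 0 as a valid ID. The helper names are hypothetical, spark is assumed to be an existing SparkSession, and checkpoint_path stands for the checkpoint location of the streaming query.

```python
def state_store_options(batch_id=None, operator_id=None):
    """Return state reader options, omitting targets that are left unset."""
    options = {}
    for key, value in (("batchId", batch_id), ("operatorId", operator_id)):
        if value is None:
            continue  # let the reader use its default for this option
        if value < 0:
            raise ValueError(f"{key} must not be negative")
        options[key] = value
    return options


def read_state(spark, checkpoint_path, **targets):
    """Read state from a streaming checkpoint (hypothetical helper)."""
    options = state_store_options(**targets)
    return spark.read.format("statestore").options(**options).load(checkpoint_path)
```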
Text
The following options apply when reading text files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
| A | The name of the encoding of the TEXT file line separator. The content of the file is not affected by this option and is read as-is. |
| None, which covers | A string | A string between two consecutive TEXT records. |
|
|
| Whether to read a file as a single record. |
XML
The following options apply when reading XML files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The row tag of the XML files to treat as a row. In the example XML |
|
|
| Defines a fraction of rows used for schema inference. XML built-in functions ignore this option. |
|
|
| Whether to exclude attributes in elements. |
| None |
| Mode for dealing with corrupt records during parsing.
|
|
|
| If |
|
| A column name string | Allows renaming the new field that contains a malformed string created by |
| None | Any string | The prefix for attributes to differentiate attributes from elements. This will be the prefix for field names. Default is |
|
| Any string | The tag used for the character data within elements that also have attribute(s) or child element(s). User can specify the |
|
| A | For reading, decodes the XML files by the given encoding type. For writing, specifies encoding (charset) of saved XML files. XML built-in functions ignore this option. Also applies to DataFrameWriter XML options. |
|
|
| Whether white spaces surrounding values must be skipped. Whitespace-only character data are ignored. |
| None | A file path string | Path to an optional XSD file that is used to validate the XML for each row individually. Rows that fail to validate are treated like parse errors. The XSD does not otherwise affect the schema, whether specified or inferred. |
|
|
| If |
|
| A timestamp format string | Custom timestamp format string that follows the datetime pattern format. This applies to |
|
| A timestamp format string | Custom format string for timestamp without timezone that follows the datetime pattern format. This applies to TimestampNTZType type. Also applies to DataFrameWriter XML options. |
|
| A date format string | Custom date format string that follows the datetime pattern format. This applies to date type. Also applies to DataFrameWriter XML options. |
|
| An IETF BCP 47 language tag | Sets a locale as a language tag in IETF BCP 47 format. For instance, |
| string | Any string | Sets the string representation of a null value. When this is |
|
|
| Specifies the case sensitivity behavior when rescuedDataColumn is enabled. If true, rescue the data columns whose names differ by case from the schema. When false, read the data in a case-insensitive manner. |
| None | A column name string | Whether to collect all data that can't be parsed due to a data type mismatch or schema mismatch (including column casing) to a separate column. This column is included by default when using Auto Loader. For more details, see What is the rescued data column?. |
|
| A column name string | Specifies the name of the single variant column. If this option is specified for reading, parse the entire XML record into a single Variant column with the given option string value as the column's name. If this option is specified for writing, write the value of the single Variant column to XML files. Also applies to DataFrameWriter XML options. |
|
|
| Whether to use the legacy XML parser. The legacy parser has less stringent validation for malformed content but is less memory-efficient. Set to |
|
| A column name string | The column name used to capture XML elements that match the wildcard ( |
DataStreamReader options
Use these options with DataStreamReader.option() to configure streaming reads from Delta Lake tables and other file-based sources.
For file format options (JSON, CSV, Parquet, and others), see DataFrameReader options.
For Auto Loader (cloudFiles.*) options, see Auto Loader.
Example
The following example sets maxFilesPerTrigger to 10 for a Delta Lake table stream:
- Python
df = spark.readStream.format("delta").option("maxFilesPerTrigger", 10).load("/path/to/delta-table")
- Scala
val df = spark.readStream.format("delta").option("maxFilesPerTrigger", "10").load("/path/to/delta-table")
Common
The following options apply to Delta Lake tables and other file-based streaming sources.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| How to handle source files after they are processed by the stream. |
|
|
| Whether to identify already-processed files by filename only rather than by full path. When |
|
|
| Whether to process the most recently modified files first within each micro-batch. Useful when you want to process the latest data as quickly as possible. When |
| None | Positive integers | Soft maximum for the amount of data processed for each micro-batch. A batch may process more than the limit if the smallest input unit exceeds it. When used together with For Auto Loader, use |
|
| Positive integers or | Maximum number of unprocessed files to cache for subsequent micro-batches. Set to |
|
| A duration string such as | Maximum age of files considered for processing, relative to the timestamp of the most recently modified file rather than the current system time. Files older than this threshold are ignored. Ignored when |
|
| Positive integers | Upper bound for the number of new files processed in each micro-batch. When used together with For Auto Loader, use |
| None | A path string | Path to the archive directory when |
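To illustrate the two per-batch limits described above, the following Python sketch validates and collects them before starting a streaming read. maxFilesPerTrigger appears in the example earlier in this section. The key maxBytesPerTrigger is assumed from the Delta Lake streaming source, and the helper names are hypothetical. spark is assumed to be an existing SparkSession.

```python
def rate_limit_options(max_files=None, max_bytes=None):
    """Return micro-batch rate limits; each limit must be a positive integer if set."""
    options = {}
    limits = (("maxFilesPerTrigger", max_files), ("maxBytesPerTrigger", max_bytes))
    for key, value in limits:
        if value is None:
            continue  # unset limits are left to the source's defaults
        if not isinstance(value, int) or value <= 0:
            raise ValueError(f"{key} must be a positive integer")
        options[key] = value
    return options


def read_delta_stream(spark, path, **limits):
    """Start a rate-limited streaming read of a Delta table (hypothetical helper)."""
    return spark.readStream.format("delta").options(**rate_limit_options(**limits)).load(path)
```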
Auto Loader
Use these options with the cloudFiles source to configure Auto Loader for streaming ingestion from cloud storage. Options specific to the cloudFiles source are prefixed with cloudFiles to keep them in a separate namespace from other Structured Streaming source options.
Common
The following options apply to all Auto Loader configurations.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to allow input directory file changes to overwrite existing data. For configuration caveats, see Does Auto Loader process the file again when the file gets appended or overwritten?. |
| None | A duration string such as | Auto Loader can trigger asynchronous backfills at a given interval. For more information, see Trigger regular backfills using cloudFiles.backfillInterval. Do not use when |
|
|
| Whether to automatically delete or move processed files from the input directory. When set to When set to When set to A file is considered processed when it has a non-null value for Review the following considerations before enabling
Available in Databricks Runtime 16.4 and above. |
|
| A CalendarInterval string such as | Amount of time to wait before processed files become candidates for archival with Available in Databricks Runtime 16.4 and above. |
| None | A cloud storage or Unity Catalog volume path | Path to archive processed files to when The move location must:
Auto Loader must have write permissions to this directory. Available in Databricks Runtime 16.4 and above. |
| None (required option) |
| The data file format in the source path. Valid values include:
|
|
|
| Whether to include existing files in the stream processing input path or to only process new files arriving after initial setup. This option is evaluated only when you start a stream for the first time. Changing this option after restarting the stream has no effect. |
|
|
| Whether to infer exact column types when leveraging schema inference. By default, columns are inferred as strings when inferring JSON and CSV datasets. See schema inference for more details. |
| None | A byte string such as | The maximum number of new bytes to be processed in every trigger. This is a soft maximum. If you have files that are 3 GB each, Databricks processes 12 GB in a microbatch. When used together with In Databricks Runtime 18.0 and above, this option is dynamically configured and does not need to be set manually. |
| None | A duration string | How long a file event is tracked for deduplication purposes. Databricks does not recommend tuning this parameter unless you are ingesting data on the order of millions of files an hour. See the section on File event tracking for more details. Tuning |
|
| Positive integers | The maximum number of new files to be processed in every trigger. When used together with In Databricks Runtime 18.0 and above, this option is dynamically configured and does not need to be set manually. |
| None | A comma-separated list of column names | A comma-separated list of Hive-style partition columns that you would like inferred from the directory structure of the files. Hive-style partition columns are key-value pairs combined by an equality sign such as
Specifying
|
|
|
| The mode for evolving the schema as new columns are discovered in the data. By default, columns are inferred as strings when inferring JSON datasets. See schema evolution for more details. |
| None | A schema string | Schema information that you specify to Auto Loader during schema inference. See schema hints for more details. |
| None (required to infer the schema) | A path string | The location to store the inferred schema and subsequent changes. See schema inference for more details. |
|
|
| Whether to use a strict globber that matches the default globbing behavior of other file sources in Apache Spark. See Common data loading patterns for more details. Available in Databricks Runtime 12.2 LTS and above. |
|
|
| Whether to validate Auto Loader options and return an error for unknown or inconsistent options. |
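The table above marks the file format as required and the schema location as required for schema inference. The Python sketch below encodes those two rules in a small helper. The keys cloudFiles.format, cloudFiles.schemaLocation, cloudFiles.inferColumnTypes, and cloudFiles.schemaHints are the Auto Loader option names as commonly documented and are assumed here, because the key column is not shown in this table. The helper names are hypothetical, and spark is assumed to be an existing SparkSession.

```python
def auto_loader_options(file_format, schema_location=None,
                        infer_column_types=False, schema_hints=None):
    """Return Auto Loader options, enforcing the required file format."""
    if not file_format:
        raise ValueError("file_format is required")
    options = {"cloudFiles.format": file_format}
    if schema_location:
        options["cloudFiles.schemaLocation"] = schema_location  # needed to infer the schema
    if infer_column_types:
        options["cloudFiles.inferColumnTypes"] = True
    if schema_hints:
        options["cloudFiles.schemaHints"] = schema_hints
    return options


def read_with_auto_loader(spark, source_path, file_format, **settings):
    """Start an Auto Loader stream over `source_path` (hypothetical helper)."""
    options = auto_loader_options(file_format, **settings)
    return spark.readStream.format("cloudFiles").options(**options).load(source_path)
```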
Directory listing
The following option applies when using directory listing mode.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| This feature has been deprecated. Databricks recommends using file notification mode with file events instead of Whether to use the incremental listing rather than the full listing in directory listing mode. By default, Auto Loader makes the best effort to automatically detect if a given directory is applicable for the incremental listing. You can explicitly use the incremental listing or use the full directory listing by setting it as Incorrectly enabling incremental listing on a non-lexically ordered directory prevents Auto Loader from discovering new files. Works with Azure Data Lake Storage ( Available in Databricks Runtime 9.1 LTS and above. |
File notification
For information about configuring file notification mode, including required cloud permissions, setup instructions, and authentication methods, see Configure Auto Loader streams in file notification mode.
Key | Default | Valid values | Description |
|---|---|---|---|
|
| Positive integers | Number of threads to use when fetching messages from the queueing service. Do not use when |
| None | A JSON map string | Required only if you specify a Do not use when |
| None | Key-value tag strings | A series of key-value tag pairs to help associate and identify related resources, for example:
Do not use when For more information, see Cloud provider resource tags. |
|
|
| When set to File events provide notifications-level performance in file discovery, because Auto Loader can discover new files after the last run. Unlike directory listing, this process does not need to list all files in the directory. There are some situations when Auto Loader uses directory listing even though the file events option is enabled:
See When does Auto Loader with file events use directory listing? for a comprehensive list of situations when Auto Loader uses directory listing with this option. Available in Databricks Runtime 14.3 LTS and above. |
|
|
| When set to |
|
|
| Whether to use file notification mode to determine when there are new files. If Do not use when |
Cloud provider resource tags
Auto Loader adds the following key-value tag pairs by default on a best-effort basis:
- vendor: Databricks
- path: The location from where the data is loaded. Unavailable in GCP due to labeling limitations.
- checkpointLocation: The location of the stream's checkpoint. Unavailable in GCP due to labeling limitations.
- streamId: A globally unique identifier for the stream.
Databricks reserves these key names, and you cannot overwrite their values.
For more information on AWS, see Amazon SQS cost allocation tags and Configuring tags for an Amazon SNS topic.
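Because the default tag keys are reserved, it can help to reject them before a stream starts. The Python sketch below does that while building tag options. The option key prefix cloudFiles.resourceTag. is an assumption about how tag pairs are passed, and the helper name is hypothetical. The reserved key names are the four listed above.

```python
# Keys that Databricks reserves for its default tags (from the list above).
RESERVED_TAG_KEYS = {"vendor", "path", "checkpointLocation", "streamId"}


def resource_tag_options(tags):
    """Turn {tag_key: tag_value} into Auto Loader options, rejecting reserved keys."""
    reserved = sorted(RESERVED_TAG_KEYS.intersection(tags))
    if reserved:
        raise ValueError(f"reserved tag keys cannot be overwritten: {reserved}")
    # Assumed option shape: one option per tag, with the tag key as a suffix.
    return {f"cloudFiles.resourceTag.{key}": value for key, value in tags.items()}
```

The resulting dictionary could be merged with other Auto Loader options before being passed to DataStreamReader.options().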
Cloud-specific
Auto Loader has options for configuring cloud infrastructure for file notification mode. For required cloud permissions and setup instructions, see Configure Auto Loader streams in file notification mode.
AWS
Specify the following options only if you choose cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:
Key | Default | Valid values | Description |
|---|---|---|---|
| The region of the EC2 instance | An AWS region string | The region where the source S3 bucket resides and where you want to create the AWS SNS and SQS services. |
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Only allow event notifications from AWS S3 buckets in the same account as the SNS topic. When true, Auto Loader only accepts event notifications from AWS S3 buckets in the same account as the SNS topic. When Available in Databricks Runtime 17.2 and above. |
Specify the following option only if you choose cloudFiles.useNotifications = true and you want Auto Loader to use a queue that you have already set up:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A URL string | The URL of the SQS queue. If specified, Auto Loader directly consumes events from this queue instead of setting up its own AWS SNS and SQS services. |
AWS authentication options
Specify the following authentication option to use a Databricks service credential:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above. |
When Databricks service credentials or IAM roles are not available, you can specify the following authentication options instead:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The AWS access key ID for the user. Must be specified with |
| None | Any string | The AWS secret access key for the user. Must be specified with |
| None | An ARN string | The ARN of an IAM role to assume, if needed. The role can be assumed from your cluster's instance profile or by providing credentials with |
| None | Any string | An identifier to use while assuming a role using |
| None | Any string | An optional session name to use while assuming a role using |
| None | A URL string | An optional endpoint to use for accessing AWS STS when assuming a role using |
Azure
You must specify values for all of the following options if you specify cloudFiles.useNotifications = true and you want Auto Loader to set up the notification services for you:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The Azure Resource Group in which the storage account is created. |
| None | Any string | The Azure Subscription ID in which the resource group is created. |
| None | Any string | The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above. |
If a Databricks service credential is not available, you can specify the following authentication options instead:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The client ID or application ID of the Databricks service principal. |
| None | Any string | The client secret of the Databricks service principal. |
| None | A connection string | The connection string for the storage account, based on either account access key or shared access signature (SAS). |
| None | Any string | The Azure Tenant ID in which the Databricks service principal is created. |
Specify the following option only if you set cloudFiles.useNotifications = true and you want Auto Loader to use an existing queue:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The name of the Azure queue. If specified, the cloud files source directly consumes events from this queue instead of setting up its own Azure Event Grid and Queue Storage services. In that case, your |
GCP
Auto Loader can automatically set up notification services for you by leveraging Databricks service credentials. The service account created with the Databricks service credential requires the permissions specified in Configure Auto Loader streams in file notification mode.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The ID of the project that the GCS bucket is in. The Google Cloud Pub/Sub subscription is also created within this project. |
| None | Any string | The name of your Databricks service credential. Available in Databricks Runtime 16.1 and above. |
If a Databricks service credential is not available, you can use Google Service Accounts directly. You can either configure your cluster to assume a service account by following Google service setup or specify the following authentication options:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The client ID of the Google Service Account. |
| None | An email address string | The email of the Google Service Account. |
| None | A private key string | The private key that's generated for the Google Service Account. |
| None | Any string | The ID of the private key that's generated for the Google Service Account. |
Specify the following option only if you set cloudFiles.useNotifications = true and you want Auto Loader to use a subscription that you have already set up:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The name of the Google Cloud Pub/Sub subscription. If specified, the cloud files source consumes events from this queue instead of setting up its own GCS Notification and Google Cloud Pub/Sub services. |
Delta Lake
The following options apply when reading from a Delta Lake table using spark.readStream.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A version number or | Set to a Delta table version number or |
| None | A version number or | Set to a Delta table version number or |
| None | A version number or | Set to a Delta table version number or |
| None | A Java regex string | A regular expression pattern. Files whose paths match the pattern are excluded from the streaming read. Useful for filtering out files that do not conform to the expected naming convention. |
|
|
| Whether to fail the streaming query if source data has been deleted due to log retention ( |
|
|
| Available in Databricks Runtime 11.3 LTS and lower. Re-emits rewritten data files after modification operations such as |
|
|
| Ignores transactions that delete data at partition boundaries (full partition drops only). Does not handle non-partition deletes, updates, or other modifications. Use |
|
|
| Whether to enable reading the change data feed for the streaming query. When enabled, the stream emits row-level changes (inserts, updates, and deletes) with additional metadata columns. See Use Delta Lake change data feed on Databricks. |
| None | A path string | Path to a directory where Delta Lake tracks schema changes for the streaming read. Required when streaming from tables with column mapping enabled and using |
|
|
| Ignores transactions that delete or modify existing records and processes only appends. Databricks recommends this option for most workloads that do not use change data feeds. Available in Databricks Runtime 12.2 LTS and above. See Skip upstream change commits with |
| Latest available | A timestamp string such as | Timestamp to start reading from. The stream reads all table changes committed at or after the specified timestamp. If the timestamp precedes all available table commits, the stream starts from the earliest available commit. Cannot be used together with |
| Latest available | A positive integer, | Delta table version to start reading from. The stream reads all changes committed at or after the specified version. Specify |
|
|
| Divides the initial table snapshot into event time buckets to prevent records from being incorrectly marked as late events and dropped in stateful queries with watermarks. Cannot be changed after initial snapshot processing has begun without deleting the checkpoint. Available in Databricks Runtime 11.3 LTS and above. See Process initial snapshot without dropping data. |
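As a minimal sketch of how the starting-point options interact, the hypothetical helper below builds an options dictionary for a Delta streaming read and rejects a version and a timestamp given together. The key names `startingVersion`, `startingTimestamp`, and `skipChangeCommits` are assumed from the Delta Lake streaming documentation. The helper name and the table name are illustrative only.

```python
from typing import Dict, Optional, Union


def delta_stream_options(
    starting_version: Optional[Union[int, str]] = None,
    starting_timestamp: Optional[str] = None,
    skip_change_commits: bool = False,
) -> Dict[str, str]:
    """Build options for spark.readStream on a Delta table (hypothetical helper)."""
    if starting_version is not None and starting_timestamp is not None:
        # Only one starting point may be given for a stream.
        raise ValueError("Set startingVersion or startingTimestamp, not both")
    options: Dict[str, str] = {}
    if starting_version is not None:
        options["startingVersion"] = str(starting_version)
    if starting_timestamp is not None:
        options["startingTimestamp"] = starting_timestamp
    if skip_change_commits:
        options["skipChangeCommits"] = "true"
    return options


opts = delta_stream_options(starting_version=5, skip_change_commits=True)
# With an active SparkSession named `spark` (not created here):
# df = spark.readStream.options(**opts).table("my_table")
```

Because starting-point options apply only when a new stream starts, changing them on an existing checkpoint is not expected to move the stream.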
Kafka
Use these options with either spark.readStream.format("kafka") or spark.read.format("kafka"):
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A JSON string such as | The specific partitions to consume. You must specify exactly one of the |
|
|
| Whether to fail the query if data might have been lost, for example, due to deleted topics or offset truncation. Set to Databricks estimates conservatively whether data might have been lost. However, this might cause false alarms. |
|
| Positive integers or | The number of retries when fetching Kafka offsets fails. |
|
| Positive integers or | The interval in milliseconds between offset fetch retries. |
|
| Any string | The customized prefix to use for the auto-generated Kafka consumer group ID. If |
| None | Any string | The Kafka consumer group ID to use when reading. Use with caution: queries sharing the same group ID interfere with each other and might read only partial data. This can occur when running concurrent batch and streaming workloads, or when restarting queries quickly. If set, |
|
|
| Whether to include Kafka message headers as a column in the output. |
| None | Positive integers | The timeout in milliseconds for the Kafka consumer |
| None | A comma-separated list of | A comma-separated list of host:port addresses for Kafka brokers. Sets the Kafka client's If you find there is no data from Kafka, check this broker address list for incorrect addresses. If the broker address list is incorrect, there might not be any errors. Kafka clients assume the brokers will be available eventually and retry forever when they receive network errors. |
| None | Positive integers | The maximum number of records for each Spark partition. When set, the connector splits Kafka partitions so that each Spark partition reads at most this many records. You can also use this option with |
| None | Positive integers | The minimum number of Spark partitions to read from Kafka. When set, the connector splits large Kafka partitions to increase parallelism. When not set, Spark creates one partition for each Kafka topic-partition. Useful for handling data skew or peak loads. This option reinitializes Kafka consumers for each trigger, which might affect performance with SSL. |
|
|
| The offset that the query begins the read from. In the JSON string, For streaming queries, this option only applies when a new query starts. Resumed queries always use the checkpoint. During a query, new partitions start reading at the earliest offset. For batch queries, |
| None | A JSON timestamp string such as | A list of starting offsets for each partition, specified as timestamps in milliseconds. When no offset exists for a timestamp, query behavior is determined by For streaming queries, this option only applies when a new query starts. Resumed queries always use the checkpoint. During a query, new partitions start reading at the earliest offset. |
|
|
| The strategy to use when no offset is found for a timestamp specified in |
| None | Positive integers or | The global starting timestamp in milliseconds that applies to all partitions. When no offset exists for the timestamp, behavior is controlled by |
| None | A comma-separated list of topic names | The topics to subscribe to. You must specify exactly one of the |
| None | A Java regex string | The pattern used to subscribe to topics. You must specify exactly one of the |
The following options apply only to streaming reads with spark.readStream.format("kafka"):
Key | Default | Valid values | Description |
|---|---|---|---|
|
| Duration strings such as | The time window used to estimate remaining bytes for the |
| None | Positive integers | The maximum number of offsets to process per trigger interval. Offsets are distributed proportionally across topic partitions. |
|
| Duration strings such as | The maximum time to wait for |
| None | Positive integers | The minimum number of offsets to accumulate before triggering a micro-batch. When |
For offset options that apply only to batch reads with spark.read.format("kafka"), see DataFrameReader Kafka options.
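The rule that exactly one topic-selection option must be set can be checked before the stream starts. The sketch below assumes the option names `assign`, `subscribe`, `subscribePattern`, `kafka.bootstrap.servers`, and `startingOffsets` from the open-source Spark Kafka connector. The helper name, broker addresses, and topic name are hypothetical.

```python
from typing import Dict

# Assumed names of the three mutually exclusive topic-selection options.
TOPIC_SELECTORS = ("assign", "subscribe", "subscribePattern")


def kafka_read_options(bootstrap_servers: str, **extra: str) -> Dict[str, str]:
    """Build Kafka source options, requiring exactly one topic selector."""
    chosen = [key for key in TOPIC_SELECTORS if key in extra]
    if len(chosen) != 1:
        raise ValueError(
            f"Specify exactly one of {TOPIC_SELECTORS}, got {chosen or 'none'}"
        )
    options = {"kafka.bootstrap.servers": bootstrap_servers}
    options.update(extra)
    return options


opts = kafka_read_options(
    "host1:9092,host2:9092",
    subscribe="events",
    startingOffsets="earliest",
)
# With an active SparkSession named `spark` (not created here):
# df = spark.readStream.format("kafka").options(**opts).load()
```

Validating up front is useful here because an incorrect broker list might not raise an error at all; the client can keep retrying silently.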
Authentication
Databricks recommends using a Unity Catalog service credential to authenticate to cloud-managed Kafka services (AWS MSK, Azure Event Hubs, or Google Cloud Managed Kafka).
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The name of a Unity Catalog service credential for authenticating to cloud-managed Kafka services. Available in Databricks Runtime 16.1 and above. |
| None | Any string | The OAuth scope for the service credential. Set this only when Databricks cannot automatically infer the scope for your Kafka service. |
When a service credential is not available, use SASL/SSL options (passed through as kafka.* properties). When you use a service credential, you don't need to specify kafka.sasl.mechanism, kafka.sasl.jaas.config, or kafka.security.protocol.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A security protocol string, such as | The security protocol for broker communication. |
| None | A SASL mechanism string, such as | The SASL mechanism. |
| None | A JAAS configuration string | The JAAS login configuration string. |
| None | A fully qualified class name | The fully qualified class name of a login callback handler for SASL authentication. |
| None | A fully qualified class name | The fully qualified class name of a client callback handler for SASL authentication. |
| None | A file path string | The path to the SSL trust store file. |
| None | Any string | The password for the SSL trust store file. |
| None | A file path string | The path to the SSL key store file. |
| None | Any string | The password for the SSL key store file. |
For complete authentication setup instructions, see Authentication.
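When no service credential is available, the three `kafka.*` properties named above are typically set together. The following is a sketch for SASL/PLAIN over TLS, under stated assumptions: the helper name is hypothetical, the login module class is the standard Apache Kafka one (some Databricks runtimes may expect a shaded class name, so verify this for your environment), and the credentials should come from a secret store rather than literals.

```python
from typing import Dict


def kafka_sasl_plain_options(
    username: str,
    password: str,
    login_module: str = "org.apache.kafka.common.security.plain.PlainLoginModule",
) -> Dict[str, str]:
    """Build kafka.* pass-through options for SASL/PLAIN over TLS (hypothetical helper)."""
    # The JAAS entry must end with a semicolon. Values containing double
    # quotes are not escaped in this sketch.
    jaas = f'{login_module} required username="{username}" password="{password}";'
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


auth_opts = kafka_sasl_plain_options("my-user", "my-password")
# With an active SparkSession named `spark` (not created here):
# df = (spark.readStream.format("kafka")
#       .option("kafka.bootstrap.servers", "host1:9093")
#       .option("subscribe", "events")
#       .options(**auth_opts)
#       .load())
```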
Kinesis
Use these options with spark.readStream.format("kinesis") to read from Amazon Kinesis Data Streams. You must specify either streamName or streamARN, but not both.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A comma-separated list of stream names | A comma-separated list of Kinesis stream names to subscribe to. |
| None | A comma-separated list of Kinesis stream ARNs | A comma-separated list of Kinesis stream ARNs. For example, |
The following options are also available:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | The AWS access key ID. Must be specified with |
| None | Any string | The AWS secret access key corresponding to |
|
| Positive integers | The approximate target block size in bytes after coalescing. |
|
| Positive integers | The threshold at which the automatic coalesce occurs. If the average block size is less than this value, pre-fetched blocks are coalesced to the |
|
|
| The consumer type. |
| Streaming query ID | A single consumer name or a comma-separated list matching the number of streams | The consumer name used to register the query with the Kinesis service in EFO mode. Available in Databricks Runtime 11.3 LTS and above. |
|
| Any string | The prefix prepended to |
|
| A duration string such as | The interval at which the EFO consumer registration is checked and refreshed. Available in Databricks Runtime 11.3 LTS and above. |
| Locally resolved region | Any string | The regional endpoint for Kinesis Data Streams. |
|
| A byte string such as | The amount of data to buffer for the next trigger. This is a stopping condition, not a strict upper bound. More data might be buffered than specified. |
|
|
| Where to start reading from in the stream. For |
|
| A duration string such as | The duration to buffer prefetched data before making it available for processing. |
|
| Positive decimals | The maximum data prefetch rate per shard in MB/s. This rate limits fetches to avoid Kinesis throttling. Kinesis allows a maximum of |
|
| Positive integers | The number of records to read per Kinesis API request. The number of records returned might be higher if sub-records were aggregated using the Kinesis Producer Library. |
|
| Positive integers up to | The maximum number of shards to read per API call when listing shards. |
|
| A duration string such as | The minimum duration to wait between consecutive prefetch attempts. This limits fetch frequency to avoid Kinesis throttling. |
| Locally resolved region | Any string | The region the streams are defined in. |
| None | A comma-separated list of consumer names or ARNs | A comma-separated list of identifiers for existing EFO consumers. Available in Databricks Runtime 16.1 and above. |
| None |
| Whether the identifiers in |
|
|
| Whether to de-register the enhanced fan-out consumer on query termination. Requires |
| None | An ARN string | The ARN of an IAM role to assume when accessing Kinesis. |
| None | Any string | An optional external ID to use when assuming the role specified by |
| None | Any string | An identifier for the assumed role session. Uniquely identifies a session when the same role is assumed by different principals. |
| None | Any string | The name of your Databricks service credential for authenticating to Kinesis. Available in Databricks Runtime 16.1 and above. |
| None | A URL string | A custom endpoint for AWS STS when assuming a role using |
|
| A duration string such as | The interval at which to poll Kinesis for resharding events. |
|
| Positive integers | The number of Kinesis shards to prefetch in parallel per Spark task. For minimum latency, ensure |
For more information on reading from Kinesis, see Connect to Amazon Kinesis.
Pub/Sub
Use these options with spark.readStream.format("pubsub") to subscribe to Google Pub/Sub. The options subscriptionId, topicId, and projectId are required.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | Any string | Required. The Pub/Sub subscription ID. The connector creates the subscription if it does not exist. |
| None | Any string | Required. The Pub/Sub topic ID. |
| None | Any string | Required. The Google Cloud project ID. |
| Half the number of executors available at stream initialization | Positive integers | The number of parallel Spark tasks that fetch rows from the subscription. |
| None | Positive integers | A soft limit on the number of bytes to process per micro-batch. |
|
| Positive integers | The number of rows to fetch per task before processing. |
|
| A duration string such as | The time duration for each task to fetch before processing rows. Databricks recommends using the default value. |
|
|
| When |
| None | Any string | The name of a Databricks service credential for authenticating to Pub/Sub. Available in Databricks Runtime 16.1 and above. |
| None | An email address string | The email address of the Google Service Account. Required when not using a service credential. |
| None | Any string | The client ID of the Google Service Account. Required when not using a service credential. |
| None | A private key string | The private key for the Google Service Account. Required when not using a service credential. |
| None | Any string | The private key ID for the Google Service Account. Required when not using a service credential. |
For more information on Pub/Sub, see Subscribe to Google Pub/Sub.
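The required options and the two authentication paths can be checked with a small helper before the stream is created. This is a sketch only: `subscriptionId`, `topicId`, and `projectId` come from the text above, while the key names `serviceCredential`, `clientEmail`, `clientId`, `privateKey`, and `privateKeyId` are assumptions, as is the helper itself.

```python
from typing import Dict

REQUIRED_KEYS = ("subscriptionId", "topicId", "projectId")
# Assumed key names for direct Google Service Account authentication.
SERVICE_ACCOUNT_KEYS = ("clientEmail", "clientId", "privateKey", "privateKeyId")


def pubsub_read_options(options: Dict[str, str]) -> Dict[str, str]:
    """Check a Pub/Sub options dict before passing it to spark.readStream."""
    missing = [key for key in REQUIRED_KEYS if key not in options]
    if "serviceCredential" not in options:
        # Without a service credential, all four service account fields are needed.
        missing += [key for key in SERVICE_ACCOUNT_KEYS if key not in options]
    if missing:
        raise ValueError(f"Missing Pub/Sub options: {', '.join(missing)}")
    return dict(options)


opts = pubsub_read_options({
    "subscriptionId": "my-subscription",
    "topicId": "my-topic",
    "projectId": "my-project",
    "serviceCredential": "my-credential",
})
# With an active SparkSession named `spark` (not created here):
# df = spark.readStream.format("pubsub").options(**opts).load()
```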
Pulsar
Use these options with spark.readStream.format("pulsar") to stream from Apache Pulsar. Available in Databricks Runtime 14.1 and above.
The following options are required. You must specify exactly one of topic, topics, or topicsPattern.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A Pulsar service URL string | The Pulsar |
| None | Any string | A single topic name to consume. |
| None | A comma-separated list of topic names | A comma-separated list of topic names to consume. |
| None | A Java regex string | A Java regex string to match topic names. |
The following options are also supported:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A URL string | The Pulsar admin service HTTP URL. Required when |
|
|
| If multiple topics with different schemas are read, use this option to turn off automatic schema-based topic value deserialization. Only the raw values are returned when this is |
|
|
| Whether to fail the query when data is lost. For example, data loss might occur when topics are deleted or messages expire due to retention policy. |
| None | Positive integers | A soft limit on the number of bytes to process per micro-batch. Requires |
|
| Positive integers | The timeout for reading messages from Pulsar in milliseconds. |
| None | Any string | The predefined subscription name used by the connector to track Spark application progress. |
|
|
| Where to start reading from. |
| None | Any string | The prefix used by the connector to generate a random subscription to track Spark application progress. |
|
|
| Whether the connector waits until the desired topics are created. |
You can specify additional Pulsar client, admin, and reader configurations using the following option patterns:
Pattern | Configuration options |
|---|---|
| |
| Pulsar client configuration, including authentication options such as |
|
For more information on Pulsar client and admin authentication options, see Authentication.
Authentication
Databricks supports truststore and keystore authentication to Pulsar. Databricks recommends using secrets to store authentication details. See Secret management.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A fully qualified class name | The fully qualified class name of the authentication plugin. For example, |
| None | A credential string | Authentication credentials passed to the authentication plugin as a string. For example, |
|
|
| When |
| None | Any string | The format of the TLS trust store file. For example, |
| None | A file path string | The path to the TLS trust store file containing trusted CA certificates. Required when |
| None | Any string | The password for the TLS trust store file. |
If the stream uses a PulsarAdmin, you can also set the following options:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A fully qualified class name | The fully qualified class name of the authentication plugin for the Pulsar admin client. |
| None | A credential string | Authentication credentials for the Pulsar admin client authentication plugin. |
| None |
| Whether to use TLS for the Pulsar admin client connection. |
| None |
| Whether to allow insecure TLS connections for the Pulsar admin client. |
| None | A file path string | Path to the trusted TLS certificate file for the Pulsar admin client. |
| None |
| Whether to use KeyStore-based TLS for the Pulsar admin client. |
| None | Any string | The format of the TLS trust store for the Pulsar admin client. For example, |
| None | A file path string | Path to the TLS trust store file for the Pulsar admin client. Required when |
| None | Any string | Password for the Pulsar admin client TLS trust store. |
For authentication examples, see Authenticate to Pulsar.
DataFrameWriter options
Use these options with DataFrameWriter.option() and DataFrameWriterV2.option() to control how Databricks writes data.
Example
The following example sets mergeSchema to True for writing a Delta Lake table:
- Python
- Scala
df.write.format("delta").option("mergeSchema", True).saveAsTable("my_table")
df.write.format("delta").option("mergeSchema", "true").saveAsTable("my_table")
Avro
The following options apply when writing Avro files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A JSON schema string | The full Avro schema as a JSON string. Use this option to convert Spark SQL types to specific Avro types. Applies to Avro file. |
| None | A URL string | A URL pointing to an Avro schema file. Use instead of |
|
|
| Compression codec to use when writing. Applies to Avro file. |
|
| Any string | The top-level record name in the output Avro schema. Applies to Avro file. |
|
|
| Whether to match columns between the Spark schema and the Avro schema by field position instead of by name. Applies to Avro file. |
| Empty string | Any string | The namespace for the top-level record in the output Avro schema. Applies to Avro file. |
Delta Lake and Apache Iceberg
The following options apply when writing Delta Lake and Apache Iceberg tables.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to enable automatic liquid clustering, where Databricks selects clustering columns based on query patterns. Only valid with |
| None |
| Whether to enable schema evolution for the write operation. New columns in the source DataFrame are added to the target table schema. Applies to batch and streaming appends. Applies to Update table schema. |
| None |
| Whether to replace the table schema and partitioning when overwriting. Requires |
| None |
| The partition overwrite mode. Set this to |
| None | A boolean expression string | A boolean expression that matches rows in the target table to replace with rows from the source query. Can reference columns from both the target table and the source query. Rows in the target that match a source row are deleted and replaced. If the source is empty, no deletions occur. Use |
| None | A comma-separated list of column names | A comma-separated list of column names used to match rows between the target table and the source query. Both the target and the source must contain all listed columns. Rows in the target that match a source row under equality comparison are deleted and replaced. |
| None | A predicate expression string | A predicate expression. Atomically overwrites only the records that match the predicate. Applies to Selectively overwrite data with Delta Lake. |
| None | Any string | A string alias for the target table. Use with |
| None | Any string | A unique string identifying the application for idempotent writes in |
| None | A monotonically increasing integer | A monotonically increasing number used as the transaction version for idempotent writes in |
| None |
| Whether to enable Auto Optimize Write for this write operation. Overrides the |
| None | Any string | A user-defined string appended to the commit metadata for the write operation. Visible in the output of |
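The idempotent-write options described above pair an application ID with a monotonically increasing version. One possible way to use them is inside a `foreachBatch` function, where the micro-batch ID serves as the version. The key names `txnAppId` and `txnVersion` are assumed from the Delta Lake documentation; the function names, application ID, and table name are hypothetical.

```python
from typing import Dict


def idempotent_write_options(app_id: str, batch_id: int) -> Dict[str, str]:
    """Return Delta write options that let a retried batch be skipped."""
    return {"txnAppId": app_id, "txnVersion": str(batch_id)}


def write_batch(batch_df, batch_id: int) -> None:
    """foreachBatch callback: append one micro-batch to a Delta table."""
    (batch_df.write
        .format("delta")
        .mode("append")
        .options(**idempotent_write_options("orders_ingest", batch_id))
        .saveAsTable("orders"))


# With a streaming DataFrame named `stream_df` (not created here):
# (stream_df.writeStream
#     .foreachBatch(write_batch)
#     .option("checkpointLocation", "/path/to/checkpoint")
#     .start())
```

Keeping the application ID stable across restarts matters in this sketch: if it changes, a replayed batch no longer matches the recorded transaction and is written again.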
CSV
The following options apply when writing CSV files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
| A single character | The character used to escape the escape character when it differs from the quote character. Applies to csv (DataFrameWriter). |
|
|
| Compression codec to use when writing. Applies to csv (DataFrameWriter). |
|
| A date format string | Format string for date column values. Applies to csv (DataFrameWriter). |
| Empty string | Any string | The string written for empty (non-null) values. Applies to csv (DataFrameWriter). |
|
| A | The character encoding for the output files. Applies to csv (DataFrameWriter). |
|
| A single character | The character used to escape quoted values. Applies to csv (DataFrameWriter). |
|
|
| Whether to escape quote characters inside quoted field values. Applies to csv (DataFrameWriter). |
|
|
| Whether to write column names as the first line of the output. Applies to csv (DataFrameWriter). |
|
|
| Whether to trim leading whitespace from values when writing. Applies to csv (DataFrameWriter). |
|
|
| Whether to trim trailing whitespace from values when writing. Applies to csv (DataFrameWriter). |
|
| A string | The line separator string used between records. Applies to csv (DataFrameWriter). |
|
| A | A |
| Empty string | Any string | String written for null values. Applies to csv (DataFrameWriter). |
|
| A single character | The character used to quote field values that contain the separator. Applies to csv (DataFrameWriter). |
|
|
| Whether to enclose all field values in quotes regardless of content. Applies to csv (DataFrameWriter). |
|
| A string | The field delimiter character. Applies to csv (DataFrameWriter). |
|
| A timestamp format string | The format string for timestamp column values. Applies to csv (DataFrameWriter). |
|
| A timestamp format string | Format string for timestamp without timezone ( |
Excel
The following options apply when writing Excel files.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A sheet name or cell reference string | The sheet name or starting cell for the write. If omitted, writes to a sheet named |
|
| An Excel date format string | Excel cell format string applied to |
|
|
| Whether to write column names as the first row. |
|
| An Excel timestamp format string | Excel cell format string applied to |
|
|
| The Excel file format version to write. |
JSON
The following options apply when writing JSON files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Compression codec to use when writing. Applies to json (DataFrameWriter). |
|
| A date format string | Format string for date column values. Applies to json (DataFrameWriter). |
|
| A | The character encoding for the output files. Applies to json (DataFrameWriter). |
| value of |
| Whether to omit fields with null values from the JSON output. Applies to json (DataFrameWriter). |
|
| A string | The line separator string used between records. Applies to json (DataFrameWriter). |
|
| A | A Java locale identifier that affects default date, timestamp, and decimal formatting within the JSON. |
|
|
| Whether to enable pretty (indented, multiline) JSON output. |
|
|
| Whether to sort the keys of JSON objects alphabetically in the output. Useful for producing deterministic output. |
|
| A timestamp format string | The format string for timestamp column values. Applies to json (DataFrameWriter). |
|
| A timestamp format string | Format string for timestamp without timezone ( |
|
|
| Whether to encode non-ASCII characters as |
ORC
The following options apply when writing ORC files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Compression codec to use when writing. Applies to orc (DataFrameWriter). |
Parquet
The following options apply when writing Parquet files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Compression codec to use when writing. Applies to parquet (DataFrameWriter). |
|
|
| The physical type used to encode timestamp columns. Use |
Text
The following options apply when writing text files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Compression codec to use when writing. Applies to text (DataFrameWriter). |
|
| A | The character encoding for the output files. |
|
| A string | The line separator string used between records. Applies to text (DataFrameWriter). |
XML
The following options apply when writing XML files.
Key | Default | Valid values | Description |
|---|---|---|---|
|
| Any string | The element name for array elements that have no explicit name. Applies to xml (DataFrameWriter). |
|
| Any string | The prefix prepended to field names that correspond to XML attributes. Applies to xml (DataFrameWriter). |
|
|
| Compression codec to use when writing. Applies to xml (DataFrameWriter). |
|
| A date format string | Format string for date column values. Applies to xml (DataFrameWriter). |
|
| An XML declaration string, or empty string to suppress | The XML declaration string written at the top of each output file. Set to an empty string to suppress the declaration. Applies to xml (DataFrameWriter). |
|
| A | The character encoding for the output files. Applies to xml (DataFrameWriter). |
| 4 spaces | Any string | The string used to indent child elements in the output. Set to an empty string to turn off indentation and write each row on a single line. |
|
| A | A Java locale identifier that affects default date, timestamp, and decimal formatting within the XML. |
|
| Any string | The string written for null values. When set to |
|
| Any string | The root element tag that wraps all row elements in the output. Applies to xml (DataFrameWriter). |
|
| Any string | The element tag that represents a row in the output. Applies to xml (DataFrameWriter). |
| None | A column name string | The name of the single Variant column to write to XML files. Applies to xml (DataFrameWriter). |
|
| A timestamp format string | The format string for timestamp column values. Applies to xml (DataFrameWriter). |
|
| A timestamp format string | Format string for timestamp without timezone column values. Applies to xml (DataFrameWriter). |
|
|
| Whether to throw an exception if a column name is not a valid XML element identifier. Applies to xml (DataFrameWriter). |
|
| Any string | The field name used for character data in XML elements that also have attributes or child elements. Applies to xml (DataFrameWriter). |
DataStreamWriter options
Use these options with DataStreamWriter.option() to configure streaming writes.
Example
The following example sets the checkpoint location for a stream:
- Python
- Scala
(df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table"))
df.writeStream
.format("delta")
.option("checkpointLocation", "/path/to/checkpoint")
.start("/path/to/table")
Common
The following options apply to all streaming write operations.
Key | Default | Valid values | Description |
|---|---|---|---|
| None (required) | A path string | Path to the checkpoint directory for the streaming query. Required for fault tolerance and exactly-once processing guarantees. Each streaming query must use a unique checkpoint location. Databricks recommends storing checkpoints in a Unity Catalog volume or cloud storage path. See Structured Streaming checkpoints. |
| None | A path string | Output path for file-based streaming sinks such as Parquet. Applies to file-based formats only. |
Console sink
The following options apply when writing streams to the console sink.
Key | Default | Valid values | Description |
|---|---|---|---|
|
| Positive integers | The number of rows to display for each micro-batch when writing to the console sink. |
|
|
| Whether to truncate long strings when displaying rows. Set to |
Delta Lake
The following options apply when writing a stream to a Delta Lake table using format("delta"). Overwrite-only options such as overwriteSchema, replaceWhere, and partitionOverwriteMode are not supported for streaming writes.
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| Whether to evolve the Delta Lake table schema when the streaming DataFrame contains new columns. Applies to append output mode only. Applies to Update table schema. |
| None | Any string | A user-defined string appended to the commit metadata for the write operation. Visible in the output of |
File sink
The following option applies when writing a stream to file-based formats (Parquet, JSON, CSV, ORC, text). For format-specific options, see DataFrameWriter options.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A time string such as | How long to retain sink metadata files used for fault tolerance and compaction. When not set, metadata files are retained indefinitely. |
Kafka sink
The following options apply when writing to Kafka.
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A comma-separated list of | Required. A comma-separated list of Kafka broker |
| None | Any string | The target Kafka topic for all rows. Required if the DataFrame does not include a |
| None | Any Kafka producer configuration value | Any Kafka producer configuration prefixed with |
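The topic rule above (set the option unless the DataFrame carries its own topic column) can be sketched as a small check. The option names `kafka.bootstrap.servers` and `topic` and the `key`/`value`/`topic` column convention are assumed from the open-source Spark Kafka connector; the helper name and the column expressions are hypothetical.

```python
from typing import Dict, Iterable, Optional


def kafka_sink_options(
    bootstrap_servers: str,
    topic: Optional[str] = None,
    df_columns: Iterable[str] = (),
) -> Dict[str, str]:
    """Build Kafka sink options, checking that a target topic is resolvable."""
    if topic is None and "topic" not in set(df_columns):
        raise ValueError("Set the topic option or include a topic column")
    options = {"kafka.bootstrap.servers": bootstrap_servers}
    if topic is not None:
        options["topic"] = topic
    return options


sink_opts = kafka_sink_options("host1:9092", topic="events_out")
# With a streaming DataFrame named `df` (not created here):
# query = (df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
#     .writeStream
#     .format("kafka")
#     .options(**sink_opts)
#     .option("checkpointLocation", "/path/to/checkpoint")
#     .start())
```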
Memory sink
The following options apply when writing streams to the memory sink.
Key | Default | Valid values | Description |
|---|---|---|---|
| None (required) | Any string | The name of the in-memory table that the query writes to. Required for the memory sink. Also configurable via |
|
|
| Delivery guarantee for the memory sink. |
Spark function options
Some Spark SQL built-in functions accept an options map that controls parsing or serialization behavior. Pass options as a Python dict or a Scala Map[String, String].
Example
The following example parses a JSON column while dropping malformed records:
- Python
- Scala
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("name", StringType())])
df = df.withColumn("parsed", from_json("json_col", schema, {"mode": "DROPMALFORMED"}))
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("name", StringType)))
val parsedDf = df.withColumn("parsed", from_json(col("json_col"), schema, Map("mode" -> "DROPMALFORMED")))
Avro
Avro functions accept the same options as the corresponding DataFrame options:
- from_avro and schema_of_avro use DataFrameReader Avro options.
- to_avro uses DataFrameWriter Avro options.
Example
The following example decodes an Avro column with schema evolution enabled:
- Python
- Scala
from pyspark.sql.avro.functions import from_avro
df = df.withColumn("decoded", from_avro("avro_col", json_schema, {"avroSchemaEvolutionMode": "restart"}))
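import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.functions.col
import scala.collection.JavaConverters._  // from_avro takes a java.util.Map for options
val decodedDf = df.withColumn("decoded", from_avro(col("avro_col"), jsonSchema, Map("avroSchemaEvolutionMode" -> "restart").asJava))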
In addition, the Schema Registry variants of from_avro and to_avro accept the following options:
Key | Default | Valid values | Description |
|---|---|---|---|
| None | A schema ID integer | Schema ID from the Confluent Schema Registry to use when decoding Avro data that was encoded with a schema incompatible with |
| None | Any Confluent SR client property value | Confluent Schema Registry client configuration properties. Pass any Confluent SR client property using this prefix, for example |
CSV
CSV functions accept the same options as the corresponding DataFrame options:
- from_csv and schema_of_csv use DataFrameReader CSV options.
- to_csv uses DataFrameWriter CSV options.
Example
The following example parses a CSV column with a custom separator and a custom null marker:
- Python
- Scala
from pyspark.sql.functions import from_csv
# In PySpark, from_csv takes the schema as a DDL string or a Column, not a StructType.
schema = "id INT, name STRING"
df = df.withColumn("parsed", from_csv("csv_col", schema, {"sep": "|", "nullValue": "N/A"}))
import org.apache.spark.sql.functions.{col, from_csv}
import org.apache.spark.sql.types._
val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
val parsedDf = df.withColumn("parsed", from_csv(col("csv_col"), schema, Map("sep" -> "|", "nullValue" -> "N/A")))
JSON
JSON functions accept the same options as the corresponding DataFrame options:
- from_json and schema_of_json use DataFrameReader JSON options.
- to_json uses DataFrameWriter JSON options.
Example
The following example serializes a struct column to JSON, omitting null fields and enabling pretty printing:
- Python
- Scala
from pyspark.sql.functions import to_json
df = df.withColumn("json_str", to_json("struct_col", {"pretty": "true", "ignoreNullFields": "true"}))
import org.apache.spark.sql.functions.{col, to_json}
val jsonDf = df.withColumn("json_str", to_json(col("struct_col"), Map("pretty" -> "true", "ignoreNullFields" -> "true")))
Protobuf
from_protobuf and to_protobuf do not use a file-based DataSource. Protobuf data is always read and written as binary columns using these functions. Options are passed as a Map[String, String] and are case-sensitive.
Example
The following example decodes a Protobuf column using PERMISSIVE mode:
- Python
- Scala
from pyspark.sql.protobuf.functions import from_protobuf
df = df.withColumn("decoded", from_protobuf("proto_col", "MyMessage", "/path/to/descriptor.desc",
{"mode": "PERMISSIVE", "enums.as.ints": "true"}))
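import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf
import scala.collection.JavaConverters._  // from_protobuf takes a java.util.Map for options
val decodedDf = df.withColumn("decoded", from_protobuf(col("proto_col"), "MyMessage", "/path/to/descriptor.desc",
Map("mode" -> "PERMISSIVE", "enums.as.ints" -> "true").asJava))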
Protobuf functions use the following options:
Key | Default | Valid values | Description |
|---|---|---|---|
|
|
| How to handle corrupt records. |
|
|
| Maximum recursion depth for recursive Protobuf fields. Set to |
|
|
| Whether to convert Protobuf |
|
|
| Whether to emit fields with zero or default values (proto3 semantics). When |
|
|
| Whether to render enum fields as integer values instead of strings. Applies to |
|
|
| Whether to upcast |
|
|
| Whether to unwrap |
|
|
| Whether to retain empty Protobuf message types in the output schema by inserting a dummy column. Applies to |
| None | Any string | Schema Registry subject name. Required when using the Schema Registry variants of |
| None | A | Schema Registry address (host and port). Required when using the Schema Registry variants of |
| None | Any string | Specifies which Protobuf message to use when the schema registry subject contains multiple messages. Optional. |
XML
XML functions accept the same options as the corresponding DataFrame options:
- from_xml and schema_of_xml use DataFrameReader XML options.
- to_xml uses DataFrameWriter XML options.
Example
The following example writes XML with custom root and row tags:
- Python
- Scala
from pyspark.sql.functions import to_xml
df = df.withColumn("xml_str", to_xml("struct_col", {"rootTag": "records", "rowTag": "record"}))
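import org.apache.spark.sql.functions.{col, to_xml}
import scala.collection.JavaConverters._  // to_xml takes a java.util.Map for options
val xmlDf = df.withColumn("xml_str", to_xml(col("struct_col"), Map("rootTag" -> "records", "rowTag" -> "record").asJava))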