Structured Streaming writes to Azure Synapse
The Azure Synapse connector offers efficient and scalable Structured Streaming write support for Azure Synapse. It provides a user experience consistent with batch writes and uses COPY for large data transfers between a Databricks cluster and an Azure Synapse instance.
Structured Streaming support between Databricks and Synapse provides simple semantics for configuring incremental ETL jobs. However, the model used to load data from Databricks to Synapse introduces latency that might not meet SLA requirements for near-real-time workloads. See Query data in Azure Synapse Analytics.
Supported output modes for streaming writes to Synapse
The Azure Synapse connector supports the Append and Complete output modes for record appends and aggregations. For more details on output modes and their compatibility matrix, see the Structured Streaming guide.
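As an illustration of the Complete output mode, the following sketch aggregates a stream and writes the result to Synapse. The function name `write_bucket_counts`, its parameters, and the `bucket` column are hypothetical; it assumes `df` comes from the rate source, whose counter column is named `value`, and it reuses the connector options shown elsewhere on this page.

```python
def write_bucket_counts(df, jdbc_url, temp_dir, table, checkpoint_location):
    """Start a streaming aggregation written to Synapse in Complete mode.

    Hypothetical helper: `df` is assumed to be a streaming DataFrame from
    the rate source, which has a numeric `value` column.
    """
    # Group the counter into ten buckets and count the rows in each one.
    counts = df.selectExpr("value % 10 AS bucket").groupBy("bucket").count()

    # Complete mode emits the entire updated result table on each trigger,
    # which is what a running aggregation like this one needs.
    return (
        counts.writeStream
        .format("com.databricks.spark.sqldw")
        .option("url", jdbc_url)
        .option("tempDir", temp_dir)
        .option("forwardSparkAzureStorageCredentials", "true")
        .option("dbTable", table)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("complete")
        .start()
    )
```

For plain record appends without aggregation, the same chain would use `.outputMode("append")` instead.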
Synapse fault tolerance semantics
By default, Azure Synapse Streaming offers an end-to-end exactly-once guarantee for writing data into an Azure Synapse table. It reliably tracks the progress of the query using a combination of a checkpoint location in DBFS, a checkpoint table in Azure Synapse, and a locking mechanism, so that streaming can handle any type of failure, retry, or query restart.
Optionally, you can select less restrictive at-least-once semantics for Azure Synapse Streaming by setting the spark.databricks.sqldw.streaming.exactlyOnce.enabled option to false, in which case data duplication could occur in the event of intermittent connection failures to Azure Synapse or unexpected query termination.
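A minimal sketch of switching between the two guarantees might look like the following. It assumes the setting is applied through the session configuration with `spark.conf.set`, in the same way as the storage account key in the examples on this page; the helper name `set_synapse_streaming_semantics` is hypothetical.

```python
EXACTLY_ONCE_KEY = "spark.databricks.sqldw.streaming.exactlyOnce.enabled"


def set_synapse_streaming_semantics(spark, exactly_once=True):
    """Choose exactly-once (default) or at-least-once streaming writes.

    Hypothetical helper; assumes the option is read from the Spark
    session configuration.
    """
    # Spark configuration values are strings, so pass "true" or "false".
    spark.conf.set(EXACTLY_ONCE_KEY, "true" if exactly_once else "false")
    return EXACTLY_ONCE_KEY
```

In a notebook, `set_synapse_streaming_semantics(spark, exactly_once=False)` would opt in to at-least-once semantics before the streaming query is started.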
Structured Streaming syntax for writing to Azure Synapse
The following code examples demonstrate streaming writes to Synapse using Structured Streaming in Scala and Python:
import org.apache.spark.sql.DataFrame

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
    "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
    "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", "100000") \
    .option("numPartitions", "16") \
    .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
    .format("com.databricks.spark.sqldw") \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbTable", "<your-table-name>") \
    .option("checkpointLocation", "/tmp_checkpoint_location") \
    .start()

For a full list of configurations, see Query data in Azure Synapse Analytics.
Synapse streaming checkpoint table management
The Azure Synapse connector does not delete the streaming checkpoint table that is created when a new streaming query is started. This behavior is consistent with the behavior of the checkpointLocation normally specified on object storage. Databricks recommends that you periodically delete checkpoint tables for queries that will not be run in the future.
By default, all checkpoint tables have the name <prefix>_<query-id>, where <prefix> is a configurable prefix with the default value databricks_streaming_checkpoint and <query-id> is a streaming query ID with _ characters removed.
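The naming rule can be sketched as a small function. This only mirrors the rule as described here, not the connector's internal code; the name `checkpoint_table_name` and the query ID in the usage note are made up for illustration.

```python
DEFAULT_PREFIX = "databricks_streaming_checkpoint"


def checkpoint_table_name(query_id, prefix=DEFAULT_PREFIX):
    """Build a checkpoint table name following the documented rule.

    Hypothetical helper: strips `_` characters from the query ID and
    joins the result to the prefix with a single `_`.
    """
    return f"{prefix}_{query_id.replace('_', '')}"
```

For example, `checkpoint_table_name("ab_cd_12")` returns `databricks_streaming_checkpoint_abcd12`, and passing `prefix="etl"` returns `etl_abcd12`.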
To find all checkpoint tables for stale or deleted streaming queries, run the query:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
You can configure the prefix with the Spark SQL configuration option spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.
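If you change the prefix, the lookup query above needs to match it. The following sketch builds that query for a given prefix; the function name `checkpoint_tables_query` and the example prefix in the comment are hypothetical.

```python
PREFIX_CONF = "spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix"


def checkpoint_tables_query(prefix="databricks_streaming_checkpoint"):
    """Return the sys.tables lookup for checkpoint tables with `prefix`.

    Hypothetical helper; rejects quotes so the prefix cannot break out
    of the SQL string literal.
    """
    if "'" in prefix:
        raise ValueError("prefix must not contain single quotes")
    return f"SELECT * FROM sys.tables WHERE name LIKE '{prefix}%'"


# In a notebook, a custom prefix (illustrative value) could be set with:
#   spark.conf.set(PREFIX_CONF, "etl_checkpoint")
# and the matching lookup built with:
#   checkpoint_tables_query("etl_checkpoint")
```

With the default argument, the function returns the same query shown above.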
Databricks Synapse connector streaming options reference
The OPTIONS provided in Spark SQL support the following options for streaming in addition to the batch options:
Parameter | Required | Default | Notes |
---|---|---|---|
checkpointLocation | Yes | No default | Location on DBFS that will be used by Structured Streaming to write metadata and checkpoint information. See Recovering from Failures with Checkpointing in the Structured Streaming programming guide. |
numStreamingTempDirsToKeep | No | 0 | Indicates how many (latest) temporary directories to keep for periodic cleanup of micro batches in streaming. When set to |
Note
checkpointLocation and numStreamingTempDirsToKeep are relevant only for streaming writes from Databricks to a new table in Azure Synapse.
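As a rough sketch, the two streaming options can be collected in a dictionary and passed to the writer in one call. The helper name `synapse_streaming_options` and the value `5` in the usage note are illustrative assumptions, not recommended settings.

```python
def synapse_streaming_options(checkpoint_location, temp_dirs_to_keep=0):
    """Collect the streaming-specific connector options as strings.

    Hypothetical helper: `checkpointLocation` is required, and
    `numStreamingTempDirsToKeep` defaults to 0 as in the reference table.
    """
    if not checkpoint_location:
        raise ValueError("checkpointLocation is required for streaming writes")
    return {
        "checkpointLocation": checkpoint_location,
        "numStreamingTempDirsToKeep": str(temp_dirs_to_keep),
    }
```

The result can be applied with `df.writeStream.format("com.databricks.spark.sqldw").options(**synapse_streaming_options("/tmp_checkpoint_location", 5))`, followed by the batch options such as `url`, `tempDir`, and `dbTable`.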