Query data in Azure Synapse Analytics
You can access Azure Synapse from Databricks using the Azure Synapse connector, which uses the COPY statement in Azure Synapse to transfer large volumes of data efficiently between a Databricks cluster and an Azure Synapse instance, using an Azure Data Lake Storage Gen2 storage account for temporary staging.
Experimental
The configurations described in this article are Experimental. Experimental features are provided as-is and are not supported by Databricks through customer technical support. To get full query federation support, you should instead use Lakehouse Federation, which enables your Databricks users to take advantage of Unity Catalog syntax and data governance tools.
Azure Synapse Analytics is a cloud-based enterprise data warehouse that leverages massively parallel processing (MPP) to quickly run complex queries across petabytes of data.
Important
This connector is for use with Synapse Dedicated Pool instances only and is not compatible with other Synapse components.
Note
COPY is available only on Azure Data Lake Storage Gen2 instances. If you’re looking for details on working with PolyBase, see Connecting Databricks and Azure Synapse with PolyBase (legacy).
Example syntax for Synapse
You can query Synapse in Scala, Python, SQL, and R. The following code examples use storage account keys and forward the storage credentials from Databricks to Synapse.
Note
Use the connection string provided by the Azure portal, which enables Secure Sockets Layer (SSL) encryption for all data sent between the Spark driver and the Azure Synapse instance through the JDBC connection. To verify that SSL encryption is enabled, search for encrypt=true in the connection string.
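The check described in this note can also be done programmatically. The following is a minimal sketch that parses the `;`-separated properties of a SQL Server JDBC connection string and reports whether `encrypt=true` is set. The helper names and the sample connection strings are illustrative assumptions and are not part of the connector.

```python
def parse_jdbc_properties(url: str) -> dict:
    """Return the ;-separated key=value properties of a JDBC URL, with lower-cased keys."""
    # Everything after the first ';' is treated as the property list.
    _, _, tail = url.partition(";")
    props = {}
    for part in tail.split(";"):
        if "=" in part:
            key, _, value = part.partition("=")
            props[key.strip().lower()] = value.strip()
    return props


def has_ssl_encryption(url: str) -> bool:
    """True if the connection string sets encrypt=true (case-insensitive)."""
    return parse_jdbc_properties(url).get("encrypt", "").lower() == "true"


# Hypothetical connection strings, for illustration only.
secure = "jdbc:sqlserver://example.database.windows.net:1433;database=db;encrypt=true;loginTimeout=30;"
insecure = "jdbc:sqlserver://example.database.windows.net:1433;database=db;"
print(has_ssl_encryption(secure))    # True
print(has_ssl_encryption(insecure))  # False
```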
Important
External locations defined in Unity Catalog are not supported as tempDir locations.
// Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 1433 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
# Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = (spark.read
  .format("sqldw")
  .option("host", "hostname")
  .option("port", "port")  # Optional - will use default port 1433 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name")  # If schemaName not provided, default to "dbo".
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .load())
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 1433 if not specified. */
  user '<username>',
  password '<password>',
  database '<database-name>',
  dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
  forwardSparkAzureStorageCredentials 'true',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbtable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
# Load SparkR
library(SparkR)
# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
How does authentication between Databricks and Synapse work?
The Azure Synapse connector uses three types of network connections:
Spark driver to Azure Synapse
Spark cluster to Azure storage account
Azure Synapse to Azure storage account
Configuring access to Azure storage
Both Databricks and Synapse need privileged access to an Azure storage account to be used for temporary data storage.
Azure Synapse does not support using SAS for storage account access. You can configure access for both services by doing one of the following:
Use the account key and secret for the storage account and set forwardSparkAzureStorageCredentials to true. See Set Spark properties to configure Azure credentials to access Azure storage.
Use Azure Data Lake Storage Gen2 with OAuth 2.0 authentication and set enableServicePrincipalAuth to true. See Configure connection from Databricks to Synapse with OAuth 2.0 with a service principal.
Configure your Azure Synapse instance to have a Managed Service Identity and set useAzureMSI to true.
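Because only one of these approaches is chosen for a given connection, it can help to centralize the choice. The sketch below maps a method label to the matching connector option. The option names come from the list above, while the helper name `storage_auth_options` and the method labels are hypothetical.

```python
# Option names come from the list above; the method labels are hypothetical.
_AUTH_OPTIONS = {
    "account_key": "forwardSparkAzureStorageCredentials",
    "service_principal": "enableServicePrincipalAuth",
    "managed_identity": "useAzureMSI",
}


def storage_auth_options(method: str) -> dict:
    """Return connector options with exactly one storage auth flag set to "true"."""
    if method not in _AUTH_OPTIONS:
        raise ValueError(f"Unknown method {method!r}; expected one of {sorted(_AUTH_OPTIONS)}")
    return {_AUTH_OPTIONS[method]: "true"}


print(storage_auth_options("managed_identity"))  # {'useAzureMSI': 'true'}
```

The returned dictionary could then be passed to a reader or writer, for example with `.options(**storage_auth_options("account_key"))`.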
Required Azure Synapse permissions
Because it uses COPY in the background, the Azure Synapse connector requires the JDBC connection user to have the following permissions in the connected Azure Synapse instance:
ADMINISTER DATABASE BULK OPERATIONS
INSERT
If the destination table does not exist in Azure Synapse, the following permissions are required in addition to those above:
CREATE TABLE
ALTER ON SCHEMA :: dbo
The following table summarizes the permissions required for writes with COPY:
Permissions (insert into an existing table) | Permissions (insert into a new table) |
---|---|
ADMINISTER DATABASE BULK OPERATIONS, INSERT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ON SCHEMA :: dbo |
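As a rough sketch, the permissions in the table can be turned into T-SQL GRANT statements for a database administrator to review and run. The helper `grant_statements` and the user name `etl_user` are hypothetical, and granting at database scope as shown here is an assumption that may be broader than your environment requires.

```python
def grant_statements(user: str, new_table: bool = False) -> list:
    """Build T-SQL GRANT statements for the permissions listed in the table above."""
    # Bracket-quote the principal name and escape any closing bracket inside it.
    principal = "[" + user.replace("]", "]]") + "]"
    permissions = ["ADMINISTER DATABASE BULK OPERATIONS", "INSERT"]
    if new_table:
        # Extra permissions needed when the destination table does not exist yet.
        permissions += ["CREATE TABLE", "ALTER ON SCHEMA::dbo"]
    return [f"GRANT {p} TO {principal};" for p in permissions]


for statement in grant_statements("etl_user", new_table=True):
    print(statement)
```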
Configure connection from Databricks to Synapse with OAuth 2.0 with a service principal
You can authenticate to Azure Synapse Analytics using a service principal with access to the underlying storage account. For more information on using service principal credentials to access an Azure storage account, see Connect to Azure Data Lake Storage Gen2 and Blob Storage. To enable the connector to authenticate with a service principal, you must set the enableServicePrincipalAuth option to true in the connection configuration (see Databricks Synapse connector options reference).
You can optionally use a different service principal for the Azure Synapse Analytics connection. The following example configures service principal credentials for the storage account and optional service principal credentials for Synapse:
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Supported save modes for batch writes
The Azure Synapse connector supports the ErrorIfExists, Ignore, Append, and Overwrite save modes, with ErrorIfExists as the default. For more information on supported save modes in Apache Spark, see Spark SQL documentation on Save Modes.
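The following sketch shows one way to pass an explicit save mode when writing. It assumes `df` is a Spark DataFrame and `options` holds connector options such as `url`, `dbTable`, and `tempDir`. The wrapper function `write_to_synapse` is hypothetical. The `format`, `options`, `mode`, and `save` calls are the standard Spark DataFrameWriter methods.

```python
# Lower-cased spellings of the four save modes listed above.
SUPPORTED_MODES = {"errorifexists", "ignore", "append", "overwrite"}


def write_to_synapse(df, options: dict, mode: str = "errorifexists") -> None:
    """Write a DataFrame to Azure Synapse with an explicit save mode."""
    if mode.lower() not in SUPPORTED_MODES:
        raise ValueError(f"Unsupported save mode: {mode!r}")
    (
        df.write
        .format("com.databricks.spark.sqldw")
        .options(**options)
        .mode(mode)
        .save()
    )


# Example call (requires a live Spark session and valid connection options):
# write_to_synapse(df, {"url": "...", "dbTable": "...", "tempDir": "..."}, mode="append")
```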
Databricks Synapse connector options reference
The OPTIONS provided in Spark SQL support the following settings:
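Parameter | Required | Default | Notes |
---|---|---|---|
dbTable | Yes, unless query is specified | No default | The table to create or read from in Azure Synapse. This parameter is required when saving data back to Azure Synapse. You can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. The previously supported dbtable variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. |
query | Yes, unless dbTable is specified | No default | The query to read from in Azure Synapse. For tables referred to in the query, you can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. |
user | No | No default | The Azure Synapse username. Must be used in tandem with the password option. |
password | No | No default | The Azure Synapse password. Must be used in tandem with the user option. |
url | Yes | No default | A JDBC URL with sqlserver set as the subprotocol. |
jdbcDriver | No | Determined by the JDBC URL’s subprotocol | The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL’s subprotocol. The previously supported jdbc_driver variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. |
tempDir | Yes | No default | An abfss URI. The previously supported tempdir variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. You cannot use an external location defined in Unity Catalog as a tempDir location. |
tempCompression | No | SNAPPY | The compression algorithm to be used to encode/decode temporary data by both Spark and Azure Synapse. Currently supported values are: UNCOMPRESSED, SNAPPY, and GZIP. |
forwardSparkAzureStorageCredentials | No | false | If true, the library automatically discovers the storage account access key credentials that Spark is using to connect to the storage container and forwards those credentials to Azure Synapse over JDBC. When configuring storage authentication, you must set exactly one of useAzureMSI and forwardSparkAzureStorageCredentials to true. The previously supported forward_spark_azure_storage_credentials variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. |
useAzureMSI | No | false | If true, the library specifies IDENTITY = 'Managed Service Identity' and no SECRET for the database scoped credentials it creates. When configuring storage authentication, you must set exactly one of useAzureMSI and forwardSparkAzureStorageCredentials to true. |
enableServicePrincipalAuth | No | false | If true, the library uses the provided service principal credentials to connect to the Azure storage account and Azure Synapse Analytics over JDBC. If either forwardSparkAzureStorageCredentials or useAzureMSI is set to true, that option takes precedence over the service principal in storage authentication. |
tableOptions | No | CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN | A string used to specify table options when creating the Azure Synapse table set through dbTable. The previously supported table_options variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. |
preActions | No | No default (empty string) | A ; separated list of SQL commands to be executed in Azure Synapse before writing data to the Azure Synapse instance. If any of these commands fail, it is treated as an error and the write operation is not executed. |
postActions | No | No default (empty string) | A ; separated list of SQL commands to be executed in Azure Synapse after the connector successfully writes data to the Azure Synapse instance. If any of these commands fail, it is treated as an error and you’ll get an exception after the data is successfully written to the Azure Synapse instance. |
maxStrLength | No | 256 | StringType in Spark is mapped to the NVARCHAR(maxStrLength) type in Azure Synapse. The previously supported maxstrlength variant is deprecated and will be ignored in future releases. Use the “camel case” name instead. |
applicationName | No | Databricks-User-Query | The tag of the connection for each query. If not specified or the value is an empty string, the default value of the tag is added to the JDBC URL. The default value prevents the Azure DB Monitoring tool from raising spurious SQL injection alerts against queries. |
maxbinlength | No | No default | Control the column length of BinaryType columns. |
identityInsert | No | false | Setting to true enables IDENTITY_INSERT mode, which inserts a DataFrame-provided value in the identity column of the Azure Synapse table. |
externalDataSource | No | No default | A pre-provisioned external data source to read data from Azure Synapse. An external data source can only be used with PolyBase and removes the CONTROL permission requirement because the connector does not need to create a scoped credential and an external data source to load data. For example usage and the list of permissions required when using an external data source, see Required Azure Synapse permissions for PolyBase with the external data source option. |
maxErrors | No | 0 | The maximum number of rows that can be rejected during reads and writes before the loading operation is cancelled. The rejected rows will be ignored. For example, if two out of ten records have errors, only eight records will be processed. See REJECT_VALUE documentation in CREATE EXTERNAL TABLE and MAXERRORS documentation in COPY. |
inferTimestampNTZType | No | false | If true, values of type Azure Synapse TIMESTAMP are interpreted as TimestampNTZType (timestamp without time zone) during reads. |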
Note
tableOptions, preActions, postActions, and maxStrLength are relevant only when writing data from Databricks to a new table in Azure Synapse.
Even though all data source option names are case-insensitive, we recommend that you specify them in “camel case” for clarity.
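One way to follow the camel-case recommendation is to normalize option names before passing them to the connector. The following is a minimal sketch. The list covers only option names that appear elsewhere in this article, and the helper `canonicalize_options` is hypothetical rather than something the connector provides.

```python
# Canonical camel-case spellings of option names that appear in this article.
CANONICAL_OPTIONS = [
    "dbTable", "query", "user", "password", "url", "tempDir",
    "forwardSparkAzureStorageCredentials", "useAzureMSI",
    "enableServicePrincipalAuth", "tableOptions", "preActions",
    "postActions", "maxStrLength",
]
_BY_LOWER = {name.lower(): name for name in CANONICAL_OPTIONS}


def canonicalize_options(options: dict) -> dict:
    """Rewrite known option names to camel case and leave unknown names untouched."""
    return {_BY_LOWER.get(key.lower(), key): value for key, value in options.items()}


print(canonicalize_options({"dbtable": "dbo.sales", "TEMPDIR": "abfss://..."}))
# {'dbTable': 'dbo.sales', 'tempDir': 'abfss://...'}
```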
Query pushdown into Azure Synapse
The Azure Synapse connector implements a set of optimization rules to push the following operators down into Azure Synapse:
Filter
Project
Limit
The Project and Filter operators support the following expressions:
Most boolean logic operators
Comparisons
Basic arithmetic operations
Numeric and string casts
For the Limit operator, pushdown is supported only when there is no ordering specified. For example: SELECT TOP(10) * FROM table, but not SELECT TOP(10) * FROM table ORDER BY col.
Note
The Azure Synapse connector does not push down expressions operating on strings, dates, or timestamps.
Query pushdown built with the Azure Synapse connector is enabled by default. You can disable it by setting spark.databricks.sqldw.pushdown to false.
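As a small sketch, the setting can be toggled for the current session through the Spark configuration. The configuration key is the one named above. The helper `set_synapse_pushdown` is hypothetical, and `spark` is assumed to be the active SparkSession, as in a Databricks notebook.

```python
def set_synapse_pushdown(spark, enabled: bool) -> None:
    """Enable or disable Azure Synapse query pushdown for the given Spark session."""
    spark.conf.set("spark.databricks.sqldw.pushdown", "true" if enabled else "false")


# Example call in a notebook, where `spark` is already defined:
# set_synapse_pushdown(spark, False)
```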
Temporary data management
The Azure Synapse connector does not delete the temporary files that it creates in the Azure storage container. Databricks recommends that you periodically delete temporary files under the user-supplied tempDir location.
To facilitate data cleanup, the Azure Synapse connector does not store data files directly under tempDir, but instead creates a subdirectory of the form: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/. You can set up periodic jobs (using the Databricks jobs feature or otherwise) to recursively delete any subdirectories that are older than a given threshold (for example, 2 days), with the assumption that there cannot be Spark jobs running longer than that threshold.
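The age check for such a cleanup job can be sketched from the directory naming scheme alone. The example below decides whether a subdirectory path, relative to tempDir, is older than a threshold. The function name and sample paths are hypothetical, and it assumes the timestamp in the path and the `now` value use the same time zone, which the text above does not specify.

```python
from datetime import datetime, timedelta


def is_stale(relative_path: str, now: datetime, max_age: timedelta) -> bool:
    """True if a '<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>' path is older than max_age."""
    date_part, time_part = relative_path.strip("/").split("/")[:2]
    # %f accepts 1 to 6 fractional digits, so the 3-digit millisecond field parses as-is.
    created = datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H-%M-%S-%f")
    return now - created > max_age


# Hypothetical subdirectories under tempDir, for illustration only.
now = datetime(2024, 1, 10, 12, 0, 0)
paths = ["2024-01-05/08-30-00-123/uuid-a", "2024-01-10/09-15-42-007/uuid-b"]
stale = [p for p in paths if is_stale(p, now, timedelta(days=2))]
print(stale)  # ['2024-01-05/08-30-00-123/uuid-a']
```

In a Databricks job, the paths flagged as stale could then be removed recursively, for example with `dbutils.fs.rm(path, recurse=True)`.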
A simpler alternative is to periodically drop the whole container and create a new one with the same name. This requires that you use a dedicated container for the temporary data produced by the Azure Synapse connector and that you can find a time window in which you can guarantee that no queries involving the connector are running.
Temporary object management
The Azure Synapse connector automates data transfer between a Databricks cluster and an Azure Synapse instance. For reading data from an Azure Synapse table or query or writing data to an Azure Synapse table, the Azure Synapse connector creates temporary objects, including DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCE, EXTERNAL FILE FORMAT, and EXTERNAL TABLE behind the scenes. These objects live only for the duration of the corresponding Spark job and are automatically dropped.
When a cluster is running a query using the Azure Synapse connector, if the Spark driver process crashes or is forcefully restarted, or if the cluster is forcefully terminated or restarted, temporary objects might not be dropped. To facilitate identification and manual deletion of these objects, the Azure Synapse connector prefixes the names of all intermediate temporary objects created in the Azure Synapse instance with a tag of the form: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.
We recommend that you periodically look for leaked objects using queries such as the following:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
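Once such queries return candidate names, the timestamp embedded in the tag can help decide which objects are old enough to drop safely. The following sketch extracts that timestamp from a name. The helper `created_at` and the sample object name are hypothetical, and only the timestamp portion of the tag is parsed because the exact rendering of the UUID and object suffix is not described here.

```python
import re
from datetime import datetime

# Matches the 'tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_' prefix described above.
_TMP_NAME = re.compile(r"^tmp_databricks_(\d{4}_\d{2}_\d{2}_\d{2}_\d{2}_\d{2}_\d{3})_")


def created_at(object_name: str):
    """Return the creation time encoded in a tmp_databricks_ name, or None if absent."""
    match = _TMP_NAME.match(object_name)
    if match is None:
        return None
    return datetime.strptime(match.group(1), "%Y_%m_%d_%H_%M_%S_%f")


# Hypothetical object name, for illustration only.
name = "tmp_databricks_2024_01_05_08_30_00_123_0f8fad5bd9cb469fa16570867728950e_external_table"
print(created_at(name))          # 2024-01-05 08:30:00.123000
print(created_at("dbo_orders"))  # None
```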