Connecting Databricks and Azure Synapse with PolyBase (legacy)

Important

This documentation has been retired and might not be updated. The products, services, or technologies mentioned in this content are no longer supported. See Query data in Azure Synapse Analytics.

Databricks recommends using the default COPY functionality with Azure Data Lake Storage Gen2 for connections to Azure Synapse. This article includes legacy documentation around PolyBase and blob storage.

Azure Synapse Analytics (formerly SQL Data Warehouse) is a cloud-based enterprise data warehouse that leverages massively parallel processing (MPP) to quickly run complex queries across petabytes of data. Use Azure as a key component of a big data solution. Import big data into Azure with simple PolyBase T-SQL queries or the COPY statement, and then use the power of MPP to run high-performance analytics. As you integrate and analyze, the data warehouse will become the single version of truth your business can count on for insights.

You can access Azure Synapse from Databricks using the Azure Synapse connector, a data source implementation for Apache Spark that uses Azure Blob storage, and PolyBase or the COPY statement in Azure Synapse to transfer large volumes of data efficiently between a Databricks cluster and an Azure Synapse instance.

Both the Databricks cluster and the Azure Synapse instance access a common Blob storage container to exchange data between the two systems. In Databricks, the Azure Synapse connector triggers Apache Spark jobs to read data from and write data to the Blob storage container. On the Azure Synapse side, the connector triggers the data loading and unloading operations performed by PolyBase through JDBC. In Databricks Runtime 7.0 and above, the connector uses COPY by default to load data into Azure Synapse through JDBC.

Note

COPY is available only on Azure Synapse Gen2 instances, which provide better performance. If your database still uses Gen1 instances, we recommend that you migrate the database to Gen2.

The Azure Synapse connector is more suited to ETL than to interactive queries, because each query execution can extract large amounts of data to Blob storage. If you plan to perform several queries against the same Azure Synapse table, we recommend that you save the extracted data in a format such as Parquet.
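As a minimal sketch of that recommendation, the helper below writes an extracted DataFrame to Parquet once and returns a DataFrame backed by the Parquet files, so that later queries do not trigger another extraction. The function name and the snapshot path are hypothetical; spark is assumed to be the SparkSession provided in the notebook, and df is assumed to be a DataFrame already loaded through the connector.

```python
def snapshot_and_reload(spark, df, snapshot_path):
    """Write df to Parquet once, then return a DataFrame backed by those files."""
    # The extraction from Azure Synapse happens here, when the write runs.
    df.write.mode("overwrite").parquet(snapshot_path)
    # Queries against the returned DataFrame read the Parquet snapshot instead.
    return spark.read.parquet(snapshot_path)
```

In a notebook you might call snapshot_and_reload(spark, df, "/tmp/synapse_snapshot") and run the repeated queries against the result. The path shown is only a placeholder.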

Requirements

An Azure Synapse database master key.

Authentication

The Azure Synapse connector uses three types of network connections:

  • Spark driver to Azure Synapse

  • Spark driver and executors to Azure storage account

  • Azure Synapse to Azure storage account

                                 ┌─────────┐
      ┌─────────────────────────>│ STORAGE │<────────────────────────┐
      │   Storage acc key /      │ ACCOUNT │  Storage acc key /      │
      │   Managed Service ID /   └─────────┘  OAuth 2.0 /            │
      │                               │                              │
      │                               │                              │
      │                               │ Storage acc key /            │
      │                               │ OAuth 2.0 /                  │
      │                               │                              │
      v                               v                       ┌──────v────┐
┌──────────┐                      ┌──────────┐                │┌──────────┴┐
│ Synapse  │                      │  Spark   │                ││ Spark     │
│ Analytics│<────────────────────>│  Driver  │<───────────────>│ Executors │
└──────────┘  JDBC with           └──────────┘    Configured   └───────────┘
              username & password /                in Spark

The following sections describe each connection’s authentication configuration options.

Spark driver to Azure Synapse

The Spark driver can connect to Azure Synapse using JDBC with a username and password or OAuth 2.0 with a service principal for authentication.

Username and password

We recommend that you use the connection strings provided by Azure portal for both authentication types, which enable Secure Sockets Layer (SSL) encryption for all data sent between the Spark driver and the Azure Synapse instance through the JDBC connection. To verify that the SSL encryption is enabled, you can search for encrypt=true in the connection string.
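The check for encrypt=true can be automated before a job runs. The following sketch parses the semicolon-separated properties of a SQL Server JDBC URL and reports whether encryption is requested. The helper name and the example host and database are hypothetical, and the sketch assumes that property names can be compared case-insensitively.

```python
def jdbc_encryption_enabled(url: str) -> bool:
    """Return True if a SQL Server JDBC URL sets encrypt=true."""
    # Properties follow the host part and are separated by semicolons.
    properties = {}
    for part in url.split(";")[1:]:
        if "=" in part:
            key, value = part.split("=", 1)
            properties[key.strip().lower()] = value.strip().lower()
    return properties.get("encrypt") == "true"


# Hypothetical connection string, for illustration only.
example_url = (
    "jdbc:sqlserver://example.database.windows.net:1433;"
    "database=exampledb;encrypt=true;loginTimeout=30;"
)
print(jdbc_encryption_enabled(example_url))  # prints True
```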

To allow the Spark driver to reach Azure Synapse, we recommend that you set Allow Azure services and resources to access this workspace to ON on the Networking pane under Security of the Azure Synapse workspace through the Azure portal. This setting allows communications from all Azure IP addresses and all Azure subnets, which allows Spark drivers to reach the Azure Synapse instance.

OAuth 2.0 with a service principal

You can authenticate to Azure Synapse Analytics using a service principal with access to the underlying storage account. For more information on using service principal credentials to access an Azure storage account, see Connect to Azure Data Lake Storage Gen2 and Blob Storage. You must set the enableServicePrincipalAuth option to true in the connection configuration Parameters to enable the connector to authenticate with a service principal.

You can optionally use a different service principal for the Azure Synapse Analytics connection. An example that configures service principal credentials for the storage account and optional service principal credentials for Synapse:

; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>

// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Spark driver and executors to Azure storage account

The Azure storage container acts as an intermediary to store bulk data when reading from or writing to Azure Synapse. Spark connects to ADLS Gen2 or Blob Storage using the abfss driver.
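The storage account name appears both in the account-key configuration name and in the abfss URI used for tempDir in the examples in this article, so it can help to derive both from one value. The sketch below does only string formatting. The helper names and the example container, account, and directory names are hypothetical.

```python
def account_key_conf_name(storage_account: str) -> str:
    """Configuration name that holds the access key for a storage account."""
    return f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"


def temp_dir_uri(container: str, storage_account: str, directory: str) -> str:
    """abfss URI for a directory in a container, suitable for tempDir."""
    return (
        f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
        f"{directory.strip('/')}"
    )


# Hypothetical names, for illustration only.
print(account_key_conf_name("exampleaccount"))
print(temp_dir_uri("staging", "exampleaccount", "/synapse/tmp/"))
```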

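The following authentication options are available:

  • Storage account access key

  • OAuth 2.0 authentication

You can provide the credentials in one of two ways: in the notebook session configuration or in the global Hadoop configuration. The examples below illustrate these two ways using the storage account access key approach. The same applies to OAuth 2.0 configuration.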

Notebook session configuration (preferred)

Using this approach, the account access key is set in the session configuration associated with the notebook that runs the command. This configuration does not affect other notebooks attached to the same cluster. spark is the SparkSession object provided in the notebook.

spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Global Hadoop configuration

This approach updates the global Hadoop configuration associated with the SparkContext object shared by all notebooks.

sc.hadoopConfiguration.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

hadoopConfiguration is not exposed in all versions of PySpark. Although the following command relies on some Spark internals, it should work with all PySpark versions and is unlikely to break or change in the future:

sc._jsc.hadoopConfiguration().set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

Azure Synapse to Azure storage account

Azure Synapse also connects to a storage account during loading and unloading of temporary data.

If you have set up an account key and secret for the storage account, you can set forwardSparkAzureStorageCredentials to true. The Azure Synapse connector then automatically discovers the account access key set in the notebook session configuration or the global Hadoop configuration and forwards it to the connected Azure Synapse instance by creating a temporary Azure database scoped credential.

Alternatively, if you use ADLS Gen2 with OAuth 2.0 authentication or your Azure Synapse instance is configured to have a Managed Service Identity (typically in conjunction with a VNet + Service Endpoints setup), you must set useAzureMSI to true. In this case the connector will specify IDENTITY = 'Managed Service Identity' for the database scoped credential and no SECRET.
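The Parameters section states that exactly one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI must be explicitly set to true. The sketch below checks an options dictionary against that rule before it is handed to the connector. The helper name is hypothetical, and the check is an illustration of the stated rule, not the connector's own validation.

```python
AUTH_FLAGS = (
    "forwardSparkAzureStorageCredentials",
    "enableServicePrincipalAuth",
    "useAzureMSI",
)


def selected_auth_flag(options: dict) -> str:
    """Return the single auth flag set to true, or raise ValueError."""
    enabled = [
        flag for flag in AUTH_FLAGS
        if str(options.get(flag, "false")).lower() == "true"
    ]
    if len(enabled) != 1:
        raise ValueError(
            f"expected exactly one of {AUTH_FLAGS} to be true, got {enabled}"
        )
    return enabled[0]


print(selected_auth_flag({"useAzureMSI": "true", "dbTable": "<your-table-name>"}))
```

Running the check on the driver before the read or write starts surfaces a misconfiguration early, without waiting for the connector to reject the options.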

Streaming support

The Azure Synapse connector offers efficient and scalable Structured Streaming write support for Azure Synapse that provides a consistent user experience with batch writes and uses PolyBase or COPY for large data transfers between a Databricks cluster and an Azure Synapse instance. Like batch writes, streaming is designed largely for ETL, so its latency is higher and may not be suitable for real-time data processing in some cases.

Fault tolerance semantics

By default, Azure Synapse Streaming offers an end-to-end exactly-once guarantee for writing data into an Azure Synapse table. It reliably tracks the progress of the query using a combination of a checkpoint location in DBFS, a checkpoint table in Azure Synapse, and a locking mechanism, so that streaming can handle any type of failure, retry, or query restart. Optionally, you can select less restrictive at-least-once semantics for Azure Synapse Streaming by setting the spark.databricks.sqldw.streaming.exactlyOnce.enabled option to false, in which case data duplication could occur in the event of intermittent connection failures to Azure Synapse or unexpected query termination.
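A minimal sketch of switching between the two semantics from a Python notebook follows. The helper name is hypothetical, and spark is assumed to be the SparkSession provided in the notebook. Only the configuration name comes from this article.

```python
EXACTLY_ONCE_KEY = "spark.databricks.sqldw.streaming.exactlyOnce.enabled"


def set_streaming_semantics(spark, exactly_once: bool = True) -> None:
    """Select exactly-once (default) or at-least-once streaming writes."""
    # Setting the option to "false" selects at-least-once semantics,
    # which can produce duplicate rows after failures or restarts.
    spark.conf.set(EXACTLY_ONCE_KEY, "true" if exactly_once else "false")
```

Calling set_streaming_semantics(spark, exactly_once=False) before starting the streaming query selects at-least-once semantics for that notebook session.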

Usage (Batch)

You can use this connector via the data source API in Scala, Python, SQL, and R notebooks.

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val queryDf: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .save()

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .save()

-- Set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;

-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

# Load SparkR
library(SparkR)

# Set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forwardSparkAzureStorageCredentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forwardSparkAzureStorageCredentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forwardSparkAzureStorageCredentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")

Usage (Streaming)

You can write data using Structured Streaming in Scala and Python notebooks.

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Configuration

This section describes how to configure write semantics for the connector, required permissions, and miscellaneous configuration parameters.

Supported save modes for batch writes

The Azure Synapse connector supports ErrorIfExists, Ignore, Append, and Overwrite save modes with the default mode being ErrorIfExists. For more information on supported save modes in Apache Spark, see Spark SQL documentation on Save Modes.
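The save mode is chosen on the DataFrame writer. The sketch below wraps a batch write with an explicit mode and rejects anything outside the four modes listed above. The helper name is hypothetical, df is assumed to be a Spark DataFrame, and options is assumed to hold the connector options described in the Parameters section.

```python
SAVE_MODES = ("errorifexists", "ignore", "append", "overwrite")


def write_batch(df, options: dict, mode: str = "errorifexists") -> None:
    """Write df to Azure Synapse with an explicit save mode."""
    if mode.lower() not in SAVE_MODES:
        raise ValueError(f"unsupported save mode: {mode!r}")
    (
        df.write
        .format("com.databricks.spark.sqldw")
        .options(**options)
        .mode(mode)
        .save()
    )
```

For example, write_batch(df, options, mode="append") adds rows to an existing table, while the default mode raises an error if the table already exists.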

Supported output modes for streaming writes

The Azure Synapse connector supports Append and Complete output modes for record appends and aggregations. For more details on output modes and compatibility matrix, see the Structured Streaming guide.

Write semantics

Note

COPY is available in Databricks Runtime 7.0 and above.

In addition to PolyBase, the Azure Synapse connector supports the COPY statement. The COPY statement offers a more convenient way of loading data into Azure Synapse without the need to create an external table, requires fewer permissions to load data, and improves the performance of data ingestion into Azure Synapse.

By default, the connector automatically discovers the best write semantics (COPY when targeting an Azure Synapse Gen2 instance, PolyBase otherwise). You can also specify the write semantics with the following configuration:

// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")

-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;

# Load SparkR
library(SparkR)

# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")

where <write-semantics> is either polybase to use PolyBase, or copy to use the COPY statement.
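The sketch below restates the default rule described in this section (COPY for Gen2, PolyBase otherwise) and validates an explicit override before it is applied. The function names are hypothetical, and the default rule is an illustration of the documented behavior, not the connector's detection logic.

```python
WRITE_SEMANTICS_KEY = "spark.databricks.sqldw.writeSemantics"
ALLOWED_WRITE_SEMANTICS = ("polybase", "copy")


def default_write_semantics(is_gen2: bool) -> str:
    """Documented default: COPY on Gen2 instances, PolyBase otherwise."""
    return "copy" if is_gen2 else "polybase"


def write_semantics_setting(value: str) -> tuple:
    """Return the (key, value) pair for an explicit override."""
    if value not in ALLOWED_WRITE_SEMANTICS:
        raise ValueError(f"expected one of {ALLOWED_WRITE_SEMANTICS}, got {value!r}")
    return (WRITE_SEMANTICS_KEY, value)


print(write_semantics_setting(default_write_semantics(is_gen2=True)))
```

In a notebook, the returned pair can be passed to the session configuration, for example spark.conf.set(*write_semantics_setting("polybase")).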

Required Azure Synapse permissions for PolyBase

When you use PolyBase, the Azure Synapse connector requires the JDBC connection user to have permission to run the following commands in the connected Azure Synapse instance:

  • CREATE DATABASE SCOPED CREDENTIAL

  • CREATE EXTERNAL DATA SOURCE

  • CREATE EXTERNAL FILE FORMAT

  • CREATE EXTERNAL TABLE

As a prerequisite for the first command, the connector expects that a database master key already exists for the specified Azure Synapse instance. If not, you can create a key using the CREATE MASTER KEY command.

Additionally, to read the Azure Synapse table set through dbTable or the tables referred to in query, the JDBC user must have permission to access the needed Azure Synapse tables. To write data back to an Azure Synapse table set through dbTable, the JDBC user must have permission to write to this Azure Synapse table.

The following table summarizes the required permissions for all operations with PolyBase:

Operation       | Permissions | Permissions when using external data source
----------------|-------------|--------------------------------------------
Batch write     | CONTROL     | See Batch write
Streaming write | CONTROL     | See Streaming write
Read            | CONTROL     | See Read

Required Azure Synapse permissions for PolyBase with the external data source option

You can use PolyBase with a pre-provisioned external data source. See the externalDataSource parameter in Parameters for more information.

To use PolyBase with a pre-provisioned external data source, the Azure Synapse connector requires the JDBC connection user to have permission to run the following commands in the connected Azure Synapse instance:

  • CREATE EXTERNAL FILE FORMAT

  • CREATE EXTERNAL TABLE

To create an external data source, you should first create a database scoped credential. The Microsoft documentation for CREATE DATABASE SCOPED CREDENTIAL and CREATE EXTERNAL DATA SOURCE describes how to create a scoped credential for service principals and an external data source for an ABFS location.

Note

The external data source location must point to a container. The connector will not work if the location is a directory in a container.
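A quick way to catch this mistake before provisioning is to check that the location URI has no directory path after the container and account. The sketch below uses only the Python standard library. The helper name and the example container and account names are hypothetical.

```python
from urllib.parse import urlsplit


def points_to_container(location: str) -> bool:
    """Return True if the URI names a container with no directory path."""
    parts = urlsplit(location)
    # netloc holds "<container>@<account>..."; path holds any directory.
    return bool(parts.netloc) and parts.path in ("", "/")


# Hypothetical locations, for illustration only.
print(points_to_container("abfss://data@exampleaccount.dfs.core.windows.net"))
print(points_to_container("abfss://data@exampleaccount.dfs.core.windows.net/staging"))
```

The first call reports a container-level location and the second reports a directory inside a container, which the connector would not accept as an external data source location.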

The following table summarizes the permissions for PolyBase write operations with the external data source option:

Operation       | Permissions (insert into an existing table) | Permissions (insert into a new table)
----------------|---------------------------------------------|--------------------------------------
Batch write     | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT
Streaming write | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT

The following table summarizes the permissions for PolyBase read operations with external data source option:

Operation | Permissions
----------|------------
Read      | CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT

You can use this connector to read via the data source API in Scala, Python, SQL, and R notebooks.

// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("externalDataSource", "<your-pre-provisioned-data-source>")
  .option("dbTable", "<your-table-name>")
  .load()

# Get some data from an Azure Synapse table.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("externalDataSource", "<your-pre-provisioned-data-source>") \
  .option("dbTable", "<your-table-name>") \
  .load()

-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
  externalDataSource '<your-pre-provisioned-data-source>'
);

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forwardSparkAzureStorageCredentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>",
   externalDataSource = "<your-pre-provisioned-data-source>")

Required Azure Synapse permissions for the COPY statement

Note

Available in Databricks Runtime 7.0 and above.

When you use the COPY statement, the Azure Synapse connector requires the JDBC connection user to have permission to run the following commands in the connected Azure Synapse instance:

  • COPY INTO

If the destination table does not exist in Azure Synapse, permission to run the following command is required in addition to the command above:

  • CREATE TABLE

The following table summarizes the permissions for batch and streaming writes with COPY:

Operation       | Permissions (insert into an existing table) | Permissions (insert into a new table)
----------------|---------------------------------------------|--------------------------------------
Batch write     | ADMINISTER DATABASE BULK OPERATIONS, INSERT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ON SCHEMA :: dbo
Streaming write | ADMINISTER DATABASE BULK OPERATIONS, INSERT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ON SCHEMA :: dbo

Parameters

The parameter map or OPTIONS provided in Spark SQL support the following settings:

dbTable
  Required: Yes, unless query is specified
  Default: No default
  Notes: The table to create or read from in Azure Synapse. This parameter is required when saving data back to Azure Synapse.

  You can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If schema name is not provided, the default schema associated with the JDBC user is used.

  The previously supported dbtable variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

query
  Required: Yes, unless dbTable is specified
  Default: No default
  Notes: The query to read from in Azure Synapse.

  For tables referred to in the query, you can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If schema name is not provided, the default schema associated with the JDBC user is used.

user
  Required: No
  Default: No default
  Notes: The Azure Synapse username. Must be used in tandem with the password option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error.

password
  Required: No
  Default: No default
  Notes: The Azure Synapse password. Must be used in tandem with the user option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error.

url
  Required: Yes
  Default: No default
  Notes: A JDBC URL with sqlserver set as the subprotocol. It is recommended to use the connection string provided by Azure portal. Setting encrypt=true is strongly recommended, because it enables SSL encryption of the JDBC connection. If user and password are set separately, you do not need to include them in the URL.

jdbcDriver
  Required: No
  Default: Determined by the JDBC URL’s subprotocol
  Notes: The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL’s subprotocol.

  The previously supported jdbc_driver variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

tempDir
  Required: Yes
  Default: No default
  Notes: An abfss URI. We recommend you use a dedicated Blob storage container for Azure Synapse.

  The previously supported tempdir variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

tempFormat
  Required: No
  Default: PARQUET
  Notes: The format in which to save temporary files to the blob store when writing to Azure Synapse. Defaults to PARQUET; no other values are allowed right now.

tempCompression
  Required: No
  Default: SNAPPY
  Notes: The compression algorithm used by both Spark and Azure Synapse to encode and decode temporary files. Currently supported values are: UNCOMPRESSED, SNAPPY and GZIP.

forwardSparkAzureStorageCredentials
  Required: No
  Default: false
  Notes: If true, the library automatically discovers the credentials that Spark is using to connect to the Blob storage container and forwards those credentials to Azure Synapse over JDBC. These credentials are sent as part of the JDBC query. Therefore it is strongly recommended that you enable SSL encryption of the JDBC connection when you use this option.

  The current version of Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true.

  The previously supported forward_spark_azure_storage_credentials variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

useAzureMSI
  Required: No
  Default: false
  Notes: If true, the library will specify IDENTITY = 'Managed Service Identity' and no SECRET for the database scoped credentials it creates.

  The current version of Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true.

enableServicePrincipalAuth
  Required: No
  Default: false
  Notes: If true, the library will use the provided service principal credentials to connect to the Azure storage account and Azure Synapse Analytics over JDBC.

  The current version of Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true.

tableOptions
  Required: No
  Default: CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN
  Notes: A string used to specify table options when creating the Azure Synapse table set through dbTable. This string is passed literally to the WITH clause of the CREATE TABLE SQL statement that is issued against Azure Synapse.

  The previously supported table_options variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

preActions
  Required: No
  Default: No default (empty string)
  Notes: A semicolon-separated (;) list of SQL commands to be executed in Azure Synapse before writing data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse.

  If any of these commands fail, it is treated as an error and the write operation is not executed.

postActions
  Required: No
  Default: No default (empty string)
  Notes: A semicolon-separated (;) list of SQL commands to be executed in Azure Synapse after the connector successfully writes data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse.

  If any of these commands fail, it is treated as an error and you’ll get an exception after the data is successfully written to the Azure Synapse instance.

maxStrLength
  Required: No
  Default: 256
  Notes: StringType in Spark is mapped to the NVARCHAR(maxStrLength) type in Azure Synapse. You can use maxStrLength to set the string length for all NVARCHAR(maxStrLength) type columns that are in the table with name dbTable in Azure Synapse.

The previously supported maxstrlength variant is deprecated and will be ignored in future releases. Use the “camel case” name instead.

checkpointLocation

Yes

No default

Location on DBFS that Structured Streaming uses to write metadata and checkpoint information. See Recovering from Failures with Checkpointing in the Structured Streaming programming guide.

numStreamingTempDirsToKeep

No

0

Indicates how many of the latest temporary directories to keep during periodic cleanup of micro-batches in streaming. When set to 0, directory deletion is triggered immediately after a micro-batch is committed; otherwise, the specified number of latest micro-batches is kept and the remaining directories are removed. Use -1 to disable periodic cleanup.

applicationName

No

Databricks-User-Query

The tag of the connection for each query. If not specified or if the value is an empty string, the default value of the tag is added to the JDBC URL. The default value prevents the Azure DB Monitoring tool from raising spurious SQL injection alerts against queries.

maxbinlength

No

No default

Controls the column length of BinaryType columns. This parameter is translated as VARBINARY(maxbinlength).

identityInsert

No

false

Setting this option to true enables IDENTITY_INSERT mode, which inserts a DataFrame-provided value in the identity column of the Azure Synapse table.

See Explicitly inserting values into an IDENTITY column.

externalDataSource

No

No default

A pre-provisioned external data source to read data from Azure Synapse. An external data source can only be used with PolyBase and removes the CONTROL permission requirement since the connector does not need to create a scoped credential and an external data source to load data.

For example usage and the list of permissions required when using an external data source, see Required Azure Synapse permissions for PolyBase with the external data source option.

maxErrors

No

0

The maximum number of rows that can be rejected during reads and writes before the loading operation (either PolyBase or COPY) is cancelled. The rejected rows will be ignored. For example, if two out of ten records have errors, only eight records will be processed.

See REJECT_VALUE documentation in CREATE EXTERNAL TABLE and MAXERRORS documentation in COPY.

Note

  • tableOptions, preActions, postActions, and maxStrLength are relevant only when writing data from Databricks to a new table in Azure Synapse.

  • externalDataSource is relevant only when reading data from Azure Synapse and writing data from Databricks to a new table in Azure Synapse with PolyBase semantics. You should not specify other storage authentication types while using externalDataSource such as forwardSparkAzureStorageCredentials or useAzureMSI.

  • checkpointLocation and numStreamingTempDirsToKeep are relevant only for streaming writes from Databricks to a new table in Azure Synapse.

  • Even though all data source option names are case-insensitive, we recommend that you specify them in “camel case” for clarity.
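The rule that exactly one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI must be set to true can be checked before a job is submitted. The following is a minimal sketch, not part of the connector: check_auth_flags is a hypothetical helper name, and the sketch does not cover the externalDataSource case described in the note above.

```python
from typing import Mapping

# The three storage-authentication flags listed in the option table.
AUTH_FLAGS = (
    "forwardSparkAzureStorageCredentials",
    "enableServicePrincipalAuth",
    "useAzureMSI",
)


def check_auth_flags(options: Mapping[str, object]) -> str:
    """Return the single auth flag that is enabled, or raise ValueError.

    Hypothetical helper: it only inspects a plain dict of option names
    and values before they are handed to the connector.
    """
    # Option names are case-insensitive, so compare in lower case.
    lowered = {key.lower(): str(value).lower() for key, value in options.items()}
    enabled = [flag for flag in AUTH_FLAGS if lowered.get(flag.lower()) == "true"]
    if len(enabled) != 1:
        raise ValueError(
            "Expected exactly one of %s to be true, found %d"
            % (", ".join(AUTH_FLAGS), len(enabled))
        )
    return enabled[0]
```

A dict that passes the check could then be supplied to the reader or writer, for example through .options(**options) in PySpark.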

Query pushdown into Azure Synapse

The Azure Synapse connector implements a set of optimization rules to push the following operators down into Azure Synapse:

  • Filter

  • Project

  • Limit

The Project and Filter operators support the following expressions:

  • Most boolean logic operators

  • Comparisons

  • Basic arithmetic operations

  • Numeric and string casts

For the Limit operator, pushdown is supported only when there is no ordering specified. For example, SELECT TOP(10) * FROM table is pushed down, but SELECT TOP(10) * FROM table ORDER BY col is not.

Note

The Azure Synapse connector does not push down expressions operating on strings, dates, or timestamps.

Query pushdown built with the Azure Synapse connector is enabled by default. You can disable it by setting spark.databricks.sqldw.pushdown to false.
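As a short sketch of toggling this setting from a PySpark notebook: the function name set_synapse_pushdown is hypothetical, and spark is assumed to be an existing SparkSession (for example, the one a Databricks notebook provides).

```python
def set_synapse_pushdown(spark, enabled: bool) -> None:
    """Enable or disable query pushdown for the Azure Synapse connector.

    `spark` is assumed to be an active SparkSession; the configuration
    key is the one named in the text above.
    """
    spark.conf.set("spark.databricks.sqldw.pushdown", "true" if enabled else "false")
```

Calling set_synapse_pushdown(spark, False) before a read may help when comparing results or plans with and without pushdown.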

Temporary data management

The Azure Synapse connector does not delete the temporary files that it creates in the Blob storage container. Therefore we recommend that you periodically delete temporary files under the user-supplied tempDir location.

To facilitate data cleanup, the Azure Synapse connector does not store data files directly under tempDir, but instead creates a subdirectory of the form: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/. You can set up periodic jobs (using the Databricks jobs feature or otherwise) to recursively delete any subdirectories that are older than a given threshold (for example, 2 days), with the assumption that there cannot be Spark jobs running longer than that threshold.

A simpler alternative is to periodically drop the whole container and create a new one with the same name. This requires that you use a dedicated container for the temporary data produced by the Azure Synapse connector and that you can find a time window in which you can guarantee that no queries involving the connector are running.
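The subdirectory layout described above makes it possible to select stale directories from their names alone. The following sketch assumes you have already listed the paths relative to tempDir (for example, with a storage listing tool of your choice) and that the directory timestamps and the comparison time are in the same time zone. The function names are hypothetical, and the actual deletion step is left out.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Optional


def parse_temp_subdir_time(relative_path: str) -> Optional[datetime]:
    """Parse the <yyyy-MM-dd>/<HH-mm-ss-SSS> prefix of a path under tempDir.

    Returns None when the path does not follow that layout.
    """
    parts = relative_path.strip("/").split("/")
    if len(parts) < 2:
        return None
    try:
        # %f accepts the three-digit millisecond field.
        return datetime.strptime(parts[0] + " " + parts[1], "%Y-%m-%d %H-%M-%S-%f")
    except ValueError:
        return None


def stale_temp_subdirs(
    relative_paths: Iterable[str],
    now: datetime,
    max_age: timedelta = timedelta(days=2),
) -> List[str]:
    """Return the paths whose encoded creation time is older than max_age."""
    stale = []
    for path in relative_paths:
        created = parse_temp_subdir_time(path)
        if created is not None and now - created > max_age:
            stale.append(path)
    return stale
```

Paths that do not match the layout are skipped rather than reported as stale, so unrelated files in the container are never selected by this check.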

Temporary object management

The Azure Synapse connector automates data transfer between a Databricks cluster and an Azure Synapse instance. For reading data from an Azure Synapse table or query or writing data to an Azure Synapse table, the Azure Synapse connector creates temporary objects, including DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCE, EXTERNAL FILE FORMAT, and EXTERNAL TABLE behind the scenes. These objects live only throughout the duration of the corresponding Spark job and should automatically be dropped thereafter.

When a cluster is running a query using the Azure Synapse connector, if the Spark driver process crashes or is forcefully restarted, or if the cluster is forcefully terminated or restarted, temporary objects might not be dropped. To facilitate identification and manual deletion of these objects, the Azure Synapse connector prefixes the names of all intermediate temporary objects created in the Azure Synapse instance with a tag of the form: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.

We recommend that you periodically look for leaked objects using queries such as the following:

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'

  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'

  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'

  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
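Because the tag embeds a creation timestamp, the names returned by these queries can be filtered by age before anything is dropped manually. The sketch below assumes the timestamp portion follows the tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_ form literally. It makes no assumption about how the UUID or the internal object suffix is formatted, and the function names are hypothetical.

```python
import re
from datetime import datetime, timedelta
from typing import Iterable, List, Optional

# Matches only the fixed prefix and the timestamp portion of the tag.
_TMP_NAME = re.compile(
    r"^tmp_databricks_(\d{4}_\d{2}_\d{2}_\d{2}_\d{2}_\d{2}_\d{3})_"
)


def temp_object_created_at(name: str) -> Optional[datetime]:
    """Return the timestamp embedded in a temporary object name, if any."""
    match = _TMP_NAME.match(name)
    if match is None:
        return None
    try:
        return datetime.strptime(match.group(1), "%Y_%m_%d_%H_%M_%S_%f")
    except ValueError:
        # Digits were present but do not form a valid date or time.
        return None


def leaked_object_names(
    names: Iterable[str], now: datetime, min_age: timedelta
) -> List[str]:
    """Keep only the names whose embedded timestamp is older than min_age."""
    result = []
    for name in names:
        created = temp_object_created_at(name)
        if created is not None and now - created > min_age:
            result.append(name)
    return result
```

Choosing min_age longer than your longest-running Spark job avoids selecting objects that belong to a job still in progress.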

Streaming checkpoint table management

The Azure Synapse connector does not delete the streaming checkpoint table that is created when a new streaming query is started. This behavior is consistent with the checkpointLocation on DBFS. Therefore we recommend that you periodically delete checkpoint tables at the same time as you remove checkpoint locations on DBFS for queries that will not be run in the future or whose checkpoint location has already been removed.

By default, all checkpoint tables have the name <prefix>_<query_id>, where <prefix> is a configurable prefix with the default value databricks_streaming_checkpoint and <query_id> is a streaming query ID with _ characters removed. To find all checkpoint tables for stale or deleted streaming queries, run the query:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

You can configure the prefix with the Spark SQL configuration option spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.
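If the prefix has been changed through that option, the lookup query needs to use the same prefix. The following sketch builds the query text for an arbitrary prefix. The function name checkpoint_tables_query is hypothetical, and the bracket escaping of LIKE wildcards (so that _ matches only a literal underscore) is an addition of this sketch rather than something the connector documentation prescribes.

```python
def checkpoint_tables_query(prefix: str = "databricks_streaming_checkpoint") -> str:
    """Build a sys.tables lookup for checkpoint tables with the given prefix."""
    escaped = []
    for ch in prefix:
        if ch in "[_%":
            # In T-SQL LIKE patterns, [x] matches the literal character x.
            escaped.append("[" + ch + "]")
        else:
            escaped.append(ch)
    # Double any single quotes so the prefix stays inside the string literal.
    pattern = "".join(escaped).replace("'", "''")
    return "SELECT * FROM sys.tables WHERE name LIKE '" + pattern + "%'"
```

The unescaped form shown earlier returns a superset of these rows, because an unescaped _ in a LIKE pattern matches any single character.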

Frequently asked questions (FAQ)

I received an error while using the Azure Synapse connector. How can I tell if this error is from Azure Synapse or Databricks?

To help you debug errors, any exception thrown by code that is specific to the Azure Synapse connector is wrapped in an exception extending the SqlDWException trait. Exceptions also make the following distinction:

  • SqlDWConnectorException represents an error thrown by the Azure Synapse connector

  • SqlDWSideException represents an error thrown by the connected Azure Synapse instance
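From Python, JVM exceptions usually surface as text inside a wrapped error, so one way to apply this distinction is to look for the class names in the error message. This is only a sketch: it assumes the exception class name appears in the stringified error (for example, in the Java stack trace), which may not hold in every environment, and classify_synapse_error is a hypothetical helper.

```python
def classify_synapse_error(error_text: str) -> str:
    """Guess where an error originated from the exception class names it mentions.

    Returns "azure-synapse", "connector", or "other". The order of the
    checks is an arbitrary choice of this sketch.
    """
    if "SqlDWSideException" in error_text:
        return "azure-synapse"
    if "SqlDWConnectorException" in error_text:
        return "connector"
    return "other"
```

A typical use would be classify_synapse_error(str(e)) inside an except block around the read or write call.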

What should I do if my query failed with the error “No access key found in the session conf or the global Hadoop conf”?

This error means that the Azure Synapse connector could not find the storage account access key in the notebook session configuration or global Hadoop configuration for the storage account specified in tempDir. See Usage (Batch) for examples of how to configure storage account access properly. If a Spark table is created using the Azure Synapse connector, you must still provide the storage account access credentials in order to read or write to the Spark table.

Can I use a Shared Access Signature (SAS) to access the Blob storage container specified by tempDir?

Azure Synapse does not support using SAS to access Blob storage. Therefore the Azure Synapse connector does not support SAS to access the Blob storage container specified by tempDir.

I created a Spark table using Azure Synapse connector with the dbTable option, wrote some data to this Spark table, and then dropped this Spark table. Will the table created at the Azure Synapse side be dropped?

No. Azure Synapse is considered an external data source. The Azure Synapse table with the name set through dbTable is not dropped when the Spark table is dropped.

When writing a DataFrame to Azure Synapse, why do I need to say .option("dbTable", tableName).save() instead of just .saveAsTable(tableName)?

That is because we want to make the following distinction clear: .option("dbTable", tableName) refers to the database (that is, Azure Synapse) table, whereas .saveAsTable(tableName) refers to the Spark table. In fact, you could even combine the two: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark) which creates a table in Azure Synapse called tableNameDW and an external table in Spark called tableNameSpark that is backed by the Azure Synapse table.

Warning

Beware of the following difference between .save() and .saveAsTable():

  • For df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save(), writeMode acts on the Azure Synapse table, as expected.

  • For df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark), writeMode acts on the Spark table, whereas tableNameDW is silently overwritten if it already exists in Azure Synapse.

This behavior is no different from writing to any other data source. It is just a caveat of the Spark DataFrameWriter API.
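To keep writeMode acting on the Azure Synapse table, the .save() form can be wrapped in a small function. The following PySpark sketch assumes df is an existing DataFrame, uses forwardSparkAzureStorageCredentials as the storage authentication choice purely for illustration, and uses the hypothetical name write_to_synapse. The format name and option names are the ones this article uses.

```python
def write_to_synapse(df, jdbc_url: str, temp_dir: str, synapse_table: str,
                     write_mode: str = "append") -> None:
    """Write a DataFrame to an Azure Synapse table with .save().

    With .save(), write_mode applies to the Azure Synapse table named by
    dbTable, which is the behavior described in the first bullet above.
    """
    (
        df.write
        .format("com.databricks.spark.sqldw")
        .option("url", jdbc_url)
        .option("tempDir", temp_dir)
        # Illustrative choice; exactly one auth option must be enabled.
        .option("forwardSparkAzureStorageCredentials", "true")
        .option("dbTable", synapse_table)
        .mode(write_mode)
        .save()
    )
```

The sketch deliberately avoids .saveAsTable(), so no Spark table is created and the Azure Synapse table is touched only according to write_mode.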