Consultar dados no Azure Synapse Analytics
O senhor pode acessar Azure Synapse a partir de Databricks usando o conector Azure Synapse, que usa a instrução COPY
em Azure Synapse para transferir grandes volumes de dados de forma eficiente entre um cluster Databricks e uma instância Azure Synapse usando um armazenamento Azure Data Lake Storage Gen2 account para preparação temporária.
Experimental
As configurações descritas neste artigo são experimentais. Os recursos experimentais são fornecidos no estado em que se encontram e não recebem suporte do site Databricks por meio do suporte técnico ao cliente. Para obter suporte completo à federação de consultas, o senhor deve usar a lakehouse Federation, que permite que os usuários do Databricks aproveitem as ferramentas de sintaxe e governança de dados do Unity Catalog.
Azure Synapse Analytics é uma empresa baseada na nuvem data warehouse que aproveita o processamento massivamente paralelo (MPP) para executar rapidamente consultas complexas em petabytes de dados.
Esse conector deve ser usado somente com instâncias do pool Synapse Dedicated e não é compatível com outros componentes do Synapse.
COPY
está disponível apenas nas instâncias do Azure Data Lake Storage Gen2. Se estiver procurando detalhes sobre como trabalhar com o Polybase, consulte Conectando o Databricks e o Azure Synapse com o PolyBase (legado).
Exemplo de sintaxe para o Synapse
O senhor pode consultar Synapse em Scala, Python, SQL, e R. Os exemplos de código a seguir usam a chave de armazenamento account e encaminham as credenciais de armazenamento de Databricks para Synapse.
Use as cadeias de conexão fornecidas pelo portal Azure, que habilita a criptografia Secure Sockets Layer (SSL) para todos os dados enviados entre o driver Spark e a instância Azure Synapse por meio da conexão JDBC. Para verificar se a criptografia SSL está ativada, o senhor pode procurar por encrypt=true
nas cadeias de conexão.
Os locais externos definidos no Unity Catalog não são suportados como locais tempDir
.
A Databricks recomenda que o senhor use o fluxo de autenticação mais seguro disponível. O fluxo de autenticação descrito neste exemplo acarreta riscos que não estão presentes em outros fluxos. O senhor só deve usar esse fluxo quando outros fluxos mais seguros, como identidades gerenciais, não forem viáveis.
- Scala
- Python
- SQL
- R
// Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 1433 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
# Set up the storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = spark.read
.format("sqldw")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 1433 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # If schemaName not provided, default to "dbo".
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.load()
# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
host '<hostname>',
port '<port>' /* Optional - will use default port 1433 if not specified. */
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
forwardSparkAzureStorageCredentials 'true',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbtable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
# Load SparkR
library(SparkR)
# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
Como funciona a autenticação entre o Databricks e o Synapse?
O conector do Azure Synapse usa três tipos de conexões de rede:
- Driver Spark para o Azure Synapse
- Spark clustering para Azure storage account
- Azure Synapse para Azure storage account
Configurar o acesso ao armazenamento do Azure
Tanto o Databricks quanto o Synapse precisam de acesso privilegiado a um armazenamento Azure account a ser usado para armazenamento temporário de dados.
Azure Synapse não oferece suporte ao uso do SAS para armazenamento account acesso. O senhor pode configurar o acesso para ambos os serviços executando uma das seguintes ações:
- Use o endereço account key e o segredo para o armazenamento account e defina
forwardSparkAzureStorageCredentials
comotrue
. Consulte Definir propriedades do Spark para configurar as credenciais do Azure para acessar o armazenamento do Azure. - Use o Azure Data Lake Storage Gen2 com autenticação OAuth 2.0 e defina
enableServicePrincipalAuth
comotrue
. Consulte Configurar conexão do Databricks para o Synapse com OAuth 2.0 com uma entidade de serviço. - Configure sua instância Azure Synapse para ter uma identidade de serviço gerenciado e defina
useAzureMSI
comotrue
.
Permissões necessárias do Azure Synapse
Como usa COPY
em segundo plano, o conector Azure Synapse exige que o usuário da conexão JDBC tenha permissão para executar o seguinte comando na instância Azure Synapse conectada:
Se a tabela de destino não existir em Azure Synapse, será necessária a permissão para executar o seguinte comando, além do comando acima:
A tabela a seguir resume as permissões necessárias para gravações com COPY
:
Permissões (inserir em uma tabela existente) | Permissões (inserir em uma nova tabela) |
---|---|
ADMINISTER DATABASE BULK operações INSERT | ADMINISTER DATABASE BULK operações INSERT CREATE TABLE ALTERAR NO ESQUEMA:: dbo |
Configurar a conexão do Databricks para o Synapse com o OAuth 2.0 com uma entidade de serviço
O senhor pode se autenticar em Azure Synapse Analytics usando uma entidade de serviço com acesso ao armazenamento subjacente account. Para obter mais informações sobre como usar credenciais de entidade de serviço para acessar um armazenamento Azure account, consulte Conectar-se a Azure Data Lake Storage Gen2 e Blob Storage. O senhor deve definir a opção enableServicePrincipalAuth
como true
na configuração de conexão da referência de opções do conector do Databricks Synapse para permitir que o conector se autentique com uma entidade de serviço.
Opcionalmente, o senhor pode usar uma entidade de serviço diferente para a conexão do Azure Synapse Analytics. O exemplo a seguir configura as credenciais de entidade de serviço para o storage account e as credenciais opcionais de entidade de serviço para Synapse:
- ini
- Scala
- Python
- R
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Modos de salvamento suportados para gravações de lotes
O conector Azure Synapse suporta os modos de salvamento ErrorIfExists
, Ignore
, Append
e Overwrite
, sendo que o modo default é ErrorIfExists
. Para obter mais informações sobre os modos de salvamento suportados em Apache Spark, consulte a documentaçãoSpark SQL em Save Modes.
Referência das opções do conector do Databricks Synapse
O site OPTIONS
fornecido no Spark SQL suporta as seguintes configurações:
Parâmetro | Obrigatório | Padrão | Notas |
---|---|---|---|
| Sim, a menos que | Não default | A tabela para criar ou ler no Azure Synapse. Esse parâmetro é necessário ao salvar dados de volta no Azure Synapse. Você também pode usar A variante |
| Sim, a menos que | Não default | A consulta para leitura no Azure Synapse. Para tabelas referenciadas na consulta, você também pode usar |
| Não | Não default | O nome de usuário do Azure Synapse. Deve ser usado em conjunto com a opção |
| Não | Não default | A senha do Azure Synapse. Deve ser usado em conjunto com a opção |
| Sim | Não default | Um URL JDBC com |
| Não | Determinado pelo subprotocolo do URL do JDBC | O nome da classe do driver JDBC a ser usado. Essa classe deve estar no classpath. Na maioria dos casos, não deve ser necessário especificar essa opção, pois o nome de classe do driver apropriado deve ser determinado automaticamente pelo subprotocolo do URL do JDBC. A variante |
| Sim | Não default | UM URI A variante O senhor não pode usar um local externo definido no Unity Catalog como um local |
| Não |
| O algoritmo de compactação a ser usado para codificar/decodificar temporariamente pelo Spark e pelo Azure Synapse. Os valores atualmente suportados são: |
| Não | falso | Se Ao configurar a autenticação de armazenamento, você deve definir exatamente um de A variante |
| Não | falso | Se for Ao configurar a autenticação de armazenamento, você deve definir exatamente um de |
| Não | falso | Se Se |
| Não |
| Uma cadeia de caracteres usada para especificar opções de tabela ao criar o conjunto de tabelas Azure Synapse por meio de A variante |
| Não | No default (cadeias de caracteres vazias) | Uma lista separada por Se algum desses comandos falhar, ele será tratado como um erro e as operações de gravação não serão executadas. |
| Não | No default (cadeias de caracteres vazias) | Uma lista separada por Se algum desses comandos falhar, ele será tratado como um erro e o senhor receberá uma exceção depois que os dados forem gravados com êxito na instância Azure Synapse. |
| Não | 256 |
A variante |
| Não |
| A tag da conexão para cada consulta. Se não for especificado ou se o valor for uma cadeia de caracteres vazia, o valor default da tag será adicionado ao URL JDBC. O valor default impede que a ferramenta de monitoramento Azure DB levante alertas de injeção SQL espúrios contra consultas. |
| Não | Não default | Controle o comprimento das colunas |
| Não | falso | Definir como Consulte Inserção explícita de valores em uma coluna IDENTITY. |
| Não | Não default | Um pré-provisionamento externo de fonte de dados para ler dados de Azure Synapse. Uma fonte de dados externa só pode ser usada com o PolyBase e remove o requisito de permissão CONTROL porque o conector não precisa criar uma credencial com escopo e uma fonte de dados externa para carregar dados. Para obter um exemplo de uso e a lista de permissões necessárias ao usar uma fonte de dados externa, consulte as permissões necessárias em Azure Synapse para o PolyBase com a opção de fonte de dados externa. |
| Não | 0 | O número máximo de linhas que podem ser rejeitadas durante as leituras e gravações antes que as operações de carregamento sejam canceladas. As linhas rejeitadas serão ignoradas. Por exemplo, se dois em cada dez registros tiverem erros, somente oito registros serão processados. Consulte a documentação REJECT_VALUE em CREATE EXTERNAL TABLE e a documentação MAXERRORS em COPY. |
| Não | falso | Se |
tableOptions
,preActions
,postActions
, emaxStrLength
são relevantes apenas ao gravar dados do Databricks em uma nova tabela no Azure Synapse.- Embora todos os nomes de opções de fonte de dados não diferenciem maiúsculas de minúsculas, recomendamos que o senhor os especifique em "camel case" para maior clareza.
Envio de consulta para o Azure Synapse
O conector do Azure Synapse implementa um conjunto de regras de otimização para enviar os seguintes operadores para o Azure Synapse:
Filter
Project
Limit
Os operadores Project
e Filter
suportam as seguintes expressões:
- A maioria dos operadores de lógica booleana
- Comparações
- Operações aritméticas básicas
- Conversão de números e strings
Para o operador Limit
, o pushdown é suportado somente quando não há nenhum pedido especificado. Por exemplo:
SELECT TOP(10) * FROM table
, mas não SELECT TOP(10) * FROM table ORDER BY col
.
O conector Azure Synapse não pressiona expressões que operam em strings, datas ou registros de data e hora.
O pushdown de consulta criado com o conector Azure Synapse é ativado por default. Você pode desativá-lo configurando spark.databricks.sqldw.pushdown
para false
.
Gerenciamento temporário de dados
O conector do Azure Synapse não exclui os arquivos temporários que ele cria no contêiner de armazenamento do Azure. A Databricks recomenda que o senhor exclua periodicamente os arquivos temporários no local tempDir
fornecido pelo usuário.
Para facilitar a limpeza dos dados, o conector do Azure Synapse não armazena os arquivos de dados diretamente em tempDir
, mas cria um subdiretório no formato: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
. O senhor pode configurar um trabalho periódico (usando o recurso Databricks Job ou de outra forma) para excluir recursivamente todos os subdiretórios mais antigos do que um determinado limite (por exemplo, 2 dias), com a suposição de que não pode haver um trabalho Spark em execução por mais tempo do que esse limite.
Uma alternativa mais simples é descartar periodicamente todo o contêiner e criar um novo com o mesmo nome. Isso requer que o senhor use um contêiner dedicado para os dados temporários produzidos pelo conector do Azure Synapse e que possa encontrar uma janela de tempo na qual possa garantir que nenhuma consulta envolvendo o conector esteja em execução.
Gerenciamento temporário de objetos
O conector Azure Synapse automatiza a transferência de dados entre um clustering Databricks e uma instância Azure Synapse. Para ler dados de uma tabela ou consulta do Azure Synapse ou gravar dados em uma tabela do Azure Synapse, o conector do Azure Synapse cria objetos temporários, incluindo DATABASE SCOPED CREDENTIAL
, EXTERNAL DATA SOURCE
, EXTERNAL FILE FORMAT
e EXTERNAL TABLE
nos bastidores. Esses objetos permanecem apenas durante a duração do trabalho correspondente em Spark e são descartados automaticamente.
Quando um clustering estiver executando uma consulta usando o conector Azure Synapse, se o processo do driver Spark falhar ou for reiniciado à força, ou se o clustering for encerrado ou reiniciado à força, os objetos temporários poderão não ser descartados. Para facilitar a identificação e a exclusão manual desses objetos, o conector do Azure Synapse prefixa os nomes de todos os objetos temporários intermediários criados na instância do Azure Synapse com uma tag do tipo: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
.
Recomendamos que você procure periodicamente objetos vazados usando consultas como as seguintes:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'