transmissão estruturada grava no Azure Synapse
O conector Azure Synapse oferece suporte de gravação estructurada transmissão eficiente e escalonável para Azure Synapse que fornece experiência de usuário consistente com gravações de lotes e usa COPY
para grandes transferências de dados entre clusters Databricks e instância Azure Synapse.
O suporte de transmissão estruturada entre Databricks e Synapse fornece semântica simples para configuração Job ETL incremental. O modelo utilizado para carregar dados de Databricks para Synapse introduz latência que pode não cumprir os requisitos de SLA para cargas de trabalho quase em tempo real. Consulte os dadosquery no Azure Synapse Analytics.
Modos de saída suportados para gravações transmitidas no Synapse
O conector do Azure Synapse oferece suporte aos modos de saída Append
e Complete
para acréscimos e agregações de registros. Para mais detalhes sobre modos de saída e matriz de compatibilidade, consulte o guia estruturada da transmissão.
Semântica de tolerância a falhas Synapse
Por default, a transmissão do Azure Synapse oferece garantia exata de ponta a ponta para gravar dados em uma tabela do Azure Synapse, acompanhando de forma confiável o progresso da query usando uma combinação de localização do ponto de verificação no DBFS, tabela de ponto de verificação no Azure Synapse e mecanismo de bloqueio para garantir que a transmissão possa lidar com quaisquer tipos de falhas, tentativas e reinicializações query .
Opcionalmente, você pode selecionar uma semântica menos restritiva de pelo menos uma vez para a transmissão do Azure Synapse definindo a opção spark.databricks.sqldw.streaming.exactlyOnce.enabled
como false
, caso em que a duplicação de dados pode ocorrer no caso de falhas intermitentes de conexão com o Azure Synapse ou encerramento inesperado query .
sintaxe estruturada de transmissão para gravação no Azure Synapse
Os exemplos de código a seguir demonstram gravações de transmissão para Synapse usando transmissão estruturada em Scala e Python:
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
Para obter uma lista completa de configurações, consulte dadosquery no Azure Synapse Analytics.
Gerenciamento de tabela de ponto de verificação transmitido Synapse
O conector do Azure Synapse não exclui a tabela de checkpoint de transmissão que é criada quando uma nova query de transmissão é iniciada. Esse comportamento é consistente com o checkpointLocation
normalmente especificado para armazenamento de objeto. Databricks recomenda que você exclua periodicamente tabelas de ponto de verificação para query que não serão executadas no futuro.
Por default, todas as tabelas de ponto de verificação têm o nome <prefix>_<query-id>
, em que <prefix>
é um prefixo configurável com valor default databricks_streaming_checkpoint
e query_id
é um ID query de transmissão com _
caracteres removidos.
Para encontrar todas as tabelas de pontos de verificação para query de transmissão obsoletas ou excluídas, execute a query:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
Você pode configurar o prefixo com a opção de configuração do Spark SQL spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
.
Referência de opções de transmissão do conector Databricks Synapse
Os OPTIONS
fornecidos no Spark SQL suportam as seguintes opções de transmissão além das opções de lotes:
Parâmetro |
Obrigatório |
Padrão |
Notas |
---|---|---|---|
|
Sim |
Sem default |
Local no DBFS que será usado pela transmissão estruturada para gravar metadados e informações de ponto de verificação. Veja Recuperando-se de Falhas com Checkpointing no guia de programação estruturada da transmissão. |
|
Não |
0 |
Indica quantos (últimos) diretórios temporários manter para limpeza periódica de micro lotes em transmissão. Quando definido como |
Observação
checkpointLocation
e numStreamingTempDirsToKeep
são relevantes apenas para gravações de transmissão de Databricks para uma nova tabela no Azure Synapse.