transmissão estruturada grava no Azure Synapse

O conector Azure Synapse oferece suporte de gravação estructurada transmissão eficiente e escalonável para Azure Synapse que fornece experiência de usuário consistente com gravações de lotes e usa COPY para grandes transferências de dados entre clusters Databricks e instância Azure Synapse.

O suporte de transmissão estruturada entre Databricks e Synapse fornece semântica simples para configuração Job ETL incremental. O modelo utilizado para carregar dados de Databricks para Synapse introduz latência que pode não cumprir os requisitos de SLA para cargas de trabalho quase em tempo real. Consulte os dadosquery no Azure Synapse Analytics.

Modos de saída suportados para gravações transmitidas no Synapse

O conector do Azure Synapse oferece suporte aos modos de saída Append e Complete para acréscimos e agregações de registros. Para mais detalhes sobre modos de saída e matriz de compatibilidade, consulte o guia estruturada da transmissão.

Semântica de tolerância a falhas Synapse

Por default, a transmissão do Azure Synapse oferece garantia exata de ponta a ponta para gravar dados em uma tabela do Azure Synapse, acompanhando de forma confiável o progresso da query usando uma combinação de localização do ponto de verificação no DBFS, tabela de ponto de verificação no Azure Synapse e mecanismo de bloqueio para garantir que a transmissão possa lidar com quaisquer tipos de falhas, tentativas e reinicializações query .

Opcionalmente, você pode selecionar uma semântica menos restritiva de pelo menos uma vez para a transmissão do Azure Synapse definindo a opção spark.databricks.sqldw.streaming.exactlyOnce.enabled como false, caso em que a duplicação de dados pode ocorrer no caso de falhas intermitentes de conexão com o Azure Synapse ou encerramento inesperado query .

sintaxe estruturada de transmissão para gravação no Azure Synapse

Os exemplos de código a seguir demonstram gravações de transmissão para Synapse usando transmissão estruturada em Scala e Python:

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Para obter uma lista completa de configurações, consulte dadosquery no Azure Synapse Analytics.

Gerenciamento de tabela de ponto de verificação transmitido Synapse

O conector do Azure Synapse não exclui a tabela de checkpoint de transmissão que é criada quando uma nova query de transmissão é iniciada. Esse comportamento é consistente com o checkpointLocation normalmente especificado para armazenamento de objeto. Databricks recomenda que você exclua periodicamente tabelas de ponto de verificação para query que não serão executadas no futuro.

Por default, todas as tabelas de ponto de verificação têm o nome <prefix>_<query-id>, em que <prefix> é um prefixo configurável com valor default databricks_streaming_checkpoint e query_id é um ID query de transmissão com _ caracteres removidos.

Para encontrar todas as tabelas de pontos de verificação para query de transmissão obsoletas ou excluídas, execute a query:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Você pode configurar o prefixo com a opção de configuração do Spark SQL spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.

Referência de opções de transmissão do conector Databricks Synapse

Os OPTIONS fornecidos no Spark SQL suportam as seguintes opções de transmissão além das opções de lotes:

Parâmetro

Obrigatório

Padrão

Notas

checkpointLocation

Sim

Sem default

Local no DBFS que será usado pela transmissão estruturada para gravar metadados e informações de ponto de verificação. Veja Recuperando-se de Falhas com Checkpointing no guia de programação estruturada da transmissão.

numStreamingTempDirsToKeep

Não

0

Indica quantos (últimos) diretórios temporários manter para limpeza periódica de micro lotes em transmissão. Quando definido como 0, a exclusão do diretório é acionada imediatamente após a confirmação do microlote, caso contrário, desde que o número de microlotes mais recentes seja mantido e o restante dos diretórios seja removido. Use -1 para desativar a limpeza periódica.

Observação

checkpointLocation e numStreamingTempDirsToKeep são relevantes apenas para gravações de transmissão de Databricks para uma nova tabela no Azure Synapse.