Pular para o conteúdo principal

transmissão estructurada patterns on Databricks

Contém Notebook e exemplos de código para padrões comuns de trabalho com transmissão estruturada em Databricks.

Como começar com a transmissão estruturada

Se o senhor for novato em transmissão estruturada, veja como executar sua primeira carga de trabalho de transmissão estruturada.

Escreva para Cassandra como um sumidouro para transmissão estruturada em Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.

A transmissão estruturada funciona com o site Cassandra por meio do conectorSpark Cassandra. Esse conector é compatível com RDD e DataFrame APIs, e tem suporte nativo para gravação de dados de transmissão. Importante O senhor deve usar a versão correspondente da montagem do conector de faíscaCassandra.

O exemplo a seguir se conecta a um ou mais hosts em um clustering de banco de dados Cassandra. Ele também especifica configurações de conexão, como a localização do ponto de verificação e os nomes específicos do espaço de teclas e da tabela:

Python
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()

Escreva para o Azure Synapse Analytics usando foreachBatch() em Python

streamingDF.writeStream.foreachBatch() permite que o senhor reutilize os gravadores de dados de lotes existentes para gravar a saída de uma consulta de transmissão em Azure Synapse Analytics. Consulte a documentação do ForEachBatch para obter detalhes.

Para executar este exemplo, o senhor precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector do Azure Synapse Analytics, consulte Consultar dados no Azure Synapse Analytics.

Python
from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)

transmissão-transmissão join

Estes dois Notebooks mostram como usar a união transmissão-transmissão em Python e Scala.

transmissão-transmissão join Python Notebook

Open notebook in new tab

transmissão-transmissão join Scala Notebook

Open notebook in new tab