メインコンテンツまでスキップ

Databricks の構造化ストリーミングパターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミングの使用を開始する

構造化ストリーミングを初めて使用する場合は、「 初めての構造化ストリーミングワークロードを実行する」を参照してください。

Python での構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散型、低遅延、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、 RDD と データフレーム APIsの両方をサポートしており、ストリーミング データの書き込みをネイティブにサポートしています。 大事な 対応するバージョンの spark-Cassandra-connector-assembly を使用する必要があります。

次の例では、 Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペースおよびテーブル名などの接続構成も指定します。

Python
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()

Python で foreachBatch() を使用して Azure Synapse Analytics に書き込む

streamingDF.writeStream.foreachBatch() 既存のバッチデータライターを再利用して、 ストリーミング クエリの出力を Azure Synapse Analytics に出力します。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics でデータのクエリを実行する」を参照してください。

Python
from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)

ストリームとストリームの結合

これらの 2 つのノートブックは、 Python と Scalaでストリーム-ストリーム結合を使用する方法を示しています。

ストリーム-ストリーム join Python ノートブック

Open notebook in new tab

ストリーム-ストリーム join Scala ノートブック

Open notebook in new tab