初めての構造化ストリーミングワークロードを実行する
この記事では、Databricksで最初の構造化ストリーミングクエリを実行するために必要なコード例と基本概念を説明します。構造化ストリーミングは、ほぼリアルタイムおよび増分処理のワークロードに使用できます。
構造化ストリーミングは、DLTでストリーミングテーブルを強化するいくつかのテクノロジーの1つです。 Databricks では、すべての新しい ETL、インジェスト、構造化ストリーミング ワークロードに DLT を使用することをお勧めします。DLTとはを参照してください。
DLT では、ストリーミングテーブルを宣言するための構文が若干変更されていますが、ストリーミングの読み取りと変換を構成するための一般的な構文は、 Databricksのすべてのストリーミングのユースケースに適用されます。 また、DLT は、状態情報、メタデータ、および多数の構成を管理することで、ストリーミングを簡素化します。
Auto Loaderを使用してオブジェクトストレージからストリーミングデータを読み取る
次の例は、形式とオプションを示すために cloudFiles
を使用するAuto Loaderを使用してJSONデータを読み込む方法を示しています。schemaLocation
オプションを使用すると、スキーマの推論と展開が可能になります。次のコードをDatabricksノートブックのセルに貼り付け、セルを実行して、raw_df
という名前のストリーミングデータフレームを作成します。
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Databricks上の他の読み込み操作と同様に、ストリーミング読み込みを設定しても、実際にはデータは読み込まれません。ストリームが開始される前に、データに対するアクションをトリガーする必要があります。
ストリーミング データフレーム で display()
を呼び出すと、ストリーミング ジョブが開始されます。 ほとんどの構造化ストリーミングのユースケースでは、ストリームをトリガーするアクションは、シンクにデータを書き込むことです。 構造化ストリーミングについては、本番運用に関する考慮事項を参照してください。
ストリーミング変換の実行
構造化ストリーミングは、DatabricksやSpark SQLで利用可能な変換のほとんどをサポートしています。MLflowモデルをUDFとしてロードし、変換としてストリーミング予測を行うこともできます。
次のコード例では、Spark SQL関数を使用して、取り込まれたJSONデータを追加情報でエンリッチする簡単な変換を完了します。
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
結果の transformed_df
には、データソースに到着したときに各レコードを読み込んで変換するためのクエリ命令が含まれています。
構造化ストリーミングは、データソースを無制限または無限のデータセットとして扱います。このように、いくつかの変換は、無限のアイテムをソートする必要があるため、構造化ストリーミングワークロードではサポートされません。
ほとんどの集計と多くの結合では、ウォーターマーク、ウィンドウ、および出力モードで状態情報を管理する必要があります。 ウォーターマークを適用してデータ処理のしきい値を制御するを参照してください。
Delta Lake への増分バッチ書き込みを実行する
以下の例では、指定されたファイルパスとチェックポイントを使用してデルタレイクに書き込みます。
設定するストリーミングライターごとに、必ず一意のチェックポイント場所を指定してください。チェックポイントは、ストリームの一意のIDを提供し、処理されたすべてのレコードと、ストリーミングクエリに関連する状態情報を追跡します。
トリガーの availableNow
設定は、ソースデータセットの未処理のレコードをすべて処理してからシャットダウンするように構造化ストリーミングに指示するため、ストリームを実行したままにすることを心配することなく、次のコードを安全に実行できます。
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
この例では、データソースに新しいレコードが到着しないため、このコードを繰り返し実行しても新しいレコードは取り込まれません。
構造化ストリーミングの実行により、自動終了によるコンピューティングリソースのシャットダウンを防ぐことができます。予期せぬ出費を避けるため、ストリーミングクエリーは必ず終了させてください。
Delta Lake からのデータの読み取り、変換、Delta Lake への書き込み
Delta Lake は、ソースとシンクの両方として構造化ストリーミングを操作するための広範なサポートを備えています。 Delta テーブル ストリーミングの読み取りと書き込みを参照してください。
次の例は、デルタテーブルからすべての新しいレコードを増分的に読み込み、別のデルタテーブルのスナップショットと結合して、デルタテーブルに書き込む構文例を示しています。
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
ソーステーブルの読み込み、ターゲットテーブルおよび指定されたチェックポイントの場所への書き込みを行うには、適切な権限が設定されている必要があります。データソースとシンクに関連する値を使用して、角括弧(<>
)で示されたすべてのパラメーターを入力します。
DLT は、Delta Lake パイプラインを作成するための完全な宣言型構文を提供し、トリガーやチェックポイントなどのプロパティを自動的に管理します。DLTとはを参照してください。
Kafka からのデータの読み取り、変換、Kafka への書き込み
Apache Kafkaやその他のメッセージングバスでは、大規模なデータセットでレイテンシーが最も低くなります。Databricksを使用して、Kafkaから取り込んだデータに変換を適用し、データをKafkaに書き戻すことができます。
クラウドオブジェクトストレージにデータを書き込むと、レイテンシーのオーバーヘッドが増えます。メッセージングバスからのデータをDelta Lakeに保存したいが、ストリーミングワークロードのレイテンシーを可能な限り低くする必要がある場合、データをレイクハウスに取り込み、ダウンストリームのメッセージングバスシンクにほぼリアルタイムの変換を適用するために、個別のストリーミングジョブを構成することを推奨します。
次のコード例は、KafkaからのデータをDeltaテーブルのデータと結合し、Kafkaに書き戻すことでデータをエンリッチする簡単なパターンを示しています。
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Kafka サービスへのアクセスには、適切な権限が設定されている必要があります。 山括弧 (<>
) で示されているすべてのパラメーターには、データソースとシンクに関連する値を使用して入力します。 「Apache Kafka と Databricks を使用したストリーム処理」を参照してください。