構造化ストリーミングの初回ワークロードを実行する

この記事では、Databricksで最初の構造化ストリーミングクエリを実行するために必要なコード例と基本概念を説明します。構造化ストリーミングは、ほぼリアルタイムおよび増分処理のワークロードに使用できます。

構造化ストリーミングは、デルタライブテーブルのストリーミングテーブルを支えるテクノロジーの一つです。Databricksは、すべての新しいETL、インジェスト、および構造化ストリーミングワークロードに Delta Live Tablesを使用することを推奨しています「 Delta Live Tables とは」を参照してください。

Delta Live Tablesでは、ストリーミングテーブルを宣言するためのわずかに変更された構文が提供されていますが、ストリーミングの読み取りと変換を構成するための一般的な構文は、Databricksのすべてのストリーミングユースケースに適用されます。また、Delta Live Tablesは、状態情報、メタデータ、および多数の構成を管理することにより、ストリーミングを簡素化します。

Auto Loaderを使用してオブジェクトストレージからストリーミングデータを読み込む

次の例は、形式とオプションを示すために cloudFiles を使用するAuto Loaderを使用してJSONデータを読み込む方法を示しています。schemaLocation オプションを使用すると、スキーマの推論と展開が可能になります。次のコードをDatabricksノートブックのセルに貼り付け、セルを実行して、raw_df という名前のストリーミングデータフレームを作成します。

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Databricks上の他の読み込み操作と同様に、ストリーミング読み込みを設定しても、実際にはデータは読み込まれません。ストリームが開始される前に、データに対するアクションをトリガーする必要があります。

ストリーミング DataFrame で display() を呼び出すと、ストリーミング ジョブが開始されます。 ほとんどの構造化ストリーミングのユースケースでは、ストリームをトリガーするアクションは、シンクにデータを書き込むことです。 構造化ストリーミングについては、本番運用に関する考慮事項を参照してください。

ストリーミング変換を実行する

構造化ストリーミングは、DatabricksやSpark SQLで利用可能な変換のほとんどをサポートしています。MLflowモデルをUDFとしてロードし、変換としてストリーミング予測を行うこともできます。

次のコード例では、Spark SQL関数を使用して、取り込まれたJSONデータを追加情報でエンリッチする簡単な変換を完了します。

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

結果の transformed_df には、データソースに到着したときに各レコードを読み込んで変換するためのクエリ命令が含まれています。

構造化ストリーミングは、データソースを無制限または無限のデータセットとして扱います。このように、いくつかの変換は、無限のアイテムをソートする必要があるため、構造化ストリーミングワークロードではサポートされません。

ほとんどの集計と多くの結合では、透かし、ウィンドウ、出力モードを使用して状態情報を管理する必要があります。「透かしを適用してデータ処理のしきい値を制御する」を参照してください。

Delta Lakeへの増分バッチ書き込みを実行する

以下の例では、指定されたファイルパスとチェックポイントを使用してデルタレイクに書き込みます。

重要

設定するストリーミングライターごとに、必ず一意のチェックポイント場所を指定してください。チェックポイントは、ストリームの一意のIDを提供し、処理されたすべてのレコードと、ストリーミングクエリに関連する状態情報を追跡します。

トリガーの availableNow 設定は、ソースデータセットの未処理のレコードをすべて処理してからシャットダウンするように構造化ストリーミングに指示するため、ストリームを実行したままにすることを心配することなく、次のコードを安全に実行できます。

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

この例では、データソースに新しいレコードが到着しないため、このコードを繰り返し実行しても新しいレコードは取り込まれません。

警告

構造化ストリーミングの実行により、自動終了によるコンピューティングリソースのシャットダウンを防ぐことができます。予期せぬ出費を避けるため、ストリーミングクエリーは必ず終了させてください。

Delta Lakeからデータを読み取り、変換し、Delta Lakeに書き込む

Delta Lake では、構造化ストリーミングをソースとシンクの両方として操作するための広範なサポートがあります。 「テーブル ストリーミングの読み取りと書き込みDelta 」を参照してください。

次の例は、デルタテーブルからすべての新しいレコードを増分的に読み込み、別のデルタテーブルのスナップショットと結合して、デルタテーブルに書き込む構文例を示しています。

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

ソーステーブルの読み込み、ターゲットテーブルおよび指定されたチェックポイントの場所への書き込みを行うには、適切な権限が設定されている必要があります。データソースとシンクに関連する値を使用して、角括弧(<>)で示されたすべてのパラメーターを入力します。

Delta Live Tablesは、Delta Lakeパイプラインを作成するための完全な宣言型構文を提供し、トリガーやチェックポイントなどのプロパティを自動的に管理します。「 Delta Live Tables とは」を参照してください。

Kafkaからデータを読み込み、変換し、Kafkaに書き込む

Apache Kafkaやその他のメッセージングバスでは、大規模なデータセットでレイテンシーが最も低くなります。Databricksを使用して、Kafkaから取り込んだデータに変換を適用し、データをKafkaに書き戻すことができます。

クラウドオブジェクトストレージにデータを書き込むと、レイテンシーのオーバーヘッドが増えます。メッセージングバスからのデータをDelta Lakeに保存したいが、ストリーミングワークロードのレイテンシーを可能な限り低くする必要がある場合、データをレイクハウスに取り込み、ダウンストリームのメッセージングバスシンクにほぼリアルタイムの変換を適用するために、個別のストリーミングジョブを構成することを推奨します。

次のコード例は、KafkaからのデータをDeltaテーブルのデータと結合し、Kafkaに書き戻すことでデータをエンリッチする簡単なパターンを示しています。

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

Kafka サービスにアクセスするための適切なアクセス許可が構成されている必要があります。 山括弧(<>)で示されているすべてのパラメーターを、データソースとシンクに関連する値を使用して入力します。 「Apache Kafka と Databricks を使用したストリーム処理」を参照してください。