ストリーミングとインクリメンタル インジェスト
Databricks では、 Apache Spark 構造化ストリーミングを使用して、インジェスト ワークロードに関連する次のような多数の製品をバックアップします。
Auto Loader
COPY INTO
Delta Liveテーブルパイプライン
Databricks SQL のマテリアライズド ビューとストリーミング テーブル
この記事では、ストリーミング バッチ処理セマンティクスと増分バッチ処理セマンティクスの違いの一部について説明し、Databricks で必要なセマンティクスのインジェスト ワークロードを構成する概要について説明します。
ストリーミングと増分バッチ取り込みの違いは何ですか?
可能なインジェストワークフロー構成は、ほぼリアルタイムの処理から頻度の低い増分バッチ処理まで多岐にわたります。 どちらのパターンも Apache Spark 構造化ストリーミングを使用してインクリメンタル処理を強化しますが、セマンティクスは異なります。 わかりやすくするために、この記事では、ほぼリアルタイムのインジェストを ストリーミング インジェスト と呼び、頻度の低いインクリメンタル処理を インクリメンタル バッチ インジェストと呼んでいます。
ジョブによる取り込み
Databricks ジョブを使用すると、ノートブック、ライブラリ、Delta Live Tables パイプライン、Databricks SQL クエリなどのワークフローを調整し、タスクをスケジュールできます。
注:
すべての Databricks コンピュート タイプとタスク タイプを使用して、増分バッチ インジェストを構成できます。 ストリーミング インジェストは、クラシック ジョブ コンピュート と Delta Live Tablesの本番運用でのみサポートされています。
ジョブには、主に 2 つの操作モードがあります。
連続ジョブ は、障害が発生すると自動的に再試行します。 このモードは、ストリーミング インジェスト用です。
トリガーされたジョブは、 トリガーされたときにタスクを実行します。 トリガーには、次のものが含まれます。
指定したスケジュールでジョブを実行する時間ベースのトリガー。
指定した場所にファイルが配置されたときにジョブを実行するファイルベースのトリガー。
REST API 呼び出し、Databricks CLI コマンドの実行、ワークスペース UI の [ 今すぐ実行 ] ボタンのクリックなど、その他のトリガー。
増分バッチ ワークロードの場合は、次のように AvailableNow
トリガー モードを使用してジョブを構成します。
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
ストリーミングワークロードの場合、デフォルトのトリガー間隔は processingTime ="500ms"
です。 次の例は、マイクロバッチを 5 秒ごとに処理する方法を示しています。
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
重要
サーバレス ジョブは、構造化ストリーミングの Scala、連続モード、または時間ベースのトリガ間隔をサポートしていません。 クラシック ジョブは、ほぼリアルタイムのインジェスト セマンティクスが必要な場合は、使用します。
Delta Live Tables を使用したインジェスト
ジョブと同様に、Delta Live Tables パイプラインは、トリガー モードまたは連続モードで実行できます。 ストリーミング テーブルでほぼリアルタイムのストリーミング セマンティクスを実現するには、連続モードを使用します。
ストリーミングテーブルを使用して、クラウドオブジェクトストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からのストリーミングまたは増分バッチ取り込みを設定します。
LakeFlow Connectは、Delta Live Tablesを使用して、接続されたシステムからのインジェストパイプラインを構成します。 LakeFlow Connectを参照してください。
具体化されたビューは、バッチ ワークロードと同等の操作セマンティクスを保証しますが、多くの操作を最適化して結果を増分的に計算できます。 マテリアライズド・ビューの増分更新を参照してください。
Databricks SQL を使用したインジェスト
ストリーミングテーブルを使用して、クラウドオブジェクトストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からの増分バッチ取り込みを設定できます。
マテリアライズドビューを使用すると、指定した一連の操作に対して完全に再生可能なソースからの増分バッチ処理を設定できます。 マテリアライズド・ビューの増分更新を参照してください。
COPY INTO
は、クラウド・オブジェクト・ストレージ内のデータ・ファイルの増分バッチ処理のための使い慣れた SQL 構文を提供します。 COPY INTO
動作は、クラウド・オブジェクト・ストレージのストリーミング・テーブルでサポートされるパターンと似ていますが、すべてのデフォルト設定が、サポートされているすべてのファイル・フォーマットで同等であるとは限りません。