ストリーミングとインクリメンタル インジェスト
Databricks では、 Apache Spark 構造化ストリーミングを使用して、インジェスト ワークロードに関連する次のような多数の製品をバックアップします。
- Auto Loader
COPY INTO
- DLT パイプライン
- Databricks SQL のマテリアライズド ビューとストリーミング テーブル
この記事では、ストリーミング バッチ処理セマンティクスと増分バッチ処理セマンティクスの違いの一部について説明し、Databricks で必要なセマンティクスのインジェスト ワークロードを構成する概要について説明します。
ストリーミングと増分バッチ取り込みの違いは何か
可能なインジェストワークフロー構成は、ほぼリアルタイムの処理から頻度の低い増分バッチ処理まで多岐にわたります。 どちらのパターンも Apache Spark 構造化ストリーミングを使用してインクリメンタル処理を強化しますが、セマンティクスは異なります。 わかりやすくするために、この記事では、ほぼリアルタイムのインジェストを ストリーミング インジェスト と呼び、頻度の低いインクリメンタル処理を インクリメンタル バッチ インジェスト と呼んでいます。
ストリーミング インジェスト
ストリーミングは、データ取り込みとテーブル更新のコンテキストでは、 Databricks が常時オンのインフラストラクチャを使用してソースからシンクにレコードをマイクロバッチで取り込む、リアルタイムデータに近い処理を指します。 ストリーミングワークロードは、インジェストを停止する障害が発生しない限り、設定されたデータソースから更新を継続的に取り込みます。
増分バッチ取り込み
増分バッチインジェストとは、すべての新しいレコードが短命なジョブのデータソースから処理されるパターンを指します。 増分バッチ取り込みは、多くの場合、スケジュールに従って行われますが、手動またはファイルの到着に基づいてトリガーすることもできます。
増分バッチ取り込みは 、バッチ 取り込み とは異なり、データソース内の新しいレコードを自動的に検出し、既に取り込まれたレコードを無視します。
ジョブによる取り込み
Databricks ジョブを使用すると、ワークフローを調整し、ノートブック、ライブラリ、DLT パイプライン、Databricks SQL クエリなどのタスクをスケジュールできます。
すべての Databricks コンピュート タイプとタスク タイプを使用して、増分バッチ インジェストを構成できます。 ストリーミング インジェストは、クラシック ジョブ コンピュートと DLT の本番運用でのみサポートされています。
ジョブには、主に 2 つの操作モードがあります。
- 連続ジョブ は、障害が発生すると自動的に再試行します。 このモードは、ストリーミング インジェスト用です。
- トリガーされたジョブは、 トリガーされたときにタスクを実行します。 トリガーには、次のものが含まれます。
- 指定したスケジュールでジョブを実行する時間ベースのトリガー。
- 指定した場所にファイルが配置されたときにジョブを実行するファイルベースのトリガー。
- REST API 呼び出し、Databricks CLI コマンドの実行、ワークスペース UI の [ 今すぐ実行 ] ボタンのクリックなど、その他のトリガー。
増分バッチ ワークロードの場合は、次のように AvailableNow
トリガー モードを使用してジョブを構成します。
- Python
- Scala
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(availableNow=True)
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.AvailableNow)
.toTable("table_name")
ストリーミングワークロードの場合、デフォルトのトリガー間隔は processingTime ="500ms"
です。 次の例は、マイクロバッチを 5 秒ごとに処理する方法を示しています。
- Python
- Scala
(df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(processingTime="5 seconds")
.toTable("table_name")
)
import org.apache.spark.sql.streaming.Trigger
df.writeStream
.option("checkpointLocation", <checkpoint-path>)
.trigger(Trigger.ProcessingTime, "5 seconds")
.toTable("table_name")
サーバレス ジョブは、構造化ストリーミングの Scala、連続モード、または時間ベースのトリガ間隔をサポートしていません。 クラシック ジョブは、ほぼリアルタイムのインジェスト セマンティクスが必要な場合は、使用します。
DLTによるインジェスト
ジョブと同様に、DLT パイプラインはトリガー モードまたは連続モードで実行できます。ストリーミングテーブルとリアルタイム ストリーミングに近いセマンティクスの場合は、連続モードを使用します。
ストリーミングテーブルを使用して、クラウドオブジェクトストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からのストリーミングまたは増分バッチ取り込みを設定します。
LakeFlow Connect は DLT を使用して、接続されたシステムからの取り込みパイプラインを構成します。 LakeFlow Connectを参照してください。
具体化されたビューは、バッチ ワークロードと同等の操作セマンティクスを保証しますが、多くの操作を最適化して結果を増分的に計算できます。 マテリアライズド・ビューの増分更新を参照してください。
Databricks SQL を使用したインジェスト
ストリーミングテーブルを使用して、クラウドオブジェクトストレージ、Apache Kafka、Amazon Kinesis、Google Pub/Sub、または Apache Pulsar からの増分バッチ取り込みを設定できます。
マテリアライズドビューを使用して、Delta ソースからのインクリメンタルバッチ処理を設定できます。 マテリアライズド・ビューの増分更新を参照してください。
COPY INTO
は、クラウド・オブジェクト・ストレージ内のデータ・ファイルの増分バッチ処理のための使い慣れた SQL 構文を提供します。 COPY INTO
動作は、クラウド・オブジェクト・ストレージのストリーミング・テーブルでサポートされるパターンと似ていますが、すべてのデフォルト設定が、サポートされているすべてのファイル・フォーマットで同等であるとは限りません。