トリガー パイプライン モードと連続パイプライン モード
この記事では、Lakeflow宣言型パイプラインのトリガー パイプライン モードと継続的パイプライン モードの操作セマンティクスについて説明します。
パイプライン モードは、コンピュートされているテーブルの種類には依存しません。 マテリアライズドビューとストリーミングテーブルはどちらのパイプラインモードでも更新できます。
トリガーと連続を切り替えるには、パイプラインの作成または編集中に、パイプライン設定の パイプライン モード オプションを使用します。 「Lakeflow宣言型パイプラインの構成」を参照してください。
Databricks SQL で定義されているマテリアライズドビューとストリーミング テーブルの更新操作は、常にトリガー パイプライン モードを使用して実行されます。
トリガーされたパイプライン モードとは何ですか?
パイプラインが トリガー モードを使用する場合、システムはすべてのテーブルまたは選択されたテーブルを正常に更新した後で処理を停止し、更新の開始時に使用可能なデータに基づいて更新内の各テーブルが更新されるようにします。
連続パイプラインモードとは何ですか?
パイプラインが 連続 実行を使用する場合、Lakeflow宣言型パイプラインは、データソースに到着した新しいデータを処理して、パイプライン全体のテーブルを最新の状態に保ちます。
継続的な実行モードでの不要な処理を回避するために、パイプラインは依存する Delta テーブルを自動的に監視し、依存テーブルの内容が変更された場合にのみ更新を実行します。
データパイプラインモードを選択する
次の表は、トリガー パイプライン モードと連続パイプライン モードの違いを示しています。
重要な質問 | トリガー | 連続 |
---|---|---|
アップデートはいつ停止しますか? | 完了すると自動的に実行されます。 | 手動で停止するまで継続的に実行されます。 |
どのようなデータが処理されますか? | アップデート開始時に利用可能なデータ。 | 構成されたソースに到着したすべてのデータ。 |
これはどのようなデータの鮮度要件に最適ですか? | データの更新は 10 分ごと、1 時間ごと、または毎日実行されます。 | データの更新は 10 秒から数分ごとに行う必要があります。 |
トリガーされたパイプラインは、クラスターの実行がパイプラインを更新するのに十分な時間だけ行われるため、リソースの消費と費用を削減できます。 ただし、パイプラインがトリガーされるまで、新しいデータは処理されません。連続 パイプラインには 常時稼働中のクラスターが必要ですが、これはより高価ですが、処理の待ち時間が短縮されます。
連続パイプラインのトリガー間隔を設定する
パイプラインを連続モードに構成する場合、トリガー間隔を設定して、パイプラインが各フローの更新を開始する頻度を制御できます。
pipelines.trigger.interval
を使用すると、テーブルまたはパイプライン全体を更新するフローのトリガー間隔を制御できます。トリガーされたパイプラインは各テーブルを 1 回処理するため、 pipelines.trigger.interval
は連続パイプラインでのみ使用されます。
ストリーミング クエリとバッチ クエリでは安全性が異なるため、 Databricks個々のテーブルにpipelines.trigger.interval
設定することをお勧めします。 処理でパイプライン グラフ全体の更新を制御する必要がある場合のみ、パイプラインに値を設定します。
Python ではspark_conf
、SQL ではSET
を使用してテーブルにpipelines.trigger.interval
設定します。
@dp.table(
spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
return (<query>)
SET pipelines.trigger.interval=10 seconds;
CREATE OR REFRESH MATERIALIZED VIEW TABLE_NAME
AS SELECT ...
パイプラインにpipelines.trigger.interval
設定するには、パイプライン設定のconfiguration
オブジェクトに追加します。
{
"configuration": {
"pipelines.trigger.interval": "10 seconds"
}
}