メインコンテンツまでスキップ

宣言型パイプラインで更新を実行 Lakeflow

この記事では、パイプラインの更新について説明し、更新をトリガーする方法の詳細を示します。

パイプライン更新とは何ですか?

パイプラインを作成し、実行する準備ができたら、 更新 を開始します。パイプラインの更新では次の処理が行われます。

  • 正しい構成でクラスターを開始します。
  • 定義されたすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーがないかチェックします。
  • 使用可能な最新のデータでテーブルとビューを作成または更新します。

ドライ実行を使用すると、テーブルの作成または更新を待たずに、パイプラインのソース コードに問題がないかチェックできます。 この機能は、間違ったテーブル名や列名など、パイプライン内のエラーをすばやく見つけて修正できるため、パイプラインの開発またはテスト時に役立ちます。

パイプラインの更新はどのようにトリガーされますか?

パイプラインの更新を開始するには、次のいずれかのオプションを使用します。

更新トリガー

詳細

手動

LakeFlow Pipelines Editor またはパイプライン リストからパイプラインの更新を手動でトリガーできます。 「パイプラインの更新を手動でトリガーする」を参照してください。

スケジュール

ジョブを使用してパイプラインの更新をスケジュールできます。ジョブのパイプライン タスクを参照してください。

プログラム

サードパーティのツール、 APIs、および CLI を使用して、プログラムで更新をトリガーできます。 「ワークフローの実行 Lakeflow 宣言型パイプライン」および「パイプライン API」を参照してください。

パイプラインの更新を手動でトリガーする

パイプラインの更新を手動でトリガーするには、次のいずれかのオプションを使用します。

  • LakeFlow Pipelines Editor から完全なパイプライン、またはパイプラインのサブセット (単一のソース ファイル、または単一のテーブル) を実行します。 詳細については、 「実行パイプライン コード」を参照してください。
  • ジョブとパイプラインの リストから完全なパイプラインを実行します。 クリック再生アイコン。リスト内のパイプラインと同じ行にあります。
  • パイプラインモニタリングページから、LDPスタートアイコンボタン。
注記

手動でトリガーされたパイプライン更新のデフォルトの動作は、パイプラインで定義されているすべてのデータセットを更新することです。

パイプライン更新セマンティクス

次の表は、マテリアライズドビューとストリーミング テーブルの確実更新、完全更新、およびリセット チェックポイントの動作を説明しています。

アップデートのタイプ

マテリアライズドビュー

ストリーミングテーブル

更新(デフォルト)

定義クエリの現在の結果を反映するように結果を更新します。コストを調査し、コスト効率が高い場合は増分更新を実行します。

ストリーミング テーブルとフローで定義されたロジックを通じて新しいレコードを処理します。

フルリフレッシュ

定義クエリの現在の結果を反映するように結果を更新します。

ストリーミングテーブルからデータをクリアし、フローから状態情報 (チェックポイント) をクリアし、データソースからすべてのレコードを再処理します。

ストリーミングフローのチェックポイントをリセットする

マテリアライズドビューには適用されません。

フローから状態情報 (チェックポイント) をクリアしますが、ストリーミング テーブルからデータはクリアせず、データ ソースからすべてのレコードを再処理します。

デフォルトでは、パイプライン内のすべてのマテリアライズドビューとストリーミングテーブルは、更新のたびに更新されます。 オプションで、次の機能を使用して更新からテーブルを省略できます。

これら両方の機能は、デフォルトの更新セマンティクスまたは完全な更新をサポートしています。オプションで、 [更新するテーブルの選択] ダイアログを使用して、失敗したテーブルの更新を実行するときに追加のテーブルを除外できます。

ストリーミング テーブルの場合、関連付けられたストリーミング テーブルからのデータではなく、選択したフローのストリーミング チェックポイントをクリアすることを選択できます。 選択したフローのチェックポイントをクリアするには、Databricks REST API を使用して更新を開始します。選択的ストリーミング フローのチェックポイントをクリアするには、パイプラインの更新を開始するを参照してください。

完全な更新を使用する必要がありますか?

Databricks では、必要な場合にのみ完全更新を実行することを推奨しています。完全な更新では、常に、データセットを定義するロジックを通じて、指定されたデータ ソースからのすべてのレコードが再処理されます。 完全な更新を完了するために必要な時間とリソースは、ソース データのサイズと相関関係にあります。

マテリアライズドビューは、安心または完全更新のどちらが使用されても同じ結果を返します。 ストリーミング テーブルで完全な更新を使用すると、すべての状態処理とチェックポイント情報がリセットされ、入力データが利用できなくなった場合にレコードが削除される可能性があります。

Databricks 、テーブルまたはビューの目的の状態を再作成するために必要なデータが入力データ ソースに含まれている場合にのみ、完全な更新を推奨します。 入力ソース データが利用できなくなった次のシナリオと、完全更新を実行した結果について考えてみましょう。

データソース

入力データが存在しない理由

完全リニューアルの成果

Kafka

短期保持閾値

Kafka ソースに存在しなくなったレコードは、ターゲット テーブルから削除されます。

オブジェクトストレージ内のファイル

ライフサイクルポリシー

ソース ディレクトリに存在しなくなったデータ ファイルは、ターゲット テーブルから削除されます。

テーブル内のレコード

コンプライアンスのため削除

ソース テーブルに存在するレコードのみが処理されます。

テーブルまたはビューで完全更新が実行されないようにするには、テーブル・プロパティの pipelines.reset.allowedfalseに設定します。 Lakeflow 宣言型パイプライン テーブルのプロパティを参照してください。また、追加フローを使用して、完全な更新を必要とせずに既存のストリーミングテーブルにデータを追加することもできます。

選択したテーブルのパイプラインの更新を開始します

オプションで、パイプライン内の選択したテーブルのデータのみを再処理することもできます。たとえば、開発中に 1 つのテーブルのみを変更してテスト時間を短縮したい場合や、パイプラインの更新が失敗し、失敗したテーブルのみを更新したい場合などです。

LakeFlow Pipelinesエディターには、ソース ファイル、選択したテーブル、または単一のテーブルを再処理するためのオプションがあります。 詳細については、 「パイプライン コードの実行」を参照してください。

失敗したテーブルのパイプラインの更新を開始する

パイプライン グラフ内の 1 つ以上のテーブルでエラーが発生したためにパイプラインの更新が失敗した場合は、失敗したテーブルと下流の依存関係のみの更新を開始できます。

注記

除外されたテーブルは、失敗したテーブルに依存している場合でも更新されません。

失敗したテーブルを更新するには、パイプラインモニタリングページで、 [失敗したテーブルを更新] をクリックします。

パイプライン監視ページから選択した失敗したテーブルのみを更新するには:

  1. クリックボタンダウン 失敗したテーブルの更新 ボタンの横にあるをクリックし、 更新するテーブルの選択を クリックします。 更新するテーブルの選択 ダイアログが表示されます。

  2. 更新するテーブルを選択するには、各テーブルをクリックします。選択されたテーブルは強調表示され、ラベルが付けられます。テーブルを更新から削除するには、テーブルをもう一度クリックします。

  3. 選択範囲の更新 をクリックします。

注記

選択範囲の更新 ボタンには、選択したテーブルの数が括弧内に表示されます。

選択したテーブルにすでに取り込まれたデータを再処理するには、ブルーダウンキャレット 選択を更新 ボタンの横にある をクリックし、 選択の完全更新 を クリックします。

パイプラインの更新を開始して、選択的ストリーミング フローのチェックポイントをクリアします。

オプションで、すでに取り込まれたデータをクリアせずに、パイプライン内の選択したストリーミング フローのデータを再処理することもできます。

注記

選択されていないフローは、REFRESH 更新を使用して実行されます。full_refresh_selectionまたはrefresh_selectionを指定して、他のテーブルを選択的に更新することもできます。

選択したストリーミング チェックポイントを更新するための更新を開始するには、Lakeflow宣言型パイプラインRESTAPI で updates リクエストを使用します。次の例では、 curl コマンドを使用して updates 要求を呼び出し、パイプラインの更新を開始します。

Bash
curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": [<streaming flow1>, <streaming flow 2>...]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates

テーブルの更新を待たずにパイプラインのエラーをチェックする

備考

プレビュー

LakeFlow宣言型パイプラインDry run機能はパブリック プレビュー段階にあります。

完全な更新を実行せずにパイプラインのソースコードが有効かどうかを確認するには、 ドライ ラン を使用します。ドライ実行は、パイプラインで定義されたデータセットとフローの定義を解決しますが、データセットを具体化または公開することはありません。 dry 実行中に見つかったエラー (テーブル名や列名が正しくないなど) は UI に報告されます。

ドライ実行を開始するには、ブルーダウンキャレット [スタート] の横にあるパイプラインの詳細ページで [Dry 実行] を クリックします。

dry 実行が完了すると、下部パネルのイベント トレイにエラーが表示されます。 イベント トレイをクリックすると、下部のパネルに見つかった問題が表示されます。さらに、イベント ログには、dry 実行に関連するイベントのみが表示され、DAG にはメトリクスが表示されません。 エラーが見つかった場合は、イベント ログに詳細が記録されます。

最新の dry 実行の結果のみが表示されます。 dry 実行 が最新の 実行 更新だった場合は、更新履歴でそれを選択すると結果を確認できます。 ドライ実行の後に別の更新が実行されると、結果は UI で利用できなくなります。

開発モード

開発モードをオンにして、 LakeFlow Pipelines Editor からパイプラインを実行します。 スケジュールされたパイプラインは、デフォルトで開発モードをオフにした状態で実行されます。本番運用でパイプラインがどのように実行されるかをテストしたい場合は、エディターのドロップダウンから 別の設定で実行 を選択することで、開発モードを使用するかどうかを対話的に選択できます。

注記

従来のノートブック エディターで作成されたパイプラインは、デフォルトで開発モードを使用します。パイプラインモニタリングページの 設定 を選択すると、設定を確認または変更できます。 モニタリングページは、ワークスペースの左側にある ジョブ&パイプライン ボタンからアクセスできます。 パイプライン アセット ブラウザー で実行結果をクリックして、パイプライン エディターからモニタリング ページに直接ジャンプすることもできます。

パイプラインを開発モードで実行すると、 Lakeflow 宣言型パイプライン システムは次の処理を行います。

  • 再起動のオーバーヘッドを回避するためにクラスターを再利用します。デフォルトでは、開発モードが有効になっている場合、クラスターは 2 時間実行されます。これはLakeFlow宣言型パイプラインのクラシック コンピュートの構成pipelines.clusterShutdown.delay設定で変更できます。
  • パイプラインの再試行を無効にするので、エラーをすぐに検出して修正できます。

開発モードをオフにすると、 LakeFlow宣言型パイプライン システムは次のことを行います。

  • メモリ リークや古い資格情報など、特定の回復可能なエラーのクラスターを再開します。
  • クラスターの開始の失敗など、特定のエラーが発生した場合に実行を再試行します。
注記

開発モードのオン/オフを切り替えると、クラスターとパイプラインの実行動作のみが制御されます。パブリッシュ テーブルのカタログ内の保存場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。