宣言型パイプラインで更新を実行 Lakeflow
この記事では、パイプラインの更新について説明し、更新をトリガーする方法について詳しく説明します。
パイプラインの更新とは
パイプラインを作成し、実行する準備ができたら、 更新 を開始します。 パイプラインの更新では、次の処理が行われます。
- 正しい構成でクラスターを開始します。
- 定義されたすべてのテーブルとビューを検出し、無効な列名、依存関係の欠落、構文エラーなどの分析エラーをチェックします。
- 使用可能な最新のデータでテーブルとビューを作成または更新します。
ドライ実行を使用すると、テーブルの作成または更新を待たずに、パイプラインのソース コードに問題がないかチェックできます。 この機能は、間違ったテーブル名や列名など、パイプライン内のエラーをすばやく見つけて修正できるため、パイプラインの開発またはテスト時に役立ちます。
パイプラインの更新はどのようにトリガーされますか?
次のいずれかのオプションを使用して、パイプラインの更新を開始します。
更新トリガー | 詳細 |
---|---|
手動 | LakeFlow Pipelines Editor またはパイプライン リストからパイプラインの更新を手動でトリガーできます。 「パイプラインの更新を手動でトリガーする」を参照してください。 |
スケジュール | パイプラインの更新は、ジョブを使用してスケジュールできます。ジョブのパイプラインタスクを参照してください。 |
プログラム | サードパーティのツール、 APIs、および CLI を使用して、プログラムで更新をトリガーできます。 「ワークフローの実行 Lakeflow 宣言型パイプライン」および「パイプライン API」を参照してください。 |
パイプラインの更新を手動でトリガーする
次のいずれかのオプションを使用して、パイプラインの更新を手動でトリガーします。
- LakeFlow Pipelines Editor から完全なパイプライン、またはパイプラインのサブセット (単一のソース ファイル、または単一のテーブル) を実行します。 詳細については、 「実行パイプライン コード」を参照してください。
- ジョブとパイプラインの リストから完全なパイプラインを実行します。 クリック
リスト内のパイプラインと同じ行にあります。
- パイプラインモニタリングページから、
ボタン。
手動でトリガーされたパイプライン更新のデフォルトの動作は、パイプラインで定義されているすべてのデータセットを更新することです。
パイプライン更新セマンティクス
次の表では、マテリアライズドビュー とストリーミング テーブルのデフォルト 更新、完全更新、およびリセット チェックポイントの動作について説明します。
アップデートのタイプ | マテリアライズドビュー | ストリーミングテーブル |
---|---|---|
更新 (デフォルト) | 定義クエリの現在の結果を反映するように結果を更新します。コストを調査し、コスト効率が高い場合は増分更新を実行します。 | ストリーミングテーブルとフローで定義されたロジックを通じて、新しいレコードを処理します。 |
フルリフレッシュ | 結果を更新して、定義クエリの現在の結果を反映します。 | ストリーミングテーブルからデータをクリアし、フローから状態情報 (checkpoints) をクリアし、データソースからすべてのレコードを再処理します。 |
ストリーミング フロー チェックポイントのリセット | マテリアライズドビューには適用されません。 | フローから状態情報 (checkpoints) をクリアしますが、ストリーミングテーブルからはデータを消去せず、データソースからすべてのレコードを再処理します。 |
デフォルトでは、パイプライン内のすべてのマテリアライズドビューとストリーミングテーブルは、更新のたびに更新されます。 オプションで、次の機能を使用して更新からテーブルを省略できます。
- 更新するテーブルの選択 : この UI を使用して、更新を実行する前にマテリアライズドビュー と ストリーミングテーブルを追加または削除します。 「選択したテーブルのパイプライン更新を開始する」を参照してください。
- 失敗したテーブルの更新 : 失敗したマテリアライズドビューとストリーミングテーブル (ダウンストリームの依存関係を含む) の更新を開始します。 「失敗したテーブルのパイプライン更新を開始する」を参照してください。
これらの機能はどちらも、デフォルトの更新セマンティクスまたは完全更新をサポートしています。 オプションで、[ Select tables for update ] ダイアログを使用して、失敗したテーブルの更新を実行するときに追加のテーブルを除外できます。
ストリーミングテーブルの場合、関連付けられたストリーミングテーブルのデータではなく、選択したフローのストリーミングチェックポイントをクリアすることを選択できます。選択したフローのチェックポイントをクリアするには、Databricks REST API を使用して更新を開始します。「パイプラインの更新を開始して、選択的ストリーミング フローのチェックポイントをクリアする」を参照してください。
完全な更新を使用する必要がありますか?
Databricks では、必要な場合にのみフル更新を実行することをお勧めします。 完全更新では、データセットを定義するロジックを通じて、指定されたデータソースのすべてのレコードが常に再処理されます。 完全更新を完了するための時間とリソースは、ソース データのサイズと相関しています。
マテリアライズドビュー は、デフォルト更新と full 更新のどちらを使用しても同じ結果を返します。 ストリーミングテーブルで完全更新を使用すると、すべての状態処理とチェックポイント情報がリセットされ、入力データが使用できなくなった場合にレコードがドロップされる可能性があります。
Databricks は、入力データソースにテーブルまたはビューの目的の状態を再作成するために必要なデータが含まれている場合にのみ、完全な更新を推奨します。 入力ソース データが使用できなくなった次のシナリオと、完全更新を実行した結果について考えてみます。
データソース | 入力データが存在しない理由 | 全更新の結果 |
---|---|---|
Kafka | 短い保持しきい値 | Kafka ソースに存在しなくなったレコードは、ターゲットテーブルから削除されます。 |
オブジェクトストレージ内のファイル | ライフサイクルポリシー | ソース・ディレクトリーに存在しなくなったデータ・ファイルは、ターゲット・テーブルからドロップされます。 |
テーブル内のレコード | コンプライアンスのために削除されました | ソース テーブルに存在するレコードのみが処理されます。 |
テーブルまたはビューで完全更新が実行されないようにするには、テーブル・プロパティの pipelines.reset.allowed
を false
に設定します。 Lakeflow 宣言型パイプライン テーブルのプロパティを参照してください。また、追加フローを使用して、完全な更新を必要とせずに既存のストリーミングテーブルにデータを追加することもできます。
選択したテーブルのパイプライン更新を開始する
必要に応じて、パイプライン内の選択したテーブルのデータのみを再処理できます。 たとえば、開発中に 1 つのテーブルのみを変更してテスト時間を短縮したい場合や、パイプラインの更新が失敗し、 失敗したテーブルのみを更新する場合などです。
LakeFlow Pipelinesエディターには、ソース ファイル、選択したテーブル、または単一のテーブルを再処理するためのオプションがあります。 詳細については、 「パイプライン コードの実行」を参照してください。
失敗したテーブルのパイプライン更新を開始する
パイプライン グラフの 1 つ以上のテーブルのエラーが原因でパイプラインの更新が失敗した場合は、失敗したテーブルとダウンストリームの依存関係のみの更新を開始できます。
除外されたテーブルは、障害が発生したテーブルに依存している場合でも、更新されません。
失敗したテーブルを更新するには、パイプラインモニタリングページで、 [失敗したテーブルを更新] をクリックします。
パイプライン監視ページから選択した失敗したテーブルのみを更新するには、次の手順を実行します。
-
「更新に失敗しました 」 ボタンの横にある「」をクリックし、「 更新用のテーブルを選択」 をクリックします。 [ 更新するテーブルの選択 ] ダイアログが表示されます。
-
更新するテーブルを選択するには、各テーブルをクリックします。 選択したテーブルが強調表示され、ラベルが付けられます。 更新からテーブルを削除するには、テーブルをもう一度クリックします。
-
選択範囲の更新 をクリックします。
選択範囲の更新 ボタンには、選択したテーブルの数が括弧内に表示されます。
選択したテーブルに既に取り込まれたデータを再処理するには、[ 選択の更新] ボタンの横にある [] をクリックし、[ 選択の完全更新] をクリックします。
パイプラインの更新を開始して、選択的ストリーミング フローのチェックポイントをクリアする
オプションで、パイプライン内の選択したストリーミングフローのデータを再処理し、すでに取り込まれたデータを消去せずに行うことができます。
選択されていないフローは、REFRESH 更新を使用して実行されます。また、 full_refresh_selection
または refresh_selection
を指定して、他のテーブルを選択的に更新することもできます。
選択したストリーミング チェックポイントを更新するための更新を開始するには、Lakeflow宣言型パイプラインRESTAPI で updates リクエストを使用します。次の例では、 curl
コマンドを使用して updates
要求を呼び出し、パイプラインの更新を開始します。
curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": [<streaming flow1>, <streaming flow 2>...]
}' \
https://<your-databricks-instance>/api/2.0/pipelines/<your-pipeline-id>/updates
テーブルの更新を待たずにパイプラインにエラーがないか確認する
プレビュー
LakeFlow宣言型パイプラインDry run
機能はパブリック プレビュー段階にあります。
完全な更新を実行せずにパイプラインのソースコードが有効かどうかを確認するには、 ドライ ラン を使用します。ドライ実行は、パイプラインで定義されたデータセットとフローの定義を解決しますが、データセットを具体化または公開することはありません。 dry 実行中に見つかったエラー (テーブル名や列名が正しくないなど) は UI に報告されます。
ドライ実行を開始するには、 [スタート] の横にあるパイプラインの詳細ページで [Dry 実行] を クリックします。
dry 実行が完了すると、下部パネルのイベント トレイにエラーが表示されます。 イベント トレイをクリックすると、下部のパネルに見つかった問題が表示されます。さらに、イベント ログには、dry 実行に関連するイベントのみが表示され、DAG にはメトリクスは表示されません。 エラーが見つかった場合は、イベント ログに詳細が記録されます。
最新の dry 実行の結果のみが表示されます。 dry 実行 が最新の 実行 更新だった場合は、更新履歴でそれを選択すると結果を確認できます。 ドライ実行の後に別の更新が実行されると、結果は UI で利用できなくなります。
開発モード
開発モードをオンにして、 LakeFlow Pipelines Editor からパイプラインを実行します。 スケジュールされたパイプラインは、デフォルトで開発モードをオフにした状態で実行されます。本番運用でパイプラインがどのように実行されるかをテストしたい場合は、エディターのドロップダウンから 別の設定で実行 を選択することで、開発モードを使用するかどうかを対話的に選択できます。
従来のノートブック エディターで作成されたパイプラインは、デフォルトで開発モードを使用します。パイプラインモニタリングページの 設定 を選択すると、設定を確認または変更できます。 モニタリングページは、ワークスペースの左側にある ジョブ&パイプライン ボタンからアクセスできます。 パイプライン アセット ブラウザー で実行結果をクリックして、パイプライン エディターからモニタリング ページに直接ジャンプすることもできます。
パイプラインを開発モードで実行すると、 Lakeflow 宣言型パイプライン システムは次の処理を行います。
- クラスタリングを再利用して、再起動のオーバーヘッドを回避します。 デフォルトでは、クラスタリング 開発モードが有効な場合、2 時間実行します。 これは、Configure classic conpiュート for LakeFlow Declarative パイプラインの [
pipelines.clusterShutdown.delay
] 設定で変更できます。 - パイプラインの再試行を無効にして、エラーをすぐに検出して修正できるようにします。
開発モードをオフにすると、 LakeFlow宣言型パイプライン システムは次のことを行います。
- メモリ リークや古い資格情報など、特定の回復可能なエラーのクラスターを再開します。
- クラスターの開始の失敗など、特定のエラーが発生した場合に実行を再試行します。
開発モードのオン/オフを切り替えると、クラスターとパイプラインの実行動作のみが制御されます。パブリッシュ テーブルのカタログ内の保存場所とターゲット スキーマは、パイプライン設定の一部として構成する必要があり、モードを切り替えても影響を受けません。