制御テーブルを使用してFor eachジョブを駆動します
多くのソースから摂取する必要がある場合があります。 そのリストが変更された場合、ジョブ設定にそれをハードコーディングすると、コードを変更して再デプロイする必要が生じます。この問題を解決するには、 メタデータを 使用して、ソースのリストをテーブルに格納し、実行時に読み込んで使用します。ソースを新しい行として追加すると、次回のジョブ実行時にジョブ自体に変更を加えることなく、そのソースが自動的に取り込まれます。
このチュートリアルでは、この方法を用いてジョブを作成する方法を説明します。SQLタスクはコントロール テーブルを読み取り、 For eachタスクはすべての行を並行して反復します。
仕組み
このパターンでは、3種類のタスクが順番に接続されています。
タスク | Type | その機能 |
|---|---|---|
| SQL | 設定テーブルを照会し、結果を行配列として取得します。 |
| For each |
|
| ノートブックまたはSQL (For each 内にネストされている) | 行ごとに 1 回実行し、問題として渡された行値を使用してビジネス ロジックを実行します |
SQLタスクの出力 (行オブジェクトのJSON配列) は、動的値参照{{tasks.read_markets.output.rows}}を使用して、 For eachタスクの 入力 フィールドに直接流れ込みます。 次に、 For eachタスクは、各行をネストされたタスクに問題として渡します。これは、 {{input.market}}および{{input.currency}}として使用できます。
前提条件
- ジョブとノートブックを作成する権限を持つDatabricksワークスペース
- Unity Catalogでテーブルを作成する権限
- 設定テーブルを作成できるUnity Catalogスキーマ(例:
config) - SQL SQLハウス
ステップ 1: 構成テーブルを作成する
設定テーブルは、あなたのコントロールプレーンです。それは、あなたのジョブが処理する値のリストを保持しています。作業を追加または削除する必要がある場合は、ジョブ自体ではなく、このテーブルを更新します。
configスキーマにmarketsテーブルを作成するには、次のSQLを実行してください。
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Databricksノートブック、 SQLエディター、または任意のSQLタスクを使用して、このステートメントを実行できます。 このステップの後、 config.marketsは3つの行が含まれ、各行は市場ごとに1つずつ、それぞれの通貨コードが含まれています。
ステップ 2: 処理コードを書く
For eachタスク内のネストされたタスクは、行ごとに1回実行されます。ビジネス ロジックに応じて、ノートブック タスクまたはSQLタスクを選択します。
- Notebook task
- SQL task
/Workspace/Users/<username>/process_marketのようなパスに新しいノートブックを作成します。このノートブックは、 For eachタスクの反復ごとに1回実行され、毎回異なる市場価格を受け取ります。
ノートブックに以下のコードを追加してください。
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
dbutils.widgets.text()呼び出しはデフォルト値を設定するため、ノートブックをジョブに接続せずにワークスペースで直接実行できます。ノートブックをFor eachタスク内のネストされたタスクとして実行すると、ジョブはその反復の実際の確保値で当然をオーバーライドします。
dbutils.widgets.get()前にdbutils.widgets.text()呼び出してください。get textより前に呼び出された場合、ジョブ外でノートブックを実行するとInputWidgetNotDefinedエラーが発生します。
安全を使用すると、ジョブの外でノートブックをテストできますが、トレードオフに注意してください。 For eachタスクの構成が間違っていて問題に合格しない場合、ノートブックは失敗せずに安全を使用して、サイレントに成功します。これにより、構成ミスの検出が難しくなる可能性があります。
:param_name構文を使用した という名前のSQLタスクのサポート。 反復値を使用したい箇所では、クエリ内で:marketと:currencyを参照してください。
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
ステップ 5 でタスク エディターでこのクエリを直接設定します。 For eachタスクは、ランタイム時に現在の反復の値を、という名前の:marketと:currencyに渡します。 ノートブック タスクとは異なり、問題という名前のSQL 、間違いの値をサポートしません。問題が渡されない場合、クエリは問題解決エラーで失敗します。 クエリを実行する前に検証または安全性を確認するには、代わりにノートブックタスクを使用してください。
ステップ 3: ジョブを作成する
Databricksワークスペースで、サイドバーの [ワークフロー ] をクリックし、 [ジョブの作成] を クリックします。 ジョブにわかりやすい名前を付けます (例: Market Analysis )。
ステップ 4: SQLルックアップタスクを構成する
SQLタスクは構成クエリを実行し、その出力をダウンストリーム タスクで利用できるようにします。
-
ジョブエディタで、 [タスクの追加] をクリックします。
-
タスク名 を
read_marketsに設定します。 -
Set Type to SQL .
-
SQL フィールドに、次のクエリを入力してください。
SQLSELECT market, currency FROM config.markets -
SQLウェアハウスを ワークスペース内のウェアハウスに設定します。
-
「 タスクを作成 」をクリックします。
このタスクが実行されると、Databricks はクエリを実行し、結果を JSON 配列としてtasks.read_markets.output.rowsにキャプチャします。SQLタスクの出力は常にJSON配列として返されます。追加の設定は必要ありません。 この参照の一般的な形式はtasks.<task-name>.output.rowsであり、 <task-name>ジョブエディタで設定したタスクキーに一致します。出力は次のようになります。
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
ステップ 5: 各タスクの設定
For eachタスクはSQL出力を読み取り、行ごとに1つのネストされたタスクを実行します。
-
「タスクの追加」 をクリックし、 「依存先」 を
read_marketsに設定します。 -
タスク名 を
process_marketsに設定します。 -
タイプを 「各項目について 」に設定します。
-
「入力」 フィールドに以下を入力します。
{{tasks.read_markets.output.rows}}これは、 SQLタスクによってキャプチャされた行配列を参照します。
-
2つの反復処理を並行して実行するには、 Concurrencyを
2に設定してください。スループットを向上させる場合、またはネストされたタスクがより高い並列処理をサポートする場合は、この値を増やしてください。 -
[タスクの追加] をクリックしてループし 、ステップ 2 で選択したタイプに基づいてネストされたタスクを構成します。
- Notebook task
- SQL task
-
タスク名 を
run_market_analysis_iterationに設定します。 -
Set Type to ノートブック .
-
Path を ステップ 2 で作成したノートブックのパスに設定します。
-
[問題] をクリックし、[ 追加] をクリックして次のそれぞれを追加します。
- キー :
market、 値 :{{input.market}} - キー :
currency、 値 :{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトの対応するフィールドに解決されます。 - キー :
-
「 タスクを作成 」をクリックします。
-
タスク名 を
run_market_analysis_iterationに設定します。 -
Set Type to SQL .
-
SQL フィールドに、名前付きクエリを入力します。例:
SQLSELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency -
SQLウェアハウスを ワークスペース内のウェアハウスに設定します。
-
[問題] をクリックし、[ 追加] をクリックして次のそれぞれを追加します。
- キー :
market、 値 :{{input.market}} - キー :
currency、 値 :{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトの対応するフィールドに解決されます。 - キー :
-
「 タスクを作成 」をクリックします。
ジョブ DAG には、 read_markets process_marketsに流れ込んでいることが表示され、ネストされたタスクがFor eachノード内に表示されます。
ステップ 6: ジョブを実行して確認する
- 「今すぐ実行」 をクリックしてジョブを開始してください。
- ジョブ実行ページで、
process_marketsノードをクリックしてFor eachタスクを展開します。 - ジョブ実行ページには、反復処理の表が表示されます。各反復処理は、市場価値ごとに1行ずつ表示され、それぞれのステータス、開始時刻、および実行期間が表示されます。
- いずれかの反復行をクリックすると、タスク実行結果が開き、正しい市場価格が取得されたことを確認できます。
特定の反復処理が失敗した場合、ジョブ全体を再実行することなく、ジョブ実行ページからその反復処理のみを再実行できます。
パターンを拡張する
新しい市場を追加するには、設定テーブルに行を挿入します。
INSERT INTO config.markets VALUES ('DE', 'EUR');
次回のジョブ実行時には、ジョブ設定の変更やノートブックの編集は不要で、自動的にドイツが対象地域に含まれます。
このパターンは、データに基づいて反復処理を進めたいあらゆるユースケースに適用できます。
- 顧客ごとの処理 : 顧客 ID ごとに 1 行。ノートブックは顧客固有の変換を適用するか、顧客固有の宛先に配信します。
- テーブル取り込み :ソーステーブル名ごとに1行。ノートブックは各テーブルを読み込んで取り込みます。
- バックフィル処理 : 日付パーティションごとに 1 行。ノートブックはそのパーティションの履歴データを再処理します。
- 機能フラグ駆動の実行 : 有効な機能またはエクスペリメントごとに 1 行。ノートブックは対応するロジックをアクティブにします。
処理対象から項目を削除するには、該当する行を削除するか、 activeフラグ列を追加してSQLクエリでフィルタリングします。
SELECT market, currency FROM config.markets WHERE active = TRUE
次のステップ
For eachタスクを使用してループ内で別のタスクを実行する— 保留タイプや同時実行オプションを含む、For eachタスクを構成するための完全なリファレンスFor eachタスク内の大きな問題配列にルックアップ テーブルを使用する- 48 KB のタスク値制限を超える大きな問題配列を処理する方法- タスクからの論点値へのアクセス- ノートブック、 Pythonスクリプト、およびSQLタスクの論点値にアクセスするためのすべてのメソッド