制御テーブルを使用してFor eachジョブを駆動します
多くのソースから取り込む必要がある場合があります。そのリストが変更された場合、ジョブ構成にそれをハードコーディングすることは、コードを変更し、再デプロイすることになります。実行時に読み取られ使用されるテーブルにソースのリストを格納することで、 メタデータ を使用してこれに対処します。新しい行としてソースを追加すると、次回のジョブ実行でジョブ自体に変更を加えることなく取り込まれます。
このチュートリアルでは、この方法を用いてジョブを作成する方法を説明します。SQLタスクはコントロール テーブルを読み取り、 For eachタスクはすべての行を並行して反復します。
仕組み
このパターンでは、3種類のタスクが順番に接続されています。
タスク | Type | その機能 |
|---|---|---|
| SQL | 設定テーブルを照会し、結果を行配列として取得します。 |
| For each |
|
| ノートブックまたはSQL (For each 内にネストされている) | 行ごとに 1 回実行し、問題として渡された行値を使用してビジネス ロジックを実行します |
SQLタスクの出力 (行オブジェクトのJSON配列) は、動的値参照{{tasks.read_markets.output.rows}}を使用して、 For eachタスクの 入力 フィールドに直接流れ込みます。 次に、 For eachタスクは、各行をネストされたタスクに問題として渡します。これは、 {{input.market}}および{{input.currency}}として使用できます。
前提条件
- ジョブとノートブックを作成する権限を持つDatabricksワークスペース
- Unity Catalogでテーブルを作成する権限
- 設定テーブルを作成できるUnity Catalogスキーマ(例:
config) - SQL SQLハウス
ステップ 1: 構成テーブルを作成する
設定テーブルはコントロールプレーンです。ジョブが処理する値のリストです。作業を追加または削除する必要がある場合、ジョブではなくこのテーブルを更新します。
configスキーマにmarketsテーブルを作成するには、次のSQLを実行してください。
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Databricksノートブック、 SQLエディター、または任意のSQLタスクを使用して、このステートメントを実行できます。 このステップの後、 config.marketsは3つの行が含まれ、各行は市場ごとに1つずつ、それぞれの通貨コードが含まれています。
ステップ 2: 処理コードを書く
For eachタスク内のネストされたタスクは、行ごとに1回実行されます。ビジネス ロジックに応じて、ノートブック タスクまたはSQLタスクを選択します。
- Notebook task
- SQL task
/Workspace/Users/<username>/process_marketのようなパスに新しいノートブックを作成します。このノートブックは、 For eachタスクの反復ごとに1回実行され、毎回異なる市場価格を受け取ります。
ノートブックに以下のコードを追加してください。
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
dbutils.widgets.text()呼び出しはデフォルト値を設定するため、ノートブックをジョブに接続せずにワークスペースで直接実行できます。ノートブックをFor eachタスク内のネストされたタスクとして実行すると、ジョブはその反復の実際の確保値で当然をオーバーライドします。
dbutils.widgets.get()前にdbutils.widgets.text()呼び出してください。get textより前に呼び出された場合、ジョブ外でノートブックを実行するとInputWidgetNotDefinedエラーが発生します。
デフォルトを使用すると、ジョブの外部でノートブックをテストできます。ただし、トレードオフにご注意ください。For eachタスクが誤って構成され、パラメーターを渡さない場合、ノートブックはデフォルトを使用し、失敗する代わりにサイレントに成功します。これにより、構成ミスが検出されにくくなる可能性があります。
:param_name構文を使用した という名前のSQLタスクのサポート。 反復値を使用したい箇所では、クエリ内で:marketと:currencyを参照してください。
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
このクエリーは、ステップ5でタスクエディタにて直接構成します。For each タスクは、現在のイテレーションの値を :market と :currency という名前のパラメーターにランタイム時に渡します。ノートブックタスクとは異なり、SQL の名前付きパラメーターはデフォルト値をサポートしていません;パラメーターが渡されない場合、クエリはパラメーター解決エラーで失敗します。クエリ実行前にパラメーターの検証またはデフォルト設定を行うには、代わりにノートブック タスクを使用してください。
ステップ 3: ジョブを作成する
Databricksワークスペースで、サイドバーの [ワークフロー ] をクリックし、 [ジョブの作成] を クリックします。 ジョブにわかりやすい名前を付けます (例: Market Analysis )。
ステップ 4: SQLルックアップタスクを構成する
SQLタスクは構成クエリを実行し、その出力をダウンストリーム タスクで利用できるようにします。
-
ジョブエディタで、 [タスクの追加] をクリックします。
-
タスク名 を
read_marketsに設定します。 -
Set Type to SQL .
-
SQL フィールドに、次のクエリを入力してください。
SQLSELECT market, currency FROM config.markets -
SQLウェアハウスを ワークスペース内のウェアハウスに設定します。
-
「 タスクを作成 」をクリックします。
このタスクが実行されると、Databricks はクエリーを実行し、その結果を tasks.read_markets.output.rows に JSON 配列としてキャプチャします。SQL タスク出力は常に JSON 配列として返されます。追加の構成は必要ありません。この参照の汎用形式はtasks.<task-name>.output.rowsで、<task-name>はジョブエディターで設定したタスクキーと一致します。出力は次のようになります。
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
ステップ 5: 各タスクの設定
For eachタスクはSQL出力を読み取り、行ごとに1つのネストされたタスクを実行します。
-
「タスクの追加」 をクリックし、 「依存先」 を
read_marketsに設定します。 -
タスク名 を
process_marketsに設定します。 -
タイプを 「各項目について 」に設定します。
-
「入力」 フィールドに以下を入力します。
{{tasks.read_markets.output.rows}}これは、 SQLタスクによってキャプチャされた行配列を参照します。
-
2つの反復処理を並行して実行するには、 Concurrencyを
2に設定してください。スループットを向上させる場合、またはネストされたタスクがより高い並列処理をサポートする場合は、この値を増やしてください。 -
[タスクの追加] をクリックしてループし 、ステップ 2 で選択したタイプに基づいてネストされたタスクを構成します。
- Notebook task
- SQL task
-
タスク名 を
run_market_analysis_iterationに設定します。 -
Set Type to ノートブック .
-
Path を ステップ 2 で作成したノートブックのパスに設定します。
-
[問題] をクリックし、[ 追加] をクリックして次のそれぞれを追加します。
- キー :
market、 値 :{{input.market}} - キー :
currency、 値 :{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトの対応するフィールドに解決されます。 - キー :
-
「 タスクを作成 」をクリックします。
-
タスク名 を
run_market_analysis_iterationに設定します。 -
Set Type to SQL .
-
SQL フィールドに、名前付きクエリを入力します。例:
SQLSELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency -
SQLウェアハウスを ワークスペース内のウェアハウスに設定します。
-
[問題] をクリックし、[ 追加] をクリックして次のそれぞれを追加します。
- キー :
market、 値 :{{input.market}} - キー :
currency、 値 :{{input.currency}}
各
{{input.<key>}}参照は、現在のイテレーションの行オブジェクトの対応するフィールドに解決されます。 - キー :
-
「 タスクを作成 」をクリックします。
ジョブ DAG には、 read_markets process_marketsに流れ込んでいることが表示され、ネストされたタスクがFor eachノード内に表示されます。
ステップ 6: ジョブを実行して確認する
- 「今すぐ実行」 をクリックしてジョブを開始してください。
- ジョブ実行ページで、
process_marketsノードをクリックしてFor eachタスクを展開します。 - ジョブ実行ページには、市場価値ごとに1行のイテレーションテーブルが表示され、各行にステータス、開始時刻、および継続時間が表示されます。
- いずれかの反復行をクリックすると、タスク実行結果が開き、正しい市場価格が取得されたことを確認できます。
特定の反復処理が失敗した場合、ジョブ全体を再実行することなく、ジョブ実行ページからその反復処理のみを再実行できます。
パターンを拡張する
新しい市場を追加するには、設定テーブルに行を挿入します。
INSERT INTO config.markets VALUES ('DE', 'EUR');
次回のジョブ実行時には、ジョブ設定の変更やノートブックの編集は不要で、自動的にドイツが対象地域に含まれます。
このパターンは、データに基づいて反復処理を進めたいあらゆるユースケースに適用できます。
- 顧客ごとの処理 : 顧客 ID ごとに 1 行。ノートブックは顧客固有の変換を適用するか、顧客固有の宛先に配信します。
- テーブル取り込み :ソーステーブル名ごとに1行。ノートブックは各テーブルを読み込んで取り込みます。
- バックフィル処理 : 日付パーティションごとに 1 行。ノートブックはそのパーティションの履歴データを再処理します。
- 機能フラグ駆動の実行 : 有効な機能またはエクスペリメントごとに 1 行。ノートブックは対応するロジックをアクティブにします。
処理対象から項目を削除するには、該当する行を削除するか、 activeフラグ列を追加してSQLクエリでフィルタリングします。
SELECT market, currency FROM config.markets WHERE active = TRUE
次のステップ
For eachタスクを使用して別のタスクをループで実行する:For eachタスクの構成に関する完全なリファレンス。パラメーターの型と同時実行オプションを含みます。- 「
For eachタスクでの大きなパラメーター配列のルックアップテーブルの使用」: 48 KBのタスク値制限を超える大きなパラメーター配列を処理する方法 - タスクからのパラメーター値へのアクセス:ノートブック、Pythonスクリプト、およびSQLタスクでパラメーター値にアクセスするためのすべてのメソッド