メインコンテンツまでスキップ

制御テーブルを使用してFor eachジョブを駆動します

多くのソースから摂取する必要がある場合があります。 そのリストが変更された場合、ジョブ設定にそれをハードコーディングすると、コードを変更して再デプロイする必要が生じます。この問題を解決するには、 メタデータを 使用して、ソースのリストをテーブルに格納し、実行時に読み込んで使用します。ソースを新しい行として追加すると、次回のジョブ実行時にジョブ自体に変更を加えることなく、そのソースが自動的に取り込まれます。

このチュートリアルでは、この方法を用いてジョブを作成する方法を説明します。SQLタスクはコントロール テーブルを読み取り、 For eachタスクはすべての行を並行して反復します。

仕組み

このパターンでは、3種類のタスクが順番に接続されています。

タスク

Type

その機能

read_markets

SQL

設定テーブルを照会し、結果を行配列として取得します。

process_markets

For each

{{tasks.read_markets.output.rows}}を反復処理し、各行ごとにネストされたタスクを一度実行します。

run_market_analysis_iteration

ノートブックまたはSQL (For each 内にネストされている)

行ごとに 1 回実行し、問題として渡された行値を使用してビジネス ロジックを実行します

SQLタスクの出力 (行オブジェクトのJSON配列) は、動的値参照{{tasks.read_markets.output.rows}}を使用して、 For eachタスクの 入力 フィールドに直接流れ込みます。 次に、 For eachタスクは、各行をネストされたタスクに問題として渡します。これは、 {{input.market}}および{{input.currency}}として使用できます。

前提条件

  • ジョブとノートブックを作成する権限を持つDatabricksワークスペース
  • Unity Catalogでテーブルを作成する権限
  • 設定テーブルを作成できるUnity Catalogスキーマ(例: config
  • SQL SQLハウス

ステップ 1: 構成テーブルを作成する

設定テーブルは、あなたのコントロールプレーンです。それは、あなたのジョブが処理する値のリストを保持しています。作業を追加または削除する必要がある場合は、ジョブ自体ではなく、このテーブルを更新します。

configスキーマにmarketsテーブルを作成するには、次のSQLを実行してください。

SQL
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);

Databricksノートブック、 SQLエディター、または任意のSQLタスクを使用して、このステートメントを実行できます。 このステップの後、 config.marketsは3つの行が含まれ、各行は市場ごとに1つずつ、それぞれの通貨コードが含まれています。

ステップ 2: 処理コードを書く

For eachタスク内のネストされたタスクは、行ごとに1回実行されます。ビジネス ロジックに応じて、ノートブック タスクまたはSQLタスクを選択します。

/Workspace/Users/<username>/process_marketのようなパスに新しいノートブックを作成します。このノートブックは、 For eachタスクの反復ごとに1回実行され、毎回異なる市場価格を受け取ります。

ノートブックに以下のコードを追加してください。

Python
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

dbutils.widgets.text()呼び出しはデフォルト値を設定するため、ノートブックをジョブに接続せずにワークスペースで直接実行できます。ノートブックをFor eachタスク内のネストされたタスクとして実行すると、ジョブはその反復の実際の確保値で当然をオーバーライドします。

dbutils.widgets.get()前にdbutils.widgets.text()呼び出してください。get textより前に呼び出された場合、ジョブ外でノートブックを実行するとInputWidgetNotDefinedエラーが発生します。

安全を使用すると、ジョブの外でノートブックをテストできますが、トレードオフに注意してください。 For eachタスクの構成が間違っていて問題に合格しない場合、ノートブックは失敗せずに安全を使用して、サイレントに成功します。これにより、構成ミスの検出が難しくなる可能性があります。

ステップ 3: ジョブを作成する

Databricksワークスペースで、サイドバーの [ワークフロー ] をクリックし、 [ジョブの作成] を クリックします。 ジョブにわかりやすい名前を付けます (例: Market Analysis )。

ステップ 4: SQLルックアップタスクを構成する

SQLタスクは構成クエリを実行し、その出力をダウンストリーム タスクで利用できるようにします。

  1. ジョブエディタで、 [タスクの追加] をクリックします。

  2. タスク名read_marketsに設定します。

  3. Set Type to SQL .

  4. SQL フィールドに、次のクエリを入力してください。

    SQL
    SELECT market, currency FROM config.markets
  5. SQLウェアハウスを ワークスペース内のウェアハウスに設定します。

  6. タスクを作成 」をクリックします。

このタスクが実行されると、Databricks はクエリを実行し、結果を JSON 配列としてtasks.read_markets.output.rowsにキャプチャします。SQLタスクの出力は常にJSON配列として返されます。追加の設定は必要ありません。 この参照の一般的な形式はtasks.<task-name>.output.rowsであり、 <task-name>ジョブエディタで設定したタスクキーに一致します。出力は次のようになります。

JSON
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]

ステップ 5: 各タスクの設定

For eachタスクはSQL出力を読み取り、行ごとに1つのネストされたタスクを実行します。

  1. 「タスクの追加」 をクリックし、 「依存先」read_marketsに設定します。

  2. タスク名process_marketsに設定します。

  3. タイプを 「各項目について 」に設定します。

  4. 「入力」 フィールドに以下を入力します。


    {{tasks.read_markets.output.rows}}

    これは、 SQLタスクによってキャプチャされた行配列を参照します。

  5. 2つの反復処理を並行して実行するには、 Concurrencyを 2に設定してください。スループットを向上させる場合、またはネストされたタスクがより高い並列処理をサポートする場合は、この値を増やしてください。

  6. [タスクの追加] をクリックしてループし 、ステップ 2 で選択したタイプに基づいてネストされたタスクを構成します。

  1. タスク名run_market_analysis_iterationに設定します。

  2. Set Type to ノートブック .

  3. Path を ステップ 2 で作成したノートブックのパスに設定します。

  4. [問題] をクリックし、[ 追加] をクリックして次のそれぞれを追加します。

    • キー : market : {{input.market}}
    • キー : currency : {{input.currency}}

    {{input.<key>}}参照は、現在のイテレーションの行オブジェクトの対応するフィールドに解決されます。

  5. タスクを作成 」をクリックします。

ジョブ DAG には、 read_markets process_marketsに流れ込んでいることが表示され、ネストされたタスクがFor eachノード内に表示されます。

ステップ 6: ジョブを実行して確認する

  1. 「今すぐ実行」 をクリックしてジョブを開始してください。
  2. ジョブ実行ページで、 process_marketsノードをクリックしてFor eachタスクを展開します。
  3. ジョブ実行ページには、反復処理の表が表示されます。各反復処理は、市場価値ごとに1行ずつ表示され、それぞれのステータス、開始時刻、および実行期間が表示されます。
  4. いずれかの反復行をクリックすると、タスク実行結果が開き、正しい市場価格が取得されたことを確認できます。

特定の反復処理が失敗した場合、ジョブ全体を再実行することなく、ジョブ実行ページからその反復処理のみを再実行できます。

パターンを拡張する

新しい市場を追加するには、設定テーブルに行を挿入します。

SQL
INSERT INTO config.markets VALUES ('DE', 'EUR');

次回のジョブ実行時には、ジョブ設定の変更やノートブックの編集は不要で、自動的にドイツが対象地域に含まれます。

このパターンは、データに基づいて反復処理を進めたいあらゆるユースケースに適用できます。

  • 顧客ごとの処理 : 顧客 ID ごとに 1 行。ノートブックは顧客固有の変換を適用するか、顧客固有の宛先に配信します。
  • テーブル取り込み :ソーステーブル名ごとに1行。ノートブックは各テーブルを読み込んで取り込みます。
  • バックフィル処理 : 日付パーティションごとに 1 行。ノートブックはそのパーティションの履歴データを再処理します。
  • 機能フラグ駆動の実行 : 有効な機能またはエクスペリメントごとに 1 行。ノートブックは対応するロジックをアクティブにします。

処理対象から項目を削除するには、該当する行を削除するか、 activeフラグ列を追加してSQLクエリでフィルタリングします。

SQL
SELECT market, currency FROM config.markets WHERE active = TRUE

次のステップ