
Use a control table to drive a For each job

You may need to ingest from many sources. When that list changes, a hardcoded list in the job configuration means editing the job and redeploying it. A metadata-driven approach avoids this: store the list of sources in a table that the job reads at run time. To add a source, insert a new row, and the next job run picks it up with no changes to the job itself.

This tutorial shows you how to build a job using this approach. A SQL task reads the control table, and a For each task iterates over every row in parallel.

How it works

The pattern uses three task types wired together in sequence:

| Task | Type | What it does |
| --- | --- | --- |
| read_markets | SQL | Queries a config table and captures the result as a row array |
| process_markets | For each | Iterates over {{tasks.read_markets.output.rows}}, running the nested task once per row |
| run_market_analysis_iteration | Notebook or SQL (nested inside For each) | Runs once per row, using row values passed as parameters to execute your business logic |

The SQL task's output—a JSON array of row objects—flows directly into the For each task's Inputs field using the dynamic value reference {{tasks.read_markets.output.rows}}. The For each task then passes each row to the nested task as parameters, available as {{input.market}} and {{input.currency}}.
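To make the hand-off concrete, here is a plain-Python sketch of what the job engine does with that reference: the SQL output behaves like a list of row dictionaries, and the For each task invokes the nested task once per dictionary, exposing each field as {{input.<key>}}. This is an illustration of the data flow only, not Databricks internals.

```python
# Plain-Python sketch of the SQL-task -> For each hand-off.
# This only illustrates the data flow; it does not call any Databricks APIs.

# What {{tasks.read_markets.output.rows}} resolves to: one dict per row.
rows = [
    {"market": "NL", "currency": "EUR"},
    {"market": "UK", "currency": "GBP"},
    {"market": "US", "currency": "USD"},
]

def run_market_analysis_iteration(market: str, currency: str) -> str:
    """Stand-in for the nested task's business logic."""
    return f"Processing market: {market} ({currency})"

# The For each task: one nested-task run per row, with {{input.market}}
# and {{input.currency}} resolving to the current row's fields.
results = [
    run_market_analysis_iteration(row["market"], row["currency"]) for row in rows
]
```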

Prerequisites

  • A Databricks workspace with permission to create jobs and notebooks
  • Permission to create tables in Unity Catalog
  • A Unity Catalog schema where you can create the config table (for example, config)
  • A SQL warehouse to run the SQL tasks

Step 1: Create the config table

The config table is your control plane. It holds the list of values your job processes. When you need to add or remove work, you update this table — not the job.

Run the following SQL to create a markets table in your config schema:

SQL
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
  ('NL', 'EUR'),
  ('UK', 'GBP'),
  ('US', 'USD')
AS t(market, currency);

You can use a Databricks notebook, the SQL editor, or any SQL task to run this statement. After this step, config.markets contains three rows, one per market, each with its currency code.

Step 2: Write the processing code

The nested task inside the For each task runs once per row. Choose a notebook task or a SQL task depending on your business logic.

This tutorial uses a notebook task. Create a new notebook at a path such as /Workspace/Users/<username>/process_market. This notebook runs once per iteration of the For each task, receiving a different market value each time.

Add the following code to the notebook:

Python
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
    f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

The dbutils.widgets.text() calls set default values so you can run the notebook directly in your workspace without connecting it to a job. When the notebook runs as a nested task inside a For each task, the job overrides the defaults with the actual parameter values for that iteration.

Call dbutils.widgets.text() before dbutils.widgets.get(). If you call get before the widget has been defined with text, the notebook raises an InputWidgetNotDefined error when you run it outside a job.

Using defaults lets you test the notebook outside a job, but note the trade-off: if the For each task is misconfigured and does not pass parameters, the notebook uses the defaults and succeeds silently instead of failing — which can make the misconfiguration harder to detect.
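If you would rather fail fast than succeed silently, one option is to use an obviously invalid default (such as an empty string) and validate it before doing any work. The require_param helper below is a hypothetical illustration of that pattern, not a Databricks API; the commented lines show where it would sit in the notebook.

```python
# Hypothetical fail-fast alternative to meaningful defaults: the widget
# default is a sentinel value, and the notebook refuses to run if the
# job did not override it.
SENTINEL = ""

def require_param(name: str, value: str) -> str:
    """Return the value, or raise if it still holds the sentinel default."""
    if value == SENTINEL:
        raise ValueError(f"Required parameter '{name}' was not passed by the job")
    return value

# In the notebook, this replaces the meaningful defaults shown earlier:
#   dbutils.widgets.text("market", SENTINEL, "Market")
#   market = require_param("market", dbutils.widgets.get("market"))
```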

Step 3: Create the job

In your Databricks workspace, click Workflows in the sidebar, then click Create job. Give the job a descriptive name, for example Market Analysis.

Step 4: Configure the SQL lookup task

The SQL task runs your config query and makes its output available to downstream tasks.

  1. In the job editor, click Add task.

  2. Set Task name to read_markets.

  3. Set Type to SQL.

  4. In the SQL field, enter the following query:

    SQL
    SELECT market, currency FROM config.markets
  5. Set SQL warehouse to a warehouse in your workspace.

  6. Click Create task.

When this task runs, Databricks executes the query and captures the result as a JSON array in tasks.read_markets.output.rows. SQL task output is always returned as a JSON array — no additional configuration is required. The generic form of this reference is tasks.<task-name>.output.rows, where <task-name> matches the task key you set in the job editor. The output looks like this:

JSON
[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]
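Because the output is ordinary JSON, you can sanity-check its shape locally with the standard library before wiring it into the For each task. This is a local illustration only; inside the job, the dynamic value reference performs this resolution for you.

```python
import json

# The same payload the SQL task produces in tasks.read_markets.output.rows.
payload = """
[
  { "market": "NL", "currency": "EUR" },
  { "market": "UK", "currency": "GBP" },
  { "market": "US", "currency": "USD" }
]
"""

rows = json.loads(payload)

# Each element is a row object keyed by column name.
markets = [row["market"] for row in rows]
```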

Step 5: Configure the For each task

The For each task reads the SQL output and launches one nested task run per row.

  1. Click Add task and set Depends on to read_markets.

  2. Set Task name to process_markets.

  3. Set Type to For each.

  4. In the Inputs field, enter:

    {{tasks.read_markets.output.rows}}

    This references the row array captured by the SQL task.

  5. Set Concurrency to 2 to allow two iterations to run in parallel. Increase this value for higher throughput when your nested task and compute can support more simultaneous runs.

  6. Click Add a task to loop over and configure the nested task based on the type you chose in Step 2:

  1. Set Task name to run_market_analysis_iteration.

  2. Set Type to Notebook.

  3. Set Path to the path of the notebook you created in Step 2.

  4. Click Parameters, then click Add to add each of the following parameters:

    • Key: market, Value: {{input.market}}
    • Key: currency, Value: {{input.currency}}

    Each {{input.<key>}} reference resolves to the corresponding field from the current iteration's row object.

  5. Click Create task.
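If you manage jobs as code instead of through the UI, the same wiring corresponds to a for_each_task block in the job definition. The sketch below follows the Jobs API 2.1 shape as I understand it; treat the field names as an assumption and verify them against the Jobs API reference before use.

```json
{
  "task_key": "process_markets",
  "depends_on": [{ "task_key": "read_markets" }],
  "for_each_task": {
    "inputs": "{{tasks.read_markets.output.rows}}",
    "concurrency": 2,
    "task": {
      "task_key": "run_market_analysis_iteration",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/<username>/process_market",
        "base_parameters": {
          "market": "{{input.market}}",
          "currency": "{{input.currency}}"
        }
      }
    }
  }
}
```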

Your job DAG now shows read_markets flowing into process_markets, with the nested task visible inside the For each node.
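The Concurrency setting from Step 5 bounds how many iterations are in flight at once, much like a fixed-size worker pool. The ThreadPoolExecutor sketch below is an analogy for that behavior, not how the jobs scheduler is actually implemented.

```python
from concurrent.futures import ThreadPoolExecutor

# The row array from the SQL task, as in the earlier steps.
rows = [
    {"market": "NL", "currency": "EUR"},
    {"market": "UK", "currency": "GBP"},
    {"market": "US", "currency": "USD"},
]

def process(row: dict) -> str:
    # Stand-in for one nested-task run.
    return f"{row['market']}:{row['currency']}"

# Concurrency = 2: at most two iterations run at the same time;
# map() still returns results in input order.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(process, rows))
```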

Step 6: Run the job and verify

  1. Click Run now to trigger the job.
  2. On the job run page, click the process_markets node to expand the For each task.
  3. The job run page shows a table of iterations — one row per market value — each showing its status, start time, and duration.
  4. Click any iteration row to open the task run output and confirm it received the correct market value.

If a specific iteration fails, you can rerun only that iteration from the job run page without rerunning the entire job.

Extend the pattern

To add a new market, insert a row into the config table:

SQL
INSERT INTO config.markets VALUES ('DE', 'EUR');

The next job run automatically includes Germany, with no job configuration changes or notebook edits required.

This same pattern works for any use case where you want data to drive iteration:

  • Per-customer processing: One row per customer ID; the notebook applies customer-specific transformations or delivers to customer-specific destinations.
  • Table ingestion: One row per source table name; the notebook reads and ingests each table.
  • Backfill processing: One row per date partition; the notebook reprocesses historical data for that partition.
  • Feature flag-driven execution: One row per enabled feature or experiment; the notebook activates the corresponding logic.

To remove an item from processing, delete its row or add an active flag column and filter in the SQL query:

SQL
SELECT market, currency FROM config.markets WHERE active = TRUE

Next steps