Use uma tabela de controle para executar um trabalho For each
Você pode precisar ingerir de várias fontes. Quando essa lista muda, codificá-la diretamente na configuração do Job significa alterar o código e reimplantar o serviço. Utilize metadados para resolver isso, armazenando a lista de fontes em uma tabela que é lida e usada em tempo de execução. Adicione uma origem como uma nova linha e a próxima execução do Job a utilizará sem alterações no próprio Job.
Este tutorial mostra como criar um Job usando essa abordagem. Uma tarefa SQL lê a tabela de controle e uma tarefa For each itera sobre cada linha em paralelo.
Como funciona
O padrão utiliza três tipos de tarefas interligadas em sequência:
Tarefa | Tipo | O que faz |
|---|---|---|
| SQL | Consulta uma tabela de configuração e captura o resultado como uma matriz de linhas. |
| Para cada | Itera sobre |
| Notebook ou SQL (aninhado dentro de um For each) | Execução uma vez por linha, usando os valores das linhas passados como parâmetros para executar sua lógica de negócios. |
A saída da tarefa SQL — uma matriz JSON de objetos de linha — flui diretamente para o campo Entradas da tarefa For each usando a referência de valor dinâmico {{tasks.read_markets.output.rows}}. A tarefa For each passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.
Pré-requisitos
- Um workspace Databricks com permissão para criar Jobs e Notebooks.
- Permissão para criar tabelas no Unity Catalog
- Um esquema Unity Catalog onde você pode criar a tabela de configuração (por exemplo,
config) - Um SQL warehouse para executar a tarefa SQL
o passo 1: Criar a tabela de configuração
A tabela de configuração é o seu plano de controle. Contém a lista de valores que seu trabalho processa. Quando você precisar adicionar ou remover tarefas, atualize esta tabela — não a tarefa em si.
Execute o seguinte SQL para criar uma tabela markets em seu esquema config :
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Você pode usar um Notebook Databricks , o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com seu código de moeda.
o passo 2: Escreva o código de processamento
A tarefa aninhada dentro de For each tarefa execução uma vez por linha. Escolha uma tarefa de Notebook ou uma tarefa SQL , dependendo da sua lógica de negócios.
- Notebook task
- SQL task
Crie um novo Notebook em um caminho como /Workspace/Users/<username>/process_market. Esta execução do Notebook ocorre uma vez por iteração da tarefa For each , recebendo um valor de mercado diferente a cada vez.
Adicione o seguinte código ao Notebook:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
As chamadas dbutils.widgets.text() definem valores default para que você possa executar o Notebook diretamente em seu workspace sem conectá-lo a um Job. Quando a execução do Notebook é uma tarefa aninhada dentro de uma tarefa For each , o Job substitui o padrão pelos valores de parâmetro reais para essa iteração.
Chame dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o Notebook gera um erro InputWidgetNotDefined quando você o executa fora de um Job.
Usar o padrão permite testar o Notebook fora de um Job, mas observe a desvantagem: se a tarefa For each estiver mal configurada e não passar parâmetros, o Notebook usa o padrão e é concluído silenciosamente em vez de falhar — o que pode dificultar a detecção da configuração incorreta.
SQL tarefa suporta parâmetros nomeados usando a sintaxe :param_name . Faça referência :market e :currency em sua consulta sempre que desejar usar os valores de iteração:
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Você configura essa consulta diretamente no editor de tarefas na etapa 5. A tarefa For each passa os valores da iteração atual para os parâmetros nomeados :market e :currency em tempo de execução. Diferentemente das tarefas do Notebook, os parâmetros nomeados SQL não suportam valores default — se um parâmetro não for passado, a consulta falhará com um erro de resolução de parâmetro. Para validar ou definir parâmetros default antes da execução da consulta, utilize uma tarefa do Notebook.
o passo 3: Criar o trabalho
No seu workspace Databricks , clique em Fluxo de trabalho na barra lateral e, em seguida, clique em Criar tarefa . Dê ao trabalho um nome descritivo, por exemplo Market Analysis.
o passo 4: Configurar a tarefa de pesquisa SQL
A tarefa SQL executa sua consulta de configuração e disponibiliza sua saída para as tarefas subsequentes.
-
No editor de tarefas, clique em Adicionar tarefa .
-
Defina o nome da tarefa como
read_markets. -
Set Type to SQL .
-
No campo SQL , insira a seguinte consulta:
SQLSELECT market, currency FROM config.markets -
Configure SQL warehouse para um repositório em seu workspace.
-
Clique em Criar tarefa .
Quando esta tarefa é executada, Databricks executa a consulta e captura o resultado como uma matriz JSON em tasks.read_markets.output.rows. O resultado da tarefa SQL é sempre retornado como uma matriz JSON — nenhuma configuração adicional é necessária. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à key da tarefa que você definiu no editor de tarefas. O resultado será semelhante a este:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
o passo 5: Configure o For each tarefa
A tarefa For each lê a saída SQL e inicia uma execução de tarefa aninhada por linha.
-
Clique em Adicionar tarefa e defina Depende de para
read_markets. -
Defina o nome da tarefa como
process_markets. -
Defina o tipo como "Para cada" .
-
No campo "Entradas" , digite:
{{tasks.read_markets.output.rows}}Isso faz referência à matriz de linhas capturada pela tarefa SQL.
-
Defina a Concorrência como
2para permitir a execução de duas iterações em paralelo. Aumente esse valor para melhorar a taxa de transferência ou se sua tarefa aninhada suportar maior paralelismo. -
Clique em Adicionar uma tarefa para repetir e configure a tarefa aninhada com base no tipo que você escolheu na etapa 2:
- Notebook task
- SQL task
-
Defina o nome da tarefa como
run_market_analysis_iteration. -
Set Type to Notebook .
-
Defina o caminho para o caminho do Notebook que você criou na etapa 2.
-
Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:
- chave :
market, Valor :{{input.market}} - chave :
currency, Valor :{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual. - chave :
-
Clique em Criar tarefa .
-
Defina o nome da tarefa como
run_market_analysis_iteration. -
Set Type to SQL .
-
No campo SQL , insira sua consulta com os parâmetros nomeados, por exemplo:
SQLSELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency -
Configure SQL warehouse para um repositório em seu workspace.
-
Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:
- chave :
market, Valor :{{input.market}} - chave :
currency, Valor :{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual. - chave :
-
Clique em Criar tarefa .
Seu DAG de trabalho agora mostra read_markets fluindo para process_markets, com a tarefa aninhada visível dentro do nó For each .
o passo 6: execute o Job e verifique
- Clique em "Executar agora" para iniciar a tarefa.
- Na página Execução do trabalho, clique no nó
process_marketspara expandir a tarefaFor each. - A página de execução do trabalho exibe uma tabela de iterações — uma linha por valor de mercado — cada uma mostrando seu status, horário de início e duração.
- Clique em qualquer linha de iteração para abrir o resultado da execução da tarefa e confirmar se ela recebeu o valor de mercado correto.
Se uma iteração específica falhar, você pode executar novamente apenas essa iteração na página de execução do Job, sem precisar executar o Job inteiro novamente.
Amplie o padrão
Para adicionar um novo mercado, insira uma linha na tabela de configuração:
INSERT INTO config.markets VALUES ('DE', 'EUR');
A próxima execução do Job incluirá automaticamente a Alemanha, sem necessidade de alterações na configuração do Job ou edições no Notebook.
Esse mesmo padrão funciona para qualquer caso de uso em que você queira que os dados orientem a iteração:
- Processamento por cliente : Uma linha por ID de cliente; o Notebook aplica transformações específicas do cliente ou entrega em destinos específicos do cliente.
- Ingestão de tabelas : Uma linha por nome de tabela de origem; o Notebook lê e ingere cada tabela.
- Processamento de preenchimento retroativo : Uma linha por partição de data; o Notebook reprocessa os dados históricos para essa partição.
- Execução orientada por flags de recurso : Uma linha por recurso ou experimento habilitado; o Notebook ativa a lógica correspondente.
Para remover um item do processamento, exclua a linha correspondente ou adicione uma coluna de sinalização active e filtre na consulta SQL:
SELECT market, currency FROM config.markets WHERE active = TRUE
Próximos passos
- Use uma tarefa
For eachpara executar outra tarefa em um loop — Referência completa para configurar tarefasFor each, incluindo tipos de parâmetros e opções de concorrência. - Usar uma tabela de consulta para grandes matrizes de parâmetros em uma tarefa
For each— Como lidar com grandes matrizes de parâmetros que excedem o limite de 48 KB para valores de tarefa - Acessar valores de parâmetros de uma tarefa — Todos os métodos para acessar valores de parâmetros em Notebooks, scripts Python e tarefas SQL .