Use uma tabela de controle para executar um trabalho For each
Pode ser necessário ingerir dados de muitas fontes. Quando essa lista muda, codificar essa informação na configuração do Job significa alterar o código e reimplementar. Utilize metadados para abordar essa questão, armazenando a lista de fontes em uma tabela que é lida e utilizada em tempo de execução. Adicione uma fonte como uma nova linha e a próxima execução do Job a incorpora sem alterações no próprio Job.
Este tutorial mostra como criar um Job usando essa abordagem. Uma tarefa SQL lê a tabela de controle e uma tarefa For each itera sobre cada linha em paralelo.
Como funciona
O padrão utiliza três tipos de tarefas interligadas em sequência:
Tarefa | Tipo | O que faz |
|---|---|---|
| SQL | Consulta uma tabela de configuração e captura o resultado como uma matriz de linhas. |
| Para cada | Itera sobre |
| Notebook ou SQL (aninhado dentro de um For each) | Execução uma vez por linha, usando os valores das linhas passados como parâmetros para executar sua lógica de negócios. |
A saída da tarefa SQL — uma matriz JSON de objetos de linha — flui diretamente para o campo Entradas da tarefa For each usando a referência de valor dinâmico {{tasks.read_markets.output.rows}}. A tarefa For each passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.
Pré-requisitos
- Um workspace Databricks com permissão para criar Jobs e Notebooks.
- Permissão para criar tabelas no Unity Catalog
- Um esquema Unity Catalog onde você pode criar a tabela de configuração (por exemplo,
config) - Um SQL warehouse para executar a tarefa SQL
o passo 1: Criar a tabela de configuração
A tabela de configurações é o seu plano de controle. Contém a lista de valores que o job processa. Quando você precisa adicionar ou remover trabalho, você atualiza esta tabela, e não o job.
Execute o seguinte SQL para criar uma tabela markets em seu esquema config :
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);
Você pode usar um Notebook Databricks , o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com seu código de moeda.
o passo 2: Escreva o código de processamento
A tarefa aninhada dentro de For each tarefa execução uma vez por linha. Escolha uma tarefa de Notebook ou uma tarefa SQL , dependendo da sua lógica de negócios.
- Notebook task
- SQL task
Crie um novo Notebook em um caminho como /Workspace/Users/<username>/process_market. Esta execução do Notebook ocorre uma vez por iteração da tarefa For each , recebendo um valor de mercado diferente a cada vez.
Adicione o seguinte código ao Notebook:
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")
# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")
print(f"Processing market: {market} ({currency})")
# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)
As chamadas dbutils.widgets.text() definem valores default para que você possa executar o Notebook diretamente em seu workspace sem conectá-lo a um Job. Quando a execução do Notebook é uma tarefa aninhada dentro de uma tarefa For each , o Job substitui o padrão pelos valores de parâmetro reais para essa iteração.
Chame dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o Notebook gera um erro InputWidgetNotDefined quando você o executa fora de um Job.
Usar padrões permite que você teste o notebook fora de um job, mas observe a desvantagem: se a tarefa For each estiver configurada incorretamente e não passar parâmetros, o notebook usa os padrões e é bem-sucedido silenciosamente em vez de falhar, o que pode tornar a configuração incorreta mais difícil de detectar.
SQL tarefa suporta parâmetros nomeados usando a sintaxe :param_name . Faça referência :market e :currency em sua consulta sempre que desejar usar os valores de iteração:
SELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency
Esta query é configurada diretamente no editor de tarefas na Etapa 5. A tarefa For each passa os valores da iteração atual para os parâmetros nomeados :market e :currency em tempo de execução. Ao contrário das tarefas de notebook, os parâmetros nomeados SQL não suportam valores default; se um parâmetro não for passado, a consulta falha com um erro de resolução de parâmetro. Para validar ou usar parâmetros default antes da execução da consulta, use uma tarefa de notebook em vez disso.
o passo 3: Criar o trabalho
No seu workspace Databricks , clique em Fluxo de trabalho na barra lateral e, em seguida, clique em Criar tarefa . Dê ao trabalho um nome descritivo, por exemplo Market Analysis.
o passo 4: Configurar a tarefa de pesquisa SQL
A tarefa SQL executa sua consulta de configuração e disponibiliza sua saída para as tarefas subsequentes.
-
No editor de tarefas, clique em Adicionar tarefa .
-
Defina o nome da tarefa como
read_markets. -
Set Type to SQL .
-
No campo SQL , insira a seguinte consulta:
SQLSELECT market, currency FROM config.markets -
Configure SQL warehouse para um repositório em seu workspace.
-
Clique em Criar tarefa .
Quando esta tarefa for executada, o Databricks executa a query e captura o resultado como um array JSON em tasks.read_markets.output.rows. A saída da tarefa SQL é sempre retornada como uma matriz JSON; nenhuma configuração adicional é necessária. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à key da tarefa que você definiu no editor de Jobs. A saída fica assim:
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]
o passo 5: Configure o For each tarefa
A tarefa For each lê a saída SQL e inicia uma execução de tarefa aninhada por linha.
-
Clique em Adicionar tarefa e defina Depende de para
read_markets. -
Defina o nome da tarefa como
process_markets. -
Defina o tipo como "Para cada" .
-
No campo "Entradas" , digite:
{{tasks.read_markets.output.rows}}Isso faz referência à matriz de linhas capturada pela tarefa SQL.
-
Defina a Concorrência como
2para permitir a execução de duas iterações em paralelo. Aumente esse valor para melhorar a taxa de transferência ou se sua tarefa aninhada suportar maior paralelismo. -
Clique em Adicionar uma tarefa para repetir e configure a tarefa aninhada com base no tipo que você escolheu na etapa 2:
- Notebook task
- SQL task
-
Defina o nome da tarefa como
run_market_analysis_iteration. -
Set Type to Notebook .
-
Defina o caminho para o caminho do Notebook que você criou na etapa 2.
-
Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:
- chave :
market, Valor :{{input.market}} - chave :
currency, Valor :{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual. - chave :
-
Clique em Criar tarefa .
-
Defina o nome da tarefa como
run_market_analysis_iteration. -
Set Type to SQL .
-
No campo SQL , insira sua consulta com os parâmetros nomeados, por exemplo:
SQLSELECT *
FROM sales.transactions
WHERE market = :market
AND currency_code = :currency -
Configure SQL warehouse para um repositório em seu workspace.
-
Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:
- chave :
market, Valor :{{input.market}} - chave :
currency, Valor :{{input.currency}}
Cada referência
{{input.<key>}}resolve para o campo correspondente do objeto de linha da iteração atual. - chave :
-
Clique em Criar tarefa .
Seu DAG de trabalho agora mostra read_markets fluindo para process_markets, com a tarefa aninhada visível dentro do nó For each .
o passo 6: execute o Job e verifique
- Clique em "Executar agora" para iniciar a tarefa.
- Na página Execução do trabalho, clique no nó
process_marketspara expandir a tarefaFor each. - A página de execução do Job mostra uma tabela de iterações, uma linha por valor de mercado, cada uma mostrando seu status, hora de início e duração.
- Clique em qualquer linha de iteração para abrir o resultado da execução da tarefa e confirmar se ela recebeu o valor de mercado correto.
Se uma iteração específica falhar, você pode executar novamente apenas essa iteração na página de execução do Job, sem precisar executar o Job inteiro novamente.
Amplie o padrão
Para adicionar um novo mercado, insira uma linha na tabela de configuração:
INSERT INTO config.markets VALUES ('DE', 'EUR');
A próxima execução do Job incluirá automaticamente a Alemanha, sem necessidade de alterações na configuração do Job ou edições no Notebook.
Esse mesmo padrão funciona para qualquer caso de uso em que você queira que os dados orientem a iteração:
- Processamento por cliente : Uma linha por ID de cliente; o Notebook aplica transformações específicas do cliente ou entrega em destinos específicos do cliente.
- Ingestão de tabelas : Uma linha por nome de tabela de origem; o Notebook lê e ingere cada tabela.
- Processamento de preenchimento retroativo : Uma linha por partição de data; o Notebook reprocessa os dados históricos para essa partição.
- Execução orientada por flags de recurso : Uma linha por recurso ou experimento habilitado; o Notebook ativa a lógica correspondente.
Para remover um item do processamento, exclua a linha correspondente ou adicione uma coluna de sinalização active e filtre na consulta SQL:
SELECT market, currency FROM config.markets WHERE active = TRUE
Próximos passos
- Use uma
For eachtarefa para executar outra tarefa em um loop: Referência completa para configurarFor eachtarefas, incluindo tipos de parâmetro e opções de simultaneidade - Use uma tabela de pesquisa para grandes arrays de parâmetros em uma
For eachtarefa: como lidar com grandes arrays de parâmetros que excedem o limite de valor de tarefa de 48 KB - Acessar valores de parâmetro de uma tarefa: Todos os métodos para acessar valores de parâmetro em Notebooks, scripts Python e tarefas SQL