Pular para o conteúdo principal

Use uma tabela de controle para executar um trabalho For each

Você pode precisar ingerir de várias fontes. Quando essa lista muda, codificá-la diretamente na configuração do Job significa alterar o código e reimplantar o serviço. Utilize metadados para resolver isso, armazenando a lista de fontes em uma tabela que é lida e usada em tempo de execução. Adicione uma origem como uma nova linha e a próxima execução do Job a utilizará sem alterações no próprio Job.

Este tutorial mostra como criar um Job usando essa abordagem. Uma tarefa SQL lê a tabela de controle e uma tarefa For each itera sobre cada linha em paralelo.

Como funciona

O padrão utiliza três tipos de tarefas interligadas em sequência:

Tarefa

Tipo

O que faz

read_markets

SQL

Consulta uma tabela de configuração e captura o resultado como uma matriz de linhas.

process_markets

Para cada

Itera sobre {{tasks.read_markets.output.rows}}, executando a tarefa aninhada uma vez por linha.

run_market_analysis_iteration

Notebook ou SQL (aninhado dentro de um For each)

Execução uma vez por linha, usando os valores das linhas passados como parâmetros para executar sua lógica de negócios.

A saída da tarefa SQL — uma matriz JSON de objetos de linha — flui diretamente para o campo Entradas da tarefa For each usando a referência de valor dinâmico {{tasks.read_markets.output.rows}}. A tarefa For each passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.

Pré-requisitos

  • Um workspace Databricks com permissão para criar Jobs e Notebooks.
  • Permissão para criar tabelas no Unity Catalog
  • Um esquema Unity Catalog onde você pode criar a tabela de configuração (por exemplo, config)
  • Um SQL warehouse para executar a tarefa SQL

o passo 1: Criar a tabela de configuração

A tabela de configuração é o seu plano de controle. Contém a lista de valores que seu trabalho processa. Quando você precisar adicionar ou remover tarefas, atualize esta tabela — não a tarefa em si.

Execute o seguinte SQL para criar uma tabela markets em seu esquema config :

SQL
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);

Você pode usar um Notebook Databricks , o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com seu código de moeda.

o passo 2: Escreva o código de processamento

A tarefa aninhada dentro de For each tarefa execução uma vez por linha. Escolha uma tarefa de Notebook ou uma tarefa SQL , dependendo da sua lógica de negócios.

Crie um novo Notebook em um caminho como /Workspace/Users/<username>/process_market. Esta execução do Notebook ocorre uma vez por iteração da tarefa For each , recebendo um valor de mercado diferente a cada vez.

Adicione o seguinte código ao Notebook:

Python
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

As chamadas dbutils.widgets.text() definem valores default para que você possa executar o Notebook diretamente em seu workspace sem conectá-lo a um Job. Quando a execução do Notebook é uma tarefa aninhada dentro de uma tarefa For each , o Job substitui o padrão pelos valores de parâmetro reais para essa iteração.

Chame dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o Notebook gera um erro InputWidgetNotDefined quando você o executa fora de um Job.

Usar o padrão permite testar o Notebook fora de um Job, mas observe a desvantagem: se a tarefa For each estiver mal configurada e não passar parâmetros, o Notebook usa o padrão e é concluído silenciosamente em vez de falhar — o que pode dificultar a detecção da configuração incorreta.

o passo 3: Criar o trabalho

No seu workspace Databricks , clique em Fluxo de trabalho na barra lateral e, em seguida, clique em Criar tarefa . Dê ao trabalho um nome descritivo, por exemplo Market Analysis.

o passo 4: Configurar a tarefa de pesquisa SQL

A tarefa SQL executa sua consulta de configuração e disponibiliza sua saída para as tarefas subsequentes.

  1. No editor de tarefas, clique em Adicionar tarefa .

  2. Defina o nome da tarefa como read_markets.

  3. Set Type to SQL .

  4. No campo SQL , insira a seguinte consulta:

    SQL
    SELECT market, currency FROM config.markets
  5. Configure SQL warehouse para um repositório em seu workspace.

  6. Clique em Criar tarefa .

Quando esta tarefa é executada, Databricks executa a consulta e captura o resultado como uma matriz JSON em tasks.read_markets.output.rows. O resultado da tarefa SQL é sempre retornado como uma matriz JSON — nenhuma configuração adicional é necessária. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à key da tarefa que você definiu no editor de tarefas. O resultado será semelhante a este:

JSON
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]

o passo 5: Configure o For each tarefa

A tarefa For each lê a saída SQL e inicia uma execução de tarefa aninhada por linha.

  1. Clique em Adicionar tarefa e defina Depende de para read_markets.

  2. Defina o nome da tarefa como process_markets.

  3. Defina o tipo como "Para cada" .

  4. No campo "Entradas" , digite:


    {{tasks.read_markets.output.rows}}

    Isso faz referência à matriz de linhas capturada pela tarefa SQL.

  5. Defina a Concorrência como 2 para permitir a execução de duas iterações em paralelo. Aumente esse valor para melhorar a taxa de transferência ou se sua tarefa aninhada suportar maior paralelismo.

  6. Clique em Adicionar uma tarefa para repetir e configure a tarefa aninhada com base no tipo que você escolheu na etapa 2:

  1. Defina o nome da tarefa como run_market_analysis_iteration.

  2. Set Type to Notebook .

  3. Defina o caminho para o caminho do Notebook que você criou na etapa 2.

  4. Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:

    • chave : market, Valor : {{input.market}}
    • chave : currency, Valor : {{input.currency}}

    Cada referência {{input.<key>}} resolve para o campo correspondente do objeto de linha da iteração atual.

  5. Clique em Criar tarefa .

Seu DAG de trabalho agora mostra read_markets fluindo para process_markets, com a tarefa aninhada visível dentro do nó For each .

o passo 6: execute o Job e verifique

  1. Clique em "Executar agora" para iniciar a tarefa.
  2. Na página Execução do trabalho, clique no nó process_markets para expandir a tarefa For each .
  3. A página de execução do trabalho exibe uma tabela de iterações — uma linha por valor de mercado — cada uma mostrando seu status, horário de início e duração.
  4. Clique em qualquer linha de iteração para abrir o resultado da execução da tarefa e confirmar se ela recebeu o valor de mercado correto.

Se uma iteração específica falhar, você pode executar novamente apenas essa iteração na página de execução do Job, sem precisar executar o Job inteiro novamente.

Amplie o padrão

Para adicionar um novo mercado, insira uma linha na tabela de configuração:

SQL
INSERT INTO config.markets VALUES ('DE', 'EUR');

A próxima execução do Job incluirá automaticamente a Alemanha, sem necessidade de alterações na configuração do Job ou edições no Notebook.

Esse mesmo padrão funciona para qualquer caso de uso em que você queira que os dados orientem a iteração:

  • Processamento por cliente : Uma linha por ID de cliente; o Notebook aplica transformações específicas do cliente ou entrega em destinos específicos do cliente.
  • Ingestão de tabelas : Uma linha por nome de tabela de origem; o Notebook lê e ingere cada tabela.
  • Processamento de preenchimento retroativo : Uma linha por partição de data; o Notebook reprocessa os dados históricos para essa partição.
  • Execução orientada por flags de recurso : Uma linha por recurso ou experimento habilitado; o Notebook ativa a lógica correspondente.

Para remover um item do processamento, exclua a linha correspondente ou adicione uma coluna de sinalização active e filtre na consulta SQL:

SQL
SELECT market, currency FROM config.markets WHERE active = TRUE

Próximos passos