Pular para o conteúdo principal

Use uma tabela de controle para executar um trabalho For each

Pode ser necessário ingerir dados de muitas fontes. Quando essa lista muda, codificar essa informação na configuração do Job significa alterar o código e reimplementar. Utilize metadados para abordar essa questão, armazenando a lista de fontes em uma tabela que é lida e utilizada em tempo de execução. Adicione uma fonte como uma nova linha e a próxima execução do Job a incorpora sem alterações no próprio Job.

Este tutorial mostra como criar um Job usando essa abordagem. Uma tarefa SQL lê a tabela de controle e uma tarefa For each itera sobre cada linha em paralelo.

Como funciona

O padrão utiliza três tipos de tarefas interligadas em sequência:

Tarefa

Tipo

O que faz

read_markets

SQL

Consulta uma tabela de configuração e captura o resultado como uma matriz de linhas.

process_markets

Para cada

Itera sobre {{tasks.read_markets.output.rows}}, executando a tarefa aninhada uma vez por linha.

run_market_analysis_iteration

Notebook ou SQL (aninhado dentro de um For each)

Execução uma vez por linha, usando os valores das linhas passados como parâmetros para executar sua lógica de negócios.

A saída da tarefa SQL — uma matriz JSON de objetos de linha — flui diretamente para o campo Entradas da tarefa For each usando a referência de valor dinâmico {{tasks.read_markets.output.rows}}. A tarefa For each passa então cada linha para a tarefa aninhada como parâmetros, disponíveis como {{input.market}} e {{input.currency}}.

Pré-requisitos

  • Um workspace Databricks com permissão para criar Jobs e Notebooks.
  • Permissão para criar tabelas no Unity Catalog
  • Um esquema Unity Catalog onde você pode criar a tabela de configuração (por exemplo, config)
  • Um SQL warehouse para executar a tarefa SQL

o passo 1: Criar a tabela de configuração

A tabela de configurações é o seu plano de controle. Contém a lista de valores que o job processa. Quando você precisa adicionar ou remover trabalho, você atualiza esta tabela, e não o job.

Execute o seguinte SQL para criar uma tabela markets em seu esquema config :

SQL
CREATE OR REPLACE TABLE config.markets AS
SELECT * FROM VALUES
('NL', 'EUR'),
('UK', 'GBP'),
('US', 'USD')
AS t(market, currency);

Você pode usar um Notebook Databricks , o editor SQL ou qualquer tarefa SQL para executar esta instrução. Após este passo, config.markets contém três linhas, uma por mercado, cada uma com seu código de moeda.

o passo 2: Escreva o código de processamento

A tarefa aninhada dentro de For each tarefa execução uma vez por linha. Escolha uma tarefa de Notebook ou uma tarefa SQL , dependendo da sua lógica de negócios.

Crie um novo Notebook em um caminho como /Workspace/Users/<username>/process_market. Esta execução do Notebook ocorre uma vez por iteração da tarefa For each , recebendo um valor de mercado diferente a cada vez.

Adicione o seguinte código ao Notebook:

Python
# Set default values for testing the notebook outside of a job.
# When the notebook runs inside a For each task, the job overrides these defaults.
dbutils.widgets.text("market", "NL", "Market")
dbutils.widgets.text("currency", "EUR", "Currency")

# Read the parameters passed by the For each task
market = dbutils.widgets.get("market")
currency = dbutils.widgets.get("currency")

print(f"Processing market: {market} ({currency})")

# Your business logic goes here. For example:
df = spark.table("sales.transactions").filter(
f"market = '{market}' AND currency_code = '{currency}'"
)
display(df)

As chamadas dbutils.widgets.text() definem valores default para que você possa executar o Notebook diretamente em seu workspace sem conectá-lo a um Job. Quando a execução do Notebook é uma tarefa aninhada dentro de uma tarefa For each , o Job substitui o padrão pelos valores de parâmetro reais para essa iteração.

Chame dbutils.widgets.text() antes de dbutils.widgets.get(). Se get for chamado antes de text, o Notebook gera um erro InputWidgetNotDefined quando você o executa fora de um Job.

Usar padrões permite que você teste o notebook fora de um job, mas observe a desvantagem: se a tarefa For each estiver configurada incorretamente e não passar parâmetros, o notebook usa os padrões e é bem-sucedido silenciosamente em vez de falhar, o que pode tornar a configuração incorreta mais difícil de detectar.

o passo 3: Criar o trabalho

No seu workspace Databricks , clique em Fluxo de trabalho na barra lateral e, em seguida, clique em Criar tarefa . Dê ao trabalho um nome descritivo, por exemplo Market Analysis.

o passo 4: Configurar a tarefa de pesquisa SQL

A tarefa SQL executa sua consulta de configuração e disponibiliza sua saída para as tarefas subsequentes.

  1. No editor de tarefas, clique em Adicionar tarefa .

  2. Defina o nome da tarefa como read_markets.

  3. Set Type to SQL .

  4. No campo SQL , insira a seguinte consulta:

    SQL
    SELECT market, currency FROM config.markets
  5. Configure SQL warehouse para um repositório em seu workspace.

  6. Clique em Criar tarefa .

Quando esta tarefa for executada, o Databricks executa a query e captura o resultado como um array JSON em tasks.read_markets.output.rows. A saída da tarefa SQL é sempre retornada como uma matriz JSON; nenhuma configuração adicional é necessária. A forma genérica desta referência é tasks.<task-name>.output.rows, onde <task-name> corresponde à key da tarefa que você definiu no editor de Jobs. A saída fica assim:

JSON
[
{ "market": "NL", "currency": "EUR" },
{ "market": "UK", "currency": "GBP" },
{ "market": "US", "currency": "USD" }
]

o passo 5: Configure o For each tarefa

A tarefa For each lê a saída SQL e inicia uma execução de tarefa aninhada por linha.

  1. Clique em Adicionar tarefa e defina Depende de para read_markets.

  2. Defina o nome da tarefa como process_markets.

  3. Defina o tipo como "Para cada" .

  4. No campo "Entradas" , digite:


    {{tasks.read_markets.output.rows}}

    Isso faz referência à matriz de linhas capturada pela tarefa SQL.

  5. Defina a Concorrência como 2 para permitir a execução de duas iterações em paralelo. Aumente esse valor para melhorar a taxa de transferência ou se sua tarefa aninhada suportar maior paralelismo.

  6. Clique em Adicionar uma tarefa para repetir e configure a tarefa aninhada com base no tipo que você escolheu na etapa 2:

  1. Defina o nome da tarefa como run_market_analysis_iteration.

  2. Set Type to Notebook .

  3. Defina o caminho para o caminho do Notebook que você criou na etapa 2.

  4. Clique em Parâmetros e, em seguida, clique em Adicionar para adicionar cada um dos seguintes parâmetros:

    • chave : market, Valor : {{input.market}}
    • chave : currency, Valor : {{input.currency}}

    Cada referência {{input.<key>}} resolve para o campo correspondente do objeto de linha da iteração atual.

  5. Clique em Criar tarefa .

Seu DAG de trabalho agora mostra read_markets fluindo para process_markets, com a tarefa aninhada visível dentro do nó For each .

o passo 6: execute o Job e verifique

  1. Clique em "Executar agora" para iniciar a tarefa.
  2. Na página Execução do trabalho, clique no nó process_markets para expandir a tarefa For each .
  3. A página de execução do Job mostra uma tabela de iterações, uma linha por valor de mercado, cada uma mostrando seu status, hora de início e duração.
  4. Clique em qualquer linha de iteração para abrir o resultado da execução da tarefa e confirmar se ela recebeu o valor de mercado correto.

Se uma iteração específica falhar, você pode executar novamente apenas essa iteração na página de execução do Job, sem precisar executar o Job inteiro novamente.

Amplie o padrão

Para adicionar um novo mercado, insira uma linha na tabela de configuração:

SQL
INSERT INTO config.markets VALUES ('DE', 'EUR');

A próxima execução do Job incluirá automaticamente a Alemanha, sem necessidade de alterações na configuração do Job ou edições no Notebook.

Esse mesmo padrão funciona para qualquer caso de uso em que você queira que os dados orientem a iteração:

  • Processamento por cliente : Uma linha por ID de cliente; o Notebook aplica transformações específicas do cliente ou entrega em destinos específicos do cliente.
  • Ingestão de tabelas : Uma linha por nome de tabela de origem; o Notebook lê e ingere cada tabela.
  • Processamento de preenchimento retroativo : Uma linha por partição de data; o Notebook reprocessa os dados históricos para essa partição.
  • Execução orientada por flags de recurso : Uma linha por recurso ou experimento habilitado; o Notebook ativa a lógica correspondente.

Para remover um item do processamento, exclua a linha correspondente ou adicione uma coluna de sinalização active e filtre na consulta SQL:

SQL
SELECT market, currency FROM config.markets WHERE active = TRUE

Próximos passos