Usar uma tabela de pesquisa para matrizes de parâmetros grandes em uma tarefa For each
For each A tarefa passa matrizes de parâmetros para a tarefa aninhada de forma iterativa, para fornecer a cada tarefa aninhada informações para sua execução. As matrizes de parâmetros são limitadas a 10.000 caracteres ou 48 KB se o senhor usar referências de valor de tarefa para passá-las. Quando o senhor tem uma quantidade maior de dados para passar para a tarefa aninhada, não é possível usar diretamente os parâmetros de entrada, valores da tarefa ou Job para passar esses dados.
Uma alternativa para passar os dados completos é armazenar os dados da tarefa como um arquivo JSON e passar uma pesquisa key (nos dados JSON ) por meio da entrada da tarefa em vez dos dados completos. A tarefa aninhada pode usar o site key para recuperar os dados específicos necessários para cada iteração.
O exemplo a seguir mostra um arquivo de configuração JSON de amostra e como passar parâmetros para uma tarefa aninhada que procura os valores na configuração JSON.
Exemplo de configuração JSON
Esse exemplo de configuração é uma lista de etapas, com parâmetros (args) para cada iteração (somente três etapas são mostradas neste exemplo). Suponha que esse arquivo JSON seja salvo como /Workspace/Users/<user>/copy-filtered-table-config.json. Fazemos referência a isso na tarefa aninhada.
{
  "steps": [
    {
      "key": "table_1",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_1",
        "destination_table": "filtered_table_1",
        "filter_column": "col_a",
        "filter_value": "value_1"
      }
    }
    {
      "key": "table_2",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_2",
        "destination_table": "filtered_table_2",
        "filter_column": "col_b",
        "filter_value": "value_2"
      }
    },
    {
      "key": "table_3",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_3",
        "destination_table": "filtered_table_3",
        "filter_column": "col_c",
        "filter_value": "value_3"
      }
    },
  ]
}
Amostra For each tarefa
A tarefa For each em seu trabalho inclui a entrada com a chave para cada iteração. Este exemplo mostra uma tarefa chamada copy-filtered-tables com as Entradas definidas como ["table_1","table_2","table_3"]. Essa lista é limitada a 10.000 caracteres, mas como o senhor está apenas passando a chave, ela é muito menor do que os dados completos.
Neste exemplo, as etapas não dependem de outras etapas ou tarefas, portanto, podemos definir uma simultaneidade maior que 1 para tornar a execução da tarefa mais rápida.

Exemplo de tarefa aninhada
A tarefa aninhada recebe a entrada da tarefa pai For each. Nesse caso, configuramos a entrada para ser usada como Key para o arquivo de configuração. A imagem a seguir mostra a tarefa aninhada, incluindo a configuração de um parâmetro chamado key com o valor {{input}}.

Essa tarefa é um Notebook que contém código. No Notebook, o senhor pode usar o seguinte código Python para ler a entrada e usá-la como key no arquivo de configuração JSON. Os dados do arquivo JSON são usados para ler, filtrar e gravar dados de uma tabela.
# copy-filtered-table (iteratable task code to read a table, filter by a value, and write as a new table)
from pyspark.sql.functions import expr
from types import SimpleNamespace
import json
# If the notebook is run outside of a job with a key parameter, this provides
# a default. This allows testing outside of a For each task
dbutils.widgets.text("key", "table_1", "key")
# load configuration (note that the path must be set to valid configuration file)
config_path = "/Workspace/Users/<user>/copy-filtered-table-config.json"
with open(config_path, "r") as file:
    config = json.loads(file.read())
# look up step and arguments
key = dbutils.widgets.get("key")
current_step = next((step for step in config['steps'] if step['key'] == key), None)
if current_step is None:
    raise ValueError(f"Could not find step '{key}' in the configuration")
args = SimpleNamespace(**current_step["args"])
# read the source table defined for the step, and filter it
df = spark.read.table(f"{args.catalog}.{args.schema}.{args.source_table}") \
          .filter(expr(f"{args.filter_column} like '%{args.filter_value}%'"))
# write the filtered table to the destination
df.write.mode("overwrite").saveAsTable(f"{args.catalog}.{args.schema}.{args.destination_table}")