Pular para o conteúdo principal

Carregue e processe dados de forma incremental com fluxos DLT

Este artigo explica o que são fluxos e como o senhor pode usar fluxos no pipeline DLT para processar dados de forma incremental de uma fonte para uma tabela de transmissão de destino. No DLT, os fluxos são definidos de duas maneiras:

  1. Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de transmissão.
  2. A DLT também oferece funcionalidade para definir explicitamente fluxos para processamento mais complexo, como anexar a uma tabela de transmissão a partir de várias fontes de transmissão.

Este artigo discute os fluxos implícitos criados quando você define uma consulta para atualizar uma tabela de transmissão e, em seguida, apresenta detalhes sobre a sintaxe para definir fluxos mais complexos.

O que é um fluxo?

Na DLT, um fluxo é uma consulta de transmissão que processa os dados de origem de forma incremental para atualizar uma tabela de transmissão de destino. A maioria dos conjuntos de dados DLT que o senhor cria em um pipeline define o fluxo como parte da consulta e não exige a definição explícita do fluxo. Por exemplo, o senhor cria uma tabela de transmissão no DLT em um único comando DDL em vez de usar instruções de tabela e fluxo separadas para criar a tabela de transmissão:

nota

Este exemplo CREATE FLOW é fornecido apenas para fins ilustrativos e inclui palavras-chave que não são sintaxe DLT válida.

SQL
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Além do fluxo default definido por uma consulta, as interfaces DLT Python e SQL oferecem a funcionalidade de fluxo de acréscimo . O fluxo Append suporta o processamento que requer a leitura de dados de várias fontes de transmissão para atualizar uma única tabela de transmissão. Por exemplo, o senhor pode usar a funcionalidade de anexar fluxo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.

Usar o fluxo de acréscimo para gravar em uma tabela de transmissão a partir de várias fontes de transmissão

Utilize o decorador @append_flow na interface em Python ou a cláusula CREATE FLOW na interface em SQL para gravar em uma tabela de transmissão a partir de várias fontes de transmissão. Utilize o fluxo de acréscimo para tarefas de processamento como as seguintes:

  • Adicione fontes de transmissão que acrescentem dados a uma tabela de transmissão existente sem exigir um refresh completo. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, o senhor pode adicionar os dados da nova região à tabela sem executar um refresh completo. Veja o exemplo: Gravação em uma tabela de transmissão a partir de vários tópicos do site Kafka.
  • Atualizar uma tabela de transmissão anexando dados históricos ausentes (backfilling). Por exemplo, o senhor tem uma tabela de transmissão existente que é gravada por um tópico Apache Kafka . O senhor também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão, mas não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Veja o exemplo: execução de um backfill de dados único.
  • Combine dados de várias fontes e grave em uma única tabela de transmissão em vez de usar a cláusula UNION em uma consulta. O uso do processamento de fluxo de acréscimo em vez de UNION permite que o senhor atualize a tabela de destino de forma incremental sem executar uma atualização completa em refresh. Veja o exemplo: use o processamento de fluxo de acréscimo em vez de UNION.

O alvo para a saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas em Python, use a função create_streaming_table() para criar uma tabela de destino.

important
  • Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table() ou em uma definição de tabela existente. Você não pode definir expectativas na definição @append_flow.
  • Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
    • Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
    • você não pode reutilizar um nome de fluxo em um pipeline porque o ponto de verificação existente não corresponderá à nova definição de fluxo.

Veja a seguir a sintaxe de @append_flow

Python
import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)

Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos do site Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

Para saber mais sobre a função de valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, o senhor pode criar, de forma programática, vários fluxos direcionados a uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

nota

Esse padrão tem os mesmos requisitos de usar um loop for para criar tabelas. O senhor deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um loop for.

Python
import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

Exemplo: execução de um backfill de dados único

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

nota

Para garantir um verdadeiro preenchimento único quando a consulta de backfill fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de backfill, deixe a consulta no lugar.

Python
import dlt

@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION, o senhor pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. O uso de consultas de fluxo de acréscimo em vez de UNION permite que o senhor faça acréscimos a uma tabela de transmissão de várias fontes sem executar um refreshcompleto.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

Python
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python
dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")