Carregue e processe dados de forma incremental com fluxos DLT
Este artigo explica o que são fluxos e como o senhor pode usar fluxos no pipeline DLT para processar dados de forma incremental de uma fonte para uma tabela de transmissão de destino. No DLT, os fluxos são definidos de duas maneiras:
- Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de transmissão.
- A DLT também oferece funcionalidade para definir explicitamente fluxos para processamento mais complexo, como anexar a uma tabela de transmissão a partir de várias fontes de transmissão.
Este artigo discute os fluxos implícitos criados quando você define uma consulta para atualizar uma tabela de transmissão e, em seguida, apresenta detalhes sobre a sintaxe para definir fluxos mais complexos.
O que é um fluxo?
Na DLT, um fluxo é uma consulta de transmissão que processa os dados de origem de forma incremental para atualizar uma tabela de transmissão de destino. A maioria dos conjuntos de dados DLT que o senhor cria em um pipeline define o fluxo como parte da consulta e não exige a definição explícita do fluxo. Por exemplo, o senhor cria uma tabela de transmissão no DLT em um único comando DDL em vez de usar instruções de tabela e fluxo separadas para criar a tabela de transmissão:
Este exemplo CREATE FLOW
é fornecido apenas para fins ilustrativos e inclui palavras-chave que não são sintaxe DLT válida.
CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")
-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;
CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");
Além do fluxo default definido por uma consulta, as interfaces DLT Python e SQL oferecem a funcionalidade de fluxo de acréscimo . O fluxo Append suporta o processamento que requer a leitura de dados de várias fontes de transmissão para atualizar uma única tabela de transmissão. Por exemplo, o senhor pode usar a funcionalidade de anexar fluxo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.
Usar o fluxo de acréscimo para gravar em uma tabela de transmissão a partir de várias fontes de transmissão
Utilize o decorador @append_flow
na interface em Python ou a cláusula CREATE FLOW
na interface em SQL para gravar em uma tabela de transmissão a partir de várias fontes de transmissão. Utilize o fluxo de acréscimo para tarefas de processamento como as seguintes:
- Adicione fontes de transmissão que acrescentem dados a uma tabela de transmissão existente sem exigir um refresh completo. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, o senhor pode adicionar os dados da nova região à tabela sem executar um refresh completo. Veja o exemplo: Gravação em uma tabela de transmissão a partir de vários tópicos do site Kafka.
- Atualizar uma tabela de transmissão anexando dados históricos ausentes (backfilling). Por exemplo, o senhor tem uma tabela de transmissão existente que é gravada por um tópico Apache Kafka . O senhor também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão, mas não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Veja o exemplo: execução de um backfill de dados único.
- Combine dados de várias fontes e grave em uma única tabela de transmissão em vez de usar a cláusula
UNION
em uma consulta. O uso do processamento de fluxo de acréscimo em vez deUNION
permite que o senhor atualize a tabela de destino de forma incremental sem executar uma atualização completa em refresh. Veja o exemplo: use o processamento de fluxo de acréscimo em vez deUNION
.
O alvo para a saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas em Python, use a função create_streaming_table() para criar uma tabela de destino.
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função
create_streaming_table()
ou em uma definição de tabela existente. Você não pode definir expectativas na definição@append_flow
. - Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- você não pode reutilizar um nome de fluxo em um pipeline porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
Veja a seguir a sintaxe de @append_flow
- Python
- SQL
import dlt
dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dlt.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming query>)
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.
CREATE FLOW
flow_name
AS INSERT INTO
append_target BY NAME
SELECT * FROM
source;
Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos do site Kafka
Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target
e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:
- Python
- SQL
import dlt
dlt.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para saber mais sobre a função de valor de tabela read_kafka()
usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.
Em Python, o senhor pode criar, de forma programática, vários fluxos direcionados a uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.
Esse padrão tem os mesmos requisitos de usar um loop for
para criar tabelas. O senhor deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um loop for
.
import dlt
dlt.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exemplo: execução de um backfill de dados único
Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:
Para garantir um verdadeiro preenchimento único quando a consulta de backfill fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de backfill, deixe a consulta no lugar.
- Python
- SQL
import dlt
@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
STREAM read_files(
"path/to/sourceDir",
format => "csv"
);
CREATE FLOW
backfill
AS INSERT INTO
csv_target BY NAME
SELECT * FROM
STREAM read_files(
"path/to/backfill/data/dir",
format => "csv"
);
Exemplo: use o processamento de fluxo de acréscimo em vez de UNION
Em vez de usar uma consulta com uma cláusula UNION
, o senhor pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. O uso de consultas de fluxo de acréscimo em vez de UNION
permite que o senhor faça acréscimos a uma tabela de transmissão de várias fontes sem executar um refreshcompleto.
O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION
:
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
return raw_orders_us.union(raw_orders_eu)
Os exemplos a seguir substituem a consulta UNION
por consultas de fluxo de acréscimo:
- Python
- SQL
dlt.create_streaming_table("raw_orders")
@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE STREAM(raw_orders);
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);