Carregue e processe dados de forma incremental com o LakeFlow Declarative pipeline flows
Os dados são processados no pipeline declarativo LakeFlow por meio de fluxos . Cada fluxo consiste em uma consulta e, normalmente, em uma meta . O fluxo processa a consulta, seja como um lote ou de forma incremental, como uma transmissão de dados para o destino. Um fluxo reside em um pipeline de ETL na Databricks.
Normalmente, os fluxos são definidos automaticamente quando o usuário cria uma consulta no pipeline LakeFlow Declarative que atualiza um destino, mas também é possível definir explicitamente fluxos adicionais para um processamento mais complexo, como anexar a um único destino a partir de várias origens.
Atualizações
Um fluxo é executado toda vez que sua definição pipeline é atualizada. O fluxo criará ou atualizará tabelas com os dados mais recentes disponíveis. Dependendo do tipo de fluxo e do estado das alterações nos dados, a atualização pode executar um refresh incremental, que processa apenas os novos registros, ou executar um refresh completo, que reprocessa todos os registros da fonte de dados.
- Para obter mais informações sobre atualizações em pipeline, consulte executar uma atualização em LakeFlow Declarative pipeline.
- Para obter mais informações sobre como programar e acionar atualizações, consulte Modo acionado vs. contínuo pipeline.
Crie um fluxo default
Ao criar um objeto LakeFlow Declarative pipeline em um pipeline, o senhor normalmente define uma tabela ou um view juntamente com a consulta que o suporta. Por exemplo, nesta consulta SQL, o senhor cria uma tabela de transmissão chamada customers_silver
lendo a partir da tabela chamada customers_bronze
.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
O senhor também pode criar a mesma tabela de transmissão em Python. Em Python, o senhor normalmente usa o pipeline declarativo LakeFlow criando uma função de consulta que retorna um dataframe, com decoradores para acessar a funcionalidade do pipeline declarativo LakeFlow:
import dlt
@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
Neste exemplo, o senhor criou uma tabela de transmissão . O senhor também pode criar uma visualização materializada com sintaxe semelhante em SQL e Python. Para obter mais informações, consulte Tabelas de transmissão e Materialized view.
Este exemplo cria um fluxo default junto com a tabela de transmissão. O fluxo default para uma tabela de transmissão é um fluxo de acréscimo , que adiciona novas linhas a cada acionador. Essa é a maneira mais comum de usar o pipeline declarativo LakeFlow, para criar um fluxo e o destino em uma única etapa. O senhor pode usar esse estilo para ingerir dados ou para transformação de dados.
Os fluxos de acréscimo também suportam o processamento que requer a leitura de dados de várias fontes de transmissão para atualizar um único destino. Por exemplo, o senhor pode usar a funcionalidade de anexar fluxo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.
Usando vários fluxos para gravar em um único destino
No exemplo anterior, o senhor criou um fluxo e uma tabela de transmissão em uma única etapa. Você também pode criar fluxos para uma tabela criada anteriormente. Neste exemplo, você pode ver a criação de uma tabela e o fluxo associado a ela em etapas separadas. Esse código tem resultados idênticos aos da criação de um fluxo default, incluindo o uso do mesmo nome para a tabela de transmissão e o fluxo.
- Python
- SQL
import dlt
# create streaming table
dlt.create_streaming_table("customers_silver")
# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Criar um fluxo independente do destino significa que você também pode criar vários fluxos que anexam dados ao mesmo destino.
Use o decorador @append_flow
na interface Python ou a cláusula CREATE FLOW...INSERT INTO
na interface SQL para criar um novo fluxo, por exemplo, para direcionar uma tabela de transmissão de várias fontes de transmissão. Use o fluxo de acréscimo para processar tarefas como as seguintes:
- Adicione fontes de transmissão que acrescentem dados a uma tabela de transmissão existente sem exigir um refresh completo. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, o senhor pode adicionar os dados da nova região à tabela sem executar um refresh completo. Para obter um exemplo de adição de fontes de transmissão à tabela de transmissão existente, consulte Exemplo: Gravação em uma tabela de transmissão a partir de vários tópicos do site Kafka.
- Atualizar uma tabela de transmissão anexando dados históricos ausentes (backfilling). Por exemplo, o senhor tem uma tabela de transmissão existente que é gravada por um tópico Apache Kafka . O senhor também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão, mas não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Para obter um exemplo de backfill, consulte Exemplo: execução de um backfill de dados único.
- Combine dados de várias fontes e grave em uma única tabela de transmissão em vez de usar a cláusula
UNION
em uma consulta. Usar o processamento de fluxo de acréscimo em vez deUNION
permite que o senhor atualize a tabela de destino de forma incremental sem executar uma atualização completa em refresh. Para ver um exemplo de união feita dessa maneira, consulte Exemplo: Use o processamento de fluxo de acréscimo em vez deUNION
.
O alvo para a saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas em Python, use a função create_streaming_table() para criar uma tabela de destino.
O exemplo a seguir adiciona dois fluxos para o mesmo destino, criando uma união das duas tabelas de origem:
- Python
- SQL
import dlt
# create a streaming table
dlt.create_streaming_table("customers_us")
# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função
create_streaming_table()
ou em uma definição de tabela existente. Você não pode definir expectativas na definição@append_flow
. - Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- O senhor não pode reutilizar um nome de fluxo em um pipeline porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
Tipos de fluxos
Os fluxos default para tabelas de transmissão e visualização materializada são fluxos de acréscimo. O senhor também pode criar fluxos para ler a partir da fonte de dados da captura de dados de alterações (CDC) . A tabela a seguir descreve os diferentes tipos de fluxos.
Tipo de fluxo | Descrição |
---|---|
Acrescentar | Os fluxos de acréscimo são o tipo de fluxo mais comum, em que novos registros na origem são gravados no destino a cada atualização. Eles correspondem ao modo append na transmissão estruturada. O senhor pode adicionar o sinalizador os fluxos padrão (criados com a tabela de transmissão de destino ou materializados view) terão o mesmo nome do destino. Outros alvos não têm fluxos default. |
Auto CDC ( aplicar previamente as alterações) | Um fluxo Auto CDC ingere uma consulta que contém dados de captura de dados de alterações (CDC) (CDC). Os fluxos do Auto CDC só podem ter como alvo tabelas de transmissão, e a origem deve ser uma origem de transmissão (mesmo no caso de fluxos do Para obter mais informações sobre dados em CDC, consulte AUTO CDC APIs: Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow. |
Informações adicionais
Para obter mais informações sobre fluxos e seu uso, consulte os tópicos a seguir: