Pular para o conteúdo principal

Carregue e processe dados de forma incremental com fluxos DLT

Os dados são processados na DLT por meio de fluxos . Cada fluxo consiste em uma consulta e, normalmente, em uma meta . O fluxo processa a consulta, seja como um lote ou de forma incremental, como uma transmissão de dados para o destino. Um fluxo reside em um pipeline de ETL na Databricks.

Normalmente, os fluxos são definidos automaticamente quando o usuário cria uma consulta na DLT que atualiza um destino, mas também é possível definir explicitamente fluxos adicionais para um processamento mais complexo, como anexar a um único destino a partir de várias fontes.

Atualizações

Um fluxo é executado toda vez que sua definição pipeline é atualizada. O fluxo criará ou atualizará tabelas com os dados mais recentes disponíveis. Dependendo do tipo de fluxo e do estado das alterações nos dados, a atualização pode executar um refresh incremental, que processa apenas os novos registros, ou executar um refresh completo, que reprocessa todos os registros da fonte de dados.

Crie um fluxo default

Ao criar um objeto DLT em um pipeline, o senhor normalmente define uma tabela ou um view juntamente com a consulta que o suporta. Por exemplo, nesta consulta SQL, o senhor cria uma tabela de transmissão chamada customers_silver lendo a partir da tabela chamada customers_bronze.

SQL
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)

O senhor também pode criar a mesma tabela de transmissão em Python. Em Python, o senhor normalmente usa a DLT criando uma função de consulta que retorna um dataframe, com decoradores para acessar a funcionalidade da DLT:

Python
import dlt

@dlt.table()
def customers_silver():
return spark.readStream.table("customers_bronze")

Neste exemplo, o senhor criou uma tabela de transmissão . O senhor também pode criar uma visualização materializada com sintaxe semelhante em SQL e Python. Para obter mais informações, consulte Tabelas de transmissão e Materialized view.

Este exemplo cria um fluxo default junto com a tabela de transmissão. O fluxo default para uma tabela de transmissão é um fluxo de acréscimo , que adiciona novas linhas a cada acionador. Essa é a maneira mais comum de usar a DLT, para criar um fluxo e o destino em uma única etapa. O senhor pode usar esse estilo para ingerir dados ou para transformação de dados.

Os fluxos de acréscimo também suportam o processamento que requer a leitura de dados de várias fontes de transmissão para atualizar um único destino. Por exemplo, o senhor pode usar a funcionalidade de anexar fluxo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.

Usando vários fluxos para gravar em um único destino

No exemplo anterior, o senhor criou um fluxo e uma tabela de transmissão em uma única etapa. Você também pode criar fluxos para uma tabela criada anteriormente. Neste exemplo, você pode ver a criação de uma tabela e o fluxo associado a ela em etapas separadas. Esse código tem resultados idênticos aos da criação de um fluxo default, incluindo o uso do mesmo nome para a tabela de transmissão e o fluxo.

Python
import dlt

# create streaming table
dlt.create_streaming_table("customers_silver")

# add a flow
@dlt.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")

Criar um fluxo independente do destino significa que você também pode criar vários fluxos que anexam dados ao mesmo destino.

Use o decorador @append_flow na interface Python ou a cláusula CREATE FLOW...INSERT INTO na interface SQL para criar um novo fluxo, por exemplo, para direcionar uma tabela de transmissão de várias fontes de transmissão. Use o fluxo de acréscimo para processar tarefas como as seguintes:

  • Adicione fontes de transmissão que acrescentem dados a uma tabela de transmissão existente sem exigir um refresh completo. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que você opera. À medida que novas regiões são implementadas, o senhor pode adicionar os dados da nova região à tabela sem executar um refresh completo. Para obter um exemplo de adição de fontes de transmissão à tabela de transmissão existente, consulte Exemplo: Gravação em uma tabela de transmissão a partir de vários tópicos do site Kafka.
  • Atualizar uma tabela de transmissão anexando dados históricos ausentes (backfilling). Por exemplo, o senhor tem uma tabela de transmissão existente que é gravada por um tópico Apache Kafka . O senhor também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão, mas não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Para obter um exemplo de backfill, consulte Exemplo: execução de um backfill de dados único.
  • Combine dados de várias fontes e grave em uma única tabela de transmissão em vez de usar a cláusula UNION em uma consulta. Usar o processamento de fluxo de acréscimo em vez de UNION permite que o senhor atualize a tabela de destino de forma incremental sem executar uma atualização completa em refresh. Para ver um exemplo de união feita dessa maneira, consulte Exemplo: Use o processamento de fluxo de acréscimo em vez de UNION.

O alvo para a saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas em Python, use a função create_streaming_table() para criar uma tabela de destino.

O exemplo a seguir adiciona dois fluxos para o mesmo destino, criando uma união das duas tabelas de origem:

Python
import dlt

# create a streaming table
dlt.create_streaming_table("customers_us")

# add the first append flow
@dlt.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")

# add the second append flow
@dlt.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
important
  • Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table() ou em uma definição de tabela existente. Você não pode definir expectativas na definição @append_flow.
  • Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
    • Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
    • você não pode reutilizar um nome de fluxo em um pipeline porque o ponto de verificação existente não corresponderá à nova definição de fluxo.

Tipos de fluxos

Os fluxos default para tabelas de transmissão e visualização materializada são fluxos de acréscimo. O senhor também pode criar fluxos para ler a partir da fonte de dados da captura de dados de alterações (CDC) . A tabela a seguir descreve os diferentes tipos de fluxos.

Tipo de fluxo

Descrição

Acrescentar

Os fluxos de acréscimo são o tipo de fluxo mais comum, em que novos registros na origem são gravados no destino a cada atualização. Eles correspondem ao modo append na transmissão estruturada. O senhor pode adicionar o sinalizador ONCE, indicando uma consulta de lotes cujos dados devem ser inseridos no destino apenas uma vez, a menos que o destino seja totalmente atualizado. Qualquer número de fluxos de acréscimo pode ser gravado em um destino específico.

os fluxos padrão (criados com a tabela de transmissão de destino ou materializados view) terão o mesmo nome do destino. Outros alvos não têm fluxos default.

Aplicar alterações

Um fluxo de aplicação de alterações ingere uma consulta que contém dados de captura de dados de alterações (CDC) (CDC). Os fluxos de aplicação de alterações só podem ter como alvo tabelas de transmissão, e a origem deve ser uma origem de transmissão (mesmo no caso de fluxos ONCE ). Vários fluxos de aplicação de alterações podem ter como alvo uma única tabela de transmissão. Uma tabela de transmissão que atue como destino de um fluxo de aplicação de alterações só pode ser alvo de outros fluxos de aplicação de alterações.

Para obter mais informações sobre os dados do CDC, consulte a seção APPLY CHANGES APIs: Simplificar a captura de dados de alterações (CDC) com DLT.

Informações adicionais

Para obter mais informações sobre fluxos e seu uso, consulte os tópicos a seguir: