Carregue e processe dados incrementalmente com fluxos de pipeline declarativos LakeFlow
Os dados são processados no pipeline declarativo LakeFlow por meio de fluxos . Cada fluxo consiste em uma consulta e, normalmente, um destino . O fluxo processa a consulta, seja como um lote ou incrementalmente como uma transmissão de dados para o destino. Um fluxo reside dentro de um pipeline ETL no Databricks.
Normalmente, os fluxos são definidos automaticamente quando você cria uma consulta no pipeline declarativo LakeFlow que atualiza um destino, mas você também pode definir explicitamente fluxos adicionais para processamento mais complexo, como anexar a um único destino de várias fontes.
Atualizações
Um fluxo é executado cada vez que seu pipeline de definição é atualizado. O fluxo criará ou atualizará tabelas com os dados mais recentes disponíveis. Dependendo do tipo de fluxo e do estado das alterações nos dados, a atualização pode executar uma refresh incremental, que processa apenas novos registros, ou executar uma refresh completa, que reprocessa todos os registros da fonte de dados.
- Para obter mais informações sobre atualizações pipeline , consulte execução de uma atualização no pipeline declarativo LakeFlow.
- Para obter mais informações sobre programação e acionamento de atualizações, consulte Modo pipeline acionado vs. contínuo.
Criar um fluxo default
Ao criar um objeto de pipeline declarativo LakeFlow em um pipeline, você normalmente define uma tabela ou uma view junto com a consulta que a suporta. Por exemplo, nesta consulta SQL , você cria uma tabela de transmissão chamada customers_silver
lendo da tabela chamada customers_bronze
.
CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(customers_bronze)
Você também pode criar a mesma tabela de transmissão em Python. Em Python, você normalmente usa o pipeline declarativo LakeFlow criando uma função de consulta que retorna um dataframe, com decoradores para acessar a funcionalidade do pipeline declarativo LakeFlow :
from pyspark import pipelines as dp
@dp.table()
def customers_silver():
return spark.readStream.table("customers_bronze")
Neste exemplo, você criou uma tabela de transmissão . Você também pode criar uma visualização materializada com sintaxe semelhante em SQL e Python. Para mais informações consulte tabelas de transmissão e visualização materializada.
Este exemplo cria um fluxo default junto com a tabela de transmissão. O fluxo default para uma tabela de transmissão é um fluxo de acréscimo , que adiciona novas linhas a cada gatilho. Esta é a maneira mais comum de usar o pipeline declarativo LakeFlow para criar um fluxo e o destino em uma única etapa. Você pode usar esse estilo para ingerir dados ou para transformação de dados.
Os fluxos de acréscimo também oferecem suporte ao processamento que requer a leitura de dados de várias fontes de transmissão para atualizar um único destino. Por exemplo, você pode usar a funcionalidade de anexar fluxo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.
Usando vários fluxos para gravar em um único destino
No exemplo anterior, você criou um fluxo e uma tabela de transmissão em um único passo. Você também pode criar fluxos para uma tabela criada anteriormente. Neste exemplo, você pode ver a criação de uma tabela e o fluxo associado a ela em passos separados. Este código tem resultados idênticos à criação de um fluxo default , incluindo o uso do mesmo nome para a tabela de transmissão e o fluxo.
- Python
- SQL
from pyspark import pipelines as dp
# create streaming table
dp.create_streaming_table("customers_silver")
# add a flow
@dp.append_flow(
target = "customers_silver")
def customer_silver():
return spark.readStream.table("customers_bronze")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_silver;
-- add a flow
CREATE FLOW customers_silver
AS INSERT INTO customers_silver BY NAME
SELECT * FROM STREAM(customers_bronze);
Criar um fluxo independentemente do destino significa que você também pode criar vários fluxos que anexam dados ao mesmo destino.
Use o decorador @dp.append_flow
na interface Python ou a cláusula CREATE FLOW...INSERT INTO
na interface SQL para criar um novo fluxo, por exemplo, para direcionar uma tabela de transmissão de várias fontes de transmissão. Use o fluxo de acréscimo para processar tarefas como as seguintes:
- Adicione fontes de transmissão que anexam dados a uma tabela de transmissão existente sem exigir uma refresh completa. Por exemplo, você pode ter uma tabela combinando dados regionais de todas as regiões em que opera. À medida que novas regiões são lançadas, você pode adicionar os dados da nova região à tabela sem realizar uma refresh completa. Para um exemplo de como adicionar fontes de transmissão a uma tabela de transmissão existente, consulte Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos Kafka.
- Atualize uma tabela de transmissão anexando dados históricos faltantes (preenchimento). Você pode usar a sintaxe
INSERT INTO ONCE
para criar um anexo de preenchimento histórico que é executado uma vez. Por exemplo, você tem uma tabela de transmissão existente que é gravada por um tópico Apache Kafka . Você também tem dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão, e não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Para ver um exemplo de preenchimento, consulte Histórico de preenchimento de dados com o pipeline declarativo LakeFlow. - Combine dados de várias fontes e grave em uma única tabela de transmissão em vez de usar a cláusula
UNION
em uma consulta. Usar o processamento de fluxo de acréscimo em vez deUNION
permite que você atualize a tabela de destino incrementalmente sem executar uma refresh completa. Para um exemplo de uma união feita dessa maneira, veja Exemplo: Usar processamento de fluxo de acréscimo em vez deUNION
.
O destino para a saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas Python, use a função create_streaming_table() para criar uma tabela de destino.
O exemplo a seguir adiciona dois fluxos para o mesmo destino, criando uma união das duas tabelas de origem:
- Python
- SQL
from pyspark import pipelines as dp
# create a streaming table
dp.create_streaming_table("customers_us")
# add the first append flow
@dp.append_flow(target = "customers_us")
def append1():
return spark.readStream.table("customers_us_west")
# add the second append flow
@dp.append_flow(target = "customers_us")
def append2():
return spark.readStream.table("customers_us_east")
-- create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_us;
-- add the first append flow
CREATE FLOW append1
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_west);
-- add the second append flow
CREATE FLOW append2
AS INSERT INTO customers_us BY NAME
SELECT * FROM STREAM(customers_us_east);
- Se você precisar definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função
create_streaming_table()
ou em uma definição de tabela existente. Você não pode definir expectativas na definição@append_flow
. - Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:
- Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.
- Não é possível reutilizar um nome de fluxo em um pipeline, porque o ponto de verificação existente não corresponderá à nova definição de fluxo.
Tipos de fluxos
Os fluxos default para tabelas de transmissão e visualização materializada são fluxos de acréscimo. Você também pode criar fluxos para ler a partir da fonte de dados de captura de dados de alterações (CDC) . A tabela a seguir descreve os diferentes tipos de fluxos.
Tipo de fluxo | Descrição |
---|---|
Acrescentar | Os fluxos de acréscimo são o tipo mais comum de fluxo, em que novos registros na origem são gravados no destino a cada atualização. Correspondem ao modo acréscimo na transmissão estruturada. Você pode adicionar o sinalizador os fluxos padrão (criados com a tabela de transmissão de destino ou com view materializada) terão o mesmo nome que o destino. Outros alvos não têm fluxos default . |
Auto CDC ( aplicar alterações anteriormente) | Um fluxo Auto CDC ingere uma consulta contendo dados de captura de dados de alterações (CDC) (CDC). Os fluxos CDC automáticos só podem ter como alvo tabelas de transmissão, e a origem deve ser uma fonte de transmissão (mesmo no caso de fluxos Para obter mais informações sobre dados CDC , consulte APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline declarativo LakeFlow. |
Informações adicionais
Para mais informações sobre fluxos e seu uso, consulte os seguintes tópicos:
- Exemplos de fluxos no pipeline declarativo LakeFlow
- As APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com o pipeline declarativo LakeFlow
- Preenchimento de histórico de dados com pipeline declarativo LakeFlow
- Escrevendo pipeline declarativo LakeFlow em Python ou SQL
- Tabelas de streaming
- Visualizações materializadas
- Use sinks para transmitir registros para serviço externo com pipeline declarativo LakeFlow