Tutorial: Crie seu primeiro pipeline usando o Editor LakeFlow Pipelines
Aprenda como criar um novo pipeline usando LakeFlow Spark Declarative Pipeline (SDP) para orquestração de dados e Auto Loader. Este tutorial amplia o pipeline de exemplo, limpando os dados e criando uma consulta para encontrar os 100 principais usuários.
Neste tutorial, você aprenderá como usar o Editor LakeFlow Pipelines para:
- Crie um novo pipeline com a estrutura de pastas default e comece com um conjunto de arquivos de exemplo.
- Defina restrições de qualidade de dados usando expectativas.
- Utilize o recurso do editor para estender o pipeline com novas transformações para realizar análises em seus dados.
Requisitos
Antes de começar este tutorial, você deve:
- O usuário "Be" faz login em um workspace Databricks .
- Ative Unity Catalog para seu workspace.
- É necessário habilitar o editor LakeFlow Pipelines em seu workspace e optar por participar. Consulte Ativar o Editor LakeFlow Pipelines e monitoramento atualizado.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute .
- Possuir permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGESouUSE CATALOGeCREATE SCHEMA.
o passo 1: Criar um pipeline
Nesta etapa, você cria um pipeline usando a estrutura de pastas default e exemplos de código. Os exemplos de código fazem referência à tabela users na fonte de dados de exemplo wanderbricks .
-
No seu workspace Databricks , clique em
Novo , então
pipelineETL . Isso abre o editor pipeline na página de criação de pipeline .
-
Clique no cabeçalho para dar um nome ao seu pipeline.
-
Logo abaixo do nome, escolha o catálogo e o esquema default para suas tabelas de saída. Essas opções são usadas quando você não especifica um catálogo e um esquema nas definições do seu pipeline.
-
Em Próximo passo para seu pipeline , clique em
Comece com um exemplo de código em SQL ou
Comece com um código de exemplo em Python , com base na sua preferência de linguagem. Isso altera o idioma default do seu código de exemplo, mas você pode adicionar código no outro idioma posteriormente. Isso cria uma estrutura de pastas default com código de exemplo para você começar.
-
Você pode view o código de exemplo no navegador ativo pipeline , localizado no lado esquerdo da workspace. Em
transformationsestão dois arquivos que geram um dataset pipeline cada. Emexplorationsencontra-se um Notebook que contém código para ajudar você view a saída do seu pipeline. Clicar em um arquivo permite view e editar o código no editor.O conjunto de dados de saída ainda não foi criado e o gráfico do pipeline no lado direito da tela está vazio.
-
Para executar o código pipeline (o código na pasta
transformations), clique em executar pipeline na parte superior direita da tela.Após a conclusão da execução, a parte inferior do workspace mostra as duas novas tabelas que foram criadas,
sample_users_<pipeline-name>esample_aggregation_<pipeline-name>. Você também pode ver que o gráfico do pipeline no lado direito do workspace agora mostra as duas tabelas, incluindo quesample_usersé a fonte parasample_aggregation.
a etapa 2: Aplicar verificações de qualidade de dados
Neste passo, você adiciona uma verificação de qualidade de dados à tabela sample_users . Você usa expectativas de pipeline para restringir os dados. Neste caso, você exclui todos os registros de usuários que não possuem um endereço email válido e gera a tabela limpa como users_cleaned.
-
No navegador ativo pipeline , clique
e selecione transformações .
-
Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:
- Escolha entre Python ou SQL como linguagem de programação . Esta opção não precisa corresponder à sua seleção anterior.
- Dê um nome ao arquivo. Neste caso, escolha
users_cleaned. - Para o caminho de destino , deixe o default.
- Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" ou escolha " viewmaterializada" . Se você selecionar viewMaterializada , será gerado um código de exemplo para você.
-
No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior). Substitua
<pipeline-name>pelo nome completo da sua tabelasample_users.
- SQL
- Python
-- Drop all rows that do not have an email address
CREATE MATERIALIZED VIEW users_cleaned
(
CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT *
FROM sample_users_<pipeline-name>;
from pyspark import pipelines as dp
# Drop all rows that do not have an email address
@dp.table
@dp.expect_or_drop("no null emails", "email IS NOT NULL")
def users_cleaned():
return (
spark.read.table("sample_users_<pipeline_name>")
)
- Clique em pipelinede execução para atualizar o pipeline. Agora deve ter três tabelas.
a etapa 3: Analisar os principais usuários
Em seguida, selecione os 100 principais usuários com base no número de reservas que realizaram. junte a tabela wanderbricks.bookings à view materializada users_cleaned .
-
No navegador ativo pipeline , clique
e selecione transformações .
-
Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:
- Escolha entre Python ou SQL como linguagem de programação . Isso não precisa corresponder às suas seleções anteriores.
- Dê um nome ao arquivo. Neste caso, escolha
users_and_bookings. - Para o caminho de destino , deixe o default.
- Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" .
-
No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior).
- SQL
- Python
-- Get the top 100 users by number of bookings
CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
FROM users_cleaned u
JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
GROUP BY u.name
ORDER BY booking_count DESC
LIMIT 100;
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, desc
# Get the top 100 users by number of bookings
@dp.table
def users_and_bookings():
return (
spark.read.table("users_cleaned")
.join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
.groupBy(col("name"))
.agg(count("booking_id").alias("booking_count"))
.orderBy(desc("booking_count"))
.limit(100)
)
-
Clique em pipelinede execução para atualizar o conjunto de dados. Quando a execução for concluída, você poderá ver no gráfico do pipeline que existem quatro tabelas, incluindo a nova tabela
users_and_bookings.
Próximos passos
Agora que você aprendeu a usar alguns dos recursos do editor LakeFlow Pipelines e criou um pipeline, aqui estão alguns outros recursos sobre os quais você pode aprender mais:
-
Ferramentas para trabalhar e processar transformações durante a criação de pipeline:
- Execução seletiva
- Pré-visualizações de dados
- DAG interativo (gráfico do conjunto de dados em seu pipeline)
-
Integração Databricks Ativo Bundles para colaboração eficiente, controle de versão e integração de CI/CD diretamente do editor: