Tutorial: Crie seu primeiro pipeline usando o Editor LakeFlow Pipelines
Aprenda como criar um novo pipeline usando LakeFlow Spark Declarative Pipeline (SDP) para orquestração de dados e Auto Loader. Este tutorial amplia o pipeline de exemplo, limpando os dados e criando uma consulta para encontrar os 100 principais usuários.
Neste tutorial, você aprenderá como usar o Editor LakeFlow Pipelines para:
- Crie um novo pipeline com a estrutura de pastas default e comece com um conjunto de arquivos de exemplo.
- Defina restrições de qualidade de dados usando expectativas.
- Utilize o recurso do editor para estender o pipeline com novas transformações para realizar análises em seus dados.
Requisitos
Antes de começar este tutorial, você deve:
- O usuário "Be" faz login em um workspace Databricks .
- Ative Unity Catalog para seu workspace.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute .
- Possuir permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGESouUSE CATALOGeCREATE SCHEMA.
o passo 1: Criar um pipeline
Nesta etapa, você cria um pipeline usando a estrutura de pastas default e exemplos de código. Os exemplos de código fazem referência à tabela users na fonte de dados de exemplo wanderbricks .
-
No seu workspace Databricks , clique em
Novo , então
pipelineETL . Isso abre o editor pipeline com um nome pipeline default como
New Pipeline <date> <time>. -
(Opcional) Selecione o nome e insira um nome descritivo para o pipeline.
-
(Opcional) À direita do nome, clique no catálogo e no esquema para definir valores padrão diferentes.
-
(Opcional) No arquivo de origem
my_transformationcriado para você, selecione Python ou SQL na lista suspensa de idiomas para definir o idioma do arquivo. -
Clique
Use o código de exemplo .
O código de exemplo no idioma selecionado aparece no arquivo fonte
my_transformationna pastatransformations. O conjunto de dados de saída ainda não foi criado e o gráfico do pipeline no lado direito da tela está vazio. -
Para executar o código pipeline (o código na pasta
transformations), clique em executar pipeline na parte superior direita da tela.Após a conclusão da execução, a parte inferior do workspace mostra as duas novas tabelas que foram criadas,
sample_users_<date_time>esample_aggregation_<date_time>. O gráfico do pipeline no lado direito do workspace agora mostra as duas tabelas, incluindo quesample_usersé a fonte parasample_aggregation. Anote o nome completo da tabelasample_users_<date_time>— você fará referência a ela no próximo passo.
a etapa 2: Aplicar verificações de qualidade de dados
Neste passo, você adiciona uma verificação de qualidade de dados à tabela sample_users . Você usa expectativas de pipeline para restringir os dados. Neste caso, você exclui todos os registros de usuários que não possuem um endereço email válido e gera a tabela limpa como users_cleaned.
-
No navegador ativopipeline à esquerda, clique
e selecione transformações .
-
Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:
- Escolha entre Python ou SQL como linguagem de programação . Esta opção não precisa corresponder à sua seleção anterior.
- Dê um nome ao arquivo. Neste caso, escolha
users_cleaned. - Para o caminho de destino , deixe o default.
- Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" ou escolha " viewmaterializada" . Se você selecionar viewMaterializada , será gerado um código de exemplo para você.
-
Clique em Criar para criar o arquivo de código de transformações.
-
No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior). Substitua
sample_users_<date_time>pelo nome completo da sua tabelasample_usersda seção anterior.
- SQL
- Python
-- Drop all rows that do not have an email address
CREATE MATERIALIZED VIEW users_cleaned
(
CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT *
FROM sample_users_<date_time>;
from pyspark import pipelines as dp
# Drop all rows that do not have an email address
@dp.materialized_view
@dp.expect_or_drop("no null emails", "email IS NOT NULL")
def users_cleaned():
return (
spark.read.table("sample_users_<date_time>")
)
- Clique em pipelinede execução para atualizar o pipeline. Agora deve ter três tabelas.
a etapa 3: Analisar os principais usuários
Em seguida, selecione os 100 principais usuários com base no número de reservas que eles geraram. junte a tabela wanderbricks.bookings à view materializada users_cleaned .
-
No navegador ativo pipeline à esquerda, clique
e selecione transformações .
-
Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:
- Escolha entre Python ou SQL como linguagem de programação . Isso não precisa corresponder às suas seleções anteriores.
- Dê um nome ao arquivo. Neste caso, escolha
users_and_bookings. - Para o caminho de destino , deixe o default.
- Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" .
-
Clique em Criar para criar o arquivo de código de transformações.
-
No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior).
- SQL
- Python
-- Get the top 100 users by number of bookings
CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
FROM users_cleaned u
JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
GROUP BY u.name
ORDER BY booking_count DESC
LIMIT 100;
from pyspark import pipelines as dp
from pyspark.sql.functions import col, count, desc
# Get the top 100 users by number of bookings
@dp.materialized_view
def users_and_bookings():
return (
spark.read.table("users_cleaned")
.join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
.groupBy(col("name"))
.agg(count("booking_id").alias("booking_count"))
.orderBy(desc("booking_count"))
.limit(100)
)
-
Clique em pipelinede execução para atualizar o conjunto de dados. Quando a execução for concluída, você poderá ver no gráfico do pipeline que existem quatro tabelas, incluindo a nova tabela
users_and_bookings.
Próximos passos
Agora que você aprendeu a usar alguns dos recursos do editor LakeFlow Pipelines e criou um pipeline, aqui estão alguns outros recursos sobre os quais você pode aprender mais:
-
Ferramentas para trabalhar e processar transformações durante a criação de pipeline:
- Execução seletiva
- Pré-visualizações de dados
- Gráfico interativo pipeline (gráfico do conjunto de dados em seu pipeline)
-
Integração de pacotes de automação declarativa para colaboração eficiente, controle de versão e integração CI/CD diretamente do editor: