Pular para o conteúdo principal

Tutorial: Crie seu primeiro pipeline usando o Editor LakeFlow Pipelines

Aprenda como criar um novo pipeline usando LakeFlow Spark Declarative Pipeline (SDP) para orquestração de dados e Auto Loader. Este tutorial amplia o pipeline de exemplo, limpando os dados e criando uma consulta para encontrar os 100 principais usuários.

Neste tutorial, você aprenderá como usar o Editor LakeFlow Pipelines para:

  • Crie um novo pipeline com a estrutura de pastas default e comece com um conjunto de arquivos de exemplo.
  • Defina restrições de qualidade de dados usando expectativas.
  • Utilize o recurso do editor para estender o pipeline com novas transformações para realizar análises em seus dados.

Requisitos

Antes de começar este tutorial, você deve:

  • O usuário "Be" faz login em um workspace Databricks .
  • Ative Unity Catalog para seu workspace.
  • Ter permissão para criar um recurso compute ou acesso a um recurso compute .
  • Possuir permissões para criar um novo esquema em um catálogo. As permissões necessárias são ALL PRIVILEGES ou USE CATALOG e CREATE SCHEMA.

o passo 1: Criar um pipeline

Nesta etapa, você cria um pipeline usando a estrutura de pastas default e exemplos de código. Os exemplos de código fazem referência à tabela users na fonte de dados de exemplo wanderbricks .

  1. No seu workspace Databricks , clique em Ícone de mais (+). Novo , então Ícone de tubulação. pipelineETL . Isso abre o editor pipeline com um nome pipeline default como New Pipeline <date> <time>.

  2. (Opcional) Selecione o nome e insira um nome descritivo para o pipeline.

  3. (Opcional) À direita do nome, clique no catálogo e no esquema para definir valores padrão diferentes.

  4. (Opcional) No arquivo de origem my_transformation criado para você, selecione Python ou SQL na lista suspensa de idiomas para definir o idioma do arquivo.

  5. Clique Ícone de código. Use o código de exemplo .

    O código de exemplo no idioma selecionado aparece no arquivo fonte my_transformation na pasta transformations . O conjunto de dados de saída ainda não foi criado e o gráfico do pipeline no lado direito da tela está vazio.

  6. Para executar o código pipeline (o código na pasta transformations ), clique em executar pipeline na parte superior direita da tela.

    Após a conclusão da execução, a parte inferior do workspace mostra as duas novas tabelas que foram criadas, sample_users_<date_time> e sample_aggregation_<date_time>. O gráfico do pipeline no lado direito do workspace agora mostra as duas tabelas, incluindo que sample_users é a fonte para sample_aggregation. Anote o nome completo da tabela sample_users_<date_time> — você fará referência a ela no próximo passo.

a etapa 2: Aplicar verificações de qualidade de dados

Neste passo, você adiciona uma verificação de qualidade de dados à tabela sample_users . Você usa expectativas de pipeline para restringir os dados. Neste caso, você exclui todos os registros de usuários que não possuem um endereço email válido e gera a tabela limpa como users_cleaned.

  1. No navegador ativopipeline à esquerda, clique Ícone de mais (+). e selecione transformações .

  2. Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:

    • Escolha entre Python ou SQL como linguagem de programação . Esta opção não precisa corresponder à sua seleção anterior.
    • Dê um nome ao arquivo. Neste caso, escolha users_cleaned.
    • Para o caminho de destino , deixe o default.
    • Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" ou escolha " viewmaterializada" . Se você selecionar viewMaterializada , será gerado um código de exemplo para você.
  3. Clique em Criar para criar o arquivo de código de transformações.

  4. No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior). Substitua sample_users_<date_time> pelo nome completo da sua tabela sample_users da seção anterior.

SQL
-- Drop all rows that do not have an email address

CREATE MATERIALIZED VIEW users_cleaned
(
CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT *
FROM sample_users_<date_time>;
  1. Clique em pipelinede execução para atualizar o pipeline. Agora deve ter três tabelas.

a etapa 3: Analisar os principais usuários

Em seguida, selecione os 100 principais usuários com base no número de reservas que eles geraram. junte a tabela wanderbricks.bookings à view materializada users_cleaned .

  1. No navegador ativo pipeline à esquerda, clique Ícone de mais (+). e selecione transformações .

  2. Na caixa de diálogo Criar novo arquivo de transformações , faça as seguintes seleções:

    • Escolha entre Python ou SQL como linguagem de programação . Isso não precisa corresponder às suas seleções anteriores.
    • Dê um nome ao arquivo. Neste caso, escolha users_and_bookings.
    • Para o caminho de destino , deixe o default.
    • Para o tipo de conjunto de dados , deixe como "Nenhum selecionado" .
  3. Clique em Criar para criar o arquivo de código de transformações.

  4. No seu novo arquivo de código, edite o código para que corresponda ao seguinte (use SQL ou Python, com base na sua seleção na tela anterior).

SQL
-- Get the top 100 users by number of bookings

CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
FROM users_cleaned u
JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
GROUP BY u.name
ORDER BY booking_count DESC
LIMIT 100;
  1. Clique em pipelinede execução para atualizar o conjunto de dados. Quando a execução for concluída, você poderá ver no gráfico do pipeline que existem quatro tabelas, incluindo a nova tabela users_and_bookings .

    Gráfico de tubulação mostrando quatro mesas em pipeline

Próximos passos

Agora que você aprendeu a usar alguns dos recursos do editor LakeFlow Pipelines e criou um pipeline, aqui estão alguns outros recursos sobre os quais você pode aprender mais: