Pular para o conteúdo principal

Criar um pipeline de dados de ponta a ponta na Databricks

Este artigo demonstra como criar e implementar um pipeline de processamento de dados de ponta a ponta, abrangendo desde a ingestão de dados brutos, a transformação dos dados e a realização de análises nos dados processados.

nota

Embora este artigo demonstre como criar um pipeline de dados completo usando o Databricks Notebook e um Databricks Job para orquestrar um fluxo de trabalho, o Databricks recomenda o uso do DLT, uma interface declarativa para criar um pipeline de processamento de dados confiável, sustentável e testável.

O que é um pipeline de dados?

Um pipeline de dados implementa as etapas necessárias para mover dados dos sistemas de origem, transformar esses dados com base nos requisitos e armazenar os dados em um sistema de destino. Um pipeline de dados inclui todos os processos necessários para transformar dados brutos em dados preparados que os usuários podem consumir. Por exemplo, um pipeline de dados pode preparar os dados para que os analistas de dados e cientistas de dados possam extrair valor dos dados por meio de análises e relatórios.

Um fluxo de trabalho de extração, transformação e carregamento (ETL) é um exemplo comum de um pipeline de dados. No processamento ETL, os dados são coletados dos sistemas de origem e armazenados em uma área de estágio, são transformados conforme os requisitos (garantindo qualidade dos dados, eliminando registros duplicados, entre outros) e, em seguida, são gravados em um sistema de destino, como um data warehouse ou um data lake.

Etapas do pipeline de dados

Para ajudá-lo a começar a criar pipelines de dados no Databricks, o exemplo incluído neste artigo mostra a criação de um fluxo de trabalho de processamento de dados:

  • Use os recursos do Databricks para explorar um conjunto de dados brutos.
  • Crie um notebook Databricks para fazer a ingestão de dados de origem brutos e escrever os dados brutos em uma tabela de destino.
  • Crie um notebook para transformar os dados de origem brutos e escreva os dados transformados em uma tabela de destino.
  • Crie um notebook Databricks para consultar os dados transformados.
  • Automatize o pipeline de dados com um trabalho do Databricks.

Requisitos

Exemplo: Million Song dataset

O site dataset usado neste exemplo é um subconjunto do conjunto de dados Million Song, uma coleção de recursos e metadados para faixas de música contemporânea. Esse dataset está disponível no conjunto de dados de amostra incluído em seu Databricks workspace.

Etapa 1: Criar um recurso compute

Para realizar o processamento e a análise de dados neste exemplo, crie um recurso compute para executar o comando.

nota

Como este exemplo usa uma amostra de dataset armazenada em DBFS e recomenda a persistência de tabelas em Unity Catalogo senhor cria um recurso compute configurado com o modo de acesso dedicado. O modo de acesso dedicado fornece acesso total ao DBFS e também permite o acesso ao Unity Catalog. Consulte Práticas recomendadas para DBFS e Unity Catalog.

  1. Clique em Calcular na barra lateral.
  2. Na página Computar, clique em Criar Computar .
  3. Na nova página compute, digite um nome exclusivo para o recurso compute.
  4. Em Avançado , alterne a configuração do modo de acesso para Manual e selecione Dedicado .
  5. Em Usuário único ou grupo , selecione seu nome de usuário.
  6. Deixe os valores restantes em seu estado default e clique em Create .

Para saber mais sobre Databricks compute recurso, consulte computar.

Etapa 2: explorar os dados de origem

Para saber como usar a interface do Databricks para explorar os dados brutos de origem, consulte Explorar os dados de origem de um pipeline de dados. Se o senhor quiser ir diretamente para a ingestão e preparação dos dados, continue na Etapa 3: Ingestão dos dados brutos.

Etapa 3: ingestão dos dados brutos

Nessa etapa, o senhor carrega os dados brutos em uma tabela para torná-los disponíveis para processamento posterior. Para gerenciar dados ativos na plataforma Databricks, como tabelas, o site Databricks recomenda Unity Catalog. No entanto, se o senhor não tiver permissões para criar o catálogo e o esquema necessários para publicar tabelas em Unity Catalog, ainda assim poderá concluir as etapas a seguir publicando tabelas em Hive metastore.

Para ingerir dados, a Databricks recomenda o uso do Auto Loader. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

Você pode configurar o Auto Loader para detectar automaticamente o esquema dos dados carregados, permitindo que você inicialize tabelas sem declarar explicitamente o esquema dos dados e adapte o esquema da tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear manualmente e aplicar alterações de esquema ao longo do tempo. Databricks recomenda a inferência de esquema ao usar Auto Loader. No entanto, como visto na passo de exploração de dados, os dados das músicas não contêm informações de cabeçalho. Como o cabeçalho não é armazenado com os dados, você precisará definir explicitamente o esquema, conforme mostrado no próximo exemplo.

  1. Na barra lateral, clique em Novo ícone New e selecione Notebook no menu. A caixa de diálogo Create Notebook é exibida.

  2. Insira um nome para o notebook, por exemplo, Ingest songs data. Por padrão:

    • Python é a linguagem selecionada.
    • O Notebook está anexado ao último recurso compute que o senhor utilizou. Nesse caso, o recurso que o senhor criou na Etapa 1: Criar um recurso compute.
  3. Insira o seguinte na primeira célula do notebook:

    Python
    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"

    schema = StructType(
    [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
    ]
    )

    (spark.readStream
    .format("cloudFiles")
    .schema(schema)
    .option("cloudFiles.format", "csv")
    .option("sep","\t")
    .load(file_path)
    .writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(table_name)
    )

    Se você estiver usando o Unity Catalog, <table-name> substitua por um catálogo, esquema e nome de tabela para conter os registros ingeridos (por exemplo, data_pipelines.songs_data.raw_song_data). Caso contrário, <table-name> substitua pelo nome de uma tabela para conter os registros ingeridos, por exemplo, raw_song_data.

    Substitua <checkpoint-path> por um caminho para um diretório no DBFS para manter arquivos de ponto de verificação, por exemplo, /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Clique em Menu Executar e selecione executar Cell . Este exemplo define o esquema de dados usando as informações do site README, ingere os dados das músicas de todos os arquivos contidos em file_path e grava os dados na tabela especificada por table_name.

Etapa 4: Preparar os dados brutos

Para preparar os dados brutos para análise, os seguintes passos transformam os dados brutos das músicas filtrando as colunas não necessárias e adicionando um novo campo contendo um registro de data e hora para a criação do novo registro.

  1. Na barra lateral, clique em Novo ícone New e selecione Notebook no menu. A caixa de diálogo Create Notebook é exibida.

  2. Insira um nome para o notebook. Por exemplo, Prepare songs data. Altere o idioma padrão para SQL.

  3. Insira o seguinte na primeira célula do notebook:

    SQL
    CREATE OR REPLACE TABLE
    <table-name> (
    artist_id STRING,
    artist_name STRING,
    duration DOUBLE,
    release STRING,
    tempo DOUBLE,
    time_signature DOUBLE,
    title STRING,
    year DOUBLE,
    processed_time TIMESTAMP
    );

    INSERT INTO
    <table-name>
    SELECT
    artist_id,
    artist_name,
    duration,
    release,
    tempo,
    time_signature,
    title,
    year,
    current_timestamp()
    FROM
    <raw-songs-table-name>

    Se você estiver utilizando o Unity Catalog, substitua <table-name> por um catálogo, esquema e nome da tabela para conter os registros filtrados e transformados (por exemplo, data_pipelines.songs_data.prepared_song_data). Caso contrário, substitua <table-name> pelo nome de uma tabela para conter os registros filtrados e transformados (por exemplo, prepared_song_data).

    Substitua <raw-songs-table-name> pelo nome da tabela que contém os registros de músicas brutas ingeridas na passo anterior.

  4. Clique em Menu Executar e selecione executar Cell .

Etapa 5: consultar os dados transformados

Nesta etapa, você estende o pipeline de processamento adicionando uma consulta para analisar os dados das músicas. Essas consultas usam os registros preparados criados na passo anterior.

  1. Na barra lateral, clique em Novo ícone New e selecione Notebook no menu. A caixa de diálogo Create Notebook é exibida.

  2. Insira um nome para o notebook. Por exemplo, Analyze songs data. Altere o idioma padrão para SQL.

  3. Insira o seguinte na primeira célula do notebook:

    SQL
    -- Which artists released the most songs each year?
    SELECT
    artist_name,
    count(artist_name)
    AS
    num_songs,
    year
    FROM
    <prepared-songs-table-name>
    WHERE
    year > 0
    GROUP BY
    artist_name,
    year
    ORDER BY
    num_songs DESC,
    year DESC

    Substitua <prepared-songs-table-name> pelo nome da tabela que contém os dados preparados. Por exemplo, data_pipelines.songs_data.prepared_song_data.

  4. Clique em Abaixo do cursor no menu de ações da célula, selecione Add Cell Below (Adicionar célula abaixo ) e digite o seguinte na nova célula:

    SQL
     -- Find songs for your DJ list
    SELECT
    artist_name,
    title,
    tempo
    FROM
    <prepared-songs-table-name>
    WHERE
    time_signature = 4
    AND
    tempo between 100 and 140;

    Substitua <prepared-songs-table-name> pelo nome da tabela preparada criada na passo anterior. Por exemplo, data_pipelines.songs_data.prepared_song_data.

  5. Para executar as consultas e visualizar a saída, clique em Executar tudo .

Etapa 6: Crie um trabalho no site Databricks para executar o pipeline

É possível criar um fluxo de trabalho para automatizar a execução das etapas de ingestão, processamento e análise de dados usando um trabalho Databricks.

  1. Em seu workspace do Data Science & Engineering, faça um dos seguintes:

    • Clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral e clique em Botão criar job.
    • Na barra lateral, clique em Novo ícone Novo e selecione Job .
  2. Na caixa de diálogo da tarefa na guia Tarefas , substitua Adicionar um nome para o seu trabalho… pelo nome do seu trabalho. Por exemplo, "Fluxo de trabalho de músicas".

  3. Em Nome da tarefa , insira um nome para a primeira tarefa, por exemplo, Ingest_songs_data.

  4. Em Tipo , selecione o tipo de tarefa Notebook .

  5. Em Origem , selecione workspace .

  6. No campo Path (Caminho ), use o navegador de arquivos para localizar o Notebook de ingestão de dados e clique em Confirm (Confirmar ).

  7. Em computar , selecione o recurso compute que o senhor criou na etapa Create a compute resource.

  8. Clique em Criar .

  9. Clique em Botão adicionar tarefa abaixo da tarefa que o senhor acabou de criar e selecione Notebook .

  10. Em Nome da tarefa , insira um nome para a tarefa, por exemplo, Prepare_songs_data.

  11. Em Tipo , selecione o tipo de tarefa Notebook .

  12. Em Origem , selecione workspace .

  13. Use o navegador de arquivos para localizar o notebook de preparação de dados, clique no nome do notebook e clique em Confirmar .

  14. Em computar , selecione o recurso compute que o senhor criou na etapa Create a compute resource.

  15. Clique em Criar .

  16. Clique em Botão adicionar tarefa abaixo da tarefa que o senhor acabou de criar e selecione Notebook .

  17. Em Nome da tarefa , insira um nome para a tarefa, por exemplo, Analyze_songs_data.

  18. Em Tipo , selecione o tipo de tarefa Notebook .

  19. Em Origem , selecione workspace .

  20. Use o navegador de arquivos para localizar o notebook de análise de dados, clique no nome do notebook e clique em Confirmar .

  21. Em computar , selecione o recurso compute que o senhor criou na etapa Create a compute resource.

  22. Clique em Criar .

  23. Para executar o fluxo de trabalho, clique em Botão executar agora. Para view os detalhes da execução, clique no link na coluna de tempo de início da execução no Job execução view. Clique em cada tarefa para acessar view detalhes da execução da tarefa.

  24. Para visualizar os resultados quando o fluxo de trabalho for concluído, clique na tarefa final de análise de dados. A página Saída aparece e exibe os resultados da consulta.

Etapa 7: programar o pipeline de dados Job

nota

Para demonstrar o uso de um trabalho do Databricks para orquestrar um fluxo de trabalho programado, este exemplo de como começar separa as etapas de ingestão, preparação e análise em notebooks separados, e cada notebook é usado para criar uma tarefa no trabalho. Se todo o processamento estiver contido em um único Notebook, o senhor poderá programar facilmente o Notebook diretamente da UI do Notebook Databricks. Consulte Criar e gerenciar o trabalho agendado do Notebook.

Um requisito comum é executar um pipeline de dados de forma programada. Para definir um agendamento para o trabalho que executa o pipeline:

  1. Clique em fluxo de trabalho Icon fluxo de trabalho na barra lateral.
  2. Na coluna Nome , clique no nome do trabalho. O painel lateral exibe os detalhes do trabalho .
  3. Clique em Adicionar acionador no painel Detalhes do trabalho e selecione Programado no Tipo de acionador .
  4. Especifique o período, a hora inicial e o fuso horário. Opcionalmente, marque a caixa de seleção Mostrar Cron Syntax para exibir e editar a programação na Sintaxe Quartz Cron.
  5. Clique em Salvar .

Saiba mais