Tutorial: Crie um site ETL pipeline com o DLT
Aprenda a criar e implantar um ETL (extract, transform, and load) pipeline para a obtenção de dados usando DLT e Auto Loader. Um pipeline de ETL implementa as etapas para ler dados dos sistemas de origem, transformar esses dados com base nos requisitos, como verificações de qualidade de dados e eliminação de duplicação de registros, e gravar os dados em um sistema de destino, como um data warehouse ou um data lake.
Neste tutorial, o senhor usará o DLT e o Auto Loader para:
- Ingira dados de origem brutos em uma tabela de destino.
- Transformar os dados brutos de origem e gravar os dados transformados em duas visualizações materializadas de destino.
- Consulte os dados transformados.
- Automatize o ETL pipeline com um trabalho Databricks.
Para obter mais informações sobre DLT e Auto Loader, consulte DLT e O que é Auto Loader?
Requisitos
Para completar este tutorial, você deve atender aos seguintes requisitos:
- O senhor pode entrar em um site Databricks workspace.
- O senhor tem Unity Catalog habilitado para o seu workspace.
- O senhor tem serverless compute habilitado para o seu account. O pipeline sem servidor DLT não está disponível em todas as regiões workspace. Consulte recurso com disponibilidade regional limitada para obter uma lista das regiões disponíveis.
- Ter permissão para criar um recurso compute ou acesso a um recurso compute.
- Tenha permissões para criar um novo esquema em um catálogo. As permissões necessárias são
ALL PRIVILEGES
ouUSE CATALOG
eCREATE SCHEMA
. - Tenha permissões para criar um novo volume em um esquema existente. As permissões necessárias são
ALL PRIVILEGES
ouUSE SCHEMA
eCREATE VOLUME
.
Sobre o dataset
O site dataset usado neste exemplo é um subconjunto do conjunto de dados Million Song, uma coleção de recursos e metadados para faixas de música contemporânea. Esse dataset está disponível no conjunto de dados de amostra incluído em seu Databricks workspace.
Etapa 1: Criar um pipeline
Primeiro, o senhor criará um pipeline de ETL no DLT. DLT cria o pipeline resolvendo as dependências definidas no Notebook ou em arquivos (chamados de código-fonte ) usando a sintaxe DLT. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode adicionar Notebook ou arquivos específicos de vários idiomas no site pipeline. Para saber mais, consulte DLT
Deixe o campo Código-fonte em branco para criar e configurar automaticamente um Notebook para criação de código-fonte.
Este tutorial usa serverless compute e Unity Catalog. Para todas as opções de configuração não especificadas, use as configurações do site default. Se o serverless compute não estiver habilitado ou não for compatível com o seu workspace, o senhor poderá concluir o tutorial conforme escrito usando as configurações do default compute . Se o senhor usar as configurações do default compute , deverá selecionar manualmente Unity Catalog em Storage options (Opções de armazenamento) na seção Destination (Destino ) da interface de usuário Create pipeline .
Para criar um novo pipeline de ETL no DLT, siga estas etapas:
- Na barra lateral, clique em pipeline .
- Clique em Create pipeline e ETL pipeline .
- Em nome do pipeline , digite um nome exclusivo pipeline.
- Marque a caixa de seleção sem servidor .
- No Destination , para configurar um local do Unity Catalog onde as tabelas são publicadas, selecione um catálogo existente e escreva um novo nome em Schema para criar um novo esquema em seu catálogo.
- Clique em Criar .
- Clique no link do Notebook do código-fonte sob o campo Código-fonte no painel de detalhes do pipeline .
A interface do usuário do pipeline é exibida para o novo pipeline.
Etapa 2: Desenvolver um pipeline de DLT
O Notebook só pode conter uma única linguagem de programação. Não misture os códigos Python e SQL no Notebook de código-fonte pipeline.
Nesta etapa, o senhor usará o Databricks Notebook para desenvolver e validar interativamente o código-fonte do pipeline DLT.
O código usa Auto Loader para ingestão incremental de dados. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem. Para saber mais, consulte O que é o Auto Loader?
Um Notebook de código-fonte em branco é criado e configurado automaticamente para o site pipeline. O Notebook é criado em um novo diretório no seu diretório de usuário. O nome do novo diretório e arquivo corresponde ao nome do seu pipeline. Por exemplo, /Users/someone@example.com/my_pipeline/my_pipeline
.
Ao desenvolver um pipeline de DLT, o senhor pode escolher entre Python ou SQL. Exemplos estão incluídos para os dois idiomas. Com base em sua escolha de idioma, certifique-se de selecionar o idioma do notebook default. Para saber mais sobre o suporte do Notebook para DLT pipeline desenvolvimento de código, consulte Desenvolver e depurar o pipeline ETL com um Notebook em DLT.
-
Um link para acessar esse Notebook está abaixo do campo Código-fonte no painel de detalhes do pipeline . Clique no link para abrir o Notebook antes de prosseguir para a próxima etapa.
-
Clique em Connect (Conectar ) no canto superior direito para abrir o menu de configuração do site compute.
-
Passe o mouse sobre o nome do pipeline que o senhor criou na Etapa 1.
-
Clique em Conectar .
-
Ao lado do título do Notebook na parte superior, selecione o idioma do Notebook default (Python ou SQL).
-
Copie e cole o código a seguir em uma célula do Notebook.
aba :::tab-item[Python]
Python# Import modules
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
# Define the path to source data
file_path = f"/databricks-datasets/songs/data-001/"
# Define a streaming table to ingest data from a volume
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)
@dlt.table(
comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
)
def songs_raw():
return (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.option("inferSchema", True)
.load(file_path))
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="Million Song Dataset with data cleaned and prepared for analysis."
)
@dlt.expect("valid_artist_name", "artist_name IS NOT NULL")
@dlt.expect("valid_title", "song_title IS NOT NULL")
@dlt.expect("valid_duration", "duration > 0")
def songs_prepared():
return (
spark.read.table("songs_raw")
.withColumnRenamed("title", "song_title")
.select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of songs released by the artists each year who released most songs."
)
def top_artists_by_year():
return (
spark.read.table("songs_prepared")
.filter(expr("year > 0"))
.groupBy("artist_name", "year")
.count().withColumnRenamed("count", "total_number_of_songs")
.sort(desc("total_number_of_songs"), desc("year"))
):::
:::tab-item[sql]
SQL-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE songs_raw
(
artist_id STRING,
artist_lat DOUBLE,
artist_long DOUBLE,
artist_location STRING,
artist_name STRING,
duration DOUBLE,
end_of_fade_in DOUBLE,
key INT,
key_confidence DOUBLE,
loudness DOUBLE,
release STRING,
song_hotnes DOUBLE,
song_id STRING,
start_of_fade_out DOUBLE,
tempo DOUBLE,
time_signature INT,
time_signature_confidence DOUBLE,
title STRING,
year INT,
partial_sequence STRING,
value STRING
)
COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks."
AS SELECT *
FROM STREAM read_files(
'/databricks-datasets/songs/data-001/');
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW songs_prepared(
CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL),
CONSTRAINT valid_title EXPECT (song_title IS NOT NULL),
CONSTRAINT valid_duration EXPECT (duration > 0)
)
COMMENT "Million Song Dataset with data cleaned and prepared for analysis."
AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year
FROM songs_raw;
-- Define a materialized view that has a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year
COMMENT "A table summarizing counts of songs released by the artists each year who released most songs."
AS SELECT
artist_name,
year,
COUNT(*) AS total_number_of_songs
FROM songs_prepared
WHERE year > 0
GROUP BY artist_name, year
ORDER BY total_number_of_songs DESC, year DESC::: ::::
Etapa 3: consultar os dados transformados
Nesta etapa, o senhor consultará os dados processados no pipeline ETL para analisar os dados da música. Essas consultas usam os registros preparados criados na etapa anterior.
Primeiro, execute uma consulta que encontre os artistas que lançaram o maior número de músicas a cada ano, começando em 1990.
-
Na barra lateral, clique em
SQL Editor .
-
Clique no ícone
new tab e selecione Create new query (Criar nova consulta ) no menu.
-
Digite o seguinte:
SQL-- Which artists released the most songs each year in 1990 or later?
SELECT artist_name, total_number_of_songs, year
FROM <catalog>.<schema>.top_artists_by_year
WHERE year >= 1990
ORDER BY total_number_of_songs DESC, year DESCSubstitua
<catalog>
e<schema>
pelo nome do catálogo e do esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.top_artists_by_year
. -
Clique em executar selecionado .
Agora, execute outra consulta que encontre músicas com uma batida 4/4 e ritmo dançante.
-
Clique no
novo ícone de toque e selecione Criar nova consulta no menu.
-
Insira o código a seguir:
SQL-- Find songs with a 4/4 beat and danceable tempo
SELECT artist_name, song_title, tempo
FROM <catalog>.<schema>.songs_prepared
WHERE time_signature = 4 AND tempo between 100 and 140;Substitua
<catalog>
e<schema>
pelo nome do catálogo e do esquema em que a tabela está. Por exemplo,data_pipelines.songs_data.songs_prepared
. -
Clique em executar selecionado .
Etapa 4: Criar um trabalho para executar o DLT pipeline
Em seguida, crie um fluxo de trabalho para automatizar a execução das etapas de ingestão de dados, processamento e análise usando um Databricks Job.
- Em seu site workspace, clique em
fluxo de trabalho na barra lateral e clique em Create Job .
- Na caixa de título da tarefa, substitua New Job pelo nome do <date and time> trabalho. Por exemplo,
Songs workflow
. - Em Nome da tarefa , insira um nome para a primeira tarefa, por exemplo,
ETL_songs_data
. - Em Type , selecione pipeline .
- No pipeline , selecione o DLT pipeline que o senhor criou na etapa 1.
- Clique em Criar .
- Para executar o fluxo de trabalho, clique em executar Now . Para acessar view os detalhes da execução , clique em tab. Clique na tarefa para acessar view detalhes da execução da tarefa.
- Para view os resultados quando o fluxo de trabalho for concluído, clique em Go to the latest successful execution (Ir para a última execução bem-sucedida ) ou no Começar time (Hora de início ) para a execução do trabalho. A página Saída é exibida e exibe os resultados da consulta.
Para obter mais informações sobre a execução de trabalhos, consulte monitoramento e observabilidade para Databricks Jobs.
Etapa 5: programar o trabalho DLT pipeline
Para executar o ETL pipeline em um programador, siga estas etapas:
- Clique em
fluxo de trabalho na barra lateral.
- Na coluna Nome , clique no nome do trabalho. O painel lateral exibe os detalhes do trabalho .
- Clique em Add trigger (Adicionar acionador ) no painel Programar & Triggers (Acionadores ) e selecione Scheduled (Programado ) em Trigger type (Tipo de acionador) .
- Especifique o período, a hora inicial e o fuso horário.
- Clique em Salvar .
Saiba mais
- Para saber mais sobre o pipeline de processamento de dados com DLT, consulte DLT
- Para saber mais sobre o Databricks Notebook, consulte Introdução ao Databricks Notebook.
- Para saber mais sobre Databricks Jobs, consulte What are Job?
- Para saber mais sobre o Delta Lake, consulte O que é o Delta Lake?