Tutorial: Crie um ETL pipeline com Apache Spark na plataforma Databricks
Este tutorial mostra aos senhores como desenvolver e implantar seu primeiro ETL (extract, transform, and load) pipeline para a obtenção de dados com o Apache Spark. Embora este tutorial use o Databricks para todos os fins compute, o senhor também pode usar o serverless compute se ele estiver habilitado para o seu workspace.
Você também pode usar o pipeline declarativo LakeFlow Spark para construir um pipeline ETL . O pipeline declarativo Databricks LakeFlow Spark reduz a complexidade de construir, implantar e manter um pipeline ETL de produção. Consulte o tutorial: Criar um pipeline ETL com o pipeline declarativo LakeFlow Spark.
Ao final deste artigo, o senhor saberá como fazer:
- Inicie um recurso compute de uso geral Databricks.
- Crie um notebook Databricks.
- Configure a ingestão incremental de dados para Delta Lake com Auto Loader.
- Processar e interagir com dados.
- Programar um Notebook como um Job Databricks.
Este tutorial utiliza notebooks interativos para realizar tarefas comuns de ETL em Python ou Scala.
O senhor também pode usar o provedorDatabricks Terraform para criar o recurso deste artigo. Consulte Criar clustering, Notebook e Job com Terraform.
Requisitos
- Você está conectado a um workspace do Databricks.
- Você tem permissão para criar um recurso compute.
Se você não tiver privilégios de controle compute , ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um recurso compute.
o passo 1: Crie um recurso compute
Para fazer análise exploratória de dados e engenharia de dados, crie um recurso compute para executar o comando.
Se seu workspace estiver habilitado para compute serverless, você poderá pular este o passo. Notebooks se conectam ao compute serverless automaticamente na execução do código, ou é possível selecionar Serverless no menu suspenso do compute. Consulte Compute serverless para Notebooks.
- Clique em
Calcular na barra lateral.
- Na página Computar, clique em Criar Computar .
- Especifique um nome exclusivo para o recurso compute , deixe os valores restantes em seus valores default e clique em Criar compute .
Para saber mais sobre Databricks compute, consulte compute.
Etapa 2: Criar um notebook Databricks
Para criar um notebook no seu workspace, clique em Novo na barra lateral e clique em Notebook . Um notebook em branco será aberto no workspace.
Para saber mais sobre como criar e gerenciar o Notebook, consulte gerenciar o Notebook.
Etapa 3: Configurar o Auto Loader para ingerir dados no Delta Lake
A Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.
A Databricks recomenda o armazenamento de dados com o Delta Lake. O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato default para tabelas criadas em Databricks.
Para configurar o Auto Loader para ingerir dados em uma tabela Delta Lake, copie e cole o código a seguir na célula vazia do seu notebook:
- Python
- Scala
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de entrar em conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do workspace para solucionar essas restrições.
Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?
Etapa 4: processar e interagir com os dados
Os notebooks executam a lógica célula por célula. Para executar a lógica em sua célula:
-
Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER .
-
Para consultar a tabela que o senhor acabou de criar, copie e cole o código a seguir em uma célula vazia e, em seguida, pressione SHIFT+ENTER para executar a célula.
- Python
- Scala
df = spark.read.table(table_name)
val df = spark.read.table(table_name)
- Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
- Python
- Scala
display(df)
display(df)
Para saber mais sobre as opções interativas de visualização de dados, consulte Visualizações no Databricks Notebook e no editor SQL.
Etapa 5: programar um emprego
Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.
Caso esteja usando compute serverless, selecione Serverless no menu suspenso Compute em vez do recurso de compute d'o Passo 1.
Para programar seu notebook como uma tarefa:
- Clique em Agendar no lado direito da barra de cabeçalho.
- Insira um nome exclusivo para o Nome do job .
- Clique em Manual .
- Na lista suspensa de computação , selecione o recurso compute que você criou na etapa 1.
- Clique em Criar .
- Na janela exibida, clique em Executar agora .
- Para ver os resultados da execução do trabalho, clique no ícone
ao lado do registro de data e hora da última execução .
Para obter mais informações sobre o Job, consulte What are Job?
Integrações adicionais
Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: