execução sua primeira carga de trabalho ETL no Databricks
Saiba como usar ferramentas prontas para produção do Databricks para desenvolver e implementar seus primeiros pipelines de extração, transformação e carregamento (ETL) para orquestração de dados.
No final deste artigo, você se sentirá confortável:
- Lançamento de um Databricks clustering para todos os fins compute.
- Criando um Databricks Notebook.
- Configuração da ingestão de dados incrementais no Delta Lake com o Auto Loader.
- Execução de células do Notebook para processar, consultar e visualizar dados.
- Programar um notebook como um trabalho Databricks.
Este tutorial utiliza notebooks interativos para realizar tarefas comuns de ETL em Python ou Scala.
O senhor também pode usar a DLT para criar o pipeline ETL. Databricks criou a DLT para reduzir a complexidade da criação, implantação e manutenção do pipeline de produção ETL. Veja o tutorial: executando seu primeiro DLT pipeline.
O senhor também pode usar o provedorDatabricks Terraform para criar o recurso deste artigo. Consulte Criar clustering, Notebook e Job com Terraform.
Requisitos
- Você está conectado a um workspace do Databricks.
- O senhor tem permissão para criar um clustering.
Se o senhor não tiver privilégios de controle de clustering, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um clustering.
Etapa 1: Criar um clustering
Para realizar análise exploratória de dados e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.
- Clique em
Calcular na barra lateral.
- Na página Computação, clique em Criar cluster . Isso abre a página "Novo cluster".
- Especifique um nome exclusivo para o cluster, deixe os demais valores em seus estados padrão e clique em Criar Cluster .
Para saber mais sobre o clustering Databricks, consulte computação.
Etapa 2: Criar um notebook Databricks
Para criar um notebook no seu workspace, clique em Novo na barra lateral e clique em Notebook . Um notebook em branco será aberto no workspace.
Para saber mais sobre como criar e gerenciar o Notebook, consulte gerenciar o Notebook.
Etapa 3: Configurar o Auto Loader para ingerir dados no Delta Lake
A Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.
A Databricks recomenda o armazenamento de dados com o Delta Lake. O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato default para tabelas criadas em Databricks.
Para configurar o Auto Loader para ingerir dados em uma tabela Delta Lake, copie e cole o código a seguir na célula vazia do seu notebook:
- Python
- Scala
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._
// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"
// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)
// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
.select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(Trigger.AvailableNow)
.toTable(table_name)
As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de entrar em conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do workspace para solucionar essas restrições.
Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?
Etapa 4: processar e interagir com os dados
Os notebooks executam a lógica célula por célula. Para executar a lógica em sua célula:
-
Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER .
-
Para realizar consultas na tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e, em seguida, pressione SHIFT+ENTER para executar a célula.
- Python
- Scala
df = spark.read.table(table_name)
val df = spark.read.table(table_name)
- Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
- Python
- Scala
display(df)
display(df)
Para saber mais sobre as opções interativas de visualização de dados, consulte Visualizações em Databricks Notebook.
Etapa 5: programar um emprego
Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.
Para programar seu notebook como uma tarefa:
- Clique em Agendar no lado direito da barra de cabeçalho.
- Insira um nome exclusivo para o Nome do job .
- Clique em Manual .
- No menu suspenso Cluster , selecione o cluster que você criou na etapa 1.
- Clique em Criar .
- Na janela exibida, clique em Executar agora .
- Para ver os resultados da execução do trabalho, clique no ícone
ao lado do registro de data e hora da última execução .
Para obter mais informações sobre o Job, consulte What are Job?
Integrações adicionais
Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: