Execute sua primeira carga de trabalho ETL no Databricks

Saiba como usar ferramentas prontas para produção do Databricks para desenvolver e implementar seus primeiros pipelines de extração, transformação e carregamento (ETL) para orquestração de dados.

No final deste artigo, você se sentirá confortável:

  1. Iniciando clusters computepara todos os fins do Databricks.

  2. Criando um Notebook do Databricks.

  3. Configurando a ingestão de dados incrementais para Delta Lake com Auto Loader.

  4. Executando células Notebook para processar, query e visualizar dados.

  5. programar um Notebook como um Jobdo Databricks.

Este tutorial utiliza notebooks interativos para realizar tarefas comuns de ETL em Python ou Scala.

Você também pode usar Delta Live Tables para criar pipelines ETL. O Databricks criou o Delta Live Tables para reduzir a complexidade da criação, implantação e manutenção de pipelines ETL de produção. Consulte Tutorial: Execute seu primeiro pipeline das Delta Live Tables.

Você também pode usar o provedor Terraform do Databricks para criar os recursos deste artigo. Consulte Criar clusters, notebooks e trabalhos com o Terraform.

Requisitos

Observação

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das passos abaixo, desde que tenha acesso a um cluster.

Etapa 1: criar um cluster

Para realizar análise exploratória de dados e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

  1. Clique em Ícone de computação Calcular na barra lateral.

  2. Na página Computação, clique em Criar cluster. Isso abre a página "Novo cluster".

  3. Especifique um nome exclusivo para o cluster, deixe os demais valores em seus estados padrão e clique em Criar Cluster.

Para saber mais sobre clusters Databricks, consulte Compute.

Etapa 2: criar um notebook do Databricks

Para começar a escrever e executar o código interativo no Databricks, crie um notebook.

  1. Clique Novo ícone Novo na barra lateral e clique em Notebook.

  2. Na página "Criar Notebook":

    • Especifique um nome exclusivo para o seu notebook.

    • Verifique se o idioma padrão está definido como Python ou Scala.

    • Selecione o cluster que você criou na etapa 1 no menu dropdown Cluster.

    • Clique em Criar.

Um notebook é aberto com uma célula vazia na parte superior.

Para saber mais sobre como criar e gerenciar notebooks, consulte Gerenciar notebooks.

Etapa 3: configurar o Auto Loader para ingerir dados no Delta Lake

O Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

O Databricks recomenda armazenar dados com o Delta Lake. O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato padrão para tabelas criadas no Databricks.

Para configurar o Auto Loader para ingerir dados em uma tabela Delta Lake, copie e cole o código a seguir na célula vazia do seu notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))
// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Observação

As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de entrar em conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do workspace para solucionar essas restrições.

Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?.

Etapa 4: processar e interagir com os dados

Os notebooks executam a lógica célula por célula. Para executar a lógica em sua célula:

  1. Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER.

  2. Para realizar consultas na tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e, em seguida, pressione SHIFT+ENTER para executar a célula.

    df = spark.read.table(table_name)
    
    val df = spark.read.table(table_name)
    
  3. Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    display(df)
    
    display(df)
    

Para saber mais sobre as opções interativas de visualização de dados, consulte Visualizações nos notebooks do Databricks.

Etapa 5: agendar um trabalho

Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.

Para programar seu notebook como uma tarefa:

  1. Clique em Agendar no lado direito da barra de cabeçalho.

  2. Insira um nome exclusivo para o Nome do job.

  3. Clique em Manual.

  4. No menu suspenso Cluster , selecione o cluster que você criou na etapa 1.

  5. Clique em Criar.

  6. Na janela exibida, clique em Executar agora.

  7. Para ver os resultados da execução Job , clique no botão Link externo ícone ao lado do carimbo de data/hora da última execução .

Para obter mais informações sobre o Job, consulte O que são trabalhos do Databricks?.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: