Pular para o conteúdo principal

tutorial: execução de uma lakehouse analítica de ponta a ponta pipeline

Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um Lakehouse do Databricks.

important

Este tutorial usa o notebook interativo para concluir tarefas comuns do ETL em Python em um cluster habilitado para Unity Catalog. Se o senhor não estiver usando o site Unity Catalog, consulte executar sua primeira carga de trabalho ETL em Databricks.

tarefa neste tutorial

No final deste artigo, você se sentirá confortável:

  1. Iniciando um clustering Unity Catalog habilitado para compute.
  2. Criando um Databricks Notebook.
  3. Gravação e leitura de dados de um local externo do Unity Catalog.
  4. Configuração da ingestão de dados incrementais em uma tabela do Unity Catalog com o Auto Loader.
  5. Execução de células do Notebook para processar, consultar e visualizar dados.
  6. Programar um notebook como um trabalho Databricks.
  7. Consulta de tabelas do Unity Catalog a partir do Databricks SQL

O Databricks oferece um conjunto de ferramentas prontas para produção que permitem aos profissionais de dados desenvolver e implantar rapidamente pipelines de extração, transformação e carga (ETL).O Unity Catalog permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda a organização. O Databricks SQL permite que os analistas executem consultas SQL nas mesmas tabelas usadas em cargas de trabalho ETL de produção, permitindo business intelligence em tempo real em escala.

O senhor também pode usar Delta Live Tables para criar o pipeline ETL. Databricks Criamos o site Delta Live Tables para reduzir a complexidade da criação, implantação e manutenção do pipeline de produção ETL. Veja o tutorial: Execute seu primeiro Delta Live Tables pipeline .

Requisitos

  • O senhor está conectado ao site Databricks.
nota

Se o senhor não tiver privilégios de controle de clustering, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um clustering.

Etapa 1: Criar um clustering

Para realizar análise exploratória de dados e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.

  1. Clique em ícone de computação Calcular na barra lateral.
  2. Clique em Novo ícone New na barra lateral e selecione clustering . Isso abre a página New clustering/compute (Novo cluster/computação).
  3. Especifique um nome exclusivo para o cluster.
  4. Na seção de desempenho , selecione o botão de rádio Single node (Nó único ).
  5. Em Avançado , alterne a configuração do modo de acesso para Manual e selecione Dedicado .
  6. Em Usuário único ou grupo , selecione seu nome de usuário.
  7. Selecione a versão de Databricks runtime desejada, 11.1 ouacima para utilizar o catálogo Unity.1.
  8. Clique em Create compute para criar o clustering.

Para saber mais sobre o clustering Databricks, consulte computação.

Etapa 2: Criar um notebook Databricks

Para criar um notebook no seu workspace, clique em Novo ícone Novo na barra lateral e clique em Notebook . Um notebook em branco será aberto no workspace.

Para saber mais sobre como criar e gerenciar o Notebook, consulte gerenciar o Notebook.

Etapa 3: Gravar e ler dados de um local externo gerenciar por Unity Catalog

A Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.

Use o Unity Catalog para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com permissões READ FILES em um local externo podem usar Auto Loader para ingerir dados.

Normalmente, os dados chegarão a uma localização externa devido às gravações de outros sistemas.Nesta demonstração, você pode simular a chegada de dados escrevendo arquivos JSON em um local externo.

Copie o código abaixo em uma célula do note6.Substitua o valor da string de catalog pelo nome de um catálogo com permissões CREATE CATALOG e USE CATALOG . Substitua o valor da string de external_location pelo caminho de um local externo com permissões READ FILES, WRITE FILES e CREATE EXTERNAL TABLE .

As localizações externas podem ser definidas como um contêiner de armazenamento inteiro, mas frequentemente apontam para um diretório dentro de um contêiner.

O formato correto para um caminho de localização externo é "s3://bucket-name/path/to/external_location".

Python

external_location = "<your-external-location>"
catalog = "<your-catalog>"

dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")

display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

A execução dessa célula deve imprimir uma linha de 12 bytes, imprimir a cadeia de caracteres "Hello world!" e exibir todos os bancos de dados presentes no catálogo fornecido. Se não conseguir executar essa célula, confirme que o senhor está em um Unity Catalog habilitado workspace e solicite as devidas permissões ao administrador do workspace para concluir este tutorial.

O código Python abaixo usa seu endereço email para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo no local externo fornecido. A execução dessa célula removerá todos os dados associados a esse tutorial, permitindo que o senhor execute esse exemplo de forma idempotente. É definida e instanciada uma classe que o senhor usará para simular lotes de dados que chegam de um sistema conectado ao seu local externo de origem.

Copie este código para uma nova célula em seu notebook e execute-o para configurar seu ambiente.

nota

As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de entrar em conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do workspace para solucionar essas restrições.

Python

from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

def __init__(self, source):
self.source = source

def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date

def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)

def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)

def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)

RawData = LoadData(source)

Agora, o senhor pode obter uma grande quantidade de dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente essa célula até 60 vezes para acionar a chegada de novos dados.

Python
RawData.land_batch()

Etapa 4: Configurar o Auto Loader para ingerir dados no Unity Catalog

A Databricks recomenda o armazenamento de dados com o Delta Lake. O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato default para tabelas criadas em Databricks.

Para configurar o Auto Loader para importar dados em uma tabela Unity Catalog, copie e cole o código abaixo em uma célula vazia do seu notebook:

Python
# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))

Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?

Para saber mais sobre transmissão estruturada com Unity Catalog, consulte Usando Unity Catalog com transmissão estruturada.

Etapa 5: processar e interagir com os dados

Os notebooks executam a lógica célula por célula. Use estas etapas para executar a lógica em sua célula:

  1. Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER .

  2. Para realizar consultas na tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e, em seguida, pressione SHIFT+ENTER para executar a célula.

    Python
    df = spark.read.table(table)
  3. Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.

    Python
    display(df)

Para saber mais sobre as opções interativas de visualização de dados, consulte Visualizações em Databricks Notebook.

Etapa 6: programar um emprego

Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.

Para programar seu notebook como uma tarefa:

  1. Clique em Agendar no lado direito da barra de cabeçalho.
  2. Insira um nome exclusivo para o Nome do job .
  3. Clique em Manual .
  4. No menu suspenso Cluster , selecione o cluster que você criou na etapa 1.
  5. Clique em Criar .
  6. Na janela exibida, clique em Executar agora .
  7. Para ver os resultados da execução do trabalho, clique no ícone Link externo ao lado do registro de data e hora da última execução .

Para obter mais informações sobre o Job, consulte What are Job?

Etapa 7: Consultar a tabela do Databricks SQL

Qualquer pessoa com a permissão USE CATALOG no catálogo atual, a permissão USE SCHEMA no esquema atual e as permissões SELECT na tabela pode consultar o conteúdo da tabela a partir de sua API Databricks preferida.

Você precisa ter acesso a um depósito SQL em execução para executar consultas no Databricks SQL.

A tabela que o senhor criou anteriormente neste tutorial tem o nome target_table. Você pode consultá-lo usando o catálogo fornecido na primeira célula e o banco de dados com o padrão e2e_lakehouse_<your-username>. Você pode usar o Catalog Explorer para encontrar os objetos de dados que você criou.

Integrações adicionais

Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: