tutorial: execução de uma lakehouse analítica de ponta a ponta pipeline
Este tutorial mostra como configurar um pipeline de análise de ponta a ponta para um Lakehouse do Databricks.
Este tutorial usa o notebook interativo para concluir tarefas comuns do ETL em Python em um cluster habilitado para Unity Catalog. Se o senhor não estiver usando o site Unity Catalog, consulte executar sua primeira carga de trabalho ETL em Databricks.
tarefa neste tutorial
No final deste artigo, você se sentirá confortável:
- Iniciando um clustering Unity Catalog habilitado para compute.
- Criando um Databricks Notebook.
- Gravação e leitura de dados de um local externo do Unity Catalog.
- Configuração da ingestão de dados incrementais em uma tabela do Unity Catalog com o Auto Loader.
- Execução de células do Notebook para processar, consultar e visualizar dados.
- Programar um notebook como um trabalho Databricks.
- Consulta de tabelas do Unity Catalog a partir do Databricks SQL
O Databricks oferece um conjunto de ferramentas prontas para produção que permitem aos profissionais de dados desenvolver e implantar rapidamente pipelines de extração, transformação e carga (ETL).O Unity Catalog permite que os administradores de dados configurem e protejam credenciais de armazenamento, locais externos e objetos de banco de dados para usuários em toda a organização. O Databricks SQL permite que os analistas executem consultas SQL nas mesmas tabelas usadas em cargas de trabalho ETL de produção, permitindo business intelligence em tempo real em escala.
O senhor também pode usar Delta Live Tables para criar o pipeline ETL. Databricks Criamos o site Delta Live Tables para reduzir a complexidade da criação, implantação e manutenção do pipeline de produção ETL. Veja o tutorial: Execute seu primeiro Delta Live Tables pipeline .
Requisitos
- O senhor está conectado ao site Databricks.
Se o senhor não tiver privilégios de controle de clustering, ainda poderá concluir a maioria das etapas abaixo, desde que tenha acesso a um clustering.
Etapa 1: Criar um clustering
Para realizar análise exploratória de dados e engenharia de dados, crie um cluster para fornecer os recursos de computação necessários para executar comandos.
- Clique em
Calcular na barra lateral.
- Clique em
New na barra lateral e selecione clustering . Isso abre a página New clustering/compute (Novo cluster/computação).
- Especifique um nome exclusivo para o cluster.
- Na seção de desempenho , selecione o botão de rádio Single node (Nó único ).
- Em Avançado , alterne a configuração do modo de acesso para Manual e selecione Dedicado .
- Em Usuário único ou grupo , selecione seu nome de usuário.
- Selecione a versão de Databricks runtime desejada, 11.1 ouacima para utilizar o catálogo Unity.1.
- Clique em Create compute para criar o clustering.
Para saber mais sobre o clustering Databricks, consulte computação.
Etapa 2: Criar um notebook Databricks
Para criar um notebook no seu workspace, clique em Novo na barra lateral e clique em Notebook . Um notebook em branco será aberto no workspace.
Para saber mais sobre como criar e gerenciar o Notebook, consulte gerenciar o Notebook.
Etapa 3: Gravar e ler dados de um local externo gerenciar por Unity Catalog
A Databricks recomenda o uso do Auto Loader para a ingestão de dados incrementais. O Auto Loader detecta e processa automaticamente novos arquivos à medida que eles chegam ao armazenamento de objetos na nuvem.
Use o Unity Catalog para gerenciar o acesso seguro a locais externos. Usuários ou entidades de serviço com permissões READ FILES
em um local externo podem usar Auto Loader para ingerir dados.
Normalmente, os dados chegarão a uma localização externa devido às gravações de outros sistemas.Nesta demonstração, você pode simular a chegada de dados escrevendo arquivos JSON em um local externo.
Copie o código abaixo em uma célula do note6.Substitua o valor da string de catalog
pelo nome de um catálogo com permissões CREATE CATALOG
e USE CATALOG
. Substitua o valor da string de external_location
pelo caminho de um local externo com permissões READ FILES
, WRITE FILES
e CREATE EXTERNAL TABLE
.
As localizações externas podem ser definidas como um contêiner de armazenamento inteiro, mas frequentemente apontam para um diretório dentro de um contêiner.
O formato correto para um caminho de localização externo é "s3://bucket-name/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
A execução dessa célula deve imprimir uma linha de 12 bytes, imprimir a cadeia de caracteres "Hello world!" e exibir todos os bancos de dados presentes no catálogo fornecido. Se não conseguir executar essa célula, confirme que o senhor está em um Unity Catalog habilitado workspace e solicite as devidas permissões ao administrador do workspace para concluir este tutorial.
O código Python abaixo usa seu endereço email para criar um banco de dados exclusivo no catálogo fornecido e um local de armazenamento exclusivo no local externo fornecido. A execução dessa célula removerá todos os dados associados a esse tutorial, permitindo que o senhor execute esse exemplo de forma idempotente. É definida e instanciada uma classe que o senhor usará para simular lotes de dados que chegam de um sistema conectado ao seu local externo de origem.
Copie este código para uma nova célula em seu notebook e execute-o para configurar seu ambiente.
As variáveis definidas neste código devem permitir que você o execute com segurança sem risco de entrar em conflito com ativos de workspace existentes ou outros usuários. Permissões restritas de rede ou armazenamento gerarão erros ao executar esse código; Entre em contato com o administrador do workspace para solucionar essas restrições.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Agora, o senhor pode obter uma grande quantidade de dados copiando o código a seguir em uma célula e executando-o. Você pode executar manualmente essa célula até 60 vezes para acionar a chegada de novos dados.
RawData.land_batch()
Etapa 4: Configurar o Auto Loader para ingerir dados no Unity Catalog
A Databricks recomenda o armazenamento de dados com o Delta Lake. O Delta Lake é uma camada de armazenamento de código aberto que fornece transações ACID e habilita o data lakehouse. Delta Lake é o formato default para tabelas criadas em Databricks.
Para configurar o Auto Loader para importar dados em uma tabela Unity Catalog, copie e cole o código abaixo em uma célula vazia do seu notebook:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Para saber mais sobre o Auto Loader, consulte O que é o Auto Loader?
Para saber mais sobre transmissão estruturada com Unity Catalog, consulte Usando Unity Catalog com transmissão estruturada.
Etapa 5: processar e interagir com os dados
Os notebooks executam a lógica célula por célula. Use estas etapas para executar a lógica em sua célula:
-
Para executar a célula concluída na etapa anterior, selecione a célula e pressione SHIFT+ENTER .
-
Para realizar consultas na tabela que você acabou de criar, copie e cole o código a seguir em uma célula vazia e, em seguida, pressione SHIFT+ENTER para executar a célula.
Pythondf = spark.read.table(table)
-
Para visualizar os dados em seu DataFrame, copie e cole o seguinte código em uma célula vazia e pressione SHIFT+ENTER para executar a célula.
Pythondisplay(df)
Para saber mais sobre as opções interativas de visualização de dados, consulte Visualizações em Databricks Notebook.
Etapa 6: programar um emprego
Você pode executar Databricks Notebook como scripts de produção, adicionando-os como uma tarefa em um Databricks Job. Nesta passo, você criará um novo Job que pode ser acionado manualmente.
Para programar seu notebook como uma tarefa:
- Clique em Agendar no lado direito da barra de cabeçalho.
- Insira um nome exclusivo para o Nome do job .
- Clique em Manual .
- No menu suspenso Cluster , selecione o cluster que você criou na etapa 1.
- Clique em Criar .
- Na janela exibida, clique em Executar agora .
- Para ver os resultados da execução do trabalho, clique no ícone
ao lado do registro de data e hora da última execução .
Para obter mais informações sobre o Job, consulte What are Job?
Etapa 7: Consultar a tabela do Databricks SQL
Qualquer pessoa com a permissão USE CATALOG
no catálogo atual, a permissão USE SCHEMA
no esquema atual e as permissões SELECT
na tabela pode consultar o conteúdo da tabela a partir de sua API Databricks preferida.
Você precisa ter acesso a um depósito SQL em execução para executar consultas no Databricks SQL.
A tabela que o senhor criou anteriormente neste tutorial tem o nome target_table
. Você pode consultá-lo usando o catálogo fornecido na primeira célula e o banco de dados com o padrão e2e_lakehouse_<your-username>
. Você pode usar o Catalog Explorer para encontrar os objetos de dados que você criou.
Integrações adicionais
Saiba mais sobre integrações e ferramentas para engenharia de dados com Databricks: