Tutorial: Executar seu primeiro pipeline Delta Live Tables
Este tutorial conduz o senhor pelos passos para configurar seu primeiro Delta Live Tables pipeline, escrever o código básico ETL e executar uma atualização pipeline.
Todos os passos neste tutorial foram projetados para o espaço de trabalho com o Unity Catalog ativado. O senhor também pode configurar o pipeline Delta Live Tables para trabalhar com o legado Hive metastore. Consulte Usar o pipeline Delta Live Tables com o legado Hive metastore.
Observação
Este tutorial contém instruções para o desenvolvimento e a validação do novo código pipeline usando o Databricks Notebook. O senhor também pode configurar o pipeline usando o código-fonte nos arquivos Python ou SQL.
O senhor pode configurar um pipeline para executar seu código se já tiver um código-fonte escrito usando a sintaxe do Delta Live Tables. Consulte Configurar um pipeline do Delta Live Tables.
O senhor pode usar a sintaxe SQL totalmente declarativa em Databricks SQL para registrar e definir refresh programar a visualização materializada e as tabelas de transmissão como objetos do Unity Catalog-gerenciar. Consulte Use materialized view em Databricks SQL e Load uso de dados transmission tables em Databricks SQL.
Exemplo: Ingerir e processar dados de nomes de bebês de Nova York
O exemplo deste artigo usa um site disponível publicamente dataset que contém registros de nomes de bebês do Estado de Nova York. Este exemplo demonstra o uso de um pipeline do Delta Live Tables para:
Ler dados CSV brutos de um volume em uma tabela.
Leia os registros da tabela de ingestão e use as expectativas do Delta Live Tables para criar uma nova tabela que contenha dados limpos.
Use os registros limpos como entrada para as consultas Delta Live Tables que criam conjuntos de dados derivados.
Este código demonstra um exemplo simplificado da arquitetura medalhão. Consulte Qual é a arquitetura medalhão do lakehouse?.
As implementações desse exemplo são fornecidas para Python e SQL. Siga as etapas para criar um novo pipeline e um Notebook e, em seguida, copie e cole o código fornecido.
Também é fornecido um notebook de exemplo com código completo.
Requisitos
Para iniciar um pipeline, o senhor deve ter permissão de criação de clusters ou acesso a uma política de cluster que defina clusters do Delta Live Tables. O tempo de execução do Delta Live Tables cria um cluster antes de executar o pipeline e falha se o senhor não tiver a permissão correta.
Todos os usuários podem acionar atualizações usando o pipeline serverless por default. O serverless deve ser ativado no nível account e pode não estar disponível em sua região workspace. Consulte Ativar serverless compute .
Os exemplos deste tutorial usam o Unity Catalog. Databricks recomenda a criação de um novo esquema para executar esse tutorial, pois vários objetos de banco de dados são criados no esquema de destino.
Para criar um novo esquema em um catálogo, você deve ter os privilégios
ALL PRIVILEGES
ouUSE CATALOG
eCREATE SCHEMA
.Se o senhor não puder criar um novo esquema, execute este tutorial em um esquema existente. Você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo principal.ALL PRIVILEGES
ou privilégiosUSE SCHEMA
,CREATE MATERIALIZED VIEW
eCREATE TABLE
no esquema de destino.
Este tutorial usa um volume para armazenar dados de amostra. A Databricks recomenda a criação de um novo volume para este tutorial. Se o senhor criar um novo esquema para este tutorial, poderá criar um novo volume nesse esquema.
Para criar um novo volume em um esquema existente, você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo principal.ALL PRIVILEGES
ou privilégiosUSE SCHEMA
eCREATE VOLUME
no esquema de destino.
Opcionalmente, você pode usar um volume existente. Você deve ter os seguintes privilégios:
USE CATALOG
para o catálogo principal.USE SCHEMA
para o esquema principal.ALL PRIVILEGES
ouREAD VOLUME
eWRITE VOLUME
no volume alvo.
Para definir essas permissões, entre em contato com o administrador da Databricks. Para saber mais sobre os privilégios do Unity Catalog, consulte Privilégios e objetos protegíveis do Unity Catalog.
o passo 0: download de dados
Este exemplo carrega dados de um volume do Unity Catalog. O código a seguir downloads um arquivo CSV e o armazena no volume especificado. Abra um novo Notebook e execute o seguinte código para download esses dados no volume especificado:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
Substitua <catalog-name>
, <schema-name>
e <volume-name>
pelos nomes de catálogo, esquema e volume de um volume do Unity Catalog. O código fornecido tentará criar o esquema e o volume especificados se esses objetos não existirem. O senhor deve ter os privilégios adequados para criar e gravar em objetos no Unity Catalog. Consulte os requisitos.
Observação
Certifique-se de que este Notebook foi executado com sucesso antes de continuar com o site tutorial. Não configure este Notebook como parte de seu pipeline.
o passo 1: Criar um pipeline
Delta Live Tables cria o pipeline resolvendo as dependências definidas no Notebook ou em arquivos (chamados de código-fonte) usando a sintaxe Delta Live Tables. Cada arquivo de código-fonte pode conter apenas um idioma, mas o senhor pode adicionar Notebook ou arquivos específicos de vários idiomas no site pipeline.
Importante
Não configure nenhum ativo no campo Código-fonte. Deixar esse campo em branco cria e configura um Notebook para criação de código-fonte.
As instruções neste tutorial usam serverless compute e Unity Catalog. Use as configurações do site default para todas as opções de configuração não especificadas nestas instruções.
Observação
Se o serverless não estiver habilitado ou não for compatível com o seu workspace, o senhor poderá concluir o tutorial conforme escrito usando as configurações do default compute . O senhor deve selecionar manualmente o Unity Catalog em Storage options (Opções de armazenamento) na seção Destination (Destino ) da interface de usuário Create pipeline (Criar pipeline ).
Para configurar um novo pipeline, faça o seguinte:
Na barra lateral, clique em Delta Live Tables.
Clique em Create pipeline (Criar pipeline).
No nome do pipeline, digite um nome exclusivo pipeline.
Marque a caixa de seleção sem servidor.
Em Destination, para configurar um local do Unity Catalog onde as tabelas são publicadas, selecione um Catalog e um Schema.
Em Advanced (Avançado), clique em Add configuration (Adicionar configuração ) e, em seguida, defina os parâmetros pipeline para o catálogo, o esquema e o volume para os quais o senhor faz download do uso de dados com os seguintes nomes de parâmetros:
my_catalog
my_schema
my_volume
Clique em Criar.
A interface do usuário do pipeline é exibida para o novo pipeline. Um Notebook de código-fonte é criado e configurado automaticamente para o site pipeline.
O Notebook é criado em um novo diretório no seu diretório de usuário. O nome do novo diretório e arquivo corresponde ao nome do seu pipeline. Por exemplo, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Um link para acessar esse Notebook está abaixo do campo Código-fonte no painel de detalhes do pipeline. Clique no link para abrir o Notebook antes de prosseguir para o próximo passo.
o passo 2: Declarar a visualização materializada e as tabelas de transmissão em um Notebook com Python ou SQL
O senhor pode usar o Datbricks Notebook para desenvolver e validar interativamente o código-fonte do pipeline Delta Live Tables. O senhor deve anexar o Notebook ao site pipeline para usar essa funcionalidade. Para anexar o Notebook recém-criado ao site pipeline que o senhor acabou de criar:
Clique em Connect (Conectar ) no canto superior direito para abrir o menu de configuração do site compute.
Passe o mouse sobre o nome do site pipeline que o senhor criou no passo 1.
Clique em Conectar.
A interface do usuário muda para incluir os botões Validate e Começar no canto superior direito. Para saber mais sobre o suporte do Notebook para o desenvolvimento do código pipeline, consulte Desenvolver e depurar o pipeline Delta Live Tables no Notebook.
Importante
Delta Live Tables O pipeline avalia todas as células de um Notebook durante o planejamento. Diferentemente do Notebook, que é executado no site compute ou agendado como Job, o pipeline não garante que as células sejam executadas na ordem especificada.
O Notebook só pode conter uma única linguagem de programação. Não misture os códigos Python e SQL no Notebook de código-fonte pipeline.
Para obter detalhes sobre o desenvolvimento de código com Python ou SQL, consulte Desenvolver código de pipeline com Python ou Desenvolver código de pipeline com SQL.
Exemplo de código de pipeline
Para implementar o exemplo neste tutorial, copie e cole o código a seguir em uma célula do Notebook configurada como código-fonte para o seu pipeline.
O código fornecido faz o seguinte:
Importa os módulos necessários (somente Python).
Faz referência aos parâmetros definidos durante a configuração do pipeline.
Define uma tabela de transmissão denominada
baby_names_raw
que é ingerida a partir de um volume.Define um view materializado chamado
baby_names_prepared
que valida os dados ingeridos.Define um view materializado chamado
top_baby_names_2021
que tem um view altamente refinado dos dados.
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
o passo 3: começar a pipeline update
Para começar uma atualização do pipeline, clique no botão começar no canto superior direito da UI do Notebook.
Notebooks de Exemplo
O Notebook a seguir contém os mesmos exemplos de código fornecidos neste artigo. Esses Notebooks têm os mesmos requisitos que os passos deste artigo. Consulte os requisitos.
Para importar um Notebook, conclua os seguintes passos:
Abra a interface do usuário do Notebook.
Clique em + New > Notebook.
Um Notebook vazio é aberto.
Clique em Arquivo > Importar... A caixa de diálogo Importar é exibida.
Selecione a opção URL para Importar de.
Cole o URL do Notebook.
Clique em Importar.
Este tutorial requer que o senhor execute um Notebook de configuração de dados antes de configurar e executar seu Delta Live Tables pipeline. Importe o Notebook a seguir, anexe o Notebook a um compute recurso, preencha a variável necessária para my_catalog
, my_schema
e my_volume
e clique em executar tudo.
O Notebook a seguir fornece exemplos em Python ou SQL. Quando o senhor importa um Notebook, ele é salvo no diretório inicial do usuário.
Depois de importar um dos Notebooks abaixo, complete os passos para criar um pipeline, mas use o seletor de arquivo de código-fonte para selecionar o Notebook de downloads. Depois de criar o site pipeline com um Notebook configurado como código-fonte, clique em começar na UI do site pipeline para acionar uma atualização.