Desenvolver código de pipeline com SQL
O DLT apresenta várias novas palavras-chave e funções do site SQL para definir a visualização materializada e as tabelas de transmissão no pipeline. SQL O suporte ao desenvolvimento de pipeline se baseia nos conceitos básicos do site Spark SQL e adiciona suporte à funcionalidade de transmissão estruturada.
Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver o código do pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são difíceis de implementar com o SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.
Para obter uma referência completa da sintaxe do DLT SQL, consulte Referência da linguagem DLT SQL.
Noções básicas de SQL para desenvolvimento de pipeline
SQL O código que cria o conjunto de dados DLT usa a sintaxe CREATE OR REFRESH
para definir a visualização materializada e as tabelas de transmissão em relação aos resultados da consulta.
A palavra-chave STREAM
indica se a fonte de dados referenciada em uma cláusula SELECT
deve ser lida com a semântica de transmissão.
Lê e grava o site default no catálogo e no esquema especificados durante a configuração do site pipeline. Consulte Definir o catálogo e o esquema de destino.
O código-fonte da DLT é muito diferente dos scripts SQL: O DLT avalia todas as definições do dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes da execução de qualquer consulta. A ordem das consultas que aparecem em um Notebook ou script define a ordem de avaliação do código, mas não a ordem de execução da consulta.
Crie um site view materializado com o SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar um view materializado com SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Criar uma tabela de transmissão com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL:
Nem todas as fontes de dados suportam leituras de transmissão, e algumas fontes de dados devem sempre ser processadas com a semântica de transmissão.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Carregar dados do armazenamento de objetos
O DLT suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.
Esses exemplos usam dados disponíveis no /databricks-datasets
montado automaticamente em seu site workspace. A Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que são volumes do Unity Catalog?
Databricks recomenda o uso das tabelas Auto Loader e de transmissão ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que é o Auto Loader?
O SQL usa a função read_files
para invocar a funcionalidade do Auto Loader. O senhor também deve usar a palavra-chave STREAM
para configurar uma transmissão lida com read_files
.
O exemplo a seguir cria uma tabela de transmissão a partir dos arquivos JSON usando Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
A função read_files
também suporta a semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar um view materializado:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Valide os dados de acordo com as expectativas
Você pode usar as expectativas para definir e aplicar restrições de qualidade de dados. Veja como gerenciar a qualidade dos dados com pipeline expectativas.
O código a seguir define uma expectativa chamada valid_data
que descarta registros nulos durante a ingestão de dados:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Consultar a visualização materializada e as tabelas de transmissão definidas em seu pipeline
O exemplo a seguir define quatro conjuntos de dados:
- Uma tabela de transmissão denominada
orders
que carrega dados do site JSON. - Um view materializado chamado
customers
que carrega os dados do CSV. - Um view materializado chamado
customer_orders
que une registros dos conjuntos de dadosorders
ecustomers
, converte o carimbo de data/hora da ordem em uma data e seleciona os camposcustomer_id
,order_number
,state
eorder_date
. - Um view materializado chamado
daily_orders_by_state
que agrega a contagem diária de pedidos para cada estado.
Ao consultar a visualização ou as tabelas no site pipeline, o senhor pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado no site pipeline. Neste exemplo, as tabelas orders
, customers
e customer_orders
são gravadas e lidas no catálogo default e no esquema configurado para o seu pipeline.
O modo de publicação legado usa o esquema LIVE
para consultar outras visualizações materializadas e tabelas de transmissão definidas em seu site pipeline. No novo pipeline, a sintaxe do esquema LIVE
é silenciosamente ignorada. Consulte esquema LIVE (legado).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;