Desenvolver código de pipeline com SQL
Delta Live Tables introduz várias novas palavras-chave e funções no site SQL para definir a visualização materializada e as tabelas de transmissão no pipeline. SQL O suporte ao desenvolvimento de pipeline se baseia nos conceitos básicos do site Spark SQL e adiciona suporte à funcionalidade de transmissão estruturada.
Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver o código do pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são difíceis de implementar com o SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.
Para obter uma referência completa da sintaxe SQL do Delta Live Tables, consulte Referência da linguagem SQL do Delta Live Tables.
Noções básicas de SQL para desenvolvimento de pipeline
SQL O código que cria o conjunto de dados Delta Live Tables usa a sintaxe CREATE OR REFRESH
para definir a visualização materializada e as tabelas de transmissão em relação aos resultados da consulta.
A palavra-chave STREAM
indica se a fonte de dados referenciada em uma cláusula SELECT
deve ser lida com a semântica de transmissão.
Delta Live Tables O código-fonte difere criticamente dos scripts do SQL: o Delta Live Tables avalia todas as definições do dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes da execução de qualquer consulta. A ordem das consultas que aparecem em um Notebook ou script não define a ordem de execução.
Criar uma visualização materializada com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar um view materializado com SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Criar uma tabela de transmissão com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL:
Observação
Nem todas as fontes de dados suportam leituras de transmissão, e algumas fontes de dados devem sempre ser processadas com a semântica de transmissão.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Carregar dados do armazenamento de objetos
O Delta Live Tables suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.
Observação
Esses exemplos usam dados disponíveis no /databricks-datasets
montado automaticamente em seu site workspace. Databricks recomenda o uso de caminhos de volume ou URIs cloud para fazer referência aos dados armazenados no armazenamento de objetos cloud. Consulte O que são volumes do Unity Catalog?
Databricks recomenda o uso das tabelas Auto Loader e transmissão ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos cloud. Consulte O que é o Auto Loader?
O SQL usa a função read_files
para invocar a funcionalidade do Auto Loader. O senhor também deve usar a palavra-chave STREAM
para configurar uma transmissão lida com read_files
.
O exemplo a seguir cria uma tabela de transmissão a partir dos arquivos JSON usando Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
A função read_files
também é compatível com a semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar um view materializado:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Valide os dados de acordo com as expectativas
Você pode usar as expectativas para definir e aplicar restrições de qualidade de dados. Veja como gerenciar a qualidade dos dados com pipeline expectativas.
O código a seguir define uma expectativa chamada valid_data
que descarta registros nulos durante a ingestão de dados:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Consultar a visualização materializada e as tabelas de transmissão definidas em seu pipeline
Use o esquema LIVE
para consultar outras visualizações materializadas e tabelas de transmissão definidas em seu site pipeline.
O exemplo a seguir define quatro conjuntos de dados:
Uma tabela de transmissão denominada
orders
que carrega dados do site JSON.Um view materializado chamado
customers
que carrega os dados do CSV.Um view materializado chamado
customer_orders
que une registros dos conjuntos de dadosorders
ecustomers
, converte o carimbo de data/hora da ordem em uma data e seleciona os camposcustomer_id
,order_number
,state
eorder_date
.Um view materializado chamado
daily_orders_by_state
que agrega a contagem diária de pedidos para cada estado.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;