Desenvolver código de pipeline com SQL
O pipeline declarativo LakeFlow introduz diversas novas palavras-chave e funções SQL para definir tabelas de visualização materializada e transmissão no pipeline. O suporte SQL para desenvolvimento de pipeline se baseia nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de transmissão estruturada.
Usuários familiarizados com PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são desafiadores de implementar com SQL, como operações de metaprogramação. Veja Desenvolver código de pipeline com Python.
Para uma referência completa da sintaxe SQL do pipeline declarativo LakeFlow , consulte Referência da linguagem SQL do pipeline declarativoLakeFlow.
Noções básicas de SQL para desenvolvimento de pipeline
O código SQL que cria o conjunto de dados do pipeline declarativo LakeFlow usa a sintaxe CREATE OR REFRESH
para definir tabelas de exibição materializada e transmissão em relação aos resultados da consulta.
A palavra-chave STREAM
indica se a fonte de dados referenciada em uma cláusula SELECT
deve ser lida com semântica de transmissão.
Lê e grava default no catálogo e esquema especificados durante a configuração pipeline . Consulte Definir o catálogo de destino e o esquema.
O código-fonte do pipeline declarativo LakeFlow difere criticamente dos scripts SQL : o pipeline declarativo LakeFlow avalia todas as definições dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que qualquer consulta seja executada. A ordem das consultas que aparecem nos arquivos de origem define a ordem de avaliação do código, mas não a ordem de execução da consulta.
Crie uma view materializada com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma view materializada com SQL:
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Crie uma tabela de transmissão com SQL
O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL. Ao ler uma fonte para uma tabela de transmissão, a palavra-chave STREAM
indica o uso da semântica de transmissão para a fonte. Não use a palavra-chave STREAM
ao criar uma view materializada:
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Use a palavra-chave transmissão para usar a semântica de transmissão para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmação de alteração, você pode usar Python e a opção SkipChangeCommits
para lidar com erros.
Carregar dados do armazenamento de objetos
O pipeline declarativo LakeFlow suporta o carregamento de dados de todos os formatos suportados pelo Databricks. Veja Opções de formato de dados.
Esses exemplos usam dados disponíveis em /databricks-datasets
montados automaticamente em seu workspace. Databricks recomenda usar caminhos de volume ou URIs cloud para referenciar dados armazenados no armazenamento de objetos cloud . Veja O que são volumes Unity Catalog ?.
Databricks recomenda o uso Auto Loader e das tabelas de transmissão ao configurar cargas de trabalho de ingestão incremental em dados armazenados no armazenamento de objetos cloud . Veja O que é o Auto Loader?.
SQL usa a função read_files
para invocar a funcionalidade Auto Loader . Você também deve usar a palavra-chave STREAM
para configurar uma leitura de transmissão com read_files
.
A seguir, descrevemos a sintaxe para read_files
em SQL:
CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)
As opções para Auto Loader são par key-valor. Para obter detalhes sobre formatos e opções suportados, consulte Opções.
O exemplo a seguir cria uma tabela de transmissão a partir de arquivos JSON usando Auto Loader:
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
A função read_files
também suporta semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar uma view materializada:
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");
Validar dados com expectativas
Você pode usar expectativas para definir e impor restrições de qualidade de dados. Veja gerenciar a qualidade dos dados com expectativas pipeline.
O código a seguir define uma expectativa chamada valid_data
que descarta registros nulos durante a ingestão de dados:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Consultar tabelas de visualização materializada e transmissão definidas em seu pipeline
O exemplo a seguir define quatro conjuntos de dados:
- Uma tabela de transmissão chamada
orders
que carrega dados JSON . - Uma view materializada chamada
customers
que carrega dados CSV . - Uma view materializada chamada
customer_orders
que une registros do conjunto de dadosorders
ecustomers
, converte o registro de data e hora do pedido em uma data e seleciona os camposcustomer_id
,order_number
,state
eorder_date
. - Uma view materializada chamada
daily_orders_by_state
que agrega a contagem diária de pedidos para cada estado.
Ao consultar visualizações ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado em seu pipeline. Neste exemplo, as tabelas orders
, customers
e customer_orders
são gravadas e lidas do catálogo e esquema default configurados para seu pipeline.
O modo de publicação legado usa o esquema LIVE
para consultar outras tabelas de exibição materializada e transmissão definidas no seu pipeline. No novo pipeline, a sintaxe do esquema LIVE
é silenciosamente ignorada. Veja o esquema LIVE (legado).
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;
Definir uma tabela privada
Você pode usar a cláusula PRIVATE
ao criar uma view materializada ou uma tabela de transmissão. Ao criar uma tabela privada, você cria a tabela, mas não cria os metadados para ela. A cláusula PRIVATE
instrui o pipeline declarativo LakeFlow a criar uma tabela que esteja disponível para o pipeline , mas que não deve ser acessada fora pipeline. Para reduzir o tempo de processamento, uma tabela privada persiste durante toda a vida útil do pipeline que a cria, e não apenas durante uma única atualização.
Tabelas privadas podem ter o mesmo nome que tabelas no catálogo. Se você especificar um nome não qualificado para uma tabela dentro de um pipeline, se houver uma tabela privada e uma tabela de catálogo com esse nome, a tabela privada será usada.
As tabelas privadas eram anteriormente chamadas de tabelas temporárias.
Excluir permanentemente registros de uma view materializada ou tabela de transmissão
Para excluir permanentemente registros de uma tabela de transmissão com vetores de exclusão habilitados, como para compliance GDPR , operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir registros permanentemente de uma tabela de transmissão.
A visualização materializada sempre reflete os dados nas tabelas subjacentes quando elas são atualizadas. Para excluir dados em uma view materializada, você deve excluir os dados da fonte e refresh a view materializada.
Parametrizar valores usados ao declarar tabelas ou visualizações com SQL
Use SET
para especificar um valor de configuração em uma consulta que declara uma tabela ou view, incluindo configurações Spark . Qualquer tabela ou view que você definir em um arquivo de origem após a instrução SET
terá acesso ao valor definido. Quaisquer configurações Spark especificadas usando a instrução SET
são usadas ao executar a consulta Spark para qualquer tabela ou view após a instrução SET . Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de strings ${}
. O exemplo a seguir define um valor de configuração do Spark chamado startDate
e usa esse valor em uma consulta:
SET startDate='2025-01-01';
CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}
Para especificar vários valores de configuração, use uma instrução SET
separada para cada valor.
Limitações
A cláusula PIVOT
não é suportada. As operações pivot
no Spark exigem o carregamento rápido de dados de entrada para compute o esquema de saída. Esse recurso não é suportado no pipeline declarativo LakeFlow .
A sintaxe CREATE OR REFRESH LIVE TABLE
para criar uma view materializada está obsoleta. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW
.