Pular para o conteúdo principal

Desenvolver código de pipeline com SQL

O pipeline declarativo LakeFlow introduz diversas novas palavras-chave e funções SQL para definir tabelas de visualização materializada e transmissão no pipeline. O suporte SQL para desenvolvimento de pipeline se baseia nos conceitos básicos do Spark SQL e adiciona suporte para a funcionalidade de transmissão estruturada.

Usuários familiarizados com PySpark DataFrames podem preferir desenvolver código de pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são desafiadores de implementar com SQL, como operações de metaprogramação. Veja Desenvolver código de pipeline com Python.

Para uma referência completa da sintaxe SQL do pipeline declarativo LakeFlow , consulte Referência da linguagem SQL do pipeline declarativoLakeFlow.

Noções básicas de SQL para desenvolvimento de pipeline

O código SQL que cria o conjunto de dados do pipeline declarativo LakeFlow usa a sintaxe CREATE OR REFRESH para definir tabelas de exibição materializada e transmissão em relação aos resultados da consulta.

A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com semântica de transmissão.

Lê e grava default no catálogo e esquema especificados durante a configuração pipeline . Consulte Definir o catálogo de destino e o esquema.

O código-fonte do pipeline declarativo LakeFlow difere criticamente dos scripts SQL : o pipeline declarativo LakeFlow avalia todas as definições dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes que qualquer consulta seja executada. A ordem das consultas que aparecem nos arquivos de origem define a ordem de avaliação do código, mas não a ordem de execução da consulta.

Crie uma view materializada com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma view materializada com SQL:

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Crie uma tabela de transmissão com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL. Ao ler uma fonte para uma tabela de transmissão, a palavra-chave STREAM indica o uso da semântica de transmissão para a fonte. Não use a palavra-chave STREAM ao criar uma view materializada:

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
nota

Use a palavra-chave transmissão para usar a semântica de transmissão para ler a fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente de acréscimos. Para ingerir dados que tenham confirmação de alteração, você pode usar Python e a opção SkipChangeCommits para lidar com erros.

Carregar dados do armazenamento de objetos

O pipeline declarativo LakeFlow suporta o carregamento de dados de todos os formatos suportados pelo Databricks. Veja Opções de formato de dados.

nota

Esses exemplos usam dados disponíveis em /databricks-datasets montados automaticamente em seu workspace. Databricks recomenda usar caminhos de volume ou URIs cloud para referenciar dados armazenados no armazenamento de objetos cloud . Veja O que são volumes Unity Catalog ?.

Databricks recomenda o uso Auto Loader e das tabelas de transmissão ao configurar cargas de trabalho de ingestão incremental em dados armazenados no armazenamento de objetos cloud . Veja O que é o Auto Loader?.

SQL usa a função read_files para invocar a funcionalidade Auto Loader . Você também deve usar a palavra-chave STREAM para configurar uma leitura de transmissão com read_files.

A seguir, descrevemos a sintaxe para read_files em SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

As opções para Auto Loader são par key-valor. Para obter detalhes sobre formatos e opções suportados, consulte Opções.

O exemplo a seguir cria uma tabela de transmissão a partir de arquivos JSON usando Auto Loader:

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

A função read_files também suporta semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar uma view materializada:

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

Validar dados com expectativas

Você pode usar expectativas para definir e impor restrições de qualidade de dados. Veja gerenciar a qualidade dos dados com expectativas pipeline.

O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar tabelas de visualização materializada e transmissão definidas em seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de transmissão chamada orders que carrega dados JSON .
  • Uma view materializada chamada customers que carrega dados CSV .
  • Uma view materializada chamada customer_orders que une registros do conjunto de dados orders e customers , converte o registro de data e hora do pedido em uma data e seleciona os campos customer_id, order_number, state e order_date .
  • Uma view materializada chamada daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
nota

Ao consultar visualizações ou tabelas em seu pipeline, você pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado em seu pipeline. Neste exemplo, as tabelas orders, customers e customer_orders são gravadas e lidas do catálogo e esquema default configurados para seu pipeline.

O modo de publicação legado usa o esquema LIVE para consultar outras tabelas de exibição materializada e transmissão definidas no seu pipeline. No novo pipeline, a sintaxe do esquema LIVE é silenciosamente ignorada. Veja o esquema LIVE (legado).

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Definir uma tabela privada

Você pode usar a cláusula PRIVATE ao criar uma view materializada ou uma tabela de transmissão. Ao criar uma tabela privada, você cria a tabela, mas não cria os metadados para ela. A cláusula PRIVATE instrui o pipeline declarativo LakeFlow a criar uma tabela que esteja disponível para o pipeline , mas que não deve ser acessada fora pipeline. Para reduzir o tempo de processamento, uma tabela privada persiste durante toda a vida útil do pipeline que a cria, e não apenas durante uma única atualização.

Tabelas privadas podem ter o mesmo nome que tabelas no catálogo. Se você especificar um nome não qualificado para uma tabela dentro de um pipeline, se houver uma tabela privada e uma tabela de catálogo com esse nome, a tabela privada será usada.

As tabelas privadas eram anteriormente chamadas de tabelas temporárias.

Excluir permanentemente registros de uma view materializada ou tabela de transmissão

Para excluir permanentemente registros de uma tabela de transmissão com vetores de exclusão habilitados, como para compliance GDPR , operações adicionais devem ser executadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir registros permanentemente de uma tabela de transmissão.

A visualização materializada sempre reflete os dados nas tabelas subjacentes quando elas são atualizadas. Para excluir dados em uma view materializada, você deve excluir os dados da fonte e refresh a view materializada.

Parametrizar valores usados ao declarar tabelas ou visualizações com SQL

Use SET para especificar um valor de configuração em uma consulta que declara uma tabela ou view, incluindo configurações Spark . Qualquer tabela ou view que você definir em um arquivo de origem após a instrução SET terá acesso ao valor definido. Quaisquer configurações Spark especificadas usando a instrução SET são usadas ao executar a consulta Spark para qualquer tabela ou view após a instrução SET . Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de strings ${}. O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.

Limitações

A cláusula PIVOT não é suportada. As operações pivot no Spark exigem o carregamento rápido de dados de entrada para compute o esquema de saída. Esse recurso não é suportado no pipeline declarativo LakeFlow .

nota

A sintaxe CREATE OR REFRESH LIVE TABLE para criar uma view materializada está obsoleta. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW.