Pular para o conteúdo principal

Desenvolver código de pipeline com SQL

O DLT apresenta várias novas palavras-chave e funções do site SQL para definir a visualização materializada e as tabelas de transmissão no pipeline. SQL O suporte ao desenvolvimento de pipeline se baseia nos conceitos básicos do site Spark SQL e adiciona suporte à funcionalidade de transmissão estruturada.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver o código do pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são difíceis de implementar com o SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe do DLT SQL, consulte Referência da linguagem DLT SQL.

Noções básicas de SQL para desenvolvimento de pipeline

SQL O código que cria o conjunto de dados DLT usa a sintaxe CREATE OR REFRESH para definir a visualização materializada e as tabelas de transmissão em relação aos resultados da consulta.

A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com a semântica de transmissão.

Lê e grava o site default no catálogo e no esquema especificados durante a configuração do site pipeline. Consulte Definir o catálogo e o esquema de destino.

O código-fonte da DLT é muito diferente dos scripts SQL: O DLT avalia todas as definições do dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes da execução de qualquer consulta. A ordem das consultas que aparecem em um Notebook ou script define a ordem de avaliação do código, mas não a ordem de execução da consulta.

Crie um site view materializado com o SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar um view materializado com SQL:

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de transmissão com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL. Ao ler uma fonte para uma tabela de transmissão, a palavra-chave STREAM indica o uso da semântica de transmissão para a fonte. Não use a palavra-chave STREAM ao criar um site materializado view:

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
nota

Use a palavra-chave transmissão para usar a semântica de transmissão para ler a partir da fonte. Se a leitura encontrar uma alteração ou exclusão em um registro existente, um erro será gerado. É mais seguro ler de fontes estáticas ou somente anexadas. Para ingerir dados com commit de alteração, o senhor pode usar Python e a opção SkipChangeCommits para lidar com erros.

Carregar dados do armazenamento de objetos

O DLT suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.

nota

Esses exemplos usam dados disponíveis no /databricks-datasets montado automaticamente em seu site workspace. A Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que são volumes do Unity Catalog?

Databricks recomenda o uso das tabelas Auto Loader e de transmissão ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que é o Auto Loader?

O SQL usa a função read_files para invocar a funcionalidade do Auto Loader. O senhor também deve usar a palavra-chave STREAM para configurar uma transmissão lida com read_files.

A seguir, descrevemos a sintaxe do site read_files no SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
FROM STREAM read_files(
"<file-path>",
[<option-key> => <option_value>, ...]
)

As opções para Auto Loader são par key-value. Para obter detalhes sobre formatos e opções compatíveis, consulte Opções.

O exemplo a seguir cria uma tabela de transmissão a partir dos arquivos JSON usando Auto Loader:

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT *
FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

A função read_files também suporta a semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar um view materializado:

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT *
FROM read_files(
"/databricks-datasets/retail-org/sales_orders",
format => "json");

Valide os dados de acordo com as expectativas

Você pode usar as expectativas para definir e aplicar restrições de qualidade de dados. Veja como gerenciar a qualidade dos dados com pipeline expectativas.

O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar a visualização materializada e as tabelas de transmissão definidas em seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de transmissão denominada orders que carrega dados do site JSON.
  • Um view materializado chamado customers que carrega os dados do CSV.
  • Um view materializado chamado customer_orders que une registros dos conjuntos de dados orders e customers, converte o carimbo de data/hora da ordem em uma data e seleciona os campos customer_id, order_number, state e order_date.
  • Um view materializado chamado daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
nota

Ao consultar a visualização ou as tabelas no site pipeline, o senhor pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado no site pipeline. Neste exemplo, as tabelas orders, customers e customer_orders são gravadas e lidas no catálogo default e no esquema configurado para o seu pipeline.

O modo de publicação legado usa o esquema LIVE para consultar outras visualizações materializadas e tabelas de transmissão definidas em seu site pipeline. No novo pipeline, a sintaxe do esquema LIVE é silenciosamente ignorada. Consulte esquema LIVE (legado).

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;

Definir uma tabela privada

O senhor pode usar a cláusula PRIVATE ao criar uma tabela materializada view ou uma tabela de transmissão. Ao criar uma tabela privada, você cria a tabela, mas não cria os metadados para a tabela. A cláusula PRIVATE instrui a DLT a criar uma tabela que está disponível para o pipeline, mas que não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela privada persiste durante o tempo de vida do pipeline que a cria, e não apenas em uma única atualização.

As tabelas privadas podem ter o mesmo nome das tabelas no catálogo. Se o senhor especificar um nome não qualificado para uma tabela em um pipeline, se houver uma tabela privada e uma tabela de catálogo com esse nome, a tabela privada será usada.

As mesas privadas eram anteriormente chamadas de tabelas temporárias.

Excluir permanentemente registros de uma tabela materializada view ou de transmissão

Para excluir permanentemente registros de uma tabela de transmissão com vetores de exclusão ativados, como para GDPR compliance, operações adicionais devem ser realizadas nas tabelas Delta subjacentes do objeto. Para garantir a exclusão de registros de uma tabela de transmissão, consulte Excluir permanentemente registros de uma tabela de transmissão.

A visualização materializada sempre reflete os dados nas tabelas subjacentes quando são atualizadas. Para excluir dados em um Materialized view, o senhor deve excluir os dados da fonte e refresh o Materialized view.

Parametrize os valores usados ao declarar tabelas ou exibições com SQL

Use SET para especificar um valor de configuração em uma consulta que declare uma tabela ou view, incluindo as configurações de Spark. Qualquer tabela ou view que o senhor definir em um Notebook após a declaração SET terá acesso ao valor definido. Todas as configurações de Spark especificadas com a instrução SET são usadas na execução da consulta Spark para qualquer tabela ou view após a instrução SET. Para ler um valor de configuração em uma consulta, use a sintaxe de interpolação de strings ${}. O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma consulta:

SET startDate='2025-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.

Limitações

A cláusula PIVOT não é suportada. As pivot operações em Spark exigem o carregamento ansioso dos dados de entrada para compute o esquema de saída. Esse recurso não é compatível com o DLT.

nota

A sintaxe CREATE OR REFRESH LIVE TABLE para criar um view materializado está obsoleta. Em vez disso, use CREATE OR REFRESH MATERIALIZED VIEW.