Pular para o conteúdo principal

Desenvolver código de pipeline com SQL

O DLT apresenta várias novas palavras-chave e funções do site SQL para definir a visualização materializada e as tabelas de transmissão no pipeline. SQL O suporte ao desenvolvimento de pipeline se baseia nos conceitos básicos do site Spark SQL e adiciona suporte à funcionalidade de transmissão estruturada.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver o código do pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são difíceis de implementar com o SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe do DLT SQL, consulte Referência da linguagem DLT SQL.

Noções básicas de SQL para desenvolvimento de pipeline

SQL O código que cria o conjunto de dados DLT usa a sintaxe CREATE OR REFRESH para definir a visualização materializada e as tabelas de transmissão em relação aos resultados da consulta.

A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com a semântica de transmissão.

Lê e grava o site default no catálogo e no esquema especificados durante a configuração do site pipeline. Consulte Definir o catálogo e o esquema de destino.

O código-fonte da DLT é muito diferente dos scripts SQL: O DLT avalia todas as definições do dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes da execução de qualquer consulta. A ordem das consultas que aparecem em um Notebook ou script define a ordem de avaliação do código, mas não a ordem de execução da consulta.

Crie um site view materializado com o SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar um view materializado com SQL:

SQL
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de transmissão com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL:

nota

Nem todas as fontes de dados suportam leituras de transmissão, e algumas fontes de dados devem sempre ser processadas com a semântica de transmissão.

SQL
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carregar dados do armazenamento de objetos

O DLT suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.

nota

Esses exemplos usam dados disponíveis no /databricks-datasets montado automaticamente em seu site workspace. A Databricks recomenda o uso de caminhos de volume ou URIs de nuvem para fazer referência aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que são volumes do Unity Catalog?

Databricks recomenda o uso das tabelas Auto Loader e de transmissão ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos na nuvem. Consulte O que é o Auto Loader?

O SQL usa a função read_files para invocar a funcionalidade do Auto Loader. O senhor também deve usar a palavra-chave STREAM para configurar uma transmissão lida com read_files.

O exemplo a seguir cria uma tabela de transmissão a partir dos arquivos JSON usando Auto Loader:

SQL
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A função read_files também suporta a semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar um view materializado:

SQL
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valide os dados de acordo com as expectativas

Você pode usar as expectativas para definir e aplicar restrições de qualidade de dados. Veja como gerenciar a qualidade dos dados com pipeline expectativas.

O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

SQL
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar a visualização materializada e as tabelas de transmissão definidas em seu pipeline

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de transmissão denominada orders que carrega dados do site JSON.
  • Um view materializado chamado customers que carrega os dados do CSV.
  • Um view materializado chamado customer_orders que une registros dos conjuntos de dados orders e customers, converte o carimbo de data/hora da ordem em uma data e seleciona os campos customer_id, order_number, state e order_date.
  • Um view materializado chamado daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.
nota

Ao consultar a visualização ou as tabelas no site pipeline, o senhor pode especificar o catálogo e o esquema diretamente ou pode usar o padrão configurado no site pipeline. Neste exemplo, as tabelas orders, customers e customer_orders são gravadas e lidas no catálogo default e no esquema configurado para o seu pipeline.

O modo de publicação legado usa o esquema LIVE para consultar outras visualizações materializadas e tabelas de transmissão definidas em seu site pipeline. No novo pipeline, a sintaxe do esquema LIVE é silenciosamente ignorada. Consulte esquema LIVE (legado).

SQL
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM customer_orders
GROUP BY state, order_date;