Desenvolver código de pipeline com SQL

Delta Live Tables introduz várias novas palavras-chave e funções no site SQL para definir a visualização materializada e as tabelas de transmissão no pipeline. SQL O suporte ao desenvolvimento de pipeline se baseia nos conceitos básicos do site Spark SQL e adiciona suporte à funcionalidade de transmissão estruturada.

Os usuários familiarizados com o PySpark DataFrames podem preferir desenvolver o código do pipeline com Python. O Python oferece suporte a testes e operações mais abrangentes que são difíceis de implementar com o SQL, como operações de metaprogramação. Consulte Desenvolver código de pipeline com Python.

Para obter uma referência completa da sintaxe SQL do Delta Live Tables, consulte Referência da linguagem SQL do Delta Live Tables.

Noções básicas de SQL para desenvolvimento de pipeline

SQL O código que cria o conjunto de dados Delta Live Tables usa a sintaxe CREATE OR REFRESH para definir a visualização materializada e as tabelas de transmissão em relação aos resultados da consulta.

A palavra-chave STREAM indica se a fonte de dados referenciada em uma cláusula SELECT deve ser lida com a semântica de transmissão.

Delta Live Tables O código-fonte difere criticamente dos scripts do SQL: o Delta Live Tables avalia todas as definições do dataset em todos os arquivos de código-fonte configurados em um pipeline e cria um gráfico de fluxo de dados antes da execução de qualquer consulta. A ordem das consultas que aparecem em um Notebook ou script não define a ordem de execução.

Criar uma visualização materializada com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar um view materializado com SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Criar uma tabela de transmissão com SQL

O exemplo de código a seguir demonstra a sintaxe básica para criar uma tabela de transmissão com SQL:

Observação

Nem todas as fontes de dados suportam leituras de transmissão, e algumas fontes de dados devem sempre ser processadas com a semântica de transmissão.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Carregar dados do armazenamento de objetos

O Delta Live Tables suporta o carregamento de dados de todos os formatos suportados pela Databricks. Consulte Opções de formato de dados.

Observação

Esses exemplos usam dados disponíveis no /databricks-datasets montado automaticamente em seu site workspace. Databricks recomenda o uso de caminhos de volume ou URIs cloud para fazer referência aos dados armazenados no armazenamento de objetos cloud. Consulte O que são volumes do Unity Catalog?

Databricks recomenda o uso das tabelas Auto Loader e transmissão ao configurar cargas de trabalho de ingestão incremental em relação aos dados armazenados no armazenamento de objetos cloud. Consulte O que é o Auto Loader?

O SQL usa a função read_files para invocar a funcionalidade do Auto Loader. O senhor também deve usar a palavra-chave STREAM para configurar uma transmissão lida com read_files.

O exemplo a seguir cria uma tabela de transmissão a partir dos arquivos JSON usando Auto Loader:

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

A função read_files também é compatível com a semântica de lotes para criar uma visualização materializada. O exemplo a seguir usa a semântica de lotes para ler um diretório JSON e criar um view materializado:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valide os dados de acordo com as expectativas

Você pode usar as expectativas para definir e aplicar restrições de qualidade de dados. Veja como gerenciar a qualidade dos dados com Delta Live Tables.

O código a seguir define uma expectativa chamada valid_data que descarta registros nulos durante a ingestão de dados:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Consultar a visualização materializada e as tabelas de transmissão definidas em seu pipeline

Use o esquema LIVE para consultar outras visualizações materializadas e tabelas de transmissão definidas em seu site pipeline.

O exemplo a seguir define quatro conjuntos de dados:

  • Uma tabela de transmissão denominada orders que carrega dados do site JSON.

  • Um view materializado chamado customers que carrega os dados do CSV.

  • Um view materializado chamado customer_orders que une registros dos conjuntos de dados orders e customers, converte o carimbo de data/hora da ordem em uma data e seleciona os campos customer_id, order_number, state e order_date.

  • Um view materializado chamado daily_orders_by_state que agrega a contagem diária de pedidos para cada estado.

CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;