Referência de linguagem SQL do Delta Live Tables

Este artigo fornece detalhes para a interface de programação SQL do Delta Live Tables.

Você pode usar as funções definidas pelo usuário (UDFs) do Python em sua query SQL, mas deve definir essas UDFs nos arquivos Python antes de chamá-los nos arquivos de origem SQL. Consulte Funções escalares definidas pelo usuário - Python.

Limitações

A cláusula PIVOT não é suportada. As operações pivot no Spark requerem carregamento rápido de dados de entrada para compute o esquema da saída. Esse recurso não é compatível com Delta Live Tables.

Crie uma visualização materializada do Delta Live Tables ou uma tabela transmitida

Você usa a mesma sintaxe SQL básica ao declarar uma tabela transmitida ou uma view materializada (também conhecida como LIVE TABLE).

Você só pode declarar tabelas transmitidas usando query que lê contra uma fonte transmitida. Databricks recomenda o uso do Auto Loader para ingestão de transmissão de arquivos do armazenamento de objetos cloud . Consulte Sintaxe do Auto Loader SQL.

Você deve incluir a função STREAM() em torno de um nome dataset ao especificar outras tabelas ou view em seu pipeline como uma fonte de transmissão.

A seguir está a descrição da sintaxe para declaração view materializada e tabelas de transmissão com SQL:

CREATE OR REFRESH [TEMPORARY] { STREAMING TABLE | LIVE TABLE } table_name
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  AS select_statement

Criar uma exibição Delta Live Tables

O seguinte descreve a sintaxe para declarar view com SQL:

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Sintaxe SQL Auto Loader

O seguinte descreve a sintaxe para trabalhar com o Auto Loader em SQL:

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Você pode usar as opções de formato compatíveis com o Auto Loader. Usando a função map() , você pode passar qualquer número de opções para o método cloud_files() . As opções são valor- keypar, em que a key e os valores são strings. Para obter detalhes sobre opções e formatos de suporte, consulte Opções de formato de arquivo.

Exemplo: definir tabelas

Você pode criar um dataset lendo de uma fonte de dados externa ou de dataset definido em um pipeline. Para ler um dataset interno , anexe a palavra-chave LIVE ao nome dataset . O exemplo a seguir define dois dataset diferentes: uma tabela chamada taxi_raw que usa um arquivo JSON como fonte de entrada e uma tabela chamada filtered_data que usa a tabela taxi_raw como entrada:

CREATE OR REFRESH LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH LIVE TABLE filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exemplo: Ler de uma fonte transmitida

Para ler dados de uma fonte de transmissão, por exemplo, Auto Loader ou um conjunto de dados interno, defina uma tabela STREAMING :

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Para mais informações sobre dados transmitidos, consulte transformação de dados com Delta Live Tables.

Controle como as tabelas são materializadas

As tabelas também oferecem controle adicional de sua materialização:

Observação

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização dos dados. A menos que você espere que sua tabela cresça além de um terabyte, você geralmente não deve especificar colunas de partição.

Exemplo: especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema ao definir uma tabela. O exemplo a seguir especifica o esquema para a tabela de destino, incluindo o uso de colunas geradas pelo Delta Lake e a definição de colunas de partição para a tabela:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Por default, Delta Live Tables infere o esquema da definição table se você não especificar um esquema.

Exemplo: Definir restrições de tabela

Observação

O suporte a restrições de tabela está em Public Preview. Para definir restrições de tabela, seu pipeline deve ser um pipeline habilitado para o Unity Catalog e configurado para usar o canal preview.

Ao especificar um esquema, o senhor pode definir a chave primária key e a chave estrangeira. As restrições são informativas e não são aplicadas. O exemplo a seguir define uma tabela com uma restrição primária e estrangeira key:

CREATE OR REFRESH LIVE TABLE sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Definir valores de configuração para uma tabela ou exibição

Use SET para especificar um valor de configuração para uma tabela ou view, incluindo configurações do Spark. Qualquer tabela ou view definida em um Notebook após a instrução SET tem acesso ao valor definido. Quaisquer configurações do Spark especificadas usando a instrução SET são usadas ao executar a query do Spark para qualquer tabela ou view após a instrução SET. Para ler um valor de configuração em uma query, use a sintaxe de interpolação strings ${}. O exemplo a seguir define um valor de configuração do Spark chamado startDate e usa esse valor em uma query:

SET startDate='2020-01-01';

CREATE OR REFRESH LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Para especificar vários valores de configuração, use uma instrução SET separada para cada valor.

Propriedades SQL

CREATE TABLE ou VIEW

TEMPORARY

Criar uma tabela, mas não publicar os metadados da tabela. A cláusula TEMPORARY instrui o Delta Live Tables a criar uma tabela que esteja disponível para o pipeline, mas que não deva ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária persiste durante o tempo de vida do pipeline que a cria, e não apenas em uma única atualização.

STREAMING

Crie uma tabela que leia um dataset de entrada como uma transmissão. O dataset de entrada deve ser uma fonte de dados transmitida, por exemplo, Auto Loader ou uma tabela STREAMING.

PARTITIONED BY

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.

LOCATION

Um local de armazenamento opcional para dados da tabela. Se não for definido, o sistema assumirá default o local de armazenamento do pipeline.

COMMENT

Uma descrição opcional para a tabela.

column_constraint

Uma restrição opcional informativa primária key ou estrangeira key na coluna.

table_constraint

Uma restrição opcional informativa primária key ou estrangeira key na tabela.

TBLPROPERTIES

Uma lista opcional de propriedades de tabela para a tabela.

select_statement

Uma query Delta Live Tables que define o dataset para a tabela.

Cláusula CONSTRAINT

EXPECT expectation_name

Defina a restrição de qualidade de dados expectation_name. Se a restrição ON VIOLATION não estiver definida, adicione linhas que violem a restrição ao dataset de destino.

ON VIOLATION

Ação opcional a ser executada para linhas com falha:

  • FAIL UPDATE: pare imediatamente a execução do pipeline.

  • DROP ROW: Elimine o registro e continue processando.

captura de dados de alterações (CDC) com SQL em Delta Live Tables

Use a instrução APPLY CHANGES INTO para usar a funcionalidade Delta Live Tables CDC, conforme descrito a seguir:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Você define restrições de qualidade de dados para um destino APPLY CHANGES usando a mesma cláusula CONSTRAINT que não éAPPLY CHANGES query. Veja como gerenciar a qualidade dos dados com Delta Live Tables.

Observação

O comportamento default para os eventos INSERT e UPDATE é atualizar os eventos CDC da origem: atualize todas as linhas na tabela de destino que correspondam à(s) key(s) especificada(s) ou insira uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE pode ser especificada com a condição APPLY AS DELETE WHEN .

Importante

Você deve declarar uma tabela de transmissão de destino para aplicar as alterações. Você pode opcionalmente especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino APPLY CHANGES , você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados do campo sequence_by .

Consulte APLICAR ALTERAÇÕES em API: Simplificar a captura de dados de alterações (CDC) em Delta Live Tables.

Cláusulas

KEYS

A coluna ou combinação de colunas que identificam exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino.

Esta cláusula é necessária.

IGNORE NULL UPDATES

Permitir a ingestão de atualizações contendo um subconjunto da coluna de destino. Quando um evento CDC corresponde a uma linha existente e IGNORE NULL UPDATES é especificado, as colunas com um null manterão seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null.

Esta cláusula é opcional.

O default é sobrescrever colunas existentes com valores null.

APPLY AS DELETE WHEN

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Para lidar com dados fora de ordem, a linha excluída é retida temporariamente como uma marca para exclusão na tabela Delta subjacente e uma view é criada no metastore que filtra essas marcas para exclusão. O intervalo de retenção pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds .

Esta cláusula é opcional.

APPLY AS TRUNCATE WHEN

Especifica quando um evento CDC deve ser tratado como uma tabela completa TRUNCATE. Como essa cláusula aciona um truncamento completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exigem essa funcionalidade.

A cláusula APPLY AS TRUNCATE WHEN é suportada apenas para SCD tipo 1. SCD tipo 2 não suporta truncamento.

Esta cláusula é opcional.

SEQUENCE BY

O nome da coluna que especifica a ordem lógica dos eventos CDC nos dados de origem. Delta Live Tables usa esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

Esta cláusula é necessária.

COLUMNS

Especifica um subconjunto de colunas a serem incluídas na tabela de destino. Você também pode:

  • Especifique a lista completa de colunas a serem incluídas: COLUMNS (userId, name, city).

  • Especifique uma lista de colunas a serem excluídas: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é opcional.

O default é incluir todas as colunas na tabela de destino quando a cláusula COLUMNS não for especificada.

STORED AS

Se os registros devem ser armazenados como SCD tipo 1 ou SCD tipo 2.

Esta cláusula é opcional.

O default é SCD tipo 1.

TRACK HISTORY ON

Especifica um subconjunto de colunas de saída para gerar registros de história quando houver alterações nessas colunas especificadas. Você também pode:

  • Especifique a lista completa de colunas a serem rastreadas: COLUMNS (userId, name, city).

  • Especifique uma lista de colunas a serem excluídas do acompanhamento: COLUMNS * EXCEPT (operation, sequenceNum)

Esta cláusula é opcional. O default é acompanhar a história de todas as colunas de saída quando houver alguma alteração, equivalente a TRACK HISTORY ON *.