Carregar tabelas de uso de dados transmitidos no Databricks SQL
Visualização
Esse recurso está em Public Preview.
Databricks recomenda o uso de tabelas transmitidas para ingerir o uso de dados Databricks SQL. Uma tabela transmitida é uma tabela gerenciada do Unity Catalog com suporte extra para transmissão ou processamento de dados incremental. Um pipeline DLT é criado automaticamente para cada tabela transmitida. Você pode usar tabelas transmitidas para carregamento de dados incrementais de Kafka e armazenamento de objetos cloud .
Este artigo demonstra o uso de tabelas transmitidas para carregar dados do armazenamento de objetos cloud configurado como um volume do Unity Catalog (recomendado) ou local externo.
Observação
Para saber como usar as tabelas Delta Lake como fontes e coletores de transmissão, consulte leituras e gravações de transmissão da tabela Delta.
Antes de começar
Antes de começar, certifique-se de ter o seguinte:
Um Databricks account com o serverless ativado. Para obter mais informações, consulte Ativar o SQL warehouse sem servidor.
Um workspace com o Unity Catalog habilitado. Para obter mais informações, consulte Configurar e gerenciar Unity Catalog.
Um SQL warehouse que usa o canal
Current
.Para consultar tabelas de transmissão criadas por um Delta Live Tables pipeline, o senhor deve usar um compute compartilhado usando Databricks Runtime 13.3 LTS e acima ou um SQL warehouse. As tabelas de transmissão criadas em um Unity Catalog habilitado pipeline não podem ser consultadas a partir de um clusters atribuído ou sem isolamento.
O privilégio
READ FILES
em um local externo do Unity Catalog. Para obter informações, consulte Criar um local externo para conectar o armazenamento em nuvem ao Databricks.O privilégio
USE CATALOG
no catálogo no qual você cria a tabela de transmissão.O privilégio
USE SCHEMA
no esquema no qual você cria a tabela de transmissão.O privilégio
CREATE TABLE
no esquema no qual você cria a tabela de transmissão.O caminho para seus dados de origem.
Exemplo de caminho de volume:
/Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>
Exemplo de caminho de local externo:
s3://myBucket/analysis
Observação
Este artigo pressupõe que os dados que você deseja carregar estão em um local de armazenamento cloud que corresponde a um volume do Unity Catalog ou local externo ao qual você tem acesso.
Descubra e visualize os dados de origem
Na barra lateral do seu workspace, clique em query e, em seguida, clique em Criar query.
No editor query , selecione um SQL warehouse que usa o canal
Current
na lista suspensa.Cole o seguinte no editor, substituindo os valores entre colchetes angulares (
<>
) pela informação que identifica seus dados de origem e, em seguida, clique em execução.Observação
Você pode encontrar erros de inferência de esquema ao executar a função com valor de tabela
read_files
se o default para a função não puder analisar seus dados. Por exemplo, pode ser necessário configurar o modo de várias linhas para arquivos CSV ou JSON de várias linhas. Para obter uma lista de opções do analisador, consulte função com valor de tabela read_files./* Discover your data in a volume */ LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>" /* Preview your data in a volume */ SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10 /* Discover your data in an external location */ LIST "s3://<bucket>/<path>/<folder>" /* Preview your data */ SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
Carregar dados em uma tabela transmitida
Para criar uma tabela transmitida a partir de dados no armazenamento de objetos cloud , cole o seguinte no editor query e clique em execução:
/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')
/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')
Atualize uma tabela transmitida usando um pipeline DLT
Esta seção descreve os padrões para atualizar uma tabela de transmissão com os últimos dados disponíveis das fontes definidas na query.
CREATE
as operações para tabelas transmitidas usam um Databricks SQL warehouse para a criação inicial e carregamento de dados na tabela transmitida. As operações REFRESH
para mesas transmitidas usam Delta Live Tables (DLT). Um pipeline DLT é criado automaticamente para cada tabela transmitida. Quando uma tabela transmitida é refresh , uma atualização no pipeline DLT é iniciada para processar a refresh.
Após a execução do comando REFRESH
, o link do pipeline DLT é retornado. Você pode usar o link do pipeline DLT para verificar o status da refresh.
Observação
Somente o dono da mesa pode refresh uma mesa transmitida para obter os dados mais recentes. O usuário que cria a tabela é o proprietário e o proprietário não pode ser alterado.
Veja O que é Delta Live Tables?.
Ingerir apenas novos dados
Por default, a função read_files
lê todos os dados existentes no diretório de origem durante a criação da tabela e processa os registros recém-chegados a cada refresh.
Para evitar a ingestão de dados que já existam no diretório de origem no momento da criação da tabela, defina a opção includeExistingFiles
como false
. Isso significa que somente os dados que chegam ao diretório após a criação da tabela são processados. Por exemplo:
CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)
Atualize totalmente uma mesa transmitida
refresh completa reprocessa todos os dados disponíveis na origem com a definição mais recente. Não é recomendável chamar full refresh em fontes que não guardam toda a história dos dados ou possuem curtos períodos de retenção, como Kafka, pois o full refresh trunca os dados existentes. Talvez você não consiga recuperar dados antigos se eles não estiverem mais disponíveis na origem.
Por exemplo:
REFRESH STREAMING TABLE my_bronze_table FULL
programar uma mesa de transmissão para atualização automática
Para configurar uma tabela transmitida para refresh automaticamente com base em um programar definido, cole o seguinte no editor query e clique em execução:
ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
CRON '<cron-string>'
[ AT TIME ZONE '<timezone-id>' ]];
Por exemplo, query programar refresh, consulte ALTER transmissão TABLE.
Rastrear o status de uma atualização
Você pode view o status de refresh de uma tabela de transmissão viewo pipeline que gerencia a tabela de transmissão na IU do Delta Live Tables ou viewa informaçãorefresh retornada pelo comando DESCRIBE EXTENDED
para a tabela de transmissão.
DESCRIBE EXTENDED <table-name>
ingestão de transmissão de Kafka
Para obter um exemplo de ingestão transmitida de Kafka, consulte read_kafka.
Conceder aos usuários acesso a uma mesa de transmissão
Para conceder aos usuários o privilégio SELECT
na tabela de transmissão para que eles possam query -la, cole o seguinte no editor query e clique em execução:
GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>
Para obter mais informações sobre a concessão de privilégios em objetos protegíveis Unity Catalog , consulte Privilégios e objetos protegíveisUnity Catalog .