Carregar tabelas de uso de dados transmitidos no Databricks SQL

Visualização

Esse recurso está em Public Preview.

Databricks recomenda o uso de tabelas transmitidas para ingerir o uso de dados Databricks SQL. Uma tabela transmitida é uma tabela gerenciada do Unity Catalog com suporte extra para transmissão ou processamento de dados incremental. Um pipeline DLT é criado automaticamente para cada tabela transmitida. Você pode usar tabelas transmitidas para carregamento de dados incrementais de Kafka e armazenamento de objetos cloud .

Este artigo demonstra o uso de tabelas transmitidas para carregar dados do armazenamento de objetos cloud configurado como um volume do Unity Catalog (recomendado) ou local externo.

Observação

Para saber como usar as tabelas Delta Lake como fontes e coletores de transmissão, consulte leituras e gravações de transmissão da tabela Delta.

Antes de começar

Antes de começar, certifique-se de ter o seguinte:

  • Um Databricks account com o serverless ativado. Para obter mais informações, consulte Ativar o SQL warehouse sem servidor.

  • Um workspace com o Unity Catalog habilitado. Para obter mais informações, consulte Configurar e gerenciar Unity Catalog.

  • Um SQL warehouse que usa o canal Current.

  • Para consultar tabelas de transmissão criadas por um Delta Live Tables pipeline, o senhor deve usar um compute compartilhado usando Databricks Runtime 13.3 LTS e acima ou um SQL warehouse. As tabelas de transmissão criadas em um Unity Catalog habilitado pipeline não podem ser consultadas a partir de um clusters atribuído ou sem isolamento.

  • O privilégio READ FILES em um local externo do Unity Catalog. Para obter informações, consulte Criar um local externo para conectar o armazenamento em nuvem ao Databricks.

  • O privilégio USE CATALOG no catálogo no qual você cria a tabela de transmissão.

  • O privilégio USE SCHEMA no esquema no qual você cria a tabela de transmissão.

  • O privilégio CREATE TABLE no esquema no qual você cria a tabela de transmissão.

  • O caminho para seus dados de origem.

    Exemplo de caminho de volume: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Exemplo de caminho de local externo: s3://myBucket/analysis

    Observação

    Este artigo pressupõe que os dados que você deseja carregar estão em um local de armazenamento cloud que corresponde a um volume do Unity Catalog ou local externo ao qual você tem acesso.

Descubra e visualize os dados de origem

  1. Na barra lateral do seu workspace, clique em query e, em seguida, clique em Criar query.

  2. No editor query , selecione um SQL warehouse que usa o canal Current na lista suspensa.

  3. Cole o seguinte no editor, substituindo os valores entre colchetes angulares (<>) pela informação que identifica seus dados de origem e, em seguida, clique em execução.

    Observação

    Você pode encontrar erros de inferência de esquema ao executar a função com valor de tabela read_files se o default para a função não puder analisar seus dados. Por exemplo, pode ser necessário configurar o modo de várias linhas para arquivos CSV ou JSON de várias linhas. Para obter uma lista de opções do analisador, consulte função com valor de tabela read_files.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "s3://<bucket>/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("s3://<bucket>/<path>/<folder>") LIMIT 10
    

Carregar dados em uma tabela transmitida

Para criar uma tabela transmitida a partir de dados no armazenamento de objetos cloud , cole o seguinte no editor query e clique em execução:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('s3://<bucket>/<path>/<folder>')

Atualize uma tabela transmitida usando um pipeline DLT

Esta seção descreve os padrões para atualizar uma tabela de transmissão com os últimos dados disponíveis das fontes definidas na query.

CREATE as operações para tabelas transmitidas usam um Databricks SQL warehouse para a criação inicial e carregamento de dados na tabela transmitida. As operações REFRESH para mesas transmitidas usam Delta Live Tables (DLT). Um pipeline DLT é criado automaticamente para cada tabela transmitida. Quando uma tabela transmitida é refresh , uma atualização no pipeline DLT é iniciada para processar a refresh.

Após a execução do comando REFRESH , o link do pipeline DLT é retornado. Você pode usar o link do pipeline DLT para verificar o status da refresh.

Observação

Somente o dono da mesa pode refresh uma mesa transmitida para obter os dados mais recentes. O usuário que cria a tabela é o proprietário e o proprietário não pode ser alterado.

Veja O que é Delta Live Tables?.

Ingerir apenas novos dados

Por default, a função read_files lê todos os dados existentes no diretório de origem durante a criação da tabela e processa os registros recém-chegados a cada refresh.

Para evitar a ingestão de dados que já existam no diretório de origem no momento da criação da tabela, defina a opção includeExistingFiles como false. Isso significa que somente os dados que chegam ao diretório após a criação da tabela são processados. Por exemplo:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files('s3://mybucket/analysis/*/*/*.json', includeExistingFiles => false)

Atualize totalmente uma mesa transmitida

refresh completa reprocessa todos os dados disponíveis na origem com a definição mais recente. Não é recomendável chamar full refresh em fontes que não guardam toda a história dos dados ou possuem curtos períodos de retenção, como Kafka, pois o full refresh trunca os dados existentes. Talvez você não consiga recuperar dados antigos se eles não estiverem mais disponíveis na origem.

Por exemplo:

REFRESH STREAMING TABLE my_bronze_table FULL

programar uma mesa de transmissão para atualização automática

Para configurar uma tabela transmitida para refresh automaticamente com base em um programar definido, cole o seguinte no editor query e clique em execução:

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Por exemplo, query programar refresh, consulte ALTER transmissão TABLE.

Rastrear o status de uma atualização

Você pode view o status de refresh de uma tabela de transmissão viewo pipeline que gerencia a tabela de transmissão na IU do Delta Live Tables ou viewa informaçãorefresh retornada pelo comando DESCRIBE EXTENDED para a tabela de transmissão.

DESCRIBE EXTENDED <table-name>

ingestão de transmissão de Kafka

Para obter um exemplo de ingestão transmitida de Kafka, consulte read_kafka.

Conceder aos usuários acesso a uma mesa de transmissão

Para conceder aos usuários o privilégio SELECT na tabela de transmissão para que eles possam query -la, cole o seguinte no editor query e clique em execução:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Para obter mais informações sobre a concessão de privilégios em objetos protegíveis Unity Catalog , consulte Privilégios e objetos protegíveisUnity Catalog .