Pular para o conteúdo principal

tutorial: COPY INTO com Spark SQL

Databricks recomenda que o senhor use o comando COPY INTO para carregamento de dados em massa e incremental para fontes de dados que contêm milhares de arquivos. A Databricks recomenda que o senhor use o Auto Loader para casos de uso avançado.

Neste tutorial, o senhor usa o comando COPY INTO para carregar dados do armazenamento de objetos na nuvem para uma tabela em seu Databricks workspace.

Requisitos

  1. Um Databricks account, e um Databricks workspace em seu account. Para criá-los, consulte Get começar with Databricks.
  2. Um clustering para todos os fins em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um todo-propósito de clusters, consulte a referência de configuração de computação.
  3. Familiaridade com a interface de usuário Databricks workspace . Consulte Navegar no site workspace.
  4. Familiaridade com o site Databricks Notebook.
  5. Um local no qual o senhor pode gravar dados; esta demonstração usa o DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com Unity Catalog.

Etapa 1. Configure seu ambiente e crie um gerador de dados

Este tutorial pressupõe uma familiaridade básica com o Databricks e uma configuração do default workspace . Se não conseguir executar o código fornecido, entre em contato com o administrador do site workspace para certificar-se de que tem acesso ao recurso compute e a um local onde possa gravar dados.

Observe que o código fornecido usa um parâmetro source para especificar o local que você configurará como sua COPY INTO fonte de dados. Conforme escrito, esse código aponta para um local em DBFS root. Se o senhor tiver permissões de gravação em um local de armazenamento de objetos externo, substitua a parte dbfs:/ das cadeias de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e de manter o diretório /user/{username}/copy-into-demo aninhado para evitar sobrescrever ou excluir dados existentes.

  1. Crie um novo Notebook SQL e anexe-o a um clustering que esteja executando Databricks Runtime 11.3 LTS ou acima.

  2. Copie e execute o código a seguir para redefinir o local de armazenamento e o banco de dados usados neste site tutorial:

    Python
    %python
    # Set parameters for isolation in workspace and reset demo

    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"

    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")

    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")

    dbutils.fs.rm(source, True)
  3. Copie e execute o código a seguir para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:

    SQL
    -- Configure random data generator

    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};

    CREATE TABLE user_ids (user_id STRING);

    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");

    CREATE FUNCTION get_ping()
    RETURNS INT
    RETURN int(rand() * 250);

    CREATE FUNCTION is_active()
    RETURNS BOOLEAN
    RETURN CASE
    WHEN rand() > .25 THEN true
    ELSE false
    END;

Etapa 2: gravar os dados de amostra no armazenamento em nuvem

A gravação em formatos de dados diferentes do Delta Lake é rara no Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o código a seguir para gravar um lote de dados brutos em JSON:

    SQL
    -- Write a new batch of data to the data source

    INSERT INTO user_ping_raw
    SELECT *,
    get_ping() ping,
    current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;

Etapa 3: Use COPY INTO para carregar dados JSON de forma independente

O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.

  1. Copie e execute o código a seguir para criar a tabela de destino Delta e carregar os dados da fonte:

    SQL
    -- Create target table and load data

    CREATE TABLE IF NOT EXISTS user_ping_target;

    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")

Como essa ação é idempotente, o senhor pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.

Etapa 4: visualize o conteúdo da sua tabela

O senhor pode executar uma consulta simples em SQL para revisar manualmente o conteúdo dessa tabela.

  1. Copie e execute o código a seguir para visualizar sua tabela:

    SQL
    -- Review updated table

    SELECT * FROM user_ping_target

Etapa 5: carregar mais dados e visualizar os resultados

Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO várias vezes sem a chegada de novos dados.

Etapa 6: Limpeza tutorial

Quando terminar de usar o site tutorial, o senhor poderá limpar o recurso associado se não quiser mais mantê-lo.

  1. Copie e execute o código a seguir para eliminar o banco de dados, as tabelas e remover todos os dados:

    Python
    %python
    # Drop database and tables and remove data

    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
  2. Para interromper o recurso compute, acesse o site de clustering tab e encerre o clustering.

Recurso adicional