Tutorial: COPY INTO com Spark SQL

Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fonte de dados que contém milhares de arquivos. Databricks recomenda que você use o Auto Loader para casos de uso avançados.

Neste tutorial, você usa o comando COPY INTO para carregar dados do armazenamento de objetos cloud em uma tabela em seu workspace do Databricks.

Requisitos

  1. Uma account Databricks e um workspace Databricks em sua account. Para criá-los, consulte Get começar: configuração account e workspace .

  2. Um produto para todos os fins cluster em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um clusters todo-propósito, consulte compute configuration reference.

  3. Familiaridade com a interface do usuário workspace Databricks. Consulte Navegar na área de trabalho.

  4. Familiaridade trabalhando com Databricks Notebook.

  5. Um local onde você pode gravar dados; esta demonstração usa a DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com o Unity Catalog.

passo 1. Configure seu ambiente e crie um gerador de dados

Este tutorial pressupõe familiaridade básica com Databricks e uma configuração workspace default . Se você não conseguir executar o código fornecido, entre em contato com o administrador workspace para garantir que você tenha acesso aos recursos compute e um local no qual possa gravar dados.

Observe que o código fornecido usa um parâmetro source para especificar o local que você configurará como sua fonte de dados COPY INTO . Conforme escrito, esse código aponta para um local na DBFS root. Se você tiver permissões de gravação em um local de armazenamento de objeto externo, substitua a parte dbfs:/ das strings de origem pelo caminho para seu armazenamento de objeto. Como esse bloco de código também faz uma exclusão recursiva para Reset esta demonstração, certifique-se de não apontar isso para dados de produção e de manter o diretório aninhado /user/{username}/copy-into-demo para evitar sobrescrever ou excluir dados existentes.

  1. Crie um novo SQL Notebooke anexe-o a um cluster executando Databricks Runtime 11.3 LTS ou acima.

  2. Copie e execute o seguinte código para Reset o local de armazenamento e o banco de dados usados neste tutorial:

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Copie e execute o seguinte código para configurar algumas tabelas e funções que serão utilizadas para gerar dados aleatoriamente:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

passo 2: gravar os dados de amostra no armazenamento em nuvem

Escrever em formatos de dados diferentes de Delta Lake é raro em Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o seguinte código para gravar lotes de dados JSON brutos:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

passo 3: Use COPY INTO para carregar dados JSON idempotentemente

O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.

  1. Copie e execute o seguinte código para criar sua tabela Delta de destino e carregar dados de sua fonte:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Como esta ação é idempotente, você pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.

passo 4: Visualize o conteúdo da sua tabela

Você pode executar uma query SQL simples para revisar manualmente o conteúdo desta tabela.

  1. Copie e execute o seguinte código para visualizar sua tabela:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

passo 5: Carregar mais dados e visualizar resultados

Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO várias vezes sem a chegada de novos dados.

passo 6: Tutorial de limpeza

Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los.

  1. Copie e execute o seguinte código para descartar o banco de dados, tabelas e remover todos os dados:

    %python
    # Drop database and tables and remove data
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    dbutils.fs.rm(source, True)
    
  2. Para interromper seu recurso compute , vá para a tab clusterss e finalize seus clusters.

Recursos adicionais