Pular para o conteúdo principal

tutorial: COPY INTO com Spark SQL

Databricks recomenda que o senhor use o comando COPY INTO para carregamento de dados em massa e incremental para fontes de dados que contêm milhares de arquivos. A Databricks recomenda que o senhor use o Auto Loader para casos de uso avançado.

Neste tutorial, o senhor usa o comando COPY INTO para carregar dados do armazenamento de objetos na nuvem para uma tabela em seu Databricks workspace.

Requisitos

Etapa 1. Configure seu ambiente e crie um gerador de dados

Este tutorial pressupõe uma familiaridade básica com o Databricks e uma configuração do default workspace . Se não conseguir executar o código fornecido, entre em contato com o administrador do site workspace para certificar-se de que tem acesso ao recurso compute e a um local onde possa gravar dados.

Observe que o código fornecido usa um parâmetro source para especificar o local que você configurará como sua COPY INTO fonte de dados. Conforme escrito, esse código aponta para um local em DBFS root. Se o senhor tiver permissões de gravação em um local de armazenamento de objetos externo, substitua a parte dbfs:/ das cadeias de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e de manter o diretório /user/{username}/copy-into-demo aninhado para evitar sobrescrever ou excluir dados existentes.

  1. Crie um novo Notebook e associe-o a um recurso compute.

  2. Copie e execute o código a seguir para redefinir o local de armazenamento e o banco de dados usados neste site tutorial:

    Python
    %python
    # Set parameters for isolation in workspace and reset demo

    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"

    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")

    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")

    dbutils.fs.rm(source, True)
  3. Copie e execute o código a seguir para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:

    SQL
    -- Configure random data generator

    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};

    CREATE TABLE user_ids (user_id STRING);

    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");

    CREATE FUNCTION get_ping()
    RETURNS INT
    RETURN int(rand() * 250);

    CREATE FUNCTION is_active()
    RETURNS BOOLEAN
    RETURN CASE
    WHEN rand() > .25 THEN true
    ELSE false
    END;

Etapa 2: gravar os dados de amostra no armazenamento em nuvem

A gravação em formatos de dados diferentes do Delta Lake é rara no Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.

  1. Copie e execute o código a seguir para gravar um lote de dados brutos em JSON:

    SQL
    -- Write a new batch of data to the data source

    INSERT INTO user_ping_raw
    SELECT *,
    get_ping() ping,
    current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;

Etapa 3: Use COPY INTO para carregar dados JSON de forma independente

Você deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO. Você não precisa fornecer nada além do nome da tabela em sua declaração CREATE TABLE .

  1. Copie e execute o código a seguir para criar a tabela de destino Delta e carregar os dados da fonte:

    SQL
    -- Create target table and load data

    CREATE TABLE IF NOT EXISTS user_ping_target;

    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")

Como essa ação é idempotente, o senhor pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.

Etapa 4: visualize o conteúdo da sua tabela

O senhor pode executar uma consulta simples em SQL para revisar manualmente o conteúdo dessa tabela.

  1. Copie e execute o código a seguir para visualizar sua tabela:

    SQL
    -- Review updated table

    SELECT * FROM user_ping_target

Etapa 5: carregar mais dados e visualizar os resultados

Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO várias vezes sem a chegada de novos dados.

Etapa 6: Limpeza tutorial

Quando terminar de usar o site tutorial, o senhor poderá limpar o recurso associado se não quiser mais mantê-lo.

Copie e execute o código a seguir para eliminar o banco de dados, as tabelas e remover todos os dados:

Python
%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Recurso adicional