Tutorial: COPY INTO com Spark SQL
Databricks recomenda que você use o comando COPY INTO para carregamento de dados incremental e em massa para fonte de dados que contém milhares de arquivos. Databricks recomenda que você use o Auto Loader para casos de uso avançados.
Neste tutorial, você usa o comando COPY INTO
para carregar dados do armazenamento de objetos cloud em uma tabela em seu workspace do Databricks.
Requisitos
Uma account Databricks e um workspace Databricks em sua account. Para criá-los, consulte Get começar: configuração account e workspace .
Um produto para todos os fins cluster em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um clusters todo-propósito, consulte compute configuration reference.
Familiaridade com a interface do usuário workspace Databricks. Consulte Navegar na área de trabalho.
Familiaridade trabalhando com Databricks Notebook.
Um local onde você pode gravar dados; esta demonstração usa a DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com o Unity Catalog.
passo 1. Configure seu ambiente e crie um gerador de dados
Este tutorial pressupõe familiaridade básica com Databricks e uma configuração workspace default . Se você não conseguir executar o código fornecido, entre em contato com o administrador workspace para garantir que você tenha acesso aos recursos compute e um local no qual possa gravar dados.
Observe que o código fornecido usa um parâmetro source
para especificar o local que você configurará como sua fonte de dados COPY INTO
. Conforme escrito, esse código aponta para um local na DBFS root. Se você tiver permissões de gravação em um local de armazenamento de objeto externo, substitua a parte dbfs:/
das strings de origem pelo caminho para seu armazenamento de objeto. Como esse bloco de código também faz uma exclusão recursiva para Reset esta demonstração, certifique-se de não apontar isso para dados de produção e de manter o diretório aninhado /user/{username}/copy-into-demo
para evitar sobrescrever ou excluir dados existentes.
Crie um novo SQL Notebooke anexe-o a um cluster executando Databricks Runtime 11.3 LTS ou acima.
Copie e execute o seguinte código para Reset o local de armazenamento e o banco de dados usados neste tutorial:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copie e execute o seguinte código para configurar algumas tabelas e funções que serão utilizadas para gerar dados aleatoriamente:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
passo 2: gravar os dados de amostra no armazenamento em nuvem
Escrever em formatos de dados diferentes de Delta Lake é raro em Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.
Copie e execute o seguinte código para gravar lotes de dados JSON brutos:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
passo 3: Use COPY INTO para carregar dados JSON idempotentemente
O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO
. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE
. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.
Copie e execute o seguinte código para criar sua tabela Delta de destino e carregar dados de sua fonte:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Como esta ação é idempotente, você pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.
passo 4: Visualize o conteúdo da sua tabela
Você pode executar uma query SQL simples para revisar manualmente o conteúdo desta tabela.
Copie e execute o seguinte código para visualizar sua tabela:
-- Review updated table SELECT * FROM user_ping_target
passo 5: Carregar mais dados e visualizar resultados
Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO
e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO
várias vezes sem a chegada de novos dados.
passo 6: Tutorial de limpeza
Quando terminar este tutorial, você poderá limpar os recursos associados se não quiser mais mantê-los.
Copie e execute o seguinte código para descartar o banco de dados, tabelas e remover todos os dados:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Para interromper seu recurso compute , vá para a tab clusterss e finalize seus clusters.
Recursos adicionais
Os artigos de referência COPY INTO