Tutorial: COPY INTO com Spark SQL
Databricks recomenda que o senhor use o comando COPY INTO para carregamento de dados incremental e em massa para fontes de dados que contêm milhares de arquivos. A Databricks recomenda que o senhor use o Auto Loader para casos de uso avançados.
Neste tutorial, o senhor usa o comando COPY INTO
para carregar dados do armazenamento de objetos cloud em uma tabela em seu Databricks workspace.
Requisitos
Um Databricks account, e um Databricks workspace em seu account. Para criá-los, consulte Get começar with Databricks.
Um produto para todos os fins cluster em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um clusters todo-propósito, consulte a referência de configuração de computação.
Familiaridade com a interface de usuário Databricks workspace . Consulte Navegar no espaço de trabalho.
Familiaridade com o site Databricks Notebook.
Um local no qual o senhor pode gravar dados; esta demonstração usa o DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com Unity Catalog.
passo 1. Configure seu ambiente e crie um gerador de dados
Este tutorial pressupõe uma familiaridade básica com o Databricks e uma configuração do default workspace . Se não conseguir executar o código fornecido, entre em contato com o administrador do site workspace para certificar-se de que tem acesso ao recurso compute e a um local onde possa gravar dados.
Observe que o código fornecido usa um parâmetro source
para especificar o local que o senhor configurará como sua COPY INTO
fonte de dados. Conforme escrito, esse código aponta para um local em DBFS root. Se o senhor tiver permissões de gravação em um local de armazenamento de objetos externo, substitua a parte dbfs:/
das cadeias de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e de manter o diretório /user/{username}/copy-into-demo
aninhado para evitar sobrescrever ou excluir dados existentes.
Crie um novo Notebook SQL e anexe-o a um cluster que esteja executando o Databricks Runtime 11.3 LTS ou acima.
Copie e execute o código a seguir para redefinir o local de armazenamento e o banco de dados usados neste site tutorial:
%python # Set parameters for isolation in workspace and reset demo username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0] database = f"copyinto_{username}_db" source = f"dbfs:/user/{username}/copy-into-demo" spark.sql(f"SET c.username='{username}'") spark.sql(f"SET c.database={database}") spark.sql(f"SET c.source='{source}'") spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") spark.sql("CREATE DATABASE ${c.database}") spark.sql("USE ${c.database}") dbutils.fs.rm(source, True)
Copie e execute o código a seguir para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:
-- Configure random data generator CREATE TABLE user_ping_raw (user_id STRING, ping INTEGER, time TIMESTAMP) USING json LOCATION ${c.source}; CREATE TABLE user_ids (user_id STRING); INSERT INTO user_ids VALUES ("potato_luver"), ("beanbag_lyfe"), ("default_username"), ("the_king"), ("n00b"), ("frodo"), ("data_the_kid"), ("el_matador"), ("the_wiz"); CREATE FUNCTION get_ping() RETURNS INT RETURN int(rand() * 250); CREATE FUNCTION is_active() RETURNS BOOLEAN RETURN CASE WHEN rand() > .25 THEN true ELSE false END;
passo 2: gravar os dados de amostra no armazenamento em nuvem
A gravação em formatos de dados diferentes do Delta Lake é rara no Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.
Copie e execute o código a seguir para gravar um lote de dados brutos em JSON:
-- Write a new batch of data to the data source INSERT INTO user_ping_raw SELECT *, get_ping() ping, current_timestamp() time FROM user_ids WHERE is_active()=true;
passo 3: Use COPY INTO para carregar dados JSON idempotentemente
O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO
. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE
. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.
Copie e execute o código a seguir para criar a tabela de destino Delta e carregar os dados da fonte:
-- Create target table and load data CREATE TABLE IF NOT EXISTS user_ping_target; COPY INTO user_ping_target FROM ${c.source} FILEFORMAT = JSON FORMAT_OPTIONS ("mergeSchema" = "true") COPY_OPTIONS ("mergeSchema" = "true")
Como essa ação é idempotente, o senhor pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.
passo 4: Visualize o conteúdo da sua tabela
O senhor pode executar uma consulta simples em SQL para revisar manualmente o conteúdo dessa tabela.
Copie e execute o código a seguir para visualizar sua tabela:
-- Review updated table SELECT * FROM user_ping_target
passo 5: Carregar mais dados e visualizar resultados
Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO
e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO
várias vezes sem a chegada de novos dados.
passo 6: Tutorial de limpeza
Quando terminar de usar o site tutorial, o senhor poderá limpar o recurso associado se não quiser mais mantê-lo.
Copie e execute o código a seguir para eliminar o banco de dados, as tabelas e remover todos os dados:
%python # Drop database and tables and remove data spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE") dbutils.fs.rm(source, True)
Para interromper seu recurso compute, acesse os clusters tab e encerre seu recurso cluster.
Outros recursos
Os COPY INTO artigos de referência