tutorial: COPY INTO com Spark SQL
Databricks recomenda que o senhor use o comando COPY INTO para carregamento de dados em massa e incremental para fontes de dados que contêm milhares de arquivos. A Databricks recomenda que o senhor use o Auto Loader para casos de uso avançado.
Neste tutorial, o senhor usa o comando COPY INTO
para carregar dados do armazenamento de objetos na nuvem para uma tabela em seu Databricks workspace.
Requisitos
- Um Databricks account, e um Databricks workspace em seu account. Para criá-los, consulte Get começar with Databricks.
- Um clustering para todos os fins em seu workspace executando Databricks Runtime 11.3 LTS ou acima. Para criar um todo-propósito de clusters, consulte a referência de configuração de computação.
- Familiaridade com a interface de usuário Databricks workspace . Consulte Navegar no site workspace.
- Familiaridade com o site Databricks Notebook.
- Um local no qual o senhor pode gravar dados; esta demonstração usa o DBFS root como exemplo, mas o Databricks recomenda um local de armazenamento externo configurado com Unity Catalog.
Etapa 1. Configure seu ambiente e crie um gerador de dados
Este tutorial pressupõe uma familiaridade básica com o Databricks e uma configuração do default workspace . Se não conseguir executar o código fornecido, entre em contato com o administrador do site workspace para certificar-se de que tem acesso ao recurso compute e a um local onde possa gravar dados.
Observe que o código fornecido usa um parâmetro source
para especificar o local que você configurará como sua COPY INTO
fonte de dados. Conforme escrito, esse código aponta para um local em DBFS root. Se o senhor tiver permissões de gravação em um local de armazenamento de objetos externo, substitua a parte dbfs:/
das cadeias de caracteres de origem pelo caminho para o armazenamento de objetos. Como esse bloco de código também faz uma exclusão recursiva para redefinir essa demonstração, certifique-se de não apontar isso para os dados de produção e de manter o diretório /user/{username}/copy-into-demo
aninhado para evitar sobrescrever ou excluir dados existentes.
-
Crie um novo Notebook SQL e anexe-o a um clustering que esteja executando Databricks Runtime 11.3 LTS ou acima.
-
Copie e execute o código a seguir para redefinir o local de armazenamento e o banco de dados usados neste site tutorial:
Python%python
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"copyinto_{username}_db"
source = f"dbfs:/user/{username}/copy-into-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
dbutils.fs.rm(source, True) -
Copie e execute o código a seguir para configurar algumas tabelas e funções que serão usadas para gerar dados aleatoriamente:
SQL-- Configure random data generator
CREATE TABLE user_ping_raw
(user_id STRING, ping INTEGER, time TIMESTAMP)
USING json
LOCATION ${c.source};
CREATE TABLE user_ids (user_id STRING);
INSERT INTO user_ids VALUES
("potato_luver"),
("beanbag_lyfe"),
("default_username"),
("the_king"),
("n00b"),
("frodo"),
("data_the_kid"),
("el_matador"),
("the_wiz");
CREATE FUNCTION get_ping()
RETURNS INT
RETURN int(rand() * 250);
CREATE FUNCTION is_active()
RETURNS BOOLEAN
RETURN CASE
WHEN rand() > .25 THEN true
ELSE false
END;
Etapa 2: gravar os dados de amostra no armazenamento em nuvem
A gravação em formatos de dados diferentes do Delta Lake é rara no Databricks. O código fornecido aqui grava em JSON, simulando um sistema externo que pode despejar resultados de outro sistema no armazenamento de objetos.
-
Copie e execute o código a seguir para gravar um lote de dados brutos em JSON:
SQL-- Write a new batch of data to the data source
INSERT INTO user_ping_raw
SELECT *,
get_ping() ping,
current_timestamp() time
FROM user_ids
WHERE is_active()=true;
Etapa 3: Use COPY INTO para carregar dados JSON de forma independente
O senhor deve criar uma tabela Delta Lake de destino antes de poder usar COPY INTO
. Em Databricks Runtime 11.3 LTS e acima, o senhor não precisa fornecer nada além de um nome de tabela na declaração CREATE TABLE
. Nas versões anteriores do Databricks Runtime, o senhor deve fornecer um esquema ao criar uma tabela vazia.
-
Copie e execute o código a seguir para criar a tabela de destino Delta e carregar os dados da fonte:
SQL-- Create target table and load data
CREATE TABLE IF NOT EXISTS user_ping_target;
COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ("mergeSchema" = "true")
COPY_OPTIONS ("mergeSchema" = "true")
Como essa ação é idempotente, o senhor pode executá-la várias vezes, mas os dados serão carregados apenas uma vez.
Etapa 4: visualize o conteúdo da sua tabela
O senhor pode executar uma consulta simples em SQL para revisar manualmente o conteúdo dessa tabela.
-
Copie e execute o código a seguir para visualizar sua tabela:
SQL-- Review updated table
SELECT * FROM user_ping_target
Etapa 5: carregar mais dados e visualizar os resultados
Você pode executar novamente as passos 2 a 4 várias vezes para obter novos lotes de dados JSON brutos aleatórios em sua origem, carregá-los idempotentemente no Delta Lake com COPY INTO
e visualizar os resultados. Tente executar essas passos fora de ordem ou várias vezes para simular vários lotes de dados brutos sendo gravados ou executando COPY INTO
várias vezes sem a chegada de novos dados.
Etapa 6: Limpeza tutorial
Quando terminar de usar o site tutorial, o senhor poderá limpar o recurso associado se não quiser mais mantê-lo.
-
Copie e execute o código a seguir para eliminar o banco de dados, as tabelas e remover todos os dados:
Python%python
# Drop database and tables and remove data
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True) -
Para interromper o recurso compute, acesse o site de clustering tab e encerre o clustering.
Recurso adicional
- Os COPY INTO artigos de referência