tutorial: COPY INTO com Spark SQL
Databricks recomenda que você use o comando COPY INTO para carregamento incremental e em massa de dados para fontes de dados que contenham milhares de arquivos.
Neste tutorial, você usará o comando COPY INTO para carregar dados JSON de um volume Unity Catalog em uma tabela Delta em seu workspace Databricks . Você utiliza o dataset de exemplo Wanderbricks como fonte de dados. Para casos de uso de ingestão mais avançados, consulte O que é o Auto Loader?.
Requisitos
- Acesso a um recurso compute . Veja calcular.
- Um workspace habilitado para o Catálogo Unity com permissões para criar esquemas e volumes em um catálogo. Consulte Conectar-se ao armazenamento de objetos cloud usando Unity Catalog.
o passo 1: Configure seu ambiente
O código neste tutorial usa um volume Unity Catalog para armazenar arquivos de origem JSON . Substitua <catalog> por um catálogo onde você tenha permissões CREATE SCHEMA e CREATE VOLUME . Se não conseguir executar o código, entre em contato com o administrador do seu workspace .
Crie um Notebook e associe-o a um recurso compute . Em seguida, execute o seguinte código para configurar um esquema e um volume para este tutorial.
- Python
- SQL
# Set parameters and reset demo environment
catalog = "<catalog>"
username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"
spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")
-- Reset demo environment
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;
CREATE SCHEMA <catalog>.copy_into_tutorial;
CREATE VOLUME <catalog>.copy_into_tutorial.copy_into_source;
Passo 2: Grave os dados de exemplo no volume como JSON
O comando COPY INTO carrega dados de fontes baseadas em arquivos. Leia da tabela de exemplo Wanderbricks bookings e grave muitos registros como arquivos JSON em seu volume, simulando dados que chegam de um sistema externo.
- Python
- SQL
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")
A gravação de arquivos em um volume requer Python. Em um fluxo de trabalho real, esses dados viriam de um sistema externo.
%python
# Write a batch of Wanderbricks bookings data as JSON to the volume
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Etapa 3: Use COPY INTO para carregar dados JSON de forma independente
Crie uma tabela Delta de destino antes de usar COPY INTO. Você não precisa fornecer nada além do nome da tabela em sua declaração CREATE TABLE . Como essa ação é idempotente, Databricks carrega os dados apenas uma vez, mesmo que você execute o código várias vezes.
- Python
- SQL
# Create target table and load data
spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")
spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")
-- Create target table and load data
CREATE TABLE IF NOT EXISTS <catalog>.copy_into_tutorial.bookings_target;
COPY INTO <catalog>.copy_into_tutorial.bookings_target
FROM '/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
Etapa 4: visualize o conteúdo da sua tabela
Verifique se a tabela contém 20 linhas dos primeiros lotes de dados de reservas do Wanderbricks e se o esquema foi inferido corretamente a partir dos arquivos JSON de origem.
- Python
- SQL
# Review loaded data
display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))
-- Review loaded data
SELECT * FROM <catalog>.copy_into_tutorial.bookings_target
Etapa 5: carregar mais dados e visualizar os resultados
Você pode simular dados adicionais vindos de um sistema externo escrevendo outro lote de registros e executando COPY INTO novamente. Execute o seguinte código para escrever um segundo lote de dados.
- Python
- SQL
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")
A gravação de arquivos em um volume requer Python. Em um fluxo de trabalho real, esses dados viriam de um sistema externo.
%python
# Write another batch of Wanderbricks bookings data as JSON
bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json("/Volumes/<catalog>/copy_into_tutorial/copy_into_source/bookings")
Em seguida, execute o comando COPY INTO da etapa 3 novamente e visualize a tabela para confirmar os novos registros. Somente os arquivos novos são carregados.
- Python
- SQL
# Confirm new data was loaded
display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))
-- Confirm new data was loaded
SELECT COUNT(*) AS total_rows FROM <catalog>.copy_into_tutorial.bookings_target
Etapa 6: Limpeza tutorial
Ao concluir este tutorial, você pode limpar os recursos associados, caso não queira mais mantê-los. Elimine o esquema, as tabelas e o volume, e remova todos os dados.
- Python
- SQL
# Drop schema and all associated objects
spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
-- Drop schema and all associated objects
DROP SCHEMA IF EXISTS <catalog>.copy_into_tutorial CASCADE;