Pular para o conteúdo principal

tutorial: COPY INTO com Spark SQL

Databricks recomenda que você use o comando COPY INTO para carregamento incremental e em massa de dados para fontes de dados que contenham milhares de arquivos.

Neste tutorial, você usará o comando COPY INTO para carregar dados JSON de um volume Unity Catalog em uma tabela Delta em seu workspace Databricks . Você utiliza o dataset de exemplo Wanderbricks como fonte de dados. Para casos de uso de ingestão mais avançados, consulte O que é o Auto Loader?.

Requisitos

o passo 1: Configure seu ambiente

O código neste tutorial usa um volume Unity Catalog para armazenar arquivos de origem JSON . Substitua <catalog> por um catálogo onde você tenha permissões CREATE SCHEMA e CREATE VOLUME . Se não conseguir executar o código, entre em contato com o administrador do seu workspace .

Crie um Notebook e associe-o a um recurso compute . Em seguida, execute o seguinte código para configurar um esquema e um volume para este tutorial.

Python
# Set parameters and reset demo environment

catalog = "<catalog>"

username = spark.sql("SELECT regexp_replace(session_user(), '[^a-zA-Z0-9]', '_')").first()[0]
schema = f"copyinto_{username}_db"
volume = "copy_into_source"
source = f"/Volumes/{catalog}/{schema}/{volume}"

spark.sql(f"SET c.catalog={catalog}")
spark.sql(f"SET c.schema={schema}")
spark.sql(f"SET c.volume={volume}")

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")
spark.sql(f"CREATE SCHEMA {catalog}.{schema}")
spark.sql(f"CREATE VOLUME {catalog}.{schema}.{volume}")

Passo 2: Grave os dados de exemplo no volume como JSON

O comando COPY INTO carrega dados de fontes baseadas em arquivos. Leia da tabela de exemplo Wanderbricks bookings e grave muitos registros como arquivos JSON em seu volume, simulando dados que chegam de um sistema externo.

Python
# Write a batch of Wanderbricks bookings data as JSON to the volume

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_1 = bookings.orderBy("booking_id").limit(20)
batch_1.write.mode("append").json(f"{source}/bookings")

Etapa 3: Use COPY INTO para carregar dados JSON de forma independente

Crie uma tabela Delta de destino antes de usar COPY INTO. Você não precisa fornecer nada além do nome da tabela em sua declaração CREATE TABLE . Como essa ação é idempotente, Databricks carrega os dados apenas uma vez, mesmo que você execute o código várias vezes.

Python
# Create target table and load data

spark.sql(f"CREATE TABLE IF NOT EXISTS {catalog}.{schema}.bookings_target")

spark.sql(f"""
COPY INTO {catalog}.{schema}.bookings_target
FROM '/Volumes/{catalog}/{schema}/{volume}/bookings'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
""")

Etapa 4: visualize o conteúdo da sua tabela

Verifique se a tabela contém 20 linhas dos primeiros lotes de dados de reservas do Wanderbricks e se o esquema foi inferido corretamente a partir dos arquivos JSON de origem.

Python
# Review loaded data

display(spark.sql(f"SELECT * FROM {catalog}.{schema}.bookings_target"))

Etapa 5: carregar mais dados e visualizar os resultados

Você pode simular dados adicionais vindos de um sistema externo escrevendo outro lote de registros e executando COPY INTO novamente. Execute o seguinte código para escrever um segundo lote de dados.

Python
# Write another batch of Wanderbricks bookings data as JSON

bookings = spark.read.table("samples.wanderbricks.bookings")
batch_2 = bookings.orderBy(bookings.booking_id.desc()).limit(20)
batch_2.write.mode("append").json(f"{source}/bookings")

Em seguida, execute o comando COPY INTO da etapa 3 novamente e visualize a tabela para confirmar os novos registros. Somente os arquivos novos são carregados.

Python
# Confirm new data was loaded

display(spark.sql(f"SELECT COUNT(*) AS total_rows FROM {catalog}.{schema}.bookings_target"))

Etapa 6: Limpeza tutorial

Ao concluir este tutorial, você pode limpar os recursos associados, caso não queira mais mantê-los. Elimine o esquema, as tabelas e o volume, e remova todos os dados.

Python
# Drop schema and all associated objects

spark.sql(f"DROP SCHEMA IF EXISTS {catalog}.{schema} CASCADE")

Recurso adicional