O senhor começa a usar o site COPY INTO para carregar dados
Você pode usar o comando SQL COPY INTO para carregar dados de um local de arquivo em uma tabela Delta. COPY INTO é repetível e idempotente — os arquivos no local de origem que já foram carregados são ignorados na execução subsequente.
COPY INTO Oferece estas funcionalidades:
- Filtros de arquivos ou pastas facilmente configuráveis a partir de armazenamento cloud , incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog .
- Suporte para múltiplos formatos de arquivo de origem: CSV, JSON, XML, Avro, ORC, Parquet, texto e arquivos binários.
- Processamento de arquivos exatamente uma vez (idempotente) por default.
- Inferência, mapeamento, fusão e evolução do esquema da tabela de destino.
COPY INTO Respeita a configuração workspace para vetores de exclusão. Se ativado, os vetores de exclusão são habilitados na tabela de destino quando COPY INTO é executado em um SQL warehouse ou compute executando Databricks Runtime 14.0 ou superior. Após a ativação dos vetores de exclusão, eles bloqueiam as consultas a uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte Vetores de exclusão no Databricks e Ativar vetores de exclusão automaticamente.
Antes de começar
Um administrador account deve configurar o acesso aos dados no armazenamento de objetos cloud antes que os usuários possam carregar uso de dados COPY INTO.
Carregar dados em uma tabela Delta Lake sem esquema.
No Databricks Runtime 11.3 LTS e superior, você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido durante um comando COPY INTO definindo mergeSchema para true em COPY_OPTIONS. O exemplo a seguir utiliza o conjunto de dados Wanderbricks . Substitua <catalog>, <schema> e <volume> por um catálogo, esquema e volume onde você tem permissões CREATE TABLE .
- SQL
- Python
- R
- Scala
CREATE TABLE IF NOT EXISTS <catalog>.<schema>.booking_updates_schemaless;
COPY INTO <catalog>.<schema>.booking_updates_schemaless
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
table_name = '<catalog>.<schema>.booking_updates_schemaless'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" + \
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_schemaless"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("CREATE TABLE IF NOT EXISTS ", table_name, sep = ""))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')",
" COPY_OPTIONS ('mergeSchema' = 'true')",
sep = ""
))
val table_name = "<catalog>.<schema>.booking_updates_schemaless"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("CREATE TABLE IF NOT EXISTS " + table_name)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('mergeSchema' = 'true', 'multiLine' = 'true')" +
" COPY_OPTIONS ('mergeSchema' = 'true')"
)
Esta instrução SQL é idempotente. Isso significa que você pode programá-lo para ser executado repetidamente, e ele carregará apenas os novos dados na sua tabela Delta .
A tabela Delta vazia não é utilizável fora de COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Após os dados serem inseridos na tabela com COPY INTO, a tabela torna-se consultável.
Consulte Criar tabelas de destino para COPY INTO.
Defina o esquema e carregue os dados em uma tabela do Delta Lake.
O exemplo a seguir cria uma tabela Delta e usa o comando SQL COPY INTO para carregar dados de amostra do conjunto de dados Wanderbricks na tabela. Os arquivos de origem são arquivos JSON armazenados em um volume Unity Catalog . Você pode executar o código de exemplo Python, R, Scala ou SQL a partir de um Notebook conectado a um cluster Databricks . Você também pode executar o código SQL a partir de uma consulta associada a um SQL warehouse no Databricks SQL. Substitua <catalog>, <schema> e <volume> por um catálogo, esquema e volume onde você tenha permissões CREATE TABLE .
- SQL
- Python
- R
- Scala
DROP TABLE IF EXISTS <catalog>.<schema>.booking_updates_upload;
CREATE TABLE <catalog>.<schema>.booking_updates_upload (
booking_id BIGINT,
user_id BIGINT,
status STRING,
total_amount DOUBLE
);
COPY INTO <catalog>.<schema>.booking_updates_upload
FROM '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true');
SELECT * FROM <catalog>.<schema>.booking_updates_upload;
table_name = '<catalog>.<schema>.booking_updates_upload'
source_data = '/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates'
source_format = 'JSON'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"booking_id BIGINT, " + \
"user_id BIGINT, " + \
"status STRING, " + \
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format + \
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
booking_updates_upload_data = spark.sql("SELECT * FROM " + table_name)
display(booking_updates_upload_data)
library(SparkR)
sparkR.session()
table_name = "<catalog>.<schema>.booking_updates_upload"
source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
source_format = "JSON"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"booking_id BIGINT, ",
"user_id BIGINT, ",
"status STRING, ",
"total_amount DOUBLE)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
" FORMAT_OPTIONS ('multiLine' = 'true')",
sep = ""
))
booking_updates_upload_data = tableToDF(table_name)
display(booking_updates_upload_data)
val table_name = "<catalog>.<schema>.booking_updates_upload"
val source_data = "/Volumes/<catalog>/<schema>/<volume>/wanderbricks/booking_updates"
val source_format = "JSON"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"booking_id BIGINT, " +
"user_id BIGINT, " +
"status STRING, " +
"total_amount DOUBLE)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format +
" FORMAT_OPTIONS ('multiLine' = 'true')"
)
val booking_updates_upload_data = spark.table(table_name)
display(booking_updates_upload_data)
Para limpar, execute o seguinte código para excluir a tabela de exemplo.
- SQL
- Python
- R
- Scala
DROP TABLE <catalog>.<schema>.booking_updates_upload
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
Limpe arquivos de metadados
O senhor pode executar vacuum para limpar os arquivos de metadados não referenciados criados por COPY INTO em Databricks Runtime 15.2 e acima.