Comece a usar COPY INTO para carregar dados
O comando SQL COPY INTO
permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação re-testável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece os seguintes recursos:
Filtros de arquivos ou diretórios facilmente configuráveis a partir de armazenamento clouds , incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários
Processamento de arquivo exatamente uma vez (idempotente) por default
Inferência, mapeamento, fusão e evolução do esquema da tabela de destino
Observação
Para uma experiência de ingestão de ficheiros mais escalável e robusta, a Databricks recomenda que os utilizadores SQL aproveitem as tabelas de transmissão. Consulte Carregar tabelas de uso de dados transmissão em Databricks SQL.
Aviso
COPY INTO
respeita a configuração workspace para vetores de exclusão. Se ativados, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO
forem executados em um SQL warehouse ou compute com Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam as consultas a uma tabela em Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e Auto-enable deletion vectors (Ativação automática de vetores de exclusão).
Requisitos
Um administrador account deve seguir as passos em Configurar acesso a dados para ingestão para configurar o acesso a dados no armazenamento de objetos cloud antes que os usuários possam carregar o uso de dados COPY INTO
.
Exemplo: carregar dados em uma tabela Delta Lake sem esquema
Observação
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO
definindo mergeSchema
como true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser agendada para execução para ingerir dados exatamente uma vez em uma tabela Delta.
Observação
A tabela Delta vazia não pode ser usada fora de COPY INTO
. INSERT INTO
e MERGE INTO
não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela torna-se consultável.
Consulte Criar tabelas de destino para COPY INTO.
Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando SQL COPY INTO
para carregar dados de amostra do conjunto de dados do Databricks na tabela. O senhor pode executar o exemplo de código Python, R, Scala ou SQL a partir de um Notebook conectado a um cluster Databricks. O senhor também pode executar o código SQL de uma consulta associada a um SQL warehouse no Databricks SQL.
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o seguinte código, que exclui a tabela:
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
Referência
Databricks Runtime 7.xe acima: COPY INTO
Recursos adicionais
Para padrões de uso comuns, incluindo exemplos de várias operações
COPY INTO
na mesma tabela Delta, consulte Padrões de carregamento de dados comuns usando COPY INTO.