Comece a usar COPY INTO para carregar dados
O comando SQL COPY INTO
permite carregar dados de um local de arquivo em uma tabela Delta. Esta é uma operação re-testável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece os seguintes recursos:
Filtros de arquivos ou diretórios facilmente configuráveis a partir de armazenamento clouds , incluindo volumes S3, ADLS Gen2, ABFS, GCS e Unity Catalog.
Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários
Processamento de arquivo exatamente uma vez (idempotente) por default
Inferência, mapeamento, fusão e evolução do esquema da tabela de destino
Observação
Para obter uma experiência de ingestão de arquivos mais dimensionável e robusta, o site Databricks recomenda que os usuários do SQL utilizem tabelas de transmissão. Consulte as tabelas Load use de dados transmission em Databricks SQL.
Aviso
COPY INTO
respeita a configuração workspace para vetores de exclusão. Se ativados, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO
forem executados em um SQL warehouse ou compute com Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam as consultas a uma tabela em Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e Auto-enable deletion vectors (Ativação automática de vetores de exclusão).
Requisitos
Um administrador account deve seguir as passos em Configurar acesso a dados para ingestão para configurar o acesso a dados no armazenamento de objetos cloud antes que os usuários possam carregar o uso de dados COPY INTO
.
Exemplo: carregar dados em uma tabela Delta Lake sem esquema
Observação
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
Você pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO
definindo mergeSchema
como true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser agendada para execução para ingerir dados exatamente uma vez em uma tabela Delta.
Observação
A tabela Delta vazia não pode ser usada fora de COPY INTO
. INSERT INTO
e MERGE INTO
não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela torna-se consultável.
Consulte Criar tabelas de destino para COPY INTO.
Exemplo: Definir esquema e carregar dados em uma tabela Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando COPY INTO
SQL para carregar dados de amostra do conjunto de dadosDatabricks na tabela. O senhor pode executar o código de exemplo Python, R, Scala, ou SQL a partir de um arquivo Notebook anexado a um Databricks cluster. O senhor também pode executar o código SQL a partir de uma consulta associada a um SQL warehouse em Databricks SQL.
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o seguinte código, que exclui a tabela:
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
Referência
Databricks Runtime 7.xe acima: COPY INTO
Recursos adicionais
Para padrões de uso comuns, incluindo exemplos de várias operações
COPY INTO
na mesma tabela Delta, consulte Padrões de carregamento de dados comuns usando COPY INTO.