O senhor começa a usar o site COPY INTO para carregar dados
O comando COPY INTO
SQL permite que o senhor carregue dados de um local de arquivo em uma tabela Delta. Essa é uma operação re-triável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO
oferece os seguintes recursos:
- Filtros de arquivo ou diretório facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
- Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários
- Processamento de arquivos exatamente uma vez (idempotente) por default
- Inferência, mapeamento, fusão e evolução do esquema da tabela de destino
Para obter uma experiência de ingestão de arquivos mais dimensionável e robusta, o site Databricks recomenda que os usuários do SQL utilizem tabelas de transmissão. Consulte as tabelas Load use de dados transmission em Databricks SQL.
COPY INTO
respeita a configuração workspace para vetores de exclusão. Se ativados, os vetores de exclusão serão ativados na tabela de destino quando COPY INTO
forem executados em um SQL warehouse ou compute com Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam as consultas a uma tabela em Databricks Runtime 11.3 LTS e abaixo. Consulte O que são vetores de exclusão? e habilite automaticamente os vetores de exclusão.
Requisitos
Um administrador do account deve seguir as etapas em Configurar acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos na nuvem antes que os usuários possam carregar o uso de dados COPY INTO
.
Exemplo: Carregar dados em uma tabela Delta Lake sem esquema
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
O senhor pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO
, definindo mergeSchema
para true
em COPY_OPTIONS
:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser programada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.
A tabela Delta vazia não pode ser usada fora do site COPY INTO
. INSERT INTO
e MERGE INTO
não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO
, a tabela se torna consultável.
Consulte Criar tabelas de destino para COPY INTO
.
Exemplo: Definir esquema e carregar dados em uma tabela do Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando COPY INTO
SQL para carregar dados de amostra do conjunto de dadosDatabricks na tabela. O senhor pode executar o código Python de exemplo, R, Scala ou SQL de um Notebook anexado a um Databricks cluster. O senhor também pode executar o código SQL a partir de uma consulta associada a um SQL warehouse em Databricks SQL.
- SQL
- Python
- R
- Scala
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o código a seguir, que exclui a tabela:
- Python
- R
- Scala
- SQL
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
Limpe arquivos de metadados
O senhor pode executar vacuum para limpar os arquivos de metadados não referenciados criados por COPY INTO
em Databricks Runtime 15.2 e acima.
Referência
- Databricks Runtime 7.x e acima:
COPY INTO
Recurso adicional
-
Carregar uso de dados COPY INTO com volumes Unity Catalog ou locais externos
-
Para obter padrões de uso comuns, incluindo exemplos de várias operações
COPY INTO
na mesma tabela Delta, consulte Padrões comuns de carregamento de dados usandoCOPY INTO
.