O senhor começa a usar o site COPY INTO para carregar dados
O comando COPY INTO SQL permite que o senhor carregue dados de um local de arquivo em uma tabela Delta. Essa é uma operação re-triável e idempotente; os arquivos no local de origem que já foram carregados são ignorados.
COPY INTO oferece os seguintes recursos:
- Filtros de arquivo ou diretório facilmente configuráveis do armazenamento em nuvem, incluindo volumes S3, ADLS, ABFS, GCS e Unity Catalog.
- Suporte a vários formatos de arquivos de origem: CSV, JSON, XML, Avro, ORC, Parquet, arquivos de texto e binários
- Processamento de arquivos exatamente uma vez (idempotente) por default
- Inferência, mapeamento, fusão e evolução do esquema da tabela de destino
Para obter uma experiência de ingestão de arquivos mais dimensionável e robusta, o site Databricks recomenda que os usuários do SQL utilizem tabelas de transmissão. Consulte as tabelas de transmissão.
COPY INTO Respeita a configuração workspace para vetores de exclusão. Se ativado, os vetores de exclusão são habilitados na tabela de destino quando COPY INTO é executado em um SQL warehouse ou compute executando Databricks Runtime 14.0 ou superior. Uma vez ativados, os vetores de exclusão bloqueiam consultas em uma tabela no Databricks Runtime 11.3 LTS e versões anteriores. Consulte Vetores de exclusão no Databricks e Ativar vetores de exclusão automaticamente.
Requisitos
Um administrador do account deve seguir as etapas em Configurar acesso a dados para ingestão para configurar o acesso aos dados no armazenamento de objetos na nuvem antes que os usuários possam carregar o uso de dados COPY INTO.
Exemplo: Carregar dados em uma tabela Delta Lake sem esquema
Esse recurso está disponível em Databricks Runtime 11.3 LTS e acima.
O senhor pode criar tabelas Delta de espaço reservado vazias para que o esquema seja inferido posteriormente durante um comando COPY INTO, definindo mergeSchema para true em COPY_OPTIONS:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
A instrução SQL acima é idempotente e pode ser programada para ser executada para ingerir dados exatamente uma vez em uma tabela Delta.
A tabela Delta vazia não pode ser usada fora do site COPY INTO. INSERT INTO e MERGE INTO não são suportados para gravar dados em tabelas Delta sem esquema. Depois que os dados são inseridos na tabela com COPY INTO, a tabela se torna consultável.
Consulte Criar tabelas de destino para COPY INTO.
Exemplo: Definir esquema e carregar dados em uma tabela do Delta Lake
O exemplo a seguir mostra como criar uma tabela Delta e, em seguida, usar o comando COPY INTO SQL para carregar dados de amostra do conjunto de dadosDatabricks na tabela. O senhor pode executar o código Python de exemplo, R, Scala ou SQL de um Notebook anexado a um Databricks cluster. O senhor também pode executar o código SQL a partir de uma consulta associada a um SQL warehouse em Databricks SQL.
- SQL
- Python
- R
- Scala
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Para limpar, execute o código a seguir, que exclui a tabela:
- Python
- R
- Scala
- SQL
spark.sql("DROP TABLE " + table_name)
sql(paste("DROP TABLE ", table_name, sep = ""))
spark.sql("DROP TABLE " + table_name)
DROP TABLE default.loan_risks_upload
Limpe arquivos de metadados
O senhor pode executar vacuum para limpar os arquivos de metadados não referenciados criados por COPY INTO em Databricks Runtime 15.2 e acima.
Referência
- Databricks Runtime 7.x e acima:
COPY INTO
Recurso adicional
-
Carregar uso de dados COPY INTO com volumes Unity Catalog ou locais externos
-
Para obter padrões de uso comuns, incluindo exemplos de várias operações
COPY INTOna mesma tabela Delta, consulte Padrões comuns de carregamento de dados usandoCOPY INTO.