Tutorial: Criar e gerenciar tabelas Delta Lake
Este tutorial demonstra operações comuns em tabelas Delta usando dados de exemplo. Delta Lake é a camada de armazenamento otimizada que fornece a base para as tabelas no Databricks. Salvo indicação em contrário, todas as tabelas no Databricks são tabelas Delta.
Antes de começar
Para concluir este tutorial, você precisa de:
- Permissão para usar um recurso compute existente ou criar um novo recurso compute . Veja calcular.
- Permissões Unity Catalog :
USE CATALOG,USE SCHEMAeCREATE TABLEno catálogoworkspace. Para definir essas permissões, consulte os privilégios de administrador Databricks ou Unity Catalog e os objetos protegíveis.
Esses exemplos se baseiam em um dataset chamado Registros Sintéticos de Pessoas: 10 mil a 10 milhões de registros . Este dataset contém registros fictícios de pessoas, incluindo seus nomes e sobrenomes, sexo e idade.
Primeiro, download o dataset para este tutorial.
- Visite a página Synthetic Person Records: 10K to 10M Records no Kaggle.
- Clique em "Download" e, em seguida, em "Download dataset as zip" . Isso downloads de um arquivo chamado
archive.zippara sua máquina local. - Extraia a pasta
archivedo arquivoarchive.zip.
Em seguida, upload o dataset person_10000.csv para um volume Unity Catalog dentro do seu workspace Databricks . A Databricks recomenda o upload dos seus dados para um volume do Unity Catalog, pois os volumes oferecem recursos para acessar, armazenar, gerenciar e organizar arquivos.
- Abra o Explorador de Catálogo clicando em
Catálogo na barra lateral.
- No Explorador de Catálogo, clique em
Adicionar dados e criar um volume .
- Dê o nome de
my-volumeao volume e selecione gerenciar volume como o tipo de volume. - Selecione o catálogo
workspacee o esquemadefaulte clique em Criar . - Abra
my-volumee clique em carregar para este volume . - Arraste e solte ou navegue até e selecione o arquivo
person_10000.csvdentro da pastaarchiveem sua máquina local. - Clique em upload .
Por fim, crie um Notebook para executar o código de exemplo.
- Clique
Novidade na barra lateral.
- Clique
Notebook para criar um novo Notebook.
- Escolha um idioma para o Notebook.
Crie uma tabela
Crie uma nova tabela de gerenciamento Unity Catalog chamada workspace.default.people_10k a partir de person_10000.csv. Delta Lake é o default para todos os comandos de criação, leitura e gravação de tabelas no Databricks.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
Existem diversas maneiras diferentes de criar ou clonar tabelas. Para mais informações, consulte CREATE TABLE.
No Databricks Runtime 13.3 LTS e versões superiores, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela Delta de origem. Isso pode ser útil ao promover tabelas de um ambiente de desenvolvimento para um ambiente de produção.
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
Visualização
Esse recurso está em Public Preview.
Use a API DeltaTableBuilder para Python e Scala para criar uma tabela vazia. Em comparação com DataFrameWriter e DataFrameWriterV2, a API DeltaTableBuilder torna mais fácil especificar informações adicionais, como comentários de coluna, propriedades da tabela e colunas geradas.
- Python
- Scala
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
Subir para uma mesa
Modifique registros existentes em uma tabela ou adicione novos usando uma operação chamada upsert . Para merge um conjunto de atualizações e inserções em uma tabela Delta existente, use o método DeltaTable.merge em Python e Scala e a instrução MERGE INTO em SQL.
Por exemplo, merge dados da tabela de origem people_10k_updates para a tabela Delta de destino workspace.default.people_10k. Quando existe uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não houver uma linha correspondente, o Delta Lake adiciona uma nova linha.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
Em SQL, o operador * atualiza ou insere todas as colunas na tabela de destino, assumindo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gera um erro de análise. Além disso, você deve especificar um valor para cada coluna da sua tabela ao realizar uma operação de inserção. Os valores da coluna podem estar vazios, por exemplo, ''. Ao realizar uma operação de inserção, não é necessário atualizar todos os valores.
Leia uma tabela
Use o nome ou o caminho da tabela para acessar os dados nas tabelas Delta. Para acessar as tabelas Unity Catalog , use o nome completo da tabela. O acesso baseado em caminho é suportado apenas para volumes e tabelas externas, não para tabelas gerenciadas. Para obter mais informações, consulte Regras de caminho e acesso nos volumes Unity Catalog.
- Python
- Scala
- SQL
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SELECT * FROM workspace.default.people_10k;
Escreva para uma mesa
O Delta Lake utiliza a sintaxe padrão para gravar dados em tabelas. Para adicionar novos dados a uma tabela Delta existente, use o modo de acréscimo. Diferentemente da operação de upsert, a escrita em uma tabela não verifica registros duplicados.
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Os resultados das células Databricks Notebook exibem um máximo de 10.000 linhas ou 2 MB, o que for menor. Como workspace.default.people_10k contém mais de 10.000 linhas, apenas as primeiras 10.000 linhas aparecem na saída do Notebook para display(df). As linhas adicionais estão presentes na tabela, mas não são exibidas na saída do Notebook devido a essa limitação. Você pode view as linhas adicionais filtrando-as especificamente.
Para substituir todos os dados em uma tabela, use o modo de sobrescrita.
- Python
- Scala
- SQL
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
Atualizar uma tabela
Atualizar dados em uma tabela Delta com base em um predicado. Por exemplo, altere os valores na coluna gender de Female para F, de Male para M e de Other para O.
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
Excluir de uma tabela
Remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, o código abaixo demonstra duas operações de exclusão: primeiro, excluindo as linhas onde a idade é menor que 18 anos e, em seguida, excluindo as linhas onde a idade é menor que 21 anos.
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
A exclusão remove os dados da versão mais recente da tabela Delta , mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente vacuum. Para mais informações, veja vacuum.
Exibir histórico da tabela
Use o método DeltaTable.history em Python e Scala e a instrução DESCRIBE HISTORY em SQL para view as informações de proveniência de cada gravação em uma tabela.
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
DESCRIBE HISTORY workspace.default.people_10k
Consulte uma versão anterior da tabela usando viagem do tempo
Consultar um Snapshot mais antigo de uma tabela Delta usando Delta Lake viagem do tempo. Para consultar uma versão específica, utilize o número da versão ou o carimbo de data/hora da tabela. Por exemplo, consulte a versão 0 ou o carimbo de data/hora 2026-01-05T23:09:47.000+00:00 do histórico da tabela.
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
Para registros de data e hora, somente strings de data ou de registro de data e hora são aceitas. Por exemplo, as strings devem ser formatadas como "2026-01-05T22:43:15.000+00:00" ou "2026-01-05 22:43:15".
Use as opções DataFrameReader para criar um DataFrame a partir de uma tabela Delta que esteja fixado em uma versão ou timestamp específico da tabela.
- Python
- Scala
- SQL
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
Para mais informações, veja Trabalhar com tabela história.
Otimizar uma tabela
Alterações múltiplas em uma tabela podem criar diversos arquivos pequenos, o que diminui o desempenho das consultas de leitura. Utilize as operações de otimização para melhorar a velocidade, combinando arquivos pequenos em arquivos maiores. Consulte OPTIMIZE.
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
OPTIMIZE workspace.default.people_10k
Se a otimização preditiva estiver ativada, você não precisa otimizar manualmente. A otimização preditiva gerencia automaticamente as tarefas de manutenção. Para obter mais informações, consulte Otimização preditiva para tabelas de gerenciamento Unity Catalog.
Z-order por colunas
Para Z-order e melhorar ainda mais o desempenho de leitura, especifique as colunas pelas quais ordenar nas operações. Por exemplo, coloque pela coluna de alta cardinalidade firstName. Para obter mais informações sobre Z-ordering, consulte Ignoração de dados.
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
Limpe o Snapshot com as operações vacuum
Delta Lake possui isolamento de snapshot para leituras, o que significa que é seguro executar e otimizar operações enquanto outros usuários ou trabalhos estão consultando a tabela. No entanto, você deve eventualmente limpar os Snapshots antigos, pois isso reduz os custos de armazenamento, melhora o desempenho das consultas e garante compliance dos dados. execute as VACUUM operações para limpar o Snapshot antigo. Veja vacuum.
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
VACUUM workspace.default.people_10k
Para obter mais informações sobre como usar as operações vacuum de maneira eficaz, consulte Remover arquivos de dados não utilizados com vacuum.