Pular para o conteúdo principal

Tutorial: Criar e gerenciar tabelas Delta Lake

Este tutorial demonstra operações comuns em tabelas Delta usando dados de exemplo. Delta Lake é a camada de armazenamento otimizada que fornece a base para as tabelas no Databricks. Salvo indicação em contrário, todas as tabelas no Databricks são tabelas Delta.

Antes de começar

Para concluir este tutorial, você precisa de:

  • Permissão para usar um recurso compute existente ou criar um novo recurso compute . Veja calcular.
  • Permissões Unity Catalog : USE CATALOG, USE SCHEMA e CREATE TABLE no catálogo workspace . Para definir essas permissões, consulte os privilégios de administrador Databricks ou Unity Catalog e os objetos protegíveis.

Esses exemplos se baseiam em um dataset chamado Registros Sintéticos de Pessoas: 10 mil a 10 milhões de registros . Este dataset contém registros fictícios de pessoas, incluindo seus nomes e sobrenomes, sexo e idade.

Primeiro, download o dataset para este tutorial.

  1. Visite a página Synthetic Person Records: 10K to 10M Records no Kaggle.
  2. Clique em "Download" e, em seguida, em "Download dataset as zip" . Isso downloads de um arquivo chamado archive.zip para sua máquina local.
  3. Extraia a pasta archive do arquivo archive.zip .

Em seguida, upload o dataset person_10000.csv para um volume Unity Catalog dentro do seu workspace Databricks . A Databricks recomenda o upload dos seus dados para um volume do Unity Catalog, pois os volumes oferecem recursos para acessar, armazenar, gerenciar e organizar arquivos.

  1. Abra o Explorador de Catálogo clicando em Ícone de dados. Catálogo na barra lateral.
  2. No Explorador de Catálogo, clique em Ícone de adicionar ou ícone de mais Adicionar dados e criar um volume .
  3. Dê o nome de my-volume ao volume e selecione gerenciar volume como o tipo de volume.
  4. Selecione o catálogo workspace e o esquema default e clique em Criar .
  5. Abra my-volume e clique em carregar para este volume .
  6. Arraste e solte ou navegue até e selecione o arquivo person_10000.csv dentro da pasta archive em sua máquina local.
  7. Clique em upload .

Por fim, crie um Notebook para executar o código de exemplo.

  1. Clique Ícone de adicionar ou ícone de mais Novidade na barra lateral.
  2. Clique Ícone Notebook . Notebook para criar um novo Notebook.
  3. Escolha um idioma para o Notebook.

Crie uma tabela

Crie uma nova tabela de gerenciamento Unity Catalog chamada workspace.default.people_10k a partir de person_10000.csv. Delta Lake é o default para todos os comandos de criação, leitura e gravação de tabelas no Databricks.

Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Existem diversas maneiras diferentes de criar ou clonar tabelas. Para mais informações, consulte CREATE TABLE.

No Databricks Runtime 13.3 LTS e versões superiores, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela Delta de origem. Isso pode ser útil ao promover tabelas de um ambiente de desenvolvimento para um ambiente de produção.

SQL
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
info

Visualização

Esse recurso está em Public Preview.

Use a API DeltaTableBuilder para Python e Scala para criar uma tabela vazia. Em comparação com DataFrameWriter e DataFrameWriterV2, a API DeltaTableBuilder torna mais fácil especificar informações adicionais, como comentários de coluna, propriedades da tabela e colunas geradas.

Python
from delta.tables import DeltaTable

(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

Subir para uma mesa

Modifique registros existentes em uma tabela ou adicione novos usando uma operação chamada upsert . Para merge um conjunto de atualizações e inserções em uma tabela Delta existente, use o método DeltaTable.merge em Python e Scala e a instrução MERGE INTO em SQL.

Por exemplo, merge dados da tabela de origem people_10k_updates para a tabela Delta de destino workspace.default.people_10k. Quando existe uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não houver uma linha correspondente, o Delta Lake adiciona uma nova linha.

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

Em SQL, o operador * atualiza ou insere todas as colunas na tabela de destino, assumindo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gera um erro de análise. Além disso, você deve especificar um valor para cada coluna da sua tabela ao realizar uma operação de inserção. Os valores da coluna podem estar vazios, por exemplo, ''. Ao realizar uma operação de inserção, não é necessário atualizar todos os valores.

Leia uma tabela

Use o nome ou o caminho da tabela para acessar os dados nas tabelas Delta. Para acessar as tabelas Unity Catalog , use o nome completo da tabela. O acesso baseado em caminho é suportado apenas para volumes e tabelas externas, não para tabelas gerenciadas. Para obter mais informações, consulte Regras de caminho e acesso nos volumes Unity Catalog.

Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

Escreva para uma mesa

O Delta Lake utiliza a sintaxe padrão para gravar dados em tabelas. Para adicionar novos dados a uma tabela Delta existente, use o modo de acréscimo. Diferentemente da operação de upsert, a escrita em uma tabela não verifica registros duplicados.

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Os resultados das células Databricks Notebook exibem um máximo de 10.000 linhas ou 2 MB, o que for menor. Como workspace.default.people_10k contém mais de 10.000 linhas, apenas as primeiras 10.000 linhas aparecem na saída do Notebook para display(df). As linhas adicionais estão presentes na tabela, mas não são exibidas na saída do Notebook devido a essa limitação. Você pode view as linhas adicionais filtrando-as especificamente.

Para substituir todos os dados em uma tabela, use o modo de sobrescrita.

Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

Atualizar uma tabela

Atualizar dados em uma tabela Delta com base em um predicado. Por exemplo, altere os valores na coluna gender de Female para F, de Male para M e de Other para O.

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)

deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

Excluir de uma tabela

Remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, o código abaixo demonstra duas operações de exclusão: primeiro, excluindo as linhas onde a idade é menor que 18 anos e, em seguida, excluindo as linhas onde a idade é menor que 21 anos.

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
importante

A exclusão remove os dados da versão mais recente da tabela Delta , mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente vacuum. Para mais informações, veja vacuum.

Exibir histórico da tabela

Use o método DeltaTable.history em Python e Scala e a instrução DESCRIBE HISTORY em SQL para view as informações de proveniência de cada gravação em uma tabela.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

Consulte uma versão anterior da tabela usando viagem do tempo

Consultar um Snapshot mais antigo de uma tabela Delta usando Delta Lake viagem do tempo. Para consultar uma versão específica, utilize o número da versão ou o carimbo de data/hora da tabela. Por exemplo, consulte a versão 0 ou o carimbo de data/hora 2026-01-05T23:09:47.000+00:00 do histórico da tabela.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

Para registros de data e hora, somente strings de data ou de registro de data e hora são aceitas. Por exemplo, as strings devem ser formatadas como "2026-01-05T22:43:15.000+00:00" ou "2026-01-05 22:43:15".

Use as opções DataFrameReader para criar um DataFrame a partir de uma tabela Delta que esteja fixado em uma versão ou timestamp específico da tabela.

Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

Para mais informações, veja Trabalhar com tabela história.

Otimizar uma tabela

Alterações múltiplas em uma tabela podem criar diversos arquivos pequenos, o que diminui o desempenho das consultas de leitura. Utilize as operações de otimização para melhorar a velocidade, combinando arquivos pequenos em arquivos maiores. Consulte OPTIMIZE.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
nota

Se a otimização preditiva estiver ativada, você não precisa otimizar manualmente. A otimização preditiva gerencia automaticamente as tarefas de manutenção. Para obter mais informações, consulte Otimização preditiva para tabelas de gerenciamento Unity Catalog.

Z-order por colunas

Para Z-order e melhorar ainda mais o desempenho de leitura, especifique as colunas pelas quais ordenar nas operações. Por exemplo, coloque pela coluna de alta cardinalidade firstName. Para obter mais informações sobre Z-ordering, consulte Ignoração de dados.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

Limpe o Snapshot com as operações vacuum

Delta Lake possui isolamento de snapshot para leituras, o que significa que é seguro executar e otimizar operações enquanto outros usuários ou trabalhos estão consultando a tabela. No entanto, você deve eventualmente limpar os Snapshots antigos, pois isso reduz os custos de armazenamento, melhora o desempenho das consultas e garante compliance dos dados. execute as VACUUM operações para limpar o Snapshot antigo. Veja vacuum.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

Para obter mais informações sobre como usar as operações vacuum de maneira eficaz, consulte Remover arquivos de dados não utilizados com vacuum.

Próximos passos

Recurso relacionado