Tutorial: Delta Lake

Este tutorial apresenta operações comuns do Delta Lake no Databricks, incluindo as seguintes:

O senhor pode executar o código de exemplo Python, Scala, e SQL neste artigo de dentro de um Notebook anexado a um recurso Databricks compute , como um cluster. O senhor também pode executar o código SQL neste artigo de dentro de uma consulta associada a um SQL warehouse em Databricks SQL.

Preparar os dados de origem

Este tutorial se baseia em um dataset chamado People 10 M. Ele contém 10 milhões de registros fictícios que contêm fatos sobre pessoas, como nome e sobrenome, data de nascimento e salário. Este tutorial pressupõe que este dataset esteja em um Unity Catalog volume associado ao seu destino Databricks workspace.

Para obter o People 10 M dataset para este tutorial, faça o seguinte:

  1. Acesse a página People 10 M no Kaggle.

  2. Clique em download para download um arquivo chamado archive.zip em seu computador local.

  3. Extraia o arquivo chamado export.csv do arquivo archive.zip. O arquivo export.csv contém os dados para este tutorial.

Para upload o arquivo export.csv no volume, faça o seguinte:

  1. Na barra lateral, clique em Catalog.

  2. No Catalog Explorer, navegue até o volume em que o senhor deseja fazer upload do arquivo export.csv e abra-o.

  3. Clique em upload para acessar esse volume.

  4. Arraste e solte ou procure e selecione o arquivo export.csv em seu computador local.

  5. Clique em upload.

Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv pelo caminho do arquivo export.csv no volume de destino.

Criar uma tabela

Todas as tabelas criadas em Databricks usam Delta Lake por default. Databricks recomenda o uso das tabelas gerenciais do site Unity Catalog.

No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome da tabela main.default.people_10m pelo catálogo de três partes, esquema e nome da tabela de destino no Unity Catalog.

Observação

Delta Lake é o default para todos os comandos de leitura, gravação e criação de tabela Databricks.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

As operações anteriores criam uma nova tabela gerenciar. Para obter informações sobre as opções disponíveis quando o senhor cria uma tabela Delta, consulte CREATE TABLE.

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar CREATE TABLE LIKE para criar uma nova tabela vazia Delta que duplica o esquema e as propriedades da tabela de uma tabela de origem Delta. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, conforme mostrado no exemplo de código a seguir:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Para criar uma tabela vazia, o senhor também pode usar a API DeltaTableBuilder no Delta Lake para Python e Scala. Em comparação com o DataFrameWriter equivalente APIs, esses APIs facilitam a especificação de informações adicionais, como comentários de colunas, propriedades de tabelas e colunas geradas.

Visualização

Esse recurso está na Visualização pública.

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()
DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert em uma tabela

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, o senhor usa o método DeltaTable.merge para Python e Scala e a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir pega os dados da tabela de origem e os mescla na tabela de destino Delta. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()
CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

No SQL, se o senhor especificar *, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gera um erro de análise.

O senhor deve especificar um valor para cada coluna da tabela ao realizar uma operação de inserção (por exemplo, quando não houver uma linha correspondente no site existente dataset). No entanto, o senhor não precisa atualizar todos os valores.

Para ver os resultados, consulte a tabela.

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SELECT * FROM main.default.people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa os dados nas tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:

people_df = spark.read.table("main.default.people_10m")
display(people_df)
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SELECT * FROM main.default.people_10m;

Gravar em uma tabela

O Delta Lake usa sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo append, conforme mostrado nos exemplos a seguir:

df.write.mode("append").saveAsTable("main.default.people_10m")
df.write.mode("append").saveAsTable("main.default.people_10m")
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Para substituir todos os dados em uma tabela, use o modo de substituição, como nos exemplos a seguir:

df.write.mode("overwrite").saveAsTable("main.default.people_10m")
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Atualizar uma tabela

O senhor pode atualizar os dados que correspondem a um predicado em uma tabela Delta. Por exemplo, na tabela people_10m de exemplo, para alterar uma abreviação na coluna gender de M ou F para Male ou Female, o senhor pode executar o seguinte:

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Excluir de uma tabela

O senhor pode remover os dados que correspondem a um predicado de uma tabela Delta. Por exemplo, na tabela people_10m do exemplo, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate antes de 1955, o senhor pode executar o seguinte:

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Importante

A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente vacuum. Consulte o vácuo para obter detalhes.

Exibir histórico da tabela

Para view a história de uma tabela, o senhor usa o método DeltaTable.history para Python e Scalae a instrução DESCRIBE HISTORY em SQL, que fornece informações de proveniência, incluindo a versão da tabela, operações, usuário e assim por diante, para cada gravação em uma tabela.

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
DESCRIBE HISTORY main.default.people_10m

Consultar uma versão anterior da tabela (viagem do tempo)

A viagem do tempo do Delta Lake permite que você consulte um snapshot antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o registro de data e hora da tabela. Por exemplo, para consultar a versão 0 ou o registro de data e hora 2024-05-15T22:43:15.000+00:00Z do histórico anterior, use o seguinte:

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para carimbos de data/hora, somente data ou carimbo de data/hora strings são aceitos, por exemplo, "2024-05-15T22:43:15.000+00:00" ou "2024-05-15 22:43:15".

As opções do DataFrameReader permitem que o senhor crie um DataFrame a partir de uma tabela Delta que é fixada em uma versão específica ou em um registro de data e hora da tabela, por exemplo:

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Para obter detalhes, consulte o histórico da tabela Work with Delta Lake.

Otimizar uma tabela

Depois de realizar várias alterações em uma tabela, o senhor pode ter muitos arquivos pequenos. Para aumentar a velocidade das consultas de leitura, o senhor pode usar a otimização de operações para compactar arquivos pequenos em arquivos maiores:

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
OPTIMIZE main.default.people_10m

Z-Order por colunas

Para melhorar ainda mais o desempenho da leitura, o senhor pode agrupar informações relacionadas no mesmo conjunto de arquivos pelo endereço Z-ordering. Os algoritmos de pular dados do Delta Lake usam essa colocação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para Z-order dados, o senhor especifica as colunas a serem ordenadas em Z-order por operações. Por exemplo, para colocar por gender, execução:

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis ao executar as operações de otimização, consulte Otimizar a disposição do arquivo de dados.

Limpar os instantâneos com VACUUM

Delta Lake fornece o isolamento Snapshot para leituras, o que significa que é seguro executar e otimizar operações mesmo quando outros usuários ou trabalhos estiverem consultando a tabela. Eventualmente, porém, o senhor deve limpar o Snapshot antigo. O senhor pode fazer isso executando o vacuum operações:

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
VACUUM main.default.people_10m

Para obter detalhes sobre o uso eficaz das operações do vacuum, consulte Remover arquivos de dados não utilizados com o vacuum.