Tutorial: Delta Lake

Este tutorial apresenta operações comuns do Delta Lake no Databricks, incluindo as seguintes:

O senhor pode executar o exemplo de código Python, R, Scala e SQL neste artigo de dentro de um Notebook conectado a um cluster da Databricks. O senhor também pode executar o código SQL neste artigo de dentro de uma consulta associada a um SQL warehouse no Databricks SQL.

Observação

Alguns dos exemplos de código a seguir usam uma notação de namespace de dois níveis que consiste em um esquema (também chamado de banco de dados) e uma tabela ou view (por exemplo, default.people10m). Para usar esses exemplos com o Unity Catalog, substitua o namespace de dois níveis pela notação de namespace de três níveis do Unity Catalog, que consiste em um catálogo, esquema e tabela ou view (por exemplo, main.default.people10m).

Criar uma tabela

Todas as tabelas criadas no Databricks usam o Delta Lake por padrão.

Observação

Delta Lake é o default para todos os comandos de leitura, gravação e criação de tabela Databricks.

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")
DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

As operações anteriores criam uma nova tabela gerenciar usando o esquema que foi inferido dos dados. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, consulte CREATE TABLE.

Para tabelas gerenciadas, o Databricks determina a localização dos dados.Para obter a localização, você pode usar a instrução DESCRIBE DETAIL, por exemplo:

display(spark.sql('DESCRIBE DETAIL people_10m'))
display(sql("DESCRIBE DETAIL people_10m"))
display(spark.sql("DESCRIBE DETAIL people_10m"))
DESCRIBE DETAIL people_10m;

Às vezes, você pode querer criar uma tabela especificando o esquema antes de inserir os dados. Você pode concluir isso com os seguintes comandos SQL:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar CREATE TABLE LIKE para criar uma nova tabela vazia Delta que duplica o esquema e as propriedades da tabela de uma tabela de origem Delta. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, como no exemplo de código a seguir:

CREATE TABLE prod.people10m LIKE dev.people10m

Você também pode usar a API do DeltaTableBuilder no Delta Lake para criar tabelas. Em comparação com as APIs DataFrameWriter, essa API facilita a especificação de informações adicionais, como comentários de colunas, propriedades de tabelas e colunas geradas.

Visualização

Esse recurso está na Visualização pública.

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()
// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert em uma tabela

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use a instrução MERGE INTO . Por exemplo, a instrução a seguir pega dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

Se você especificar *, isso atualizará ou inserirá todas as colunas na tabela de destino. Isso pressupõe que a tabela de origem tenha as mesmas colunas que as da tabela de destino; caso contrário, a consulta gerará um erro de análise.

Você deve especificar um valor para cada coluna na sua tabela quando executar uma operação INSERT (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, você não precisa atualizar todos os valores.

Para ver os resultados, consulte a tabela.

SELECT * FROM people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa os dados nas tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)
people_df = tableToDF(table_name)

display(people_df)
val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)
SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

Gravar em uma tabela

O Delta Lake usa sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo append como nos exemplos a seguir:

INSERT INTO people10m SELECT * FROM more_people
df.write.mode("append").saveAsTable("people10m")
df.write.mode("append").saveAsTable("people10m")

Para substituir atomicamente todos os dados em uma tabela, use o modo overwrite como nos seguintes exemplos:

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
df.write.mode("overwrite").saveAsTable("people10m")
df.write.mode("overwrite").saveAsTable("people10m")

Atualizar uma tabela

Você pode atualizar dados que correspondem a um predicado em uma tabela Delta. Por exemplo, em uma tabela denominada people10m ou um caminho em /tmp/delta/people-10m, para alterar uma abreviação na coluna gender de M ou F para Male ou Female, você pode executar o seguinte:

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

Excluir de uma tabela

Você pode remover dados que correspondam a um predicado de uma tabela Delta. Por exemplo, em uma tabela chamada people10m ou um caminho em /tmp/delta/people-10m, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate antes de 1955, você pode executar o seguinte:

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

Importante

delete remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas tenham recebido explicitamente o comando VACUUM.Consulte VACUUM para obter detalhes.

Exibir histórico da tabela

Para visualizar o histórico de uma tabela, use a instrução DESCRIBE HISTORY, que fornece informações de proveniência, incluindo a versão da tabela, operação, usuário e assim por diante, para cada gravação em uma tabela.

DESCRIBE HISTORY people_10m

Consultar uma versão anterior da tabela (viagem do tempo)

A viagem do tempo do Delta Lake permite que você consulte um snapshot antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique uma versão ou carimbo de data/hora em uma instrução SELECT . Por exemplo, para consultar a versão 0 do histórico acima, use:

SELECT * FROM people_10m VERSION AS OF 0

ou

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para carimbos de data/hora, apenas strings de data ou carimbo de data/hora são aceitas, por exemplo, "2019-01-01" e "2019-01-01'T'00:00:00.000Z".

As opções DataFrameReader permitem que você crie um DataFrame a partir de uma tabela Delta, que é fixada a uma versão específica da tabela, por exemplo no Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

ou então:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

Para obter detalhes, consulte o histórico da tabela Work with Delta Lake.

Otimizar uma tabela

Depois de realizar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para melhorar a velocidade das consultas de leitura, você pode utilizar o OPTIMIZE para recolher arquivos pequenos em arquivos maiores:

OPTIMIZE people_10m

Z-Order por colunas

Para melhorar ainda mais o desempenho de leitura, você pode colocalizar informações relacionadas no mesmo conjunto de arquivos por Z-ordering. Essa colocalidade é usada automaticamente pelos algoritmos de salto de dados do Delta Lake para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para dados de Z-order, você especifica as colunas a serem ordenadas na cláusula ZORDER BY . Por exemplo, para colocalizar por gender, execute:

OPTIMIZE people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis ao executar o OPTIMIZE, consulte Compactar arquivos de dados com otimização no Delta Lake.

Limpar os instantâneos com VACUUM

O Delta Lake oferece isolamento de snapshot para leituras, o que significa que é seguro executar OPTIMIZE mesmo quando outros usuários ou jobs estão consultando a tabela. No entanto, eventualmente, você deve limpar os snapshots antigos. Você pode fazer isto executando o comando VACUUM:

VACUUM people_10m

Para obter detalhes sobre o uso eficaz do VACUUM, consulte Remover arquivos de dados não utilizados com o vacuum.