Pular para o conteúdo principal

tutorial: Delta Lake

Este tutorial apresenta operações comuns do Delta Lake no Databricks, incluindo as seguintes:

O senhor pode executar o código de exemplo Python, Scala e SQL neste artigo em um Notebook anexado a um recurso Databricks compute , como um clustering. O senhor também pode executar o código SQL neste artigo de dentro de uma consulta associada a um SQL warehouse em Databricks SQL.

Prepare os dados de origem

Este tutorial se baseia em um dataset chamado People 10 M. Ele contém 10 milhões de registros fictícios que contêm fatos sobre pessoas, como nome e sobrenome, data de nascimento e salário. Este tutorial pressupõe que este dataset esteja em um Unity Catalog volume associado ao seu destino Databricks workspace.

Para obter o People 10 M dataset para este tutorial, faça o seguinte:

  1. Vá para a página People 10 M no Kaggle.
  2. Clique em download para download um arquivo chamado archive.zip em seu computador local.
  3. Extraia o arquivo chamado export.csv do arquivo archive.zip. O arquivo export.csv contém os dados para este tutorial.

Para upload o arquivo export.csv no volume, faça o seguinte:

  1. Na barra lateral, clique em Catálogo .
  2. No Catalog Explorer , navegue até o volume em que o senhor deseja acessar upload o arquivo export.csv e abra-o.
  3. Clique em fazer upload para este volume .
  4. Arraste e solte ou procure e selecione o arquivo export.csv em sua máquina local.
  5. Clique em upload .

Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv pelo caminho para o arquivo export.csv no volume de destino.

Crie uma tabela

Todas as tabelas criadas em Databricks usam Delta Lake por default. Databricks recomenda o uso das tabelas gerenciais do site Unity Catalog.

No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome da tabela main.default.people_10m pelo catálogo de três partes, esquema e nome da tabela de destino no Unity Catalog.

nota

Delta Lake é o default para todas as leituras, gravações e criação de tabelas do comando Databricks.

Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

As operações anteriores criam uma nova tabela gerenciar. Para obter informações sobre as opções disponíveis quando o senhor cria uma tabela Delta, consulte CREATE TABLE.

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar CREATE TABLE LIKE para criar uma nova tabela vazia Delta que duplica o esquema e as propriedades da tabela de uma tabela de origem Delta. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para a produção, conforme mostrado no exemplo de código a seguir:

SQL
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Para criar uma tabela vazia, o senhor também pode usar a API DeltaTableBuilder no Delta Lake para Python e Scala. Em comparação com o DataFrameWriter equivalente APIs, esses APIs facilitam a especificação de informações adicionais, como comentários de colunas, propriedades de tabelas e colunas geradas.

info

Visualização

Esse recurso está em Public Preview.

Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()

Subir para uma mesa

Para merge um conjunto de atualizações e inserções em uma tabela Delta existente, o senhor usa o método DeltaTable.merge para Python e Scalae a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir pega os dados da tabela de origem e os mescla na tabela de destino Delta. Quando há uma linha correspondente em ambas as tabelas, o Delta Lake atualiza a coluna de dados usando a expressão fornecida. Quando não há uma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert .

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])

data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

No SQL, se o senhor especificar *, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta apresentará um erro de análise.

O senhor deve especificar um valor para cada coluna da tabela ao realizar uma operação de inserção (por exemplo, quando não houver uma linha correspondente no site dataset). No entanto, você não precisa atualizar todos os valores.

Para ver os resultados, consulte a tabela.

Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Leia uma tabela

Você acessa os dados nas tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos exemplos a seguir:

Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)

Escreva para uma mesa

O Delta Lake usa sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo append, conforme mostrado nos exemplos a seguir:

Python
df.write.mode("append").saveAsTable("main.default.people_10m")

Para substituir todos os dados em uma tabela, use o modo de substituição como nos exemplos a seguir:

Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Atualizar uma tabela

O senhor pode atualizar os dados que correspondem a um predicado em uma tabela Delta. Por exemplo, na tabela de exemplo people_10m, para alterar uma abreviação na coluna gender de M ou F para Male ou Female, o senhor pode executar o seguinte:

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)

Excluir de uma tabela

O senhor pode remover os dados que correspondem a um predicado de uma tabela Delta. Por exemplo, na tabela de exemplo people_10m, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate antes de 1955, o senhor pode executar o seguinte:

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
important

A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente vacuum. Veja vacuum para obter detalhes.

Exibir histórico da tabela

Para view a história de uma tabela, o senhor usa o método DeltaTable.history para Python e Scalae a instrução DESCRIBE HISTORY em SQL, que fornece informações de proveniência, incluindo a versão da tabela, operações, usuário e assim por diante, para cada gravação em uma tabela.

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Consultar uma versão anterior da tabela (viagem do tempo)

A viagem do tempo do Delta Lake permite que você consulte um snapshot antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o carimbo de data/hora da tabela. Por exemplo, para consultar a versão 0 ou o registro de data e hora 2024-05-15T22:43:15.000+00:00Z do histórico anterior, use o seguinte:

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Para carimbos de data/hora, somente data ou carimbo de data/hora strings são aceitos, por exemplo, "2024-05-15T22:43:15.000+00:00" ou "2024-05-15 22:43:15".

As opções do DataFrameReader permitem que o senhor crie um DataFrame a partir de uma tabela Delta que é fixada em uma versão específica ou em um registro de data e hora da tabela, por exemplo:

Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Para obter detalhes, consulte Work with Delta Lake table história.

Otimizar uma tabela

Depois de realizar várias alterações em uma tabela, você pode ter muitos arquivos pequenos. Para aumentar a velocidade das consultas de leitura, o senhor pode usar a otimização de operações para compactar arquivos pequenos em arquivos maiores:

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Z-order por colunas

Para melhorar ainda mais o desempenho da leitura, o senhor pode agrupar informações relacionadas no mesmo conjunto de arquivos pelo endereço Z-ordering. Os algoritmos de pular dados do Delta Lake usam essa colocação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para Z-order dados, o senhor especifica as colunas a serem ordenadas em Z-order por operações. Por exemplo, para colocar em gender, execução:

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Para obter o conjunto completo de opções disponíveis ao executar as operações de otimização, consulte Otimizar a disposição do arquivo de dados.

Limpar o Snapshot com VACUUM

Delta Lake fornece isolamento de Snapshot para leituras, o que significa que é seguro executar e otimizar operações mesmo quando outros usuários ou trabalhos estiverem consultando a tabela. Eventualmente, porém, o senhor deve limpar o Snapshot antigo. O senhor pode fazer isso executando o vacuum operações:

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Para obter detalhes sobre o uso eficaz das operações do vacuum, consulte Remover arquivos de dados não utilizados com o vacuum.