メインコンテンツまでスキップ

チュートリアル: Delta Lake

このチュートリアルでは、以下のようなDatabricks上でのDelta Lakeの一般的なオペレーションを紹介します:

PythonScalaSQLこの記事の例のDatabricks、 、コードは、 クラスター などの コンピュート リソースにアタッチされた ノートブック 内から実行できます。この記事の コードは、 の Databricks SQLSQLウェアハウス に関連付けられたクエリ 内から実行することもできます。SQL

ソース データを準備する

このチュートリアルでは、People 10 M というデータセットを使用します。これには、姓名、生年月日、給与など、人々に関する事実を保持する 1,000 万件の架空のレコードが含まれています。 このチュートリアルでは、このデータセットが、ターゲットの Databricks ワークスペースに関連付けられている Unity Catalog ボリューム 内にあることを前提としています。

このチュートリアルの People 10 M データセットを取得するには、次の手順を実行します。

  1. Kaggle の People 10 M ページに移動します。
  2. archive.zip という名前のファイルをローカルマシンにダウンロードするには 、ダウンロードをクリックしてください。
  3. archive.zip ファイルから export.csv という名前のファイルを抽出します。export.csvファイルには、このチュートリアルのデータが含まれています。

export.csvファイルをボリュームにアップロードするには、次の手順を実行します。

  1. サイドバーで、[ カタログ] をクリックします。
  2. カタログエクスプローラーでexport.csv ファイルをアップロードするボリュームを参照して開きます。
  3. このボリュームにアップロード 」をクリックしてください。
  4. ローカル コンピューター上の export.csv ファイルをドラッグ アンド ドロップするか、参照して選択します。
  5. アップロード 」をクリックします。

次のコード例では、/Volumes/main/default/my-volume/export.csv をターゲットボリュームの export.csv ファイルへのパスに置き換えてください。

テーブルの作成

Databricks で作成されたすべてのテーブルは、Delta Lake by デフォルトを使用します。Databricks マネージドテーブルの使用 Unity Catalog おすすめします。

前のコード例と次のコード例では、テーブル名 main.default.people_10m を、Unity Catalog のターゲットの 3 部構成のカタログ、スキーマ、およびテーブル名に置き換えます。

注記

Delta Lake は、すべての読み取り、書き込み、およびテーブル作成コマンド Databricksのデフォルトです。

Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

上記の操作では、新しいマネージドテーブルが作成されます。 Delta テーブルを作成するときに使用できるオプションについては、CREATE TABLEを参照してください。

Databricks Runtime 13.3 LTS 以降では、 CREATE TABLE LIKE を使用して、ソース Delta テーブルのスキーマとテーブル プロパティを複製する新しい空の Delta テーブルを作成できます。 これは、次のコード例に示すように、テーブルを開発環境から本番運用に昇格する場合に特に便利です。

SQL
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

空のテーブルを作成するには、Delta Lake for PythonScalaDeltaTableBuilder API を使用することもできます。同等の データフレームWriter APIsと比較すると、これらの APIs では、列のコメント、テーブルのプロパティ、 生成された列などの追加情報を簡単に指定できます。

備考

プレビュー

この機能は パブリック プレビュー段階です。

Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()

テーブルにアップサート

一連の更新と挿入を既存の Delta テーブルにマージするには、PythonScala の場合は DeltaTable.merge メソッドを使用し、SQL の場合は MERGE INTO ステートメントを使用します。たとえば、次の例では、ソース テーブルからデータを取得し、それをターゲット Delta テーブルにマージします。 両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。 一致する行がない場合、Delta Lake は新しい行を追加します。 この操作は 、アップサート と呼ばれます。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])

data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

SQL で *を指定すると、ソース・テーブルのカラムがターゲット・テーブルと同じであると仮定して、ターゲット・テーブルのすべてのカラムが更新または挿入されます。 ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。

挿入操作を実行するときは、テーブル内のすべての列に値を指定する必要があります (たとえば、既存のデータセットに一致する行がない場合)。 ただし、すべての値を更新する必要はありません。

結果を表示するには、テーブルに対してクエリーを実行します。

Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

表を読んでください

次の例に示すように、テーブル名またはテーブルパスを使用してDeltaテーブルのデータにアクセスします:

Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)

テーブルに書きなさい

Delta Lakeは、テーブルにデータを書き込むための標準構文を使用します。

既存の Delta テーブルに新しいデータをアトミックに追加するには、次の例に示すように追加モードを使用します。

Python
df.write.mode("append").saveAsTable("main.default.people_10m")

テーブル内のすべてのデータを置き換えるには、次の例のように上書きモードを使用してください。

Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")

テーブルを更新する

Delta テーブル内の述語に一致するデータを更新できます。 たとえば、 people_10m テーブルの例では、 gender 列の省略形を M または F から Male または Femaleに変更するには、次のコマンドを実行します。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)

テーブルからの削除

述語に一致するデータを Delta テーブルから削除できます。 たとえば、 people_10m テーブルの例では、 birthDate 列の値が 1955より前の人物に対応するすべての行を削除するには、次のコマンドを実行できます。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
important

削除では、最新バージョンの Delta テーブルからデータが削除されますが、古いバージョンが明示的に vacuumされるまで、物理ストレージからは削除されません。 詳細については、 vacuum を参照してください。

テーブル履歴の表示

テーブルの履歴を表示するには、PythonScalaDeltaTable.history メソッドと、テーブルへの書き込みごとにテーブルのバージョン、操作、ユーザーなどの出所情報を提供する SQL の DESCRIBE HISTORY ステートメントを使用します。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

以前のバージョンのテーブルをクエリする (タイムトラベル)

Delta Lakeのタイムトラベルでは、Deltaテーブルの古いスナップショットを照会することができます。

古いバージョンのテーブルをクエリするには、テーブルのバージョンまたはタイムスタンプを指定します。 たとえば、前の履歴からバージョン 0 またはタイムスタンプ 2024-05-15T22:43:15.000+00:00Z をクエリするには、次を使用します。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

タイムスタンプの場合、日付またはタイムスタンプ文字列 ( "2024-05-15T22:43:15.000+00:00""2024-05-15 22:43:15"など) のみが受け入れられます。

データフレームReader オプションを使用すると、次のように、テーブルの特定のバージョンまたはタイムスタンプに固定された Delta テーブルから データフレーム を作成できます。

Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

詳細については、「 Delta Lake テーブルの履歴の操作」を参照してください。

テーブルを最適化します

テーブルに対して複数の変更を行った後、小さなファイルがたくさんある場合があります。 読み取りクエリの速度を向上させるために、最適化操作を使用して小さなファイルを大きなファイルに折りたたむことができます。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

列ごとのZ-Order

読み取りパフォーマンスをさらに向上させるために、 Z-Orderingで同じファイルセット内の関連情報をコロケーションできます。 Delta Lake データスキップ アルゴリズムでは、このコロケーションを使用して、読み取る必要のあるデータの量を大幅に減らします。 データを Z-Order するには、操作ごとに Z-Order で順序付ける列を指定します。 たとえば、 genderでコロケーションするには、次のコマンドを実行します。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

最適化操作を実行する際に使用できるすべてのオプションについては、「データファイルのレイアウトの最適化」を参照してください。

スナップショットをクリーンアップする VACUUM

Delta Lake は読み取り用のスナップショット分離を提供するため、他のユーザーやジョブがテーブルに対してクエリを実行している間でも、最適化操作を安全に実行できます。 ただし、最終的には古いスナップショットをクリーンアップする必要があります。 これを行うには、 vacuum 操作を実行します。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

vacuum操作を効果的に使用する方法について詳しくは、vacuumを使用した未使用のデータファイルの削除を参照してください。