チュートリアル: Delta Lake
このチュートリアルでは、以下のようなDatabricks上でのDelta Lakeの一般的なオペレーションを紹介します:
- テーブルを作成してください。
- テーブルにアップサートします。
- テーブルから読み取ります。
- テーブルの履歴を表示します。
- 以前のバージョンのテーブルをクエリします。
- テーブルを最適化してください。
- Z-Orderインデックスを追加します。
- 参照されていないファイルvacuum。
PythonScalaSQLこの記事の例のDatabricks、 、コードは、 クラスター などの コンピュート リソースにアタッチされた ノートブック 内から実行できます。この記事の コードは、 の Databricks SQLSQLウェアハウス に関連付けられたクエリ 内から実行することもできます。SQL
ソース データを準備する
このチュートリアルでは、People 10 M というデータセットを使用します。これには、姓名、生年月日、給与など、人々に関する事実を保持する 1,000 万件の架空のレコードが含まれています。 このチュートリアルでは、このデータセットが、ターゲットの Databricks ワークスペースに関連付けられている Unity Catalog ボリューム 内にあることを前提としています。
このチュートリアルの People 10 M データセットを取得するには、次の手順を実行します。
- Kaggle の People 10 M ページに移動します。
archive.zip
という名前のファイルをローカルマシンにダウンロードするには 、ダウンロードをクリックしてください。archive.zip
ファイルからexport.csv
という名前のファイルを抽出します。export.csv
ファイルには、このチュートリアルのデータが含まれています。
export.csv
ファイルをボリュームにアップロードするには、次の手順を実行します。
- サイドバーで、[ カタログ] をクリックします。
- カタログエクスプローラーで 、
export.csv
ファイルをアップロードするボリュームを参照して開きます。 - 「 このボリュームにアップロード 」をクリックしてください。
- ローカル コンピューター上の
export.csv
ファイルをドラッグ アンド ドロップするか、参照して選択します。 - 「 アップロード 」をクリックします。
次のコード例では、/Volumes/main/default/my-volume/export.csv
をターゲットボリュームの export.csv
ファイルへのパスに置き換えてください。
テーブルの作成
Databricks で作成されたすべてのテーブルは、Delta Lake by デフォルトを使用します。Databricks マネージドテーブルの使用 Unity Catalog おすすめします。
前のコード例と次のコード例では、テーブル名 main.default.people_10m
を、Unity Catalog のターゲットの 3 部構成のカタログ、スキーマ、およびテーブル名に置き換えます。
Delta Lake は、すべての読み取り、書き込み、およびテーブル作成コマンド Databricksのデフォルトです。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
上記の操作では、新しいマネージドテーブルが作成されます。 Delta テーブルを作成するときに使用できるオプションについては、CREATE TABLEを参照してください。
Databricks Runtime 13.3 LTS 以降では、 CREATE TABLE LIKE を使用して、ソース Delta テーブルのスキーマとテーブル プロパティを複製する新しい空の Delta テーブルを作成できます。 これは、次のコード例に示すように、テーブルを開発環境から本番運用に昇格する場合に特に便利です。
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
空のテーブルを作成するには、Delta Lake for Python と Scala の DeltaTableBuilder
API を使用することもできます。同等の データフレームWriter APIsと比較すると、これらの APIs では、列のコメント、テーブルのプロパティ、 生成された列などの追加情報を簡単に指定できます。
プレビュー
この機能は パブリック プレビュー段階です。
- Python
- Scala
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
テーブルにアップサート
一連の更新と挿入を既存の Delta テーブルにマージするには、Python と Scala の場合は DeltaTable.merge
メソッドを使用し、SQL の場合は MERGE INTO ステートメントを使用します。たとえば、次の例では、ソース テーブルからデータを取得し、それをターゲット Delta テーブルにマージします。 両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。 一致する行がない場合、Delta Lake は新しい行を追加します。 この操作は 、アップサート と呼ばれます。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SQL で *
を指定すると、ソース・テーブルのカラムがターゲット・テーブルと同じであると仮定して、ターゲット・テーブルのすべてのカラムが更新または挿入されます。 ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。
挿入操作を実行するときは、テーブル内のすべての列に値を指定する必要があります (たとえば、既存のデータセットに一致する行がない場合)。 ただし、すべての値を更新する必要はありません。
結果を表示するには、テーブルに対してクエリーを実行します。
- Python
- Scala
- SQL
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SELECT * FROM main.default.people_10m WHERE id >= 9999998
表を読んでください
次の例に示すように、テーブル名またはテーブルパスを使用してDeltaテーブルのデータにアクセスします:
- Python
- Scala
- SQL
people_df = spark.read.table("main.default.people_10m")
display(people_df)
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SELECT * FROM main.default.people_10m;
テーブルに書きなさい
Delta Lakeは、テーブルにデータを書き込むための標準構文を使用します。
既存の Delta テーブルに新しいデータをアトミックに追加するには、次の例に示すように追加モードを使用します。
- Python
- Scala
- SQL
df.write.mode("append").saveAsTable("main.default.people_10m")
df.write.mode("append").saveAsTable("main.default.people_10m")
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
テーブル内のすべてのデータを置き換えるには、次の例のように上書きモードを使用してください。
- Python
- Scala
- SQL
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
テーブルを更新する
Delta テーブル内の述語に一致するデータを更新できます。 たとえば、 people_10m
テーブルの例では、 gender
列の省略形を M
または F
から Male
または Female
に変更するには、次のコマンドを実行します。
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
テーブルからの削除
述語に一致するデータを Delta テーブルから削除できます。 たとえば、 people_10m
テーブルの例では、 birthDate
列の値が 1955
より前の人物に対応するすべての行を削除するには、次のコマンドを実行できます。
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
削除では、最新バージョンの Delta テーブルからデータが削除されますが、古いバージョンが明示的に vacuumされるまで、物理ストレージからは削除されません。 詳細については、 vacuum を参照してください。
テーブル履歴の表示
テーブルの履歴を表示するには、Python と Scala の DeltaTable.history
メソッドと、テーブルへの書き込みごとにテーブルのバージョン、操作、ユーザーなどの出所情報を提供する SQL の DESCRIBE HISTORY ステートメントを使用します。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
DESCRIBE HISTORY main.default.people_10m
以前のバージョンのテーブルをクエリする (タイムトラベル)
Delta Lakeのタイムトラベルでは、Deltaテーブルの古いスナップショットを照会することができます。
古いバージョンのテーブルをクエリするには、テーブルのバージョンまたはタイムスタンプを指定します。 たとえば、前の履歴からバージョン 0 またはタイムスタンプ 2024-05-15T22:43:15.000+00:00Z
をクエリするには、次を使用します。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
タイムスタンプの場合、日付またはタイムスタンプ文字列 ( "2024-05-15T22:43:15.000+00:00"
や "2024-05-15 22:43:15"
など) のみが受け入れられます。
データフレームReader オプションを使用すると、次のように、テーブルの特定のバージョンまたはタイムスタンプに固定された Delta テーブルから データフレーム を作成できます。
- Python
- Scala
- SQL
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
詳細については、「 Delta Lake テーブルの履歴の操作」を参照してください。
テーブルを最適化します
テーブルに対して複数の変更を行った後、小さなファイルがたくさんある場合があります。 読み取りクエリの速度を向上させるために、最適化操作を使用して小さなファイルを大きなファイルに折りたたむことができます。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
OPTIMIZE main.default.people_10m
列ごとのZ-Order
読み取りパフォーマンスをさらに向上させるために、 Z-Orderingで同じファイルセット内の関連情報をコロケーションできます。 Delta Lake データスキップ アルゴリズムでは、このコロケーションを使用して、読み取る必要のあるデータの量を大幅に減らします。 データを Z-Order するには、操作ごとに Z-Order で順序付ける列を指定します。 たとえば、 gender
でコロケーションするには、次のコマンドを実行します。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
最適化操作を実行する際に使用できるすべてのオプションについては、「データファイルのレイアウトの最適化」を参照してください。
スナップショットをクリーンアップする VACUUM
Delta Lake は読み取り用のスナップショット分離を提供するため、他のユーザーやジョブがテーブルに対してクエリを実行している間でも、最適化操作を安全に実行できます。 ただし、最終的には古いスナップショットをクリーンアップする必要があります。 これを行うには、 vacuum 操作を実行します。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
VACUUM main.default.people_10m
vacuum操作を効果的に使用する方法について詳しくは、vacuumを使用した未使用のデータファイルの削除を参照してください。