チュートリアル:Delta Lake

このチュートリアルでは、以下のようなDatabricks上でのDelta Lakeの一般的なオペレーションを紹介します:

この記事の PythonScala、SQL 、 のDatabricks サンプル コードは、クラスターなどの コンピュート リソースに接続された プラットフォーム 内から実行できます。この記事の コードは、 の Databricks SQLSQL ウェアハウス に関連付けられた クエリ 内から実行することもできます。SQL

ソースデータを準備する

このチュートリアルでは、People 10 M というデータセットを使用します。このデータセットには、氏名、生年月日、給与など、人物に関する事実を保持する 1,000 万件の架空のレコードが含まれています。 このチュートリアルでは、このデータセットがターゲットのUnity Catalog Databricksワークスペースに関連付けられた ボリューム 内にあることを前提としています。

このチュートリアルの People 10 M データセットを取得するには、次の手順を実行します。

  1. Kaggleの People 10 M ページに移動します。

  2. ダウンロード 」をクリックして、 archive.zip という名前のファイルをローカル・マシンにダウンロードします。

  3. archive.zip ファイルから export.csv という名前のファイルを抽出します。export.csvファイルにはこのチュートリアルのデータが含まれています。

export.csv ファイルをボリュームにアップロードするには、次の操作を行います。

  1. サイドバーで [ カタログ] をクリックします。

  2. 「カタログエクスプローラ」(Catalog Explorer) で、export.csvファイルをアップロードするボリュームを参照して開きます。

  3. [ このボリュームにアップロード] をクリックします。

  4. ローカルマシン上の export.csv ファイルをドラッグ&ドロップするか、参照して選択します。

  5. アップロード」をクリックします。

次のコード例では、 /Volumes/main/default/my-volume/export.csv をターゲット ボリューム内の export.csv ファイルへのパスに置き換えます。

テーブルを作成する

Databricks で作成されたすべてのテーブルは、デフォルトで Delta Lake を使用します。 Databricks では、Unity Catalog で管理されるテーブルの使用を推奨しています。

前のコード例と次のコード例では、テーブル名main.default.people_10mを、Unity Catalog 内のターゲットの 3 部構成のカタログ、スキーマ、およびテーブル名に置き換えます。

Delta Lake は、すべての読み取り、書き込み、およびテーブル作成コマンド Databricks の既定値です。

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

上記の操作では、新しいマネージド テーブルが作成されます。 Deltaテーブルを作成するときに使用できるオプションの詳細については、 CREATE TABLEを参照してください。

Databricks Runtime 13.3 LTS 以降では、 CREATE TABLE LIKEを使用して、ソース Delta テーブルのスキーマとテーブル プロパティを複製する新しい空の Delta テーブルを作成できます。 これは、次のコード例に示すように、開発環境から本番運用にテーブルを昇格させる場合に特に役立ちます。

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

空のテーブルを作成するには、Delta Lake for PythonおよびScalaDeltaTableBuilder API を使用することもできます。 同等の DataFrameWriter APIsと比較すると、これらのAPIs使用すると、列コメント、テーブル プロパティ、生成された列などの追加情報を指定することが容易になります。

プレビュー

この機能はパブリックプレビュー段階にあります。

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()
DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

テーブルへのアップサート

既存の Delta テーブルに更新と挿入のセットをマージするには、 PythonScalaの場合はDeltaTable.mergeメソッドを使用し、SQL の場合はMERGE INTOステートメントを使用します。 たとえば、次の例では、ソース テーブルからデータを取得し、それをターゲット Delta テーブルにマージします。 両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。 一致する行がない場合、Delta Lake は新しい行を追加します。 この操作は、 アップサートと呼ばれます。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()
CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

SQL で*を指定すると、ソース テーブルにターゲット テーブルと同じ列があると想定して、ターゲット テーブルのすべての列が更新または挿入されます。 ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。

挿入操作を実行するときは、テーブル内のすべての列に値を指定する必要があります (たとえば、既存のデータセットに一致する行がない場合など)。 ただし、すべての値を更新する必要はありません。

結果を表示するには、テーブルに対してクエリーを実行します。

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SELECT * FROM main.default.people_10m WHERE id >= 9999998

テーブルを読む

次の例に示すように、テーブル名またはテーブルパスを使用してDeltaテーブルのデータにアクセスします:

people_df = spark.read.table("main.default.people_10m")
display(people_df)
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SELECT * FROM main.default.people_10m;

テーブルに書き込む

Delta Lakeは、テーブルにデータを書き込むための標準構文を使用します。

既存の Delta テーブルに新しいデータをアトミックに追加するには、次の例に示すように追加モードを使用します。

df.write.mode("append").saveAsTable("main.default.people_10m")
df.write.mode("append").saveAsTable("main.default.people_10m")
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

テーブル内のすべてのデータを置き換えるには、次の例のように上書きモードを使用します。

df.write.mode("overwrite").saveAsTable("main.default.people_10m")
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

テーブルを更新する

Delta テーブル内の述語に一致するデータを更新できます。 たとえば、例のpeople_10mテーブルで、 gender列の略語をMまたはFからMaleまたはFemaleに変更するには、次のコマンドを実行します。

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

テーブルから削除する

述語に一致するデータを Delta テーブルから削除できます。 たとえば、例のpeople_10mテーブルで、 1955より前のbirthDate列に値を持つ人物に対応するすべての行を削除するには、次のコマンドを実行します。

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

重要

削除すると、最新バージョンのDeltaテーブルのデータが削除されますが、古いバージョンが明示的にvacuumまで物理ストレージからは削除されません。 詳細についてはVacuumを参照してください。

テーブル履歴の表示

テーブルの履歴を表示するには、 PythonおよびScalaの場合はDeltaTable.historyメソッド、SQL の場合はDESCRIBE HISTORYステートメントを使用します。これらのステートメントは、テーブルへの書き込みごとに、テーブルのバージョン、操作、ユーザーなどの出所情報を提供します。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
DESCRIBE HISTORY main.default.people_10m

以前のバージョンのテーブルをクエリーする(タイムトラベル)

Delta Lakeのタイムトラベルでは、Deltaテーブルの古いスナップショットを照会することができます。

古いバージョンのテーブルをクエリするには、テーブルのバージョンまたはタイムスタンプを指定します。 たとえば、バージョン 0 または前の履歴から 2024-05-15T22:43:15.000+00:00Z タイムスタンプを照会するには、次のコマンドを使用します。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

タイムスタンプの場合、日付またはタイムスタンプの文字列のみが受け入れられます (例: "2024-05-15T22:43:15.000+00:00"または"2024-05-15 22:43:15"

DataFrameReader オプションを使用すると、テーブルの特定のバージョンまたはタイムスタンプに固定された Delta テーブルから DataFrame を作成できます。次に例を示します。

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

詳細については、「Delta Lakeのテーブル履歴を取り扱う」を参照してください。

テーブルを最適化する

テーブルに対して複数の変更を実行した後、小さなファイルが多数存在する場合があります。 読み取りクエリの速度を向上させるには、optimize 操作を使用して、小さなファイルを大きなファイルに折りたたむことができます。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize()
OPTIMIZE main.default.people_10m

列ごとのZ-order

読み取りパフォーマンスをさらに向上させるには、 Z-Orderingを使用して、関連する情報を同じファイル セット内に配置できます。 Delta Lake のデータ スキッピング アルゴリズムは、このコロケーションを使用して、読み取る必要のあるデータの量を大幅に削減します。 データをZ-Orderにするには、 Z-Order操作で順序付けする列を指定します。 たとえば、 genderでコロケートするには、次を実行します。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
OPTIMIZE main.default.people_10m
ZORDER BY (gender)

最適化操作の実行時に使用できるすべてのオプションについては、「 データ ファイルのレイアウトの最適化」を参照してください。

VACUUMでスナップショットをクリーンアップ

Delta Lake は読み取りのスナップショット分離を提供します。つまり、他のユーザーまたはジョブがテーブルをクエリしている間でも、最適化操作を安全に実行できます。 ただし、最終的には古いスナップショットをクリーンアップする必要があります。 これは、 vacuum操作を実行することで実行できます。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
VACUUM main.default.people_10m

vacuum操作を効果的に使用する方法の詳細については、 vacuumによる未使用のデータ ファイルの削除」を参照してください。