チュートリアル:Delta Lake

このチュートリアルでは、以下のようなDatabricks上でのDelta Lakeの一般的なオペレーションを紹介します:

この記事のサンプル Python、R、Scala、SQL コードは、Databricks クラスター に接続された ノートブック 内から実行できます。この記事の SQL コードは、 Databricks SQL SQL ウェアハウス に関連付けられた クエリ 内から実行することもできます。

次のコード例の一部では、スキーマ(データベースとも呼ばれます)とテーブルまたはビュー(例:default.people10m)で構成される2レベルの名前空間表記を使用しています。これらの例をUnity Catalogで使用するには、2レベルのネームスペースを、カタログ、スキーマ、およびテーブルまたはビューで構成されるUnity Catalogの3レベルのネームスペース表記に置き換えます(たとえばmain.default.people10m)。

テーブルを作成する

Databricksで作成されたすべてのテーブルは、デフォルトでDelta Lakeを使用します。

Delta Lake は、すべての読み取り、書き込み、およびテーブル作成コマンド Databricks の既定値です。

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")
DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

上記の操作では、データから推論されたスキーマを使用して新しい マネージ テーブル を作成します。 Delta テーブルを作成するときに使用できるオプションに関する情報については、「 CREATE TABLE 」を参照してください。

マネージドテーブルの場合、Databricksはデータの場所を決定します。場所を取得するには、次のようなDESCRIBE DETAILステートメントを使用できます:

display(spark.sql('DESCRIBE DETAIL people_10m'))
display(sql("DESCRIBE DETAIL people_10m"))
display(spark.sql("DESCRIBE DETAIL people_10m"))
DESCRIBE DETAIL people_10m;

データを挿入する前にスキーマを指定してテーブルを作成したい場合があります。これは、次のSQLコマンドを使用して実行できます:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

Databricks Runtime 13.0以降では、CREATE TABLE LIKEを使用して、ソースDeltaテーブルのスキーマとテーブルプロパティを複製する新しい空のDeltaテーブルを作成できます。これは、次のコード例のように、テーブルを開発環境から運用環境にプロモートするときに特に役立ちます:

CREATE TABLE prod.people10m LIKE dev.people10m

Delta LakeのDeltaTableBuilder APIを使用してテーブルを作成することもできます。DataFrameWriterのAPIと比較して、このAPIでは、列のコメント、テーブルのプロパティ、生成された列などの追加情報を簡単に指定できます。

プレビュー

この機能はパブリックプレビュー段階にあります。

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()
// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

テーブルへのアップサート

一連の更新と挿入を既存のDeltaテーブルにマージするには、MERGE INTOステートメントを使用します。たとえば、次のステートメントはソーステーブルからデータを取得し、それをターゲットDeltaテーブルにマージします。両方のテーブルに一致する行がある場合、Delta Lakeは指定された式を使用してデータ列を更新します。一致する行がない場合、Delta Lakeは新しい行を追加します。このオペレーションはアップサートと呼ばれます。

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

*を指定すると、ターゲットテーブル内のすべての列が更新または挿入されます。これは、ソーステーブルにターゲットテーブルの列と同じ列があることを前提としています。そうでない場合、クエリーは分析エラーをスローします。

INSERTオペレーションを行う場合(例えば、既存のデータセットに一致する行がない場合)には、テーブルのすべての列に値を指定する必要があります。ただし、すべての値を更新する必要はありません。

結果を表示するには、テーブルに対してクエリーを実行します。

SELECT * FROM people_10m WHERE id >= 9999998

テーブルを読む

次の例に示すように、テーブル名またはテーブルパスを使用してDeltaテーブルのデータにアクセスします:

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)
people_df = tableToDF(table_name)

display(people_df)
val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)
SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

テーブルに書き込む

Delta Lakeは、テーブルにデータを書き込むための標準構文を使用します。

新しいデータを既存のDeltaテーブルにアトミックに追加するには、次の例のようにappendモードを使用します:

INSERT INTO people10m SELECT * FROM more_people
df.write.mode("append").saveAsTable("people10m")
df.write.mode("append").saveAsTable("people10m")

テーブル内のすべてのデータをアトミックに置換するには、次の例のようにoverwriteモードを使用します:

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
df.write.mode("overwrite").saveAsTable("people10m")
df.write.mode("overwrite").saveAsTable("people10m")

テーブルを更新する

Deltaテーブルの述語にマッチするデータを更新することができます。たとえば、people10mという名前のテーブルまたは/tmp/delta/people-10mのパスで、gender列の略称をMまたはFからMaleまたはFemaleに変更するには、次のコマンドを実行します:

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

テーブルから削除する

Deltaテーブルから述語にマッチするデータを削除することができます。たとえば、people10mという名前のテーブルまたは/tmp/delta/people-10mのパスで、1955より前のbirthDate列の値を持つpeopleに対応するすべての行を削除するには、次のコマンドを実行します:

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

重要

delete Deltaテーブルの最新バージョンからデータを削除しますが、古いバージョンが明示的にバキュームされるまで物理ストレージからは削除されません。詳細については、「vacuum」を参照してください。

テーブル履歴の表示

テーブルの履歴を表示するには、DESCRIBE HISTORYステートメントを使用します。DESCRIBE HISTORYステートメントは、テーブルへの書き込みごとに、テーブルのバージョン、オペレーション、ユーザーなどの実績情報を提供します。

DESCRIBE HISTORY people_10m

以前のバージョンのテーブルをクエリーする(タイムトラベル)

Delta Lakeのタイムトラベルでは、Deltaテーブルの古いスナップショットを照会することができます。

テーブルの古いバージョンを照会するには、SELECTステートメントでバージョンまたはタイムスタンプを指定します。たとえば、上記の履歴からバージョン0をクエリーするには、次を使用します:

SELECT * FROM people_10m VERSION AS OF 0

または

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

タイムスタンプの場合、日付またはタイムスタンプの文字列のみが受け入れられます(例:"2019-01-01"および"2019-01-01'T'00:00:00.000Z"

DataFrameReaderオプションを使用すると、Pythonなどの特定のバージョンのテーブルに固定されたDeltaテーブルからDataFrameを作成できます:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

または、代わりに:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

詳細については、「Delta Lakeのテーブル履歴を取り扱う」を参照してください。

テーブルを最適化する

テーブルに対して複数の変更を実行すると、小さなファイルが多数存在する可能性があります。読み取りクエリーの速度を向上させるには、OPTIMIZEを使用して小さなファイルを大きなファイルに折りたたむことができます:

OPTIMIZE people_10m

列ごとのZ-order

読み取りパフォーマンスをさらに向上させるために、Z-ordering によって同じファイルセット内に関連情報を同じ場所に配置できます。この共局性は、Delta Lakeデータスキップアルゴリズムによって自動的に使用され、読み取る必要があるデータの量が大幅に削減されます。データをZ-order するには、ZORDER BY句で順序付けする列を指定します。たとえば、genderによって同じ場所に配置するには、次を実行します:

OPTIMIZE people_10m
ZORDER BY (gender)

OPTIMIZEの実行時に使用できるオプションの完全なセットについては、「Delta LakeでOPTIMIZEによりデータファイルを圧縮する」を参照してください。

VACUUMでスナップショットをクリーンアップ

Delta Lakeは読み取りのスナップショット分離を提供します。これは、他のユーザーまたはジョブがテーブルをクエリーしている間でも、OPTIMIZEを安全に実行できることを意味します。ただし、最終的には古いスナップショットをクリーンアップする必要があります。これを行うには、VACUUMコマンドを実行します:

VACUUM people_10m

VACUUMを効果的に使用する方法の詳細については、「バキュームを使用して未使用のデータファイルを削除する」を参照してください。