メインコンテンツまでスキップ

チュートリアル: Delta Lake テーブルの作成と管理

このチュートリアルでは、サンプル データを使用して一般的な Delta テーブル操作について説明します。Delta Lake は、Databricks 上のテーブルの基盤を提供する最適化されたストレージ レイヤーです。特に指定がない限り、Databricks 上のすべてのテーブルは Delta テーブルです。

始める前に

このチュートリアルを完了するには、次のものが必要です。

これらの例は 、「Synthetic Person Records: 10K to 10M Records」 というデータセットに依存しています。このデータセットには、人物の名、姓、性別、年齢などの架空の記録が含まれています。

まず、このチュートリアルのデータセットをダウンロードします。

  1. Kaggle の「合成人物レコード: 10K から 10M のレコード」ページにアクセスしてください。
  2. 「ダウンロード」 をクリックし、 「データセットを zip としてダウンロード」をクリックします 。これにより、 archive.zipという名前のファイルがローカル マシンにダウンロードされます。
  3. archive.zipファイルからarchiveフォルダーを抽出します。

次に、 person_10000.csvデータセットを Databricks ワークスペース内の Unity Catalogボリュームにアップロードします。Databricks 、ボリュームによってファイルへのアクセス、保存、管理、整理の機能が提供されるため、データをUnity Catalogボリュームにアップロードすることをお勧めします。

  1. クリックしてカタログエクスプローラーを開きますデータアイコン。サイドバーの カタログ
  2. カタログエクスプローラーで、追加またはプラスアイコン データを追加しボリュームを作成します
  3. ボリュームにmy-volumeという名前を付け、ボリュームの種類として 管理対象ボリューム を選択します。
  4. workspaceカタログとdefaultスキーマを選択し、 [作成] をクリックします。
  5. my-volumeを開いて、 「このボリュームにアップロード」 をクリックします。
  6. ローカル マシンのarchiveフォルダー内からperson_10000.csvファイルをドラッグ アンド ドロップするか、参照して選択します。
  7. アップロード 」をクリックします。

最後に、サンプルコードを実行するためのノートブックを作成します。

  1. クリック追加またはプラスアイコンサイドバーに 新しいものが追加されました
  2. クリックノートブックのアイコン。 ノートブック 新しいノートブックを作成します。
  3. ノートブックの言語を選択してください。

テーブルの作成

person_10000.csvからworkspace.default.people_10kという名前の新しいUnity Catalogマネージドテーブルを作成します。 Delta Lake は、Databricks のすべてのテーブル作成、読み取り、書き込みコマンドのデフォルトです。

Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()

# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")

# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)

テーブルを作成または複製するには、いくつかの方法があります。詳細については、 CREATE TABLEを参照してください。

Databricks Runtime 13.3 LTS 以降では、 CREATE TABLE LIKEを使用して、ソース Delta テーブルのスキーマとテーブル プロパティを複製する新しい空の Delta テーブルを作成できます。これは、テーブルを開発環境から本番運用にプロモートするときに役立ちます。

SQL
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
備考

プレビュー

この機能は パブリック プレビュー段階です。

空のテーブルを作成するには、 PythonおよびScalaDeltaTableBuilder API を使用します。DataFrameWriterおよびDataFrameWriterV2と比較すると、 DeltaTableBuilder API を使用すると、列コメント、テーブル プロパティ、生成された列などの追加情報を指定するのが簡単になります。

Python
from delta.tables import DeltaTable

(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)

display(spark.read.table("workspace.default.people_10k_2"))

テーブルへのアップサート

upsert と呼ばれる操作を使用して、テーブル内の既存のレコードを変更したり、新しいレコードを追加したりします。既存の Delta テーブルに更新と挿入のセットをマージするには、 PythonScalaDeltaTable.mergeメソッドと、SQL のMERGE INTOステートメントを使用します。

たとえば、ソース テーブルpeople_10k_updatesのデータをターゲット Delta テーブルworkspace.default.people_10kにマージします。両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。一致する行がない場合、Delta Lake は新しい行を追加します。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]

# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")

# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')

(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)

# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)

SQL では、 *演算子は、ソース テーブルにターゲット テーブルと同じ列があることを前提として、ターゲット テーブルのすべての列を更新または挿入します。ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。また、挿入操作を実行するときは、テーブル内のすべての列に値を指定する必要があります。列の値は空にすることができます (例: '' )。挿入操作を実行するときに、すべての値を更新する必要はありません。

テーブルの読み込み

Delta テーブルのデータにアクセスするには、テーブル名またはパスを使用します。Unity Catalogマネージドテーブルにアクセスするには、完全修飾テーブル名を使用します。 パスベースのアクセスは、ボリュームと外部テーブルに対してのみサポートされており、マネージドテーブルに対してはサポートされていません。 詳細については、 Unity Catalogボリュームのパスのルールとアクセス」を参照してください。

Python
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)

テーブルへの書き込み

Delta Lake は、テーブルにデータを書き込むために標準の構文を使用します。既存の Delta テーブルに新しいデータを追加するには、追加モードを使用します。upserting とは異なり、テーブルへの書き込みでは重複レコードはチェックされません。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])

data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]

# Create the new data.
df = spark.createDataFrame(data, schema)

# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")

# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)

Databricks ノートブックのセル出力には、最大 10,000 行または 2 MB のいずれか小さい方が表示されます。workspace.default.people_10kは 10,000 を超える行が含まれているため、ノートブックのdisplay(df)出力には最初の 10,000 行のみが表示されます。追加の行はテーブルに存在しますが、この制限によりノートブックの出力にはレンダリングされません。追加の行を具体的にフィルタリングすることで、それらの行を表示できます。

テーブル内のすべてのデータを置き換えるには、上書きモードを使用します。

Python
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")

テーブルの更新

述語に基づいて Delta テーブル内のデータを更新します。たとえば、 gender列の値をFemaleからFに、 MaleからMに、 OtherからOに変更します。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)

# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)

deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)

テーブルからの削除

Delta テーブルから述語に一致するデータを削除します。たとえば、次のコードは 2 つの削除操作を示しています。最初に年齢が 18 未満の行を削除し、次に年齢が 21 未満の行を削除します。

Python
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")

# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")

# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')

# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
重要

削除すると、 Deltaテーブルの最新バージョンからデータが削除されますが、古いバージョンが明示的にvacuumまで物理ストレージからは削除されません。 詳細については、 vacuumを参照してください。

テーブル履歴の表示

テーブルへの各書き込みの起源情報を表示するには、 PythonScalaDeltaTable.historyメソッドと SQL のDESCRIBE HISTORYステートメントを使用します。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())

タイムトラベルを使用してテーブルの以前のバージョンをクエリする

Delta Lake タイムトラベルを使用して、Delta テーブルの古いスナップショットをクエリします。特定のバージョンを照会するには、テーブルのバージョン番号またはタイムスタンプを使用します。たとえば、テーブルの履歴からバージョン0またはタイムスタンプ2026-01-05T23:09:47.000+00:00をクエリします。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()

# Query using the version number.
display(deltaHistory.where("version == 0"))

# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))

タイムスタンプの場合、日付またはタイムスタンプの文字列のみが受け入れられます。たとえば、文字列は"2026-01-05T22:43:15.000+00:00"または"2026-01-05 22:43:15"としてフォーマットする必要があります。

DataFrameReaderオプションを使用して、テーブルの特定のバージョンまたはタイムスタンプに固定された Delta テーブルから DataFrame を作成します。

Python
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")

# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")

display(df)

詳細については、 「テーブル履歴の操作」を参照してください。

テーブルの最適化

テーブルを複数回変更すると、複数の小さなファイルが作成され、読み取りクエリのパフォーマンスが低下する可能性があります。最適化操作を使用して、小さなファイルを大きなファイルに結合することで速度を向上させます。OPTIMIZE を参照してください。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
注記

予測的最適化が有効な場合は、手動で最適化する必要はありません。 予測的最適化はメンテナンスタスクを自動的に管理します。 詳細については、 Unity Catalogマネージド テーブルの予測的最適化を参照してください。

列によるZ-Order

データをZ-Order読み取りパフォーマンスをさらに向上させるには、操作で順序付ける列を指定します。 たとえば、高カーディナリティ列firstNameでコロケートします。Z-Orderingの詳細については、 「データ スキップ」を参照してください。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")

vacuum操作でスナップショットをクリーンアップする

Delta Lake には読み取り用のスナップショット分離機能があるため、他のユーザーまたはジョブがテーブルをクエリしている間でも最適化操作を安全に実行できます。ただし、最終的には古いスナップショットをクリーンアップする必要があります。そうすることで、ストレージ コストが削減され、クエリ パフォーマンスが向上し、データのコンプライアンスが確保されます。古いスナップショットをクリーンアップするには、 VACUUM操作を実行します。vacuumを参照してください。

Python
from delta.tables import *

deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()

vacuum操作を効果的に使用する方法の詳細については、 「未使用のデータ ファイルをvacuumで削除する」を参照してください。

次のステップ

関連リソース