チュートリアル: Delta Lake テーブルの作成と管理
このチュートリアルでは、サンプル データを使用して一般的な Delta テーブル操作について説明します。Delta Lake は、Databricks 上のテーブルの基盤を提供する最適化されたストレージ レイヤーです。特に指定がない限り、Databricks 上のすべてのテーブルは Delta テーブルです。
始める前に
このチュートリアルを完了するには、次のものが必要です。
- 既存のコンピュート リソースを使用するか、新しいコンピュート リソースを作成する権限。 「コンピュート」を参照。
- Unity Catalog権限:
workspaceカタログのUSE CATALOG、USE SCHEMA、およびCREATE TABLE。 これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。
これらの例は 、「Synthetic Person Records: 10K to 10M Records」 というデータセットに依存しています。このデータセットには、人物の名、姓、性別、年齢などの架空の記録が含まれています。
まず、このチュートリアルのデータセットをダウンロードします。
- Kaggle の「合成人物レコード: 10K から 10M のレコード」ページにアクセスしてください。
- 「ダウンロード」 をクリックし、 「データセットを zip としてダウンロード」をクリックします 。これにより、
archive.zipという名前のファイルがローカル マシンにダウンロードされます。 archive.zipファイルからarchiveフォルダーを抽出します。
次に、 person_10000.csvデータセットを Databricks ワークスペース内の Unity Catalogボリュームにアップロードします。Databricks 、ボリュームによってファイルへのアクセス、保存、管理、整理の機能が提供されるため、データをUnity Catalogボリュームにアップロードすることをお勧めします。
- クリックしてカタログエクスプローラーを開きます
サイドバーの カタログ 。
- カタログエクスプローラーで、
データを追加し 、 ボリュームを作成します 。
- ボリュームに
my-volumeという名前を付け、ボリュームの種類として 管理対象ボリューム を選択します。 workspaceカタログとdefaultスキーマを選択し、 [作成] をクリックします。my-volumeを開いて、 「このボリュームにアップロード」 をクリックします。- ローカル マシンの
archiveフォルダー内からperson_10000.csvファイルをドラッグ アンド ドロップするか、参照して選択します。 - 「 アップロード 」をクリックします。
最後に、サンプルコードを実行するためのノートブックを作成します。
- クリック
サイドバーに 新しいものが追加されました 。
- クリック
ノートブック 新しいノートブックを作成します。
- ノートブックの言語を選択してください。
テーブルの作成
person_10000.csvからworkspace.default.people_10kという名前の新しいUnity Catalogマネージドテーブルを作成します。 Delta Lake は、Databricks のすべてのテーブル作成、読み取り、書き込みコマンドのデフォルトです。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/workspace/default/my-volume/person_10000.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
# If you know the table does not already exist, you can use this command instead.
# df.write.saveAsTable("workspace.default.people_10k")
# View the new table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
val df = spark.read
.format("csv")
.option("header", true)
.schema(schema)
.load("/Volumes/workspace/default/my-volume/person_10000.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("workspace.default.people_10k").createOrReplace()
// If you know the table does not already exist, you can use this command instead.
// df.saveAsTable("workspace.default.people_10k")
// View the new table.
val df2 = spark.read.table("workspace.default.people_10k")
display(df2)
-- Create the table with only the required columns and rename person_id to id.
CREATE OR REPLACE TABLE workspace.default.people_10k AS
SELECT
person_id AS id,
firstname,
lastname,
gender,
age
FROM read_files(
'/Volumes/workspace/default/my-volume/person_10000.csv',
format => 'csv',
header => true
);
-- View the new table.
SELECT * FROM workspace.default.people_10k;
テーブルを作成または複製するには、いくつかの方法があります。詳細については、 CREATE TABLEを参照してください。
Databricks Runtime 13.3 LTS 以降では、 CREATE TABLE LIKEを使用して、ソース Delta テーブルのスキーマとテーブル プロパティを複製する新しい空の Delta テーブルを作成できます。これは、テーブルを開発環境から本番運用にプロモートするときに役立ちます。
CREATE TABLE workspace.default.people_10k_prod LIKE workspace.default.people_10k
プレビュー
この機能は パブリック プレビュー段階です。
空のテーブルを作成するには、 PythonおよびScalaのDeltaTableBuilder API を使用します。DataFrameWriterおよびDataFrameWriterV2と比較すると、 DeltaTableBuilder API を使用すると、列コメント、テーブル プロパティ、生成された列などの追加情報を指定するのが簡単になります。
- Python
- Scala
from delta.tables import DeltaTable
(
DeltaTable.createIfNotExists(spark)
.tableName("workspace.default.people_10k_2")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("lastName", "STRING", comment="surname")
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
)
display(spark.read.table("workspace.default.people_10k_2"))
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("workspace.default.people_10k")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build()
)
.addColumn("gender", "STRING")
.addColumn("age", "INT")
.execute()
display(spark.read.table("workspace.default.people_10k"))
テーブルへのアップサート
upsert と呼ばれる操作を使用して、テーブル内の既存のレコードを変更したり、新しいレコードを追加したりします。既存の Delta テーブルに更新と挿入のセットをマージするには、 PythonとScalaのDeltaTable.mergeメソッドと、SQL のMERGE INTOステートメントを使用します。
たとえば、ソース テーブルpeople_10k_updatesのデータをターゲット Delta テーブルworkspace.default.people_10kにマージします。両方のテーブルに一致する行がある場合、Delta Lake は指定された式を使用してデータ列を更新します。一致する行がない場合、Delta Lake は新しい行を追加します。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from delta.tables import DeltaTable
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10001, 'Billy', 'Luppitt', 'M', 55),
(10002, 'Mary', 'Smith', 'F', 98),
(10003, 'Elias', 'Leadbetter', 'M', 48),
(10004, 'Jane', 'Doe', 'F', 30),
(10005, 'Joshua', '', 'M', 90),
(10006, 'Ginger', '', 'F', 16),
]
# Create the source table if it does not exist. Otherwise, replace the existing source table.
people_10k_updates = spark.createDataFrame(data, schema)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
# Merge the source and target tables.
deltaTable = DeltaTable.forName(spark, 'workspace.default.people_10k')
(deltaTable.alias("people_10k")
.merge(
people_10k_updates.alias("people_10k_updates"),
"people_10k.id = people_10k_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
# View the additions to the table.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] >= 10001)
display(df_filtered)
import org.apache.spark.sql.types._
import io.delta.tables._
// Define schema
val schema = StructType(Array(
StructField("id", IntegerType, true),
StructField("firstName", StringType, true),
StructField("lastName", StringType, true),
StructField("gender", StringType, true),
StructField("age", IntegerType, true)
))
// Create data as Seq of Tuples
val data = Seq(
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16)
)
// Create DataFrame directly from Seq of Tuples
val people_10k_updates = spark.createDataFrame(data).toDF(
"id", "firstName", "lastName", "gender", "age"
)
people_10k_updates.createOrReplaceTempView("people_10k_updates")
// Merge the source and target tables
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.as("people_10k")
.merge(
people_10k_updates.as("people_10k_updates"),
"people_10k.id = people_10k_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
// View the additions to the table.
val df = spark.read.table("workspace.default.people_10k")
val df_filtered = df.filter($"id" >= 10001)
display(df_filtered)
-- Create the source table if it does not exist. Otherwise, replace the existing source table.
CREATE OR REPLACE TABLE workspace.default.people_10k_updates(
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert new data into the source table.
INSERT INTO workspace.default.people_10k_updates VALUES
(10001, "Billy", "Luppitt", "M", 55),
(10002, "Mary", "Smith", "F", 98),
(10003, "Elias", "Leadbetter", "M", 48),
(10004, "Jane", "Doe", "F", 30),
(10005, "Joshua", "", "M", 90),
(10006, "Ginger", "", "F", 16);
-- Merge the source and target tables.
MERGE INTO workspace.default.people_10k AS people_10k
USING workspace.default.people_10k_updates AS people_10k_updates
ON people_10k.id = people_10k_updates.id
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *;
-- View the additions to the table.
SELECT * FROM workspace.default.people_10k WHERE id >= 10001
SQL では、 *演算子は、ソース テーブルにターゲット テーブルと同じ列があることを前提として、ターゲット テーブルのすべての列を更新または挿入します。ターゲット テーブルに同じ列がない場合、クエリは分析エラーをスローします。また、挿入操作を実行するときは、テーブル内のすべての列に値を指定する必要があります。列の値は空にすることができます (例: '' )。挿入操作を実行するときに、すべての値を更新する必要はありません。
テーブルの読み込み
Delta テーブルのデータにアクセスするには、テーブル名またはパスを使用します。Unity Catalogマネージドテーブルにアクセスするには、完全修飾テーブル名を使用します。 パスベースのアクセスは、ボリュームと外部テーブルに対してのみサポートされており、マネージドテーブルに対してはサポートされていません。 詳細については、 Unity Catalogボリュームのパスのルールとアクセス」を参照してください。
- Python
- Scala
- SQL
people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
val people_df = spark.read.table("workspace.default.people_10k")
display(people_df)
SELECT * FROM workspace.default.people_10k;
テーブルへの書き込み
Delta Lake は、テーブルにデータを書き込むために標準の構文を使用します。既存の Delta テーブルに新しいデータを追加するには、追加モードを使用します。upserting とは異なり、テーブルへの書き込みでは重複レコードはチェックされません。
- Python
- Scala
- SQL
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True)
])
data = [
(10007, 'Miku', 'Hatsune', 'F', 25)
]
# Create the new data.
df = spark.createDataFrame(data, schema)
# Append the new data to the target table.
df.write.mode("append").saveAsTable("workspace.default.people_10k")
# View the new addition.
df = spark.read.table("workspace.default.people_10k")
df_filtered = df.filter(df["id"] == 10007)
display(df_filtered)
// Create the new data.
val data = Seq(
(10007, "Miku", "Hatsune", "F", 25)
)
val df = spark.createDataFrame(data)
.toDF("id", "firstName", "lastName", "gender", "age")
// Append the new data to the target table
df.write.mode("append").saveAsTable("workspace.default.people_10k")
// View the new addition.
val df2 = spark.read.table("workspace.default.people_10k")
val df_filtered = df2.filter($"id" === 10007)
display(df_filtered)
CREATE OR REPLACE TABLE workspace.default.people_10k_new (
id INT,
firstName STRING,
lastName STRING,
gender STRING,
age INT
);
-- Insert the new data.
INSERT INTO workspace.default.people_10k_new VALUES
(10007, 'Miku', 'Hatsune', 'F', 25);
-- Append the new data to the target table.
INSERT INTO workspace.default.people_10k
SELECT * FROM workspace.default.people_10k_new;
-- View the new addition.
SELECT * FROM workspace.default.people_10k WHERE id = 10007;
Databricks ノートブックのセル出力には、最大 10,000 行または 2 MB のいずれか小さい方が表示されます。workspace.default.people_10kは 10,000 を超える行が含まれているため、ノートブックのdisplay(df)出力には最初の 10,000 行のみが表示されます。追加の行はテーブルに存在しますが、この制限によりノートブックの出力にはレンダリングされません。追加の行を具体的にフィルタリングすることで、それらの行を表示できます。
テーブル内のすべてのデータを置き換えるには、上書きモードを使用します。
- Python
- Scala
- SQL
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
df.write.mode("overwrite").saveAsTable("workspace.default.people_10k")
INSERT OVERWRITE TABLE workspace.default.people_10k SELECT * FROM workspace.default.people_10k_2
テーブルの更新
述語に基づいて Delta テーブル内のデータを更新します。たとえば、 gender列の値をFemaleからFに、 MaleからMに、 OtherからOに変更します。
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and update rows using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'Female'",
set = { "gender": "'F'" }
)
# Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'Male',
set = { 'gender': lit('M') }
)
deltaTable.update(
condition = col('gender') == 'Other',
set = { 'gender': lit('O') }
)
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and update rows using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'Female'",
Map("gender" -> "'F'")
)
// Declare the predicate and update rows using Spark SQL functions.
deltaTable.update(
col("gender") === "Male",
Map("gender" -> lit("M")));
deltaTable.update(
col("gender") === "Other",
Map("gender" -> lit("O")));
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
-- Declare the predicate and update rows.
UPDATE workspace.default.people_10k SET gender = 'F' WHERE gender = 'Female';
UPDATE workspace.default.people_10k SET gender = 'M' WHERE gender = 'Male';
UPDATE workspace.default.people_10k SET gender = 'O' WHERE gender = 'Other';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
テーブルからの削除
Delta テーブルから述語に一致するデータを削除します。たとえば、次のコードは 2 つの削除操作を示しています。最初に年齢が 18 未満の行を削除し、次に年齢が 21 未満の行を削除します。
- Python
- Scala
- SQL
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
# Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
# Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col('age') < '21')
# View the updated table.
df = spark.read.table("workspace.default.people_10k")
display(df)
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
// Declare the predicate and delete rows using a SQL-formatted string.
deltaTable.delete("age < '18'")
// Declare the predicate and delete rows using Spark SQL functions.
deltaTable.delete(col("age") < "21")
// View the updated table.
val df = spark.read.table("workspace.default.people_10k")
display(df)
-- Delete rows using a predicate.
DELETE FROM workspace.default.people_10k WHERE age < '21';
-- View the updated table.
SELECT * FROM workspace.default.people_10k;
削除すると、 Deltaテーブルの最新バージョンからデータが削除されますが、古いバージョンが明示的にvacuumまで物理ストレージからは削除されません。 詳細については、 vacuumを参照してください。
テーブル履歴の表示
テーブルへの各書き込みの起源情報を表示するには、 PythonとScalaのDeltaTable.historyメソッドと SQL のDESCRIBE HISTORYステートメントを使用します。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
display(deltaTable.history())
DESCRIBE HISTORY workspace.default.people_10k
タイムトラベルを使用してテーブルの以前のバージョンをクエリする
Delta Lake タイムトラベルを使用して、Delta テーブルの古いスナップショットをクエリします。特定のバージョンを照会するには、テーブルのバージョン番号またはタイムスタンプを使用します。たとえば、テーブルの履歴からバージョン0またはタイムスタンプ2026-01-05T23:09:47.000+00:00をクエリします。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaHistory = deltaTable.history()
# Query using the version number.
display(deltaHistory.where("version == 0"))
# Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
val deltaHistory = deltaTable.history()
// Query using the version number.
display(deltaHistory.where("version == 0"))
// Query using the timestamp.
display(deltaHistory.where("timestamp == '2026-01-05T23:09:47.000+00:00'"))
-- Query using the version number
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Query using the timestamp
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
タイムスタンプの場合、日付またはタイムスタンプの文字列のみが受け入れられます。たとえば、文字列は"2026-01-05T22:43:15.000+00:00"または"2026-01-05 22:43:15"としてフォーマットする必要があります。
DataFrameReaderオプションを使用して、テーブルの特定のバージョンまたはタイムスタンプに固定された Delta テーブルから DataFrame を作成します。
- Python
- Scala
- SQL
# Query using the version number.
df = spark.read.option('versionAsOf', 0).table("workspace.default.people_10k")
# Query using the timestamp.
df = spark.read.option('timestampAsOf', '2026-01-05T23:09:47.000+00:00').table("workspace.default.people_10k")
display(df)
// Query using the version number.
val dfVersion = spark.read
.option("versionAsOf", 0)
.table("workspace.default.people_10k")
// Query using the timestamp.
val dfTimestamp = spark.read
.option("timestampAsOf", "2026-01-05T23:09:47.000+00:00")
.table("workspace.default.people_10k")
display(dfVersion)
display(dfTimestamp)
-- Create a temporary view from version 0 of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_v0 AS
SELECT * FROM workspace.default.people_10k VERSION AS OF 0;
-- Create a temporary view from a previous timestamp of the table.
CREATE OR REPLACE TEMPORARY VIEW people_10k_t0 AS
SELECT * FROM workspace.default.people_10k TIMESTAMP AS OF '2026-01-05T23:09:47.000+00:00';
SELECT * FROM people_10k_v0;
SELECT * FROM people_10k_t0;
詳細については、 「テーブル履歴の操作」を参照してください。
テーブルの最適化
テーブルを複数回変更すると、複数の小さなファイルが作成され、読み取りクエリのパフォーマンスが低下する可能性があります。最適化操作を使用して、小さなファイルを大きなファイルに結合することで速度を向上させます。OPTIMIZE を参照してください。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeCompaction()
OPTIMIZE workspace.default.people_10k
予測的最適化が有効な場合は、手動で最適化する必要はありません。 予測的最適化はメンテナンスタスクを自動的に管理します。 詳細については、 Unity Catalogマネージド テーブルの予測的最適化を参照してください。
列によるZ-Order
データをZ-Order読み取りパフォーマンスをさらに向上させるには、操作で順序付ける列を指定します。 たとえば、高カーディナリティ列firstNameでコロケートします。Z-Orderingの詳細については、 「データ スキップ」を参照してください。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.optimize().executeZOrderBy("firstName")
OPTIMIZE workspace.default.people_10k
ZORDER BY (firstName)
vacuum操作でスナップショットをクリーンアップする
Delta Lake には読み取り用のスナップショット分離機能があるため、他のユーザーまたはジョブがテーブルをクエリしている間でも最適化操作を安全に実行できます。ただし、最終的には古いスナップショットをクリーンアップする必要があります。そうすることで、ストレージ コストが削減され、クエリ パフォーマンスが向上し、データのコンプライアンスが確保されます。古いスナップショットをクリーンアップするには、 VACUUM操作を実行します。vacuumを参照してください。
- Python
- Scala
- SQL
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "workspace.default.people_10k")
deltaTable.vacuum()
VACUUM workspace.default.people_10k
vacuum操作を効果的に使用する方法の詳細については、 「未使用のデータ ファイルをvacuumで削除する」を参照してください。