チュートリアル:初めてのDelta Live Tablesパイプラインの実行
January 14, 2025
このチュートリアルでは、初めての Delta Live Tables パイプラインの設定、基本的な ETL コードの記述、およびパイプラインの更新の実行を行う手順を説明します。
このチュートリアルのすべてのステップは、 Unity Catalog が有効になっているワークスペース用に設計されています。 また Delta Live Tables パイプラインを従来の Hive metastoreと連携するように設定することもできます。 レガシーHiveメタストアでのDelta Live Tablesパイプラインの使用 を参照してください。
注
このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。
Delta Live Tables 構文を使用して記述されたソース コードが既にある場合は、コードを実行するようにパイプラインを構成できます。 Delta Live Tables パイプラインの構成を参照してください。
で完全に宣言型のSQL Databricks SQL構文を使用して、マテリアライズドビューとストリーミングテーブルの更新スケジュールを Unity Catalog 管理オブジェクトとして登録し、設定することができます。Databricks SQL でのマテリアライズド ビューの使用およびDatabricks SQL でのストリーミング テーブルを使用したデータの読み込みを参照してください。
例: ニューヨークの赤ちゃんの名前データの取り込みと処理
この記事の例では、 ニューヨーク州の赤ちゃんの名前のレコードを含む一般公開されているデータセットを使用しています。 この例では、Delta Live Tables パイプラインを使用して次の操作を行う方法を示します。
ボリュームからテーブルに生のCSVデータを読み取ります。
インジェスト テーブルからレコードを読み取り、Delta Live Tables のエクスペクテーション を使用して、クレンジングされたデータを含む新しいテーブルを作成します。
クレンジングされたレコードを、派生データセットを作成するDelta Live Tablesクエリへの入力として使用します。
このコードは、メダリオンアーキテクチャの簡略化された例を示しています。メダリオンレイクハウスアーキテクチャとはを参照してください。
この例の実装は、Python と SQL で提供されています。 手順に従って新しいパイプラインとノートブックを作成し、提供されたコードをコピーして貼り付けます。
完全なコードを含むサンプル ノートブック も提供されています。
要件
パイプラインを開始するには、 クラスター作成権限、またはDelta Live Tablesクラスターを定義するクラスターポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムはパイプラインを実行する前にクラスターを作成しますが、適切な権限がない場合は失敗します。
すべてのユーザーがデフォルトで、サーバレス パイプラインを使用して更新をトリガーできます。 サーバレスはアカウントレベルで有効にする必要があり、ワークスペースリージョンでは使用できない場合があります。 サーバレス コンピュートの有効化を参照してください。
このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するために新しいスキーマを作成することをお勧めします。
カタログに新しいスキーマを作成するには、
ALL PRIVILEGES
権限またはUSE CATALOG
権限とCREATE SCHEMA
権限が必要です。新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行してください。 次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
、CREATE MATERIALIZED VIEW
、およびCREATE TABLE
権限を使用します。
このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアルでは新しいボリュームを作成することをお勧めします。 このチュートリアルで新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。
既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
権限とCREATE VOLUME
権限。
オプションで、既存のボリュームを使用できます。 次の権限が必要です。
USE CATALOG
親カタログの場合。USE SCHEMA
親スキーマの場合。ALL PRIVILEGES
または、ターゲットボリュームでREAD VOLUME
とWRITE VOLUME
します。
これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity Catalog の特権の詳細については、Unity Catalog の特権とセキュリティ保護可能なオブジェクトを参照してください。
ステップ 0: データをダウンロードする
この例では、Unity Catalog ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定したボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定したボリュームにダウンロードします。
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
<catalog-name>
、<schema-name>
、および <volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。指定されたコードは、指定されたスキーマとボリュームの作成を試みます (これらのオブジェクトが存在しない場合)。 Unity Catalog でオブジェクトを作成して書き込むには、適切な特権が必要です。 要件を参照してください。
注
チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認してください。 このノートブックをパイプラインの一部として構成しないでください。
ステップ 1: パイプラインを作成する
Delta Live Tables は、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。
重要
ソースコードフィールドにはアセットを設定しないでください。このフィールドを空のままにすると、ソース コード オーサリング用のノートブックが作成され、構成されます。
このチュートリアルの手順では、サーバレス コンピュート と Unity Catalogを使用します。 これらの手順で指定されていないすべての設定オプションには、デフォルト設定を使用します。
注
ワークスペースでサーバレスが有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記述されているとおりにチュートリアルを完了できます。 パイプラインの作成 UI の 宛先セクションの ストレージ オプションで Unity Catalog を手動で選択する必要があります。
新しいパイプラインを設定するには、次の手順を実行します。
サイドバーで、 Delta Live Tablesをクリックします。
パイプラインの作成をクリックします。
パイプライン名に、一意のパイプライン名を入力します。
サーバレスチェックボックスを選択します。
配信先 で、テーブルが発行される Unity Catalog の場所を構成するには、 カタログと スキーマを選択します。
Advancedで、構成の追加をクリックし、データをダウンロードしたカタログ、スキーマ、およびボリュームのパイプライン・パラメーターを、以下のパラメーター名を使用して定義します。
my_catalog
my_schema
my_volume
作成をクリックします。
新しいパイプラインのパイプラインUIが表示されます。 ソース コード ノートブックは、パイプラインに対して自動的に作成され、構成されます。
ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、 /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
このノートブックにアクセスするためのリンクは、パイプラインの詳細パネルの ソース コードフィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。
ステップ 2: Python または SQLを使用してノートブックでマテリアライズド ビューとストリーミング テーブルを宣言する
Datbricks ノートブックを使用して、Delta Live Tables パイプラインのソース コードを対話形式で開発および検証できます。 この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。 新しく作成したノートブックを作成したパイプラインにアタッチするには、次のようにします。
右上の 「Connect 」をクリックして、コンピュート設定メニューを開きます。
ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。
「接続」をクリックします。
UI が変更され、右上に 検証 ボタンと 起動ボタンが表示されます。 パイプライン コード開発のノートブック サポートの詳細については、ノートブックでのDelta Live Tablesパイプラインの開発とデバッグを参照してください。
重要
Delta Live Tables パイプラインは、計画中にノートブック内のすべてのセルを評価します。 汎用コンピュートに対して実行されるノートブックやジョブとしてスケジュールされるノートブックとは異なり、パイプラインはセルが指定された順序で実行されることを保証するものではありません。
ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。
Python または SQL を使用したコードの開発の詳細については、「 Python を使用したパイプライン コードの開発 」または 「SQL を使用したパイプライン コードの開発」を参照してください。
パイプライン コードの例
このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブックのセルに貼り付けます。
提供されるコードは、次の処理を行います。
必要なモジュールをインポートします(Pythonのみ)。
パイプラインの構成中に定義されたパラメーターを参照します。
ボリュームから取り込む
baby_names_raw
という名前のストリーミング テーブルを定義します。取り込まれたデータを検証する
baby_names_prepared
という名前の具体化ビュー (Materialized View) を定義します。データの高度に絞り込まれたビューを持つ
top_baby_names_2021
という名前の実体化ビュー (Materialized View) を定義します。
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
ノートブックの例
次のノートブックには、この記事で説明したものと同じコード例が含まれています。 これらのノートブックには、この記事のステップ と同じ要件があります。 「要件」を参照してください。
ノートブックをインポートするには、次の手順を実行します。
ノートブックの UI を開きます。
[+ 新しいノートブック>] をクリックします。
空のノートブックが開きます。
[ファイル] > [インポート...] をクリックします。[インポート] ダイアログが表示されます。
[インポート元] の [URL] オプションを選択します。
ノートブックの URL を貼り付けます。
「インポート」をクリックします。
このチュートリアルでは、Delta Live Tables パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。 次のノートブックをインポートし、ノートブックをコンピュート リソースにアタッチし、 my_catalog
、 my_schema
、 my_volume
の必要な変数を入力して、[ すべて実行] をクリックします。
次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホームディレクトリに保存されます。
次のいずれかのノートブックをインポートした後、パイプラインを作成するステップを完了し、ダウンロードしたノートブックを選択するにはソース コード ファイル ピッカーを使用します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で 起動をクリックして更新をトリガーします。