チュートリアル:初めてのDelta Live Tablesパイプラインの実行
このチュートリアルでは、初めての Delta Live Tables パイプラインの設定、基本的な ETL コードの記述、およびパイプラインの更新の実行を行う手順を説明します。
このチュートリアルのすべてのステップは、 Unity Catalog が有効になっているワークスペース用に設計されています。 また Delta Live Tables パイプラインを従来の Hive metastoreと連携するように設定することもできます。 Delta Live Tablesレガシー での パイプラインの使用Hive metastore を参照してください。
注
このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。
Delta Live Tables 構文を使用して記述されたソース コードが既にある場合は、コードを実行するようにパイプラインを構成できます。 「Delta Live Tables パイプラインの構成」を参照してください。
で完全に宣言型のSQL Databricks SQL構文を使用して、マテリアライズドビューとストリーミングテーブルの更新スケジュールを Unity Catalog 管理オブジェクトとして登録し、設定することができます。「Databricks SQL でのマテリアライズド ビューの使用」および「Databricks SQL でのストリーミング テーブルを使用したデータの読み込み」を参照してください。
例: ニューヨークの赤ちゃんの名前データの取り込みと処理
この記事の例では、 ニューヨーク州の赤ちゃんの名前のレコードを含む一般公開されているデータセットを使用しています。 この例では、Delta Live Tables パイプラインを使用して次の操作を行う方法を示します。
ボリュームからテーブルに生のCSVデータを読み取ります。
インジェスト テーブルからレコードを読み取り、Delta Live Tables の期待 値を使用して、クレンジングされたデータを含む新しいテーブルを作成します。
クレンジングされたレコードを、派生データセットを作成するDelta Live Tablesクエリへの入力として使用します。
このコードは、メダリオンアーキテクチャの簡略化された例を示しています。「メダリオンレイクハウスアーキテクチャとは」を参照してください。
この例の実装は、Python と SQL に対して提供されています。 ステップに従って新しいパイプラインとノートブックを作成し、提供されたコードをコピーして貼り付けます。
完全なコードを含むサンプル ノートブック も提供されています。
要件
パイプラインを開始するには、 クラスター作成権限、またはDelta Live Tablesクラスターを定義するクラスターポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムはパイプラインを実行する前にクラスターを作成しますが、適切な権限がない場合は失敗します。
すべてのユーザーが、サーバレス パイプライン by デフォルトを使用して更新をトリガーできます。 サーバレスはアカウントレベルで有効にする必要があり、ワークスペースリージョンでは使用できない場合があります。 Enable サーバレス コンピュートを参照してください。
このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するために新しいスキーマを作成することをお勧めします。
カタログに新しいスキーマを作成するには、
ALL PRIVILEGES
権限またはUSE CATALOG
権限とCREATE SCHEMA
権限が必要です。新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行してください。 次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
、CREATE MATERIALIZED VIEW
、およびCREATE TABLE
権限を使用します。
このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアルでは新しいボリュームを作成することをお勧めします。 このチュートリアルで新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。
既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
権限とCREATE VOLUME
権限。
オプションで、既存のボリュームを使用できます。 次の権限が必要です。
USE CATALOG
親カタログの場合。USE SCHEMA
親スキーマの場合。ALL PRIVILEGES
または、ターゲットボリュームでREAD VOLUME
とWRITE VOLUME
します。
これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity Catalog の特権の詳細については、「 Unity Catalog の特権とセキュリティ保護可能なオブジェクト」を参照してください。
ステップ 0: データをダウンロードする
この例では、Unity Catalog ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定したボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定したボリュームにダウンロードします。
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
<catalog-name>
、<schema-name>
、および <volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。指定されたコードは、指定されたスキーマとボリュームの作成を試みます (これらのオブジェクトが存在しない場合)。 Unity Catalog でオブジェクトを作成して書き込むには、適切な特権が必要です。 「要件」を参照してください。
注
チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認してください。 このノートブックをパイプラインの一部として構成しないでください。
ステップ 1: パイプラインを作成する
Delta Live Tables は、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。
重要
「ソースコード」フィールドにはアセットを設定しないでください。このフィールドを黒のままにすると、ソース コード オーサリング用のノートブックが作成され、構成されます。
このチュートリアルの手順では、サーバレス コンピュート と Unity Catalogを使用します。 これらの手順に記載されていないすべての設定オプションには、デフォルト設定を使用します。
注
ワークスペースでサーバレスが有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記述されているとおりにチュートリアルを完了できます。 パイプラインの作成 UI の [宛先] セクションの [ストレージ オプション] で Unity Catalog を手動で選択する必要があります。
新しいパイプラインを設定するには、次の手順を実行します。
サイドバーの 「Delta Live Tables 」をクリックします。
「パイプラインの作成」をクリックします。
一意の パイプライン名を指定します。
サーバレスの横のチェックボックスをオンにします。
データを公開する カタログ を選択します。
カタログで スキーマ を選択します。
新しいスキーマ名を指定して、スキーマを作成します。
[詳細設定] の [設定を追加] ボタンを使用して 3 つのパイプライン パラメーターを定義し、3 つの設定を追加します。データをダウンロードしたカタログ、スキーマ、およびボリュームを、次のパラメーター名を使用して指定します。
my_catalog
my_schema
my_volume
[作成]をクリックします。
新しく作成したパイプラインの UI が表示されます。 ソース コード ノートブックは、パイプラインに対して自動的に作成され、構成されます。
ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、 /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
このノートブックにアクセスするためのリンクは、パイプラインの詳細パネルの [ソース コード] フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。
ステップ 2: Python または SQL を使用してノートブックでマテリアライズド ビューとストリーミング テーブルを宣言する
Datbricks ノートブックを使用して、Delta Live Tables パイプラインのソース コードを対話形式で開発および検証できます。 この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。 新しく作成したノートブックを作成したパイプラインにアタッチするには、次のようにします。
右上の 「Connect 」をクリックして、コンピュート設定メニューを開きます。
ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。
「接続」をクリックします。
UI が変更され、右上に [検証] ボタンと [開始 ] ボタンが表示されます。 パイプライン コード開発のノートブック サポートの詳細については、「ノートブックでの API の開発とデバッグ」Delta Live Tablesを参照してください。
重要
Delta Live Tables パイプラインは、計画中にノートブック内のすべてのセルを評価します。 汎用コンピュートに対して実行されるノートブックやジョブとしてスケジュールされるノートブックとは異なり、パイプラインはセルが指定された順序で実行されることを保証するものではありません。
ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。
Python または SQL を使用したコードの開発の詳細については、「 Python を使用したパイプライン コードの開発 」または 「SQL を使用したパイプライン コードの開発」を参照してください。
パイプライン コードの例
このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブックのセルに貼り付けます。
提供されるコードは、次の処理を行います。
必要なモジュールをインポートします(Pythonのみ)。
パイプラインの構成中に定義されたパラメーターを参照します。
ボリュームから取り込む
baby_names_raw
という名前のストリーミング テーブルを定義します。取り込まれたデータを検証する
baby_names_prepared
という名前の具体化ビュー (Materialized View) を定義します。データの高度に絞り込まれたビューを持つ
top_baby_names_2021
という名前の実体化ビュー (Materialized View) を定義します。
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
ノートブックの例
次のノートブックには、この記事で説明したものと同じコード例が含まれています。 これらのノートブックには、この記事のステップ と同じ要件があります。 「要件」を参照してください。
ノートブックをインポートするには、次の手順を実行します。
ノートブックの UI を開きます。
[+ 新しいノートブック>] をクリックします。
空のノートブックが開きます。
[ファイル] > [インポート...] をクリックします。[インポート] ダイアログが表示されます。
[インポート元] の [URL] オプションを選択します。
ノートブックの URL を貼り付けます。
「インポート」をクリックします。
このチュートリアルでは、Delta Live Tables パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。 次のノートブックをインポートし、ノートブックをコンピュート リソースにアタッチし、 my_catalog
、 my_schema
、 my_volume
の必要な変数を入力して、[ すべて実行] をクリックします。
次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホームディレクトリに保存されます。
次のいずれかのノートブックをインポートした後、パイプラインを作成するステップを完了し、ダウンロードしたノートブックを選択する にはソース コード ファイル ピッカーを使用します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で [開始 ] をクリックして更新をトリガーします。