チュートリアル: 初めての DLT パイプラインを実行する
このチュートリアルでは、最初の DLT パイプラインを構成し、基本的な ETL コードを記述し、パイプラインの更新を実行する手順について説明します。
このチュートリアルのすべての手順は、Unity Catalog が有効になっているワークスペース向けに設計されています。 DLT パイプラインを従来の Hive metastoreと連携するように設定することもできます。 DLT パイプラインとレガシーHive metastoreの使用を参照してください。
このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。
DLT 構文を使用して記述されたソース コードが既にある場合は、コードを実行するようにパイプラインを構成できます。DLT パイプラインの設定を参照してください。
では、完全に宣言型のSQL Databricks SQL構文を使用して、マテリアライズドビュー とストリーミングテーブルの更新スケジュールを Unity Catalog マネージド オブジェクトとして登録し、設定することができます。Databricks SQLのマテリアライズドビュー の使用およびDatabricks SQLのストリーミングテーブルを使用したデータのロードを参照してください。
例: ニューヨークの赤ちゃんの名前データの取り込みと処理
この記事の例では、 ニューヨーク州の赤ちゃんの名前のレコードを含む一般公開されているデータセットを使用しています。 この例では、DLT パイプラインを使用して次の操作を行う方法を示します。
- ボリュームからテーブルに生のCSVデータを読み取ります。
- インジェスト テーブルからレコードを読み取り、DLTのエクスペクテーション を使用して、クレンジングされたデータを含む新しいテーブルを作成します。
- クレンジングされたレコードを、派生データセットを作成する DLT クエリへの入力として使用します。
このコードは、メダリオンアーキテクチャの簡略化された例を示しています。 メダリオンレイクハウスアーキテクチャとは?をご覧ください。
この例の実装は、Python と SQL で提供されています。 手順に従って新しいパイプラインとノートブックを作成し、提供されたコードをコピーして貼り付けます。
完全なコードを含むサンプル ノートブック も提供されています。
必要条件
-
パイプラインを開始するには、 クラスタリングの作成アクセス許可 、または DLT クラスタリングを定義するクラスターポリシーへのアクセス権が必要です。 DLT ランタイムは、パイプラインを実行する前にクラスタリングを作成し、適切なアクセス許可がないと失敗します。
-
デフォルトですべてのユーザーが、サーバレス パイプラインを使用して更新をトリガーできます。 サーバレスはアカウントレベルで有効にする必要があり、ワークスペースリージョンでは使用できない場合があります。 サーバレス コンピュートの有効化を参照してください。
-
このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するために新しいスキーマを作成することをお勧めします。
-
カタログに新しいスキーマを作成するには、
ALL PRIVILEGES
権限またはUSE CATALOG
権限とCREATE SCHEMA
権限が必要です。 -
新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行してください。 次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
、CREATE MATERIALIZED VIEW
、およびCREATE TABLE
権限を使用します。
-
このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアルでは新しいボリュームを作成することをお勧めします。 このチュートリアルで新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。
-
既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。
USE CATALOG
親カタログの場合。ALL PRIVILEGES
または、ターゲット スキーマに対するUSE SCHEMA
権限とCREATE VOLUME
権限。
-
オプションで、既存のボリュームを使用できます。 次の権限が必要です。
USE CATALOG
親カタログの場合。USE SCHEMA
親スキーマの場合。ALL PRIVILEGES
または、ターゲットボリュームでREAD VOLUME
とWRITE VOLUME
します。
-
これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity Catalog の特権の詳細については、「 Unity Catalog の特権とセキュリティ保護可能なオブジェクト」を参照してください。
-
ステップ 0: データをダウンロードする
この例では、Unity Catalog ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定したボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定したボリュームにダウンロードします。
import urllib
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
urllib.request.urlretrieve(download_url, volume_path + filename)
<catalog-name>
、<schema-name>
、および <volume-name>
を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。指定されたコードは、指定されたスキーマとボリュームの作成を試みます (これらのオブジェクトが存在しない場合)。 Unity Catalog でオブジェクトを作成して書き込むには、適切な特権が必要です。 「要件」を参照してください。
チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認してください。 このノートブックをパイプラインの一部として構成しないでください。
ステップ 1: パイプラインを作成する
DLT は、DLT 構文を使用してノートブックまたはファイル ( ソース コード と呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。
ソースコード フィールドにはアセットを設定しないでください。このフィールドを空のままにすると、ソース コード オーサリング用のノートブックが作成され、構成されます。
このチュートリアルの手順では、サーバレス コンピュート と Unity Catalogを使用します。 これらの手順で指定されていないすべての設定オプションには、デフォルト設定を使用します。
ワークスペースでサーバレスが有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記述されているとおりにチュートリアルを完了できます。 パイプラインの作成 UI の 宛先 セクションの ストレージ オプション で Unity Catalog を手動で選択する必要があります。
新しいパイプラインを設定するには、次の手順を実行します。
-
サイドバーで、「 パイプライン」 をクリックします。
-
パイプラインの作成 をクリックします。
-
パイプライン名 に、一意のパイプライン名を入力します。
-
サーバレス チェックボックスを選択します。
-
配信先 で、テーブルが発行される Unity Catalog の場所を構成するには、 カタログ と スキーマ を選択します。
-
Advanced で、 構成の追加 をクリックし、データをダウンロードしたカタログ、スキーマ、およびボリュームのパイプライン・パラメーターを、以下のパラメーター名を使用して定義します。
my_catalog
my_schema
my_volume
-
作成 をクリックします。
新しいパイプラインのパイプラインUIが表示されます。 ソース コード ノートブックは、パイプラインに対して自動的に作成され、構成されます。
ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、 /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
このノートブックにアクセスするためのリンクは、 パイプラインの詳細 パネルの ソース コード フィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。
ステップ 2: マテリアライズドビュー と ストリーミングテーブルを Python または SQL のノートブックで宣言する
Databricks ノートブックを使用して、DLT パイプラインのソースコードを対話形式で開発および検証できます。この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。新しく作成したノートブックを作成したパイプラインにアタッチするには、次のようにします。
- 右上の 接続 をクリックして、コンピュート設定メニューを開きます。
- ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。
- 「 接続 」をクリックします。
UI が変更され、右上に 検証 ボタンと 開始 ボタンが表示されます。 パイプライン コード開発のノートブック サポートの詳細については、「 ノートブックでの DLT パイプラインの開発とデバッグ」を参照してください。
- DLT パイプラインは、計画中にノートブック内のすべてのセルを評価します。汎用コンピュートに対して実行されるノートブックやジョブとしてスケジュールされるノートブックとは異なり、パイプラインはセルが指定された順序で実行されることを保証するものではありません。
- ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。
Python または SQL を使用したコードの開発の詳細については、 Python を使用したパイプライン コードの開発 または SQL を使用したパイプライン コードの開発を参照してください。
パイプライン コードの例
このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブックのセルに貼り付けます。
提供されるコードは、次の処理を行います。
- 必要なモジュールをインポートします(Pythonのみ)。
- パイプラインの構成中に定義されたパラメーターを参照します。
- ボリュームから取り込む
baby_names_raw
という名前のストリーミングテーブルを定義します。 - 取り込まれたデータを検証する
baby_names_prepared
という名前のマテリアライズドビュー (Materialized View) を定義します。 - データの高度に洗練されたビューを持つ
top_baby_names_2021
という名前のマテリアライズドビューを定義します。
- Python
- SQL
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
ステップ 3: パイプラインの更新を開始する
パイプラインの更新を開始するには、ノートブック UI の右上にある 起動 ボタンをクリックします。
ノートブックの例
次のノートブックには、この記事で説明したものと同じコード例が含まれています。 これらのノートブックには、この記事の手順と同じ要件があります。 「要件」を参照してください。
ノートブックをインポートするには、次の手順を実行します。
-
ノートブックの UI を開きます。
- + 新規 > ノートブック をクリックします。
- 空のノートブックが開きます。
-
ファイル > インポート... をクリックします。 インポート ダイアログが表示されます。
-
インポート元 の URL オプションを選択します。
-
ノートブックの URL を貼り付けます。
-
「 インポート 」をクリックします。
このチュートリアルでは、DLT パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。次のノートブックをインポートし、ノートブックをコンピュート リソースにアタッチし、 my_catalog
、 my_schema
、 my_volume
の必要な変数を入力して、[ すべて実行 ] をクリックします。
パイプラインのデータ ダウンロードのチュートリアル
次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホームディレクトリに保存されます。
次のいずれかのノートブックをインポートした後、パイプラインを作成するステップを完了し、ダウンロードしたノートブックを選択するには ソース コード ファイル ピッカーを使用します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で 起動 をクリックして更新をトリガーします。