チュートリアル:初めてのDelta Live Tablesパイプラインの実行

このチュートリアルでは、初めての Delta Live Tables パイプラインの設定、基本的な ETL コードの記述、およびパイプラインの更新の実行を行う手順を説明します。

このチュートリアルのすべてのステップは、 Unity Catalog が有効になっているワークスペース用に設計されています。 また Delta Live Tables パイプラインを従来の Hive metastoreと連携するように設定することもできます。 レガシーHiveメタストアでのDelta Live Tablesパイプラインの使用 を参照してください。

このチュートリアルでは、Databricks ノートブックを使用して新しいパイプライン コードを開発および検証する手順について説明します。 Python または SQL ファイルのソース コードを使用してパイプラインを構成することもできます。

Delta Live Tables 構文を使用して記述されたソース コードが既にある場合は、コードを実行するようにパイプラインを構成できます。 Delta Live Tables パイプラインの構成を参照してください。

で完全に宣言型のSQL Databricks SQL構文を使用して、マテリアライズドビューとストリーミングテーブルの更新スケジュールを Unity Catalog 管理オブジェクトとして登録し、設定することができます。Databricks SQL でのマテリアライズド ビューの使用およびDatabricks SQL でのストリーミング テーブルを使用したデータの読み込みを参照してください。

例: ニューヨークの赤ちゃんの名前データの取り込みと処理

この記事の例では、 ニューヨーク州の赤ちゃんの名前のレコードを含む一般公開されているデータセットを使用しています。 この例では、Delta Live Tables パイプラインを使用して次の操作を行う方法を示します。

  • ボリュームからテーブルに生のCSVデータを読み取ります。

  • インジェスト テーブルからレコードを読み取り、Delta Live Tables のエクスペクテーション を使用して、クレンジングされたデータを含む新しいテーブルを作成します。

  • クレンジングされたレコードを、派生データセットを作成するDelta Live Tablesクエリへの入力として使用します。

このコードは、メダリオンアーキテクチャの簡略化された例を示しています。メダリオンレイクハウスアーキテクチャとはを参照してください。

この例の実装は、Python と SQL で提供されています。 手順に従って新しいパイプラインとノートブックを作成し、提供されたコードをコピーして貼り付けます。

完全なコードを含むサンプル ノートブック も提供されています。

要件

  • パイプラインを開始するには、 クラスター作成権限、またはDelta Live Tablesクラスターを定義するクラスターポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムはパイプラインを実行する前にクラスターを作成しますが、適切な権限がない場合は失敗します。

  • すべてのユーザーがデフォルトで、サーバレス パイプラインを使用して更新をトリガーできます。 サーバレスはアカウントレベルで有効にする必要があり、ワークスペースリージョンでは使用できない場合があります。 サーバレス コンピュートの有効化を参照してください。

  • このチュートリアルの例では、 Unity Catalog を使用します。 Databricks では、ターゲット スキーマに複数のデータベース オブジェクトが作成されるため、このチュートリアルを実行するために新しいスキーマを作成することをお勧めします。

    • カタログに新しいスキーマを作成するには、 ALL PRIVILEGES 権限または USE CATALOG 権限と CREATE SCHEMA 権限が必要です。

    • 新しいスキーマを作成できない場合は、既存のスキーマに対してこのチュートリアルを実行してください。 次の権限が必要です。

      • USE CATALOG 親カタログの場合。

      • ALL PRIVILEGES または、ターゲット スキーマに対する USE SCHEMACREATE MATERIALIZED VIEW、および CREATE TABLE 権限を使用します。

    • このチュートリアルでは、ボリュームを使用してサンプル データを格納します。 Databricks では、このチュートリアルでは新しいボリュームを作成することをお勧めします。 このチュートリアルで新しいスキーマを作成する場合は、そのスキーマに新しいボリュームを作成できます。

      • 既存のスキーマに新しいボリュームを作成するには、次の権限が必要です。

        • USE CATALOG 親カタログの場合。

        • ALL PRIVILEGES または、ターゲット スキーマに対する USE SCHEMA 権限と CREATE VOLUME 権限。

      • オプションで、既存のボリュームを使用できます。 次の権限が必要です。

        • USE CATALOG 親カタログの場合。

        • USE SCHEMA 親スキーマの場合。

        • ALL PRIVILEGES または、ターゲットボリュームで READ VOLUMEWRITE VOLUME します。

    これらのアクセス許可を設定するには、Databricks 管理者に問い合わせてください。 Unity Catalog の特権の詳細については、Unity Catalog の特権とセキュリティ保護可能なオブジェクトを参照してください。

ステップ 0: データをダウンロードする

この例では、Unity Catalog ボリュームからデータを読み込みます。 次のコードは、CSV ファイルをダウンロードし、指定したボリュームに格納します。 新しいノートブックを開き、次のコードを実行して、このデータを指定したボリュームにダウンロードします。

my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"

spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"

dbutils.fs.cp(download_url, volume_path + filename)

<catalog-name><schema-name>、および <volume-name> を、Unity Catalog ボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。指定されたコードは、指定されたスキーマとボリュームの作成を試みます (これらのオブジェクトが存在しない場合)。 Unity Catalog でオブジェクトを作成して書き込むには、適切な特権が必要です。 要件を参照してください。

チュートリアルを続行する前に、このノートブックが正常に実行されていることを確認してください。 このノートブックをパイプラインの一部として構成しないでください。

ステップ 1: パイプラインを作成する

Delta Live Tables は、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインには複数の言語固有のノートブックまたはファイルを追加できます。

重要

ソースコードフィールドにはアセットを設定しないでください。このフィールドを空のままにすると、ソース コード オーサリング用のノートブックが作成され、構成されます。

このチュートリアルの手順では、サーバレス コンピュート と Unity Catalogを使用します。 これらの手順で指定されていないすべての設定オプションには、デフォルト設定を使用します。

ワークスペースでサーバレスが有効になっていないか、サポートされていない場合は、デフォルト コンピュートの設定を使用して、記述されているとおりにチュートリアルを完了できます。 パイプラインの作成 UI の 宛先セクションの ストレージ オプションUnity Catalog を手動で選択する必要があります。

新しいパイプラインを設定するには、次の手順を実行します。

  1. サイドバーで、 Delta Live Tablesをクリックします。

  2. パイプラインの作成をクリックします。

  3. パイプライン名に、一意のパイプライン名を入力します。

  4. サーバレスチェックボックスを選択します。

  5. 配信先 で、テーブルが発行される Unity Catalog の場所を構成するには、 カタログスキーマを選択します。

  6. Advancedで、構成の追加をクリックし、データをダウンロードしたカタログ、スキーマ、およびボリュームのパイプライン・パラメーターを、以下のパラメーター名を使用して定義します。

    • my_catalog

    • my_schema

    • my_volume

  7. 作成をクリックします。

新しいパイプラインのパイプラインUIが表示されます。 ソース コード ノートブックは、パイプラインに対して自動的に作成され、構成されます。

ノートブックは、ユーザー・ディレクトリー内の新しいディレクトリーに作成されます。 新しいディレクトリとファイルの名前は、パイプラインの名前と一致します。 たとえば、 /Users/your.username@databricks.com/my_pipeline/my_pipeline.

このノートブックにアクセスするためのリンクは、パイプラインの詳細パネルの ソース コードフィールドの下にあります。リンクをクリックしてノートブックを開き、次のステップに進みます。

ステップ 2: Python または SQLを使用してノートブックでマテリアライズド ビューとストリーミング テーブルを宣言する

Datbricks ノートブックを使用して、Delta Live Tables パイプラインのソース コードを対話形式で開発および検証できます。 この機能を使用するには、ノートブックをパイプラインにアタッチする必要があります。 新しく作成したノートブックを作成したパイプラインにアタッチするには、次のようにします。

  1. 右上の 「Connect 」をクリックして、コンピュート設定メニューを開きます。

  2. ステップ 1 で作成したパイプラインの名前にカーソルを合わせます。

  3. 接続」をクリックします。

UI が変更され、右上に 検証 ボタンと 起動ボタンが表示されます。 パイプライン コード開発のノートブック サポートの詳細については、ノートブックでのDelta Live Tablesパイプラインの開発とデバッグを参照してください。

重要

  • Delta Live Tables パイプラインは、計画中にノートブック内のすべてのセルを評価します。 汎用コンピュートに対して実行されるノートブックやジョブとしてスケジュールされるノートブックとは異なり、パイプラインはセルが指定された順序で実行されることを保証するものではありません。

  • ノートブックには、1 つのプログラミング言語のみを含めることができます。 パイプラインのソースコードノートブックで Python コードと SQL コードを混在させないでください。

Python または SQL を使用したコードの開発の詳細については、「 Python を使用したパイプライン コードの開発 」または 「SQL を使用したパイプライン コードの開発」を参照してください。

パイプライン コードの例

このチュートリアルの例を実装するには、次のコードをコピーして、パイプラインのソース コードとして構成されたノートブックのセルに貼り付けます。

提供されるコードは、次の処理を行います。

  • 必要なモジュールをインポートします(Pythonのみ)。

  • パイプラインの構成中に定義されたパラメーターを参照します。

  • ボリュームから取り込む baby_names_raw という名前のストリーミング テーブルを定義します。

  • 取り込まれたデータを検証する baby_names_prepared という名前の具体化ビュー (Materialized View) を定義します。

  • データの高度に絞り込まれたビューを持つ top_baby_names_2021 という名前の実体化ビュー (Materialized View) を定義します。

# Import modules

import dlt
from pyspark.sql.functions import *

# Assign pipeline parameters to variables

my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")

# Define the path to source data

volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"

# Define a streaming table to ingest data from a volume

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(volume_path)
  )
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

# Define a materialized view that validates data and renames a column

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    spark.read.table("LIVE.baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

# Define a materialized view that has a filtered, aggregated, and sorted view of the data

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    spark.read.table("LIVE.baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )
-- Define a streaming table to ingest data from a volume

CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
  '/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST'));

-- Define a materialized view that validates data and renames a column

CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM LIVE.baby_names_raw;

-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

ステップ 3: パイプラインの更新を開始する

パイプラインの更新を開始するには、ノートブック UI の右上にある 起動ボタンをクリックします。

ノートブックの例

次のノートブックには、この記事で説明したものと同じコード例が含まれています。 これらのノートブックには、この記事のステップ と同じ要件があります。 「要件」を参照してください。

ノートブックをインポートするには、次の手順を実行します。

  1. ノートブックの UI を開きます。

    • [+ 新しいノートブック>] をクリックします。

    • 空のノートブックが開きます。

  2. [ファイル] > [インポート...] をクリックします。[インポート] ダイアログが表示されます。

  3. [インポート元] の [URL] オプションを選択します。

  4. ノートブックの URL を貼り付けます。

  5. インポート」をクリックします。

このチュートリアルでは、Delta Live Tables パイプラインを構成して実行する前に、データ セットアップ ノートブックを実行する必要があります。 次のノートブックをインポートし、ノートブックをコンピュート リソースにアタッチし、 my_catalogmy_schemamy_volumeの必要な変数を入力して、[ すべて実行] をクリックします。

パイプラインのデータ ダウンロードのチュートリアル

ノートブックを新しいタブで開く

次のノートブックは、Python または SQL の例を示しています。 ノートブックをインポートすると、ユーザーのホームディレクトリに保存されます。

次のいずれかのノートブックをインポートした後、パイプラインを作成するステップを完了し、ダウンロードしたノートブックを選択するにはソース コード ファイル ピッカーを使用します。 ソース コードとして構成されたノートブックを使用してパイプラインを作成した後、パイプライン UI で 起動をクリックして更新をトリガーします。

Delta Live Tablesを使い始めるノートブック(Python版)

ノートブックを新しいタブで開く

Delta Live Tablesを使い始めるノートブック(SQL版)

ノートブックを新しいタブで開く