チュートリアル:初めてのDelta Live Tablesパイプラインの実行

このチュートリアルでは、Databricks ノートブックのコードから Delta Live Tables パイプラインを構成し、パイプライン更新をトリガーしてパイプラインを実行する方法を示します。 このチュートリアルには、 PythonおよびSQLインターフェイスを使用してサンプル コードを含むサンプル データセットを取り込んで処理するサンプル パイプラインが含まれています。 このチュートリアルの手順を使用して、適切に定義された Delta Live Tables 構文を持つ任意のノートブックでパイプラインを作成することもできます。

Delta Live Tables パイプラインを構成し、Databricks ワークスペース UI または API、CLI、Databricks アセット バンドルなどの自動ツール オプションを使用して、または Databricks ワークフロー内のタスクとして更新をトリガーできます。 Delta Live Tables の機能と特長を理解するために、Databricks では、まず UI を使用してパイプラインを作成および実行することをお勧めします。 さらに、UI でパイプラインを構成すると、Delta Live Tables はプログラムによるワークフローの実装に使用できるパイプラインの JSON 構成を生成します。

Delta Live Tables機能を実証するために、このチュートリアルの例では、公開されているデータセットをダウンロードします。 ただし、 Databricks 、実際のユースケースを実装する Pipeline で使用されるデータソースに接続してデータを取り込む方法がいくつかあります。 「Delta Live Tables を使用してデータを取り込む」を参照してください。

要件

  • パイプラインを開始するには、 クラスター作成権限、またはDelta Live Tablesクラスターを定義するクラスターポリシーへのアクセス権が必要です。 Delta Live Tables ランタイムはパイプラインを実行する前にクラスターを作成しますが、適切な権限がない場合は失敗します。

  • このチュートリアルの例を使用するには、ワークスペースでUnity Catalogが有効になっている必要があります。

  • Unity Catalogで次の権限が必要です。

    • READ VOLUME WRITE VOLUME、または ALL PRIVILEGESは、my-volumeボリュームです。

    • USE SCHEMA またはdefaultスキーマのALL PRIVILEGES

    • USE CATALOG またはmainカタログの場合はALL PRIVILEGES

    これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。

  • このチュートリアルの例では、Unity Catalogボリュームを使用してサンプル データを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ名、スキーマ名、およびボリューム名を使用して、例で使用するボリュームパスを設定します。

ワークスペースでUnity Catalog 有効になっていない場合は、 必要としないサンプルを含む ノートブックが Unity Catalogこの記事に添付されています。これらの例を使用するには、パイプラインの作成時にストレージ オプションとしてHive metastoreを選択します。

Delta Live Tables クエリはどこで実行しますか?

Delta Live Tablesクエリは主にDatabricksノートブックに実装されていますが、 Delta Live Tablesノートブックのセルで対話的に実行できるように設計されていません。 Databricks ノートブックで Delta Live Tables 構文を含むセルを実行すると、エラー メッセージが表示されます。 クエリを実行するには、ノートブックをパイプラインの一部として構成する必要があります。

重要

  • Delta Live Tablesのクエリを作成する場合、ノートブックのセルごとの実行順序に依存することはできません。 Delta Live Tablesノートブックで定義されたすべてのコードを評価して実行しますが、ノートブック実行 allコマンドとは実行モデルが異なります。

  • 単一の Delta Live Tables ソース コード ファイル内で言語を混在させることはできません。 たとえば、ノートブックには Python クエリまたは SQL クエリのみを含めることができます。 パイプラインで複数の言語を使用する必要がある場合は、パイプラインで複数の言語固有のノートブックまたはファイルを使用します。

ファイルに保存された Python コードを使用することもできます。 たとえば、Python パイプラインにインポートできる Python モジュールを作成したり、SQL クエリで使用する Python ユーザー定義関数 (UDF) を定義したりすることができます。 Python モジュールのインポートの詳細については、 「Git フォルダーまたはワークスペース ファイルから Python モジュールをインポートする」を参照してください。 Python UDF の使用方法については、 「ユーザー定義スカラー関数 - Python」を参照してください。

例: ニューヨークの赤ちゃんの名前データの取り込みと処理

この記事の例では、ニューヨーク州の赤ちゃんの名前の記録を含む、公開されているデータセットを使用しています。 これらの例は、Delta Live Tables パイプラインを使用して次のことを行う方法を示しています。

  • 公開されているデータセットから生の CSV データをテーブルに読み取ります。

  • 生データテーブルからレコードを読み取り、Delta Live Tables Expectationsを使用して、クレンジングされたデータを含む新しいテーブルを作成します。

  • クレンジングされたレコードを、派生データセットを作成するDelta Live Tablesクエリへの入力として使用します。

このコードは、メダリオンアーキテクチャの簡略化された例を示しています。「メダリオンレイクハウスアーキテクチャとは」を参照してください。

この例の実装は、 PythonおよびSQLインターフェイス用に提供されています。 ステップ に従ってサンプル コードを含む新しいパイプラインを作成することも、ステップ 1 に進んで、このページで提供されているパイプラインいずれかを使用することもできます。

Python で Delta Live Tables パイプラインを実装する

Delta Live Tables データセットを作成する Python コードは、DataFrames を返す必要があります。 Python と DataFrames に慣れていないユーザーには、Databricks では SQL インターフェイスの使用を推奨しています。 「SQL を使用して Delta Live Tables パイプラインを実装する」を参照してください。

すべてのDelta Live Tables Python APIs dlt モジュールに実装されています。 Python で実装された Delta Live Tables パイプライン コードは、Python ノートブックおよびファイルの先頭にあるdltモジュールを明示的にインポートする必要があります。 Delta Live Tables 、多くのPythonスクリプトとは重要な点で異なります。Delta Delta Live Tablesデータセットを作成するために、データ取り込みと変換を実行する関数を呼び出す必要はありません。 代わりに、Delta Live Tables は、パイプラインにロードされたすべてのファイル内のdltモジュールからのデコレーター関数を解釈し、データフロー グラフを構築します。

このチュートリアルの例を実装するには、次の Python コードをコピーして新しい Python ノートブックに貼り付けます。 各サンプル コード スニペットを、説明されている順序でノートブック内の独自のセルに追加します。 ノートブックを作成するためのオプションを確認するには、 「ノートブックの作成」を参照してください。

Python インターフェイスを使用してパイプラインを作成する場合、デフォルトでは、テーブル名は関数名によって定義されます。 たとえば、次の Python の例では、 baby_names_rawbaby_names_preparedtop_baby_names_2021という名前の 3 つのテーブルが作成されます。 nameを使用してテーブル名をオーバーライドできます。 「Delta Live Tables マテリアライズド ビューまたはストリーミング テーブルの作成」を参照してください。

重要

パイプラインの実行時に予期しない動作を回避するには、データセットを定義する関数に副作用を引き起こす可能性のあるコードを含めないでください。 詳細については、 Python リファレンスを参照してください。

Delta Live Tablesモジュールをインポートする

すべてのDelta Live Tables Python APIは、dlt モジュールに実装されています。Pythonノートブックやファイルの先頭でdltモジュールを明示的にインポートする。

次の例は、このインポートと、pyspark.sql.functionsのインポートステートメントを示しています。

import dlt
from pyspark.sql.functions import *

データのダウンロード

この例のデータを取得するには、次のように CSV ファイルをダウンロードしてボリュームに保存します。

import os

os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"

dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

オブジェクトストレージ内のファイルからのテーブルの作成

Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

@dlt.tableデコレータは、関数によって返されたDataFrameの結果を含むテーブルを作成するように Delta Live Tables に指示します。 @dlt.tablePythonSparkDataFrameに新しいテーブルを登録するための を返す 関数定義の前に、Delta Live Tables デコレータを追加します。次の例は、関数名をテーブル名として使用し、テーブルに説明コメントを追加する方法を示しています。

@dlt.table(
  comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
  df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
  df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
  return df_renamed_column

パイプラインの上流のデータセットからテーブルを追加する

dlt.read()を使用して、現在のDelta Live Tablesパイプラインで宣言されている他のデータセットからデータを読み取ることができます。この方法で新しいテーブルを宣言すると、更新を実行する前にDelta Live Tablesによって自動的に解決される依存関係が作成されます。次のコードには、エクスペクテーションに基づいてデータ品質を監視および強制する例も含まれています。「Delta Live Tablesによるデータ品質の管理」を参照してください。

@dlt.table(
  comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
  return (
    dlt.read("baby_names_raw")
      .withColumnRenamed("Year", "Year_Of_Birth")
      .select("Year_Of_Birth", "First_Name", "Count")
  )

エンリッチされたデータビューを含むテーブルを作成する

Delta Live Tablesはパイプラインの更新を一連の依存関係グラフとして処理するので、特定のビジネスロジックを含むテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度に強化されたビューを宣言できます。

Delta Live Tables 内のテーブルは、概念的にはマテリアライズド ビューと同等です。 ビューがクエリされるたびにロジックを実行する Spark の従来のビューとは異なり、Delta Live Tables テーブルは、クエリ結果の最新バージョンをデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、マテリアライズド ビューのレイテンシ要件に合わせてパイプラインの更新をスケジュールし、これらのテーブルに対するクエリに利用可能な最新バージョンのデータが含まれていることを確認できます。

以下のコードで定義されたテーブルは、パイプラインの上流データから派生したマテリアライズドビューとの概念的な類似性を示しています:

@dlt.table(
  comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
  return (
    dlt.read("baby_names_prepared")
      .filter(expr("Year_Of_Birth == 2021"))
      .groupBy("First_Name")
      .agg(sum("Count").alias("Total_Count"))
      .sort(desc("Total_Count"))
      .limit(10)
  )

ノートブックを使用するパイプラインを構成するには、 「パイプラインの作成」を参照してください。

SQL を使用して Delta Live Tables パイプラインを実装する

DatabricksDelta Live Tablesでは、SQL SQLユーザーが 上に新しいETL 、取り込み、および変換パイプラインを構築するための推奨方法として、 を使用したDatabricks を推奨しています。Delta Live Tables の SQL インターフェイスは、多くの新しいキーワード、構造、およびテーブル値関数を使用して標準の Spark SQL を拡張します。 標準SQLへのこれらの追加により、ユーザーは新しいツールや追加の概念を学習することなく、データセット間の依存関係を宣言し、本番運用グレードのインフラストラクチャをデプロイできるようになります。

Spark DataFrames に精通しており、メタプログラミング操作など、SQL で実装するのが難しいより広範なテストと操作のサポートを必要とするユーザーの場合、Databricks では Python インターフェイスの使用を推奨しています。 「Python を使用して Delta Live Tables パイプラインを実装する」を参照してください。

データのダウンロード

この例のデータを取得するには、次のコードをコピーし、新しいノートブックに貼り付けて、ノートブックを実行します。 ノートブックを作成するためのオプションを確認するには、 「ノートブックを作成する」を参照してください。

%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

Unity Catalog内のファイルからテーブルを作成する

この例の残りの部分では、次の SQL スニペットをコピーし、前のセクションのノートブックとは別の新しい SQL ノートブックに貼り付けます。 各サンプル SQL スニペットを、説明されている順序でノートブック内の独自のセルに追加します。

Delta Live Tables は、Databricks でサポートされているすべての形式からのデータの読み込みをサポートしています。 「データ形式のオプション」を参照してください。

Delta Live TablesのすべてのSQLステートメントは、CREATE OR REFRESH構文とセマンティクスを使用します。パイプラインを更新すると、Delta Live Tablesは、増分処理によってテーブルの論理的に正しい結果を達成できるか、それとも完全な再計算が必要かどうかを判断します。

次の例では、 ボリュームに保存されている ファイルからデータをロードしてテーブルを作成します。CSVUnity Catalog

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
  '/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

<catalog-name><schema-name>、および <volume-name> を、 Unity Catalogボリュームのカタログ、スキーマ、およびボリューム名に置き換えます。

上流のデータセットからパイプラインにテーブルを追加する

live 仮想スキーマを使用して、現在の Delta Live Tables パイプラインで宣言されている他のデータセットからデータをクエリできます。この方法で新しいテーブルを宣言すると、Delta Live Tables が更新を実行する前に自動的に解決する依存関係が作成されます。 live スキーマは、Delta Live Tables に実装されるカスタム キーワードであり、データセットを発行する場合にターゲット スキーマに置き換えることができます。パイプラインでの Unity Catalogの使用Delta Live Tables および Delta Live Tables従来の での パイプラインの使用Hive metastore を参照してください。

次のコードには、エクスペクテーションに基づいてデータ品質を監視および強制する例も含まれています。Delta Live Tablesによるデータ品質の管理を参照してください

CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
  CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
  CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
  Year AS Year_Of_Birth,
  First_Name,
  Count
FROM live.baby_names_sql_raw;

エンリッチしたデータビューを作成する

Delta Live Tablesはパイプラインの更新を一連の依存関係グラフとして処理するので、特定のビジネスロジックを含むテーブルを宣言することで、ダッシュボード、BI、分析を強化する高度に強化されたビューを宣言できます。

次のクエリでは、具体化されたビューを使用して、アップストリーム データからエンリッチされたビューを作成します。 ビューがクエリされるたびにロジックを実行する Spark の従来のビューとは異なり、マテリアライズド ビューはクエリ結果の最新バージョンをデータ ファイルに保存します。 Delta Live Tables はパイプライン内のすべてのデータセットの更新を管理するため、マテリアライズド ビューのレイテンシ要件に合わせてパイプラインの更新をスケジュールし、これらのテーブルに対するクエリに利用可能な最新バージョンのデータが含まれていることを確認できます。

CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
  First_Name,
  SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;

ノートブックを使用するパイプラインを構成するには、 「パイプラインの作成」に進みます。

パイプラインを作成する

  • コンピュート リソースはサーバレス DLT パイプラインのフル プロバイダであるため、パイプラインにサーバレスを選択した場合はコンピュート設定は使用できません。

  • サーバーレス DLT パイプラインの適格性と有効化に関する情報については、 「サーバーレス コンピュートの有効化」を参照してください。

Delta Live Tables は、Delta Live Tables 構文を使用して、ノートブックまたはファイル ( ソース コードと呼ばれます) で定義された依存関係を解決することでパイプラインを作成します。 各ソース コード ファイルには 1 つの言語のみを含めることができますが、パイプラインで異なる言語のソース コードを混在させることができます。

  1. サイドバーの「Delta Live Tables」をクリックし、 「Create Pipeline」をクリックします。

  2. パイプラインに名前を付けます。

  3. (オプション)サーバレス DLT パイプラインを使用してパイプラインを実行するには、[ サーバレス ]チェックボックスをオンにします。 サーバレスを選択すると、コンピュートの設定がUIから削除され、Budget ポリシーの設定が表示されます。サーバレス Delta Live Tables パイプラインの設定を参照してください。

  4. (オプション) 製品エディションを選択します。

  5. [パイプラインモード][トリガー]を選択します。

  6. パイプラインのソース コードを含む 1 つ以上のノートブックを構成します。 [パス]テキストボックスにノートブックへのパスを入力するか、ファイルピッカーアイコンを押してノートブックを選択します。

  7. パイプラインによって公開されるデータセットの宛先 ( Hive metastoreまたはUnity Catalogを選択します。 データセットの公開 を参照してください。

    • Hive metastore :

      • (オプション)パイプラインからの出力データの保存場所を入力します。ストレージの場所を空のままにすると、システムはデフォルトの場所を使用します。

      • (オプション) データセットを に公開するための ターゲット スキーマHive metastore を指定します。

    • Unity Catalog: データセットを に公開するための カタログ ターゲット スキーマUnity Catalog を指定します。

  8. (オプション) サーバレスを選択していない場合は、パイプラインのコンピュート設定を行うことができます。 コンピュート設定のオプションについては、「Delta Live Tables パイプラインのコンピュートを構成する」を参照してください。

  9. (オプション)[ 通知の追加 ] をクリックして、パイプライン イベントの通知を受信する 1 つ以上の Eメール アドレスを構成します。 「パイプラインイベントのEメール通知の追加」を参照してください。

  10. (オプション)パイプラインの詳細設定を構成します。 詳細設定のオプションについては、「 Delta Live Tables パイプラインの構成」を参照してください。

  11. [作成]をクリックします。

[作成] をクリックすると、[パイプラインの詳細]ページが表示されます。 Delta Live Tablesタブでパイプライン名をクリックしてパイプラインにアクセスすることもできます。

パイプラインの更新を開始する

パイプラインの更新を開始するには、 Delta Live Tables開始アイコン 上部パネルのボタンをクリックします。 パイプラインが開始されていることを確認するメッセージが返されます。

更新が正常に開始された後、Delta Live Tablesシステムは次のようになります:

  1. Delta Live Tables システムによって作成されたクラスター構成を使用してクラスターを開始します。 カスタム クラスター構成を指定することもできます。

  2. 存在しないテーブルを作成し、既存のテーブルのスキーマが正しいことを確認します。

  3. 使用可能な最新のデータでテーブルを更新します。

  4. アップデートが完了したらクラスターをシャットダウンします。

実行モードはデフォルトで [ 本番 ] に設定され、更新ごとに一時的なコンピュート リソースがデプロイされます。 開発モードを使用してこの動作を変更し、 開発 およびテスト中に同じコンピュート リソースを複数のパイプライン更新に使用できます。 開発モードと本番モードを参照してください。

データセットを公開する

Delta Live Tablesデータセットをクエリに使用できるようにするには、テーブルを Hive metastore または Unity Catalogにパブリッシュします。データの公開ターゲットを指定しない場合、 Delta Live Tables パイプラインで作成されたテーブルには、同じパイプライン内の他のオペレーションからのみアクセスできます。 Delta Live Tablesレガシー での パイプラインの使用Hive metastore および パイプラインでの Unity Catalogの使用Delta Live Tables を参照してください。

ソースコードのノートブックの例

これらのノートブックを Databricks ワークスペースにインポートし、それを使用して Delta Live Tables パイプラインをデプロイできます。 「パイプラインの作成」を参照してください。

Delta Live Tablesを使い始めるノートブック(Python版)

ノートブックを新しいタブで開く

Delta Live Tablesを使い始めるノートブック(SQL版)

ノートブックを新しいタブで開く

Unity Catalogなしのワークスペースのソースコード例

有効にしなくても、これらのノートブックをDatabricks ワークスペースにインポートし、Unity Catalog Delta Live Tablesパイプラインのデプロイに使用できます。「パイプラインの作成」を参照してください。

Delta Live Tablesを使い始めるノートブック(Python版)

ノートブックを新しいタブで開く

Delta Live Tablesを使い始めるノートブック(SQL版)

ノートブックを新しいタブで開く