Delta Live Tables を使用したデータのロード

Delta Live Tables を使用して、 Databricks で Apache Spark でサポートされている任意の DATA からデータを読み込むことができます。データセット (テーブルとビュー) は、Spark DataFrame を返す任意のクエリー (ストリーミング DataFrame s や Spark DataFrames の Pandas など) に対して DeltaLive Tables で定義できます。データ取り込みタスクの場合、Databricks では、ほとんどのユース ケースでストリーミング テーブルを使用することをお勧めします。 ストリーミングテーブルは、 Auto Loader を使用してクラウドオブジェクトストレージから、またはKafkaなどのメッセージバスからデータを取り込むのに適しています。 次の例は、いくつかの一般的なパターンを示しています。

重要

すべてのデータソースがSQLをサポートしているわけではありません。 Delta Live Tables パイプラインで SQL ノートブックと Python ノートブックを混在させて、インジェスト以外のすべての操作に SQL を使用できます。

Delta Live Tables にデフォルトでパッケージ化されていないライブラリの操作の詳細については、 「Delta Live Tables パイプラインの Python 依存関係の管理」を参照してください。

クラウドオブジェクトストレージからファイルをロードする

Databricks では、クラウド オブジェクト ストレージからのほとんどのデータ取り込みタスクに Auto Loader with Delta Live Tables を使用することをお勧めします。 Auto Loader と Delta Live Tables は、クラウドストレージに到着すると、増え続けるデータを増分的かつべき等にロードするように設計されています。 次の例では、 Auto Loader を使用して CSV ファイルと JSON ファイルからデータセットを作成します。

Unity Catalog 対応パイプラインで Auto Loader を含むファイルを読み込むには、 外部ロケーションを使用する必要があります。 Delta Live Tables で Unity Catalog を使用する方法の詳細については、「 Delta Live Tables パイプラインで Unity Catalog を使用する」を参照してください。

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

Auto Loader とは Auto Loader SQL 構文を参照してください。

警告

ファイル通知で Auto Loader を使用し、パイプラインまたはストリーミング テーブルの完全更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用してクリーンアップを実行できます。

メッセージバスからのデータのロード

ストリーミング テーブルを使用してメッセージ バスからデータを取り込むように Delta Live Tables パイプラインを構成できます。 Databricks 、ストリーミング テーブルを継続的な実行と拡張オートスケールと組み合わせて、メッセージ バスからの低レイテンシの読み込みを最も効率的に取り込むことを推奨しています。 「拡張オートスケールDelta Live Tables Pipeline のクラスター使用率を最適化する」を参照してください。

たとえば、次のコードは、Kafka からデータを取り込むようにストリーミングテーブルを構成します。

import dlt

@dlt.table
def kafka_raw():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "<server:ip>")
      .option("subscribe", "topic1")
      .option("startingOffsets", "latest")
      .load()
  )

次の例のように、純粋な SQL でダウンストリーム操作を記述して、このデータに対してストリーミング変換を実行できます。

CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...

イベント ハブの操作例については、「 Azure イベント ハブを Delta Live Tables データソースとして使用する」を参照してください。

「ストリーミング データソースの構成」を参照してください。

外部システムからデータを読み込む

Delta Live Tables Databricksでサポートされているあらゆるデータソースからのデータの読み込みをサポートします。 「情報ソースへの接続」を参照してください。 サポートされているデータソースについては、レイクハウスフェデレーションを使用して外部データを読み込むこともできます。 レイクハウスフェデレーション にはDatabricks Runtime 13.3 LTS以上が必要であるため、レイクハウスフェデレーション を使用するには、プレビュー チャンネルを使用するように パイプラインを構成する必要があります。

一部のデータソースは、SQL で同等のサポートを受けていません。 これらのデータソースのいずれかでレイクハウスフェデレーションを使用できない場合は、スタンドアロンの Python ノートブックを使用してソースからデータを取り込むことができます。 その後、このノートブックを SQL ノートブックと共にソース ライブラリとして追加して、Delta Live Tables パイプラインを構築できます。 次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。

import dlt

@dlt.table
def postgres_raw():
  return (
    spark.read
      .format("postgresql")
      .option("dbtable", table_name)
      .option("host", database_host_url)
      .option("port", 5432)
      .option("database", database_name)
      .option("user", username)
      .option("password", password)
      .load()
  )

クラウドオブジェクトストレージから小さなデータセットまたは静的データセットをロードする

Apache Spark の読み込み構文を使用して、小さなデータセットまたは静的なデータセットを読み込むことができます。 Delta Live Tables では、Databricks 上の Apache Spark でサポートされているすべてのファイル形式がサポートされています。 完全な一覧については、「 データ形式のオプション」を参照してください。

次の例は、JSON を読み込んで Delta Live Tables テーブルを作成する方法を示しています。

@dlt.table
def clickstream_raw():
  return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH LIVE TABLE clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;

SELECT * FROM format.`path`; SQL コンストラクトは、Databricks 上のすべての SQL 環境に共通です。これは、SQL と Delta Live Tablesを使用したファイルへの直接アクセスに推奨されるパターンです。

パイプライン内のシークレットを使用してストレージ認証情報に安全にアクセスする

Databricks シークレット を使用して、アクセス キーやパスワードなどの資格情報を格納できます。 パイプラインでシークレットを構成するには、パイプライン設定クラスター構成で Spark プロパティを使用します。 コンピュートの設定を構成するを参照してください。

次の例では、シークレットを使用して、 Auto Loader を使用して Azure データレイク ストレージ Gen2 (ADLS Gen2) ストレージ アカウントから入力データを読み取るために必要なアクセス キー を格納します 。これと同じ方法を使用して、パイプラインに必要なシークレット (S3 にアクセスするための AWS キー、Apache Hive metastoreのパスワードなど) を設定できます。

Azure Data Lake Storage Gen2 の操作の詳細については、「 Azure Data Lake Storage Gen2 と Blob Storage に接続する」を参照してください。

シークレット値を設定する spark_conf コンフィギュレーション キーに spark.hadoop. プレフィックスを追加する必要があります。

{
    "id": "43246596-a63f-11ec-b909-0242ac120002",
    "clusters": [
      {
        "spark_conf": {
          "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
        },
        "autoscale": {
          "min_workers": 1,
          "max_workers": 5,
          "mode": "ENHANCED"
        }
      }
    ],
    "development": true,
    "continuous": false,
    "libraries": [
      {
        "notebook": {
          "path": "/Users/user@databricks.com/DLT Notebooks/Delta Live Tables quickstart"
        }
      }
    ],
    "name": "DLT quickstart using ADLS2"
}

取り替える

  • <storage-account-name> を ADLS Gen2 ストレージ アカウント名に置き換えます。

  • <scope-name> を Databricks シークレットスコープ名に置き換えます。

  • <secret-name> を Azure ストレージ アカウントのアクセス キーを含むキーの名前に置き換えます。

import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
  comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(json_path)
  )

取り替える

  • <container-name> は、入力データを格納する Azure ストレージ アカウント コンテナーの名前です。

  • <storage-account-name> を ADLS Gen2 ストレージ アカウント名に置き換えます。

  • <path-to-input-dataset> を入力データセットへのパスに置き換えます。