メインコンテンツまでスキップ

DLT シンクを使用してレコードを外部サービスにストリームする

備考

プレビュー

DLT sink API は パブリック プレビュー段階です。

この記事では、DLT sink API と DLT フロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンク ( Unity Catalog マネージド テーブルや外部テーブル、 Hive metastore テーブル、イベント ストリーミング サービス ( Apache Kafka や Event Hubs など) に書き込む方法について説明します Azure 。

DLTシンクとは?

DLT シンクを使用すると、 Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalog または Hive metastoreによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むことができます。 以前は、DLT パイプラインで作成されたストリーミングテーブルとマテリアライズドビューは、 Databricks 管理 Delta テーブルにのみ永続化できました。 シンクを使用すると、DLT パイプラインの出力を保持するためのオプションが増えました。

DLTシンクはいつ使用すべきですか?

Databricks では、次の必要がある場合は DLT シンクを使用することをお勧めします。

  • 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
  • DLT フローから変換されたデータを、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブル、外部テーブル、 Hive metastore テーブルなど) に書き込みます。
  • トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。

DLTシンクの使用方法を教えてください。

注記
  • spark.readStreamdlt.read_stream を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。
  • シンクへの書き込みに使用できるのは append_flow のみです。 apply_changesなどの他のフローはサポートされていません。
  • 完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

イベント データがストリーミング ソースから DLT パイプラインに取り込まれるときは、DLT 機能を使用してこのデータを処理および調整し、追加フロー処理を使用して、変換されたデータ レコードを DLT シンクにストリームします。 このシンクは、 create_sink() 関数を使用して作成します。 create_sink 関数の使用の詳細については、シンク API リファレンスを参照してください。

DLT シンクを実装するには、次の手順を使用します。

  1. DLT パイプラインを設定して、ストリーミング イベント データを処理し、DLT シンクに書き込むためのデータ レコードを準備します。
  2. 優先ターゲット シンク形式を使用するように DLT シンクを構成および作成します。
  3. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

これらの手順については、このトピックの残りの部分で説明します。

シンクに書き込むためのレコードを準備するための DLT パイプラインの設定

最初の手順では、DLT パイプラインを設定して、未加工のイベント ストリーム データをシンクに書き込む準備済みデータに変換します。

このプロセスをよりよく理解するために、Databricks の wikipedia-datasets サンプル データからのクリックストリーム イベント データを処理する DLT パイプラインの次の例に従うことができます。このパイプラインは、未加工のデータセットを解析して、Apache Spark のドキュメンテーション ページにリンクしている Wikipedia ページを特定し、そのデータを参照リンクに含まれるテーブル行のみに段階的に絞り込みます Apache_Spark.

この例では、DLT パイプラインは メダリオンアーキテクチャを使用して構造化されており、データをさまざまなレイヤーに整理して品質と処理効率を向上させています。

まず、JSON を使用して、データセットの生の レコードをブロンズレイヤーにロードします。Auto Loaderこの Python コードは、ソースからの未処理の生データを含む clickstream_rawという名前のストリーミングテーブルを作成する方法を示しています。

Python
import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)

このコードの実行後、データはメダリオンアーキテクチャの "bronze" (または "生データ") レベルになり、クリーンアップする必要があります。 次の手順では、データを "シルバー" レベルに絞り込み、データ型と列名をクリーンアップし、DLT の期待値を使用してデータの完全性を確保します。

次のコードは、ブロンズレイヤーのデータを clickstream_clean silver テーブルにクリーニングして検証することで、これを行う方法を示しています。

Python
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)

パイプライン構造の「ゴールドレイヤー」を作成するには、クリーニングされたクリックストリームデータをフィルタリングして、参照ページが Apache_Sparkエントリを分離します。 この最後のコード例では、ターゲット シンク テーブルへの書き込みに必要な列のみを選択します。

次のコードは、ゴールドレイヤーを表す spark_referrers というテーブルを作成する方法を示しています。

Python
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)

このデータ準備プロセスが完了したら、クリーニングされたレコードが書き込まれる宛先シンクを構成する必要があります。

DLT シンクの構成

Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。

  • Delta テーブルシンク
  • Apache Kafka シンク
  • Azure Event Hubs シンク

Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。

To create a Delta sink by file path:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

To create a Delta sink by table name using a fully qualified catalog and schema path:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)

シンクが構成され、DLT パイプラインが準備されたので、処理されたレコードをシンクにストリーミングを開始できます。

追加フローを使用した DLT シンクへの書き込み

シンクを構成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。 これを行うには、シンクを append_flow デコレータのtarget値として指定します。

  • Unity Catalog の管理テーブルと外部テーブルの場合は、 delta 形式を使用し、オプションでパスまたはテーブル名を指定します。 DLT パイプラインは、Unity Catalog を使用するように構成する必要があります。
  • Apache Kafka トピックの場合は、 kafka の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka構造化ストリーミング ライターの構成を参照してください。
  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。
  • Hive metastoreテーブルの場合は、delta 形式を使用し、オプションでパスまたはテーブル名を指定します。DLT パイプラインは、 Hive metastore.

以下は、DLT パイプラインによって処理されるレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。

Python
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

append_flow デコレーターの詳細については、「追加フローを使用して複数のソース ストリームからストリーミングテーブルに書き込む」を参照してください。

制限

  • Python API のみがサポートされています。 SQL はサポートされていません。

  • spark.readStreamdlt.read_stream を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。 apply_changesなどの他のフローはサポートされておらず、DLT データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は <catalog>.<schema>.<table> の形式である必要があります。 Hive metastoreの場合、<schema>.<table>形式である必要があります。

  • FullRefreshを実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • DLT の期待値はサポートされていません。

リソース