DLT シンクを使用してレコードを外部サービスにストリームする
プレビュー
DLT sink
API は パブリック プレビュー段階です。
この記事では、DLT sink
API と DLT フロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンク ( Unity Catalog マネージド テーブルや外部テーブル、 Hive metastore テーブル、イベント ストリーミング サービス ( Apache Kafka や Event Hubs など) に書き込む方法について説明します Azure 。
DLTシンクとは?
DLT シンクを使用すると、 Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalog または Hive metastoreによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むことができます。 以前は、DLT パイプラインで作成されたストリーミングテーブルとマテリアライズドビューは、 Databricks 管理 Delta テーブルにのみ永続化できました。 シンクを使用すると、DLT パイプラインの出力を保持するためのオプションが増えました。
DLTシンクはいつ使用すべきですか?
Databricks では、次の必要がある場合は DLT シンクを使用することをお勧めします。
- 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
- DLT フローから変換されたデータを、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブル、外部テーブル、 Hive metastore テーブルなど) に書き込みます。
- トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
DLTシンクの使用方法を教えてください。
spark.readStream
とdlt.read_stream
を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。- シンクへの書き込みに使用できるのは
append_flow
のみです。apply_changes
などの他のフローはサポートされていません。 - 完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。
イベント データがストリーミング ソースから DLT パイプラインに取り込まれるときは、DLT 機能を使用してこのデータを処理および調整し、追加フロー処理を使用して、変換されたデータ レコードを DLT シンクにストリームします。 このシンクは、 create_sink()
関数を使用して作成します。 create_sink
関数の使用の詳細については、シンク API リファレンスを参照してください。
DLT シンクを実装するには、次の手順を使用します。
- DLT パイプラインを設定して、ストリーミング イベント データを処理し、DLT シンクに書き込むためのデータ レコードを準備します。
- 優先ターゲット シンク形式を使用するように DLT シンクを構成および作成します。
- 追加フローを使用して、準備されたレコードをシンクに書き込みます。
これらの手順については、このトピックの残りの部分で説明します。
シンクに書き込むためのレコードを準備するための DLT パイプラインの設定
最初の手順では、DLT パイプラインを設定して、未加工のイベント ストリーム データをシンクに書き込む準備済みデータに変換します。
このプロセスをよりよく理解するために、Databricks の wikipedia-datasets
サンプル データからのクリックストリーム イベント データを処理する DLT パイプラインの次の例に従うことができます。このパイプラインは、未加工のデータセットを解析して、Apache Spark のドキュメンテーション ページにリンクしている Wikipedia ページを特定し、そのデータを参照リンクに含まれるテーブル行のみに段階的に絞り込みます Apache_Spark.
この例では、DLT パイプラインは メダリオンアーキテクチャを使用して構造化されており、データをさまざまなレイヤーに整理して品質と処理効率を向上させています。
まず、JSON を使用して、データセットの生の レコードをブロンズレイヤーにロードします。Auto Loaderこの Python コードは、ソースからの未処理の生データを含む clickstream_raw
という名前のストリーミングテーブルを作成する方法を示しています。
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
このコードの実行後、データはメダリオンアーキテクチャの "bronze" (または "生データ") レベルになり、クリーンアップする必要があります。 次の手順では、データを "シルバー" レベルに絞り込み、データ型と列名をクリーンアップし、DLT の期待値を使用してデータの完全性を確保します。
次のコードは、ブロンズレイヤーのデータを clickstream_clean
silver テーブルにクリーニングして検証することで、これを行う方法を示しています。
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
パイプライン構造の「ゴールドレイヤー」を作成するには、クリーニングされたクリックストリームデータをフィルタリングして、参照ページが Apache_Spark
エントリを分離します。 この最後のコード例では、ターゲット シンク テーブルへの書き込みに必要な列のみを選択します。
次のコードは、ゴールドレイヤーを表す spark_referrers
というテーブルを作成する方法を示しています。
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
このデータ準備プロセスが完了したら、クリーニングされたレコードが書き込まれる宛先シンクを構成する必要があります。
DLT シンクの構成
Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。
- Delta テーブルシンク
- Apache Kafka シンク
- Azure Event Hubs シンク
Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。
- Delta sinks
- Kafka and Azure Event Hubs sinks
To create a Delta sink by file path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
To create a Delta sink by table name using a fully qualified catalog and schema path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
This code works for both Apache Kafka and Azure Event Hubs sinks.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
シンクが構成され、DLT パイプラインが準備されたので、処理されたレコードをシンクにストリーミングを開始できます。
追加フローを使用した DLT シンクへの書き込み
シンクを構成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。 これを行うには、シンクを append_flow
デコレータのtarget
値として指定します。
- Unity Catalog の管理テーブルと外部テーブルの場合は、
delta
形式を使用し、オプションでパスまたはテーブル名を指定します。 DLT パイプラインは、Unity Catalog を使用するように構成する必要があります。 - Apache Kafka トピックの場合は、
kafka
の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka構造化ストリーミング ライターの構成を参照してください。 - Azure Event Hubs の場合は、
kafka
形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。 - Hive metastoreテーブルの場合は、
delta
形式を使用し、オプションでパスまたはテーブル名を指定します。DLT パイプラインは、 Hive metastore.
以下は、DLT パイプラインによって処理されるレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。
- Delta sink
- Kafka and Azure Event Hubs sinks
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
The value
parameter is mandatory for an Azure Event Hubs sink. Additional parameters such as key
, partition
, headers
, and topic
are optional.
append_flow
デコレーターの詳細については、「追加フローを使用して複数のソース ストリームからストリーミングテーブルに書き込む」を参照してください。
制限
-
Python API のみがサポートされています。 SQL はサポートされていません。
-
spark.readStream
とdlt.read_stream
を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。 -
シンクへの書き込みに使用できるのは
append_flow
のみです。apply_changes
などの他のフローはサポートされておらず、DLT データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は
<catalog>.<schema>.<table>
の形式である必要があります。 Hive metastoreの場合、<schema>.<table>
形式である必要があります。 -
FullRefresh
を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。 -
DLT の期待値はサポートされていません。