Delta Live Tables シンクを使用してレコードを外部サービスにストリームする
プレビュー
Delta Live Tables sink
API は プライベート プレビュー段階です。
この記事では、Delta Live Tablessink
API Unity CatalogHive metastoreApacheKafkaAmazonKinesisと DLT フロー を使用して、パイプラインによって変換されたレコードを外部データ シンク ( 管理テーブルや外部テーブル、 テーブル、イベント ストリーミング サービス ( 、 、などのイベント ストリーミング サービスに書き込む方法について説明します) またはAzure Event Hubs。
Delta Live Tables シンクとは何ですか?
Delta Live Tables シンクを使用すると、 Apache Kafka、 Amazon Kinesis、 Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalog または the Hive metastoreによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むことができます。 以前は、Delta Live Tables パイプラインで作成されたストリーミング テーブルとマテリアライズドビューは、Databricks で管理される Delta テーブルにのみ保持できました。 シンクを使用すると、Delta Live Tables パイプラインの出力を保持するためのオプションが増えました。
Delta Live Tables シンクはいつ使用する必要がありますか?
Databricks では、次の必要がある場合は Delta Live Tables シンクを使用することをお勧めします。
不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
変換されたデータを Delta Live Tables フローから、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブル、外部テーブル、 Hive metastore テーブルなど) に書き込みます。
トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
Delta Live Tables シンク操作方法使用しますか?
注:
spark.readStream
とdlt.read_stream
を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。シンクへの書き込みに使用できるのは
append_flow
のみです。apply_changes
などの他のフローはサポートされていません。完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。
イベント・データがストリーミング・ソースから Delta Live Tables パイプラインに取り込まれるとき、 Delta Live Tables 機能を使用してこのデータを処理および調整し、アペンド・フロー処理を使用して、変換されたデータ・レコードを Delta Live Tables シンクにストリームします。 このシンクは、 create_sink()
関数を使用して作成します。 create_sink
関数の使用の詳細については、シンク API リファレンスを参照してください。
Delta Live Tables シンクを実装するには、次の手順に従います。
Delta Live Tables パイプラインを設定して、ストリーミング イベント データを処理し、Delta Live Tables シンクに書き込むためのデータ レコードを準備します。
優先ターゲット シンク形式を使用するように Delta Live Tables シンクを構成して作成します。
追加フローを使用して、準備されたレコードをシンクに書き込みます。
これらの手順については、このトピックの残りの部分で説明します。
Delta Live Tables パイプラインを設定して、シンクに書き込むためのレコードを準備する
最初の手順では、Delta Live Tables パイプラインを設定して、未加工のイベント ストリーム データをシンクに書き込む準備済みデータに変換します。
このプロセスをよりよく理解するために、Databricks の wikipedia-datasets
サンプル データからのクリックストリーム イベント データを処理する Delta Live Tables パイプラインの次の例に従うことができます。 このパイプラインは、未加工のデータセットを解析して、Apache Spark のドキュメンテーション ページにリンクしている Wikipedia ページを特定し、そのデータを参照リンクに含まれるテーブル行のみに段階的に絞り込みます Apache_Spark.
この例では、 Delta Live Tables パイプラインは メダリオンアーキテクチャを使用して構造化されており、データをさまざまなレイヤーに整理して品質と処理効率を向上させています。
まず、JSON を使用して、データセットの生の レコードをブロンズレイヤーにロードします。Auto Loaderこの Python コードは、ソースからの未処理の生データを含む clickstream_raw
という名前のストリーミングテーブルを作成する方法を示しています。
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
このコードの実行後、データはメダリオンアーキテクチャの "bronze" (または "生データ") レベルになり、クリーンアップする必要があります。 次の手順では、データを "シルバー" レベルに絞り込み、データ型と列名をクリーンアップし、 Delta Live Tables エクスペクテーションを使用してデータの完全性を確保します。
次のコードは、ブロンズレイヤーのデータを clickstream_clean
silver テーブルにクリーニングして検証することで、これを行う方法を示しています。
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
パイプライン構造の「ゴールドレイヤー」を作成するには、クリーニングされたクリックストリームデータをフィルタリングして、参照ページが Apache_Spark
エントリを分離します。 この最後のコード例では、ターゲット シンク テーブルへの書き込みに必要な列のみを選択します。
次のコードは、ゴールドレイヤーを表す spark_referrers
というテーブルを作成する方法を示しています。
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
このデータ準備プロセスが完了したら、クリーニングされたレコードが書き込まれる宛先シンクを構成する必要があります。
Delta Live Tables シンクを構成する
Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。
Delta テーブルシンク
Apache Kafka シンク
Azure Event Hubs シンク
Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。
ファイルパスで Delta シンクを作成するには:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
完全修飾カタログとスキーマパスを使用してテーブル名で Delta シンクを作成するには:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
このコードは、Apache Kafka シンクと Azure Event Hubs シンクの両方で機能します。
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"subscribe": "dlt-sink",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
シンクが構成され、Delta Live Tables パイプラインが準備されたので、処理されたレコードをシンクにストリーミングを開始できます。
追加フローを使用して Delta Live Tables シンクに書き込む
シンクを構成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。 これを行うには、シンクを append_flow
デコレータのtarget
値として指定します。
Unity Catalog の管理テーブルと外部テーブルの場合は、
delta
形式を使用し、オプションでパスまたはテーブル名を指定します。 Delta Live Tables パイプラインは、Unity Catalog を使用するように構成する必要があります。Apache Kafka トピックの場合は、
kafka
の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka 構造化ストリーミング ライターの構成を参照してください。Azure Event Hubs の場合は、
kafka
形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。Hive metastoreテーブルの場合は、
delta
形式を使用し、オプションでパスまたはテーブル名を指定します。Delta Live Tables パイプラインは、Hive metastoreを使用するように設定する必要があります。
以下は、Delta Live Tables パイプラインによって処理されたレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
value
パラメーターは、Azure Event Hubs シンクに必須です。key
、 partition
、 headers
、 topic
などの追加のパラメーターはオプションです。
append_flow
デコレーターの詳細については、「追加フローを使用して複数のソース ストリームからストリーミング テーブルに書き込む」を参照してください。
制限事項
Python API のみがサポートされています。 SQL はサポートされていません。
spark.readStream
とdlt.read_stream
を使用したストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。シンクへの書き込みに使用できるのは
append_flow
のみです。apply_changes
などの他のフローはサポートされておらず、Delta Live Tables データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。@table("from_sink_table") def fromSink(): return read_stream("my_sink")
Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は
<catalog>.<schema>.<table>
の形式である必要があります。 Hive metastoreの場合、<schema>.<table>
形式である必要があります。FullRefresh
を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。Delta Live Tables のエクスペクテーションはサポートされていません。
Delta Live Tables シンク API リファレンス
注:
sink
APIを使用するには、 プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。完全な更新更新を実行しても、シンクからデータはクリアされません。再処理されたデータはシンクに追加され、既存のデータは変更されません。
Delta Live Tables のエクスペクテーションは、
sink
API ではサポートされていません。
Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスに書き込むか、Delta Live Tables パイプラインから Delta テーブルに書き込むには、dlt
Python モジュールに含まれている create_sink()
関数を使用します。create_sink()
関数を使用してシンクを作成した後、追加フローでシンクを使用してデータをシンクに書き込みます。追加フローは、 create_sink()
関数でサポートされている唯一のフロー タイプです。 その他のフロー タイプ( apply_changes
など)はサポートされていません。
次に、 create_sink()
関数を使用してシンクを作成するための構文を示します。
create_sink(<sink_name>, <format>, <options>)
引数 |
---|
タイプ: シンクを識別し、シンクの参照と管理に使用される文字列。 シンク名は、パイプラインの一部であるノートブックやモジュールなど、すべてのソース コードを含め、パイプラインに対して一意である必要があります。 このパラメーターは必須です。 |
タイプ: 出力形式 ( このパラメーターは必須です。 |
タイプ: シンクオプションのオプションのリストで、 |
例: create_sink()
関数を使用した Kafka シンクの作成
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
例: create_sink()
関数とファイル・システム・パスを持つ Delta シンクの作成
次の例では、ファイル システム パスをテーブルに渡すことで Delta テーブルに書き込むシンクを作成します。
create_sink(
"my_delta_sink",
"delta",
{ "path": "//path/to/my/delta/table" }
)
例: create_sink()
関数と Unity Catalog テーブル名を使用して Delta シンクを作成する
注:
Deltaシンクは、Unity Catalog外部およびマネージドテーブルとHive metastoreマネージドテーブルをサポートします。テーブル名は完全修飾名である必要があります。 たとえば、 Unity Catalog テーブルでは、<catalog>.<schema>.<table>
という 3 層の識別子を使用する必要があります。 Hive metastore テーブルでは<schema>.<table>
を使用する必要があります。
次の例では、Unity Catalog のテーブルの名前を渡すことで Delta テーブルに書き込むシンクを作成します。
create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)
例: 追加フローを使用して Delta シンクに書き込む
次の例では、Delta テーブルに書き込むシンクを作成し、そのシンクに書き込む追加フローを作成します。
create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
@append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
例: 追加フローを使用して Kafka シンクに書き込む
次の例では、Kafka トピックに書き込むシンクを作成し、そのシンクに書き込むための追加フローを作成します。
create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
@append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
Kafka に書き込まれる DataFrame のスキーマには、「 Kafka 構造化ストリーミング ライターの構成」で指定されている列が含まれている必要があります。