シンクを使用して DLT でレコードを外部サービスにストリームする
プレビュー
DLT sink
API は パブリック プレビュー段階です。
この記事では、DLT sink
API と、 それをフロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。外部データ シンクには、Unity Catalog のマネージド テーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。
DLT sink
API は Python でのみ使用できます。
DLTシンクとは?
DLT シンクは、DLT フローのターゲットです。By デフォルト DLT フローは、ストリーミングテーブルまたはマテリアライズドビュー ターゲットにデータを出力します。 これらは両方とも Databricks マネージド Delta テーブルです。DLT シンクは、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog によって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。シンクを使用すると、DLT パイプラインの出力を保持するためのオプションが増えました。
DLTシンクはいつ使用すべきですか?
Databricks では、次の必要がある場合は DLT シンクを使用することをお勧めします。
- 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
- DLT フローから変換されたデータを、Unity Catalog のマネージド テーブルや外部テーブルなど、外部の Delta インスタンスによって管理されるテーブルに書き込みます。
- トピックなど、 ETL外部のシンクへの逆抽出、変換、ロード()を行います。DatabricksApacheKafkaこのアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
DLTシンクの使用方法を教えてください。
イベント・データがストリーミング・ソースから DLT パイプラインに取り込まれるとき、 DLT 機能を使用してこのデータを処理および調整し、アペンド・フロー処理を使用して、変換されたデータ・レコードを DLT シンクにストリームします。 このシンクは、 create_sink()
関数を使用して作成します。create_sink
関数の詳細については、シンク API リファレンスを参照してください。
ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備する DLT パイプラインがある場合は、DLT シンクを使用する準備ができています。
DLT シンクの実装は、次の 2 つの手順で構成されます。
- DLT シンクを作成します。
- 追加フローを使用して、準備されたレコードをシンクに書き込みます。
DLT シンクの作成
Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。
- Delta テーブル シンク (Unity Catalog のマネージド テーブルと外部テーブルを含む)
- Apache Kafka シンク
- Azure Event Hubs シンク
Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。
- Delta sinks
- Kafka and Azure Event Hubs sinks
To create a Delta sink by file path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
To create a Delta sink by table name using a fully qualified catalog and schema path:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
This code works for both Apache Kafka and Azure Event Hubs sinks.
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
create_sink
関数の使用の詳細については、シンク API リファレンスを参照してください。
シンクが作成されたら、処理されたレコードをシンクにストリーミングできます。
追加フローを使用した DLT シンクへの書き込み
シンクを作成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。これを行うには、シンクを append_flow
デコレータのtarget
値として指定します。
- Unity Catalog の管理テーブルと外部テーブルの場合は、
delta
形式を使用し、オプションでパスまたはテーブル名を指定します。 DLT パイプラインは、Unity Catalog を使用するように構成する必要があります。 - Apache Kafka トピックの場合は、
kafka
の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka構造化ストリーミング ライターの構成を参照してください。 - Azure Event Hubs の場合は、
kafka
形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 「Microsoft Entra ID を使用したサービスプリンシパル認証」と「Event Hubs Azure」を参照してください。
以下は、DLT パイプラインによって処理されるレコードを使用して Delta、Kafka、Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。
- Delta sink
- Kafka and Azure Event Hubs sinks
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
The value
parameter is mandatory for an Azure Event Hubs sink. Additional parameters such as key
, partition
, headers
, and topic
are optional.
append_flow
デコレーターの詳細については、「複数のフローを使用して 1 つのターゲットに書き込む」を参照してください。
制限
-
Python API のみがサポートされています。 SQL はサポートされていません。
-
ストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。
-
シンクへの書き込みに使用できるのは
append_flow
のみです。apply_changes
などの他のフローはサポートされておらず、DLT データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は
<catalog>.<schema>.<table>
の形式である必要があります。 Hive metastoreの場合、<schema>.<table>
形式である必要があります。 -
完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。
-
DLT のエクスペクテーションはサポートされていません。