Lakeflow Spark宣言型パイプラインでシンクを使用して外部サービスにレコードをストリーミングする
プレビュー
Lakeflow Spark宣言型パイプラインsink APIパブリック プレビュー段階です。
この記事では、 Lakeflow Spark宣言型パイプラインsink APIと、それをフローで使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。 外部データ シンクには、Unity Catalog で管理されるテーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。
sink API は Python でのみ使用できます。
シンクとは何ですか?
シンクはパイプライン内のフローのターゲットです。当然ながら、パイプライン フローはストリーミング テーブルまたはマテリアライズドビュー ターゲットのいずれかにデータを送信します。 これらは両方とも Databricks によって管理される Delta テーブルです。シンクは、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog によって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。シンクを使用すると、パイプラインの出力を永続化するためのオプションが増えます。
シンクはいつ使用したらよいでしょうか?
Databricks では、次の場合にシンクを使用することをお勧めします。
- 不正行為の検出、トレンド分析、顧客への推奨事項などの運用上のユースケースを構築します。 運用ユースケースでは通常、Apache Kafka トピックなどのメッセージ バスからデータを読み取り、低レイテンシでデータを処理し、処理されたレコードをメッセージ バスに書き戻します。このアプローチにより、クラウド ストレージへの書き込みや読み取りを行わないため、レイテンシを短縮できます。
- フローから変換されたデータを、Unity Catalog 管理テーブルや外部テーブルなどの外部 Delta インスタンスによって管理されるテーブルに書き込みます。
- Apache Kafkaトピックなど、Databricks外部のシンクへのリバースETL(抽出、変換、ロード)を行います。このアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
- Databricks で直接サポートされていないデータ形式に書き込む必要があります。Pythonカスタム データソースを使用すると、カスタムPythonコードを使用して任意のデータソースに書き込むシンクを作成できます。 PySparkカスタム データ ソース」を参照してください。
シンクはどのように使用すればいいですか?
イベント データがストリーミング ソースからパイプラインに取り込まれると、パイプライン機能を使用してこのデータを処理および調整し、追加フロー処理を使用して変換されたデータ レコードをシンクにストリーミングします。 このシンクはcreate_sink()関数を使用して作成します。create_sink関数の詳細については、シンク API リファレンスを参照してください。
ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備するパイプラインがある場合は、シンクを使用する準備ができています。
シンクの実装は、次の 2 つのステップで構成されます。
- シンクを作成します。
- 追加フローを使用して、準備されたレコードをシンクに書き込みます。
シンクを作成する
Databricks は、ストリーム データから処理されたレコードを書き込むいくつかの種類の宛先シンクをサポートしています。
- Deltaテーブルシンク ( Unity Catalog管理および外部テーブルを含む)
- Apache Kafka シンク
- Azure Event Hubs シンク
- Pythonカスタムデータソースを使用してPythonで記述されたカスタムシンク
以下は、 Delta 、 Kafka 、 Azure Event Hubs シンク、およびPythonカスタム データ ソースの構成の例です。
- Delta sinks
- Kafka and Azure Event Hubs sinks
- Python custom data sources
ファイル パスで Delta シンクを作成するには:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
完全修飾カタログとスキーマ パスを使用してテーブル名で Delta シンクを作成するには:
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)
このコードは、Apache Kafka と Azure Event Hubs の両方のシンクで機能します。
credential_name = "<service-credential>"
eh_namespace_name = "dp-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
topic_name = "dp-sink"
dp.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"databricks.serviceCredential": credential_name,
"kafka.bootstrap.servers": bootstrap_servers,
"topic": topic_name
}
)
credential_name 、Unity Catalog サービスの資格情報への参照です。詳細については、 「 Unity Catalogサービスの認証情報を使用して外部クラウド サービスに接続する」を参照してください。
Pythonカスタム データソースがmy_custom_datasourceとして登録されていると仮定すると、次のコードでそのデータソースに書き込むことができます。
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python streaming
# data source that writes data to your system.
# Create LDP sink using my_custom_datasource
dp.create_sink(
name="custom_sink",
format="my_custom_datasource",
options={
<options-needed-for-custom-datasource>
}
)
# Create append flow to send data to RequestBin
@dp.append_flow(name="flow_to_custom_sink", target="custom_sink")
def flow_to_custom_sink():
return read_stream("my_source_data")
create_sink関数の使用の詳細については、シンク API リファレンスを参照してください。
シンクが作成されたら、処理されたレコードをシンクにストリーミングできるようになります。
追加フローでシンクに書き込む
シンクを作成したら、次のステップでは、追加フローによって出力されるレコードのターゲットとして指定して、処理済みのレコードをシンクに書き込みます。これを行うには、 append_flowデコレータでシンクをtarget値として指定します。
- Unity Catalog で管理されるテーブルと外部テーブルの場合は、
delta形式を使用し、オプションでパスまたはテーブル名を指定します。パイプラインは Unity Catalog を使用するように設定する必要があります。 - Apache Kafka トピックの場合は、
kafka形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。これらは、 Spark構造化ストリーミングKafkaシンクがサポートするオプションと同じです。 「Kafka 構造化ストリーミング ライターを構成する」を参照してください。 - Azure Event Hubs の場合は、
kafka形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 Microsoft Entra IDとAzure Event Hubsを使用したサービスプリンシパル認証を参照してください。
以下は、パイプラインによって処理されたレコードを Delta、Kafka、および Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。
- Delta sink
- Kafka and Azure Event Hubs sinks
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
@dp.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
valueパラメーターは、Azure Event Hubs シンクでは必須です。key 、 partition 、 headers 、およびtopicなどの追加の問題はオプションです。
append_flowデコレータの詳細については、 「複数のフローを使用して単一のターゲットに書き込む」を参照してください。
制限事項
-
Python API のみがサポートされています。SQL はサポートされていません。
-
ストリーミング クエリのみがサポートされます。バッチクエリはサポートされていません。
-
シンクへの書き込みに使用できるのは
append_flowのみです。create_auto_cdc_flowなどの他のフローはサポートされておらず、 Lakeflow Spark宣言型パイプライン データセット定義でシンクを使用することはできません。 たとえば、以下はサポートされていません。Python@table("from_sink_table")
def fromSink():
return read_stream("my_sink") -
Delta シンクの場合、テーブル名は完全修飾されている必要があります。具体的には、 Unity Catalogで管理される外部テーブルの場合、テーブル名は
<catalog>.<schema>.<table>形式にする必要があります。Hive Hive metastoreの場合、テーブル名は<schema>.<table>形式にする必要があります。 -
完全な更新アップデートを実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。 つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。
-
パイプラインの期待値はサポートされていません。