メインコンテンツまでスキップ

LakeFlow 宣言型パイプラインとシンクによる外部サービスへのレコードのストリーミング

備考

プレビュー

LakeFlow 宣言型パイプライン sink API はパブリック プレビュー段階です。

この記事では、 LakeFlow 宣言型パイプライン sink API と、 それをフロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。 外部データ シンクには、Unity Catalog のマネージド テーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。

注記

LakeFlow 宣言型パイプライン sink APIは、Pythonでのみ使用できます。

LakeFlow 宣言型パイプラインシンクとは

LakeFlow 宣言型パイプライン シンクは、 LakeFlow 宣言型パイプライン フローのターゲットです。 By デフォルト LakeFlow 宣言型パイプライン フローは、ストリーミング テーブルまたはマテリアライズドビュー ターゲットにデータを出力します。 これらは両方とも Databricks マネージド Delta テーブルです。LakeFlow 宣言型パイプライン シンクは、 Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalogによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。 シンクを使用すると、 LakeFlow 宣言型パイプラインの出力を保持するためのオプションが増えました。

LakeFlow 宣言型パイプラインシンクはいつ使用する必要がありますか?

Databricksでは、次の要件がある場合にLakeFlow宣言型パイプライン シンクを使用することをお勧めします。

  • 不正検出、リアルタイム アナリティクス、顧客レコメンデーションなどの運用ユースケースを構築します。 運用上のユースケースでは、通常、Apache Kafka トピックなどのメッセージバスからデータを読み取り、データを低レイテンシで処理して、処理されたレコードをメッセージバスに書き戻します。 このアプローチにより、クラウドストレージからの書き込みや読み取りを行わないことで、レイテンシーを短縮できます。
  • 変換されたデータを LakeFlow 宣言型パイプライン フローから、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブルや外部テーブルなど) に書き込みます。
  • Apache Kafkaトピックなど、Databricks外部のシンクへのリバースETL(抽出、変換、ロード)を行います。このアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。

LakeFlow 宣言型パイプラインシンクの使用方法

イベント データがストリーミング ソース から LakeFlow 宣言型パイプラインに取り込まれると、LakeFlow宣言型パイプラインの機能を使用してこのデータを処理および調整し、追加フロー処理を使用して、変換されたデータ レコードを LakeFlow 宣言型パイプライン シンクにストリームします。 このシンクは、 create_sink() 関数を使用して作成します。create_sink 関数の詳細については、シンク API リファレンスを参照してください。

ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備するパイプラインがある場合は、 LakeFlow 宣言型パイプライン シンクを使用する準備ができています。

LakeFlow宣言型パイプラインシンクの実装は、次の 2 つの手順で構成されます。

  1. LakeFlow 宣言型パイプライン シンクを作成します。
  2. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

LakeFlow 宣言型パイプライン シンクを作成する

Databricks では、ストリーム データから処理されたレコードを書き込む 3 種類の宛先シンクがサポートされています。

  • Delta テーブル シンク (Unity Catalog のマネージド テーブルと外部テーブルを含む)
  • Apache Kafka シンク
  • Azure Event Hubs シンク

Delta、Kafka、Azure Event Hubs シンクの構成例を次に示します。

ファイルパスで Delta シンクを作成するには:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

完全修飾カタログとスキーマパスを使用してテーブル名で Delta シンクを作成するには:

Python
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)

create_sink 関数の使用の詳細については、シンク API リファレンスを参照してください。

シンクが作成されたら、処理されたレコードをシンクにストリーミングできます。

追加フローを使用して LakeFlow 宣言型パイプライン シンクに書き込む

シンクを作成したら、次のステップは、追加フローによって出力されるレコードのターゲットとして指定することにより、処理されたレコードをシンクに書き込むことです。これを行うには、シンクを append_flow デコレータのtarget値として指定します。

  • Unity Catalog の管理テーブルと外部テーブルの場合は、 delta 形式を使用し、オプションでパスまたはテーブル名を指定します。LakeFlow 宣言型パイプラインは、Unity Catalogを使用するように設定する必要があります。
  • Apache Kafka トピックの場合は、 kafka の形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。 これらは、 Spark 構造化ストリーミング Kafka シンクがサポートするオプションと同じです。 Kafka構造化ストリーミング ライターの構成を参照してください。
  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 Microsoft Entra IDとAzure Event Hubsを使用したサービスプリンシパル認証を参照してください。

次の例は、LakeFlow宣言型パイプラインによって処理されたレコードを使用して、Delta 、Kafka、およびAzure Event Hubs シンクに書き込むようにフローを設定する方法を示しています。

Python
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

append_flowデコレーターの詳細については、「複数のフローを使用して 1 つのターゲットに書き込む」を参照してください。

制限

  • Python API のみがサポートされています。 SQL はサポートされていません。

  • ストリーミング クエリのみがサポートされています。バッチ クエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。create_auto_cdc_flowなどの他のフローはサポートされておらず、LakeFlow 宣言型パイプライン データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Delta シンクの場合、テーブル名は完全修飾である必要があります。 具体的には、 Unity Catalog 管理外部テーブルの場合、テーブル名は <catalog>.<schema>.<table> の形式である必要があります。 Hive metastoreの場合、<schema>.<table>形式である必要があります。

  • 完全更新を実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • LakeFlow 宣言型パイプラインの期待値はサポートされていません。

リソース