メインコンテンツまでスキップ

Lakeflow 宣言型パイプラインとシンクによる外部サービスへのレコードのストリーミング

備考

プレビュー

Lakeflow 宣言型パイプライン sink API はパブリック プレビュー段階です。

この記事では、 Lakeflow 宣言型パイプライン sink API と、 それをフロー と共に使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。 外部データ シンクには、Unity Catalog のマネージド テーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。

注記

Lakeflow 宣言型パイプライン sink APIは、Pythonでのみ使用できます。

Lakeflow 宣言型パイプラインシンクとは

Lakeflow 宣言型パイプライン シンクは、 Lakeflow 宣言型パイプライン フローのターゲットです。 By デフォルト Lakeflow 宣言型パイプライン フローは、ストリーミング テーブルまたはマテリアライズドビュー ターゲットにデータを出力します。 これらは両方とも Databricks マネージド Delta テーブルです。Lakeflow 宣言型パイプライン シンクは、 Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、 Unity Catalogによって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。 シンクを使用すると、 Lakeflow 宣言型パイプラインの出力を保持するためのオプションが増えました。

Lakeflow 宣言型パイプラインシンクはいつ使用する必要がありますか?

Databricksでは、次の要件がある場合にLakeflow宣言型パイプライン シンクを使用することをお勧めします。

  • 不正行為の検出、トレンド分析、顧客への推奨事項などの運用上のユースケースを構築します。 運用ユースケースでは通常、Apache Kafka トピックなどのメッセージ バスからデータを読み取り、低レイテンシでデータを処理し、処理されたレコードをメッセージ バスに書き戻します。このアプローチにより、クラウド ストレージへの書き込みや読み取りを行わないため、レイテンシを短縮できます。
  • 変換されたデータを Lakeflow 宣言型パイプライン フローから、外部 Delta インスタンスによって管理されるテーブル ( Unity Catalog 管理テーブルや外部テーブルなど) に書き込みます。
  • Apache Kafkaトピックなど、Databricks外部のシンクへのリバースETL(抽出、変換、ロード)を行います。このアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
  • Databricks で直接サポートされていないデータ形式に書き込む必要があります。Pythonカスタム データソースを使用すると、カスタムPythonコードを使用して任意のデータソースに書き込むシンクを作成できます。 PySparkカスタム データ ソース」を参照してください。

Lakeflow 宣言型パイプラインシンクの使用方法

イベント データがストリーミング ソース から Lakeflow 宣言型パイプラインに取り込まれると、Lakeflow宣言型パイプラインの機能を使用してこのデータを処理および調整し、追加フロー処理を使用して、変換されたデータ レコードを Lakeflow 宣言型パイプライン シンクにストリームします。 このシンクは、 create_sink() 関数を使用して作成します。create_sink 関数の詳細については、シンク API リファレンスを参照してください。

ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備するパイプラインがある場合は、 Lakeflow 宣言型パイプライン シンクを使用する準備ができています。

Lakeflow宣言型パイプラインシンクの実装は、次の 2 つの手順で構成されます。

  1. Lakeflow 宣言型パイプライン シンクを作成します。
  2. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

Lakeflow 宣言型パイプライン シンクを作成する

Databricks は、ストリーム データから処理されたレコードを書き込むいくつかの種類の宛先シンクをサポートしています。

  • Deltaテーブルシンク ( Unity Catalog管理および外部テーブルを含む)
  • Apache Kafka シンク
  • Azure Event Hubs シンク
  • Pythonカスタムデータソースを使用してPythonで記述されたカスタムシンク

以下は、 Delta 、 Kafka 、 Azure Event Hubs シンク、およびPythonカスタム データ ソースの構成の例です。

ファイル パスで Delta シンクを作成するには:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

完全修飾カタログとスキーマ パスを使用してテーブル名で Delta シンクを作成するには:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)

create_sink関数の使用の詳細については、シンク API リファレンスを参照してください。

シンクが作成されたら、処理されたレコードをシンクにストリーミングできるようになります。

追加フローを使用して Lakeflow 宣言型パイプライン シンクに書き込む

シンクを作成したら、次のステップでは、追加フローによって出力されるレコードのターゲットとして指定して、処理済みのレコードをシンクに書き込みます。これを行うには、 append_flowデコレータでシンクをtarget値として指定します。

  • Unity Catalog の管理テーブルと外部テーブルの場合は、 delta 形式を使用し、オプションでパスまたはテーブル名を指定します。Lakeflow 宣言型パイプラインは、Unity Catalogを使用するように設定する必要があります。
  • Apache Kafka トピックの場合は、 kafka形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。これらは、 Spark構造化ストリーミングKafkaシンクがサポートするオプションと同じです。 「Kafka 構造化ストリーミング ライターを構成する」を参照してください。
  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 Microsoft Entra IDとAzure Event Hubsを使用したサービスプリンシパル認証を参照してください。

次の例は、Lakeflow宣言型パイプラインによって処理されたレコードを使用して、Delta 、Kafka、およびAzure Event Hubs シンクに書き込むようにフローを設定する方法を示しています。

Python
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

append_flowデコレータの詳細については、 「複数のフローを使用して単一のターゲットに書き込む」を参照してください。

制限事項

  • Python API のみがサポートされています。SQL はサポートされていません。

  • ストリーミング クエリのみがサポートされます。バッチクエリはサポートされていません。

  • シンクへの書き込みに使用できるのは append_flow のみです。create_auto_cdc_flowなどの他のフローはサポートされておらず、Lakeflow 宣言型パイプライン データセット定義でシンクを使用することはできません。たとえば、次のものはサポートされていません。

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Delta シンクの場合、テーブル名は完全修飾されている必要があります。具体的には、 Unity Catalogで管理される外部テーブルの場合、テーブル名は<catalog>.<schema>.<table>形式にする必要があります。Hive Hive metastoreの場合、テーブル名は<schema>.<table>形式にする必要があります。

  • 完全な更新アップデートを実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。 つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • Lakeflow 宣言型パイプラインの期待値はサポートされていません。

リソース