メインコンテンツまでスキップ

Lakeflow Spark宣言型パイプラインでシンクを使用して外部サービスにレコードをストリーミングする

備考

プレビュー

Lakeflow Spark宣言型パイプラインsink APIパブリック プレビュー段階です。

この記事では、 Lakeflow Spark宣言型パイプラインsink APIと、それをフローで使用して、パイプラインによって変換されたレコードを外部データ シンクに書き込む方法について説明します。 外部データ シンクには、Unity Catalog で管理されるテーブルと外部テーブル、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスが含まれます。

注記

sink API は Python でのみ使用できます。

シンクとは何ですか?

シンクはパイプライン内のフローのターゲットです。当然ながら、パイプライン フローはストリーミング テーブルまたはマテリアライズドビュー ターゲットのいずれかにデータを送信します。 これらは両方とも Databricks によって管理される Delta テーブルです。シンクは、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービスや、Unity Catalog によって管理される外部テーブルなどのターゲットに変換されたデータを書き込むために使用する代替ターゲットです。シンクを使用すると、パイプラインの出力を永続化するためのオプションが増えます。

シンクはいつ使用したらよいでしょうか?

Databricks では、次の場合にシンクを使用することをお勧めします。

  • 不正行為の検出、トレンド分析、顧客への推奨事項などの運用上のユースケースを構築します。 運用ユースケースでは通常、Apache Kafka トピックなどのメッセージ バスからデータを読み取り、低レイテンシでデータを処理し、処理されたレコードをメッセージ バスに書き戻します。このアプローチにより、クラウド ストレージへの書き込みや読み取りを行わないため、レイテンシを短縮できます。
  • フローから変換されたデータを、Unity Catalog 管理テーブルや外部テーブルなどの外部 Delta インスタンスによって管理されるテーブルに書き込みます。
  • Apache Kafkaトピックなど、Databricks外部のシンクへのリバースETL(抽出、変換、ロード)を行います。このアプローチにより、Unity Catalog テーブルやその他の Databricks で管理されるストレージの外部でデータを読み取ったり使用したりする必要があるユースケースを効果的にサポートできます。
  • Databricks で直接サポートされていないデータ形式に書き込む必要があります。Pythonカスタム データソースを使用すると、カスタムPythonコードを使用して任意のデータソースに書き込むシンクを作成できます。 PySparkカスタム データ ソース」を参照してください。

シンクはどのように使用すればいいですか?

イベント データがストリーミング ソースからパイプラインに取り込まれると、パイプライン機能を使用してこのデータを処理および調整し、追加フロー処理を使用して変換されたデータ レコードをシンクにストリーミングします。 このシンクはcreate_sink()関数を使用して作成します。create_sink関数の詳細については、シンク API リファレンスを参照してください。

ストリーミング イベント データを作成または処理し、書き込み用のデータ レコードを準備するパイプラインがある場合は、シンクを使用する準備ができています。

シンクの実装は、次の 2 つのステップで構成されます。

  1. シンクを作成します。
  2. 追加フローを使用して、準備されたレコードをシンクに書き込みます。

シンクを作成する

Databricks は、ストリーム データから処理されたレコードを書き込むいくつかの種類の宛先シンクをサポートしています。

  • Deltaテーブルシンク ( Unity Catalog管理および外部テーブルを含む)
  • Apache Kafka シンク
  • Azure Event Hubs シンク
  • Pythonカスタムデータソースを使用してPythonで記述されたカスタムシンク

以下は、 Delta 、 Kafka 、 Azure Event Hubs シンク、およびPythonカスタム データ ソースの構成の例です。

ファイル パスで Delta シンクを作成するには:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

完全修飾カタログとスキーマ パスを使用してテーブル名で Delta シンクを作成するには:

Python
dp.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "catalog_name.schema_name.table_name" }
)

create_sink関数の使用の詳細については、シンク API リファレンスを参照してください。

シンクが作成されたら、処理されたレコードをシンクにストリーミングできるようになります。

追加フローでシンクに書き込む

シンクを作成したら、次のステップでは、追加フローによって出力されるレコードのターゲットとして指定して、処理済みのレコードをシンクに書き込みます。これを行うには、 append_flowデコレータでシンクをtarget値として指定します。

  • Unity Catalog で管理されるテーブルと外部テーブルの場合は、 delta形式を使用し、オプションでパスまたはテーブル名を指定します。パイプラインは Unity Catalog を使用するように設定する必要があります。
  • Apache Kafka トピックの場合は、 kafka形式を使用し、オプションでトピック名、接続情報、認証情報を指定します。これらは、 Spark構造化ストリーミングKafkaシンクがサポートするオプションと同じです。 「Kafka 構造化ストリーミング ライターを構成する」を参照してください。
  • Azure Event Hubs の場合は、 kafka 形式を使用し、オプションで Event Hubs の名前、接続情報、認証情報を指定します。 これらは、Kafka インターフェイスを使用する Spark 構造化ストリーミング Event Hubs シンクでサポートされているものと同じです。 Microsoft Entra IDとAzure Event Hubsを使用したサービスプリンシパル認証を参照してください。

以下は、パイプラインによって処理されたレコードを Delta、Kafka、および Azure Event Hubs シンクに書き込むようにフローを設定する方法の例です。

Python
@dp.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

append_flowデコレータの詳細については、 「複数のフローを使用して単一のターゲットに書き込む」を参照してください。

制限事項

  • Python API のみがサポートされています。SQL はサポートされていません。

  • ストリーミング クエリのみがサポートされます。バッチクエリはサポートされていません。

  • シンクへの書き込みに使用できるのはappend_flowのみです。create_auto_cdc_flowなどの他のフローはサポートされておらず、 Lakeflow Spark宣言型パイプライン データセット定義でシンクを使用することはできません。 たとえば、以下はサポートされていません。

    Python
    @table("from_sink_table")
    def fromSink():
    return read_stream("my_sink")
  • Delta シンクの場合、テーブル名は完全修飾されている必要があります。具体的には、 Unity Catalogで管理される外部テーブルの場合、テーブル名は<catalog>.<schema>.<table>形式にする必要があります。Hive Hive metastoreの場合、テーブル名は<schema>.<table>形式にする必要があります。

  • 完全な更新アップデートを実行しても、シンク内の以前のコンピュート結果データはクリーンアップされません。 つまり、再処理されたデータはシンクに追加され、既存のデータは変更されません。

  • パイプラインの期待値はサポートされていません。

リソース