Lakeflow Spark宣言型パイプラインのシンク
デフォルトでは、パイプラインのフローは、Unity Catalog によって管理される Delta テーブル(通常、ストリーミングテーブルまたはマテリアライズドビュー)に結果を書き込みます。シンクは、イベントストリーミングサービスやカスタムデータストアなど、Databricks が管理するストレージの外部にある宛先に、変換されたデータを書き込むために使用する代替ターゲットです。
シンクは append フローとともに使用されます。シンクAPIのいずれかを使用してシンクを定義し、次にappend_flow定義内のtargetとして参照します。
シンクを使うタイミング
Databricksでは、次の要件がある場合にシンクを使用することをお勧めします:
- Build operational use cases with low latency, such as fraud detection, real-time analytics, or customer recommendations, where data must flow to a message bus rather than cloud storage. For workloads that require millisecond latency, see Use real-time mode in Lakeflow Spark Declarative Pipelines.
- 変換されたデータを、外部Deltaインスタンスによって管理されるテーブル (Unity Catalog マネージドテーブルや外部テーブルなど) に書き込みます。
- Databricks の外部での利用のために処理済みデータを Apache Kafka トピックに書き戻すといった外部システムへのリバース ETL を行います。
- Databricksでネイティブにサポートされていない形式に、Pythonカスタムデータソースを使用して書き込みます。
シンクのタイプ
パイプラインは次のシンクの種類をサポートしています。
シンクタイプ | 説明 |
|---|---|
Delta テーブル シンク | Unity Catalog の管理型または外部 Delta テーブルに書き込むことができます。ファイルパス、または完全修飾テーブル名を指定してください。 |
Apache Kafka シンク | パイプラインランタイムに含まれる Kafka コネクタを使用して、Apache Kafka トピックに書き込みます。 |
Azure Event Hubs シンク | Kafka インターフェースを使用した Azure Event Hubs への書き込みKafka シンクと同じオプションを使用します。 |
Pythonカスタムシンク | Pythonカスタムデータソースを使用して、 |
ForEachBatch シンク | ストリーミングデータの各マイクロバッチにカスタムPythonロジックを適用できます。複数の宛先へ書き込む必要がある場合、アップサートを実行する場合、またはストリーミング書き込みをネイティブでサポートしていないターゲットを使用する場合に利用します。 |
シンク APIs
パイプラインは、シンクを作成するための2つのAPIsを提供します。
create_sink(): サポートされている種類(Delta、Kafka、AEH、またはPythonカスタムデータソース)の名前付きシンクを作成します。Pythonのみで利用可能です。パイプラインでのシンクの使用を参照してください。foreach_batch_sink():ストリーミングデータの各マイクロバッチで実行されるPython関数を装飾します。カスタムロジックの記述に最大限の柔軟性を提供します。「ForEachBatch を使用してパイプライン内の任意のデータシンクに書き込む」を参照してください。
両方のシンクタイプは、append_flow の target として参照されます。
制限事項:
- シンクはPythonでのみ使用できます。SQL はサポートされていません。
- ストリーミングクエリのみがサポートされています。バッチクエリはサポートされていません。
- Only
append_flowcan write to sinks;create_auto_cdc_flowand other flow types are not supported. - Pipeline expectations are not supported for sinks.
- フル更新を実行しても、シンク内の以前に書き込まれたデータはクリーンアップされません。