シンクを作成する
プレビュー
Lakeflow 宣言型パイプライン create_sink API はパブリック プレビュー段階です。
create_sink()関数は、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービス、または宣言型パイプラインから Delta テーブルに書き込みます。create_sink()関数を使用してシンクを作成した後、追加フローでシンクを使用してシンクにデータを書き込みます。追加フローは、 create_sink()関数でサポートされる唯一のフロー タイプです。create_auto_cdc_flowなどの他のフロー タイプはサポートされていません。
DeltaシンクはUnity Catalog外部、マネージドテーブル、およびHive metastoreマネージドテーブルをサポートします。 テーブル名は完全修飾されている必要があります。たとえば、 Unity Catalogテーブルでは 3 層識別子<catalog>.<schema>.<table>使用する必要があります。 Hive metastoreテーブルでは<schema>.<table>使用する必要があります。
- 完全更新を実行しても、シンクからデータは消去されません。再処理されたデータはシンクに追加され、既存のデータは変更されません。
- Lakeflow 宣言型パイプラインの期待値は、 sinkAPIではサポートされていません。
構文
from pyspark import pipelines as dp
dp.create_sink(name=<sink_name>, format=<format>, options=<options>)
問題
| パラメーター | Type | 説明 | 
|---|---|---|
| 
 | 
 | 必須。シンクを識別し、シンクの参照と管理に使用される文字列。シンク名は、パイプラインの一部であるすべてのソース コード ファイルを含め、パイプラインに対して一意である必要があります。 | 
| 
 | 
 | 必須。出力形式を定義する文字列(  | 
| 
 | 
 | シンク オプションのリスト。  
 | 
例
from pyspark import pipelines as dp
# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)
# Create an external Delta table sink with a file path
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "path": "/path/to/my/delta/table" }
)
# Create a Delta table sink using a table name
dp.create_sink(
  "my_delta_sink",
    "delta",
    { "tableName": "my_catalog.my_schema.my_table" }
)