メインコンテンツまでスキップ

create_sink

備考

プレビュー

DLT create_sink API は パブリック プレビュー段階です。

create_sink() 関数は、Apache Kafka や Azure Event Hubs などのイベント ストリーミング サービス、または DLT パイプラインから Delta テーブルに書き込みます。create_sink() 関数を使用してシンクを作成した後、追加フローでシンクを使用してデータをシンクに書き込みます。追加フローは、 create_sink() 関数でサポートされている唯一のフロー タイプです。その他のフロー タイプ( apply_changesなど)はサポートされていません。

Deltaシンクは、Unity Catalog外部およびマネージドテーブルとHive metastoreマネージドテーブルをサポートします。テーブル名は完全修飾名である必要があります。たとえば、 Unity Catalog テーブルでは、 <catalog>.<schema>.<table>の 3 層識別子を使用する必要があります。 Hive metastore テーブルでは <schema>.<table>.

注記
  • 完全な更新更新を実行しても、シンクからデータはクリアされません。再処理されたデータはシンクに追加され、既存のデータは変更されません。
  • DLT のエクスペクテーションは、 sink API ではサポートされていません。

構文

Python
import dlt

dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)

パラメーター

パラメーター

タイプ

説明

name

str

必須。シンクを識別し、シンクの参照と管理に使用される文字列。シンク名は、パイプラインの一部であるノートブックやモジュールなど、すべてのソース コードを含め、パイプラインに対して一意である必要があります。

format

str

必須。出力形式 ( kafka または delta) を定義する文字列。

options

dict

シンクオプションのリスト。 {"key": "value"}形式で、キーと値は両方とも文字列です。Kafka シンクと Delta シンクでサポートされているすべての Databricks Runtime オプションがサポートされています。

Python
import dlt

# Create a Kafka sink
dlt.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)

# Create an external Delta table sink with a file path
dlt.create_sink(
"my_delta_sink",
"delta",
{ "path": "/path/to/my/delta/table" }
)

# Create a Delta table sink using a table name
dlt.create_sink(
"my_delta_sink",
"delta",
{ "tableName": "my_catalog.my_schema.my_table" }
)