create_sink
The DLT create_sink API is in Public Preview.
The create_sink() function writes to an event streaming service, such as Apache Kafka or Azure Event Hubs, or to a Delta table from a DLT pipeline. After creating a sink with the create_sink() function, you use the sink in an append flow to write data into it. The append flow is the only flow type supported with the create_sink() function. Other flow types, such as apply_changes, are not supported.
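For example, a minimal sketch of the full pattern, run inside a DLT pipeline notebook (where spark is available): the sink name, connection details, and source table raw_events are all illustrative.

import dlt
from pyspark.sql.functions import to_json, struct

# Create the sink; connection details are placeholders
dlt.create_sink(
    name="my_kafka_sink",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "host:port",
        "topic": "my_topic",
    },
)

# Write to the sink with an append flow, the only flow type supported
# for sinks. The Kafka writer expects a string or binary column named
# "value", so the rows are serialized to JSON here.
@dlt.append_flow(name="kafka_sink_flow", target="my_kafka_sink")
def write_to_kafka_sink():
    return (
        spark.readStream.table("raw_events")  # hypothetical source table
        .select(to_json(struct("*")).alias("value"))
    )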
The Delta sink supports Unity Catalog external and managed tables and Hive metastore managed tables. Table names must be fully qualified. For example, Unity Catalog tables must use a three-tier identifier: <catalog>.<schema>.<table>. Hive metastore tables must use <schema>.<table>.
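For example (the sink, catalog, schema, and table names here are illustrative):

import dlt

# Unity Catalog table: three-tier identifier
dlt.create_sink("uc_delta_sink", "delta", {"tableName": "my_catalog.my_schema.my_table"})

# Hive metastore table: two-tier identifier
dlt.create_sink("hms_delta_sink", "delta", {"tableName": "my_schema.my_table"})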
- Running a full refresh update does not clear data from sinks. Any reprocessed data will be appended to the sink, and existing data will not be altered.
- DLT expectations are not supported with the sink API.
Syntax
import dlt
dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)
Parameters
| Parameter | Type | Description |
|---|---|---|
| `name` | `str` | Required. A string that identifies the sink and is used to reference and manage the sink. Sink names must be unique to the pipeline, including across all source code such as notebooks or modules that are part of the pipeline. |
| `format` | `str` | Required. A string that defines the output format, either `kafka` or `delta`. |
| `options` | `dict` | A list of sink options, formatted as `{"key": "value"}`, where the key and value are strings. |
Examples
import dlt

# Create a Kafka sink
dlt.create_sink(
    name="my_kafka_sink",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "host:port",
        "topic": "my_topic",
    },
)

# Create an external Delta table sink with a file path
dlt.create_sink(
    name="my_delta_sink",
    format="delta",
    options={"path": "/path/to/my/delta/table"},
)

# Create a Delta table sink using a fully qualified table name
# (given a distinct name, since sink names must be unique to the pipeline)
dlt.create_sink(
    name="my_delta_table_sink",
    format="delta",
    options={"tableName": "my_catalog.my_schema.my_table"},
)