create_sink

Preview

The DLT create_sink API is in Public Preview.

The create_sink() function writes from a DLT pipeline to an event streaming service, such as Apache Kafka or Azure Event Hubs, or to a Delta table. After you create a sink with create_sink(), you write data to it with an append flow. An append flow is the only flow type supported with create_sink(); other flow types, such as apply_changes, are not supported.
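For example, here is a minimal sketch of the pattern, assuming a source table named raw_events exists in the pipeline (the sink, flow, topic, and table names are illustrative):

Python
import dlt

# Create the sink once per pipeline.
dlt.create_sink(
    name="events_kafka_sink",  # illustrative sink name
    format="kafka",
    options={
        "kafka.bootstrap.servers": "host:port",
        "topic": "events"
    }
)

# Write to the sink with an append flow; append flows are the only
# flow type supported for sinks.
@dlt.append_flow(name="events_to_kafka", target="events_kafka_sink")
def events_to_kafka():
    # The Spark Kafka sink expects the payload in a string or binary
    # column named "value".
    return (
        spark.readStream.table("raw_events")  # hypothetical source table
             .selectExpr("to_json(struct(*)) AS value")
    )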

The Delta sink supports Unity Catalog external and managed tables and Hive metastore managed tables. Table names must be fully qualified. For example, Unity Catalog tables must use a three-tier identifier: <catalog>.<schema>.<table>. Hive metastore tables must use <schema>.<table>.
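For instance, a short sketch of the two naming forms (the catalog, schema, and table names are placeholders):

Python
import dlt

# Unity Catalog table: three-tier identifier <catalog>.<schema>.<table>
dlt.create_sink(
    name="uc_delta_sink",
    format="delta",
    options={"tableName": "my_catalog.my_schema.my_table"}
)

# Hive metastore table: two-tier identifier <schema>.<table>
dlt.create_sink(
    name="hms_delta_sink",
    format="delta",
    options={"tableName": "my_schema.my_table"}
)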

note
  • Running a full refresh update does not clear data from sinks. Any reprocessed data will be appended to the sink, and existing data will not be altered.
  • DLT expectations are not supported with the sink API.

Syntax

Python
import dlt

dlt.create_sink(name=<sink_name>, format=<format>, options=<options>)

Parameters

Parameter | Type | Description
--- | --- | ---
name | str | Required. A string that identifies the sink and is used to reference and manage it. Sink names must be unique within the pipeline, across all source code (such as notebooks or modules) that is part of the pipeline.
format | str | Required. A string that defines the output format, either kafka or delta.
options | dict | A dictionary of sink options, formatted as {"key": "value"}, where each key and value is a string. All Databricks Runtime options supported by the Kafka and Delta sinks are supported.

Examples

Python
import dlt

# Create a Kafka sink
dlt.create_sink(
    name="my_kafka_sink",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "host:port",
        "topic": "my_topic"
    }
)

# Create an external Delta table sink with a file path
dlt.create_sink(
    name="my_delta_sink",
    format="delta",
    options={"path": "/path/to/my/delta/table"}
)

# Create a Delta table sink using a fully qualified table name
# (sink names must be unique within a pipeline, so this sink uses
# a different name than the one above)
dlt.create_sink(
    name="my_delta_table_sink",
    format="delta",
    options={"tableName": "my_catalog.my_schema.my_table"}
)