Use sinks to stream records to external services with DLT
The DLT sink API is in Public Preview.
This article describes the DLT sink API and how to use it with flows to write records transformed by a pipeline to an external data sink. External data sinks include Unity Catalog managed and external tables, and event streaming services such as Apache Kafka or Azure Event Hubs.
The DLT sink API is only available for Python.
What are DLT sinks?
DLT sinks are targets for DLT flows. By default, DLT flows emit data to a streaming table or materialized view target, both of which are Databricks-managed Delta tables. DLT sinks are an alternative target that you use to write transformed data to destinations such as event streaming services like Apache Kafka or Azure Event Hubs, and external tables managed by Unity Catalog. Using sinks, you now have more options for persisting the output of your DLT pipelines.
When should I use DLT sinks?
Databricks recommends using DLT sinks if you need to:
- Build out operational use cases such as fraud detection, real-time analytics, and customer recommendations. Operational use cases typically read data from a message bus, such as an Apache Kafka topic, process it with low latency, and write the processed records back to a message bus. This approach achieves lower latency by avoiding writes to and reads from cloud storage.
- Write transformed data from your DLT flows to tables managed by an external Delta instance, including Unity Catalog managed and external tables.
- Perform reverse extract-transform-load (ETL) into sinks external to Databricks, such as Apache Kafka topics. This approach enables you to effectively support use cases where data needs to be read or used outside of Unity Catalog tables or other Databricks-managed storage.
How do I use DLT sinks?
As event data is ingested from a streaming source into your DLT pipeline, you process and refine this data using DLT functionality and then use append flow processing to stream the transformed data records to a DLT sink. You create this sink using the create_sink() function. For more details about the create_sink function, see the sink API reference.
If you have a DLT pipeline that creates or processes your streaming event data and prepares data records for writing, then you are ready to use a DLT sink.
Implementing a DLT sink consists of two steps:
- Create the DLT sink.
- Use an append flow to write the prepared records to the sink.
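For example, the following is a minimal sketch combining both steps, assuming a Unity Catalog table named catalog_name.schema_name.sink_table and an upstream streaming table named processed_events (both placeholders). The sections below cover each step in detail.

import dlt

# Step 1: create the sink (here, a Delta sink identified by a fully qualified table name).
dlt.create_sink(
  name = "my_delta_sink",
  format = "delta",
  options = {"tableName": "catalog_name.schema_name.sink_table"}
)

# Step 2: use an append flow to stream prepared records into the sink.
@dlt.append_flow(name = "my_sink_flow", target = "my_delta_sink")
def my_sink_flow():
  # Read from a streaming table defined elsewhere in the pipeline.
  return spark.readStream.table("processed_events")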
Create a DLT sink
Databricks supports three types of destination sinks for writing records processed from your streaming data:
- Delta table sinks (including Unity Catalog managed and external tables)
- Apache Kafka sinks
- Azure Event Hubs sinks
Below are examples of configurations for Delta, Kafka, and Azure Event Hubs sinks:
To create a Delta sink by file path:
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
To create a Delta sink by table name using a fully qualified catalog and schema path:
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"tableName": "catalog_name.schema_name.table_name"}
)
The following code works for both Apache Kafka and Azure Event Hubs sinks:
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.windows.net:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
  name = "eh_sink",
  format = "kafka",
  options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)
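The preceding example connects to Azure Event Hubs over its Kafka interface. As a point of comparison, a minimal sketch for a self-managed Apache Kafka broker that does not require authentication could omit the SASL options. The broker address and topic name below are placeholders; add the kafka.security.protocol and kafka.sasl.* options your cluster requires.

dlt.create_sink(
  name = "kafka_sink",
  format = "kafka",
  options = {
    # Placeholder broker address; replace with your own.
    "kafka.bootstrap.servers": "broker1.example.com:9092",
    "topic": "dlt-sink"
  }
)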
For more details on using the create_sink function, see the sink API reference.
After your sink is created, you can begin streaming processed records to the sink.
Write to a DLT sink with an append flow
With your sink created, the next step is to write processed records to it by making it the target of an append flow. You do this by setting your sink name as the target value in the append_flow decorator.
- For Unity Catalog managed and external tables, use the format delta and specify the path or table name in options. Your DLT pipelines must be configured to use Unity Catalog.
- For Apache Kafka topics, use the format kafka and specify the topic name, connection information, and authentication information in the options. These are the same options that a Spark Structured Streaming Kafka sink supports. See Configure the Kafka Structured Streaming writer.
- For Azure Event Hubs, use the format kafka and specify the Event Hubs name, connection information, and authentication information in the options. These are the same options supported in a Spark Structured Streaming Event Hubs sink that uses the Kafka interface. See Service Principal authentication with Microsoft Entra ID and Azure Event Hubs.
Below are examples of how to set up flows to write to Delta, Kafka, and Azure Event Hubs sinks with records processed by your DLT pipeline.
The following flow writes to the Delta sink created earlier:

@dlt.append_flow(name = "delta_sink_flow", target = "delta_sink")
def delta_sink_flow():
  return (
    spark.readStream.table("spark_referrers")
      .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
  )

The following flow writes to the Kafka or Azure Event Hubs sink created earlier:
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
The value parameter is mandatory for an Azure Event Hubs sink. Additional parameters such as key, partition, headers, and topic are optional.
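As an illustration, the following sketch extends the earlier kafka_sink_flow example with an optional headers column. The sink name, source table, and header contents are modeled on the earlier examples and are placeholders; in the Spark Structured Streaming Kafka writer, the headers column is expected to be an array of structs with a string key and a binary value.

@dlt.append_flow(name = "kafka_sink_flow_with_headers", target = "eh_sink")
def kafka_sink_flow_with_headers():
  return (
    spark.readStream.table("spark_referrers")
      .selectExpr(
        "cast(current_page_id as string) as key",
        "to_json(struct(referrer, current_page_title, click_count)) as value",
        # Optional headers column: an array of (key, binary value) structs.
        "array(named_struct('key', 'source', 'value', cast('dlt_pipeline' as binary))) as headers"
      )
  )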
For more details on the append_flow decorator, see Using multiple flows to write to a single target.
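As that article describes, more than one append flow can write to the same target. For instance, the following hypothetical sketch assumes two streaming tables, web_events and mobile_events, defined elsewhere in the pipeline, and writes both to the eh_sink sink created earlier.

@dlt.append_flow(name = "web_events_flow", target = "eh_sink")
def web_events_flow():
  return (
    spark.readStream.table("web_events")
      .selectExpr("cast(event_id as string) as key", "to_json(struct(*)) as value")
  )

@dlt.append_flow(name = "mobile_events_flow", target = "eh_sink")
def mobile_events_flow():
  return (
    spark.readStream.table("mobile_events")
      .selectExpr("cast(event_id as string) as key", "to_json(struct(*)) as value")
  )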
Limitations
- Only the Python API is supported. SQL is not supported.
- Only streaming queries are supported. Batch queries are not supported.
- Only append_flow can be used to write to sinks. Other flows, such as apply_changes, are not supported, and you cannot use a sink in a DLT dataset definition. For example, the following is not supported:

  @table("from_sink_table")
  def fromSink():
    return read_stream("my_sink")

- For Delta sinks, the table name must be fully qualified. Specifically, for Unity Catalog managed and external tables, the table name must be of the form <catalog>.<schema>.<table>. For the Hive metastore, it must be in the form <schema>.<table>.
- Running a full refresh update does not clean up previously computed results data in the sinks. This means that any reprocessed data is appended to the sink, and existing data is not altered.
- DLT expectations are not supported.