
Use sinks to stream records to external services with DLT

Preview

The DLT sink API is in Public Preview.

This article describes the DLT sink API and how to use it with flows to write records transformed by a pipeline to an external data sink. External data sinks include Unity Catalog managed and external tables, and event streaming services such as Apache Kafka or Azure Event Hubs.

note

The DLT sink API is only available for Python.

What are DLT sinks?

DLT sinks are targets for DLT flows. By default, DLT flows emit data to either a streaming table or a materialized view target, both of which are Databricks-managed Delta tables. DLT sinks are an alternative target that you use to write transformed data to destinations such as event streaming services like Apache Kafka or Azure Event Hubs, and to external tables managed by Unity Catalog. Using sinks, you have more options for persisting the output of your DLT pipelines.

When should I use DLT sinks?

Databricks recommends using DLT sinks if you need to:

  • Build out an operational use case such as fraud detection, real-time analytics, or customer recommendations. Operational use cases typically read data from a message bus, such as an Apache Kafka topic, process it with low latency, and write the processed records back to a message bus. This approach achieves lower latency by avoiding writes to and reads from cloud storage.
  • Write transformed data from your DLT flows to tables managed by an external Delta instance, including Unity Catalog managed and external tables.
  • Perform reverse extract-transform-load (ETL) into sinks external to Databricks, such as Apache Kafka topics. This approach enables you to effectively support use cases where data needs to be read or used outside of Unity Catalog tables or other Databricks-managed storage.

How do I use DLT sinks?

As event data is ingested from a streaming source into your DLT pipeline, you process and refine this data using DLT functionality and then use append flow processing to stream the transformed data records to a DLT sink. You create this sink using the create_sink() function. For more details about the create_sink function, see the sink API reference.

If you have a DLT pipeline that creates or processes your streaming event data and prepares data records for writing, then you are ready to use a DLT sink.

Implementing a DLT sink consists of two steps:

  1. Create the DLT sink.
  2. Use an append flow to write the prepared records to the sink.

Create a DLT sink

Databricks supports three types of destination sinks to which you write the records processed from your streaming data:

  • Delta table sinks (including Unity Catalog managed and external tables)
  • Apache Kafka sinks
  • Azure Event Hubs sinks

Below are examples of configurations for Delta, Kafka, and Azure Event Hubs sinks:

To create a Delta sink by file path:

Python
dlt.create_sink(
    name = "delta_sink",
    format = "delta",
    options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

To create a Delta sink by table name using a fully qualified catalog and schema path:

Python
dlt.create_sink(
    name = "delta_sink",
    format = "delta",
    options = {"tableName": "catalog_name.schema_name.table_name"}
)
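
The following is a minimal sketch of a Kafka sink configuration. The sink name, broker addresses, and topic name are placeholders, and any authentication options your brokers require can be added to options using the standard Spark Structured Streaming Kafka writer options:

Python
dlt.create_sink(
    name = "kafka_sink",
    format = "kafka",
    options = {
        "kafka.bootstrap.servers": "host1:port1,host2:port2",
        "topic": "my_topic_name"
    }
)

An Azure Event Hubs sink uses the same kafka format through the Event Hubs Kafka-compatible endpoint. The sketch below assumes connection string (SASL PLAIN) authentication rather than the Microsoft Entra ID service principal flow, and assumes the shaded Kafka client class name used by Databricks Runtime; the namespace, event hub name, and connection string are placeholders:

Python
dlt.create_sink(
    name = "eventhubs_sink",
    format = "kafka",
    options = {
        "kafka.bootstrap.servers": "<namespace>.servicebus.windows.net:9093",
        "topic": "<event_hub_name>",
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<connection_string>\";"
    }
)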

For more details on using the create_sink function, see the sink API reference.

After your sink is created, you can begin streaming processed records to the sink.

Write to a DLT sink with an append flow

With your sink created, the next step is to write processed records to it by specifying it as the target for records output by an append flow. You do this by specifying your sink as the target value in the append_flow decorator.

  • For Unity Catalog managed and external tables, use the format delta and specify the path or table name in options. Your DLT pipelines must be configured to use Unity Catalog.
  • For Apache Kafka topics, use the format kafka and specify the topic name, connection information, and authentication information in the options. These are the same options a Spark Structured Streaming Kafka sink supports. See Configure the Kafka Structured Streaming writer.
  • For Azure Event Hubs, use the format kafka and specify the Event Hubs name, connection information, and authentication information in the options. These are the same options supported in a Spark Structured Streaming Event Hubs sink that uses the Kafka interface. See Service Principal authentication with Microsoft Entra ID and Azure Event Hubs.

Below are examples of how to set up flows to write to Delta, Kafka, and Azure Event Hubs sinks with records processed by your DLT pipeline.

Python
@dlt.append_flow(name = "delta_sink_flow", target = "delta_sink")
def delta_sink_flow():
    return (
        spark.readStream.table("spark_referrers")
            .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
    )
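
When the target is a Kafka or Azure Event Hubs sink, the Kafka writer expects the output records to provide a value column (and optionally key and topic). The following is a minimal sketch that serializes the processed columns to JSON and streams them to the hypothetical kafka_sink shown earlier; a flow that targets an Event Hubs sink is written the same way:

Python
from pyspark.sql.functions import to_json, struct

@dlt.append_flow(name = "kafka_sink_flow", target = "kafka_sink")
def kafka_sink_flow():
    # Serialize each record to a JSON string in the "value" column expected by the Kafka writer.
    return (
        spark.readStream.table("spark_referrers")
            .select(
                to_json(
                    struct("current_page_id", "referrer", "current_page_title", "click_count")
                ).alias("value")
            )
    )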

For more details on the append_flow decorator, see Using multiple flows to write to a single target.

Limitations

  • Only the Python API is supported. SQL is not supported.

  • Only streaming queries are supported. Batch queries are not supported.

  • Only append_flow can be used to write to sinks. Other flows, such as apply_changes, are not supported, and you cannot use a sink in a DLT dataset definition. For example, the following is not supported:

    Python
    @table("from_sink_table")
    def fromSink():
        return read_stream("my_sink")
  • For Delta sinks, the table name must be fully qualified. Specifically, for Unity Catalog managed and external tables, the table name must be of the form <catalog>.<schema>.<table>. For the Hive metastore, it must be in the form <schema>.<table>.

  • Running a full refresh update does not clean up previously computed results data in the sinks. This means that any reprocessed data is appended to the sink, and existing data is not altered.

  • DLT expectations are not supported.

Resources