
Use sinks to stream records to external services with Lakeflow Declarative Pipelines

Preview

The Lakeflow Declarative Pipelines sink API is in Public Preview.

This article describes the Lakeflow Declarative Pipelines sink API and how to use it with flows to write records transformed by a pipeline to an external data sink. External data sinks include Unity Catalog managed and external tables, and event streaming services such as Apache Kafka or Azure Event Hubs.

note

The Lakeflow Declarative Pipelines sink API is only available for Python.

What are Lakeflow Declarative Pipelines sinks?

Lakeflow Declarative Pipelines sinks are targets for Lakeflow Declarative Pipelines flows. By default, Lakeflow Declarative Pipelines flows emit data to either a streaming table or a materialized view target, both of which are Databricks-managed Delta tables. Lakeflow Declarative Pipelines sinks are an alternative target that you can use to write transformed data to destinations such as event streaming services like Apache Kafka or Azure Event Hubs, and external tables managed by Unity Catalog. Using sinks, you have more options for persisting the output of your pipelines.

When should I use Lakeflow Declarative Pipelines sinks?

Databricks recommends using Lakeflow Declarative Pipelines sinks if you need to:

  • Build out operational use cases such as fraud detection, real-time analytics, and customer recommendations. Operational use cases typically read data from a message bus, such as an Apache Kafka topic, process the data with low latency, and write the processed records back to a message bus. This approach achieves lower latency by avoiding writes to and reads from cloud storage.
  • Write transformed data from your Lakeflow Declarative Pipelines flows to tables managed by an external Delta instance, including Unity Catalog managed and external tables.
  • Perform reverse extract-transform-load (ETL) into sinks external to Databricks, such as Apache Kafka topics. This approach enables you to effectively support use cases where data needs to be read or used outside of Unity Catalog tables or other Databricks-managed storage.

How do I use Lakeflow Declarative Pipelines sinks?

As event data is ingested from a streaming source into your pipeline, you process and refine this data using Lakeflow Declarative Pipelines functionality, and then use append flow processing to stream the transformed records to a Lakeflow Declarative Pipelines sink. You create the sink with the create_sink() function. For more details about the create_sink function, see the sink API reference.

If you have a pipeline that creates or processes your streaming event data and prepares data records for writing, then you are ready to use a Lakeflow Declarative Pipelines sink.

Implementing a Lakeflow Declarative Pipelines sink consists of two steps:

  1. Create the Lakeflow Declarative Pipelines sink.
  2. Use an append flow to write the prepared records to the sink.

Create a Lakeflow Declarative Pipelines sink

Databricks supports three types of destination sinks for the records processed from your streaming data:

  • Delta table sinks (including Unity Catalog managed and external tables)
  • Apache Kafka sinks
  • Azure Event Hubs sinks

Below are examples of configurations for Delta, Kafka, and Azure Event Hubs sinks:

To create a Delta sink by file path:

Python
import dlt

# Write to a Delta table stored at a path, such as a Unity Catalog volume location
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

To create a Delta sink by table name using a fully qualified catalog and schema path:

Python
# Write to a Delta table identified by its fully qualified name
dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "catalog_name.schema_name.table_name" }
)
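
To create a Kafka sink, use the kafka format. The following is a minimal sketch; the bootstrap server address and topic name are placeholders, and your cluster may require additional authentication options supported by the Spark Structured Streaming Kafka writer:

Python
# Hypothetical Kafka sink: replace the bootstrap servers and topic with your own values.
dlt.create_sink(
  name = "kafka_sink",
  format = "kafka",
  options = {
    "kafka.bootstrap.servers": "kafka_server:9092",
    "topic": "my_topic"
  }
)

Azure Event Hubs sinks also use the kafka format, because Event Hubs exposes a Kafka-compatible endpoint. The following sketch assumes shared access signature (SAS) authentication with a connection string stored in a Databricks secret; the namespace, event hub name, and secret scope and key are placeholders. For Microsoft Entra ID service principal authentication, see the options described later in this article:

Python
# Hypothetical Event Hubs sink using the Kafka-compatible endpoint and SAS authentication.
# The namespace, event hub name, and secret scope/key are placeholders.
connection_string = dbutils.secrets.get(scope = "my_scope", key = "eh_connection_string")

eh_sasl = (
  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
  'username="$ConnectionString" password="' + connection_string + '";'
)

dlt.create_sink(
  name = "eh_sink",
  format = "kafka",
  options = {
    "kafka.bootstrap.servers": "my_event_hubs_namespace.servicebus.windows.net:9093",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": "my_event_hub"
  }
)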

For more details on using the create_sink function, see the sink API reference.

After your sink is created, you can begin streaming processed records to the sink.

Write to a Lakeflow Declarative Pipelines sink with an append flow

With your sink created, the next step is to write processed records to it by specifying it as the target for records output by an append flow. You do this by specifying your sink as the target value in the append_flow decorator.

  • For Unity Catalog managed and external tables, use the format delta and specify the path or table name in options. Your Lakeflow Declarative Pipelines must be configured to use Unity Catalog.
  • For Apache Kafka topics, use the format kafka and specify the topic name, connection information, and authentication information in the options. These are the same options a Spark Structured Streaming Kafka sink supports. See Configure the Kafka Structured Streaming writer.
  • For Azure Event Hubs, use the format kafka and specify the Event Hubs name, connection information, and authentication information in the options. These are the same options supported in a Spark Structured Streaming Event Hubs sink that uses the Kafka interface. See Service Principal authentication with Microsoft Entra ID and Azure Event Hubs.

Below are examples of how to set up flows that write records processed by your pipeline to Delta, Kafka, and Azure Event Hubs sinks.

Python
@dlt.append_flow(name = "delta_sink_flow", target = "delta_sink")
def delta_sink_flow():
  # Stream the prepared records to the Delta sink created above
  return (
    spark.readStream.table("spark_referrers")
      .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
  )
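
Writing to a Kafka or Azure Event Hubs sink follows the same pattern; only the target sink name and the output schema change. The Kafka writer expects the records to provide a string or binary value column (and optionally key and other Kafka columns). The following sketch assumes the kafka_sink defined earlier and serializes selected columns into a JSON value column; the column names are placeholders:

Python
from pyspark.sql.functions import to_json, struct

# Hypothetical flow targeting the kafka_sink defined earlier.
# The Kafka writer expects a string or binary "value" column.
@dlt.append_flow(name = "kafka_sink_flow", target = "kafka_sink")
def kafka_sink_flow():
  return (
    spark.readStream.table("spark_referrers")
      .select(to_json(struct("current_page_id", "referrer", "click_count")).alias("value"))
  )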

For more details on the append_flow decorator, see Using multiple flows to write to a single target.

Limitations

  • Only the Python API is supported. SQL is not supported.

  • Only streaming queries are supported. Batch queries are not supported.

  • Only append_flow can be used to write to sinks. Other flows, such as create_auto_cdc_flow, are not supported, and you cannot use a sink in a Lakeflow Declarative Pipelines dataset definition. For example, the following is not supported:

    Python
    @dlt.table(name = "from_sink_table")
    def fromSink():
      return dlt.read_stream("my_sink")
  • For Delta sinks, the table name must be fully qualified. Specifically, for Unity Catalog managed and external tables, the table name must be of the form <catalog>.<schema>.<table>. For the Hive metastore, it must be in the form <schema>.<table>.

  • Running a full refresh update does not clean up previously computed results in the sinks. This means that any reprocessed data is appended to the sink, and existing data is not altered.

  • Lakeflow Declarative Pipelines expectations are not supported.
