
Use sinks in pipelines

Use the Lakeflow Spark Declarative Pipelines sink API with flows to write records transformed by a pipeline to an external data sink. External data sinks include Unity Catalog managed and external tables, and event streaming services such as Apache Kafka or Azure Event Hubs. You can also write to custom destinations by implementing a Python custom data source.

For an overview of sink concepts and when to use them, see Sinks in Lakeflow Spark Declarative Pipelines.


Sink workflow

As event data is ingested from a streaming source into your pipeline, you process and refine it with transformations. You then use an append flow or update flow to stream the transformed records to a sink, which you create with the create_sink() function. For more details about the create_sink function, see the sink API reference.

If you have a pipeline that creates or processes your streaming event data and prepares data records for writing, then you are ready to use a sink.

Implementing a sink consists of two steps:

  1. Create the sink.
  2. Use an append flow or update flow to write the prepared records to the sink.

Create a sink

Databricks supports the following types of sinks for the records your pipeline processes from streaming data:

  • Delta table sinks (including Unity Catalog managed and external tables)
  • Apache Kafka sinks
  • Azure Event Hubs sinks
  • Custom sinks written in Python using Python custom data sources

The following examples show how to configure a Delta sink, either by file path or by table name. Kafka and Azure Event Hubs sinks use the kafka format with the options described in the next section.

To create a Delta sink by file path:

Python
from pyspark import pipelines as dp

dp.create_sink(
    name="delta_sink",
    format="delta",
    options={"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"},
)

To create a Delta sink by table name using a fully qualified catalog and schema path:

Python
dp.create_sink(
    name="delta_sink",
    format="delta",
    options={"tableName": "catalog_name.schema_name.table_name"},
)
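The Limitations section notes that a Delta sink's table name must be fully qualified: <catalog>.<schema>.<table> for Unity Catalog, or <schema>.<table> for the Hive metastore. The following is a minimal sketch of how you might check this before calling create_sink. The helper names delta_table_options and delta_path_options are hypothetical and are not part of the pipelines API. They only build the options dict that the examples above pass inline.

```python
import re

# Hypothetical helpers, not part of the pipelines API. They only build and check
# the `options` dict passed to dp.create_sink() for a Delta sink.
# Simple identifiers only: backtick-quoted names that contain dots are rejected.
_UC_TABLE = re.compile(r"^[^.\s]+\.[^.\s]+\.[^.\s]+$")   # <catalog>.<schema>.<table>
_HMS_TABLE = re.compile(r"^[^.\s]+\.[^.\s]+$")           # <schema>.<table>


def delta_table_options(table_name: str, unity_catalog: bool = True) -> dict:
    """Return {"tableName": ...} after checking that the name is fully qualified."""
    pattern = _UC_TABLE if unity_catalog else _HMS_TABLE
    if not pattern.match(table_name):
        expected = "<catalog>.<schema>.<table>" if unity_catalog else "<schema>.<table>"
        raise ValueError(f"expected a table name of the form {expected}, got {table_name!r}")
    return {"tableName": table_name}


def delta_path_options(path: str) -> dict:
    """Return {"path": ...} for an absolute path (e.g. /Volumes/...) or a URI with a scheme."""
    if not (path.startswith("/") or "://" in path):
        raise ValueError(f"expected an absolute path or URI, got {path!r}")
    return {"path": path}


# Usage inside a pipeline source file (requires the pipelines runtime):
# dp.create_sink(
#     name="delta_sink",
#     format="delta",
#     options=delta_table_options("catalog_name.schema_name.table_name"),
# )
```

Failing fast on a malformed name moves the error to pipeline definition time, which can be easier to diagnose than a failure during the update.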

For more details on using the create_sink function, see the sink API reference.

After your sink is created, you can begin streaming processed records to the sink.

Write to a sink with an append flow

With your sink created, write processed records to it by making the sink the target of an append flow. To do this, pass the sink's name as the target argument of the append_flow decorator.

  • For Unity Catalog managed and external tables, use the format delta and specify the path or table name in options. Your pipeline must be configured to use Unity Catalog.
  • For Apache Kafka topics, use the format kafka and specify the topic name, connection information, and authentication information in the options. These are the same options a Spark Structured Streaming Kafka sink supports. See Configure the Kafka Structured Streaming writer.
  • For Azure Event Hubs, use the format kafka and specify the Event Hubs name, connection information, and authentication information in the options. These are the same options supported in a Spark Structured Streaming Event Hubs sink that uses the Kafka interface. See Authentication.
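For a kafka-format sink, the options are the standard Spark Structured Streaming Kafka writer options. kafka.bootstrap.servers and topic are required, and any other key prefixed with kafka. is passed through to the Kafka producer. The following sketch assembles these options for plain Kafka and for Azure Event Hubs through its Kafka-compatible endpoint on port 9093. The helper names are hypothetical. The Event Hubs variant assumes SAS connection-string authentication. Your workspace may use a different method (see Authentication).

```python
from typing import Dict, Optional


def kafka_sink_options(
    bootstrap_servers: str, topic: str, producer_conf: Optional[Dict[str, str]] = None
) -> Dict[str, str]:
    """Build create_sink() options for a plain Kafka sink (hypothetical helper).

    Keys in producer_conf are Kafka producer settings; they get a "kafka." prefix
    so that Spark passes them through to the producer.
    """
    options = {"kafka.bootstrap.servers": bootstrap_servers, "topic": topic}
    for key, value in (producer_conf or {}).items():
        options[f"kafka.{key}"] = value
    return options


def event_hubs_sink_options(namespace: str, event_hub: str, connection_string: str) -> Dict[str, str]:
    """Build create_sink() options for Azure Event Hubs via its Kafka endpoint.

    Assumes SAS connection-string authentication. On Databricks, the Kafka client
    classes are shaded, hence the "kafkashaded." prefix on the login module.
    """
    jaas_config = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return kafka_sink_options(
        bootstrap_servers=f"{namespace}.servicebus.windows.net:9093",
        topic=event_hub,  # the event hub name serves as the Kafka topic
        producer_conf={
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.jaas.config": jaas_config,
        },
    )


# Usage inside a pipeline (hypothetical names; read the connection string from a
# secret scope rather than hard-coding it):
# dp.create_sink(
#     name="eh_sink",
#     format="kafka",
#     options=event_hubs_sink_options("my-namespace", "my-hub", connection_string),
# )
```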

The following example sets up an append flow that writes records processed by your pipeline to the Delta sink. A flow that targets a Kafka or Azure Event Hubs sink is defined the same way, with target set to that sink's name.

Python
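from pyspark import pipelines as dp

@dp.append_flow(name="delta_sink_flow", target="delta_sink")
def delta_sink_flow():
    # `spark` is provided by the pipeline runtime; stream selected columns to the sink
    return (
        spark.readStream.table("spark_referrers")
        .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
    )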
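Unlike a Delta sink, a Kafka or Azure Event Hubs sink expects each record in a value column (string or binary), with an optional key column. This is a requirement of the Spark Structured Streaming Kafka writer. One way to adapt the flow above is to serialize the selected columns to JSON with the Spark SQL functions to_json and struct. The helper kafka_select_exprs and the sink name kafka_sink are hypothetical. The sketch only builds the expression strings to pass to selectExpr.

```python
def kafka_select_exprs(columns, key_column=None):
    """Build selectExpr() arguments that pack `columns` into a JSON `value` column.

    If key_column is given, it is cast to STRING and emitted first as the Kafka `key`.
    """
    if not columns:
        raise ValueError("at least one column is required")
    exprs = [f"to_json(struct({', '.join(columns)})) AS value"]
    if key_column is not None:
        exprs.insert(0, f"CAST({key_column} AS STRING) AS key")
    return exprs


# Usage inside a pipeline (hypothetical sink name "kafka_sink"):
# @dp.append_flow(name="kafka_sink_flow", target="kafka_sink")
# def kafka_sink_flow():
#     return spark.readStream.table("spark_referrers").selectExpr(
#         *kafka_select_exprs(
#             ["current_page_id", "referrer", "current_page_title", "click_count"],
#             key_column="current_page_id",
#         )
#     )
```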

For more details on the append_flow decorator, see Default flows and append flows.

Limitations

  • Only the Python API is supported. SQL is not supported.

  • Only streaming queries are supported. Batch queries are not supported.

  • Only append_flow and update_flow can be used to write to sinks. Other flows, such as create_auto_cdc_flow, are not supported, and you cannot use a sink in a pipeline dataset definition. For example, the following is not supported:

    Python
    @table("from_sink_table")
    def fromSink():
        return read_stream("my_sink")
  • For Delta sinks, the table name must be fully qualified. For Unity Catalog managed and external tables, the name must have the form <catalog>.<schema>.<table>. For the Hive metastore, it must have the form <schema>.<table>.

  • A full refresh update does not clean up results previously written to sinks. Any reprocessed data is appended to the sink, and existing data is not altered.

  • Pipeline expectations are not supported.

  • Serverless egress control supports only the Kafka and Delta Lake sink connectors. See What is serverless egress control?
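To make the full refresh limitation concrete, here is a toy model in plain Python. It is not pipeline code. A list stands in for an append-only sink. Each run replays every source record through the flow, as a full refresh does. Because nothing removes earlier output, the sink ends up holding every record twice.

```python
# Toy model only: a list stands in for an append-only sink.


def run_flow(source_records, sink):
    """Append every transformed source record to the sink; never delete anything."""
    for record in source_records:
        sink.append({"page": record["page"], "clicks": record["clicks"]})
    return sink


source = [{"page": "a", "clicks": 3}, {"page": "b", "clicks": 5}]
sink = []
run_flow(source, sink)  # initial update
run_flow(source, sink)  # full refresh reprocesses the same records
print(len(sink))  # prints 4: each record now appears twice
```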

Additional resources