
Examples of flows in Lakeflow Declarative Pipelines

Example: Write to a streaming table from multiple Kafka topics

The following example creates a streaming table named kafka_target and writes to that streaming table from two Kafka topics:

Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:port1,...")
            .option("subscribe", "topic1")
            .load()
    )

@dlt.append_flow(target = "kafka_target")
def topic2():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:port1,...")
            .option("subscribe", "topic2")
            .load()
    )

In SQL, the equivalent flows read from Kafka with the read_kafka() table-valued function. To learn more, see read_kafka in the SQL language reference.
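As a rough sketch (not verbatim from this page), the two-topic pattern above could be written in SQL with one CREATE FLOW statement per topic. This assumes the CREATE FLOW ... AS INSERT INTO ... BY NAME syntax and the bootstrapServers and subscribe options of read_kafka():

SQL
CREATE OR REFRESH STREAMING TABLE kafka_target;

-- One flow per Kafka topic, both appending to the same streaming table
CREATE FLOW topic1
AS INSERT INTO kafka_target BY NAME
SELECT * FROM read_kafka(
  bootstrapServers => 'host1:port1,...',
  subscribe => 'topic1'
);

CREATE FLOW topic2
AS INSERT INTO kafka_target BY NAME
SELECT * FROM read_kafka(
  bootstrapServers => 'host1:port1,...',
  subscribe => 'topic2'
);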

In Python, you can programmatically create multiple flows that target a single table. The following example shows this pattern for a list of Kafka topics.

note

This pattern has the same requirements as using a for loop to create tables. You must explicitly pass a Python value to the function defining the flow. See Create tables in a for loop.

Python
import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

    @dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
    def topic_flow(topic=topic_name):
        return (
            spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "host1:port1,...")
                .option("subscribe", topic)
                .load()
        )

Example: Run a one-time data backfill

If you want to run a query to append data to an existing streaming table, use append_flow.

After appending a set of existing data, you have multiple options:

  • If you want the query to append new data as it arrives in the backfill directory, leave the query in place.
  • If you want this to be a one-time backfill that never runs again, remove the query after running the pipeline once.
  • If you want the query to run once, and run again only when the data is fully refreshed, set the once parameter to True on the append flow. In SQL, use INSERT INTO ONCE.

The following example runs a query to append historical data to a streaming table:

Python
import dlt

@dlt.table()
def csv_target():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("path/to/sourceDir")
    )

@dlt.append_flow(
    target = "csv_target",
    once = True)
def backfill():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("path/to/backfill/data/dir")
    )

For a more in-depth example, see Backfilling historical data with Lakeflow Declarative Pipelines.

Example: Use append flow processing instead of UNION

Instead of using a query with a UNION clause, you can use append flow queries to combine multiple sources and write to a single streaming table. Using append flow queries instead of UNION allows you to append to a streaming table from multiple sources without running a full refresh.

The following Python example includes a query that combines multiple data sources with a UNION clause:

Python
import dlt

@dlt.table(name="raw_orders")
def unioned_raw_orders():
    raw_orders_us = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/us")
    )

    raw_orders_eu = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/eu")
    )

    return raw_orders_us.union(raw_orders_eu)

The following example replaces the UNION query with append flow queries:

Python
import dlt

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_orders_us():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/us")
    )

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/eu")
    )

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/apac")
    )