Move tables between Lakeflow Declarative Pipelines
This article describes how to move streaming tables and materialized views between pipelines. After the move, the pipeline that you move the table to updates it, rather than the original pipeline. This is useful in many scenarios, including:
- Split a large pipeline into smaller ones.
- Merge multiple pipelines into a single larger one.
- Change the refresh frequency for some tables in a pipeline.
- Move tables from a pipeline that uses the legacy publishing mode to the default publishing mode. For details about the legacy publishing mode, see Legacy publishing mode for pipelines. To see how you can migrate the publishing mode for an entire pipeline at once, see Enable the default publishing mode in a pipeline.
Requirements
The following are requirements for moving a table between pipelines.
- You must use Databricks Runtime 16.3 or above when running the `ALTER ...` command.
- Both the source and destination pipelines must be:
  - In the same workspace
  - Owned by the Databricks user account or service principal running the operation
- The destination pipeline must use the default publishing mode, which enables you to publish tables to multiple catalogs and schemas. Alternatively, both pipelines can use the legacy publishing mode, in which case both must have the same `catalog` and `target` values in settings. For information about the legacy publishing mode, see LIVE schema (legacy).

  This feature does not support moving a table from a pipeline that uses the default publishing mode to a pipeline that uses the legacy publishing mode.
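If both pipelines use the legacy publishing mode, you can confirm that their `catalog` and `target` settings match before you move anything. The following is a minimal sketch using the Databricks SDK for Python, assuming the `pipelines.get` response exposes the pipeline spec; the pipeline IDs are placeholders.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Placeholder IDs: replace with your source and destination pipeline IDs.
source = w.pipelines.get(pipeline_id="<source-pipeline-id>")
destination = w.pipelines.get(pipeline_id="<destination-pipeline-id>")

# For legacy publishing mode pipelines, both values must match between pipelines.
print("source:     ", source.spec.catalog, source.spec.target)
print("destination:", destination.spec.catalog, destination.spec.target)
```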
Move a table between pipelines
The following instructions describe how to move a streaming table or materialized view from one pipeline to another.
- Stop the source pipeline if it is running. Wait for it to stop completely.
- Remove the table's definition from the source pipeline's notebook or file and store it somewhere for future reference. Include any supporting queries or code that the pipeline needs to run correctly.
- From a notebook or the SQL editor, run the following SQL command to reassign the table from the source pipeline to the destination pipeline:

  ```sql
  ALTER [MATERIALIZED VIEW | STREAMING TABLE | TABLE] <table-name>
  SET TBLPROPERTIES("pipelines.pipelineId"="<destination-pipeline-id>");
  ```

  Use `ALTER MATERIALIZED VIEW` for Unity Catalog managed materialized views and `ALTER STREAMING TABLE` for streaming tables. To perform the same action on a Hive metastore table, use `ALTER TABLE`.

  For example, to move a streaming table named `sales` to a pipeline with the ID `abcd1234-ef56-ab78-cd90-1234efab5678`, run the following command:

  ```sql
  ALTER STREAMING TABLE sales
  SET TBLPROPERTIES("pipelines.pipelineId"="abcd1234-ef56-ab78-cd90-1234efab5678");
  ```

  Note: The `pipelineId` must be a valid pipeline identifier. The `null` value is not permitted.
- Add the table's definition to the destination pipeline's notebook or file.

  Note: If the catalog or target schema differ between the source and destination, copying the query exactly might not work. Partially qualified table names in the definition can resolve differently. You might need to update the definition while moving it.
The move is complete. You can now run both the source and destination pipelines. The destination pipeline updates the table.
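To confirm which pipeline now manages the table, you can inspect its `pipelines.pipelineId` table property from a notebook. A minimal sketch, using the `sales` table from the example above:

```python
# List the table properties and keep only the managing pipeline ID.
props = spark.sql("SHOW TBLPROPERTIES sales")
props.filter("key = 'pipelines.pipelineId'").show(truncate=False)
# The value should match the destination pipeline ID after the move.
```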
Troubleshooting
The following table describes errors that could happen when moving a table between pipelines.
| Error | Description |
| --- | --- |
|  | The source pipeline is in the default publishing mode, and the destination uses the LIVE schema (legacy) mode. This is not supported. If the source uses the default publishing mode, then the destination must as well. |
|  | Only moving tables between Lakeflow Declarative Pipelines is supported. Streaming tables and materialized views created with Databricks SQL are not supported. |
|  | The `pipelineId` must be a valid pipeline identifier. The `null` value is not permitted. |
| Table fails to update in the destination after the move. | To quickly mitigate in this case, move the table back to the source pipeline by following the same instructions. |
|  | Both the source and destination pipelines must be owned by the user performing the move operation. |
|  | The table listed in the error message already exists. This can happen if a backing table for the pipeline already exists. In this case, |
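If the table fails to update in the destination after a move, the quickest mitigation is to move it back to the source pipeline using the same `ALTER` command. A minimal sketch of that rollback from a notebook, assuming you noted the source pipeline's ID before the move and using the `sales` streaming table from the earlier example:

```python
# Reassign the table back to the pipeline that originally managed it.
# The ID below is a placeholder for the source pipeline's ID.
source_pipeline_id = "<source-pipeline-id>"

spark.sql(f"""
  ALTER STREAMING TABLE sales
  SET TBLPROPERTIES("pipelines.pipelineId"="{source_pipeline_id}")
""")
```

Remember to also move the table's definition back into the source pipeline's notebook or file before running it again.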
Example with multiple tables in a pipeline
Pipelines can contain more than one table. You can still move one table at a time between pipelines. In this scenario, there are three tables (`table_a`, `table_b`, `table_c`) that read from each other sequentially in the source pipeline. We want to move one table, `table_b`, to another pipeline.
Initial source pipeline code:
```python
import dlt
from pyspark.sql.functions import col, sum

@dlt.table
def table_a():
    return spark.read.table("source_table")

# Table to be moved to the new pipeline:
@dlt.table
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )

@dlt.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )
```
We move `table_b` to another pipeline by copying and removing the table definition from the source and updating `table_b`'s `pipelineId`.
First, pause any schedules and wait for updates to complete on both the source and target pipelines. Then modify the source pipeline to remove code for the table being moved. The updated source pipeline example code becomes:
```python
import dlt
from pyspark.sql.functions import col, sum

@dlt.table
def table_a():
    return spark.read.table("source_table")

# Removed, to be in the new pipeline:
# @dlt.table
# def table_b():
#     return (
#         spark.read.table("table_a")
#         .select(col("column1"), col("column2"))
#     )

@dlt.table
def table_c():
    return (
        spark.read.table("table_b")
        .groupBy(col("column1"))
        .agg(sum("column2").alias("sum_column2"))
    )
```
Go to the SQL editor and run the `ALTER` command that updates the `pipelineId`:

```sql
ALTER MATERIALIZED VIEW table_b
SET TBLPROPERTIES("pipelines.pipelineId"="<new-pipeline-id>");
```
Next, go to the destination pipeline and add the definition of `table_b`. If the default catalog and schema are the same in the pipeline settings, no code changes are required.
The target pipeline code:
```python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="table_b")
def table_b():
    return (
        spark.read.table("table_a")
        .select(col("column1"), col("column2"))
    )
```
If the default catalog and schema differ in the pipeline settings, you must add the fully qualified names using the source pipeline's catalog and schema.
For example, the target pipeline code could be:
```python
import dlt
from pyspark.sql.functions import col

@dlt.table(name="source_catalog.source_schema.table_b")
def table_b():
    return (
        spark.read.table("source_catalog.source_schema.table_a")
        .select(col("column1"), col("column2"))
    )
```
Run (or re-enable schedules) for both the source and target pipelines.
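If you prefer to trigger the updates programmatically rather than from the UI, one option is the Databricks SDK for Python. This is a minimal sketch under the assumption that you have both pipeline IDs at hand; the IDs shown are placeholders.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Placeholder IDs: replace with your source and target pipeline IDs.
for pipeline_id in ["<source-pipeline-id>", "<target-pipeline-id>"]:
    # Start a triggered update on each pipeline.
    w.pipelines.start_update(pipeline_id=pipeline_id)
```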
The pipelines are now disjoint. The query for `table_c` reads from `table_b` (now in the target pipeline), and `table_b` reads from `table_a` (in the source pipeline). When you run a triggered execution on the source pipeline, `table_b` is not updated, because it is no longer managed by the source pipeline. The source pipeline treats `table_b` as a table external to the pipeline. This is comparable to defining a materialized view that reads from a Delta table in Unity Catalog that is not managed by the pipeline.
Limitations
The following are limitations for moving tables between pipelines.
- Materialized views and streaming tables created with Databricks SQL are not supported.
- Private tables or views are not supported.
- Both the source and destination must be valid pipelines. A null `pipelineId` is not supported.
- Both source and destination pipelines must be in the same workspace.
- Both source and destination pipelines must be owned by the user running the move operation.
- If the source pipeline uses the default publishing mode, the destination pipeline must also be using the default publishing mode. You can't move a table from a pipeline using the default publishing mode to a pipeline that uses the LIVE schema (legacy). See LIVE schema (legacy).
- If the source and destination pipelines are both using the LIVE schema (legacy), they must have the same `catalog` and `target` values in settings.