Create a query-based ingestion pipeline
This feature is in Public Preview.
This page shows how to create a query-based ingestion pipeline in Lakeflow Connect.
Requirements
Before you create a query-based ingestion pipeline, make sure you meet the following requirements:
- Unity Catalog is enabled for your Databricks workspace.
- Your serverless compute environment allows network connectivity to the source database. See Networking and Networking recommendations for Lakehouse Federation.
- For foreign connection ingestion: You have an existing connection to the source database or `CREATE CONNECTION` privileges on the metastore. See Connect to managed ingestion sources.
- For foreign catalog ingestion: You have an existing foreign catalog registered in Lakehouse Federation or the privileges to create one.
- You have `CREATE` and `USE SCHEMA` privileges on the destination catalog and schema.
Option 1: Foreign connection ingestion
Use this approach when you have a connection that stores authentication credentials for the source database. Supported sources include Oracle, Teradata, SQL Server, MySQL, MariaDB, and PostgreSQL.
- Databricks UI
- Declarative Automation Bundles
The Databricks UI supports query-based pipeline creation. To deploy with classic compute (Beta), use Declarative Automation Bundles.
1. In the Databricks workspace sidebar, click Data Ingestion.
2. On the Add data page, under Databricks connectors, click your source (for example, Oracle or SQL Server). The ingestion wizard opens.
3. On the Ingestion pipeline page, enter a unique name for the pipeline.
4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.
5. Select the Unity Catalog connection that stores the credentials required to access the source database.
   If there's no existing connection, click Create connection and enter the connection details. You must have `CREATE CONNECTION` privileges on the metastore.
6. Click Create pipeline and continue.
7. On the Source page, select the schemas and tables to ingest.
8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, `updated_at` or `row_id`). If you don't select a monotonically increasing cursor column, the connector performs a full load on each run.
9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).
10. Click Next.
11. On the Destination page, select the Unity Catalog catalog and schema to write to.
    If you don't want to use an existing schema, click Create schema. You must have `USE CATALOG` and `CREATE SCHEMA` privileges on the parent catalog.
12. Click Save and continue.
13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.
14. (Optional) Set email notifications for pipeline success or failure.
15. Click Save and run pipeline.
Deploy a query-based ingestion pipeline using Declarative Automation Bundles. Bundles contain YAML definitions of pipelines and jobs, are managed with the Databricks CLI, and can be deployed to multiple target workspaces. For more information, see What are Declarative Automation Bundles?.
1. Create a new bundle:

   ```bash
   databricks bundle init
   ```

2. Add a pipeline definition file to the bundle (for example, `resources/query_based_pipeline.yml`):

   ```yaml
   variables:
     dest_catalog:
       default: main
     dest_schema:
       default: ingest_destination_schema

   resources:
     pipelines:
       pipeline_query_based:
         name: query-based-ingestion-pipeline
         ingestion_definition:
           connection_name: <your-uc-connection-name>
           objects:
             - table:
                 source_catalog: <source-catalog>
                 source_schema: <source-schema>
                 source_table: <source-table>
                 table_configuration:
                   query_based_connector_config:
                     cursor_columns:
                       - updated_at
                 destination_catalog: ${var.dest_catalog}
                 destination_schema: ${var.dest_schema}
         target: ${var.dest_schema}
         catalog: ${var.dest_catalog}
   ```

3. Add a job definition file that controls the ingestion schedule (for example, `resources/query_based_job.yml`):

   ```yaml
   resources:
     jobs:
       query_based_job:
         name: query_based_job
         trigger:
           periodic:
             interval: 1
             unit: HOURS
         email_notifications:
           on_failure:
             - <email-address>
         tasks:
           - task_key: refresh_pipeline
             pipeline_task:
               pipeline_id: ${resources.pipelines.pipeline_query_based.id}
   ```

4. Deploy the bundle:

   ```bash
   databricks bundle deploy
   ```
Option 2: Foreign catalog ingestion
Use this approach when you want to ingest from a foreign catalog registered in Lakehouse Federation. Foreign catalog ingestion supports all Lakehouse Federation data sources and supports deletion tracking.
- Databricks UI
- Declarative Automation Bundles
1. In the Databricks workspace sidebar, click Data Ingestion.
2. On the Add data page, under Databricks connectors, click your source. The ingestion wizard opens.
3. On the Ingestion pipeline page, enter a unique name for the pipeline.
4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.
5. For Connection type, select Foreign catalog and choose the foreign catalog registered in Lakehouse Federation.
6. Click Create pipeline and continue.
7. On the Source page, select the schemas and tables to ingest.
8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, `updated_at` or `row_id`).
9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).
10. Click Next.
11. On the Destination page, select the Unity Catalog catalog and schema to write to.
    If you don't want to use an existing schema, click Create schema. You must have `USE CATALOG` and `CREATE SCHEMA` privileges on the parent catalog.
12. Click Save and continue.
13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.
14. (Optional) Set email notifications for pipeline success or failure.
15. Click Save and run pipeline.
1. Create a new bundle:

   ```bash
   databricks bundle init
   ```

2. Add a pipeline definition file to the bundle (for example, `resources/foreign_catalog_pipeline.yml`):

   ```yaml
   variables:
     dest_catalog:
       default: main
     dest_schema:
       default: ingest_destination_schema

   resources:
     pipelines:
       pipeline_foreign_catalog:
         name: foreign-catalog-ingestion-pipeline
         ingestion_definition:
           ingest_from_uc_foreign_catalog: true
           objects:
             - table:
                 source_catalog: <foreign-catalog-name>
                 source_schema: <source-schema>
                 source_table: <source-table>
                 cursor_columns:
                   - updated_at
                 primary_keys:
                   - id
                 deletion_condition: 'deleted_at IS NOT NULL'
                 destination_catalog: ${var.dest_catalog}
                 destination_schema: ${var.dest_schema}
         target: ${var.dest_schema}
         catalog: ${var.dest_catalog}
   ```

3. Add a job definition file (for example, `resources/foreign_catalog_job.yml`):

   ```yaml
   resources:
     jobs:
       foreign_catalog_job:
         name: foreign_catalog_job
         trigger:
           periodic:
             interval: 1
             unit: HOURS
         email_notifications:
           on_failure:
             - <email-address>
         tasks:
           - task_key: refresh_pipeline
             pipeline_task:
               pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id}
   ```

4. Deploy the bundle:

   ```bash
   databricks bundle deploy
   ```
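Conceptually, a deletion condition such as `deleted_at IS NOT NULL` lets the connector treat rows matching the condition as soft deletes and remove them from the destination by primary key. The following Python sketch illustrates the idea only; the data, the `apply_batch` helper, and its signature are hypothetical, not the connector's actual implementation:

```python
# Illustrative sketch of deletion tracking via a deletion condition.
# Hypothetical data and helper names; not the connector's real code.
# Rows matching the condition (here: deleted_at IS NOT NULL) are treated
# as soft deletes and removed from the destination by primary key.

def apply_batch(destination, batch, primary_key, is_deleted):
    """Upsert live rows and drop rows flagged as deleted."""
    for row in batch:
        key = row[primary_key]
        if is_deleted(row):
            destination.pop(key, None)   # propagate the soft delete
        else:
            destination[key] = row       # insert or update

dest = {}
batch = [
    {"id": 1, "name": "a", "deleted_at": None},
    {"id": 2, "name": "b", "deleted_at": "2024-03-01"},  # soft-deleted at source
]
apply_batch(dest, batch, "id", lambda r: r["deleted_at"] is not None)
assert sorted(dest) == [1]  # only the live row remains
```

Row 2 matches the deletion condition, so it never lands in (or is removed from) the destination, while row 1 is upserted normally.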
Configure incremental tracking
Query-based connectors use a cursor column to determine which rows are new or updated since the last pipeline run. Your choice of cursor column is critical for effective incremental ingestion.
Consider the following when you select a cursor column:
- Use a timestamp column, if possible. Columns like `updated_at` or `last_modified` are ideal because they directly reflect when a row was last changed.
- Integer IDs work for append-only sources. If rows are never updated, an auto-incrementing ID column (such as `id` or `row_id`) can serve as the cursor. Don't use an integer ID as a cursor if rows can be updated without changing the ID.
- The column must increase monotonically. Values must never decrease. If the column can be set to a past value (for example, by a backfill), rows written before the previous high-water mark aren't reingested.
- You can only specify a single cursor column. You can't specify multiple columns as a composite cursor.
After the connector stores the cursor high-water mark, it uses the high-water mark as the lower-bound filter (`cursor_column > last_value`) on the next run. Rows with a NULL cursor value are not ingested.
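The high-water-mark behavior described above can be sketched in Python. This is an illustration only, with hypothetical data and a hypothetical `incremental_read` helper; it is not the connector's actual implementation:

```python
# Illustrative sketch of cursor-based (high-water-mark) incremental reads.
# The rows and the `updated_at` cursor column are hypothetical examples.

def incremental_read(rows, cursor_column, last_value):
    """Return rows newer than the stored high-water mark, plus the new mark.

    Rows with a NULL (None) cursor value are skipped, matching the
    behavior described above.
    """
    new_rows = [
        r for r in rows
        if r[cursor_column] is not None
        and (last_value is None or r[cursor_column] > last_value)
    ]
    new_mark = max((r[cursor_column] for r in new_rows), default=last_value)
    return new_rows, new_mark

# First run: no stored mark, so every row with a non-NULL cursor is ingested.
source = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-03"},
    {"id": 3, "updated_at": None},  # NULL cursor value: never ingested
]
batch, mark = incremental_read(source, "updated_at", None)
assert [r["id"] for r in batch] == [1, 2]
assert mark == "2024-01-03"

# Second run: only rows past the stored high-water mark are picked up.
source.append({"id": 4, "updated_at": "2024-01-05"})
batch, mark = incremental_read(source, "updated_at", mark)
assert [r["id"] for r in batch] == [4]

# A backfilled row dated before the mark is silently missed, which is
# why the cursor column must be monotonically increasing.
source.append({"id": 5, "updated_at": "2024-01-02"})
batch, mark = incremental_read(source, "updated_at", mark)
assert batch == []
```

The last step shows the backfill pitfall from the list above: a row whose cursor value falls behind the stored mark is never reingested.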
Configure history tracking (SCD)
To track the full history of row changes in destination tables, configure SCD type 2. See Enable history tracking (SCD type 2).
Common patterns
For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.