Create a query-based ingestion pipeline

Preview

This feature is in Public Preview.

This page shows how to create a query-based ingestion pipeline in Lakeflow Connect.

Requirements

Before you create a query-based ingestion pipeline, meet the following requirements:

  • Unity Catalog is enabled for your Databricks workspace.
  • Your serverless compute environment allows network connectivity to the source database. See Networking and Networking recommendations for Lakehouse Federation.
  • For foreign connection ingestion: You have an existing connection to the source database or CREATE CONNECTION privileges on the metastore. See Connect to managed ingestion sources.
  • For foreign catalog ingestion: You have an existing foreign catalog registered in Lakehouse Federation or the privileges to create one.
  • You have CREATE and USE SCHEMA privileges on the destination catalog and schema.

Option 1: Foreign connection ingestion

Use this approach when you have a connection that stores authentication credentials for the source database. Supported sources include Oracle, Teradata, SQL Server, MySQL, MariaDB, and PostgreSQL.

The Databricks UI supports query-based pipeline creation. To deploy with classic compute (Beta), use Declarative Automation Bundles.

  1. In the Databricks workspace sidebar, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click your source (for example, Oracle or SQL Server). The ingestion wizard opens.

  3. On the Ingestion pipeline page, enter a unique name for the pipeline.

  4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.

  5. Select the Unity Catalog connection that stores the credentials required to access the source database.

    If there's no existing connection, click Create connection and enter the connection details. You must have CREATE CONNECTION privileges on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the schemas and tables to ingest.

  8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id). If you don't select a monotonically increasing cursor column, the connector performs a full load on each run.

  9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).

  10. Click Next.

  11. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  12. Click Save and continue.

  13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.

  14. (Optional) Set email notifications for pipeline success or failure.

  15. Click Save and run pipeline.
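The choices made in the wizard above ultimately form a pipeline definition. As a rough sketch of how those choices map onto a programmatic specification (the field names here, such as `ingestion_definition` and `objects`, are illustrative assumptions rather than a confirmed API contract; check the pipeline API reference for your workspace before relying on them):

```python
# Hypothetical pipeline specification mirroring the wizard steps above.
# All field names are assumptions for illustration, not a verified schema.
pipeline_spec = {
    "name": "sqlserver-orders-ingest",            # step 3: unique pipeline name
    "ingestion_definition": {
        "connection_name": "sqlserver_prod",      # step 5: Unity Catalog connection
        "objects": [
            {
                "table": {
                    "source_schema": "sales",         # step 7: source schema
                    "source_table": "orders",         # step 7: source table
                    "destination_catalog": "main",    # step 11: destination catalog
                    "destination_schema": "bronze",   # step 11: destination schema
                }
            }
        ],
    },
}
```

Each entry in `objects` corresponds to one table selected on the Source page; per-table settings such as the cursor column and history tracking would be configured alongside it.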

Option 2: Foreign catalog ingestion

Use this approach when you want to ingest from a foreign catalog registered in Lakehouse Federation. Foreign catalog ingestion supports all Lakehouse Federation data sources and supports deletion tracking.

  1. In the Databricks workspace sidebar, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click your source. The ingestion wizard opens.

  3. On the Ingestion pipeline page, enter a unique name for the pipeline.

  4. For Destination catalog, select a Unity Catalog catalog to store the ingested data.

  5. For Connection type, select Foreign catalog and choose the foreign catalog registered in Lakehouse Federation.

  6. Click Create pipeline and continue.

  7. On the Source page, select the schemas and tables to ingest.

  8. For each table, specify the cursor column. This must be a single column with values that increase monotonically (for example, updated_at or row_id).

  9. Optionally, change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).

  10. Click Next.

  11. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don't want to use an existing schema, click Create schema. You must have USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  12. Click Save and continue.

  13. (Optional) On the Settings page, click Create schedule and set the refresh frequency.

  14. (Optional) Set email notifications for pipeline success or failure.

  15. Click Save and run pipeline.

Configure incremental tracking

Query-based connectors use a cursor column to determine which rows are new or updated since the last pipeline run. Your choice of cursor column is critical for effective incremental ingestion.

Consider the following when you select a cursor column:

  • Use a timestamp column, if possible. Columns like updated_at or last_modified are ideal because they directly reflect when a row was last changed.
  • Integer IDs work for append-only sources. If rows are never updated, an auto-incrementing ID column (such as id or row_id) can serve as the cursor. Don't use an integer ID as a cursor if rows can be updated without changing the ID.
  • The column must increase monotonically. Values must never decrease. If a value can be set in the past (for example, by a backfill), rows that land below the previous high-water mark are silently skipped and never reingested.
  • You can only specify a single cursor column. You can't specify multiple columns as a composite cursor.

After the connector stores the cursor high-water mark, it uses the high-water mark as the lower bound filter (cursor_column > last_value) on the next run. Rows with a NULL cursor value are not ingested.
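The high-water-mark mechanism described above can be sketched in plain Python. This is a conceptual illustration of the filtering behavior, not the connector's actual implementation; `incremental_batch` and the sample column names are assumptions:

```python
def incremental_batch(rows, cursor_column, last_value):
    """Select rows whose cursor value is strictly greater than the stored
    high-water mark (cursor_column > last_value), and compute the new
    high-water mark. Rows with a NULL (None) cursor value are skipped,
    matching the behavior described above."""
    new_rows = [
        r for r in rows
        if r.get(cursor_column) is not None and r[cursor_column] > last_value
    ]
    new_hwm = max((r[cursor_column] for r in new_rows), default=last_value)
    return new_rows, new_hwm

rows = [
    {"id": 1, "updated_at": 100},
    {"id": 2, "updated_at": 150},
    {"id": 3, "updated_at": None},  # NULL cursor value: never ingested
    {"id": 4, "updated_at": 90},    # below the high-water mark: skipped
]
batch, hwm = incremental_batch(rows, "updated_at", last_value=100)
# batch contains only id 2; the high-water mark advances to 150
```

Note that the lower bound is exclusive: a row whose cursor value exactly equals the stored high-water mark is not picked up again, which is why backfilled or past-dated values are lost.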

Configure history tracking (SCD)

To track the full history of row changes in destination tables, configure SCD type 2. See Enable history tracking (SCD type 2).
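Conceptually, SCD type 2 keeps every version of a row instead of overwriting it: when a row changes, the current version is closed out and a new open version is appended. A minimal pure-Python sketch of the idea (the pipeline handles this for you; the function and the `start_at`/`end_at` column names are illustrative assumptions):

```python
def scd2_apply(history, key, incoming, ts):
    """Apply an incoming row version to an SCD type 2 history list:
    close the currently open version for this key, then append the
    new version as the open record (end_at is None while current)."""
    for rec in history:
        if rec["key"] == key and rec["end_at"] is None:
            rec["end_at"] = ts  # close the previous version
    history.append({"key": key, **incoming, "start_at": ts, "end_at": None})

history = []
scd2_apply(history, key=42, incoming={"status": "new"}, ts=1)
scd2_apply(history, key=42, incoming={"status": "shipped"}, ts=2)
# history now holds both versions: the first closed at ts=2,
# the second still open (end_at is None)
```

This is what distinguishes history tracking from the default behavior, where each run would simply overwrite the row and discard the earlier `status` value.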

Common patterns

For advanced pipeline configurations, see Common patterns for managed ingestion pipelines.
