Ingest data from SQL Server

Preview

LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.

This article describes how to ingest data from SQL Server and load it into Databricks using LakeFlow Connect.

The Microsoft SQL Server (SQL Server) connector supports the following:

  • Azure SQL Database

  • Amazon RDS for SQL Server

Overview of steps

  1. Configure your source database for ingestion.

  2. Create a gateway, which connects to the SQL Server database, extracts snapshot and change data from the source database, and stores it in a staging Unity Catalog volume.

  3. Create an ingestion pipeline, which applies snapshot and change data from the staging volume into destination streaming tables.

  4. Schedule the ingestion pipeline.

Before you begin

To create an ingestion pipeline, you must meet the following requirements:

  • Your workspace is enabled for Unity Catalog.

  • Serverless compute is enabled for notebooks, workflows, and Delta Live Tables. See Enable serverless compute.

  • To create a connection: CREATE CONNECTION on the metastore.

    To use an existing connection: USE CONNECTION or ALL PRIVILEGES on the connection.

  • USE CATALOG on the target catalog.

  • USE SCHEMA, CREATE TABLE, and CREATE VOLUME on an existing schema or CREATE SCHEMA on the target catalog.

  • Unrestricted permissions to create clusters, or a custom policy. A custom policy must meet the following requirements:

    • Family: Job Compute

    • Policy family overrides:

      {
        "cluster_type": {
          "type": "fixed",
          "value": "dlt"
        },
        "num_workers": {
          "type": "unlimited",
          "defaultValue": 1,
          "isOptional": true
        },
        "runtime_engine": {
          "type": "fixed",
          "value": "STANDARD",
          "hidden": true
        }
      }
      
    • The worker nodes (node_type_id) are not used but are required to run DLT. Specify a minimal node:

      "driver_node_type_id": {
        "type": "unlimited",
        "defaultValue": "r5.xlarge",
        "isOptional": true
      },
      "node_type_id": {
        "type": "unlimited",
        "defaultValue": "m4.large",
        "isOptional": true
      }
    

    For more information about cluster policies, see Select a cluster policy.
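
As a sketch of how the two JSON fragments above fit together, the following merges them into a single overrides object with jq and lists the resulting keys. The variable names are illustrative assumptions. Submitting the policy is deliberately left out, because the exact request depends on your workspace.

```shell
# First fragment: the policy family overrides listed above.
overrides='{
  "cluster_type":   {"type": "fixed", "value": "dlt"},
  "num_workers":    {"type": "unlimited", "defaultValue": 1, "isOptional": true},
  "runtime_engine": {"type": "fixed", "value": "STANDARD", "hidden": true}
}'

# Second fragment: the minimal driver and worker node types.
node_types='{
  "driver_node_type_id": {"type": "unlimited", "defaultValue": "r5.xlarge", "isOptional": true},
  "node_type_id":        {"type": "unlimited", "defaultValue": "m4.large", "isOptional": true}
}'

# Merge both fragments into one object. jq exits non-zero if either is invalid JSON.
policy_overrides=$(jq -n --argjson a "$overrides" --argjson b "$node_types" '$a + $b')

# List the attribute names that the merged overrides constrain.
printf '%s\n' "$policy_overrides" | jq -r 'keys[]'
```

Validating the merged object locally catches JSON syntax errors before the policy is saved.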

Set up the source database for ingestion

See Configure SQL Server for ingestion into Databricks.

Create a SQL Server connection

The connector uses a Unity Catalog connection object to store and access the credentials for the source database.

Note

Permissions required

  • To create a new connection, CREATE CONNECTION on the metastore. Contact a metastore admin to grant this.

  • To use an existing connection, USE CONNECTION or ALL PRIVILEGES on the connection object. Contact the connection owner to grant these.

To create the connection, do the following:

  1. In the Databricks workspace, click Catalog > External Data > Connections.

  2. Click Create connection. If you don’t see this button, you might not have CREATE CONNECTION privileges.

  3. Enter a unique Connection name.

  4. For Connection type select SQL Server.

  5. For Host, specify the SQL Server domain name.

  6. For User and Password, enter your SQL Server login credentials.

  7. Click Create.

Note

Test connection tests that the host is reachable. It does not verify that the username and password are correct.

Create a staging catalog and schemas

The SQL Server connector creates a Unity Catalog staging volume to store intermediate data. The volume is created in a staging catalog and schema that you specify.

The staging catalog and schema can be the same as the destination catalog and schema. The staging catalog cannot be a foreign catalog.

Note

Permissions required

  • To create a new staging catalog, CREATE CATALOG on the metastore. Contact a metastore administrator to grant this.

  • To use an existing staging catalog, USE CATALOG on the catalog. Contact the catalog owner to grant this.

  • To create a new staging schema, CREATE SCHEMA on the catalog. Contact the catalog owner to grant this.

  • To use an existing staging schema, USE SCHEMA, CREATE VOLUME, and CREATE TABLE on the schema. Contact the schema owner to grant these.

  1. In the Databricks workspace, click Catalog.

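  2. On the Catalog tab, do one of the following: to create a new staging catalog, continue with the next step; to use an existing catalog, select it and skip to the Create schema step.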

  3. Click Create catalog. If you don’t see this button, you don’t have CREATE CATALOG privileges.

  4. Enter a unique name for the catalog, and then click Create.

  5. Select the catalog you created.

  6. Click Create schema. If you don’t see this button, you don’t have CREATE SCHEMA privileges.

  7. Enter a unique name for the schema, and then click Create.
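
The UI steps above might also be scripted. The following is a minimal sketch, assuming a Databricks CLI version that provides `catalogs create NAME` and `schemas create NAME CATALOG_NAME`. The `run` helper and the `DRY_RUN` variable are hypothetical conveniences for this example: by default the commands are only printed, so nothing is created until you set `DRY_RUN=0`.

```shell
# Staging location; defaults mirror the values used elsewhere in this article.
STAGING_CATALOG="${STAGING_CATALOG:-main}"
STAGING_SCHEMA="${STAGING_SCHEMA:-lakeflow_sqlserver_connector_cdc}"

# Hypothetical helper: print the command unless DRY_RUN=0, in which case run it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '+ %s\n' "$*"
  else
    "$@"
  fi
}

# Create the staging catalog, then the staging schema inside it.
run databricks catalogs create "$STAGING_CATALOG"
run databricks schemas create "$STAGING_SCHEMA" "$STAGING_CATALOG"
```

If the catalog already exists, skip the first command and create only the schema.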

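The following Databricks CLI commands set the environment variables used in the rest of this article and create the connection described in Create a SQL Server connection:

# Names and locations used by the commands in this article.
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG="$TARGET_CATALOG"
export STAGING_SCHEMA="$TARGET_SCHEMA"
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
# Example pipeline names; replace them with your own.
export GATEWAY_PIPELINE_NAME="sqlserver_gateway"
export INGESTION_PIPELINE_NAME="sqlserver_ingestion"

# Create the Unity Catalog connection and capture the JSON response.
output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "SQLSERVER",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "1433",
    "trustServerCertificate": "false",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

# Extract the connection ID for use when creating the gateway.
export CONNECTION_ID=$(echo "$output" | jq -r '.connection_id')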
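
The inline quoting used above produces invalid JSON if a value, typically the password, contains a double quote or a backslash. One way around this is to let jq build the payload. This is a sketch with placeholder credentials, not a required approach. The `payload` variable name is an assumption of this example.

```shell
# Placeholder values for illustration only.
CONNECTION_NAME="my_connection"
DB_HOST="cdc-connector.database.windows.net"
DB_USER="example_user"
DB_PASSWORD='p"ss\word'   # contains characters that break inline quoting

# jq --arg escapes each value, so the result is always valid JSON.
payload=$(jq -n \
  --arg name "$CONNECTION_NAME" \
  --arg host "$DB_HOST" \
  --arg user "$DB_USER" \
  --arg password "$DB_PASSWORD" \
  '{
    name: $name,
    connection_type: "SQLSERVER",
    options: {
      host: $host,
      port: "1433",
      trustServerCertificate: "false",
      user: $user,
      password: $password
    }
  }')

# Confirm that the connection type and host survived the round trip.
printf '%s\n' "$payload" | jq -r '.connection_type, .options.host'

# To create the connection (not run here):
#   output=$(databricks connections create --json "$payload")
```

The same pattern applies to any `--json` argument in this article that interpolates user-supplied values.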

Create the gateway and ingestion pipeline

The gateway extracts snapshot and change data from the source database and stores it in the staging Unity Catalog volume. To avoid issues with gaps in the change data due to change log retention policies on the source database, run the gateway as a continuous pipeline.

The ingestion pipeline applies the snapshot and change data from the staging volume into destination streaming tables.

Note

Only one ingestion pipeline per gateway is supported.

Even though the API allows it, the ingestion pipeline does not support more than one destination catalog and schema. If you need to write to multiple destination catalogs or schemas, create multiple gateway-ingestion pipeline pairs.

Note

Permissions required

To create a pipeline, you need Unrestricted cluster creation permissions. Contact an account administrator.

Update the Configuration cell in the following notebook with the source connection, target catalog, target schema, and tables to ingest from the source.

Create gateway and ingestion pipeline


To create the gateway:

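# Create the gateway pipeline and capture the JSON response.
# GATEWAY_PIPELINE_NAME must be set to a pipeline name of your choice.
output=$(databricks pipelines create --json '{
  "name": "'"$GATEWAY_PIPELINE_NAME"'",
  "gateway_definition": {
    "connection_id": "'"$CONNECTION_ID"'",
    "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
    "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
    "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

# Extract the gateway pipeline ID for use by the ingestion pipeline.
export GATEWAY_PIPELINE_ID=$(echo "$output" | jq -r '.pipeline_id')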
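
The recommendation to run the gateway continuously could be expressed in the request itself. The sketch below assumes that the top-level `continuous` flag of the pipelines API applies to gateway pipelines in your workspace. It also adds a small, hypothetical `extract_pipeline_id` helper that fails when the response has no `pipeline_id`, instead of silently exporting the string "null". The pipeline name, connection ID, and sample response are placeholders.

```shell
GATEWAY_PIPELINE_NAME="sqlserver_gateway"               # hypothetical name
CONNECTION_ID="00000000-0000-0000-0000-000000000000"    # placeholder
STAGING_CATALOG="main"
STAGING_SCHEMA="lakeflow_sqlserver_connector_cdc"

# Build the gateway request, marking the pipeline as continuous (assumption).
gateway_payload=$(jq -n \
  --arg name "$GATEWAY_PIPELINE_NAME" \
  --arg conn "$CONNECTION_ID" \
  --arg cat "$STAGING_CATALOG" \
  --arg sch "$STAGING_SCHEMA" \
  '{
    name: $name,
    continuous: true,
    gateway_definition: {
      connection_id: $conn,
      gateway_storage_catalog: $cat,
      gateway_storage_schema: $sch,
      gateway_storage_name: $name
    }
  }')

# Print pipeline_id from a create response; return non-zero if it is missing.
extract_pipeline_id() {
  printf '%s' "$1" | jq -er '.pipeline_id'
}

# Sample response standing in for:
#   databricks pipelines create --json "$gateway_payload"
sample_response='{"pipeline_id": "abc-123"}'
GATEWAY_PIPELINE_ID=$(extract_pipeline_id "$sample_response")
printf '%s\n' "$GATEWAY_PIPELINE_ID"
```

Checking the ID at this point avoids creating an ingestion pipeline that points at a gateway that was never created.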

To create the ingestion pipeline:

# INGESTION_PIPELINE_NAME must be set to a pipeline name of your choice.
# The "table" object ingests a single table. The "schema" object ingests
# every table in the source schema, so it takes no destination_table.
databricks pipelines create --json '{
  "name": "'"$INGESTION_PIPELINE_NAME"'",
  "ingestion_definition": {
    "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
    "objects": [
      {"table": {
        "source_catalog": "tpc",
        "source_schema": "tpch",
        "source_table": "lineitem",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "<YOUR_DATABRICKS_TABLE>"
      }},
      {"schema": {
        "source_catalog": "tpc",
        "source_schema": "tpcdi",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
      }}
    ]
  }
}'
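
When many tables are ingested, writing the `objects` array by hand becomes error-prone. The following sketch generates it from a plain list of `catalog.schema.table` names. It assumes that names contain no dots and that each destination table keeps its source table name. The `orders` table, the pipeline name, and the gateway ID are placeholders.

```shell
TARGET_CATALOG="main"
TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
GATEWAY_PIPELINE_ID="abc-123"                    # placeholder
INGESTION_PIPELINE_NAME="sqlserver_ingestion"    # hypothetical name

# One source table per line, written as catalog.schema.table.
tables='tpc.tpch.lineitem
tpc.tpch.orders'

# Turn each line into a {"table": {...}} object.
objects=$(printf '%s\n' "$tables" | jq -R -n \
  --arg dc "$TARGET_CATALOG" --arg ds "$TARGET_SCHEMA" \
  '[inputs | select(length > 0) | split(".") | {table: {
      source_catalog: .[0],
      source_schema: .[1],
      source_table: .[2],
      destination_catalog: $dc,
      destination_schema: $ds,
      destination_table: .[2]
  }}]')

# Assemble the full ingestion pipeline request.
ingestion_payload=$(jq -n \
  --arg name "$INGESTION_PIPELINE_NAME" \
  --arg gw "$GATEWAY_PIPELINE_ID" \
  --argjson objects "$objects" \
  '{name: $name, ingestion_definition: {ingestion_gateway_id: $gw, objects: $objects}}')

# Show how many objects the request contains.
printf '%s\n' "$ingestion_payload" | jq '.ingestion_definition.objects | length'

# To create the pipeline (not run here):
#   databricks pipelines create --json "$ingestion_payload"
```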

Set up a trigger schedule for the ingestion pipeline

Note

Only triggered mode is supported for running ingestion pipelines.

You can create a schedule for the pipeline in the DLT pipeline UI by clicking Schedule in the top right-hand corner of the screen.

The UI automatically creates a job to run the pipeline according to the specified schedule. The job is shown in the Jobs tab.
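
A comparable job might also be defined from the command line. This is a sketch of a Jobs API request with one pipeline task and an hourly Quartz cron schedule, not a description of what the UI generates. The job name, task key, schedule, and `INGESTION_PIPELINE_ID` value are assumptions of this example. The ID must come from the response of the ingestion pipeline's create call.

```shell
INGESTION_PIPELINE_ID="def-456"   # placeholder

# Job definition: run the ingestion pipeline at the start of every hour (UTC).
job_payload=$(jq -n \
  --arg pid "$INGESTION_PIPELINE_ID" \
  '{
    name: "sqlserver_ingestion_schedule",
    schedule: {
      quartz_cron_expression: "0 0 * * * ?",
      timezone_id: "UTC"
    },
    tasks: [
      {task_key: "run_ingestion", pipeline_task: {pipeline_id: $pid}}
    ]
  }')

# Review the request before submitting it.
printf '%s\n' "$job_payload" | jq .

# To create the job (not run here):
#   databricks jobs create --json "$job_payload"
```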

Verify successful data ingestion

The list view in the ingestion pipeline UI shows the number of records processed as data is ingested. These numbers automatically refresh.


The Upserted records and Deleted records columns are not shown by default. To show them, click the columns configuration icon and select them.

Start, schedule, and set alerts on your pipeline

  1. After the pipeline has been created, revisit the Databricks workspace, and then click Delta Live Tables.

    The new pipeline appears in the pipeline list.

  2. To view the pipeline details, click the pipeline name.

  3. On the pipeline details page, run the pipeline by clicking Start. You can schedule the pipeline by clicking Schedule.

  4. To set alerts on the pipeline, click Schedule, click More options, and then add a notification.

  5. After ingestion completes, you can query your tables.
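
The start and status check in the steps above can also be approximated from a terminal. The sketch below assumes that the response of `databricks pipelines get` contains a `latest_updates` array with the newest update first. The `latest_update_state` helper and the sample response are hypothetical.

```shell
# Print the state of the most recent update, or NONE if the pipeline has never run.
latest_update_state() {
  printf '%s' "$1" | jq -r '.latest_updates[0].state // "NONE"'
}

# Sample response standing in for:
#   databricks pipelines start-update "$INGESTION_PIPELINE_ID"
#   databricks pipelines get "$INGESTION_PIPELINE_ID"
sample='{
  "pipeline_id": "def-456",
  "state": "IDLE",
  "latest_updates": [
    {"update_id": "u2", "state": "COMPLETED"},
    {"update_id": "u1", "state": "FAILED"}
  ]
}'

latest_update_state "$sample"
```

A script could call the helper in a loop and stop once the state is COMPLETED or FAILED before querying the destination tables.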