Create a MySQL ingestion pipeline
The MySQL connector is in Public Preview. Contact your Databricks account team to request access.
Learn how to ingest data from MySQL into Databricks using Lakeflow Connect. The MySQL connector supports Amazon RDS for MySQL, Aurora MySQL, Azure Database for MySQL, Google Cloud SQL for MySQL, and MySQL running on EC2.
Before you begin
To create an ingestion gateway and an ingestion pipeline, you must meet the following requirements:
-
Your workspace is enabled for Unity Catalog.
-
Serverless compute is enabled for your workspace. See Serverless compute requirements.
-
If you plan to create a connection: You have
CREATE CONNECTIONprivileges on the metastore.If your connector supports UI-based pipeline authoring, you can create the connection and the pipeline at the same time by completing the steps on this page. However, if you use API-based pipeline authoring, you must create the connection in Catalog Explorer before you complete the steps on this page. See Connect to managed ingestion sources.
-
If you plan to use an existing connection: You have
USE CONNECTIONprivileges orALL PRIVILEGESon the connection. -
You have
USE CATALOGprivileges on the target catalog. -
You have
USE SCHEMA,CREATE TABLE, andCREATE VOLUMEprivileges on an existing schema orCREATE SCHEMAprivileges on the target catalog.
-
Unrestricted permissions to create clusters, or a custom policy (API only). A custom policy for the gateway must meet the following requirements:
-
Family: Job Compute
-
Policy family overrides:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
The following compute policy enables Databricks to scale the ingestion gateway to meet the needs of your workload. The minimum requirement is 4 cores. However, for better snapshot extraction performance, Databricks recommends using larger instance types with more memory and CPU cores.
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-8"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}
For more information about cluster policies, see Select a compute policy.
-
To ingest from MySQL, you must also complete the source setup.
Option 1: Databricks UI
Admin users can create a connection and a pipeline at the same time in the UI. This is the simplest way to create managed ingestion pipelines.
-
In the sidebar of the Databricks workspace, click Data Ingestion.
-
On the Add data page, under Databricks connectors, click MySQL. The ingestion wizard opens.
-
On the Ingestion gateway page of the wizard, enter a unique name for the gateway.
-
Select a catalog and a schema for the staging ingestion data, then click Next.
-
On the Ingestion pipeline page, enter a unique name for the pipeline.
-
For Destination catalog, select a catalog to store the ingested data.
-
Select the Unity Catalog connection that stores the credentials required to access the source data.
If there are no existing connections to the source, click Create connection and enter the authentication details you obtained from the source setup. You must have
CREATE CONNECTIONprivileges on the metastore.noteThe Test Connection button might fail for MySQL users using
sha256_passwordorcaching_sha2_passwordauthentication. This is a known limitation. You can still proceed with creating the connection. -
Click Create pipeline and continue.
-
On the Source page, select the databases and tables to ingest.
-
Optionally change the default history tracking setting. For more information, see Enable history tracking (SCD type 2).
-
Click Next.
-
On the Destination page, select the Unity Catalog catalog and schema to write to.
If you don't want to use an existing schema, click Create schema. You must have
USE CATALOGandCREATE SCHEMAprivileges on the parent catalog. -
Click Save and continue.
-
(Optional) On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.
-
(Optional) Set email notifications for pipeline operation success or failure.
-
Click Save and run pipeline.
Option 2: Programmatic interfaces
Before you ingest using Databricks Asset Bundles, Databricks APIs, Databricks SDKs, or the Databricks CLI, you must have access to an existing Unity Catalog connection. For instructions, see Connect to managed ingestion sources.
Create the staging catalog and schema
The staging catalog and schema can be the same as the destination catalog and schema. The staging catalog can't be a foreign catalog.
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Create the gateway and the ingestion pipeline
The ingestion gateway extracts snapshot and change data from the source database and stores it in a Unity Catalog staging volume. You must run the gateway as a continuous pipeline. This accommodates binlog retention policies on the source database.
The ingestion pipeline applies the snapshot and change data from the staging volume into destination streaming tables.
- Databricks Asset Bundles
- Notebook
- CLI
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init -
Add two new resource files to the bundle:
- A pipeline definition file (
resources/mysql_pipeline.yml). - A workflow file that controls the frequency of data ingestion (
resources/mysql_job.yml).
The following is an example
resources/mysql_pipeline.ymlfile:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: mysql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <mysql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_mysql:
name: mysql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table mydb.customers to dest_catalog.dest_schema.customers
source_schema: public
source_table: customers
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
# The destination table name will be the same as it is on the source
source_schema: sales
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}The following is an example
resources/mysql_job.ymlfile:YAMLresources:
jobs:
mysql_dab_job:
name: mysql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_mysql.id} - A pipeline definition file (
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
Update the Configuration cell in the following notebook with the source connection, target catalog, target schema, and tables to ingest from the source.
Create gateway and ingestion pipeline
To create the gateway:
export GATEWAY_PIPELINE_NAME="mysql-gateway"
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
To create the ingestion pipeline:
export INGESTION_PIPELINE_NAME="mysql-ingestion-pipeline"
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_schema": "public",
"source_table": "customers",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "customers"
}},
{"schema": {
"source_schema": "sales",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'