Configure SQL Server for ingestion
Preview
LakeFlow Connect is in gated Public Preview. To participate in the preview, contact your Databricks account team.
This article describes how to configure SQL Server for ingestion. This is a prerequisite for ingesting data from SQL Server and loading it into Databricks using LakeFlow Connect.
Change tracking vs. change data capture (CDC)
Databricks supports Microsoft change data capture (CDC) or Microsoft change tracking to extract data from SQL Server. If a table has a primary key, Databricks recommends using change tracking for optimal performance. When no primary key is present, CDC must be used. For more information about these two options, see Track data changes (SQL Server) in the SQL Server documentation.
To use Microsoft change data capture (CDC), you must have SQL Server 2017 or above. To use Microsoft change tracking, you must have SQL Server 2012 or above.
Create a SQL Server user
Databricks recommends that you create a database user that is solely used for Databricks ingestion.
This database user must have the following privileges regardless of the method employed for tracking data changes in the source database:
Read access to the following system tables and views:
sys.databases
sys.schemas
sys.tables
sys.columns
sys.key_constraints
sys.foreign_keys
sys.check_constraints
sys.default_constraints
sys.change_tracking_tables
sys.change_tracking_databases
sys.objects
sys.triggers
Execute permissions on the following system stored procedures:
sp_tables
sp_columns
sp_columns_100
sp_pkeys
sp_statistics
SELECT
on the schemas and tables you want to ingest.
Enable change tracking
To use change tracking, you must enable change tracking in all ingested databases and tables.
Note
Change tracking can be used only on tables with a primary key.
Enable change tracking on a database
Run the following, replacing <database-name>
with the name of the database you want to enable change tracking on.
ALTER DATABASE <database-name> SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)
Enable change tracking on a table
Run the following, replacing <schema-name>.<table-name>
with the name of the schema and table you want to enable change tracking on.
ALTER TABLE <schema-name>.<table-name> ENABLE CHANGE_TRACKING
Additional read permissions
You need the VIEW CHANGE TRACKING
privilege on the ingested tables or on a schema that contains tables being tracked.
To grant schema-level permissions, run the following:
GRANT VIEW CHANGE TRACKING ON SCHEMA::<schema-name> TO <cdc-username>;
To grant table-level permissions, run the following:
GRANT VIEW CHANGE TRACKING ON OBJECT::<schema-name>.<table-name> TO <cdc-username>;
Set up DDL capture and schema evolution
The connector has the ability to track the data definition language (DDL) on ingested database objects and apply relevant table schema changes to the destination tables or add new tables in case of full schema replication.
To perform DDL capture, additional database objects (for example, internal tables, stored procedures, and triggers) need to be set up. There are two ways to set them up:
Automatic: Requires additional privileges to be granted to the replication user.
Manual: Requires manual creation of database objects.
Option 1: Automatic setup for DDL capture
Databricks requires additional privileges for automatic DDL capture when using change tracking. Run the following commands to grant these privileges to the user configured for use with Databricks:
CREATE PROCEDURE
on the database:GRANT CREATE PROCEDURE TO <cdc-username>;
CREATE TABLE
on the database:GRANT CREATE TABLE TO <cdc-username>;
SELECT
,EXECUTE
, andINSERT
on the schema:GRANT SELECT,INSERT,EXECUTE ON SCHEMA::dbo TO <cdc-username>;
ALTER
on the database:GRANT ALTER ON DATABASE::<database-name> TO <cdc-username>;
ALTER
on the schema or on all tables to ingest:GRANT ALTER ON SCHEMA::[<schema-name>] TO [<cdc-username>];
Option 2: Manual setup for DDL capture
To manually set up DDL capture, run the following:
CREATE TABLE "dbo"."replicate_io_audit_ddl_1"(
"SERIAL_NUMBER" INT IDENTITY NOT NULL,
"CURRENT_USER" NVARCHAR(128),
"SCHEMA_NAME" NVARCHAR(128),
"TABLE_NAME" NVARCHAR(128),
"TYPE" NVARCHAR(30),
"OPERATION_TYPE" NVARCHAR(30),
"SQL_TXT" NVARCHAR(2000),
"LOGICAL_POSITION" BIGINT,
CONSTRAINT "replicate_io_audit_ddlPK" PRIMARY KEY("SERIAL_NUMBER","LOGICAL_POSITION")
)
ALTER TABLE dbo.replicate_io_audit_ddl_1 ENABLE CHANGE_TRACKING;
GRANT VIEW CHANGE TRACKING ON OBJECT::dbo.replicate_io_audit_ddl_1 TO [CdcUserName];
CREATE TABLE dbo.replicate_io_audit_tbl_schema_1(
"COLUMN_ID" BIGINT,
"DATA_DEFAULT" BIGINT,
"COLUMN_NAME" VARCHAR(128) NOT NULL,
"TABLE_NAME" NVARCHAR(128) NOT NULL,
"SCHEMA_NAME" NVARCHAR(128) NOT NULL,
"HIDDEN_COLUMN" NVARCHAR(3),
"DATA_TYPE" NVARCHAR(128),
"DATA_LENGTH" BIGINT,
"CHAR_LENGTH" BIGINT,
"DATA_SCALE" BIGINT,
"DATA_PRECISION" BIGINT,
"IDENTITY_COLUMN" NVARCHAR(3),
"VIRTUAL_COLUMN" NVARCHAR(3),
"NULLABLE" NVARCHAR(1),
"LOGICAL_POSITION" BIGINT);
CREATE TABLE dbo.replicate_io_audit_tbl_cons_1(
"SCHEMA_NAME" VARCHAR(128),
"TABLE_NAME" VARCHAR(128),
"COLUMN_NAME" VARCHAR(4000),
"COL_POSITION" BIGINT,
"CONSTRAINT_NAME" VARCHAR(128),
"CONSTRAINT_TYPE" VARCHAR(1),
"LOGICAL_POSITION" BIGINT);
CREATE OR ALTER TRIGGER replicate_io_audit_ddl_trigger_1
ON DATABASE
AFTER ALTER_TABLE
AS SET NOCOUNT ON;
DECLARE @DbName nvarchar(255), @SchemaName nvarchar(max), @TableName nvarchar(255), @data XML, @operation NVARCHAR(30), @isCTEnabledDBLevel bit, @isCTEnabledTableLevel bit;
SET @data = EVENTDATA();
SET @DbName = DB_NAME();
SET @SchemaName = @data.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(MAX)');
SET @TableName = @data.value('(/EVENT_INSTANCE/ObjectName)[1]', 'NVARCHAR(255)');
SET @operation = @data.value('(/EVENT_INSTANCE/EventType)[1]', 'NVARCHAR(30)');
SET @isCTEnabledDBLevel = (SELECT COUNT(*) FROM sys.change_tracking_databases WHERE database_id=DB_ID(@DbName));
SET @isCTEnabledTableLevel = (SELECT COUNT(*) FROM sys.change_tracking_tables WHERE object_id = object_id(@SchemaName + '.' + @TableName));
IF (@isCTEnabledDBLevel = 1 AND @isCTEnabledTableLevel = 1)
BEGIN
INSERT INTO
dbo.replicate_io_audit_ddl_1
("CURRENT_USER","SCHEMA_NAME", "TABLE_NAME", "TYPE", "OPERATION_TYPE", "SQL_TXT", "LOGICAL_POSITION")
VALUES (SUSER_NAME(),
@data.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(128)'),
@data.value('(/EVENT_INSTANCE/EventType)[1]', 'NVARCHAR(128)'),
@data.value('(/EVENT_INSTANCE/ObjectType)[1]', 'NVARCHAR(30)'),
@data.value('(/EVENT_INSTANCE/EventType)[1]', 'NVARCHAR(30)'),
@data.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]',
'NVARCHAR(2000)'),
CHANGE_TRACKING_CURRENT_VERSION() );
END
GRANT VIEW DEFINITION ON DATABASE::[YourDatabaseName] TO [CdcUserName];
Enable built-in CDC for the source database
To enable the source database for CDC, run the following stored procedure in Azure SQL, replacing the value for <database-name>
. You must be logged into the database you want to enable for CDC.
USE <database-name>
EXEC sys.sp_cdc_enable_db
To enable CDC in a database in Amazon RDS for SQL Server, run the following:
EXEC msdb.dbo.rds_cdc_enable_db '<database-name>'
For more information, see Enable change data capture for a database in the SQL Server documentation.
Enable built-in CDC on the source table
To enable CDC on the source table, run the following stored procedure in Azure SQL:
EXEC sys.sp_cdc_enable_table
@source_schema = N'MySchema',
@source_name = N'MyTable',
@role_name = NULL,
@supports_net_changes = 1
Replace the values for
source_schema
,source_name
, androle_name
.@support_net_changes
only supports a value of1
if the table has a primary key.
For more information, see Enable change data capture for a table in the SQL Server documentation.
Additional read permissions
You need the SELECT
privilege on the schema cdc
that contains the change tables that are created when CDC is enabled.
GRANT SELECT ON SCHEMA::cdc to [cdc-username];