Ingerir dados do SQL Server
Visualização
O LakeFlow Connect está em um Public Preview fechado. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .
Este artigo descreve como ingerir dados de SQL Server e carregá-los em Databricks usando LakeFlow Connect.
O conector do Microsoft SQL Server (SQL Server) oferece suporte ao seguinte:
- Azure SQL Database
- Amazon RDS para SQL Server
Visão geral das etapas
- Configure seu banco de dados de origem para ingestão.
- Crie um gateway que se conecte ao banco de dados SQL Server, extraia o Snapshot e altere os dados do banco de dados de origem e armazene-os em um volume de preparação Unity Catalog.
- Crie uma ingestão pipeline, que aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.
- Programar a ingestão pipeline.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
O compute sem servidor está habilitado para Notebook, fluxo de trabalho e Delta Live Tables. Consulte Ativar serverless compute .
-
Para criar uma conexão:
CREATE CONNECTION
na metastore.Para usar uma conexão existente:
USE CONNECTION
ouALL PRIVILEGES
na conexão. -
USE CATALOG
no catálogo de destino. -
USE SCHEMA
,CREATE TABLE
eCREATE VOLUME
em um esquema existente ouCREATE SCHEMA
no catálogo de destino. -
Permissões irrestritas para criar clustering ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:
-
Família: Job compute
-
A família de políticas substitui:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Os nós worker (
node_type_id
) não são usados, mas são necessários para a execução da DLT. Especifique um nó mínimo:
"driver_node_type_id": {
"type": "unlimited",
"defaultValue": "r5.xlarge",
"isOptional": true
},
"node_type_id": {
"type": "unlimited",
"defaultValue": "m4.large",
"isOptional": true
}Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de cluster.
-
Configurar o banco de dados de origem para ingestão
Consulte Configurar o SQL Server para ingestão no Databricks.
Criar uma conexão com o SQL Server
O conector usa um objeto de conexão do Unity Catalog para armazenar e acessar as credenciais do banco de dados de origem.
Permissões necessárias
- Para criar uma nova conexão,
CREATE CONNECTION
na metastore. Entre em contato com um administrador da metastore para conceder isso. - Para usar uma conexão existente,
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. Entre em contato com o proprietário da conexão para concedê-las.
Para criar a conexão, faça o seguinte:
- No site Databricks workspace, clique em Catalog > External Data > Connections (Catálogo > Dados externos > Conexões ).
- Clique em Criar conexão . Se você não vê esse botão, talvez não tenha os privilégios
CREATE CONNECTION
. - Insira um nome de conexão exclusivo.
- Para o tipo de conexão , selecione SQL Server .
- Para Host , especifique o nome de domínio do SQL Server.
- Para User e Password , digite suas credenciais de login do SQL Server.
- Clique em Criar .
Teste a conexão para verificar se o host está acessível. Ele não testa as credenciais do usuário quanto aos valores corretos de nome de usuário e senha.
Crie um catálogo de preparação e esquemas
O conector do SQL Server cria um volume de preparação do Unity Catalog para armazenar dados intermediários em um catálogo e esquema de preparação do Unity Catalog que o senhor especificar.
O catálogo de teste e o esquema podem ser iguais aos do catálogo e do esquema de destino. O catálogo de teste não pode ser um catálogo estrangeiro.
Permissões necessárias
- Para criar um novo catálogo de preparo,
CREATE CATALOG
na metastore. Entre em contato com um administrador da metastore para conceder isso. - Para usar um catálogo de teste existente,
USE CATALOG
no catálogo. Entre em contato com o proprietário do catálogo para conceder isso. - Para criar um novo esquema de teste,
CREATE SCHEMA
no catálogo. Entre em contato com o proprietário do catálogo para conceder isso. - Para usar um esquema de teste existente,
USE SCHEMA
,CREATE VOLUME
eCREATE TABLE
no esquema. Entre em contato com o proprietário do esquema para concedê-los.
- Catalog Explorer
- CLI
- In the Databricks workspace, click Catalog.
- On the Catalog tab, do one of the following:
- Click Create catalog. If you don’t see this button, you don’t have
CREATE CATALOG
privileges. - Enter a unique name for the catalog, and then click Create.
- Select the catalog you created.
- Click Create schema. If you don’t see this button, you don’t have
CREATE SCHEMA
privileges. - Enter a unique name for the schema, and then click Create.
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Criar o gateway e o pipeline de ingestão
O gateway extrai o Snapshot e altera os dados do banco de dados de origem e os armazena no volume de preparação Unity Catalog. log Para evitar problemas com lacunas nos dados de alteração devido a políticas de retenção de alterações no banco de dados de origem, execute o gateway como um pipeline contínuo.
A ingestão pipeline aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.
Só há suporte para um pipeline de ingestão por gateway.
Embora a API permita isso, o pipeline de ingestão não é compatível com mais de um catálogo e esquema de destino. Se o senhor precisar gravar em vários catálogos ou esquemas de destino, crie vários pares de gateway-ingestion pipeline.
Permissões necessárias
Para criar um pipeline, o senhor precisa das permissões Unrestricted cluster creation
. Entre em contato com um administrador do account.
- Databricks Asset Bundles
- Notebook
- CLI
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles (DABs). Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init
-
Add two new resource files to the bundle:
- A pipeline definition file (
resources/sqlserver_pipeline.yml
). - A workflow file that controls the frequency of data ingestion (
resources/sqlserver.yml
).
The following is an example
resources/sqlserver_pipeline.yml
file:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEWThe following is an example
resources/sqlserver_job.yml
file:YAMLresources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id} - A pipeline definition file (
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
Update the Configuration
cell in the following notebook with the source connection, target catalog, target schema, and tables to ingest from the source.
Create gateway and ingestion pipeline
To create the gateway:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
To create the ingestion pipeline:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
Configurar um programa de acionamento para a ingestão pipeline
Somente o modo acionado é compatível com a execução do pipeline de ingestão.
O senhor pode criar um programa para o pipeline usando a UI do DLT pipeline clicando no botão no canto superior direito da tela da UI do pipeline.
A UI cria automaticamente um Job para executar o pipeline de acordo com o programar especificado. O trabalho é mostrado em Jobs tab.
Verificar se a ingestão de dados foi bem-sucedida
A lista view na interface de usuário pipeline de ingestão mostra o número de registros processados à medida que os dados são ingeridos. Esses números são automaticamente refresh.
As colunas Upserted records
e Deleted records
não são exibidas em default. Você pode ativá-las clicando no botão de configuração das colunas e selecionando-as.
começar, programar e definir alertas em seu pipeline
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em Delta Live Tables .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, execute o pipeline clicando em começar . O senhor pode programar o pipeline clicando em programar .
-
Para definir o alerta no site pipeline, clique em programar , clique em Mais opções e, em seguida, adicione uma notificação.
-
Depois que a ingestão for concluída, você poderá consultar suas tabelas.