Ingerir dados do SQL Server
Visualização
O conector do Microsoft SQL Server está em uma versão prévia pública fechada. Para participar da pré-visualização, entre em contato com a equipe do Databricks account .
Este artigo descreve como ingerir dados de SQL Server e carregá-los em Databricks usando LakeFlow Connect.
Visão geral das etapas
- Configure seu banco de dados de origem para ingestão.
- Crie um gateway que se conecte ao banco de dados SQL Server, extraia o Snapshot e altere os dados do banco de dados de origem e armazene-os em um volume de preparação Unity Catalog.
- Crie uma ingestão pipeline, que aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.
- Programar a ingestão pipeline.
Variações do banco de
O conector do SQL Server é compatível com os bancos de dados SQL do Azure SQL e do Amazon RDS. Isso inclui o SQL Server em execução nas máquinas virtuais (VMs) do Azure e no Amazon EC2. O conector também oferece suporte ao SQL Server no local usando o Azure ExpressRoute e a rede AWS Direct Connect.
Antes de começar
Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:
-
Seu workspace está habilitado para Unity Catalog.
-
O compute sem servidor está habilitado para Notebook, fluxo de trabalho e DLT. Consulte Ativar serverless compute .
-
Para criar uma conexão:
CREATE CONNECTION
na metastore.Para usar uma conexão existente:
USE CONNECTION
ouALL PRIVILEGES
na conexão. -
USE CATALOG
no catálogo de destino. -
USE SCHEMA
,CREATE TABLE
eCREATE VOLUME
em um esquema existente ouCREATE SCHEMA
no catálogo de destino. -
Permissões irrestritas para criar clustering ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:
-
Família: Job compute
-
A família de políticas substitui:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Os nós do worker (
node_type_id
) não são usados, mas são necessários para a execução do DLT. Especifique um nó mínimo:
"driver_node_type_id": {
"type": "unlimited",
"defaultValue": "r5.xlarge",
"isOptional": true
},
"node_type_id": {
"type": "unlimited",
"defaultValue": "m4.large",
"isOptional": true
}Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de cluster.
-
Configurar o banco de dados de origem para ingestão
Consulte Configurar o Microsoft SQL Server para ingestão no Databricks.
Criar uma conexão com o SQL Server
O conector usa um objeto de conexão do Unity Catalog para armazenar e acessar as credenciais do banco de dados de origem.
Permissões necessárias
- Para criar uma nova conexão,
CREATE CONNECTION
na metastore. Entre em contato com um administrador da metastore para conceder isso. - Para usar uma conexão existente,
USE CONNECTION
ouALL PRIVILEGES
no objeto de conexão. Entre em contato com o proprietário da conexão para concedê-las.
Para criar a conexão, faça o seguinte:
- No site Databricks workspace, clique em Catalog > External Data > Connections .
- Clique em Criar conexão . Se você não vê esse botão, talvez não tenha os privilégios
CREATE CONNECTION
. - Insira um nome de conexão exclusivo.
- Para o tipo de conexão , selecione SQL Server .
- Para Host , especifique o nome de domínio do SQL Server.
- Para User e Password , digite suas credenciais de login do SQL Server.
- Clique em Criar .
Crie um catálogo de preparação e esquemas
O conector do SQL Server cria um volume de preparação do Unity Catalog para armazenar dados intermediários em um catálogo e esquema de preparação do Unity Catalog que o senhor especificar.
O catálogo de teste e o esquema podem ser iguais aos do catálogo e do esquema de destino. O catálogo de teste não pode ser um catálogo estrangeiro.
Permissões necessárias
- Para criar um novo catálogo de preparo,
CREATE CATALOG
na metastore. Entre em contato com um administrador da metastore para conceder isso. - Para usar um catálogo de teste existente,
USE CATALOG
no catálogo. Entre em contato com o proprietário do catálogo para conceder isso. - Para criar um novo esquema de teste,
CREATE SCHEMA
no catálogo. Entre em contato com o proprietário do catálogo para conceder isso. - Para usar um esquema de teste existente,
USE SCHEMA
,CREATE VOLUME
eCREATE TABLE
no esquema. Entre em contato com o proprietário do esquema para concedê-los.
- Catalog Explorer
- CLI
- No site Databricks workspace, clique em Catalog (Catálogo ).
- No Catalog tab, siga um destes procedimentos:
- Clique em Criar catálogo . Se você não vê esse botão, você não tem privilégios
CREATE CATALOG
. - Insira um nome exclusivo para o catálogo e clique em Criar .
- Selecione o catálogo que você criou.
- Clique em Criar esquema . Se você não vê esse botão, você não tem privilégios
CREATE SCHEMA
. - Insira um nome exclusivo para o esquema e clique em Criar .
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Criar o gateway e o pipeline de ingestão
O gateway extrai o Snapshot e altera os dados do banco de dados de origem e os armazena no volume de preparação Unity Catalog. log Para evitar problemas com lacunas nos dados de alteração devido a políticas de retenção de alterações no banco de dados de origem, execute o gateway como um pipeline contínuo.
A ingestão pipeline aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.
Só há suporte para um pipeline de ingestão por gateway.
Embora a API permita isso, o pipeline de ingestão não é compatível com mais de um catálogo e esquema de destino. Se o senhor precisar gravar em vários catálogos ou esquemas de destino, crie vários pares de gateway-ingestion pipeline.
Permissões necessárias
Para criar um pipeline, o senhor precisa das permissões Unrestricted cluster creation
. Entre em contato com um administrador do account.
- Databricks Asset Bundles
- Notebook
- CLI
Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.
-
Crie um novo pacote usando a CLI do Databricks:
Bashdatabricks bundle init
-
Adicione dois novos arquivos de recurso ao pacote:
- Um arquivo de definição de pipeline (
resources/sqlserver_pipeline.yml
). - Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (
resources/sqlserver.yml
).
Veja a seguir um exemplo de arquivo
resources/sqlserver_pipeline.yml
:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEWVeja a seguir um exemplo de arquivo
resources/sqlserver_job.yml
:YAMLresources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id} - Um arquivo de definição de pipeline (
-
implantado o pipeline usando o Databricks CLI:
Bashdatabricks bundle deploy
Atualize a célula Configuration
no Notebook a seguir com a conexão de origem, o catálogo de destino, o esquema de destino e as tabelas a serem ingeridas da origem.
Criar gateway e pipeline de ingestão
Para criar o gateway:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
Para criar o pipeline de ingestão:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
Configurar um programa de acionamento para a ingestão pipeline
Somente o modo acionado é compatível com a execução do pipeline de ingestão.
O senhor pode criar um programa para o pipeline usando a interface do usuário DLT pipeline , clicando no botão no canto superior direito da tela da interface do usuário pipeline.
A UI cria automaticamente um Job para executar o pipeline de acordo com a programação especificada. O trabalho é mostrado em Jobs tab.
Verificar se a ingestão de dados foi bem-sucedida
A lista view na interface de usuário de ingestão pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números são automaticamente refresh.
As colunas Upserted records
e Deleted records
não são exibidas por default. Você pode ativá-las clicando no botão de configuração das colunas e selecionando-as.
começar, programar e definir alertas em seu pipeline
-
Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .
O novo pipeline aparece na lista pipeline.
-
Para acessar view os detalhes de pipeline, clique no nome pipeline.
-
Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .
-
Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.