Pular para o conteúdo principal

Ingerir dados do SQL Server

info

Visualização

O conector do Microsoft SQL Server está em Public Preview.

Esta página descreve como ingerir dados de SQL Server e carregá-los em Databricks usando LakeFlow Connect. O conector do SQL Server é compatível com os bancos de dados SQL do Azure SQL e do Amazon RDS. Isso inclui o SQL Server em execução nas máquinas virtuais (VMs) do Azure e no Amazon EC2. O conector também oferece suporte ao SQL Server no local usando o Azure ExpressRoute e a rede AWS Direct Connect.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • O compute sem servidor está habilitado para o seu workspace. Consulte Ativar serverless compute .

  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION na metastore.

    Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.

  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • Você tem privilégios USE CATALOG no catálogo de destino.

  • Você tem privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

  • O senhor tem acesso a uma instância primária do SQL Server. Os recursos de acompanhamento de alterações e captura de dados de alterações (CDC) não são suportados em réplicas de leitura ou instâncias secundárias.

  • Permissões irrestritas para criar clustering ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:

    • Família: Job compute

    • A família de políticas substitui:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • Databricks recomenda especificar o menor número possível de nós worker para gateways de ingestão porque eles não afetam o desempenho do gateway.

      "driver_node_type_id": {
      "type": "unlimited",
      "defaultValue": "r5.xlarge",
      "isOptional": true
      },
      "node_type_id": {
      "type": "unlimited",
      "defaultValue": "m4.large",
      "isOptional": true
      }

    Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de cluster.

Para ingerir a partir do SQL Server, o senhor também deve concluir a configuração da fonte.

Opção 1: UI da Databricks

Os usuários administradores podem criar uma conexão e um pipeline ao mesmo tempo na interface do usuário. Essa é a maneira mais simples de criar um pipeline de ingestão gerencial.

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em SQL Server .

    O assistente de ingestão é aberto.

  3. Na página Ingestion Gateway do assistente, insira um nome exclusivo para o gateway.

  4. Selecione um catálogo e um esquema para os dados de ingestão intermediária e clique em Avançar.

  5. Na página Ingestion pipeline (Pipeline de ingestão ), digite um nome exclusivo para o pipeline.

  6. Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.

  7. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.

    Se não houver conexões existentes com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da fonte. Você deve ter privilégios CREATE CONNECTION na metastore.

  8. Clique em Create pipeline (Criar pipeline) e continue .

  9. Na página Origem , selecione as tabelas a serem ingeridas.

  10. Opcionalmente, altere a configuração default história acompanhamento. Para obter mais informações, consulte história acompanhamento.

  11. Clique em Avançar .

  12. Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.

    Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo principal.

  13. Clique em Salvar e continuar .

  14. (Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.

  15. (Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.

  16. Clique em Save e execute pipeline .

Opção 2: Outras interfaces

Antes de fazer a ingestão usando Databricks ativo Bundles, Databricks APIs, Databricks SDKs ou o Databricks CLI, o senhor deve ter acesso a uma conexão Unity Catalog existente. Para obter instruções, consulte Conectar-se a fontes de ingestão de gerenciar.

Crie o catálogo e o esquema de preparação

O catálogo de teste e o esquema podem ser iguais aos do catálogo e do esquema de destino. O catálogo de teste não pode ser um catálogo estrangeiro.

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Criar o gateway e o pipeline de ingestão

O gateway de ingestão extrai o Snapshot e altera os dados do banco de dados de origem e os armazena no volume de preparação Unity Catalog. O senhor deve executar o gateway como um pipeline contínuo. Isso ajuda a acomodar quaisquer políticas de retenção de log de alterações que o senhor tenha no banco de dados de origem.

A ingestão pipeline aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.

nota

Cada pipeline de ingestão deve estar associado a exatamente um gateway de ingestão.

O pipeline de ingestão não é compatível com mais de um catálogo e esquema de destino. Se o senhor precisar gravar em vários catálogos ou esquemas de destino, crie vários pares gateway-pipeline.

Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.

  1. Crie um novo pacote usando a CLI do Databricks:

    Bash
    databricks bundle init
  2. Adicione dois novos arquivos de recurso ao pacote:

    • Um arquivo de definição de pipeline (resources/sqlserver_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (resources/sqlserver.yml).

    Veja a seguir um exemplo de arquivo resources/sqlserver_pipeline.yml:

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    Veja a seguir um exemplo de arquivo resources/sqlserver_job.yml:

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. implantado o pipeline usando o Databricks CLI:

    Bash
    databricks bundle deploy

começar, programar e definir alertas em seu pipeline

O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .

  4. Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.

Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.

Verificar se a ingestão de dados foi bem-sucedida

A lista view na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

Verificar a replicação

As colunas Upserted records e Deleted records não são exibidas por default. Você pode ativá-las clicando no Ícone de configuração de colunas botão de configuração das colunas e selecionando-as.

Recurso adicional