Pular para o conteúdo principal

Ingerir dados do SQL Server

Esta página descreve como ingerir dados de SQL Server e carregá-los em Databricks usando LakeFlow Connect. O conector do SQL Server é compatível com os bancos de dados SQL do Azure SQL e do Amazon RDS. Isso inclui o SQL Server em execução nas máquinas virtuais (VMs) do Azure e no Amazon EC2. O conector também oferece suporte ao SQL Server no local usando o Azure ExpressRoute e a rede AWS Direct Connect.

Antes de começar

Para criar um pipeline de ingestão, o senhor deve atender aos seguintes requisitos:

  • Seu workspace está habilitado para Unity Catalog.

  • O compute sem servidor está habilitado para o seu workspace. Consulte os requisitos do compute sem servidor.

  • Se você planeja criar uma conexão: Você tem privilégios CREATE CONNECTION na metastore.

    Se o seu conector for compatível com a criação de pipeline com base na interface do usuário, o senhor poderá criar a conexão e o pipeline ao mesmo tempo, concluindo as etapas desta página. No entanto, se o senhor usar a criação de pipeline baseada em API, deverá criar a conexão no Catalog Explorer antes de concluir as etapas desta página. Consulte Conectar-se a fontes de ingestão de gerenciar.

  • Se você planeja usar uma conexão existente: Você tem privilégios USE CONNECTION ou ALL PRIVILEGES na conexão.

  • Você tem privilégios USE CATALOG no catálogo de destino.

  • Você tem privilégios USE SCHEMA, CREATE TABLE e CREATE VOLUME em um esquema existente ou privilégios CREATE SCHEMA no catálogo de destino.

  • O senhor tem acesso a uma instância primária do SQL Server. Os recursos de acompanhamento de alterações e captura de dados de alterações (CDC) não são suportados em réplicas de leitura ou instâncias secundárias.

  • Permissões irrestritas para criar clustering ou uma política personalizada. Uma política personalizada deve atender aos seguintes requisitos:

    • Família: Job compute

    • A família de políticas substitui:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • Databricks recomenda especificar o menor número possível de nós worker para gateways de ingestão porque eles não afetam o desempenho do gateway. A política compute a seguir permite que o Databricks dimensione o gateway de ingestão para atender às necessidades de sua carga de trabalho. O requisito mínimo é de 8 núcleos para permitir a extração eficiente e eficiente de dados do seu banco de dados de origem.

      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "n2-highmem-64"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "n2-standard-4"
      }
      }

    Para obter mais informações sobre a política de cluster, consulte Selecionar uma política de compute.

Para ingerir a partir do SQL Server, o senhor também deve concluir a configuração da fonte.

Opção 1: UI da Databricks

Os usuários administradores podem criar uma conexão e um pipeline ao mesmo tempo na interface do usuário. Essa é a maneira mais simples de criar um pipeline de ingestão gerencial.

  1. Na barra lateral do site Databricks workspace, clique em ingestão de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique em SQL Server .

    O assistente de ingestão é aberto.

  3. Na página Ingestion Gateway do assistente, insira um nome exclusivo para o gateway.

  4. Selecione um catálogo e um esquema para os dados de ingestão intermediária e clique em Avançar.

  5. Na página Ingestion pipeline (Pipeline de ingestão ), digite um nome exclusivo para o pipeline.

  6. Em Catálogo de destino , selecione um catálogo para armazenar os dados ingeridos.

  7. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar os dados de origem.

    Se não houver conexões existentes com a origem, clique em Criar conexão e insira os detalhes de autenticação obtidos na configuração da fonte. Você deve ter privilégios CREATE CONNECTION na metastore.

  8. Clique em Create pipeline (Criar pipeline) e continue .

  9. Na página Origem , selecione as tabelas a serem ingeridas.

  10. Opcionalmente, altere a configuração default história acompanhamento. Para obter mais informações, consulte Enable história acompanhamento (SCD type 2).

  11. Clique em Avançar .

  12. Na página Destination (Destino ), selecione o catálogo e o esquema do Unity Catalog para gravar.

    Se você não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo principal.

  13. Clique em Salvar e continuar .

  14. (Opcional) Na página Settings (Configurações ), clique em Create programar (Criar programa ). Defina a frequência para refresh as tabelas de destino.

  15. (Opcional) Defina as notificações do site email para o sucesso ou fracasso das operações do pipeline.

  16. Clique em Save e execute pipeline .

Opção 2: Outras interfaces

Antes de fazer a ingestão usando Databricks ativo Bundles, Databricks APIs, Databricks SDKs ou o Databricks CLI, o senhor deve ter acesso a uma conexão Unity Catalog existente. Para obter instruções, consulte Conectar-se a fontes de ingestão de gerenciar.

Crie o catálogo e o esquema de preparação

O catálogo de teste e o esquema podem ser iguais aos do catálogo e do esquema de destino. O catálogo de teste não pode ser um catálogo estrangeiro.

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Criar o gateway e o pipeline de ingestão

O gateway de ingestão extrai o Snapshot e altera os dados do banco de dados de origem e os armazena no volume de preparação Unity Catalog. O senhor deve executar o gateway como um pipeline contínuo. Isso ajuda a acomodar quaisquer políticas de retenção de log de alterações que o senhor tenha no banco de dados de origem.

A ingestão pipeline aplica o Snapshot e altera os dados do volume de preparação nas tabelas de transmissão de destino.

Este tab descreve como implantar uma ingestão pipeline usando Databricks ativo Bundles. Os bundles podem conter definições YAML de Job e tarefa, são gerenciados usando o Databricks CLI e podem ser compartilhados e executados em diferentes espaços de trabalho de destino (como desenvolvimento, preparação e produção). Para obter mais informações, consulte Databricks ativo Bundles.

  1. Crie um novo pacote usando a CLI do Databricks:

    Bash
    databricks bundle init
  2. Adicione dois novos arquivos de recurso ao pacote:

    • Um arquivo de definição de pipeline (resources/sqlserver_pipeline.yml).
    • Um arquivo de fluxo de trabalho que controla a frequência da ingestão de dados (resources/sqlserver.yml).

    Veja a seguir um exemplo de arquivo resources/sqlserver_pipeline.yml:

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    Veja a seguir um exemplo de arquivo resources/sqlserver_job.yml:

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. implantado o pipeline usando o Databricks CLI:

    Bash
    databricks bundle deploy

começar, programar e definir alertas em seu pipeline

O senhor pode criar um programa para o pipeline na página de detalhes do pipeline.

  1. Depois que o pipeline tiver sido criado, acesse novamente o Databricks workspace e clique em pipeline .

    O novo pipeline aparece na lista pipeline.

  2. Para acessar view os detalhes de pipeline, clique no nome pipeline.

  3. Na página de detalhes do pipeline, o senhor pode programar o pipeline clicando em programar .

  4. Para definir notificações no pipeline, clique em Settings (Configurações ) e, em seguida, adicione uma notificação.

Para cada programa que o senhor adicionar a um pipeline, o LakeFlow Connect cria automaticamente um Job para ele. A ingestão pipeline é uma tarefa dentro do trabalho. Opcionalmente, o senhor pode adicionar mais tarefas ao trabalho.

Verificar se a ingestão de dados foi bem-sucedida

A lista view na página de detalhes pipeline mostra o número de registros processados à medida que os dados são ingeridos. Esses números refresh automaticamente.

Verificar a replicação

As colunas Upserted records e Deleted records não são exibidas por default. Você pode ativá-las clicando no Ícone de configuração de colunas botão de configuração das colunas e selecionando-as.

Recurso adicional