Pular para o conteúdo principal

Crie um pipeline de ingestão baseado em consultas.

info

Visualização

Este recurso está em Pré-visualização Pública.

Esta página mostra como criar um pipeline de ingestão baseado em consultas no LakeFlow Connect.

Requisitos

Antes de criar um pipeline de ingestão baseado em consultas, você deve primeiro atender aos seguintes requisitos:

  • Unity Catalog está habilitado para seu workspace Databricks .
  • Seu ambiente compute serverless permite conectividade de rede com o banco de dados de origem. Consulte a seção Redes e as recomendações de rede para a Lakehouse Federation.
  • Para ingestão de conexão externa : Você tem uma conexão existente com o banco de dados de origem ou privilégios CREATE CONNECTION no metastore. Consulte Conectar para gerenciar fontes de ingestão.
  • Para ingestão de catálogos externos : Você precisa ter um catálogo externo já registrado na Federação Lakehouse ou ter privilégios para criar um.
  • Você tem privilégios CREATE e USE SCHEMA no catálogo e esquema de destino.

Opção 1: Ingestão de conexão estrangeira

Utilize essa abordagem quando você tiver uma conexão que armazena credenciais de autenticação para o banco de dados de origem. As fontes de dados suportadas incluem Oracle, Teradata, SQL Server, MySQL, MariaDB e PostgreSQL.

A interface do usuário Databricks oferece suporte à criação pipeline baseados em consultas. Para implantar com compute clássica (Beta), use Declarative Automation Bundles.

  1. Na barra lateral workspace Databricks , clique em inserção de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique na sua fonte (por exemplo, Oracle ou SQL Server ). O assistente de ingestão é aberto.

  3. Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.

  4. Em Catálogo de destino , selecione um catálogo Unity Catalog para armazenar os dados recebidos.

  5. Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar o banco de dados de origem.

    Se não houver nenhuma conexão existente, clique em Criar conexão e insira os detalhes da conexão. Você deve ter privilégios CREATE CONNECTION no metastore.

  6. Clique em Criar pipeline e continue .

  7. Na página Origem , selecione os esquemas e tabelas a serem importados.

  8. Para cada tabela, especifique a coluna do cursor . Deve ser uma única coluna com valores que aumentam monotonicamente (por exemplo, updated_at ou row_id). Se você não selecionar uma coluna de cursor que aumente monotonicamente, o conector realizará uma carga completa em cada execução.

  9. Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).

  10. Clique em Avançar .

  11. Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.

    Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo pai.

  12. Clique em Salvar e continuar .

  13. (Opcional) Na página Configurações , clique em Criar programa e defina a frequência refresh .

  14. (Opcional) Configure notificações email para sucesso ou falha pipeline .

  15. Clique em Salvar e pipelinede execução .

Opção 2: Ingestão de catálogo estrangeiro

Use essa abordagem quando quiser importar dados de um catálogo externo registrado na Federação Lakehouse. A ingestão de catálogo estrangeiro oferece suporte a todas as fontes de dados da Lakehouse Federation e ao acompanhamento de exclusão.

  1. Na barra lateral workspace Databricks , clique em inserção de dados .

  2. Na página Adicionar dados , em Conectores do Databricks , clique na sua fonte. O assistente de ingestão é aberto.

  3. Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.

  4. Em Catálogo de destino , selecione um catálogo Unity Catalog para armazenar os dados recebidos.

  5. Em Tipo de conexão , selecione Catálogo estrangeiro e escolha o catálogo estrangeiro registrado na Federação Lakehouse.

  6. Clique em Criar pipeline e continue .

  7. Na página Origem , selecione os esquemas e tabelas a serem importados.

  8. Para cada tabela, especifique a coluna do cursor . Deve ser uma única coluna com valores que aumentam monotonicamente (por exemplo, updated_at ou row_id).

  9. Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).

  10. Clique em Avançar .

  11. Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.

    Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios USE CATALOG e CREATE SCHEMA no catálogo pai.

  12. Clique em Salvar e continuar .

  13. (Opcional) Na página Configurações , clique em Criar programa e defina a frequência refresh .

  14. (Opcional) Configure notificações email para sucesso ou falha pipeline .

  15. Clique em Salvar e pipelinede execução .

Configurar acompanhamento incremental

Os conectores baseados em consultas usam uma coluna de cursor para determinar quais linhas são novas ou foram atualizadas desde a última execução do pipeline. A escolha da coluna do cursor é crucial para uma ingestão incremental eficaz.

Ao selecionar uma coluna do cursor, considere o seguinte:

  • Utilize uma coluna de registro de data e hora, se possível. Colunas como updated_at ou last_modified são ideais porque refletem diretamente quando uma linha foi alterada pela última vez.
  • IDs inteiros funcionam para fontes somente de acréscimo. Se as linhas nunca forem atualizadas, uma coluna de ID de incremento automático (como id ou row_id) pode servir como cursor. Não utilize um ID inteiro como cursor se as linhas puderem ser atualizadas sem alterar o ID.
  • A coluna deve aumentar monotonicamente. Os valores nunca devem diminuir. Se a coluna puder ser definida para um valor anterior (por exemplo, por meio de um preenchimento retroativo), as linhas gravadas antes do limite máximo anterior não serão reinseridas.
  • Você só pode especificar uma única coluna de cursor. Não é possível especificar várias colunas como um cursor composto.

Depois que o conector armazena a marca d'água alta do cursor, ele usa a marca d'água alta como o filtro de limite inferior (cursor_column > last_value) na próxima execução. Linhas com valor de cursor NULL não são incluídas.

Configurar história envio (SCD)

Para acompanhar todo o histórico de alterações de linhas nas tabelas de destino, configure SCD tipo 2. Consulte Ativar acompanhamento de história (SCD tipo 2).

Padrões comuns

Para configurações avançadas pipeline , consulte Padrões comuns para gerenciar pipeline de ingestão.

Recursos adicionais