Crie um pipeline de ingestão baseado em consultas.
Visualização
Este recurso está em Pré-visualização Pública.
Esta página mostra como criar um pipeline de ingestão baseado em consultas no LakeFlow Connect.
Requisitos
Antes de criar um pipeline de ingestão baseado em consultas, você deve primeiro atender aos seguintes requisitos:
- Unity Catalog está habilitado para seu workspace Databricks .
- Seu ambiente compute serverless permite conectividade de rede com o banco de dados de origem. Consulte a seção Redes e as recomendações de rede para a Lakehouse Federation.
- Para ingestão de conexão externa : Você tem uma conexão existente com o banco de dados de origem ou privilégios
CREATE CONNECTIONno metastore. Consulte Conectar para gerenciar fontes de ingestão. - Para ingestão de catálogos externos : Você precisa ter um catálogo externo já registrado na Federação Lakehouse ou ter privilégios para criar um.
- Você tem privilégios
CREATEeUSE SCHEMAno catálogo e esquema de destino.
Opção 1: Ingestão de conexão estrangeira
Utilize essa abordagem quando você tiver uma conexão que armazena credenciais de autenticação para o banco de dados de origem. As fontes de dados suportadas incluem Oracle, Teradata, SQL Server, MySQL, MariaDB e PostgreSQL.
- Databricks UI
- Declarative Automation Bundles
A interface do usuário Databricks oferece suporte à criação pipeline baseados em consultas. Para implantar com compute clássica (Beta), use Declarative Automation Bundles.
-
Na barra lateral workspace Databricks , clique em inserção de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique na sua fonte (por exemplo, Oracle ou SQL Server ). O assistente de ingestão é aberto.
-
Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.
-
Em Catálogo de destino , selecione um catálogo Unity Catalog para armazenar os dados recebidos.
-
Selecione a conexão do Unity Catalog que armazena as credenciais necessárias para acessar o banco de dados de origem.
Se não houver nenhuma conexão existente, clique em Criar conexão e insira os detalhes da conexão. Você deve ter privilégios
CREATE CONNECTIONno metastore. -
Clique em Criar pipeline e continue .
-
Na página Origem , selecione os esquemas e tabelas a serem importados.
-
Para cada tabela, especifique a coluna do cursor . Deve ser uma única coluna com valores que aumentam monotonicamente (por exemplo,
updated_atourow_id). Se você não selecionar uma coluna de cursor que aumente monotonicamente, o conector realizará uma carga completa em cada execução. -
Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
-
Clique em Avançar .
-
Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.
Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOGeCREATE SCHEMAno catálogo pai. -
Clique em Salvar e continuar .
-
(Opcional) Na página Configurações , clique em Criar programa e defina a frequência refresh .
-
(Opcional) Configure notificações email para sucesso ou falha pipeline .
-
Clique em Salvar e pipelinede execução .
Implantei um pipeline de ingestão baseado em consultas usando Declarative Automation Bundles. Os pacotes contêm definições YAML de pipeline e Job, são gerenciados com a CLI Databricks e podem ser implantados em vários espaços de trabalho de destino. Para mais informações, consulte O que são pacotes de automação declarativa?.
-
Criar um novo pacote:
Bashdatabricks bundle init -
Adicione um arquivo de definição de pipeline ao pacote (por exemplo,
resources/query_based_pipeline.yml):YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
resources:
pipelines:
pipeline_query_based:
name: query-based-ingestion-pipeline
ingestion_definition:
connection_name: <your-uc-connection-name>
objects:
- table:
source_catalog: <source-catalog>
source_schema: <source-schema>
source_table: <source-table>
table_configuration:
query_based_connector_config:
cursor_columns:
- updated_at
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog} -
Adicione um arquivo de definição de tarefa que controle o programa de ingestão (por exemplo,
resources/query_based_job.yml):YAMLresources:
jobs:
query_based_job:
name: query_based_job
trigger:
periodic:
interval: 1
unit: HOURS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_query_based.id} -
implantar o pacote:
Bashdatabricks bundle deploy
Opção 2: Ingestão de catálogo estrangeiro
Use essa abordagem quando quiser importar dados de um catálogo externo registrado na Federação Lakehouse. A ingestão de catálogo estrangeiro oferece suporte a todas as fontes de dados da Lakehouse Federation e ao acompanhamento de exclusão.
- Databricks UI
- Declarative Automation Bundles
-
Na barra lateral workspace Databricks , clique em inserção de dados .
-
Na página Adicionar dados , em Conectores do Databricks , clique na sua fonte. O assistente de ingestão é aberto.
-
Na página do pipeline de ingestão , insira um nome exclusivo para o pipeline.
-
Em Catálogo de destino , selecione um catálogo Unity Catalog para armazenar os dados recebidos.
-
Em Tipo de conexão , selecione Catálogo estrangeiro e escolha o catálogo estrangeiro registrado na Federação Lakehouse.
-
Clique em Criar pipeline e continue .
-
Na página Origem , selecione os esquemas e tabelas a serem importados.
-
Para cada tabela, especifique a coluna do cursor . Deve ser uma única coluna com valores que aumentam monotonicamente (por exemplo,
updated_atourow_id). -
Opcionalmente, altere a configuração default da história acompanhamento. Para mais informações, consulte Habilitar história acompanhamento (SCD tipo 2).
-
Clique em Avançar .
-
Na página Destino , selecione o catálogo e o esquema do Unity Catalog nos quais deseja gravar.
Se não quiser usar um esquema existente, clique em Criar esquema . Você deve ter privilégios
USE CATALOGeCREATE SCHEMAno catálogo pai. -
Clique em Salvar e continuar .
-
(Opcional) Na página Configurações , clique em Criar programa e defina a frequência refresh .
-
(Opcional) Configure notificações email para sucesso ou falha pipeline .
-
Clique em Salvar e pipelinede execução .
-
Criar um novo pacote:
Bashdatabricks bundle init -
Adicione um arquivo de definição de pipeline ao pacote (por exemplo,
resources/foreign_catalog_pipeline.yml):YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
resources:
pipelines:
pipeline_foreign_catalog:
name: foreign-catalog-ingestion-pipeline
ingestion_definition:
ingest_from_uc_foreign_catalog: true
objects:
- table:
source_catalog: <foreign-catalog-name>
source_schema: <source-schema>
source_table: <source-table>
cursor_columns:
- updated_at
primary_keys:
- id
deletion_condition: 'deleted_at IS NOT NULL'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog} -
Adicione um arquivo de definição de tarefa (por exemplo,
resources/foreign_catalog_job.yml):YAMLresources:
jobs:
foreign_catalog_job:
name: foreign_catalog_job
trigger:
periodic:
interval: 1
unit: HOURS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_foreign_catalog.id} -
implantar o pacote:
Bashdatabricks bundle deploy
Configurar acompanhamento incremental
Os conectores baseados em consultas usam uma coluna de cursor para determinar quais linhas são novas ou foram atualizadas desde a última execução do pipeline. A escolha da coluna do cursor é crucial para uma ingestão incremental eficaz.
Ao selecionar uma coluna do cursor, considere o seguinte:
- Utilize uma coluna de registro de data e hora, se possível. Colunas como
updated_atoulast_modifiedsão ideais porque refletem diretamente quando uma linha foi alterada pela última vez. - IDs inteiros funcionam para fontes somente de acréscimo. Se as linhas nunca forem atualizadas, uma coluna de ID de incremento automático (como
idourow_id) pode servir como cursor. Não utilize um ID inteiro como cursor se as linhas puderem ser atualizadas sem alterar o ID. - A coluna deve aumentar monotonicamente. Os valores nunca devem diminuir. Se a coluna puder ser definida para um valor anterior (por exemplo, por meio de um preenchimento retroativo), as linhas gravadas antes do limite máximo anterior não serão reinseridas.
- Você só pode especificar uma única coluna de cursor. Não é possível especificar várias colunas como um cursor composto.
Depois que o conector armazena a marca d'água alta do cursor, ele usa a marca d'água alta como o filtro de limite inferior (cursor_column > last_value) na próxima execução. Linhas com valor de cursor NULL não são incluídas.
Configurar história envio (SCD)
Para acompanhar todo o histórico de alterações de linhas nas tabelas de destino, configure SCD tipo 2. Consulte Ativar acompanhamento de história (SCD tipo 2).
Padrões comuns
Para configurações avançadas pipeline , consulte Padrões comuns para gerenciar pipeline de ingestão.