Pular para o conteúdo principal

Criar um conector personalizado

info

Beta

Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.

Esta página mostra como criar um conector para uma fonte que ainda não é suportada no LakeFlow Connect . Primeiro, crie e teste seu conector localmente usando as ferramentas e o padrão da comunidade de conectores LakeFlow localizada no GitHub. O repositório inclui ferramentas de desenvolvimento com AIpara auxiliar em cada fase, incluindo pesquisa de código-fonte, configuração de autenticação, implementação e testes.

Quando seu conector personalizado estiver pronto para uso, experimente-o em seu workspace Databricks e, em seguida, registre-o na comunidade abrindo uma solicitação de pull request.

Para usar um conector de comunidade registrado, consulte Usar um conector de comunidade registrado.

Requisitos

Antes de começar, certifique-se de ter:

  • Python 3.10 ou superior
  • Um espaço de trabalho do Databricks com o Unity Catalog habilitado
  • Credenciais da API da fonte à qual você deseja se conectar.
  • Git instalado localmente

Configure o repositório

Clone o repositório LakeFlow comunidade Connectors e instale as dependências de desenvolvimento.

  1. Clone o repositório:

    Bash
    git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
    cd lakeflow-community-connectors
  2. Crie um ambiente virtual e instale as dependências:

    Bash
    python -m venv .venv
    source .venv/bin/activate
    pip install -e ".[dev]"
  3. Copie um diretório de conector existente (por exemplo, connectors/stripe/) como ponto de partida para seu novo conector:

    Bash
    cp -r connectors/stripe connectors/<your-source>

Implemente a interface LakeflowConnect

Cada conector de comunidade implementa a interface LakeflowConnect , que define como seu conector se autentica, descobre tabelas, retorna esquemas e lê dados.

Python
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""

def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""

def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""

def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""

def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""

def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""

Descrição dos métodos

Método

Descrição

__init__

Recebe os parâmetros de conexão como um dicionário e inicializa o cliente da API para sua fonte.

list_tables

Retorna os nomes de todas as tabelas (ou endpoints API ) que seu conector expõe. O Databricks usa essa lista para preencher a interface de seleção de tabelas.

get_table_schema

Retorna um Spark StructType descrevendo o esquema da tabela fornecida. Chamado antes da primeira execução do pipeline e em cada execução quando a evolução do esquema está habilitada.

read_table_metadata

Retorna um dicionário com primary_keys, cursor_field e ingestion_type. O ingestion_type deve ser um dos snapshot, cdc, cdc_with_deletes ou append.

read_table

Gera registros como dicionários Python e retorna o próximo deslocamento para leituras incrementais. Na primeira execução, start_offset está vazio. Na execução subsequente, contém o deslocamento retornado pela execução anterior.

read_table_deletes

Opcional. Implemente este método somente se ingestion_type for cdc_with_deletes. Retorna a chave do registro excluído e o próximo deslocamento.

Desenvolva seu conector

Siga estes passos para construir e validar um novo conector:

  1. Pesquise a API de origem : Estude as especificações da API de origem, os mecanismos de autenticação, os limites de taxa e os esquemas de dados disponíveis. Identifique quais tabelas ou endpoints devem ser expostos.

  2. Configurar autenticação : Gere a especificação de conexão, configure as credenciais para a origem e verifique a conectividade a partir do seu ambiente de desenvolvimento.

  3. Implemente o conector : Codifique todos os métodos de interface LakeflowConnect necessários para conectar à API de origem e retornar dados no formato esperado.

  4. Testar e iterar : executar os conjuntos de testes padrão em um sistema de origem real e corrigir quaisquer problemas. Consulte a seção Teste seu conector para obter detalhes.

  5. Documente o conector : Escreva um README.md voltado para o usuário e gere o arquivo YAML de especificação do conector que descreve os parâmetros configuráveis do conector.

  6. Construir o artefato de implantação : executar o script de construção para produzir o artefato de arquivo único que pode ser implantado em um workspace.

Teste seu conector

O repositório oferece diversas abordagens de teste:

Conjunto de testes genéricos (obrigatório)

Conecta-se a uma fonte real usando as credenciais fornecidas para verificar a funcionalidade de ponta a ponta, incluindo autenticação, descoberta de esquema e leitura de dados.

Bash
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json

Teste de gravação (recomendado)

Executa ciclos de escrita-leitura-verificação para validar leituras e exclusões incrementais. Isso confirma que seu acompanhamento de deslocamento e lógica CDC funcionam corretamente.

Bash
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json

Testes unitários

Escreva testes unitários para qualquer lógica personalizada complexa em seu conector, como tratamento de paginação, coerção de tipo ou recuperação de erros.

Construa o artefato de implantação

Após o seu conector passar nos conjuntos de testes, execute o script merge para gerar um artefato de implantação em arquivo único. O pipeline utiliza este arquivo em tempo de execução em vez do repositório completo.

Bash
python tools/scripts/merge_python_source.py --connector <your-source>

Isso gera um arquivo Python independente em dist/<your-source>/ que inclui todo o código do conector e as dependências.

Crie um pipeline de ingestão.

Para testar seu conector:

  1. Na barra lateral do seu workspace Databricks , clique em +Novo > Adicionar ou upload dados e, em seguida, selecione + Adicionar conector da comunidade em Conectores da comunidade .

  2. No campo Nome da fonte , insira o nome do seu conector.

  3. No campo URL do repositório GitHub , insira a URL do repositório GitHub que hospeda o código-fonte do seu conector.

  4. Clique em Adicionar Conector .

  5. Clique em + Criar conexão ou selecione uma conexão existente e, em seguida, clique em Avançar .

  6. No campo "Nome do pipeline" , insira um nome para o pipeline.

  7. Em "Local do log de eventos" , insira um nome de catálogo e um nome de esquema. Databricks armazena o log de eventos pipeline aqui. As tabelas ingeridas também são gravadas aqui por default.

  8. Em Caminho raiz , insira o caminho do seu workspace (por exemplo, /Workspace/Users/<your-email>/connectors). O Databricks clona e armazena o código-fonte do conector aqui.

  9. Clique em Criar pipeline .

  10. No editor de pipeline, abra ingest.py e modifique o campo de objetos para incluir as tabelas que você deseja ingerir. Por exemplo:

    Python
    from databricks.labs.community_connector.pipeline import ingest

    pipeline_spec = {
    "connection_name": "my_connector_connection", # Required: UC connection name
    "objects": [
    {"table": {"source_table": "my_table"}},
    ],
    }

    ingest(spark, pipeline_spec)
  11. executar o pipeline manualmente ou programá-lo.

Opções de configuração do pipeline

Você pode configurar as seguintes opções em ingest.py:

Opção

Descrição

connection_name

Obrigatório. O nome da conexão que armazena as credenciais de autenticação da origem.

objects

Obrigatório. Uma lista de tabelas a serem ingeridas. Cada entrada tem o formato {"table": {"source_table": "..."}}. Você também pode especificar um destination_table opcional dentro do objeto table .

destination_catalog

O catálogo onde as tabelas ingeridas são gravadas. Por padrão, utiliza-se o catálogo definido durante a criação pipeline .

destination_schema

O esquema onde as tabelas ingeridas são gravadas. Por padrão, será utilizado o esquema definido durante a criação pipeline .

scd_type

A estratégia de dimensões que mudam lentamente (SCD): SCD_TYPE_1, SCD_TYPE_2 ou APPEND_ONLY. por padrão é SCD_TYPE_1.

primary_keys

Substituir a chave primária default de uma tabela. Forneça uma lista com os nomes das colunas.

Registre seu conector

Após criar e testar seu conector, abra uma solicitação de pull no repositório de Conectores da comunidadeLakeFlow para disponibilizá-lo à comunidade.