Criar um conector personalizado
Beta
Este recurso está em versão Beta. Os administradores do espaço de trabalho podem controlar o acesso a este recurso na página de Pré-visualizações . Veja as prévias do Gerenciador Databricks.
Esta página mostra como criar um conector para uma fonte que ainda não é suportada no LakeFlow Connect . Primeiro, crie e teste seu conector localmente usando as ferramentas e o padrão da comunidade de conectores LakeFlow localizada no GitHub. O repositório inclui ferramentas de desenvolvimento com AIpara auxiliar em cada fase, incluindo pesquisa de código-fonte, configuração de autenticação, implementação e testes.
Quando seu conector personalizado estiver pronto para uso, experimente-o em seu workspace Databricks e, em seguida, registre-o na comunidade abrindo uma solicitação de pull request.
Para usar um conector de comunidade registrado, consulte Usar um conector de comunidade registrado.
Requisitos
Antes de começar, certifique-se de ter:
- Python 3.10 ou superior
- Um espaço de trabalho do Databricks com o Unity Catalog habilitado
- Credenciais da API da fonte à qual você deseja se conectar.
- Git instalado localmente
Configure o repositório
Clone o repositório LakeFlow comunidade Connectors e instale as dependências de desenvolvimento.
-
Clone o repositório:
Bashgit clone https://github.com/databrickslabs/lakeflow-community-connectors.git
cd lakeflow-community-connectors -
Crie um ambiente virtual e instale as dependências:
Bashpython -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]" -
Copie um diretório de conector existente (por exemplo,
connectors/stripe/) como ponto de partida para seu novo conector:Bashcp -r connectors/stripe connectors/<your-source>
Implemente a interface LakeflowConnect
Cada conector de comunidade implementa a interface LakeflowConnect , que define como seu conector se autentica, descobre tabelas, retorna esquemas e lê dados.
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""
def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""
def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""
def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""
def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""
def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""
Descrição dos métodos
Método | Descrição |
|---|---|
| Recebe os parâmetros de conexão como um dicionário e inicializa o cliente da API para sua fonte. |
| Retorna os nomes de todas as tabelas (ou endpoints API ) que seu conector expõe. O Databricks usa essa lista para preencher a interface de seleção de tabelas. |
| Retorna um Spark |
| Retorna um dicionário com |
| Gera registros como dicionários Python e retorna o próximo deslocamento para leituras incrementais. Na primeira execução, |
| Opcional. Implemente este método somente se |
Desenvolva seu conector
Siga estes passos para construir e validar um novo conector:
-
Pesquise a API de origem : Estude as especificações da API de origem, os mecanismos de autenticação, os limites de taxa e os esquemas de dados disponíveis. Identifique quais tabelas ou endpoints devem ser expostos.
-
Configurar autenticação : Gere a especificação de conexão, configure as credenciais para a origem e verifique a conectividade a partir do seu ambiente de desenvolvimento.
-
Implemente o conector : Codifique todos os métodos de interface
LakeflowConnectnecessários para conectar à API de origem e retornar dados no formato esperado. -
Testar e iterar : executar os conjuntos de testes padrão em um sistema de origem real e corrigir quaisquer problemas. Consulte a seção Teste seu conector para obter detalhes.
-
Documente o conector : Escreva um
README.mdvoltado para o usuário e gere o arquivo YAML de especificação do conector que descreve os parâmetros configuráveis do conector. -
Construir o artefato de implantação : executar o script de construção para produzir o artefato de arquivo único que pode ser implantado em um workspace.
Teste seu conector
O repositório oferece diversas abordagens de teste:
Conjunto de testes genéricos (obrigatório)
Conecta-se a uma fonte real usando as credenciais fornecidas para verificar a funcionalidade de ponta a ponta, incluindo autenticação, descoberta de esquema e leitura de dados.
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json
Teste de gravação (recomendado)
Executa ciclos de escrita-leitura-verificação para validar leituras e exclusões incrementais. Isso confirma que seu acompanhamento de deslocamento e lógica CDC funcionam corretamente.
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json
Testes unitários
Escreva testes unitários para qualquer lógica personalizada complexa em seu conector, como tratamento de paginação, coerção de tipo ou recuperação de erros.
Construa o artefato de implantação
Após o seu conector passar nos conjuntos de testes, execute o script merge para gerar um artefato de implantação em arquivo único. O pipeline utiliza este arquivo em tempo de execução em vez do repositório completo.
python tools/scripts/merge_python_source.py --connector <your-source>
Isso gera um arquivo Python independente em dist/<your-source>/ que inclui todo o código do conector e as dependências.
Crie um pipeline de ingestão.
Para testar seu conector:
-
Na barra lateral do seu workspace Databricks , clique em +Novo > Adicionar ou upload dados e, em seguida, selecione + Adicionar conector da comunidade em Conectores da comunidade .
-
No campo Nome da fonte , insira o nome do seu conector.
-
No campo URL do repositório GitHub , insira a URL do repositório GitHub que hospeda o código-fonte do seu conector.
-
Clique em Adicionar Conector .
-
Clique em + Criar conexão ou selecione uma conexão existente e, em seguida, clique em Avançar .
-
No campo "Nome do pipeline" , insira um nome para o pipeline.
-
Em "Local do log de eventos" , insira um nome de catálogo e um nome de esquema. Databricks armazena o log de eventos pipeline aqui. As tabelas ingeridas também são gravadas aqui por default.
-
Em Caminho raiz , insira o caminho do seu workspace (por exemplo,
/Workspace/Users/<your-email>/connectors). O Databricks clona e armazena o código-fonte do conector aqui. -
Clique em Criar pipeline .
-
No editor de pipeline, abra
ingest.pye modifique o campo de objetos para incluir as tabelas que você deseja ingerir. Por exemplo:Pythonfrom databricks.labs.community_connector.pipeline import ingest
pipeline_spec = {
"connection_name": "my_connector_connection", # Required: UC connection name
"objects": [
{"table": {"source_table": "my_table"}},
],
}
ingest(spark, pipeline_spec) -
executar o pipeline manualmente ou programá-lo.
Opções de configuração do pipeline
Você pode configurar as seguintes opções em ingest.py:
Opção | Descrição |
|---|---|
| Obrigatório. O nome da conexão que armazena as credenciais de autenticação da origem. |
| Obrigatório. Uma lista de tabelas a serem ingeridas. Cada entrada tem o formato |
| O catálogo onde as tabelas ingeridas são gravadas. Por padrão, utiliza-se o catálogo definido durante a criação pipeline . |
| O esquema onde as tabelas ingeridas são gravadas. Por padrão, será utilizado o esquema definido durante a criação pipeline . |
| A estratégia de dimensões que mudam lentamente (SCD): |
| Substituir a chave primária default de uma tabela. Forneça uma lista com os nomes das colunas. |
Registre seu conector
Após criar e testar seu conector, abra uma solicitação de pull no repositório de Conectores da comunidadeLakeFlow para disponibilizá-lo à comunidade.