
Create a custom connector

Beta

This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.

This page shows how to create a connector for a source that isn't supported in Lakeflow Connect yet. First, build and test your connector locally using the tools and templates in the Lakeflow Community Connectors repository on GitHub. The repository includes AI-powered development tools to assist with each phase, including source research, authentication setup, implementation, and testing.

When your custom connector is ready to use, try it in your Databricks workspace, then register it with the community by opening a pull request.

To use a registered community connector, see Use a registered community connector.

Requirements

Before you start, make sure you have:

  • Python 3.10 or above
  • A Databricks workspace with Unity Catalog enabled
  • API credentials for the source you want to connect to
  • Git installed locally

Set up the repository

Clone the Lakeflow Community Connectors repository and install the development dependencies.

  1. Clone the repository:

    Bash
    git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
    cd lakeflow-community-connectors
  2. Create a virtual environment and install dependencies:

    Bash
    python -m venv .venv
    source .venv/bin/activate
    pip install -e ".[dev]"
  3. Copy an existing connector directory (for example, connectors/stripe/) as a starting point for your new connector:

    Bash
    cp -r connectors/stripe connectors/<your-source>

Implement the LakeflowConnect interface

Each community connector implements the LakeflowConnect interface, which defines how your connector authenticates, discovers tables, returns schemas, and reads data.

Python
from collections.abc import Iterator

from pyspark.sql.types import StructType


class LakeflowConnect:
    def __init__(self, options: dict[str, str]) -> None:
        """Initialize with connection parameters."""

    def list_tables(self) -> list[str]:
        """Return names of all tables supported by this connector."""

    def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
        """Return the Spark schema for a table."""

    def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
        """Return metadata: primary_keys, cursor_field, ingestion_type
        (snapshot|cdc|cdc_with_deletes|append)."""

    def read_table(
        self, table_name: str, start_offset: dict, table_options: dict[str, str]
    ) -> tuple[Iterator[dict], dict]:
        """Yield records as JSON dicts and return the next offset
        for incremental reads."""

    def read_table_deletes(
        self, table_name: str, start_offset: dict, table_options: dict[str, str]
    ) -> tuple[Iterator[dict], dict]:
        """Optional: only required if ingestion_type is 'cdc_with_deletes'."""

Method descriptions

__init__: Receives the connection parameters as a dictionary and initializes the API client for your source.

list_tables: Returns the names of all tables (or API endpoints) your connector exposes. Databricks uses this list to populate the table selection UI.

get_table_schema: Returns a Spark StructType describing the schema for the given table. Called before the first pipeline run and on each run when schema evolution is enabled.

read_table_metadata: Returns a dictionary with primary_keys, cursor_field, and ingestion_type. The ingestion_type must be one of snapshot, cdc, cdc_with_deletes, or append.

read_table: Yields records as Python dictionaries and returns the next offset for incremental reads. On the first run, start_offset is empty. On subsequent runs, it contains the offset returned by the previous run.

read_table_deletes: Optional. Only implement this method if ingestion_type is cdc_with_deletes. Yields deleted record keys and returns the next offset.
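Connectors that report cdc_with_deletes also need a deletes feed. The following standalone sketch assumes a hypothetical source that exposes deleted primary keys with a deletion timestamp; the data and field names are invented, and a real implementation would query the source API.

```python
from collections.abc import Iterator

# Hypothetical deletes feed: deleted primary keys plus a deletion timestamp.
_DELETED = [
    {"id": 7, "deleted_at": 150},
    {"id": 9, "deleted_at": 250},
]


def read_table_deletes(
    table_name: str, start_offset: dict, table_options: dict[str, str]
) -> tuple[Iterator[dict], dict]:
    # Same offset contract as read_table: skip deletes already seen,
    # and return the new high-water mark for the next run.
    cursor = (start_offset or {}).get("deleted_at", 0)
    rows = [r for r in _DELETED if r["deleted_at"] > cursor]
    high = max((r["deleted_at"] for r in rows), default=cursor)
    return iter(rows), {"deleted_at": high}
```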

Develop your connector

Follow these steps to build and validate a new connector:

  1. Research the source API: Study the source's API specifications, authentication mechanisms, rate limits, and available data schemas. Identify which tables or endpoints to expose.

  2. Set up authentication: Generate the connection specification, configure credentials for the source, and verify connectivity from your development environment.

  3. Implement the connector: Code all required LakeflowConnect interface methods to connect to the source API and return data in the expected format.

  4. Test and iterate: Run the standard test suites against a real source system and fix any issues. See Test your connector for details.

  5. Document the connector: Write a user-facing README.md and generate the connector spec YAML file that describes the connector's configurable parameters.

  6. Build the deployment artifact: Run the build script to produce the single-file artifact that can be deployed in a workspace.

Test your connector

The repository provides several testing approaches:

Generic test suite (required)

Connects to a real source using your provided credentials to verify end-to-end functionality, including authentication, schema discovery, and data reads.

Bash
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json

Write-back test suite

Executes write-read-verify cycles to validate incremental reads and deletes. This confirms that your offset tracking and CDC logic work correctly.

Bash
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json

Unit tests

Write unit tests for any complex custom logic in your connector, such as pagination handling, type coercion, or error recovery.
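As an illustration, here is a unit test for a hypothetical pagination helper. The helper name, page shape, and test are invented for this sketch; adapt them to whatever pagination logic your connector actually contains.

```python
def paginate(fetch_page):
    """Yield records from a paged API until an empty page is returned."""
    page = 0
    while True:
        records = fetch_page(page)
        if not records:
            return
        yield from records
        page += 1


def test_paginate_stops_on_empty_page():
    # Fake paged API: two pages of data, then an empty page to stop on.
    pages = {0: [{"id": 1}, {"id": 2}], 1: [{"id": 3}], 2: []}
    results = list(paginate(lambda n: pages.get(n, [])))
    assert [r["id"] for r in results] == [1, 2, 3]


test_paginate_stops_on_empty_page()
```

Keeping helpers like this free of API-client state makes them easy to test without network access or credentials.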

Build the deployment artifact

After your connector passes the test suites, run the merge script to generate a single-file deployment artifact. The pipeline uses this file at runtime rather than the full repository.

Bash
python tools/scripts/merge_python_source.py --connector <your-source>

This produces a self-contained Python file in dist/<your-source>/ that includes all connector code and dependencies.

Create an ingestion pipeline

To try your connector:

  1. In the sidebar of your Databricks workspace, click + New > Add or upload data, then select + Add Community Connector under Community connectors.

  2. For Source name, enter the name of your connector.

  3. For GitHub repository URL, enter the URL of the GitHub repository that hosts your connector source code.

  4. Click Add Connector.

  5. Click + Create connection or select an existing connection, then click Next.

  6. For Pipeline name, enter a name for the pipeline.

  7. For Event log location, enter a catalog name and a schema name. Databricks stores the pipeline event log here. Ingested tables are also written here by default.

  8. For Root path, enter your workspace path (for example, /Workspace/Users/<your-email>/connectors). Databricks clones and stores the connector source code here.

  9. Click Create pipeline.

  10. In the pipeline editor, open ingest.py and modify the objects field to include the tables you want to ingest. For example:

    Python
    from databricks.labs.community_connector.pipeline import ingest

    pipeline_spec = {
        "connection_name": "my_connector_connection",  # Required: UC connection name
        "objects": [
            {"table": {"source_table": "my_table"}},
        ],
    }

    ingest(spark, pipeline_spec)
  11. Run the pipeline manually or schedule it.

Pipeline configuration options

You can configure the following options in ingest.py:

connection_name: Required. The name of the connection that stores authentication credentials for the source.

objects: Required. A list of tables to ingest. Each entry has the format {"table": {"source_table": "..."}}. You can also specify an optional destination_table inside the table object.

destination_catalog: The catalog where ingested tables are written. Defaults to the catalog set during pipeline creation.

destination_schema: The schema where ingested tables are written. Defaults to the schema set during pipeline creation.

scd_type: The slowly changing dimension strategy: SCD_TYPE_1, SCD_TYPE_2, or APPEND_ONLY. Defaults to SCD_TYPE_1.

primary_keys: Override the default primary keys for a table. Provide a list of column names.
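A fuller pipeline_spec sketch combining these options might look like the following. All names are placeholders, and the per-table placement of destination_table and primary_keys is an assumption; check the repository's example pipelines for the exact nesting your connector version expects.

```python
pipeline_spec = {
    "connection_name": "my_connector_connection",  # Required
    "objects": [
        {
            "table": {
                "source_table": "orders",
                "destination_table": "orders_bronze",  # optional rename
                "primary_keys": ["order_id"],  # assumed per-table override
            }
        },
    ],
    "destination_catalog": "main",    # defaults to pipeline-creation catalog
    "destination_schema": "ingest",   # defaults to pipeline-creation schema
    "scd_type": "SCD_TYPE_2",         # SCD_TYPE_1 (default), SCD_TYPE_2, APPEND_ONLY
}
```

As in the ingest.py example above, the spec is passed to ingest(spark, pipeline_spec) inside the pipeline.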

Register your connector

After you build and test your connector, open a pull request in the Lakeflow Community Connectors repository to make it available to the community.