Create a custom connector
This feature is in Beta. Workspace admins can control access to this feature from the Previews page. See Manage Databricks previews.
This page shows how to create a connector for a source that isn't supported in Lakeflow Connect yet. First, build and test your connector locally using the tools and templates in the Lakeflow Community Connectors repository on GitHub. The repository includes AI-powered development tools to assist with each phase, including source research, authentication setup, implementation, and testing.
When your custom connector is ready to use, try it in your Databricks workspace, then register it with the community by opening a pull request.
To use a registered community connector, see Use a registered community connector.
Requirements
Before you start, make sure you have:
- Python 3.10 or above
- A Databricks workspace with Unity Catalog enabled
- API credentials for the source you want to connect to
- Git installed locally
Set up the repository
Clone the Lakeflow Community Connectors repository and install the development dependencies.
1. Clone the repository:

   ```bash
   git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
   cd lakeflow-community-connectors
   ```

2. Create a virtual environment and install the development dependencies:

   ```bash
   python -m venv .venv
   source .venv/bin/activate
   pip install -e ".[dev]"
   ```

3. Copy an existing connector directory (for example, `connectors/stripe/`) as a starting point for your new connector:

   ```bash
   cp -r connectors/stripe connectors/<your-source>
   ```
Implement the LakeflowConnect interface
Each community connector implements the LakeflowConnect interface, which defines how your connector authenticates, discovers tables, returns schemas, and reads data.
```python
from typing import Iterator

from pyspark.sql.types import StructType


class LakeflowConnect:
    def __init__(self, options: dict[str, str]) -> None:
        """Initialize with connection parameters."""

    def list_tables(self) -> list[str]:
        """Return names of all tables supported by this connector."""

    def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
        """Return the Spark schema for a table."""

    def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
        """Return metadata: primary_keys, cursor_field, ingestion_type
        (snapshot|cdc|cdc_with_deletes|append)."""

    def read_table(self, table_name: str, start_offset: dict,
                   table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Yield records as JSON dicts and return the next offset
        for incremental reads."""

    def read_table_deletes(self, table_name: str, start_offset: dict,
                           table_options: dict[str, str]) -> (Iterator[dict], dict):
        """Optional: Only required if ingestion_type is 'cdc_with_deletes'."""
```
Method descriptions
| Method | Description |
|---|---|
| `__init__` | Receives the connection parameters as a dictionary and initializes the API client for your source. |
| `list_tables` | Returns the names of all tables (or API endpoints) your connector exposes. Databricks uses this list to populate the table selection UI. |
| `get_table_schema` | Returns a Spark `StructType` that describes the columns of a table. |
| `read_table_metadata` | Returns a dictionary with `primary_keys`, `cursor_field`, and `ingestion_type` (`snapshot`, `cdc`, `cdc_with_deletes`, or `append`). |
| `read_table` | Yields records as Python dictionaries and returns the next offset for incremental reads. On the first run, `start_offset` is empty. |
| `read_table_deletes` | Optional. Only implement this method if `ingestion_type` is `cdc_with_deletes`. |
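For orientation, here is a minimal sketch of how these methods fit together for a hypothetical REST source. The endpoint, field names, and option keys (such as `api_key` and `base_url`) are illustrative assumptions, not part of the repository; a real connector maps them to your source's actual API.

```python
from typing import Iterator

import requests
from pyspark.sql.types import LongType, StringType, StructField, StructType


class LakeflowConnect:
    def __init__(self, options: dict[str, str]) -> None:
        # Options come from the connection (assumed option names shown here).
        self.api_key = options["api_key"]
        self.base_url = options.get("base_url", "https://api.example.com")

    def list_tables(self) -> list[str]:
        # One entry per API endpoint you want to make selectable.
        return ["customers"]

    def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
        return StructType([
            StructField("id", LongType(), nullable=False),
            StructField("email", StringType(), nullable=True),
            StructField("updated_at", StringType(), nullable=True),
        ])

    def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
        return {
            "primary_keys": ["id"],
            "cursor_field": "updated_at",
            "ingestion_type": "cdc",  # snapshot | cdc | cdc_with_deletes | append
        }

    def read_table(self, table_name: str, start_offset: dict,
                   table_options: dict[str, str]) -> (Iterator[dict], dict):
        # An empty offset means a first (full) read; otherwise read changes since it.
        since = (start_offset or {}).get("updated_at", "1970-01-01T00:00:00Z")
        response = requests.get(
            f"{self.base_url}/v1/{table_name}",
            params={"updated_since": since},
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=30,
        )
        response.raise_for_status()
        records = response.json()["data"]
        next_offset = {"updated_at": max((r["updated_at"] for r in records), default=since)}
        return iter(records), next_offset
```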
Develop your connector
Follow these steps to build and validate a new connector:
1. Research the source API: Study the source's API specifications, authentication mechanisms, rate limits, and available data schemas. Identify which tables or endpoints to expose.
2. Set up authentication: Generate the connection specification, configure credentials for the source, and verify connectivity from your development environment.
3. Implement the connector: Code all required `LakeflowConnect` interface methods to connect to the source API and return data in the expected format.
4. Test and iterate: Run the standard test suites against a real source system and fix any issues. See Test your connector for details.
5. Document the connector: Write a user-facing `README.md` and generate the connector spec YAML file that describes the connector's configurable parameters.
6. Build the deployment artifact: Run the build script to produce the single-file artifact that can be deployed in a workspace.
Test your connector
The repository provides several testing approaches:
Generic test suite (required)
Connects to a real source using your provided credentials to verify end-to-end functionality, including authentication, schema discovery, and data reads.
```bash
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json
```
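The credentials file supplies the connection options for your source, and its keys must match your connector's connection spec. The snippet below is only a sketch that assumes a hypothetical API-key source:

```python
# Hypothetical example: write a credentials.json for a source that
# authenticates with an API key. Replace the keys with the options your
# connector's connection spec actually defines.
import json

credentials = {
    "api_key": "sk_test_123",                # assumed option name
    "base_url": "https://api.example.com",   # assumed optional override
}

with open("credentials.json", "w") as f:
    json.dump(credentials, f, indent=2)
```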
Write-back testing (recommended)
Executes write-read-verify cycles to validate incremental reads and deletes. This confirms that your offset tracking and CDC logic work correctly.
```bash
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json
```
Unit tests
Write unit tests for any complex custom logic in your connector, such as pagination handling, type coercion, or error recovery.
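As an illustration, the test below exercises a hypothetical `paginate` helper that follows a cursor until the source reports no more pages. The helper name and page format are assumptions for the example, not part of the repository; adapt the pattern to whatever your connector defines.

```python
# Sketch of a pytest-style unit test for cursor-based pagination logic.
def paginate(fetch_page):
    """Collect records from a page-returning callable until no cursor remains."""
    records, cursor = [], None
    while True:
        page = fetch_page(cursor)
        records.extend(page["data"])
        cursor = page.get("next_cursor")
        if cursor is None:
            return records


def test_paginate_follows_cursors_until_exhausted():
    # Simulate a two-page API response keyed by cursor value.
    pages = {
        None: {"data": [{"id": 1}], "next_cursor": "p2"},
        "p2": {"data": [{"id": 2}], "next_cursor": None},
    }
    assert paginate(lambda cursor: pages[cursor]) == [{"id": 1}, {"id": 2}]
```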
Build the deployment artifact
After your connector passes the test suites, run the merge script to generate a single-file deployment artifact. The pipeline uses this file at runtime rather than the full repository.
```bash
python tools/scripts/merge_python_source.py --connector <your-source>
```
This produces a self-contained Python file in dist/<your-source>/ that includes all connector code and dependencies.
Create an ingestion pipeline
To try your connector:
1. In the sidebar of your Databricks workspace, click +New > Add or upload data, then select + Add Community Connector under Community connectors.
2. For Source name, enter the name of your connector.
3. For GitHub repository URL, enter the URL of the GitHub repository that hosts your connector source code.
4. Click Add Connector.
5. Click + Create connection or select an existing connection, then click Next.
6. For Pipeline name, enter a name for the pipeline.
7. For Event log location, enter a catalog name and a schema name. Databricks stores the pipeline event log here. Ingested tables are also written here by default.
8. For Root path, enter your workspace path (for example, `/Workspace/Users/<your-email>/connectors`). Databricks clones and stores the connector source code here.
9. Click Create pipeline.
10. In the pipeline editor, open `ingest.py` and modify the `objects` field to include the tables you want to ingest. For example:

    ```python
    from databricks.labs.community_connector.pipeline import ingest

    pipeline_spec = {
        "connection_name": "my_connector_connection",  # Required: UC connection name
        "objects": [
            {"table": {"source_table": "my_table"}},
        ],
    }

    ingest(spark, pipeline_spec)
    ```

11. Run the pipeline manually or schedule it.
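If you prefer to trigger runs from code, the Databricks SDK for Python can start a pipeline update. This sketch assumes the SDK is installed and authenticated, and that you have the pipeline ID from the pipeline's details page:

```python
from databricks.sdk import WorkspaceClient

# Start an update of the ingestion pipeline by ID (placeholder value shown).
w = WorkspaceClient()
w.pipelines.start_update(pipeline_id="<your-pipeline-id>")
```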
Pipeline configuration options
You can configure the following options in `ingest.py`:

| Option | Description |
|---|---|
| `connection_name` | Required. The name of the Unity Catalog connection that stores authentication credentials for the source. |
| `objects` | Required. A list of tables to ingest. Each entry has the format `{"table": {"source_table": "<table-name>"}}`. |
| Destination catalog | The catalog where ingested tables are written. Defaults to the catalog set during pipeline creation. |
| Destination schema | The schema where ingested tables are written. Defaults to the schema set during pipeline creation. |
| SCD type | The slowly changing dimension strategy to apply when writing ingested tables. |
| Primary keys | Override the default primary keys for a table. Provide a list of column names. |
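As a sketch, a fuller `pipeline_spec` might combine these options as shown below. Only `connection_name` and `objects` are taken from the generated `ingest.py` example above; the commented keys are hypothetical names for the remaining options, so check the repository's pipeline module for the exact spelling before using them.

```python
pipeline_spec = {
    "connection_name": "my_connector_connection",
    "objects": [
        {"table": {"source_table": "customers"}},
        {"table": {"source_table": "invoices"}},
    ],
    # Hypothetical key names for the options described in the table above:
    # "destination_catalog": "main",
    # "destination_schema": "raw_my_source",
    # "scd_type": "SCD_TYPE_1",
    # "primary_keys": ["id"],
}
```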
Register your connector
After you build and test your connector, open a pull request in the Lakeflow Community Connectors repository to make it available to the community.