カスタムコネクタを作成する
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
このページでは、 LakeFlow Connectでまだサポートされていないシステム用のコネクタを作成する方法を示します。 まず、 GitHubのLakeFlowコミュニティ コネクタ リポジトリにあるツールとテンプレートを使用して、コネクタをローカルで構築してテストします。 このリポジトリには、ソースコード調査、認証設定、実装、テストなど、各段階を支援するAI搭載の開発ツールが含まれています。
カスタム コネクタを使用する準備ができたら、 Databricksワークスペースで試してから、プル リクエストを開いてコミュニティに登録します。
登録済みのコミュニティコネクタを使用するには、 「登録済みのコミュニティコネクタを使用する」を参照してください。
要件
始める前に、以下のものを用意してください。
- Python 3.10以上
- Unity Catalogが有効になっているDatabricksワークスペース
- 接続先のソースのAPI認証情報
- Gitがローカルにインストールされています
リポジトリを設定する
LakeFlowコミュニティ コネクタリポジトリのクローンを作成し、開発依存関係をインストールします。
-
リポジトリをクローンする:
Bashgit clone https://github.com/databrickslabs/lakeflow-community-connectors.git
cd lakeflow-community-connectors -
仮想環境を作成し、依存関係をインストールします。
Bashpython -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]" -
既存のコネクタディレクトリ(例:
connectors/stripe/)をコピーして、新しいコネクタの開始点として使用します。Bashcp -r connectors/stripe connectors/<your-source>
LakeflowConnectインターフェースを実装する
各コミュニティコネクタは、 LakeflowConnectインターフェースを実装します。このインターフェースは、コネクタが認証を行い、テーブルを検出し、スキーマを返し、データを読み取る方法を定義します。
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""
def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""
def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""
def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""
def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""
def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""
方法の説明
手法 | 説明 |
|---|---|
| 接続問題を辞書として受け取り、ソースのAPIクライアントを初期化します。 |
| コネクタが公開するすべてのテーブル(またはAPIエンドポイント)の名前を返します。Databricksはこのリストを使用して、テーブル選択UIにデータを入力します。 |
| 指定されたテーブルのスキーマを記述する Spark |
|
|
| レコードをPython辞書として出力し、増分読み取りのための次のオフセットを返します。初回実行時、 |
| 任意。 |
コネクターを育成する
次のステップに従って、新しいコネクタを構築して検証します。
-
ソースAPIを調査する : ソースのAPI仕様、認証メカニズム、レート制限、および利用可能なデータ スキーマを調査します。 公開するテーブルまたはエンドポイントを特定します。
-
認証の設定 :接続仕様を生成し、システムの認証情報を設定し、開発環境から接続を確認します。
-
コネクタを実装します 。ソース API に接続し、期待される形式でデータを返すために必要なすべての
LakeflowConnectインターフェース メソッドをコーディングします。 -
テストと反復 :標準テストスイートを実際のソースシステムに対して実行し、問題点を修正する。詳細については、 「コネクタのテスト」を参照してください。
-
コネクタを文書化する : ユーザー向けの
README.mdを作成し、コネクタの設定可能な要素を記述するコネクタ仕様 YAML ファイルを生成します。 -
デプロイメント アーティファクトをビルドする : ビルド スクリプトを実行して、ワークスペースにデプロイできる単一ファイル アーティファクトを生成します。
コネクタをテストしてください
このリポジトリでは、いくつかのテスト手法が提供されています。
汎用テストスイート(必須)
提供された認証情報を使用して実際のソースに接続し、認証、スキーマ検出、データ読み取りなど、エンドツーエンドの機能を検証します。
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json
書き戻しテスト(推奨)
書き込み・読み取り・検証のサイクルを実行して、増分読み取りと削除を検証します。これにより、オフセット追跡とCDCロジックが正しく機能していることが確認できます。
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json
単体テスト
コネクタ内の複雑なカスタムロジック(ページネーション処理、型変換、エラー回復など)については、単体テストを作成してください。
デプロイメントアーティファクトを作成する
コネクタがテストスイートに合格したら、マージスクリプトを実行して単一ファイルのデプロイメントアーティファクトを生成します。パイプラインはランタイム時にフルリポジトリではなくこのファイルを使用します。
python tools/scripts/merge_python_source.py --connector <your-source>
これにより、コネクタコードと依存関係をすべて含んだ自己完結型のPythonファイルがdist/<your-source>/に生成されます。
取り込みパイプラインを作成する
コネクタを試すには:
-
Databricksワークスペースのサイドバーで、 [+新規 ] > [データの追加またはアップロード] をクリックし、 [コミュニティ コネクタ] の下にある [+ コミュニティ コネクタの追加] を選択します。
-
「ソース名」 には、コネクタの名前を入力してください。
-
GitHubリポジトリ URL] には、コネクタのソース コードをホストするGitHubリポジトリの URL を入力します。
-
「コネクタを追加」 をクリックします。
-
「+接続を作成」 をクリックするか、既存の接続を選択してから、 「次へ」 をクリックします。
-
[パイプライン名] にパイプライン の名前を入力します。
-
イベントログの保存場所 には、カタログ名とスキーマ名を入力してください。Databricksはパイプラインのイベントログをここに保存します。取り込まれたテーブルも、デフォルトではここに書き込まれます。
-
ルートパス には、ワークスペースのパスを入力してください(例:
/Workspace/Users/<your-email>/connectors)。Databricksはコネクタのソースコードをここに複製して保存します。 -
パイプラインの作成 をクリックします。
-
パイプラインエディタで、
ingest.pyを開き、 オブジェクト フィールドを変更して、取り込みたいテーブルを含めます。例えば:Pythonfrom databricks.labs.community_connector.pipeline import ingest
pipeline_spec = {
"connection_name": "my_connector_connection", # Required: UC connection name
"objects": [
{"table": {"source_table": "my_table"}},
],
}
ingest(spark, pipeline_spec) -
パイプラインを手動で実行するか、スケジュール設定してください。
パイプライン構成オプション
ingest.pyでは、以下のオプションを設定できます。
オプション | 説明 |
|---|---|
| 必須。ソースの認証情報を保存する接続の名前。 |
| 必須。取り込むテーブルのリスト。各エントリは |
| 取り込まれたテーブルが書き込まれるカタログ。デフォルトでは、パイプライン作成時に設定されたカタログが使用されます。 |
| 取り込まれたテーブルが書き込まれるスキーマ。デフォルトでは、パイプライン作成時に設定されたスキーマが使用されます。 |
| ゆっくりと変化する次元戦略: |
| テーブルのデフォルトの主キーを上書きします。列名の一覧を提供してください。 |
コネクタを登録する
コネクタを構築してテストした後、 LakeFlowコミュニティ コネクタリポジトリでプル リクエストを開き、コミュニティで利用できるようにします。