メインコンテンツまでスキップ

カスタムコネクタを作成する

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

このページでは、 LakeFlow Connectでまだサポートされていないシステム用のコネクタを作成する方法を示します。 まず、 GitHubのLakeFlowコミュニティ コネクタ リポジトリにあるツールとテンプレートを使用して、コネクタをローカルで構築してテストします。 このリポジトリには、ソースコード調査、認証設定、実装、テストなど、各段階を支援するAI搭載の開発ツールが含まれています。

カスタム コネクタを使用する準備ができたら、 Databricksワークスペースで試してから、プル リクエストを開いてコミュニティに登録します。

登録済みのコミュニティコネクタを使用するには、 「登録済みのコミュニティコネクタを使用する」を参照してください。

要件

始める前に、以下のものを用意してください。

  • Python 3.10以上
  • Unity Catalogが有効になっているDatabricksワークスペース
  • 接続先のソースのAPI認証情報
  • Gitがローカルにインストールされています

リポジトリを設定する

LakeFlowコミュニティ コネクタリポジトリのクローンを作成し、開発依存関係をインストールします。

  1. リポジトリをクローンする:

    Bash
    git clone https://github.com/databrickslabs/lakeflow-community-connectors.git
    cd lakeflow-community-connectors
  2. 仮想環境を作成し、依存関係をインストールします。

    Bash
    python -m venv .venv
    source .venv/bin/activate
    pip install -e ".[dev]"
  3. 既存のコネクタディレクトリ(例: connectors/stripe/ )をコピーして、新しいコネクタの開始点として使用します。

    Bash
    cp -r connectors/stripe connectors/<your-source>

LakeflowConnectインターフェースを実装する

各コミュニティコネクタは、 LakeflowConnectインターフェースを実装します。このインターフェースは、コネクタが認証を行い、テーブルを検出し、スキーマを返し、データを読み取る方法を定義します。

Python
class LakeflowConnect:
def __init__(self, options: dict[str, str]) -> None:
"""Initialize with connection parameters"""

def list_tables(self) -> list[str]:
"""Return names of all tables supported by this connector."""

def get_table_schema(self, table_name: str, table_options: dict[str, str]) -> StructType:
"""Return the Spark schema for a table."""

def read_table_metadata(self, table_name: str, table_options: dict[str, str]) -> dict:
"""Return metadata: primary_keys, cursor_field, ingestion_type
(snapshot|cdc|cdc_with_deletes|append)."""

def read_table(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Yield records as JSON dicts and return the next offset
for incremental reads."""

def read_table_deletes(self, table_name: str, start_offset: dict,
table_options: dict[str, str]) -> (Iterator[dict], dict):
"""Optional: Only required if ingestion_type is 'cdc_with_deletes'."""

方法の説明

手法

説明

__init__

接続問題を辞書として受け取り、ソースのAPIクライアントを初期化します。

list_tables

コネクタが公開するすべてのテーブル(またはAPIエンドポイント)の名前を返します。Databricksはこのリストを使用して、テーブル選択UIにデータを入力します。

get_table_schema

指定されたテーブルのスキーマを記述する Spark StructTypeを返します。最初のパイプライン実行前、およびスキーマ進化が有効になっている場合は各実行時に呼び出されます。

read_table_metadata

primary_keyscursor_fieldingestion_typeを含む辞書を返します。ingestion_typesnapshotcdccdc_with_deletes 、またはappendのいずれかでなければなりません。

read_table

レコードをPython辞書として出力し、増分読み取りのための次のオフセットを返します。初回実行時、 start_offsetは空です。後続の実行では、前回の実行によって返されたオフセットが含まれます。

read_table_deletes

任意。ingestion_typecdc_with_deletesの場合にのみ、このメソッドを実装してください。削除されたレコードキーを抽出し、次のオフセットを返します。

コネクターを育成する

次のステップに従って、新しいコネクタを構築して検証します。

  1. ソースAPIを調査する : ソースのAPI仕様、認証メカニズム、レート制限、および利用可能なデータ スキーマを調査します。 公開するテーブルまたはエンドポイントを特定します。

  2. 認証の設定 :接続仕様を生成し、システムの認証情報を設定し、開発環境から接続を確認します。

  3. コネクタを実装します 。ソース API に接続し、期待される形式でデータを返すために必要なすべてのLakeflowConnectインターフェース メソッドをコーディングします。

  4. テストと反復 :標準テストスイートを実際のソースシステムに対して実行し、問題点を修正する。詳細については、 「コネクタのテスト」を参照してください。

  5. コネクタを文書化する : ユーザー向けのREADME.mdを作成し、コネクタの設定可能な要素を記述するコネクタ仕様 YAML ファイルを生成します。

  6. デプロイメント アーティファクトをビルドする : ビルド スクリプトを実行して、ワークスペースにデプロイできる単一ファイル アーティファクトを生成します。

コネクタをテストしてください

このリポジトリでは、いくつかのテスト手法が提供されています。

汎用テストスイート(必須)

提供された認証情報を使用して実際のソースに接続し、認証、スキーマ検出、データ読み取りなど、エンドツーエンドの機能を検証します。

Bash
python -m pytest tests/generic/ --connector <your-source> --credentials credentials.json

書き戻しテスト(推奨)

書き込み・読み取り・検証のサイクルを実行して、増分読み取りと削除を検証します。これにより、オフセット追跡とCDCロジックが正しく機能していることが確認できます。

Bash
python -m pytest tests/writeback/ --connector <your-source> --credentials credentials.json

単体テスト

コネクタ内の複雑なカスタムロジック(ページネーション処理、型変換、エラー回復など)については、単体テストを作成してください。

デプロイメントアーティファクトを作成する

コネクタがテストスイートに合格したら、マージスクリプトを実行して単一ファイルのデプロイメントアーティファクトを生成します。パイプラインはランタイム時にフルリポジトリではなくこのファイルを使用します。

Bash
python tools/scripts/merge_python_source.py --connector <your-source>

これにより、コネクタコードと依存関係をすべて含んだ自己完結型のPythonファイルがdist/<your-source>/に生成されます。

取り込みパイプラインを作成する

コネクタを試すには:

  1. Databricksワークスペースのサイドバーで、 [+新規 ] > [データの追加またはアップロード] をクリックし、 [コミュニティ コネクタ] の下にある [+ コミュニティ コネクタの追加] を選択します。

  2. 「ソース名」 には、コネクタの名前を入力してください。

  3. GitHubリポジトリ URL] には、コネクタのソース コードをホストするGitHubリポジトリの URL を入力します。

  4. 「コネクタを追加」 をクリックします。

  5. 「+接続を作成」 をクリックするか、既存の接続を選択してから、 「次へ」 をクリックします。

  6. [パイプライン名] にパイプライン の名前を入力します。

  7. イベントログの保存場所 には、カタログ名とスキーマ名を入力してください。Databricksはパイプラインのイベントログをここに保存します。取り込まれたテーブルも、デフォルトではここに書き込まれます。

  8. ルートパス には、ワークスペースのパスを入力してください(例: /Workspace/Users/<your-email>/connectors )。Databricksはコネクタのソースコードをここに複製して保存します。

  9. パイプラインの作成 をクリックします。

  10. パイプラインエディタで、 ingest.pyを開き、 オブジェクト フィールドを変更して、取り込みたいテーブルを含めます。例えば:

    Python
    from databricks.labs.community_connector.pipeline import ingest

    pipeline_spec = {
    "connection_name": "my_connector_connection", # Required: UC connection name
    "objects": [
    {"table": {"source_table": "my_table"}},
    ],
    }

    ingest(spark, pipeline_spec)
  11. パイプラインを手動で実行するか、スケジュール設定してください。

パイプライン構成オプション

ingest.pyでは、以下のオプションを設定できます。

オプション

説明

connection_name

必須。ソースの認証情報を保存する接続の名前。

objects

必須。取り込むテーブルのリスト。各エントリは{"table": {"source_table": "..."}}形式です。tableオブジェクト内にオプションのdestination_tableを指定することもできます。

destination_catalog

取り込まれたテーブルが書き込まれるカタログ。デフォルトでは、パイプライン作成時に設定されたカタログが使用されます。

destination_schema

取り込まれたテーブルが書き込まれるスキーマ。デフォルトでは、パイプライン作成時に設定されたスキーマが使用されます。

scd_type

ゆっくりと変化する次元戦略: SCD_TYPE_1SCD_TYPE_2 、またはAPPEND_ONLY 。 デフォルトはSCD_TYPE_1です。

primary_keys

テーブルのデフォルトの主キーを上書きします。列名の一覧を提供してください。

コネクタを登録する

コネクタを構築してテストした後、 LakeFlowコミュニティ コネクタリポジトリでプル リクエストを開き、コミュニティで利用できるようにします。