メインコンテンツまでスキップ

MySQL取り込みパイプラインを作成する

備考

プレビュー

MySQL コネクタはパブリック プレビュー段階です。アクセスをリクエストするには、Databricks アカウント チームにお問い合わせください。

LakeFlow Connectを使用してMySQLからDatabricksにデータを取り込む方法を学習します。 MySQLコネクタは、 Amazon RDS for MySQL 、Aurora MySQL 、 Azure Database for MySQL 、Google クラウドSQL for MySQL 、およびEC2上で実行されるMySQLサポートします。

始める前に

インジェスト ゲートウェイとインジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースで Unity Catalog が有効になっています。

  • サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。

  • 接続を作成する場合: メタストアに対するCREATE CONNECTION権限があります。

    コネクタが UI ベースのパイプライン オーサリングをサポートしている場合は、このページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン オーサリングを使用する場合は、このページのステップを完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。

  • 既存の接続を使用する予定の場合: 接続に対してUSE CONNECTION権限またはALL PRIVILEGES権限があります。

  • ターゲット カタログに対するUSE CATALOG権限があります。

  • 既存のスキーマに対するUSE SCHEMACREATE TABLE 、およびCREATE VOLUME権限、またはターゲット カタログに対するCREATE SCHEMA権限があります。

  • クラスターを作成するための無制限の権限、またはカスタム ポリシー (API のみ)。ゲートウェイのカスタム ポリシーは、次の要件を満たす必要があります。

    • ファミリー: ジョブコンピュート

    • ポリシー ファミリを上書きします:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • 次のコンピュート ポリシーにより、 Databricksワークロードのニーズに合わせて取り込みゲートウェイをスケーリングできます。 最小要件は 4 コアです。ただし、スナップショット抽出のパフォーマンスを向上させるために、 Databricks 、より多くのメモリと CPU コアを備えたより大きなインスタンス タイプを使用することをお勧めします。

      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "r5n.2xlarge"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "m5n.large"
      }
      }

    クラスターポリシーの詳細については、 「コンピュート ポリシーの選択」を参照してください。

MySQL から取り込むには、ソースのセットアップも完了する必要があります。

オプション1: Databricks UI

管理者ユーザーは、UI で接続とパイプラインを同時に作成できます。これは、管理された取り込みパイプラインを作成する最も簡単な方法です。

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。

  2. [データの追加] ページの [Databricks コネクタ] で、 [MySQL] をクリックします。取り込みウィザードが開きます。

  3. ウィザードの 「取り込みゲートウェイ」 ページで、ゲートウェイの一意の名前を入力します。

  4. ステージング取り込みデータのカタログとスキーマを選択し、 「次へ」 をクリックします。

  5. [取り込みパイプライン] ページで、パイプラインの一意の名前を入力します。

  6. 宛先カタログ では、取り込んだデータを保存するカタログを選択します。

  7. ソース データにアクセスするために必要な資格情報を保存するUnity Catalog接続を選択します。

    ソースへの既存の接続がない場合は、 「接続の作成」 をクリックし、ソース設定から取得した認証詳細を入力します。メタストアに対するCREATE CONNECTION権限が必要です。

注記

sha256_passwordまたはcaching_sha2_password認証を使用している MySQL ユーザーの場合、 [テスト接続] ボタンが失敗する可能性があります。これは既知の制限です。接続の作成を続行できます。

  1. パイプラインの作成および続行 をクリックします。

  2. [ソース] ページで、取り込むデータベースとテーブルを選択します。

  3. 必要に応じて、デフォルトの履歴追跡設定を変更します。詳細については、 「履歴追跡の有効化 ( SCDタイプ 2)」を参照してください。

  4. 次へ をクリックします。

  5. 宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、 「スキーマの作成」 をクリックします。親カタログに対するUSE CATALOGおよびCREATE SCHEMA権限が必要です。

  6. 保存して続行 をクリックします。

  7. (オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。

  8. (オプション) パイプライン操作の成功または失敗に関する電子メール通知を設定します。

  9. パイプラインの保存と実行 をクリックします。

オプション2: プログラムインターフェース

Databricks Asset Bundles、 Databricks APIs 、 Databricks SDK、またはDatabricks CLIを使用して取り込む前に、既存のUnity Catalog接続にアクセスできる必要があります。 手順については、 「管理対象取り込みソースへの接続」を参照してください。

ステージングカタログとスキーマを作成する

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。

Bash
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

ゲートウェイと取り込みパイプラインを作成する

インジェスチョン ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、Unity Catalog ステージング ボリュームに保存します。ゲートウェイを継続的なパイプラインとして実行する必要があります。これにより、ソース データベースの binlog 保持ポリシーが調整されます。

取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。

このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    Bash
    databricks bundle init
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル ( resources/mysql_pipeline.yml )。
    • データ取り込みの頻度を制御するワークフロー ファイル ( resources/mysql_job.yml )。

    以下はresources/mysql_pipeline.ymlファイルの例です。

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: mysql-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <mysql-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    pipeline_mysql:
    name: mysql-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table mydb.customers to dest_catalog.dest_schema.customers
    source_schema: public
    source_table: customers
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
    # The destination table name will be the same as it is on the source
    source_schema: sales
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}

    以下はresources/mysql_job.ymlファイルの例です。

    YAML
    resources:
    jobs:
    mysql_dab_job:
    name: mysql_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_mysql.id}
  3. Databricks CLI を使用してパイプラインをデプロイします。

    Bash
    databricks bundle deploy

追加のリソース