MySQL取り込みパイプラインを作成する
プレビュー
MySQL コネクタはパブリック プレビュー段階です。アクセスをリクエストするには、Databricks アカウント チームにお問い合わせください。
LakeFlow Connectを使用してMySQLからDatabricksにデータを取り込む方法を学習します。 MySQLコネクタは、 Amazon RDS for MySQL 、Aurora MySQL 、 Azure Database for MySQL 、Google クラウドSQL for MySQL 、およびEC2上で実行されるMySQLサポートします。
始める前に
インジェスト ゲートウェイとインジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースで Unity Catalog が有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。
-
接続を作成する場合: メタストアに対する
CREATE CONNECTION権限があります。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合は、このページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、API ベースのパイプライン オーサリングを使用する場合は、このページのステップを完了する前に、カタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する予定の場合: 接続に対して
USE CONNECTION権限またはALL PRIVILEGES権限があります。 -
ターゲット カタログに対する
USE CATALOG権限があります。 -
既存のスキーマに対する
USE SCHEMA、CREATE TABLE、およびCREATE VOLUME権限、またはターゲット カタログに対するCREATE SCHEMA権限があります。 -
クラスターを作成するための無制限の権限、またはカスタム ポリシー (API のみ)。ゲートウェイのカスタム ポリシーは、次の要件を満たす必要があります。
-
ファミリー: ジョブコンピュート
-
ポリシー ファミリを上書きします:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
次のコンピュート ポリシーにより、 Databricksワークロードのニーズに合わせて取り込みゲートウェイをスケーリングできます。 最小要件は 4 コアです。ただし、スナップショット抽出のパフォーマンスを向上させるために、 Databricks 、より多くのメモリと CPU コアを備えたより大きなインスタンス タイプを使用することをお勧めします。
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "r5n.2xlarge"
},
"node_type_id": {
"type": "fixed",
"value": "m5n.large"
}
}
クラスターポリシーの詳細については、 「コンピュート ポリシーの選択」を参照してください。
-
MySQL から取り込むには、ソースのセットアップも完了する必要があります。
オプション1: Databricks UI
管理者ユーザーは、UI で接続とパイプラインを同時に作成できます。これは、管理された取り込みパイプラインを作成する最も簡単な方法です。
-
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
-
[データの追加] ページの [Databricks コネクタ] で、 [MySQL] をクリックします。取り込みウィザードが開きます。
-
ウィザードの 「取り込みゲートウェイ」 ページで、ゲートウェイの一意の名前を入力します。
-
ステージング取り込みデータのカタログとスキーマを選択し、 「次へ」 をクリックします。
-
[取り込みパイプライン] ページで、パイプラインの一意の名前を入力します。
-
宛先カタログ では、取り込んだデータを保存するカタログを選択します。
-
ソース データにアクセスするために必要な資格情報を保存するUnity Catalog接続を選択します。
ソースへの既存の接続がない場合は、 「接続の作成」 をクリックし、ソース設定から取得した認証詳細を入力します。メタストアに対する
CREATE CONNECTION権限が必要です。
sha256_passwordまたはcaching_sha2_password認証を使用している MySQL ユーザーの場合、 [テスト接続] ボタンが失敗する可能性があります。これは既知の制限です。接続の作成を続行できます。
-
パイプラインの作成および続行 をクリックします。
-
[ソース] ページで、取り込むデータベースとテーブルを選択します。
-
必要に応じて、デフォルトの履歴追跡設定を変更します。詳細については、 「履歴追跡の有効化 ( SCDタイプ 2)」を参照してください。
-
次へ をクリックします。
-
宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。
既存のスキーマを使用しない場合は、 「スキーマの作成」 をクリックします。親カタログに対する
USE CATALOGおよびCREATE SCHEMA権限が必要です。 -
保存して続行 をクリックします。
-
(オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。
-
(オプション) パイプライン操作の成功または失敗に関する電子メール通知を設定します。
-
パイプラインの保存と実行 をクリックします。
オプション2: プログラムインターフェース
Databricks Asset Bundles、 Databricks APIs 、 Databricks SDK、またはDatabricks CLIを使用して取り込む前に、既存のUnity Catalog接続にアクセスできる必要があります。 手順については、 「管理対象取り込みソースへの接続」を参照してください。
ステージングカタログとスキーマを作成する
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。
export CONNECTION_NAME="my_mysql_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_mysql_connector"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="mysql-instance.region.rds.amazonaws.com"
export DB_PORT="3306"
export DB_USER="databricks_replication"
export DB_PASSWORD="your_secure_password"
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "MYSQL",
"options": {
"host": "'"$DB_HOST"'",
"port": "'"$DB_PORT"'",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイと取り込みパイプラインを作成する
インジェスチョン ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、Unity Catalog ステージング ボリュームに保存します。ゲートウェイを継続的なパイプラインとして実行する必要があります。これにより、ソース データベースの binlog 保持ポリシーが調整されます。
取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。
- Databricks Asset Bundles
- Notebook
- CLI
このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/mysql_pipeline.yml)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/mysql_job.yml)。
以下は
resources/mysql_pipeline.ymlファイルの例です。YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: mysql-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <mysql-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_mysql:
name: mysql-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table mydb.customers to dest_catalog.dest_schema.customers
source_schema: public
source_table: customers
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the mydb.sales schema to dest_catalog.dest_schema
# The destination table name will be the same as it is on the source
source_schema: sales
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}以下は
resources/mysql_job.ymlファイルの例です。YAMLresources:
jobs:
mysql_dab_job:
name: mysql_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_mysql.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
次のノートブックのConfigurationセルを、ソース接続、ターゲット カタログ、ターゲット スキーマ、およびソースから取り込むテーブルで更新します。
ゲートウェイと取り込み パイプラインを作成する
ゲートウェイを作成するには:
export GATEWAY_PIPELINE_NAME="mysql-gateway"
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
取り込みパイプラインを作成するには:
export INGESTION_PIPELINE_NAME="mysql-ingestion-pipeline"
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_schema": "public",
"source_table": "customers",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "customers"
}},
{"schema": {
"source_schema": "sales",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'