MySQL用の統合CDCパイプラインを作成する
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
統合CDCパイプラインは、単一のパイプラインを使用してMySQLからDatabricksに変更データを取り込みます。標準のゲートウェイベースのアーキテクチャとは異なり、統合CDCパイプラインは抽出ステージと適用ステージの両方を1回のパイプライン更新で実行します。標準アーキテクチャには、個別の取り込みゲートウェイと取り込みパイプラインが必要です。
統合CDCコネクタを使用するタイミング
次の表は、統合 CDC パイプラインと標準のゲートウェイベースのアーキテクチャを比較したものです。
機能 | 標準CDC(ゲートウェイベース) | 統合 CDC |
|---|---|---|
パイプラインの数 | 2つ(インジェストゲートウェイとインジェストパイプライン) | 1つの(統合されたパイプライン) |
設定 | ゲートウェイを作成してから、ゲートウェイIDを参照する取り込みパイプラインを作成します。 | Unity Catalog接続を参照する単一のパイプラインを作成する |
ゲートウェイモード: | ゲートウェイは、独立した長時間実行プロセスとして継続的に実行されます | 抽出は、スケジュールされた各パイプラインの更新に埋め込まれています。 |
接続リファレンス |
|
|
コネクタタイプ | 暗黙的なデフォルトCDCの動作 | 明示的: |
ステージングボリューム | ゲートウェイによって内部的に管理されます。 | 宛先スキーマに自動作成される、または次の方法で構成される |
パイプラインモード | 連続 | トリガーモードのみ |
コンピュート | ゲートウェイにはクラシック、マネージド取り込みパイプラインにはサーバレス | Classicコンピュートのみ。サーバレスはサポートされていません。 |
自動フルリフレッシュ | 既存のMySQLゲートウェイベースのフローではサポートされていません | サポートされています |
最大テーブル数 | パイプラインあたり250 | パイプラインあたり250 |
SCDタイプ2 | サポートされていない | サポートされていない |
認証 | ユーザー名/パスワード | ユーザー名/パスワード |
ソースデータベースのセットアップについては、「Databricks への取り込み用に MySQL を構成する」を参照してください。同じソース構成が両方のアーキテクチャに適用されます。
統合された CDC パイプラインの実行方法
各パイプラインの更新は、2つのステージを順次実行します:
- 抽出。 パイプラインは、Unity Catalog接続を使用してソースデータベースに接続します。初回実行時または完全更新時に、初期スナップショットが取得されます。後続の実行時に、バイナリログ (binlog) を使用して増分変更 (挿入、更新、削除) を取得します。パイプラインは、抽出したデータをUnity Catalogステージングボリュームに書き込みます。
- アプリケーション。 パイプラインはステージングボリュームから読み取り、変更をUnity Catalog内の宛先ストリーミングテーブルに適用します。マージ操作では、設定された主キーとSCDタイプを使用します。パイプラインはexactry-onceセマンティクスを保証します。
ベータ期間中、各パイプラインの更新の最大ランタイムは約30分です。ソースに1回の更新で処理できる量を超える変更がある場合、次回のスケジュールされた更新は前回の更新が停止した場所から再開されます。定期的にデータを取り込むには、Lakeflow Jobsタスクを使用してパイプラインをスケジュールします。
要件
-
ワークスペースでUnity Catalogが有効になっています。
-
接続を作成する予定の場合:メタストアに対する
CREATE CONNECTION権限があります。Unity Catalog での権限の管理を参照してください。コネクタがUIベースのパイプラインオーサリングをサポートしている場合は、このページのステップを完了することで、接続とパイプラインを同時に作成できます。ただし、APIベースのパイプラインオーサリングを使用する場合は、このページのステップを完了する前に、カタログエクスプローラーで接続を作成する必要があります。管理された取り込みソースに接続するを参照してください。
-
既存の接続を使用する予定の場合: 接続に対して
USE CONNECTION権限またはALL PRIVILEGES権限を持っている必要があります。 -
ターゲットカタログには
USE CATALOG権限があります。 -
既存のスキーマに対する
USE SCHEMA、CREATE TABLE、およびCREATE VOLUME権限、またはターゲットカタログに対するCREATE SCHEMA権限を持っています。 -
お使いのワークスペースでは、統合CDCコネクター機能を有効にする必要があります。Databricks アカウント チームにお問い合わせください。
-
プライマリ MySQLインスタンスにアクセスできます。統合されたCDCコネクタは、読み取りレプリカをサポートしていません。
-
binlog_format=ROWおよびbinlog_row_image=FULLを使用して、ソースデータベースでバイナリログを有効にします。 -
MySQL ソースのセットアップが完了しました。Databricks への取り込み用に MySQL を構成するを参照してください。
-
次の権限が必要です。
CREATE CONNECTIONメタストア上 (新しいUnity Catalog接続を作成する場合)、または既存の接続でUSE CONNECTION。USE CATALOG送信先カタログ上。USE SCHEMAおよび宛先スキーマでのCREATE TABLE。CREATE VOLUME宛先スキーマ、またはdata_staging_optionsで指定されたスキーマについて。data_staging_optionsが設定されていなくても、パイプラインが宛先スキーマに自動作成するため、ステージングボリュームが必要です。
コンピュートの要件
MySQLの統合CDCパイプラインには、クラシックコンピュートが必要です。サーバレスコンピュートはサポートされていません。
- クラシックコンピュート :コンピュートプレーンは、DatabricksワークスペースのVPCまたはVNetで実行され、ネットワーク経由でMySQLインスタンスに到達する必要があります。サポートされているネットワークパスには、VPCまたはVNetピアリング、パブリックエンドポイント、およびオンプレミスMySQLの場合、AWS Direct Connect、Azure ExpressRoute、またはVPNが含まれます。
クラシックコンピュートの場合、無制限のクラスター作成権限を使用するか、cluster_type が dlt に固定され、runtime_engine が STANDARD に固定されたカスタムの クラスターポリシー を使用します。Databricks では、効率的な抽出のために少なくとも8コアが推奨されます。
MySQLへのUnity Catalog接続を作成します。
パイプラインを作成する前に、MySQLへのUnity Catalog接続を作成します。MySQL接続を作成するを参照してください。
統合された CDC パイプラインを作成します
API、Databricks CLI、ノートブック、または宣言型自動化バンドルを使用して、統合されたCDCパイプラインを作成します。UIの作成はまだ利用できません。
すべてのパイプライン作成リクエストには、"channel": "PREVIEW" が含まれている必要があります。
- Declarative Automation Bundles
- Databricks notebook
- Databricks CLI
- REST API
バンドルファイル(例:resources/integrated_cdc_pipeline.yml)でパイプラインリソースを定義します:
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'
resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_schema: 'my_database'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'
スケジュールに従ってパイプラインを実行するには、パイプラインをトリガーするジョブ(例:resources/integrated_cdc_job.yml)を定義します。各抽出ステージは最低10分間実行されるため、60分以上の間隔が良い出発点となります:
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'
Databricks CLI を使用してバンドルをデプロイします:
databricks bundle deploy
databricks bundle run integrated_cdc_job
詳細情報については、宣言型オートメーションバンドルとは何ですか?をご覧ください。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
pipeline = w.pipelines.create(
name="<pipeline-name>",
pipeline_type="MANAGED_INGESTION",
channel="PREVIEW",
serverless=False,
catalog="<destination-catalog>",
schema="<destination-schema>",
ingestion_definition={
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "<source-database>",
"source_table": "<source-table>",
}
}
],
},
)
print(f"Pipeline created: {pipeline.pipeline_id}")
databricks pipelines create --json '{
"name": "<pipeline-name>",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "<source-database>",
"source_table": "<source-table>"
}
}
]
}
}'
次の例では、MySQLデータベースから2つのテーブルを複製します。どちらもトップレベルの宛先main.ingestionを継承します。MySQLの統合CDCパイプラインには、"serverless": falseが必要です。
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-mysql-connection",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_schema": "my_database",
"source_table": "customers",
"table_configuration": {
"primary_keys": ["customer_id"],
"scd_type": "SCD_TYPE_1"
}
}
},
{
"table": {
"source_schema": "my_database",
"source_table": "orders",
"table_configuration": {
"primary_keys": ["order_id"],
"scd_type": "SCD_TYPE_1"
}
}
}
],
"data_staging_options": {
"catalog_name": "main",
"schema_name": "ingestion_staging"
}
}
}
ソースデータベース内のすべてのテーブルをレプリケートするには、個々のtableオブジェクトの代わりにschemaオブジェクトを使用します。
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-schema-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-mysql-connection",
"connector_type": "CDC",
"objects": [
{
"schema": {
"source_schema": "my_database",
"destination_catalog": "main",
"destination_schema": "ingestion"
}
}
]
}
}
パイプラインの更新を開始するには:
POST /api/2.0/pipelines/<pipeline-id>/updates
{
"full_refresh": false
}
定期的な更新をスケジュール
統合CDCパイプラインはトリガーモードでのみ実行されます。定期的なスケジュールでデータを取り込むには、パイプラインを実行するLakeflow Jobsタスクを作成します。各更新の実行には約30分かかります。また、1回の更新ですべての変更バックログの処理を完了できない場合があります。パイプラインのスケジュールを60分間隔またはそれよりも頻繁に設定し、後続の更新が追いつくようにしてください。前回の更新がまだ実行中にトリガーが起動した場合、新しい更新はキューに入れられます。
構成リファレンス
パイプラインパラメーター
パラメーター | Type | 説明 |
|---|---|---|
| string | パイプラインの名前。 |
| string |
|
| string |
|
| Boolean | MySQL統合CDCパイプラインの場合、 |
| string | デフォルトの宛先カタログ。テーブルごとの |
| string | デフォルトの宛先スキーマ。テーブルごとの |
| string | ソースデータベースへの Unity Catalog 接続。 |
| string |
|
| array | 取り込むテーブルまたはスキーマのリスト。 |
| オブジェクト | オプション。パイプラインがステージングボリュームを作成するカタログとスキーマ。パイプラインの宛先スキーマにデフォルト設定されます。 |
テーブルの仕様
パラメーター | 必須 | 説明 |
|---|---|---|
| はい | ソースのMySQLデータベース名。 |
| はい | ソーステーブル名。 |
| No | 宛先カタログです。デフォルトでは、パイプラインの |
| No | 送信先スキーマ。パイプラインの |
| No | 宛先テーブル名。 |
テーブル構成
パラメーター | デフォルト | 説明 |
|---|---|---|
| 自動検出されました。 | 各行を識別する列。指定されていない場合、ソースの主キーから自動検出されます。 |
|
|
|
| 自動検出されました。 | CDCイベントの順序付けに使用される列。指定されていない場合、ソースCDCメカニズムに基づいて自動検出されます。 |
| 無効 | サポートされていないDDL操作が検出された場合に、自動フル更新を構成します。自動フル更新ポリシーを参照してください。 |
MySQLのデータ型マッピングについては、MySQLコネクタの参照を参照してください。統合CDCパイプラインは自動型拡張をサポートしています。ソース列の型が拡張される(例:INTからBIGINT)と、宛先テーブルは自動的に適応します。
パイプラインを監視します
統合CDCパイプラインを作成して開始したら、次を使用してそのステータスを監視してください。
-
Databricks UI。 更新ステータス、テーブルごとの取り込みメトリクス、およびリネージを表示するには、 パイプライン セクションでパイプラインを開きます。
-
REST API。
TextGET /api/2.0/pipelines/<pipeline-id> -
イベントAPI。
TextGET /api/2.0/pipelines/<pipeline-id>/events
最初のパイプラインの更新は、選択されたすべてのテーブルの完全なスナップショットを実行します。これは増分更新よりも時間がかかる場合があります。大規模なテーブルの場合、初期スナップショットは完了するために複数のスケジュールされた更新が必要になる場合があります。その後の各更新は、前回の更新が中断したところから再開されます。
データ取り込みを確認するには:
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;
-- View recent changes (SCD Type 1 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;
フル更新と自動フル更新の動作については、ターゲットテーブルを完全に更新するを参照してください。
統合CDCパイプラインは、デフォルトで垂直オートスケールを有効にします。メモリ不足の状態が原因でパイプラインの更新が失敗した場合、次回の更新で、より大きなドライバーが自動的にプロビジョニングされます。この動作をオーバーライドするには、カスタムクラスターポリシーを使用してください。
制限事項
- ベータ版。 統合CDCコネクタには、ワークスペースレベルでの有効化が必要です。Databricksアカウントチームにご連絡ください。
- トリガーモードのみ。 統合 CDC パイプラインは、継続的 (常時稼動) な実行をサポートしていません。Lakeflow Jobs タスクを使用してパイプラインをスケジュールします。
- APIのみの作成。 パイプラインの作成は、REST API、Databricks CLI、ノートブック、および宣言型オートメーションバンドルを介して利用できます。UIでの作成はまだサポートされていません。
- チャンネルは
PREVIEWである必要があります。 パイプライン仕様には"channel": "PREVIEW"が含まれる必要があります。 - 接続とコネクタのタイプは変更できません。 パイプラインの作成後は、
connection_nameとconnector_typeは変更できません。ソースを変更するには、新しいパイプラインを作成します。 - パイプラインあたりのテーブルは最大250です。
- プライマリインスタンスのみ。 統合CDCコネクタは読み取りレプリカをサポートしていません。プライマリMySQLインスタンスに接続します。
- SCDタイプ2はサポートされていません。
- 主キーのないテーブル。 パイプラインは、すべての非LOB列を複合キーとして扱います。重複する行は、1つの行にまとめられる場合があります。
- 初期スナップショットは複数の更新にまたがる場合があります。 大きなテーブルの場合、初期スナップショットは1回の更新で完了しない場合があります。後続のスケジュールされた更新は、前回の更新が停止したところから再開します。
- **各更新は、約30分で実行されます。** ベータ期間中、パイプラインは1回の更新ですべての変更バックログを処理するわけではありません。その後のスケジュールされた更新は、前回の更新が終了した時点から処理を再開します。ベータ期間中、このランタイムを設定することはできません。
- バイナリログのパージには完全な更新が必要です。 パイプラインが変更を処理する前にMySQLバイナリログがパージされた場合は、影響を受けるテーブルで完全な更新を実行します。パイプラインがこの状態を検出し、イベントログにエラーを表示します。
- サーバレスコンピュートはサポートされていません。 MySQL統合CDCパイプラインにはクラシックコンピュートが必要です。
トラブルシューティング
一部のエラーコードでは、INGESTION_GATEWAY_ のプレフィックスが使用されます。これは従来の命名規則であり、個別のインジェストゲートウェイが必要であることを示すものではありません。
エラー | 原因 | 解決方法 |
|---|---|---|
| パイプラインは直接発行モードではありません。 | 統合CDCパイプラインでは、直接公開Modeが自動的に設定されます。このエラーが表示された場合は、パイプラインを再作成してください。 |
| バイナリログが有効になっていないか、 |
|
| 指定されたソーステーブルが存在しないか、または削除されています。 | テーブルが存在し、接続ユーザーがアクセス権限を持っていることを確認してください。 |
| ソーススキーマは存在しません。 | ソースデータベースにスキーマが存在することを確認してください。 |
| ソースデータベースタイプはサポートされていません。 | 統合されたCDCコネクタは、MySQL、SQL Server、およびOracleをサポートしています。 |
| テーブル仕様に |
|
| ワークスペースの機能フラグは有効になっていません。 | 統合CDCコネクタをワークスペースで有効にするには、Databricksアカウントチームにお問い合わせください。 |
ここに記載されていない問題が発生した場合は、次の点にご注意ください:
- Databricks UI または
GET /api/2.0/pipelines/<pipeline-id>/eventsを通じてパイプラインイベントログを確認してください。 - Catalog Explorer から Unity Catalog 接続をテストして、ソースに到達可能であることを確認します。
binlog_format=ROWとbinlog_row_image=FULLを使用して、ソースデータベースでバイナリログが有効になっていることを確認してください。- データベースユーザーにMySQLユーザー権限の付与に記載されているMySQL権限があることを確認してください。
- パイプライン仕様に
"channel": "PREVIEW"が含まれていることを確認してください。