メインコンテンツまでスキップ

MySQL用の統合CDCパイプラインを作成する

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

統合CDCパイプラインは、単一のパイプラインを使用してMySQLからDatabricksに変更データを取り込みます。標準のゲートウェイベースのアーキテクチャとは異なり、統合CDCパイプラインは抽出ステージと適用ステージの両方を1回のパイプライン更新で実行します。標準アーキテクチャには、個別の取り込みゲートウェイと取り込みパイプラインが必要です。

統合CDCコネクタを使用するタイミング

次の表は、統合 CDC パイプラインと標準のゲートウェイベースのアーキテクチャを比較したものです。

機能

標準CDC(ゲートウェイベース)

統合 CDC

パイプラインの数

2つ(インジェストゲートウェイとインジェストパイプライン)

1つの(統合されたパイプライン)

設定

ゲートウェイを作成してから、ゲートウェイIDを参照する取り込みパイプラインを作成します。

Unity Catalog接続を参照する単一のパイプラインを作成する

ゲートウェイモード:

ゲートウェイは、独立した長時間実行プロセスとして継続的に実行されます

抽出は、スケジュールされた各パイプラインの更新に埋め込まれています。

接続リファレンス

ingestion_gateway_id

connection_name (Unity Catalog接続)

コネクタタイプ

暗黙的なデフォルトCDCの動作

明示的: connector_type: CDC

ステージングボリューム

ゲートウェイによって内部的に管理されます。

宛先スキーマに自動作成される、または次の方法で構成される data_staging_options

パイプラインモード

連続

トリガーモードのみ

コンピュート

ゲートウェイにはクラシック、マネージド取り込みパイプラインにはサーバレス

Classicコンピュートのみ。サーバレスはサポートされていません。

自動フルリフレッシュ

既存のMySQLゲートウェイベースのフローではサポートされていません

サポートされています

最大テーブル数

パイプラインあたり250

パイプラインあたり250

SCDタイプ2

サポートされていない

サポートされていない

認証

ユーザー名/パスワード

ユーザー名/パスワード

ソースデータベースのセットアップについては、「Databricks への取り込み用に MySQL を構成する」を参照してください。同じソース構成が両方のアーキテクチャに適用されます。

統合された CDC パイプラインの実行方法

各パイプラインの更新は、2つのステージを順次実行します:

  1. 抽出。 パイプラインは、Unity Catalog接続を使用してソースデータベースに接続します。初回実行時または完全更新時に、初期スナップショットが取得されます。後続の実行時に、バイナリログ (binlog) を使用して増分変更 (挿入、更新、削除) を取得します。パイプラインは、抽出したデータをUnity Catalogステージングボリュームに書き込みます。
  2. アプリケーション。 パイプラインはステージングボリュームから読み取り、変更をUnity Catalog内の宛先ストリーミングテーブルに適用します。マージ操作では、設定された主キーとSCDタイプを使用します。パイプラインはexactry-onceセマンティクスを保証します。

ベータ期間中、各パイプラインの更新の最大ランタイムは約30分です。ソースに1回の更新で処理できる量を超える変更がある場合、次回のスケジュールされた更新は前回の更新が停止した場所から再開されます。定期的にデータを取り込むには、Lakeflow Jobsタスクを使用してパイプラインをスケジュールします。

要件

  • ワークスペースでUnity Catalogが有効になっています。

  • 接続を作成する予定の場合:メタストアに対するCREATE CONNECTION権限があります。Unity Catalog での権限の管理を参照してください。

    コネクタがUIベースのパイプラインオーサリングをサポートしている場合は、このページのステップを完了することで、接続とパイプラインを同時に作成できます。ただし、APIベースのパイプラインオーサリングを使用する場合は、このページのステップを完了する前に、カタログエクスプローラーで接続を作成する必要があります。管理された取り込みソースに接続するを参照してください。

  • 既存の接続を使用する予定の場合: 接続に対してUSE CONNECTION権限またはALL PRIVILEGES権限を持っている必要があります。

  • ターゲットカタログにはUSE CATALOG権限があります。

  • 既存のスキーマに対するUSE SCHEMACREATE TABLE、およびCREATE VOLUME権限、またはターゲットカタログに対するCREATE SCHEMA権限を持っています。

  • お使いのワークスペースでは、統合CDCコネクター機能を有効にする必要があります。Databricks アカウント チームにお問い合わせください。

  • プライマリ MySQLインスタンスにアクセスできます。統合されたCDCコネクタは、読み取りレプリカをサポートしていません。

  • binlog_format=ROWおよびbinlog_row_image=FULLを使用して、ソースデータベースでバイナリログを有効にします。

  • MySQL ソースのセットアップが完了しました。Databricks への取り込み用に MySQL を構成するを参照してください。

  • 次の権限が必要です。

    • CREATE CONNECTION メタストア上 (新しいUnity Catalog接続を作成する場合)、または既存の接続でUSE CONNECTION
    • USE CATALOG 送信先カタログ上。
    • USE SCHEMA および宛先スキーマでのCREATE TABLE
    • CREATE VOLUME 宛先スキーマ、またはdata_staging_optionsで指定されたスキーマについて。data_staging_optionsが設定されていなくても、パイプラインが宛先スキーマに自動作成するため、ステージングボリュームが必要です。

コンピュートの要件

MySQLの統合CDCパイプラインには、クラシックコンピュートが必要です。サーバレスコンピュートはサポートされていません。

  • クラシックコンピュート :コンピュートプレーンは、DatabricksワークスペースのVPCまたはVNetで実行され、ネットワーク経由でMySQLインスタンスに到達する必要があります。サポートされているネットワークパスには、VPCまたはVNetピアリング、パブリックエンドポイント、およびオンプレミスMySQLの場合、AWS Direct Connect、Azure ExpressRoute、またはVPNが含まれます。

クラシックコンピュートの場合、無制限のクラスター作成権限を使用するか、cluster_typedlt に固定され、runtime_engineSTANDARD に固定されたカスタムの クラスターポリシー を使用します。Databricks では、効率的な抽出のために少なくとも8コアが推奨されます。

MySQLへのUnity Catalog接続を作成します。

パイプラインを作成する前に、MySQLへのUnity Catalog接続を作成します。MySQL接続を作成するを参照してください。

統合された CDC パイプラインを作成します

API、Databricks CLI、ノートブック、または宣言型自動化バンドルを使用して、統合されたCDCパイプラインを作成します。UIの作成はまだ利用できません。

重要

すべてのパイプライン作成リクエストには、"channel": "PREVIEW" が含まれている必要があります。

バンドルファイル(例:resources/integrated_cdc_pipeline.yml)でパイプラインリソースを定義します:

YAML
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'

resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_schema: 'my_database'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'

スケジュールに従ってパイプラインを実行するには、パイプラインをトリガーするジョブ(例:resources/integrated_cdc_job.yml)を定義します。各抽出ステージは最低10分間実行されるため、60分以上の間隔が良い出発点となります:

YAML
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'

Databricks CLI を使用してバンドルをデプロイします:

Shell
databricks bundle deploy
databricks bundle run integrated_cdc_job

詳細情報については、宣言型オートメーションバンドルとは何ですか?をご覧ください。

定期的な更新をスケジュール

統合CDCパイプラインはトリガーモードでのみ実行されます。定期的なスケジュールでデータを取り込むには、パイプラインを実行するLakeflow Jobsタスクを作成します。各更新の実行には約30分かかります。また、1回の更新ですべての変更バックログの処理を完了できない場合があります。パイプラインのスケジュールを60分間隔またはそれよりも頻繁に設定し、後続の更新が追いつくようにしてください。前回の更新がまだ実行中にトリガーが起動した場合、新しい更新はキューに入れられます。

構成リファレンス

パイプラインパラメーター

パラメーター

Type

説明

name

string

パイプラインの名前。

pipeline_type

string

MANAGED_INGESTIONである必要があります。

channel

string

PREVIEWである必要があります。

serverless

Boolean

MySQL統合CDCパイプラインの場合、falseである必要があります。サーバレスコンピュートはサポートされていません。

catalog

string

デフォルトの宛先カタログ。テーブルごとのdestination_catalogが指定されていない場合に使用されます。

schema

string

デフォルトの宛先スキーマ。テーブルごとのdestination_schemaが指定されていない場合に使用されます。

ingestion_definition.connection_name

string

ソースデータベースへの Unity Catalog 接続。

ingestion_definition.connector_type

string

CDCである必要があります。

ingestion_definition.objects

array

取り込むテーブルまたはスキーマのリスト。

ingestion_definition.data_staging_options

オブジェクト

オプション。パイプラインがステージングボリュームを作成するカタログとスキーマ。パイプラインの宛先スキーマにデフォルト設定されます。

テーブルの仕様

パラメーター

必須

説明

source_schema

はい

ソースのMySQLデータベース名。

source_table

はい

ソーステーブル名。

destination_catalog

No

宛先カタログです。デフォルトでは、パイプラインのcatalogです。

destination_schema

No

送信先スキーマ。パイプラインの schema にデフォルト設定されます。

destination_table

No

宛先テーブル名。source_tableがデフォルトです。

テーブル構成

パラメーター

デフォルト

説明

primary_keys

自動検出されました。

各行を識別する列。指定されていない場合、ソースの主キーから自動検出されます。

scd_type

SCD_TYPE_1

SCD_TYPE_1 最新バージョンのみを保持します。SCDタイプ2は、MySQL統合CDCパイプラインではサポートされていません。

sequence_by

自動検出されました。

CDCイベントの順序付けに使用される列。指定されていない場合、ソースCDCメカニズムに基づいて自動検出されます。

auto_full_refresh_policy

無効

サポートされていないDDL操作が検出された場合に、自動フル更新を構成します。自動フル更新ポリシーを参照してください。

MySQLのデータ型マッピングについては、MySQLコネクタの参照を参照してください。統合CDCパイプラインは自動型拡張をサポートしています。ソース列の型が拡張される(例:INTからBIGINT)と、宛先テーブルは自動的に適応します。

パイプラインを監視します

統合CDCパイプラインを作成して開始したら、次を使用してそのステータスを監視してください。

  • Databricks UI。 更新ステータス、テーブルごとの取り込みメトリクス、およびリネージを表示するには、 パイプライン セクションでパイプラインを開きます。

  • REST API。

    Text
    GET /api/2.0/pipelines/<pipeline-id>
  • イベントAPI。

    Text
    GET /api/2.0/pipelines/<pipeline-id>/events

最初のパイプラインの更新は、選択されたすべてのテーブルの完全なスナップショットを実行します。これは増分更新よりも時間がかかる場合があります。大規模なテーブルの場合、初期スナップショットは完了するために複数のスケジュールされた更新が必要になる場合があります。その後の各更新は、前回の更新が中断したところから再開されます。

データ取り込みを確認するには:

SQL
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;

-- View recent changes (SCD Type 1 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;

フル更新と自動フル更新の動作については、ターゲットテーブルを完全に更新するを参照してください。

統合CDCパイプラインは、デフォルトで垂直オートスケールを有効にします。メモリ不足の状態が原因でパイプラインの更新が失敗した場合、次回の更新で、より大きなドライバーが自動的にプロビジョニングされます。この動作をオーバーライドするには、カスタムクラスターポリシーを使用してください。

制限事項

  • ベータ版。 統合CDCコネクタには、ワークスペースレベルでの有効化が必要です。Databricksアカウントチームにご連絡ください。
  • トリガーモードのみ。 統合 CDC パイプラインは、継続的 (常時稼動) な実行をサポートしていません。Lakeflow Jobs タスクを使用してパイプラインをスケジュールします。
  • APIのみの作成。 パイプラインの作成は、REST API、Databricks CLI、ノートブック、および宣言型オートメーションバンドルを介して利用できます。UIでの作成はまだサポートされていません。
  • チャンネルは PREVIEW である必要があります。 パイプライン仕様には "channel": "PREVIEW" が含まれる必要があります。
  • 接続とコネクタのタイプは変更できません。 パイプラインの作成後は、connection_nameconnector_typeは変更できません。ソースを変更するには、新しいパイプラインを作成します。
  • パイプラインあたりのテーブルは最大250です。
  • プライマリインスタンスのみ。 統合CDCコネクタは読み取りレプリカをサポートしていません。プライマリMySQLインスタンスに接続します。
  • SCDタイプ2はサポートされていません。
  • 主キーのないテーブル。 パイプラインは、すべての非LOB列を複合キーとして扱います。重複する行は、1つの行にまとめられる場合があります。
  • 初期スナップショットは複数の更新にまたがる場合があります。 大きなテーブルの場合、初期スナップショットは1回の更新で完了しない場合があります。後続のスケジュールされた更新は、前回の更新が停止したところから再開します。
  • **各更新は、約30分で実行されます。** ベータ期間中、パイプラインは1回の更新ですべての変更バックログを処理するわけではありません。その後のスケジュールされた更新は、前回の更新が終了した時点から処理を再開します。ベータ期間中、このランタイムを設定することはできません。
  • バイナリログのパージには完全な更新が必要です。 パイプラインが変更を処理する前にMySQLバイナリログがパージされた場合は、影響を受けるテーブルで完全な更新を実行します。パイプラインがこの状態を検出し、イベントログにエラーを表示します。
  • サーバレスコンピュートはサポートされていません。 MySQL統合CDCパイプラインにはクラシックコンピュートが必要です。

トラブルシューティング

注記

一部のエラーコードでは、INGESTION_GATEWAY_ のプレフィックスが使用されます。これは従来の命名規則であり、個別のインジェストゲートウェイが必要であることを示すものではありません。

エラー

原因

解決方法

NOT_IN_DEFAULT_PUBLISHING_MODE

パイプラインは直接発行モードではありません。

統合CDCパイプラインでは、直接公開Modeが自動的に設定されます。このエラーが表示された場合は、パイプラインを再作成してください。

INGESTION_GATEWAY_CDC_NOT_ENABLED

バイナリログが有効になっていないか、binlog_formatROWに設定されていません。

binlog_format=ROW および binlog_row_image=FULL を使用してバイナリログを有効にします。Databricks への取り込み用に MySQL を構成するを参照してください。

INGESTION_GATEWAY_MISSING_TABLE_IN_SOURCE

指定されたソーステーブルが存在しないか、または削除されています。

テーブルが存在し、接続ユーザーがアクセス権限を持っていることを確認してください。

INGESTION_GATEWAY_SOURCE_SCHEMA_MISSING_ENTITY

ソーススキーマは存在しません。

ソースデータベースにスキーマが存在することを確認してください。

UNSUPPORTED_SOURCE_TYPE_FOR_CDC_CONNECTOR

ソースデータベースタイプはサポートされていません。

統合されたCDCコネクタは、MySQL、SQL Server、およびOracleをサポートしています。

SOURCE_TABLE_REQUIRED

テーブル仕様に source_table がありません。

objects 配列内の各テーブル仕様に source_table を追加します。

Integrated CDC connector is disabled

ワークスペースの機能フラグは有効になっていません。

統合CDCコネクタをワークスペースで有効にするには、Databricksアカウントチームにお問い合わせください。

ここに記載されていない問題が発生した場合は、次の点にご注意ください:

  1. Databricks UI またはGET /api/2.0/pipelines/<pipeline-id>/eventsを通じてパイプラインイベントログを確認してください。
  2. Catalog Explorer から Unity Catalog 接続をテストして、ソースに到達可能であることを確認します。
  3. binlog_format=ROWbinlog_row_image=FULLを使用して、ソースデータベースでバイナリログが有効になっていることを確認してください。
  4. データベースユーザーにMySQLユーザー権限の付与に記載されているMySQL権限があることを確認してください。
  5. パイプライン仕様に"channel": "PREVIEW"が含まれていることを確認してください。

その他のリソース