メインコンテンツまでスキップ

SQL Server の統合CDCパイプラインの作成

備考

ベータ版

この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。

統合されたCDCパイプラインは、単一のパイプラインを使用してSQL ServerからDatabricksへ変更データを取り込みます。個別の取り込みゲートウェイと取り込みパイプラインを必要とする標準のゲートウェイベースアーキテクチャとは異なり、統合されたCDCパイプラインは、抽出と適用ステージの両方を1回のパイプライン更新で実行します。

統合CDCコネクタの利用時期

次の表は、統合されたCDCパイプラインと標準のゲートウェイベースのアーキテクチャを比較したものです。

機能

標準 CDC (ゲートウェイベース)

統合 CDC

パイプライン数

2つ(取り込みゲートウェイと取り込みパイプライン)

1つ(統一されたパイプライン)

設定

ゲートウェイを作成し、ゲートウェイIDを参照する取り込みパイプラインを作成します。

Unity Catalog接続を参照する単一のパイプラインを作成する

ゲートウェイモード

ゲートウェイは継続的に実行されます

パイプラインは、更新ごとに抽出を組み込みます。

接続リファレンス

ingestion_gateway_id

connection_name (Unity Catalog接続)

コネクタタイプ

暗黙

明示的: connector_type: CDC

ステージングボリューム

ゲートウェイはステージングボリュームを内部で管理します。

ステージングボリュームはdata_staging_optionsを使用して構成します。指定されていない場合、パイプラインはデフォルト設定を自動的に作成します。

ソースデータベースの設定については、Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。同じソース構成はアーキテクチャの両方に適用されます。

統合CDCパイプラインの実行方法

各パイプラインの更新は、2つのステージを順次実行します。

  1. 抽出。 パイプラインはUnity Catalog接続を使用してソースデータベースに接続します。最初の実行時または完全更新時に、初期スナップショットが取得されます。後続の実行では、データベースの組み込み変更追跡メカニズムを使用して、増分変更 (挿入、更新、削除) をキャプチャします。パイプラインは抽出されたデータを Unity Catalog ステージングボリュームに書き込みます。
  2. アプリケーション。 パイプラインはステージングボリュームから読み込み、Unity Catalog にある宛先ストリーミングテーブルに変更を適用します。マージ操作は、設定されたプライマリキーとSCDタイプを使用します。パイプラインは厳密に 1 回のセマンティクスを保証します。

ベータ期間中、各パイプラインの更新の最大ランタイムは約30分です。ソースに1回の更新で処理しきれないほどの変更がある場合、次回のスケジュールされた更新は前回の更新が停止した箇所から再開されます。データを定期的に取り込むには、Lakeflow ジョブ タスクを使用してパイプラインをスケジュールします。

要件

  • ワークスペースは Unity Catalog が有効になっています。

  • サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。

  • 接続を作成する場合は、メタストアに対する CREATE CONNECTION 特権が必要です。Unity Catalog での特権の管理を参照してください。

    コネクタがUIベースのパイプラインオーサリングをサポートしている場合、このページのステップを完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプライン オーサリングを使用する場合は、このページのステップを完了する前に Catalog Explorer で接続を作成する必要があります。「管理対象取り込みソースへの接続」を参照してください。

  • 既存の接続を使用する場合:接続に対するUSE CONNECTION権限、またはALL PRIVILEGESが必要です。

  • ターゲットカタログに対する USE CATALOG 権限があります。

  • 既存のスキーマに対してUSE SCHEMACREATE TABLE、およびCREATE VOLUMEの権限、またはターゲットカタログに対してCREATE SCHEMAの権限があります。

  • ワークスペースは統合CDCコネクター機能が有効になっていなければなりません。Databricks アカウント チームにお問い合わせください。

  • プライマリ SQL Server インスタンスにアクセスできます。統合CDCコネクターは、リードレプリカ、スタンバイインスタンス、またはセカンダリインスタンスをサポートしていません。

  • SQL Server のソース セットアップが完了しました。Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。

  • 次の権限があります。

    • CREATE CONNECTION メタストアの(新しいUnity Catalog接続を作成する場合)、または既存の接続の USE CONNECTION
    • USE CATALOG 宛先カタログで。
    • USE SCHEMA CREATE TABLE およびアップグレード先スキーマで
    • CREATE VOLUME 宛先スキーマに、またはdata_staging_optionsで指定されたスキーマにステージングボリュームは、data_staging_options が設定されていなくても必要です。パイプラインが宛先スキーマに自動的に作成するためです。

コンピュートの要件

統合された CDC パイプラインは、クラシック コンピュートまたはサーバレス コンピュートで実行されます。

  • Classic コンピュート コンピュートプレーンは、ネットワーク経由でSQL Serverインスタンスに到達できる必要があります。クラウドホスト型SQL Server (Azure SQL、Amazon RDS) の場合、Databricksコンピュートプレーンからの受信接続を許可します。オンプレミスの SQL Server には、Azure ExpressRoute、AWS Direct Connect、または VPN を使用してください。
  • サーバレスコンピュート。 Databricks サーバレス コンピュートとソースデータベースの間の サーバレスネットワーク接続 を構成します。オンプレミスのソースには、構成されたサーバレスエグレスを経由するネットワーク パスが必要です(たとえば、ExpressRoute または VPN を使用したトランジットゲートウェイやピアリングされたVNetなど)。

従来のコンピュートの場合、無制限のクラスター作成権限、またはcluster_typedltに、runtime_engineSTANDARDに固定し、効率的な抽出のために少なくとも8コアが推奨されるカスタムクラスターポリシーを使用できます。

SQL ServerへのUnity Catalog接続を作成する

パイプラインを作成する前に、SQL Server への Unity Catalog 接続を作成してください。SQL Server 接続を作成するを参照してください。

統合CDCパイプラインを作成

API、Databricks CLI、ノートブック、または宣言型オートメーションバンドルを使用して、統合されたCDCパイプラインを作成します。UI作成はまだ利用できません。

重要

すべてのパイプライン作成リクエストには"channel": "PREVIEW"を含める必要があります。

バンドルファイルでパイプラインリソースを定義します(例:resources/integrated_cdc_pipeline.yml):

YAML
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'

resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_catalog: 'my_database'
source_schema: 'dbo'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'

パイプラインをスケジュール実行するには、パイプラインをトリガーするジョブ(例:resources/integrated_cdc_job.yml)を定義します。各抽出段階は少なくとも10分間実行されるため、60分以上の間隔が適切な目安となります。

YAML
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'

Databricks CLI でバンドルをデプロイする:

Shell
databricks bundle deploy
databricks bundle run integrated_cdc_job

詳細については、「宣言型オートメーションバンドルとは」をご覧ください。

定期的な更新をスケジュールする

統合されたCDCパイプラインは、トリガーモードでのみ実行されます。繰り返しスケジュールでデータを取り込むには、パイプラインを実行する Lakeflow Jobs タスクを作成します。各更新には約30分かかり、1回の更新で変更のバックログ全体を処理しきれない場合があります。後続の更新が追いつくように、十分な頻度でパイプラインをスケジュール設定してください。60分という開始点は、ほとんどのワークロードに適しています。トリガーが作動中に前回の更新がまだ実行されている場合、新しい更新はキューに追加されます。

構成リファレンス

パイプラインパラメーター

パラメーター

Type

説明

name

string

パイプラインの名前。

pipeline_type

string

MANAGED_INGESTIONである必要があります。

channel

string

PREVIEWである必要があります。

serverless

Boolean

true サーバレスコンピュート向け、false クラシックコンピュート向けサーバレス コンピュートには、ソース データベースへのサーバレス ネットワーキングが必要です。

catalog

string

デフォルトの宛先カタログです。テーブルごとのdestination_catalogが指定されていない場合に使用します。

schema

string

デフォルトの宛先スキーマです。テーブルごとのdestination_schemaが指定されていない場合に使用します。

ingestion_definition.connection_name

string

Unity Catalog のソースデータベースへの接続。

ingestion_definition.connector_type

string

CDCである必要があります。

ingestion_definition.objects

array

取り込むテーブルまたはスキーマのリスト。

ingestion_definition.data_staging_options

オブジェクト

オプション。パイプラインがステージング ボリュームを作成するカタログとスキーマ。パイプラインの宛先スキーマにデフォルト設定されます。

テーブルの指定

パラメーター

必須

説明

source_catalog

はい

ソースデータベース名です。

source_schema

はい

ソーススキーマ名です。

source_table

はい

ソーステーブル名。

destination_catalog

No

宛先カタログ。デフォルトはパイプラインのcatalogです。

destination_schema

No

アップグレード先スキーマです。デフォルトはパイプラインのschemaです。

destination_table

No

宛先テーブル名です。デフォルトはsource_tableです。

テーブル構成

パラメーター

デフォルト

説明

primary_keys

自動検出

各行を識別する列指定されていない場合は、ソース主キーから自動的に検出されます。

scd_type

SCD_TYPE_1

SCD_TYPE_1 最新バージョンのみを保持します。SCD_TYPE_2は完全な履歴を保持し、ソース上でSQL Server CDCを必要とします。SCDタイプ2は変更の追跡ではサポートされていません。

sequence_by

自動検出

CDC イベントの論理順序付けに使用される列。指定されていない場合、ソースCDCメカニズムに基づいて自動検出されます。

SQL Server のデータ型マッピングについては、「SQL Server コネクタ リファレンス」を参照してください。統合されたCDCパイプラインは、自動的な型の拡張をサポートします。ソース列の型が拡張された場合(例えば、INT から BIGINT へ)、宛先テーブルは自動的に適応します。

パイプラインを監視してください

統合CDCパイプラインを作成して開始した後、以下の方法でそのステータスを監視します。

  • Databricks UI。パイプライン 」セクションでパイプラインを開くと、更新ステータス、テーブルごとの取り込みメトリクス、およびリネージを表示できます。

  • REST API。

    Text
    GET /api/2.0/pipelines/<pipeline-id>
  • イベントAPI。

    Text
    GET /api/2.0/pipelines/<pipeline-id>/events

最初のパイプライン更新では、選択したすべてのテーブルの完全なスナップショットが実行されます。これには増分更新よりも時間がかかる場合があります。大きなテーブルの場合、初期スナップショットの完了には複数回のスケジュールされた更新が必要となる場合があります。後続の各更新は、以前の更新が中断したところから続行されます。

データ取り込みを確認するには:

SQL
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;

-- View recent changes (SCD Type 2 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;

フル更新および自動フル更新の動作については、ターゲットテーブルを完全に更新をご覧ください。

統合型CDCパイプラインでは、垂直オートスケールがデフォルトで有効になっています。メモリ不足が原因でパイプラインの更新が失敗した場合、次回の更新でより大きなドライバーが自動的にプロビジョニングされます。この動作を上書きするには、カスタムのクラスターポリシーを使用します。

制限事項:

  • ベータ版 統合CDCコネクタには、ワークスペースレベルでの有効化が必要です。Databricks アカウント チームにお問い合わせください。
  • トリガーモードのみです。 統合CDCパイプラインは、連続的(常時稼働)な実行には対応していません。Lakeflowジョブのタスクを使用したパイプラインのスケジュール
  • APIのみの作成。 パイプラインの作成は、REST API、Databricks CLI、ノートブック、およびDeclarative Automation Bundlesを介して利用できます。UI作成はまだ対応していません。
  • チャンネルはPREVIEW である必要があります。 パイプライン仕様には"channel": "PREVIEW"を含める必要があります。
  • 接続とコネクタの種類は変更できません。 connection_name および connector_type はパイプラインの作成後に変更できません。ソースを変更するには、新しいパイプラインを作成してください。
  • パイプラインあたりの推奨最大テーブル数は300です。
  • プライマリインスタンスのみ。 統合CDCコネクターは、リードレプリカ、スタンバイインスタンス、またはセカンダリインスタンスをサポートしていません。
  • 主キーのないテーブル パイプラインはすべての非LOBカラムを複合キーとして扱います。重複行は、SCD タイプ 2 を有効にしない限り、1つの行にまとめられる可能性があります。
  • 初期スナップショットは複数回の更新にまたがる可能性があります。 大規模なテーブルの場合、初期スナップショットは1回の更新で完了しない可能性があります。次回以降のスケジュール更新は、前回の更新の続きから再開されます。
  • 更新ごとに約30分かかります。 ベータ期間中は、パイプラインが1回の更新ですべての変更バックログを必ずしも処理するとは限りません。その後のスケジュールされた更新は、前回の更新が中断した箇所から処理を再開します。このランタイムをベータ版の期間中に構成することはできません。
  • ログのパージにはフル更新が必要です。 SQL Serverがパイプラインによる処理の前に変更追跡ログまたはCDCログをパージした場合、影響を受けるテーブルで完全更新を実行してください。パイプラインはこの状態を検出し、イベントログにエラーを表示します。

トラブルシューティング

注記

一部のエラーコードは、INGESTION_GATEWAY_ をプレフィックスとして使用します。これはレガシーな命名規則であり、別途取り込みゲートウェイが必要になるわけではありません。

エラー

原因

解決方法

NOT_IN_DEFAULT_PUBLISHING_MODE

パイプラインは直接公開モードではありません。

統合CDCパイプラインには、直接公開モードが自動的に設定されます。このエラーが表示された場合は、パイプラインを再作成してください。

INGESTION_GATEWAY_CDC_NOT_ENABLED

CDC または変更追跡は、1つ以上のソーステーブルで有効になっていません。

影響を受けるテーブルでCDCまたは変更追跡を有効にしてください。Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。

INGESTION_GATEWAY_MISSING_TABLE_IN_SOURCE

指定されたソーステーブルが存在しません、または削除されています。

テーブルが存在し、接続ユーザーにアクセス権があることを確認してください。

INGESTION_GATEWAY_SOURCE_SCHEMA_MISSING_ENTITY

ソーススキーマは存在しません。

ソースデータベースにスキーマが存在することを確認してください。

UNSUPPORTED_SOURCE_TYPE_FOR_CDC_CONNECTOR

ソースデータベースタイプはサポートされていません。

統合CDCコネクタはSQL ServerおよびOracleをサポートします。

SOURCE_TABLE_REQUIRED

テーブル仕様にsource_tableがありません。

objects配列の各テーブル仕様にsource_tableを追加します。

Integrated CDC connector is disabled

ワークスペースの機能フラグは有効になっていません。

ワークスペースで統合CDCコネクターを有効にするには、Databricks アカウント チームにお問い合わせください。

ここに記載されていない問題が発生した場合は、

  1. Databricks UIで、またはGET /api/2.0/pipelines/<pipeline-id>/eventsを通じてパイプラインイベントログを確認します。
  2. Catalog Explorer から Unity Catalog 接続をテストして、ソースが到達可能であることを確認してください。
  3. 変更追跡またはCDCがソースデータベースとテーブルで有効になっていることを確認してください。
  4. データベースユーザーに、Microsoft SQL Server データベースユーザー要件に記載されている SQL Server 権限があることを確認します。
  5. パイプライン仕様に"channel": "PREVIEW"が含まれていることを確認してください。

その他のリソース