SQL Server の統合CDCパイプラインの作成
ベータ版
この機能はベータ版です。ワークスペース管理者は、 プレビュー ページからこの機能へのアクセスを制御できます。Databricksのプレビューを管理するを参照してください。
統合されたCDCパイプラインは、単一のパイプラインを使用してSQL ServerからDatabricksへ変更データを取り込みます。個別の取り込みゲートウェイと取り込みパイプラインを必要とする標準のゲートウェイベースアーキテクチャとは異なり、統合されたCDCパイプラインは、抽出と適用ステージの両方を1回のパイプライン更新で実行します。
統合CDCコネクタの利用時期
次の表は、統合されたCDCパイプラインと標準のゲートウェイベースのアーキテクチャを比較したものです。
機能 | 標準 CDC (ゲートウェイベース) | 統合 CDC |
|---|---|---|
パイプライン数 | 2つ(取り込みゲートウェイと取り込みパイプライン) | 1つ(統一されたパイプライン) |
設定 | ゲートウェイを作成し、ゲートウェイIDを参照する取り込みパイプラインを作成します。 | Unity Catalog接続を参照する単一のパイプラインを作成する |
ゲートウェイモード | ゲートウェイは継続的に実行されます | パイプラインは、更新ごとに抽出を組み込みます。 |
接続リファレンス |
|
|
コネクタタイプ | 暗黙 | 明示的: |
ステージングボリューム | ゲートウェイはステージングボリュームを内部で管理します。 | ステージングボリュームは |
ソースデータベースの設定については、Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。同じソース構成はアーキテクチャの両方に適用されます。
統合CDCパイプラインの実行方法
各パイプラインの更新は、2つのステージを順次実行します。
- 抽出。 パイプラインはUnity Catalog接続を使用してソースデータベースに接続します。最初の実行時または完全更新時に、初期スナップショットが取得されます。後続の実行では、データベースの組み込み変更追跡メカニズムを使用して、増分変更 (挿入、更新、削除) をキャプチャします。パイプラインは抽出されたデータを Unity Catalog ステージングボリュームに書き込みます。
- アプリケーション。 パイプラインはステージングボリュームから読み込み、Unity Catalog にある宛先ストリーミングテーブルに変更を適用します。マージ操作は、設定されたプライマリキーとSCDタイプを使用します。パイプラインは厳密に 1 回のセマンティクスを保証します。
ベータ期間中、各パイプラインの更新の最大ランタイムは約30分です。ソースに1回の更新で処理しきれないほどの変更がある場合、次回のスケジュールされた更新は前回の更新が停止した箇所から再開されます。データを定期的に取り込むには、Lakeflow ジョブ タスクを使用してパイプラインをスケジュールします。
要件
-
ワークスペースは Unity Catalog が有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。
-
接続を作成する場合は、メタストアに対する
CREATE CONNECTION特権が必要です。Unity Catalog での特権の管理を参照してください。コネクタがUIベースのパイプラインオーサリングをサポートしている場合、このページのステップを完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプライン オーサリングを使用する場合は、このページのステップを完了する前に Catalog Explorer で接続を作成する必要があります。「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する場合:接続に対する
USE CONNECTION権限、またはALL PRIVILEGESが必要です。 -
ターゲットカタログに対する
USE CATALOG権限があります。 -
既存のスキーマに対して
USE SCHEMA、CREATE TABLE、およびCREATE VOLUMEの権限、またはターゲットカタログに対してCREATE SCHEMAの権限があります。 -
ワークスペースは統合CDCコネクター機能が有効になっていなければなりません。Databricks アカウント チームにお問い合わせください。
-
プライマリ SQL Server インスタンスにアクセスできます。統合CDCコネクターは、リードレプリカ、スタンバイインスタンス、またはセカンダリインスタンスをサポートしていません。
-
SQL Server のソース セットアップが完了しました。Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。
-
次の権限があります。
CREATE CONNECTIONメタストアの(新しいUnity Catalog接続を作成する場合)、または既存の接続のUSE CONNECTION。USE CATALOG宛先カタログで。USE SCHEMACREATE TABLEおよびアップグレード先スキーマでCREATE VOLUME宛先スキーマに、またはdata_staging_optionsで指定されたスキーマにステージングボリュームは、data_staging_optionsが設定されていなくても必要です。パイプラインが宛先スキーマに自動的に作成するためです。
コンピュートの要件
統合された CDC パイプラインは、クラシック コンピュートまたはサーバレス コンピュートで実行されます。
- Classic コンピュート コンピュートプレーンは、ネットワーク経由でSQL Serverインスタンスに到達できる必要があります。クラウドホスト型SQL Server (Azure SQL、Amazon RDS) の場合、Databricksコンピュートプレーンからの受信接続を許可します。オンプレミスの SQL Server には、Azure ExpressRoute、AWS Direct Connect、または VPN を使用してください。
- サーバレスコンピュート。 Databricks サーバレス コンピュートとソースデータベースの間の サーバレスネットワーク接続 を構成します。オンプレミスのソースには、構成されたサーバレスエグレスを経由するネットワーク パスが必要です(たとえば、ExpressRoute または VPN を使用したトランジットゲートウェイやピアリングされたVNetなど)。
従来のコンピュートの場合、無制限のクラスター作成権限、またはcluster_typeをdltに、runtime_engineをSTANDARDに固定し、効率的な抽出のために少なくとも8コアが推奨されるカスタムクラスターポリシーを使用できます。
SQL ServerへのUnity Catalog接続を作成する
パイプラインを作成する前に、SQL Server への Unity Catalog 接続を作成してください。SQL Server 接続を作成するを参照してください。
統合CDCパイプラインを作成
API、Databricks CLI、ノートブック、または宣言型オートメーションバンドルを使用して、統合されたCDCパイプラインを作成します。UI作成はまだ利用できません。
すべてのパイプライン作成リクエストには"channel": "PREVIEW"を含める必要があります。
- Declarative Automation Bundles
- Databricks notebook
- Databricks CLI
- REST API
バンドルファイルでパイプラインリソースを定義します(例:resources/integrated_cdc_pipeline.yml):
variables:
pipeline_name:
description: 'Name for the integrated CDC pipeline'
connection_name:
description: 'Unity Catalog connection name'
dest_catalog:
description: 'Destination catalog for ingested data'
dest_schema:
description: 'Destination schema for ingested data'
resources:
pipelines:
integrated_cdc_pipeline:
name: ${var.pipeline_name}
pipeline_type: MANAGED_INGESTION
channel: PREVIEW
serverless: false
catalog: ${var.dest_catalog}
schema: ${var.dest_schema}
ingestion_definition:
connection_name: ${var.connection_name}
connector_type: CDC
objects:
- table:
source_catalog: 'my_database'
source_schema: 'dbo'
source_table: 'customers'
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
destination_table: 'customers'
table_configuration:
primary_keys:
- 'customer_id'
scd_type: 'SCD_TYPE_1'
パイプラインをスケジュール実行するには、パイプラインをトリガーするジョブ(例:resources/integrated_cdc_job.yml)を定義します。各抽出段階は少なくとも10分間実行されるため、60分以上の間隔が適切な目安となります。
resources:
jobs:
integrated_cdc_job:
name: '${var.pipeline_name}-job'
tasks:
- task_key: 'cdc_ingestion'
pipeline_task:
pipeline_id: ${resources.pipelines.integrated_cdc_pipeline.id}
schedule:
quartz_cron_expression: '0 0 * * * ?'
timezone_id: 'UTC'
Databricks CLI でバンドルをデプロイする:
databricks bundle deploy
databricks bundle run integrated_cdc_job
詳細については、「宣言型オートメーションバンドルとは」をご覧ください。
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
pipeline = w.pipelines.create(
name="<pipeline-name>",
pipeline_type="MANAGED_INGESTION",
channel="PREVIEW",
serverless=False,
catalog="<destination-catalog>",
schema="<destination-schema>",
ingestion_definition={
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>",
}
}
],
},
)
print(f"Pipeline created: {pipeline.pipeline_id}")
databricks pipelines create --json '{
"name": "<pipeline-name>",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "<destination-catalog>",
"schema": "<destination-schema>",
"ingestion_definition": {
"connection_name": "<unity-catalog-connection-name>",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "<source-database>",
"source_schema": "<source-schema>",
"source_table": "<source-table>"
}
}
]
}
}'
次の例では、SQL Server データベースから2つのテーブルをレプリケートします。customers テーブルは SCD タイプ 1 を使用し、orders テーブルは SCD タイプ 2 を使用します(SQL Server CDC がソースで必要です)。両方とも最上位の宛先main.ingestionを継承します。"serverless": true をサーバレスコンピュートで実行するように設定します。
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "customers",
"table_configuration": {
"primary_keys": ["customer_id"],
"scd_type": "SCD_TYPE_1"
}
}
},
{
"table": {
"source_catalog": "my_database",
"source_schema": "dbo",
"source_table": "orders",
"table_configuration": {
"primary_keys": ["order_id"],
"scd_type": "SCD_TYPE_2"
}
}
}
],
"data_staging_options": {
"catalog_name": "main",
"schema_name": "ingestion_staging"
}
}
}
ソーススキーマ内のすべてのテーブルを複製するには、個別のtableオブジェクトではなく、schemaオブジェクトを使用します。パイプラインは、ソース上でCDCまたは変更追跡が有効になっていないテーブルをスキップします。
POST /api/2.0/pipelines
{
"name": "my-integrated-cdc-schema-pipeline",
"pipeline_type": "MANAGED_INGESTION",
"channel": "PREVIEW",
"serverless": false,
"catalog": "main",
"schema": "ingestion",
"ingestion_definition": {
"connection_name": "my-sqlserver-connection",
"connector_type": "CDC",
"objects": [
{
"schema": {
"source_catalog": "my_database",
"source_schema": "dbo",
"destination_catalog": "main",
"destination_schema": "ingestion"
}
}
]
}
}
パイプラインの更新を開始します:
POST /api/2.0/pipelines/<pipeline-id>/updates
{
"full_refresh": false
}
定期的な更新をスケジュールする
統合されたCDCパイプラインは、トリガーモードでのみ実行されます。繰り返しスケジュールでデータを取り込むには、パイプラインを実行する Lakeflow Jobs タスクを作成します。各更新には約30分かかり、1回の更新で変更のバックログ全体を処理しきれない場合があります。後続の更新が追いつくように、十分な頻度でパイプラインをスケジュール設定してください。60分という開始点は、ほとんどのワークロードに適しています。トリガーが作動中に前回の更新がまだ実行されている場合、新しい更新はキューに追加されます。
構成リファレンス
パイプラインパラメーター
パラメーター | Type | 説明 |
|---|---|---|
| string | パイプラインの名前。 |
| string |
|
| string |
|
| Boolean |
|
| string | デフォルトの宛先カタログです。テーブルごとの |
| string | デフォルトの宛先スキーマです。テーブルごとの |
| string | Unity Catalog のソースデータベースへの接続。 |
| string |
|
| array | 取り込むテーブルまたはスキーマのリスト。 |
| オブジェクト | オプション。パイプラインがステージング ボリュームを作成するカタログとスキーマ。パイプラインの宛先スキーマにデフォルト設定されます。 |
テーブルの指定
パラメーター | 必須 | 説明 |
|---|---|---|
| はい | ソースデータベース名です。 |
| はい | ソーススキーマ名です。 |
| はい | ソーステーブル名。 |
| No | 宛先カタログ。デフォルトはパイプラインの |
| No | アップグレード先スキーマです。デフォルトはパイプラインの |
| No | 宛先テーブル名です。デフォルトは |
テーブル構成
パラメーター | デフォルト | 説明 |
|---|---|---|
| 自動検出 | 各行を識別する列指定されていない場合は、ソース主キーから自動的に検出されます。 |
|
|
|
| 自動検出 | CDC イベントの論理順序付けに使用される列。指定されていない場合、ソースCDCメカニズムに基づいて自動検出されます。 |
SQL Server のデータ型マッピングについては、「SQL Server コネクタ リファレンス」を参照してください。統合されたCDCパイプラインは、自動的な型の拡張をサポートします。ソース列の型が拡張された場合(例えば、INT から BIGINT へ)、宛先テーブルは自動的に適応します。
パイプラインを監視してください
統合CDCパイプラインを作成して開始した後、以下の方法でそのステータスを監視します。
-
Databricks UI。 「 パイプライン 」セクションでパイプラインを開くと、更新ステータス、テーブルごとの取り込みメトリクス、およびリネージを表示できます。
-
REST API。
TextGET /api/2.0/pipelines/<pipeline-id> -
イベントAPI。
TextGET /api/2.0/pipelines/<pipeline-id>/events
最初のパイプライン更新では、選択したすべてのテーブルの完全なスナップショットが実行されます。これには増分更新よりも時間がかかる場合があります。大きなテーブルの場合、初期スナップショットの完了には複数回のスケジュールされた更新が必要となる場合があります。後続の各更新は、以前の更新が中断したところから続行されます。
データ取り込みを確認するには:
-- Check row counts in the destination table
SELECT COUNT(*) FROM <destination_catalog>.<destination_schema>.<destination_table>;
-- View recent changes (SCD Type 2 tables)
SELECT * FROM <destination_catalog>.<destination_schema>.<destination_table>
ORDER BY __START_AT DESC
LIMIT 10;
フル更新および自動フル更新の動作については、ターゲットテーブルを完全に更新をご覧ください。
統合型CDCパイプラインでは、垂直オートスケールがデフォルトで有効になっています。メモリ不足が原因でパイプラインの更新が失敗した場合、次回の更新でより大きなドライバーが自動的にプロビジョニングされます。この動作を上書きするには、カスタムのクラスターポリシーを使用します。
制限事項:
- ベータ版 統合CDCコネクタには、ワークスペースレベルでの有効化が必要です。Databricks アカウント チームにお問い合わせください。
- トリガーモードのみです。 統合CDCパイプラインは、連続的(常時稼働)な実行には対応していません。Lakeflowジョブのタスクを使用したパイプラインのスケジュール
- APIのみの作成。 パイプラインの作成は、REST API、Databricks CLI、ノートブック、およびDeclarative Automation Bundlesを介して利用できます。UI作成はまだ対応していません。
- チャンネルは
PREVIEWである必要があります。 パイプライン仕様には"channel": "PREVIEW"を含める必要があります。 - 接続とコネクタの種類は変更できません。
connection_nameおよびconnector_typeはパイプラインの作成後に変更できません。ソースを変更するには、新しいパイプラインを作成してください。 - パイプラインあたりの推奨最大テーブル数は300です。
- プライマリインスタンスのみ。 統合CDCコネクターは、リードレプリカ、スタンバイインスタンス、またはセカンダリインスタンスをサポートしていません。
- 主キーのないテーブル パイプラインはすべての非LOBカラムを複合キーとして扱います。重複行は、SCD タイプ 2 を有効にしない限り、1つの行にまとめられる可能性があります。
- 初期スナップショットは複数回の更新にまたがる可能性があります。 大規模なテーブルの場合、初期スナップショットは1回の更新で完了しない可能性があります。次回以降のスケジュール更新は、前回の更新の続きから再開されます。
- 更新ごとに約30分かかります。 ベータ期間中は、パイプラインが1回の更新ですべての変更バックログを必ずしも処理するとは限りません。その後のスケジュールされた更新は、前回の更新が中断した箇所から処理を再開します。このランタイムをベータ版の期間中に構成することはできません。
- ログのパージにはフル更新が必要です。 SQL Serverがパイプラインによる処理の前に変更追跡ログまたはCDCログをパージした場合、影響を受けるテーブルで完全更新を実行してください。パイプラインはこの状態を検出し、イベントログにエラーを表示します。
トラブルシューティング
一部のエラーコードは、INGESTION_GATEWAY_ をプレフィックスとして使用します。これはレガシーな命名規則であり、別途取り込みゲートウェイが必要になるわけではありません。
エラー | 原因 | 解決方法 |
|---|---|---|
| パイプラインは直接公開モードではありません。 | 統合CDCパイプラインには、直接公開モードが自動的に設定されます。このエラーが表示された場合は、パイプラインを再作成してください。 |
| CDC または変更追跡は、1つ以上のソーステーブルで有効になっていません。 | 影響を受けるテーブルでCDCまたは変更追跡を有効にしてください。Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。 |
| 指定されたソーステーブルが存在しません、または削除されています。 | テーブルが存在し、接続ユーザーにアクセス権があることを確認してください。 |
| ソーススキーマは存在しません。 | ソースデータベースにスキーマが存在することを確認してください。 |
| ソースデータベースタイプはサポートされていません。 | 統合CDCコネクタはSQL ServerおよびOracleをサポートします。 |
| テーブル仕様に |
|
| ワークスペースの機能フラグは有効になっていません。 | ワークスペースで統合CDCコネクターを有効にするには、Databricks アカウント チームにお問い合わせください。 |
ここに記載されていない問題が発生した場合は、
- Databricks UIで、または
GET /api/2.0/pipelines/<pipeline-id>/eventsを通じてパイプラインイベントログを確認します。 - Catalog Explorer から Unity Catalog 接続をテストして、ソースが到達可能であることを確認してください。
- 変更追跡またはCDCがソースデータベースとテーブルで有効になっていることを確認してください。
- データベースユーザーに、Microsoft SQL Server データベースユーザー要件に記載されている SQL Server 権限があることを確認します。
- パイプライン仕様に
"channel": "PREVIEW"が含まれていることを確認してください。