SQL Server からデータを取り込む
LakeFlow Connectを使用してSQL ServerからDatabricksにデータを取り込む方法を学習します。
SQL Server コネクタは、Azure SQL Database、Azure SQL Managed Instance、Amazon RDS SQL データベースをサポートします。これには、Azure 仮想マシン (VM) および Amazon EC2 上で実行される SQL Server が含まれます。このコネクタは、Azure ExpressRoute および AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートします。
要件
-
インジェスト ゲートウェイとインジェスト パイプラインを作成するには、まず次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。
-
接続を作成する場合:メタストアに対する権限は
CREATE CONNECTIONです。Unity Catalogの「権限の管理」を参照してください。コネクタが UI ベースのパイプラインオーサリングをサポートしている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプラインオーサリングを使用する場合は、このページの手順を完了する前に、Catalog Explorer で接続を作成する必要があります。「管理された取り込みソースに接続する」を参照してください。
-
既存の接続を使用する予定の場合: 接続に対する
USE CONNECTION権限またはALL PRIVILEGESがあります。 -
ターゲット・カタログに対する
USE CATALOG権限があります。 -
既存のスキーマに対する
USE SCHEMA、CREATE TABLE、CREATE VOLUME権限、またはターゲットカタログに対するCREATE SCHEMA権限を持っている。 -
プライマリ SQL Server インスタンスにアクセスできる。変更追跡機能とチェンジデータキャプチャ機能は、リードレプリカまたはセカンダリインスタンスではサポートされていません。
-
クラスターを作成するための無制限の権限、またはカスタム ポリシー (API のみ)。ゲートウェイのカスタム ポリシーは、次の要件を満たす必要があります。
-
ファミリー: ジョブ コンピュート
-
ポリシー ファミリを上書きします:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
}- Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイに可能な限り最小のワーカー ノードを指定することをお勧めします。次のコンピュート ポリシーを使用すると、 Databricks ワークロードのニーズに合わせてインジェスト ゲートウェイをスケーリングできます。 ソース データベースから効率的かつパフォーマンスの高いデータ抽出を可能にするための最小要件は 8 コアです。
Python{
"driver_node_type_id": {
"type": "fixed",
"value": "n2-highmem-64"
},
"node_type_id": {
"type": "fixed",
"value": "n2-standard-4"
}
}詳細については、 情報 クラスターポリシーについては、「 コンピュートポリシーの選択」を参照してください。
-
-
-
SQL Serverから取り込むには、まず「 Databricksへの取り込み用にMicrosoft SQL Server構成する」の手順を完了する必要があります。
ゲートウェイと取り込みパイプラインを作成する
- Databricks UI
- Databricks Asset Bundles
- Databricks notebook
- Terraform
-
Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
-
[ データの追加 ] ページの [Databricks コネクタ ] で、[ SQL Server ] をクリックします。
-
インジェスト ウィザードの [ 接続] ページで、 Databricks へのインジェスト用に Microsoft SQL Server を構成するから SQL Server アクセス資格情報を保存する接続を選択します。メタストアに
CREATE CONNECTION権限がある場合は、クリックして接続を作成して 、 SQL Serverの認証詳細を使用して新しい接続を作成します。
-
次へ をクリックします。
-
インジェスト設定 ページで、インジェスト パイプラインの一意の名前を入力します。このパイプラインは、ステージング場所から宛先にデータを移動します。
-
イベント ログを書き込むカタログとスキーマを選択します。イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、エラーが含まれます。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
-
(オプション) すべてのテーブルの自動完全更新を オン に設定します。自動更新がオンになっている場合、パイプラインは、影響を受けるテーブルを完全に更新することによって、ログ クリーンアップ イベントや特定の種類のスキーマ進化などの問題を自動的に修正しようとします。 履歴の追跡が有効になっている場合、完全更新によってその履歴は消去されます。
-
インジェストゲートウェイの一意の名前を入力します。ゲートウェイは、ソースから変更を抽出し、取り込みパイプラインがロードできるようにステージングするパイプラインです。
-
ステージング場所 のカタログとスキーマを選択します。抽出されたデータをステージングするためのボリュームがこの場所に作成されます。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
-
パイプラインの作成および続行 をクリックします。
-
[ソース] ページで、取り込むテーブルを選択します。特定のテーブルを選択した場合は、テーブル設定を構成できます。
a. (オプション) [設定] タブで、取り込まれたテーブルごとに 宛先名 を指定します。これは、オブジェクトを同じスキーマに複数回取り込むときに、宛先テーブルを区別するのに役立ちます。「宛先テーブルに名前を付ける」を参照してください。
a. (オプション) デフォルトの 履歴追跡 設定を変更します。「履歴追跡を有効にする (SCD タイプ 2)」を参照してください。
-
「次へ」 をクリックし、 「保存して続行」 をクリックします。
-
[宛先] ページで、データをロードするカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
-
保存して続行 をクリックします。
-
データベース設定 ページで、 「検証」 をクリックして、ソースが Databricks 取り込み用に適切に構成されていることを確認します。不足している構成があれば返されます。ステップを解決するには、 「構成を完了する」 をクリックします。 次に 「次へ」 をクリックします。または、 「検証をスキップ」 をクリックします。
-
(オプション) スケジュールと通知 ページで、
スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
-
(オプション)クリック
通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。
宣言型自動化バンドルを使用してデータを取り込む前に、既存の接続にアクセスできる必要があります。手順については、 「管理対象の取り込みソースへの接続」を参照してください。
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。 バンドル パイプライン YAML ファイルのgateway_definitionセクションでステージング場所を指定します。
インジェスト ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、それを Unity Catalog ステージング ボリュームに格納します。ゲートウェイは、連続パイプラインとして実行する必要があります。これにより、ソース・データベースにある変更ログの保持ポリシーに対応できます。
取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。
バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 「宣言的オートメーション バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (例:
resources/sqlserver_pipeline.yml)。パイプライン.ingestion_定義を参照してください。 および例。 - データ取り込みの頻度を制御するジョブ定義ファイル (例:
resources/sqlserver_job.yml)。
- パイプライン定義ファイル (例:
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
次のノートブックの Configuration セルを、ソースから取り込むソース接続、ターゲット カタログ、ターゲット スキーマ、およびテーブルで更新します。
Terraform を使用して、SQL Server 取り込みパイプラインをデプロイおよび管理できます。ゲートウェイおよび取り込みパイプラインを作成するためのTerraform構成を含む完全なサンプル フレームワークについては、 GitHubのLakeFlow Connect Terraformリポジトリを参照してください。
データ取り込みが成功したことを確認する
パイプラインの詳細ページのリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。
例
これらの例を使用してパイプラインを構成します。
パイプライン構成
- Databricks Asset Bundles
- Databricks notebook
次のパイプライン定義ファイル:
variables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
以下はパイプライン仕様のConfigurationセクションの例です。
# The name of the UC connection with the credentials to access the source database
connection_name = "my_connection"
# The name of the UC catalog and schema to store the replicated tables
target_catalog_name = "main"
target_schema_name = "lakeflow_sqlserver_connector_cdc"
# The name of the UC catalog and schema to store the staging volume with intermediate
# CDC and snapshot data. Use the destination catalog/schema by default.
stg_catalog_name = target_catalog_name
stg_schema_name = target_schema_name
# The name of the Gateway pipeline to create
gateway_pipeline_name = "cdc_gateway"
# The name of the Ingestion pipeline to create
ingestion_pipeline_name = "cdc_ingestion"
# Construct the full list of tables to replicate.
# IMPORTANT: The letter case of catalog, schema, and table names must match exactly
# the case used in the source database system tables.
tables_to_replicate = replicate_full_db_schema("MY_DB", ["MY_DB_SCHEMA"])
# Append tables from additional schemas as needed:
# + replicate_tables_from_db_schema("MY_DB", "MY_SCHEMA_2", ["table3", "table4"])
バンドルジョブ定義ファイル
以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。
resources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。