SQL Server からデータを取り込む
プレビュー
Microsoft SQL Server コネクタは パブリック プレビュー段階です。
このページでは、 からデータを取り込み、SQL Server Databricksを使用してLakeFlow Connect に読み込む方法について説明します。SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。
手順の概要
- ソース データベースを取り込み用に構成します。
- インジェスト ゲートウェイを作成します。ゲートウェイは SQL Server データベースに接続し、スナップショットと変更データを抽出し、ステージング用に Unity Catalog ボリュームに格納します。
- インジェスト パイプラインを作成します。パイプラインは、スナップショットを適用し、ステージングボリュームから宛先ストリーミングテーブルに変更データを適用します。
- 取り込み パイプラインをスケジュールします。
始める前に
取り込み パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートは、ノートブック、ワークフロー、および DLTで有効になっています。 サーバレス コンピュートの有効化を参照してください。
-
接続を作成するには: メタストアで
CREATE CONNECTION
します。既存の接続を使用するには: 接続を
USE CONNECTION
またはALL PRIVILEGES
します。 -
USE CATALOG
がターゲットカタログに設定されていること。 -
USE SCHEMA
、CREATE TABLE
、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUME
します。 -
クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。
-
ファミリー: ジョブ コンピュート
-
ポリシー ファミリを上書きします:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイに可能な限り最小のワーカー ノードを指定することをお勧めします。
"driver_node_type_id": {
"type": "unlimited",
"defaultValue": "r5.xlarge",
"isOptional": true
},
"node_type_id": {
"type": "unlimited",
"defaultValue": "m4.large",
"isOptional": true
}
クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。
-
取り込み用のソース データベースを設定する
Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。
SQL Server 接続を作成する
- Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。
- [ 接続の作成 ] をクリックします。このボタンが表示されない場合は、
CREATE CONNECTION
権限がありません。 - 一意の 接続名 を入力します。
- 接続の種類 で SQL Server を選択します。
- ホスト で、SQL Server ドメイン名を指定します。
- ユーザー と パスワード に、SQL Server のログイン資格情報を入力します。
- 作成 をクリックします。
ステージング カタログとスキーマを作成する
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。
- Catalog Explorer
- CLI
- Databricks ワークスペースで、 カタログ をクリックします。
- カタログ タブで、次のいずれかの操作を行います。
- [ カタログを作成 ] をクリックします。このボタンが表示されない場合は、
CREATE CATALOG
権限がありません。 - カタログの一意の名前を入力します。次に、[ 作成 ] をクリックします。
- 作成したカタログを選択します。
- 「スキーマの作成 」をクリックします。このボタンが表示されない場合は、
CREATE SCHEMA
権限がありません。 - スキーマの一意の名前を入力します。「 作成 」をクリックします。
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイと取り込み パイプラインを作成する
インジェスト ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、それを Unity Catalog ステージング ボリュームに格納します。ゲートウェイは、連続パイプラインとして実行する必要があります。これにより、ソース・データベースにある変更ログの保持ポリシーに対応できます。
取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。
各インジェスト パイプラインは、1 つのインジェスト ゲートウェイに関連付ける必要があります。
インジェスト パイプラインは、複数の宛先カタログとスキーマをサポートしていません。複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイとパイプラインのペアを作成します。
- Databricks Asset Bundles
- Notebook
- CLI
このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init
-
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/sqlserver_pipeline.yml
)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/sqlserver.yml
)。
次に、
resources/sqlserver_pipeline.yml
ファイルの例を示します。YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW次に、
resources/sqlserver_job.yml
ファイルの例を示します。YAMLresources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
次のノートブックの Configuration
セルを、ソースから取り込むソース接続、ターゲット カタログ、ターゲット スキーマ、およびテーブルで更新します。
ゲートウェイと取り込み パイプラインを作成する
ゲートウェイを作成するには:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
取り込みパイプラインを作成するには:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
パイプラインの開始、スケジュール設定、アラートの設定
パイプラインの詳細ページでパイプラインのスケジュールを作成できます。
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。
パイプラインに追加するスケジュールごとに、 LakeFlow Connect によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。
データ取り込みが成功したことを確認する
パイプラインの詳細ページのリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。
Upserted records
列とDeleted records
列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。