SQL Server からデータを取り込む
プレビュー
Microsoft SQL Server コネクタは、ゲート パブリック プレビュー段階です。プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、 LakeFlow Connectを使用してSQL Serverからデータを取り込み、Databricks に読み込む方法について説明します。
手順の概要
- ソース データベースを取り込み用に構成します。
- SQL Server データベースに接続し、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納するゲートウェイを作成します。
- スナップショットを適用し、ステージングボリュームから宛先ストリーミングテーブルにデータを変更する取り込みパイプラインを作成します。
- 取り込み パイプラインをスケジュールします。
データベースのバリエーション
SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。
始める前に
取り込み パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートは、ノートブック、ワークフロー、および DLTで有効になっています。 サーバレス コンピュートの有効化を参照してください。
-
接続を作成するには: メタストアで
CREATE CONNECTION
します。既存の接続を使用するには: 接続を
USE CONNECTION
またはALL PRIVILEGES
します。 -
USE CATALOG
がターゲットカタログに設定されていること。 -
USE SCHEMA
、CREATE TABLE
、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUME
します。 -
クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。
-
ファミリー: ジョブ コンピュート
-
ポリシー ファミリを上書きします:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
ワーカーノード(
node_type_id
)は使用されませんが、DLTを実行するために必要です。最小ノードを指定します。
"driver_node_type_id": {
"type": "unlimited",
"defaultValue": "r5.xlarge",
"isOptional": true
},
"node_type_id": {
"type": "unlimited",
"defaultValue": "m4.large",
"isOptional": true
}クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。
-
取り込み用のソース データベースを設定する
Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。
SQL Server 接続を作成する
コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を格納し、アクセスします。
必要な権限
- 新しい接続を作成するには、メタストア
CREATE CONNECTION
します。これを許可するには、メタストア管理者に連絡してください。 - 既存の接続を使用するには、接続オブジェクトを
USE CONNECTION
またはALL PRIVILEGES
します。接続の所有者に連絡して、これらを許可します。
接続を作成するには、次の手順を実行します。
- Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。
- [ 接続の作成 ] をクリックします。このボタンが表示されない場合は、
CREATE CONNECTION
権限がない可能性があります。 - 一意の 接続名 を入力します。
- 接続の種類 で SQL Server を選択します。
- ホスト で、SQL Server ドメイン名を指定します。
- ユーザー と パスワード に、SQL Server のログイン資格情報を入力します。
- 作成 をクリックします。
ステージング カタログとスキーマを作成する
SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング・カタログをフォーリンカタログにすることはできません。
必要な権限
- 新しいステージング カタログを作成するには、メタストアで
CREATE CATALOG
します。メタストア管理者に連絡して、これを付与してください。 - 既存のステージング カタログを使用するには、カタログ
USE CATALOG
します。カタログの所有者に連絡して、これを付与してください。 - 新しいステージング スキーマを作成するには、カタログ
CREATE SCHEMA
します。カタログの所有者に連絡して、これを付与してください。 - 既存のステージング スキーマを使用するには、スキーマで 、
CREATE VOLUME
、CREATE TABLE
USE SCHEMA
します。スキーマの所有者に連絡して、これらを付与してください。
- Catalog Explorer
- CLI
- Databricks ワークスペースで、 カタログ をクリックします。
- カタログ タブで、次のいずれかの操作を行います。
- [ カタログを作成 ] をクリックします。このボタンが表示されない場合は、
CREATE CATALOG
権限がありません。 - カタログの一意の名前を入力します。次に、[ 作成 ] をクリックします。
- 作成したカタログを選択します。
- スキーマの作成 をクリックします。このボタンが表示されない場合は、
CREATE SCHEMA
権限がありません。 - スキーマの一意の名前を入力します。「 作成 」をクリックします。
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイと取り込み パイプラインを作成する
ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納します。ソース データベースの変更ログ保持ポリシーによる変更データのギャップの問題を回避するには、ゲートウェイを連続パイプラインとして実行します。
取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。
ゲートウェイごとに 1 つの取り込み パイプラインのみがサポートされています。
API で許可されている場合でも、取り込み パイプラインでは複数の送信先カタログとスキーマはサポートされていません。複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイと取り込み パイプラインのペアを作成します。
必要な権限
パイプラインを作成するには、 Unrestricted cluster creation
アクセス許可が必要です。アカウント管理者に問い合わせてください。
- Databricks Asset Bundles
- Notebook
- CLI
このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init
-
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (
resources/sqlserver_pipeline.yml
)。 - データ取り込みの頻度を制御するワークフロー ファイル (
resources/sqlserver.yml
)。
次に、
resources/sqlserver_pipeline.yml
ファイルの例を示します。YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW次に、
resources/sqlserver_job.yml
ファイルの例を示します。YAMLresources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id} - パイプライン定義ファイル (
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
次のノートブックの Configuration
セルを、ソースから取り込むソース接続、ターゲット カタログ、ターゲット スキーマ、およびテーブルで更新します。
ゲートウェイと取り込み パイプラインを作成する
ゲートウェイを作成するには:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
取り込みパイプラインを作成するには:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
取り込み パイプラインのトリガー スケジュールを設定する
トリガー モードは、取り込み パイプラインの実行でのみサポートされています。
パイプラインのスケジュールを作成するには、DLT パイプライン UI を使用して、パイプライン UI 画面の右上隅にあるボタンをクリックします。
UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。ジョブは [ジョブ] タブに表示されます。
データ取り込みが成功したことを確認する
取り込みパイプライン UI のリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。
Upserted records
列とDeleted records
列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。