SQL Server からデータを取り込む
プレビュー
Microsoft SQL Server コネクタは、ゲート パブリック プレビュー段階です。プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、 からデータを取り込み、SQL Server Databricksを使用してLakeFlow Connect に読み込む方法について説明します。
Microsoft SQL Server (SQL Server) コネクタは、次のものをサポートしています。
- Azure SQL Database
- SQL Server の Amazon RDS
手順の概要
- ソース データベースをインジェスト用に構成します。
- SQL Server データベースに接続し、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納するゲートウェイを作成します。
- ステージング ボリュームのスナップショットと変更データを宛先ストリーミング テーブルに適用するインジェスト パイプラインを作成します。
- インジェスト パイプラインをスケジュールします。
データベースのバリエーション
SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートは、ノートブック、ワークフロー、DLT で有効になっています。 Enable サーバレス コンピュートを参照してください。
-
接続を作成するには: メタストアで
CREATE CONNECTION
します。既存の接続を使用するには: 接続を
USE CONNECTION
またはALL PRIVILEGES
します。 -
USE CATALOG
ターゲットカタログ上。 -
USE SCHEMA
、CREATE TABLE
、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUME
します。 -
クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。
-
ファミリー: ジョブ コンピュート
-
ポリシー ファミリのオーバーライド:
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
},
"num_workers": {
"type": "unlimited",
"defaultValue": 1,
"isOptional": true
},
"runtime_engine": {
"type": "fixed",
"value": "STANDARD",
"hidden": true
}
} -
ワーカーノード(
node_type_id
)は使用されませんが、DLTを実行するために必要です。 最小ノードを指定します。
"driver_node_type_id": {
"type": "unlimited",
"defaultValue": "r5.xlarge",
"isOptional": true
},
"node_type_id": {
"type": "unlimited",
"defaultValue": "m4.large",
"isOptional": true
}クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。
-
インジェスト用のソース データベースを設定する
「Databricks へのインジェスト用に Microsoft SQL Server を構成する」を参照してください。
SQL Server 接続を作成する
コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を格納し、アクセスします。
必要な権限
- 新しい接続を作成するには、メタストア
CREATE CONNECTION
します。 これを許可するには、メタストア管理者に連絡してください。 - 既存の接続を使用するには、接続オブジェクトを
USE CONNECTION
またはALL PRIVILEGES
します。 接続の所有者に連絡して、これらを許可します。
接続を作成するには、次の操作を行います。
- Databricks ワークスペースで、[カタログ] > [外部データ (External Data >)] 接続 をクリックします。
- [ 接続の作成 ] をクリックします。 このボタンが表示されない場合は、
CREATE CONNECTION
権限がない可能性があります。 - 一意の [接続名 ] を入力します。
- [接続の種類 ] で [ SQL Server ] を選択します。
- [ホスト ] で、SQL Server ドメイン名を指定します。
- [ユーザー ] と [パスワード] に、SQL Server のログイン資格情報を入力します。
- 作成 をクリックします。
接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。
ステージング カタログとスキーマを作成する
SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。 ステージング・カタログをフォーリンカタログにすることはできません。
必要な権限
- 新しいステージング カタログを作成するには、メタストアで
CREATE CATALOG
します。 メタストア管理者に連絡して、これを付与してください。 - 既存のステージング カタログを使用するには、カタログ
USE CATALOG
します。 カタログの所有者に連絡して、これを付与してください。 - 新しいステージング スキーマを作成するには、カタログ
CREATE SCHEMA
します。 カタログの所有者に連絡して、これを付与してください。 - 既存のステージングスキーマ、スキーマの
USE SCHEMA
、CREATE VOLUME
、CREATE TABLE
を使用するには。 スキーマの所有者に連絡して、これらを付与してください。
- Catalog Explorer
- CLI
- In the Databricks workspace, click Catalog.
- On the Catalog tab, do one of the following:
- Click Create catalog. If you don’t see this button, you don’t have
CREATE CATALOG
privileges. - Enter a unique name for the catalog, and then click Create.
- Select the catalog you created.
- Click Create schema. If you don’t see this button, you don’t have
CREATE SCHEMA
privileges. - Enter a unique name for the schema, and then click Create.
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイとインジェスト パイプラインを作成する
ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納します。 ソース データベースの変更ログ保持ポリシーによる変更データのギャップの問題を回避するには、ゲートウェイを連続パイプラインとして実行します。
インジェスト パイプラインは、スナップショットと変更データをステージング ボリュームから送信先ストリーミング テーブルに適用します。
ゲートウェイごとに 1 つのインジェスト パイプラインのみがサポートされています。
API で許可されている場合でも、インジェスト パイプラインでは複数の送信先カタログとスキーマはサポートされていません。 複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイとインジェスト パイプラインのペアを作成します。
必要な権限
パイプラインを作成するには、 Unrestricted cluster creation
アクセス許可が必要です。 アカウント管理者に問い合わせてください。
- Databricks Asset Bundles
- Notebook
- CLI
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init
-
Add two new resource files to the bundle:
- A pipeline definition file (
resources/sqlserver_pipeline.yml
). - A workflow file that controls the frequency of data ingestion (
resources/sqlserver.yml
).
The following is an example
resources/sqlserver_pipeline.yml
file:YAMLvariables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema
resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEW
pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}
channel: PREVIEWThe following is an example
resources/sqlserver_job.yml
file:YAMLresources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id} - A pipeline definition file (
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
Update the Configuration
cell in the following notebook with the source connection, target catalog, target schema, and tables to ingest from the source.
Create gateway and ingestion pipeline
To create the gateway:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
To create the ingestion pipeline:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
インジェスト パイプラインのトリガー スケジュールを設定する
トリガー モードは、インジェスト パイプラインの実行でのみサポートされています。
パイプラインのスケジュールを作成するには、DLT パイプライン UI を使用して、パイプライン UI 画面の右上隅にあるボタンをクリックします。
UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。 ジョブは [ジョブ] タブに表示されます。
データ取り込みが成功したことを確認する
インジェストパイプライン UI のリストビューには、データの取り込み時に処理されたレコードの数が表示されます。 これらの番号は自動的に更新されます。
Upserted records
列とDeleted records
列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、[ スケジュール] をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。