SQL Server からデータを取り込む
プレビュー
LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、LakeFlow Connect を使用して SQL Server からデータを取り込み、Databricks に読み込む方法について説明します。
Microsoft SQL Server (SQL Server) コネクタは、次のものをサポートしています。
Azure SQL Database
SQL Server の Amazon RDS
STの概要
取り込み用にソース データベースを構成します。
SQL Serverデータベースに接続し、ソース データベースからスナップショットと変更データを抽出し、それをステージングUnity Catalogボリュームに保存するゲートウェイを作成します。
ステージング ボリュームからのスナップショットと変更データを宛先ストリーミング テーブルに適用する取り込みパイプラインを作成します。
取り込みパイプラインをスケジュールします。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
ワークスペースは Unity Catalog に対して有効になっています。
サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。
接続を作成するには: メタストアで
CREATE CONNECTION
します。既存の接続を使用するには: 接続を
USE CONNECTION
またはALL PRIVILEGES
します。USE CATALOG
ターゲットカタログ上。USE SCHEMA
、CREATE TABLE
、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA
CREATE VOLUME
します。クラスターを作成するための無制限のアクセス許可、またはカスタムポリシー。 カスタムポリシーは、次の要件を満たしている必要があります。
ファミリー: ジョブ コンピュート
ポリシー ファミリのオーバーライド:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }
ワーカーノード(
node_type_id
)は使用されませんが、DLTを実行するために必要です。 最小ノードを指定します。
"driver_node_type_id": { "type": "unlimited", "defaultValue": "r5.xlarge", "isOptional": true }, "node_type_id": { "type": "unlimited", "defaultValue": "m4.large", "isOptional": true }
クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。
SQL Server接続を作成する
コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を保存およびアクセスします。
注:
必要な権限
新しい接続を作成するには、メタストア
CREATE CONNECTION
します。 これを許可するには、メタストア管理者に連絡してください。既存の接続を使用するには、接続オブジェクトを
USE CONNECTION
またはALL PRIVILEGES
します。 接続の所有者に連絡して、これらを許可します。
接続を作成するには、次の手順を実行します。
Databricks ワークスペースで、 [カタログ] > [外部データ] > [接続] をクリックします。
[ 接続の作成] をクリックします。 このボタンが表示されない場合は、
CREATE CONNECTION
権限がない可能性があります。一意の [接続名] を入力します。
接続タイプとしてSQL Serverを選択します。
ホストには、SQL Server ドメイン名を指定します。
ユーザーとパスワードに、SQL Server のログイン資格情報を入力します。
[作成]をクリックします。
注:
接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。
ステージング カタログとスキーマを作成する
SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。 ステージング・カタログをフォーリンカタログにすることはできません。
注:
必要な権限
新しいステージング カタログを作成するには、メタストアで
CREATE CATALOG
します。 メタストア管理者に連絡して、これを付与してください。既存のステージング カタログを使用するには、カタログ
USE CATALOG
します。 カタログの所有者に連絡して、これを付与してください。新しいステージング スキーマを作成するには、カタログ
CREATE SCHEMA
します。 カタログの所有者に連絡して、これを付与してください。既存のステージング スキーマを使用するには、スキーマで 、
CREATE VOLUME
、CREATE TABLE
USE SCHEMA
します。スキーマの所有者に連絡して、これらを付与してください。
Databricks ワークスペースで、 [カタログ]をクリックします。
[カタログ]タブで、次のいずれかを実行します。
[ カタログを作成] をクリックします。 このボタンが表示されない場合は、
CREATE CATALOG
権限がありません。カタログの一意の名前を入力します。次に、[ 作成] をクリックします。
作成したカタログを選択します。
「スキーマの作成」をクリックします。このボタンが表示されない場合は、
CREATE SCHEMA
権限がありません。スキーマの一意の名前を入力します。「 作成」をクリックします。
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
ゲートウェイと取り込みパイプラインを作成する
ゲートウェイはスナップショットと変更データをソース データベースから抽出し、ステージングUnity Catalogボリュームに保存します。 ソース データベースの変更ログ保持ポリシーが原因で変更データにギャップが生じる問題を回避するには、ゲートウェイを継続的なパイプラインとして実行します。
取り込みパイプラインは、ステージング ボリュームのスナップショットと変更データを宛先ストリーミング テーブルに適用します。
注:
ゲートウェイごとに 1 つの取り込みパイプラインのみがサポートされます。
API では許可されていますが、取り込みパイプラインでは複数の宛先カタログとスキーマはサポートされません。 複数の宛先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイ取り込みパイプライン ペアを作成します。
注:
必要な権限
パイプラインを作成するには、 Unrestricted cluster creation
権限が必要です。 アカウント管理者に連絡してください。
次のノートブックのConfiguration
セルを、ソース接続、ターゲット カタログ、ターゲット スキーマ、およびソースから取り込むテーブルで更新します。
ゲートウェイを作成するには:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
取り込みパイプラインを作成するには:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
"destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
}}
]
}
}'
取り込みパイプラインのトリガースケジュールを設定する
注:
取り込みパイプラインの実行にはトリガー モードのみがサポートされます。
パイプライン UI 画面の右上隅にあるボタンをクリックすると、DLT パイプライン UI を使用してパイプラインのスケジュールを作成できます。
UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。 ジョブは「ジョブ」タブに表示されます。
データ取り込みが成功したことを確認する
取り込みパイプライン UI のリスト ビューには、データが取り込まれるときに処理されるレコードの数が表示されます。 これらの数字は自動的に更新されます。
Upserted records
列とDeleted records
列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。
パイプラインを開始、スケジュール、アラートを設定する
パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
パイプラインの詳細を表示するには、パイプライン名をクリックします。
パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。
パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。
インジェストが完了したら、テーブルに対してクエリを実行できます。