SQL Server からデータを取り込む
November 21, 2024
プレビュー
LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。
この記事では、LakeFlow Connect を使用して SQL Server からデータを取り込み、Databricks に読み込む方法について説明します。
Microsoft SQL Server (SQL Server) コネクタは、次のものをサポートしています。
Azure SQL Database
SQL Server の Amazon RDS
ステップの概要
取り込み用にソース データベースを構成します。
SQL Serverデータベースに接続し、ソース データベースからスナップショットと変更データを抽出し、それをステージングUnity Catalogボリュームに保存するゲートウェイを作成します。
ステージング ボリュームからのスナップショットと変更データを宛先ストリーミング テーブルに適用する取り込みパイプラインを作成します。
取り込みパイプラインをスケジュールします。
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
ワークスペースは Unity Catalog に対して有効になっています。
サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。
接続を作成するには: メタストアで
CREATE CONNECTION
します。既存の接続を使用するには: 接続を
USE CONNECTION
またはALL PRIVILEGES
します。USE CATALOG
ターゲットカタログ上。USE SCHEMA
、CREATE TABLE
、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA
CREATE VOLUME
します。クラスターを作成するための無制限のアクセス許可、またはカスタムポリシー。 カスタムポリシーは、次の要件を満たしている必要があります。
ファミリー: ジョブ コンピュート
ポリシー ファミリのオーバーライド:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }
ワーカーノード(
node_type_id
)は使用されませんが、DLTを実行するために必要です。 最小ノードを指定します。
"driver_node_type_id": { "type": "unlimited", "defaultValue": "r5.xlarge", "isOptional": true }, "node_type_id": { "type": "unlimited", "defaultValue": "m4.large", "isOptional": true }
クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。
SQL Server接続を作成する
コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を保存およびアクセスします。
注:
必要な権限
新しい接続を作成するには、メタストア
CREATE CONNECTION
します。 これを許可するには、メタストア管理者に連絡してください。既存の接続を使用するには、接続オブジェクトを
USE CONNECTION
またはALL PRIVILEGES
します。 接続の所有者に連絡して、これらを許可します。
接続を作成するには、次の手順を実行します。
Databricks ワークスペースで、 [カタログ] > [外部データ] > [接続] をクリックします。
[ 接続の作成] をクリックします。 このボタンが表示されない場合は、
CREATE CONNECTION
権限がない可能性があります。一意の [接続名] を入力します。
接続タイプとしてSQL Serverを選択します。
ホストには、SQL Server ドメイン名を指定します。
ユーザーとパスワードに、SQL Server のログイン資格情報を入力します。
[作成]をクリックします。
注:
接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。
ステージングカタログとスキーマを作成する
SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。
ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。 ステージング・カタログをフォーリンカタログにすることはできません。
注:
必要な権限
新しいステージング カタログを作成するには、メタストアで
CREATE CATALOG
します。 メタストア管理者に連絡して、これを付与してください。既存のステージング カタログを使用するには、カタログ
USE CATALOG
します。 カタログの所有者に連絡して、これを付与してください。新しいステージング スキーマを作成するには、カタログ
CREATE SCHEMA
します。 カタログの所有者に連絡して、これを付与してください。既存のステージング スキーマを使用するには、スキーマで 、
CREATE VOLUME
、CREATE TABLE
USE SCHEMA
します。スキーマの所有者に連絡して、これらを付与してください。
Databricks ワークスペースで、 [カタログ]をクリックします。
[カタログ]タブで、次のいずれかを実行します。
[ カタログを作成] をクリックします。 このボタンが表示されない場合は、
CREATE CATALOG
権限がありません。カタログの一意の名前を入力します。次に、[ 作成] をクリックします。
作成したカタログを選択します。
「スキーマの作成」をクリックします。このボタンが表示されない場合は、
CREATE SCHEMA
権限がありません。スキーマの一意の名前を入力します。「 作成」をクリックします。
ゲートウェイと取り込みパイプラインを作成する
ゲートウェイはスナップショットと変更データをソース データベースから抽出し、ステージングUnity Catalogボリュームに保存します。 ソース データベースの変更ログ保持ポリシーが原因で変更データにギャップが生じる問題を回避するには、ゲートウェイを継続的なパイプラインとして実行します。
取り込みパイプラインは、ステージング ボリュームのスナップショットと変更データを宛先ストリーミング テーブルに適用します。
注:
ゲートウェイごとに 1 つの取り込みパイプラインのみがサポートされます。
API では許可されていますが、取り込みパイプラインでは複数の宛先カタログとスキーマはサポートされません。 複数の宛先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイ取り込みパイプライン ペアを作成します。
注:
必要な権限
パイプラインを作成するには、 Unrestricted cluster creation
権限が必要です。 アカウント管理者に連絡してください。
取り込みパイプラインのトリガースケジュールを設定する
注:
取り込みパイプラインの実行にはトリガー モードのみがサポートされます。
パイプライン UI 画面の右上隅にあるボタンをクリックすると、DLT パイプライン UI を使用してパイプラインのスケジュールを作成できます。
UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。 ジョブは「ジョブ」タブに表示されます。
データ取り込みが成功したことを確認する
取り込みパイプライン UI のリスト ビューには、データが取り込まれるときに処理されるレコードの数が表示されます。 これらの数字は自動的に更新されます。
Upserted records
列とDeleted records
列は、デフォルトでは表示されません。これらを有効にするには、列の設定 ボタンをクリックして選択します。
パイプラインの開始、スケジュール、アラートを設定する
パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
パイプラインの詳細を表示するには、パイプライン名をクリックします。
パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。
パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。
インジェストが完了したら、テーブルに対してクエリを実行できます。