SQL Server からデータを取り込む

プレビュー

LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使用して SQL Server からデータを取り込み、Databricks に読み込む方法について説明します。

Microsoft SQL Server (SQL Server) コネクタは、次のものをサポートしています。

  • Azure SQL Database

  • SQL Server の Amazon RDS

STの概要

  1. 取り込み用にソース データベースを構成します。

  2. SQL Serverデータベースに接続し、ソース データベースからスナップショットと変更データを抽出し、それをステージングUnity Catalogボリュームに保存するゲートウェイを作成します。

  3. ステージング ボリュームからのスナップショットと変更データを宛先ストリーミング テーブルに適用する取り込みパイプラインを作成します。

  4. 取り込みパイプラインをスケジュールします。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースは Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。

  • 接続を作成するには: メタストアで CREATE CONNECTION します。

    既存の接続を使用するには: 接続を USE CONNECTION または ALL PRIVILEGES します。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMACREATE TABLE、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMACREATE VOLUMEします。

  • CREATE VOLUME 既存のスキーマ上。

取り込み用のソースデータベースを設定する

「取り込み用に SQL Server を構成する」を参照してください。

SQL Server接続を作成する

コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を保存およびアクセスします。

注:

必要な権限

  • 新しい接続を作成するには、メタストア CREATE CONNECTION します。 これを許可するには、メタストア管理者に連絡してください。

  • 既存の接続を使用するには、接続オブジェクトを USE CONNECTION または ALL PRIVILEGES します。 接続の所有者に連絡して、これらを許可します。

接続を作成するには、次の手順を実行します。

  1. Databricks ワークスペースで、 [カタログ] > [外部データ] > [接続] をクリックします。

  2. [ 接続の作成] をクリックします。 このボタンが表示されない場合は、 CREATE CONNECTION 権限がない可能性があります。

  3. 一意の [接続名] を入力します。

  4. 接続タイプとしてSQL Serverを選択します。

  5. ホストには、SQL Server ドメイン名を指定します。

  6. ユーザーパスワードに、SQL Server のログイン資格情報を入力します。

  7. [作成]をクリックします。

注:

接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。

ステージング カタログとスキーマを作成する

SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。 ステージング・カタログをフォーリンカタログにすることはできません。

注:

必要な権限

  • 新しいステージング カタログを作成するには、メタストアで CREATE CATALOG します。 メタストア管理者に連絡して、これを付与してください。

  • 既存のステージング カタログを使用するには、カタログ USE CATALOG します。 カタログの所有者に連絡して、これを付与してください。

  • 新しいステージング スキーマを作成するには、カタログ CREATE SCHEMA します。 カタログの所有者に連絡して、これを付与してください。

  • 既存のステージング スキーマを使用するには、スキーマで 、 CREATE VOLUMECREATE TABLE USE SCHEMAします。スキーマの所有者に連絡して、これらを付与してください。

  1. Databricks ワークスペースで、 [カタログ]をクリックします。

  2. [カタログ]タブで、次のいずれかを実行します。

  3. [ カタログを作成] をクリックします。 このボタンが表示されない場合は、 CREATE CATALOG 権限がありません。

  4. カタログの一意の名前を入力します。次に、[ 作成] をクリックします。

  5. 作成したカタログを選択します。

  6. 「スキーマの作成」をクリックします。このボタンが表示されない場合は、 CREATE SCHEMA 権限がありません。

  7. スキーマの一意の名前を入力します。「 作成」をクリックします。

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "SQLSERVER",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "1433",
    "trustServerCertificate": "false",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

ゲートウェイと取り込みパイプラインを作成する

ゲートウェイはスナップショットと変更データをソース データベースから抽出し、ステージングUnity Catalogボリュームに保存します。 ソース データベースの変更ログ保持ポリシーが原因で変更データにギャップが生じる問題を回避するには、ゲートウェイを継続的なパイプラインとして実行します。

取り込みパイプラインは、ステージング ボリュームのスナップショットと変更データを宛先ストリーミング テーブルに適用します。

注:

ゲートウェイごとに 1 つの取り込みパイプラインのみがサポートされます。

API では許可されていますが、取り込みパイプラインでは複数の宛先カタログとスキーマはサポートされません。 複数の宛先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイ取り込みパイプライン ペアを作成します。

注:

必要な権限

パイプラインを作成するには、 Unrestricted cluster creation権限が必要です。 アカウント管理者に連絡してください。

次のノートブックのConfigurationセルを、ソース接続、ターゲット カタログ、ターゲット スキーマ、およびソースから取り込むテーブルで更新します。

ゲートウェイと取り込みパイプラインを作成する

ノートブックを新しいタブで開く

ゲートウェイを作成するには:

output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
  "connection_id": "'"$CONNECTION_ID"'",
  "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
  "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
  "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

取り込みパイプラインを作成するには:

databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
  "objects": [
    {"table": {
        "source_catalog": "tpc",
        "source_schema": "tpch",
        "source_table": "lineitem",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "<YOUR_DATABRICKS_TABLE>",
        }},
     {"schema": {
        "source_catalog": "tpc",
        "source_schema": "tpcdi",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
        "destination_table": "<YOUR_DATABRICKS_SECOND_TABLE>",
        }}
    ]
  }
}'

取り込みパイプラインのトリガースケジュールを設定する

注:

取り込みパイプラインの実行にはトリガー モードのみがサポートされます。

パイプライン UI 画面の右上隅にあるボタンをクリックすると、DLT パイプライン UI を使用してパイプラインのスケジュールを作成できます。

UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。 ジョブは「ジョブ」タブに表示されます。

データ取り込みが成功したことを確認する

取り込みパイプライン UI のリスト ビューには、データが取り込まれるときに処理されるレコードの数が表示されます。 これらの数字は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

パイプラインを開始、スケジュール、アラートを設定する

  1. パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。

  4. パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。