メインコンテンツまでスキップ

SQL Server からデータを取り込む

備考

プレビュー

Microsoft SQL Server コネクタは パブリック プレビュー段階です。

このページでは、 からデータを取り込み、SQL Server Databricksを使用してLakeflowコネクト に読み込む方法について説明します。SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。

始める前に

取り込み パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。

  • サーバレス コンピュートがワークスペースで有効になっています。 Enable サーバレス コンピュートを参照してください。

  • 接続を作成する予定の場合: メタストアに対する CREATE CONNECTION 権限があります。

    コネクタが UI ベースのパイプラインオーサリングをサポートしている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプラインオーサリングを使用する場合は、このページの手順を完了する前に、Catalog Explorer で接続を作成する必要があります。「管理された取り込みソースに接続する」を参照してください

  • 既存の接続を使用する予定の場合: 接続に対する USE CONNECTION 権限または ALL PRIVILEGES があります。

  • ターゲット・カタログに対する USE CATALOG 権限があります。

  • 既存のスキーマに対する USE SCHEMACREATE TABLECREATE VOLUME 権限、またはターゲットカタログに対する CREATE SCHEMA 権限を持っている。

  • プライマリ SQL Server インスタンスにアクセスできる。変更追跡機能とチェンジデータキャプチャ機能は、リードレプリカまたはセカンダリインスタンスではサポートされていません。

  • クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。

    • ファミリー: ジョブ コンピュート

    • ポリシー ファミリを上書きします:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイに可能な限り最小のワーカー ノードを指定することをお勧めします。

      "driver_node_type_id": {
      "type": "unlimited",
      "defaultValue": "r5.xlarge",
      "isOptional": true
      },
      "node_type_id": {
      "type": "unlimited",
      "defaultValue": "m4.large",
      "isOptional": true
      }

    クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。

SQL Server から取り込むには、 ソースのセットアップも完了する必要があります。

オプション 1: Databricks UI

管理者ユーザーは、UI で接続とパイプラインを同時に作成できます。これは、管理インジェスト パイプラインを作成する最も簡単な方法です。

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。

  2. [ データの追加 ] ページの [Databricks コネクタ ] で、[ SQL Server ] をクリックします。

    インジェスト ウィザードが開きます。

  3. ウィザードの [インジェスト ゲートウェイ ] ページで、ゲートウェイの一意の名前を入力します。

  4. ステージング取り込みデータのカタログとスキーマを選択し、[ 次へ ] をクリックします。

  5. [ インジェスト パイプライン ] ページで、パイプラインの一意の名前を入力します。

  6. [宛先カタログ] で、取り込んだデータを保存するカタログを選択します。

  7. ソース データへのアクセスに必要な資格情報を格納する Unity Catalog 接続を選択します。

    ソースへの既存の接続がない場合は、[ 接続の作成 ] をクリックし、 ソース設定から取得した認証の詳細を入力します。メタストアに対する CREATE CONNECTION 権限が必要です。

  8. パイプラインの作成および続行 をクリックします。

  9. [ ソース ] ページで、取り込むテーブルを選択します。

  10. 必要に応じて、デフォルトの履歴追跡設定を変更します。詳細については、「 履歴追跡」を参照してください。

  11. 次へ をクリックします。

  12. 宛先 ページで、書き込む Unity Catalog カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成 ] をクリックします。親カタログに対する USE CATALOG 権限と CREATE SCHEMA 権限が必要です。

  13. 保存して続行 をクリックします。

  14. (オプション) 設定 ページで、 スケジュールの作成 をクリックします。宛先テーブルを更新する頻度を設定します。

  15. (オプション)パイプライン操作の成功または失敗のEメール 通知を設定します。

  16. パイプラインの保存と実行 をクリックします。

オプション 2: その他のインターフェイス

Databricks Asset Bundles、Databricks APIs、Databricks SDK、または Databricks CLIを使用して取り込む前に、既存の Unity Catalog 接続にアクセスできる必要があります。手順については、「 管理されたインジェスト ソースに接続する」を参照してください。

ステージング カタログとスキーマを作成する

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

ゲートウェイと取り込み パイプラインを作成する

インジェスト ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、それを Unity Catalog ステージング ボリュームに格納します。ゲートウェイは、連続パイプラインとして実行する必要があります。これにより、ソース・データベースにある変更ログの保持ポリシーに対応できます。

取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。

注記

各インジェスト パイプラインは、1 つのインジェスト ゲートウェイに関連付ける必要があります。

インジェスト パイプラインは、複数の宛先カタログとスキーマをサポートしていません。複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイとパイプラインのペアを作成します。

このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    Bash
    databricks bundle init
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/sqlserver_pipeline.yml)。
    • データ取り込みの頻度を制御するワークフロー ファイル (resources/sqlserver.yml)。

    次に、 resources/sqlserver_pipeline.yml ファイルの例を示します。

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    次に、 resources/sqlserver_job.yml ファイルの例を示します。

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. Databricks CLI を使用してパイプラインをデプロイします。

    Bash
    databricks bundle deploy

パイプラインの開始、スケジュール設定、アラートの設定

パイプラインの詳細ページでパイプラインのスケジュールを作成できます。

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

パイプラインに追加するスケジュールごとに、 Lakeflowコネクト によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。

データ取り込みが成功したことを確認する

パイプラインの詳細ページのリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

追加のリソース