メインコンテンツまでスキップ

SQL Server からデータを取り込む

備考

プレビュー

Microsoft SQL Server コネクタは、ゲート パブリック プレビュー段階です。プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、 からデータを取り込み、SQL Server Databricksを使用してLakeFlow Connect に読み込む方法について説明します。

Microsoft SQL Server (SQL Server) コネクタは、次のものをサポートしています。

  • Azure SQL Database
  • SQL Server の Amazon RDS

手順の概要

  1. ソース データベースをインジェスト用に構成します。
  2. SQL Server データベースに接続し、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納するゲートウェイを作成します。
  3. ステージング ボリュームのスナップショットと変更データを宛先ストリーミング テーブルに適用するインジェスト パイプラインを作成します。
  4. インジェスト パイプラインをスケジュールします。

データベースのバリエーション

SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、DLT で有効になっています。 Enable サーバレス コンピュートを参照してください。

  • 接続を作成するには: メタストアで CREATE CONNECTION します。

    既存の接続を使用するには: 接続を USE CONNECTION または ALL PRIVILEGES します。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMACREATE TABLE、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUMEします。

  • クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。

    • ファミリー: ジョブ コンピュート

    • ポリシー ファミリのオーバーライド:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • ワーカーノード(node_type_id)は使用されませんが、DLTを実行するために必要です。 最小ノードを指定します。

    "driver_node_type_id": {
    "type": "unlimited",
    "defaultValue": "r5.xlarge",
    "isOptional": true
    },
    "node_type_id": {
    "type": "unlimited",
    "defaultValue": "m4.large",
    "isOptional": true
    }

    クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。

インジェスト用のソース データベースを設定する

「Databricks へのインジェスト用に Microsoft SQL Server を構成する」を参照してください

SQL Server 接続を作成する

コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を格納し、アクセスします。

注記

必要な権限

  • 新しい接続を作成するには、メタストア CREATE CONNECTION します。 これを許可するには、メタストア管理者に連絡してください。
  • 既存の接続を使用するには、接続オブジェクトを USE CONNECTION または ALL PRIVILEGES します。 接続の所有者に連絡して、これらを許可します。

接続を作成するには、次の操作を行います。

  1. Databricks ワークスペースで、[カタログ] > [外部データ (External Data >)] 接続 をクリックします。
  2. [ 接続の作成 ] をクリックします。 このボタンが表示されない場合は、 CREATE CONNECTION 権限がない可能性があります。
  3. 一意の [接続名 ] を入力します。
  4. [接続の種類 ] で [ SQL Server ] を選択します。
  5. [ホスト ] で、SQL Server ドメイン名を指定します。
  6. [ユーザー ] と [パスワード] に、SQL Server のログイン資格情報を入力します。
  7. 作成 をクリックします。
注記

接続テスト は、ホストが到達可能であることをテストします。 ユーザ クレデンシャルが正しいユーザ名とパスワードの値であるかどうかはテストされません。

ステージング カタログとスキーマを作成する

SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。 ステージング・カタログをフォーリンカタログにすることはできません。

注記

必要な権限

  • 新しいステージング カタログを作成するには、メタストアで CREATE CATALOG します。 メタストア管理者に連絡して、これを付与してください。
  • 既存のステージング カタログを使用するには、カタログ USE CATALOG します。 カタログの所有者に連絡して、これを付与してください。
  • 新しいステージング スキーマを作成するには、カタログ CREATE SCHEMA します。 カタログの所有者に連絡して、これを付与してください。
  • 既存のステージングスキーマ、スキーマの USE SCHEMACREATE VOLUMECREATE TABLE を使用するには。 スキーマの所有者に連絡して、これらを付与してください。
  1. In the Databricks workspace, click Catalog.
  2. On the Catalog tab, do one of the following:
  3. Click Create catalog. If you don’t see this button, you don’t have CREATE CATALOG privileges.
  4. Enter a unique name for the catalog, and then click Create.
  5. Select the catalog you created.
  6. Click Create schema. If you don’t see this button, you don’t have CREATE SCHEMA privileges.
  7. Enter a unique name for the schema, and then click Create.

ゲートウェイとインジェスト パイプラインを作成する

ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納します。 ソース データベースの変更ログ保持ポリシーによる変更データのギャップの問題を回避するには、ゲートウェイを連続パイプラインとして実行します。

インジェスト パイプラインは、スナップショットと変更データをステージング ボリュームから送信先ストリーミング テーブルに適用します。

注記

ゲートウェイごとに 1 つのインジェスト パイプラインのみがサポートされています。

API で許可されている場合でも、インジェスト パイプラインでは複数の送信先カタログとスキーマはサポートされていません。 複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイとインジェスト パイプラインのペアを作成します。

注記

必要な権限

パイプラインを作成するには、 Unrestricted cluster creation アクセス許可が必要です。 アカウント管理者に問い合わせてください。

This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.

  1. Create a new bundle using the Databricks CLI:

    Bash
    databricks bundle init
  2. Add two new resource files to the bundle:

    • A pipeline definition file (resources/sqlserver_pipeline.yml).
    • A workflow file that controls the frequency of data ingestion (resources/sqlserver.yml).

    The following is an example resources/sqlserver_pipeline.yml file:

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    The following is an example resources/sqlserver_job.yml file:

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. Deploy the pipeline using the Databricks CLI:

    Bash
    databricks bundle deploy

インジェスト パイプラインのトリガー スケジュールを設定する

注記

トリガー モードは、インジェスト パイプラインの実行でのみサポートされています。

パイプラインのスケジュールを作成するには、DLT パイプライン UI を使用して、パイプライン UI 画面の右上隅にあるボタンをクリックします。

UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。 ジョブは [ジョブ] タブに表示されます。

データ取り込みが成功したことを確認する

インジェストパイプライン UI のリストビューには、データの取り込み時に処理されたレコードの数が表示されます。 これらの番号は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

パイプラインの開始、スケジュール設定、アラートの設定

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、[ スケジュール] をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。