メインコンテンツまでスキップ

SQL Server からデータを取り込む

備考

プレビュー

Microsoft SQL Server コネクタは パブリック プレビュー段階です。

このページでは、 からデータを取り込み、SQL Server Databricksを使用してLakeFlow Connect に読み込む方法について説明します。SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。

手順の概要

  1. ソース データベースを取り込み用に構成します。
  2. インジェスト ゲートウェイを作成します。ゲートウェイは SQL Server データベースに接続し、スナップショットと変更データを抽出し、ステージング用に Unity Catalog ボリュームに格納します。
  3. インジェスト パイプラインを作成します。パイプラインは、スナップショットを適用し、ステージングボリュームから宛先ストリーミングテーブルに変更データを適用します。
  4. 取り込み パイプラインをスケジュールします。

始める前に

取り込み パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、および DLTで有効になっています。 サーバレス コンピュートの有効化を参照してください。

  • 接続を作成するには: メタストアで CREATE CONNECTION します。

    既存の接続を使用するには: 接続を USE CONNECTION または ALL PRIVILEGES します。

  • USE CATALOG がターゲットカタログに設定されていること。

  • USE SCHEMACREATE TABLE、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUMEします。

  • クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。

    • ファミリー: ジョブ コンピュート

    • ポリシー ファミリを上書きします:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイに可能な限り最小のワーカー ノードを指定することをお勧めします。

      "driver_node_type_id": {
      "type": "unlimited",
      "defaultValue": "r5.xlarge",
      "isOptional": true
      },
      "node_type_id": {
      "type": "unlimited",
      "defaultValue": "m4.large",
      "isOptional": true
      }

    クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。

取り込み用のソース データベースを設定する

Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。

SQL Server 接続を作成する

  1. Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。
  2. [ 接続の作成 ] をクリックします。このボタンが表示されない場合は、 CREATE CONNECTION 権限がありません。
  3. 一意の 接続名 を入力します。
  4. 接続の種類SQL Server を選択します。
  5. ホスト で、SQL Server ドメイン名を指定します。
  6. ユーザーパスワード に、SQL Server のログイン資格情報を入力します。
  7. 作成 をクリックします。

ステージング カタログとスキーマを作成する

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング カタログをフォーリンカタログにすることはできません。

  1. Databricks ワークスペースで、 カタログ をクリックします。
  2. カタログ タブで、次のいずれかの操作を行います。
  3. [ カタログを作成 ] をクリックします。このボタンが表示されない場合は、 CREATE CATALOG 権限がありません。
  4. カタログの一意の名前を入力します。次に、[ 作成 ] をクリックします。
  5. 作成したカタログを選択します。
  6. 「スキーマの作成 」をクリックします。このボタンが表示されない場合は、 CREATE SCHEMA 権限がありません。
  7. スキーマの一意の名前を入力します。「 作成 」をクリックします。

ゲートウェイと取り込み パイプラインを作成する

インジェスト ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、それを Unity Catalog ステージング ボリュームに格納します。ゲートウェイは、連続パイプラインとして実行する必要があります。これにより、ソース・データベースにある変更ログの保持ポリシーに対応できます。

取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。

注記

各インジェスト パイプラインは、1 つのインジェスト ゲートウェイに関連付ける必要があります。

インジェスト パイプラインは、複数の宛先カタログとスキーマをサポートしていません。複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイとパイプラインのペアを作成します。

このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    Bash
    databricks bundle init
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/sqlserver_pipeline.yml)。
    • データ取り込みの頻度を制御するワークフロー ファイル (resources/sqlserver.yml)。

    次に、 resources/sqlserver_pipeline.yml ファイルの例を示します。

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    次に、 resources/sqlserver_job.yml ファイルの例を示します。

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. Databricks CLI を使用してパイプラインをデプロイします。

    Bash
    databricks bundle deploy

パイプラインの開始、スケジュール設定、アラートの設定

パイプラインの詳細ページでパイプラインのスケジュールを作成できます。

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

パイプラインに追加するスケジュールごとに、 LakeFlow Connect によってそのジョブが自動的に作成されます。 インジェスト パイプラインは、ジョブ内のタスクです。オプションで、ジョブにタスクを追加できます。

データ取り込みが成功したことを確認する

パイプラインの詳細ページのリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

追加のリソース