メインコンテンツまでスキップ

SQL Server からデータを取り込む

備考

プレビュー

Microsoft SQL Server コネクタは、ゲート パブリック プレビュー段階です。プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、 LakeFlow Connectを使用してSQL Serverからデータを取り込み、Databricks に読み込む方法について説明します。

手順の概要

  1. ソース データベースを取り込み用に構成します。
  2. SQL Server データベースに接続し、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納するゲートウェイを作成します。
  3. スナップショットを適用し、ステージングボリュームから宛先ストリーミングテーブルにデータを変更する取り込みパイプラインを作成します。
  4. 取り込み パイプラインをスケジュールします。

データベースのバリエーション

SQL Server コネクタは、Azure SQL データベースと Amazon RDS SQL データベースをサポートしています。これには、Azure 仮想マシン (VM) と Amazon EC2 で実行されている SQL Server が含まれます。このコネクタは、Azure ExpressRoute と AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートしています。

始める前に

取り込み パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、および DLTで有効になっています。 サーバレス コンピュートの有効化を参照してください。

  • 接続を作成するには: メタストアで CREATE CONNECTION します。

    既存の接続を使用するには: 接続を USE CONNECTION または ALL PRIVILEGES します。

  • USE CATALOG がターゲットカタログに設定されていること。

  • USE SCHEMACREATE TABLE、およびターゲット・カタログ上の既存のスキーマまたはCREATE SCHEMA``CREATE VOLUMEします。

  • クラスターまたはカスタム ポリシーを作成するための無制限のアクセス許可。 カスタムポリシーは、次の要件を満たしている必要があります。

    • ファミリー: ジョブ コンピュート

    • ポリシー ファミリを上書きします:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
    • ワーカーノード(node_type_id)は使用されませんが、DLTを実行するために必要です。最小ノードを指定します。

    "driver_node_type_id": {
    "type": "unlimited",
    "defaultValue": "r5.xlarge",
    "isOptional": true
    },
    "node_type_id": {
    "type": "unlimited",
    "defaultValue": "m4.large",
    "isOptional": true
    }

    クラスターポリシーの詳細については、「 クラスターポリシーの選択」を参照してください。

取り込み用のソース データベースを設定する

Databricks への取り込み用に Microsoft SQL Server を構成するを参照してください。

SQL Server 接続を作成する

コネクタは、Unity Catalog 接続オブジェクトを使用して、ソース データベースの資格情報を格納し、アクセスします。

注記

必要な権限

  • 新しい接続を作成するには、メタストア CREATE CONNECTION します。これを許可するには、メタストア管理者に連絡してください。
  • 既存の接続を使用するには、接続オブジェクトを USE CONNECTION または ALL PRIVILEGES します。接続の所有者に連絡して、これらを許可します。

接続を作成するには、次の手順を実行します。

  1. Databricks ワークスペースで、 カタログ > 外部データ > 接続 をクリックします。
  2. [ 接続の作成 ] をクリックします。このボタンが表示されない場合は、 CREATE CONNECTION 権限がない可能性があります。
  3. 一意の 接続名 を入力します。
  4. 接続の種類SQL Server を選択します。
  5. ホスト で、SQL Server ドメイン名を指定します。
  6. ユーザーパスワード に、SQL Server のログイン資格情報を入力します。
  7. 作成 をクリックします。

ステージング カタログとスキーマを作成する

SQL Server コネクタは、指定したステージング Unity Catalog カタログとスキーマに中間データを格納するためのUnity Catalogステージング ボリュームを作成します。

ステージング カタログとスキーマは、宛先カタログとスキーマと同じにすることができます。ステージング・カタログをフォーリンカタログにすることはできません。

注記

必要な権限

  • 新しいステージング カタログを作成するには、メタストアで CREATE CATALOG します。メタストア管理者に連絡して、これを付与してください。
  • 既存のステージング カタログを使用するには、カタログ USE CATALOG します。カタログの所有者に連絡して、これを付与してください。
  • 新しいステージング スキーマを作成するには、カタログ CREATE SCHEMA します。カタログの所有者に連絡して、これを付与してください。
  • 既存のステージング スキーマを使用するには、スキーマで 、 CREATE VOLUMECREATE TABLE USE SCHEMAします。スキーマの所有者に連絡して、これらを付与してください。
  1. Databricks ワークスペースで、 カタログ をクリックします。
  2. カタログ タブで、次のいずれかの操作を行います。
  3. [ カタログを作成 ] をクリックします。このボタンが表示されない場合は、 CREATE CATALOG 権限がありません。
  4. カタログの一意の名前を入力します。次に、[ 作成 ] をクリックします。
  5. 作成したカタログを選択します。
  6. スキーマの作成 をクリックします。このボタンが表示されない場合は、 CREATE SCHEMA 権限がありません。
  7. スキーマの一意の名前を入力します。「 作成 」をクリックします。

ゲートウェイと取り込み パイプラインを作成する

ゲートウェイは、ソース データベースからスナップショットと変更データを抽出し、ステージング Unity Catalog ボリュームに格納します。ソース データベースの変更ログ保持ポリシーによる変更データのギャップの問題を回避するには、ゲートウェイを連続パイプラインとして実行します。

取り込み パイプラインは、スナップショットと変更データをステージング ボリュームから宛先ストリーミングテーブルに適用します。

注記

ゲートウェイごとに 1 つの取り込み パイプラインのみがサポートされています。

API で許可されている場合でも、取り込み パイプラインでは複数の送信先カタログとスキーマはサポートされていません。複数の送信先カタログまたはスキーマに書き込む必要がある場合は、複数のゲートウェイと取り込み パイプラインのペアを作成します。

注記

必要な権限

パイプラインを作成するには、 Unrestricted cluster creation アクセス許可が必要です。アカウント管理者に問い合わせてください。

このタブでは、Databricks Asset Bundle を使用して取り込み パイプラインをデプロイする方法について説明します。バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「Databricksアセットバンドル」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    Bash
    databricks bundle init
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/sqlserver_pipeline.yml)。
    • データ取り込みの頻度を制御するワークフロー ファイル (resources/sqlserver.yml)。

    次に、 resources/sqlserver_pipeline.yml ファイルの例を示します。

    YAML
    variables:
    # Common variables used multiple places in the DAB definition.
    gateway_name:
    default: sqlserver-gateway
    dest_catalog:
    default: main
    dest_schema:
    default: ingest-destination-schema

    resources:
    pipelines:
    gateway:
    name: ${var.gateway_name}
    gateway_definition:
    connection_name: <sqlserver-connection>
    gateway_storage_catalog: main
    gateway_storage_schema: ${var.dest_schema}
    gateway_storage_name: ${var.gateway_name}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    pipeline_sqlserver:
    name: sqlserver-ingestion-pipeline
    ingestion_definition:
    ingestion_gateway_id: ${resources.pipelines.gateway.id}
    objects:
    # Modify this with your tables!
    - table:
    # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
    source_catalog: test
    source_schema: ingestion_demo
    source_table: lineitem
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    - schema:
    # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
    # table name will be the same as it is on the source.
    source_catalog: test
    source_schema: ingestion_whole_schema
    destination_catalog: ${var.dest_catalog}
    destination_schema: ${var.dest_schema}
    target: ${var.dest_schema}
    catalog: ${var.dest_catalog}
    channel: PREVIEW

    次に、 resources/sqlserver_job.yml ファイルの例を示します。

    YAML
    resources:
    jobs:
    sqlserver_dab_job:
    name: sqlserver_dab_job

    trigger:
    # Run this job every day, exactly one day from the last run
    # See https://docs.databricks.com/api/workspace/jobs/create#trigger
    periodic:
    interval: 1
    unit: DAYS

    email_notifications:
    on_failure:
    - <email-address>

    tasks:
    - task_key: refresh_pipeline
    pipeline_task:
    pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
  3. Databricks CLI を使用してパイプラインをデプロイします。

    Bash
    databricks bundle deploy

取り込み パイプラインのトリガー スケジュールを設定する

注記

トリガー モードは、取り込み パイプラインの実行でのみサポートされています。

パイプラインのスケジュールを作成するには、DLT パイプライン UI を使用して、パイプライン UI 画面の右上隅にあるボタンをクリックします。

UI は、指定されたスケジュールに従ってパイプラインを実行するジョブを自動的に作成します。ジョブは [ジョブ] タブに表示されます。

データ取り込みが成功したことを確認する

取り込みパイプライン UI のリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

パイプラインの開始、スケジュール設定、アラートの設定

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

追加のリソース