メインコンテンツまでスキップ

SQL Server からデータを取り込む

LakeFlow Connectを使用してSQL ServerからDatabricksにデータを取り込む方法を学習します。

SQL Server コネクタは、Azure SQL Database、Azure SQL Managed Instance、Amazon RDS SQL データベースをサポートします。これには、Azure 仮想マシン (VM) および Amazon EC2 上で実行される SQL Server が含まれます。このコネクタは、Azure ExpressRoute および AWS Direct Connect ネットワークを使用したオンプレミスの SQL Server もサポートします。

要件

  • インジェスト ゲートウェイとインジェスト パイプラインを作成するには、まず次の要件を満たす必要があります。

    • ワークスペースが Unity Catalog に対して有効になっています。

    • サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの要件を参照してください。

    • 接続を作成する場合:メタストアに対する権限はCREATE CONNECTIONです。Unity Catalogの「権限の管理」を参照してください。

      コネクタが UI ベースのパイプラインオーサリングをサポートしている場合は、このページの手順を完了することで、接続とパイプラインを同時に作成できます。ただし、API ベースのパイプラインオーサリングを使用する場合は、このページの手順を完了する前に、Catalog Explorer で接続を作成する必要があります。「管理された取り込みソースに接続する」を参照してください

    • 既存の接続を使用する予定の場合: 接続に対する USE CONNECTION 権限または ALL PRIVILEGES があります。

    • ターゲット・カタログに対する USE CATALOG 権限があります。

    • 既存のスキーマに対する USE SCHEMACREATE TABLECREATE VOLUME 権限、またはターゲットカタログに対する CREATE SCHEMA 権限を持っている。

    • プライマリ SQL Server インスタンスにアクセスできる。変更追跡機能とチェンジデータキャプチャ機能は、リードレプリカまたはセカンダリインスタンスではサポートされていません。

    • クラスターを作成するための無制限の権限、またはカスタム ポリシー (API のみ)。ゲートウェイのカスタム ポリシーは、次の要件を満たす必要があります。

      • ファミリー: ジョブ コンピュート

      • ポリシー ファミリを上書きします:

      {
      "cluster_type": {
      "type": "fixed",
      "value": "dlt"
      },
      "num_workers": {
      "type": "unlimited",
      "defaultValue": 1,
      "isOptional": true
      },
      "runtime_engine": {
      "type": "fixed",
      "value": "STANDARD",
      "hidden": true
      }
      }
      • Databricks では、ゲートウェイのパフォーマンスに影響を与えないため、インジェスト ゲートウェイに可能な限り最小のワーカー ノードを指定することをお勧めします。次のコンピュート ポリシーを使用すると、 Databricks ワークロードのニーズに合わせてインジェスト ゲートウェイをスケーリングできます。 ソース データベースから効率的かつパフォーマンスの高いデータ抽出を可能にするための最小要件は 8 コアです。
      Python
      {
      "driver_node_type_id": {
      "type": "fixed",
      "value": "n2-highmem-64"
      },
      "node_type_id": {
      "type": "fixed",
      "value": "n2-standard-4"
      }
      }

      詳細については、 情報 クラスターポリシーについては、「 コンピュートポリシーの選択」を参照してください。

  • SQL Serverから取り込むには、まず「 Databricksへの取り込み用にMicrosoft SQL Server構成する」の手順を完了する必要があります。

ゲートウェイと取り込みパイプラインを作成する

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。

  2. [ データの追加 ] ページの [Databricks コネクタ ] で、[ SQL Server ] をクリックします。

  3. インジェスト ウィザードの [ 接続] ページで、 Databricks へのインジェスト用に Microsoft SQL Server を構成するから SQL Server アクセス資格情報を保存する接続を選択します。メタストアにCREATE CONNECTION権限がある場合は、クリックして プラスアイコン。接続を作成してSQL Serverの認証詳細を使用して新しい接続を作成します。

  4. 次へ をクリックします。

  5. インジェスト設定 ページで、インジェスト パイプラインの一意の名前を入力します。このパイプラインは、ステージング場所から宛先にデータを移動します。

  6. イベント ログを書き込むカタログとスキーマを選択します。イベント ログには、監査ログ、データ品質チェック、パイプラインの進行状況、エラーが含まれます。カタログに対してUSE CATALOGCREATE SCHEMA権限を持っている場合は、クリックできます。 プラスアイコン。 新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。

  7. (オプション) すべてのテーブルの自動完全更新を オン に設定します。自動更新がオンになっている場合、パイプラインは、影響を受けるテーブルを完全に更新することによって、ログ クリーンアップ イベントや特定の種類のスキーマ進化などの問題を自動的に修正しようとします。 履歴の追跡が有効になっている場合、完全更新によってその履歴は消去されます。

  8. インジェストゲートウェイの一意の名前を入力します。ゲートウェイは、ソースから変更を抽出し、取り込みパイプラインがロードできるようにステージングするパイプラインです。

  9. ステージング場所 のカタログとスキーマを選択します。抽出されたデータをステージングするためのボリュームがこの場所に作成されます。カタログに対してUSE CATALOGCREATE SCHEMA権限を持っている場合は、クリックできます。 プラスアイコン。 新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。

  10. パイプラインの作成および続行 をクリックします。

  11. [ソース] ページで、取り込むテーブルを選択します。特定のテーブルを選択した場合は、テーブル設定を構成できます。

    a. (オプション) [設定] タブで、取り込まれたテーブルごとに 宛先名 を指定します。これは、オブジェクトを同じスキーマに複数回取り込むときに、宛先テーブルを区別するのに役立ちます。「宛先テーブルに名前を付ける」を参照してください。

    a. (オプション) デフォルトの 履歴追跡 設定を変更します。「履歴追跡を有効にする (SCD タイプ 2)」を参照してください。

  12. 「次へ」 をクリックし、 「保存して続行」 をクリックします。

  13. [宛先] ページで、データをロードするカタログとスキーマを選択します。カタログに対してUSE CATALOGCREATE SCHEMA権限を持っている場合は、クリックできます。 プラスアイコン。 新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。

  14. 保存して続行 をクリックします。

  15. データベース設定 ページで、 「検証」 をクリックして、ソースが Databricks 取り込み用に適切に構成されていることを確認します。不足している構成があれば返されます。ステップを解決するには、 「構成を完了する」 をクリックします。 次に 「次へ」 をクリックします。または、 「検証をスキップ」 をクリックします。

  16. (オプション) スケジュールと通知 ページで、 プラスアイコン。スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。

  17. (オプション)クリック プラスアイコン。通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。

データ取り込みが成功したことを確認する

パイプラインの詳細ページのリストビューには、データの取り込み時に処理されたレコードの数が表示されます。これらの番号は自動的に更新されます。

レプリケーションの検証

Upserted records列とDeleted records列は、デフォルトでは表示されません。これらを有効にするには、列の設定 列構成アイコン ボタンをクリックして選択します。

これらの例を使用してパイプラインを構成します。

パイプライン構成

次のパイプライン定義ファイル:

YAML
variables:
# Common variables used multiple places in the DAB definition.
gateway_name:
default: sqlserver-gateway
dest_catalog:
default: main
dest_schema:
default: ingest-destination-schema

resources:
pipelines:
gateway:
name: ${var.gateway_name}
gateway_definition:
connection_name: <sqlserver-connection>
gateway_storage_catalog: main
gateway_storage_schema: ${var.dest_schema}
gateway_storage_name: ${var.gateway_name}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}

pipeline_sqlserver:
name: sqlserver-ingestion-pipeline
ingestion_definition:
ingestion_gateway_id: ${resources.pipelines.gateway.id}
objects:
# Modify this with your tables!
- table:
# Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
source_catalog: test
source_schema: ingestion_demo
source_table: lineitem
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- schema:
# Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
# table name will be the same as it is on the source.
source_catalog: test
source_schema: ingestion_whole_schema
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
target: ${var.dest_schema}
catalog: ${var.dest_catalog}

バンドルジョブ定義ファイル

以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。

YAML
resources:
jobs:
sqlserver_dab_job:
name: sqlserver_dab_job

trigger:
periodic:
interval: 1
unit: DAYS

email_notifications:
on_failure:
- <email-address>

tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}

一般的なパターン

高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。

次のステップ

パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。

追加のリソース