Salesforce からデータを取り込む

プレビュー

LakeFlow Connect はゲート付きパブリック プレビュー段階です。 プレビューに参加するには、Databricks アカウント チームにお問い合わせください。

この記事では、LakeFlow Connect を使用して Salesforce からデータを取り込み、Databricks にロードする方法について説明します。 結果として得られる取り込みパイプラインはUnity Catalogによって管理され、サーバレス コンピュートとDelta Live Tablesによって強化されます。

Salesforce インジェスト コネクタは、次のソースをサポートしています。

  • Salesforce Sales Cloud

始める前に

取り込みパイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースは Unity Catalog に対して有効になっています。

  • サーバレス コンピュートは、ノートブック、ワークフロー、およびDelta Live Tablesで有効です。 「サーバレス コンピュートを有効にする」を参照してください。

  • 接続を作成するには: メタストアに CREATE CONNECTION があります。

    既存の接続を使用するには: 接続オブジェクトに USE CONNECTION または ALL PRIVILEGES があります。

  • USE CATALOG ターゲットカタログ上。

  • USE SCHEMA ターゲットカタログ上の既存のスキーマまたはCREATE SCHEMACREATE TABLE

  • (推奨)Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。 ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。

Salesforce 接続を作成する

必要なアクセス許可: メタストア CREATE CONNECTION 。 これを許可するには、メタストア管理者に連絡してください。

既存の接続を使用してインジェストパイプラインを作成する場合は、次のセクションに進んでください。 接続には USE CONNECTION または ALL PRIVILEGES が必要です。

Salesforce 接続を作成するには、次の手順を実行します。

  1. Databricksワークスペースで、 [カタログ] > [外部ロケーション] > [接続] > [接続の作成] をクリックします。

  2. [ Connection name] で、Salesforce 接続の一意の名前を入力します。

  3. [接続タイプ] で [Salesforce] をクリックします。

  4. [認証タイプ] を [OAuth] に設定します。

  5. Salesforce サンドボックス アカウントから取り込んでいる場合は、サンドボックスをtrueに設定します。

  6. 「Salesforce でログイン」をクリックします。

    Salesforce ログイン
  7. Salesforce サンドボックスから取り込む場合は、[ カスタムドメインを使用] をクリックします。 サンドボックスの URL を入力し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。

    カスタムドメインボタンを使用する
    サンドボックスのURLを入力
  8. [接続の作成] ページに戻ったら、[作成] をクリックします。

インジェスト パイプラインを作成する

必要な権限: 接続 USE CONNECTION または ALL PRIVILEGES

このステップでは、インジェスト パイプラインの作成方法を説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトで宛先内の同じ名前 (ただしすべて小文字) を持つストリーミング テーブルに対応します。

  1. Databricksワークスペースのサイドバーで、「データ取り込み」をクリックします。

  2. [ データの追加 ] ページの [Databricks コネクタ] で、[ Salesforce] をクリックします。

    Salesforce インジェスト ウィザードが開きます。

  3. ウィザードの [パイプライン ] ページで、インジェスト パイプラインの一意の名前を入力します。

  4. [宛先カタログ] ドロップダウンで、カタログを選択します。取り込まれたデータとイベント ログは、このカタログに書き込まれます。

  5. Salesforce データへのアクセスに必要な資格情報を保存する Unity Catalog 接続を選択します。

    Salesforce 接続がない場合は、[ 接続の作成] をクリックします。 メタストアに対する CREATE CONNECTION 特権が必要です。

  6. [パイプラインの作成] をクリックして続行します

  7. [ソース] ページで、Databricks に取り込む Salesforce テーブルを選択し、[次へ] をクリックします。

    スキーマを選択した場合、Salesforce インジェスト コネクタは、ソース スキーマ内のすべての既存および将来のテーブルをマネージド テーブル Unity Catalog 書き込みます。

  8. [宛先] ページで、書き込む Unity Catalog カタログとスキーマを選択します。

    既存のスキーマを使用しない場合は、[ スキーマの作成] をクリックします。 親カタログに対する USE CATALOG 権限と CREATE SCHEMA 権限が必要です。

  9. [パイプラインを保存] をクリックして続行します

  10. [設定] ページで、[スケジュールの作成] をクリックします。宛先テーブルを更新する頻度を設定します。

  11. オプションで、パイプライン操作の成功または失敗に関するEメール 通知を設定します。

  12. パイプラインの保存と実行」をクリックします。

このタブでは、Databricks アセット バンドル (DAB) を使用してインジェスト パイプラインをデプロイする方法について説明します。 バンドルには、ジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理され、異なるターゲット ワークスペース (開発、ステージング、本番運用など) で共有および実行できます。 詳細については、「アセットバンドルDatabricks」を参照してください。

  1. Databricks CLI を使用して新しいバンドルを作成します。

    databricks bundle init
    
  2. バンドルに 2 つの新しいリソース ファイルを追加します。

    • パイプライン定義ファイル (resources/sfdc_pipeline.yml)。

    • データ取り込みの頻度を制御するワークフロー ファイル (resources/sfdc_job.yml)。

    次に、 resources/sfdc_pipeline.yml ファイルの例を示します。

    variables:
      dest_catalog:
        default: main
      dest_schema:
        default: ingest_destination_schema
    
    # The main pipeline for sfdc_dab
    resources:
      pipelines:
        pipeline_sfdc:
          name: salesforce_pipeline
          ingestion_definition:
            connection_name: <salesforce-connection>
            objects:
              # An array of objects to ingest from Salesforce. This example
              # ingests the AccountShare, AccountPartner, and ApexPage objects.
              - table:
                  source_schema: objects
                  source_table: AccountShare
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: AccountPartner
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - table:
                  source_schema: objects
                  source_table: ApexPage
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          channel: "preview"
    

    次に、 resources/sfdc_job.yml ファイルの例を示します。

    resources:
      jobs:
        sfdc_dab_job:
          name: sfdc_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
    
  3. Databricks CLI を使用してパイプラインをデプロイします。

    databricks bundle deploy
    

パイプラインを作成するには:

databricks pipelines create --json "<pipeline-definition | json-file-path>"

パイプラインを更新するには:

databricks pipelines update --json "<<pipeline-definition | json-file-path>"

パイプライン定義を取得するには:

databricks pipelines get "<pipeline-id>"

パイプラインを削除するには:

databricks pipelines delete "<pipeline-id>"

詳細については、次のコマンドを実行してください。

databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help

パイプラインを開始、スケジュール、アラートを設定する

  1. パイプラインが作成されたら、Databricks ワークスペースに戻り、 [Delta Live Tables]をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 [開始]をクリックしてパイプラインを実行します。 [スケジュール]をクリックすると、パイプラインをスケジュールできます。

  4. パイプラインにアラートを設定するには、 [スケジュール]をクリックし、 [その他のオプション]をクリックして、通知を追加します。

  5. インジェストが完了したら、テーブルに対してクエリを実行できます。

注:

パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。 1 つのビューには、数式フィールドのスナップショットが含まれます。 もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。 これらのビューは、宛先テーブルで結合されます。