メインコンテンツまでスキップ

Salesforce からデータを取り込む

LakeFlow Connectを使用して Salesforce からDatabricksにデータを取り込む方法を学習します。

要件

  • 取り込みパイプラインを作成するには、まず次の要件を満たす必要があります。

    • ワークスペースでUnity Catalogが有効になっている必要があります。

    • サーバレス コンピュートは、ワークスペースで有効にする必要があります。 サーバレス コンピュートの要件を参照してください。

    • 新しい接続を作成する場合:メタストアに対してCREATE CONNECTION権限が必要です。Unity Catalogの「権限の管理」を参照してください。

      コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。

    • 既存の接続を使用する予定の場合: 接続オブジェクトに対する USE CONNECTION 権限または ALL PRIVILEGES が必要です。

    • ターゲット・カタログに対する USE CATALOG 権限が必要です。

    • 既存のスキーマに対する USE SCHEMA 権限と CREATE TABLE 権限、またはターゲット・カタログに対する CREATE SCHEMA 権限が必要です。

  • Salesforce は接続されたアプリケーションに使用制限を適用します。初回認証を成功させるには、次の表の権限が必要です。これらの権限がない場合、Salesforce は接続をブロックし、管理者が Databricks 接続アプリケーションをインストールすることを要求します。

条件

必要な許可

API アクセス制御が有効になっています。

Customize Application そしてModify All Dataまたは Manage Connected Apps

API アクセス制御が有効になっていません。

Approve Uninstalled Connected Apps

背景については、Salesforce ドキュメントの「接続アプリケーションの使用制限の変更に備える」を参照してください。

  • Salesforce から取り込むには、次のことをお勧めします。

  • Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。

インジェスト パイプラインを作成する

備考

ベータ版

取り込み中に行をフィルタリングすることで、パフォーマンスを向上させ、データの重複を減らすことができます。「取り込む行を選択する」を参照してください。

  1. Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
  2. データの追加 ページの Databricks コネクタ で、 Salesforce をクリックします。
  3. 取り込みウィザードの 「接続」 ページで、Salesforce アクセス資格情報を保存する接続を選択します。メタストアにCREATE CONNECTION権限がある場合は、クリックして プラスアイコン。接続を作成してSalesforceの認証詳細を使用して新しい接続を作成します。
  4. 次へ をクリックします。
  5. インジェスチョン設定 ページで、パイプラインの一意の名前を入力します。
  6. イベント ログを書き込むカタログとスキーマを選択します。カタログに対してUSE CATALOGCREATE SCHEMA権限を持っている場合は、クリックできます。 プラスアイコン。 新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
  7. パイプラインの作成および続行 をクリックします。
  8. [ソース] ページで、取り込むテーブルを選択します。 [すべてのテーブル] を選択すると、ソース スキーマ内の既存および将来のすべてのテーブルが取り込まれます。
  9. 保存して続行 をクリックします。
  10. [宛先] ページで、データをロードするカタログとスキーマを選択します。(オプション)カタログに対してUSE CATALOGおよびCREATE SCHEMA権限を持っている場合は、 プラスアイコン。 新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
  11. 保存して続行 をクリックします。
  12. (オプション) スケジュールと通知 ページで、 プラスアイコン。スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
  13. (オプション)クリック プラスアイコン。通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。

これらの例を使用してパイプラインを構成します。

数式フィールドを段階的に取り込む

備考

ベータ版

この機能はベータ版です。

デフォルトでは、数式フィールドはパイプラインの実行ごとに完全なスナップショットを使用して取り込まれます。ただし、パイプライン定義のconfigurationブロックでフラグpipelines.enableSalesforceFormulaFieldsMVComputation: "true"を設定することで、数式フィールドの増分取り込みを有効にすることができます。

次のパイプライン定義ファイルは、増分式フィールドの取り込みを有効にします。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog
schema: my_schema
configuration:
pipelines.enableSalesforceFormulaFieldsMVComputation: 'true'
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Account
destination_catalog: my_catalog
destination_schema: my_schema

詳細については、 「Salesforce 数式フィールドを段階的に取り込む」を参照してください。

2つのSalesforceオブジェクトを別々のスキーマに取り込む

次のパイプライン定義ファイルは、2 つの Salesforce オブジェクトを別々のスキーマに取り込みます。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table

1つのSalesforceオブジェクトを3回取り込む

次のパイプライン定義ファイルは、Salesforce オブジェクトを 3 つの異なる宛先テーブルに取り込みます。オプションで、取り込まれたテーブルに新しい名前を付けて、同じ宛先スキーマに複数のテーブルが取り込まれるときに区別することができます (重複はサポートされていません)。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename

バンドルジョブ定義ファイル

以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。

YAML
resources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job

trigger:
periodic:
interval: 1
unit: DAYS

email_notifications:
on_failure:
- <email-address>

tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id}

一般的なパターン

高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。

次のステップ

パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。

追加のリソース