Salesforce からデータを取り込む
LakeFlow Connectを使用して Salesforce からDatabricksにデータを取り込む方法を学習します。
要件
-
取り込みパイプラインを作成するには、まず次の要件を満たす必要があります。
-
ワークスペースでUnity Catalogが有効になっている必要があります。
-
サーバレス コンピュートは、ワークスペースで有効にする必要があります。 サーバレス コンピュートの要件を参照してください。
-
新しい接続を作成する場合:メタストアに対して
CREATE CONNECTION権限が必要です。Unity Catalogの「権限の管理」を参照してください。コネクタが UI ベースのパイプライン オーサリングをサポートしている場合、管理者はこのページのステップを完了することで、接続とパイプラインを同時に作成できます。 ただし、パイプラインを作成するユーザーが API ベースのパイプライン オーサリングを使用している場合、または管理者以外のユーザーである場合、管理者はまずカタログ エクスプローラーで接続を作成する必要があります。 「管理対象取り込みソースへの接続」を参照してください。
-
既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION権限またはALL PRIVILEGESが必要です。 -
ターゲット・カタログに対する
USE CATALOG権限が必要です。 -
既存のスキーマに対する
USE SCHEMA権限とCREATE TABLE権限、またはターゲット・カタログに対するCREATE SCHEMA権限が必要です。
-
-
Salesforce は接続されたアプリケーションに使用制限を適用します。初回認証を成功させるには、次の表の権限が必要です。これらの権限がない場合、Salesforce は接続をブロックし、管理者が Databricks 接続アプリケーションをインストールすることを要求します。
条件 | 必要な許可 |
|---|---|
API アクセス制御が有効になっています。 |
|
API アクセス制御が有効になっていません。 |
|
背景については、Salesforce ドキュメントの「接続アプリケーションの使用制限の変更に備える」を参照してください。
-
Salesforce から取り込むには、次のことをお勧めします。
-
Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。
インジェスト パイプラインを作成する
ベータ版
取り込み中に行をフィルタリングすることで、パフォーマンスを向上させ、データの重複を減らすことができます。「取り込む行を選択する」を参照してください。
- Databricks UI
- Databricks Asset Bundles
- Databricksワークスペースのサイドバーで、 データ取り込み をクリックします。
- データの追加 ページの Databricks コネクタ で、 Salesforce をクリックします。
- 取り込みウィザードの 「接続」 ページで、Salesforce アクセス資格情報を保存する接続を選択します。メタストアに
CREATE CONNECTION権限がある場合は、クリックして接続を作成して 、 Salesforceの認証詳細を使用して新しい接続を作成します。
- 次へ をクリックします。
- インジェスチョン設定 ページで、パイプラインの一意の名前を入力します。
- イベント ログを書き込むカタログとスキーマを選択します。カタログに対して
USE CATALOGとCREATE SCHEMA権限を持っている場合は、クリックできます。新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- パイプラインの作成および続行 をクリックします。
- [ソース] ページで、取り込むテーブルを選択します。 [すべてのテーブル] を選択すると、ソース スキーマ内の既存および将来のすべてのテーブルが取り込まれます。
- 保存して続行 をクリックします。
- [宛先] ページで、データをロードするカタログとスキーマを選択します。(オプション)カタログに対して
USE CATALOGおよびCREATE SCHEMA権限を持っている場合は、新しいスキーマを作成するには、ドロップダウン メニューでスキーマを作成します。
- 保存して続行 をクリックします。
- (オプション) スケジュールと通知 ページで、
スケジュールを作成します 。宛先テーブルを更新する頻度を設定します。
- (オプション)クリック
通知を追加して パイプライン操作の成功または失敗に関する電子メール通知を設定し、 [保存してパイプラインを実行] をクリックします。
Declarative Automation Bundle を使用して、Salesforce パイプラインをコードとして管理します。 バンドルにはジョブとタスクの YAML 定義を含めることができ、 Databricks CLIを使用して管理でき、さまざまなターゲット ワークスペース (開発、ステージング、本番運用など) で共有して実行できます。 詳細については、 「宣言的オートメーション バンドルとは何ですか?」を参照してください。 。
-
Databricks CLI を使用して新しいバンドルを作成します。
Bashdatabricks bundle init -
バンドルに 2 つの新しいリソース ファイルを追加します。
- パイプライン定義ファイル (例:
resources/sfdc_pipeline.yml)。パイプライン.ingestion_定義を参照してください。 および例。 - データ取り込みの頻度を制御するジョブ定義ファイル (例:
resources/sfdc_job.yml)。
- パイプライン定義ファイル (例:
-
Databricks CLI を使用してパイプラインをデプロイします。
Bashdatabricks bundle deploy
例
これらの例を使用してパイプラインを構成します。
数式フィールドを段階的に取り込む
ベータ版
この機能はベータ版です。
デフォルトでは、数式フィールドはパイプラインの実行ごとに完全なスナップショットを使用して取り込まれます。ただし、パイプライン定義のconfigurationブロックでフラグpipelines.enableSalesforceFormulaFieldsMVComputation: "true"を設定することで、数式フィールドの増分取り込みを有効にすることができます。
次のパイプライン定義ファイルは、増分式フィールドの取り込みを有効にします。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog
schema: my_schema
configuration:
pipelines.enableSalesforceFormulaFieldsMVComputation: 'true'
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Account
destination_catalog: my_catalog
destination_schema: my_schema
詳細については、 「Salesforce 数式フィールドを段階的に取り込む」を参照してください。
2つのSalesforceオブジェクトを別々のスキーマに取り込む
次のパイプライン定義ファイルは、2 つの Salesforce オブジェクトを別々のスキーマに取り込みます。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table
1つのSalesforceオブジェクトを3回取り込む
次のパイプライン定義ファイルは、Salesforce オブジェクトを 3 つの異なる宛先テーブルに取り込みます。オプションで、取り込まれたテーブルに新しい名前を付けて、同じ宛先スキーマに複数のテーブルが取り込まれるときに区別することができます (重複はサポートされていません)。
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename
バンドルジョブ定義ファイル
以下は、宣言型自動化バンドルで使用するジョブ定義ファイルの例です。ジョブは毎日、最後の実行からちょうど 1 日後に実行されます。
resources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job
trigger:
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id}
一般的なパターン
高度なパイプライン構成については、 「管理された取り込みパイプラインの一般的なパターン」を参照してください。
次のステップ
パイプラインを開始、スケジュールし、アラートを設定します。一般的なパイプラインメンテナンスタスクを参照してください。