Salesforce からデータを取り込む
プレビュー
Salesforce インジェスト コネクタは パブリック プレビュー段階です。
この記事では、 を使用して Salesforce からデータを取り込み、Databricks LakeFlow Connectに読み込む方法について説明します。結果として得られるインジェスト パイプラインは Unity Catalog によって制御され、サーバレス コンピュートと DLT によって駆動されます。
Salesforce インジェスト コネクタは、次のソースをサポートしています。
- Salesforce Sales Cloud
始める前に
インジェスト パイプラインを作成するには、次の要件を満たす必要があります。
-
ワークスペースが Unity Catalog に対して有効になっています。
-
サーバレス コンピュートがワークスペースで有効になっています。 Enable サーバレス コンピュートを参照してください。
-
接続を作成する予定の場合: メタストアに対する
CREATE CONNECTION
権限があります。既存の接続を使用する予定の場合: 接続オブジェクトに対する
USE CONNECTION
権限またはALL PRIVILEGES
があります。 -
ターゲット・カタログに対する
USE CATALOG
権限があります。 -
既存のスキーマに対する
USE SCHEMA
権限とCREATE TABLE
権限、またはターゲット カタログに対するCREATE SCHEMA
権限を持っている。
Salesforce Sales Cloud から取り込むには、次のことをお勧めします。
- (推奨)Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。 ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。
Salesforce 接続を作成する
必要なアクセス許可: メタストア CREATE CONNECTION
。 これを許可するには、メタストア管理者に連絡してください。
既存の接続を使用してインジェストパイプラインを作成する場合は、次のセクションに進んでください。 接続には USE CONNECTION
または ALL PRIVILEGES
が必要です。
Salesforceとの接続を作成するには、次の操作を行います。
-
Databricksワークスペースで、 カタログ > 外部ロケーション > 接続 > 接続の作成 を クリックします。
-
Connection name で、Salesforce 接続の一意の名前を入力します。
-
接続タイプ で Salesforce をクリックします。
-
Salesforce サンドボックス アカウントから取り込む場合は、 Is サンドボックス を
true
に設定します。 -
Salesforce でログイン をクリックします。
-
Salesforce サンドボックスから取り込む場合は、 カスタムドメインを使用 をクリックします。 サンドボックスのURLを入力し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。
-
接続の作成 ページに戻ったら、 作成 をクリックします。
インジェスト パイプラインを作成する
必要な権限: 接続 USE CONNECTION
または ALL PRIVILEGES
。
この手順では、インジェスト パイプラインを作成する方法について説明します。 取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトで宛先の同じ名前 (ただしすべて小文字) のストリーミング テーブルに対応します。
- Databricks UI
- Databricks Asset Bundles
- Databricks CLI
-
In the sidebar of the Databricks workspace, click Data Ingestion.
-
On the Add data page, under Databricks connectors, click Salesforce.
The Salesforce ingestion wizard opens.
-
On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.
-
In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.
-
Select the Unity Catalog connection that stores the credentials required to access Salesforce data.
If there are no Salesforce connections, click Create connection. You must have the
CREATE CONNECTION
privilege on the metastore. -
Click Create pipeline and continue.
-
On the Source page, select the Salesforce tables to ingest into Databricks, and then click Next.
If you select All tables, the Salesforce ingestion connector writes all existing and future tables in the source schema to Unity Catalog managed tables.
-
On the Destination page, select the Unity Catalog catalog and schema to write to.
If you don’t want to use an existing schema, click Create schema. You must have the
USE CATALOG
andCREATE SCHEMA
privileges on the parent catalog. -
Click Save pipeline and continue.
-
On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.
-
Optionally, set email notifications for pipeline operation success or failure.
-
Click Save and run pipeline.
This tab describes how to deploy an ingestion pipeline using Databricks Asset Bundles. Bundles can contain YAML definitions of jobs and tasks, are managed using the Databricks CLI, and can be shared and run in different target workspaces (such as development, staging, and production). For more information, see Databricks Asset Bundles.
-
Create a new bundle using the Databricks CLI:
Bashdatabricks bundle init
-
Add two new resource files to the bundle:
- A pipeline definition file (
resources/sfdc_pipeline.yml
). - A workflow file that controls the frequency of data ingestion (
resources/sfdc_job.yml
).
The following is an example
resources/sfdc_pipeline.yml
file:YAMLvariables:
dest_catalog:
default: main
dest_schema:
default: ingest_destination_schema
# The main pipeline for sfdc_dab
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: ${var.dest_catalog}
target: ${var.dest_schema}
ingestion_definition:
connection_name: <salesforce-connection>
objects:
# An array of objects to ingest from Salesforce. This example
# ingests the AccountShare, AccountPartner, and ApexPage objects.
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
- table:
source_schema: objects
source_table: ApexPage
destination_catalog: ${var.dest_catalog}
destination_schema: ${var.dest_schema}
channel: 'preview'The following is an example
resources/sfdc_job.yml
file:YAMLresources:
jobs:
sfdc_dab_job:
name: sfdc_dab_job
trigger:
# Run this job every day, exactly one day from the last run
# See https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- <email-address>
tasks:
- task_key: refresh_pipeline
pipeline_task:
pipeline_id: ${resources.pipelines.pipeline_sfdc.id} - A pipeline definition file (
-
Deploy the pipeline using the Databricks CLI:
Bashdatabricks bundle deploy
To create the pipeline:
databricks pipelines create --json "<pipeline-definition | json-file-path>"
To update the pipeline:
databricks pipelines update --json "<<pipeline-definition | json-file-path>"
To get the pipeline definition:
databricks pipelines get "<pipeline-id>"
To delete the pipeline:
databricks pipelines delete "<pipeline-id>"
For more information, you can run:
databricks pipelines --help
databricks pipelines <create|update|get|delete|...> --help
パイプラインの開始、スケジュール設定、アラートの設定
-
パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。
新しいパイプラインがパイプライン リストに表示されます。
-
パイプラインの詳細を表示するには、パイプライン名をクリックします。
-
パイプラインの詳細ページで、[ スケジュール] をクリックしてパイプラインをスケジュールできます。
-
パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。
パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。 1 つのビューには、数式フィールドのスナップショットが含まれています。 もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。 これらのビューは、宛先テーブルで結合されます。