メインコンテンツまでスキップ

Salesforce からデータを取り込む

この記事では、LakeFlow Connect を使用して Salesforce からデータを取り込み、Databricks に読み込む方法について説明します。結果として得られるインジェスト パイプラインは Unity Catalog によって制御され、サーバレス コンピュートと DLT によって駆動されます。

Salesforce インジェスト コネクタは、次のソースをサポートしています。

  • Salesforce Sales Cloud

始める前に

インジェスト パイプラインを作成するには、次の要件を満たす必要があります。

  • ワークスペースが Unity Catalog に対して有効になっています。

  • サーバレス コンピュートがワークスペースで有効になっています。 サーバレス コンピュートの有効化を参照してください。

  • 接続を作成する予定の場合: メタストアに対する CREATE CONNECTION 権限があります。

    既存の接続を使用する予定の場合: 接続オブジェクトに対する USE CONNECTION 権限または ALL PRIVILEGES があります。

  • ターゲット・カタログに対する USE CATALOG 権限があります。

  • 既存のスキーマに対する USE SCHEMA 権限と CREATE TABLE 権限、またはターゲット カタログに対する CREATE SCHEMA 権限を持っている。

Salesforce Sales Cloud から取り込むには、次のことをお勧めします。

  • Databricks がデータを取得するために使用できる Salesforce ユーザーを作成します。ユーザーが API アクセスと、取り込む予定のすべてのオブジェクトにアクセスできることを確認します。

Salesforce 接続を作成する

必要なアクセス許可: メタストア CREATE CONNECTION 。これを許可するには、メタストア管理者に連絡してください。

既存の接続を使用してインジェストパイプラインを作成する場合は、次のセクションに進んでください。接続には USE CONNECTION または ALL PRIVILEGES が必要です。

Salesforce 接続を作成するには、次の手順を実行します。

  1. Databricksワークスペースで、 カタログ > 外部ロケーション > 接続 > 接続の作成 をクリックします。

  2. 接続名 で、Salesforce 接続の一意の名前を入力します。

  3. 接続タイプSalesforce をクリックします。

  4. Salesforce サンドボックス アカウントから取り込む場合は、 サンドボックスであるtrueに設定します。

  5. Salesforce でログイン をクリックします。

    Salesforce ログイン

  6. Salesforce サンドボックスから取り込む場合は、 カスタムドメインを使用 をクリックします。 サンドボックスのURLを入力し、ログインに進みます。 Databricks では、Databricks インジェスト専用の Salesforce ユーザーとしてログインすることをお勧めします。

    カスタムドメインボタンを使用する

    サンドボックスのURLを入力

  7. 接続の作成 ページに戻ったら、 作成 をクリックします。

インジェスト パイプラインを作成する

必要な権限: 接続 USE CONNECTION または ALL PRIVILEGES

この手順では、インジェスト パイプラインを作成する方法について説明します。取り込まれた各テーブルは、明示的に名前を変更しない限り、デフォルトによって宛先に同じ名前 (ただしすべて小文字) のストリーミングテーブルに対応します。

  1. In the sidebar of the Databricks workspace, click Data Ingestion.

  2. On the Add data page, under Databricks connectors, click Salesforce.

    The Salesforce ingestion wizard opens.

  3. On the Pipeline page of the wizard, enter a unique name for the ingestion pipeline.

  4. In the Destination catalog dropdown, select a catalog. Ingested data and event logs will be written to this catalog.

  5. Select the Unity Catalog connection that stores the credentials required to access Salesforce data.

    If there are no Salesforce connections, click Create connection. You must have the CREATE CONNECTION privilege on the metastore.

  6. Click Create pipeline and continue.

  7. On the Source page, select the tables to ingest, and then click Next.

    If you select All tables, the Salesforce ingestion connector writes all existing and future tables in the source schema to Unity Catalog managed tables.

  8. On the Destination page, select the Unity Catalog catalog and schema to write to.

    If you don’t want to use an existing schema, click Create schema. You must have the USE CATALOG and CREATE SCHEMA privileges on the parent catalog.

  9. Click Save pipeline and continue.

  10. On the Settings page, click Create schedule. Set the frequency to refresh the destination tables.

  11. Optionally, set email notifications for pipeline operation success or failure.

  12. Click Save and run pipeline.

JSON パイプライン定義の例:

JSON
"ingestion_definition": {

"connection_name": "<connection-name>",

"objects": [

{

"table": {

"source_schema": "<source-schema>",

"source_table": "<source-table>",

"destination_catalog": "<destination-catalog>",

"destination_schema": "<destination-schema>",

"table_configuration": {

"include_columns": ["<column-a>", "<column-b>", "<column-c>"]

}

}

}

]

}

パイプラインの開始、スケジュール設定、アラートの設定

  1. パイプラインが作成されたら、 Databricks ワークスペースに再度アクセスし、[ パイプライン ] をクリックします。

    新しいパイプラインがパイプライン リストに表示されます。

  2. パイプラインの詳細を表示するには、パイプライン名をクリックします。

  3. パイプラインの詳細ページで、 スケジュール をクリックしてパイプラインをスケジュールできます。

  4. パイプラインに通知を設定するには、[ 設定 ] をクリックし、通知を追加します。

注記

パイプラインを実行すると、特定のテーブルに対して 2 つのソース ビューが表示される場合があります。1 つのビューには、数式フィールドのスナップショットが含まれています。もう一方のビューには、数式以外のフィールドの増分データ取得が含まれています。これらのビューは、宛先テーブルで結合されます。

例: 2 つの Salesforce オブジェクトを別々のスキーマに取り込む

このセクションのパイプライン定義の例では、2 つの Salesforce オブジェクトを別々のスキーマに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: AccountShare
destination_catalog: my_catalog_1 # Location of this table
destination_schema: my_schema_1 # Location of this table
- table:
source_schema: objects
source_table: AccountPartner
destination_catalog: my_catalog_2 # Location of this table
destination_schema: my_schema_2 # Location of this table

例: 1 つの Salesforce オブジェクトを 3 回取り込む

このセクションのパイプライン定義の例では、Salesforce オブジェクトを 3 つの異なる宛先テーブルに取り込みます。マルチデスティネーションパイプラインのサポートはAPIのみです。

オプションで、取り込むテーブルの名前を変更できます。パイプライン内のテーブルの名前を変更すると、そのテーブルは API のみのパイプラインになり、UI でパイプラインを編集できなくなります。

YAML
resources:
pipelines:
pipeline_sfdc:
name: salesforce_pipeline
catalog: my_catalog_1 # Location of the pipeline event log
schema: my_schema_1 # Location of the pipeline event log
ingestion_definition:
connection_name: <salesforce-connection>
objects:
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_1 # Location of first copy
destination_schema: my_schema_1 # Location of first copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of second copy
destination_schema: my_schema_2 # Location of second copy
- table:
source_schema: objects
source_table: Order
destination_catalog: my_catalog_2 # Location of third copy, renamed
destination_schema: my_schema_2 # Location of third copy, renamed
destination_table: order_duplicate # Table rename

追加のリソース